Compare commits

...

3 Commits

Author SHA1 Message Date
970b94bfa7 Fix unclear session errors 2023-04-12 19:14:50 +02:00
33454f68b8 Encode username/password 2023-04-12 02:10:37 +02:00
6b2c60d552 Fix session issues 2023-04-12 01:07:33 +02:00
4 changed files with 58 additions and 39 deletions

View File

@ -5,11 +5,11 @@ import secrets
import urllib
from pprint import pformat
from urllib import parse
from urllib.parse import quote
from yarl import URL
from pyhon import const
from pyhon.exceptions import HonAuthenticationError
from pyhon import const, exceptions
_LOGGER = logging.getLogger(__name__)
@ -50,7 +50,7 @@ class HonAuth:
result += f"{15 * '='} Response {15 * '='}\n{await response.text()}\n{40 * '='}"
_LOGGER.error(result)
if fail:
raise HonAuthenticationError("Can't login")
raise exceptions.HonAuthenticationError("Can't login")
async def _load_login(self):
nonce = secrets.token_hex(16)
@ -70,7 +70,11 @@ class HonAuth:
f"{const.AUTH_API}/services/oauth2/authorize/expid_Login?{params}"
) as response:
self._called_urls.append((response.status, response.request_info.url))
if not (login_url := re.findall("url = '(.+?)'", await response.text())):
text = await response.text()
if not (login_url := re.findall("url = '(.+?)'", text)):
if "oauth/done#access_token=" in text:
self._parse_token_data(text)
raise exceptions.HonNoAuthenticationNeeded()
await self._error_logger(response)
return False
async with self._session.get(login_url[0], allow_redirects=False) as redirect1:
@ -86,7 +90,9 @@ class HonAuth:
):
await self._error_logger(redirect2)
return False
async with self._session.get(URL(url, encoded=True)) as login_screen:
async with self._session.get(
URL(url, encoded=True), headers={"user-agent": const.USER_AGENT}
) as login_screen:
self._called_urls.append(
(login_screen.status, login_screen.request_info.url)
)
@ -111,8 +117,8 @@ class HonAuth:
"descriptor": "apex://LightningLoginCustomController/ACTION$login",
"callingDescriptor": "markup://c:loginForm",
"params": {
"username": self._email,
"password": self._password,
"username": quote(self._email),
"password": quote(self._password),
"startUrl": parse.unquote(
login_url.split("startURL=")[-1]
).split("%3D")[0],
@ -153,6 +159,14 @@ class HonAuth:
await self._error_logger(response)
return ""
def _parse_token_data(self, text):
if access_token := re.findall("access_token=(.*?)&", text):
self._access_token = access_token[0]
if refresh_token := re.findall("refresh_token=(.*?)&", text):
self._refresh_token = refresh_token[0]
if id_token := re.findall("id_token=(.*?)&", text):
self._id_token = id_token[0]
async def _get_token(self, url):
async with self._session.get(url) as response:
self._called_urls.append((response.status, response.request_info.url))
@ -176,26 +190,9 @@ class HonAuth:
if response.status != 200:
await self._error_logger(response)
return False
text = await response.text()
if access_token := re.findall("access_token=(.*?)&", text):
self._access_token = access_token[0]
if refresh_token := re.findall("refresh_token=(.*?)&", text):
self._refresh_token = refresh_token[0]
if id_token := re.findall("id_token=(.*?)&", text):
self._id_token = id_token[0]
self._parse_token_data(await response.text())
return True
async def authorize(self):
if login_site := await self._load_login():
fw_uid, loaded, login_url = login_site
else:
return False
if not (url := await self._login(fw_uid, loaded, login_url)):
return False
if not await self._get_token(url):
return False
return await self._api_auth()
async def _api_auth(self):
post_headers = {"id-token": self._id_token}
data = self._device.get()
@ -211,6 +208,20 @@ class HonAuth:
self._cognito_token = json_data["cognitoUser"]["Token"]
return True
async def authenticate(self):
self.clear()
try:
if not (login_site := await self._load_login()):
raise exceptions.HonAuthenticationError("Can't open login page")
if not (url := await self._login(*login_site)):
raise exceptions.HonAuthenticationError("Can't login")
if not await self._get_token(url):
raise exceptions.HonAuthenticationError("Can't get token")
if not await self._api_auth():
raise exceptions.HonAuthenticationError("Can't get api token")
except exceptions.HonNoAuthenticationNeeded:
return
async def refresh(self):
params = {
"client_id": const.CLIENT_ID,
@ -228,3 +239,10 @@ class HonAuth:
self._id_token = data["id_token"]
self._access_token = data["access_token"]
return await self._api_auth()
def clear(self):
self._session.cookie_jar.clear_domain(const.AUTH_API.split("/")[-2])
self._cognito_token = ""
self._id_token = ""
self._access_token = ""
self._refresh_token = ""

View File

@ -13,6 +13,7 @@ class HonBaseConnectionHandler:
_HEADERS = {"user-agent": const.USER_AGENT, "Content-Type": "application/json"}
def __init__(self, session=None):
self._create_session = session is None
self._session = session
self._auth = None
@ -23,7 +24,8 @@ class HonBaseConnectionHandler:
await self.close()
async def create(self):
self._session = aiohttp.ClientSession(headers=self._HEADERS)
if self._create_session:
self._session = aiohttp.ClientSession()
return self
@asynccontextmanager
@ -41,6 +43,7 @@ class HonBaseConnectionHandler:
yield response
async def close(self):
if self._create_session:
await self._session.close()
@ -54,7 +57,6 @@ class HonConnectionHandler(HonBaseConnectionHandler):
raise HonAuthenticationError("An email address must be specified")
if not self._password:
raise HonAuthenticationError("A password address must be specified")
self._request_headers = {}
@property
def device(self):
@ -66,16 +68,11 @@ class HonConnectionHandler(HonBaseConnectionHandler):
return self
async def _check_headers(self, headers):
if (
"cognito-token" not in self._request_headers
or "id-token" not in self._request_headers
):
if await self._auth.authorize():
self._request_headers["cognito-token"] = self._auth.cognito_token
self._request_headers["id-token"] = self._auth.id_token
else:
raise HonAuthenticationError("Can't login")
return headers | self._request_headers
if not (self._auth.cognito_token and self._auth.id_token):
await self._auth.authenticate()
headers["cognito-token"] = self._auth.cognito_token
headers["id-token"] = self._auth.id_token
return self._HEADERS | headers
@asynccontextmanager
async def _intercept(self, method, *args, loop=0, **kwargs):

View File

@ -1,2 +1,6 @@
class HonAuthenticationError(Exception):
pass
class HonNoAuthenticationNeeded(Exception):
pass

View File

@ -7,7 +7,7 @@ with open("README.md", "r") as f:
setup(
name="pyhOn",
version="0.7.0",
version="0.7.3",
author="Andre Basche",
description="Control hOn devices with python",
long_description=long_description,