|
|
|
@ -2,6 +2,7 @@ import json
|
|
|
|
|
import logging
|
|
|
|
|
import re
|
|
|
|
|
import secrets
|
|
|
|
|
import sys
|
|
|
|
|
import urllib
|
|
|
|
|
from pprint import pformat
|
|
|
|
|
from urllib import parse
|
|
|
|
@ -10,7 +11,7 @@ from yarl import URL
|
|
|
|
|
|
|
|
|
|
from pyhon import const
|
|
|
|
|
|
|
|
|
|
_LOGGER = logging.getLogger()
|
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class HonAuth:
|
|
|
|
@ -56,19 +57,23 @@ class HonAuth:
|
|
|
|
|
params = "&".join([f"{k}={v}" for k, v in params.items()])
|
|
|
|
|
async with self._session.get(
|
|
|
|
|
f"{const.AUTH_API}/services/oauth2/authorize/expid_Login?{params}"
|
|
|
|
|
) as resp:
|
|
|
|
|
if not (login_url := re.findall("url = '(.+?)'", await resp.text())):
|
|
|
|
|
) as response:
|
|
|
|
|
_LOGGER.debug("%s - %s", response.status, response.request_info.url)
|
|
|
|
|
if not (login_url := re.findall("url = '(.+?)'", await response.text())):
|
|
|
|
|
return False
|
|
|
|
|
async with self._session.get(login_url[0], allow_redirects=False) as redirect1:
|
|
|
|
|
_LOGGER.debug("%s - %s", redirect1.status, redirect1.request_info.url)
|
|
|
|
|
if not (url := redirect1.headers.get("Location")):
|
|
|
|
|
return False
|
|
|
|
|
async with self._session.get(url, allow_redirects=False) as redirect2:
|
|
|
|
|
_LOGGER.debug("%s - %s", redirect2.status, redirect2.request_info.url)
|
|
|
|
|
if not (
|
|
|
|
|
url := redirect2.headers.get("Location")
|
|
|
|
|
+ "&System=IoT_Mobile_App&RegistrationSubChannel=hOn"
|
|
|
|
|
):
|
|
|
|
|
return False
|
|
|
|
|
async with self._session.get(URL(url, encoded=True)) as login_screen:
|
|
|
|
|
_LOGGER.debug("%s - %s", login_screen.status, login_screen.request_info.url)
|
|
|
|
|
if context := re.findall(
|
|
|
|
|
'"fwuid":"(.*?)","loaded":(\\{.*?})', await login_screen.text()
|
|
|
|
|
):
|
|
|
|
@ -117,6 +122,7 @@ class HonAuth:
|
|
|
|
|
data="&".join(f"{k}={json.dumps(v)}" for k, v in data.items()),
|
|
|
|
|
params=params,
|
|
|
|
|
) as response:
|
|
|
|
|
_LOGGER.debug("%s - %s", response.status, response.request_info.url)
|
|
|
|
|
if response.status == 200:
|
|
|
|
|
try:
|
|
|
|
|
data = await response.json()
|
|
|
|
@ -133,25 +139,31 @@ class HonAuth:
|
|
|
|
|
return ""
|
|
|
|
|
|
|
|
|
|
async def _get_token(self, url):
|
|
|
|
|
async with self._session.get(url) as resp:
|
|
|
|
|
if resp.status != 200:
|
|
|
|
|
_LOGGER.error("Unable to get token: %s", resp.status)
|
|
|
|
|
async with self._session.get(url) as response:
|
|
|
|
|
_LOGGER.debug("%s - %s", response.status, response.request_info.url)
|
|
|
|
|
if response.status != 200:
|
|
|
|
|
_LOGGER.error("Unable to get token: %s", response.status)
|
|
|
|
|
return False
|
|
|
|
|
url = re.findall("href\\s*=\\s*[\"'](http.+?)[\"']", await resp.text())
|
|
|
|
|
if not url:
|
|
|
|
|
_LOGGER.error("Can't get login url - \n%s", await resp.text())
|
|
|
|
|
raise PermissionError
|
|
|
|
|
async with self._session.get(url[0]) as resp:
|
|
|
|
|
if resp.status != 200:
|
|
|
|
|
_LOGGER.error("Unable to get token: %s", resp.status)
|
|
|
|
|
url = re.findall("href\\s*=\\s*[\"'](.+?)[\"']", await response.text())
|
|
|
|
|
if not url:
|
|
|
|
|
_LOGGER.error("Can't get login url - \n%s", await response.text())
|
|
|
|
|
raise PermissionError
|
|
|
|
|
if "ProgressiveLogin" in url[0]:
|
|
|
|
|
async with self._session.get(url[0]) as response:
|
|
|
|
|
_LOGGER.debug("%s - %s", response.status, response.request_info.url)
|
|
|
|
|
if response.status != 200:
|
|
|
|
|
_LOGGER.error("Unable to get token: %s", response.status)
|
|
|
|
|
return False
|
|
|
|
|
url = re.findall("href\\s*=\\s*[\"'](.*?)[\"']", await response.text())
|
|
|
|
|
url = "/".join(const.AUTH_API.split("/")[:-1]) + url[0]
|
|
|
|
|
async with self._session.get(url) as response:
|
|
|
|
|
_LOGGER.debug("%s - %s", response.status, response.request_info.url)
|
|
|
|
|
if response.status != 200:
|
|
|
|
|
_LOGGER.error(
|
|
|
|
|
"Unable to connect to the login service: %s", response.status
|
|
|
|
|
)
|
|
|
|
|
return False
|
|
|
|
|
url = re.findall("href\\s*=\\s*[\"'](.*?)[\"']", await resp.text())
|
|
|
|
|
url = "/".join(const.AUTH_API.split("/")[:-1]) + url[0]
|
|
|
|
|
async with self._session.get(url) as resp:
|
|
|
|
|
if resp.status != 200:
|
|
|
|
|
_LOGGER.error("Unable to connect to the login service: %s", resp.status)
|
|
|
|
|
return False
|
|
|
|
|
text = await resp.text()
|
|
|
|
|
text = await response.text()
|
|
|
|
|
if access_token := re.findall("access_token=(.*?)&", text):
|
|
|
|
|
self._access_token = access_token[0]
|
|
|
|
|
if refresh_token := re.findall("refresh_token=(.*?)&", text):
|
|
|
|
@ -174,11 +186,12 @@ class HonAuth:
|
|
|
|
|
data = self._device.get()
|
|
|
|
|
async with self._session.post(
|
|
|
|
|
f"{const.API_URL}/auth/v1/login", headers=post_headers, json=data
|
|
|
|
|
) as resp:
|
|
|
|
|
) as response:
|
|
|
|
|
_LOGGER.debug("%s - %s", response.status, response.request_info.url)
|
|
|
|
|
try:
|
|
|
|
|
json_data = await resp.json()
|
|
|
|
|
json_data = await response.json()
|
|
|
|
|
except json.JSONDecodeError:
|
|
|
|
|
_LOGGER.error("No JSON Data after POST: %s", await resp.text())
|
|
|
|
|
_LOGGER.error("No JSON Data after POST: %s", await response.text())
|
|
|
|
|
return False
|
|
|
|
|
self._cognito_token = json_data["cognitoUser"]["Token"]
|
|
|
|
|
return True
|
|
|
|
@ -191,10 +204,11 @@ class HonAuth:
|
|
|
|
|
}
|
|
|
|
|
async with self._session.post(
|
|
|
|
|
f"{const.AUTH_API}/services/oauth2/token", params=params
|
|
|
|
|
) as resp:
|
|
|
|
|
if resp.status >= 400:
|
|
|
|
|
) as response:
|
|
|
|
|
_LOGGER.debug("%s - %s", response.status, response.request_info.url)
|
|
|
|
|
if response.status >= 400:
|
|
|
|
|
return False
|
|
|
|
|
data = await resp.json()
|
|
|
|
|
data = await response.json()
|
|
|
|
|
self._id_token = data["id_token"]
|
|
|
|
|
self._access_token = data["access_token"]
|
|
|
|
|
return True
|
|
|
|
|