Compare commits

...

6 Commits

Author SHA1 Message Date
970b94bfa7 Fix unclear session errors 2023-04-12 19:14:50 +02:00
33454f68b8 Encode username/password 2023-04-12 02:10:37 +02:00
6b2c60d552 Fix session issues 2023-04-12 01:07:33 +02:00
46e6a85e84 add diagnose property for devices 2023-04-11 22:14:36 +02:00
8c832b44cd Fix token refresh problems 2023-04-11 17:09:02 +02:00
b4b782c52c Improve logging for authentication errors 2023-04-11 00:59:00 +02:00
8 changed files with 217 additions and 140 deletions

83
pyhon/__main__.py Executable file → Normal file
View File

@ -10,7 +10,7 @@ from pathlib import Path
if __name__ == "__main__":
sys.path.insert(0, str(Path(__file__).parent.parent))
from pyhon import Hon, HonAPI
from pyhon import Hon, HonAPI, helper
_LOGGER = logging.getLogger(__name__)
@ -34,61 +34,6 @@ def get_arguments():
return vars(parser.parse_args())
# yaml.dump() would be done the same, but needs an additional dependency...
def pretty_print(data, key="", intend=0, is_list=False):
if type(data) is list:
if key:
print(f"{' ' * intend}{'- ' if is_list else ''}{key}:")
intend += 1
for i, value in enumerate(data):
pretty_print(value, intend=intend, is_list=True)
elif type(data) is dict:
if key:
print(f"{' ' * intend}{'- ' if is_list else ''}{key}:")
intend += 1
for i, (key, value) in enumerate(sorted(data.items())):
if is_list and not i:
pretty_print(value, key=key, intend=intend, is_list=True)
elif is_list:
pretty_print(value, key=key, intend=intend + 1)
else:
pretty_print(value, key=key, intend=intend)
else:
print(
f"{' ' * intend}{'- ' if is_list else ''}{key}{': ' if key else ''}{data}"
)
def key_print(data, key="", start=True):
if type(data) is list:
for i, value in enumerate(data):
key_print(value, key=f"{key}.{i}", start=False)
elif type(data) is dict:
for k, value in sorted(data.items()):
key_print(value, key=k if start else f"{key}.{k}", start=False)
else:
print(f"{key}: {data}")
def create_command(commands, concat=False):
result = {}
for name, command in commands.items():
if not concat:
result[name] = {}
for parameter, data in command.parameters.items():
if data.typology == "enum":
value = data.values
elif data.typology == "range":
value = {"min": data.min, "max": data.max, "step": data.step}
else:
continue
if not concat:
result[name][parameter] = value
else:
result[f"{name}.{parameter}"] = value
return result
async def translate(language, json_output=False):
async with HonAPI(anonymous=True) as hon:
keys = await hon.translation_keys(language)
@ -102,7 +47,7 @@ async def translate(language, json_output=False):
.replace("\\r", "")
)
keys = json.loads(clean_keys)
pretty_print(keys)
print(helper.pretty_print(keys))
async def main():
@ -120,13 +65,25 @@ async def main():
if args.get("keys"):
data = device.data.copy()
attr = "get" if args.get("all") else "pop"
key_print(data["attributes"].__getattribute__(attr)("parameters"))
key_print(data.__getattribute__(attr)("appliance"))
key_print(data)
pretty_print(create_command(device.commands, concat=True))
print(
helper.key_print(
data["attributes"].__getattribute__(attr)("parameters")
)
)
print(helper.key_print(data.__getattribute__(attr)("appliance")))
print(helper.key_print(data))
print(
helper.pretty_print(
helper.create_command(device.commands, concat=True)
)
)
else:
pretty_print({"data": device.data})
pretty_print({"settings": create_command(device.commands)})
print(helper.pretty_print({"data": device.data}))
print(
helper.pretty_print(
{"settings": helper.create_command(device.commands)}
)
)
def start():

View File

@ -1,6 +1,7 @@
import importlib
from contextlib import suppress
from pyhon import helper
from pyhon.commands import HonCommand
from pyhon.parameter import HonParameterFixed
@ -172,3 +173,15 @@ class HonAppliance:
if self._extra:
return self._extra.data(result)
return result
@property
def diagnose(self):
data = self.data.copy()
for sensible in ["PK", "SK", "serialNumber", "code"]:
data["appliance"].pop(sensible, None)
result = helper.pretty_print({"data": self.data}, whitespace="\u200B \u200B ")
result += helper.pretty_print(
{"commands": helper.create_command(self.commands)},
whitespace="\u200B \u200B ",
)
return result.replace(self.mac_address, "12-34-56-78-90-ab")

View File

@ -2,14 +2,14 @@ import json
import logging
import re
import secrets
import sys
import urllib
from pprint import pformat
from urllib import parse
from urllib.parse import quote
from yarl import URL
from pyhon import const
from pyhon import const, exceptions
_LOGGER = logging.getLogger(__name__)
@ -24,6 +24,7 @@ class HonAuth:
self._cognito_token = ""
self._id_token = ""
self._device = device
self._called_urls = []
@property
def cognito_token(self):
@ -41,6 +42,16 @@ class HonAuth:
def refresh_token(self):
return self._refresh_token
async def _error_logger(self, response, fail=True):
result = "hOn Authentication Error\n"
for i, (status, url) in enumerate(self._called_urls):
result += f" {i + 1: 2d} {status} - {url}\n"
result += f"ERROR - {response.status} - {response.request_info.url}\n"
result += f"{15 * '='} Response {15 * '='}\n{await response.text()}\n{40 * '='}"
_LOGGER.error(result)
if fail:
raise exceptions.HonAuthenticationError("Can't login")
async def _load_login(self):
nonce = secrets.token_hex(16)
nonce = f"{nonce[:8]}-{nonce[8:12]}-{nonce[12:16]}-{nonce[16:20]}-{nonce[20:]}"
@ -58,22 +69,33 @@ class HonAuth:
async with self._session.get(
f"{const.AUTH_API}/services/oauth2/authorize/expid_Login?{params}"
) as response:
_LOGGER.debug("%s - %s", response.status, response.request_info.url)
if not (login_url := re.findall("url = '(.+?)'", await response.text())):
self._called_urls.append((response.status, response.request_info.url))
text = await response.text()
if not (login_url := re.findall("url = '(.+?)'", text)):
if "oauth/done#access_token=" in text:
self._parse_token_data(text)
raise exceptions.HonNoAuthenticationNeeded()
await self._error_logger(response)
return False
async with self._session.get(login_url[0], allow_redirects=False) as redirect1:
_LOGGER.debug("%s - %s", redirect1.status, redirect1.request_info.url)
self._called_urls.append((redirect1.status, redirect1.request_info.url))
if not (url := redirect1.headers.get("Location")):
await self._error_logger(redirect1)
return False
async with self._session.get(url, allow_redirects=False) as redirect2:
_LOGGER.debug("%s - %s", redirect2.status, redirect2.request_info.url)
self._called_urls.append((redirect2.status, redirect2.request_info.url))
if not (
url := redirect2.headers.get("Location")
+ "&System=IoT_Mobile_App&RegistrationSubChannel=hOn"
):
await self._error_logger(redirect2)
return False
async with self._session.get(URL(url, encoded=True)) as login_screen:
_LOGGER.debug("%s - %s", login_screen.status, login_screen.request_info.url)
async with self._session.get(
URL(url, encoded=True), headers={"user-agent": const.USER_AGENT}
) as login_screen:
self._called_urls.append(
(login_screen.status, login_screen.request_info.url)
)
if context := re.findall(
'"fwuid":"(.*?)","loaded":(\\{.*?})', await login_screen.text()
):
@ -83,6 +105,7 @@ class HonAuth:
"/".join(const.AUTH_API.split("/")[:-1]), ""
)
return fw_uid, loaded, login_url
await self._error_logger(login_screen)
return False
async def _login(self, fw_uid, loaded, login_url):
@ -94,8 +117,8 @@ class HonAuth:
"descriptor": "apex://LightningLoginCustomController/ACTION$login",
"callingDescriptor": "markup://c:loginForm",
"params": {
"username": self._email,
"password": self._password,
"username": quote(self._email),
"password": quote(self._password),
"startUrl": parse.unquote(
login_url.split("startURL=")[-1]
).split("%3D")[0],
@ -122,7 +145,7 @@ class HonAuth:
data="&".join(f"{k}={json.dumps(v)}" for k, v in data.items()),
params=params,
) as response:
_LOGGER.debug("%s - %s", response.status, response.request_info.url)
self._called_urls.append((response.status, response.request_info.url))
if response.status == 200:
try:
data = await response.json()
@ -133,69 +156,72 @@ class HonAuth:
_LOGGER.error(
"Can't get login url - %s", pformat(await response.json())
)
_LOGGER.error(
"Unable to login: %s\n%s", response.status, await response.text()
)
await self._error_logger(response)
return ""
async def _get_token(self, url):
async with self._session.get(url) as response:
_LOGGER.debug("%s - %s", response.status, response.request_info.url)
if response.status != 200:
_LOGGER.error("Unable to get token: %s", response.status)
return False
url = re.findall("href\\s*=\\s*[\"'](.+?)[\"']", await response.text())
if not url:
_LOGGER.error("Can't get login url - \n%s", await response.text())
raise PermissionError
if "ProgressiveLogin" in url[0]:
async with self._session.get(url[0]) as response:
_LOGGER.debug("%s - %s", response.status, response.request_info.url)
if response.status != 200:
_LOGGER.error("Unable to get token: %s", response.status)
return False
url = re.findall("href\\s*=\\s*[\"'](.*?)[\"']", await response.text())
url = "/".join(const.AUTH_API.split("/")[:-1]) + url[0]
async with self._session.get(url) as response:
_LOGGER.debug("%s - %s", response.status, response.request_info.url)
if response.status != 200:
_LOGGER.error(
"Unable to connect to the login service: %s", response.status
)
return False
text = await response.text()
def _parse_token_data(self, text):
if access_token := re.findall("access_token=(.*?)&", text):
self._access_token = access_token[0]
if refresh_token := re.findall("refresh_token=(.*?)&", text):
self._refresh_token = refresh_token[0]
if id_token := re.findall("id_token=(.*?)&", text):
self._id_token = id_token[0]
async def _get_token(self, url):
async with self._session.get(url) as response:
self._called_urls.append((response.status, response.request_info.url))
if response.status != 200:
await self._error_logger(response)
return False
url = re.findall("href\\s*=\\s*[\"'](.+?)[\"']", await response.text())
if not url:
await self._error_logger(response)
return False
if "ProgressiveLogin" in url[0]:
async with self._session.get(url[0]) as response:
self._called_urls.append((response.status, response.request_info.url))
if response.status != 200:
await self._error_logger(response)
return False
url = re.findall("href\\s*=\\s*[\"'](.*?)[\"']", await response.text())
url = "/".join(const.AUTH_API.split("/")[:-1]) + url[0]
async with self._session.get(url) as response:
self._called_urls.append((response.status, response.request_info.url))
if response.status != 200:
await self._error_logger(response)
return False
self._parse_token_data(await response.text())
return True
async def authorize(self):
if login_site := await self._load_login():
fw_uid, loaded, login_url = login_site
else:
return False
if not (url := await self._login(fw_uid, loaded, login_url)):
return False
if not await self._get_token(url):
return False
async def _api_auth(self):
post_headers = {"id-token": self._id_token}
data = self._device.get()
async with self._session.post(
f"{const.API_URL}/auth/v1/login", headers=post_headers, json=data
) as response:
_LOGGER.debug("%s - %s", response.status, response.request_info.url)
self._called_urls.append((response.status, response.request_info.url))
try:
json_data = await response.json()
except json.JSONDecodeError:
_LOGGER.error("No JSON Data after POST: %s", await response.text())
await self._error_logger(response)
return False
self._cognito_token = json_data["cognitoUser"]["Token"]
return True
async def authenticate(self):
self.clear()
try:
if not (login_site := await self._load_login()):
raise exceptions.HonAuthenticationError("Can't open login page")
if not (url := await self._login(*login_site)):
raise exceptions.HonAuthenticationError("Can't login")
if not await self._get_token(url):
raise exceptions.HonAuthenticationError("Can't get token")
if not await self._api_auth():
raise exceptions.HonAuthenticationError("Can't get api token")
except exceptions.HonNoAuthenticationNeeded:
return
async def refresh(self):
params = {
"client_id": const.CLIENT_ID,
@ -205,10 +231,18 @@ class HonAuth:
async with self._session.post(
f"{const.AUTH_API}/services/oauth2/token", params=params
) as response:
_LOGGER.debug("%s - %s", response.status, response.request_info.url)
self._called_urls.append((response.status, response.request_info.url))
if response.status >= 400:
await self._error_logger(response, fail=False)
return False
data = await response.json()
self._id_token = data["id_token"]
self._access_token = data["access_token"]
return True
return await self._api_auth()
def clear(self):
self._session.cookie_jar.clear_domain(const.AUTH_API.split("/")[-2])
self._cognito_token = ""
self._id_token = ""
self._access_token = ""
self._refresh_token = ""

View File

@ -6,12 +6,14 @@ import aiohttp
from pyhon import const
from pyhon.connection.auth import HonAuth, _LOGGER
from pyhon.connection.device import HonDevice
from pyhon.exceptions import HonAuthenticationError
class HonBaseConnectionHandler:
_HEADERS = {"user-agent": const.USER_AGENT, "Content-Type": "application/json"}
def __init__(self, session=None):
self._create_session = session is None
self._session = session
self._auth = None
@ -22,7 +24,8 @@ class HonBaseConnectionHandler:
await self.close()
async def create(self):
self._session = aiohttp.ClientSession(headers=self._HEADERS)
if self._create_session:
self._session = aiohttp.ClientSession()
return self
@asynccontextmanager
@ -40,7 +43,8 @@ class HonBaseConnectionHandler:
yield response
async def close(self):
await self._session.close()
if self._create_session:
await self._session.close()
class HonConnectionHandler(HonBaseConnectionHandler):
@ -50,10 +54,9 @@ class HonConnectionHandler(HonBaseConnectionHandler):
self._email = email
self._password = password
if not self._email:
raise PermissionError("Login-Error - An email address must be specified")
raise HonAuthenticationError("An email address must be specified")
if not self._password:
raise PermissionError("Login-Error - A password address must be specified")
self._request_headers = {}
raise HonAuthenticationError("A password address must be specified")
@property
def device(self):
@ -65,26 +68,24 @@ class HonConnectionHandler(HonBaseConnectionHandler):
return self
async def _check_headers(self, headers):
if (
"cognito-token" not in self._request_headers
or "id-token" not in self._request_headers
):
if await self._auth.authorize():
self._request_headers["cognito-token"] = self._auth.cognito_token
self._request_headers["id-token"] = self._auth.id_token
else:
raise PermissionError("Can't Login")
return {h: v for h, v in self._request_headers.items() if h not in headers}
if not (self._auth.cognito_token and self._auth.id_token):
await self._auth.authenticate()
headers["cognito-token"] = self._auth.cognito_token
headers["id-token"] = self._auth.id_token
return self._HEADERS | headers
@asynccontextmanager
async def _intercept(self, method, *args, loop=0, **kwargs):
kwargs["headers"] = await self._check_headers(kwargs.get("headers", {}))
async with method(*args, **kwargs) as response:
if response.status == 403 and not loop:
if response.status in [401, 403] and loop == 0:
_LOGGER.info("Try refreshing token...")
await self._auth.refresh()
yield await self._intercept(method, *args, loop=loop + 1, **kwargs)
elif response.status == 403 and loop < 2:
async with self._intercept(
method, *args, loop=loop + 1, **kwargs
) as result:
yield result
elif response.status in [401, 403] and loop == 1:
_LOGGER.warning(
"%s - Error %s - %s",
response.request_info.url,
@ -92,7 +93,10 @@ class HonConnectionHandler(HonBaseConnectionHandler):
await response.text(),
)
await self.create()
yield await self._intercept(method, *args, loop=loop + 1, **kwargs)
async with self._intercept(
method, *args, loop=loop + 1, **kwargs
) as result:
yield result
elif loop >= 2:
_LOGGER.error(
"%s - Error %s - %s",
@ -100,7 +104,7 @@ class HonConnectionHandler(HonBaseConnectionHandler):
response.status,
await response.text(),
)
raise PermissionError("Login failure")
raise HonAuthenticationError("Login failure")
else:
try:
await response.json()
@ -123,5 +127,5 @@ class HonAnonymousConnectionHandler(HonBaseConnectionHandler):
kwargs["headers"] = kwargs.pop("headers", {}) | self._HEADERS
async with method(*args, **kwargs) as response:
if response.status == 403:
print("Can't authorize")
_LOGGER.error("Can't authenticate anymore")
yield response

6
pyhon/exceptions.py Normal file
View File

@ -0,0 +1,6 @@
class HonAuthenticationError(Exception):
pass
class HonNoAuthenticationNeeded(Exception):
pass

63
pyhon/helper.py Normal file
View File

@ -0,0 +1,63 @@
def key_print(data, key="", start=True):
result = ""
if isinstance(data, list):
for i, value in enumerate(data):
result += key_print(value, key=f"{key}.{i}", start=False)
elif isinstance(data, dict):
for k, value in sorted(data.items()):
result += key_print(value, key=k if start else f"{key}.{k}", start=False)
else:
result += f"{key}: {data}\n"
return result
# yaml.dump() would be done the same, but needs an additional dependency...
def pretty_print(data, key="", intend=0, is_list=False, whitespace=" "):
result = ""
if isinstance(data, list):
if key:
result += f"{whitespace * intend}{'- ' if is_list else ''}{key}:\n"
intend += 1
for i, value in enumerate(data):
result += pretty_print(
value, intend=intend, is_list=True, whitespace=whitespace
)
elif isinstance(data, dict):
if key:
result += f"{whitespace * intend}{'- ' if is_list else ''}{key}:\n"
intend += 1
for i, (key, value) in enumerate(sorted(data.items())):
if is_list and not i:
result += pretty_print(
value, key=key, intend=intend, is_list=True, whitespace=whitespace
)
elif is_list:
result += pretty_print(
value, key=key, intend=intend + 1, whitespace=whitespace
)
else:
result += pretty_print(
value, key=key, intend=intend, whitespace=whitespace
)
else:
result += f"{whitespace * intend}{'- ' if is_list else ''}{key}{': ' if key else ''}{data}\n"
return result
def create_command(commands, concat=False):
result = {}
for name, command in commands.items():
if not concat:
result[name] = {}
for parameter, data in command.parameters.items():
if data.typology == "enum":
value = data.values
elif data.typology == "range":
value = {"min": data.min, "max": data.max, "step": data.step}
else:
continue
if not concat:
result[name][parameter] = value
else:
result[f"{name}.{parameter}"] = value
return result

View File

@ -5,7 +5,7 @@ def str_to_float(string):
try:
return int(string)
except ValueError:
return float(str(string.replace(",", ".")))
return float(str(string).replace(",", "."))
class HonParameter:

View File

@ -7,7 +7,7 @@ with open("README.md", "r") as f:
setup(
name="pyhOn",
version="0.6.3",
version="0.7.3",
author="Andre Basche",
description="Control hOn devices with python",
long_description=long_description,