Compare commits

...

6 Commits

Author SHA1 Message Date
46e6a85e84 add diagnose property for devices 2023-04-11 22:14:36 +02:00
8c832b44cd Fix token refresh problems 2023-04-11 17:09:02 +02:00
b4b782c52c Improve logging for authentication errors 2023-04-11 00:59:00 +02:00
e857fe91de Fix login issue 2023-04-10 20:32:35 +02:00
79c9678492 Fix float convertion 2023-04-10 18:48:46 +02:00
7c49589944 Use float for range parameter 2023-04-10 16:59:10 +02:00
9 changed files with 187 additions and 108 deletions

View File

@ -2,7 +2,7 @@ name: Python check
on: on:
push: push:
branches: [ "main", "refactor" ] branches: [ "main" ]
pull_request: pull_request:
branches: [ "main" ] branches: [ "main" ]

83
pyhon/__main__.py Executable file → Normal file
View File

@ -10,7 +10,7 @@ from pathlib import Path
if __name__ == "__main__": if __name__ == "__main__":
sys.path.insert(0, str(Path(__file__).parent.parent)) sys.path.insert(0, str(Path(__file__).parent.parent))
from pyhon import Hon, HonAPI from pyhon import Hon, HonAPI, helper
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -34,61 +34,6 @@ def get_arguments():
return vars(parser.parse_args()) return vars(parser.parse_args())
# yaml.dump() would be done the same, but needs an additional dependency...
def pretty_print(data, key="", intend=0, is_list=False):
if type(data) is list:
if key:
print(f"{' ' * intend}{'- ' if is_list else ''}{key}:")
intend += 1
for i, value in enumerate(data):
pretty_print(value, intend=intend, is_list=True)
elif type(data) is dict:
if key:
print(f"{' ' * intend}{'- ' if is_list else ''}{key}:")
intend += 1
for i, (key, value) in enumerate(sorted(data.items())):
if is_list and not i:
pretty_print(value, key=key, intend=intend, is_list=True)
elif is_list:
pretty_print(value, key=key, intend=intend + 1)
else:
pretty_print(value, key=key, intend=intend)
else:
print(
f"{' ' * intend}{'- ' if is_list else ''}{key}{': ' if key else ''}{data}"
)
def key_print(data, key="", start=True):
if type(data) is list:
for i, value in enumerate(data):
key_print(value, key=f"{key}.{i}", start=False)
elif type(data) is dict:
for k, value in sorted(data.items()):
key_print(value, key=k if start else f"{key}.{k}", start=False)
else:
print(f"{key}: {data}")
def create_command(commands, concat=False):
result = {}
for name, command in commands.items():
if not concat:
result[name] = {}
for parameter, data in command.parameters.items():
if data.typology == "enum":
value = data.values
elif data.typology == "range":
value = {"min": data.min, "max": data.max, "step": data.step}
else:
continue
if not concat:
result[name][parameter] = value
else:
result[f"{name}.{parameter}"] = value
return result
async def translate(language, json_output=False): async def translate(language, json_output=False):
async with HonAPI(anonymous=True) as hon: async with HonAPI(anonymous=True) as hon:
keys = await hon.translation_keys(language) keys = await hon.translation_keys(language)
@ -102,7 +47,7 @@ async def translate(language, json_output=False):
.replace("\\r", "") .replace("\\r", "")
) )
keys = json.loads(clean_keys) keys = json.loads(clean_keys)
pretty_print(keys) print(helper.pretty_print(keys))
async def main(): async def main():
@ -120,13 +65,25 @@ async def main():
if args.get("keys"): if args.get("keys"):
data = device.data.copy() data = device.data.copy()
attr = "get" if args.get("all") else "pop" attr = "get" if args.get("all") else "pop"
key_print(data["attributes"].__getattribute__(attr)("parameters")) print(
key_print(data.__getattribute__(attr)("appliance")) helper.key_print(
key_print(data) data["attributes"].__getattribute__(attr)("parameters")
pretty_print(create_command(device.commands, concat=True)) )
)
print(helper.key_print(data.__getattribute__(attr)("appliance")))
print(helper.key_print(data))
print(
helper.pretty_print(
helper.create_command(device.commands, concat=True)
)
)
else: else:
pretty_print({"data": device.data}) print(helper.pretty_print({"data": device.data}))
pretty_print({"settings": create_command(device.commands)}) print(
helper.pretty_print(
{"settings": helper.create_command(device.commands)}
)
)
def start(): def start():

View File

@ -1,6 +1,7 @@
import importlib import importlib
from contextlib import suppress from contextlib import suppress
from pyhon import helper
from pyhon.commands import HonCommand from pyhon.commands import HonCommand
from pyhon.parameter import HonParameterFixed from pyhon.parameter import HonParameterFixed
@ -172,3 +173,15 @@ class HonAppliance:
if self._extra: if self._extra:
return self._extra.data(result) return self._extra.data(result)
return result return result
@property
def diagnose(self):
data = self.data.copy()
for sensible in ["PK", "SK", "serialNumber", "code"]:
data["appliance"].pop(sensible, None)
result = helper.pretty_print({"data": self.data}, whitespace="\u200B \u200B ")
result += helper.pretty_print(
{"commands": helper.create_command(self.commands)},
whitespace="\u200B \u200B ",
)
return result.replace(self.mac_address, "12-34-56-78-90-ab")

View File

@ -9,8 +9,9 @@ from urllib import parse
from yarl import URL from yarl import URL
from pyhon import const from pyhon import const
from pyhon.exceptions import HonAuthenticationError
_LOGGER = logging.getLogger() _LOGGER = logging.getLogger(__name__)
class HonAuth: class HonAuth:
@ -23,6 +24,7 @@ class HonAuth:
self._cognito_token = "" self._cognito_token = ""
self._id_token = "" self._id_token = ""
self._device = device self._device = device
self._called_urls = []
@property @property
def cognito_token(self): def cognito_token(self):
@ -40,6 +42,16 @@ class HonAuth:
def refresh_token(self): def refresh_token(self):
return self._refresh_token return self._refresh_token
async def _error_logger(self, response, fail=True):
result = "hOn Authentication Error\n"
for i, (status, url) in enumerate(self._called_urls):
result += f" {i + 1: 2d} {status} - {url}\n"
result += f"ERROR - {response.status} - {response.request_info.url}\n"
result += f"{15 * '='} Response {15 * '='}\n{await response.text()}\n{40 * '='}"
_LOGGER.error(result)
if fail:
raise HonAuthenticationError("Can't login")
async def _load_login(self): async def _load_login(self):
nonce = secrets.token_hex(16) nonce = secrets.token_hex(16)
nonce = f"{nonce[:8]}-{nonce[8:12]}-{nonce[12:16]}-{nonce[16:20]}-{nonce[20:]}" nonce = f"{nonce[:8]}-{nonce[8:12]}-{nonce[12:16]}-{nonce[16:20]}-{nonce[20:]}"
@ -56,19 +68,28 @@ class HonAuth:
params = "&".join([f"{k}={v}" for k, v in params.items()]) params = "&".join([f"{k}={v}" for k, v in params.items()])
async with self._session.get( async with self._session.get(
f"{const.AUTH_API}/services/oauth2/authorize/expid_Login?{params}" f"{const.AUTH_API}/services/oauth2/authorize/expid_Login?{params}"
) as resp: ) as response:
if not (login_url := re.findall("url = '(.+?)'", await resp.text())): self._called_urls.append((response.status, response.request_info.url))
if not (login_url := re.findall("url = '(.+?)'", await response.text())):
await self._error_logger(response)
return False return False
async with self._session.get(login_url[0], allow_redirects=False) as redirect1: async with self._session.get(login_url[0], allow_redirects=False) as redirect1:
self._called_urls.append((redirect1.status, redirect1.request_info.url))
if not (url := redirect1.headers.get("Location")): if not (url := redirect1.headers.get("Location")):
await self._error_logger(redirect1)
return False return False
async with self._session.get(url, allow_redirects=False) as redirect2: async with self._session.get(url, allow_redirects=False) as redirect2:
self._called_urls.append((redirect2.status, redirect2.request_info.url))
if not ( if not (
url := redirect2.headers.get("Location") url := redirect2.headers.get("Location")
+ "&System=IoT_Mobile_App&RegistrationSubChannel=hOn" + "&System=IoT_Mobile_App&RegistrationSubChannel=hOn"
): ):
await self._error_logger(redirect2)
return False return False
async with self._session.get(URL(url, encoded=True)) as login_screen: async with self._session.get(URL(url, encoded=True)) as login_screen:
self._called_urls.append(
(login_screen.status, login_screen.request_info.url)
)
if context := re.findall( if context := re.findall(
'"fwuid":"(.*?)","loaded":(\\{.*?})', await login_screen.text() '"fwuid":"(.*?)","loaded":(\\{.*?})', await login_screen.text()
): ):
@ -78,6 +99,7 @@ class HonAuth:
"/".join(const.AUTH_API.split("/")[:-1]), "" "/".join(const.AUTH_API.split("/")[:-1]), ""
) )
return fw_uid, loaded, login_url return fw_uid, loaded, login_url
await self._error_logger(login_screen)
return False return False
async def _login(self, fw_uid, loaded, login_url): async def _login(self, fw_uid, loaded, login_url):
@ -117,6 +139,7 @@ class HonAuth:
data="&".join(f"{k}={json.dumps(v)}" for k, v in data.items()), data="&".join(f"{k}={json.dumps(v)}" for k, v in data.items()),
params=params, params=params,
) as response: ) as response:
self._called_urls.append((response.status, response.request_info.url))
if response.status == 200: if response.status == 200:
try: try:
data = await response.json() data = await response.json()
@ -127,31 +150,33 @@ class HonAuth:
_LOGGER.error( _LOGGER.error(
"Can't get login url - %s", pformat(await response.json()) "Can't get login url - %s", pformat(await response.json())
) )
_LOGGER.error( await self._error_logger(response)
"Unable to login: %s\n%s", response.status, await response.text()
)
return "" return ""
async def _get_token(self, url): async def _get_token(self, url):
async with self._session.get(url) as resp: async with self._session.get(url) as response:
if resp.status != 200: self._called_urls.append((response.status, response.request_info.url))
_LOGGER.error("Unable to get token: %s", resp.status) if response.status != 200:
await self._error_logger(response)
return False return False
url = re.findall("href\\s*=\\s*[\"'](http.+?)[\"']", await resp.text()) url = re.findall("href\\s*=\\s*[\"'](.+?)[\"']", await response.text())
if not url: if not url:
_LOGGER.error("Can't get login url - \n%s", await resp.text()) await self._error_logger(response)
raise PermissionError
async with self._session.get(url[0]) as resp:
if resp.status != 200:
_LOGGER.error("Unable to get token: %s", resp.status)
return False return False
url = re.findall("href\\s*=\\s*[\"'](.*?)[\"']", await resp.text()) if "ProgressiveLogin" in url[0]:
url = "/".join(const.AUTH_API.split("/")[:-1]) + url[0] async with self._session.get(url[0]) as response:
async with self._session.get(url) as resp: self._called_urls.append((response.status, response.request_info.url))
if resp.status != 200: if response.status != 200:
_LOGGER.error("Unable to connect to the login service: %s", resp.status) await self._error_logger(response)
return False
url = re.findall("href\\s*=\\s*[\"'](.*?)[\"']", await response.text())
url = "/".join(const.AUTH_API.split("/")[:-1]) + url[0]
async with self._session.get(url) as response:
self._called_urls.append((response.status, response.request_info.url))
if response.status != 200:
await self._error_logger(response)
return False return False
text = await resp.text() text = await response.text()
if access_token := re.findall("access_token=(.*?)&", text): if access_token := re.findall("access_token=(.*?)&", text):
self._access_token = access_token[0] self._access_token = access_token[0]
if refresh_token := re.findall("refresh_token=(.*?)&", text): if refresh_token := re.findall("refresh_token=(.*?)&", text):
@ -169,16 +194,19 @@ class HonAuth:
return False return False
if not await self._get_token(url): if not await self._get_token(url):
return False return False
return await self._api_auth()
async def _api_auth(self):
post_headers = {"id-token": self._id_token} post_headers = {"id-token": self._id_token}
data = self._device.get() data = self._device.get()
async with self._session.post( async with self._session.post(
f"{const.API_URL}/auth/v1/login", headers=post_headers, json=data f"{const.API_URL}/auth/v1/login", headers=post_headers, json=data
) as resp: ) as response:
self._called_urls.append((response.status, response.request_info.url))
try: try:
json_data = await resp.json() json_data = await response.json()
except json.JSONDecodeError: except json.JSONDecodeError:
_LOGGER.error("No JSON Data after POST: %s", await resp.text()) await self._error_logger(response)
return False return False
self._cognito_token = json_data["cognitoUser"]["Token"] self._cognito_token = json_data["cognitoUser"]["Token"]
return True return True
@ -191,10 +219,12 @@ class HonAuth:
} }
async with self._session.post( async with self._session.post(
f"{const.AUTH_API}/services/oauth2/token", params=params f"{const.AUTH_API}/services/oauth2/token", params=params
) as resp: ) as response:
if resp.status >= 400: self._called_urls.append((response.status, response.request_info.url))
if response.status >= 400:
await self._error_logger(response, fail=False)
return False return False
data = await resp.json() data = await response.json()
self._id_token = data["id_token"] self._id_token = data["id_token"]
self._access_token = data["access_token"] self._access_token = data["access_token"]
return True return await self._api_auth()

View File

@ -6,6 +6,7 @@ import aiohttp
from pyhon import const from pyhon import const
from pyhon.connection.auth import HonAuth, _LOGGER from pyhon.connection.auth import HonAuth, _LOGGER
from pyhon.connection.device import HonDevice from pyhon.connection.device import HonDevice
from pyhon.exceptions import HonAuthenticationError
class HonBaseConnectionHandler: class HonBaseConnectionHandler:
@ -50,9 +51,9 @@ class HonConnectionHandler(HonBaseConnectionHandler):
self._email = email self._email = email
self._password = password self._password = password
if not self._email: if not self._email:
raise PermissionError("Login-Error - An email address must be specified") raise HonAuthenticationError("An email address must be specified")
if not self._password: if not self._password:
raise PermissionError("Login-Error - A password address must be specified") raise HonAuthenticationError("A password address must be specified")
self._request_headers = {} self._request_headers = {}
@property @property
@ -73,18 +74,21 @@ class HonConnectionHandler(HonBaseConnectionHandler):
self._request_headers["cognito-token"] = self._auth.cognito_token self._request_headers["cognito-token"] = self._auth.cognito_token
self._request_headers["id-token"] = self._auth.id_token self._request_headers["id-token"] = self._auth.id_token
else: else:
raise PermissionError("Can't Login") raise HonAuthenticationError("Can't login")
return {h: v for h, v in self._request_headers.items() if h not in headers} return headers | self._request_headers
@asynccontextmanager @asynccontextmanager
async def _intercept(self, method, *args, loop=0, **kwargs): async def _intercept(self, method, *args, loop=0, **kwargs):
kwargs["headers"] = await self._check_headers(kwargs.get("headers", {})) kwargs["headers"] = await self._check_headers(kwargs.get("headers", {}))
async with method(*args, **kwargs) as response: async with method(*args, **kwargs) as response:
if response.status == 403 and not loop: if response.status in [401, 403] and loop == 0:
_LOGGER.info("Try refreshing token...") _LOGGER.info("Try refreshing token...")
await self._auth.refresh() await self._auth.refresh()
yield await self._intercept(method, *args, loop=loop + 1, **kwargs) async with self._intercept(
elif response.status == 403 and loop < 2: method, *args, loop=loop + 1, **kwargs
) as result:
yield result
elif response.status in [401, 403] and loop == 1:
_LOGGER.warning( _LOGGER.warning(
"%s - Error %s - %s", "%s - Error %s - %s",
response.request_info.url, response.request_info.url,
@ -92,7 +96,10 @@ class HonConnectionHandler(HonBaseConnectionHandler):
await response.text(), await response.text(),
) )
await self.create() await self.create()
yield await self._intercept(method, *args, loop=loop + 1, **kwargs) async with self._intercept(
method, *args, loop=loop + 1, **kwargs
) as result:
yield result
elif loop >= 2: elif loop >= 2:
_LOGGER.error( _LOGGER.error(
"%s - Error %s - %s", "%s - Error %s - %s",
@ -100,7 +107,7 @@ class HonConnectionHandler(HonBaseConnectionHandler):
response.status, response.status,
await response.text(), await response.text(),
) )
raise PermissionError("Login failure") raise HonAuthenticationError("Login failure")
else: else:
try: try:
await response.json() await response.json()
@ -123,5 +130,5 @@ class HonAnonymousConnectionHandler(HonBaseConnectionHandler):
kwargs["headers"] = kwargs.pop("headers", {}) | self._HEADERS kwargs["headers"] = kwargs.pop("headers", {}) | self._HEADERS
async with method(*args, **kwargs) as response: async with method(*args, **kwargs) as response:
if response.status == 403: if response.status == 403:
print("Can't authorize") _LOGGER.error("Can't authenticate anymore")
yield response yield response

2
pyhon/exceptions.py Normal file
View File

@ -0,0 +1,2 @@
class HonAuthenticationError(Exception):
pass

63
pyhon/helper.py Normal file
View File

@ -0,0 +1,63 @@
def key_print(data, key="", start=True):
result = ""
if isinstance(data, list):
for i, value in enumerate(data):
result += key_print(value, key=f"{key}.{i}", start=False)
elif isinstance(data, dict):
for k, value in sorted(data.items()):
result += key_print(value, key=k if start else f"{key}.{k}", start=False)
else:
result += f"{key}: {data}\n"
return result
# yaml.dump() would be done the same, but needs an additional dependency...
def pretty_print(data, key="", intend=0, is_list=False, whitespace=" "):
result = ""
if isinstance(data, list):
if key:
result += f"{whitespace * intend}{'- ' if is_list else ''}{key}:\n"
intend += 1
for i, value in enumerate(data):
result += pretty_print(
value, intend=intend, is_list=True, whitespace=whitespace
)
elif isinstance(data, dict):
if key:
result += f"{whitespace * intend}{'- ' if is_list else ''}{key}:\n"
intend += 1
for i, (key, value) in enumerate(sorted(data.items())):
if is_list and not i:
result += pretty_print(
value, key=key, intend=intend, is_list=True, whitespace=whitespace
)
elif is_list:
result += pretty_print(
value, key=key, intend=intend + 1, whitespace=whitespace
)
else:
result += pretty_print(
value, key=key, intend=intend, whitespace=whitespace
)
else:
result += f"{whitespace * intend}{'- ' if is_list else ''}{key}{': ' if key else ''}{data}\n"
return result
def create_command(commands, concat=False):
result = {}
for name, command in commands.items():
if not concat:
result[name] = {}
for parameter, data in command.parameters.items():
if data.typology == "enum":
value = data.values
elif data.typology == "range":
value = {"min": data.min, "max": data.max, "step": data.step}
else:
continue
if not concat:
result[name][parameter] = value
else:
result[f"{name}.{parameter}"] = value
return result

View File

@ -1,6 +1,13 @@
import re import re
def str_to_float(string):
try:
return int(string)
except ValueError:
return float(str(string).replace(",", "."))
class HonParameter: class HonParameter:
def __init__(self, key, attributes): def __init__(self, key, attributes):
self._key = key self._key = key
@ -51,10 +58,10 @@ class HonParameterFixed(HonParameter):
class HonParameterRange(HonParameter): class HonParameterRange(HonParameter):
def __init__(self, key, attributes): def __init__(self, key, attributes):
super().__init__(key, attributes) super().__init__(key, attributes)
self._min = int(attributes["minimumValue"]) self._min = str_to_float(attributes["minimumValue"])
self._max = int(attributes["maximumValue"]) self._max = str_to_float(attributes["maximumValue"])
self._step = int(attributes["incrementValue"]) self._step = str_to_float(attributes["incrementValue"])
self._default = int(attributes.get("defaultValue", self._min)) self._default = str_to_float(attributes.get("defaultValue", self._min))
self._value = self._default self._value = self._default
def __repr__(self): def __repr__(self):
@ -78,7 +85,7 @@ class HonParameterRange(HonParameter):
@value.setter @value.setter
def value(self, value): def value(self, value):
value = int(value) value = str_to_float(value)
if self._min <= value <= self._max and not value % self._step: if self._min <= value <= self._max and not value % self._step:
self._value = value self._value = value
else: else:

View File

@ -7,7 +7,7 @@ with open("README.md", "r") as f:
setup( setup(
name="pyhOn", name="pyhOn",
version="0.6.0", version="0.7.0",
author="Andre Basche", author="Andre Basche",
description="Control hOn devices with python", description="Control hOn devices with python",
long_description=long_description, long_description=long_description,