Compare commits

...

10 Commits

11 changed files with 371 additions and 206 deletions

83
pyhon/__main__.py Executable file → Normal file
View File

@ -10,7 +10,7 @@ from pathlib import Path
if __name__ == "__main__": if __name__ == "__main__":
sys.path.insert(0, str(Path(__file__).parent.parent)) sys.path.insert(0, str(Path(__file__).parent.parent))
from pyhon import Hon, HonAPI from pyhon import Hon, HonAPI, helper
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -34,61 +34,6 @@ def get_arguments():
return vars(parser.parse_args()) return vars(parser.parse_args())
# yaml.dump() would be done the same, but needs an additional dependency...
def pretty_print(data, key="", intend=0, is_list=False):
if type(data) is list:
if key:
print(f"{' ' * intend}{'- ' if is_list else ''}{key}:")
intend += 1
for i, value in enumerate(data):
pretty_print(value, intend=intend, is_list=True)
elif type(data) is dict:
if key:
print(f"{' ' * intend}{'- ' if is_list else ''}{key}:")
intend += 1
for i, (key, value) in enumerate(sorted(data.items())):
if is_list and not i:
pretty_print(value, key=key, intend=intend, is_list=True)
elif is_list:
pretty_print(value, key=key, intend=intend + 1)
else:
pretty_print(value, key=key, intend=intend)
else:
print(
f"{' ' * intend}{'- ' if is_list else ''}{key}{': ' if key else ''}{data}"
)
def key_print(data, key="", start=True):
if type(data) is list:
for i, value in enumerate(data):
key_print(value, key=f"{key}.{i}", start=False)
elif type(data) is dict:
for k, value in sorted(data.items()):
key_print(value, key=k if start else f"{key}.{k}", start=False)
else:
print(f"{key}: {data}")
def create_command(commands, concat=False):
result = {}
for name, command in commands.items():
if not concat:
result[name] = {}
for parameter, data in command.parameters.items():
if data.typology == "enum":
value = data.values
elif data.typology == "range":
value = {"min": data.min, "max": data.max, "step": data.step}
else:
continue
if not concat:
result[name][parameter] = value
else:
result[f"{name}.{parameter}"] = value
return result
async def translate(language, json_output=False): async def translate(language, json_output=False):
async with HonAPI(anonymous=True) as hon: async with HonAPI(anonymous=True) as hon:
keys = await hon.translation_keys(language) keys = await hon.translation_keys(language)
@ -102,7 +47,7 @@ async def translate(language, json_output=False):
.replace("\\r", "") .replace("\\r", "")
) )
keys = json.loads(clean_keys) keys = json.loads(clean_keys)
pretty_print(keys) print(helper.pretty_print(keys))
async def main(): async def main():
@ -120,13 +65,25 @@ async def main():
if args.get("keys"): if args.get("keys"):
data = device.data.copy() data = device.data.copy()
attr = "get" if args.get("all") else "pop" attr = "get" if args.get("all") else "pop"
key_print(data["attributes"].__getattribute__(attr)("parameters")) print(
key_print(data.__getattribute__(attr)("appliance")) helper.key_print(
key_print(data) data["attributes"].__getattribute__(attr)("parameters")
pretty_print(create_command(device.commands, concat=True)) )
)
print(helper.key_print(data.__getattribute__(attr)("appliance")))
print(helper.key_print(data))
print(
helper.pretty_print(
helper.create_command(device.commands, concat=True)
)
)
else: else:
pretty_print({"data": device.data}) print(helper.pretty_print({"data": device.data}))
pretty_print({"settings": create_command(device.commands)}) print(
helper.pretty_print(
{"settings": helper.create_command(device.commands)}
)
)
def start(): def start():

View File

@ -1,6 +1,7 @@
import importlib import importlib
from contextlib import suppress from contextlib import suppress
from pyhon import helper
from pyhon.commands import HonCommand from pyhon.commands import HonCommand
from pyhon.parameter import HonParameterFixed from pyhon.parameter import HonParameterFixed
@ -172,3 +173,15 @@ class HonAppliance:
if self._extra: if self._extra:
return self._extra.data(result) return self._extra.data(result)
return result return result
@property
def diagnose(self):
data = self.data.copy()
for sensible in ["PK", "SK", "serialNumber", "code", "coords"]:
data["appliance"].pop(sensible, None)
result = helper.pretty_print({"data": self.data}, whitespace="\u200B \u200B ")
result += helper.pretty_print(
{"commands": helper.create_command(self.commands)},
whitespace="\u200B \u200B ",
)
return result.replace(self.mac_address, "12-34-56-78-90-ab")

View File

@ -1,46 +1,75 @@
import json import json
import logging import logging
from datetime import datetime from datetime import datetime
from typing import Dict, Optional
from typing_extensions import Self
from pyhon import const from aiohttp import ClientSession
from pyhon import const, exceptions
from pyhon.appliance import HonAppliance from pyhon.appliance import HonAppliance
from pyhon.connection.auth import HonAuth
from pyhon.connection.handler import HonConnectionHandler, HonAnonymousConnectionHandler from pyhon.connection.handler import HonConnectionHandler, HonAnonymousConnectionHandler
_LOGGER = logging.getLogger() _LOGGER = logging.getLogger()
class HonAPI: class HonAPI:
def __init__(self, email="", password="", anonymous=False, session=None) -> None: def __init__(
self,
email: str = "",
password: str = "",
anonymous: bool = False,
session: Optional[ClientSession] = None,
) -> None:
super().__init__() super().__init__()
self._email = email self._email: str = email
self._password = password self._password: str = password
self._anonymous = anonymous self._anonymous: bool = anonymous
self._hon = None self._hon_handler: Optional[HonConnectionHandler] = None
self._hon_anonymous = None self._hon_anonymous_handler: Optional[HonAnonymousConnectionHandler] = None
self._session = session self._session: Optional[ClientSession] = session
async def __aenter__(self): async def __aenter__(self) -> Self:
return await self.create() return await self.create()
async def __aexit__(self, exc_type, exc_val, exc_tb): async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
await self.close() await self.close()
async def create(self): @property
self._hon_anonymous = await HonAnonymousConnectionHandler( def auth(self) -> HonAuth:
if self._hon is None or self._hon.auth is None:
raise exceptions.NoAuthenticationException
return self._hon.auth
@property
def _hon(self):
if self._hon_handler is None:
raise exceptions.NoAuthenticationException
return self._hon_handler
@property
def _hon_anonymous(self):
if self._hon_anonymous_handler is None:
raise exceptions.NoAuthenticationException
return self._hon_anonymous_handler
async def create(self) -> Self:
self._hon_anonymous_handler = await HonAnonymousConnectionHandler(
self._session self._session
).create() ).create()
if not self._anonymous: if not self._anonymous:
self._hon = await HonConnectionHandler( self._hon_handler = await HonConnectionHandler(
self._email, self._password, self._session self._email, self._password, self._session
).create() ).create()
return self return self
async def load_appliances(self): async def load_appliances(self) -> Dict:
async with self._hon.get(f"{const.API_URL}/commands/v1/appliance") as resp: async with self._hon.get(f"{const.API_URL}/commands/v1/appliance") as resp:
return await resp.json() return await resp.json()
async def load_commands(self, appliance: HonAppliance): async def load_commands(self, appliance: HonAppliance) -> Dict:
params = { params: Dict = {
"applianceType": appliance.appliance_type, "applianceType": appliance.appliance_type,
"code": appliance.info["code"], "code": appliance.info["code"],
"applianceModelId": appliance.appliance_model_id, "applianceModelId": appliance.appliance_model_id,
@ -51,58 +80,66 @@ class HonAPI:
"appVersion": const.APP_VERSION, "appVersion": const.APP_VERSION,
"series": appliance.info["series"], "series": appliance.info["series"],
} }
url = f"{const.API_URL}/commands/v1/retrieve" url: str = f"{const.API_URL}/commands/v1/retrieve"
async with self._hon.get(url, params=params) as response: async with self._hon.get(url, params=params) as response:
result = (await response.json()).get("payload", {}) result: Dict = (await response.json()).get("payload", {})
if not result or result.pop("resultCode") != "0": if not result or result.pop("resultCode") != "0":
return {} return {}
return result return result
async def command_history(self, appliance: HonAppliance): async def command_history(self, appliance: HonAppliance) -> Dict:
url = f"{const.API_URL}/commands/v1/appliance/{appliance.mac_address}/history" url: str = (
f"{const.API_URL}/commands/v1/appliance/{appliance.mac_address}/history"
)
async with self._hon.get(url) as response: async with self._hon.get(url) as response:
result = await response.json() result: Dict = await response.json()
if not result or not result.get("payload"): if not result or not result.get("payload"):
return {} return {}
return result["payload"]["history"] return result["payload"]["history"]
async def last_activity(self, appliance: HonAppliance): async def last_activity(self, appliance: HonAppliance) -> Dict:
url = f"{const.API_URL}/commands/v1/retrieve-last-activity" url: str = f"{const.API_URL}/commands/v1/retrieve-last-activity"
params = {"macAddress": appliance.mac_address} params: Dict = {"macAddress": appliance.mac_address}
async with self._hon.get(url, params=params) as response: async with self._hon.get(url, params=params) as response:
result = await response.json() result: Dict = await response.json()
if result and (activity := result.get("attributes")): if result and (activity := result.get("attributes")):
return activity return activity
return {} return {}
async def load_attributes(self, appliance: HonAppliance): async def load_attributes(self, appliance: HonAppliance) -> Dict:
params = { params: Dict = {
"macAddress": appliance.mac_address, "macAddress": appliance.mac_address,
"applianceType": appliance.appliance_type, "applianceType": appliance.appliance_type,
"category": "CYCLE", "category": "CYCLE",
} }
url = f"{const.API_URL}/commands/v1/context" url: str = f"{const.API_URL}/commands/v1/context"
async with self._hon.get(url, params=params) as response: async with self._hon.get(url, params=params) as response:
return (await response.json()).get("payload", {}) return (await response.json()).get("payload", {})
async def load_statistics(self, appliance: HonAppliance): async def load_statistics(self, appliance: HonAppliance) -> Dict:
params = { params: Dict = {
"macAddress": appliance.mac_address, "macAddress": appliance.mac_address,
"applianceType": appliance.appliance_type, "applianceType": appliance.appliance_type,
} }
url = f"{const.API_URL}/commands/v1/statistics" url: str = f"{const.API_URL}/commands/v1/statistics"
async with self._hon.get(url, params=params) as response: async with self._hon.get(url, params=params) as response:
return (await response.json()).get("payload", {}) return (await response.json()).get("payload", {})
async def send_command(self, appliance, command, parameters, ancillary_parameters): async def send_command(
now = datetime.utcnow().isoformat() self,
data = { appliance: HonAppliance,
command: str,
parameters: Dict,
ancillary_parameters: Dict,
) -> bool:
now: str = datetime.utcnow().isoformat()
data: Dict = {
"macAddress": appliance.mac_address, "macAddress": appliance.mac_address,
"timestamp": f"{now[:-3]}Z", "timestamp": f"{now[:-3]}Z",
"commandName": command, "commandName": command,
"transactionId": f"{appliance.mac_address}_{now[:-3]}Z", "transactionId": f"{appliance.mac_address}_{now[:-3]}Z",
"applianceOptions": appliance.commands_options, "applianceOptions": appliance.commands_options,
"appliance": self._hon.device.get(), "device": self._hon.device.get(mobile=True),
"attributes": { "attributes": {
"channel": "mobileApp", "channel": "mobileApp",
"origin": "standardProgram", "origin": "standardProgram",
@ -112,36 +149,37 @@ class HonAPI:
"parameters": parameters, "parameters": parameters,
"applianceType": appliance.appliance_type, "applianceType": appliance.appliance_type,
} }
url = f"{const.API_URL}/commands/v1/send" url: str = f"{const.API_URL}/commands/v1/send"
async with self._hon.post(url, json=data) as resp: async with self._hon.post(url, json=data) as response:
json_data = await resp.json() json_data: Dict = await response.json()
if json_data.get("payload", {}).get("resultCode") == "0": if json_data.get("payload", {}).get("resultCode") == "0":
return True return True
_LOGGER.error(await response.text())
return False return False
async def appliance_configuration(self): async def appliance_configuration(self) -> Dict:
url = f"{const.API_URL}/config/v1/appliance-configuration" url: str = f"{const.API_URL}/config/v1/appliance-configuration"
async with self._hon_anonymous.get(url) as response: async with self._hon_anonymous.get(url) as response:
result = await response.json() result: Dict = await response.json()
if result and (data := result.get("payload")): if result and (data := result.get("payload")):
return data return data
return {} return {}
async def app_config(self, language="en", beta=True): async def app_config(self, language: str = "en", beta: bool = True) -> Dict:
url = f"{const.API_URL}/app-config" url: str = f"{const.API_URL}/app-config"
payload = { payload_data: Dict = {
"languageCode": language, "languageCode": language,
"beta": beta, "beta": beta,
"appVersion": const.APP_VERSION, "appVersion": const.APP_VERSION,
"os": const.OS, "os": const.OS,
} }
payload = json.dumps(payload, separators=(",", ":")) payload: str = json.dumps(payload_data, separators=(",", ":"))
async with self._hon_anonymous.post(url, data=payload) as response: async with self._hon_anonymous.post(url, data=payload) as response:
if (result := await response.json()) and (data := result.get("payload")): if (result := await response.json()) and (data := result.get("payload")):
return data return data
return {} return {}
async def translation_keys(self, language="en"): async def translation_keys(self, language: str = "en") -> Dict:
config = await self.app_config(language=language) config = await self.app_config(language=language)
if url := config.get("language", {}).get("jsonPath"): if url := config.get("language", {}).get("jsonPath"):
async with self._hon_anonymous.get(url) as response: async with self._hon_anonymous.get(url) as response:
@ -149,8 +187,8 @@ class HonAPI:
return result return result
return {} return {}
async def close(self): async def close(self) -> None:
if self._hon: if self._hon_handler is not None:
await self._hon.close() await self._hon_handler.close()
if self._hon_anonymous: if self._hon_anonymous_handler is not None:
await self._hon_anonymous.close() await self._hon_anonymous_handler.close()

View File

@ -3,18 +3,23 @@ import logging
import re import re
import secrets import secrets
import urllib import urllib
from datetime import datetime, timedelta
from pprint import pformat from pprint import pformat
from typing import List, Tuple
from urllib import parse from urllib import parse
from urllib.parse import quote
from yarl import URL from yarl import URL
from pyhon import const from pyhon import const, exceptions
from pyhon.exceptions import HonAuthenticationError
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
class HonAuth: class HonAuth:
_TOKEN_EXPIRES_AFTER_HOURS = 8
_TOKEN_EXPIRE_WARNING_HOURS = 7
def __init__(self, session, email, password, device) -> None: def __init__(self, session, email, password, device) -> None:
self._session = session self._session = session
self._email = email self._email = email
@ -24,7 +29,8 @@ class HonAuth:
self._cognito_token = "" self._cognito_token = ""
self._id_token = "" self._id_token = ""
self._device = device self._device = device
self._called_urls = [] self._called_urls: List[Tuple[int, str]] = []
self._expires: datetime = datetime.utcnow()
@property @property
def cognito_token(self): def cognito_token(self):
@ -42,6 +48,17 @@ class HonAuth:
def refresh_token(self): def refresh_token(self):
return self._refresh_token return self._refresh_token
def _check_token_expiration(self, hours):
return datetime.utcnow() >= self._expires + timedelta(hours=hours)
@property
def token_is_expired(self) -> bool:
return self._check_token_expiration(self._TOKEN_EXPIRES_AFTER_HOURS)
@property
def token_expires_soon(self) -> bool:
return self._check_token_expiration(self._TOKEN_EXPIRE_WARNING_HOURS)
async def _error_logger(self, response, fail=True): async def _error_logger(self, response, fail=True):
result = "hOn Authentication Error\n" result = "hOn Authentication Error\n"
for i, (status, url) in enumerate(self._called_urls): for i, (status, url) in enumerate(self._called_urls):
@ -50,7 +67,7 @@ class HonAuth:
result += f"{15 * '='} Response {15 * '='}\n{await response.text()}\n{40 * '='}" result += f"{15 * '='} Response {15 * '='}\n{await response.text()}\n{40 * '='}"
_LOGGER.error(result) _LOGGER.error(result)
if fail: if fail:
raise HonAuthenticationError("Can't login") raise exceptions.HonAuthenticationError("Can't login")
async def _load_login(self): async def _load_login(self):
nonce = secrets.token_hex(16) nonce = secrets.token_hex(16)
@ -70,7 +87,12 @@ class HonAuth:
f"{const.AUTH_API}/services/oauth2/authorize/expid_Login?{params}" f"{const.AUTH_API}/services/oauth2/authorize/expid_Login?{params}"
) as response: ) as response:
self._called_urls.append((response.status, response.request_info.url)) self._called_urls.append((response.status, response.request_info.url))
if not (login_url := re.findall("url = '(.+?)'", await response.text())): text = await response.text()
self._expires = datetime.utcnow()
if not (login_url := re.findall("url = '(.+?)'", text)):
if "oauth/done#access_token=" in text:
self._parse_token_data(text)
raise exceptions.HonNoAuthenticationNeeded()
await self._error_logger(response) await self._error_logger(response)
return False return False
async with self._session.get(login_url[0], allow_redirects=False) as redirect1: async with self._session.get(login_url[0], allow_redirects=False) as redirect1:
@ -86,7 +108,9 @@ class HonAuth:
): ):
await self._error_logger(redirect2) await self._error_logger(redirect2)
return False return False
async with self._session.get(URL(url, encoded=True)) as login_screen: async with self._session.get(
URL(url, encoded=True), headers={"user-agent": const.USER_AGENT}
) as login_screen:
self._called_urls.append( self._called_urls.append(
(login_screen.status, login_screen.request_info.url) (login_screen.status, login_screen.request_info.url)
) )
@ -111,8 +135,8 @@ class HonAuth:
"descriptor": "apex://LightningLoginCustomController/ACTION$login", "descriptor": "apex://LightningLoginCustomController/ACTION$login",
"callingDescriptor": "markup://c:loginForm", "callingDescriptor": "markup://c:loginForm",
"params": { "params": {
"username": self._email, "username": quote(self._email),
"password": self._password, "password": quote(self._password),
"startUrl": parse.unquote( "startUrl": parse.unquote(
login_url.split("startURL=")[-1] login_url.split("startURL=")[-1]
).split("%3D")[0], ).split("%3D")[0],
@ -153,6 +177,14 @@ class HonAuth:
await self._error_logger(response) await self._error_logger(response)
return "" return ""
def _parse_token_data(self, text):
if access_token := re.findall("access_token=(.*?)&", text):
self._access_token = access_token[0]
if refresh_token := re.findall("refresh_token=(.*?)&", text):
self._refresh_token = refresh_token[0]
if id_token := re.findall("id_token=(.*?)&", text):
self._id_token = id_token[0]
async def _get_token(self, url): async def _get_token(self, url):
async with self._session.get(url) as response: async with self._session.get(url) as response:
self._called_urls.append((response.status, response.request_info.url)) self._called_urls.append((response.status, response.request_info.url))
@ -176,25 +208,10 @@ class HonAuth:
if response.status != 200: if response.status != 200:
await self._error_logger(response) await self._error_logger(response)
return False return False
text = await response.text() self._parse_token_data(await response.text())
if access_token := re.findall("access_token=(.*?)&", text):
self._access_token = access_token[0]
if refresh_token := re.findall("refresh_token=(.*?)&", text):
self._refresh_token = refresh_token[0]
if id_token := re.findall("id_token=(.*?)&", text):
self._id_token = id_token[0]
return True return True
async def authorize(self): async def _api_auth(self):
if login_site := await self._load_login():
fw_uid, loaded, login_url = login_site
else:
return False
if not (url := await self._login(fw_uid, loaded, login_url)):
return False
if not await self._get_token(url):
return False
post_headers = {"id-token": self._id_token} post_headers = {"id-token": self._id_token}
data = self._device.get() data = self._device.get()
async with self._session.post( async with self._session.post(
@ -209,6 +226,20 @@ class HonAuth:
self._cognito_token = json_data["cognitoUser"]["Token"] self._cognito_token = json_data["cognitoUser"]["Token"]
return True return True
async def authenticate(self):
self.clear()
try:
if not (login_site := await self._load_login()):
raise exceptions.HonAuthenticationError("Can't open login page")
if not (url := await self._login(*login_site)):
raise exceptions.HonAuthenticationError("Can't login")
if not await self._get_token(url):
raise exceptions.HonAuthenticationError("Can't get token")
if not await self._api_auth():
raise exceptions.HonAuthenticationError("Can't get api token")
except exceptions.HonNoAuthenticationNeeded:
return
async def refresh(self): async def refresh(self):
params = { params = {
"client_id": const.CLIENT_ID, "client_id": const.CLIENT_ID,
@ -223,6 +254,15 @@ class HonAuth:
await self._error_logger(response, fail=False) await self._error_logger(response, fail=False)
return False return False
data = await response.json() data = await response.json()
self._expires = datetime.utcnow()
self._id_token = data["id_token"] self._id_token = data["id_token"]
self._access_token = data["access_token"] self._access_token = data["access_token"]
return True return await self._api_auth()
def clear(self):
self._session.cookie_jar.clear_domain(const.AUTH_API.split("/")[-2])
self._called_urls = []
self._cognito_token = ""
self._id_token = ""
self._access_token = ""
self._refresh_token = ""

View File

@ -1,41 +1,43 @@
import secrets import secrets
from typing import Dict
from pyhon import const from pyhon import const
class HonDevice: class HonDevice:
def __init__(self): def __init__(self) -> None:
self._app_version = const.APP_VERSION self._app_version: str = const.APP_VERSION
self._os_version = const.OS_VERSION self._os_version: int = const.OS_VERSION
self._os = const.OS self._os: str = const.OS
self._device_model = const.DEVICE_MODEL self._device_model: str = const.DEVICE_MODEL
self._mobile_id = secrets.token_hex(8) self._mobile_id: str = secrets.token_hex(8)
@property @property
def app_version(self): def app_version(self) -> str:
return self._app_version return self._app_version
@property @property
def os_version(self): def os_version(self) -> int:
return self._os_version return self._os_version
@property @property
def os(self): def os(self) -> str:
return self._os return self._os
@property @property
def device_model(self): def device_model(self) -> str:
return self._device_model return self._device_model
@property @property
def mobile_id(self): def mobile_id(self) -> str:
return self._mobile_id return self._mobile_id
def get(self): def get(self, mobile: bool = False) -> Dict:
return { result = {
"appVersion": self.app_version, "appVersion": self.app_version,
"mobileId": self.mobile_id, "mobileId": self.mobile_id,
"osVersion": self.os_version,
"os": self.os, "os": self.os,
"osVersion": self.os_version,
"deviceModel": self.device_model, "deviceModel": self.device_model,
} }
return (result | {"mobileOs": result.pop("os")}) if mobile else result

View File

@ -1,91 +1,117 @@
import json import json
from collections.abc import Generator, AsyncIterator, Coroutine
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from typing import Optional, Callable, Dict
from typing_extensions import Self
import aiohttp import aiohttp
from pyhon import const from pyhon import const, exceptions
from pyhon.connection.auth import HonAuth, _LOGGER from pyhon.connection.auth import HonAuth, _LOGGER
from pyhon.connection.device import HonDevice from pyhon.connection.device import HonDevice
from pyhon.exceptions import HonAuthenticationError from pyhon.exceptions import HonAuthenticationError
class HonBaseConnectionHandler: class HonBaseConnectionHandler:
_HEADERS = {"user-agent": const.USER_AGENT, "Content-Type": "application/json"} _HEADERS: Dict = {
"user-agent": const.USER_AGENT,
"Content-Type": "application/json",
}
def __init__(self, session=None): def __init__(self, session: Optional[aiohttp.ClientSession] = None) -> None:
self._session = session self._create_session: bool = session is None
self._auth = None self._session: Optional[aiohttp.ClientSession] = session
self._auth: Optional[HonAuth] = None
async def __aenter__(self): async def __aenter__(self) -> Self:
return await self.create() return await self.create()
async def __aexit__(self, exc_type, exc_val, exc_tb): async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
await self.close() await self.close()
async def create(self): @property
self._session = aiohttp.ClientSession(headers=self._HEADERS) def auth(self) -> Optional[HonAuth]:
return self._auth
async def create(self) -> Self:
if self._create_session:
self._session = aiohttp.ClientSession()
return self return self
@asynccontextmanager @asynccontextmanager
async def _intercept(self, method, *args, loop=0, **kwargs): def _intercept(self, method: Callable, *args, loop: int = 0, **kwargs):
raise NotImplementedError raise NotImplementedError
@asynccontextmanager @asynccontextmanager
async def get(self, *args, **kwargs): async def get(self, *args, **kwargs) -> AsyncIterator[Callable]:
if self._session is None:
raise exceptions.NoSessionException()
response: Callable
async with self._intercept(self._session.get, *args, **kwargs) as response: async with self._intercept(self._session.get, *args, **kwargs) as response:
yield response yield response
@asynccontextmanager @asynccontextmanager
async def post(self, *args, **kwargs): async def post(self, *args, **kwargs) -> AsyncIterator[Callable]:
if self._session is None:
raise exceptions.NoSessionException()
response: Callable
async with self._intercept(self._session.post, *args, **kwargs) as response: async with self._intercept(self._session.post, *args, **kwargs) as response:
yield response yield response
async def close(self): async def close(self) -> None:
if self._create_session and self._session is not None:
await self._session.close() await self._session.close()
class HonConnectionHandler(HonBaseConnectionHandler): class HonConnectionHandler(HonBaseConnectionHandler):
def __init__(self, email, password, session=None): def __init__(
self, email: str, password: str, session: Optional[aiohttp.ClientSession] = None
) -> None:
super().__init__(session=session) super().__init__(session=session)
self._device = HonDevice() self._device: HonDevice = HonDevice()
self._email = email self._email: str = email
self._password = password self._password: str = password
if not self._email: if not self._email:
raise HonAuthenticationError("An email address must be specified") raise HonAuthenticationError("An email address must be specified")
if not self._password: if not self._password:
raise HonAuthenticationError("A password address must be specified") raise HonAuthenticationError("A password address must be specified")
self._request_headers = {}
@property @property
def device(self): def device(self) -> HonDevice:
return self._device return self._device
async def create(self): async def create(self) -> Self:
await super().create() await super().create()
self._auth = HonAuth(self._session, self._email, self._password, self._device) self._auth: HonAuth = HonAuth(
self._session, self._email, self._password, self._device
)
return self return self
async def _check_headers(self, headers): async def _check_headers(self, headers: Dict) -> Dict:
if ( if not (self._auth.cognito_token and self._auth.id_token):
"cognito-token" not in self._request_headers await self._auth.authenticate()
or "id-token" not in self._request_headers headers["cognito-token"] = self._auth.cognito_token
): headers["id-token"] = self._auth.id_token
if await self._auth.authorize(): return self._HEADERS | headers
self._request_headers["cognito-token"] = self._auth.cognito_token
self._request_headers["id-token"] = self._auth.id_token
else:
raise HonAuthenticationError("Can't login")
return {h: v for h, v in self._request_headers.items() if h not in headers}
@asynccontextmanager @asynccontextmanager
async def _intercept(self, method, *args, loop=0, **kwargs): async def _intercept(
self, method: Callable, *args, loop: int = 0, **kwargs
) -> AsyncIterator:
kwargs["headers"] = await self._check_headers(kwargs.get("headers", {})) kwargs["headers"] = await self._check_headers(kwargs.get("headers", {}))
async with method(*args, **kwargs) as response: async with method(*args, **kwargs) as response:
if response.status == 403 and not loop: if (
self._auth.token_expires_soon or response.status in [401, 403]
) and loop == 0:
_LOGGER.info("Try refreshing token...") _LOGGER.info("Try refreshing token...")
await self._auth.refresh() await self._auth.refresh()
yield await self._intercept(method, *args, loop=loop + 1, **kwargs) async with self._intercept(
elif response.status == 403 and loop < 2: method, *args, loop=loop + 1, **kwargs
) as result:
yield result
elif (
self._auth.token_is_expired or response.status in [401, 403]
) and loop == 1:
_LOGGER.warning( _LOGGER.warning(
"%s - Error %s - %s", "%s - Error %s - %s",
response.request_info.url, response.request_info.url,
@ -93,7 +119,10 @@ class HonConnectionHandler(HonBaseConnectionHandler):
await response.text(), await response.text(),
) )
await self.create() await self.create()
yield await self._intercept(method, *args, loop=loop + 1, **kwargs) async with self._intercept(
method, *args, loop=loop + 1, **kwargs
) as result:
yield result
elif loop >= 2: elif loop >= 2:
_LOGGER.error( _LOGGER.error(
"%s - Error %s - %s", "%s - Error %s - %s",
@ -113,14 +142,16 @@ class HonConnectionHandler(HonBaseConnectionHandler):
response.status, response.status,
await response.text(), await response.text(),
) )
yield {} raise HonAuthenticationError("Decode Error")
class HonAnonymousConnectionHandler(HonBaseConnectionHandler): class HonAnonymousConnectionHandler(HonBaseConnectionHandler):
_HEADERS = HonBaseConnectionHandler._HEADERS | {"x-api-key": const.API_KEY} _HEADERS: Dict = HonBaseConnectionHandler._HEADERS | {"x-api-key": const.API_KEY}
@asynccontextmanager @asynccontextmanager
async def _intercept(self, method, *args, loop=0, **kwargs): async def _intercept(
self, method: Callable, *args, loop: int = 0, **kwargs
) -> AsyncIterator:
kwargs["headers"] = kwargs.pop("headers", {}) | self._HEADERS kwargs["headers"] = kwargs.pop("headers", {}) | self._HEADERS
async with method(*args, **kwargs) as response: async with method(*args, **kwargs) as response:
if response.status == 403: if response.status == 403:

View File

@ -1,2 +1,14 @@
class HonAuthenticationError(Exception): class HonAuthenticationError(Exception):
pass pass
class HonNoAuthenticationNeeded(Exception):
pass
class NoSessionException(Exception):
pass
class NoAuthenticationException(Exception):
pass

63
pyhon/helper.py Normal file
View File

@ -0,0 +1,63 @@
def key_print(data, key="", start=True):
result = ""
if isinstance(data, list):
for i, value in enumerate(data):
result += key_print(value, key=f"{key}.{i}", start=False)
elif isinstance(data, dict):
for k, value in sorted(data.items()):
result += key_print(value, key=k if start else f"{key}.{k}", start=False)
else:
result += f"{key}: {data}\n"
return result
# yaml.dump() would be done the same, but needs an additional dependency...
def pretty_print(data, key="", intend=0, is_list=False, whitespace=" "):
result = ""
if isinstance(data, list):
if key:
result += f"{whitespace * intend}{'- ' if is_list else ''}{key}:\n"
intend += 1
for i, value in enumerate(data):
result += pretty_print(
value, intend=intend, is_list=True, whitespace=whitespace
)
elif isinstance(data, dict):
if key:
result += f"{whitespace * intend}{'- ' if is_list else ''}{key}:\n"
intend += 1
for i, (key, value) in enumerate(sorted(data.items())):
if is_list and not i:
result += pretty_print(
value, key=key, intend=intend, is_list=True, whitespace=whitespace
)
elif is_list:
result += pretty_print(
value, key=key, intend=intend + 1, whitespace=whitespace
)
else:
result += pretty_print(
value, key=key, intend=intend, whitespace=whitespace
)
else:
result += f"{whitespace * intend}{'- ' if is_list else ''}{key}{': ' if key else ''}{data}\n"
return result
def create_command(commands, concat=False):
result = {}
for name, command in commands.items():
if not concat:
result[name] = {}
for parameter, data in command.parameters.items():
if data.typology == "enum":
value = data.values
elif data.typology == "range":
value = {"min": data.min, "max": data.max, "step": data.step}
else:
continue
if not concat:
result[name][parameter] = value
else:
result[f"{name}.{parameter}"] = value
return result

View File

@ -1,17 +1,20 @@
import asyncio import asyncio
from typing import List from typing import List, Optional
from typing_extensions import Self
from pyhon import HonAPI from aiohttp import ClientSession
from pyhon import HonAPI, exceptions
from pyhon.appliance import HonAppliance from pyhon.appliance import HonAppliance
class Hon: class Hon:
def __init__(self, email, password, session=None): def __init__(self, email: str, password: str, session: ClientSession | None = None):
self._email = email self._email: str = email
self._password = password self._password: str = password
self._session = session self._session: ClientSession | None = session
self._appliances = [] self._appliances: List[HonAppliance] = []
self._api = None self._api: Optional[HonAPI] = None
async def __aenter__(self): async def __aenter__(self):
return await self.create() return await self.create()
@ -19,7 +22,13 @@ class Hon:
async def __aexit__(self, exc_type, exc_val, exc_tb): async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.close() await self.close()
async def create(self): @property
def api(self) -> HonAPI:
if self._api is None:
raise exceptions.NoAuthenticationException
return self._api
async def create(self) -> Self:
self._api = await HonAPI( self._api = await HonAPI(
self._email, self._password, session=self._session self._email, self._password, session=self._session
).create() ).create()

View File

@ -5,7 +5,7 @@ def str_to_float(string):
try: try:
return int(string) return int(string)
except ValueError: except ValueError:
return float(str(string.replace(",", "."))) return float(str(string).replace(",", "."))
class HonParameter: class HonParameter:

View File

@ -7,7 +7,7 @@ with open("README.md", "r") as f:
setup( setup(
name="pyhOn", name="pyhOn",
version="0.6.4", version="0.7.4",
author="Andre Basche", author="Andre Basche",
description="Control hOn devices with python", description="Control hOn devices with python",
long_description=long_description, long_description=long_description,