Compare commits

..

12 Commits

Author SHA1 Message Date
a1618bb18a Fix missing zone attribute 2023-04-15 22:25:34 +02:00
a957d7ac0f Fix error for zone devices 2023-04-15 21:58:20 +02:00
f54b7b2dbf Add mypy checks 2023-04-15 15:57:36 +02:00
b6ca12ebff Use dataclass for login data 2023-04-15 14:37:27 +02:00
4a0ee8569b Refactor, add hon auth handler 2023-04-15 14:22:04 +02:00
d52d622785 Add zone support 2023-04-15 04:16:50 +02:00
9643f66549 Bump version to v0.7.4 2023-04-15 00:31:45 +02:00
d26e33a055 Fix error in starting programs 2023-04-15 00:29:24 +02:00
0301427497 Remove coords from diagnose 2023-04-14 23:24:31 +02:00
272556586e Refresh token workaround because expires to fast 2023-04-14 23:15:27 +02:00
e82c14ec99 Add some type hints 2023-04-13 23:35:43 +02:00
970b94bfa7 Fix unclear session errors 2023-04-12 19:14:50 +02:00
18 changed files with 598 additions and 385 deletions

View File

@ -25,12 +25,15 @@ jobs:
run: | run: |
python -m pip install --upgrade pip python -m pip install --upgrade pip
python -m pip install -r requirements.txt python -m pip install -r requirements.txt
python -m pip install flake8 pylint black python -m pip install flake8 pylint black mypy
- name: Lint with flake8 - name: Lint with flake8
run: | run: |
# stop the build if there are Python syntax errors or undefined names # stop the build if there are Python syntax errors or undefined names
flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
flake8 . --count --exit-zero --max-complexity=10 --max-line-length=88 --statistics flake8 . --count --exit-zero --max-complexity=10 --max-line-length=88 --statistics
- name: Type check with mypy
run: |
mypy pyhon/
# - name: Analysing the code with pylint # - name: Analysing the code with pylint
# run: | # run: |
# pylint --max-line-length 88 $(git ls-files '*.py') # pylint --max-line-length 88 $(git ls-files '*.py')

1
.gitignore vendored
View File

@ -4,3 +4,4 @@ __pycache__/
dist/ dist/
**/*.egg-info/ **/*.egg-info/
test* test*
build/

View File

@ -6,7 +6,7 @@
[![PyPI - Python Version](https://img.shields.io/pypi/pyversions/pyhOn)](https://www.python.org/) [![PyPI - Python Version](https://img.shields.io/pypi/pyversions/pyhOn)](https://www.python.org/)
[![PyPI - License](https://img.shields.io/pypi/l/pyhOn)](https://github.com/Andre0512/pyhOn/blob/main/LICENSE) [![PyPI - License](https://img.shields.io/pypi/l/pyhOn)](https://github.com/Andre0512/pyhOn/blob/main/LICENSE)
[![PyPI - Downloads](https://img.shields.io/pypi/dm/pyhOn)](https://pypistats.org/packages/pyhon) [![PyPI - Downloads](https://img.shields.io/pypi/dm/pyhOn)](https://pypistats.org/packages/pyhon)
Control your Haier appliances with python! Control your Haier, Candy and Hoover appliances with python!
The idea behind this library is, to make the use of all available commands as simple as possible. The idea behind this library is, to make the use of all available commands as simple as possible.
## Installation ## Installation
@ -100,8 +100,6 @@ This generates a huge output. It is recommended to pipe this into a file
$ pyhOn translate fr > hon_fr.yaml $ pyhOn translate fr > hon_fr.yaml
$ pyhOn translate en --json > hon_en.json $ pyhOn translate en --json > hon_en.json
``` ```
## Tested devices
- Haier Washing Machine HW90
## Usage example ## Usage example
This library is used for the custom [HomeAssistant Integration "Haier hOn"](https://github.com/Andre0512/hOn). This library is used for the custom [HomeAssistant Integration "Haier hOn"](https://github.com/Andre0512/hOn).

0
pyhon/__main__.py Normal file → Executable file
View File

View File

@ -1,22 +1,30 @@
import importlib import importlib
from contextlib import suppress from contextlib import suppress
from typing import Optional, Dict, Any
from typing import TYPE_CHECKING
from pyhon import helper from pyhon import helper
from pyhon.commands import HonCommand from pyhon.commands import HonCommand
from pyhon.parameter import HonParameterFixed from pyhon.parameter import HonParameterFixed
if TYPE_CHECKING:
from pyhon import HonAPI
class HonAppliance: class HonAppliance:
def __init__(self, api, info): def __init__(
self, api: Optional["HonAPI"], info: Dict[str, Any], zone: int = 0
) -> None:
if attributes := info.get("attributes"): if attributes := info.get("attributes"):
info["attributes"] = {v["parName"]: v["parValue"] for v in attributes} info["attributes"] = {v["parName"]: v["parValue"] for v in attributes}
self._info = info self._info: Dict = info
self._api = api self._api: Optional[HonAPI] = api
self._appliance_model = {} self._appliance_model: Dict = {}
self._commands = {} self._commands: Dict = {}
self._statistics = {} self._statistics: Dict = {}
self._attributes = {} self._attributes: Dict = {}
self._zone: int = zone
try: try:
self._extra = importlib.import_module( self._extra = importlib.import_module(
@ -26,20 +34,21 @@ class HonAppliance:
self._extra = None self._extra = None
def __getitem__(self, item): def __getitem__(self, item):
if self._zone:
item += f"Z{self._zone}"
if "." in item: if "." in item:
result = self.data result = self.data
for key in item.split("."): for key in item.split("."):
if all([k in "0123456789" for k in key]) and type(result) is list: if all(k in "0123456789" for k in key) and isinstance(result, list):
result = result[int(key)] result = result[int(key)]
else: else:
result = result[key] result = result[key]
return result return result
else: if item in self.data:
if item in self.data: return self.data[item]
return self.data[item] if item in self.attributes["parameters"]:
if item in self.attributes["parameters"]: return self.attributes["parameters"].get(item)
return self.attributes["parameters"].get(item) return self.info[item]
return self.info[item]
def get(self, item, default=None): def get(self, item, default=None):
try: try:
@ -47,25 +56,35 @@ class HonAppliance:
except (KeyError, IndexError): except (KeyError, IndexError):
return default return default
@property def _check_name_zone(self, name: str, frontend: bool = True) -> str:
def appliance_model_id(self): middle = " Z" if frontend else "_z"
return self._info.get("applianceModelId") if (attribute := self._info.get(name, "")) and self._zone:
return f"{attribute}{middle}{self._zone}"
return attribute
@property @property
def appliance_type(self): def appliance_model_id(self) -> str:
return self._info.get("applianceTypeName") return self._info.get("applianceModelId", "")
@property @property
def mac_address(self): def appliance_type(self) -> str:
return self._info.get("macAddress") return self._info.get("applianceTypeName", "")
@property @property
def model_name(self): def mac_address(self) -> str:
return self._info.get("modelName") return self.info.get("macAddress", "")
@property @property
def nick_name(self): def unique_id(self) -> str:
return self._info.get("nickName") return self._check_name_zone("macAddress", frontend=False)
@property
def model_name(self) -> str:
return self._check_name_zone("modelName")
@property
def nick_name(self) -> str:
return self._check_name_zone("nickName")
@property @property
def commands_options(self): def commands_options(self):
@ -87,6 +106,10 @@ class HonAppliance:
def info(self): def info(self):
return self._info return self._info
@property
def zone(self) -> int:
return self._zone
async def _recover_last_command_states(self, commands): async def _recover_last_command_states(self, commands):
command_history = await self._api.command_history(self) command_history = await self._api.command_history(self)
for name, command in commands.items(): for name, command in commands.items():
@ -177,7 +200,7 @@ class HonAppliance:
@property @property
def diagnose(self): def diagnose(self):
data = self.data.copy() data = self.data.copy()
for sensible in ["PK", "SK", "serialNumber", "code"]: for sensible in ["PK", "SK", "serialNumber", "code", "coords"]:
data["appliance"].pop(sensible, None) data["appliance"].pop(sensible, None)
result = helper.pretty_print({"data": self.data}, whitespace="\u200B \u200B ") result = helper.pretty_print({"data": self.data}, whitespace="\u200B \u200B ")
result += helper.pretty_print( result += helper.pretty_print(

View File

@ -25,6 +25,8 @@ class HonCommand:
def _create_parameters(self, parameters): def _create_parameters(self, parameters):
result = {} result = {}
for parameter, attributes in parameters.items(): for parameter, attributes in parameters.items():
if parameter == "zoneMap" and self._device.zone:
attributes["default"] = self._device.zone
match attributes.get("typology"): match attributes.get("typology"):
case "range": case "range":
result[parameter] = HonParameterRange(parameter, attributes) result[parameter] = HonParameterRange(parameter, attributes)

View File

@ -1,46 +1,76 @@
import json import json
import logging import logging
from datetime import datetime from datetime import datetime
from typing import Dict, Optional
from typing_extensions import Self
from pyhon import const from aiohttp import ClientSession
from pyhon import const, exceptions
from pyhon.appliance import HonAppliance from pyhon.appliance import HonAppliance
from pyhon.connection.handler import HonConnectionHandler, HonAnonymousConnectionHandler from pyhon.connection.auth import HonAuth
from pyhon.connection.handler.hon import HonConnectionHandler
from pyhon.connection.handler.anonym import HonAnonymousConnectionHandler
_LOGGER = logging.getLogger() _LOGGER = logging.getLogger()
class HonAPI: class HonAPI:
def __init__(self, email="", password="", anonymous=False, session=None) -> None: def __init__(
self,
email: str = "",
password: str = "",
anonymous: bool = False,
session: Optional[ClientSession] = None,
) -> None:
super().__init__() super().__init__()
self._email = email self._email: str = email
self._password = password self._password: str = password
self._anonymous = anonymous self._anonymous: bool = anonymous
self._hon = None self._hon_handler: Optional[HonConnectionHandler] = None
self._hon_anonymous = None self._hon_anonymous_handler: Optional[HonAnonymousConnectionHandler] = None
self._session = session self._session: Optional[ClientSession] = session
async def __aenter__(self): async def __aenter__(self) -> Self:
return await self.create() return await self.create()
async def __aexit__(self, exc_type, exc_val, exc_tb): async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
await self.close() await self.close()
async def create(self): @property
self._hon_anonymous = await HonAnonymousConnectionHandler( def auth(self) -> HonAuth:
if self._hon is None or self._hon.auth is None:
raise exceptions.NoAuthenticationException
return self._hon.auth
@property
def _hon(self):
if self._hon_handler is None:
raise exceptions.NoAuthenticationException
return self._hon_handler
@property
def _hon_anonymous(self):
if self._hon_anonymous_handler is None:
raise exceptions.NoAuthenticationException
return self._hon_anonymous_handler
async def create(self) -> Self:
self._hon_anonymous_handler = await HonAnonymousConnectionHandler(
self._session self._session
).create() ).create()
if not self._anonymous: if not self._anonymous:
self._hon = await HonConnectionHandler( self._hon_handler = await HonConnectionHandler(
self._email, self._password, self._session self._email, self._password, self._session
).create() ).create()
return self return self
async def load_appliances(self): async def load_appliances(self) -> Dict:
async with self._hon.get(f"{const.API_URL}/commands/v1/appliance") as resp: async with self._hon.get(f"{const.API_URL}/commands/v1/appliance") as resp:
return await resp.json() return await resp.json()
async def load_commands(self, appliance: HonAppliance): async def load_commands(self, appliance: HonAppliance) -> Dict:
params = { params: Dict = {
"applianceType": appliance.appliance_type, "applianceType": appliance.appliance_type,
"code": appliance.info["code"], "code": appliance.info["code"],
"applianceModelId": appliance.appliance_model_id, "applianceModelId": appliance.appliance_model_id,
@ -51,58 +81,66 @@ class HonAPI:
"appVersion": const.APP_VERSION, "appVersion": const.APP_VERSION,
"series": appliance.info["series"], "series": appliance.info["series"],
} }
url = f"{const.API_URL}/commands/v1/retrieve" url: str = f"{const.API_URL}/commands/v1/retrieve"
async with self._hon.get(url, params=params) as response: async with self._hon.get(url, params=params) as response:
result = (await response.json()).get("payload", {}) result: Dict = (await response.json()).get("payload", {})
if not result or result.pop("resultCode") != "0": if not result or result.pop("resultCode") != "0":
return {} return {}
return result return result
async def command_history(self, appliance: HonAppliance): async def command_history(self, appliance: HonAppliance) -> Dict:
url = f"{const.API_URL}/commands/v1/appliance/{appliance.mac_address}/history" url: str = (
f"{const.API_URL}/commands/v1/appliance/{appliance.mac_address}/history"
)
async with self._hon.get(url) as response: async with self._hon.get(url) as response:
result = await response.json() result: Dict = await response.json()
if not result or not result.get("payload"): if not result or not result.get("payload"):
return {} return {}
return result["payload"]["history"] return result["payload"]["history"]
async def last_activity(self, appliance: HonAppliance): async def last_activity(self, appliance: HonAppliance) -> Dict:
url = f"{const.API_URL}/commands/v1/retrieve-last-activity" url: str = f"{const.API_URL}/commands/v1/retrieve-last-activity"
params = {"macAddress": appliance.mac_address} params: Dict = {"macAddress": appliance.mac_address}
async with self._hon.get(url, params=params) as response: async with self._hon.get(url, params=params) as response:
result = await response.json() result: Dict = await response.json()
if result and (activity := result.get("attributes")): if result and (activity := result.get("attributes")):
return activity return activity
return {} return {}
async def load_attributes(self, appliance: HonAppliance): async def load_attributes(self, appliance: HonAppliance) -> Dict:
params = { params: Dict = {
"macAddress": appliance.mac_address, "macAddress": appliance.mac_address,
"applianceType": appliance.appliance_type, "applianceType": appliance.appliance_type,
"category": "CYCLE", "category": "CYCLE",
} }
url = f"{const.API_URL}/commands/v1/context" url: str = f"{const.API_URL}/commands/v1/context"
async with self._hon.get(url, params=params) as response: async with self._hon.get(url, params=params) as response:
return (await response.json()).get("payload", {}) return (await response.json()).get("payload", {})
async def load_statistics(self, appliance: HonAppliance): async def load_statistics(self, appliance: HonAppliance) -> Dict:
params = { params: Dict = {
"macAddress": appliance.mac_address, "macAddress": appliance.mac_address,
"applianceType": appliance.appliance_type, "applianceType": appliance.appliance_type,
} }
url = f"{const.API_URL}/commands/v1/statistics" url: str = f"{const.API_URL}/commands/v1/statistics"
async with self._hon.get(url, params=params) as response: async with self._hon.get(url, params=params) as response:
return (await response.json()).get("payload", {}) return (await response.json()).get("payload", {})
async def send_command(self, appliance, command, parameters, ancillary_parameters): async def send_command(
now = datetime.utcnow().isoformat() self,
data = { appliance: HonAppliance,
command: str,
parameters: Dict,
ancillary_parameters: Dict,
) -> bool:
now: str = datetime.utcnow().isoformat()
data: Dict = {
"macAddress": appliance.mac_address, "macAddress": appliance.mac_address,
"timestamp": f"{now[:-3]}Z", "timestamp": f"{now[:-3]}Z",
"commandName": command, "commandName": command,
"transactionId": f"{appliance.mac_address}_{now[:-3]}Z", "transactionId": f"{appliance.mac_address}_{now[:-3]}Z",
"applianceOptions": appliance.commands_options, "applianceOptions": appliance.commands_options,
"appliance": self._hon.device.get(), "device": self._hon.device.get(mobile=True),
"attributes": { "attributes": {
"channel": "mobileApp", "channel": "mobileApp",
"origin": "standardProgram", "origin": "standardProgram",
@ -112,36 +150,37 @@ class HonAPI:
"parameters": parameters, "parameters": parameters,
"applianceType": appliance.appliance_type, "applianceType": appliance.appliance_type,
} }
url = f"{const.API_URL}/commands/v1/send" url: str = f"{const.API_URL}/commands/v1/send"
async with self._hon.post(url, json=data) as resp: async with self._hon.post(url, json=data) as response:
json_data = await resp.json() json_data: Dict = await response.json()
if json_data.get("payload", {}).get("resultCode") == "0": if json_data.get("payload", {}).get("resultCode") == "0":
return True return True
_LOGGER.error(await response.text())
return False return False
async def appliance_configuration(self): async def appliance_configuration(self) -> Dict:
url = f"{const.API_URL}/config/v1/appliance-configuration" url: str = f"{const.API_URL}/config/v1/appliance-configuration"
async with self._hon_anonymous.get(url) as response: async with self._hon_anonymous.get(url) as response:
result = await response.json() result: Dict = await response.json()
if result and (data := result.get("payload")): if result and (data := result.get("payload")):
return data return data
return {} return {}
async def app_config(self, language="en", beta=True): async def app_config(self, language: str = "en", beta: bool = True) -> Dict:
url = f"{const.API_URL}/app-config" url: str = f"{const.API_URL}/app-config"
payload = { payload_data: Dict = {
"languageCode": language, "languageCode": language,
"beta": beta, "beta": beta,
"appVersion": const.APP_VERSION, "appVersion": const.APP_VERSION,
"os": const.OS, "os": const.OS,
} }
payload = json.dumps(payload, separators=(",", ":")) payload: str = json.dumps(payload_data, separators=(",", ":"))
async with self._hon_anonymous.post(url, data=payload) as response: async with self._hon_anonymous.post(url, data=payload) as response:
if (result := await response.json()) and (data := result.get("payload")): if (result := await response.json()) and (data := result.get("payload")):
return data return data
return {} return {}
async def translation_keys(self, language="en"): async def translation_keys(self, language: str = "en") -> Dict:
config = await self.app_config(language=language) config = await self.app_config(language=language)
if url := config.get("language", {}).get("jsonPath"): if url := config.get("language", {}).get("jsonPath"):
async with self._hon_anonymous.get(url) as response: async with self._hon_anonymous.get(url) as response:
@ -149,8 +188,8 @@ class HonAPI:
return result return result
return {} return {}
async def close(self): async def close(self) -> None:
if self._hon: if self._hon_handler is not None:
await self._hon.close() await self._hon_handler.close()
if self._hon_anonymous: if self._hon_anonymous_handler is not None:
await self._hon_anonymous.close() await self._hon_anonymous_handler.close()

View File

@ -3,209 +3,227 @@ import logging
import re import re
import secrets import secrets
import urllib import urllib
from contextlib import suppress
from dataclasses import dataclass
from datetime import datetime, timedelta
from pprint import pformat from pprint import pformat
from typing import Dict, Optional, List
from urllib import parse from urllib import parse
from urllib.parse import quote from urllib.parse import quote
from aiohttp import ClientResponse
from yarl import URL from yarl import URL
from pyhon import const from pyhon import const, exceptions
from pyhon.exceptions import HonAuthenticationError from pyhon.connection.handler.auth import HonAuthConnectionHandler
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@dataclass
class HonLoginData:
url: str = ""
email: str = ""
password: str = ""
fw_uid: str = ""
loaded: Optional[Dict] = None
class HonAuth: class HonAuth:
_TOKEN_EXPIRES_AFTER_HOURS = 8
_TOKEN_EXPIRE_WARNING_HOURS = 7
def __init__(self, session, email, password, device) -> None: def __init__(self, session, email, password, device) -> None:
self._session = session self._session = session
self._email = email self._request = HonAuthConnectionHandler(session)
self._password = password self._login_data = HonLoginData()
self._login_data.email = email
self._login_data.password = password
self._access_token = "" self._access_token = ""
self._refresh_token = "" self._refresh_token = ""
self._cognito_token = "" self._cognito_token = ""
self._id_token = "" self._id_token = ""
self._device = device self._device = device
self._called_urls = [] self._expires: datetime = datetime.utcnow()
@property @property
def cognito_token(self): def cognito_token(self) -> str:
return self._cognito_token return self._cognito_token
@property @property
def id_token(self): def id_token(self) -> str:
return self._id_token return self._id_token
@property @property
def access_token(self): def access_token(self) -> str:
return self._access_token return self._access_token
@property @property
def refresh_token(self): def refresh_token(self) -> str:
return self._refresh_token return self._refresh_token
async def _error_logger(self, response, fail=True): def _check_token_expiration(self, hours: int) -> bool:
result = "hOn Authentication Error\n" return datetime.utcnow() >= self._expires + timedelta(hours=hours)
for i, (status, url) in enumerate(self._called_urls):
result += f" {i + 1: 2d} {status} - {url}\n"
result += f"ERROR - {response.status} - {response.request_info.url}\n"
result += f"{15 * '='} Response {15 * '='}\n{await response.text()}\n{40 * '='}"
_LOGGER.error(result)
if fail:
raise HonAuthenticationError("Can't login")
async def _load_login(self): @property
def token_is_expired(self) -> bool:
return self._check_token_expiration(self._TOKEN_EXPIRES_AFTER_HOURS)
@property
def token_expires_soon(self) -> bool:
return self._check_token_expiration(self._TOKEN_EXPIRE_WARNING_HOURS)
async def _error_logger(self, response: ClientResponse, fail: bool = True) -> None:
output = "hOn Authentication Error\n"
for i, (status, url) in enumerate(self._request.called_urls):
output += f" {i + 1: 2d} {status} - {url}\n"
output += f"ERROR - {response.status} - {response.request_info.url}\n"
output += f"{15 * '='} Response {15 * '='}\n{await response.text()}\n{40 * '='}"
_LOGGER.error(output)
if fail:
raise exceptions.HonAuthenticationError("Can't login")
@staticmethod
def _generate_nonce() -> str:
nonce = secrets.token_hex(16) nonce = secrets.token_hex(16)
nonce = f"{nonce[:8]}-{nonce[8:12]}-{nonce[12:16]}-{nonce[16:20]}-{nonce[20:]}" return f"{nonce[:8]}-{nonce[8:12]}-{nonce[12:16]}-{nonce[16:20]}-{nonce[20:]}"
async def _load_login(self) -> bool:
login_url = await self._introduce()
login_url = await self._handle_redirects(login_url)
return await self._login_url(login_url)
async def _introduce(self) -> str:
redirect_uri = urllib.parse.quote(f"{const.APP}://mobilesdk/detect/oauth/done")
params = { params = {
"response_type": "token+id_token", "response_type": "token+id_token",
"client_id": const.CLIENT_ID, "client_id": const.CLIENT_ID,
"redirect_uri": urllib.parse.quote( "redirect_uri": redirect_uri,
f"{const.APP}://mobilesdk/detect/oauth/done"
),
"display": "touch", "display": "touch",
"scope": "api openid refresh_token web", "scope": "api openid refresh_token web",
"nonce": nonce, "nonce": self._generate_nonce(),
} }
params = "&".join([f"{k}={v}" for k, v in params.items()]) params_encode = "&".join([f"{k}={v}" for k, v in params.items()])
async with self._session.get( url = f"{const.AUTH_API}/services/oauth2/authorize/expid_Login?{params_encode}"
f"{const.AUTH_API}/services/oauth2/authorize/expid_Login?{params}" async with self._request.get(url) as response:
) as response: text = await response.text()
self._called_urls.append((response.status, response.request_info.url)) self._expires = datetime.utcnow()
if not (login_url := re.findall("url = '(.+?)'", await response.text())): if not (login_url := re.findall("url = '(.+?)'", text)):
if "oauth/done#access_token=" in text:
self._parse_token_data(text)
raise exceptions.HonNoAuthenticationNeeded()
await self._error_logger(response) await self._error_logger(response)
return False return login_url[0]
async with self._session.get(login_url[0], allow_redirects=False) as redirect1:
self._called_urls.append((redirect1.status, redirect1.request_info.url)) async def _manual_redirect(self, url: str) -> str:
if not (url := redirect1.headers.get("Location")): async with self._request.get(url, allow_redirects=False) as response:
await self._error_logger(redirect1) if not (new_location := response.headers.get("Location", "")):
return False await self._error_logger(response)
async with self._session.get(url, allow_redirects=False) as redirect2: return new_location
self._called_urls.append((redirect2.status, redirect2.request_info.url))
if not ( async def _handle_redirects(self, login_url) -> str:
url := redirect2.headers.get("Location") redirect1 = await self._manual_redirect(login_url)
+ "&System=IoT_Mobile_App&RegistrationSubChannel=hOn" redirect2 = await self._manual_redirect(redirect1)
): return f"{redirect2}&System=IoT_Mobile_App&RegistrationSubChannel=hOn"
await self._error_logger(redirect2)
return False async def _login_url(self, login_url: str) -> bool:
async with self._session.get( headers = {"user-agent": const.USER_AGENT}
URL(url, encoded=True), headers={"user-agent": const.USER_AGENT} url = URL(login_url, encoded=True)
) as login_screen: async with self._request.get(url, headers=headers) as response:
self._called_urls.append( text = await response.text()
(login_screen.status, login_screen.request_info.url) if context := re.findall('"fwuid":"(.*?)","loaded":(\\{.*?})', text):
)
if context := re.findall(
'"fwuid":"(.*?)","loaded":(\\{.*?})', await login_screen.text()
):
fw_uid, loaded_str = context[0] fw_uid, loaded_str = context[0]
loaded = json.loads(loaded_str) self._login_data.fw_uid = fw_uid
login_url = login_url[0].replace( self._login_data.loaded = json.loads(loaded_str)
self._login_data.url = login_url.replace(
"/".join(const.AUTH_API.split("/")[:-1]), "" "/".join(const.AUTH_API.split("/")[:-1]), ""
) )
return fw_uid, loaded, login_url return True
await self._error_logger(login_screen) await self._error_logger(response)
return False return False
async def _login(self, fw_uid, loaded, login_url): async def _login(self) -> str:
data = { start_url = self._login_data.url.rsplit("startURL=", maxsplit=1)[-1]
"message": { start_url = parse.unquote(start_url).split("%3D")[0]
"actions": [ action = {
{ "id": "79;a",
"id": "79;a", "descriptor": "apex://LightningLoginCustomController/ACTION$login",
"descriptor": "apex://LightningLoginCustomController/ACTION$login", "callingDescriptor": "markup://c:loginForm",
"callingDescriptor": "markup://c:loginForm", "params": {
"params": { "username": quote(self._login_data.email),
"username": quote(self._email), "password": quote(self._login_data.password),
"password": quote(self._password), "startUrl": start_url,
"startUrl": parse.unquote(
login_url.split("startURL=")[-1]
).split("%3D")[0],
},
}
]
}, },
}
data = {
"message": {"actions": [action]},
"aura.context": { "aura.context": {
"mode": "PROD", "mode": "PROD",
"fwuid": fw_uid, "fwuid": self._login_data.fw_uid,
"app": "siteforce:loginApp2", "app": "siteforce:loginApp2",
"loaded": loaded, "loaded": self._login_data.loaded,
"dn": [], "dn": [],
"globals": {}, "globals": {},
"uad": False, "uad": False,
}, },
"aura.pageURI": login_url, "aura.pageURI": self._login_data.url,
"aura.token": None, "aura.token": None,
} }
params = {"r": 3, "other.LightningLoginCustom.login": 1} params = {"r": 3, "other.LightningLoginCustom.login": 1}
async with self._session.post( async with self._request.post(
const.AUTH_API + "/s/sfsites/aura", const.AUTH_API + "/s/sfsites/aura",
headers={"Content-Type": "application/x-www-form-urlencoded"}, headers={"Content-Type": "application/x-www-form-urlencoded"},
data="&".join(f"{k}={json.dumps(v)}" for k, v in data.items()), data="&".join(f"{k}={json.dumps(v)}" for k, v in data.items()),
params=params, params=params,
) as response: ) as response:
self._called_urls.append((response.status, response.request_info.url))
if response.status == 200: if response.status == 200:
try: with suppress(json.JSONDecodeError, KeyError):
data = await response.json() result = await response.json()
return data["events"][0]["attributes"]["values"]["url"] return result["events"][0]["attributes"]["values"]["url"]
except json.JSONDecodeError:
pass
except KeyError:
_LOGGER.error(
"Can't get login url - %s", pformat(await response.json())
)
await self._error_logger(response) await self._error_logger(response)
return "" return ""
async def _get_token(self, url): def _parse_token_data(self, text: str) -> None:
async with self._session.get(url) as response:
self._called_urls.append((response.status, response.request_info.url))
if response.status != 200:
await self._error_logger(response)
return False
url = re.findall("href\\s*=\\s*[\"'](.+?)[\"']", await response.text())
if not url:
await self._error_logger(response)
return False
if "ProgressiveLogin" in url[0]:
async with self._session.get(url[0]) as response:
self._called_urls.append((response.status, response.request_info.url))
if response.status != 200:
await self._error_logger(response)
return False
url = re.findall("href\\s*=\\s*[\"'](.*?)[\"']", await response.text())
url = "/".join(const.AUTH_API.split("/")[:-1]) + url[0]
async with self._session.get(url) as response:
self._called_urls.append((response.status, response.request_info.url))
if response.status != 200:
await self._error_logger(response)
return False
text = await response.text()
if access_token := re.findall("access_token=(.*?)&", text): if access_token := re.findall("access_token=(.*?)&", text):
self._access_token = access_token[0] self._access_token = access_token[0]
if refresh_token := re.findall("refresh_token=(.*?)&", text): if refresh_token := re.findall("refresh_token=(.*?)&", text):
self._refresh_token = refresh_token[0] self._refresh_token = refresh_token[0]
if id_token := re.findall("id_token=(.*?)&", text): if id_token := re.findall("id_token=(.*?)&", text):
self._id_token = id_token[0] self._id_token = id_token[0]
async def _get_token(self, url: str) -> bool:
async with self._request.get(url) as response:
if response.status != 200:
await self._error_logger(response)
return False
url_search = re.findall(
"href\\s*=\\s*[\"'](.+?)[\"']", await response.text()
)
if not url_search:
await self._error_logger(response)
return False
if "ProgressiveLogin" in url_search[0]:
async with self._request.get(url_search[0]) as response:
if response.status != 200:
await self._error_logger(response)
return False
url_search = re.findall(
"href\\s*=\\s*[\"'](.*?)[\"']", await response.text()
)
url = "/".join(const.AUTH_API.split("/")[:-1]) + url_search[0]
async with self._request.get(url) as response:
if response.status != 200:
await self._error_logger(response)
return False
self._parse_token_data(await response.text())
return True return True
async def authorize(self): async def _api_auth(self) -> bool:
if login_site := await self._load_login():
fw_uid, loaded, login_url = login_site
else:
return False
if not (url := await self._login(fw_uid, loaded, login_url)):
return False
if not await self._get_token(url):
return False
return await self._api_auth()
async def _api_auth(self):
post_headers = {"id-token": self._id_token} post_headers = {"id-token": self._id_token}
data = self._device.get() data = self._device.get()
async with self._session.post( async with self._request.post(
f"{const.API_URL}/auth/v1/login", headers=post_headers, json=data f"{const.API_URL}/auth/v1/login", headers=post_headers, json=data
) as response: ) as response:
self._called_urls.append((response.status, response.request_info.url))
try: try:
json_data = await response.json() json_data = await response.json()
except json.JSONDecodeError: except json.JSONDecodeError:
@ -214,20 +232,42 @@ class HonAuth:
self._cognito_token = json_data["cognitoUser"]["Token"] self._cognito_token = json_data["cognitoUser"]["Token"]
return True return True
async def refresh(self): async def authenticate(self) -> None:
self.clear()
try:
if not await self._load_login():
raise exceptions.HonAuthenticationError("Can't open login page")
if not (url := await self._login()):
raise exceptions.HonAuthenticationError("Can't login")
if not await self._get_token(url):
raise exceptions.HonAuthenticationError("Can't get token")
if not await self._api_auth():
raise exceptions.HonAuthenticationError("Can't get api token")
except exceptions.HonNoAuthenticationNeeded:
return
async def refresh(self) -> bool:
params = { params = {
"client_id": const.CLIENT_ID, "client_id": const.CLIENT_ID,
"refresh_token": self._refresh_token, "refresh_token": self._refresh_token,
"grant_type": "refresh_token", "grant_type": "refresh_token",
} }
async with self._session.post( async with self._request.post(
f"{const.AUTH_API}/services/oauth2/token", params=params f"{const.AUTH_API}/services/oauth2/token", params=params
) as response: ) as response:
self._called_urls.append((response.status, response.request_info.url))
if response.status >= 400: if response.status >= 400:
await self._error_logger(response, fail=False) await self._error_logger(response, fail=False)
return False return False
data = await response.json() data = await response.json()
self._expires = datetime.utcnow()
self._id_token = data["id_token"] self._id_token = data["id_token"]
self._access_token = data["access_token"] self._access_token = data["access_token"]
return await self._api_auth() return await self._api_auth()
def clear(self) -> None:
self._session.cookie_jar.clear_domain(const.AUTH_API.split("/")[-2])
self._request.called_urls = []
self._cognito_token = ""
self._id_token = ""
self._access_token = ""
self._refresh_token = ""

View File

@ -1,41 +1,43 @@
import secrets import secrets
from typing import Dict
from pyhon import const from pyhon import const
class HonDevice: class HonDevice:
def __init__(self): def __init__(self) -> None:
self._app_version = const.APP_VERSION self._app_version: str = const.APP_VERSION
self._os_version = const.OS_VERSION self._os_version: int = const.OS_VERSION
self._os = const.OS self._os: str = const.OS
self._device_model = const.DEVICE_MODEL self._device_model: str = const.DEVICE_MODEL
self._mobile_id = secrets.token_hex(8) self._mobile_id: str = secrets.token_hex(8)
@property @property
def app_version(self): def app_version(self) -> str:
return self._app_version return self._app_version
@property @property
def os_version(self): def os_version(self) -> int:
return self._os_version return self._os_version
@property @property
def os(self): def os(self) -> str:
return self._os return self._os
@property @property
def device_model(self): def device_model(self) -> str:
return self._device_model return self._device_model
@property @property
def mobile_id(self): def mobile_id(self) -> str:
return self._mobile_id return self._mobile_id
def get(self): def get(self, mobile: bool = False) -> Dict:
return { result = {
"appVersion": self.app_version, "appVersion": self.app_version,
"mobileId": self.mobile_id, "mobileId": self.mobile_id,
"osVersion": self.os_version,
"os": self.os, "os": self.os,
"osVersion": self.os_version,
"deviceModel": self.device_model, "deviceModel": self.device_model,
} }
return (result | {"mobileOs": result.pop("os")}) if mobile else result

View File

@ -1,139 +0,0 @@
import json
from contextlib import asynccontextmanager
import aiohttp
from pyhon import const
from pyhon.connection.auth import HonAuth, _LOGGER
from pyhon.connection.device import HonDevice
from pyhon.exceptions import HonAuthenticationError
class HonBaseConnectionHandler:
_HEADERS = {"user-agent": const.USER_AGENT, "Content-Type": "application/json"}
def __init__(self, session=None):
self._create_session = session is None
self._session = session
self._auth = None
async def __aenter__(self):
return await self.create()
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.close()
async def create(self):
if self._create_session:
self._session = aiohttp.ClientSession()
return self
@asynccontextmanager
async def _intercept(self, method, *args, loop=0, **kwargs):
raise NotImplementedError
@asynccontextmanager
async def get(self, *args, **kwargs):
async with self._intercept(self._session.get, *args, **kwargs) as response:
yield response
@asynccontextmanager
async def post(self, *args, **kwargs):
async with self._intercept(self._session.post, *args, **kwargs) as response:
yield response
async def close(self):
if self._create_session:
await self._session.close()
class HonConnectionHandler(HonBaseConnectionHandler):
def __init__(self, email, password, session=None):
super().__init__(session=session)
self._device = HonDevice()
self._email = email
self._password = password
if not self._email:
raise HonAuthenticationError("An email address must be specified")
if not self._password:
raise HonAuthenticationError("A password address must be specified")
self._request_headers = {}
@property
def device(self):
return self._device
async def create(self):
await super().create()
self._auth = HonAuth(self._session, self._email, self._password, self._device)
return self
async def _check_headers(self, headers):
if (
"cognito-token" not in self._request_headers
or "id-token" not in self._request_headers
):
if await self._auth.authorize():
self._request_headers["cognito-token"] = self._auth.cognito_token
self._request_headers["id-token"] = self._auth.id_token
else:
raise HonAuthenticationError("Can't login")
return self._HEADERS | headers | self._request_headers
@asynccontextmanager
async def _intercept(self, method, *args, loop=0, **kwargs):
kwargs["headers"] = await self._check_headers(kwargs.get("headers", {}))
async with method(*args, **kwargs) as response:
if response.status in [401, 403] and loop == 0:
_LOGGER.info("Try refreshing token...")
await self._auth.refresh()
async with self._intercept(
method, *args, loop=loop + 1, **kwargs
) as result:
yield result
elif response.status in [401, 403] and loop == 1:
_LOGGER.warning(
"%s - Error %s - %s",
response.request_info.url,
response.status,
await response.text(),
)
self._request_headers = {}
self._session.cookie_jar.clear_domain(const.AUTH_API.split("/")[-2])
await self.create()
async with self._intercept(
method, *args, loop=loop + 1, **kwargs
) as result:
yield result
elif loop >= 2:
_LOGGER.error(
"%s - Error %s - %s",
response.request_info.url,
response.status,
await response.text(),
)
raise HonAuthenticationError("Login failure")
else:
try:
await response.json()
yield response
except json.JSONDecodeError:
_LOGGER.warning(
"%s - JsonDecodeError %s - %s",
response.request_info.url,
response.status,
await response.text(),
)
yield {}
class HonAnonymousConnectionHandler(HonBaseConnectionHandler):
_HEADERS = HonBaseConnectionHandler._HEADERS | {"x-api-key": const.API_KEY}
@asynccontextmanager
async def _intercept(self, method, *args, loop=0, **kwargs):
kwargs["headers"] = kwargs.pop("headers", {}) | self._HEADERS
async with method(*args, **kwargs) as response:
if response.status == 403:
_LOGGER.error("Can't authenticate anymore")
yield response

View File

View File

@ -0,0 +1,21 @@
import logging
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from typing import Callable, Dict
from pyhon import const
from pyhon.connection.handler.base import ConnectionHandler
_LOGGER = logging.getLogger(__name__)
class HonAnonymousConnectionHandler(ConnectionHandler):
_HEADERS: Dict = ConnectionHandler._HEADERS | {"x-api-key": const.API_KEY}
@asynccontextmanager
async def _intercept(self, method: Callable, *args, **kwargs) -> AsyncIterator:
kwargs["headers"] = kwargs.pop("headers", {}) | self._HEADERS
async with method(*args, **kwargs) as response:
if response.status == 403:
_LOGGER.error("Can't authenticate anymore")
yield response

View File

@ -0,0 +1,36 @@
import logging
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from typing import Optional, Callable, List, Tuple
import aiohttp
from pyhon import const
from pyhon.connection.handler.base import ConnectionHandler
_LOGGER = logging.getLogger(__name__)
class HonAuthConnectionHandler(ConnectionHandler):
_HEADERS = {"user-agent": const.USER_AGENT}
def __init__(self, session: Optional[aiohttp.ClientSession] = None) -> None:
super().__init__(session)
self._called_urls: List[Tuple[int, str]] = []
@property
def called_urls(self) -> List[Tuple[int, str]]:
return self._called_urls
@called_urls.setter
def called_urls(self, called_urls: List[Tuple[int, str]]) -> None:
self._called_urls = called_urls
@asynccontextmanager
async def _intercept(
self, method: Callable, *args, loop: int = 0, **kwargs
) -> AsyncIterator:
kwargs["headers"] = kwargs.pop("headers", {}) | self._HEADERS
async with method(*args, **kwargs) as response:
self._called_urls.append((response.status, response.request_info.url))
yield response

View File

@ -0,0 +1,57 @@
import logging
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from typing import Optional, Callable, Dict, Any
import aiohttp
from typing_extensions import Self
from pyhon import const, exceptions
_LOGGER = logging.getLogger(__name__)
class ConnectionHandler:
_HEADERS: Dict = {
"user-agent": const.USER_AGENT,
"Content-Type": "application/json",
}
def __init__(self, session: Optional[aiohttp.ClientSession] = None) -> None:
self._create_session: bool = session is None
self._session: Optional[aiohttp.ClientSession] = session
async def __aenter__(self) -> Self:
return await self.create()
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
await self.close()
async def create(self) -> Self:
if self._create_session:
self._session = aiohttp.ClientSession()
return self
@asynccontextmanager
def _intercept(self, method: Callable, *args, loop: int = 0, **kwargs):
raise NotImplementedError
@asynccontextmanager
async def get(self, *args, **kwargs) -> AsyncIterator[aiohttp.ClientResponse]:
if self._session is None:
raise exceptions.NoSessionException()
response: aiohttp.ClientResponse
async with self._intercept(self._session.get, *args, **kwargs) as response:
yield response
@asynccontextmanager
async def post(self, *args, **kwargs) -> AsyncIterator[aiohttp.ClientResponse]:
if self._session is None:
raise exceptions.NoSessionException()
response: aiohttp.ClientResponse
async with self._intercept(self._session.post, *args, **kwargs) as response:
yield response
async def close(self) -> None:
if self._create_session and self._session is not None:
await self._session.close()

View File

@ -0,0 +1,102 @@
import json
import logging
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from typing import Optional, Callable, Dict
import aiohttp
from typing_extensions import Self
from pyhon.connection.auth import HonAuth
from pyhon.connection.device import HonDevice
from pyhon.connection.handler.base import ConnectionHandler
from pyhon.exceptions import HonAuthenticationError, NoAuthenticationException
_LOGGER = logging.getLogger(__name__)
class HonConnectionHandler(ConnectionHandler):
def __init__(
self, email: str, password: str, session: Optional[aiohttp.ClientSession] = None
) -> None:
super().__init__(session=session)
self._device: HonDevice = HonDevice()
self._email: str = email
self._password: str = password
if not self._email:
raise HonAuthenticationError("An email address must be specified")
if not self._password:
raise HonAuthenticationError("A password address must be specified")
self._auth: Optional[HonAuth] = None
@property
def auth(self) -> HonAuth:
if self._auth is None:
raise NoAuthenticationException()
return self._auth
@property
def device(self) -> HonDevice:
return self._device
async def create(self) -> Self:
await super().create()
self._auth = HonAuth(self._session, self._email, self._password, self._device)
return self
async def _check_headers(self, headers: Dict) -> Dict:
if not (self.auth.cognito_token and self.auth.id_token):
await self.auth.authenticate()
headers["cognito-token"] = self.auth.cognito_token
headers["id-token"] = self.auth.id_token
return self._HEADERS | headers
@asynccontextmanager
async def _intercept(
self, method: Callable, *args, loop: int = 0, **kwargs
) -> AsyncIterator:
kwargs["headers"] = await self._check_headers(kwargs.get("headers", {}))
async with method(*args, **kwargs) as response:
if (
self.auth.token_expires_soon or response.status in [401, 403]
) and loop == 0:
_LOGGER.info("Try refreshing token...")
await self.auth.refresh()
async with self._intercept(
method, *args, loop=loop + 1, **kwargs
) as result:
yield result
elif (
self.auth.token_is_expired or response.status in [401, 403]
) and loop == 1:
_LOGGER.warning(
"%s - Error %s - %s",
response.request_info.url,
response.status,
await response.text(),
)
await self.create()
async with self._intercept(
method, *args, loop=loop + 1, **kwargs
) as result:
yield result
elif loop >= 2:
_LOGGER.error(
"%s - Error %s - %s",
response.request_info.url,
response.status,
await response.text(),
)
raise HonAuthenticationError("Login failure")
else:
try:
await response.json()
yield response
except json.JSONDecodeError:
_LOGGER.warning(
"%s - JsonDecodeError %s - %s",
response.request_info.url,
response.status,
await response.text(),
)
raise HonAuthenticationError("Decode Error")

View File

@ -1,2 +1,14 @@
class HonAuthenticationError(Exception): class HonAuthenticationError(Exception):
pass pass
class HonNoAuthenticationNeeded(Exception):
pass
class NoSessionException(Exception):
pass
class NoAuthenticationException(Exception):
pass

View File

@ -1,17 +1,21 @@
import asyncio import asyncio
from typing import List import copy
from typing import List, Optional, Dict, Any
from typing_extensions import Self
from pyhon import HonAPI from aiohttp import ClientSession
from pyhon import HonAPI, exceptions
from pyhon.appliance import HonAppliance from pyhon.appliance import HonAppliance
class Hon: class Hon:
def __init__(self, email, password, session=None): def __init__(self, email: str, password: str, session: ClientSession | None = None):
self._email = email self._email: str = email
self._password = password self._password: str = password
self._session = session self._session: ClientSession | None = session
self._appliances = [] self._appliances: List[HonAppliance] = []
self._api = None self._api: Optional[HonAPI] = None
async def __aenter__(self): async def __aenter__(self):
return await self.create() return await self.create()
@ -19,7 +23,13 @@ class Hon:
async def __aexit__(self, exc_type, exc_val, exc_tb): async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.close() await self.close()
async def create(self): @property
def api(self) -> HonAPI:
if self._api is None:
raise exceptions.NoAuthenticationException
return self._api
async def create(self) -> Self:
self._api = await HonAPI( self._api = await HonAPI(
self._email, self._password, session=self._session self._email, self._password, session=self._session
).create() ).create()
@ -30,19 +40,25 @@ class Hon:
def appliances(self) -> List[HonAppliance]: def appliances(self) -> List[HonAppliance]:
return self._appliances return self._appliances
async def _create_appliance(self, appliance_data: Dict[str, Any], zone=0) -> None:
appliance = HonAppliance(self._api, appliance_data, zone=zone)
if appliance.mac_address is None:
return
await asyncio.gather(
*[
appliance.load_attributes(),
appliance.load_commands(),
appliance.load_statistics(),
]
)
self._appliances.append(appliance)
async def setup(self): async def setup(self):
appliance: Dict
for appliance in (await self._api.load_appliances())["payload"]["appliances"]: for appliance in (await self._api.load_appliances())["payload"]["appliances"]:
appliance = HonAppliance(self._api, appliance) for zone in range(int(appliance.get("zone", "0"))):
if appliance.mac_address is None: await self._create_appliance(appliance.copy(), zone=zone + 1)
continue await self._create_appliance(appliance)
await asyncio.gather(
*[
appliance.load_attributes(),
appliance.load_commands(),
appliance.load_statistics(),
]
)
self._appliances.append(appliance)
async def close(self): async def close(self):
await self._api.close() await self._api.close()

View File

@ -7,7 +7,7 @@ with open("README.md", "r") as f:
setup( setup(
name="pyhOn", name="pyhOn",
version="0.7.2", version="0.8.0b4",
author="Andre Basche", author="Andre Basche",
description="Control hOn devices with python", description="Control hOn devices with python",
long_description=long_description, long_description=long_description,