Compare commits
No commits in common. "main" and "v0.7.0" have entirely different histories.
@ -13,7 +13,7 @@ jobs:
fail-fast: false
python-version: ["3.10", "3.11", "3.12"]
python-version: ["3.10", "3.11"]
- uses: actions/checkout@v3
@ -25,16 +25,15 @@ jobs:
run: |
python -m pip install --upgrade pip
python -m pip install -r requirements.txt
python -m pip install -r requirements_dev.txt
python -m pip install flake8 pylint black
- name: Lint with flake8
run: |
flake8 . --count --statistics
- name: Type check with mypy
run: |
mypy pyhon/
- name: Analysing the code with pylint
run: |
pylint $(git ls-files '*.py')
# stop the build if there are Python syntax errors or undefined names
flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
flake8 . --count --exit-zero --max-complexity=10 --max-line-length=88 --statistics
# - name: Analysing the code with pylint
# run: |
# pylint --max-line-length 88 $(git ls-files '*.py')
- name: Check black style
run: |
black . --check
@ -4,4 +4,3 @@ __pycache__/
@ -1,9 +0,0 @@
@ -1 +0,0 @@
include pyhon/py.typed
@ -1,14 +1,3 @@
# Announcement: I have to take the project down in the next few days
> Dear User,
> We are writing to inform you that we have discovered two Home Assistant integration plug-ins developed by you ( and ) that are in violation of our terms of service. Specifically, the plug-ins are using our services in an unauthorized manner which is causing significant economic harm to our Company.
> We take the protection of our intellectual property very seriously and demand that you immediately cease and desist all illegal activities related to the development and distribution of these plug-ins. We also request that you remove the plug-ins from all stores and code hosting platforms where they are currently available.
> Please be advised that we will take all necessary legal action to protect our interests if you fail to comply with this notice. We reserve the right to pursue all available remedies, including but not limited to monetary damages, injunctive relief, and attorney's fees.
> We strongly urge you to take immediate action to rectify this situation and avoid any further legal action. If you have any questions or concerns, please do not hesitate to contact us.
> Haier Europe Security and Governance Department
**This python package is unofficial and is not related in any way to Haier. It was developed by reversed engineered requests and can stop working at anytime!**
# pyhOn
@ -17,7 +6,7 @@
Control your Haier, Candy and Hoover appliances with python!
Control your Haier appliances with python!
The idea behind this library is, to make the use of all available commands as simple as possible.
## Installation
@ -111,6 +100,8 @@ This generates a huge output. It is recommended to pipe this into a file
$ pyhOn translate fr > hon_fr.yaml
$ pyhOn translate en --json > hon_en.json
## Tested devices
- Haier Washing Machine HW90
## Usage example
This library is used for the custom [HomeAssistant Integration "Haier hOn"](
@ -118,6 +109,3 @@ This library is used for the custom [HomeAssistant Integration "Haier hOn"](http
## Contribution
Any kind of contribution is welcome!
| Please add your appliances data to our [hon-test-data collection]( <br/>This helps us to develop new features and not to break compatibility in newer versions. |
@ -1,24 +0,0 @@
check_untyped_defs = true
disallow_any_generics = true
disallow_any_unimported = true
disallow_incomplete_defs = true
disallow_subclassing_any = true
disallow_untyped_calls = true
disallow_untyped_decorators = true
disallow_untyped_defs = true
disable_error_code = annotation-unchecked
enable_error_code = ignore-without-code, redundant-self, truthy-iterable
follow_imports = silent
local_partial_types = true
no_implicit_optional = true
no_implicit_reexport = true
show_error_codes = true
strict_concatenate = false
strict_equality = true
warn_incomplete_stub = true
warn_redundant_casts = true
warn_return_any = true
warn_unreachable = true
warn_unused_configs = true
warn_unused_ignores = true
Executable file → Normal file
Executable file → Normal file
@ -6,18 +6,16 @@ import logging
import sys
from getpass import getpass
from pathlib import Path
from typing import Tuple, Dict, Any
if __name__ == "__main__":
sys.path.insert(0, str(Path(__file__).parent.parent))
# pylint: disable=wrong-import-position
from pyhon import Hon, HonAPI, diagnose, printer
from pyhon import Hon, HonAPI, helper
_LOGGER = logging.getLogger(__name__)
def get_arguments() -> Dict[str, Any]:
def get_arguments():
"""Get parsed arguments."""
parser = argparse.ArgumentParser(description="pyhOn: Command Line Utility")
parser.add_argument("-u", "--user", help="user for haier hOn account")
@ -26,25 +24,17 @@ def get_arguments() -> Dict[str, Any]:
keys = subparser.add_parser("keys", help="print as key format")
keys.add_argument("keys", help="print as key format", action="store_true")
keys.add_argument("--all", help="print also full keys", action="store_true")
export = subparser.add_parser("export")
export.add_argument("export", help="export pyhon data", action="store_true")
export.add_argument("--zip", help="create zip archive", action="store_true")
export.add_argument("--anonymous", help="anonymize data", action="store_true")
export.add_argument("directory", nargs="?", default=Path().cwd())
translation = subparser.add_parser(
translate = subparser.add_parser(
"translate", help="print available translation keys"
"translate", help="language (de, en, fr...)", metavar="LANGUAGE"
translation.add_argument("--json", help="print as json", action="store_true")
"-i", "--import", help="import pyhon data", nargs="?", default=Path().cwd()
translate.add_argument("--json", help="print as json", action="store_true")
return vars(parser.parse_args())
async def translate(language: str, json_output: bool = False) -> None:
async def translate(language, json_output=False):
async with HonAPI(anonymous=True) as hon:
keys = await hon.translation_keys(language)
if json_output:
@ -57,55 +47,46 @@ async def translate(language: str, json_output: bool = False) -> None:
.replace("\\r", "")
keys = json.loads(clean_keys)
def get_login_data(args: Dict[str, str]) -> Tuple[str, str]:
async def main():
args = get_arguments()
if language := args.get("translate"):
await translate(language, json_output=args.get("json"))
if not (user := args["user"]):
user = input("User for hOn account: ")
if not (password := args["password"]):
password = getpass("Password for hOn account: ")
return user, password
async def main() -> None:
args = get_arguments()
if language := args.get("translate"):
await translate(language, json_output=args.get("json", ""))
async with Hon(
*get_login_data(args), test_data_path=Path(args.get("import", ""))
) as hon:
async with Hon(user, password) as hon:
for device in hon.appliances:
if args.get("export"):
anonymous = args.get("anonymous", False)
path = Path(args.get("directory", "."))
if not args.get("zip"):
for file in await diagnose.appliance_data(device, path, anonymous):
print(f"Created {file}")
archive = await diagnose.zip_archive(device, path, anonymous)
print(f"Created {archive}")
print("=" * 10, device.appliance_type, "-", device.nick_name, "=" * 10)
if args.get("keys"):
data =
attr = "get" if args.get("all") else "pop"
printer.key_print(getattr(data["attributes"], attr)("parameters"))
print(printer.key_print(getattr(data, attr)("appliance")))
printer.create_commands(device.commands, concat=True)
helper.create_command(device.commands, concat=True)
{"settings": helper.create_command(device.commands)}
def start() -> None:
def start():
except KeyboardInterrupt:
@ -1,316 +1,187 @@
import importlib
import logging
import re
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional, Dict, Any, TYPE_CHECKING, List, TypeVar, overload
from contextlib import suppress
from pyhon import diagnose, exceptions
from pyhon.appliances.base import ApplianceBase
from pyhon.attributes import HonAttribute
from pyhon.command_loader import HonCommandLoader
from pyhon import helper
from pyhon.commands import HonCommand
from pyhon.parameter.base import HonParameter
from pyhon.parameter.enum import HonParameterEnum
from pyhon.parameter.range import HonParameterRange
from pyhon.typedefs import Parameter
from pyhon import HonAPI
_LOGGER = logging.getLogger(__name__)
T = TypeVar("T")
from pyhon.parameter import HonParameterFixed
# pylint: disable=too-many-public-methods,too-many-instance-attributes
class HonAppliance:
def __init__(
self, api: Optional["HonAPI"], info: Dict[str, Any], zone: int = 0
) -> None:
def __init__(self, api, info):
if attributes := info.get("attributes"):
info["attributes"] = {v["parName"]: v["parValue"] for v in attributes}
self._info: Dict[str, Any] = info
self._api: Optional[HonAPI] = api
self._appliance_model: Dict[str, Any] = {}
self._info = info
self._api = api
self._appliance_model = {}
self._commands: Dict[str, HonCommand] = {}
self._statistics: Dict[str, Any] = {}
self._attributes: Dict[str, Any] = {}
self._zone: int = zone
self._additional_data: Dict[str, Any] = {}
self._last_update: Optional[datetime] = None
self._default_setting = HonParameter("", {}, "")
self._commands = {}
self._statistics = {}
self._attributes = {}
self._extra: Optional[ApplianceBase] = importlib.import_module(
self._extra = importlib.import_module(
except ModuleNotFoundError:
self._extra = None
def _get_nested_item(self, item: str) -> Any:
result: List[Any] | Dict[str, Any] =
for key in item.split("."):
if all(k in "0123456789" for k in key) and isinstance(result, list):
result = result[int(key)]
elif isinstance(result, dict):
result = result[key]
return result
def __getitem__(self, item: str) -> Any:
if self._zone:
item += f"Z{self._zone}"
def __getitem__(self, item):
if "." in item:
return self._get_nested_item(item)
if item in
if item in self.attributes["parameters"]:
return self.attributes["parameters"][item].value
result =
for key in item.split("."):
if all([k in "0123456789" for k in key]) and type(result) is list:
result = result[int(key)]
result = result[key]
return result
if item in
if item in self.attributes["parameters"]:
return self.attributes["parameters"].get(item)
def get(self, item: str, default: None = None) -> Any:
def get(self, item: str, default: T) -> T:
def get(self, item: str, default: Optional[T] = None) -> Any:
def get(self, item, default=None):
return self[item]
except (KeyError, IndexError):
return default
def _check_name_zone(self, name: str, frontend: bool = True) -> str:
zone = " Z" if frontend else "_z"
attribute: str = self._info.get(name, "")
if attribute and self._zone:
return f"{attribute}{zone}{self._zone}"
return attribute
def appliance_model_id(self):
return self._info.get("applianceModelId")
def appliance_model_id(self) -> str:
return str(self._info.get("applianceModelId", ""))
def appliance_type(self):
return self._info.get("applianceTypeName")
def appliance_type(self) -> str:
return str(self._info.get("applianceTypeName", ""))
def mac_address(self):
return self._info.get("macAddress")
def mac_address(self) -> str:
return str("macAddress", ""))
def model_name(self):
return self._info.get("modelName")
def unique_id(self) -> str:
default_mac = "xx-xx-xx-xx-xx-xx"
import_name = f"{self.appliance_type.lower()}_{self.appliance_model_id}"
result = self._check_name_zone("macAddress", frontend=False)
result = result.replace(default_mac, import_name)
return result
def nick_name(self):
return self._info.get("nickName")
def model_name(self) -> str:
return self._check_name_zone("modelName")
def commands_options(self):
return self._appliance_model.get("options")
def brand(self) -> str:
brand = self._check_name_zone("brand")
return brand[0].upper() + brand[1:]
def nick_name(self) -> str:
result = self._check_name_zone("nickName")
if not result or re.findall("^[xX1\\s-]+$", result):
return self.model_name
return result
def code(self) -> str:
code: str ="code", "")
if code:
return code
serial_number: str ="serialNumber", "")
return serial_number[:8] if len(serial_number) < 18 else serial_number[:11]
def model_id(self) -> int:
return int(self._info.get("applianceModelId", 0))
def options(self) -> Dict[str, Any]:
return dict(self._appliance_model.get("options", {}))
def commands(self) -> Dict[str, HonCommand]:
def commands(self):
return self._commands
def attributes(self) -> Dict[str, Any]:
def attributes(self):
return self._attributes
def statistics(self) -> Dict[str, Any]:
def statistics(self):
return self._statistics
def info(self) -> Dict[str, Any]:
def info(self):
return self._info
def additional_data(self) -> Dict[str, Any]:
return self._additional_data
async def _recover_last_command_states(self, commands):
command_history = await self._api.command_history(self)
for name, command in commands.items():
last = next(
for (index, d) in enumerate(command_history)
if d.get("command", {}).get("commandName") == name
if last is None:
parameters = command_history[last].get("command", {}).get("parameters", {})
if command._multi and parameters.get("program"):
command = self.commands[name]
for key, data in command.settings.items():
if (
not isinstance(data, HonParameterFixed)
and parameters.get(key) is not None
with suppress(ValueError):
data.value = parameters.get(key)
async def load_commands(self):
raw = await self._api.load_commands(self)
self._appliance_model = raw.pop("applianceModel")
for item in ["settings", "options", "dictionaryId"]:
commands = {}
for command, attr in raw.items():
if "parameters" in attr:
commands[command] = HonCommand(command, attr, self._api, self)
elif "parameters" in attr[list(attr)[0]]:
multi = {}
for program, attr2 in attr.items():
program = program.split(".")[-1].lower()
cmd = HonCommand(
command, attr2, self._api, self, multi=multi, program=program
multi[program] = cmd
commands[command] = cmd
self._commands = commands
await self._recover_last_command_states(commands)
def zone(self) -> int:
return self._zone
def api(self) -> "HonAPI":
"""api connection object"""
if self._api is None:
raise exceptions.NoAuthenticationException("Missing hOn login")
return self._api
async def load_commands(self) -> None:
command_loader = HonCommandLoader(self.api, self)
await command_loader.load_commands()
self._commands = command_loader.commands
self._additional_data = command_loader.additional_data
self._appliance_model = command_loader.appliance_data
async def load_attributes(self) -> None:
attributes = await self.api.load_attributes(self)
for name, values in attributes.pop("shadow", {}).get("parameters", {}).items():
if name in self._attributes.get("parameters", {}):
self._attributes.setdefault("parameters", {})[name] = HonAttribute(
self._attributes |= attributes
if self._extra:
self._attributes = self._extra.attributes(self._attributes)
async def load_statistics(self) -> None:
self._statistics = await self.api.load_statistics(self)
self._statistics |= await self.api.load_maintenance(self)
async def update(self, force: bool = False) -> None:
now =
min_age = now - timedelta(seconds=self._MINIMAL_UPDATE_INTERVAL)
if force or not self._last_update or self._last_update < min_age:
self._last_update = now
await self.load_attributes()
def command_parameters(self) -> Dict[str, Dict[str, str | float]]:
return {n: c.parameter_value for n, c in self._commands.items()}
def settings(self) -> Dict[str, Parameter]:
result: Dict[str, Parameter] = {}
def settings(self):
result = {}
for name, command in self._commands.items():
for key in command.setting_keys:
setting = command.settings.get(key, self._default_setting)
for key, setting in command.settings.items():
result[f"{name}.{key}"] = setting
if self._extra:
return self._extra.settings(result)
return result
def available_settings(self) -> List[str]:
result = []
def parameters(self):
result = {}
for name, command in self._commands.items():
for key in command.setting_keys:
for key, parameter in command.parameters.items():
result.setdefault(name, {})[key] = parameter.value
return result
async def load_attributes(self):
self._attributes = await self._api.load_attributes(self)
for name, values in self._attributes.pop("shadow").get("parameters").items():
self._attributes.setdefault("parameters", {})[name] = values["parNewVal"]
async def load_statistics(self):
self._statistics = await self._api.load_statistics(self)
async def update(self):
await self.load_attributes()
def data(self) -> Dict[str, Any]:
def data(self):
result = {
"attributes": self.attributes,
"statistics": self.statistics,
"additional_data": self._additional_data,
if self._extra:
return result
def diagnose(self) -> str:
return diagnose.yaml_export(self, anonymous=True)
async def data_archive(self, path: Path) -> str:
return await diagnose.zip_archive(self, path, anonymous=True)
def sync_command_to_params(self, command_name: str) -> None:
if not (command := self.commands.get(command_name)):
for key in self.attributes.get("parameters", {}):
if new := command.parameters.get(key):
str(new.intern_value), shield=True
def sync_params_to_command(self, command_name: str) -> None:
if not (command := self.commands.get(command_name)):
for key in command.setting_keys:
if (
new := self.attributes.get("parameters", {}).get(key)
) is None or new.value == "":
setting = command.settings[key]
if not isinstance(setting, HonParameterRange):
command.settings[key].value = str(new.value)
command.settings[key].value = float(new.value)
except ValueError as error:
||||"Can't set %s - %s", key, error)
def sync_command(
main: str,
target: Optional[List[str] | str] = None,
to_sync: Optional[List[str] | bool] = None,
) -> None:
base: Optional[HonCommand] = self.commands.get(main)
if not base:
for command, data in self.commands.items():
if command == main or target and command not in target:
for name, target_param in data.parameters.items():
if not (base_param := base.parameters.get(name)):
if to_sync and (
(isinstance(to_sync, list) and name not in to_sync)
or not base_param.mandatory
self.sync_parameter(base_param, target_param)
def sync_parameter(self, main: Parameter, target: Parameter) -> None:
if isinstance(main, HonParameterRange) and isinstance(
target, HonParameterRange
target.max = main.max
target.min = main.min
target.step = main.step
elif isinstance(target, HonParameterRange):
target.max = int(main.value)
target.min = int(main.value)
target.step = 1
elif isinstance(target, HonParameterEnum):
target.values = main.values
target.value = main.value
def diagnose(self):
data =
for sensible in ["PK", "SK", "serialNumber", "code"]:
data["appliance"].pop(sensible, None)
result = helper.pretty_print({"data":}, whitespace="\u200B \u200B ")
result += helper.pretty_print(
{"commands": helper.create_command(self.commands)},
whitespace="\u200B \u200B ",
return result.replace(self.mac_address, "12-34-56-78-90-ab")
@ -1,25 +0,0 @@
from typing import Dict, Any, TYPE_CHECKING
from pyhon.parameter.program import HonParameterProgram
from pyhon.appliance import HonAppliance
class ApplianceBase:
def __init__(self, appliance: "HonAppliance"):
self.parent = appliance
def attributes(self, data: Dict[str, Any]) -> Dict[str, Any]:
program_name = "No Program"
if program := int(str(data.get("parameters", {}).get("prCode", "0"))):
if start_cmd := self.parent.settings.get("startProgram.program"):
if isinstance(start_cmd, HonParameterProgram) and (
ids := start_cmd.ids
program_name = ids.get(program, program_name)
data["programName"] = program_name
return data
def settings(self, settings: Dict[str, Any]) -> Dict[str, Any]:
return settings
@ -1,13 +0,0 @@
# pylint: disable=duplicate-code
from typing import Any, Dict
from pyhon.appliances.base import ApplianceBase
class Appliance(ApplianceBase):
def attributes(self, data: Dict[str, Any]) -> Dict[str, Any]:
data = super().attributes(data)
if data.get("lastConnEvent", {}).get("category", "") == "DISCONNECTED":
data["parameters"]["machMode"].value = "0"
data["active"] = bool(data.get("activity"))
return data
@ -1,16 +1,23 @@
from typing import Any, Dict
from pyhon.appliances.base import ApplianceBase
from pyhon.parameter import HonParameterEnum
class Appliance(ApplianceBase):
def attributes(self, data: Dict[str, Any]) -> Dict[str, Any]:
data = super().attributes(data)
if data.get("lastConnEvent", {}).get("category", "") == "DISCONNECTED":
data["parameters"]["temp"].value = 0
data["parameters"]["onOffStatus"].value = 0
data["parameters"]["remoteCtrValid"].value = 0
data["parameters"]["remainingTimeMM"].value = 0
class Appliance:
"default": "^(?!iot_(?:recipe|guided))\\S+$",
"recipe": "iot_recipe_",
"guided": "iot_guided_",
data["active"] = data["parameters"]["onOffStatus"].value == 1
def __init__(self):
filters = list(self._FILTERS.values())
data = {"defaultValue": filters[0], "enumValues": filters}
self._program_filter = HonParameterEnum("program_filter", data)
def data(self, data):
return data
def settings(self, settings):
settings["program_filter"] = self._program_filter
value = self._FILTERS[self._program_filter.value]
settings["startProgram.program"].filter = value
return settings
@ -1,25 +0,0 @@
from typing import Dict, Any
from pyhon.appliances.base import ApplianceBase
class Appliance(ApplianceBase):
def attributes(self, data: Dict[str, Any]) -> Dict[str, Any]:
data = super().attributes(data)
if data["parameters"]["holidayMode"] == "1":
data["modeZ1"] = "holiday"
elif data["parameters"]["intelligenceMode"] == "1":
data["modeZ1"] = "auto_set"
elif data["parameters"].get("quickModeZ1") == "1":
data["modeZ1"] = "super_cool"
data["modeZ1"] = "no_mode"
if data["parameters"].get("quickModeZ2") == "1":
data["modeZ2"] = "super_freeze"
elif data["parameters"]["intelligenceMode"] == "1":
data["modeZ2"] = "auto_set"
data["modeZ2"] = "no_mode"
return data
@ -1,21 +1,10 @@
# pylint: disable=duplicate-code
from typing import Any, Dict
from pyhon.appliances.base import ApplianceBase
from pyhon.parameter.fixed import HonParameterFixed
class Appliance(ApplianceBase):
def attributes(self, data: Dict[str, Any]) -> Dict[str, Any]:
data = super().attributes(data)
if data.get("lastConnEvent", {}).get("category", "") == "DISCONNECTED":
data["parameters"]["machMode"].value = "0"
data["active"] = bool(data.get("activity"))
data["pause"] = data["parameters"]["machMode"] == "3"
class Appliance:
def data(self, data):
if data["attributes"]["lastConnEvent"]["category"] == "DISCONNECTED":
data["attributes"]["parameters"]["machMode"] = "0"
data["active"] = bool(data.get("attributes", {}).get("activity"))
data["pause"] = data["attributes"]["parameters"]["machMode"] == "3"
return data
def settings(self, settings: Dict[str, Any]) -> Dict[str, Any]:
dry_level = settings.get("startProgram.dryLevel")
if isinstance(dry_level, HonParameterFixed) and dry_level.value == "11":
settings.pop("startProgram.dryLevel", None)
def settings(self, settings):
return settings
@ -1,5 +0,0 @@
from pyhon.appliances.base import ApplianceBase
class Appliance(ApplianceBase):
@ -1,17 +1,10 @@
# pylint: disable=duplicate-code
from typing import Dict, Any
from pyhon.appliances.base import ApplianceBase
class Appliance(ApplianceBase):
def attributes(self, data: Dict[str, Any]) -> Dict[str, Any]:
data = super().attributes(data)
if data.get("lastConnEvent", {}).get("category", "") == "DISCONNECTED":
data["parameters"]["machMode"].value = "0"
data["active"] = bool(data.get("activity"))
data["pause"] = data["parameters"]["machMode"] == "3"
class Appliance:
def data(self, data):
if data["attributes"]["lastConnEvent"]["category"] == "DISCONNECTED":
data["attributes"]["parameters"]["machMode"] = "0"
data["active"] = bool(data.get("attributes", {}).get("activity"))
data["pause"] = data["attributes"]["parameters"]["machMode"] == "3"
return data
def settings(self, settings: Dict[str, Any]) -> Dict[str, Any]:
def settings(self, settings):
return settings
@ -1,16 +0,0 @@
from typing import Any, Dict
from pyhon.appliances.base import ApplianceBase
from pyhon.parameter.base import HonParameter
class Appliance(ApplianceBase):
def attributes(self, data: Dict[str, Any]) -> Dict[str, Any]:
data = super().attributes(data)
parameter = data.get("parameters", {}).get("onOffStatus")
is_class = isinstance(parameter, HonParameter)
data["active"] = parameter.value == 1 if is_class else parameter == 1
return data
def settings(self, settings: Dict[str, Any]) -> Dict[str, Any]:
return settings
@ -1,17 +1,10 @@
# pylint: disable=duplicate-code
from typing import Any, Dict
from pyhon.appliances.base import ApplianceBase
class Appliance(ApplianceBase):
def attributes(self, data: Dict[str, Any]) -> Dict[str, Any]:
data = super().attributes(data)
if data.get("lastConnEvent", {}).get("category", "") == "DISCONNECTED":
data["parameters"]["machMode"].value = "0"
data["active"] = bool(data.get("activity"))
data["pause"] = data["parameters"]["machMode"] == "3"
class Appliance:
def data(self, data):
if data["attributes"]["lastConnEvent"]["category"] == "DISCONNECTED":
data["attributes"]["parameters"]["machMode"] = "0"
data["active"] = bool(data.get("attributes", {}).get("activity"))
data["pause"] = data["attributes"]["parameters"]["machMode"] == "3"
return data
def settings(self, settings: Dict[str, Any]) -> Dict[str, Any]:
def settings(self, settings):
return settings
@ -1,58 +0,0 @@
from datetime import datetime, timedelta
from typing import Optional, Final, Dict
from pyhon.helper import str_to_float
class HonAttribute:
_LOCK_TIMEOUT: Final = 10
def __init__(self, data: Dict[str, str] | str):
self._value: str = ""
self._last_update: Optional[datetime] = None
self._lock_timestamp: Optional[datetime] = None
def value(self) -> float | str:
"""Attribute value"""
return str_to_float(self._value)
except ValueError:
return self._value
def value(self, value: str) -> None:
self._value = value
def last_update(self) -> Optional[datetime]:
"""Timestamp of last api update"""
return self._last_update
def lock(self) -> bool:
"""Shows if value changes are forbidden"""
if not self._lock_timestamp:
return False
lock_until = self._lock_timestamp + timedelta(seconds=self._LOCK_TIMEOUT)
return lock_until >= datetime.utcnow()
def update(self, data: Dict[str, str] | str, shield: bool = False) -> bool:
if self.lock and not shield:
return False
if shield:
self._lock_timestamp = datetime.utcnow()
if isinstance(data, str):
self.value = data
return True
self.value = data.get("parNewVal", "")
if last_update := data.get("lastUpdate"):
self._last_update = datetime.fromisoformat(last_update)
except ValueError:
self._last_update = None
return True
def __str__(self) -> str:
return self._value
@ -1,227 +0,0 @@
import asyncio
from contextlib import suppress
from copy import copy
from typing import Dict, Any, Optional, TYPE_CHECKING, List
from pyhon.commands import HonCommand
from pyhon.exceptions import NoAuthenticationException
from pyhon.parameter.fixed import HonParameterFixed
from pyhon.parameter.program import HonParameterProgram
from pyhon import HonAPI
from pyhon.appliance import HonAppliance
class HonCommandLoader:
"""Loads and parses hOn command data"""
def __init__(self, api: "HonAPI", appliance: "HonAppliance") -> None:
self._api: "HonAPI" = api
self._appliance: "HonAppliance" = appliance
self._api_commands: Dict[str, Any] = {}
self._favourites: List[Dict[str, Any]] = []
self._command_history: List[Dict[str, Any]] = []
self._commands: Dict[str, HonCommand] = {}
self._appliance_data: Dict[str, Any] = {}
self._additional_data: Dict[str, Any] = {}
def api(self) -> "HonAPI":
"""api connection object"""
if self._api is None:
raise NoAuthenticationException("Missing hOn login")
return self._api
def appliance(self) -> "HonAppliance":
"""appliance object"""
return self._appliance
def commands(self) -> Dict[str, HonCommand]:
"""Get list of hon commands"""
return self._commands
def appliance_data(self) -> Dict[str, Any]:
"""Get command appliance data"""
return self._appliance_data
def additional_data(self) -> Dict[str, Any]:
"""Get command additional data"""
return self._additional_data
async def load_commands(self) -> None:
"""Trigger loading of command data"""
await self._load_data()
self._appliance_data = self._api_commands.pop("applianceModel", {})
async def _load_commands(self) -> None:
self._api_commands = await self._api.load_commands(self._appliance)
async def _load_favourites(self) -> None:
self._favourites = await self._api.load_favourites(self._appliance)
async def _load_command_history(self) -> None:
self._command_history = await self._api.load_command_history(self._appliance)
async def _load_data(self) -> None:
"""Callback parallel all relevant data"""
await asyncio.gather(
def _is_command(data: Dict[str, Any]) -> bool:
"""Check if dict can be parsed as command"""
return (
data.get("description") is not None and data.get("protocolType") is not None
def _clean_name(category: str) -> str:
"""Clean up category name"""
if "PROGRAM" in category:
return category.split(".")[-1].lower()
return category
def _get_commands(self) -> None:
"""Generates HonCommand dict from api data"""
commands = []
for name, data in self._api_commands.items():
if command := self._parse_command(data, name):
self._commands = { c for c in commands}
def _parse_command(
data: Dict[str, Any] | str,
command_name: str,
categories: Optional[Dict[str, "HonCommand"]] = None,
category_name: str = "",
) -> Optional[HonCommand]:
"""Try to crate HonCommand object"""
if not isinstance(data, dict):
self._additional_data[command_name] = data
return None
if self._is_command(data):
return HonCommand(
if category := self._parse_categories(data, command_name):
return category
return None
def _parse_categories(
self, data: Dict[str, Any], command_name: str
) -> Optional[HonCommand]:
"""Parse categories and create reference to other"""
categories: Dict[str, HonCommand] = {}
for category, value in data.items():
if command := self._parse_command(
value, command_name, category_name=category, categories=categories
categories[self._clean_name(category)] = command
if categories:
# setParameters should be at first place
if "setParameters" in categories:
return categories["setParameters"]
return list(categories.values())[0]
return None
def _get_last_command_index(self, name: str) -> Optional[int]:
"""Get index of last command execution"""
return next(
for (index, d) in enumerate(self._command_history)
if d.get("command", {}).get("commandName") == name
def _set_last_category(
self, command: HonCommand, name: str, parameters: Dict[str, Any]
) -> HonCommand:
"""Set category to last state"""
if command.categories:
if program := parameters.pop("program", None):
command.category = self._clean_name(program)
elif category := parameters.pop("category", None):
command.category = category
return command
return self.commands[name]
return command
def _recover_last_command_states(self) -> None:
"""Set commands to last state"""
for name, command in self.commands.items():
if (last_index := self._get_last_command_index(name)) is None:
last_command = self._command_history[last_index]
parameters = last_command.get("command", {}).get("parameters", {})
command = self._set_last_category(command, name, parameters)
for key, data in command.settings.items():
if parameters.get(key) is None:
with suppress(ValueError):
data.value = parameters.get(key)
def _add_favourites(self) -> None:
"""Patch program categories with favourites"""
for favourite in self._favourites:
name, command_name, base = self._get_favourite_info(favourite)
if not base:
base_command: HonCommand = copy(base)
self._update_base_command_with_data(base_command, favourite)
self._update_program_categories(command_name, name, base_command)
def _get_favourite_info(
self, favourite: Dict[str, Any]
) -> tuple[str, str, HonCommand | None]:
name: str = favourite.get("favouriteName", {})
command = favourite.get("command", {})
command_name: str = command.get("commandName", "")
program_name = self._clean_name(command.get("programName", ""))
base_command = self.commands[command_name].categories.get(program_name)
return name, command_name, base_command
def _update_base_command_with_data(
self, base_command: HonCommand, command: Dict[str, Any]
) -> None:
for data in command.values():
if isinstance(data, str):
for key, value in data.items():
if not (parameter := base_command.parameters.get(key)):
with suppress(ValueError):
parameter.value = value
def _update_base_command_with_favourite(self, base_command: HonCommand) -> None:
extra_param = HonParameterFixed("favourite", {"fixedValue": "1"}, "custom")
def _update_program_categories(
self, command_name: str, name: str, base_command: HonCommand
) -> None:
program = base_command.parameters["program"]
if isinstance(program, HonParameterProgram):
self.commands[command_name].categories[name] = base_command
@ -1,208 +1,90 @@
import logging
from typing import Optional, Dict, Any, List, TYPE_CHECKING, Union
from pyhon import exceptions
from pyhon.exceptions import ApiError, NoAuthenticationException
from pyhon.parameter.base import HonParameter
from pyhon.parameter.enum import HonParameterEnum
from pyhon.parameter.fixed import HonParameterFixed
from pyhon.parameter.program import HonParameterProgram
from pyhon.parameter.range import HonParameterRange
from pyhon.rules import HonRuleSet
from pyhon.typedefs import Parameter
from pyhon import HonAPI
from pyhon.appliance import HonAppliance
_LOGGER = logging.getLogger(__name__)
from pyhon.parameter import (
class HonCommand:
def __init__(
name: str,
attributes: Dict[str, Any],
appliance: "HonAppliance",
categories: Optional[Dict[str, "HonCommand"]] = None,
category_name: str = "",
self._name: str = name
self._api: Optional[HonAPI] = None
self._appliance: "HonAppliance" = appliance
self._categories: Optional[Dict[str, "HonCommand"]] = categories
self._category_name: str = category_name
self._parameters: Dict[str, Parameter] = {}
self._data: Dict[str, Any] = {}
self._rules: List[HonRuleSet] = []
attributes.pop("description", "")
attributes.pop("protocolType", "")
def __init__(self, name, attributes, connector, device, multi=None, program=""):
self._connector = connector
self._device = device
self._name = name
self._multi = multi or {}
self._program = program
self._description = attributes.get("description", "")
self._parameters = self._create_parameters(attributes.get("parameters", {}))
self._ancillary_parameters = self._create_parameters(
attributes.get("ancillaryParameters", {})
def __repr__(self) -> str:
def __repr__(self):
return f"{self._name} command"
def name(self) -> str:
return self._name
def api(self) -> "HonAPI":
if self._api is None and self._appliance:
self._api = self._appliance.api
if self._api is None:
raise exceptions.NoAuthenticationException("Missing hOn login")
return self._api
def appliance(self) -> "HonAppliance":
return self._appliance
def data(self) -> Dict[str, Any]:
return self._data
def parameters(self) -> Dict[str, HonParameter]:
return self._parameters
def settings(self) -> Dict[str, HonParameter]:
return self._parameters
def parameter_groups(self) -> Dict[str, Dict[str, Union[str, float]]]:
result: Dict[str, Dict[str, Union[str, float]]] = {}
for name, parameter in self._parameters.items():
result.setdefault(, {})[name] = parameter.intern_value
def _create_parameters(self, parameters):
result = {}
for parameter, attributes in parameters.items():
match attributes.get("typology"):
case "range":
result[parameter] = HonParameterRange(parameter, attributes)
case "enum":
result[parameter] = HonParameterEnum(parameter, attributes)
case "fixed":
result[parameter] = HonParameterFixed(parameter, attributes)
if self._multi:
result["program"] = HonParameterProgram("program", self)
return result
def mandatory_parameter_groups(self) -> Dict[str, Dict[str, Union[str, float]]]:
result: Dict[str, Dict[str, Union[str, float]]] = {}
for name, parameter in self._parameters.items():
if parameter.mandatory:
result.setdefault(, {})[name] = parameter.intern_value
return result
def parameters(self):
return self._parameters
def parameter_value(self) -> Dict[str, Union[str, float]]:
return {n: p.value for n, p in self._parameters.items()}
def ancillary_parameters(self):
return {
key: parameter.value
for key, parameter in self._ancillary_parameters.items()
def _load_parameters(self, attributes: Dict[str, Dict[str, Any] | Any]) -> None:
for key, items in attributes.items():
if not isinstance(items, dict):
||||"Loading Attributes - Skipping %s", str(items))
async def send(self):
parameters = {
name: parameter.value for name, parameter in self._parameters.items()
return await self._connector.send_command(
self._device, self._name, parameters, self.ancillary_parameters
def get_programs(self):
return self._multi
def set_program(self, program):
self._device.commands[self._name] = self._multi[program]
def _get_settings_keys(self, command=None):
command = command or self
keys = []
for key, parameter in command._parameters.items():
if isinstance(parameter, HonParameterFixed):
for name, data in items.items():
self._create_parameters(data, name, key)
for rule in self._rules:
def _create_parameters(
self, data: Dict[str, Any], name: str, parameter: str
) -> None:
if name == "zoneMap" and
data["default"] =
if data.get("category") == "rule":
if "fixedValue" in data:
self._rules.append(HonRuleSet(self, data["fixedValue"]))
elif "enumValues" in data:
self._rules.append(HonRuleSet(self, data["enumValues"]))
_LOGGER.warning("Rule not supported: %s", data)
match data.get("typology"):
case "range":
self._parameters[name] = HonParameterRange(name, data, parameter)
case "enum":
self._parameters[name] = HonParameterEnum(name, data, parameter)
case "fixed":
self._parameters[name] = HonParameterFixed(name, data, parameter)
case _:
self._data[name] = data
if self._category_name:
name = "program" if "PROGRAM" in self._category_name else "category"
self._parameters[name] = HonParameterProgram(name, self, "custom")
async def send(self, only_mandatory: bool = False) -> bool:
grouped_params = (
self.mandatory_parameter_groups if only_mandatory else self.parameter_groups
params = grouped_params.get("parameters", {})
return await self.send_parameters(params)
async def send_specific(self, param_names: List[str]) -> bool:
params: Dict[str, str | float] = {}
for key, parameter in self._parameters.items():
if key in param_names or parameter.mandatory:
params[key] = parameter.value
return await self.send_parameters(params)
async def send_parameters(self, params: Dict[str, str | float]) -> bool:
ancillary_params = self.parameter_groups.get("ancillaryParameters", {})
ancillary_params.pop("programRules", None)
if "prStr" in params:
params["prStr"] = self._category_name.upper()
result = await self.api.send_command(
if not result:
raise ApiError("Can't send command")
except NoAuthenticationException:
_LOGGER.error("No Authentication")
return False
return result
if key not in keys:
return keys
def categories(self) -> Dict[str, "HonCommand"]:
if self._categories is None:
return {"_": self}
return self._categories
def setting_keys(self):
if not self._multi:
return self._get_settings_keys()
result = [
key for cmd in self._multi.values() for key in self._get_settings_keys(cmd)
return list(set(result + ["program"]))
def category(self) -> str:
return self._category_name
def category(self, category: str) -> None:
if category in self.categories:
self._appliance.commands[self._name] = self.categories[category]
def setting_keys(self) -> List[str]:
return list(
{param for cmd in self.categories.values() for param in cmd.parameters}
def _more_options(first: Parameter, second: Parameter) -> Parameter:
if isinstance(first, HonParameterFixed) and not isinstance(
second, HonParameterFixed
return second
if len(second.values) > len(first.values):
return second
return first
def available_settings(self) -> Dict[str, Parameter]:
result: Dict[str, Parameter] = {}
for command in self.categories.values():
for name, parameter in command.parameters.items():
if name in result:
result[name] = self._more_options(result[name], parameter)
result[name] = parameter
return result
def reset(self) -> None:
for parameter in self._parameters.values():
def settings(self):
"""Parameters with typology enum and range"""
return {
s: self._parameters.get(s)
for s in self.setting_keys
if self._parameters.get(s) is not None
@ -1,205 +1,108 @@
import json
import logging
from datetime import datetime
from pathlib import Path
from pprint import pformat
from types import TracebackType
from typing import Dict, Optional, Any, List, no_type_check, Type
from aiohttp import ClientSession
from typing_extensions import Self
from pyhon import const, exceptions
from pyhon import const
from pyhon.appliance import HonAppliance
from pyhon.connection.auth import HonAuth
from pyhon.connection.handler.anonym import HonAnonymousConnectionHandler
from pyhon.connection.handler.hon import HonConnectionHandler
from pyhon.connection.handler import HonConnectionHandler, HonAnonymousConnectionHandler
_LOGGER = logging.getLogger(__name__)
_LOGGER = logging.getLogger()
class HonAPI:
def __init__(
email: str = "",
password: str = "",
anonymous: bool = False,
session: Optional[ClientSession] = None,
) -> None:
def __init__(self, email="", password="", anonymous=False, session=None) -> None:
self._email: str = email
self._password: str = password
self._anonymous: bool = anonymous
self._hon_handler: Optional[HonConnectionHandler] = None
self._hon_anonymous_handler: Optional[HonAnonymousConnectionHandler] = None
self._session: Optional[ClientSession] = session
self._email = email
self._password = password
self._anonymous = anonymous
self._hon = None
self._hon_anonymous = None
self._session = session
async def __aenter__(self) -> Self:
async def __aenter__(self):
return await self.create()
async def __aexit__(
exc_type: Optional[Type[BaseException]],
exc: Optional[BaseException],
traceback: Optional[TracebackType],
) -> None:
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.close()
def auth(self) -> HonAuth:
if self._hon is None or self._hon.auth is None:
raise exceptions.NoAuthenticationException
return self._hon.auth
def _hon(self) -> HonConnectionHandler:
if self._hon_handler is None:
raise exceptions.NoAuthenticationException
return self._hon_handler
def _hon_anonymous(self) -> HonAnonymousConnectionHandler:
if self._hon_anonymous_handler is None:
raise exceptions.NoAuthenticationException
return self._hon_anonymous_handler
async def create(self) -> Self:
self._hon_anonymous_handler = await HonAnonymousConnectionHandler(
async def create(self):
self._hon_anonymous = await HonAnonymousConnectionHandler(
if not self._anonymous:
self._hon_handler = await HonConnectionHandler(
self._hon = await HonConnectionHandler(
self._email, self._password, self._session
return self
async def load_appliances(self) -> List[Dict[str, Any]]:
async def load_appliances(self):
async with self._hon.get(f"{const.API_URL}/commands/v1/appliance") as resp:
result = await resp.json()
if result:
appliances: List[Dict[str, Any]] = result.get("payload", {}).get(
"appliances", {}
return appliances
return []
return await resp.json()
async def load_commands(self, appliance: HonAppliance) -> Dict[str, Any]:
params: Dict[str, str | int] = {
async def load_commands(self, appliance: HonAppliance):
params = {
"applianceType": appliance.appliance_type,
"applianceModelId": appliance.appliance_model_id,
"macAddress": appliance.mac_address,
"os": const.OS,
"appVersion": const.APP_VERSION,
"code": appliance.code,
if firmware_id :="eepromId"):
params["firmwareId"] = firmware_id
if firmware_version :="fwVersion"):
params["fwVersion"] = firmware_version
if series :="series"):
params["series"] = series
url: str = f"{const.API_URL}/commands/v1/retrieve"
url = f"{const.API_URL}/commands/v1/retrieve"
async with self._hon.get(url, params=params) as response:
result: Dict[str, Any] = (await response.json()).get("payload", {})
result = (await response.json()).get("payload", {})
if not result or result.pop("resultCode") != "0":
_LOGGER.error(await response.json())
return {}
return result
async def load_command_history(
self, appliance: HonAppliance
) -> List[Dict[str, Any]]:
url: str = (
async def command_history(self, appliance: HonAppliance):
url = f"{const.API_URL}/commands/v1/appliance/{appliance.mac_address}/history"
async with self._hon.get(url) as response:
result: Dict[str, Any] = await response.json()
if not result or not result.get("payload"):
return []
command_history: List[Dict[str, Any]] = result["payload"]["history"]
return command_history
result = await response.json()
if not result or not result.get("payload"):
return {}
return result["payload"]["history"]
async def load_favourites(self, appliance: HonAppliance) -> List[Dict[str, Any]]:
url: str = (
async with self._hon.get(url) as response:
result: Dict[str, Any] = await response.json()
if not result or not result.get("payload"):
return []
favourites: List[Dict[str, Any]] = result["payload"]["favourites"]
return favourites
async def load_last_activity(self, appliance: HonAppliance) -> Dict[str, Any]:
url: str = f"{const.API_URL}/commands/v1/retrieve-last-activity"
params: Dict[str, str] = {"macAddress": appliance.mac_address}
async def last_activity(self, appliance: HonAppliance):
url = f"{const.API_URL}/commands/v1/retrieve-last-activity"
params = {"macAddress": appliance.mac_address}
async with self._hon.get(url, params=params) as response:
result: Dict[str, Any] = await response.json()
if result:
activity: Dict[str, Any] = result.get("attributes", "")
if activity:
return activity
result = await response.json()
if result and (activity := result.get("attributes")):
return activity
return {}
async def load_appliance_data(self, appliance: HonAppliance) -> Dict[str, Any]:
url: str = f"{const.API_URL}/commands/v1/appliance-model"
params: Dict[str, str] = {
"code": appliance.code,
"macAddress": appliance.mac_address,
async with self._hon.get(url, params=params) as response:
result: Dict[str, Any] = await response.json()
if result:
appliance_data: Dict[str, Any] = result.get("payload", {}).get(
"applianceModel", {}
return appliance_data
return {}
async def load_attributes(self, appliance: HonAppliance) -> Dict[str, Any]:
params: Dict[str, str] = {
async def load_attributes(self, appliance: HonAppliance):
params = {
"macAddress": appliance.mac_address,
"applianceType": appliance.appliance_type,
"category": "CYCLE",
url: str = f"{const.API_URL}/commands/v1/context"
url = f"{const.API_URL}/commands/v1/context"
async with self._hon.get(url, params=params) as response:
attributes: Dict[str, Any] = (await response.json()).get("payload", {})
return attributes
return (await response.json()).get("payload", {})
async def load_statistics(self, appliance: HonAppliance) -> Dict[str, Any]:
params: Dict[str, str] = {
async def load_statistics(self, appliance: HonAppliance):
params = {
"macAddress": appliance.mac_address,
"applianceType": appliance.appliance_type,
url: str = f"{const.API_URL}/commands/v1/statistics"
url = f"{const.API_URL}/commands/v1/statistics"
async with self._hon.get(url, params=params) as response:
statistics: Dict[str, Any] = (await response.json()).get("payload", {})
return statistics
return (await response.json()).get("payload", {})
async def load_maintenance(self, appliance: HonAppliance) -> Dict[str, Any]:
url = f"{const.API_URL}/commands/v1/maintenance-cycle"
params = {"macAddress": appliance.mac_address}
async with self._hon.get(url, params=params) as response:
maintenance: Dict[str, Any] = (await response.json()).get("payload", {})
return maintenance
async def send_command(
appliance: HonAppliance,
command: str,
parameters: Dict[str, Any],
ancillary_parameters: Dict[str, Any],
program_name: str = "",
) -> bool:
now: str = datetime.utcnow().isoformat()
data: Dict[str, Any] = {
async def send_command(self, appliance, command, parameters, ancillary_parameters):
now = datetime.utcnow().isoformat()
data = {
"macAddress": appliance.mac_address,
"timestamp": f"{now[:-3]}Z",
"commandName": command,
"transactionId": f"{appliance.mac_address}_{now[:-3]}Z",
"applianceOptions": appliance.options,
"device": self._hon.device.get(mobile=True),
"applianceOptions": appliance.commands_options,
"appliance": self._hon.device.get(),
"attributes": {
"channel": "mobileApp",
"origin": "standardProgram",
@ -209,125 +112,45 @@ class HonAPI:
"parameters": parameters,
"applianceType": appliance.appliance_type,
if command == "startProgram" and program_name:
data.update({"programName": program_name.upper()})
url: str = f"{const.API_URL}/commands/v1/send"
async with, json=data) as response:
json_data: Dict[str, Any] = await response.json()
url = f"{const.API_URL}/commands/v1/send"
async with, json=data) as resp:
json_data = await resp.json()
if json_data.get("payload", {}).get("resultCode") == "0":
return True
_LOGGER.error(await response.text())
_LOGGER.error("%s - Payload:\n%s", url, pformat(data))
return False
async def appliance_configuration(self) -> Dict[str, Any]:
url: str = f"{const.API_URL}/config/v1/program-list-rules"
async def appliance_configuration(self):
url = f"{const.API_URL}/config/v1/appliance-configuration"
async with self._hon_anonymous.get(url) as response:
result: Dict[str, Any] = await response.json()
data: Dict[str, Any] = result.get("payload", {})
return data
result = await response.json()
if result and (data := result.get("payload")):
return data
return {}
async def app_config(
self, language: str = "en", beta: bool = True
) -> Dict[str, Any]:
url: str = f"{const.API_URL}/app-config"
payload_data: Dict[str, str | int] = {
async def app_config(self, language="en", beta=True):
url = f"{const.API_URL}/app-config"
payload = {
"languageCode": language,
"beta": beta,
"appVersion": const.APP_VERSION,
"os": const.OS,
payload: str = json.dumps(payload_data, separators=(",", ":"))
payload = json.dumps(payload, separators=(",", ":"))
async with, data=payload) as response:
result = await response.json()
data: Dict[str, Any] = result.get("payload", {})
return data
async def translation_keys(self, language: str = "en") -> Dict[str, Any]:
config = await self.app_config(language=language)
if not (url := config.get("language", {}).get("jsonPath")):
return {}
async with self._hon_anonymous.get(url) as response:
result: Dict[str, Any] = await response.json()
return result
async def close(self) -> None:
if self._hon_handler is not None:
await self._hon_handler.close()
if self._hon_anonymous_handler is not None:
await self._hon_anonymous_handler.close()
class TestAPI(HonAPI):
def __init__(self, path: Path):
self._anonymous = True
self._path: Path = path
def _load_json(self, appliance: HonAppliance, file: str) -> Dict[str, Any]:
directory = f"{appliance.appliance_type}_{appliance.appliance_model_id}".lower()
if not (path := self._path / directory / f"{file}.json").exists():
_LOGGER.warning("Can't open %s", str(path))
return {}
with open(path, "r", encoding="utf-8") as json_file:
text =
data: Dict[str, Any] = json.loads(text)
return data
except json.decoder.JSONDecodeError as error:
_LOGGER.error("%s - %s", str(path), error)
return {}
async def load_appliances(self) -> List[Dict[str, Any]]:
result = []
for appliance in self._path.glob("*/"):
file = appliance / "appliance_data.json"
with open(file, "r", encoding="utf-8") as json_file:
except json.decoder.JSONDecodeError as error:
_LOGGER.error("%s - %s", str(file), error)
return result
async def load_commands(self, appliance: HonAppliance) -> Dict[str, Any]:
return self._load_json(appliance, "commands")
async def load_command_history(
self, appliance: HonAppliance
) -> List[Dict[str, Any]]:
return self._load_json(appliance, "command_history")
async def load_favourites(self, appliance: HonAppliance) -> List[Dict[str, Any]]:
return []
async def load_last_activity(self, appliance: HonAppliance) -> Dict[str, Any]:
if (result := await response.json()) and (data := result.get("payload")):
return data
return {}
async def load_appliance_data(self, appliance: HonAppliance) -> Dict[str, Any]:
return self._load_json(appliance, "appliance_data")
async def translation_keys(self, language="en"):
config = await self.app_config(language=language)
if url := config.get("language", {}).get("jsonPath"):
async with self._hon_anonymous.get(url) as response:
if result := await response.json():
return result
return {}
async def load_attributes(self, appliance: HonAppliance) -> Dict[str, Any]:
return self._load_json(appliance, "attributes")
async def load_statistics(self, appliance: HonAppliance) -> Dict[str, Any]:
return self._load_json(appliance, "statistics")
async def load_maintenance(self, appliance: HonAppliance) -> Dict[str, Any]:
return self._load_json(appliance, "maintenance")
async def send_command(
appliance: HonAppliance,
command: str,
parameters: Dict[str, Any],
ancillary_parameters: Dict[str, Any],
program_name: str = "",
) -> bool:
"%s - %s - %s",
return True
async def close(self):
if self._hon:
await self._hon.close()
if self._hon_anonymous:
await self._hon_anonymous.close()
@ -3,289 +3,228 @@ import logging
import re
import secrets
import urllib
from contextlib import suppress
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, Optional, Any, List
from pprint import pformat
from urllib import parse
from urllib.parse import quote
import aiohttp
from aiohttp import ClientResponse
from yarl import URL
from pyhon import const, exceptions
from pyhon.connection.device import HonDevice
from pyhon.connection.handler.auth import HonAuthConnectionHandler
from pyhon import const
from pyhon.exceptions import HonAuthenticationError
_LOGGER = logging.getLogger(__name__)
class HonLoginData:
url: str = ""
email: str = ""
password: str = ""
fw_uid: str = ""
loaded: Optional[Dict[str, Any]] = None
class HonAuthData:
access_token: str = ""
refresh_token: str = ""
cognito_token: str = ""
id_token: str = ""
class HonAuth:
def __init__(
session: aiohttp.ClientSession,
email: str,
password: str,
device: HonDevice,
) -> None:
def __init__(self, session, email, password, device) -> None:
self._session = session
self._request = HonAuthConnectionHandler(session)
self._login_data = HonLoginData()
|||| = email
self._login_data.password = password
self._email = email
self._password = password
self._access_token = ""
self._refresh_token = ""
self._cognito_token = ""
self._id_token = ""
self._device = device
self._expires: datetime = datetime.utcnow()
self._auth = HonAuthData()
self._called_urls = []
def cognito_token(self) -> str:
return self._auth.cognito_token
def cognito_token(self):
return self._cognito_token
def id_token(self) -> str:
return self._auth.id_token
def id_token(self):
return self._id_token
def access_token(self) -> str:
return self._auth.access_token
def access_token(self):
return self._access_token
def refresh_token(self) -> str:
return self._auth.refresh_token
def refresh_token(self):
return self._refresh_token
def _check_token_expiration(self, hours: int) -> bool:
return datetime.utcnow() >= self._expires + timedelta(hours=hours)
def token_is_expired(self) -> bool:
return self._check_token_expiration(self._TOKEN_EXPIRES_AFTER_HOURS)
def token_expires_soon(self) -> bool:
return self._check_token_expiration(self._TOKEN_EXPIRE_WARNING_HOURS)
async def _error_logger(self, response: ClientResponse, fail: bool = True) -> None:
output = "hOn Authentication Error\n"
for i, (status, url) in enumerate(self._request.called_urls):
output += f" {i + 1: 2d} {status} - {url}\n"
output += f"ERROR - {response.status} - {response.request_info.url}\n"
output += f"{15 * '='} Response {15 * '='}\n{await response.text()}\n{40 * '='}"
async def _error_logger(self, response, fail=True):
result = "hOn Authentication Error\n"
for i, (status, url) in enumerate(self._called_urls):
result += f" {i + 1: 2d} {status} - {url}\n"
result += f"ERROR - {response.status} - {response.request_info.url}\n"
result += f"{15 * '='} Response {15 * '='}\n{await response.text()}\n{40 * '='}"
if fail:
raise exceptions.HonAuthenticationError("Can't login")
raise HonAuthenticationError("Can't login")
def _generate_nonce() -> str:
async def _load_login(self):
nonce = secrets.token_hex(16)
return f"{nonce[:8]}-{nonce[8:12]}-{nonce[12:16]}-{nonce[16:20]}-{nonce[20:]}"
async def _load_login(self) -> bool:
login_url = await self._introduce()
login_url = await self._handle_redirects(login_url)
return await self._login_url(login_url)
async def _introduce(self) -> str:
redirect_uri = urllib.parse.quote(f"{const.APP}://mobilesdk/detect/oauth/done")
nonce = f"{nonce[:8]}-{nonce[8:12]}-{nonce[12:16]}-{nonce[16:20]}-{nonce[20:]}"
params = {
"response_type": "token+id_token",
"client_id": const.CLIENT_ID,
"redirect_uri": redirect_uri,
"redirect_uri": urllib.parse.quote(
"display": "touch",
"scope": "api openid refresh_token web",
"nonce": self._generate_nonce(),
"nonce": nonce,
params_encode = "&".join([f"{k}={v}" for k, v in params.items()])
url = f"{const.AUTH_API}/services/oauth2/authorize/expid_Login?{params_encode}"
async with self._request.get(url) as response:
text = await response.text()
self._expires = datetime.utcnow()
login_url: List[str] = re.findall("url = '(.+?)'", text)
if not login_url:
if "oauth/done#access_token=" in text:
raise exceptions.HonNoAuthenticationNeeded()
params = "&".join([f"{k}={v}" for k, v in params.items()])
async with self._session.get(
) as response:
self._called_urls.append((response.status, response.request_info.url))
if not (login_url := re.findall("url = '(.+?)'", await response.text())):
await self._error_logger(response)
return login_url[0]
async def _manual_redirect(self, url: str) -> str:
async with self._request.get(url, allow_redirects=False) as response:
if not (new_location := response.headers.get("Location", "")):
await self._error_logger(response)
return new_location
async def _handle_redirects(self, login_url: str) -> str:
redirect1 = await self._manual_redirect(login_url)
redirect2 = await self._manual_redirect(redirect1)
return f"{redirect2}&System=IoT_Mobile_App&RegistrationSubChannel=hOn"
async def _login_url(self, login_url: str) -> bool:
headers = {"user-agent": const.USER_AGENT}
url = URL(login_url, encoded=True)
async with self._request.get(url, headers=headers) as response:
text = await response.text()
if context := re.findall('"fwuid":"(.*?)","loaded":(\\{.*?})', text):
return False
async with self._session.get(login_url[0], allow_redirects=False) as redirect1:
self._called_urls.append((redirect1.status, redirect1.request_info.url))
if not (url := redirect1.headers.get("Location")):
await self._error_logger(redirect1)
return False
async with self._session.get(url, allow_redirects=False) as redirect2:
self._called_urls.append((redirect2.status, redirect2.request_info.url))
if not (
url := redirect2.headers.get("Location")
+ "&System=IoT_Mobile_App&RegistrationSubChannel=hOn"
await self._error_logger(redirect2)
return False
async with self._session.get(URL(url, encoded=True)) as login_screen:
(login_screen.status, login_screen.request_info.url)
if context := re.findall(
'"fwuid":"(.*?)","loaded":(\\{.*?})', await login_screen.text()
fw_uid, loaded_str = context[0]
self._login_data.fw_uid = fw_uid
self._login_data.loaded = json.loads(loaded_str)
self._login_data.url = login_url.replace(const.AUTH_API, "")
return True
await self._error_logger(response)
loaded = json.loads(loaded_str)
login_url = login_url[0].replace(
"/".join(const.AUTH_API.split("/")[:-1]), ""
return fw_uid, loaded, login_url
await self._error_logger(login_screen)
return False
async def _login(self) -> str:
start_url = self._login_data.url.rsplit("startURL=", maxsplit=1)[-1]
start_url = parse.unquote(start_url).split("%3D")[0]
action = {
"id": "79;a",
"descriptor": "apex://LightningLoginCustomController/ACTION$login",
"callingDescriptor": "markup://c:loginForm",
"params": {
"password": self._login_data.password,
"startUrl": start_url,
async def _login(self, fw_uid, loaded, login_url):
data = {
"message": {"actions": [action]},
"message": {
"actions": [
"id": "79;a",
"descriptor": "apex://LightningLoginCustomController/ACTION$login",
"callingDescriptor": "markup://c:loginForm",
"params": {
"username": self._email,
"password": self._password,
"startUrl": parse.unquote(
"aura.context": {
"mode": "PROD",
"fwuid": self._login_data.fw_uid,
"fwuid": fw_uid,
"app": "siteforce:loginApp2",
"loaded": self._login_data.loaded,
"loaded": loaded,
"dn": [],
"globals": {},
"uad": False,
"aura.pageURI": self._login_data.url,
"aura.pageURI": login_url,
"aura.token": None,
params = {"r": 3, "other.LightningLoginCustom.login": 1}
async with
async with
const.AUTH_API + "/s/sfsites/aura",
headers={"Content-Type": "application/x-www-form-urlencoded"},
data="&".join(f"{k}={quote(json.dumps(v))}" for k, v in data.items()),
data="&".join(f"{k}={json.dumps(v)}" for k, v in data.items()),
) as response:
self._called_urls.append((response.status, response.request_info.url))
if response.status == 200:
with suppress(json.JSONDecodeError, KeyError):
result = await response.json()
url: str = result["events"][0]["attributes"]["values"]["url"]
return url
data = await response.json()
return data["events"][0]["attributes"]["values"]["url"]
except json.JSONDecodeError:
except KeyError:
"Can't get login url - %s", pformat(await response.json())
await self._error_logger(response)
return ""
def _parse_token_data(self, text: str) -> bool:
if access_token := re.findall("access_token=(.*?)&", text):
self._auth.access_token = access_token[0]
if refresh_token := re.findall("refresh_token=(.*?)&", text):
self._auth.refresh_token = refresh_token[0]
if id_token := re.findall("id_token=(.*?)&", text):
self._auth.id_token = id_token[0]
return bool(access_token and refresh_token and id_token)
async def _get_token(self, url: str) -> bool:
async with self._request.get(url) as response:
async def _get_token(self, url):
async with self._session.get(url) as response:
self._called_urls.append((response.status, response.request_info.url))
if response.status != 200:
await self._error_logger(response)
return False
url_search = re.findall(
"href\\s*=\\s*[\"'](.+?)[\"']", await response.text()
if not url_search:
url = re.findall("href\\s*=\\s*[\"'](.+?)[\"']", await response.text())
if not url:
await self._error_logger(response)
return False
if "ProgressiveLogin" in url_search[0]:
async with self._request.get(url_search[0]) as response:
if "ProgressiveLogin" in url[0]:
async with self._session.get(url[0]) as response:
self._called_urls.append((response.status, response.request_info.url))
if response.status != 200:
await self._error_logger(response)
return False
url_search = re.findall(
"href\\s*=\\s*[\"'](.*?)[\"']", await response.text()
url = const.AUTH_API + url_search[0]
async with self._request.get(url) as response:
url = re.findall("href\\s*=\\s*[\"'](.*?)[\"']", await response.text())
url = "/".join(const.AUTH_API.split("/")[:-1]) + url[0]
async with self._session.get(url) as response:
self._called_urls.append((response.status, response.request_info.url))
if response.status != 200:
await self._error_logger(response)
return False
if not self._parse_token_data(await response.text()):
await self._error_logger(response)
return False
text = await response.text()
if access_token := re.findall("access_token=(.*?)&", text):
self._access_token = access_token[0]
if refresh_token := re.findall("refresh_token=(.*?)&", text):
self._refresh_token = refresh_token[0]
if id_token := re.findall("id_token=(.*?)&", text):
self._id_token = id_token[0]
return True
async def _api_auth(self) -> bool:
post_headers = {"id-token": self._auth.id_token}
async def authorize(self):
if login_site := await self._load_login():
fw_uid, loaded, login_url = login_site
return False
if not (url := await self._login(fw_uid, loaded, login_url)):
return False
if not await self._get_token(url):
return False
return await self._api_auth()
async def _api_auth(self):
post_headers = {"id-token": self._id_token}
data = self._device.get()
async with
async with
f"{const.API_URL}/auth/v1/login", headers=post_headers, json=data
) as response:
self._called_urls.append((response.status, response.request_info.url))
json_data = await response.json()
except json.JSONDecodeError:
await self._error_logger(response)
return False
self._auth.cognito_token = json_data.get("cognitoUser", {}).get("Token", "")
if not self._auth.cognito_token:
raise exceptions.HonAuthenticationError()
self._cognito_token = json_data["cognitoUser"]["Token"]
return True
async def authenticate(self) -> None:
if not await self._load_login():
raise exceptions.HonAuthenticationError("Can't open login page")
if not (url := await self._login()):
raise exceptions.HonAuthenticationError("Can't login")
if not await self._get_token(url):
raise exceptions.HonAuthenticationError("Can't get token")
if not await self._api_auth():
raise exceptions.HonAuthenticationError("Can't get api token")
except exceptions.HonNoAuthenticationNeeded:
async def refresh(self) -> bool:
async def refresh(self):
params = {
"client_id": const.CLIENT_ID,
"refresh_token": self._auth.refresh_token,
"refresh_token": self._refresh_token,
"grant_type": "refresh_token",
async with
async with
f"{const.AUTH_API}/services/oauth2/token", params=params
) as response:
self._called_urls.append((response.status, response.request_info.url))
if response.status >= 400:
await self._error_logger(response, fail=False)
return False
data = await response.json()
self._expires = datetime.utcnow()
self._auth.id_token = data["id_token"]
self._auth.access_token = data["access_token"]
self._id_token = data["id_token"]
self._access_token = data["access_token"]
return await self._api_auth()
def clear(self) -> None:
self._request.called_urls = []
self._auth.cognito_token = ""
self._auth.id_token = ""
self._auth.access_token = ""
self._auth.refresh_token = ""
@ -1,45 +1,41 @@
import secrets
from typing import Dict
from pyhon import const
class HonDevice:
def __init__(self) -> None:
self._app_version: str = const.APP_VERSION
self._os_version: int = const.OS_VERSION
self._os: str = const.OS
self._device_model: str = const.DEVICE_MODEL
self._mobile_id: str = secrets.token_hex(8)
def __init__(self):
self._app_version = const.APP_VERSION
self._os_version = const.OS_VERSION
self._os = const.OS
self._device_model = const.DEVICE_MODEL
self._mobile_id = secrets.token_hex(8)
def app_version(self) -> str:
def app_version(self):
return self._app_version
def os_version(self) -> int:
def os_version(self):
return self._os_version
def os_type(self) -> str:
def os(self):
return self._os
def device_model(self) -> str:
def device_model(self):
return self._device_model
def mobile_id(self) -> str:
def mobile_id(self):
return self._mobile_id
def get(self, mobile: bool = False) -> Dict[str, str | int]:
result: Dict[str, str | int] = {
def get(self):
return {
"appVersion": self.app_version,
"mobileId": self.mobile_id,
"os": self.os_type,
"osVersion": self.os_version,
"os": self.os,
"deviceModel": self.device_model,
if mobile:
result |= {"mobileOs": result.pop("os", "")}
return result
Normal file
Normal file
@ -0,0 +1,134 @@
import json
from contextlib import asynccontextmanager
import aiohttp
from pyhon import const
from pyhon.connection.auth import HonAuth, _LOGGER
from pyhon.connection.device import HonDevice
from pyhon.exceptions import HonAuthenticationError
class HonBaseConnectionHandler:
_HEADERS = {"user-agent": const.USER_AGENT, "Content-Type": "application/json"}
def __init__(self, session=None):
self._session = session
self._auth = None
async def __aenter__(self):
return await self.create()
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.close()
async def create(self):
self._session = aiohttp.ClientSession(headers=self._HEADERS)
return self
async def _intercept(self, method, *args, loop=0, **kwargs):
raise NotImplementedError
async def get(self, *args, **kwargs):
async with self._intercept(self._session.get, *args, **kwargs) as response:
yield response
async def post(self, *args, **kwargs):
async with self._intercept(, *args, **kwargs) as response:
yield response
async def close(self):
await self._session.close()
class HonConnectionHandler(HonBaseConnectionHandler):
def __init__(self, email, password, session=None):
self._device = HonDevice()
self._email = email
self._password = password
if not self._email:
raise HonAuthenticationError("An email address must be specified")
if not self._password:
raise HonAuthenticationError("A password address must be specified")
self._request_headers = {}
def device(self):
return self._device
async def create(self):
await super().create()
self._auth = HonAuth(self._session, self._email, self._password, self._device)
return self
async def _check_headers(self, headers):
if (
"cognito-token" not in self._request_headers
or "id-token" not in self._request_headers
if await self._auth.authorize():
self._request_headers["cognito-token"] = self._auth.cognito_token
self._request_headers["id-token"] = self._auth.id_token
raise HonAuthenticationError("Can't login")
return headers | self._request_headers
async def _intercept(self, method, *args, loop=0, **kwargs):
kwargs["headers"] = await self._check_headers(kwargs.get("headers", {}))
async with method(*args, **kwargs) as response:
if response.status in [401, 403] and loop == 0:
||||"Try refreshing token...")
await self._auth.refresh()
async with self._intercept(
method, *args, loop=loop + 1, **kwargs
) as result:
yield result
elif response.status in [401, 403] and loop == 1:
"%s - Error %s - %s",
await response.text(),
await self.create()
async with self._intercept(
method, *args, loop=loop + 1, **kwargs
) as result:
yield result
elif loop >= 2:
"%s - Error %s - %s",
await response.text(),
raise HonAuthenticationError("Login failure")
await response.json()
yield response
except json.JSONDecodeError:
"%s - JsonDecodeError %s - %s",
await response.text(),
yield {}
class HonAnonymousConnectionHandler(HonBaseConnectionHandler):
_HEADERS = HonBaseConnectionHandler._HEADERS | {"x-api-key": const.API_KEY}
async def _intercept(self, method, *args, loop=0, **kwargs):
kwargs["headers"] = kwargs.pop("headers", {}) | self._HEADERS
async with method(*args, **kwargs) as response:
if response.status == 403:
_LOGGER.error("Can't authenticate anymore")
yield response
@ -1,27 +0,0 @@
import logging
from import AsyncIterator
from contextlib import asynccontextmanager
from typing import Dict, Any
import aiohttp
from yarl import URL
from pyhon import const
from pyhon.connection.handler.base import ConnectionHandler
from pyhon.typedefs import Callback
_LOGGER = logging.getLogger(__name__)
class HonAnonymousConnectionHandler(ConnectionHandler):
_HEADERS: Dict[str, str] = ConnectionHandler._HEADERS | {"x-api-key": const.API_KEY}
async def _intercept(
self, method: Callback, url: str | URL, *args: Any, **kwargs: Dict[str, Any]
) -> AsyncIterator[aiohttp.ClientResponse]:
kwargs["headers"] = kwargs.pop("headers", {}) | self._HEADERS
async with method(url, *args, **kwargs) as response:
if response.status == 403:
_LOGGER.error("Can't authenticate anymore")
yield response
@ -1,38 +0,0 @@
import logging
from import AsyncIterator
from contextlib import asynccontextmanager
from typing import Optional, List, Tuple, Any, Dict
import aiohttp
from yarl import URL
from pyhon import const
from pyhon.connection.handler.base import ConnectionHandler
from pyhon.typedefs import Callback
_LOGGER = logging.getLogger(__name__)
class HonAuthConnectionHandler(ConnectionHandler):
_HEADERS = {"user-agent": const.USER_AGENT}
def __init__(self, session: Optional[aiohttp.ClientSession] = None) -> None:
self._called_urls: List[Tuple[int, str]] = []
def called_urls(self) -> List[Tuple[int, str]]:
return self._called_urls
def called_urls(self, called_urls: List[Tuple[int, str]]) -> None:
self._called_urls = called_urls
async def _intercept(
self, method: Callback, url: str | URL, *args: Any, **kwargs: Dict[str, Any]
) -> AsyncIterator[aiohttp.ClientResponse]:
kwargs["headers"] = kwargs.pop("headers", {}) | self._HEADERS
async with method(url, *args, **kwargs) as response:
self._called_urls.append((response.status, str(response.request_info.url)))
yield response
@ -1,80 +0,0 @@
import logging
from import AsyncIterator
from contextlib import asynccontextmanager
from types import TracebackType
from typing import Optional, Dict, Type, Any
import aiohttp
from typing_extensions import Self
from yarl import URL
from pyhon import const, exceptions
from pyhon.typedefs import Callback
_LOGGER = logging.getLogger(__name__)
class ConnectionHandler:
_HEADERS: Dict[str, str] = {
"user-agent": const.USER_AGENT,
"Content-Type": "application/json",
def __init__(self, session: Optional[aiohttp.ClientSession] = None) -> None:
self._create_session: bool = session is None
self._session: Optional[aiohttp.ClientSession] = session
async def __aenter__(self) -> Self:
return await self.create()
async def __aexit__(
exc_type: Optional[Type[BaseException]],
exc: Optional[BaseException],
traceback: Optional[TracebackType],
) -> None:
await self.close()
def session(self) -> aiohttp.ClientSession:
if self._session is None:
raise exceptions.NoSessionException
return self._session
async def create(self) -> Self:
if self._create_session:
self._session = aiohttp.ClientSession()
return self
async def _intercept(
self, method: Callback, url: str | URL, *args: Any, **kwargs: Dict[str, Any]
) -> AsyncIterator[aiohttp.ClientResponse]:
async with method(url, *args, **kwargs) as response:
yield response
async def get(
self, *args: Any, **kwargs: Any
) -> AsyncIterator[aiohttp.ClientResponse]:
if self._session is None:
raise exceptions.NoSessionException()
response: aiohttp.ClientResponse
args = self._session.get, *args
async with self._intercept(*args, **kwargs) as response:
yield response
async def post(
self, *args: Any, **kwargs: Any
) -> AsyncIterator[aiohttp.ClientResponse]:
if self._session is None:
raise exceptions.NoSessionException()
response: aiohttp.ClientResponse
args =, *args
async with self._intercept(*args, **kwargs) as response:
yield response
async def close(self) -> None:
if self._create_session and self._session is not None:
await self._session.close()
@ -1,105 +0,0 @@
import json
import logging
from import AsyncIterator
from contextlib import asynccontextmanager
from typing import Optional, Dict, Any
import aiohttp
from typing_extensions import Self
from yarl import URL
from pyhon.connection.auth import HonAuth
from pyhon.connection.device import HonDevice
from pyhon.connection.handler.base import ConnectionHandler
from pyhon.exceptions import HonAuthenticationError, NoAuthenticationException
from pyhon.typedefs import Callback
_LOGGER = logging.getLogger(__name__)
class HonConnectionHandler(ConnectionHandler):
def __init__(
self, email: str, password: str, session: Optional[aiohttp.ClientSession] = None
) -> None:
self._device: HonDevice = HonDevice()
self._email: str = email
self._password: str = password
if not self._email:
raise HonAuthenticationError("An email address must be specified")
if not self._password:
raise HonAuthenticationError("A password address must be specified")
self._auth: Optional[HonAuth] = None
def auth(self) -> HonAuth:
if self._auth is None:
raise NoAuthenticationException()
return self._auth
def device(self) -> HonDevice:
return self._device
async def create(self) -> Self:
await super().create()
self._auth = HonAuth(self.session, self._email, self._password, self._device)
return self
async def _check_headers(self, headers: Dict[str, str]) -> Dict[str, str]:
if not (self.auth.cognito_token and self.auth.id_token):
await self.auth.authenticate()
headers["cognito-token"] = self.auth.cognito_token
headers["id-token"] = self.auth.id_token
return self._HEADERS | headers
async def _intercept(
self, method: Callback, url: str | URL, *args: Any, **kwargs: Any
) -> AsyncIterator[aiohttp.ClientResponse]:
loop: int = kwargs.pop("loop", 0)
kwargs["headers"] = await self._check_headers(kwargs.get("headers", {}))
async with method(url, *args, **kwargs) as response:
if (
self.auth.token_expires_soon or response.status in [401, 403]
) and loop == 0:
||||"Try refreshing token...")
await self.auth.refresh()
async with self._intercept(
method, url, *args, loop=loop + 1, **kwargs
) as result:
yield result
elif (
self.auth.token_is_expired or response.status in [401, 403]
) and loop == 1:
"%s - Error %s - %s",
await response.text(),
await self.create()
async with self._intercept(
method, url, *args, loop=loop + 1, **kwargs
) as result:
yield result
elif loop >= 2:
"%s - Error %s - %s",
await response.text(),
raise HonAuthenticationError("Login failure")
await response.json()
yield response
except json.JSONDecodeError as exc:
"%s - JsonDecodeError %s - %s",
await response.text(),
raise HonAuthenticationError("Decode Error") from exc
@ -1,12 +1,10 @@
API_URL = ""
API_KEY = "GRCqFhC6Gk@ikWXm1RmnSmX1cm,MxY-configuration"
APP = "hon"
APP_VERSION = "2.4.7"
# All seen id's (different accounts, different devices) are the same, so I guess this hash is static
CLIENT_ID = "3MVG9QDx8IX8nP5T2Ha8ofvlmjLZl5L_gvfbT9.HJvpHGKoAS_dcMN8LYpTSYeVFCraUnV.2Ag1Ki7m4znVO6"
APP_VERSION = "1.53.7"
OS = "android"
DEVICE_MODEL = "exynos9820"
@ -1,99 +0,0 @@
import asyncio
import json
import re
import shutil
from pathlib import Path
from typing import TYPE_CHECKING, List, Tuple
from pyhon import printer
from pyhon.appliance import HonAppliance
def anonymize_data(data: str) -> str:
default_date = "1970-01-01T00:00:00.0Z"
default_mac = "xx-xx-xx-xx-xx-xx"
data = re.sub("[0-9A-Fa-f]{2}(-[0-9A-Fa-f]{2}){5}", default_mac, data)
data = re.sub("[\\d-]{10}T[\\d:]{8}(.\\d+)?Z", default_date, data)
for sensible in [
for match in re.findall(f'"{sensible}.*?":\\s"?(.+?)"?,?\\n', data):
replace = re.sub("[a-z]", "x", match)
replace = re.sub("[A-Z]", "X", replace)
replace = re.sub("\\d", "1", replace)
data = data.replace(match, replace)
return data
async def load_data(appliance: "HonAppliance", topic: str) -> Tuple[str, str]:
return topic, await getattr(appliance.api, f"load_{topic}")(appliance)
def write_to_json(data: str, topic: str, path: Path, anonymous: bool = False) -> Path:
json_data = json.dumps(data, indent=4)
if anonymous:
json_data = anonymize_data(json_data)
file = path / f"{topic}.json"
with open(file, "w", encoding="utf-8") as json_file:
return file
async def appliance_data(
appliance: "HonAppliance", path: Path, anonymous: bool = False
) -> List[Path]:
requests = [
path /= f"{appliance.appliance_type}_{appliance.model_id}".lower()
path.mkdir(parents=True, exist_ok=True)
api_data = await asyncio.gather(*[load_data(appliance, name) for name in requests])
return [write_to_json(data, topic, path, anonymous) for topic, data in api_data]
async def zip_archive(
appliance: "HonAppliance", path: Path, anonymous: bool = False
) -> str:
data = await appliance_data(appliance, path, anonymous)
archive = data[0].parent
shutil.make_archive(str(archive), "zip", archive)
return f"{archive.stem}.zip"
def yaml_export(appliance: "HonAppliance", anonymous: bool = False) -> str:
data = {
"attributes": appliance.attributes.copy(),
"statistics": appliance.statistics,
"additional_data": appliance.additional_data,
data |= {n: c.parameter_groups for n, c in appliance.commands.items()}
extra = {n: for n, c in appliance.commands.items() if}
if extra:
data |= {"extra_command_data": extra}
if anonymous:
for sensible in ["serialNumber", "coords"]:
data.get("appliance", {}).pop(sensible, None)
result = printer.pretty_print({"data": data})
if commands := printer.create_commands(appliance.commands):
result += printer.pretty_print({"commands": commands})
if rules := printer.create_rules(appliance.commands):
result += printer.pretty_print({"rules": rules})
if anonymous:
result = anonymize_data(result)
return result
@ -1,18 +1,2 @@
class HonAuthenticationError(Exception):
class HonNoAuthenticationNeeded(Exception):
class NoSessionException(Exception):
class NoAuthenticationException(Exception):
class ApiError(Exception):
@ -1,5 +1,63 @@
def str_to_float(string: str | float) -> float:
return int(string)
except ValueError:
return float(str(string).replace(",", "."))
def key_print(data, key="", start=True):
result = ""
if isinstance(data, list):
for i, value in enumerate(data):
result += key_print(value, key=f"{key}.{i}", start=False)
elif isinstance(data, dict):
for k, value in sorted(data.items()):
result += key_print(value, key=k if start else f"{key}.{k}", start=False)
result += f"{key}: {data}\n"
return result
# yaml.dump() would be done the same, but needs an additional dependency...
def pretty_print(data, key="", intend=0, is_list=False, whitespace=" "):
result = ""
if isinstance(data, list):
if key:
result += f"{whitespace * intend}{'- ' if is_list else ''}{key}:\n"
intend += 1
for i, value in enumerate(data):
result += pretty_print(
value, intend=intend, is_list=True, whitespace=whitespace
elif isinstance(data, dict):
if key:
result += f"{whitespace * intend}{'- ' if is_list else ''}{key}:\n"
intend += 1
for i, (key, value) in enumerate(sorted(data.items())):
if is_list and not i:
result += pretty_print(
value, key=key, intend=intend, is_list=True, whitespace=whitespace
elif is_list:
result += pretty_print(
value, key=key, intend=intend + 1, whitespace=whitespace
result += pretty_print(
value, key=key, intend=intend, whitespace=whitespace
result += f"{whitespace * intend}{'- ' if is_list else ''}{key}{': ' if key else ''}{data}\n"
return result
def create_command(commands, concat=False):
result = {}
for name, command in commands.items():
if not concat:
result[name] = {}
for parameter, data in command.parameters.items():
if data.typology == "enum":
value = data.values
elif data.typology == "range":
value = {"min": data.min, "max": data.max, "step": data.step}
if not concat:
result[name][parameter] = value
result[f"{name}.{parameter}"] = value
return result
@ -1,67 +1,27 @@
import asyncio
import logging
from pathlib import Path
from types import TracebackType
from typing import List, Optional, Dict, Any, Type
from aiohttp import ClientSession
from typing_extensions import Self
from typing import List
from pyhon import HonAPI
from pyhon.appliance import HonAppliance
from pyhon.connection.api import HonAPI
from pyhon.connection.api import TestAPI
from pyhon.exceptions import NoAuthenticationException
_LOGGER = logging.getLogger(__name__)
class Hon:
def __init__(
email: Optional[str] = "",
password: Optional[str] = "",
session: Optional[ClientSession] = None,
test_data_path: Optional[Path] = None,
self._email: Optional[str] = email
self._password: Optional[str] = password
self._session: ClientSession | None = session
self._appliances: List[HonAppliance] = []
self._api: Optional[HonAPI] = None
self._test_data_path: Path = test_data_path or Path().cwd()
def __init__(self, email, password, session=None):
self._email = email
self._password = password
self._session = session
self._appliances = []
self._api = None
async def __aenter__(self) -> Self:
async def __aenter__(self):
return await self.create()
async def __aexit__(
exc_type: Optional[Type[BaseException]],
exc: Optional[BaseException],
traceback: Optional[TracebackType],
) -> None:
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.close()
def api(self) -> HonAPI:
if self._api is None:
raise NoAuthenticationException
return self._api
def email(self) -> str:
if not self._email:
raise ValueError("Missing email")
return self._email
def password(self) -> str:
if not self._password:
raise ValueError("Missing password")
return self._password
async def create(self) -> Self:
async def create(self):
self._api = await HonAPI(
||||, self.password, session=self._session
self._email, self._password, session=self._session
await self.setup()
return self
@ -70,17 +30,11 @@ class Hon:
def appliances(self) -> List[HonAppliance]:
return self._appliances
def appliances(self, appliances: List[HonAppliance]) -> None:
self._appliances = appliances
async def _create_appliance(
self, appliance_data: Dict[str, Any], api: HonAPI, zone: int = 0
) -> None:
appliance = HonAppliance(api, appliance_data, zone=zone)
if appliance.mac_address == "":
async def setup(self):
for appliance in (await self._api.load_appliances())["payload"]["appliances"]:
appliance = HonAppliance(self._api, appliance)
if appliance.mac_address is None:
await asyncio.gather(
@ -88,26 +42,7 @@ class Hon:
except (KeyError, ValueError, IndexError) as error:
_LOGGER.error("Device data - %s", appliance_data)
async def setup(self) -> None:
appliances = await self.api.load_appliances()
for appliance in appliances:
if (zones := int(appliance.get("zone", "0"))) > 1:
for zone in range(zones):
await self._create_appliance(
appliance.copy(), self.api, zone=zone + 1
await self._create_appliance(appliance, self.api)
if (
test_data := self._test_data_path / "hon-test-data" / "test_data"
).exists() or (test_data := test_data / "test_data").exists():
api = TestAPI(test_data)
for appliance in await api.load_appliances():
await self._create_appliance(appliance, api)
async def close(self) -> None:
await self.api.close()
async def close(self):
await self._api.close()
Normal file
Normal file
@ -0,0 +1,157 @@
import re
def str_to_float(string):
return int(string)
except ValueError:
return float(str(string).replace(",", "."))
class HonParameter:
def __init__(self, key, attributes):
self._key = key
self._category = attributes.get("category")
self._typology = attributes.get("typology")
self._mandatory = attributes.get("mandatory")
self._value = ""
def key(self):
return self._key
def value(self):
return self._value if self._value is not None else "0"
def category(self):
return self._category
def typology(self):
return self._typology
def mandatory(self):
return self._mandatory
class HonParameterFixed(HonParameter):
def __init__(self, key, attributes):
super().__init__(key, attributes)
self._value = attributes.get("fixedValue", None)
def __repr__(self):
return f"{self.__class__} (<{self.key}> fixed)"
def value(self):
return self._value if self._value is not None else "0"
def value(self, value):
if not value == self._value:
raise ValueError("Can't change fixed value")
class HonParameterRange(HonParameter):
def __init__(self, key, attributes):
super().__init__(key, attributes)
self._min = str_to_float(attributes["minimumValue"])
self._max = str_to_float(attributes["maximumValue"])
self._step = str_to_float(attributes["incrementValue"])
self._default = str_to_float(attributes.get("defaultValue", self._min))
self._value = self._default
def __repr__(self):
return f"{self.__class__} (<{self.key}> [{self._min} - {self._max}])"
def min(self):
return self._min
def max(self):
return self._max
def step(self):
return self._step
def value(self):
return self._value if self._value is not None else self._min
def value(self, value):
value = str_to_float(value)
if self._min <= value <= self._max and not value % self._step:
self._value = value
raise ValueError(
f"Allowed: min {self._min} max {self._max} step {self._step}"
class HonParameterEnum(HonParameter):
def __init__(self, key, attributes):
super().__init__(key, attributes)
self._default = attributes.get("defaultValue")
self._value = self._default or "0"
self._values = attributes.get("enumValues")
def __repr__(self):
return f"{self.__class__} (<{self.key}> {self.values})"
def values(self):
return [str(value) for value in self._values]
def value(self):
return self._value if self._value is not None else self.values[0]
def value(self, value):
if value in self.values:
self._value = value
raise ValueError(f"Allowed values {self._value}")
class HonParameterProgram(HonParameterEnum):
def __init__(self, key, command):
super().__init__(key, {})
self._command = command
self._value = command._program
self._values = command._multi
self._typology = "enum"
self._filter = ""
def value(self):
return self._value
def value(self, value):
if value in self.values:
raise ValueError(f"Allowed values {self._values}")
def filter(self):
return self._filter
def filter(self, filter):
self._filter = filter
def values(self):
values = []
for value in self._values:
if not self._filter or re.findall(self._filter, str(value)):
return sorted(values)
@ -1,98 +0,0 @@
from typing import Dict, Any, List, Tuple, Callable, TYPE_CHECKING
from pyhon.rules import HonRule
class HonParameter:
def __init__(self, key: str, attributes: Dict[str, Any], group: str) -> None:
self._key = key
self._attributes = attributes
self._category: str = ""
self._typology: str = ""
self._mandatory: int = 0
self._value: str | float = ""
self._group: str = group
self._triggers: Dict[
str, List[Tuple[Callable[["HonRule"], None], "HonRule"]]
] = {}
def _set_attributes(self) -> None:
self._category = self._attributes.get("category", "")
self._typology = self._attributes.get("typology", "")
self._mandatory = self._attributes.get("mandatory", 0)
def key(self) -> str:
return self._key
def value(self) -> str | float:
return self._value if self._value is not None else "0"
def value(self, value: str | float) -> None:
self._value = value
def intern_value(self) -> str:
return str(self.value)
def values(self) -> List[str]:
return [str(self.value)]
def category(self) -> str:
return self._category
def typology(self) -> str:
return self._typology
def mandatory(self) -> int:
return self._mandatory
def group(self) -> str:
return self._group
def add_trigger(
self, value: str, func: Callable[["HonRule"], None], data: "HonRule"
) -> None:
if self._value == value:
self._triggers.setdefault(value, []).append((func, data))
def check_trigger(self, value: str | float) -> None:
triggers = {str(k).lower(): v for k, v in self._triggers.items()}
if str(value).lower() in triggers:
for trigger in triggers[str(value)]:
func, args = trigger
def triggers(self) -> Dict[str, Any]:
result: Dict[str, Any] = {}
for value, rules in self._triggers.items():
for _, rule in rules:
if rule.extras:
param = result.setdefault(value, {})
for extra_key, extra_value in rule.extras.items():
param = param.setdefault(extra_key, {}).setdefault(
extra_value, {}
param = result.setdefault(value, {})
if fixed_value := rule.param_data.get("fixedValue"):
param[rule.param_key] = fixed_value
param[rule.param_key] = rule.param_data.get("defaultValue", "")
return result
def reset(self) -> None:
@ -1,51 +0,0 @@
from typing import Dict, Any, List
from pyhon.parameter.base import HonParameter
def clean_value(value: str | float) -> str:
return str(value).strip("[]").replace("|", "_").lower()
class HonParameterEnum(HonParameter):
def __init__(self, key: str, attributes: Dict[str, Any], group: str) -> None:
super().__init__(key, attributes, group)
self._default: str | float = ""
self._value: str | float = ""
self._values: List[str] = []
if self._default and clean_value(self._default.strip("[]")) not in self.values:
def _set_attributes(self) -> None:
self._default = self._attributes.get("defaultValue", "")
self._value = self._default or "0"
self._values = self._attributes.get("enumValues", [])
def __repr__(self) -> str:
return f"{self.__class__} (<{self.key}> {self.values})"
def values(self) -> List[str]:
return [clean_value(value) for value in self._values]
def values(self, values: List[str]) -> None:
self._values = values
def intern_value(self) -> str:
return str(self._value) if self._value is not None else str(self.values[0])
def value(self) -> str | float:
return clean_value(self._value) if self._value is not None else self.values[0]
def value(self, value: str) -> None:
if value in self.values:
self._value = value
raise ValueError(f"Allowed values: {self._values} But was: {value}")
@ -1,27 +0,0 @@
from typing import Dict, Any
from pyhon.parameter.base import HonParameter
class HonParameterFixed(HonParameter):
def __init__(self, key: str, attributes: Dict[str, Any], group: str) -> None:
super().__init__(key, attributes, group)
self._value: str | float = ""
def _set_attributes(self) -> None:
self._value = self._attributes.get("fixedValue", "")
def __repr__(self) -> str:
return f"{self.__class__} (<{self.key}> fixed)"
def value(self) -> str | float:
return self._value if self._value != "" else "0"
def value(self, value: str | float) -> None:
# Fixed values seems being not so fixed as thought
self._value = value
@ -1,56 +0,0 @@
from typing import List, TYPE_CHECKING, Dict
from pyhon.parameter.enum import HonParameterEnum
from pyhon.commands import HonCommand
class HonParameterProgram(HonParameterEnum):
_FILTER = ["iot_recipe", "iot_guided"]
def __init__(self, key: str, command: "HonCommand", group: str) -> None:
super().__init__(key, {}, group)
self._command = command
if "PROGRAM" in command.category:
self._value = command.category.split(".")[-1].lower()
self._value = command.category
self._programs: Dict[str, "HonCommand"] = command.categories
self._typology: str = "enum"
def value(self) -> str | float:
return self._value
def value(self, value: str) -> None:
if value in self.values:
self._command.category = value
raise ValueError(f"Allowed values: {self.values} But was: {value}")
def values(self) -> List[str]:
values = [v for v in self._programs if all(f not in v for f in self._FILTER)]
return sorted(values)
def values(self, values: List[str]) -> None:
raise ValueError("Cant set values {values}")
def ids(self) -> Dict[int, str]:
values: Dict[int, str] = {}
for name, parameter in self._programs.items():
if "iot_" in name:
if parameter.parameters.get("prCode"):
if (fav := parameter.parameters.get("favourite")) and fav.value == "1":
values[int(parameter.parameters["prCode"].value)] = name
return dict(sorted(values.items()))
def set_value(self, value: str) -> None:
self._value = value
@ -1,72 +0,0 @@
from typing import Dict, Any, List
from pyhon.helper import str_to_float
from pyhon.parameter.base import HonParameter
class HonParameterRange(HonParameter):
def __init__(self, key: str, attributes: Dict[str, Any], group: str) -> None:
super().__init__(key, attributes, group)
self._min: float = 0
self._max: float = 0
self._step: float = 0
self._default: float = 0
self._value: float = 0
def _set_attributes(self) -> None:
self._min = str_to_float(self._attributes.get("minimumValue", 0))
self._max = str_to_float(self._attributes.get("maximumValue", 0))
self._step = str_to_float(self._attributes.get("incrementValue", 0))
self._default = str_to_float(self._attributes.get("defaultValue", self.min))
self._value = self._default
def __repr__(self) -> str:
return f"{self.__class__} (<{self.key}> [{self.min} - {self.max}])"
def min(self) -> float:
return self._min
def min(self, mini: float) -> None:
self._min = mini
def max(self) -> float:
return self._max
def max(self, maxi: float) -> None:
self._max = maxi
def step(self) -> float:
if not self._step:
return 1
return self._step
def step(self, step: float) -> None:
self._step = step
def value(self) -> str | float:
return self._value if self._value is not None else self.min
def value(self, value: str | float) -> None:
value = str_to_float(value)
if self.min <= value <= self.max and not ((value - self.min) * 100) % (
self.step * 100
self._value = value
allowed = f"min {self.min} max {self.max} step {self.step}"
raise ValueError(f"Allowed: {allowed} But was: {value}")
def values(self) -> List[str]:
return [str(i) for i in range(int(self.min), int(self.max) + 1, int(self.step))]
@ -1,87 +0,0 @@
from typing import Dict, Any, TYPE_CHECKING, List
from pyhon.parameter.enum import HonParameterEnum
from pyhon.parameter.range import HonParameterRange
from pyhon.commands import HonCommand
def key_print(data: Any, key: str = "", start: bool = True) -> str:
result = ""
if isinstance(data, list):
for i, value in enumerate(data):
result += key_print(value, key=f"{key}.{i}", start=False)
elif isinstance(data, dict):
for k, value in sorted(data.items()):
result += key_print(value, key=k if start else f"{key}.{k}", start=False)
result += f"{key}: {data}\n"
return result
# yaml.dump() would be done the same, but needs an additional dependency...
def pretty_print(
data: Any,
key: str = "",
intend: int = 0,
is_list: bool = False,
whitespace: str = " ",
) -> str:
result = ""
space = whitespace * intend
if isinstance(data, (dict, list)) and key:
result += f"{space}{'- ' if is_list else ''}{key}:\n"
intend += 1
if isinstance(data, list):
for i, value in enumerate(data):
result += pretty_print(
value, intend=intend, is_list=True, whitespace=whitespace
elif isinstance(data, dict):
for i, (list_key, value) in enumerate(sorted(data.items())):
result += pretty_print(
intend=intend + (is_list if i else 0),
is_list=is_list and not i,
result += f"{space}{'- ' if is_list else ''}{key}{': ' if key else ''}{data}\n"
return result
def create_commands(
commands: Dict[str, "HonCommand"], concat: bool = False
) -> Dict[str, Any]:
result: Dict[str, Any] = {}
for name, command in commands.items():
for parameter, data in command.available_settings.items():
if isinstance(data, HonParameterEnum):
value: List[str] | Dict[str, str | float] = data.values
elif isinstance(data, HonParameterRange):
value = {"min": data.min, "max": data.max, "step": data.step}
if not concat:
result.setdefault(name, {})[parameter] = value
result[f"{name}.{parameter}"] = value
return result
def create_rules(
commands: Dict[str, "HonCommand"], concat: bool = False
) -> Dict[str, Any]:
result: Dict[str, Any] = {}
for name, command in commands.items():
for parameter, data in command.available_settings.items():
value = data.triggers
if not value:
if not concat:
result.setdefault(name, {})[parameter] = value
result[f"{name}.{parameter}"] = value
return result
@ -1,145 +0,0 @@
from dataclasses import dataclass
from typing import List, Dict, TYPE_CHECKING, Any, Optional
from pyhon.parameter.enum import HonParameterEnum
from pyhon.parameter.range import HonParameterRange
from pyhon.typedefs import Parameter
from pyhon.commands import HonCommand
from pyhon.parameter.base import HonParameter
class HonRule:
trigger_key: str
trigger_value: str
param_key: str
param_data: Dict[str, Any]
extras: Optional[Dict[str, str]] = None
class HonRuleSet:
def __init__(self, command: "HonCommand", rule: Dict[str, Any]):
self._command: "HonCommand" = command
self._rules: Dict[str, List[HonRule]] = {}
def rules(self) -> Dict[str, List[HonRule]]:
return self._rules
def _parse_rule(self, rule: Dict[str, Any]) -> None:
for param_key, params in rule.items():
param_key = self._command.appliance.options.get(param_key, param_key)
for trigger_key, trigger_data in params.items():
self._parse_conditions(param_key, trigger_key, trigger_data)
def _parse_conditions(
param_key: str,
trigger_key: str,
trigger_data: Dict[str, Any],
extra: Optional[Dict[str, str]] = None,
) -> None:
trigger_key = trigger_key.replace("@", "")
trigger_key = self._command.appliance.options.get(trigger_key, trigger_key)
for multi_trigger_value, param_data in trigger_data.items():
for trigger_value in multi_trigger_value.split("|"):
if isinstance(param_data, dict) and "typology" in param_data:
param_key, trigger_key, trigger_value, param_data, extra
elif isinstance(param_data, dict):
if extra is None:
extra = {}
extra[trigger_key] = trigger_value
for extra_key, extra_data in param_data.items():
self._parse_conditions(param_key, extra_key, extra_data, extra)
param_data = {"typology": "fixed", "fixedValue": param_data}
param_key, trigger_key, trigger_value, param_data, extra
def _create_rule(
param_key: str,
trigger_key: str,
trigger_value: str,
param_data: Dict[str, Any],
extras: Optional[Dict[str, str]] = None,
) -> None:
if param_data.get("fixedValue") == f"@{param_key}":
self._rules.setdefault(trigger_key, []).append(
HonRule(trigger_key, trigger_value, param_key, param_data, extras)
def _duplicate_for_extra_conditions(self) -> None:
new: Dict[str, List[HonRule]] = {}
for rules in self._rules.values():
for rule in rules:
if rule.extras is None:
for key, value in rule.extras.items():
extras = rule.extras.copy()
extras[rule.trigger_key] = rule.trigger_value
new.setdefault(key, []).append(
HonRule(key, value, rule.param_key, rule.param_data, extras)
for key, rules in new.items():
for rule in rules:
self._rules.setdefault(key, []).append(rule)
def _extra_rules_matches(self, rule: HonRule) -> bool:
if rule.extras:
for key, value in rule.extras.items():
if not self._command.parameters.get(key):
return False
if str(self._command.parameters.get(key)) != str(value):
return False
return True
def _apply_fixed(self, param: Parameter, value: str | float) -> None:
if isinstance(param, HonParameterEnum) and set(param.values) != {str(value)}:
param.values = [str(value)]
param.value = str(value)
elif isinstance(param, HonParameterRange):
if float(value) < param.min:
param.min = float(value)
elif float(value) > param.max:
param.max = float(value)
param.value = float(value)
param.value = str(value)
def _apply_enum(self, param: Parameter, rule: HonRule) -> None:
if not isinstance(param, HonParameterEnum):
if enum_values := rule.param_data.get("enumValues"):
param.values = enum_values.split("|")
if default_value := rule.param_data.get("defaultValue"):
param.value = default_value
def _add_trigger(self, parameter: "HonParameter", data: HonRule) -> None:
def apply(rule: HonRule) -> None:
if not self._extra_rules_matches(rule):
if not (param := self._command.parameters.get(rule.param_key)):
if fixed_value := rule.param_data.get("fixedValue", ""):
self._apply_fixed(param, fixed_value)
elif rule.param_data.get("typology") == "enum":
self._apply_enum(param, rule)
parameter.add_trigger(data.trigger_value, apply, data)
def patch(self) -> None:
for name, parameter in self._command.parameters.items():
if name not in self._rules:
for data in self._rules.get(name, []):
self._add_trigger(parameter, data)
@ -1,27 +0,0 @@
from typing import Union, Any, TYPE_CHECKING, Protocol
import aiohttp
from yarl import URL
from pyhon.parameter.base import HonParameter
from pyhon.parameter.enum import HonParameterEnum
from pyhon.parameter.fixed import HonParameterFixed
from pyhon.parameter.program import HonParameterProgram
from pyhon.parameter.range import HonParameterRange
class Callback(Protocol): # pylint: disable=too-few-public-methods
def __call__(
self, url: str | URL, *args: Any, **kwargs: Any
) -> aiohttp.client._RequestContextManager:
Parameter = Union[
@ -1,3 +1 @@
@ -1,5 +0,0 @@
@ -2,12 +2,12 @@
from setuptools import setup, find_packages
with open("", "r", encoding="utf-8") as f:
with open("", "r") as f:
long_description =
author="Andre Basche",
description="Control hOn devices with python",
@ -21,7 +21,7 @@ setup(
install_requires=["aiohttp>=3.8", "typing-extensions>=4.8", "yarl>=1.8"],
"Development Status :: 4 - Beta",
"Environment :: Console",
@ -30,7 +30,6 @@ setup(
"Operating System :: OS Independent",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Topic :: Software Development :: Libraries :: Python Modules",
Reference in New Issue
Block a user