Compare commits

..

No commits in common. "main" and "v0.14.1" have entirely different histories.

42 changed files with 470 additions and 913 deletions

View File

@ -1,3 +0,0 @@
[flake8]
max-line-length = 88
max-complexity = 7

View File

@ -13,7 +13,7 @@ jobs:
strategy: strategy:
fail-fast: false fail-fast: false
matrix: matrix:
python-version: ["3.10", "3.11", "3.12"] python-version: ["3.10", "3.11"]
steps: steps:
- uses: actions/checkout@v3 - uses: actions/checkout@v3
@ -28,13 +28,15 @@ jobs:
python -m pip install -r requirements_dev.txt python -m pip install -r requirements_dev.txt
- name: Lint with flake8 - name: Lint with flake8
run: | run: |
flake8 . --count --statistics # stop the build if there are Python syntax errors or undefined names
flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
flake8 . --count --exit-zero --max-complexity=10 --max-line-length=88 --statistics
- name: Type check with mypy - name: Type check with mypy
run: | run: |
mypy pyhon/ mypy pyhon/
- name: Analysing the code with pylint # - name: Analysing the code with pylint
run: | # run: |
pylint $(git ls-files '*.py') # pylint --max-line-length 88 $(git ls-files '*.py')
- name: Check black style - name: Check black style
run: | run: |
black . --check black . --check

View File

@ -1,9 +0,0 @@
[MESSAGES CONTROL]
disable=missing-docstring
[FORMAT]
max-args=6
max-attributes=8
max-line-length=88

View File

@ -1 +0,0 @@
include pyhon/py.typed

View File

@ -1,14 +1,3 @@
# Announcement: I have to take the project down in the next few days
> Dear User,
>
> We are writing to inform you that we have discovered two Home Assistant integration plug-ins developed by you ( https://github.com/Andre0512/hon and https://github.com/Andre0512/pyhOn ) that are in violation of our terms of service. Specifically, the plug-ins are using our services in an unauthorized manner which is causing significant economic harm to our Company.
> We take the protection of our intellectual property very seriously and demand that you immediately cease and desist all illegal activities related to the development and distribution of these plug-ins. We also request that you remove the plug-ins from all stores and code hosting platforms where they are currently available.
> Please be advised that we will take all necessary legal action to protect our interests if you fail to comply with this notice. We reserve the right to pursue all available remedies, including but not limited to monetary damages, injunctive relief, and attorney's fees.
> We strongly urge you to take immediate action to rectify this situation and avoid any further legal action. If you have any questions or concerns, please do not hesitate to contact us.
>
> Haier Europe Security and Governance Department
**This python package is unofficial and is not related in any way to Haier. It was developed by reversed engineered requests and can stop working at anytime!** **This python package is unofficial and is not related in any way to Haier. It was developed by reversed engineered requests and can stop working at anytime!**
# pyhOn # pyhOn

View File

@ -1,24 +0,0 @@
[mypy]
check_untyped_defs = true
disallow_any_generics = true
disallow_any_unimported = true
disallow_incomplete_defs = true
disallow_subclassing_any = true
disallow_untyped_calls = true
disallow_untyped_decorators = true
disallow_untyped_defs = true
disable_error_code = annotation-unchecked
enable_error_code = ignore-without-code, redundant-self, truthy-iterable
follow_imports = silent
local_partial_types = true
no_implicit_optional = true
no_implicit_reexport = true
show_error_codes = true
strict_concatenate = false
strict_equality = true
warn_incomplete_stub = true
warn_redundant_casts = true
warn_return_any = true
warn_unreachable = true
warn_unused_configs = true
warn_unused_ignores = true

View File

@ -6,18 +6,16 @@ import logging
import sys import sys
from getpass import getpass from getpass import getpass
from pathlib import Path from pathlib import Path
from typing import Tuple, Dict, Any
if __name__ == "__main__": if __name__ == "__main__":
sys.path.insert(0, str(Path(__file__).parent.parent)) sys.path.insert(0, str(Path(__file__).parent.parent))
# pylint: disable=wrong-import-position from pyhon import Hon, HonAPI, helper, diagnose
from pyhon import Hon, HonAPI, diagnose, printer
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
def get_arguments() -> Dict[str, Any]: def get_arguments():
"""Get parsed arguments.""" """Get parsed arguments."""
parser = argparse.ArgumentParser(description="pyhOn: Command Line Utility") parser = argparse.ArgumentParser(description="pyhOn: Command Line Utility")
parser.add_argument("-u", "--user", help="user for haier hOn account") parser.add_argument("-u", "--user", help="user for haier hOn account")
@ -31,20 +29,17 @@ def get_arguments() -> Dict[str, Any]:
export.add_argument("--zip", help="create zip archive", action="store_true") export.add_argument("--zip", help="create zip archive", action="store_true")
export.add_argument("--anonymous", help="anonymize data", action="store_true") export.add_argument("--anonymous", help="anonymize data", action="store_true")
export.add_argument("directory", nargs="?", default=Path().cwd()) export.add_argument("directory", nargs="?", default=Path().cwd())
translation = subparser.add_parser( translate = subparser.add_parser(
"translate", help="print available translation keys" "translate", help="print available translation keys"
) )
translation.add_argument( translate.add_argument(
"translate", help="language (de, en, fr...)", metavar="LANGUAGE" "translate", help="language (de, en, fr...)", metavar="LANGUAGE"
) )
translation.add_argument("--json", help="print as json", action="store_true") translate.add_argument("--json", help="print as json", action="store_true")
parser.add_argument(
"-i", "--import", help="import pyhon data", nargs="?", default=Path().cwd()
)
return vars(parser.parse_args()) return vars(parser.parse_args())
async def translate(language: str, json_output: bool = False) -> None: async def translate(language, json_output=False):
async with HonAPI(anonymous=True) as hon: async with HonAPI(anonymous=True) as hon:
keys = await hon.translation_keys(language) keys = await hon.translation_keys(language)
if json_output: if json_output:
@ -57,10 +52,10 @@ async def translate(language: str, json_output: bool = False) -> None:
.replace("\\r", "") .replace("\\r", "")
) )
keys = json.loads(clean_keys) keys = json.loads(clean_keys)
print(printer.pretty_print(keys)) print(helper.pretty_print(keys))
def get_login_data(args: Dict[str, str]) -> Tuple[str, str]: def get_login_data(args):
if not (user := args["user"]): if not (user := args["user"]):
user = input("User for hOn account: ") user = input("User for hOn account: ")
if not (password := args["password"]): if not (password := args["password"]):
@ -68,44 +63,44 @@ def get_login_data(args: Dict[str, str]) -> Tuple[str, str]:
return user, password return user, password
async def main() -> None: async def main():
args = get_arguments() args = get_arguments()
if language := args.get("translate"): if language := args.get("translate"):
await translate(language, json_output=args.get("json", "")) await translate(language, json_output=args.get("json"))
return return
async with Hon( async with Hon(*get_login_data(args)) as hon:
*get_login_data(args), test_data_path=Path(args.get("import", ""))
) as hon:
for device in hon.appliances: for device in hon.appliances:
if args.get("export"): if args.get("export"):
anonymous = args.get("anonymous", False) anonymous = args.get("anonymous", False)
path = Path(args.get("directory", ".")) path = Path(args.get("directory"))
if not args.get("zip"): if not args.get("zip"):
for file in await diagnose.appliance_data(device, path, anonymous): for file in await diagnose.appliance_data(device, path, anonymous):
print(f"Created {file}") print(f"Created {file}")
else: else:
archive = await diagnose.zip_archive(device, path, anonymous) file = await diagnose.zip_archive(device, path, anonymous)
print(f"Created {archive}") print(f"Created {file}")
continue continue
print("=" * 10, device.appliance_type, "-", device.nick_name, "=" * 10) print("=" * 10, device.appliance_type, "-", device.nick_name, "=" * 10)
if args.get("keys"): if args.get("keys"):
data = device.data.copy() data = device.data.copy()
attr = "get" if args.get("all") else "pop" attr = "get" if args.get("all") else "pop"
print( print(
printer.key_print(getattr(data["attributes"], attr)("parameters")) helper.key_print(
data["attributes"].__getattribute__(attr)("parameters")
) )
print(printer.key_print(getattr(data, attr)("appliance"))) )
print(printer.key_print(data)) print(helper.key_print(data.__getattribute__(attr)("appliance")))
print(helper.key_print(data))
print( print(
printer.pretty_print( helper.pretty_print(
printer.create_commands(device.commands, concat=True) helper.create_command(device.commands, concat=True)
) )
) )
else: else:
print(diagnose.yaml_export(device)) print(diagnose.yaml_export(device))
def start() -> None: def start():
try: try:
asyncio.run(main()) asyncio.run(main())
except KeyboardInterrupt: except KeyboardInterrupt:

View File

@ -1,29 +1,22 @@
import importlib import importlib
import logging import logging
import re
from datetime import datetime, timedelta from datetime import datetime, timedelta
from pathlib import Path from pathlib import Path
from typing import Optional, Dict, Any, TYPE_CHECKING, List, TypeVar, overload from typing import Optional, Dict, Any, TYPE_CHECKING
from pyhon import diagnose, exceptions from pyhon import diagnose
from pyhon.appliances.base import ApplianceBase
from pyhon.attributes import HonAttribute from pyhon.attributes import HonAttribute
from pyhon.command_loader import HonCommandLoader from pyhon.command_loader import HonCommandLoader
from pyhon.commands import HonCommand from pyhon.commands import HonCommand
from pyhon.parameter.base import HonParameter from pyhon.parameter.base import HonParameter
from pyhon.parameter.enum import HonParameterEnum
from pyhon.parameter.range import HonParameterRange from pyhon.parameter.range import HonParameterRange
from pyhon.typedefs import Parameter
if TYPE_CHECKING: if TYPE_CHECKING:
from pyhon import HonAPI from pyhon import HonAPI
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
T = TypeVar("T")
# pylint: disable=too-many-public-methods,too-many-instance-attributes
class HonAppliance: class HonAppliance:
_MINIMAL_UPDATE_INTERVAL = 5 # seconds _MINIMAL_UPDATE_INTERVAL = 5 # seconds
@ -32,85 +25,69 @@ class HonAppliance:
) -> None: ) -> None:
if attributes := info.get("attributes"): if attributes := info.get("attributes"):
info["attributes"] = {v["parName"]: v["parValue"] for v in attributes} info["attributes"] = {v["parName"]: v["parValue"] for v in attributes}
self._info: Dict[str, Any] = info self._info: Dict = info
self._api: Optional[HonAPI] = api self._api: Optional[HonAPI] = api
self._appliance_model: Dict[str, Any] = {} self._appliance_model: Dict = {}
self._commands: Dict[str, HonCommand] = {} self._commands: Dict[str, HonCommand] = {}
self._statistics: Dict[str, Any] = {} self._statistics: Dict = {}
self._attributes: Dict[str, Any] = {} self._attributes: Dict = {}
self._zone: int = zone self._zone: int = zone
self._additional_data: Dict[str, Any] = {} self._additional_data: Dict[str, Any] = {}
self._last_update: Optional[datetime] = None self._last_update = None
self._default_setting = HonParameter("", {}, "") self._default_setting = HonParameter("", {}, "")
try: try:
self._extra: Optional[ApplianceBase] = importlib.import_module( self._extra = importlib.import_module(
f"pyhon.appliances.{self.appliance_type.lower()}" f"pyhon.appliances.{self.appliance_type.lower()}"
).Appliance(self) ).Appliance(self)
except ModuleNotFoundError: except ModuleNotFoundError:
self._extra = None self._extra = None
def _get_nested_item(self, item: str) -> Any: def __getitem__(self, item):
result: List[Any] | Dict[str, Any] = self.data
for key in item.split("."):
if all(k in "0123456789" for k in key) and isinstance(result, list):
result = result[int(key)]
elif isinstance(result, dict):
result = result[key]
return result
def __getitem__(self, item: str) -> Any:
if self._zone: if self._zone:
item += f"Z{self._zone}" item += f"Z{self._zone}"
if "." in item: if "." in item:
return self._get_nested_item(item) result = self.data
for key in item.split("."):
if all(k in "0123456789" for k in key) and isinstance(result, list):
result = result[int(key)]
else:
result = result[key]
return result
if item in self.data: if item in self.data:
return self.data[item] return self.data[item]
if item in self.attributes["parameters"]: if item in self.attributes["parameters"]:
return self.attributes["parameters"][item].value return self.attributes["parameters"][item].value
return self.info[item] return self.info[item]
@overload def get(self, item, default=None):
def get(self, item: str, default: None = None) -> Any:
...
@overload
def get(self, item: str, default: T) -> T:
...
def get(self, item: str, default: Optional[T] = None) -> Any:
try: try:
return self[item] return self[item]
except (KeyError, IndexError): except (KeyError, IndexError):
return default return default
def _check_name_zone(self, name: str, frontend: bool = True) -> str: def _check_name_zone(self, name: str, frontend: bool = True) -> str:
zone = " Z" if frontend else "_z" middle = " Z" if frontend else "_z"
attribute: str = self._info.get(name, "") if (attribute := self._info.get(name, "")) and self._zone:
if attribute and self._zone: return f"{attribute}{middle}{self._zone}"
return f"{attribute}{zone}{self._zone}"
return attribute return attribute
@property @property
def appliance_model_id(self) -> str: def appliance_model_id(self) -> str:
return str(self._info.get("applianceModelId", "")) return self._info.get("applianceModelId", "")
@property @property
def appliance_type(self) -> str: def appliance_type(self) -> str:
return str(self._info.get("applianceTypeName", "")) return self._info.get("applianceTypeName", "")
@property @property
def mac_address(self) -> str: def mac_address(self) -> str:
return str(self.info.get("macAddress", "")) return self.info.get("macAddress", "")
@property @property
def unique_id(self) -> str: def unique_id(self) -> str:
default_mac = "xx-xx-xx-xx-xx-xx" return self._check_name_zone("macAddress", frontend=False)
import_name = f"{self.appliance_type.lower()}_{self.appliance_model_id}"
result = self._check_name_zone("macAddress", frontend=False)
result = result.replace(default_mac, import_name)
return result
@property @property
def model_name(self) -> str: def model_name(self) -> str:
@ -118,50 +95,45 @@ class HonAppliance:
@property @property
def brand(self) -> str: def brand(self) -> str:
brand = self._check_name_zone("brand") return self._check_name_zone("brand")
return brand[0].upper() + brand[1:]
@property @property
def nick_name(self) -> str: def nick_name(self) -> str:
result = self._check_name_zone("nickName") return self._check_name_zone("nickName")
if not result or re.findall("^[xX1\\s-]+$", result):
return self.model_name
return result
@property @property
def code(self) -> str: def code(self) -> str:
code: str = self.info.get("code", "") if code := self.info.get("code"):
if code:
return code return code
serial_number: str = self.info.get("serialNumber", "") serial_number = self.info.get("serialNumber", "")
return serial_number[:8] if len(serial_number) < 18 else serial_number[:11] return serial_number[:8] if len(serial_number) < 18 else serial_number[:11]
@property @property
def model_id(self) -> int: def model_id(self) -> int:
return int(self._info.get("applianceModelId", 0)) return self._info.get("applianceModelId", 0)
@property @property
def options(self) -> Dict[str, Any]: def options(self):
return dict(self._appliance_model.get("options", {})) return self._appliance_model.get("options", {})
@property @property
def commands(self) -> Dict[str, HonCommand]: def commands(self) -> Dict[str, HonCommand]:
return self._commands return self._commands
@property @property
def attributes(self) -> Dict[str, Any]: def attributes(self):
return self._attributes return self._attributes
@property @property
def statistics(self) -> Dict[str, Any]: def statistics(self):
return self._statistics return self._statistics
@property @property
def info(self) -> Dict[str, Any]: def info(self):
return self._info return self._info
@property @property
def additional_data(self) -> Dict[str, Any]: def additional_data(self):
return self._additional_data return self._additional_data
@property @property
@ -169,52 +141,50 @@ class HonAppliance:
return self._zone return self._zone
@property @property
def api(self) -> "HonAPI": def api(self) -> Optional["HonAPI"]:
"""api connection object"""
if self._api is None:
raise exceptions.NoAuthenticationException("Missing hOn login")
return self._api return self._api
async def load_commands(self) -> None: async def load_commands(self):
command_loader = HonCommandLoader(self.api, self) command_loader = HonCommandLoader(self.api, self)
await command_loader.load_commands() await command_loader.load_commands()
self._commands = command_loader.commands self._commands = command_loader.commands
self._additional_data = command_loader.additional_data self._additional_data = command_loader.additional_data
self._appliance_model = command_loader.appliance_data self._appliance_model = command_loader.appliance_data
self.sync_params_to_command("settings")
async def load_attributes(self) -> None: async def load_attributes(self):
attributes = await self.api.load_attributes(self) self._attributes = await self.api.load_attributes(self)
for name, values in attributes.pop("shadow", {}).get("parameters", {}).items(): for name, values in self._attributes.pop("shadow").get("parameters").items():
if name in self._attributes.get("parameters", {}): if name in self._attributes.get("parameters", {}):
self._attributes["parameters"][name].update(values) self._attributes["parameters"][name].update(values)
else: else:
self._attributes.setdefault("parameters", {})[name] = HonAttribute( self._attributes.setdefault("parameters", {})[name] = HonAttribute(
values values
) )
self._attributes |= attributes
if self._extra: if self._extra:
self._attributes = self._extra.attributes(self._attributes) self._attributes = self._extra.attributes(self._attributes)
async def load_statistics(self) -> None: async def load_statistics(self):
self._statistics = await self.api.load_statistics(self) self._statistics = await self.api.load_statistics(self)
self._statistics |= await self.api.load_maintenance(self) self._statistics |= await self.api.load_maintenance(self)
async def update(self, force: bool = False) -> None: async def update(self, force=False):
now = datetime.now() now = datetime.now()
min_age = now - timedelta(seconds=self._MINIMAL_UPDATE_INTERVAL) if (
if force or not self._last_update or self._last_update < min_age: force
or not self._last_update
or self._last_update
< now - timedelta(seconds=self._MINIMAL_UPDATE_INTERVAL)
):
self._last_update = now self._last_update = now
await self.load_attributes() await self.load_attributes()
self.sync_params_to_command("settings")
@property @property
def command_parameters(self) -> Dict[str, Dict[str, str | float]]: def command_parameters(self):
return {n: c.parameter_value for n, c in self._commands.items()} return {n: c.parameter_value for n, c in self._commands.items()}
@property @property
def settings(self) -> Dict[str, Parameter]: def settings(self):
result: Dict[str, Parameter] = {} result = {}
for name, command in self._commands.items(): for name, command in self._commands.items():
for key in command.setting_keys: for key in command.setting_keys:
setting = command.settings.get(key, self._default_setting) setting = command.settings.get(key, self._default_setting)
@ -224,7 +194,7 @@ class HonAppliance:
return result return result
@property @property
def available_settings(self) -> List[str]: def available_settings(self):
result = [] result = []
for name, command in self._commands.items(): for name, command in self._commands.items():
for key in command.setting_keys: for key in command.setting_keys:
@ -232,7 +202,7 @@ class HonAppliance:
return result return result
@property @property
def data(self) -> Dict[str, Any]: def data(self):
result = { result = {
"attributes": self.attributes, "attributes": self.attributes,
"appliance": self.info, "appliance": self.info,
@ -250,67 +220,31 @@ class HonAppliance:
async def data_archive(self, path: Path) -> str: async def data_archive(self, path: Path) -> str:
return await diagnose.zip_archive(self, path, anonymous=True) return await diagnose.zip_archive(self, path, anonymous=True)
def sync_command_to_params(self, command_name: str) -> None: def sync_to_params(self, command_name):
if not (command := self.commands.get(command_name)): command: HonCommand = self.commands.get(command_name)
return for key, value in self.attributes.get("parameters", {}).items():
for key in self.attributes.get("parameters", {}): if isinstance(value, str) and (new := command.parameters.get(key)):
if new := command.parameters.get(key):
self.attributes["parameters"][key].update( self.attributes["parameters"][key].update(
str(new.intern_value), shield=True str(new.intern_value), shield=True
) )
def sync_params_to_command(self, command_name: str) -> None: def sync_command(self, main, target=None) -> None:
if not (command := self.commands.get(command_name)):
return
for key in command.setting_keys:
if (
new := self.attributes.get("parameters", {}).get(key)
) is None or new.value == "":
continue
setting = command.settings[key]
try:
if not isinstance(setting, HonParameterRange):
command.settings[key].value = str(new.value)
else:
command.settings[key].value = float(new.value)
except ValueError as error:
_LOGGER.info("Can't set %s - %s", key, error)
continue
def sync_command(
self,
main: str,
target: Optional[List[str] | str] = None,
to_sync: Optional[List[str] | bool] = None,
) -> None:
base: Optional[HonCommand] = self.commands.get(main) base: Optional[HonCommand] = self.commands.get(main)
if not base: if not base:
return return
for command, data in self.commands.items(): for command, data in self.commands.items():
if command == main or target and command not in target: if command == main or target and command not in target:
continue continue
for name, parameter in data.parameters.items():
for name, target_param in data.parameters.items(): if base_value := base.parameters.get(name):
if not (base_param := base.parameters.get(name)): if isinstance(base_value, HonParameterRange) and isinstance(
continue parameter, HonParameterRange
if to_sync and (
(isinstance(to_sync, list) and name not in to_sync)
or not base_param.mandatory
): ):
continue parameter.max = base_value.max
self.sync_parameter(base_param, target_param) parameter.min = base_value.min
parameter.step = base_value.step
def sync_parameter(self, main: Parameter, target: Parameter) -> None: elif isinstance(parameter, HonParameterRange):
if isinstance(main, HonParameterRange) and isinstance( parameter.max = int(base_value.value)
target, HonParameterRange parameter.min = int(base_value.value)
): parameter.step = 1
target.max = main.max parameter.value = base_value.value
target.min = main.min
target.step = main.step
elif isinstance(target, HonParameterRange):
target.max = int(main.value)
target.min = int(main.value)
target.step = 1
elif isinstance(target, HonParameterEnum):
target.values = main.values
target.value = main.value

View File

@ -1,25 +1,15 @@
from typing import Dict, Any, TYPE_CHECKING
from pyhon.parameter.program import HonParameterProgram
if TYPE_CHECKING:
from pyhon.appliance import HonAppliance
class ApplianceBase: class ApplianceBase:
def __init__(self, appliance: "HonAppliance"): def __init__(self, appliance):
self.parent = appliance self.parent = appliance
def attributes(self, data: Dict[str, Any]) -> Dict[str, Any]: def attributes(self, data):
program_name = "No Program" program_name = "No Program"
if program := int(str(data.get("parameters", {}).get("prCode", "0"))): if program := int(str(data.get("parameters", {}).get("prCode", "0"))):
if start_cmd := self.parent.settings.get("startProgram.program"): if start_cmd := self.parent.settings.get("startProgram.program"):
if isinstance(start_cmd, HonParameterProgram) and ( if ids := start_cmd.ids:
ids := start_cmd.ids
):
program_name = ids.get(program, program_name) program_name = ids.get(program, program_name)
data["programName"] = program_name data["programName"] = program_name
return data return data
def settings(self, settings: Dict[str, Any]) -> Dict[str, Any]: def settings(self, settings):
return settings return settings

View File

@ -1,11 +1,8 @@
# pylint: disable=duplicate-code
from typing import Any, Dict
from pyhon.appliances.base import ApplianceBase from pyhon.appliances.base import ApplianceBase
class Appliance(ApplianceBase): class Appliance(ApplianceBase):
def attributes(self, data: Dict[str, Any]) -> Dict[str, Any]: def attributes(self, data):
data = super().attributes(data) data = super().attributes(data)
if data.get("lastConnEvent", {}).get("category", "") == "DISCONNECTED": if data.get("lastConnEvent", {}).get("category", "") == "DISCONNECTED":
data["parameters"]["machMode"].value = "0" data["parameters"]["machMode"].value = "0"

View File

@ -1,16 +1,19 @@
from typing import Any, Dict
from pyhon.appliances.base import ApplianceBase from pyhon.appliances.base import ApplianceBase
class Appliance(ApplianceBase): class Appliance(ApplianceBase):
def attributes(self, data: Dict[str, Any]) -> Dict[str, Any]: def attributes(self, data):
data = super().attributes(data) data = super().attributes(data)
if data.get("lastConnEvent", {}).get("category", "") == "DISCONNECTED": if data.get("lastConnEvent", {}).get("category", "") == "DISCONNECTED":
data["parameters"]["temp"].value = 0 data["parameters"]["temp"].value = "0"
data["parameters"]["onOffStatus"].value = 0 data["parameters"]["onOffStatus"].value = "0"
data["parameters"]["remoteCtrValid"].value = 0 data["parameters"]["remoteCtrValid"].value = "0"
data["parameters"]["remainingTimeMM"].value = 0 data["parameters"]["remainingTimeMM"].value = "0"
data["active"] = data["parameters"]["onOffStatus"] == "1"
if program := int(data["parameters"]["prCode"]):
ids = self.parent.settings["startProgram.program"].ids
data["programName"] = ids.get(program, "")
data["active"] = data["parameters"]["onOffStatus"].value == 1
return data return data

View File

@ -1,21 +1,19 @@
from typing import Dict, Any
from pyhon.appliances.base import ApplianceBase from pyhon.appliances.base import ApplianceBase
class Appliance(ApplianceBase): class Appliance(ApplianceBase):
def attributes(self, data: Dict[str, Any]) -> Dict[str, Any]: def attributes(self, data):
data = super().attributes(data) data = super().attributes(data)
if data["parameters"]["holidayMode"] == "1": if data["parameters"]["holidayMode"] == "1":
data["modeZ1"] = "holiday" data["modeZ1"] = "holiday"
elif data["parameters"]["intelligenceMode"] == "1": elif data["parameters"]["intelligenceMode"] == "1":
data["modeZ1"] = "auto_set" data["modeZ1"] = "auto_set"
elif data["parameters"].get("quickModeZ1") == "1": elif data["parameters"]["quickModeZ1"] == "1":
data["modeZ1"] = "super_cool" data["modeZ1"] = "super_cool"
else: else:
data["modeZ1"] = "no_mode" data["modeZ1"] = "no_mode"
if data["parameters"].get("quickModeZ2") == "1": if data["parameters"]["quickModeZ2"] == "1":
data["modeZ2"] = "super_freeze" data["modeZ2"] = "super_freeze"
elif data["parameters"]["intelligenceMode"] == "1": elif data["parameters"]["intelligenceMode"] == "1":
data["modeZ2"] = "auto_set" data["modeZ2"] = "auto_set"

View File

@ -1,12 +1,9 @@
# pylint: disable=duplicate-code
from typing import Any, Dict
from pyhon.appliances.base import ApplianceBase from pyhon.appliances.base import ApplianceBase
from pyhon.parameter.fixed import HonParameterFixed from pyhon.parameter.fixed import HonParameterFixed
class Appliance(ApplianceBase): class Appliance(ApplianceBase):
def attributes(self, data: Dict[str, Any]) -> Dict[str, Any]: def attributes(self, data):
data = super().attributes(data) data = super().attributes(data)
if data.get("lastConnEvent", {}).get("category", "") == "DISCONNECTED": if data.get("lastConnEvent", {}).get("category", "") == "DISCONNECTED":
data["parameters"]["machMode"].value = "0" data["parameters"]["machMode"].value = "0"
@ -14,7 +11,7 @@ class Appliance(ApplianceBase):
data["pause"] = data["parameters"]["machMode"] == "3" data["pause"] = data["parameters"]["machMode"] == "3"
return data return data
def settings(self, settings: Dict[str, Any]) -> Dict[str, Any]: def settings(self, settings):
dry_level = settings.get("startProgram.dryLevel") dry_level = settings.get("startProgram.dryLevel")
if isinstance(dry_level, HonParameterFixed) and dry_level.value == "11": if isinstance(dry_level, HonParameterFixed) and dry_level.value == "11":
settings.pop("startProgram.dryLevel", None) settings.pop("startProgram.dryLevel", None)

View File

@ -1,11 +1,8 @@
# pylint: disable=duplicate-code
from typing import Dict, Any
from pyhon.appliances.base import ApplianceBase from pyhon.appliances.base import ApplianceBase
class Appliance(ApplianceBase): class Appliance(ApplianceBase):
def attributes(self, data: Dict[str, Any]) -> Dict[str, Any]: def attributes(self, data):
data = super().attributes(data) data = super().attributes(data)
if data.get("lastConnEvent", {}).get("category", "") == "DISCONNECTED": if data.get("lastConnEvent", {}).get("category", "") == "DISCONNECTED":
data["parameters"]["machMode"].value = "0" data["parameters"]["machMode"].value = "0"
@ -13,5 +10,5 @@ class Appliance(ApplianceBase):
data["pause"] = data["parameters"]["machMode"] == "3" data["pause"] = data["parameters"]["machMode"] == "3"
return data return data
def settings(self, settings: Dict[str, Any]) -> Dict[str, Any]: def settings(self, settings):
return settings return settings

View File

@ -1,16 +0,0 @@
from typing import Any, Dict
from pyhon.appliances.base import ApplianceBase
from pyhon.parameter.base import HonParameter
class Appliance(ApplianceBase):
def attributes(self, data: Dict[str, Any]) -> Dict[str, Any]:
data = super().attributes(data)
parameter = data.get("parameters", {}).get("onOffStatus")
is_class = isinstance(parameter, HonParameter)
data["active"] = parameter.value == 1 if is_class else parameter == 1
return data
def settings(self, settings: Dict[str, Any]) -> Dict[str, Any]:
return settings

View File

@ -1,11 +1,8 @@
# pylint: disable=duplicate-code
from typing import Any, Dict
from pyhon.appliances.base import ApplianceBase from pyhon.appliances.base import ApplianceBase
class Appliance(ApplianceBase): class Appliance(ApplianceBase):
def attributes(self, data: Dict[str, Any]) -> Dict[str, Any]: def attributes(self, data):
data = super().attributes(data) data = super().attributes(data)
if data.get("lastConnEvent", {}).get("category", "") == "DISCONNECTED": if data.get("lastConnEvent", {}).get("category", "") == "DISCONNECTED":
data["parameters"]["machMode"].value = "0" data["parameters"]["machMode"].value = "0"
@ -13,5 +10,5 @@ class Appliance(ApplianceBase):
data["pause"] = data["parameters"]["machMode"] == "3" data["pause"] = data["parameters"]["machMode"] == "3"
return data return data
def settings(self, settings: Dict[str, Any]) -> Dict[str, Any]: def settings(self, settings):
return settings return settings

View File

@ -7,7 +7,7 @@ from pyhon.helper import str_to_float
class HonAttribute: class HonAttribute:
_LOCK_TIMEOUT: Final = 10 _LOCK_TIMEOUT: Final = 10
def __init__(self, data: Dict[str, str] | str): def __init__(self, data):
self._value: str = "" self._value: str = ""
self._last_update: Optional[datetime] = None self._last_update: Optional[datetime] = None
self._lock_timestamp: Optional[datetime] = None self._lock_timestamp: Optional[datetime] = None
@ -22,7 +22,7 @@ class HonAttribute:
return self._value return self._value
@value.setter @value.setter
def value(self, value: str) -> None: def value(self, value) -> None:
self._value = value self._value = value
@property @property

View File

@ -4,25 +4,24 @@ from copy import copy
from typing import Dict, Any, Optional, TYPE_CHECKING, List from typing import Dict, Any, Optional, TYPE_CHECKING, List
from pyhon.commands import HonCommand from pyhon.commands import HonCommand
from pyhon.exceptions import NoAuthenticationException
from pyhon.parameter.fixed import HonParameterFixed from pyhon.parameter.fixed import HonParameterFixed
from pyhon.parameter.program import HonParameterProgram from pyhon.parameter.program import HonParameterProgram
if TYPE_CHECKING: if TYPE_CHECKING:
from pyhon import HonAPI from pyhon import HonAPI, exceptions
from pyhon.appliance import HonAppliance from pyhon.appliance import HonAppliance
class HonCommandLoader: class HonCommandLoader:
"""Loads and parses hOn command data""" """Loads and parses hOn command data"""
def __init__(self, api: "HonAPI", appliance: "HonAppliance") -> None: def __init__(self, api, appliance):
self._api: "HonAPI" = api
self._appliance: "HonAppliance" = appliance
self._api_commands: Dict[str, Any] = {} self._api_commands: Dict[str, Any] = {}
self._favourites: List[Dict[str, Any]] = [] self._favourites: List[Dict[str, Any]] = []
self._command_history: List[Dict[str, Any]] = [] self._command_history: List[Dict[str, Any]] = []
self._commands: Dict[str, HonCommand] = {} self._commands: Dict[str, HonCommand] = {}
self._api: "HonAPI" = api
self._appliance: "HonAppliance" = appliance
self._appliance_data: Dict[str, Any] = {} self._appliance_data: Dict[str, Any] = {}
self._additional_data: Dict[str, Any] = {} self._additional_data: Dict[str, Any] = {}
@ -30,7 +29,7 @@ class HonCommandLoader:
def api(self) -> "HonAPI": def api(self) -> "HonAPI":
"""api connection object""" """api connection object"""
if self._api is None: if self._api is None:
raise NoAuthenticationException("Missing hOn login") raise exceptions.NoAuthenticationException("Missing hOn login")
return self._api return self._api
@property @property
@ -53,25 +52,25 @@ class HonCommandLoader:
"""Get command additional data""" """Get command additional data"""
return self._additional_data return self._additional_data
async def load_commands(self) -> None: async def load_commands(self):
"""Trigger loading of command data""" """Trigger loading of command data"""
await self._load_data() await self._load_data()
self._appliance_data = self._api_commands.pop("applianceModel", {}) self._appliance_data = self._api_commands.pop("applianceModel")
self._get_commands() self._get_commands()
self._add_favourites() self._add_favourites()
self._recover_last_command_states() self._recover_last_command_states()
async def _load_commands(self) -> None: async def _load_commands(self):
self._api_commands = await self._api.load_commands(self._appliance) self._api_commands = await self._api.load_commands(self._appliance)
async def _load_favourites(self) -> None: async def _load_favourites(self):
self._favourites = await self._api.load_favourites(self._appliance) self._favourites = await self._api.load_favourites(self._appliance)
async def _load_command_history(self) -> None: async def _load_command_history(self):
self._command_history = await self._api.load_command_history(self._appliance) self._command_history = await self._api.load_command_history(self._appliance)
async def _load_data(self) -> None: async def _load_data(self):
"""Callback parallel all relevant data""" """Request parallel all relevant data"""
await asyncio.gather( await asyncio.gather(
*[ *[
self._load_commands(), self._load_commands(),
@ -103,24 +102,14 @@ class HonCommandLoader:
self._commands = {c.name: c for c in commands} self._commands = {c.name: c for c in commands}
def _parse_command( def _parse_command(
self, self, data: Dict[str, Any] | str, command_name: str, **kwargs
data: Dict[str, Any] | str,
command_name: str,
categories: Optional[Dict[str, "HonCommand"]] = None,
category_name: str = "",
) -> Optional[HonCommand]: ) -> Optional[HonCommand]:
"""Try to crate HonCommand object""" """Try to crate HonCommand object"""
if not isinstance(data, dict): if not isinstance(data, dict):
self._additional_data[command_name] = data self._additional_data[command_name] = data
return None return None
if self._is_command(data): if self._is_command(data):
return HonCommand( return HonCommand(command_name, data, self._appliance, **kwargs)
command_name,
data,
self._appliance,
category_name=category_name,
categories=categories,
)
if category := self._parse_categories(data, command_name): if category := self._parse_categories(data, command_name):
return category return category
return None return None
@ -131,9 +120,8 @@ class HonCommandLoader:
"""Parse categories and create reference to other""" """Parse categories and create reference to other"""
categories: Dict[str, HonCommand] = {} categories: Dict[str, HonCommand] = {}
for category, value in data.items(): for category, value in data.items():
if command := self._parse_command( kwargs = {"category_name": category, "categories": categories}
value, command_name, category_name=category, categories=categories if command := self._parse_command(value, command_name, **kwargs):
):
categories[self._clean_name(category)] = command categories[self._clean_name(category)] = command
if categories: if categories:
# setParameters should be at first place # setParameters should be at first place
@ -184,44 +172,22 @@ class HonCommandLoader:
def _add_favourites(self) -> None: def _add_favourites(self) -> None:
"""Patch program categories with favourites""" """Patch program categories with favourites"""
for favourite in self._favourites: for favourite in self._favourites:
name, command_name, base = self._get_favourite_info(favourite) name = favourite.get("favouriteName", {})
if not base:
continue
base_command: HonCommand = copy(base)
self._update_base_command_with_data(base_command, favourite)
self._update_base_command_with_favourite(base_command)
self._update_program_categories(command_name, name, base_command)
def _get_favourite_info(
self, favourite: Dict[str, Any]
) -> tuple[str, str, HonCommand | None]:
name: str = favourite.get("favouriteName", {})
command = favourite.get("command", {}) command = favourite.get("command", {})
command_name: str = command.get("commandName", "") command_name = command.get("commandName", "")
program_name = self._clean_name(command.get("programName", "")) program_name = self._clean_name(command.get("programName", ""))
base_command = self.commands[command_name].categories.get(program_name) base: HonCommand = copy(
return name, command_name, base_command self.commands[command_name].categories[program_name]
)
def _update_base_command_with_data(
self, base_command: HonCommand, command: Dict[str, Any]
) -> None:
for data in command.values(): for data in command.values():
if isinstance(data, str): if isinstance(data, str):
continue continue
for key, value in data.items(): for key, value in data.items():
if not (parameter := base_command.parameters.get(key)): if parameter := base.parameters.get(key):
continue
with suppress(ValueError): with suppress(ValueError):
parameter.value = value parameter.value = value
def _update_base_command_with_favourite(self, base_command: HonCommand) -> None:
extra_param = HonParameterFixed("favourite", {"fixedValue": "1"}, "custom") extra_param = HonParameterFixed("favourite", {"fixedValue": "1"}, "custom")
base_command.parameters.update(favourite=extra_param) base.parameters.update(favourite=extra_param)
if isinstance(program := base.parameters["program"], HonParameterProgram):
def _update_program_categories(
self, command_name: str, name: str, base_command: HonCommand
) -> None:
program = base_command.parameters["program"]
if isinstance(program, HonParameterProgram):
program.set_value(name) program.set_value(name)
self.commands[command_name].categories[name] = base_command self.commands[command_name].categories[name] = base

View File

@ -9,7 +9,6 @@ from pyhon.parameter.fixed import HonParameterFixed
from pyhon.parameter.program import HonParameterProgram from pyhon.parameter.program import HonParameterProgram
from pyhon.parameter.range import HonParameterRange from pyhon.parameter.range import HonParameterRange
from pyhon.rules import HonRuleSet from pyhon.rules import HonRuleSet
from pyhon.typedefs import Parameter
if TYPE_CHECKING: if TYPE_CHECKING:
from pyhon import HonAPI from pyhon import HonAPI
@ -27,29 +26,28 @@ class HonCommand:
categories: Optional[Dict[str, "HonCommand"]] = None, categories: Optional[Dict[str, "HonCommand"]] = None,
category_name: str = "", category_name: str = "",
): ):
self._name: str = name self._api: Optional[HonAPI] = appliance.api
self._api: Optional[HonAPI] = None
self._appliance: "HonAppliance" = appliance self._appliance: "HonAppliance" = appliance
self._name: str = name
self._categories: Optional[Dict[str, "HonCommand"]] = categories self._categories: Optional[Dict[str, "HonCommand"]] = categories
self._category_name: str = category_name self._category_name: str = category_name
self._parameters: Dict[str, Parameter] = {} self._description: str = attributes.pop("description", "")
self._protocol_type: str = attributes.pop("protocolType", "")
self._parameters: Dict[str, HonParameter] = {}
self._data: Dict[str, Any] = {} self._data: Dict[str, Any] = {}
self._available_settings: Dict[str, HonParameter] = {}
self._rules: List[HonRuleSet] = [] self._rules: List[HonRuleSet] = []
attributes.pop("description", "")
attributes.pop("protocolType", "")
self._load_parameters(attributes) self._load_parameters(attributes)
def __repr__(self) -> str: def __repr__(self) -> str:
return f"{self._name} command" return f"{self._name} command"
@property @property
def name(self) -> str: def name(self):
return self._name return self._name
@property @property
def api(self) -> "HonAPI": def api(self) -> "HonAPI":
if self._api is None and self._appliance:
self._api = self._appliance.api
if self._api is None: if self._api is None:
raise exceptions.NoAuthenticationException("Missing hOn login") raise exceptions.NoAuthenticationException("Missing hOn login")
return self._api return self._api
@ -59,7 +57,7 @@ class HonCommand:
return self._appliance return self._appliance
@property @property
def data(self) -> Dict[str, Any]: def data(self):
return self._data return self._data
@property @property
@ -77,40 +75,25 @@ class HonCommand:
result.setdefault(parameter.group, {})[name] = parameter.intern_value result.setdefault(parameter.group, {})[name] = parameter.intern_value
return result return result
@property
def mandatory_parameter_groups(self) -> Dict[str, Dict[str, Union[str, float]]]:
result: Dict[str, Dict[str, Union[str, float]]] = {}
for name, parameter in self._parameters.items():
if parameter.mandatory:
result.setdefault(parameter.group, {})[name] = parameter.intern_value
return result
@property @property
def parameter_value(self) -> Dict[str, Union[str, float]]: def parameter_value(self) -> Dict[str, Union[str, float]]:
return {n: p.value for n, p in self._parameters.items()} return {n: p.value for n, p in self._parameters.items()}
def _load_parameters(self, attributes: Dict[str, Dict[str, Any] | Any]) -> None: def _load_parameters(self, attributes):
for key, items in attributes.items(): for key, items in attributes.items():
if not isinstance(items, dict):
_LOGGER.info("Loading Attributes - Skipping %s", str(items))
continue
for name, data in items.items(): for name, data in items.items():
self._create_parameters(data, name, key) self._create_parameters(data, name, key)
for rule in self._rules: for rule in self._rules:
rule.patch() rule.patch()
def _create_parameters( def _create_parameters(self, data: Dict, name: str, parameter: str) -> None:
self, data: Dict[str, Any], name: str, parameter: str
) -> None:
if name == "zoneMap" and self._appliance.zone: if name == "zoneMap" and self._appliance.zone:
data["default"] = self._appliance.zone data["default"] = self._appliance.zone
if data.get("category") == "rule": if data.get("category") == "rule":
if "fixedValue" in data: if "fixedValue" not in data:
self._rules.append(HonRuleSet(self, data["fixedValue"])) _LOGGER.error("Rule not supported: %s", data)
elif "enumValues" in data:
self._rules.append(HonRuleSet(self, data["enumValues"]))
else: else:
_LOGGER.warning("Rule not supported: %s", data) self._rules.append(HonRuleSet(self, data["fixedValue"]))
match data.get("typology"): match data.get("typology"):
case "range": case "range":
self._parameters[name] = HonParameterRange(name, data, parameter) self._parameters[name] = HonParameterRange(name, data, parameter)
@ -125,33 +108,14 @@ class HonCommand:
name = "program" if "PROGRAM" in self._category_name else "category" name = "program" if "PROGRAM" in self._category_name else "category"
self._parameters[name] = HonParameterProgram(name, self, "custom") self._parameters[name] = HonParameterProgram(name, self, "custom")
async def send(self, only_mandatory: bool = False) -> bool: async def send(self) -> bool:
grouped_params = ( params = self.parameter_groups.get("parameters", {})
self.mandatory_parameter_groups if only_mandatory else self.parameter_groups
)
params = grouped_params.get("parameters", {})
return await self.send_parameters(params)
async def send_specific(self, param_names: List[str]) -> bool:
params: Dict[str, str | float] = {}
for key, parameter in self._parameters.items():
if key in param_names or parameter.mandatory:
params[key] = parameter.value
return await self.send_parameters(params)
async def send_parameters(self, params: Dict[str, str | float]) -> bool:
ancillary_params = self.parameter_groups.get("ancillaryParameters", {}) ancillary_params = self.parameter_groups.get("ancillaryParameters", {})
ancillary_params.pop("programRules", None) ancillary_params.pop("programRules", None)
if "prStr" in params: self.appliance.sync_to_params(self.name)
params["prStr"] = self._category_name.upper()
self.appliance.sync_command_to_params(self.name)
try: try:
result = await self.api.send_command( result = await self.api.send_command(
self._appliance, self._appliance, self._name, params, ancillary_params
self._name,
params,
ancillary_params,
self._category_name,
) )
if not result: if not result:
_LOGGER.error(result) _LOGGER.error(result)
@ -183,7 +147,7 @@ class HonCommand:
) )
@staticmethod @staticmethod
def _more_options(first: Parameter, second: Parameter) -> Parameter: def _more_options(first: HonParameter, second: HonParameter):
if isinstance(first, HonParameterFixed) and not isinstance( if isinstance(first, HonParameterFixed) and not isinstance(
second, HonParameterFixed second, HonParameterFixed
): ):
@ -193,8 +157,8 @@ class HonCommand:
return first return first
@property @property
def available_settings(self) -> Dict[str, Parameter]: def available_settings(self) -> Dict[str, HonParameter]:
result: Dict[str, Parameter] = {} result: Dict[str, HonParameter] = {}
for command in self.categories.values(): for command in self.categories.values():
for name, parameter in command.parameters.items(): for name, parameter in command.parameters.items():
if name in result: if name in result:
@ -202,7 +166,3 @@ class HonCommand:
else: else:
result[name] = parameter result[name] = parameter
return result return result
def reset(self) -> None:
for parameter in self._parameters.values():
parameter.reset()

View File

@ -3,8 +3,7 @@ import logging
from datetime import datetime from datetime import datetime
from pathlib import Path from pathlib import Path
from pprint import pformat from pprint import pformat
from types import TracebackType from typing import Dict, Optional, Any, List, no_type_check
from typing import Dict, Optional, Any, List, no_type_check, Type
from aiohttp import ClientSession from aiohttp import ClientSession
from typing_extensions import Self from typing_extensions import Self
@ -37,12 +36,7 @@ class HonAPI:
async def __aenter__(self) -> Self: async def __aenter__(self) -> Self:
return await self.create() return await self.create()
async def __aexit__( async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
self,
exc_type: Optional[Type[BaseException]],
exc: Optional[BaseException],
traceback: Optional[TracebackType],
) -> None:
await self.close() await self.close()
@property @property
@ -52,13 +46,13 @@ class HonAPI:
return self._hon.auth return self._hon.auth
@property @property
def _hon(self) -> HonConnectionHandler: def _hon(self):
if self._hon_handler is None: if self._hon_handler is None:
raise exceptions.NoAuthenticationException raise exceptions.NoAuthenticationException
return self._hon_handler return self._hon_handler
@property @property
def _hon_anonymous(self) -> HonAnonymousConnectionHandler: def _hon_anonymous(self):
if self._hon_anonymous_handler is None: if self._hon_anonymous_handler is None:
raise exceptions.NoAuthenticationException raise exceptions.NoAuthenticationException
return self._hon_anonymous_handler return self._hon_anonymous_handler
@ -75,16 +69,12 @@ class HonAPI:
async def load_appliances(self) -> List[Dict[str, Any]]: async def load_appliances(self) -> List[Dict[str, Any]]:
async with self._hon.get(f"{const.API_URL}/commands/v1/appliance") as resp: async with self._hon.get(f"{const.API_URL}/commands/v1/appliance") as resp:
result = await resp.json() if result := await resp.json():
if result: return result.get("payload", {}).get("appliances", {})
appliances: List[Dict[str, Any]] = result.get("payload", {}).get(
"appliances", {}
)
return appliances
return [] return []
async def load_commands(self, appliance: HonAppliance) -> Dict[str, Any]: async def load_commands(self, appliance: HonAppliance) -> Dict[str, Any]:
params: Dict[str, str | int] = { params: Dict = {
"applianceType": appliance.appliance_type, "applianceType": appliance.appliance_type,
"applianceModelId": appliance.appliance_model_id, "applianceModelId": appliance.appliance_model_id,
"macAddress": appliance.mac_address, "macAddress": appliance.mac_address,
@ -100,7 +90,7 @@ class HonAPI:
params["series"] = series params["series"] = series
url: str = f"{const.API_URL}/commands/v1/retrieve" url: str = f"{const.API_URL}/commands/v1/retrieve"
async with self._hon.get(url, params=params) as response: async with self._hon.get(url, params=params) as response:
result: Dict[str, Any] = (await response.json()).get("payload", {}) result: Dict = (await response.json()).get("payload", {})
if not result or result.pop("resultCode") != "0": if not result or result.pop("resultCode") != "0":
_LOGGER.error(await response.json()) _LOGGER.error(await response.json())
return {} return {}
@ -113,87 +103,76 @@ class HonAPI:
f"{const.API_URL}/commands/v1/appliance/{appliance.mac_address}/history" f"{const.API_URL}/commands/v1/appliance/{appliance.mac_address}/history"
) )
async with self._hon.get(url) as response: async with self._hon.get(url) as response:
result: Dict[str, Any] = await response.json() result: Dict = await response.json()
if not result or not result.get("payload"): if not result or not result.get("payload"):
return [] return []
command_history: List[Dict[str, Any]] = result["payload"]["history"] return result["payload"]["history"]
return command_history
async def load_favourites(self, appliance: HonAppliance) -> List[Dict[str, Any]]: async def load_favourites(self, appliance: HonAppliance) -> List[Dict[str, Any]]:
url: str = ( url: str = (
f"{const.API_URL}/commands/v1/appliance/{appliance.mac_address}/favourite" f"{const.API_URL}/commands/v1/appliance/{appliance.mac_address}/favourite"
) )
async with self._hon.get(url) as response: async with self._hon.get(url) as response:
result: Dict[str, Any] = await response.json() result: Dict = await response.json()
if not result or not result.get("payload"): if not result or not result.get("payload"):
return [] return []
favourites: List[Dict[str, Any]] = result["payload"]["favourites"] return result["payload"]["favourites"]
return favourites
async def load_last_activity(self, appliance: HonAppliance) -> Dict[str, Any]: async def load_last_activity(self, appliance: HonAppliance) -> Dict[str, Any]:
url: str = f"{const.API_URL}/commands/v1/retrieve-last-activity" url: str = f"{const.API_URL}/commands/v1/retrieve-last-activity"
params: Dict[str, str] = {"macAddress": appliance.mac_address} params: Dict = {"macAddress": appliance.mac_address}
async with self._hon.get(url, params=params) as response: async with self._hon.get(url, params=params) as response:
result: Dict[str, Any] = await response.json() result: Dict = await response.json()
if result: if result and (activity := result.get("attributes")):
activity: Dict[str, Any] = result.get("attributes", "")
if activity:
return activity return activity
return {} return {}
async def load_appliance_data(self, appliance: HonAppliance) -> Dict[str, Any]: async def load_appliance_data(self, appliance: HonAppliance) -> Dict[str, Any]:
url: str = f"{const.API_URL}/commands/v1/appliance-model" url: str = f"{const.API_URL}/commands/v1/appliance-model"
params: Dict[str, str] = { params: Dict = {
"code": appliance.code, "code": appliance.code,
"macAddress": appliance.mac_address, "macAddress": appliance.mac_address,
} }
async with self._hon.get(url, params=params) as response: async with self._hon.get(url, params=params) as response:
result: Dict[str, Any] = await response.json() result: Dict = await response.json()
if result: if result:
appliance_data: Dict[str, Any] = result.get("payload", {}).get( return result.get("payload", {}).get("applianceModel", {})
"applianceModel", {}
)
return appliance_data
return {} return {}
async def load_attributes(self, appliance: HonAppliance) -> Dict[str, Any]: async def load_attributes(self, appliance: HonAppliance) -> Dict[str, Any]:
params: Dict[str, str] = { params: Dict = {
"macAddress": appliance.mac_address, "macAddress": appliance.mac_address,
"applianceType": appliance.appliance_type, "applianceType": appliance.appliance_type,
"category": "CYCLE", "category": "CYCLE",
} }
url: str = f"{const.API_URL}/commands/v1/context" url: str = f"{const.API_URL}/commands/v1/context"
async with self._hon.get(url, params=params) as response: async with self._hon.get(url, params=params) as response:
attributes: Dict[str, Any] = (await response.json()).get("payload", {}) return (await response.json()).get("payload", {})
return attributes
async def load_statistics(self, appliance: HonAppliance) -> Dict[str, Any]: async def load_statistics(self, appliance: HonAppliance) -> Dict[str, Any]:
params: Dict[str, str] = { params: Dict = {
"macAddress": appliance.mac_address, "macAddress": appliance.mac_address,
"applianceType": appliance.appliance_type, "applianceType": appliance.appliance_type,
} }
url: str = f"{const.API_URL}/commands/v1/statistics" url: str = f"{const.API_URL}/commands/v1/statistics"
async with self._hon.get(url, params=params) as response: async with self._hon.get(url, params=params) as response:
statistics: Dict[str, Any] = (await response.json()).get("payload", {}) return (await response.json()).get("payload", {})
return statistics
async def load_maintenance(self, appliance: HonAppliance) -> Dict[str, Any]: async def load_maintenance(self, appliance: HonAppliance) -> Dict[str, Any]:
url = f"{const.API_URL}/commands/v1/maintenance-cycle" url = f"{const.API_URL}/commands/v1/maintenance-cycle"
params = {"macAddress": appliance.mac_address} params = {"macAddress": appliance.mac_address}
async with self._hon.get(url, params=params) as response: async with self._hon.get(url, params=params) as response:
maintenance: Dict[str, Any] = (await response.json()).get("payload", {}) return (await response.json()).get("payload", {})
return maintenance
async def send_command( async def send_command(
self, self,
appliance: HonAppliance, appliance: HonAppliance,
command: str, command: str,
parameters: Dict[str, Any], parameters: Dict,
ancillary_parameters: Dict[str, Any], ancillary_parameters: Dict,
program_name: str = "",
) -> bool: ) -> bool:
now: str = datetime.utcnow().isoformat() now: str = datetime.utcnow().isoformat()
data: Dict[str, Any] = { data: Dict = {
"macAddress": appliance.mac_address, "macAddress": appliance.mac_address,
"timestamp": f"{now[:-3]}Z", "timestamp": f"{now[:-3]}Z",
"commandName": command, "commandName": command,
@ -209,11 +188,9 @@ class HonAPI:
"parameters": parameters, "parameters": parameters,
"applianceType": appliance.appliance_type, "applianceType": appliance.appliance_type,
} }
if command == "startProgram" and program_name:
data.update({"programName": program_name.upper()})
url: str = f"{const.API_URL}/commands/v1/send" url: str = f"{const.API_URL}/commands/v1/send"
async with self._hon.post(url, json=data) as response: async with self._hon.post(url, json=data) as response:
json_data: Dict[str, Any] = await response.json() json_data: Dict = await response.json()
if json_data.get("payload", {}).get("resultCode") == "0": if json_data.get("payload", {}).get("resultCode") == "0":
return True return True
_LOGGER.error(await response.text()) _LOGGER.error(await response.text())
@ -223,15 +200,16 @@ class HonAPI:
async def appliance_configuration(self) -> Dict[str, Any]: async def appliance_configuration(self) -> Dict[str, Any]:
url: str = f"{const.API_URL}/config/v1/program-list-rules" url: str = f"{const.API_URL}/config/v1/program-list-rules"
async with self._hon_anonymous.get(url) as response: async with self._hon_anonymous.get(url) as response:
result: Dict[str, Any] = await response.json() result: Dict = await response.json()
data: Dict[str, Any] = result.get("payload", {}) if result and (data := result.get("payload")):
return data return data
return {}
async def app_config( async def app_config(
self, language: str = "en", beta: bool = True self, language: str = "en", beta: bool = True
) -> Dict[str, Any]: ) -> Dict[str, Any]:
url: str = f"{const.API_URL}/app-config" url: str = f"{const.API_URL}/app-config"
payload_data: Dict[str, str | int] = { payload_data: Dict = {
"languageCode": language, "languageCode": language,
"beta": beta, "beta": beta,
"appVersion": const.APP_VERSION, "appVersion": const.APP_VERSION,
@ -239,17 +217,17 @@ class HonAPI:
} }
payload: str = json.dumps(payload_data, separators=(",", ":")) payload: str = json.dumps(payload_data, separators=(",", ":"))
async with self._hon_anonymous.post(url, data=payload) as response: async with self._hon_anonymous.post(url, data=payload) as response:
result = await response.json() if (result := await response.json()) and (data := result.get("payload")):
data: Dict[str, Any] = result.get("payload", {})
return data return data
return {}
async def translation_keys(self, language: str = "en") -> Dict[str, Any]: async def translation_keys(self, language: str = "en") -> Dict[str, Any]:
config = await self.app_config(language=language) config = await self.app_config(language=language)
if not (url := config.get("language", {}).get("jsonPath")): if url := config.get("language", {}).get("jsonPath"):
return {}
async with self._hon_anonymous.get(url) as response: async with self._hon_anonymous.get(url) as response:
result: Dict[str, Any] = await response.json() if result := await response.json():
return result return result
return {}
async def close(self) -> None: async def close(self) -> None:
if self._hon_handler is not None: if self._hon_handler is not None:
@ -259,34 +237,24 @@ class HonAPI:
class TestAPI(HonAPI): class TestAPI(HonAPI):
def __init__(self, path: Path): def __init__(self, path):
super().__init__() super().__init__()
self._anonymous = True self._anonymous = True
self._path: Path = path self._path: Path = path
def _load_json(self, appliance: HonAppliance, file: str) -> Dict[str, Any]: def _load_json(self, appliance: HonAppliance, file) -> Dict[str, Any]:
directory = f"{appliance.appliance_type}_{appliance.appliance_model_id}".lower() directory = f"{appliance.appliance_type}_{appliance.appliance_model_id}".lower()
if not (path := self._path / directory / f"{file}.json").exists(): path = f"{self._path}/{directory}/{file}.json"
_LOGGER.warning("Can't open %s", str(path))
return {}
with open(path, "r", encoding="utf-8") as json_file: with open(path, "r", encoding="utf-8") as json_file:
text = json_file.read() return json.loads(json_file.read())
try:
data: Dict[str, Any] = json.loads(text)
return data
except json.decoder.JSONDecodeError as error:
_LOGGER.error("%s - %s", str(path), error)
return {}
async def load_appliances(self) -> List[Dict[str, Any]]: async def load_appliances(self) -> List[Dict[str, Any]]:
result = [] result = []
for appliance in self._path.glob("*/"): for appliance in self._path.glob("*/"):
file = appliance / "appliance_data.json" with open(
with open(file, "r", encoding="utf-8") as json_file: appliance / "appliance_data.json", "r", encoding="utf-8"
try: ) as json_file:
result.append(json.loads(json_file.read())) result.append(json.loads(json_file.read()))
except json.decoder.JSONDecodeError as error:
_LOGGER.error("%s - %s", str(file), error)
return result return result
async def load_commands(self, appliance: HonAppliance) -> Dict[str, Any]: async def load_commands(self, appliance: HonAppliance) -> Dict[str, Any]:
@ -320,14 +288,7 @@ class TestAPI(HonAPI):
self, self,
appliance: HonAppliance, appliance: HonAppliance,
command: str, command: str,
parameters: Dict[str, Any], parameters: Dict,
ancillary_parameters: Dict[str, Any], ancillary_parameters: Dict,
program_name: str = "",
) -> bool: ) -> bool:
_LOGGER.info(
"%s - %s - %s",
str(parameters),
str(ancillary_parameters),
str(program_name),
)
return True return True

View File

@ -6,16 +6,14 @@ import urllib
from contextlib import suppress from contextlib import suppress
from dataclasses import dataclass from dataclasses import dataclass
from datetime import datetime, timedelta from datetime import datetime, timedelta
from typing import Dict, Optional, Any, List from typing import Dict, Optional
from urllib import parse from urllib import parse
from urllib.parse import quote from urllib.parse import quote
import aiohttp
from aiohttp import ClientResponse from aiohttp import ClientResponse
from yarl import URL from yarl import URL
from pyhon import const, exceptions from pyhon import const, exceptions
from pyhon.connection.device import HonDevice
from pyhon.connection.handler.auth import HonAuthConnectionHandler from pyhon.connection.handler.auth import HonAuthConnectionHandler
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -27,52 +25,41 @@ class HonLoginData:
email: str = "" email: str = ""
password: str = "" password: str = ""
fw_uid: str = "" fw_uid: str = ""
loaded: Optional[Dict[str, Any]] = None loaded: Optional[Dict] = None
@dataclass
class HonAuthData:
access_token: str = ""
refresh_token: str = ""
cognito_token: str = ""
id_token: str = ""
class HonAuth: class HonAuth:
_TOKEN_EXPIRES_AFTER_HOURS = 8 _TOKEN_EXPIRES_AFTER_HOURS = 8
_TOKEN_EXPIRE_WARNING_HOURS = 7 _TOKEN_EXPIRE_WARNING_HOURS = 7
def __init__( def __init__(self, session, email, password, device) -> None:
self,
session: aiohttp.ClientSession,
email: str,
password: str,
device: HonDevice,
) -> None:
self._session = session self._session = session
self._request = HonAuthConnectionHandler(session) self._request = HonAuthConnectionHandler(session)
self._login_data = HonLoginData() self._login_data = HonLoginData()
self._login_data.email = email self._login_data.email = email
self._login_data.password = password self._login_data.password = password
self._access_token = ""
self._refresh_token = ""
self._cognito_token = ""
self._id_token = ""
self._device = device self._device = device
self._expires: datetime = datetime.utcnow() self._expires: datetime = datetime.utcnow()
self._auth = HonAuthData()
@property @property
def cognito_token(self) -> str: def cognito_token(self) -> str:
return self._auth.cognito_token return self._cognito_token
@property @property
def id_token(self) -> str: def id_token(self) -> str:
return self._auth.id_token return self._id_token
@property @property
def access_token(self) -> str: def access_token(self) -> str:
return self._auth.access_token return self._access_token
@property @property
def refresh_token(self) -> str: def refresh_token(self) -> str:
return self._auth.refresh_token return self._refresh_token
def _check_token_expiration(self, hours: int) -> bool: def _check_token_expiration(self, hours: int) -> bool:
return datetime.utcnow() >= self._expires + timedelta(hours=hours) return datetime.utcnow() >= self._expires + timedelta(hours=hours)
@ -120,8 +107,7 @@ class HonAuth:
async with self._request.get(url) as response: async with self._request.get(url) as response:
text = await response.text() text = await response.text()
self._expires = datetime.utcnow() self._expires = datetime.utcnow()
login_url: List[str] = re.findall("url = '(.+?)'", text) if not (login_url := re.findall("url = '(.+?)'", text)):
if not login_url:
if "oauth/done#access_token=" in text: if "oauth/done#access_token=" in text:
self._parse_token_data(text) self._parse_token_data(text)
raise exceptions.HonNoAuthenticationNeeded() raise exceptions.HonNoAuthenticationNeeded()
@ -134,7 +120,7 @@ class HonAuth:
await self._error_logger(response) await self._error_logger(response)
return new_location return new_location
async def _handle_redirects(self, login_url: str) -> str: async def _handle_redirects(self, login_url) -> str:
redirect1 = await self._manual_redirect(login_url) redirect1 = await self._manual_redirect(login_url)
redirect2 = await self._manual_redirect(redirect1) redirect2 = await self._manual_redirect(redirect1)
return f"{redirect2}&System=IoT_Mobile_App&RegistrationSubChannel=hOn" return f"{redirect2}&System=IoT_Mobile_App&RegistrationSubChannel=hOn"
@ -190,19 +176,18 @@ class HonAuth:
if response.status == 200: if response.status == 200:
with suppress(json.JSONDecodeError, KeyError): with suppress(json.JSONDecodeError, KeyError):
result = await response.json() result = await response.json()
url: str = result["events"][0]["attributes"]["values"]["url"] return result["events"][0]["attributes"]["values"]["url"]
return url
await self._error_logger(response) await self._error_logger(response)
return "" return ""
def _parse_token_data(self, text: str) -> bool: def _parse_token_data(self, text: str) -> bool:
if access_token := re.findall("access_token=(.*?)&", text): if access_token := re.findall("access_token=(.*?)&", text):
self._auth.access_token = access_token[0] self._access_token = access_token[0]
if refresh_token := re.findall("refresh_token=(.*?)&", text): if refresh_token := re.findall("refresh_token=(.*?)&", text):
self._auth.refresh_token = refresh_token[0] self._refresh_token = refresh_token[0]
if id_token := re.findall("id_token=(.*?)&", text): if id_token := re.findall("id_token=(.*?)&", text):
self._auth.id_token = id_token[0] self._id_token = id_token[0]
return bool(access_token and refresh_token and id_token) return True if access_token and refresh_token and id_token else False
async def _get_token(self, url: str) -> bool: async def _get_token(self, url: str) -> bool:
async with self._request.get(url) as response: async with self._request.get(url) as response:
@ -234,7 +219,7 @@ class HonAuth:
return True return True
async def _api_auth(self) -> bool: async def _api_auth(self) -> bool:
post_headers = {"id-token": self._auth.id_token} post_headers = {"id-token": self._id_token}
data = self._device.get() data = self._device.get()
async with self._request.post( async with self._request.post(
f"{const.API_URL}/auth/v1/login", headers=post_headers, json=data f"{const.API_URL}/auth/v1/login", headers=post_headers, json=data
@ -244,8 +229,8 @@ class HonAuth:
except json.JSONDecodeError: except json.JSONDecodeError:
await self._error_logger(response) await self._error_logger(response)
return False return False
self._auth.cognito_token = json_data.get("cognitoUser", {}).get("Token", "") self._cognito_token = json_data.get("cognitoUser", {}).get("Token", "")
if not self._auth.cognito_token: if not self._cognito_token:
_LOGGER.error(json_data) _LOGGER.error(json_data)
raise exceptions.HonAuthenticationError() raise exceptions.HonAuthenticationError()
return True return True
@ -267,7 +252,7 @@ class HonAuth:
async def refresh(self) -> bool: async def refresh(self) -> bool:
params = { params = {
"client_id": const.CLIENT_ID, "client_id": const.CLIENT_ID,
"refresh_token": self._auth.refresh_token, "refresh_token": self._refresh_token,
"grant_type": "refresh_token", "grant_type": "refresh_token",
} }
async with self._request.post( async with self._request.post(
@ -278,14 +263,14 @@ class HonAuth:
return False return False
data = await response.json() data = await response.json()
self._expires = datetime.utcnow() self._expires = datetime.utcnow()
self._auth.id_token = data["id_token"] self._id_token = data["id_token"]
self._auth.access_token = data["access_token"] self._access_token = data["access_token"]
return await self._api_auth() return await self._api_auth()
def clear(self) -> None: def clear(self) -> None:
self._session.cookie_jar.clear_domain(const.AUTH_API.split("/")[-2]) self._session.cookie_jar.clear_domain(const.AUTH_API.split("/")[-2])
self._request.called_urls = [] self._request.called_urls = []
self._auth.cognito_token = "" self._cognito_token = ""
self._auth.id_token = "" self._id_token = ""
self._auth.access_token = "" self._access_token = ""
self._auth.refresh_token = "" self._refresh_token = ""

View File

@ -21,7 +21,7 @@ class HonDevice:
return self._os_version return self._os_version
@property @property
def os_type(self) -> str: def os(self) -> str:
return self._os return self._os
@property @property
@ -32,14 +32,12 @@ class HonDevice:
def mobile_id(self) -> str: def mobile_id(self) -> str:
return self._mobile_id return self._mobile_id
def get(self, mobile: bool = False) -> Dict[str, str | int]: def get(self, mobile: bool = False) -> Dict:
result: Dict[str, str | int] = { result = {
"appVersion": self.app_version, "appVersion": self.app_version,
"mobileId": self.mobile_id, "mobileId": self.mobile_id,
"os": self.os_type, "os": self.os,
"osVersion": self.os_version, "osVersion": self.os_version,
"deviceModel": self.device_model, "deviceModel": self.device_model,
} }
if mobile: return (result | {"mobileOs": result.pop("os")}) if mobile else result
result |= {"mobileOs": result.pop("os", "")}
return result

View File

@ -1,27 +1,21 @@
import logging import logging
from collections.abc import AsyncIterator from collections.abc import AsyncIterator
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from typing import Dict, Any from typing import Callable, Dict
import aiohttp
from yarl import URL
from pyhon import const from pyhon import const
from pyhon.connection.handler.base import ConnectionHandler from pyhon.connection.handler.base import ConnectionHandler
from pyhon.typedefs import Callback
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
class HonAnonymousConnectionHandler(ConnectionHandler): class HonAnonymousConnectionHandler(ConnectionHandler):
_HEADERS: Dict[str, str] = ConnectionHandler._HEADERS | {"x-api-key": const.API_KEY} _HEADERS: Dict = ConnectionHandler._HEADERS | {"x-api-key": const.API_KEY}
@asynccontextmanager @asynccontextmanager
async def _intercept( async def _intercept(self, method: Callable, *args, **kwargs) -> AsyncIterator:
self, method: Callback, url: str | URL, *args: Any, **kwargs: Dict[str, Any]
) -> AsyncIterator[aiohttp.ClientResponse]:
kwargs["headers"] = kwargs.pop("headers", {}) | self._HEADERS kwargs["headers"] = kwargs.pop("headers", {}) | self._HEADERS
async with method(url, *args, **kwargs) as response: async with method(*args, **kwargs) as response:
if response.status == 403: if response.status == 403:
_LOGGER.error("Can't authenticate anymore") _LOGGER.error("Can't authenticate anymore")
yield response yield response

View File

@ -1,14 +1,12 @@
import logging import logging
from collections.abc import AsyncIterator from collections.abc import AsyncIterator
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from typing import Optional, List, Tuple, Any, Dict from typing import Optional, Callable, List, Tuple
import aiohttp import aiohttp
from yarl import URL
from pyhon import const from pyhon import const
from pyhon.connection.handler.base import ConnectionHandler from pyhon.connection.handler.base import ConnectionHandler
from pyhon.typedefs import Callback
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -30,9 +28,9 @@ class HonAuthConnectionHandler(ConnectionHandler):
@asynccontextmanager @asynccontextmanager
async def _intercept( async def _intercept(
self, method: Callback, url: str | URL, *args: Any, **kwargs: Dict[str, Any] self, method: Callable, *args, loop: int = 0, **kwargs
) -> AsyncIterator[aiohttp.ClientResponse]: ) -> AsyncIterator:
kwargs["headers"] = kwargs.pop("headers", {}) | self._HEADERS kwargs["headers"] = kwargs.pop("headers", {}) | self._HEADERS
async with method(url, *args, **kwargs) as response: async with method(*args, **kwargs) as response:
self._called_urls.append((response.status, str(response.request_info.url))) self._called_urls.append((response.status, response.request_info.url))
yield response yield response

View File

@ -1,21 +1,18 @@
import logging import logging
from collections.abc import AsyncIterator from collections.abc import AsyncIterator
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from types import TracebackType from typing import Optional, Callable, Dict
from typing import Optional, Dict, Type, Any
import aiohttp import aiohttp
from typing_extensions import Self from typing_extensions import Self
from yarl import URL
from pyhon import const, exceptions from pyhon import const, exceptions
from pyhon.typedefs import Callback
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
class ConnectionHandler: class ConnectionHandler:
_HEADERS: Dict[str, str] = { _HEADERS: Dict = {
"user-agent": const.USER_AGENT, "user-agent": const.USER_AGENT,
"Content-Type": "application/json", "Content-Type": "application/json",
} }
@ -27,52 +24,32 @@ class ConnectionHandler:
async def __aenter__(self) -> Self: async def __aenter__(self) -> Self:
return await self.create() return await self.create()
async def __aexit__( async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
self,
exc_type: Optional[Type[BaseException]],
exc: Optional[BaseException],
traceback: Optional[TracebackType],
) -> None:
await self.close() await self.close()
@property
def session(self) -> aiohttp.ClientSession:
if self._session is None:
raise exceptions.NoSessionException
return self._session
async def create(self) -> Self: async def create(self) -> Self:
if self._create_session: if self._create_session:
self._session = aiohttp.ClientSession() self._session = aiohttp.ClientSession()
return self return self
@asynccontextmanager @asynccontextmanager
async def _intercept( def _intercept(self, method: Callable, *args, loop: int = 0, **kwargs):
self, method: Callback, url: str | URL, *args: Any, **kwargs: Dict[str, Any] raise NotImplementedError
) -> AsyncIterator[aiohttp.ClientResponse]:
async with method(url, *args, **kwargs) as response:
yield response
@asynccontextmanager @asynccontextmanager
async def get( async def get(self, *args, **kwargs) -> AsyncIterator[aiohttp.ClientResponse]:
self, *args: Any, **kwargs: Any
) -> AsyncIterator[aiohttp.ClientResponse]:
if self._session is None: if self._session is None:
raise exceptions.NoSessionException() raise exceptions.NoSessionException()
response: aiohttp.ClientResponse response: aiohttp.ClientResponse
args = self._session.get, *args async with self._intercept(self._session.get, *args, **kwargs) as response:
async with self._intercept(*args, **kwargs) as response:
yield response yield response
@asynccontextmanager @asynccontextmanager
async def post( async def post(self, *args, **kwargs) -> AsyncIterator[aiohttp.ClientResponse]:
self, *args: Any, **kwargs: Any
) -> AsyncIterator[aiohttp.ClientResponse]:
if self._session is None: if self._session is None:
raise exceptions.NoSessionException() raise exceptions.NoSessionException()
response: aiohttp.ClientResponse response: aiohttp.ClientResponse
args = self._session.post, *args async with self._intercept(self._session.post, *args, **kwargs) as response:
async with self._intercept(*args, **kwargs) as response:
yield response yield response
async def close(self) -> None: async def close(self) -> None:

View File

@ -2,17 +2,15 @@ import json
import logging import logging
from collections.abc import AsyncIterator from collections.abc import AsyncIterator
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from typing import Optional, Dict, Any from typing import Optional, Callable, Dict
import aiohttp import aiohttp
from typing_extensions import Self from typing_extensions import Self
from yarl import URL
from pyhon.connection.auth import HonAuth from pyhon.connection.auth import HonAuth
from pyhon.connection.device import HonDevice from pyhon.connection.device import HonDevice
from pyhon.connection.handler.base import ConnectionHandler from pyhon.connection.handler.base import ConnectionHandler
from pyhon.exceptions import HonAuthenticationError, NoAuthenticationException from pyhon.exceptions import HonAuthenticationError, NoAuthenticationException
from pyhon.typedefs import Callback
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -43,10 +41,10 @@ class HonConnectionHandler(ConnectionHandler):
async def create(self) -> Self: async def create(self) -> Self:
await super().create() await super().create()
self._auth = HonAuth(self.session, self._email, self._password, self._device) self._auth = HonAuth(self._session, self._email, self._password, self._device)
return self return self
async def _check_headers(self, headers: Dict[str, str]) -> Dict[str, str]: async def _check_headers(self, headers: Dict) -> Dict:
if not (self.auth.cognito_token and self.auth.id_token): if not (self.auth.cognito_token and self.auth.id_token):
await self.auth.authenticate() await self.auth.authenticate()
headers["cognito-token"] = self.auth.cognito_token headers["cognito-token"] = self.auth.cognito_token
@ -55,18 +53,17 @@ class HonConnectionHandler(ConnectionHandler):
@asynccontextmanager @asynccontextmanager
async def _intercept( async def _intercept(
self, method: Callback, url: str | URL, *args: Any, **kwargs: Any self, method: Callable, *args, loop: int = 0, **kwargs
) -> AsyncIterator[aiohttp.ClientResponse]: ) -> AsyncIterator:
loop: int = kwargs.pop("loop", 0)
kwargs["headers"] = await self._check_headers(kwargs.get("headers", {})) kwargs["headers"] = await self._check_headers(kwargs.get("headers", {}))
async with method(url, *args, **kwargs) as response: async with method(*args, **kwargs) as response:
if ( if (
self.auth.token_expires_soon or response.status in [401, 403] self.auth.token_expires_soon or response.status in [401, 403]
) and loop == 0: ) and loop == 0:
_LOGGER.info("Try refreshing token...") _LOGGER.info("Try refreshing token...")
await self.auth.refresh() await self.auth.refresh()
async with self._intercept( async with self._intercept(
method, url, *args, loop=loop + 1, **kwargs method, *args, loop=loop + 1, **kwargs
) as result: ) as result:
yield result yield result
elif ( elif (
@ -80,7 +77,7 @@ class HonConnectionHandler(ConnectionHandler):
) )
await self.create() await self.create()
async with self._intercept( async with self._intercept(
method, url, *args, loop=loop + 1, **kwargs method, *args, loop=loop + 1, **kwargs
) as result: ) as result:
yield result yield result
elif loop >= 2: elif loop >= 2:
@ -95,11 +92,11 @@ class HonConnectionHandler(ConnectionHandler):
try: try:
await response.json() await response.json()
yield response yield response
except json.JSONDecodeError as exc: except json.JSONDecodeError:
_LOGGER.warning( _LOGGER.warning(
"%s - JsonDecodeError %s - %s", "%s - JsonDecodeError %s - %s",
response.request_info.url, response.request_info.url,
response.status, response.status,
await response.text(), await response.text(),
) )
raise HonAuthenticationError("Decode Error") from exc raise HonAuthenticationError("Decode Error")

View File

@ -2,11 +2,9 @@ AUTH_API = "https://account2.hon-smarthome.com"
API_URL = "https://api-iot.he.services" API_URL = "https://api-iot.he.services"
API_KEY = "GRCqFhC6Gk@ikWXm1RmnSmX1cm,MxY-configuration" API_KEY = "GRCqFhC6Gk@ikWXm1RmnSmX1cm,MxY-configuration"
APP = "hon" APP = "hon"
CLIENT_ID = ( # All seen id's (different accounts, different devices) are the same, so I guess this hash is static
"3MVG9QDx8IX8nP5T2Ha8ofvlmjLZl5L_gvfbT9." CLIENT_ID = "3MVG9QDx8IX8nP5T2Ha8ofvlmjLZl5L_gvfbT9.HJvpHGKoAS_dcMN8LYpTSYeVFCraUnV.2Ag1Ki7m4znVO6"
"HJvpHGKoAS_dcMN8LYpTSYeVFCraUnV.2Ag1Ki7m4znVO6" APP_VERSION = "2.0.10"
)
APP_VERSION = "2.4.7"
OS_VERSION = 31 OS_VERSION = 31
OS = "android" OS = "android"
DEVICE_MODEL = "exynos9820" DEVICE_MODEL = "exynos9820"

View File

@ -5,7 +5,7 @@ import shutil
from pathlib import Path from pathlib import Path
from typing import TYPE_CHECKING, List, Tuple from typing import TYPE_CHECKING, List, Tuple
from pyhon import printer from pyhon import helper
if TYPE_CHECKING: if TYPE_CHECKING:
from pyhon.appliance import HonAppliance from pyhon.appliance import HonAppliance
@ -29,7 +29,7 @@ def anonymize_data(data: str) -> str:
for match in re.findall(f'"{sensible}.*?":\\s"?(.+?)"?,?\\n', data): for match in re.findall(f'"{sensible}.*?":\\s"?(.+?)"?,?\\n', data):
replace = re.sub("[a-z]", "x", match) replace = re.sub("[a-z]", "x", match)
replace = re.sub("[A-Z]", "X", replace) replace = re.sub("[A-Z]", "X", replace)
replace = re.sub("\\d", "1", replace) replace = re.sub("\\d", "0", replace)
data = data.replace(match, replace) data = data.replace(match, replace)
return data return data
@ -38,7 +38,7 @@ async def load_data(appliance: "HonAppliance", topic: str) -> Tuple[str, str]:
return topic, await getattr(appliance.api, f"load_{topic}")(appliance) return topic, await getattr(appliance.api, f"load_{topic}")(appliance)
def write_to_json(data: str, topic: str, path: Path, anonymous: bool = False) -> Path: def write_to_json(data: str, topic: str, path: Path, anonymous: bool = False):
json_data = json.dumps(data, indent=4) json_data = json.dumps(data, indent=4)
if anonymous: if anonymous:
json_data = anonymize_data(json_data) json_data = anonymize_data(json_data)
@ -65,17 +65,15 @@ async def appliance_data(
return [write_to_json(data, topic, path, anonymous) for topic, data in api_data] return [write_to_json(data, topic, path, anonymous) for topic, data in api_data]
async def zip_archive( async def zip_archive(appliance: "HonAppliance", path: Path, anonymous: bool = False):
appliance: "HonAppliance", path: Path, anonymous: bool = False
) -> str:
data = await appliance_data(appliance, path, anonymous) data = await appliance_data(appliance, path, anonymous)
archive = data[0].parent archive = data[0].parent
shutil.make_archive(str(archive), "zip", archive) shutil.make_archive(str(archive), "zip", path)
shutil.rmtree(archive) shutil.rmtree(archive)
return f"{archive.stem}.zip" return f"{archive.stem}.zip"
def yaml_export(appliance: "HonAppliance", anonymous: bool = False) -> str: def yaml_export(appliance: "HonAppliance", anonymous=False) -> str:
data = { data = {
"attributes": appliance.attributes.copy(), "attributes": appliance.attributes.copy(),
"appliance": appliance.info, "appliance": appliance.info,
@ -89,11 +87,12 @@ def yaml_export(appliance: "HonAppliance", anonymous: bool = False) -> str:
if anonymous: if anonymous:
for sensible in ["serialNumber", "coords"]: for sensible in ["serialNumber", "coords"]:
data.get("appliance", {}).pop(sensible, None) data.get("appliance", {}).pop(sensible, None)
result = printer.pretty_print({"data": data}) data = {
if commands := printer.create_commands(appliance.commands): "data": data,
result += printer.pretty_print({"commands": commands}) "commands": helper.create_command(appliance.commands),
if rules := printer.create_rules(appliance.commands): "rules": helper.create_rules(appliance.commands),
result += printer.pretty_print({"rules": rules}) }
result = helper.pretty_print(data)
if anonymous: if anonymous:
result = anonymize_data(result) result = anonymize_data(result)
return result return result

View File

@ -1,3 +1,80 @@
def key_print(data, key="", start=True):
result = ""
if isinstance(data, list):
for i, value in enumerate(data):
result += key_print(value, key=f"{key}.{i}", start=False)
elif isinstance(data, dict):
for k, value in sorted(data.items()):
result += key_print(value, key=k if start else f"{key}.{k}", start=False)
else:
result += f"{key}: {data}\n"
return result
# yaml.dump() would be done the same, but needs an additional dependency...
def pretty_print(data, key="", intend=0, is_list=False, whitespace=" "):
result = ""
if isinstance(data, list):
if key:
result += f"{whitespace * intend}{'- ' if is_list else ''}{key}:\n"
intend += 1
for i, value in enumerate(data):
result += pretty_print(
value, intend=intend, is_list=True, whitespace=whitespace
)
elif isinstance(data, dict):
if key:
result += f"{whitespace * intend}{'- ' if is_list else ''}{key}:\n"
intend += 1
for i, (key, value) in enumerate(sorted(data.items())):
if is_list and not i:
result += pretty_print(
value, key=key, intend=intend, is_list=True, whitespace=whitespace
)
elif is_list:
result += pretty_print(
value, key=key, intend=intend + 1, whitespace=whitespace
)
else:
result += pretty_print(
value, key=key, intend=intend, whitespace=whitespace
)
else:
result += f"{whitespace * intend}{'- ' if is_list else ''}{key}{': ' if key else ''}{data}\n"
return result
def create_command(commands, concat=False):
result = {}
for name, command in commands.items():
for parameter, data in command.available_settings.items():
if data.typology == "enum":
value = data.values
elif data.typology == "range":
value = {"min": data.min, "max": data.max, "step": data.step}
else:
continue
if not concat:
result.setdefault(name, {})[parameter] = value
else:
result[f"{name}.{parameter}"] = value
return result
def create_rules(commands, concat=False):
result = {}
for name, command in commands.items():
for parameter, data in command.available_settings.items():
value = data.triggers
if not value:
continue
if not concat:
result.setdefault(name, {})[parameter] = value
else:
result[f"{name}.{parameter}"] = value
return result
def str_to_float(string: str | float) -> float: def str_to_float(string: str | float) -> float:
try: try:
return int(string) return int(string)

View File

@ -7,10 +7,9 @@ from typing import List, Optional, Dict, Any, Type
from aiohttp import ClientSession from aiohttp import ClientSession
from typing_extensions import Self from typing_extensions import Self
from pyhon import HonAPI, exceptions
from pyhon.appliance import HonAppliance from pyhon.appliance import HonAppliance
from pyhon.connection.api import HonAPI
from pyhon.connection.api import TestAPI from pyhon.connection.api import TestAPI
from pyhon.exceptions import NoAuthenticationException
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -44,7 +43,7 @@ class Hon:
@property @property
def api(self) -> HonAPI: def api(self) -> HonAPI:
if self._api is None: if self._api is None:
raise NoAuthenticationException raise exceptions.NoAuthenticationException
return self._api return self._api
@property @property
@ -71,11 +70,11 @@ class Hon:
return self._appliances return self._appliances
@appliances.setter @appliances.setter
def appliances(self, appliances: List[HonAppliance]) -> None: def appliances(self, appliances) -> None:
self._appliances = appliances self._appliances = appliances
async def _create_appliance( async def _create_appliance(
self, appliance_data: Dict[str, Any], api: HonAPI, zone: int = 0 self, appliance_data: Dict[str, Any], api: HonAPI, zone=0
) -> None: ) -> None:
appliance = HonAppliance(api, appliance_data, zone=zone) appliance = HonAppliance(api, appliance_data, zone=zone)
if appliance.mac_address == "": if appliance.mac_address == "":

View File

@ -7,21 +7,12 @@ if TYPE_CHECKING:
class HonParameter: class HonParameter:
def __init__(self, key: str, attributes: Dict[str, Any], group: str) -> None: def __init__(self, key: str, attributes: Dict[str, Any], group: str) -> None:
self._key = key self._key = key
self._attributes = attributes self._category: str = attributes.get("category", "")
self._category: str = "" self._typology: str = attributes.get("typology", "")
self._typology: str = "" self._mandatory: int = attributes.get("mandatory", 0)
self._mandatory: int = 0
self._value: str | float = "" self._value: str | float = ""
self._group: str = group self._group: str = group
self._triggers: Dict[ self._triggers: Dict[str, List[Tuple[Callable, "HonRule"]]] = {}
str, List[Tuple[Callable[["HonRule"], None], "HonRule"]]
] = {}
self._set_attributes()
def _set_attributes(self) -> None:
self._category = self._attributes.get("category", "")
self._typology = self._attributes.get("typology", "")
self._mandatory = self._attributes.get("mandatory", 0)
@property @property
def key(self) -> str: def key(self) -> str:
@ -60,23 +51,20 @@ class HonParameter:
def group(self) -> str: def group(self) -> str:
return self._group return self._group
def add_trigger( def add_trigger(self, value, func, data):
self, value: str, func: Callable[["HonRule"], None], data: "HonRule"
) -> None:
if self._value == value: if self._value == value:
func(data) func(data)
self._triggers.setdefault(value, []).append((func, data)) self._triggers.setdefault(value, []).append((func, data))
def check_trigger(self, value: str | float) -> None: def check_trigger(self, value) -> None:
triggers = {str(k).lower(): v for k, v in self._triggers.items()} if str(value) in self._triggers:
if str(value).lower() in triggers: for trigger in self._triggers[str(value)]:
for trigger in triggers[str(value)]:
func, args = trigger func, args = trigger
func(args) func(args)
@property @property
def triggers(self) -> Dict[str, Any]: def triggers(self):
result: Dict[str, Any] = {} result = {}
for value, rules in self._triggers.items(): for value, rules in self._triggers.items():
for _, rule in rules: for _, rule in rules:
if rule.extras: if rule.extras:
@ -93,6 +81,3 @@ class HonParameter:
param[rule.param_key] = rule.param_data.get("defaultValue", "") param[rule.param_key] = rule.param_data.get("defaultValue", "")
return result return result
def reset(self) -> None:
self._set_attributes()

View File

@ -3,26 +3,19 @@ from typing import Dict, Any, List
from pyhon.parameter.base import HonParameter from pyhon.parameter.base import HonParameter
def clean_value(value: str | float) -> str: def clean_value(value):
return str(value).strip("[]").replace("|", "_").lower() return str(value).strip("[]").replace("|", "_").lower()
class HonParameterEnum(HonParameter): class HonParameterEnum(HonParameter):
def __init__(self, key: str, attributes: Dict[str, Any], group: str) -> None: def __init__(self, key: str, attributes: Dict[str, Any], group: str) -> None:
super().__init__(key, attributes, group) super().__init__(key, attributes, group)
self._default: str | float = "" self._default = attributes.get("defaultValue")
self._value: str | float = "" self._value = self._default or "0"
self._values: List[str] = [] self._values: List[str] = attributes.get("enumValues", [])
self._set_attributes()
if self._default and clean_value(self._default.strip("[]")) not in self.values: if self._default and clean_value(self._default.strip("[]")) not in self.values:
self._values.append(self._default) self._values.append(self._default)
def _set_attributes(self) -> None:
super()._set_attributes()
self._default = self._attributes.get("defaultValue", "")
self._value = self._default or "0"
self._values = self._attributes.get("enumValues", [])
def __repr__(self) -> str: def __repr__(self) -> str:
return f"{self.__class__} (<{self.key}> {self.values})" return f"{self.__class__} (<{self.key}> {self.values})"
@ -31,7 +24,7 @@ class HonParameterEnum(HonParameter):
return [clean_value(value) for value in self._values] return [clean_value(value) for value in self._values]
@values.setter @values.setter
def values(self, values: List[str]) -> None: def values(self, values) -> None:
self._values = values self._values = values
@property @property
@ -48,4 +41,4 @@ class HonParameterEnum(HonParameter):
self._value = value self._value = value
self.check_trigger(value) self.check_trigger(value)
else: else:
raise ValueError(f"Allowed values: {self._values} But was: {value}") raise ValueError(f"Allowed values {self._values}")

View File

@ -6,19 +6,14 @@ from pyhon.parameter.base import HonParameter
class HonParameterFixed(HonParameter): class HonParameterFixed(HonParameter):
def __init__(self, key: str, attributes: Dict[str, Any], group: str) -> None: def __init__(self, key: str, attributes: Dict[str, Any], group: str) -> None:
super().__init__(key, attributes, group) super().__init__(key, attributes, group)
self._value: str | float = "" self._value = attributes.get("fixedValue", None)
self._set_attributes()
def _set_attributes(self) -> None:
super()._set_attributes()
self._value = self._attributes.get("fixedValue", "")
def __repr__(self) -> str: def __repr__(self) -> str:
return f"{self.__class__} (<{self.key}> fixed)" return f"{self.__class__} (<{self.key}> fixed)"
@property @property
def value(self) -> str | float: def value(self) -> str | float:
return self._value if self._value != "" else "0" return self._value if self._value is not None else "0"
@value.setter @value.setter
def value(self, value: str | float) -> None: def value(self, value: str | float) -> None:

View File

@ -28,7 +28,7 @@ class HonParameterProgram(HonParameterEnum):
if value in self.values: if value in self.values:
self._command.category = value self._command.category = value
else: else:
raise ValueError(f"Allowed values: {self.values} But was: {value}") raise ValueError(f"Allowed values {self.values}")
@property @property
def values(self) -> List[str]: def values(self) -> List[str]:
@ -36,21 +36,19 @@ class HonParameterProgram(HonParameterEnum):
return sorted(values) return sorted(values)
@values.setter @values.setter
def values(self, values: List[str]) -> None: def values(self, values) -> None:
raise ValueError("Cant set values {values}") return
@property @property
def ids(self) -> Dict[int, str]: def ids(self) -> Dict[int, str]:
values: Dict[int, str] = {} values = {
for name, parameter in self._programs.items(): int(p.parameters["prCode"].value): n
if "iot_" in name: for i, (n, p) in enumerate(self._programs.items())
continue if "iot_" not in n
if parameter.parameters.get("prCode"): and p.parameters.get("prCode")
continue and not ((fav := p.parameters.get("favourite")) and fav.value == "1")
if (fav := parameter.parameters.get("favourite")) and fav.value == "1": }
continue
values[int(parameter.parameters["prCode"].value)] = name
return dict(sorted(values.items())) return dict(sorted(values.items()))
def set_value(self, value: str) -> None: def set_value(self, value: str):
self._value = value self._value = value

View File

@ -7,22 +7,13 @@ from pyhon.parameter.base import HonParameter
class HonParameterRange(HonParameter): class HonParameterRange(HonParameter):
def __init__(self, key: str, attributes: Dict[str, Any], group: str) -> None: def __init__(self, key: str, attributes: Dict[str, Any], group: str) -> None:
super().__init__(key, attributes, group) super().__init__(key, attributes, group)
self._min: float = 0 self._min: float = str_to_float(attributes["minimumValue"])
self._max: float = 0 self._max: float = str_to_float(attributes["maximumValue"])
self._step: float = 0 self._step: float = str_to_float(attributes["incrementValue"])
self._default: float = 0 self._default: float = str_to_float(attributes.get("defaultValue", self.min))
self._value: float = 0 self._value: float = self._default
self._set_attributes()
def _set_attributes(self) -> None: def __repr__(self):
super()._set_attributes()
self._min = str_to_float(self._attributes.get("minimumValue", 0))
self._max = str_to_float(self._attributes.get("maximumValue", 0))
self._step = str_to_float(self._attributes.get("incrementValue", 0))
self._default = str_to_float(self._attributes.get("defaultValue", self.min))
self._value = self._default
def __repr__(self) -> str:
return f"{self.__class__} (<{self.key}> [{self.min} - {self.max}])" return f"{self.__class__} (<{self.key}> [{self.min} - {self.max}])"
@property @property
@ -58,14 +49,11 @@ class HonParameterRange(HonParameter):
@value.setter @value.setter
def value(self, value: str | float) -> None: def value(self, value: str | float) -> None:
value = str_to_float(value) value = str_to_float(value)
if self.min <= value <= self.max and not ((value - self.min) * 100) % ( if self.min <= value <= self.max and not (value - self.min) % self.step:
self.step * 100
):
self._value = value self._value = value
self.check_trigger(value) self.check_trigger(value)
else: else:
allowed = f"min {self.min} max {self.max} step {self.step}" raise ValueError(f"Allowed: min {self.min} max {self.max} step {self.step}")
raise ValueError(f"Allowed: {allowed} But was: {value}")
@property @property
def values(self) -> List[str]: def values(self) -> List[str]:

View File

@ -1,87 +0,0 @@
from typing import Dict, Any, TYPE_CHECKING, List
from pyhon.parameter.enum import HonParameterEnum
from pyhon.parameter.range import HonParameterRange
if TYPE_CHECKING:
from pyhon.commands import HonCommand
def key_print(data: Any, key: str = "", start: bool = True) -> str:
result = ""
if isinstance(data, list):
for i, value in enumerate(data):
result += key_print(value, key=f"{key}.{i}", start=False)
elif isinstance(data, dict):
for k, value in sorted(data.items()):
result += key_print(value, key=k if start else f"{key}.{k}", start=False)
else:
result += f"{key}: {data}\n"
return result
# yaml.dump() would be done the same, but needs an additional dependency...
def pretty_print(
data: Any,
key: str = "",
intend: int = 0,
is_list: bool = False,
whitespace: str = " ",
) -> str:
result = ""
space = whitespace * intend
if isinstance(data, (dict, list)) and key:
result += f"{space}{'- ' if is_list else ''}{key}:\n"
intend += 1
if isinstance(data, list):
for i, value in enumerate(data):
result += pretty_print(
value, intend=intend, is_list=True, whitespace=whitespace
)
elif isinstance(data, dict):
for i, (list_key, value) in enumerate(sorted(data.items())):
result += pretty_print(
value,
key=list_key,
intend=intend + (is_list if i else 0),
is_list=is_list and not i,
whitespace=whitespace,
)
else:
result += f"{space}{'- ' if is_list else ''}{key}{': ' if key else ''}{data}\n"
return result
def create_commands(
commands: Dict[str, "HonCommand"], concat: bool = False
) -> Dict[str, Any]:
result: Dict[str, Any] = {}
for name, command in commands.items():
for parameter, data in command.available_settings.items():
if isinstance(data, HonParameterEnum):
value: List[str] | Dict[str, str | float] = data.values
elif isinstance(data, HonParameterRange):
value = {"min": data.min, "max": data.max, "step": data.step}
else:
continue
if not concat:
result.setdefault(name, {})[parameter] = value
else:
result[f"{name}.{parameter}"] = value
return result
def create_rules(
commands: Dict[str, "HonCommand"], concat: bool = False
) -> Dict[str, Any]:
result: Dict[str, Any] = {}
for name, command in commands.items():
for parameter, data in command.available_settings.items():
value = data.triggers
if not value:
continue
if not concat:
result.setdefault(name, {})[parameter] = value
else:
result[f"{name}.{parameter}"] = value
return result

View File

View File

@ -3,11 +3,9 @@ from typing import List, Dict, TYPE_CHECKING, Any, Optional
from pyhon.parameter.enum import HonParameterEnum from pyhon.parameter.enum import HonParameterEnum
from pyhon.parameter.range import HonParameterRange from pyhon.parameter.range import HonParameterRange
from pyhon.typedefs import Parameter
if TYPE_CHECKING: if TYPE_CHECKING:
from pyhon.commands import HonCommand from pyhon.commands import HonCommand
from pyhon.parameter.base import HonParameter
@dataclass @dataclass
@ -20,28 +18,18 @@ class HonRule:
class HonRuleSet: class HonRuleSet:
def __init__(self, command: "HonCommand", rule: Dict[str, Any]): def __init__(self, command: "HonCommand", rule):
self._command: "HonCommand" = command self._command: "HonCommand" = command
self._rules: Dict[str, List[HonRule]] = {} self._rules: Dict[str, List[HonRule]] = {}
self._parse_rule(rule) self._parse_rule(rule)
@property def _parse_rule(self, rule):
def rules(self) -> Dict[str, List[HonRule]]:
return self._rules
def _parse_rule(self, rule: Dict[str, Any]) -> None:
for param_key, params in rule.items(): for param_key, params in rule.items():
param_key = self._command.appliance.options.get(param_key, param_key) param_key = self._command.appliance.options.get(param_key, param_key)
for trigger_key, trigger_data in params.items(): for trigger_key, trigger_data in params.items():
self._parse_conditions(param_key, trigger_key, trigger_data) self._parse_conditions(param_key, trigger_key, trigger_data)
def _parse_conditions( def _parse_conditions(self, param_key, trigger_key, trigger_data, extra=None):
self,
param_key: str,
trigger_key: str,
trigger_data: Dict[str, Any],
extra: Optional[Dict[str, str]] = None,
) -> None:
trigger_key = trigger_key.replace("@", "") trigger_key = trigger_key.replace("@", "")
trigger_key = self._command.appliance.options.get(trigger_key, trigger_key) trigger_key = self._command.appliance.options.get(trigger_key, trigger_key)
for multi_trigger_value, param_data in trigger_data.items(): for multi_trigger_value, param_data in trigger_data.items():
@ -56,28 +44,18 @@ class HonRuleSet:
extra[trigger_key] = trigger_value extra[trigger_key] = trigger_value
for extra_key, extra_data in param_data.items(): for extra_key, extra_data in param_data.items():
self._parse_conditions(param_key, extra_key, extra_data, extra) self._parse_conditions(param_key, extra_key, extra_data, extra)
else:
param_data = {"typology": "fixed", "fixedValue": param_data}
self._create_rule(
param_key, trigger_key, trigger_value, param_data, extra
)
def _create_rule( def _create_rule(
self, self, param_key, trigger_key, trigger_value, param_data, extras=None
param_key: str, ):
trigger_key: str,
trigger_value: str,
param_data: Dict[str, Any],
extras: Optional[Dict[str, str]] = None,
) -> None:
if param_data.get("fixedValue") == f"@{param_key}": if param_data.get("fixedValue") == f"@{param_key}":
return return
self._rules.setdefault(trigger_key, []).append( self._rules.setdefault(trigger_key, []).append(
HonRule(trigger_key, trigger_value, param_key, param_data, extras) HonRule(trigger_key, trigger_value, param_key, param_data, extras)
) )
def _duplicate_for_extra_conditions(self) -> None: def _duplicate_for_extra_conditions(self):
new: Dict[str, List[HonRule]] = {} new = {}
for rules in self._rules.values(): for rules in self._rules.values():
for rule in rules: for rule in rules:
if rule.extras is None: if rule.extras is None:
@ -93,53 +71,35 @@ class HonRuleSet:
for rule in rules: for rule in rules:
self._rules.setdefault(key, []).append(rule) self._rules.setdefault(key, []).append(rule)
def _extra_rules_matches(self, rule: HonRule) -> bool: def _add_trigger(self, parameter, data):
if rule.extras: def apply(rule: HonRule):
if rule.extras is not None:
for key, value in rule.extras.items(): for key, value in rule.extras.items():
if not self._command.parameters.get(key):
return False
if str(self._command.parameters.get(key)) != str(value): if str(self._command.parameters.get(key)) != str(value):
return False return
return True if param := self._command.parameters.get(rule.param_key):
if value := rule.param_data.get("fixedValue", ""):
def _apply_fixed(self, param: Parameter, value: str | float) -> None: if isinstance(param, HonParameterEnum) and set(param.values) != {
if isinstance(param, HonParameterEnum) and set(param.values) != {str(value)}: str(value)
}:
param.values = [str(value)] param.values = [str(value)]
param.value = str(value)
elif isinstance(param, HonParameterRange): elif isinstance(param, HonParameterRange):
if float(value) < param.min:
param.min = float(value)
elif float(value) > param.max:
param.max = float(value)
param.value = float(value) param.value = float(value)
return return
param.value = str(value) param.value = str(value)
elif rule.param_data.get("typology") == "enum":
def _apply_enum(self, param: Parameter, rule: HonRule) -> None: if isinstance(param, HonParameterEnum):
if not isinstance(param, HonParameterEnum):
return
if enum_values := rule.param_data.get("enumValues"): if enum_values := rule.param_data.get("enumValues"):
param.values = enum_values.split("|") param.values = enum_values.split("|")
if default_value := rule.param_data.get("defaultValue"): if default_value := rule.param_data.get("defaultValue"):
param.value = default_value param.value = default_value
def _add_trigger(self, parameter: "HonParameter", data: HonRule) -> None:
def apply(rule: HonRule) -> None:
if not self._extra_rules_matches(rule):
return
if not (param := self._command.parameters.get(rule.param_key)):
return
if fixed_value := rule.param_data.get("fixedValue", ""):
self._apply_fixed(param, fixed_value)
elif rule.param_data.get("typology") == "enum":
self._apply_enum(param, rule)
parameter.add_trigger(data.trigger_value, apply, data) parameter.add_trigger(data.trigger_value, apply, data)
def patch(self) -> None: def patch(self):
self._duplicate_for_extra_conditions() self._duplicate_for_extra_conditions()
for name, parameter in self._command.parameters.items(): for name, parameter in self._command.parameters.items():
if name not in self._rules: if name not in self._rules:
continue continue
for data in self._rules.get(name, []): for data in self._rules.get(name):
self._add_trigger(parameter, data) self._add_trigger(parameter, data)

View File

@ -1,27 +0,0 @@
from typing import Union, Any, TYPE_CHECKING, Protocol
import aiohttp
from yarl import URL
if TYPE_CHECKING:
from pyhon.parameter.base import HonParameter
from pyhon.parameter.enum import HonParameterEnum
from pyhon.parameter.fixed import HonParameterFixed
from pyhon.parameter.program import HonParameterProgram
from pyhon.parameter.range import HonParameterRange
class Callback(Protocol): # pylint: disable=too-few-public-methods
def __call__(
self, url: str | URL, *args: Any, **kwargs: Any
) -> aiohttp.client._RequestContextManager:
...
Parameter = Union[
"HonParameter",
"HonParameterRange",
"HonParameterEnum",
"HonParameterFixed",
"HonParameterProgram",
]

View File

@ -1,3 +1,2 @@
aiohttp>=3.8 aiohttp==3.8.4
yarl>=1.8 yarl==1.8.2
typing-extensions>=4.8

View File

@ -1,5 +1,4 @@
black>=22.12 black==23.3.0
flake8>=6.0 flake8==6.0.0
mypy>=0.991 mypy==1.2.0
pylint>=2.15 pylint==2.17.2
setuptools>=62.3

View File

@ -2,12 +2,12 @@
from setuptools import setup, find_packages from setuptools import setup, find_packages
with open("README.md", "r", encoding="utf-8") as f: with open("README.md", "r") as f:
long_description = f.read() long_description = f.read()
setup( setup(
name="pyhOn", name="pyhOn",
version="0.15.15", version="0.14.1",
author="Andre Basche", author="Andre Basche",
description="Control hOn devices with python", description="Control hOn devices with python",
long_description=long_description, long_description=long_description,
@ -21,7 +21,7 @@ setup(
packages=find_packages(), packages=find_packages(),
include_package_data=True, include_package_data=True,
python_requires=">=3.10", python_requires=">=3.10",
install_requires=["aiohttp>=3.8", "typing-extensions>=4.8", "yarl>=1.8"], install_requires=["aiohttp"],
classifiers=[ classifiers=[
"Development Status :: 4 - Beta", "Development Status :: 4 - Beta",
"Environment :: Console", "Environment :: Console",
@ -30,7 +30,6 @@ setup(
"Operating System :: OS Independent", "Operating System :: OS Independent",
"Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11", "Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Topic :: Software Development :: Libraries :: Python Modules", "Topic :: Software Development :: Libraries :: Python Modules",
], ],
entry_points={ entry_points={