Compare commits

...

97 Commits

Author SHA1 Message Date
52837f16e3 Improve appliance import 2023-06-28 19:45:37 +02:00
2a6b040193 Add import parameter 2023-06-28 19:17:17 +02:00
9eb99f283b Add more type hints 2023-06-28 19:02:11 +02:00
ad0d065b03 Fix creating wrong zip archive 2023-06-26 02:19:07 +02:00
924e2c240d Fix wrong archive link 2023-06-25 18:30:29 +02:00
76bd189e7b Bump version 2023-06-25 17:49:03 +02:00
ef67188b93 Create data archive and use it to test 2023-06-25 17:30:15 +02:00
66cb7bcc24 Merge branch 'refactor2' 2023-06-22 00:03:07 +02:00
c25e898b42 Bump version 2023-06-21 19:56:09 +02:00
55966dd52f Fix typeerror hon#77 2023-06-21 18:02:07 +02:00
8c65a37f29 Add command loader class 2023-06-15 02:16:03 +02:00
1ca89995a2 Lock attributes 2023-06-13 00:39:18 +02:00
f6139db0b5 Use class for attributes 2023-06-13 00:12:29 +02:00
310d1bafd7 Improve rule parsing 2023-06-10 06:47:37 +02:00
9e35dcf9cf Don't send optional program rules 2023-06-10 06:40:55 +02:00
f9d0fa4ae8 Add wine cellar 2023-06-09 22:52:33 +02:00
11988c73a6 Fix command parameter issue hon#63 2023-06-09 02:09:20 +02:00
7b51caecca Improve update performance 2023-06-08 19:52:08 +02:00
38d09e2ef5 Fix step is 0 hon#60 2023-06-08 14:20:08 +02:00
3c7ad3f395 Fix changed hOn login 2023-06-07 02:27:02 +02:00
31c03faca8 Get program name 2023-05-29 19:05:37 +02:00
a081ef1f97 Add favourites to progams hon#47 2023-05-28 19:24:02 +02:00
4888f2b1d0 Add oven climate support 2023-05-28 17:41:20 +02:00
7c6ac15901 Add and improve fridge 2023-05-28 07:37:38 +02:00
eea79e28b9 Fix for fridge program names 2023-05-22 01:07:55 +02:00
ecbf438889 Bump version 2023-05-21 20:48:27 +02:00
9cd12e3234 Fixes for fridge 2023-05-21 20:33:08 +02:00
c2765fe953 Add rule handling 2023-05-21 02:25:43 +02:00
9971fe95e2 Get appliance code from serial number 2023-05-20 13:24:24 +02:00
9f130e2e85 Make 'code' attribute really optional 2023-05-19 00:48:08 +02:00
dfbc24452b Fix value check 2023-05-16 21:26:24 +02:00
af4fbdd8cd Make 'code' attribute optional, fix hon#51 2023-05-16 20:46:46 +02:00
b5af81b744 Improve set parameters 2023-05-15 19:29:42 +02:00
22a98e1781 Add favorites request 2023-05-14 23:04:36 +02:00
2feb3295e1 Fix some dw bugs 2023-05-14 23:03:46 +02:00
d350d639cc Add maintenance cycle 2023-05-13 00:16:52 +02:00
81c202d730 Use test devies 2023-05-11 00:44:18 +02:00
022da71800 Fix none-type attributes 2023-05-08 02:23:48 +02:00
8e16b4a215 Fix some errors 2023-05-08 00:03:53 +02:00
7bd3aac7c5 Fix error for no category 2023-05-07 17:39:22 +02:00
365a37b42d Fix send command error 2023-05-07 01:17:02 +02:00
2bde6bb61c Fix mypy error 2023-05-07 00:48:42 +02:00
ccff32e6c1 Fix missing program 2023-05-07 00:47:08 +02:00
22cbd7474a Fix performance issues 2023-05-07 00:28:24 +02:00
dd61b24eed Improve output 2023-05-06 20:06:51 +02:00
ea8f481b01 More general parsing 2023-05-06 16:08:36 +02:00
7dcb34559b Raise error if missing token 2023-04-24 21:50:25 +02:00
5db13a90e7 Fix some stuff for hoover appliances 2023-04-24 04:33:00 +02:00
9ee5dbc956 Get program name by prcode 2023-04-23 21:42:44 +02:00
d4c6ccdce3 Use parameter optional 2023-04-23 20:15:07 +02:00
9594b9ebd8 Use firmware version optional 2023-04-23 19:28:56 +02:00
b011d98e07 Expose fixed parameter as setting 2023-04-23 16:21:34 +02:00
ad864286fc Use firmware id optional 2023-04-23 14:40:39 +02:00
13cff8caa0 Bump version to 8.2 2023-04-23 03:35:13 +02:00
5fc6245806 Get values for every parameter 2023-04-22 23:08:44 +02:00
1dad0e14b8 Handle special dry level 11 2023-04-22 23:08:24 +02:00
b04c601ad6 Add log for empty token 2023-04-21 23:51:35 +02:00
3ec0f5a006 Merge pull request #12 from MiguelAngelLV/main
Oven overwrite
2023-04-19 20:18:32 +02:00
78bc85132f Black Style 2023-04-19 20:12:52 +02:00
191928287f Override Oven attributes when is disconnected 2023-04-19 19:29:59 +02:00
c0aab8b99d Merge remote-tracking branch 'origin/main' 2023-04-19 18:57:13 +02:00
b37715d0ca Bump version to 0.8.0 2023-04-19 17:37:55 +02:00
411effd814 Fix disappearing programs 2023-04-17 00:37:55 +02:00
04f19c4609 Fix error when sending commands 2023-04-17 00:07:22 +02:00
a68dcac379 Merge remote-tracking branch 'origin/main'
# Conflicts:
#	pyhon/appliance.py
2023-04-16 16:37:11 +02:00
69be63df2a Bump version to v0.8.0b5 2023-04-16 13:57:40 +02:00
6c44aa895d Disable zones for devices with just 1 zone, fixes #11 2023-04-16 13:33:21 +02:00
8372c75e30 expose ancillary parameter in settings 2023-04-16 13:11:56 +02:00
40cc0013a5 Override Oven attributes when is disconnected 2023-04-16 11:36:10 +02:00
f2aa3dc8fd Merge remote-tracking branch 'origin/main'
# Conflicts:
#	pyhon/appliances/ov.py
2023-04-16 09:58:00 +02:00
e887371bec expose ancillary parameter in settings 2023-04-16 04:23:08 +02:00
5b91747ec1 Add dish washer 2023-04-16 02:46:30 +02:00
8da2018302 Set fixed values 2023-04-16 02:44:20 +02:00
03187745bf Split up parameters 2023-04-16 01:43:37 +02:00
461a247ad3 More type hints 2023-04-16 01:36:10 +02:00
834f25a639 Remove filters, filter out recies #9 2023-04-16 00:40:59 +02:00
46ff9be4a2 Fix code depts 2023-04-15 23:02:37 +02:00
a1618bb18a Fix missing zone attribute 2023-04-15 22:25:34 +02:00
a957d7ac0f Fix error for zone devices 2023-04-15 21:58:20 +02:00
f54b7b2dbf Add mypy checks 2023-04-15 15:57:36 +02:00
b6ca12ebff Use dataclass for login data 2023-04-15 14:37:27 +02:00
4a0ee8569b Refactor, add hon auth handler 2023-04-15 14:22:04 +02:00
d52d622785 Add zone support 2023-04-15 04:16:50 +02:00
9643f66549 Bump version to v0.7.4 2023-04-15 00:31:45 +02:00
d26e33a055 Fix error in starting programs 2023-04-15 00:29:24 +02:00
0301427497 Remove coords from diagnose 2023-04-14 23:24:31 +02:00
272556586e Refresh token workaround because expires to fast 2023-04-14 23:15:27 +02:00
e82c14ec99 Add some type hints 2023-04-13 23:35:43 +02:00
970b94bfa7 Fix unclear session errors 2023-04-12 19:14:50 +02:00
33454f68b8 Encode username/password 2023-04-12 02:10:37 +02:00
6b2c60d552 Fix session issues 2023-04-12 01:07:33 +02:00
46e6a85e84 add diagnose property for devices 2023-04-11 22:14:36 +02:00
8c832b44cd Fix token refresh problems 2023-04-11 17:09:02 +02:00
b4b782c52c Improve logging for authentication errors 2023-04-11 00:59:00 +02:00
e857fe91de Fix login issue 2023-04-10 20:32:35 +02:00
79c9678492 Fix float convertion 2023-04-10 18:48:46 +02:00
18b0ecdd68 Override Oven attributes when is disconnected 2023-04-08 22:24:14 +02:00
44 changed files with 2180 additions and 786 deletions

View File

@ -2,7 +2,7 @@ name: Python check
on:
push:
branches: [ "main", "refactor" ]
branches: [ "main" ]
pull_request:
branches: [ "main" ]
@ -25,12 +25,15 @@ jobs:
run: |
python -m pip install --upgrade pip
python -m pip install -r requirements.txt
python -m pip install flake8 pylint black
python -m pip install -r requirements_dev.txt
- name: Lint with flake8
run: |
# stop the build if there are Python syntax errors or undefined names
flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
flake8 . --count --exit-zero --max-complexity=10 --max-line-length=88 --statistics
- name: Type check with mypy
run: |
mypy pyhon/
# - name: Analysing the code with pylint
# run: |
# pylint --max-line-length 88 $(git ls-files '*.py')

1
.gitignore vendored
View File

@ -4,3 +4,4 @@ __pycache__/
dist/
**/*.egg-info/
test*
build/

View File

@ -6,7 +6,7 @@
[![PyPI - Python Version](https://img.shields.io/pypi/pyversions/pyhOn)](https://www.python.org/)
[![PyPI - License](https://img.shields.io/pypi/l/pyhOn)](https://github.com/Andre0512/pyhOn/blob/main/LICENSE)
[![PyPI - Downloads](https://img.shields.io/pypi/dm/pyhOn)](https://pypistats.org/packages/pyhon)
Control your Haier appliances with python!
Control your Haier, Candy and Hoover appliances with python!
The idea behind this library is, to make the use of all available commands as simple as possible.
## Installation
@ -100,8 +100,6 @@ This generates a huge output. It is recommended to pipe this into a file
$ pyhOn translate fr > hon_fr.yaml
$ pyhOn translate en --json > hon_en.json
```
## Tested devices
- Haier Washing Machine HW90
## Usage example
This library is used for the custom [HomeAssistant Integration "Haier hOn"](https://github.com/Andre0512/hOn).
@ -109,3 +107,6 @@ This library is used for the custom [HomeAssistant Integration "Haier hOn"](http
## Contribution
Any kind of contribution is welcome!
| Please add your appliances data to our [hon-test-data collection](https://github.com/Andre0512/hon-test-data). <br/>This helps us to develop new features and not to break compatibility in newer versions. |
|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|

4
mypy.ini Normal file
View File

@ -0,0 +1,4 @@
[mypy]
check_untyped_defs = True
disallow_any_generics = True
disallow_untyped_defs = True

View File

@ -6,16 +6,17 @@ import logging
import sys
from getpass import getpass
from pathlib import Path
from typing import Tuple, Dict, Any
if __name__ == "__main__":
sys.path.insert(0, str(Path(__file__).parent.parent))
from pyhon import Hon, HonAPI
from pyhon import Hon, HonAPI, diagnose, printer
_LOGGER = logging.getLogger(__name__)
def get_arguments():
def get_arguments() -> Dict[str, Any]:
"""Get parsed arguments."""
parser = argparse.ArgumentParser(description="pyhOn: Command Line Utility")
parser.add_argument("-u", "--user", help="user for haier hOn account")
@ -24,6 +25,11 @@ def get_arguments():
keys = subparser.add_parser("keys", help="print as key format")
keys.add_argument("keys", help="print as key format", action="store_true")
keys.add_argument("--all", help="print also full keys", action="store_true")
export = subparser.add_parser("export")
export.add_argument("export", help="export pyhon data", action="store_true")
export.add_argument("--zip", help="create zip archive", action="store_true")
export.add_argument("--anonymous", help="anonymize data", action="store_true")
export.add_argument("directory", nargs="?", default=Path().cwd())
translate = subparser.add_parser(
"translate", help="print available translation keys"
)
@ -31,65 +37,13 @@ def get_arguments():
"translate", help="language (de, en, fr...)", metavar="LANGUAGE"
)
translate.add_argument("--json", help="print as json", action="store_true")
parser.add_argument(
"-i", "--import", help="import pyhon data", nargs="?", default=Path().cwd()
)
return vars(parser.parse_args())
# yaml.dump() would be done the same, but needs an additional dependency...
def pretty_print(data, key="", intend=0, is_list=False):
if type(data) is list:
if key:
print(f"{' ' * intend}{'- ' if is_list else ''}{key}:")
intend += 1
for i, value in enumerate(data):
pretty_print(value, intend=intend, is_list=True)
elif type(data) is dict:
if key:
print(f"{' ' * intend}{'- ' if is_list else ''}{key}:")
intend += 1
for i, (key, value) in enumerate(sorted(data.items())):
if is_list and not i:
pretty_print(value, key=key, intend=intend, is_list=True)
elif is_list:
pretty_print(value, key=key, intend=intend + 1)
else:
pretty_print(value, key=key, intend=intend)
else:
print(
f"{' ' * intend}{'- ' if is_list else ''}{key}{': ' if key else ''}{data}"
)
def key_print(data, key="", start=True):
if type(data) is list:
for i, value in enumerate(data):
key_print(value, key=f"{key}.{i}", start=False)
elif type(data) is dict:
for k, value in sorted(data.items()):
key_print(value, key=k if start else f"{key}.{k}", start=False)
else:
print(f"{key}: {data}")
def create_command(commands, concat=False):
result = {}
for name, command in commands.items():
if not concat:
result[name] = {}
for parameter, data in command.parameters.items():
if data.typology == "enum":
value = data.values
elif data.typology == "range":
value = {"min": data.min, "max": data.max, "step": data.step}
else:
continue
if not concat:
result[name][parameter] = value
else:
result[f"{name}.{parameter}"] = value
return result
async def translate(language, json_output=False):
async def translate(language: str, json_output: bool = False) -> None:
async with HonAPI(anonymous=True) as hon:
keys = await hon.translation_keys(language)
if json_output:
@ -102,34 +56,57 @@ async def translate(language, json_output=False):
.replace("\\r", "")
)
keys = json.loads(clean_keys)
pretty_print(keys)
print(printer.pretty_print(keys))
async def main():
args = get_arguments()
if language := args.get("translate"):
await translate(language, json_output=args.get("json"))
return
def get_login_data(args: Dict[str, str]) -> Tuple[str, str]:
if not (user := args["user"]):
user = input("User for hOn account: ")
if not (password := args["password"]):
password = getpass("Password for hOn account: ")
async with Hon(user, password) as hon:
return user, password
async def main() -> None:
args = get_arguments()
if language := args.get("translate"):
await translate(language, json_output=args.get("json", ""))
return
async with Hon(
*get_login_data(args), test_data_path=Path(args.get("import", ""))
) as hon:
for device in hon.appliances:
if args.get("export"):
anonymous = args.get("anonymous", False)
path = Path(args.get("directory", "."))
if not args.get("zip"):
for file in await diagnose.appliance_data(device, path, anonymous):
print(f"Created {file}")
else:
archive = await diagnose.zip_archive(device, path, anonymous)
print(f"Created {archive}")
continue
print("=" * 10, device.appliance_type, "-", device.nick_name, "=" * 10)
if args.get("keys"):
data = device.data.copy()
attr = "get" if args.get("all") else "pop"
key_print(data["attributes"].__getattribute__(attr)("parameters"))
key_print(data.__getattribute__(attr)("appliance"))
key_print(data)
pretty_print(create_command(device.commands, concat=True))
print(
printer.key_print(
data["attributes"].__getattribute__(attr)("parameters")
)
)
print(printer.key_print(data.__getattribute__(attr)("appliance")))
print(printer.key_print(data))
print(
printer.pretty_print(
printer.create_command(device.commands, concat=True)
)
)
else:
pretty_print({"data": device.data})
pretty_print({"settings": create_command(device.commands)})
print(diagnose.yaml_export(device))
def start():
def start() -> None:
try:
asyncio.run(main())
except KeyboardInterrupt:

View File

@ -1,174 +1,263 @@
import importlib
from contextlib import suppress
import logging
import re
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional, Dict, Any, TYPE_CHECKING, List
from pyhon import diagnose, exceptions
from pyhon.attributes import HonAttribute
from pyhon.command_loader import HonCommandLoader
from pyhon.commands import HonCommand
from pyhon.parameter import HonParameterFixed
from pyhon.parameter.base import HonParameter
from pyhon.parameter.range import HonParameterRange
from pyhon.typedefs import Parameter
if TYPE_CHECKING:
from pyhon import HonAPI
_LOGGER = logging.getLogger(__name__)
class HonAppliance:
def __init__(self, api, info):
_MINIMAL_UPDATE_INTERVAL = 5 # seconds
def __init__(
self, api: Optional["HonAPI"], info: Dict[str, Any], zone: int = 0
) -> None:
if attributes := info.get("attributes"):
info["attributes"] = {v["parName"]: v["parValue"] for v in attributes}
self._info = info
self._api = api
self._appliance_model = {}
self._info: Dict[str, Any] = info
self._api: Optional[HonAPI] = api
self._appliance_model: Dict[str, Any] = {}
self._commands = {}
self._statistics = {}
self._attributes = {}
self._commands: Dict[str, HonCommand] = {}
self._statistics: Dict[str, Any] = {}
self._attributes: Dict[str, Any] = {}
self._zone: int = zone
self._additional_data: Dict[str, Any] = {}
self._last_update: Optional[datetime] = None
self._default_setting = HonParameter("", {}, "")
try:
self._extra = importlib.import_module(
f"pyhon.appliances.{self.appliance_type.lower()}"
).Appliance()
).Appliance(self)
except ModuleNotFoundError:
self._extra = None
def __getitem__(self, item):
def __getitem__(self, item: str) -> Any:
if self._zone:
item += f"Z{self._zone}"
if "." in item:
result = self.data
for key in item.split("."):
if all([k in "0123456789" for k in key]) and type(result) is list:
if all(k in "0123456789" for k in key) and isinstance(result, list):
result = result[int(key)]
else:
result = result[key]
return result
else:
if item in self.data:
return self.data[item]
if item in self.attributes["parameters"]:
return self.attributes["parameters"].get(item)
return self.info[item]
if item in self.data:
return self.data[item]
if item in self.attributes["parameters"]:
return self.attributes["parameters"][item].value
return self.info[item]
def get(self, item, default=None):
def get(self, item: str, default: Any = None) -> Any:
try:
return self[item]
except (KeyError, IndexError):
return default
@property
def appliance_model_id(self):
return self._info.get("applianceModelId")
def _check_name_zone(self, name: str, frontend: bool = True) -> str:
zone = " Z" if frontend else "_z"
if (attribute := self._info.get(name, "")) and self._zone:
return f"{attribute}{zone}{self._zone}"
return attribute
@property
def appliance_type(self):
return self._info.get("applianceTypeName")
def appliance_model_id(self) -> str:
return self._info.get("applianceModelId", "")
@property
def mac_address(self):
return self._info.get("macAddress")
def appliance_type(self) -> str:
return self._info.get("applianceTypeName", "")
@property
def model_name(self):
return self._info.get("modelName")
def mac_address(self) -> str:
return self.info.get("macAddress", "")
@property
def nick_name(self):
return self._info.get("nickName")
def unique_id(self) -> str:
default_mac = "xx-xx-xx-xx-xx-xx"
import_name = f"{self.appliance_type.lower()}_{self.appliance_model_id}"
result = self._check_name_zone("macAddress", frontend=False)
result = result.replace(default_mac, import_name)
return result
@property
def commands_options(self):
return self._appliance_model.get("options")
def model_name(self) -> str:
return self._check_name_zone("modelName")
@property
def commands(self):
def brand(self) -> str:
return self._check_name_zone("brand")
@property
def nick_name(self) -> str:
result = self._check_name_zone("nickName")
if not result or re.findall("^[xX\s]+$", result):
return self.model_name
return result
@property
def code(self) -> str:
if code := self.info.get("code"):
return code
serial_number = self.info.get("serialNumber", "")
return serial_number[:8] if len(serial_number) < 18 else serial_number[:11]
@property
def model_id(self) -> int:
return self._info.get("applianceModelId", 0)
@property
def options(self) -> Dict[str, Any]:
return self._appliance_model.get("options", {})
@property
def commands(self) -> Dict[str, HonCommand]:
return self._commands
@property
def attributes(self):
def attributes(self) -> Dict[str, Any]:
return self._attributes
@property
def statistics(self):
def statistics(self) -> Dict[str, Any]:
return self._statistics
@property
def info(self):
def info(self) -> Dict[str, Any]:
return self._info
async def _recover_last_command_states(self, commands):
command_history = await self._api.command_history(self)
for name, command in commands.items():
last = next(
(
index
for (index, d) in enumerate(command_history)
if d.get("command", {}).get("commandName") == name
),
None,
)
if last is None:
continue
parameters = command_history[last].get("command", {}).get("parameters", {})
if command._multi and parameters.get("program"):
command.set_program(parameters.pop("program").split(".")[-1].lower())
command = self.commands[name]
for key, data in command.settings.items():
if (
not isinstance(data, HonParameterFixed)
and parameters.get(key) is not None
):
with suppress(ValueError):
data.value = parameters.get(key)
async def load_commands(self):
raw = await self._api.load_commands(self)
self._appliance_model = raw.pop("applianceModel")
for item in ["settings", "options", "dictionaryId"]:
raw.pop(item)
commands = {}
for command, attr in raw.items():
if "parameters" in attr:
commands[command] = HonCommand(command, attr, self._api, self)
elif "parameters" in attr[list(attr)[0]]:
multi = {}
for program, attr2 in attr.items():
program = program.split(".")[-1].lower()
cmd = HonCommand(
command, attr2, self._api, self, multi=multi, program=program
)
multi[program] = cmd
commands[command] = cmd
self._commands = commands
await self._recover_last_command_states(commands)
@property
def additional_data(self) -> Dict[str, Any]:
return self._additional_data
@property
def settings(self):
def zone(self) -> int:
return self._zone
@property
def api(self) -> "HonAPI":
"""api connection object"""
if self._api is None:
raise exceptions.NoAuthenticationException("Missing hOn login")
return self._api
async def load_commands(self) -> None:
command_loader = HonCommandLoader(self.api, self)
await command_loader.load_commands()
self._commands = command_loader.commands
self._additional_data = command_loader.additional_data
self._appliance_model = command_loader.appliance_data
async def load_attributes(self) -> None:
self._attributes = await self.api.load_attributes(self)
for name, values in self._attributes.pop("shadow").get("parameters").items():
if name in self._attributes.get("parameters", {}):
self._attributes["parameters"][name].update(values)
else:
self._attributes.setdefault("parameters", {})[name] = HonAttribute(
values
)
if self._extra:
self._attributes = self._extra.attributes(self._attributes)
async def load_statistics(self) -> None:
self._statistics = await self.api.load_statistics(self)
self._statistics |= await self.api.load_maintenance(self)
async def update(self, force: bool = False) -> None:
now = datetime.now()
if (
force
or not self._last_update
or self._last_update
< now - timedelta(seconds=self._MINIMAL_UPDATE_INTERVAL)
):
self._last_update = now
await self.load_attributes()
@property
def command_parameters(self) -> Dict[str, Dict[str, str | float]]:
return {n: c.parameter_value for n, c in self._commands.items()}
@property
def settings(self) -> Dict[str, Parameter]:
result = {}
for name, command in self._commands.items():
for key, setting in command.settings.items():
for key in command.setting_keys:
setting = command.settings.get(key, self._default_setting)
result[f"{name}.{key}"] = setting
if self._extra:
return self._extra.settings(result)
return result
@property
def parameters(self):
result = {}
def available_settings(self) -> List[str]:
result = []
for name, command in self._commands.items():
for key, parameter in command.parameters.items():
result.setdefault(name, {})[key] = parameter.value
for key in command.setting_keys:
result.append(f"{name}.{key}")
return result
async def load_attributes(self):
self._attributes = await self._api.load_attributes(self)
for name, values in self._attributes.pop("shadow").get("parameters").items():
self._attributes.setdefault("parameters", {})[name] = values["parNewVal"]
async def load_statistics(self):
self._statistics = await self._api.load_statistics(self)
async def update(self):
await self.load_attributes()
@property
def data(self):
def data(self) -> Dict[str, Any]:
result = {
"attributes": self.attributes,
"appliance": self.info,
"statistics": self.statistics,
**self.parameters,
"additional_data": self._additional_data,
**self.command_parameters,
**self.attributes,
}
if self._extra:
return self._extra.data(result)
return result
@property
def diagnose(self) -> str:
return diagnose.yaml_export(self, anonymous=True)
async def data_archive(self, path: Path) -> str:
return await diagnose.zip_archive(self, path, anonymous=True)
def sync_to_params(self, command_name: str) -> None:
if not (command := self.commands.get(command_name)):
return
for key, value in self.attributes.get("parameters", {}).items():
if isinstance(value, str) and (new := command.parameters.get(key)):
self.attributes["parameters"][key].update(
str(new.intern_value), shield=True
)
def sync_command(self, main: str, target: Optional[List[str]] = None) -> None:
base: Optional[HonCommand] = self.commands.get(main)
if not base:
return
for command, data in self.commands.items():
if command == main or target and command not in target:
continue
for name, parameter in data.parameters.items():
if base_value := base.parameters.get(name):
if isinstance(base_value, HonParameterRange) and isinstance(
parameter, HonParameterRange
):
parameter.max = base_value.max
parameter.min = base_value.min
parameter.step = base_value.step
elif isinstance(parameter, HonParameterRange):
parameter.max = int(base_value.value)
parameter.min = int(base_value.value)
parameter.step = 1
parameter.value = base_value.value

25
pyhon/appliances/base.py Normal file
View File

@ -0,0 +1,25 @@
from typing import Dict, Any, TYPE_CHECKING
from pyhon.parameter.program import HonParameterProgram
if TYPE_CHECKING:
from pyhon.appliance import HonAppliance
class ApplianceBase:
def __init__(self, appliance: "HonAppliance"):
self.parent = appliance
def attributes(self, data: Dict[str, Any]) -> Dict[str, Any]:
program_name = "No Program"
if program := int(str(data.get("parameters", {}).get("prCode", "0"))):
if start_cmd := self.parent.settings.get("startProgram.program"):
if isinstance(start_cmd, HonParameterProgram) and (
ids := start_cmd.ids
):
program_name = ids.get(program, program_name)
data["programName"] = program_name
return data
def settings(self, settings: Dict[str, Any]) -> Dict[str, Any]:
return settings

12
pyhon/appliances/dw.py Normal file
View File

@ -0,0 +1,12 @@
from typing import Any, Dict
from pyhon.appliances.base import ApplianceBase
class Appliance(ApplianceBase):
def attributes(self, data: Dict[str, Any]) -> Dict[str, Any]:
data = super().attributes(data)
if data.get("lastConnEvent", {}).get("category", "") == "DISCONNECTED":
data["parameters"]["machMode"].value = "0"
data["active"] = bool(data.get("activity"))
return data

View File

@ -1,23 +1,24 @@
from pyhon.parameter import HonParameterEnum
from typing import Any, Dict
from pyhon.appliances.base import ApplianceBase
from pyhon.parameter.program import HonParameterProgram
class Appliance:
_FILTERS = {
"default": "^(?!iot_(?:recipe|guided))\\S+$",
"recipe": "iot_recipe_",
"guided": "iot_guided_",
}
class Appliance(ApplianceBase):
def attributes(self, data: Dict[str, Any]) -> Dict[str, Any]:
data = super().attributes(data)
if data.get("lastConnEvent", {}).get("category", "") == "DISCONNECTED":
data["parameters"]["temp"].value = "0"
data["parameters"]["onOffStatus"].value = "0"
data["parameters"]["remoteCtrValid"].value = "0"
data["parameters"]["remainingTimeMM"].value = "0"
def __init__(self):
filters = list(self._FILTERS.values())
data = {"defaultValue": filters[0], "enumValues": filters}
self._program_filter = HonParameterEnum("program_filter", data)
data["active"] = data["parameters"]["onOffStatus"] == "1"
if program := int(data["parameters"]["prCode"]):
if (setting := self.parent.settings["startProgram.program"]) and isinstance(
setting, HonParameterProgram
):
data["programName"] = setting.ids.get(program, "")
def data(self, data):
return data
def settings(self, settings):
settings["program_filter"] = self._program_filter
value = self._FILTERS[self._program_filter.value]
settings["startProgram.program"].filter = value
return settings

25
pyhon/appliances/ref.py Normal file
View File

@ -0,0 +1,25 @@
from typing import Dict, Any
from pyhon.appliances.base import ApplianceBase
class Appliance(ApplianceBase):
def attributes(self, data: Dict[str, Any]) -> Dict[str, Any]:
data = super().attributes(data)
if data["parameters"]["holidayMode"] == "1":
data["modeZ1"] = "holiday"
elif data["parameters"]["intelligenceMode"] == "1":
data["modeZ1"] = "auto_set"
elif data["parameters"]["quickModeZ1"] == "1":
data["modeZ1"] = "super_cool"
else:
data["modeZ1"] = "no_mode"
if data["parameters"]["quickModeZ2"] == "1":
data["modeZ2"] = "super_freeze"
elif data["parameters"]["intelligenceMode"] == "1":
data["modeZ2"] = "auto_set"
else:
data["modeZ2"] = "no_mode"
return data

View File

@ -1,10 +1,20 @@
class Appliance:
def data(self, data):
if data["attributes"]["lastConnEvent"]["category"] == "DISCONNECTED":
data["attributes"]["parameters"]["machMode"] = "0"
data["active"] = bool(data.get("attributes", {}).get("activity"))
data["pause"] = data["attributes"]["parameters"]["machMode"] == "3"
from typing import Any, Dict
from pyhon.appliances.base import ApplianceBase
from pyhon.parameter.fixed import HonParameterFixed
class Appliance(ApplianceBase):
def attributes(self, data: Dict[str, Any]) -> Dict[str, Any]:
data = super().attributes(data)
if data.get("lastConnEvent", {}).get("category", "") == "DISCONNECTED":
data["parameters"]["machMode"].value = "0"
data["active"] = bool(data.get("activity"))
data["pause"] = data["parameters"]["machMode"] == "3"
return data
def settings(self, settings):
def settings(self, settings: Dict[str, Any]) -> Dict[str, Any]:
dry_level = settings.get("startProgram.dryLevel")
if isinstance(dry_level, HonParameterFixed) and dry_level.value == "11":
settings.pop("startProgram.dryLevel", None)
return settings

5
pyhon/appliances/wc.py Normal file
View File

@ -0,0 +1,5 @@
from pyhon.appliances.base import ApplianceBase
class Appliance(ApplianceBase):
pass

View File

@ -1,10 +1,16 @@
class Appliance:
def data(self, data):
if data["attributes"]["lastConnEvent"]["category"] == "DISCONNECTED":
data["attributes"]["parameters"]["machMode"] = "0"
data["active"] = bool(data.get("attributes", {}).get("activity"))
data["pause"] = data["attributes"]["parameters"]["machMode"] == "3"
from typing import Dict, Any
from pyhon.appliances.base import ApplianceBase
class Appliance(ApplianceBase):
def attributes(self, data: Dict[str, Any]) -> Dict[str, Any]:
data = super().attributes(data)
if data.get("lastConnEvent", {}).get("category", "") == "DISCONNECTED":
data["parameters"]["machMode"].value = "0"
data["active"] = bool(data.get("activity"))
data["pause"] = data["parameters"]["machMode"] == "3"
return data
def settings(self, settings):
def settings(self, settings: Dict[str, Any]) -> Dict[str, Any]:
return settings

View File

@ -1,10 +1,16 @@
class Appliance:
def data(self, data):
if data["attributes"]["lastConnEvent"]["category"] == "DISCONNECTED":
data["attributes"]["parameters"]["machMode"] = "0"
data["active"] = bool(data.get("attributes", {}).get("activity"))
data["pause"] = data["attributes"]["parameters"]["machMode"] == "3"
from typing import Any, Dict
from pyhon.appliances.base import ApplianceBase
class Appliance(ApplianceBase):
def attributes(self, data: Dict[str, Any]) -> Dict[str, Any]:
data = super().attributes(data)
if data.get("lastConnEvent", {}).get("category", "") == "DISCONNECTED":
data["parameters"]["machMode"].value = "0"
data["active"] = bool(data.get("activity"))
data["pause"] = data["parameters"]["machMode"] == "3"
return data
def settings(self, settings):
def settings(self, settings: Dict[str, Any]) -> Dict[str, Any]:
return settings

58
pyhon/attributes.py Normal file
View File

@ -0,0 +1,58 @@
from datetime import datetime, timedelta
from typing import Optional, Final, Dict
from pyhon.helper import str_to_float
class HonAttribute:
_LOCK_TIMEOUT: Final = 10
def __init__(self, data: Dict[str, str] | str):
self._value: str = ""
self._last_update: Optional[datetime] = None
self._lock_timestamp: Optional[datetime] = None
self.update(data)
@property
def value(self) -> float | str:
"""Attribute value"""
try:
return str_to_float(self._value)
except ValueError:
return self._value
@value.setter
def value(self, value: str) -> None:
self._value = value
@property
def last_update(self) -> Optional[datetime]:
"""Timestamp of last api update"""
return self._last_update
@property
def lock(self) -> bool:
"""Shows if value changes are forbidden"""
if not self._lock_timestamp:
return False
lock_until = self._lock_timestamp + timedelta(seconds=self._LOCK_TIMEOUT)
return lock_until >= datetime.utcnow()
def update(self, data: Dict[str, str] | str, shield: bool = False) -> bool:
if self.lock and not shield:
return False
if shield:
self._lock_timestamp = datetime.utcnow()
if isinstance(data, str):
self.value = data
return True
self.value = data.get("parNewVal", "")
if last_update := data.get("lastUpdate"):
try:
self._last_update = datetime.fromisoformat(last_update)
except ValueError:
self._last_update = None
return True
def __str__(self) -> str:
return self._value

204
pyhon/command_loader.py Normal file
View File

@ -0,0 +1,204 @@
import asyncio
from contextlib import suppress
from copy import copy
from typing import Dict, Any, Optional, TYPE_CHECKING, List, Collection
from pyhon.commands import HonCommand
from pyhon.parameter.fixed import HonParameterFixed
from pyhon.parameter.program import HonParameterProgram
if TYPE_CHECKING:
from pyhon import HonAPI, exceptions
from pyhon.appliance import HonAppliance
class HonCommandLoader:
"""Loads and parses hOn command data"""
def __init__(self, api: "HonAPI", appliance: "HonAppliance") -> None:
self._api_commands: Dict[str, Any] = {}
self._favourites: List[Dict[str, Any]] = []
self._command_history: List[Dict[str, Any]] = []
self._commands: Dict[str, HonCommand] = {}
self._api: "HonAPI" = api
self._appliance: "HonAppliance" = appliance
self._appliance_data: Dict[str, Any] = {}
self._additional_data: Dict[str, Any] = {}
@property
def api(self) -> "HonAPI":
"""api connection object"""
if self._api is None:
raise exceptions.NoAuthenticationException("Missing hOn login")
return self._api
@property
def appliance(self) -> "HonAppliance":
"""appliance object"""
return self._appliance
@property
def commands(self) -> Dict[str, HonCommand]:
"""Get list of hon commands"""
return self._commands
@property
def appliance_data(self) -> Dict[str, Any]:
"""Get command appliance data"""
return self._appliance_data
@property
def additional_data(self) -> Dict[str, Any]:
"""Get command additional data"""
return self._additional_data
async def load_commands(self) -> None:
"""Trigger loading of command data"""
await self._load_data()
self._appliance_data = self._api_commands.pop("applianceModel")
self._get_commands()
self._add_favourites()
self._recover_last_command_states()
async def _load_commands(self) -> None:
self._api_commands = await self._api.load_commands(self._appliance)
async def _load_favourites(self) -> None:
self._favourites = await self._api.load_favourites(self._appliance)
async def _load_command_history(self) -> None:
self._command_history = await self._api.load_command_history(self._appliance)
async def _load_data(self) -> None:
"""Callback parallel all relevant data"""
await asyncio.gather(
*[
self._load_commands(),
self._load_favourites(),
self._load_command_history(),
]
)
@staticmethod
def _is_command(data: Dict[str, Any]) -> bool:
"""Check if dict can be parsed as command"""
return (
data.get("description") is not None and data.get("protocolType") is not None
)
@staticmethod
def _clean_name(category: str) -> str:
"""Clean up category name"""
if "PROGRAM" in category:
return category.split(".")[-1].lower()
return category
def _get_commands(self) -> None:
"""Generates HonCommand dict from api data"""
commands = []
for name, data in self._api_commands.items():
if command := self._parse_command(data, name):
commands.append(command)
self._commands = {c.name: c for c in commands}
def _parse_command(
self,
data: Dict[str, Any] | str,
command_name: str,
categories: Optional[Dict[str, "HonCommand"]] = None,
category_name: str = "",
) -> Optional[HonCommand]:
"""Try to crate HonCommand object"""
if not isinstance(data, dict):
self._additional_data[command_name] = data
return None
if self._is_command(data):
return HonCommand(
command_name,
data,
self._appliance,
category_name=category_name,
categories=categories,
)
if category := self._parse_categories(data, command_name):
return category
return None
def _parse_categories(
self, data: Dict[str, Any], command_name: str
) -> Optional[HonCommand]:
"""Parse categories and create reference to other"""
categories: Dict[str, HonCommand] = {}
for category, value in data.items():
if command := self._parse_command(
value, command_name, category_name=category, categories=categories
):
categories[self._clean_name(category)] = command
if categories:
# setParameters should be at first place
if "setParameters" in categories:
return categories["setParameters"]
return list(categories.values())[0]
return None
def _get_last_command_index(self, name: str) -> Optional[int]:
"""Get index of last command execution"""
return next(
(
index
for (index, d) in enumerate(self._command_history)
if d.get("command", {}).get("commandName") == name
),
None,
)
def _set_last_category(
self, command: HonCommand, name: str, parameters: Dict[str, Any]
) -> HonCommand:
"""Set category to last state"""
if command.categories:
if program := parameters.pop("program", None):
command.category = self._clean_name(program)
elif category := parameters.pop("category", None):
command.category = category
else:
return command
return self.commands[name]
return command
def _recover_last_command_states(self) -> None:
"""Set commands to last state"""
for name, command in self.commands.items():
if (last_index := self._get_last_command_index(name)) is None:
continue
last_command = self._command_history[last_index]
parameters = last_command.get("command", {}).get("parameters", {})
command = self._set_last_category(command, name, parameters)
for key, data in command.settings.items():
if parameters.get(key) is None:
continue
with suppress(ValueError):
data.value = parameters.get(key)
def _add_favourites(self) -> None:
"""Patch program categories with favourites"""
for favourite in self._favourites:
name = favourite.get("favouriteName", {})
command = favourite.get("command", {})
command_name = command.get("commandName", "")
program_name = self._clean_name(command.get("programName", ""))
base: HonCommand = copy(
self.commands[command_name].categories[program_name]
)
for data in command.values():
if isinstance(data, str):
continue
for key, value in data.items():
if parameter := base.parameters.get(key):
with suppress(ValueError):
parameter.value = value
extra_param = HonParameterFixed("favourite", {"fixedValue": "1"}, "custom")
base.parameters.update(favourite=extra_param)
if isinstance(program := base.parameters["program"], HonParameterProgram):
program.set_value(name)
self.commands[command_name].categories[name] = base

View File

@ -1,90 +1,171 @@
from pyhon.parameter import (
HonParameterFixed,
HonParameterEnum,
HonParameterRange,
HonParameterProgram,
)
import logging
from typing import Optional, Dict, Any, List, TYPE_CHECKING, Union
from pyhon import exceptions
from pyhon.exceptions import ApiError, NoAuthenticationException
from pyhon.parameter.base import HonParameter
from pyhon.parameter.enum import HonParameterEnum
from pyhon.parameter.fixed import HonParameterFixed
from pyhon.parameter.program import HonParameterProgram
from pyhon.parameter.range import HonParameterRange
from pyhon.rules import HonRuleSet
from pyhon.typedefs import Parameter
if TYPE_CHECKING:
from pyhon import HonAPI
from pyhon.appliance import HonAppliance
_LOGGER = logging.getLogger(__name__)
class HonCommand:
def __init__(self, name, attributes, connector, device, multi=None, program=""):
self._connector = connector
self._device = device
self._name = name
self._multi = multi or {}
self._program = program
self._description = attributes.get("description", "")
self._parameters = self._create_parameters(attributes.get("parameters", {}))
self._ancillary_parameters = self._create_parameters(
attributes.get("ancillaryParameters", {})
)
def __init__(
self,
name: str,
attributes: Dict[str, Any],
appliance: "HonAppliance",
categories: Optional[Dict[str, "HonCommand"]] = None,
category_name: str = "",
):
self._api: Optional[HonAPI] = appliance.api
self._appliance: "HonAppliance" = appliance
self._name: str = name
self._categories: Optional[Dict[str, "HonCommand"]] = categories
self._category_name: str = category_name
self._description: str = attributes.pop("description", "")
self._protocol_type: str = attributes.pop("protocolType", "")
self._parameters: Dict[str, HonParameter] = {}
self._data: Dict[str, Any] = {}
self._available_settings: Dict[str, HonParameter] = {}
self._rules: List[HonRuleSet] = []
self._load_parameters(attributes)
def __repr__(self):
def __repr__(self) -> str:
return f"{self._name} command"
def _create_parameters(self, parameters):
result = {}
for parameter, attributes in parameters.items():
match attributes.get("typology"):
case "range":
result[parameter] = HonParameterRange(parameter, attributes)
case "enum":
result[parameter] = HonParameterEnum(parameter, attributes)
case "fixed":
result[parameter] = HonParameterFixed(parameter, attributes)
if self._multi:
result["program"] = HonParameterProgram("program", self)
return result
@property
def name(self) -> str:
return self._name
@property
def parameters(self):
def api(self) -> "HonAPI":
if self._api is None:
raise exceptions.NoAuthenticationException("Missing hOn login")
return self._api
@property
def appliance(self) -> "HonAppliance":
return self._appliance
@property
def data(self) -> Dict[str, Any]:
return self._data
@property
def parameters(self) -> Dict[str, HonParameter]:
return self._parameters
@property
def ancillary_parameters(self):
return {
key: parameter.value
for key, parameter in self._ancillary_parameters.items()
}
def settings(self) -> Dict[str, HonParameter]:
return self._parameters
async def send(self):
parameters = {
name: parameter.value for name, parameter in self._parameters.items()
}
return await self._connector.send_command(
self._device, self._name, parameters, self.ancillary_parameters
@property
def parameter_groups(self) -> Dict[str, Dict[str, Union[str, float]]]:
result: Dict[str, Dict[str, Union[str, float]]] = {}
for name, parameter in self._parameters.items():
result.setdefault(parameter.group, {})[name] = parameter.intern_value
return result
@property
def parameter_value(self) -> Dict[str, Union[str, float]]:
return {n: p.value for n, p in self._parameters.items()}
def _load_parameters(self, attributes: Dict[str, Dict[str, Any]]) -> None:
for key, items in attributes.items():
for name, data in items.items():
self._create_parameters(data, name, key)
for rule in self._rules:
rule.patch()
def _create_parameters(
self, data: Dict[str, Any], name: str, parameter: str
) -> None:
if name == "zoneMap" and self._appliance.zone:
data["default"] = self._appliance.zone
if data.get("category") == "rule":
if "fixedValue" not in data:
_LOGGER.error("Rule not supported: %s", data)
else:
self._rules.append(HonRuleSet(self, data["fixedValue"]))
match data.get("typology"):
case "range":
self._parameters[name] = HonParameterRange(name, data, parameter)
case "enum":
self._parameters[name] = HonParameterEnum(name, data, parameter)
case "fixed":
self._parameters[name] = HonParameterFixed(name, data, parameter)
case _:
self._data[name] = data
return
if self._category_name:
name = "program" if "PROGRAM" in self._category_name else "category"
self._parameters[name] = HonParameterProgram(name, self, "custom")
async def send(self) -> bool:
params = self.parameter_groups.get("parameters", {})
ancillary_params = self.parameter_groups.get("ancillaryParameters", {})
ancillary_params.pop("programRules", None)
self.appliance.sync_to_params(self.name)
try:
result = await self.api.send_command(
self._appliance, self._name, params, ancillary_params
)
if not result:
_LOGGER.error(result)
raise ApiError("Can't send command")
except NoAuthenticationException:
_LOGGER.error("No Authentication")
return False
return result
@property
def categories(self) -> Dict[str, "HonCommand"]:
if self._categories is None:
return {"_": self}
return self._categories
@property
def category(self) -> str:
return self._category_name
@category.setter
def category(self, category: str) -> None:
if category in self.categories:
self._appliance.commands[self._name] = self.categories[category]
@property
def setting_keys(self) -> List[str]:
return list(
{param for cmd in self.categories.values() for param in cmd.parameters}
)
def get_programs(self):
return self._multi
def set_program(self, program):
self._device.commands[self._name] = self._multi[program]
def _get_settings_keys(self, command=None):
command = command or self
keys = []
for key, parameter in command._parameters.items():
if isinstance(parameter, HonParameterFixed):
continue
if key not in keys:
keys.append(key)
return keys
@staticmethod
def _more_options(first: Parameter, second: Parameter) -> Parameter:
if isinstance(first, HonParameterFixed) and not isinstance(
second, HonParameterFixed
):
return second
if len(second.values) > len(first.values):
return second
return first
@property
def setting_keys(self):
if not self._multi:
return self._get_settings_keys()
result = [
key for cmd in self._multi.values() for key in self._get_settings_keys(cmd)
]
return list(set(result + ["program"]))
@property
def settings(self):
"""Parameters with typology enum and range"""
return {
s: self._parameters.get(s)
for s in self.setting_keys
if self._parameters.get(s) is not None
}
def available_settings(self) -> Dict[str, Parameter]:
result: Dict[str, Parameter] = {}
for command in self.categories.values():
for name, parameter in command.parameters.items():
if name in result:
result[name] = self._more_options(result[name], parameter)
else:
result[name] = parameter
return result

View File

@ -1,108 +1,190 @@
import json
import logging
from datetime import datetime
from pathlib import Path
from pprint import pformat
from types import TracebackType
from typing import Dict, Optional, Any, List, no_type_check, Type
from pyhon import const
from aiohttp import ClientSession
from typing_extensions import Self
from pyhon import const, exceptions
from pyhon.appliance import HonAppliance
from pyhon.connection.handler import HonConnectionHandler, HonAnonymousConnectionHandler
from pyhon.connection.auth import HonAuth
from pyhon.connection.handler.anonym import HonAnonymousConnectionHandler
from pyhon.connection.handler.hon import HonConnectionHandler
_LOGGER = logging.getLogger()
_LOGGER = logging.getLogger(__name__)
class HonAPI:
def __init__(self, email="", password="", anonymous=False, session=None) -> None:
def __init__(
self,
email: str = "",
password: str = "",
anonymous: bool = False,
session: Optional[ClientSession] = None,
) -> None:
super().__init__()
self._email = email
self._password = password
self._anonymous = anonymous
self._hon = None
self._hon_anonymous = None
self._session = session
self._email: str = email
self._password: str = password
self._anonymous: bool = anonymous
self._hon_handler: Optional[HonConnectionHandler] = None
self._hon_anonymous_handler: Optional[HonAnonymousConnectionHandler] = None
self._session: Optional[ClientSession] = session
async def __aenter__(self):
async def __aenter__(self) -> Self:
return await self.create()
async def __aexit__(self, exc_type, exc_val, exc_tb):
async def __aexit__(
self,
exc_type: Optional[Type[BaseException]],
exc: Optional[BaseException],
traceback: Optional[TracebackType],
) -> None:
await self.close()
async def create(self):
self._hon_anonymous = await HonAnonymousConnectionHandler(
@property
def auth(self) -> HonAuth:
if self._hon is None or self._hon.auth is None:
raise exceptions.NoAuthenticationException
return self._hon.auth
@property
def _hon(self) -> HonConnectionHandler:
if self._hon_handler is None:
raise exceptions.NoAuthenticationException
return self._hon_handler
@property
def _hon_anonymous(self) -> HonAnonymousConnectionHandler:
if self._hon_anonymous_handler is None:
raise exceptions.NoAuthenticationException
return self._hon_anonymous_handler
async def create(self) -> Self:
self._hon_anonymous_handler = await HonAnonymousConnectionHandler(
self._session
).create()
if not self._anonymous:
self._hon = await HonConnectionHandler(
self._hon_handler = await HonConnectionHandler(
self._email, self._password, self._session
).create()
return self
async def load_appliances(self):
async def load_appliances(self) -> List[Dict[str, Any]]:
async with self._hon.get(f"{const.API_URL}/commands/v1/appliance") as resp:
return await resp.json()
if result := await resp.json():
return result.get("payload", {}).get("appliances", {})
return []
async def load_commands(self, appliance: HonAppliance):
params = {
async def load_commands(self, appliance: HonAppliance) -> Dict[str, Any]:
params: Dict[str, str | int] = {
"applianceType": appliance.appliance_type,
"code": appliance.info["code"],
"applianceModelId": appliance.appliance_model_id,
"firmwareId": appliance.info["eepromId"],
"macAddress": appliance.mac_address,
"fwVersion": appliance.info["fwVersion"],
"os": const.OS,
"appVersion": const.APP_VERSION,
"series": appliance.info["series"],
"code": appliance.code,
}
url = f"{const.API_URL}/commands/v1/retrieve"
if firmware_id := appliance.info.get("eepromId"):
params["firmwareId"] = firmware_id
if firmware_version := appliance.info.get("fwVersion"):
params["fwVersion"] = firmware_version
if series := appliance.info.get("series"):
params["series"] = series
url: str = f"{const.API_URL}/commands/v1/retrieve"
async with self._hon.get(url, params=params) as response:
result = (await response.json()).get("payload", {})
result: Dict[str, Any] = (await response.json()).get("payload", {})
if not result or result.pop("resultCode") != "0":
_LOGGER.error(await response.json())
return {}
return result
async def command_history(self, appliance: HonAppliance):
url = f"{const.API_URL}/commands/v1/appliance/{appliance.mac_address}/history"
async def load_command_history(
self, appliance: HonAppliance
) -> List[Dict[str, Any]]:
url: str = (
f"{const.API_URL}/commands/v1/appliance/{appliance.mac_address}/history"
)
async with self._hon.get(url) as response:
result = await response.json()
result: Dict[str, Any] = await response.json()
if not result or not result.get("payload"):
return {}
return []
return result["payload"]["history"]
async def last_activity(self, appliance: HonAppliance):
url = f"{const.API_URL}/commands/v1/retrieve-last-activity"
params = {"macAddress": appliance.mac_address}
async def load_favourites(self, appliance: HonAppliance) -> List[Dict[str, Any]]:
url: str = (
f"{const.API_URL}/commands/v1/appliance/{appliance.mac_address}/favourite"
)
async with self._hon.get(url) as response:
result: Dict[str, Any] = await response.json()
if not result or not result.get("payload"):
return []
return result["payload"]["favourites"]
async def load_last_activity(self, appliance: HonAppliance) -> Dict[str, Any]:
url: str = f"{const.API_URL}/commands/v1/retrieve-last-activity"
params: Dict[str, str] = {"macAddress": appliance.mac_address}
async with self._hon.get(url, params=params) as response:
result = await response.json()
result: Dict[str, Any] = await response.json()
if result and (activity := result.get("attributes")):
return activity
return {}
async def load_attributes(self, appliance: HonAppliance):
params = {
async def load_appliance_data(self, appliance: HonAppliance) -> Dict[str, Any]:
url: str = f"{const.API_URL}/commands/v1/appliance-model"
params: Dict[str, str] = {
"code": appliance.code,
"macAddress": appliance.mac_address,
}
async with self._hon.get(url, params=params) as response:
result: Dict[str, Any] = await response.json()
if result:
return result.get("payload", {}).get("applianceModel", {})
return {}
async def load_attributes(self, appliance: HonAppliance) -> Dict[str, Any]:
params: Dict[str, str] = {
"macAddress": appliance.mac_address,
"applianceType": appliance.appliance_type,
"category": "CYCLE",
}
url = f"{const.API_URL}/commands/v1/context"
url: str = f"{const.API_URL}/commands/v1/context"
async with self._hon.get(url, params=params) as response:
return (await response.json()).get("payload", {})
async def load_statistics(self, appliance: HonAppliance):
params = {
async def load_statistics(self, appliance: HonAppliance) -> Dict[str, Any]:
params: Dict[str, str] = {
"macAddress": appliance.mac_address,
"applianceType": appliance.appliance_type,
}
url = f"{const.API_URL}/commands/v1/statistics"
url: str = f"{const.API_URL}/commands/v1/statistics"
async with self._hon.get(url, params=params) as response:
return (await response.json()).get("payload", {})
async def send_command(self, appliance, command, parameters, ancillary_parameters):
now = datetime.utcnow().isoformat()
data = {
async def load_maintenance(self, appliance: HonAppliance) -> Dict[str, Any]:
url = f"{const.API_URL}/commands/v1/maintenance-cycle"
params = {"macAddress": appliance.mac_address}
async with self._hon.get(url, params=params) as response:
return (await response.json()).get("payload", {})
async def send_command(
self,
appliance: HonAppliance,
command: str,
parameters: Dict[str, Any],
ancillary_parameters: Dict[str, Any],
) -> bool:
now: str = datetime.utcnow().isoformat()
data: Dict[str, Any] = {
"macAddress": appliance.mac_address,
"timestamp": f"{now[:-3]}Z",
"commandName": command,
"transactionId": f"{appliance.mac_address}_{now[:-3]}Z",
"applianceOptions": appliance.commands_options,
"appliance": self._hon.device.get(),
"applianceOptions": appliance.options,
"device": self._hon.device.get(mobile=True),
"attributes": {
"channel": "mobileApp",
"origin": "standardProgram",
@ -112,36 +194,40 @@ class HonAPI:
"parameters": parameters,
"applianceType": appliance.appliance_type,
}
url = f"{const.API_URL}/commands/v1/send"
async with self._hon.post(url, json=data) as resp:
json_data = await resp.json()
url: str = f"{const.API_URL}/commands/v1/send"
async with self._hon.post(url, json=data) as response:
json_data: Dict[str, Any] = await response.json()
if json_data.get("payload", {}).get("resultCode") == "0":
return True
_LOGGER.error(await response.text())
_LOGGER.error("%s - Payload:\n%s", url, pformat(data))
return False
async def appliance_configuration(self):
url = f"{const.API_URL}/config/v1/appliance-configuration"
async def appliance_configuration(self) -> Dict[str, Any]:
url: str = f"{const.API_URL}/config/v1/program-list-rules"
async with self._hon_anonymous.get(url) as response:
result = await response.json()
result: Dict[str, Any] = await response.json()
if result and (data := result.get("payload")):
return data
return {}
async def app_config(self, language="en", beta=True):
url = f"{const.API_URL}/app-config"
payload = {
async def app_config(
self, language: str = "en", beta: bool = True
) -> Dict[str, Any]:
url: str = f"{const.API_URL}/app-config"
payload_data: Dict[str, str | int] = {
"languageCode": language,
"beta": beta,
"appVersion": const.APP_VERSION,
"os": const.OS,
}
payload = json.dumps(payload, separators=(",", ":"))
payload: str = json.dumps(payload_data, separators=(",", ":"))
async with self._hon_anonymous.post(url, data=payload) as response:
if (result := await response.json()) and (data := result.get("payload")):
return data
return {}
async def translation_keys(self, language="en"):
async def translation_keys(self, language: str = "en") -> Dict[str, Any]:
config = await self.app_config(language=language)
if url := config.get("language", {}).get("jsonPath"):
async with self._hon_anonymous.get(url) as response:
@ -149,8 +235,66 @@ class HonAPI:
return result
return {}
async def close(self):
if self._hon:
await self._hon.close()
if self._hon_anonymous:
await self._hon_anonymous.close()
async def close(self) -> None:
if self._hon_handler is not None:
await self._hon_handler.close()
if self._hon_anonymous_handler is not None:
await self._hon_anonymous_handler.close()
class TestAPI(HonAPI):
def __init__(self, path: Path):
super().__init__()
self._anonymous = True
self._path: Path = path
def _load_json(self, appliance: HonAppliance, file: str) -> Dict[str, Any]:
directory = f"{appliance.appliance_type}_{appliance.appliance_model_id}".lower()
path = f"{self._path}/{directory}/{file}.json"
with open(path, "r", encoding="utf-8") as json_file:
return json.loads(json_file.read())
async def load_appliances(self) -> List[Dict[str, Any]]:
result = []
for appliance in self._path.glob("*/"):
with open(
appliance / "appliance_data.json", "r", encoding="utf-8"
) as json_file:
result.append(json.loads(json_file.read()))
return result
async def load_commands(self, appliance: HonAppliance) -> Dict[str, Any]:
return self._load_json(appliance, "commands")
@no_type_check
async def load_command_history(
self, appliance: HonAppliance
) -> List[Dict[str, Any]]:
return self._load_json(appliance, "command_history")
async def load_favourites(self, appliance: HonAppliance) -> List[Dict[str, Any]]:
return []
async def load_last_activity(self, appliance: HonAppliance) -> Dict[str, Any]:
return {}
async def load_appliance_data(self, appliance: HonAppliance) -> Dict[str, Any]:
return self._load_json(appliance, "appliance_data")
async def load_attributes(self, appliance: HonAppliance) -> Dict[str, Any]:
return self._load_json(appliance, "attributes")
async def load_statistics(self, appliance: HonAppliance) -> Dict[str, Any]:
return self._load_json(appliance, "statistics")
async def load_maintenance(self, appliance: HonAppliance) -> Dict[str, Any]:
return self._load_json(appliance, "maintenance")
async def send_command(
self,
appliance: HonAppliance,
command: str,
parameters: Dict[str, Any],
ancillary_parameters: Dict[str, Any],
) -> bool:
return True

View File

@ -3,198 +3,282 @@ import logging
import re
import secrets
import urllib
from pprint import pformat
from contextlib import suppress
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, Optional, Any
from urllib import parse
from urllib.parse import quote
import aiohttp
from aiohttp import ClientResponse
from yarl import URL
from pyhon import const
from pyhon import const, exceptions
from pyhon.connection.device import HonDevice
from pyhon.connection.handler.auth import HonAuthConnectionHandler
_LOGGER = logging.getLogger()
_LOGGER = logging.getLogger(__name__)
@dataclass
class HonLoginData:
url: str = ""
email: str = ""
password: str = ""
fw_uid: str = ""
loaded: Optional[Dict[str, Any]] = None
class HonAuth:
def __init__(self, session, email, password, device) -> None:
_TOKEN_EXPIRES_AFTER_HOURS = 8
_TOKEN_EXPIRE_WARNING_HOURS = 7
def __init__(
self,
session: aiohttp.ClientSession,
email: str,
password: str,
device: HonDevice,
) -> None:
self._session = session
self._email = email
self._password = password
self._request = HonAuthConnectionHandler(session)
self._login_data = HonLoginData()
self._login_data.email = email
self._login_data.password = password
self._access_token = ""
self._refresh_token = ""
self._cognito_token = ""
self._id_token = ""
self._device = device
self._expires: datetime = datetime.utcnow()
@property
def cognito_token(self):
def cognito_token(self) -> str:
return self._cognito_token
@property
def id_token(self):
def id_token(self) -> str:
return self._id_token
@property
def access_token(self):
def access_token(self) -> str:
return self._access_token
@property
def refresh_token(self):
def refresh_token(self) -> str:
return self._refresh_token
async def _load_login(self):
def _check_token_expiration(self, hours: int) -> bool:
return datetime.utcnow() >= self._expires + timedelta(hours=hours)
@property
def token_is_expired(self) -> bool:
return self._check_token_expiration(self._TOKEN_EXPIRES_AFTER_HOURS)
@property
def token_expires_soon(self) -> bool:
return self._check_token_expiration(self._TOKEN_EXPIRE_WARNING_HOURS)
async def _error_logger(self, response: ClientResponse, fail: bool = True) -> None:
output = "hOn Authentication Error\n"
for i, (status, url) in enumerate(self._request.called_urls):
output += f" {i + 1: 2d} {status} - {url}\n"
output += f"ERROR - {response.status} - {response.request_info.url}\n"
output += f"{15 * '='} Response {15 * '='}\n{await response.text()}\n{40 * '='}"
_LOGGER.error(output)
if fail:
raise exceptions.HonAuthenticationError("Can't login")
@staticmethod
def _generate_nonce() -> str:
nonce = secrets.token_hex(16)
nonce = f"{nonce[:8]}-{nonce[8:12]}-{nonce[12:16]}-{nonce[16:20]}-{nonce[20:]}"
return f"{nonce[:8]}-{nonce[8:12]}-{nonce[12:16]}-{nonce[16:20]}-{nonce[20:]}"
async def _load_login(self) -> bool:
login_url = await self._introduce()
login_url = await self._handle_redirects(login_url)
return await self._login_url(login_url)
async def _introduce(self) -> str:
redirect_uri = urllib.parse.quote(f"{const.APP}://mobilesdk/detect/oauth/done")
params = {
"response_type": "token+id_token",
"client_id": const.CLIENT_ID,
"redirect_uri": urllib.parse.quote(
f"{const.APP}://mobilesdk/detect/oauth/done"
),
"redirect_uri": redirect_uri,
"display": "touch",
"scope": "api openid refresh_token web",
"nonce": nonce,
"nonce": self._generate_nonce(),
}
params = "&".join([f"{k}={v}" for k, v in params.items()])
async with self._session.get(
f"{const.AUTH_API}/services/oauth2/authorize/expid_Login?{params}"
) as resp:
if not (login_url := re.findall("url = '(.+?)'", await resp.text())):
return False
async with self._session.get(login_url[0], allow_redirects=False) as redirect1:
if not (url := redirect1.headers.get("Location")):
return False
async with self._session.get(url, allow_redirects=False) as redirect2:
if not (
url := redirect2.headers.get("Location")
+ "&System=IoT_Mobile_App&RegistrationSubChannel=hOn"
):
return False
async with self._session.get(URL(url, encoded=True)) as login_screen:
if context := re.findall(
'"fwuid":"(.*?)","loaded":(\\{.*?})', await login_screen.text()
):
params_encode = "&".join([f"{k}={v}" for k, v in params.items()])
url = f"{const.AUTH_API}/services/oauth2/authorize/expid_Login?{params_encode}"
async with self._request.get(url) as response:
text = await response.text()
self._expires = datetime.utcnow()
if not (login_url := re.findall("url = '(.+?)'", text)):
if "oauth/done#access_token=" in text:
self._parse_token_data(text)
raise exceptions.HonNoAuthenticationNeeded()
await self._error_logger(response)
return login_url[0]
async def _manual_redirect(self, url: str) -> str:
async with self._request.get(url, allow_redirects=False) as response:
if not (new_location := response.headers.get("Location", "")):
await self._error_logger(response)
return new_location
async def _handle_redirects(self, login_url: str) -> str:
redirect1 = await self._manual_redirect(login_url)
redirect2 = await self._manual_redirect(redirect1)
return f"{redirect2}&System=IoT_Mobile_App&RegistrationSubChannel=hOn"
async def _login_url(self, login_url: str) -> bool:
headers = {"user-agent": const.USER_AGENT}
url = URL(login_url, encoded=True)
async with self._request.get(url, headers=headers) as response:
text = await response.text()
if context := re.findall('"fwuid":"(.*?)","loaded":(\\{.*?})', text):
fw_uid, loaded_str = context[0]
loaded = json.loads(loaded_str)
login_url = login_url[0].replace(
"/".join(const.AUTH_API.split("/")[:-1]), ""
)
return fw_uid, loaded, login_url
self._login_data.fw_uid = fw_uid
self._login_data.loaded = json.loads(loaded_str)
self._login_data.url = login_url.replace(const.AUTH_API, "")
return True
await self._error_logger(response)
return False
async def _login(self, fw_uid, loaded, login_url):
data = {
"message": {
"actions": [
{
"id": "79;a",
"descriptor": "apex://LightningLoginCustomController/ACTION$login",
"callingDescriptor": "markup://c:loginForm",
"params": {
"username": self._email,
"password": self._password,
"startUrl": parse.unquote(
login_url.split("startURL=")[-1]
).split("%3D")[0],
},
}
]
async def _login(self) -> str:
start_url = self._login_data.url.rsplit("startURL=", maxsplit=1)[-1]
start_url = parse.unquote(start_url).split("%3D")[0]
action = {
"id": "79;a",
"descriptor": "apex://LightningLoginCustomController/ACTION$login",
"callingDescriptor": "markup://c:loginForm",
"params": {
"username": self._login_data.email,
"password": self._login_data.password,
"startUrl": start_url,
},
}
data = {
"message": {"actions": [action]},
"aura.context": {
"mode": "PROD",
"fwuid": fw_uid,
"fwuid": self._login_data.fw_uid,
"app": "siteforce:loginApp2",
"loaded": loaded,
"loaded": self._login_data.loaded,
"dn": [],
"globals": {},
"uad": False,
},
"aura.pageURI": login_url,
"aura.pageURI": self._login_data.url,
"aura.token": None,
}
params = {"r": 3, "other.LightningLoginCustom.login": 1}
async with self._session.post(
async with self._request.post(
const.AUTH_API + "/s/sfsites/aura",
headers={"Content-Type": "application/x-www-form-urlencoded"},
data="&".join(f"{k}={json.dumps(v)}" for k, v in data.items()),
data="&".join(f"{k}={quote(json.dumps(v))}" for k, v in data.items()),
params=params,
) as response:
if response.status == 200:
try:
data = await response.json()
return data["events"][0]["attributes"]["values"]["url"]
except json.JSONDecodeError:
pass
except KeyError:
_LOGGER.error(
"Can't get login url - %s", pformat(await response.json())
)
_LOGGER.error(
"Unable to login: %s\n%s", response.status, await response.text()
)
with suppress(json.JSONDecodeError, KeyError):
result = await response.json()
return result["events"][0]["attributes"]["values"]["url"]
await self._error_logger(response)
return ""
async def _get_token(self, url):
async with self._session.get(url) as resp:
if resp.status != 200:
_LOGGER.error("Unable to get token: %s", resp.status)
return False
url = re.findall("href\\s*=\\s*[\"'](http.+?)[\"']", await resp.text())
if not url:
_LOGGER.error("Can't get login url - \n%s", await resp.text())
raise PermissionError
async with self._session.get(url[0]) as resp:
if resp.status != 200:
_LOGGER.error("Unable to get token: %s", resp.status)
return False
url = re.findall("href\\s*=\\s*[\"'](.*?)[\"']", await resp.text())
url = "/".join(const.AUTH_API.split("/")[:-1]) + url[0]
async with self._session.get(url) as resp:
if resp.status != 200:
_LOGGER.error("Unable to connect to the login service: %s", resp.status)
return False
text = await resp.text()
def _parse_token_data(self, text: str) -> bool:
if access_token := re.findall("access_token=(.*?)&", text):
self._access_token = access_token[0]
if refresh_token := re.findall("refresh_token=(.*?)&", text):
self._refresh_token = refresh_token[0]
if id_token := re.findall("id_token=(.*?)&", text):
self._id_token = id_token[0]
return True if access_token and refresh_token and id_token else False
async def _get_token(self, url: str) -> bool:
async with self._request.get(url) as response:
if response.status != 200:
await self._error_logger(response)
return False
url_search = re.findall(
"href\\s*=\\s*[\"'](.+?)[\"']", await response.text()
)
if not url_search:
await self._error_logger(response)
return False
if "ProgressiveLogin" in url_search[0]:
async with self._request.get(url_search[0]) as response:
if response.status != 200:
await self._error_logger(response)
return False
url_search = re.findall(
"href\\s*=\\s*[\"'](.*?)[\"']", await response.text()
)
url = const.AUTH_API + url_search[0]
async with self._request.get(url) as response:
if response.status != 200:
await self._error_logger(response)
return False
if not self._parse_token_data(await response.text()):
await self._error_logger(response)
return False
return True
async def authorize(self):
if login_site := await self._load_login():
fw_uid, loaded, login_url = login_site
else:
return False
if not (url := await self._login(fw_uid, loaded, login_url)):
return False
if not await self._get_token(url):
return False
async def _api_auth(self) -> bool:
post_headers = {"id-token": self._id_token}
data = self._device.get()
async with self._session.post(
async with self._request.post(
f"{const.API_URL}/auth/v1/login", headers=post_headers, json=data
) as resp:
) as response:
try:
json_data = await resp.json()
json_data = await response.json()
except json.JSONDecodeError:
_LOGGER.error("No JSON Data after POST: %s", await resp.text())
await self._error_logger(response)
return False
self._cognito_token = json_data["cognitoUser"]["Token"]
self._cognito_token = json_data.get("cognitoUser", {}).get("Token", "")
if not self._cognito_token:
_LOGGER.error(json_data)
raise exceptions.HonAuthenticationError()
return True
async def refresh(self):
async def authenticate(self) -> None:
self.clear()
try:
if not await self._load_login():
raise exceptions.HonAuthenticationError("Can't open login page")
if not (url := await self._login()):
raise exceptions.HonAuthenticationError("Can't login")
if not await self._get_token(url):
raise exceptions.HonAuthenticationError("Can't get token")
if not await self._api_auth():
raise exceptions.HonAuthenticationError("Can't get api token")
except exceptions.HonNoAuthenticationNeeded:
return
async def refresh(self) -> bool:
params = {
"client_id": const.CLIENT_ID,
"refresh_token": self._refresh_token,
"grant_type": "refresh_token",
}
async with self._session.post(
async with self._request.post(
f"{const.AUTH_API}/services/oauth2/token", params=params
) as resp:
if resp.status >= 400:
) as response:
if response.status >= 400:
await self._error_logger(response, fail=False)
return False
data = await resp.json()
data = await response.json()
self._expires = datetime.utcnow()
self._id_token = data["id_token"]
self._access_token = data["access_token"]
return True
return await self._api_auth()
def clear(self) -> None:
self._session.cookie_jar.clear_domain(const.AUTH_API.split("/")[-2])
self._request.called_urls = []
self._cognito_token = ""
self._id_token = ""
self._access_token = ""
self._refresh_token = ""

View File

@ -1,41 +1,45 @@
import secrets
from typing import Dict
from pyhon import const
class HonDevice:
def __init__(self):
self._app_version = const.APP_VERSION
self._os_version = const.OS_VERSION
self._os = const.OS
self._device_model = const.DEVICE_MODEL
self._mobile_id = secrets.token_hex(8)
def __init__(self) -> None:
self._app_version: str = const.APP_VERSION
self._os_version: int = const.OS_VERSION
self._os: str = const.OS
self._device_model: str = const.DEVICE_MODEL
self._mobile_id: str = secrets.token_hex(8)
@property
def app_version(self):
def app_version(self) -> str:
return self._app_version
@property
def os_version(self):
def os_version(self) -> int:
return self._os_version
@property
def os(self):
def os(self) -> str:
return self._os
@property
def device_model(self):
def device_model(self) -> str:
return self._device_model
@property
def mobile_id(self):
def mobile_id(self) -> str:
return self._mobile_id
def get(self):
return {
def get(self, mobile: bool = False) -> Dict[str, str | int]:
result: Dict[str, str | int] = {
"appVersion": self.app_version,
"mobileId": self.mobile_id,
"osVersion": self.os_version,
"os": self.os,
"osVersion": self.os_version,
"deviceModel": self.device_model,
}
if mobile:
result |= {"mobileOs": result.pop("os", "")}
return result

View File

@ -1,127 +0,0 @@
import json
from contextlib import asynccontextmanager
import aiohttp
from pyhon import const
from pyhon.connection.auth import HonAuth, _LOGGER
from pyhon.connection.device import HonDevice
class HonBaseConnectionHandler:
_HEADERS = {"user-agent": const.USER_AGENT, "Content-Type": "application/json"}
def __init__(self, session=None):
self._session = session
self._auth = None
async def __aenter__(self):
return await self.create()
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.close()
async def create(self):
self._session = aiohttp.ClientSession(headers=self._HEADERS)
return self
@asynccontextmanager
async def _intercept(self, method, *args, loop=0, **kwargs):
raise NotImplementedError
@asynccontextmanager
async def get(self, *args, **kwargs):
async with self._intercept(self._session.get, *args, **kwargs) as response:
yield response
@asynccontextmanager
async def post(self, *args, **kwargs):
async with self._intercept(self._session.post, *args, **kwargs) as response:
yield response
async def close(self):
await self._session.close()
class HonConnectionHandler(HonBaseConnectionHandler):
def __init__(self, email, password, session=None):
super().__init__(session=session)
self._device = HonDevice()
self._email = email
self._password = password
if not self._email:
raise PermissionError("Login-Error - An email address must be specified")
if not self._password:
raise PermissionError("Login-Error - A password address must be specified")
self._request_headers = {}
@property
def device(self):
return self._device
async def create(self):
await super().create()
self._auth = HonAuth(self._session, self._email, self._password, self._device)
return self
async def _check_headers(self, headers):
if (
"cognito-token" not in self._request_headers
or "id-token" not in self._request_headers
):
if await self._auth.authorize():
self._request_headers["cognito-token"] = self._auth.cognito_token
self._request_headers["id-token"] = self._auth.id_token
else:
raise PermissionError("Can't Login")
return {h: v for h, v in self._request_headers.items() if h not in headers}
@asynccontextmanager
async def _intercept(self, method, *args, loop=0, **kwargs):
kwargs["headers"] = await self._check_headers(kwargs.get("headers", {}))
async with method(*args, **kwargs) as response:
if response.status == 403 and not loop:
_LOGGER.info("Try refreshing token...")
await self._auth.refresh()
yield await self._intercept(method, *args, loop=loop + 1, **kwargs)
elif response.status == 403 and loop < 2:
_LOGGER.warning(
"%s - Error %s - %s",
response.request_info.url,
response.status,
await response.text(),
)
await self.create()
yield await self._intercept(method, *args, loop=loop + 1, **kwargs)
elif loop >= 2:
_LOGGER.error(
"%s - Error %s - %s",
response.request_info.url,
response.status,
await response.text(),
)
raise PermissionError("Login failure")
else:
try:
await response.json()
yield response
except json.JSONDecodeError:
_LOGGER.warning(
"%s - JsonDecodeError %s - %s",
response.request_info.url,
response.status,
await response.text(),
)
yield {}
class HonAnonymousConnectionHandler(HonBaseConnectionHandler):
_HEADERS = HonBaseConnectionHandler._HEADERS | {"x-api-key": const.API_KEY}
@asynccontextmanager
async def _intercept(self, method, *args, loop=0, **kwargs):
kwargs["headers"] = kwargs.pop("headers", {}) | self._HEADERS
async with method(*args, **kwargs) as response:
if response.status == 403:
print("Can't authorize")
yield response

View File

View File

@ -0,0 +1,26 @@
import logging
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from typing import Dict, Any
import aiohttp
from pyhon import const
from pyhon.connection.handler.base import ConnectionHandler
from pyhon.typedefs import Callback
_LOGGER = logging.getLogger(__name__)
class HonAnonymousConnectionHandler(ConnectionHandler):
_HEADERS: Dict[str, str] = ConnectionHandler._HEADERS | {"x-api-key": const.API_KEY}
@asynccontextmanager
async def _intercept(
self, method: Callback, *args: Any, **kwargs: Any
) -> AsyncIterator[aiohttp.ClientResponse]:
kwargs["headers"] = kwargs.pop("headers", {}) | self._HEADERS
async with method(*args, **kwargs) as response:
if response.status == 403:
_LOGGER.error("Can't authenticate anymore")
yield response

View File

@ -0,0 +1,37 @@
import logging
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from typing import Optional, List, Tuple, Any
import aiohttp
from pyhon import const
from pyhon.connection.handler.base import ConnectionHandler
from pyhon.typedefs import Callback
_LOGGER = logging.getLogger(__name__)
class HonAuthConnectionHandler(ConnectionHandler):
_HEADERS = {"user-agent": const.USER_AGENT}
def __init__(self, session: Optional[aiohttp.ClientSession] = None) -> None:
super().__init__(session)
self._called_urls: List[Tuple[int, str]] = []
@property
def called_urls(self) -> List[Tuple[int, str]]:
return self._called_urls
@called_urls.setter
def called_urls(self, called_urls: List[Tuple[int, str]]) -> None:
self._called_urls = called_urls
@asynccontextmanager
async def _intercept(
self, method: Callback, *args: Any, **kwargs: Any
) -> AsyncIterator[aiohttp.ClientResponse]:
kwargs["headers"] = kwargs.pop("headers", {}) | self._HEADERS
async with method(*args, **kwargs) as response:
self._called_urls.append((response.status, str(response.request_info.url)))
yield response

View File

@ -0,0 +1,76 @@
import logging
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from types import TracebackType
from typing import Optional, Dict, Type, Any, Protocol
import aiohttp
from typing_extensions import Self
from pyhon import const, exceptions
from pyhon.typedefs import Callback
_LOGGER = logging.getLogger(__name__)
class ConnectionHandler:
_HEADERS: Dict[str, str] = {
"user-agent": const.USER_AGENT,
"Content-Type": "application/json",
}
def __init__(self, session: Optional[aiohttp.ClientSession] = None) -> None:
self._create_session: bool = session is None
self._session: Optional[aiohttp.ClientSession] = session
async def __aenter__(self) -> Self:
return await self.create()
async def __aexit__(
self,
exc_type: Optional[Type[BaseException]],
exc: Optional[BaseException],
traceback: Optional[TracebackType],
) -> None:
await self.close()
@property
def session(self) -> aiohttp.ClientSession:
if self._session is None:
raise exceptions.NoSessionException
return self._session
async def create(self) -> Self:
if self._create_session:
self._session = aiohttp.ClientSession()
return self
@asynccontextmanager
def _intercept(
self, method: Callback, *args: Any, loop: int = 0, **kwargs: Any
) -> AsyncIterator[aiohttp.ClientResponse]:
raise NotImplementedError
@asynccontextmanager
async def get(
self, *args: Any, **kwargs: Any
) -> AsyncIterator[aiohttp.ClientResponse]:
if self._session is None:
raise exceptions.NoSessionException()
response: aiohttp.ClientResponse
async with self._intercept(self._session.get, *args, **kwargs) as response: # type: ignore[arg-type]
yield response
@asynccontextmanager
async def post(
self, *args: Any, **kwargs: Any
) -> AsyncIterator[aiohttp.ClientResponse]:
if self._session is None:
raise exceptions.NoSessionException()
response: aiohttp.ClientResponse
async with self._intercept(self._session.post, *args, **kwargs) as response: # type: ignore[arg-type]
yield response
async def close(self) -> None:
if self._create_session and self._session is not None:
await self._session.close()

View File

@ -0,0 +1,99 @@
import json
import logging
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from typing import Optional, Dict, Any
import aiohttp
from typing_extensions import Self
from pyhon.connection.auth import HonAuth
from pyhon.connection.device import HonDevice
from pyhon.connection.handler.base import ConnectionHandler
from pyhon.exceptions import HonAuthenticationError, NoAuthenticationException
from pyhon.typedefs import Callback
_LOGGER = logging.getLogger(__name__)
class HonConnectionHandler(ConnectionHandler):
def __init__(
self, email: str, password: str, session: Optional[aiohttp.ClientSession] = None
) -> None:
super().__init__(session=session)
self._device: HonDevice = HonDevice()
self._email: str = email
self._password: str = password
if not self._email:
raise HonAuthenticationError("An email address must be specified")
if not self._password:
raise HonAuthenticationError("A password address must be specified")
self._auth: Optional[HonAuth] = None
@property
def auth(self) -> HonAuth:
if self._auth is None:
raise NoAuthenticationException()
return self._auth
@property
def device(self) -> HonDevice:
return self._device
async def create(self) -> Self:
await super().create()
self._auth = HonAuth(self.session, self._email, self._password, self._device)
return self
async def _check_headers(self, headers: Dict[str, str]) -> Dict[str, str]:
if not (self.auth.cognito_token and self.auth.id_token):
await self.auth.authenticate()
headers["cognito-token"] = self.auth.cognito_token
headers["id-token"] = self.auth.id_token
return self._HEADERS | headers
@asynccontextmanager
async def _intercept(
self, method: Callback, *args: Any, loop: int = 0, **kwargs: Dict[str, str]
) -> AsyncIterator[aiohttp.ClientResponse]:
kwargs["headers"] = await self._check_headers(kwargs.get("headers", {}))
async with method(args[0], *args[1:], **kwargs) as response:
if (
self.auth.token_expires_soon or response.status in [401, 403]
) and loop == 0:
_LOGGER.info("Try refreshing token...")
await self.auth.refresh()
async with self._intercept(method, loop=loop + 1, **kwargs) as result:
yield result
elif (
self.auth.token_is_expired or response.status in [401, 403]
) and loop == 1:
_LOGGER.warning(
"%s - Error %s - %s",
response.request_info.url,
response.status,
await response.text(),
)
await self.create()
async with self._intercept(method, loop=loop + 1, **kwargs) as result:
yield result
elif loop >= 2:
_LOGGER.error(
"%s - Error %s - %s",
response.request_info.url,
response.status,
await response.text(),
)
raise HonAuthenticationError("Login failure")
else:
try:
await response.json()
yield response
except json.JSONDecodeError:
_LOGGER.warning(
"%s - JsonDecodeError %s - %s",
response.request_info.url,
response.status,
await response.text(),
)
raise HonAuthenticationError("Decode Error")

View File

@ -1,10 +1,10 @@
AUTH_API = "https://he-accounts.force.com/SmartHome"
AUTH_API = "https://account2.hon-smarthome.com"
API_URL = "https://api-iot.he.services"
API_KEY = "GRCqFhC6Gk@ikWXm1RmnSmX1cm,MxY-configuration"
APP = "hon"
# All seen id's (different accounts, different devices) are the same, so I guess this hash is static
CLIENT_ID = "3MVG9QDx8IX8nP5T2Ha8ofvlmjLZl5L_gvfbT9.HJvpHGKoAS_dcMN8LYpTSYeVFCraUnV.2Ag1Ki7m4znVO6"
APP_VERSION = "1.53.7"
APP_VERSION = "2.0.10"
OS_VERSION = 31
OS = "android"
DEVICE_MODEL = "exynos9820"

100
pyhon/diagnose.py Normal file
View File

@ -0,0 +1,100 @@
import asyncio
import json
import re
import shutil
from pathlib import Path
from typing import TYPE_CHECKING, List, Tuple
from pyhon import printer
if TYPE_CHECKING:
from pyhon.appliance import HonAppliance
def anonymize_data(data: str) -> str:
default_date = "1970-01-01T00:00:00.0Z"
default_mac = "xx-xx-xx-xx-xx-xx"
data = re.sub("[0-9A-Fa-f]{2}(-[0-9A-Fa-f]{2}){5}", default_mac, data)
data = re.sub("[\\d-]{10}T[\\d:]{8}(.\\d+)?Z", default_date, data)
for sensible in [
"serialNumber",
"code",
"nickName",
"mobileId",
"PK",
"SK",
"lat",
"lng",
]:
for match in re.findall(f'"{sensible}.*?":\\s"?(.+?)"?,?\\n', data):
replace = re.sub("[a-z]", "x", match)
replace = re.sub("[A-Z]", "X", replace)
replace = re.sub("\\d", "1", replace)
data = data.replace(match, replace)
return data
async def load_data(appliance: "HonAppliance", topic: str) -> Tuple[str, str]:
return topic, await getattr(appliance.api, f"load_{topic}")(appliance)
def write_to_json(data: str, topic: str, path: Path, anonymous: bool = False) -> Path:
json_data = json.dumps(data, indent=4)
if anonymous:
json_data = anonymize_data(json_data)
file = path / f"{topic}.json"
with open(file, "w", encoding="utf-8") as json_file:
json_file.write(json_data)
return file
async def appliance_data(
appliance: "HonAppliance", path: Path, anonymous: bool = False
) -> List[Path]:
requests = [
"commands",
"attributes",
"command_history",
"statistics",
"maintenance",
"appliance_data",
]
path /= f"{appliance.appliance_type}_{appliance.model_id}".lower()
path.mkdir(parents=True, exist_ok=True)
api_data = await asyncio.gather(*[load_data(appliance, name) for name in requests])
return [write_to_json(data, topic, path, anonymous) for topic, data in api_data]
async def zip_archive(
appliance: "HonAppliance", path: Path, anonymous: bool = False
) -> str:
data = await appliance_data(appliance, path, anonymous)
archive = data[0].parent
shutil.make_archive(str(archive.parent), "zip", archive)
shutil.rmtree(archive)
return f"{archive.stem}.zip"
def yaml_export(appliance: "HonAppliance", anonymous: bool = False) -> str:
data = {
"attributes": appliance.attributes.copy(),
"appliance": appliance.info,
"statistics": appliance.statistics,
"additional_data": appliance.additional_data,
}
data |= {n: c.parameter_groups for n, c in appliance.commands.items()}
extra = {n: c.data for n, c in appliance.commands.items() if c.data}
if extra:
data |= {"extra_command_data": extra}
if anonymous:
for sensible in ["serialNumber", "coords"]:
data.get("appliance", {}).pop(sensible, None)
data = {
"data": data,
"commands": printer.create_command(appliance.commands),
"rules": printer.create_rules(appliance.commands),
}
result = printer.pretty_print(data)
if anonymous:
result = anonymize_data(result)
return result

18
pyhon/exceptions.py Normal file
View File

@ -0,0 +1,18 @@
class HonAuthenticationError(Exception):
pass
class HonNoAuthenticationNeeded(Exception):
pass
class NoSessionException(Exception):
pass
class NoAuthenticationException(Exception):
pass
class ApiError(Exception):
pass

5
pyhon/helper.py Normal file
View File

@ -0,0 +1,5 @@
def str_to_float(string: str | float) -> float:
try:
return int(string)
except ValueError:
return float(str(string).replace(",", "."))

View File

@ -1,27 +1,66 @@
import asyncio
from typing import List
import logging
from pathlib import Path
from types import TracebackType
from typing import List, Optional, Dict, Any, Type
from pyhon import HonAPI
from aiohttp import ClientSession
from typing_extensions import Self
from pyhon import HonAPI, exceptions
from pyhon.appliance import HonAppliance
from pyhon.connection.api import TestAPI
_LOGGER = logging.getLogger(__name__)
class Hon:
def __init__(self, email, password, session=None):
self._email = email
self._password = password
self._session = session
self._appliances = []
self._api = None
def __init__(
self,
email: Optional[str] = "",
password: Optional[str] = "",
session: Optional[ClientSession] = None,
test_data_path: Optional[Path] = None,
):
self._email: Optional[str] = email
self._password: Optional[str] = password
self._session: ClientSession | None = session
self._appliances: List[HonAppliance] = []
self._api: Optional[HonAPI] = None
self._test_data_path: Path = test_data_path or Path().cwd()
async def __aenter__(self):
async def __aenter__(self) -> Self:
return await self.create()
async def __aexit__(self, exc_type, exc_val, exc_tb):
async def __aexit__(
self,
exc_type: Optional[Type[BaseException]],
exc: Optional[BaseException],
traceback: Optional[TracebackType],
) -> None:
await self.close()
async def create(self):
@property
def api(self) -> HonAPI:
if self._api is None:
raise exceptions.NoAuthenticationException
return self._api
@property
def email(self) -> str:
if not self._email:
raise ValueError("Missing email")
return self._email
@property
def password(self) -> str:
if not self._password:
raise ValueError("Missing password")
return self._password
async def create(self) -> Self:
self._api = await HonAPI(
self._email, self._password, session=self._session
self.email, self.password, session=self._session
).create()
await self.setup()
return self
@ -30,11 +69,17 @@ class Hon:
def appliances(self) -> List[HonAppliance]:
return self._appliances
async def setup(self):
for appliance in (await self._api.load_appliances())["payload"]["appliances"]:
appliance = HonAppliance(self._api, appliance)
if appliance.mac_address is None:
continue
@appliances.setter
def appliances(self, appliances: List[HonAppliance]) -> None:
self._appliances = appliances
async def _create_appliance(
self, appliance_data: Dict[str, Any], api: HonAPI, zone: int = 0
) -> None:
appliance = HonAppliance(api, appliance_data, zone=zone)
if appliance.mac_address == "":
return
try:
await asyncio.gather(
*[
appliance.load_attributes(),
@ -42,7 +87,26 @@ class Hon:
appliance.load_statistics(),
]
)
self._appliances.append(appliance)
except (KeyError, ValueError, IndexError) as error:
_LOGGER.exception(error)
_LOGGER.error("Device data - %s", appliance_data)
self._appliances.append(appliance)
async def close(self):
await self._api.close()
async def setup(self) -> None:
appliances = await self.api.load_appliances()
for appliance in appliances:
if (zones := int(appliance.get("zone", "0"))) > 1:
for zone in range(zones):
await self._create_appliance(
appliance.copy(), self.api, zone=zone + 1
)
await self._create_appliance(appliance, self.api)
if (
test_data := self._test_data_path / "hon-test-data" / "test_data"
).exists() or (test_data := test_data / "test_data").exists():
api = TestAPI(test_data)
for appliance in await api.load_appliances():
await self._create_appliance(appliance, api)
async def close(self) -> None:
await self.api.close()

View File

@ -1,154 +0,0 @@
import re
def str_to_float(string):
return float(string.replace(",", "."))
class HonParameter:
def __init__(self, key, attributes):
self._key = key
self._category = attributes.get("category")
self._typology = attributes.get("typology")
self._mandatory = attributes.get("mandatory")
self._value = ""
@property
def key(self):
return self._key
@property
def value(self):
return self._value if self._value is not None else "0"
@property
def category(self):
return self._category
@property
def typology(self):
return self._typology
@property
def mandatory(self):
return self._mandatory
class HonParameterFixed(HonParameter):
def __init__(self, key, attributes):
super().__init__(key, attributes)
self._value = attributes.get("fixedValue", None)
def __repr__(self):
return f"{self.__class__} (<{self.key}> fixed)"
@property
def value(self):
return self._value if self._value is not None else "0"
@value.setter
def value(self, value):
if not value == self._value:
raise ValueError("Can't change fixed value")
class HonParameterRange(HonParameter):
def __init__(self, key, attributes):
super().__init__(key, attributes)
self._min = str_to_float(attributes["minimumValue"])
self._max = str_to_float(attributes["maximumValue"])
self._step = str_to_float(attributes["incrementValue"])
self._default = str_to_float(attributes.get("defaultValue", self._min))
self._value = self._default
def __repr__(self):
return f"{self.__class__} (<{self.key}> [{self._min} - {self._max}])"
@property
def min(self):
return self._min
@property
def max(self):
return self._max
@property
def step(self):
return self._step
@property
def value(self):
return self._value if self._value is not None else self._min
@value.setter
def value(self, value):
value = str_to_float(value)
if self._min <= value <= self._max and not value % self._step:
self._value = value
else:
raise ValueError(
f"Allowed: min {self._min} max {self._max} step {self._step}"
)
class HonParameterEnum(HonParameter):
def __init__(self, key, attributes):
super().__init__(key, attributes)
self._default = attributes.get("defaultValue")
self._value = self._default or "0"
self._values = attributes.get("enumValues")
def __repr__(self):
return f"{self.__class__} (<{self.key}> {self.values})"
@property
def values(self):
return [str(value) for value in self._values]
@property
def value(self):
return self._value if self._value is not None else self.values[0]
@value.setter
def value(self, value):
if value in self.values:
self._value = value
else:
raise ValueError(f"Allowed values {self._value}")
class HonParameterProgram(HonParameterEnum):
def __init__(self, key, command):
super().__init__(key, {})
self._command = command
self._value = command._program
self._values = command._multi
self._typology = "enum"
self._filter = ""
@property
def value(self):
return self._value
@value.setter
def value(self, value):
if value in self.values:
self._command.set_program(value)
else:
raise ValueError(f"Allowed values {self._values}")
@property
def filter(self):
return self._filter
@filter.setter
def filter(self, filter):
self._filter = filter
@property
def values(self):
values = []
for value in self._values:
if not self._filter or re.findall(self._filter, str(value)):
values.append(str(value))
return sorted(values)

View File

87
pyhon/parameter/base.py Normal file
View File

@ -0,0 +1,87 @@
from typing import Dict, Any, List, Tuple, Callable, TYPE_CHECKING
if TYPE_CHECKING:
from pyhon.rules import HonRule
class HonParameter:
def __init__(self, key: str, attributes: Dict[str, Any], group: str) -> None:
self._key = key
self._category: str = attributes.get("category", "")
self._typology: str = attributes.get("typology", "")
self._mandatory: int = attributes.get("mandatory", 0)
self._value: str | float = ""
self._group: str = group
self._triggers: Dict[
str, List[Tuple[Callable[["HonRule"], None], "HonRule"]]
] = {}
@property
def key(self) -> str:
return self._key
@property
def value(self) -> str | float:
return self._value if self._value is not None else "0"
@value.setter
def value(self, value: str | float) -> None:
self._value = value
self.check_trigger(value)
@property
def intern_value(self) -> str:
return str(self.value)
@property
def values(self) -> List[str]:
return [str(self.value)]
@property
def category(self) -> str:
return self._category
@property
def typology(self) -> str:
return self._typology
@property
def mandatory(self) -> int:
return self._mandatory
@property
def group(self) -> str:
return self._group
def add_trigger(
self, value: str, func: Callable[["HonRule"], None], data: "HonRule"
) -> None:
if self._value == value:
func(data)
self._triggers.setdefault(value, []).append((func, data))
def check_trigger(self, value: str | float) -> None:
if str(value) in self._triggers:
for trigger in self._triggers[str(value)]:
func, args = trigger
func(args)
@property
def triggers(self) -> Dict[str, Any]:
result: Dict[str, Any] = {}
for value, rules in self._triggers.items():
for _, rule in rules:
if rule.extras:
param = result.setdefault(value, {})
for extra_key, extra_value in rule.extras.items():
param = param.setdefault(extra_key, {}).setdefault(
extra_value, {}
)
else:
param = result.setdefault(value, {})
if fixed_value := rule.param_data.get("fixedValue"):
param[rule.param_key] = fixed_value
else:
param[rule.param_key] = rule.param_data.get("defaultValue", "")
return result

44
pyhon/parameter/enum.py Normal file
View File

@ -0,0 +1,44 @@
from typing import Dict, Any, List
from pyhon.parameter.base import HonParameter
def clean_value(value: str | float) -> str:
return str(value).strip("[]").replace("|", "_").lower()
class HonParameterEnum(HonParameter):
def __init__(self, key: str, attributes: Dict[str, Any], group: str) -> None:
super().__init__(key, attributes, group)
self._default = attributes.get("defaultValue")
self._value = self._default or "0"
self._values: List[str] = attributes.get("enumValues", [])
if self._default and clean_value(self._default.strip("[]")) not in self.values:
self._values.append(self._default)
def __repr__(self) -> str:
return f"{self.__class__} (<{self.key}> {self.values})"
@property
def values(self) -> List[str]:
return [clean_value(value) for value in self._values]
@values.setter
def values(self, values: List[str]) -> None:
self._values = values
@property
def intern_value(self) -> str:
return str(self._value) if self._value is not None else str(self.values[0])
@property
def value(self) -> str | float:
return clean_value(self._value) if self._value is not None else self.values[0]
@value.setter
def value(self, value: str) -> None:
if value in self.values:
self._value = value
self.check_trigger(value)
else:
raise ValueError(f"Allowed values {self._values}")

22
pyhon/parameter/fixed.py Normal file
View File

@ -0,0 +1,22 @@
from typing import Dict, Any
from pyhon.parameter.base import HonParameter
class HonParameterFixed(HonParameter):
def __init__(self, key: str, attributes: Dict[str, Any], group: str) -> None:
super().__init__(key, attributes, group)
self._value = attributes.get("fixedValue", None)
def __repr__(self) -> str:
return f"{self.__class__} (<{self.key}> fixed)"
@property
def value(self) -> str | float:
return self._value if self._value is not None else "0"
@value.setter
def value(self, value: str | float) -> None:
# Fixed values seems being not so fixed as thought
self._value = value
self.check_trigger(value)

View File

@ -0,0 +1,54 @@
from typing import List, TYPE_CHECKING, Dict
from pyhon.parameter.enum import HonParameterEnum
if TYPE_CHECKING:
from pyhon.commands import HonCommand
class HonParameterProgram(HonParameterEnum):
_FILTER = ["iot_recipe", "iot_guided"]
def __init__(self, key: str, command: "HonCommand", group: str) -> None:
super().__init__(key, {}, group)
self._command = command
if "PROGRAM" in command.category:
self._value = command.category.split(".")[-1].lower()
else:
self._value = command.category
self._programs: Dict[str, "HonCommand"] = command.categories
self._typology: str = "enum"
@property
def value(self) -> str | float:
return self._value
@value.setter
def value(self, value: str) -> None:
if value in self.values:
self._command.category = value
else:
raise ValueError(f"Allowed values {self.values}")
@property
def values(self) -> List[str]:
values = [v for v in self._programs if all(f not in v for f in self._FILTER)]
return sorted(values)
@values.setter
def values(self, values: List[str]) -> None:
return
@property
def ids(self) -> Dict[int, str]:
values = {
int(p.parameters["prCode"].value): n
for i, (n, p) in enumerate(self._programs.items())
if "iot_" not in n
and p.parameters.get("prCode")
and not ((fav := p.parameters.get("favourite")) and fav.value == "1")
}
return dict(sorted(values.items()))
def set_value(self, value: str) -> None:
self._value = value

60
pyhon/parameter/range.py Normal file
View File

@ -0,0 +1,60 @@
from typing import Dict, Any, List
from pyhon.helper import str_to_float
from pyhon.parameter.base import HonParameter
class HonParameterRange(HonParameter):
def __init__(self, key: str, attributes: Dict[str, Any], group: str) -> None:
super().__init__(key, attributes, group)
self._min: float = str_to_float(attributes["minimumValue"])
self._max: float = str_to_float(attributes["maximumValue"])
self._step: float = str_to_float(attributes["incrementValue"])
self._default: float = str_to_float(attributes.get("defaultValue", self.min))
self._value: float = self._default
def __repr__(self) -> str:
return f"{self.__class__} (<{self.key}> [{self.min} - {self.max}])"
@property
def min(self) -> float:
return self._min
@min.setter
def min(self, mini: float) -> None:
self._min = mini
@property
def max(self) -> float:
return self._max
@max.setter
def max(self, maxi: float) -> None:
self._max = maxi
@property
def step(self) -> float:
if not self._step:
return 1
return self._step
@step.setter
def step(self, step: float) -> None:
self._step = step
@property
def value(self) -> str | float:
return self._value if self._value is not None else self.min
@value.setter
def value(self, value: str | float) -> None:
value = str_to_float(value)
if self.min <= value <= self.max and not (value - self.min) % self.step:
self._value = value
self.check_trigger(value)
else:
raise ValueError(f"Allowed: min {self.min} max {self.max} step {self.step}")
@property
def values(self) -> List[str]:
return [str(i) for i in range(int(self.min), int(self.max) + 1, int(self.step))]

94
pyhon/printer.py Normal file
View File

@ -0,0 +1,94 @@
from typing import Dict, Any, TYPE_CHECKING, List
from pyhon.parameter.enum import HonParameterEnum
from pyhon.parameter.range import HonParameterRange
if TYPE_CHECKING:
from pyhon.commands import HonCommand
def key_print(data: Any, key: str = "", start: bool = True) -> str:
result = ""
if isinstance(data, list):
for i, value in enumerate(data):
result += key_print(value, key=f"{key}.{i}", start=False)
elif isinstance(data, dict):
for k, value in sorted(data.items()):
result += key_print(value, key=k if start else f"{key}.{k}", start=False)
else:
result += f"{key}: {data}\n"
return result
# yaml.dump() would be done the same, but needs an additional dependency...
def pretty_print(
data: Any,
key: str = "",
intend: int = 0,
is_list: bool = False,
whitespace: str = " ",
) -> str:
result = ""
if isinstance(data, list):
if key:
result += f"{whitespace * intend}{'- ' if is_list else ''}{key}:\n"
intend += 1
for i, value in enumerate(data):
result += pretty_print(
value, intend=intend, is_list=True, whitespace=whitespace
)
elif isinstance(data, dict):
if key:
result += f"{whitespace * intend}{'- ' if is_list else ''}{key}:\n"
intend += 1
for i, (key, value) in enumerate(sorted(data.items())):
if is_list and not i:
result += pretty_print(
value, key=key, intend=intend, is_list=True, whitespace=whitespace
)
elif is_list:
result += pretty_print(
value, key=key, intend=intend + 1, whitespace=whitespace
)
else:
result += pretty_print(
value, key=key, intend=intend, whitespace=whitespace
)
else:
result += f"{whitespace * intend}{'- ' if is_list else ''}{key}{': ' if key else ''}{data}\n"
return result
def create_command(
commands: Dict[str, "HonCommand"], concat: bool = False
) -> Dict[str, Any]:
result: Dict[str, Any] = {}
for name, command in commands.items():
for parameter, data in command.available_settings.items():
if isinstance(data, HonParameterEnum):
value: List[str] | Dict[str, str | float] = data.values
elif isinstance(data, HonParameterRange):
value = {"min": data.min, "max": data.max, "step": data.step}
else:
continue
if not concat:
result.setdefault(name, {})[parameter] = value
else:
result[f"{name}.{parameter}"] = value
return result
def create_rules(
commands: Dict[str, "HonCommand"], concat: bool = False
) -> Dict[str, Any]:
result: Dict[str, Any] = {}
for name, command in commands.items():
for parameter, data in command.available_settings.items():
value = data.triggers
if not value:
continue
if not concat:
result.setdefault(name, {})[parameter] = value
else:
result[f"{name}.{parameter}"] = value
return result

117
pyhon/rules.py Normal file
View File

@ -0,0 +1,117 @@
from dataclasses import dataclass
from typing import List, Dict, TYPE_CHECKING, Any, Optional
from pyhon.parameter.enum import HonParameterEnum
from pyhon.parameter.range import HonParameterRange
if TYPE_CHECKING:
from pyhon.commands import HonCommand
from pyhon.parameter.base import HonParameter
@dataclass
class HonRule:
trigger_key: str
trigger_value: str
param_key: str
param_data: Dict[str, Any]
extras: Optional[Dict[str, str]] = None
class HonRuleSet:
def __init__(self, command: "HonCommand", rule: Dict[str, Any]):
self._command: "HonCommand" = command
self._rules: Dict[str, List[HonRule]] = {}
self._parse_rule(rule)
def _parse_rule(self, rule: Dict[str, Any]) -> None:
for param_key, params in rule.items():
param_key = self._command.appliance.options.get(param_key, param_key)
for trigger_key, trigger_data in params.items():
self._parse_conditions(param_key, trigger_key, trigger_data)
def _parse_conditions(
self,
param_key: str,
trigger_key: str,
trigger_data: Dict[str, Any],
extra: Optional[Dict[str, str]] = None,
) -> None:
trigger_key = trigger_key.replace("@", "")
trigger_key = self._command.appliance.options.get(trigger_key, trigger_key)
for multi_trigger_value, param_data in trigger_data.items():
for trigger_value in multi_trigger_value.split("|"):
if isinstance(param_data, dict) and "typology" in param_data:
self._create_rule(
param_key, trigger_key, trigger_value, param_data, extra
)
elif isinstance(param_data, dict):
if extra is None:
extra = {}
extra[trigger_key] = trigger_value
for extra_key, extra_data in param_data.items():
self._parse_conditions(param_key, extra_key, extra_data, extra)
def _create_rule(
self,
param_key: str,
trigger_key: str,
trigger_value: str,
param_data: Dict[str, Any],
extras: Optional[Dict[str, str]] = None,
) -> None:
if param_data.get("fixedValue") == f"@{param_key}":
return
self._rules.setdefault(trigger_key, []).append(
HonRule(trigger_key, trigger_value, param_key, param_data, extras)
)
def _duplicate_for_extra_conditions(self) -> None:
new: Dict[str, List[HonRule]] = {}
for rules in self._rules.values():
for rule in rules:
if rule.extras is None:
continue
for key, value in rule.extras.items():
extras = rule.extras.copy()
extras.pop(key)
extras[rule.trigger_key] = rule.trigger_value
new.setdefault(key, []).append(
HonRule(key, value, rule.param_key, rule.param_data, extras)
)
for key, rules in new.items():
for rule in rules:
self._rules.setdefault(key, []).append(rule)
def _add_trigger(self, parameter: "HonParameter", data: HonRule) -> None:
def apply(rule: HonRule) -> None:
if rule.extras is not None:
for key, value in rule.extras.items():
if str(self._command.parameters.get(key)) != str(value):
return
if param := self._command.parameters.get(rule.param_key):
if value := rule.param_data.get("fixedValue", ""):
if isinstance(param, HonParameterEnum) and set(param.values) != {
str(value)
}:
param.values = [str(value)]
elif isinstance(param, HonParameterRange):
param.value = float(value)
return
param.value = str(value)
elif rule.param_data.get("typology") == "enum":
if isinstance(param, HonParameterEnum):
if enum_values := rule.param_data.get("enumValues"):
param.values = enum_values.split("|")
if default_value := rule.param_data.get("defaultValue"):
param.value = default_value
parameter.add_trigger(data.trigger_value, apply, data)
def patch(self) -> None:
self._duplicate_for_extra_conditions()
for name, parameter in self._command.parameters.items():
if name not in self._rules:
continue
for data in self._rules.get(name, []):
self._add_trigger(parameter, data)

27
pyhon/typedefs.py Normal file
View File

@ -0,0 +1,27 @@
from typing import Union, Any, TYPE_CHECKING, Protocol
import aiohttp
from yarl import URL
if TYPE_CHECKING:
from pyhon.parameter.base import HonParameter
from pyhon.parameter.enum import HonParameterEnum
from pyhon.parameter.fixed import HonParameterFixed
from pyhon.parameter.program import HonParameterProgram
from pyhon.parameter.range import HonParameterRange
class Callback(Protocol):
def __call__(
self, url: str | URL, *args: Any, **kwargs: Any
) -> aiohttp.client._RequestContextManager:
...
Parameter = Union[
"HonParameter",
"HonParameterRange",
"HonParameterEnum",
"HonParameterFixed",
"HonParameterProgram",
]

View File

@ -1 +1,2 @@
aiohttp
aiohttp==3.8.4
yarl==1.8.2

4
requirements_dev.txt Normal file
View File

@ -0,0 +1,4 @@
black==23.3.0
flake8==6.0.0
mypy==1.2.0
pylint==2.17.2

View File

@ -7,7 +7,7 @@ with open("README.md", "r") as f:
setup(
name="pyhOn",
version="0.6.1",
version="0.14.3",
author="Andre Basche",
description="Control hOn devices with python",
long_description=long_description,