Compare commits
No commits in common. "main" and "v0.14.3" have entirely different histories.
12
.github/workflows/python-check.yml
vendored
12
.github/workflows/python-check.yml
vendored
@ -13,7 +13,7 @@ jobs:
|
|||||||
strategy:
|
strategy:
|
||||||
fail-fast: false
|
fail-fast: false
|
||||||
matrix:
|
matrix:
|
||||||
python-version: ["3.10", "3.11", "3.12"]
|
python-version: ["3.10", "3.11"]
|
||||||
|
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v3
|
- uses: actions/checkout@v3
|
||||||
@ -28,13 +28,15 @@ jobs:
|
|||||||
python -m pip install -r requirements_dev.txt
|
python -m pip install -r requirements_dev.txt
|
||||||
- name: Lint with flake8
|
- name: Lint with flake8
|
||||||
run: |
|
run: |
|
||||||
flake8 . --count --statistics
|
# stop the build if there are Python syntax errors or undefined names
|
||||||
|
flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
|
||||||
|
flake8 . --count --exit-zero --max-complexity=10 --max-line-length=88 --statistics
|
||||||
- name: Type check with mypy
|
- name: Type check with mypy
|
||||||
run: |
|
run: |
|
||||||
mypy pyhon/
|
mypy pyhon/
|
||||||
- name: Analysing the code with pylint
|
# - name: Analysing the code with pylint
|
||||||
run: |
|
# run: |
|
||||||
pylint $(git ls-files '*.py')
|
# pylint --max-line-length 88 $(git ls-files '*.py')
|
||||||
- name: Check black style
|
- name: Check black style
|
||||||
run: |
|
run: |
|
||||||
black . --check
|
black . --check
|
||||||
|
@ -1,9 +0,0 @@
|
|||||||
[MESSAGES CONTROL]
|
|
||||||
|
|
||||||
disable=missing-docstring
|
|
||||||
|
|
||||||
[FORMAT]
|
|
||||||
|
|
||||||
max-args=6
|
|
||||||
max-attributes=8
|
|
||||||
max-line-length=88
|
|
@ -1 +0,0 @@
|
|||||||
include pyhon/py.typed
|
|
11
README.md
11
README.md
@ -1,14 +1,3 @@
|
|||||||
# Announcement: I have to take the project down in the next few days
|
|
||||||
> Dear User,
|
|
||||||
>
|
|
||||||
> We are writing to inform you that we have discovered two Home Assistant integration plug-ins developed by you ( https://github.com/Andre0512/hon and https://github.com/Andre0512/pyhOn ) that are in violation of our terms of service. Specifically, the plug-ins are using our services in an unauthorized manner which is causing significant economic harm to our Company.
|
|
||||||
> We take the protection of our intellectual property very seriously and demand that you immediately cease and desist all illegal activities related to the development and distribution of these plug-ins. We also request that you remove the plug-ins from all stores and code hosting platforms where they are currently available.
|
|
||||||
> Please be advised that we will take all necessary legal action to protect our interests if you fail to comply with this notice. We reserve the right to pursue all available remedies, including but not limited to monetary damages, injunctive relief, and attorney's fees.
|
|
||||||
> We strongly urge you to take immediate action to rectify this situation and avoid any further legal action. If you have any questions or concerns, please do not hesitate to contact us.
|
|
||||||
>
|
|
||||||
> Haier Europe Security and Governance Department
|
|
||||||
|
|
||||||
|
|
||||||
**This python package is unofficial and is not related in any way to Haier. It was developed by reversed engineered requests and can stop working at anytime!**
|
**This python package is unofficial and is not related in any way to Haier. It was developed by reversed engineered requests and can stop working at anytime!**
|
||||||
|
|
||||||
# pyhOn
|
# pyhOn
|
||||||
|
26
mypy.ini
26
mypy.ini
@ -1,24 +1,4 @@
|
|||||||
[mypy]
|
[mypy]
|
||||||
check_untyped_defs = true
|
check_untyped_defs = True
|
||||||
disallow_any_generics = true
|
disallow_any_generics = True
|
||||||
disallow_any_unimported = true
|
disallow_untyped_defs = True
|
||||||
disallow_incomplete_defs = true
|
|
||||||
disallow_subclassing_any = true
|
|
||||||
disallow_untyped_calls = true
|
|
||||||
disallow_untyped_decorators = true
|
|
||||||
disallow_untyped_defs = true
|
|
||||||
disable_error_code = annotation-unchecked
|
|
||||||
enable_error_code = ignore-without-code, redundant-self, truthy-iterable
|
|
||||||
follow_imports = silent
|
|
||||||
local_partial_types = true
|
|
||||||
no_implicit_optional = true
|
|
||||||
no_implicit_reexport = true
|
|
||||||
show_error_codes = true
|
|
||||||
strict_concatenate = false
|
|
||||||
strict_equality = true
|
|
||||||
warn_incomplete_stub = true
|
|
||||||
warn_redundant_casts = true
|
|
||||||
warn_return_any = true
|
|
||||||
warn_unreachable = true
|
|
||||||
warn_unused_configs = true
|
|
||||||
warn_unused_ignores = true
|
|
@ -11,7 +11,6 @@ from typing import Tuple, Dict, Any
|
|||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
sys.path.insert(0, str(Path(__file__).parent.parent))
|
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||||||
|
|
||||||
# pylint: disable=wrong-import-position
|
|
||||||
from pyhon import Hon, HonAPI, diagnose, printer
|
from pyhon import Hon, HonAPI, diagnose, printer
|
||||||
|
|
||||||
_LOGGER = logging.getLogger(__name__)
|
_LOGGER = logging.getLogger(__name__)
|
||||||
@ -31,13 +30,13 @@ def get_arguments() -> Dict[str, Any]:
|
|||||||
export.add_argument("--zip", help="create zip archive", action="store_true")
|
export.add_argument("--zip", help="create zip archive", action="store_true")
|
||||||
export.add_argument("--anonymous", help="anonymize data", action="store_true")
|
export.add_argument("--anonymous", help="anonymize data", action="store_true")
|
||||||
export.add_argument("directory", nargs="?", default=Path().cwd())
|
export.add_argument("directory", nargs="?", default=Path().cwd())
|
||||||
translation = subparser.add_parser(
|
translate = subparser.add_parser(
|
||||||
"translate", help="print available translation keys"
|
"translate", help="print available translation keys"
|
||||||
)
|
)
|
||||||
translation.add_argument(
|
translate.add_argument(
|
||||||
"translate", help="language (de, en, fr...)", metavar="LANGUAGE"
|
"translate", help="language (de, en, fr...)", metavar="LANGUAGE"
|
||||||
)
|
)
|
||||||
translation.add_argument("--json", help="print as json", action="store_true")
|
translate.add_argument("--json", help="print as json", action="store_true")
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
"-i", "--import", help="import pyhon data", nargs="?", default=Path().cwd()
|
"-i", "--import", help="import pyhon data", nargs="?", default=Path().cwd()
|
||||||
)
|
)
|
||||||
@ -92,13 +91,15 @@ async def main() -> None:
|
|||||||
data = device.data.copy()
|
data = device.data.copy()
|
||||||
attr = "get" if args.get("all") else "pop"
|
attr = "get" if args.get("all") else "pop"
|
||||||
print(
|
print(
|
||||||
printer.key_print(getattr(data["attributes"], attr)("parameters"))
|
printer.key_print(
|
||||||
|
data["attributes"].__getattribute__(attr)("parameters")
|
||||||
|
)
|
||||||
)
|
)
|
||||||
print(printer.key_print(getattr(data, attr)("appliance")))
|
print(printer.key_print(data.__getattribute__(attr)("appliance")))
|
||||||
print(printer.key_print(data))
|
print(printer.key_print(data))
|
||||||
print(
|
print(
|
||||||
printer.pretty_print(
|
printer.pretty_print(
|
||||||
printer.create_commands(device.commands, concat=True)
|
printer.create_command(device.commands, concat=True)
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
|
@ -3,15 +3,13 @@ import logging
|
|||||||
import re
|
import re
|
||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Optional, Dict, Any, TYPE_CHECKING, List, TypeVar, overload
|
from typing import Optional, Dict, Any, TYPE_CHECKING, List
|
||||||
|
|
||||||
from pyhon import diagnose, exceptions
|
from pyhon import diagnose, exceptions
|
||||||
from pyhon.appliances.base import ApplianceBase
|
|
||||||
from pyhon.attributes import HonAttribute
|
from pyhon.attributes import HonAttribute
|
||||||
from pyhon.command_loader import HonCommandLoader
|
from pyhon.command_loader import HonCommandLoader
|
||||||
from pyhon.commands import HonCommand
|
from pyhon.commands import HonCommand
|
||||||
from pyhon.parameter.base import HonParameter
|
from pyhon.parameter.base import HonParameter
|
||||||
from pyhon.parameter.enum import HonParameterEnum
|
|
||||||
from pyhon.parameter.range import HonParameterRange
|
from pyhon.parameter.range import HonParameterRange
|
||||||
from pyhon.typedefs import Parameter
|
from pyhon.typedefs import Parameter
|
||||||
|
|
||||||
@ -20,10 +18,7 @@ if TYPE_CHECKING:
|
|||||||
|
|
||||||
_LOGGER = logging.getLogger(__name__)
|
_LOGGER = logging.getLogger(__name__)
|
||||||
|
|
||||||
T = TypeVar("T")
|
|
||||||
|
|
||||||
|
|
||||||
# pylint: disable=too-many-public-methods,too-many-instance-attributes
|
|
||||||
class HonAppliance:
|
class HonAppliance:
|
||||||
_MINIMAL_UPDATE_INTERVAL = 5 # seconds
|
_MINIMAL_UPDATE_INTERVAL = 5 # seconds
|
||||||
|
|
||||||
@ -45,41 +40,30 @@ class HonAppliance:
|
|||||||
self._default_setting = HonParameter("", {}, "")
|
self._default_setting = HonParameter("", {}, "")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self._extra: Optional[ApplianceBase] = importlib.import_module(
|
self._extra = importlib.import_module(
|
||||||
f"pyhon.appliances.{self.appliance_type.lower()}"
|
f"pyhon.appliances.{self.appliance_type.lower()}"
|
||||||
).Appliance(self)
|
).Appliance(self)
|
||||||
except ModuleNotFoundError:
|
except ModuleNotFoundError:
|
||||||
self._extra = None
|
self._extra = None
|
||||||
|
|
||||||
def _get_nested_item(self, item: str) -> Any:
|
|
||||||
result: List[Any] | Dict[str, Any] = self.data
|
|
||||||
for key in item.split("."):
|
|
||||||
if all(k in "0123456789" for k in key) and isinstance(result, list):
|
|
||||||
result = result[int(key)]
|
|
||||||
elif isinstance(result, dict):
|
|
||||||
result = result[key]
|
|
||||||
return result
|
|
||||||
|
|
||||||
def __getitem__(self, item: str) -> Any:
|
def __getitem__(self, item: str) -> Any:
|
||||||
if self._zone:
|
if self._zone:
|
||||||
item += f"Z{self._zone}"
|
item += f"Z{self._zone}"
|
||||||
if "." in item:
|
if "." in item:
|
||||||
return self._get_nested_item(item)
|
result = self.data
|
||||||
|
for key in item.split("."):
|
||||||
|
if all(k in "0123456789" for k in key) and isinstance(result, list):
|
||||||
|
result = result[int(key)]
|
||||||
|
else:
|
||||||
|
result = result[key]
|
||||||
|
return result
|
||||||
if item in self.data:
|
if item in self.data:
|
||||||
return self.data[item]
|
return self.data[item]
|
||||||
if item in self.attributes["parameters"]:
|
if item in self.attributes["parameters"]:
|
||||||
return self.attributes["parameters"][item].value
|
return self.attributes["parameters"][item].value
|
||||||
return self.info[item]
|
return self.info[item]
|
||||||
|
|
||||||
@overload
|
def get(self, item: str, default: Any = None) -> Any:
|
||||||
def get(self, item: str, default: None = None) -> Any:
|
|
||||||
...
|
|
||||||
|
|
||||||
@overload
|
|
||||||
def get(self, item: str, default: T) -> T:
|
|
||||||
...
|
|
||||||
|
|
||||||
def get(self, item: str, default: Optional[T] = None) -> Any:
|
|
||||||
try:
|
try:
|
||||||
return self[item]
|
return self[item]
|
||||||
except (KeyError, IndexError):
|
except (KeyError, IndexError):
|
||||||
@ -87,22 +71,21 @@ class HonAppliance:
|
|||||||
|
|
||||||
def _check_name_zone(self, name: str, frontend: bool = True) -> str:
|
def _check_name_zone(self, name: str, frontend: bool = True) -> str:
|
||||||
zone = " Z" if frontend else "_z"
|
zone = " Z" if frontend else "_z"
|
||||||
attribute: str = self._info.get(name, "")
|
if (attribute := self._info.get(name, "")) and self._zone:
|
||||||
if attribute and self._zone:
|
|
||||||
return f"{attribute}{zone}{self._zone}"
|
return f"{attribute}{zone}{self._zone}"
|
||||||
return attribute
|
return attribute
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def appliance_model_id(self) -> str:
|
def appliance_model_id(self) -> str:
|
||||||
return str(self._info.get("applianceModelId", ""))
|
return self._info.get("applianceModelId", "")
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def appliance_type(self) -> str:
|
def appliance_type(self) -> str:
|
||||||
return str(self._info.get("applianceTypeName", ""))
|
return self._info.get("applianceTypeName", "")
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def mac_address(self) -> str:
|
def mac_address(self) -> str:
|
||||||
return str(self.info.get("macAddress", ""))
|
return self.info.get("macAddress", "")
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def unique_id(self) -> str:
|
def unique_id(self) -> str:
|
||||||
@ -118,31 +101,29 @@ class HonAppliance:
|
|||||||
|
|
||||||
@property
|
@property
|
||||||
def brand(self) -> str:
|
def brand(self) -> str:
|
||||||
brand = self._check_name_zone("brand")
|
return self._check_name_zone("brand")
|
||||||
return brand[0].upper() + brand[1:]
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def nick_name(self) -> str:
|
def nick_name(self) -> str:
|
||||||
result = self._check_name_zone("nickName")
|
result = self._check_name_zone("nickName")
|
||||||
if not result or re.findall("^[xX1\\s-]+$", result):
|
if not result or re.findall("^[xX\s]+$", result):
|
||||||
return self.model_name
|
return self.model_name
|
||||||
return result
|
return result
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def code(self) -> str:
|
def code(self) -> str:
|
||||||
code: str = self.info.get("code", "")
|
if code := self.info.get("code"):
|
||||||
if code:
|
|
||||||
return code
|
return code
|
||||||
serial_number: str = self.info.get("serialNumber", "")
|
serial_number = self.info.get("serialNumber", "")
|
||||||
return serial_number[:8] if len(serial_number) < 18 else serial_number[:11]
|
return serial_number[:8] if len(serial_number) < 18 else serial_number[:11]
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def model_id(self) -> int:
|
def model_id(self) -> int:
|
||||||
return int(self._info.get("applianceModelId", 0))
|
return self._info.get("applianceModelId", 0)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def options(self) -> Dict[str, Any]:
|
def options(self) -> Dict[str, Any]:
|
||||||
return dict(self._appliance_model.get("options", {}))
|
return self._appliance_model.get("options", {})
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def commands(self) -> Dict[str, HonCommand]:
|
def commands(self) -> Dict[str, HonCommand]:
|
||||||
@ -181,18 +162,16 @@ class HonAppliance:
|
|||||||
self._commands = command_loader.commands
|
self._commands = command_loader.commands
|
||||||
self._additional_data = command_loader.additional_data
|
self._additional_data = command_loader.additional_data
|
||||||
self._appliance_model = command_loader.appliance_data
|
self._appliance_model = command_loader.appliance_data
|
||||||
self.sync_params_to_command("settings")
|
|
||||||
|
|
||||||
async def load_attributes(self) -> None:
|
async def load_attributes(self) -> None:
|
||||||
attributes = await self.api.load_attributes(self)
|
self._attributes = await self.api.load_attributes(self)
|
||||||
for name, values in attributes.pop("shadow", {}).get("parameters", {}).items():
|
for name, values in self._attributes.pop("shadow").get("parameters").items():
|
||||||
if name in self._attributes.get("parameters", {}):
|
if name in self._attributes.get("parameters", {}):
|
||||||
self._attributes["parameters"][name].update(values)
|
self._attributes["parameters"][name].update(values)
|
||||||
else:
|
else:
|
||||||
self._attributes.setdefault("parameters", {})[name] = HonAttribute(
|
self._attributes.setdefault("parameters", {})[name] = HonAttribute(
|
||||||
values
|
values
|
||||||
)
|
)
|
||||||
self._attributes |= attributes
|
|
||||||
if self._extra:
|
if self._extra:
|
||||||
self._attributes = self._extra.attributes(self._attributes)
|
self._attributes = self._extra.attributes(self._attributes)
|
||||||
|
|
||||||
@ -202,11 +181,14 @@ class HonAppliance:
|
|||||||
|
|
||||||
async def update(self, force: bool = False) -> None:
|
async def update(self, force: bool = False) -> None:
|
||||||
now = datetime.now()
|
now = datetime.now()
|
||||||
min_age = now - timedelta(seconds=self._MINIMAL_UPDATE_INTERVAL)
|
if (
|
||||||
if force or not self._last_update or self._last_update < min_age:
|
force
|
||||||
|
or not self._last_update
|
||||||
|
or self._last_update
|
||||||
|
< now - timedelta(seconds=self._MINIMAL_UPDATE_INTERVAL)
|
||||||
|
):
|
||||||
self._last_update = now
|
self._last_update = now
|
||||||
await self.load_attributes()
|
await self.load_attributes()
|
||||||
self.sync_params_to_command("settings")
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def command_parameters(self) -> Dict[str, Dict[str, str | float]]:
|
def command_parameters(self) -> Dict[str, Dict[str, str | float]]:
|
||||||
@ -214,7 +196,7 @@ class HonAppliance:
|
|||||||
|
|
||||||
@property
|
@property
|
||||||
def settings(self) -> Dict[str, Parameter]:
|
def settings(self) -> Dict[str, Parameter]:
|
||||||
result: Dict[str, Parameter] = {}
|
result = {}
|
||||||
for name, command in self._commands.items():
|
for name, command in self._commands.items():
|
||||||
for key in command.setting_keys:
|
for key in command.setting_keys:
|
||||||
setting = command.settings.get(key, self._default_setting)
|
setting = command.settings.get(key, self._default_setting)
|
||||||
@ -250,67 +232,32 @@ class HonAppliance:
|
|||||||
async def data_archive(self, path: Path) -> str:
|
async def data_archive(self, path: Path) -> str:
|
||||||
return await diagnose.zip_archive(self, path, anonymous=True)
|
return await diagnose.zip_archive(self, path, anonymous=True)
|
||||||
|
|
||||||
def sync_command_to_params(self, command_name: str) -> None:
|
def sync_to_params(self, command_name: str) -> None:
|
||||||
if not (command := self.commands.get(command_name)):
|
if not (command := self.commands.get(command_name)):
|
||||||
return
|
return
|
||||||
for key in self.attributes.get("parameters", {}):
|
for key, value in self.attributes.get("parameters", {}).items():
|
||||||
if new := command.parameters.get(key):
|
if isinstance(value, str) and (new := command.parameters.get(key)):
|
||||||
self.attributes["parameters"][key].update(
|
self.attributes["parameters"][key].update(
|
||||||
str(new.intern_value), shield=True
|
str(new.intern_value), shield=True
|
||||||
)
|
)
|
||||||
|
|
||||||
def sync_params_to_command(self, command_name: str) -> None:
|
def sync_command(self, main: str, target: Optional[List[str]] = None) -> None:
|
||||||
if not (command := self.commands.get(command_name)):
|
|
||||||
return
|
|
||||||
for key in command.setting_keys:
|
|
||||||
if (
|
|
||||||
new := self.attributes.get("parameters", {}).get(key)
|
|
||||||
) is None or new.value == "":
|
|
||||||
continue
|
|
||||||
setting = command.settings[key]
|
|
||||||
try:
|
|
||||||
if not isinstance(setting, HonParameterRange):
|
|
||||||
command.settings[key].value = str(new.value)
|
|
||||||
else:
|
|
||||||
command.settings[key].value = float(new.value)
|
|
||||||
except ValueError as error:
|
|
||||||
_LOGGER.info("Can't set %s - %s", key, error)
|
|
||||||
continue
|
|
||||||
|
|
||||||
def sync_command(
|
|
||||||
self,
|
|
||||||
main: str,
|
|
||||||
target: Optional[List[str] | str] = None,
|
|
||||||
to_sync: Optional[List[str] | bool] = None,
|
|
||||||
) -> None:
|
|
||||||
base: Optional[HonCommand] = self.commands.get(main)
|
base: Optional[HonCommand] = self.commands.get(main)
|
||||||
if not base:
|
if not base:
|
||||||
return
|
return
|
||||||
for command, data in self.commands.items():
|
for command, data in self.commands.items():
|
||||||
if command == main or target and command not in target:
|
if command == main or target and command not in target:
|
||||||
continue
|
continue
|
||||||
|
for name, parameter in data.parameters.items():
|
||||||
for name, target_param in data.parameters.items():
|
if base_value := base.parameters.get(name):
|
||||||
if not (base_param := base.parameters.get(name)):
|
if isinstance(base_value, HonParameterRange) and isinstance(
|
||||||
continue
|
parameter, HonParameterRange
|
||||||
if to_sync and (
|
):
|
||||||
(isinstance(to_sync, list) and name not in to_sync)
|
parameter.max = base_value.max
|
||||||
or not base_param.mandatory
|
parameter.min = base_value.min
|
||||||
):
|
parameter.step = base_value.step
|
||||||
continue
|
elif isinstance(parameter, HonParameterRange):
|
||||||
self.sync_parameter(base_param, target_param)
|
parameter.max = int(base_value.value)
|
||||||
|
parameter.min = int(base_value.value)
|
||||||
def sync_parameter(self, main: Parameter, target: Parameter) -> None:
|
parameter.step = 1
|
||||||
if isinstance(main, HonParameterRange) and isinstance(
|
parameter.value = base_value.value
|
||||||
target, HonParameterRange
|
|
||||||
):
|
|
||||||
target.max = main.max
|
|
||||||
target.min = main.min
|
|
||||||
target.step = main.step
|
|
||||||
elif isinstance(target, HonParameterRange):
|
|
||||||
target.max = int(main.value)
|
|
||||||
target.min = int(main.value)
|
|
||||||
target.step = 1
|
|
||||||
elif isinstance(target, HonParameterEnum):
|
|
||||||
target.values = main.values
|
|
||||||
target.value = main.value
|
|
||||||
|
@ -1,4 +1,3 @@
|
|||||||
# pylint: disable=duplicate-code
|
|
||||||
from typing import Any, Dict
|
from typing import Any, Dict
|
||||||
|
|
||||||
from pyhon.appliances.base import ApplianceBase
|
from pyhon.appliances.base import ApplianceBase
|
||||||
|
@ -1,16 +1,24 @@
|
|||||||
from typing import Any, Dict
|
from typing import Any, Dict
|
||||||
|
|
||||||
from pyhon.appliances.base import ApplianceBase
|
from pyhon.appliances.base import ApplianceBase
|
||||||
|
from pyhon.parameter.program import HonParameterProgram
|
||||||
|
|
||||||
|
|
||||||
class Appliance(ApplianceBase):
|
class Appliance(ApplianceBase):
|
||||||
def attributes(self, data: Dict[str, Any]) -> Dict[str, Any]:
|
def attributes(self, data: Dict[str, Any]) -> Dict[str, Any]:
|
||||||
data = super().attributes(data)
|
data = super().attributes(data)
|
||||||
if data.get("lastConnEvent", {}).get("category", "") == "DISCONNECTED":
|
if data.get("lastConnEvent", {}).get("category", "") == "DISCONNECTED":
|
||||||
data["parameters"]["temp"].value = 0
|
data["parameters"]["temp"].value = "0"
|
||||||
data["parameters"]["onOffStatus"].value = 0
|
data["parameters"]["onOffStatus"].value = "0"
|
||||||
data["parameters"]["remoteCtrValid"].value = 0
|
data["parameters"]["remoteCtrValid"].value = "0"
|
||||||
data["parameters"]["remainingTimeMM"].value = 0
|
data["parameters"]["remainingTimeMM"].value = "0"
|
||||||
|
|
||||||
|
data["active"] = data["parameters"]["onOffStatus"] == "1"
|
||||||
|
|
||||||
|
if program := int(data["parameters"]["prCode"]):
|
||||||
|
if (setting := self.parent.settings["startProgram.program"]) and isinstance(
|
||||||
|
setting, HonParameterProgram
|
||||||
|
):
|
||||||
|
data["programName"] = setting.ids.get(program, "")
|
||||||
|
|
||||||
data["active"] = data["parameters"]["onOffStatus"].value == 1
|
|
||||||
return data
|
return data
|
||||||
|
@ -10,12 +10,12 @@ class Appliance(ApplianceBase):
|
|||||||
data["modeZ1"] = "holiday"
|
data["modeZ1"] = "holiday"
|
||||||
elif data["parameters"]["intelligenceMode"] == "1":
|
elif data["parameters"]["intelligenceMode"] == "1":
|
||||||
data["modeZ1"] = "auto_set"
|
data["modeZ1"] = "auto_set"
|
||||||
elif data["parameters"].get("quickModeZ1") == "1":
|
elif data["parameters"]["quickModeZ1"] == "1":
|
||||||
data["modeZ1"] = "super_cool"
|
data["modeZ1"] = "super_cool"
|
||||||
else:
|
else:
|
||||||
data["modeZ1"] = "no_mode"
|
data["modeZ1"] = "no_mode"
|
||||||
|
|
||||||
if data["parameters"].get("quickModeZ2") == "1":
|
if data["parameters"]["quickModeZ2"] == "1":
|
||||||
data["modeZ2"] = "super_freeze"
|
data["modeZ2"] = "super_freeze"
|
||||||
elif data["parameters"]["intelligenceMode"] == "1":
|
elif data["parameters"]["intelligenceMode"] == "1":
|
||||||
data["modeZ2"] = "auto_set"
|
data["modeZ2"] = "auto_set"
|
||||||
|
@ -1,4 +1,3 @@
|
|||||||
# pylint: disable=duplicate-code
|
|
||||||
from typing import Any, Dict
|
from typing import Any, Dict
|
||||||
|
|
||||||
from pyhon.appliances.base import ApplianceBase
|
from pyhon.appliances.base import ApplianceBase
|
||||||
|
@ -1,4 +1,3 @@
|
|||||||
# pylint: disable=duplicate-code
|
|
||||||
from typing import Dict, Any
|
from typing import Dict, Any
|
||||||
|
|
||||||
from pyhon.appliances.base import ApplianceBase
|
from pyhon.appliances.base import ApplianceBase
|
||||||
|
@ -1,16 +0,0 @@
|
|||||||
from typing import Any, Dict
|
|
||||||
|
|
||||||
from pyhon.appliances.base import ApplianceBase
|
|
||||||
from pyhon.parameter.base import HonParameter
|
|
||||||
|
|
||||||
|
|
||||||
class Appliance(ApplianceBase):
|
|
||||||
def attributes(self, data: Dict[str, Any]) -> Dict[str, Any]:
|
|
||||||
data = super().attributes(data)
|
|
||||||
parameter = data.get("parameters", {}).get("onOffStatus")
|
|
||||||
is_class = isinstance(parameter, HonParameter)
|
|
||||||
data["active"] = parameter.value == 1 if is_class else parameter == 1
|
|
||||||
return data
|
|
||||||
|
|
||||||
def settings(self, settings: Dict[str, Any]) -> Dict[str, Any]:
|
|
||||||
return settings
|
|
@ -1,4 +1,3 @@
|
|||||||
# pylint: disable=duplicate-code
|
|
||||||
from typing import Any, Dict
|
from typing import Any, Dict
|
||||||
|
|
||||||
from pyhon.appliances.base import ApplianceBase
|
from pyhon.appliances.base import ApplianceBase
|
||||||
|
@ -1,15 +1,14 @@
|
|||||||
import asyncio
|
import asyncio
|
||||||
from contextlib import suppress
|
from contextlib import suppress
|
||||||
from copy import copy
|
from copy import copy
|
||||||
from typing import Dict, Any, Optional, TYPE_CHECKING, List
|
from typing import Dict, Any, Optional, TYPE_CHECKING, List, Collection
|
||||||
|
|
||||||
from pyhon.commands import HonCommand
|
from pyhon.commands import HonCommand
|
||||||
from pyhon.exceptions import NoAuthenticationException
|
|
||||||
from pyhon.parameter.fixed import HonParameterFixed
|
from pyhon.parameter.fixed import HonParameterFixed
|
||||||
from pyhon.parameter.program import HonParameterProgram
|
from pyhon.parameter.program import HonParameterProgram
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from pyhon import HonAPI
|
from pyhon import HonAPI, exceptions
|
||||||
from pyhon.appliance import HonAppliance
|
from pyhon.appliance import HonAppliance
|
||||||
|
|
||||||
|
|
||||||
@ -17,12 +16,12 @@ class HonCommandLoader:
|
|||||||
"""Loads and parses hOn command data"""
|
"""Loads and parses hOn command data"""
|
||||||
|
|
||||||
def __init__(self, api: "HonAPI", appliance: "HonAppliance") -> None:
|
def __init__(self, api: "HonAPI", appliance: "HonAppliance") -> None:
|
||||||
self._api: "HonAPI" = api
|
|
||||||
self._appliance: "HonAppliance" = appliance
|
|
||||||
self._api_commands: Dict[str, Any] = {}
|
self._api_commands: Dict[str, Any] = {}
|
||||||
self._favourites: List[Dict[str, Any]] = []
|
self._favourites: List[Dict[str, Any]] = []
|
||||||
self._command_history: List[Dict[str, Any]] = []
|
self._command_history: List[Dict[str, Any]] = []
|
||||||
self._commands: Dict[str, HonCommand] = {}
|
self._commands: Dict[str, HonCommand] = {}
|
||||||
|
self._api: "HonAPI" = api
|
||||||
|
self._appliance: "HonAppliance" = appliance
|
||||||
self._appliance_data: Dict[str, Any] = {}
|
self._appliance_data: Dict[str, Any] = {}
|
||||||
self._additional_data: Dict[str, Any] = {}
|
self._additional_data: Dict[str, Any] = {}
|
||||||
|
|
||||||
@ -30,7 +29,7 @@ class HonCommandLoader:
|
|||||||
def api(self) -> "HonAPI":
|
def api(self) -> "HonAPI":
|
||||||
"""api connection object"""
|
"""api connection object"""
|
||||||
if self._api is None:
|
if self._api is None:
|
||||||
raise NoAuthenticationException("Missing hOn login")
|
raise exceptions.NoAuthenticationException("Missing hOn login")
|
||||||
return self._api
|
return self._api
|
||||||
|
|
||||||
@property
|
@property
|
||||||
@ -56,7 +55,7 @@ class HonCommandLoader:
|
|||||||
async def load_commands(self) -> None:
|
async def load_commands(self) -> None:
|
||||||
"""Trigger loading of command data"""
|
"""Trigger loading of command data"""
|
||||||
await self._load_data()
|
await self._load_data()
|
||||||
self._appliance_data = self._api_commands.pop("applianceModel", {})
|
self._appliance_data = self._api_commands.pop("applianceModel")
|
||||||
self._get_commands()
|
self._get_commands()
|
||||||
self._add_favourites()
|
self._add_favourites()
|
||||||
self._recover_last_command_states()
|
self._recover_last_command_states()
|
||||||
@ -184,44 +183,22 @@ class HonCommandLoader:
|
|||||||
def _add_favourites(self) -> None:
|
def _add_favourites(self) -> None:
|
||||||
"""Patch program categories with favourites"""
|
"""Patch program categories with favourites"""
|
||||||
for favourite in self._favourites:
|
for favourite in self._favourites:
|
||||||
name, command_name, base = self._get_favourite_info(favourite)
|
name = favourite.get("favouriteName", {})
|
||||||
if not base:
|
command = favourite.get("command", {})
|
||||||
continue
|
command_name = command.get("commandName", "")
|
||||||
base_command: HonCommand = copy(base)
|
program_name = self._clean_name(command.get("programName", ""))
|
||||||
self._update_base_command_with_data(base_command, favourite)
|
base: HonCommand = copy(
|
||||||
self._update_base_command_with_favourite(base_command)
|
self.commands[command_name].categories[program_name]
|
||||||
self._update_program_categories(command_name, name, base_command)
|
)
|
||||||
|
for data in command.values():
|
||||||
def _get_favourite_info(
|
if isinstance(data, str):
|
||||||
self, favourite: Dict[str, Any]
|
|
||||||
) -> tuple[str, str, HonCommand | None]:
|
|
||||||
name: str = favourite.get("favouriteName", {})
|
|
||||||
command = favourite.get("command", {})
|
|
||||||
command_name: str = command.get("commandName", "")
|
|
||||||
program_name = self._clean_name(command.get("programName", ""))
|
|
||||||
base_command = self.commands[command_name].categories.get(program_name)
|
|
||||||
return name, command_name, base_command
|
|
||||||
|
|
||||||
def _update_base_command_with_data(
|
|
||||||
self, base_command: HonCommand, command: Dict[str, Any]
|
|
||||||
) -> None:
|
|
||||||
for data in command.values():
|
|
||||||
if isinstance(data, str):
|
|
||||||
continue
|
|
||||||
for key, value in data.items():
|
|
||||||
if not (parameter := base_command.parameters.get(key)):
|
|
||||||
continue
|
continue
|
||||||
with suppress(ValueError):
|
for key, value in data.items():
|
||||||
parameter.value = value
|
if parameter := base.parameters.get(key):
|
||||||
|
with suppress(ValueError):
|
||||||
def _update_base_command_with_favourite(self, base_command: HonCommand) -> None:
|
parameter.value = value
|
||||||
extra_param = HonParameterFixed("favourite", {"fixedValue": "1"}, "custom")
|
extra_param = HonParameterFixed("favourite", {"fixedValue": "1"}, "custom")
|
||||||
base_command.parameters.update(favourite=extra_param)
|
base.parameters.update(favourite=extra_param)
|
||||||
|
if isinstance(program := base.parameters["program"], HonParameterProgram):
|
||||||
def _update_program_categories(
|
program.set_value(name)
|
||||||
self, command_name: str, name: str, base_command: HonCommand
|
self.commands[command_name].categories[name] = base
|
||||||
) -> None:
|
|
||||||
program = base_command.parameters["program"]
|
|
||||||
if isinstance(program, HonParameterProgram):
|
|
||||||
program.set_value(name)
|
|
||||||
self.commands[command_name].categories[name] = base_command
|
|
||||||
|
@ -27,16 +27,17 @@ class HonCommand:
|
|||||||
categories: Optional[Dict[str, "HonCommand"]] = None,
|
categories: Optional[Dict[str, "HonCommand"]] = None,
|
||||||
category_name: str = "",
|
category_name: str = "",
|
||||||
):
|
):
|
||||||
self._name: str = name
|
self._api: Optional[HonAPI] = appliance.api
|
||||||
self._api: Optional[HonAPI] = None
|
|
||||||
self._appliance: "HonAppliance" = appliance
|
self._appliance: "HonAppliance" = appliance
|
||||||
|
self._name: str = name
|
||||||
self._categories: Optional[Dict[str, "HonCommand"]] = categories
|
self._categories: Optional[Dict[str, "HonCommand"]] = categories
|
||||||
self._category_name: str = category_name
|
self._category_name: str = category_name
|
||||||
self._parameters: Dict[str, Parameter] = {}
|
self._description: str = attributes.pop("description", "")
|
||||||
|
self._protocol_type: str = attributes.pop("protocolType", "")
|
||||||
|
self._parameters: Dict[str, HonParameter] = {}
|
||||||
self._data: Dict[str, Any] = {}
|
self._data: Dict[str, Any] = {}
|
||||||
|
self._available_settings: Dict[str, HonParameter] = {}
|
||||||
self._rules: List[HonRuleSet] = []
|
self._rules: List[HonRuleSet] = []
|
||||||
attributes.pop("description", "")
|
|
||||||
attributes.pop("protocolType", "")
|
|
||||||
self._load_parameters(attributes)
|
self._load_parameters(attributes)
|
||||||
|
|
||||||
def __repr__(self) -> str:
|
def __repr__(self) -> str:
|
||||||
@ -48,8 +49,6 @@ class HonCommand:
|
|||||||
|
|
||||||
@property
|
@property
|
||||||
def api(self) -> "HonAPI":
|
def api(self) -> "HonAPI":
|
||||||
if self._api is None and self._appliance:
|
|
||||||
self._api = self._appliance.api
|
|
||||||
if self._api is None:
|
if self._api is None:
|
||||||
raise exceptions.NoAuthenticationException("Missing hOn login")
|
raise exceptions.NoAuthenticationException("Missing hOn login")
|
||||||
return self._api
|
return self._api
|
||||||
@ -77,23 +76,12 @@ class HonCommand:
|
|||||||
result.setdefault(parameter.group, {})[name] = parameter.intern_value
|
result.setdefault(parameter.group, {})[name] = parameter.intern_value
|
||||||
return result
|
return result
|
||||||
|
|
||||||
@property
|
|
||||||
def mandatory_parameter_groups(self) -> Dict[str, Dict[str, Union[str, float]]]:
|
|
||||||
result: Dict[str, Dict[str, Union[str, float]]] = {}
|
|
||||||
for name, parameter in self._parameters.items():
|
|
||||||
if parameter.mandatory:
|
|
||||||
result.setdefault(parameter.group, {})[name] = parameter.intern_value
|
|
||||||
return result
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def parameter_value(self) -> Dict[str, Union[str, float]]:
|
def parameter_value(self) -> Dict[str, Union[str, float]]:
|
||||||
return {n: p.value for n, p in self._parameters.items()}
|
return {n: p.value for n, p in self._parameters.items()}
|
||||||
|
|
||||||
def _load_parameters(self, attributes: Dict[str, Dict[str, Any] | Any]) -> None:
|
def _load_parameters(self, attributes: Dict[str, Dict[str, Any]]) -> None:
|
||||||
for key, items in attributes.items():
|
for key, items in attributes.items():
|
||||||
if not isinstance(items, dict):
|
|
||||||
_LOGGER.info("Loading Attributes - Skipping %s", str(items))
|
|
||||||
continue
|
|
||||||
for name, data in items.items():
|
for name, data in items.items():
|
||||||
self._create_parameters(data, name, key)
|
self._create_parameters(data, name, key)
|
||||||
for rule in self._rules:
|
for rule in self._rules:
|
||||||
@ -105,12 +93,10 @@ class HonCommand:
|
|||||||
if name == "zoneMap" and self._appliance.zone:
|
if name == "zoneMap" and self._appliance.zone:
|
||||||
data["default"] = self._appliance.zone
|
data["default"] = self._appliance.zone
|
||||||
if data.get("category") == "rule":
|
if data.get("category") == "rule":
|
||||||
if "fixedValue" in data:
|
if "fixedValue" not in data:
|
||||||
self._rules.append(HonRuleSet(self, data["fixedValue"]))
|
_LOGGER.error("Rule not supported: %s", data)
|
||||||
elif "enumValues" in data:
|
|
||||||
self._rules.append(HonRuleSet(self, data["enumValues"]))
|
|
||||||
else:
|
else:
|
||||||
_LOGGER.warning("Rule not supported: %s", data)
|
self._rules.append(HonRuleSet(self, data["fixedValue"]))
|
||||||
match data.get("typology"):
|
match data.get("typology"):
|
||||||
case "range":
|
case "range":
|
||||||
self._parameters[name] = HonParameterRange(name, data, parameter)
|
self._parameters[name] = HonParameterRange(name, data, parameter)
|
||||||
@ -125,33 +111,14 @@ class HonCommand:
|
|||||||
name = "program" if "PROGRAM" in self._category_name else "category"
|
name = "program" if "PROGRAM" in self._category_name else "category"
|
||||||
self._parameters[name] = HonParameterProgram(name, self, "custom")
|
self._parameters[name] = HonParameterProgram(name, self, "custom")
|
||||||
|
|
||||||
async def send(self, only_mandatory: bool = False) -> bool:
|
async def send(self) -> bool:
|
||||||
grouped_params = (
|
params = self.parameter_groups.get("parameters", {})
|
||||||
self.mandatory_parameter_groups if only_mandatory else self.parameter_groups
|
|
||||||
)
|
|
||||||
params = grouped_params.get("parameters", {})
|
|
||||||
return await self.send_parameters(params)
|
|
||||||
|
|
||||||
async def send_specific(self, param_names: List[str]) -> bool:
|
|
||||||
params: Dict[str, str | float] = {}
|
|
||||||
for key, parameter in self._parameters.items():
|
|
||||||
if key in param_names or parameter.mandatory:
|
|
||||||
params[key] = parameter.value
|
|
||||||
return await self.send_parameters(params)
|
|
||||||
|
|
||||||
async def send_parameters(self, params: Dict[str, str | float]) -> bool:
|
|
||||||
ancillary_params = self.parameter_groups.get("ancillaryParameters", {})
|
ancillary_params = self.parameter_groups.get("ancillaryParameters", {})
|
||||||
ancillary_params.pop("programRules", None)
|
ancillary_params.pop("programRules", None)
|
||||||
if "prStr" in params:
|
self.appliance.sync_to_params(self.name)
|
||||||
params["prStr"] = self._category_name.upper()
|
|
||||||
self.appliance.sync_command_to_params(self.name)
|
|
||||||
try:
|
try:
|
||||||
result = await self.api.send_command(
|
result = await self.api.send_command(
|
||||||
self._appliance,
|
self._appliance, self._name, params, ancillary_params
|
||||||
self._name,
|
|
||||||
params,
|
|
||||||
ancillary_params,
|
|
||||||
self._category_name,
|
|
||||||
)
|
)
|
||||||
if not result:
|
if not result:
|
||||||
_LOGGER.error(result)
|
_LOGGER.error(result)
|
||||||
@ -202,7 +169,3 @@ class HonCommand:
|
|||||||
else:
|
else:
|
||||||
result[name] = parameter
|
result[name] = parameter
|
||||||
return result
|
return result
|
||||||
|
|
||||||
def reset(self) -> None:
|
|
||||||
for parameter in self._parameters.values():
|
|
||||||
parameter.reset()
|
|
||||||
|
@ -75,12 +75,8 @@ class HonAPI:
|
|||||||
|
|
||||||
async def load_appliances(self) -> List[Dict[str, Any]]:
|
async def load_appliances(self) -> List[Dict[str, Any]]:
|
||||||
async with self._hon.get(f"{const.API_URL}/commands/v1/appliance") as resp:
|
async with self._hon.get(f"{const.API_URL}/commands/v1/appliance") as resp:
|
||||||
result = await resp.json()
|
if result := await resp.json():
|
||||||
if result:
|
return result.get("payload", {}).get("appliances", {})
|
||||||
appliances: List[Dict[str, Any]] = result.get("payload", {}).get(
|
|
||||||
"appliances", {}
|
|
||||||
)
|
|
||||||
return appliances
|
|
||||||
return []
|
return []
|
||||||
|
|
||||||
async def load_commands(self, appliance: HonAppliance) -> Dict[str, Any]:
|
async def load_commands(self, appliance: HonAppliance) -> Dict[str, Any]:
|
||||||
@ -114,10 +110,9 @@ class HonAPI:
|
|||||||
)
|
)
|
||||||
async with self._hon.get(url) as response:
|
async with self._hon.get(url) as response:
|
||||||
result: Dict[str, Any] = await response.json()
|
result: Dict[str, Any] = await response.json()
|
||||||
if not result or not result.get("payload"):
|
if not result or not result.get("payload"):
|
||||||
return []
|
return []
|
||||||
command_history: List[Dict[str, Any]] = result["payload"]["history"]
|
return result["payload"]["history"]
|
||||||
return command_history
|
|
||||||
|
|
||||||
async def load_favourites(self, appliance: HonAppliance) -> List[Dict[str, Any]]:
|
async def load_favourites(self, appliance: HonAppliance) -> List[Dict[str, Any]]:
|
||||||
url: str = (
|
url: str = (
|
||||||
@ -125,20 +120,17 @@ class HonAPI:
|
|||||||
)
|
)
|
||||||
async with self._hon.get(url) as response:
|
async with self._hon.get(url) as response:
|
||||||
result: Dict[str, Any] = await response.json()
|
result: Dict[str, Any] = await response.json()
|
||||||
if not result or not result.get("payload"):
|
if not result or not result.get("payload"):
|
||||||
return []
|
return []
|
||||||
favourites: List[Dict[str, Any]] = result["payload"]["favourites"]
|
return result["payload"]["favourites"]
|
||||||
return favourites
|
|
||||||
|
|
||||||
async def load_last_activity(self, appliance: HonAppliance) -> Dict[str, Any]:
|
async def load_last_activity(self, appliance: HonAppliance) -> Dict[str, Any]:
|
||||||
url: str = f"{const.API_URL}/commands/v1/retrieve-last-activity"
|
url: str = f"{const.API_URL}/commands/v1/retrieve-last-activity"
|
||||||
params: Dict[str, str] = {"macAddress": appliance.mac_address}
|
params: Dict[str, str] = {"macAddress": appliance.mac_address}
|
||||||
async with self._hon.get(url, params=params) as response:
|
async with self._hon.get(url, params=params) as response:
|
||||||
result: Dict[str, Any] = await response.json()
|
result: Dict[str, Any] = await response.json()
|
||||||
if result:
|
if result and (activity := result.get("attributes")):
|
||||||
activity: Dict[str, Any] = result.get("attributes", "")
|
return activity
|
||||||
if activity:
|
|
||||||
return activity
|
|
||||||
return {}
|
return {}
|
||||||
|
|
||||||
async def load_appliance_data(self, appliance: HonAppliance) -> Dict[str, Any]:
|
async def load_appliance_data(self, appliance: HonAppliance) -> Dict[str, Any]:
|
||||||
@ -150,10 +142,7 @@ class HonAPI:
|
|||||||
async with self._hon.get(url, params=params) as response:
|
async with self._hon.get(url, params=params) as response:
|
||||||
result: Dict[str, Any] = await response.json()
|
result: Dict[str, Any] = await response.json()
|
||||||
if result:
|
if result:
|
||||||
appliance_data: Dict[str, Any] = result.get("payload", {}).get(
|
return result.get("payload", {}).get("applianceModel", {})
|
||||||
"applianceModel", {}
|
|
||||||
)
|
|
||||||
return appliance_data
|
|
||||||
return {}
|
return {}
|
||||||
|
|
||||||
async def load_attributes(self, appliance: HonAppliance) -> Dict[str, Any]:
|
async def load_attributes(self, appliance: HonAppliance) -> Dict[str, Any]:
|
||||||
@ -164,8 +153,7 @@ class HonAPI:
|
|||||||
}
|
}
|
||||||
url: str = f"{const.API_URL}/commands/v1/context"
|
url: str = f"{const.API_URL}/commands/v1/context"
|
||||||
async with self._hon.get(url, params=params) as response:
|
async with self._hon.get(url, params=params) as response:
|
||||||
attributes: Dict[str, Any] = (await response.json()).get("payload", {})
|
return (await response.json()).get("payload", {})
|
||||||
return attributes
|
|
||||||
|
|
||||||
async def load_statistics(self, appliance: HonAppliance) -> Dict[str, Any]:
|
async def load_statistics(self, appliance: HonAppliance) -> Dict[str, Any]:
|
||||||
params: Dict[str, str] = {
|
params: Dict[str, str] = {
|
||||||
@ -174,15 +162,13 @@ class HonAPI:
|
|||||||
}
|
}
|
||||||
url: str = f"{const.API_URL}/commands/v1/statistics"
|
url: str = f"{const.API_URL}/commands/v1/statistics"
|
||||||
async with self._hon.get(url, params=params) as response:
|
async with self._hon.get(url, params=params) as response:
|
||||||
statistics: Dict[str, Any] = (await response.json()).get("payload", {})
|
return (await response.json()).get("payload", {})
|
||||||
return statistics
|
|
||||||
|
|
||||||
async def load_maintenance(self, appliance: HonAppliance) -> Dict[str, Any]:
|
async def load_maintenance(self, appliance: HonAppliance) -> Dict[str, Any]:
|
||||||
url = f"{const.API_URL}/commands/v1/maintenance-cycle"
|
url = f"{const.API_URL}/commands/v1/maintenance-cycle"
|
||||||
params = {"macAddress": appliance.mac_address}
|
params = {"macAddress": appliance.mac_address}
|
||||||
async with self._hon.get(url, params=params) as response:
|
async with self._hon.get(url, params=params) as response:
|
||||||
maintenance: Dict[str, Any] = (await response.json()).get("payload", {})
|
return (await response.json()).get("payload", {})
|
||||||
return maintenance
|
|
||||||
|
|
||||||
async def send_command(
|
async def send_command(
|
||||||
self,
|
self,
|
||||||
@ -190,7 +176,6 @@ class HonAPI:
|
|||||||
command: str,
|
command: str,
|
||||||
parameters: Dict[str, Any],
|
parameters: Dict[str, Any],
|
||||||
ancillary_parameters: Dict[str, Any],
|
ancillary_parameters: Dict[str, Any],
|
||||||
program_name: str = "",
|
|
||||||
) -> bool:
|
) -> bool:
|
||||||
now: str = datetime.utcnow().isoformat()
|
now: str = datetime.utcnow().isoformat()
|
||||||
data: Dict[str, Any] = {
|
data: Dict[str, Any] = {
|
||||||
@ -209,8 +194,6 @@ class HonAPI:
|
|||||||
"parameters": parameters,
|
"parameters": parameters,
|
||||||
"applianceType": appliance.appliance_type,
|
"applianceType": appliance.appliance_type,
|
||||||
}
|
}
|
||||||
if command == "startProgram" and program_name:
|
|
||||||
data.update({"programName": program_name.upper()})
|
|
||||||
url: str = f"{const.API_URL}/commands/v1/send"
|
url: str = f"{const.API_URL}/commands/v1/send"
|
||||||
async with self._hon.post(url, json=data) as response:
|
async with self._hon.post(url, json=data) as response:
|
||||||
json_data: Dict[str, Any] = await response.json()
|
json_data: Dict[str, Any] = await response.json()
|
||||||
@ -224,8 +207,9 @@ class HonAPI:
|
|||||||
url: str = f"{const.API_URL}/config/v1/program-list-rules"
|
url: str = f"{const.API_URL}/config/v1/program-list-rules"
|
||||||
async with self._hon_anonymous.get(url) as response:
|
async with self._hon_anonymous.get(url) as response:
|
||||||
result: Dict[str, Any] = await response.json()
|
result: Dict[str, Any] = await response.json()
|
||||||
data: Dict[str, Any] = result.get("payload", {})
|
if result and (data := result.get("payload")):
|
||||||
return data
|
return data
|
||||||
|
return {}
|
||||||
|
|
||||||
async def app_config(
|
async def app_config(
|
||||||
self, language: str = "en", beta: bool = True
|
self, language: str = "en", beta: bool = True
|
||||||
@ -239,17 +223,17 @@ class HonAPI:
|
|||||||
}
|
}
|
||||||
payload: str = json.dumps(payload_data, separators=(",", ":"))
|
payload: str = json.dumps(payload_data, separators=(",", ":"))
|
||||||
async with self._hon_anonymous.post(url, data=payload) as response:
|
async with self._hon_anonymous.post(url, data=payload) as response:
|
||||||
result = await response.json()
|
if (result := await response.json()) and (data := result.get("payload")):
|
||||||
data: Dict[str, Any] = result.get("payload", {})
|
return data
|
||||||
return data
|
return {}
|
||||||
|
|
||||||
async def translation_keys(self, language: str = "en") -> Dict[str, Any]:
|
async def translation_keys(self, language: str = "en") -> Dict[str, Any]:
|
||||||
config = await self.app_config(language=language)
|
config = await self.app_config(language=language)
|
||||||
if not (url := config.get("language", {}).get("jsonPath")):
|
if url := config.get("language", {}).get("jsonPath"):
|
||||||
return {}
|
async with self._hon_anonymous.get(url) as response:
|
||||||
async with self._hon_anonymous.get(url) as response:
|
if result := await response.json():
|
||||||
result: Dict[str, Any] = await response.json()
|
return result
|
||||||
return result
|
return {}
|
||||||
|
|
||||||
async def close(self) -> None:
|
async def close(self) -> None:
|
||||||
if self._hon_handler is not None:
|
if self._hon_handler is not None:
|
||||||
@ -266,27 +250,17 @@ class TestAPI(HonAPI):
|
|||||||
|
|
||||||
def _load_json(self, appliance: HonAppliance, file: str) -> Dict[str, Any]:
|
def _load_json(self, appliance: HonAppliance, file: str) -> Dict[str, Any]:
|
||||||
directory = f"{appliance.appliance_type}_{appliance.appliance_model_id}".lower()
|
directory = f"{appliance.appliance_type}_{appliance.appliance_model_id}".lower()
|
||||||
if not (path := self._path / directory / f"{file}.json").exists():
|
path = f"{self._path}/{directory}/{file}.json"
|
||||||
_LOGGER.warning("Can't open %s", str(path))
|
|
||||||
return {}
|
|
||||||
with open(path, "r", encoding="utf-8") as json_file:
|
with open(path, "r", encoding="utf-8") as json_file:
|
||||||
text = json_file.read()
|
return json.loads(json_file.read())
|
||||||
try:
|
|
||||||
data: Dict[str, Any] = json.loads(text)
|
|
||||||
return data
|
|
||||||
except json.decoder.JSONDecodeError as error:
|
|
||||||
_LOGGER.error("%s - %s", str(path), error)
|
|
||||||
return {}
|
|
||||||
|
|
||||||
async def load_appliances(self) -> List[Dict[str, Any]]:
|
async def load_appliances(self) -> List[Dict[str, Any]]:
|
||||||
result = []
|
result = []
|
||||||
for appliance in self._path.glob("*/"):
|
for appliance in self._path.glob("*/"):
|
||||||
file = appliance / "appliance_data.json"
|
with open(
|
||||||
with open(file, "r", encoding="utf-8") as json_file:
|
appliance / "appliance_data.json", "r", encoding="utf-8"
|
||||||
try:
|
) as json_file:
|
||||||
result.append(json.loads(json_file.read()))
|
result.append(json.loads(json_file.read()))
|
||||||
except json.decoder.JSONDecodeError as error:
|
|
||||||
_LOGGER.error("%s - %s", str(file), error)
|
|
||||||
return result
|
return result
|
||||||
|
|
||||||
async def load_commands(self, appliance: HonAppliance) -> Dict[str, Any]:
|
async def load_commands(self, appliance: HonAppliance) -> Dict[str, Any]:
|
||||||
@ -322,12 +296,5 @@ class TestAPI(HonAPI):
|
|||||||
command: str,
|
command: str,
|
||||||
parameters: Dict[str, Any],
|
parameters: Dict[str, Any],
|
||||||
ancillary_parameters: Dict[str, Any],
|
ancillary_parameters: Dict[str, Any],
|
||||||
program_name: str = "",
|
|
||||||
) -> bool:
|
) -> bool:
|
||||||
_LOGGER.info(
|
|
||||||
"%s - %s - %s",
|
|
||||||
str(parameters),
|
|
||||||
str(ancillary_parameters),
|
|
||||||
str(program_name),
|
|
||||||
)
|
|
||||||
return True
|
return True
|
||||||
|
@ -6,7 +6,7 @@ import urllib
|
|||||||
from contextlib import suppress
|
from contextlib import suppress
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta
|
||||||
from typing import Dict, Optional, Any, List
|
from typing import Dict, Optional, Any
|
||||||
from urllib import parse
|
from urllib import parse
|
||||||
from urllib.parse import quote
|
from urllib.parse import quote
|
||||||
|
|
||||||
@ -30,14 +30,6 @@ class HonLoginData:
|
|||||||
loaded: Optional[Dict[str, Any]] = None
|
loaded: Optional[Dict[str, Any]] = None
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class HonAuthData:
|
|
||||||
access_token: str = ""
|
|
||||||
refresh_token: str = ""
|
|
||||||
cognito_token: str = ""
|
|
||||||
id_token: str = ""
|
|
||||||
|
|
||||||
|
|
||||||
class HonAuth:
|
class HonAuth:
|
||||||
_TOKEN_EXPIRES_AFTER_HOURS = 8
|
_TOKEN_EXPIRES_AFTER_HOURS = 8
|
||||||
_TOKEN_EXPIRE_WARNING_HOURS = 7
|
_TOKEN_EXPIRE_WARNING_HOURS = 7
|
||||||
@ -54,25 +46,28 @@ class HonAuth:
|
|||||||
self._login_data = HonLoginData()
|
self._login_data = HonLoginData()
|
||||||
self._login_data.email = email
|
self._login_data.email = email
|
||||||
self._login_data.password = password
|
self._login_data.password = password
|
||||||
|
self._access_token = ""
|
||||||
|
self._refresh_token = ""
|
||||||
|
self._cognito_token = ""
|
||||||
|
self._id_token = ""
|
||||||
self._device = device
|
self._device = device
|
||||||
self._expires: datetime = datetime.utcnow()
|
self._expires: datetime = datetime.utcnow()
|
||||||
self._auth = HonAuthData()
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def cognito_token(self) -> str:
|
def cognito_token(self) -> str:
|
||||||
return self._auth.cognito_token
|
return self._cognito_token
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def id_token(self) -> str:
|
def id_token(self) -> str:
|
||||||
return self._auth.id_token
|
return self._id_token
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def access_token(self) -> str:
|
def access_token(self) -> str:
|
||||||
return self._auth.access_token
|
return self._access_token
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def refresh_token(self) -> str:
|
def refresh_token(self) -> str:
|
||||||
return self._auth.refresh_token
|
return self._refresh_token
|
||||||
|
|
||||||
def _check_token_expiration(self, hours: int) -> bool:
|
def _check_token_expiration(self, hours: int) -> bool:
|
||||||
return datetime.utcnow() >= self._expires + timedelta(hours=hours)
|
return datetime.utcnow() >= self._expires + timedelta(hours=hours)
|
||||||
@ -120,8 +115,7 @@ class HonAuth:
|
|||||||
async with self._request.get(url) as response:
|
async with self._request.get(url) as response:
|
||||||
text = await response.text()
|
text = await response.text()
|
||||||
self._expires = datetime.utcnow()
|
self._expires = datetime.utcnow()
|
||||||
login_url: List[str] = re.findall("url = '(.+?)'", text)
|
if not (login_url := re.findall("url = '(.+?)'", text)):
|
||||||
if not login_url:
|
|
||||||
if "oauth/done#access_token=" in text:
|
if "oauth/done#access_token=" in text:
|
||||||
self._parse_token_data(text)
|
self._parse_token_data(text)
|
||||||
raise exceptions.HonNoAuthenticationNeeded()
|
raise exceptions.HonNoAuthenticationNeeded()
|
||||||
@ -190,19 +184,18 @@ class HonAuth:
|
|||||||
if response.status == 200:
|
if response.status == 200:
|
||||||
with suppress(json.JSONDecodeError, KeyError):
|
with suppress(json.JSONDecodeError, KeyError):
|
||||||
result = await response.json()
|
result = await response.json()
|
||||||
url: str = result["events"][0]["attributes"]["values"]["url"]
|
return result["events"][0]["attributes"]["values"]["url"]
|
||||||
return url
|
|
||||||
await self._error_logger(response)
|
await self._error_logger(response)
|
||||||
return ""
|
return ""
|
||||||
|
|
||||||
def _parse_token_data(self, text: str) -> bool:
|
def _parse_token_data(self, text: str) -> bool:
|
||||||
if access_token := re.findall("access_token=(.*?)&", text):
|
if access_token := re.findall("access_token=(.*?)&", text):
|
||||||
self._auth.access_token = access_token[0]
|
self._access_token = access_token[0]
|
||||||
if refresh_token := re.findall("refresh_token=(.*?)&", text):
|
if refresh_token := re.findall("refresh_token=(.*?)&", text):
|
||||||
self._auth.refresh_token = refresh_token[0]
|
self._refresh_token = refresh_token[0]
|
||||||
if id_token := re.findall("id_token=(.*?)&", text):
|
if id_token := re.findall("id_token=(.*?)&", text):
|
||||||
self._auth.id_token = id_token[0]
|
self._id_token = id_token[0]
|
||||||
return bool(access_token and refresh_token and id_token)
|
return True if access_token and refresh_token and id_token else False
|
||||||
|
|
||||||
async def _get_token(self, url: str) -> bool:
|
async def _get_token(self, url: str) -> bool:
|
||||||
async with self._request.get(url) as response:
|
async with self._request.get(url) as response:
|
||||||
@ -234,7 +227,7 @@ class HonAuth:
|
|||||||
return True
|
return True
|
||||||
|
|
||||||
async def _api_auth(self) -> bool:
|
async def _api_auth(self) -> bool:
|
||||||
post_headers = {"id-token": self._auth.id_token}
|
post_headers = {"id-token": self._id_token}
|
||||||
data = self._device.get()
|
data = self._device.get()
|
||||||
async with self._request.post(
|
async with self._request.post(
|
||||||
f"{const.API_URL}/auth/v1/login", headers=post_headers, json=data
|
f"{const.API_URL}/auth/v1/login", headers=post_headers, json=data
|
||||||
@ -244,8 +237,8 @@ class HonAuth:
|
|||||||
except json.JSONDecodeError:
|
except json.JSONDecodeError:
|
||||||
await self._error_logger(response)
|
await self._error_logger(response)
|
||||||
return False
|
return False
|
||||||
self._auth.cognito_token = json_data.get("cognitoUser", {}).get("Token", "")
|
self._cognito_token = json_data.get("cognitoUser", {}).get("Token", "")
|
||||||
if not self._auth.cognito_token:
|
if not self._cognito_token:
|
||||||
_LOGGER.error(json_data)
|
_LOGGER.error(json_data)
|
||||||
raise exceptions.HonAuthenticationError()
|
raise exceptions.HonAuthenticationError()
|
||||||
return True
|
return True
|
||||||
@ -267,7 +260,7 @@ class HonAuth:
|
|||||||
async def refresh(self) -> bool:
|
async def refresh(self) -> bool:
|
||||||
params = {
|
params = {
|
||||||
"client_id": const.CLIENT_ID,
|
"client_id": const.CLIENT_ID,
|
||||||
"refresh_token": self._auth.refresh_token,
|
"refresh_token": self._refresh_token,
|
||||||
"grant_type": "refresh_token",
|
"grant_type": "refresh_token",
|
||||||
}
|
}
|
||||||
async with self._request.post(
|
async with self._request.post(
|
||||||
@ -278,14 +271,14 @@ class HonAuth:
|
|||||||
return False
|
return False
|
||||||
data = await response.json()
|
data = await response.json()
|
||||||
self._expires = datetime.utcnow()
|
self._expires = datetime.utcnow()
|
||||||
self._auth.id_token = data["id_token"]
|
self._id_token = data["id_token"]
|
||||||
self._auth.access_token = data["access_token"]
|
self._access_token = data["access_token"]
|
||||||
return await self._api_auth()
|
return await self._api_auth()
|
||||||
|
|
||||||
def clear(self) -> None:
|
def clear(self) -> None:
|
||||||
self._session.cookie_jar.clear_domain(const.AUTH_API.split("/")[-2])
|
self._session.cookie_jar.clear_domain(const.AUTH_API.split("/")[-2])
|
||||||
self._request.called_urls = []
|
self._request.called_urls = []
|
||||||
self._auth.cognito_token = ""
|
self._cognito_token = ""
|
||||||
self._auth.id_token = ""
|
self._id_token = ""
|
||||||
self._auth.access_token = ""
|
self._access_token = ""
|
||||||
self._auth.refresh_token = ""
|
self._refresh_token = ""
|
||||||
|
@ -21,7 +21,7 @@ class HonDevice:
|
|||||||
return self._os_version
|
return self._os_version
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def os_type(self) -> str:
|
def os(self) -> str:
|
||||||
return self._os
|
return self._os
|
||||||
|
|
||||||
@property
|
@property
|
||||||
@ -36,7 +36,7 @@ class HonDevice:
|
|||||||
result: Dict[str, str | int] = {
|
result: Dict[str, str | int] = {
|
||||||
"appVersion": self.app_version,
|
"appVersion": self.app_version,
|
||||||
"mobileId": self.mobile_id,
|
"mobileId": self.mobile_id,
|
||||||
"os": self.os_type,
|
"os": self.os,
|
||||||
"osVersion": self.os_version,
|
"osVersion": self.os_version,
|
||||||
"deviceModel": self.device_model,
|
"deviceModel": self.device_model,
|
||||||
}
|
}
|
||||||
|
@ -4,7 +4,6 @@ from contextlib import asynccontextmanager
|
|||||||
from typing import Dict, Any
|
from typing import Dict, Any
|
||||||
|
|
||||||
import aiohttp
|
import aiohttp
|
||||||
from yarl import URL
|
|
||||||
|
|
||||||
from pyhon import const
|
from pyhon import const
|
||||||
from pyhon.connection.handler.base import ConnectionHandler
|
from pyhon.connection.handler.base import ConnectionHandler
|
||||||
@ -18,10 +17,10 @@ class HonAnonymousConnectionHandler(ConnectionHandler):
|
|||||||
|
|
||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
async def _intercept(
|
async def _intercept(
|
||||||
self, method: Callback, url: str | URL, *args: Any, **kwargs: Dict[str, Any]
|
self, method: Callback, *args: Any, **kwargs: Any
|
||||||
) -> AsyncIterator[aiohttp.ClientResponse]:
|
) -> AsyncIterator[aiohttp.ClientResponse]:
|
||||||
kwargs["headers"] = kwargs.pop("headers", {}) | self._HEADERS
|
kwargs["headers"] = kwargs.pop("headers", {}) | self._HEADERS
|
||||||
async with method(url, *args, **kwargs) as response:
|
async with method(*args, **kwargs) as response:
|
||||||
if response.status == 403:
|
if response.status == 403:
|
||||||
_LOGGER.error("Can't authenticate anymore")
|
_LOGGER.error("Can't authenticate anymore")
|
||||||
yield response
|
yield response
|
||||||
|
@ -1,10 +1,9 @@
|
|||||||
import logging
|
import logging
|
||||||
from collections.abc import AsyncIterator
|
from collections.abc import AsyncIterator
|
||||||
from contextlib import asynccontextmanager
|
from contextlib import asynccontextmanager
|
||||||
from typing import Optional, List, Tuple, Any, Dict
|
from typing import Optional, List, Tuple, Any
|
||||||
|
|
||||||
import aiohttp
|
import aiohttp
|
||||||
from yarl import URL
|
|
||||||
|
|
||||||
from pyhon import const
|
from pyhon import const
|
||||||
from pyhon.connection.handler.base import ConnectionHandler
|
from pyhon.connection.handler.base import ConnectionHandler
|
||||||
@ -30,9 +29,9 @@ class HonAuthConnectionHandler(ConnectionHandler):
|
|||||||
|
|
||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
async def _intercept(
|
async def _intercept(
|
||||||
self, method: Callback, url: str | URL, *args: Any, **kwargs: Dict[str, Any]
|
self, method: Callback, *args: Any, **kwargs: Any
|
||||||
) -> AsyncIterator[aiohttp.ClientResponse]:
|
) -> AsyncIterator[aiohttp.ClientResponse]:
|
||||||
kwargs["headers"] = kwargs.pop("headers", {}) | self._HEADERS
|
kwargs["headers"] = kwargs.pop("headers", {}) | self._HEADERS
|
||||||
async with method(url, *args, **kwargs) as response:
|
async with method(*args, **kwargs) as response:
|
||||||
self._called_urls.append((response.status, str(response.request_info.url)))
|
self._called_urls.append((response.status, str(response.request_info.url)))
|
||||||
yield response
|
yield response
|
||||||
|
@ -2,11 +2,10 @@ import logging
|
|||||||
from collections.abc import AsyncIterator
|
from collections.abc import AsyncIterator
|
||||||
from contextlib import asynccontextmanager
|
from contextlib import asynccontextmanager
|
||||||
from types import TracebackType
|
from types import TracebackType
|
||||||
from typing import Optional, Dict, Type, Any
|
from typing import Optional, Dict, Type, Any, Protocol
|
||||||
|
|
||||||
import aiohttp
|
import aiohttp
|
||||||
from typing_extensions import Self
|
from typing_extensions import Self
|
||||||
from yarl import URL
|
|
||||||
|
|
||||||
from pyhon import const, exceptions
|
from pyhon import const, exceptions
|
||||||
from pyhon.typedefs import Callback
|
from pyhon.typedefs import Callback
|
||||||
@ -47,11 +46,10 @@ class ConnectionHandler:
|
|||||||
return self
|
return self
|
||||||
|
|
||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
async def _intercept(
|
def _intercept(
|
||||||
self, method: Callback, url: str | URL, *args: Any, **kwargs: Dict[str, Any]
|
self, method: Callback, *args: Any, loop: int = 0, **kwargs: Any
|
||||||
) -> AsyncIterator[aiohttp.ClientResponse]:
|
) -> AsyncIterator[aiohttp.ClientResponse]:
|
||||||
async with method(url, *args, **kwargs) as response:
|
raise NotImplementedError
|
||||||
yield response
|
|
||||||
|
|
||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
async def get(
|
async def get(
|
||||||
@ -60,8 +58,7 @@ class ConnectionHandler:
|
|||||||
if self._session is None:
|
if self._session is None:
|
||||||
raise exceptions.NoSessionException()
|
raise exceptions.NoSessionException()
|
||||||
response: aiohttp.ClientResponse
|
response: aiohttp.ClientResponse
|
||||||
args = self._session.get, *args
|
async with self._intercept(self._session.get, *args, **kwargs) as response: # type: ignore[arg-type]
|
||||||
async with self._intercept(*args, **kwargs) as response:
|
|
||||||
yield response
|
yield response
|
||||||
|
|
||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
@ -71,8 +68,7 @@ class ConnectionHandler:
|
|||||||
if self._session is None:
|
if self._session is None:
|
||||||
raise exceptions.NoSessionException()
|
raise exceptions.NoSessionException()
|
||||||
response: aiohttp.ClientResponse
|
response: aiohttp.ClientResponse
|
||||||
args = self._session.post, *args
|
async with self._intercept(self._session.post, *args, **kwargs) as response: # type: ignore[arg-type]
|
||||||
async with self._intercept(*args, **kwargs) as response:
|
|
||||||
yield response
|
yield response
|
||||||
|
|
||||||
async def close(self) -> None:
|
async def close(self) -> None:
|
||||||
|
@ -6,7 +6,6 @@ from typing import Optional, Dict, Any
|
|||||||
|
|
||||||
import aiohttp
|
import aiohttp
|
||||||
from typing_extensions import Self
|
from typing_extensions import Self
|
||||||
from yarl import URL
|
|
||||||
|
|
||||||
from pyhon.connection.auth import HonAuth
|
from pyhon.connection.auth import HonAuth
|
||||||
from pyhon.connection.device import HonDevice
|
from pyhon.connection.device import HonDevice
|
||||||
@ -55,19 +54,16 @@ class HonConnectionHandler(ConnectionHandler):
|
|||||||
|
|
||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
async def _intercept(
|
async def _intercept(
|
||||||
self, method: Callback, url: str | URL, *args: Any, **kwargs: Any
|
self, method: Callback, *args: Any, loop: int = 0, **kwargs: Dict[str, str]
|
||||||
) -> AsyncIterator[aiohttp.ClientResponse]:
|
) -> AsyncIterator[aiohttp.ClientResponse]:
|
||||||
loop: int = kwargs.pop("loop", 0)
|
|
||||||
kwargs["headers"] = await self._check_headers(kwargs.get("headers", {}))
|
kwargs["headers"] = await self._check_headers(kwargs.get("headers", {}))
|
||||||
async with method(url, *args, **kwargs) as response:
|
async with method(args[0], *args[1:], **kwargs) as response:
|
||||||
if (
|
if (
|
||||||
self.auth.token_expires_soon or response.status in [401, 403]
|
self.auth.token_expires_soon or response.status in [401, 403]
|
||||||
) and loop == 0:
|
) and loop == 0:
|
||||||
_LOGGER.info("Try refreshing token...")
|
_LOGGER.info("Try refreshing token...")
|
||||||
await self.auth.refresh()
|
await self.auth.refresh()
|
||||||
async with self._intercept(
|
async with self._intercept(method, loop=loop + 1, **kwargs) as result:
|
||||||
method, url, *args, loop=loop + 1, **kwargs
|
|
||||||
) as result:
|
|
||||||
yield result
|
yield result
|
||||||
elif (
|
elif (
|
||||||
self.auth.token_is_expired or response.status in [401, 403]
|
self.auth.token_is_expired or response.status in [401, 403]
|
||||||
@ -79,9 +75,7 @@ class HonConnectionHandler(ConnectionHandler):
|
|||||||
await response.text(),
|
await response.text(),
|
||||||
)
|
)
|
||||||
await self.create()
|
await self.create()
|
||||||
async with self._intercept(
|
async with self._intercept(method, loop=loop + 1, **kwargs) as result:
|
||||||
method, url, *args, loop=loop + 1, **kwargs
|
|
||||||
) as result:
|
|
||||||
yield result
|
yield result
|
||||||
elif loop >= 2:
|
elif loop >= 2:
|
||||||
_LOGGER.error(
|
_LOGGER.error(
|
||||||
@ -95,11 +89,11 @@ class HonConnectionHandler(ConnectionHandler):
|
|||||||
try:
|
try:
|
||||||
await response.json()
|
await response.json()
|
||||||
yield response
|
yield response
|
||||||
except json.JSONDecodeError as exc:
|
except json.JSONDecodeError:
|
||||||
_LOGGER.warning(
|
_LOGGER.warning(
|
||||||
"%s - JsonDecodeError %s - %s",
|
"%s - JsonDecodeError %s - %s",
|
||||||
response.request_info.url,
|
response.request_info.url,
|
||||||
response.status,
|
response.status,
|
||||||
await response.text(),
|
await response.text(),
|
||||||
)
|
)
|
||||||
raise HonAuthenticationError("Decode Error") from exc
|
raise HonAuthenticationError("Decode Error")
|
||||||
|
@ -2,11 +2,9 @@ AUTH_API = "https://account2.hon-smarthome.com"
|
|||||||
API_URL = "https://api-iot.he.services"
|
API_URL = "https://api-iot.he.services"
|
||||||
API_KEY = "GRCqFhC6Gk@ikWXm1RmnSmX1cm,MxY-configuration"
|
API_KEY = "GRCqFhC6Gk@ikWXm1RmnSmX1cm,MxY-configuration"
|
||||||
APP = "hon"
|
APP = "hon"
|
||||||
CLIENT_ID = (
|
# All seen id's (different accounts, different devices) are the same, so I guess this hash is static
|
||||||
"3MVG9QDx8IX8nP5T2Ha8ofvlmjLZl5L_gvfbT9."
|
CLIENT_ID = "3MVG9QDx8IX8nP5T2Ha8ofvlmjLZl5L_gvfbT9.HJvpHGKoAS_dcMN8LYpTSYeVFCraUnV.2Ag1Ki7m4znVO6"
|
||||||
"HJvpHGKoAS_dcMN8LYpTSYeVFCraUnV.2Ag1Ki7m4znVO6"
|
APP_VERSION = "2.0.10"
|
||||||
)
|
|
||||||
APP_VERSION = "2.4.7"
|
|
||||||
OS_VERSION = 31
|
OS_VERSION = 31
|
||||||
OS = "android"
|
OS = "android"
|
||||||
DEVICE_MODEL = "exynos9820"
|
DEVICE_MODEL = "exynos9820"
|
||||||
|
@ -70,7 +70,7 @@ async def zip_archive(
|
|||||||
) -> str:
|
) -> str:
|
||||||
data = await appliance_data(appliance, path, anonymous)
|
data = await appliance_data(appliance, path, anonymous)
|
||||||
archive = data[0].parent
|
archive = data[0].parent
|
||||||
shutil.make_archive(str(archive), "zip", archive)
|
shutil.make_archive(str(archive.parent), "zip", archive)
|
||||||
shutil.rmtree(archive)
|
shutil.rmtree(archive)
|
||||||
return f"{archive.stem}.zip"
|
return f"{archive.stem}.zip"
|
||||||
|
|
||||||
@ -89,11 +89,12 @@ def yaml_export(appliance: "HonAppliance", anonymous: bool = False) -> str:
|
|||||||
if anonymous:
|
if anonymous:
|
||||||
for sensible in ["serialNumber", "coords"]:
|
for sensible in ["serialNumber", "coords"]:
|
||||||
data.get("appliance", {}).pop(sensible, None)
|
data.get("appliance", {}).pop(sensible, None)
|
||||||
result = printer.pretty_print({"data": data})
|
data = {
|
||||||
if commands := printer.create_commands(appliance.commands):
|
"data": data,
|
||||||
result += printer.pretty_print({"commands": commands})
|
"commands": printer.create_command(appliance.commands),
|
||||||
if rules := printer.create_rules(appliance.commands):
|
"rules": printer.create_rules(appliance.commands),
|
||||||
result += printer.pretty_print({"rules": rules})
|
}
|
||||||
|
result = printer.pretty_print(data)
|
||||||
if anonymous:
|
if anonymous:
|
||||||
result = anonymize_data(result)
|
result = anonymize_data(result)
|
||||||
return result
|
return result
|
||||||
|
@ -7,10 +7,9 @@ from typing import List, Optional, Dict, Any, Type
|
|||||||
from aiohttp import ClientSession
|
from aiohttp import ClientSession
|
||||||
from typing_extensions import Self
|
from typing_extensions import Self
|
||||||
|
|
||||||
|
from pyhon import HonAPI, exceptions
|
||||||
from pyhon.appliance import HonAppliance
|
from pyhon.appliance import HonAppliance
|
||||||
from pyhon.connection.api import HonAPI
|
|
||||||
from pyhon.connection.api import TestAPI
|
from pyhon.connection.api import TestAPI
|
||||||
from pyhon.exceptions import NoAuthenticationException
|
|
||||||
|
|
||||||
_LOGGER = logging.getLogger(__name__)
|
_LOGGER = logging.getLogger(__name__)
|
||||||
|
|
||||||
@ -44,7 +43,7 @@ class Hon:
|
|||||||
@property
|
@property
|
||||||
def api(self) -> HonAPI:
|
def api(self) -> HonAPI:
|
||||||
if self._api is None:
|
if self._api is None:
|
||||||
raise NoAuthenticationException
|
raise exceptions.NoAuthenticationException
|
||||||
return self._api
|
return self._api
|
||||||
|
|
||||||
@property
|
@property
|
||||||
|
@ -7,21 +7,14 @@ if TYPE_CHECKING:
|
|||||||
class HonParameter:
|
class HonParameter:
|
||||||
def __init__(self, key: str, attributes: Dict[str, Any], group: str) -> None:
|
def __init__(self, key: str, attributes: Dict[str, Any], group: str) -> None:
|
||||||
self._key = key
|
self._key = key
|
||||||
self._attributes = attributes
|
self._category: str = attributes.get("category", "")
|
||||||
self._category: str = ""
|
self._typology: str = attributes.get("typology", "")
|
||||||
self._typology: str = ""
|
self._mandatory: int = attributes.get("mandatory", 0)
|
||||||
self._mandatory: int = 0
|
|
||||||
self._value: str | float = ""
|
self._value: str | float = ""
|
||||||
self._group: str = group
|
self._group: str = group
|
||||||
self._triggers: Dict[
|
self._triggers: Dict[
|
||||||
str, List[Tuple[Callable[["HonRule"], None], "HonRule"]]
|
str, List[Tuple[Callable[["HonRule"], None], "HonRule"]]
|
||||||
] = {}
|
] = {}
|
||||||
self._set_attributes()
|
|
||||||
|
|
||||||
def _set_attributes(self) -> None:
|
|
||||||
self._category = self._attributes.get("category", "")
|
|
||||||
self._typology = self._attributes.get("typology", "")
|
|
||||||
self._mandatory = self._attributes.get("mandatory", 0)
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def key(self) -> str:
|
def key(self) -> str:
|
||||||
@ -68,9 +61,8 @@ class HonParameter:
|
|||||||
self._triggers.setdefault(value, []).append((func, data))
|
self._triggers.setdefault(value, []).append((func, data))
|
||||||
|
|
||||||
def check_trigger(self, value: str | float) -> None:
|
def check_trigger(self, value: str | float) -> None:
|
||||||
triggers = {str(k).lower(): v for k, v in self._triggers.items()}
|
if str(value) in self._triggers:
|
||||||
if str(value).lower() in triggers:
|
for trigger in self._triggers[str(value)]:
|
||||||
for trigger in triggers[str(value)]:
|
|
||||||
func, args = trigger
|
func, args = trigger
|
||||||
func(args)
|
func(args)
|
||||||
|
|
||||||
@ -93,6 +85,3 @@ class HonParameter:
|
|||||||
param[rule.param_key] = rule.param_data.get("defaultValue", "")
|
param[rule.param_key] = rule.param_data.get("defaultValue", "")
|
||||||
|
|
||||||
return result
|
return result
|
||||||
|
|
||||||
def reset(self) -> None:
|
|
||||||
self._set_attributes()
|
|
||||||
|
@ -10,19 +10,12 @@ def clean_value(value: str | float) -> str:
|
|||||||
class HonParameterEnum(HonParameter):
|
class HonParameterEnum(HonParameter):
|
||||||
def __init__(self, key: str, attributes: Dict[str, Any], group: str) -> None:
|
def __init__(self, key: str, attributes: Dict[str, Any], group: str) -> None:
|
||||||
super().__init__(key, attributes, group)
|
super().__init__(key, attributes, group)
|
||||||
self._default: str | float = ""
|
self._default = attributes.get("defaultValue")
|
||||||
self._value: str | float = ""
|
self._value = self._default or "0"
|
||||||
self._values: List[str] = []
|
self._values: List[str] = attributes.get("enumValues", [])
|
||||||
self._set_attributes()
|
|
||||||
if self._default and clean_value(self._default.strip("[]")) not in self.values:
|
if self._default and clean_value(self._default.strip("[]")) not in self.values:
|
||||||
self._values.append(self._default)
|
self._values.append(self._default)
|
||||||
|
|
||||||
def _set_attributes(self) -> None:
|
|
||||||
super()._set_attributes()
|
|
||||||
self._default = self._attributes.get("defaultValue", "")
|
|
||||||
self._value = self._default or "0"
|
|
||||||
self._values = self._attributes.get("enumValues", [])
|
|
||||||
|
|
||||||
def __repr__(self) -> str:
|
def __repr__(self) -> str:
|
||||||
return f"{self.__class__} (<{self.key}> {self.values})"
|
return f"{self.__class__} (<{self.key}> {self.values})"
|
||||||
|
|
||||||
@ -48,4 +41,4 @@ class HonParameterEnum(HonParameter):
|
|||||||
self._value = value
|
self._value = value
|
||||||
self.check_trigger(value)
|
self.check_trigger(value)
|
||||||
else:
|
else:
|
||||||
raise ValueError(f"Allowed values: {self._values} But was: {value}")
|
raise ValueError(f"Allowed values {self._values}")
|
||||||
|
@ -6,19 +6,14 @@ from pyhon.parameter.base import HonParameter
|
|||||||
class HonParameterFixed(HonParameter):
|
class HonParameterFixed(HonParameter):
|
||||||
def __init__(self, key: str, attributes: Dict[str, Any], group: str) -> None:
|
def __init__(self, key: str, attributes: Dict[str, Any], group: str) -> None:
|
||||||
super().__init__(key, attributes, group)
|
super().__init__(key, attributes, group)
|
||||||
self._value: str | float = ""
|
self._value = attributes.get("fixedValue", None)
|
||||||
self._set_attributes()
|
|
||||||
|
|
||||||
def _set_attributes(self) -> None:
|
|
||||||
super()._set_attributes()
|
|
||||||
self._value = self._attributes.get("fixedValue", "")
|
|
||||||
|
|
||||||
def __repr__(self) -> str:
|
def __repr__(self) -> str:
|
||||||
return f"{self.__class__} (<{self.key}> fixed)"
|
return f"{self.__class__} (<{self.key}> fixed)"
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def value(self) -> str | float:
|
def value(self) -> str | float:
|
||||||
return self._value if self._value != "" else "0"
|
return self._value if self._value is not None else "0"
|
||||||
|
|
||||||
@value.setter
|
@value.setter
|
||||||
def value(self, value: str | float) -> None:
|
def value(self, value: str | float) -> None:
|
||||||
|
@ -28,7 +28,7 @@ class HonParameterProgram(HonParameterEnum):
|
|||||||
if value in self.values:
|
if value in self.values:
|
||||||
self._command.category = value
|
self._command.category = value
|
||||||
else:
|
else:
|
||||||
raise ValueError(f"Allowed values: {self.values} But was: {value}")
|
raise ValueError(f"Allowed values {self.values}")
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def values(self) -> List[str]:
|
def values(self) -> List[str]:
|
||||||
@ -37,19 +37,17 @@ class HonParameterProgram(HonParameterEnum):
|
|||||||
|
|
||||||
@values.setter
|
@values.setter
|
||||||
def values(self, values: List[str]) -> None:
|
def values(self, values: List[str]) -> None:
|
||||||
raise ValueError("Cant set values {values}")
|
return
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def ids(self) -> Dict[int, str]:
|
def ids(self) -> Dict[int, str]:
|
||||||
values: Dict[int, str] = {}
|
values = {
|
||||||
for name, parameter in self._programs.items():
|
int(p.parameters["prCode"].value): n
|
||||||
if "iot_" in name:
|
for i, (n, p) in enumerate(self._programs.items())
|
||||||
continue
|
if "iot_" not in n
|
||||||
if parameter.parameters.get("prCode"):
|
and p.parameters.get("prCode")
|
||||||
continue
|
and not ((fav := p.parameters.get("favourite")) and fav.value == "1")
|
||||||
if (fav := parameter.parameters.get("favourite")) and fav.value == "1":
|
}
|
||||||
continue
|
|
||||||
values[int(parameter.parameters["prCode"].value)] = name
|
|
||||||
return dict(sorted(values.items()))
|
return dict(sorted(values.items()))
|
||||||
|
|
||||||
def set_value(self, value: str) -> None:
|
def set_value(self, value: str) -> None:
|
||||||
|
@ -7,20 +7,11 @@ from pyhon.parameter.base import HonParameter
|
|||||||
class HonParameterRange(HonParameter):
|
class HonParameterRange(HonParameter):
|
||||||
def __init__(self, key: str, attributes: Dict[str, Any], group: str) -> None:
|
def __init__(self, key: str, attributes: Dict[str, Any], group: str) -> None:
|
||||||
super().__init__(key, attributes, group)
|
super().__init__(key, attributes, group)
|
||||||
self._min: float = 0
|
self._min: float = str_to_float(attributes["minimumValue"])
|
||||||
self._max: float = 0
|
self._max: float = str_to_float(attributes["maximumValue"])
|
||||||
self._step: float = 0
|
self._step: float = str_to_float(attributes["incrementValue"])
|
||||||
self._default: float = 0
|
self._default: float = str_to_float(attributes.get("defaultValue", self.min))
|
||||||
self._value: float = 0
|
self._value: float = self._default
|
||||||
self._set_attributes()
|
|
||||||
|
|
||||||
def _set_attributes(self) -> None:
|
|
||||||
super()._set_attributes()
|
|
||||||
self._min = str_to_float(self._attributes.get("minimumValue", 0))
|
|
||||||
self._max = str_to_float(self._attributes.get("maximumValue", 0))
|
|
||||||
self._step = str_to_float(self._attributes.get("incrementValue", 0))
|
|
||||||
self._default = str_to_float(self._attributes.get("defaultValue", self.min))
|
|
||||||
self._value = self._default
|
|
||||||
|
|
||||||
def __repr__(self) -> str:
|
def __repr__(self) -> str:
|
||||||
return f"{self.__class__} (<{self.key}> [{self.min} - {self.max}])"
|
return f"{self.__class__} (<{self.key}> [{self.min} - {self.max}])"
|
||||||
@ -58,14 +49,11 @@ class HonParameterRange(HonParameter):
|
|||||||
@value.setter
|
@value.setter
|
||||||
def value(self, value: str | float) -> None:
|
def value(self, value: str | float) -> None:
|
||||||
value = str_to_float(value)
|
value = str_to_float(value)
|
||||||
if self.min <= value <= self.max and not ((value - self.min) * 100) % (
|
if self.min <= value <= self.max and not (value - self.min) % self.step:
|
||||||
self.step * 100
|
|
||||||
):
|
|
||||||
self._value = value
|
self._value = value
|
||||||
self.check_trigger(value)
|
self.check_trigger(value)
|
||||||
else:
|
else:
|
||||||
allowed = f"min {self.min} max {self.max} step {self.step}"
|
raise ValueError(f"Allowed: min {self.min} max {self.max} step {self.step}")
|
||||||
raise ValueError(f"Allowed: {allowed} But was: {value}")
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def values(self) -> List[str]:
|
def values(self) -> List[str]:
|
||||||
|
@ -29,30 +29,37 @@ def pretty_print(
|
|||||||
whitespace: str = " ",
|
whitespace: str = " ",
|
||||||
) -> str:
|
) -> str:
|
||||||
result = ""
|
result = ""
|
||||||
space = whitespace * intend
|
|
||||||
if isinstance(data, (dict, list)) and key:
|
|
||||||
result += f"{space}{'- ' if is_list else ''}{key}:\n"
|
|
||||||
intend += 1
|
|
||||||
if isinstance(data, list):
|
if isinstance(data, list):
|
||||||
|
if key:
|
||||||
|
result += f"{whitespace * intend}{'- ' if is_list else ''}{key}:\n"
|
||||||
|
intend += 1
|
||||||
for i, value in enumerate(data):
|
for i, value in enumerate(data):
|
||||||
result += pretty_print(
|
result += pretty_print(
|
||||||
value, intend=intend, is_list=True, whitespace=whitespace
|
value, intend=intend, is_list=True, whitespace=whitespace
|
||||||
)
|
)
|
||||||
elif isinstance(data, dict):
|
elif isinstance(data, dict):
|
||||||
for i, (list_key, value) in enumerate(sorted(data.items())):
|
if key:
|
||||||
result += pretty_print(
|
result += f"{whitespace * intend}{'- ' if is_list else ''}{key}:\n"
|
||||||
value,
|
intend += 1
|
||||||
key=list_key,
|
for i, (key, value) in enumerate(sorted(data.items())):
|
||||||
intend=intend + (is_list if i else 0),
|
if is_list and not i:
|
||||||
is_list=is_list and not i,
|
result += pretty_print(
|
||||||
whitespace=whitespace,
|
value, key=key, intend=intend, is_list=True, whitespace=whitespace
|
||||||
)
|
)
|
||||||
|
elif is_list:
|
||||||
|
result += pretty_print(
|
||||||
|
value, key=key, intend=intend + 1, whitespace=whitespace
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
result += pretty_print(
|
||||||
|
value, key=key, intend=intend, whitespace=whitespace
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
result += f"{space}{'- ' if is_list else ''}{key}{': ' if key else ''}{data}\n"
|
result += f"{whitespace * intend}{'- ' if is_list else ''}{key}{': ' if key else ''}{data}\n"
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
|
||||||
def create_commands(
|
def create_command(
|
||||||
commands: Dict[str, "HonCommand"], concat: bool = False
|
commands: Dict[str, "HonCommand"], concat: bool = False
|
||||||
) -> Dict[str, Any]:
|
) -> Dict[str, Any]:
|
||||||
result: Dict[str, Any] = {}
|
result: Dict[str, Any] = {}
|
||||||
|
@ -3,7 +3,6 @@ from typing import List, Dict, TYPE_CHECKING, Any, Optional
|
|||||||
|
|
||||||
from pyhon.parameter.enum import HonParameterEnum
|
from pyhon.parameter.enum import HonParameterEnum
|
||||||
from pyhon.parameter.range import HonParameterRange
|
from pyhon.parameter.range import HonParameterRange
|
||||||
from pyhon.typedefs import Parameter
|
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from pyhon.commands import HonCommand
|
from pyhon.commands import HonCommand
|
||||||
@ -25,10 +24,6 @@ class HonRuleSet:
|
|||||||
self._rules: Dict[str, List[HonRule]] = {}
|
self._rules: Dict[str, List[HonRule]] = {}
|
||||||
self._parse_rule(rule)
|
self._parse_rule(rule)
|
||||||
|
|
||||||
@property
|
|
||||||
def rules(self) -> Dict[str, List[HonRule]]:
|
|
||||||
return self._rules
|
|
||||||
|
|
||||||
def _parse_rule(self, rule: Dict[str, Any]) -> None:
|
def _parse_rule(self, rule: Dict[str, Any]) -> None:
|
||||||
for param_key, params in rule.items():
|
for param_key, params in rule.items():
|
||||||
param_key = self._command.appliance.options.get(param_key, param_key)
|
param_key = self._command.appliance.options.get(param_key, param_key)
|
||||||
@ -56,11 +51,6 @@ class HonRuleSet:
|
|||||||
extra[trigger_key] = trigger_value
|
extra[trigger_key] = trigger_value
|
||||||
for extra_key, extra_data in param_data.items():
|
for extra_key, extra_data in param_data.items():
|
||||||
self._parse_conditions(param_key, extra_key, extra_data, extra)
|
self._parse_conditions(param_key, extra_key, extra_data, extra)
|
||||||
else:
|
|
||||||
param_data = {"typology": "fixed", "fixedValue": param_data}
|
|
||||||
self._create_rule(
|
|
||||||
param_key, trigger_key, trigger_value, param_data, extra
|
|
||||||
)
|
|
||||||
|
|
||||||
def _create_rule(
|
def _create_rule(
|
||||||
self,
|
self,
|
||||||
@ -93,46 +83,28 @@ class HonRuleSet:
|
|||||||
for rule in rules:
|
for rule in rules:
|
||||||
self._rules.setdefault(key, []).append(rule)
|
self._rules.setdefault(key, []).append(rule)
|
||||||
|
|
||||||
def _extra_rules_matches(self, rule: HonRule) -> bool:
|
|
||||||
if rule.extras:
|
|
||||||
for key, value in rule.extras.items():
|
|
||||||
if not self._command.parameters.get(key):
|
|
||||||
return False
|
|
||||||
if str(self._command.parameters.get(key)) != str(value):
|
|
||||||
return False
|
|
||||||
return True
|
|
||||||
|
|
||||||
def _apply_fixed(self, param: Parameter, value: str | float) -> None:
|
|
||||||
if isinstance(param, HonParameterEnum) and set(param.values) != {str(value)}:
|
|
||||||
param.values = [str(value)]
|
|
||||||
param.value = str(value)
|
|
||||||
elif isinstance(param, HonParameterRange):
|
|
||||||
if float(value) < param.min:
|
|
||||||
param.min = float(value)
|
|
||||||
elif float(value) > param.max:
|
|
||||||
param.max = float(value)
|
|
||||||
param.value = float(value)
|
|
||||||
return
|
|
||||||
param.value = str(value)
|
|
||||||
|
|
||||||
def _apply_enum(self, param: Parameter, rule: HonRule) -> None:
|
|
||||||
if not isinstance(param, HonParameterEnum):
|
|
||||||
return
|
|
||||||
if enum_values := rule.param_data.get("enumValues"):
|
|
||||||
param.values = enum_values.split("|")
|
|
||||||
if default_value := rule.param_data.get("defaultValue"):
|
|
||||||
param.value = default_value
|
|
||||||
|
|
||||||
def _add_trigger(self, parameter: "HonParameter", data: HonRule) -> None:
|
def _add_trigger(self, parameter: "HonParameter", data: HonRule) -> None:
|
||||||
def apply(rule: HonRule) -> None:
|
def apply(rule: HonRule) -> None:
|
||||||
if not self._extra_rules_matches(rule):
|
if rule.extras is not None:
|
||||||
return
|
for key, value in rule.extras.items():
|
||||||
if not (param := self._command.parameters.get(rule.param_key)):
|
if str(self._command.parameters.get(key)) != str(value):
|
||||||
return
|
return
|
||||||
if fixed_value := rule.param_data.get("fixedValue", ""):
|
if param := self._command.parameters.get(rule.param_key):
|
||||||
self._apply_fixed(param, fixed_value)
|
if value := rule.param_data.get("fixedValue", ""):
|
||||||
elif rule.param_data.get("typology") == "enum":
|
if isinstance(param, HonParameterEnum) and set(param.values) != {
|
||||||
self._apply_enum(param, rule)
|
str(value)
|
||||||
|
}:
|
||||||
|
param.values = [str(value)]
|
||||||
|
elif isinstance(param, HonParameterRange):
|
||||||
|
param.value = float(value)
|
||||||
|
return
|
||||||
|
param.value = str(value)
|
||||||
|
elif rule.param_data.get("typology") == "enum":
|
||||||
|
if isinstance(param, HonParameterEnum):
|
||||||
|
if enum_values := rule.param_data.get("enumValues"):
|
||||||
|
param.values = enum_values.split("|")
|
||||||
|
if default_value := rule.param_data.get("defaultValue"):
|
||||||
|
param.value = default_value
|
||||||
|
|
||||||
parameter.add_trigger(data.trigger_value, apply, data)
|
parameter.add_trigger(data.trigger_value, apply, data)
|
||||||
|
|
||||||
|
@ -11,7 +11,7 @@ if TYPE_CHECKING:
|
|||||||
from pyhon.parameter.range import HonParameterRange
|
from pyhon.parameter.range import HonParameterRange
|
||||||
|
|
||||||
|
|
||||||
class Callback(Protocol): # pylint: disable=too-few-public-methods
|
class Callback(Protocol):
|
||||||
def __call__(
|
def __call__(
|
||||||
self, url: str | URL, *args: Any, **kwargs: Any
|
self, url: str | URL, *args: Any, **kwargs: Any
|
||||||
) -> aiohttp.client._RequestContextManager:
|
) -> aiohttp.client._RequestContextManager:
|
||||||
|
@ -1,3 +1,2 @@
|
|||||||
aiohttp>=3.8
|
aiohttp==3.8.4
|
||||||
yarl>=1.8
|
yarl==1.8.2
|
||||||
typing-extensions>=4.8
|
|
||||||
|
@ -1,5 +1,4 @@
|
|||||||
black>=22.12
|
black==23.3.0
|
||||||
flake8>=6.0
|
flake8==6.0.0
|
||||||
mypy>=0.991
|
mypy==1.2.0
|
||||||
pylint>=2.15
|
pylint==2.17.2
|
||||||
setuptools>=62.3
|
|
||||||
|
7
setup.py
7
setup.py
@ -2,12 +2,12 @@
|
|||||||
|
|
||||||
from setuptools import setup, find_packages
|
from setuptools import setup, find_packages
|
||||||
|
|
||||||
with open("README.md", "r", encoding="utf-8") as f:
|
with open("README.md", "r") as f:
|
||||||
long_description = f.read()
|
long_description = f.read()
|
||||||
|
|
||||||
setup(
|
setup(
|
||||||
name="pyhOn",
|
name="pyhOn",
|
||||||
version="0.15.15",
|
version="0.14.3",
|
||||||
author="Andre Basche",
|
author="Andre Basche",
|
||||||
description="Control hOn devices with python",
|
description="Control hOn devices with python",
|
||||||
long_description=long_description,
|
long_description=long_description,
|
||||||
@ -21,7 +21,7 @@ setup(
|
|||||||
packages=find_packages(),
|
packages=find_packages(),
|
||||||
include_package_data=True,
|
include_package_data=True,
|
||||||
python_requires=">=3.10",
|
python_requires=">=3.10",
|
||||||
install_requires=["aiohttp>=3.8", "typing-extensions>=4.8", "yarl>=1.8"],
|
install_requires=["aiohttp"],
|
||||||
classifiers=[
|
classifiers=[
|
||||||
"Development Status :: 4 - Beta",
|
"Development Status :: 4 - Beta",
|
||||||
"Environment :: Console",
|
"Environment :: Console",
|
||||||
@ -30,7 +30,6 @@ setup(
|
|||||||
"Operating System :: OS Independent",
|
"Operating System :: OS Independent",
|
||||||
"Programming Language :: Python :: 3.10",
|
"Programming Language :: Python :: 3.10",
|
||||||
"Programming Language :: Python :: 3.11",
|
"Programming Language :: Python :: 3.11",
|
||||||
"Programming Language :: Python :: 3.12",
|
|
||||||
"Topic :: Software Development :: Libraries :: Python Modules",
|
"Topic :: Software Development :: Libraries :: Python Modules",
|
||||||
],
|
],
|
||||||
entry_points={
|
entry_points={
|
||||||
|
Loading…
Reference in New Issue
Block a user