Merge branch 'Andre0512:main' into main
This commit is contained in:
commit
3d8011deea
4
.github/workflows/python-check.yml
vendored
4
.github/workflows/python-check.yml
vendored
@ -28,9 +28,7 @@ jobs:
|
|||||||
python -m pip install -r requirements_dev.txt
|
python -m pip install -r requirements_dev.txt
|
||||||
- name: Lint with flake8
|
- name: Lint with flake8
|
||||||
run: |
|
run: |
|
||||||
# stop the build if there are Python syntax errors or undefined names
|
flake8 . --count --statistics
|
||||||
flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
|
|
||||||
flake8 . --count --exit-zero --max-complexity=10 --max-line-length=88 --statistics
|
|
||||||
- name: Type check with mypy
|
- name: Type check with mypy
|
||||||
run: |
|
run: |
|
||||||
mypy pyhon/
|
mypy pyhon/
|
||||||
|
@ -1,7 +1,9 @@
|
|||||||
[MESSAGES CONTROL]
|
[MESSAGES CONTROL]
|
||||||
|
|
||||||
disable=C,R
|
disable=missing-docstring
|
||||||
|
|
||||||
[FORMAT]
|
[FORMAT]
|
||||||
|
|
||||||
|
max-args=6
|
||||||
|
max-attributes=8
|
||||||
max-line-length=88
|
max-line-length=88
|
||||||
|
1
MANIFEST.in
Normal file
1
MANIFEST.in
Normal file
@ -0,0 +1 @@
|
|||||||
|
include pyhon/py.typed
|
31
mypy.ini
31
mypy.ini
@ -1,9 +1,24 @@
|
|||||||
[mypy]
|
[mypy]
|
||||||
check_untyped_defs = True
|
check_untyped_defs = true
|
||||||
disallow_any_generics = True
|
disallow_any_generics = true
|
||||||
disallow_untyped_defs = True
|
disallow_any_unimported = true
|
||||||
disallow_any_unimported = True
|
disallow_incomplete_defs = true
|
||||||
no_implicit_optional = True
|
disallow_subclassing_any = true
|
||||||
warn_return_any = True
|
disallow_untyped_calls = true
|
||||||
show_error_codes = True
|
disallow_untyped_decorators = true
|
||||||
warn_unused_ignores = True
|
disallow_untyped_defs = true
|
||||||
|
disable_error_code = annotation-unchecked
|
||||||
|
enable_error_code = ignore-without-code, redundant-self, truthy-iterable
|
||||||
|
follow_imports = silent
|
||||||
|
local_partial_types = true
|
||||||
|
no_implicit_optional = true
|
||||||
|
no_implicit_reexport = true
|
||||||
|
show_error_codes = true
|
||||||
|
strict_concatenate = false
|
||||||
|
strict_equality = true
|
||||||
|
warn_incomplete_stub = true
|
||||||
|
warn_redundant_casts = true
|
||||||
|
warn_return_any = true
|
||||||
|
warn_unreachable = true
|
||||||
|
warn_unused_configs = true
|
||||||
|
warn_unused_ignores = true
|
||||||
|
@ -11,6 +11,7 @@ from typing import Tuple, Dict, Any
|
|||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
sys.path.insert(0, str(Path(__file__).parent.parent))
|
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||||||
|
|
||||||
|
# pylint: disable=wrong-import-position
|
||||||
from pyhon import Hon, HonAPI, diagnose, printer
|
from pyhon import Hon, HonAPI, diagnose, printer
|
||||||
|
|
||||||
_LOGGER = logging.getLogger(__name__)
|
_LOGGER = logging.getLogger(__name__)
|
||||||
@ -91,11 +92,9 @@ async def main() -> None:
|
|||||||
data = device.data.copy()
|
data = device.data.copy()
|
||||||
attr = "get" if args.get("all") else "pop"
|
attr = "get" if args.get("all") else "pop"
|
||||||
print(
|
print(
|
||||||
printer.key_print(
|
printer.key_print(getattr(data["attributes"], attr)("parameters"))
|
||||||
data["attributes"].__getattribute__(attr)("parameters")
|
|
||||||
)
|
)
|
||||||
)
|
print(printer.key_print(getattr(data, attr)("appliance")))
|
||||||
print(printer.key_print(data.__getattribute__(attr)("appliance")))
|
|
||||||
print(printer.key_print(data))
|
print(printer.key_print(data))
|
||||||
print(
|
print(
|
||||||
printer.pretty_print(
|
printer.pretty_print(
|
||||||
|
@ -3,7 +3,7 @@ import logging
|
|||||||
import re
|
import re
|
||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Optional, Dict, Any, TYPE_CHECKING, List
|
from typing import Optional, Dict, Any, TYPE_CHECKING, List, TypeVar, overload
|
||||||
|
|
||||||
from pyhon import diagnose, exceptions
|
from pyhon import diagnose, exceptions
|
||||||
from pyhon.appliances.base import ApplianceBase
|
from pyhon.appliances.base import ApplianceBase
|
||||||
@ -20,7 +20,10 @@ if TYPE_CHECKING:
|
|||||||
|
|
||||||
_LOGGER = logging.getLogger(__name__)
|
_LOGGER = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
T = TypeVar("T")
|
||||||
|
|
||||||
|
|
||||||
|
# pylint: disable=too-many-public-methods,too-many-instance-attributes
|
||||||
class HonAppliance:
|
class HonAppliance:
|
||||||
_MINIMAL_UPDATE_INTERVAL = 5 # seconds
|
_MINIMAL_UPDATE_INTERVAL = 5 # seconds
|
||||||
|
|
||||||
@ -48,24 +51,35 @@ class HonAppliance:
|
|||||||
except ModuleNotFoundError:
|
except ModuleNotFoundError:
|
||||||
self._extra = None
|
self._extra = None
|
||||||
|
|
||||||
|
def _get_nested_item(self, item: str) -> Any:
|
||||||
|
result: List[Any] | Dict[str, Any] = self.data
|
||||||
|
for key in item.split("."):
|
||||||
|
if all(k in "0123456789" for k in key) and isinstance(result, list):
|
||||||
|
result = result[int(key)]
|
||||||
|
elif isinstance(result, dict):
|
||||||
|
result = result[key]
|
||||||
|
return result
|
||||||
|
|
||||||
def __getitem__(self, item: str) -> Any:
|
def __getitem__(self, item: str) -> Any:
|
||||||
if self._zone:
|
if self._zone:
|
||||||
item += f"Z{self._zone}"
|
item += f"Z{self._zone}"
|
||||||
if "." in item:
|
if "." in item:
|
||||||
result = self.data
|
return self._get_nested_item(item)
|
||||||
for key in item.split("."):
|
|
||||||
if all(k in "0123456789" for k in key) and isinstance(result, list):
|
|
||||||
result = result[int(key)]
|
|
||||||
else:
|
|
||||||
result = result[key]
|
|
||||||
return result
|
|
||||||
if item in self.data:
|
if item in self.data:
|
||||||
return self.data[item]
|
return self.data[item]
|
||||||
if item in self.attributes["parameters"]:
|
if item in self.attributes["parameters"]:
|
||||||
return self.attributes["parameters"][item].value
|
return self.attributes["parameters"][item].value
|
||||||
return self.info[item]
|
return self.info[item]
|
||||||
|
|
||||||
def get(self, item: str, default: Any = None) -> Any:
|
@overload
|
||||||
|
def get(self, item: str, default: None = None) -> Any:
|
||||||
|
...
|
||||||
|
|
||||||
|
@overload
|
||||||
|
def get(self, item: str, default: T) -> T:
|
||||||
|
...
|
||||||
|
|
||||||
|
def get(self, item: str, default: Optional[T] = None) -> Any:
|
||||||
try:
|
try:
|
||||||
return self[item]
|
return self[item]
|
||||||
except (KeyError, IndexError):
|
except (KeyError, IndexError):
|
||||||
@ -188,12 +202,8 @@ class HonAppliance:
|
|||||||
|
|
||||||
async def update(self, force: bool = False) -> None:
|
async def update(self, force: bool = False) -> None:
|
||||||
now = datetime.now()
|
now = datetime.now()
|
||||||
if (
|
min_age = now - timedelta(seconds=self._MINIMAL_UPDATE_INTERVAL)
|
||||||
force
|
if force or not self._last_update or self._last_update < min_age:
|
||||||
or not self._last_update
|
|
||||||
or self._last_update
|
|
||||||
< now - timedelta(seconds=self._MINIMAL_UPDATE_INTERVAL)
|
|
||||||
):
|
|
||||||
self._last_update = now
|
self._last_update = now
|
||||||
await self.load_attributes()
|
await self.load_attributes()
|
||||||
self.sync_params_to_command("settings")
|
self.sync_params_to_command("settings")
|
||||||
@ -253,7 +263,9 @@ class HonAppliance:
|
|||||||
if not (command := self.commands.get(command_name)):
|
if not (command := self.commands.get(command_name)):
|
||||||
return
|
return
|
||||||
for key in command.setting_keys:
|
for key in command.setting_keys:
|
||||||
if (new := self.attributes.get("parameters", {}).get(key)) is None:
|
if (
|
||||||
|
new := self.attributes.get("parameters", {}).get(key)
|
||||||
|
) is None or new.value == "":
|
||||||
continue
|
continue
|
||||||
setting = command.settings[key]
|
setting = command.settings[key]
|
||||||
try:
|
try:
|
||||||
@ -272,18 +284,23 @@ class HonAppliance:
|
|||||||
for command, data in self.commands.items():
|
for command, data in self.commands.items():
|
||||||
if command == main or target and command not in target:
|
if command == main or target and command not in target:
|
||||||
continue
|
continue
|
||||||
for name, parameter in data.parameters.items():
|
|
||||||
if base_value := base.parameters.get(name):
|
for name, target_param in data.parameters.items():
|
||||||
if isinstance(base_value, HonParameterRange) and isinstance(
|
if not (base_param := base.parameters.get(name)):
|
||||||
parameter, HonParameterRange
|
return
|
||||||
|
self.sync_parameter(base_param, target_param)
|
||||||
|
|
||||||
|
def sync_parameter(self, main: Parameter, target: Parameter) -> None:
|
||||||
|
if isinstance(main, HonParameterRange) and isinstance(
|
||||||
|
target, HonParameterRange
|
||||||
):
|
):
|
||||||
parameter.max = base_value.max
|
target.max = main.max
|
||||||
parameter.min = base_value.min
|
target.min = main.min
|
||||||
parameter.step = base_value.step
|
target.step = main.step
|
||||||
elif isinstance(parameter, HonParameterRange):
|
elif isinstance(target, HonParameterRange):
|
||||||
parameter.max = int(base_value.value)
|
target.max = int(main.value)
|
||||||
parameter.min = int(base_value.value)
|
target.min = int(main.value)
|
||||||
parameter.step = 1
|
target.step = 1
|
||||||
elif isinstance(parameter, HonParameterEnum):
|
elif isinstance(target, HonParameterEnum):
|
||||||
parameter.values = base_value.values
|
target.values = main.values
|
||||||
parameter.value = base_value.value
|
target.value = main.value
|
||||||
|
@ -1,3 +1,4 @@
|
|||||||
|
# pylint: disable=duplicate-code
|
||||||
from typing import Any, Dict
|
from typing import Any, Dict
|
||||||
|
|
||||||
from pyhon.appliances.base import ApplianceBase
|
from pyhon.appliances.base import ApplianceBase
|
||||||
|
@ -10,12 +10,12 @@ class Appliance(ApplianceBase):
|
|||||||
data["modeZ1"] = "holiday"
|
data["modeZ1"] = "holiday"
|
||||||
elif data["parameters"]["intelligenceMode"] == "1":
|
elif data["parameters"]["intelligenceMode"] == "1":
|
||||||
data["modeZ1"] = "auto_set"
|
data["modeZ1"] = "auto_set"
|
||||||
elif data["parameters"]["quickModeZ1"] == "1":
|
elif data["parameters"].get("quickModeZ1") == "1":
|
||||||
data["modeZ1"] = "super_cool"
|
data["modeZ1"] = "super_cool"
|
||||||
else:
|
else:
|
||||||
data["modeZ1"] = "no_mode"
|
data["modeZ1"] = "no_mode"
|
||||||
|
|
||||||
if data["parameters"]["quickModeZ2"] == "1":
|
if data["parameters"].get("quickModeZ2") == "1":
|
||||||
data["modeZ2"] = "super_freeze"
|
data["modeZ2"] = "super_freeze"
|
||||||
elif data["parameters"]["intelligenceMode"] == "1":
|
elif data["parameters"]["intelligenceMode"] == "1":
|
||||||
data["modeZ2"] = "auto_set"
|
data["modeZ2"] = "auto_set"
|
||||||
|
@ -1,3 +1,4 @@
|
|||||||
|
# pylint: disable=duplicate-code
|
||||||
from typing import Any, Dict
|
from typing import Any, Dict
|
||||||
|
|
||||||
from pyhon.appliances.base import ApplianceBase
|
from pyhon.appliances.base import ApplianceBase
|
||||||
|
@ -1,3 +1,4 @@
|
|||||||
|
# pylint: disable=duplicate-code
|
||||||
from typing import Dict, Any
|
from typing import Dict, Any
|
||||||
|
|
||||||
from pyhon.appliances.base import ApplianceBase
|
from pyhon.appliances.base import ApplianceBase
|
||||||
|
@ -1,3 +1,4 @@
|
|||||||
|
# pylint: disable=duplicate-code
|
||||||
from typing import Any, Dict
|
from typing import Any, Dict
|
||||||
|
|
||||||
from pyhon.appliances.base import ApplianceBase
|
from pyhon.appliances.base import ApplianceBase
|
||||||
|
@ -17,12 +17,12 @@ class HonCommandLoader:
|
|||||||
"""Loads and parses hOn command data"""
|
"""Loads and parses hOn command data"""
|
||||||
|
|
||||||
def __init__(self, api: "HonAPI", appliance: "HonAppliance") -> None:
|
def __init__(self, api: "HonAPI", appliance: "HonAppliance") -> None:
|
||||||
|
self._api: "HonAPI" = api
|
||||||
|
self._appliance: "HonAppliance" = appliance
|
||||||
self._api_commands: Dict[str, Any] = {}
|
self._api_commands: Dict[str, Any] = {}
|
||||||
self._favourites: List[Dict[str, Any]] = []
|
self._favourites: List[Dict[str, Any]] = []
|
||||||
self._command_history: List[Dict[str, Any]] = []
|
self._command_history: List[Dict[str, Any]] = []
|
||||||
self._commands: Dict[str, HonCommand] = {}
|
self._commands: Dict[str, HonCommand] = {}
|
||||||
self._api: "HonAPI" = api
|
|
||||||
self._appliance: "HonAppliance" = appliance
|
|
||||||
self._appliance_data: Dict[str, Any] = {}
|
self._appliance_data: Dict[str, Any] = {}
|
||||||
self._additional_data: Dict[str, Any] = {}
|
self._additional_data: Dict[str, Any] = {}
|
||||||
|
|
||||||
@ -184,22 +184,44 @@ class HonCommandLoader:
|
|||||||
def _add_favourites(self) -> None:
|
def _add_favourites(self) -> None:
|
||||||
"""Patch program categories with favourites"""
|
"""Patch program categories with favourites"""
|
||||||
for favourite in self._favourites:
|
for favourite in self._favourites:
|
||||||
name = favourite.get("favouriteName", {})
|
name, command_name, base = self._get_favourite_info(favourite)
|
||||||
|
if not base:
|
||||||
|
continue
|
||||||
|
base_command: HonCommand = copy(base)
|
||||||
|
self._update_base_command_with_data(base_command, favourite)
|
||||||
|
self._update_base_command_with_favourite(base_command)
|
||||||
|
self._update_program_categories(command_name, name, base_command)
|
||||||
|
|
||||||
|
def _get_favourite_info(
|
||||||
|
self, favourite: Dict[str, Any]
|
||||||
|
) -> tuple[str, str, HonCommand | None]:
|
||||||
|
name: str = favourite.get("favouriteName", {})
|
||||||
command = favourite.get("command", {})
|
command = favourite.get("command", {})
|
||||||
command_name = command.get("commandName", "")
|
command_name: str = command.get("commandName", "")
|
||||||
program_name = self._clean_name(command.get("programName", ""))
|
program_name = self._clean_name(command.get("programName", ""))
|
||||||
base: HonCommand = copy(
|
base_command = self.commands[command_name].categories.get(program_name)
|
||||||
self.commands[command_name].categories[program_name]
|
return name, command_name, base_command
|
||||||
)
|
|
||||||
|
def _update_base_command_with_data(
|
||||||
|
self, base_command: HonCommand, command: Dict[str, Any]
|
||||||
|
) -> None:
|
||||||
for data in command.values():
|
for data in command.values():
|
||||||
if isinstance(data, str):
|
if isinstance(data, str):
|
||||||
continue
|
continue
|
||||||
for key, value in data.items():
|
for key, value in data.items():
|
||||||
if parameter := base.parameters.get(key):
|
if not (parameter := base_command.parameters.get(key)):
|
||||||
|
continue
|
||||||
with suppress(ValueError):
|
with suppress(ValueError):
|
||||||
parameter.value = value
|
parameter.value = value
|
||||||
|
|
||||||
|
def _update_base_command_with_favourite(self, base_command: HonCommand) -> None:
|
||||||
extra_param = HonParameterFixed("favourite", {"fixedValue": "1"}, "custom")
|
extra_param = HonParameterFixed("favourite", {"fixedValue": "1"}, "custom")
|
||||||
base.parameters.update(favourite=extra_param)
|
base_command.parameters.update(favourite=extra_param)
|
||||||
if isinstance(program := base.parameters["program"], HonParameterProgram):
|
|
||||||
|
def _update_program_categories(
|
||||||
|
self, command_name: str, name: str, base_command: HonCommand
|
||||||
|
) -> None:
|
||||||
|
program = base_command.parameters["program"]
|
||||||
|
if isinstance(program, HonParameterProgram):
|
||||||
program.set_value(name)
|
program.set_value(name)
|
||||||
self.commands[command_name].categories[name] = base
|
self.commands[command_name].categories[name] = base_command
|
||||||
|
@ -27,17 +27,16 @@ class HonCommand:
|
|||||||
categories: Optional[Dict[str, "HonCommand"]] = None,
|
categories: Optional[Dict[str, "HonCommand"]] = None,
|
||||||
category_name: str = "",
|
category_name: str = "",
|
||||||
):
|
):
|
||||||
self._api: Optional[HonAPI] = appliance.api
|
|
||||||
self._appliance: "HonAppliance" = appliance
|
|
||||||
self._name: str = name
|
self._name: str = name
|
||||||
|
self._api: Optional[HonAPI] = None
|
||||||
|
self._appliance: "HonAppliance" = appliance
|
||||||
self._categories: Optional[Dict[str, "HonCommand"]] = categories
|
self._categories: Optional[Dict[str, "HonCommand"]] = categories
|
||||||
self._category_name: str = category_name
|
self._category_name: str = category_name
|
||||||
self._description: str = attributes.pop("description", "")
|
self._parameters: Dict[str, Parameter] = {}
|
||||||
self._protocol_type: str = attributes.pop("protocolType", "")
|
|
||||||
self._parameters: Dict[str, HonParameter] = {}
|
|
||||||
self._data: Dict[str, Any] = {}
|
self._data: Dict[str, Any] = {}
|
||||||
self._available_settings: Dict[str, HonParameter] = {}
|
|
||||||
self._rules: List[HonRuleSet] = []
|
self._rules: List[HonRuleSet] = []
|
||||||
|
attributes.pop("description", "")
|
||||||
|
attributes.pop("protocolType", "")
|
||||||
self._load_parameters(attributes)
|
self._load_parameters(attributes)
|
||||||
|
|
||||||
def __repr__(self) -> str:
|
def __repr__(self) -> str:
|
||||||
@ -49,6 +48,8 @@ class HonCommand:
|
|||||||
|
|
||||||
@property
|
@property
|
||||||
def api(self) -> "HonAPI":
|
def api(self) -> "HonAPI":
|
||||||
|
if self._api is None and self._appliance:
|
||||||
|
self._api = self._appliance.api
|
||||||
if self._api is None:
|
if self._api is None:
|
||||||
raise exceptions.NoAuthenticationException("Missing hOn login")
|
raise exceptions.NoAuthenticationException("Missing hOn login")
|
||||||
return self._api
|
return self._api
|
||||||
|
@ -278,10 +278,12 @@ class TestAPI(HonAPI):
|
|||||||
async def load_appliances(self) -> List[Dict[str, Any]]:
|
async def load_appliances(self) -> List[Dict[str, Any]]:
|
||||||
result = []
|
result = []
|
||||||
for appliance in self._path.glob("*/"):
|
for appliance in self._path.glob("*/"):
|
||||||
with open(
|
file = appliance / "appliance_data.json"
|
||||||
appliance / "appliance_data.json", "r", encoding="utf-8"
|
with open(file, "r", encoding="utf-8") as json_file:
|
||||||
) as json_file:
|
try:
|
||||||
result.append(json.loads(json_file.read()))
|
result.append(json.loads(json_file.read()))
|
||||||
|
except json.decoder.JSONDecodeError as error:
|
||||||
|
_LOGGER.error("%s - %s", str(file), error)
|
||||||
return result
|
return result
|
||||||
|
|
||||||
async def load_commands(self, appliance: HonAppliance) -> Dict[str, Any]:
|
async def load_commands(self, appliance: HonAppliance) -> Dict[str, Any]:
|
||||||
@ -318,4 +320,5 @@ class TestAPI(HonAPI):
|
|||||||
parameters: Dict[str, Any],
|
parameters: Dict[str, Any],
|
||||||
ancillary_parameters: Dict[str, Any],
|
ancillary_parameters: Dict[str, Any],
|
||||||
) -> bool:
|
) -> bool:
|
||||||
|
_LOGGER.info("%s - %s", str(parameters), str(ancillary_parameters))
|
||||||
return True
|
return True
|
||||||
|
@ -30,6 +30,14 @@ class HonLoginData:
|
|||||||
loaded: Optional[Dict[str, Any]] = None
|
loaded: Optional[Dict[str, Any]] = None
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class HonAuthData:
|
||||||
|
access_token: str = ""
|
||||||
|
refresh_token: str = ""
|
||||||
|
cognito_token: str = ""
|
||||||
|
id_token: str = ""
|
||||||
|
|
||||||
|
|
||||||
class HonAuth:
|
class HonAuth:
|
||||||
_TOKEN_EXPIRES_AFTER_HOURS = 8
|
_TOKEN_EXPIRES_AFTER_HOURS = 8
|
||||||
_TOKEN_EXPIRE_WARNING_HOURS = 7
|
_TOKEN_EXPIRE_WARNING_HOURS = 7
|
||||||
@ -46,28 +54,25 @@ class HonAuth:
|
|||||||
self._login_data = HonLoginData()
|
self._login_data = HonLoginData()
|
||||||
self._login_data.email = email
|
self._login_data.email = email
|
||||||
self._login_data.password = password
|
self._login_data.password = password
|
||||||
self._access_token = ""
|
|
||||||
self._refresh_token = ""
|
|
||||||
self._cognito_token = ""
|
|
||||||
self._id_token = ""
|
|
||||||
self._device = device
|
self._device = device
|
||||||
self._expires: datetime = datetime.utcnow()
|
self._expires: datetime = datetime.utcnow()
|
||||||
|
self._auth = HonAuthData()
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def cognito_token(self) -> str:
|
def cognito_token(self) -> str:
|
||||||
return self._cognito_token
|
return self._auth.cognito_token
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def id_token(self) -> str:
|
def id_token(self) -> str:
|
||||||
return self._id_token
|
return self._auth.id_token
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def access_token(self) -> str:
|
def access_token(self) -> str:
|
||||||
return self._access_token
|
return self._auth.access_token
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def refresh_token(self) -> str:
|
def refresh_token(self) -> str:
|
||||||
return self._refresh_token
|
return self._auth.refresh_token
|
||||||
|
|
||||||
def _check_token_expiration(self, hours: int) -> bool:
|
def _check_token_expiration(self, hours: int) -> bool:
|
||||||
return datetime.utcnow() >= self._expires + timedelta(hours=hours)
|
return datetime.utcnow() >= self._expires + timedelta(hours=hours)
|
||||||
@ -192,12 +197,12 @@ class HonAuth:
|
|||||||
|
|
||||||
def _parse_token_data(self, text: str) -> bool:
|
def _parse_token_data(self, text: str) -> bool:
|
||||||
if access_token := re.findall("access_token=(.*?)&", text):
|
if access_token := re.findall("access_token=(.*?)&", text):
|
||||||
self._access_token = access_token[0]
|
self._auth.access_token = access_token[0]
|
||||||
if refresh_token := re.findall("refresh_token=(.*?)&", text):
|
if refresh_token := re.findall("refresh_token=(.*?)&", text):
|
||||||
self._refresh_token = refresh_token[0]
|
self._auth.refresh_token = refresh_token[0]
|
||||||
if id_token := re.findall("id_token=(.*?)&", text):
|
if id_token := re.findall("id_token=(.*?)&", text):
|
||||||
self._id_token = id_token[0]
|
self._auth.id_token = id_token[0]
|
||||||
return True if access_token and refresh_token and id_token else False
|
return bool(access_token and refresh_token and id_token)
|
||||||
|
|
||||||
async def _get_token(self, url: str) -> bool:
|
async def _get_token(self, url: str) -> bool:
|
||||||
async with self._request.get(url) as response:
|
async with self._request.get(url) as response:
|
||||||
@ -229,7 +234,7 @@ class HonAuth:
|
|||||||
return True
|
return True
|
||||||
|
|
||||||
async def _api_auth(self) -> bool:
|
async def _api_auth(self) -> bool:
|
||||||
post_headers = {"id-token": self._id_token}
|
post_headers = {"id-token": self._auth.id_token}
|
||||||
data = self._device.get()
|
data = self._device.get()
|
||||||
async with self._request.post(
|
async with self._request.post(
|
||||||
f"{const.API_URL}/auth/v1/login", headers=post_headers, json=data
|
f"{const.API_URL}/auth/v1/login", headers=post_headers, json=data
|
||||||
@ -239,8 +244,8 @@ class HonAuth:
|
|||||||
except json.JSONDecodeError:
|
except json.JSONDecodeError:
|
||||||
await self._error_logger(response)
|
await self._error_logger(response)
|
||||||
return False
|
return False
|
||||||
self._cognito_token = json_data.get("cognitoUser", {}).get("Token", "")
|
self._auth.cognito_token = json_data.get("cognitoUser", {}).get("Token", "")
|
||||||
if not self._cognito_token:
|
if not self._auth.cognito_token:
|
||||||
_LOGGER.error(json_data)
|
_LOGGER.error(json_data)
|
||||||
raise exceptions.HonAuthenticationError()
|
raise exceptions.HonAuthenticationError()
|
||||||
return True
|
return True
|
||||||
@ -262,7 +267,7 @@ class HonAuth:
|
|||||||
async def refresh(self) -> bool:
|
async def refresh(self) -> bool:
|
||||||
params = {
|
params = {
|
||||||
"client_id": const.CLIENT_ID,
|
"client_id": const.CLIENT_ID,
|
||||||
"refresh_token": self._refresh_token,
|
"refresh_token": self._auth.refresh_token,
|
||||||
"grant_type": "refresh_token",
|
"grant_type": "refresh_token",
|
||||||
}
|
}
|
||||||
async with self._request.post(
|
async with self._request.post(
|
||||||
@ -273,14 +278,14 @@ class HonAuth:
|
|||||||
return False
|
return False
|
||||||
data = await response.json()
|
data = await response.json()
|
||||||
self._expires = datetime.utcnow()
|
self._expires = datetime.utcnow()
|
||||||
self._id_token = data["id_token"]
|
self._auth.id_token = data["id_token"]
|
||||||
self._access_token = data["access_token"]
|
self._auth.access_token = data["access_token"]
|
||||||
return await self._api_auth()
|
return await self._api_auth()
|
||||||
|
|
||||||
def clear(self) -> None:
|
def clear(self) -> None:
|
||||||
self._session.cookie_jar.clear_domain(const.AUTH_API.split("/")[-2])
|
self._session.cookie_jar.clear_domain(const.AUTH_API.split("/")[-2])
|
||||||
self._request.called_urls = []
|
self._request.called_urls = []
|
||||||
self._cognito_token = ""
|
self._auth.cognito_token = ""
|
||||||
self._id_token = ""
|
self._auth.id_token = ""
|
||||||
self._access_token = ""
|
self._auth.access_token = ""
|
||||||
self._refresh_token = ""
|
self._auth.refresh_token = ""
|
||||||
|
@ -60,7 +60,8 @@ class ConnectionHandler:
|
|||||||
if self._session is None:
|
if self._session is None:
|
||||||
raise exceptions.NoSessionException()
|
raise exceptions.NoSessionException()
|
||||||
response: aiohttp.ClientResponse
|
response: aiohttp.ClientResponse
|
||||||
async with self._intercept(self._session.get, *args, **kwargs) as response: # type: ignore[arg-type]
|
args = self._session.get, *args
|
||||||
|
async with self._intercept(*args, **kwargs) as response:
|
||||||
yield response
|
yield response
|
||||||
|
|
||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
@ -70,7 +71,8 @@ class ConnectionHandler:
|
|||||||
if self._session is None:
|
if self._session is None:
|
||||||
raise exceptions.NoSessionException()
|
raise exceptions.NoSessionException()
|
||||||
response: aiohttp.ClientResponse
|
response: aiohttp.ClientResponse
|
||||||
async with self._intercept(self._session.post, *args, **kwargs) as response: # type: ignore[arg-type]
|
args = self._session.post, *args
|
||||||
|
async with self._intercept(*args, **kwargs) as response:
|
||||||
yield response
|
yield response
|
||||||
|
|
||||||
async def close(self) -> None:
|
async def close(self) -> None:
|
||||||
|
@ -2,8 +2,10 @@ AUTH_API = "https://account2.hon-smarthome.com"
|
|||||||
API_URL = "https://api-iot.he.services"
|
API_URL = "https://api-iot.he.services"
|
||||||
API_KEY = "GRCqFhC6Gk@ikWXm1RmnSmX1cm,MxY-configuration"
|
API_KEY = "GRCqFhC6Gk@ikWXm1RmnSmX1cm,MxY-configuration"
|
||||||
APP = "hon"
|
APP = "hon"
|
||||||
# All seen id's (different accounts, different devices) are the same, so I guess this hash is static
|
CLIENT_ID = (
|
||||||
CLIENT_ID = "3MVG9QDx8IX8nP5T2Ha8ofvlmjLZl5L_gvfbT9.HJvpHGKoAS_dcMN8LYpTSYeVFCraUnV.2Ag1Ki7m4znVO6"
|
"3MVG9QDx8IX8nP5T2Ha8ofvlmjLZl5L_gvfbT9."
|
||||||
|
"HJvpHGKoAS_dcMN8LYpTSYeVFCraUnV.2Ag1Ki7m4znVO6"
|
||||||
|
)
|
||||||
APP_VERSION = "2.1.2"
|
APP_VERSION = "2.1.2"
|
||||||
OS_VERSION = 31
|
OS_VERSION = 31
|
||||||
OS = "android"
|
OS = "android"
|
||||||
|
@ -7,9 +7,10 @@ from typing import List, Optional, Dict, Any, Type
|
|||||||
from aiohttp import ClientSession
|
from aiohttp import ClientSession
|
||||||
from typing_extensions import Self
|
from typing_extensions import Self
|
||||||
|
|
||||||
from pyhon import HonAPI, exceptions
|
|
||||||
from pyhon.appliance import HonAppliance
|
from pyhon.appliance import HonAppliance
|
||||||
|
from pyhon.connection.api import HonAPI
|
||||||
from pyhon.connection.api import TestAPI
|
from pyhon.connection.api import TestAPI
|
||||||
|
from pyhon.exceptions import NoAuthenticationException
|
||||||
|
|
||||||
_LOGGER = logging.getLogger(__name__)
|
_LOGGER = logging.getLogger(__name__)
|
||||||
|
|
||||||
@ -43,7 +44,7 @@ class Hon:
|
|||||||
@property
|
@property
|
||||||
def api(self) -> HonAPI:
|
def api(self) -> HonAPI:
|
||||||
if self._api is None:
|
if self._api is None:
|
||||||
raise exceptions.NoAuthenticationException
|
raise NoAuthenticationException
|
||||||
return self._api
|
return self._api
|
||||||
|
|
||||||
@property
|
@property
|
||||||
|
@ -41,13 +41,15 @@ class HonParameterProgram(HonParameterEnum):
|
|||||||
|
|
||||||
@property
|
@property
|
||||||
def ids(self) -> Dict[int, str]:
|
def ids(self) -> Dict[int, str]:
|
||||||
values = {
|
values: Dict[int, str] = {}
|
||||||
int(p.parameters["prCode"].value): n
|
for name, parameter in self._programs.items():
|
||||||
for i, (n, p) in enumerate(self._programs.items())
|
if "iot_" in name:
|
||||||
if "iot_" not in n
|
continue
|
||||||
and p.parameters.get("prCode")
|
if parameter.parameters.get("prCode"):
|
||||||
and not ((fav := p.parameters.get("favourite")) and fav.value == "1")
|
continue
|
||||||
}
|
if (fav := parameter.parameters.get("favourite")) and fav.value == "1":
|
||||||
|
continue
|
||||||
|
values[int(parameter.parameters["prCode"].value)] = name
|
||||||
return dict(sorted(values.items()))
|
return dict(sorted(values.items()))
|
||||||
|
|
||||||
def set_value(self, value: str) -> None:
|
def set_value(self, value: str) -> None:
|
||||||
|
@ -16,9 +16,9 @@ class HonParameterRange(HonParameter):
|
|||||||
|
|
||||||
def _set_attributes(self) -> None:
|
def _set_attributes(self) -> None:
|
||||||
super()._set_attributes()
|
super()._set_attributes()
|
||||||
self._min = str_to_float(self._attributes["minimumValue"])
|
self._min = str_to_float(self._attributes.get("minimumValue", 0))
|
||||||
self._max = str_to_float(self._attributes["maximumValue"])
|
self._max = str_to_float(self._attributes.get("maximumValue", 0))
|
||||||
self._step = str_to_float(self._attributes["incrementValue"])
|
self._step = str_to_float(self._attributes.get("incrementValue", 0))
|
||||||
self._default = str_to_float(self._attributes.get("defaultValue", self.min))
|
self._default = str_to_float(self._attributes.get("defaultValue", self.min))
|
||||||
self._value = self._default
|
self._value = self._default
|
||||||
|
|
||||||
@ -64,9 +64,8 @@ class HonParameterRange(HonParameter):
|
|||||||
self._value = value
|
self._value = value
|
||||||
self.check_trigger(value)
|
self.check_trigger(value)
|
||||||
else:
|
else:
|
||||||
raise ValueError(
|
allowed = f"min {self.min} max {self.max} step {self.step}"
|
||||||
f"Allowed: min {self.min} max {self.max} step {self.step} But was: {value}"
|
raise ValueError(f"Allowed: {allowed} But was: {value}")
|
||||||
)
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def values(self) -> List[str]:
|
def values(self) -> List[str]:
|
||||||
|
@ -29,33 +29,26 @@ def pretty_print(
|
|||||||
whitespace: str = " ",
|
whitespace: str = " ",
|
||||||
) -> str:
|
) -> str:
|
||||||
result = ""
|
result = ""
|
||||||
if isinstance(data, list):
|
space = whitespace * intend
|
||||||
if key:
|
if isinstance(data, (dict, list)) and key:
|
||||||
result += f"{whitespace * intend}{'- ' if is_list else ''}{key}:\n"
|
result += f"{space}{'- ' if is_list else ''}{key}:\n"
|
||||||
intend += 1
|
intend += 1
|
||||||
|
if isinstance(data, list):
|
||||||
for i, value in enumerate(data):
|
for i, value in enumerate(data):
|
||||||
result += pretty_print(
|
result += pretty_print(
|
||||||
value, intend=intend, is_list=True, whitespace=whitespace
|
value, intend=intend, is_list=True, whitespace=whitespace
|
||||||
)
|
)
|
||||||
elif isinstance(data, dict):
|
elif isinstance(data, dict):
|
||||||
if key:
|
for i, (list_key, value) in enumerate(sorted(data.items())):
|
||||||
result += f"{whitespace * intend}{'- ' if is_list else ''}{key}:\n"
|
|
||||||
intend += 1
|
|
||||||
for i, (key, value) in enumerate(sorted(data.items())):
|
|
||||||
if is_list and not i:
|
|
||||||
result += pretty_print(
|
result += pretty_print(
|
||||||
value, key=key, intend=intend, is_list=True, whitespace=whitespace
|
value,
|
||||||
)
|
key=list_key,
|
||||||
elif is_list:
|
intend=intend + (is_list if i else 0),
|
||||||
result += pretty_print(
|
is_list=is_list and not i,
|
||||||
value, key=key, intend=intend + 1, whitespace=whitespace
|
whitespace=whitespace,
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
result += pretty_print(
|
result += f"{space}{'- ' if is_list else ''}{key}{': ' if key else ''}{data}\n"
|
||||||
value, key=key, intend=intend, whitespace=whitespace
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
result += f"{whitespace * intend}{'- ' if is_list else ''}{key}{': ' if key else ''}{data}\n"
|
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
0
pyhon/py.typed
Normal file
0
pyhon/py.typed
Normal file
@ -3,6 +3,7 @@ from typing import List, Dict, TYPE_CHECKING, Any, Optional
|
|||||||
|
|
||||||
from pyhon.parameter.enum import HonParameterEnum
|
from pyhon.parameter.enum import HonParameterEnum
|
||||||
from pyhon.parameter.range import HonParameterRange
|
from pyhon.parameter.range import HonParameterRange
|
||||||
|
from pyhon.typedefs import Parameter
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from pyhon.commands import HonCommand
|
from pyhon.commands import HonCommand
|
||||||
@ -24,6 +25,10 @@ class HonRuleSet:
|
|||||||
self._rules: Dict[str, List[HonRule]] = {}
|
self._rules: Dict[str, List[HonRule]] = {}
|
||||||
self._parse_rule(rule)
|
self._parse_rule(rule)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def rules(self) -> Dict[str, List[HonRule]]:
|
||||||
|
return self._rules
|
||||||
|
|
||||||
def _parse_rule(self, rule: Dict[str, Any]) -> None:
|
def _parse_rule(self, rule: Dict[str, Any]) -> None:
|
||||||
for param_key, params in rule.items():
|
for param_key, params in rule.items():
|
||||||
param_key = self._command.appliance.options.get(param_key, param_key)
|
param_key = self._command.appliance.options.get(param_key, param_key)
|
||||||
@ -83,29 +88,43 @@ class HonRuleSet:
|
|||||||
for rule in rules:
|
for rule in rules:
|
||||||
self._rules.setdefault(key, []).append(rule)
|
self._rules.setdefault(key, []).append(rule)
|
||||||
|
|
||||||
def _add_trigger(self, parameter: "HonParameter", data: HonRule) -> None:
|
def _extra_rules_matches(self, rule: HonRule) -> bool:
|
||||||
def apply(rule: HonRule) -> None:
|
if rule.extras:
|
||||||
if rule.extras is not None:
|
|
||||||
for key, value in rule.extras.items():
|
for key, value in rule.extras.items():
|
||||||
|
if not self._command.parameters.get(key):
|
||||||
|
return False
|
||||||
if str(self._command.parameters.get(key)) != str(value):
|
if str(self._command.parameters.get(key)) != str(value):
|
||||||
return
|
return False
|
||||||
if param := self._command.parameters.get(rule.param_key):
|
return True
|
||||||
if value := rule.param_data.get("fixedValue", ""):
|
|
||||||
if isinstance(param, HonParameterEnum) and set(param.values) != {
|
def _apply_fixed(self, param: Parameter, value: str | float) -> None:
|
||||||
str(value)
|
if isinstance(param, HonParameterEnum) and set(param.values) != {str(value)}:
|
||||||
}:
|
|
||||||
param.values = [str(value)]
|
param.values = [str(value)]
|
||||||
|
param.value = str(value)
|
||||||
elif isinstance(param, HonParameterRange):
|
elif isinstance(param, HonParameterRange):
|
||||||
param.value = float(value)
|
param.value = float(value)
|
||||||
return
|
return
|
||||||
param.value = str(value)
|
param.value = str(value)
|
||||||
elif rule.param_data.get("typology") == "enum":
|
|
||||||
if isinstance(param, HonParameterEnum):
|
def _apply_enum(self, param: Parameter, rule: HonRule) -> None:
|
||||||
|
if not isinstance(param, HonParameterEnum):
|
||||||
|
return
|
||||||
if enum_values := rule.param_data.get("enumValues"):
|
if enum_values := rule.param_data.get("enumValues"):
|
||||||
param.values = enum_values.split("|")
|
param.values = enum_values.split("|")
|
||||||
if default_value := rule.param_data.get("defaultValue"):
|
if default_value := rule.param_data.get("defaultValue"):
|
||||||
param.value = default_value
|
param.value = default_value
|
||||||
|
|
||||||
|
def _add_trigger(self, parameter: "HonParameter", data: HonRule) -> None:
|
||||||
|
def apply(rule: HonRule) -> None:
|
||||||
|
if not self._extra_rules_matches(rule):
|
||||||
|
return
|
||||||
|
if not (param := self._command.parameters.get(rule.param_key)):
|
||||||
|
return
|
||||||
|
if fixed_value := rule.param_data.get("fixedValue", ""):
|
||||||
|
self._apply_fixed(param, fixed_value)
|
||||||
|
elif rule.param_data.get("typology") == "enum":
|
||||||
|
self._apply_enum(param, rule)
|
||||||
|
|
||||||
parameter.add_trigger(data.trigger_value, apply, data)
|
parameter.add_trigger(data.trigger_value, apply, data)
|
||||||
|
|
||||||
def patch(self) -> None:
|
def patch(self) -> None:
|
||||||
|
@ -11,7 +11,7 @@ if TYPE_CHECKING:
|
|||||||
from pyhon.parameter.range import HonParameterRange
|
from pyhon.parameter.range import HonParameterRange
|
||||||
|
|
||||||
|
|
||||||
class Callback(Protocol):
|
class Callback(Protocol): # pylint: disable=too-few-public-methods
|
||||||
def __call__(
|
def __call__(
|
||||||
self, url: str | URL, *args: Any, **kwargs: Any
|
self, url: str | URL, *args: Any, **kwargs: Any
|
||||||
) -> aiohttp.client._RequestContextManager:
|
) -> aiohttp.client._RequestContextManager:
|
||||||
|
@ -1,3 +1,3 @@
|
|||||||
aiohttp==3.8.4
|
aiohttp==3.8.5
|
||||||
yarl==1.8.2
|
yarl==1.9.2
|
||||||
typing-extensions==4.7.1
|
typing-extensions==4.7.1
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
black==23.3.0
|
black==23.7.0
|
||||||
flake8==6.0.0
|
flake8==6.0.0
|
||||||
mypy==1.2.0
|
mypy==1.4.1
|
||||||
pylint==2.17.2
|
pylint==2.17.4
|
||||||
|
4
setup.py
4
setup.py
@ -7,7 +7,7 @@ with open("README.md", "r", encoding="utf-8") as f:
|
|||||||
|
|
||||||
setup(
|
setup(
|
||||||
name="pyhOn",
|
name="pyhOn",
|
||||||
version="0.14.9",
|
version="0.15.5",
|
||||||
author="Andre Basche",
|
author="Andre Basche",
|
||||||
description="Control hOn devices with python",
|
description="Control hOn devices with python",
|
||||||
long_description=long_description,
|
long_description=long_description,
|
||||||
@ -21,7 +21,7 @@ setup(
|
|||||||
packages=find_packages(),
|
packages=find_packages(),
|
||||||
include_package_data=True,
|
include_package_data=True,
|
||||||
python_requires=">=3.10",
|
python_requires=">=3.10",
|
||||||
install_requires=["aiohttp==3.8.4", "typing-extensions==4.7.1"],
|
install_requires=["aiohttp==3.8.5", "typing-extensions==4.7.1"],
|
||||||
classifiers=[
|
classifiers=[
|
||||||
"Development Status :: 4 - Beta",
|
"Development Status :: 4 - Beta",
|
||||||
"Environment :: Console",
|
"Environment :: Console",
|
||||||
|
Loading…
Reference in New Issue
Block a user