Compare commits

...

8 Commits

Author SHA1 Message Date
5575b240e1 Bump version 2023-07-18 21:32:12 +02:00
22367242a2 Add flake8 config 2023-07-18 21:31:26 +02:00
4f7d4860db Water Heater. Ability to send only mandatory parameters (#14)
* Added water heater appliance. Added ability to send only mandatory parameters

* fixed build

* formatting

* cleanup

* cleanup

* reformatting

* Added ability to send specific parameters. Useful in case the command has many not mandatory parameters and you want to send only one/few

* cleanup

* Fixed code style

---------

Co-authored-by: Vadym Melnychuk <vme@primexm.com>
2023-07-18 21:26:11 +02:00
5a778373b6 Enable more pylint checks 2023-07-16 05:53:23 +02:00
e1c8bc5835 Reduce complexity and line length for flake8 2023-07-16 04:42:29 +02:00
442e7a07dd Fix pylint check 2023-07-14 00:40:48 +02:00
b0e3b15ff0 Add pylint checks 2023-07-12 19:36:32 +02:00
2788a3ac91 Reset commands to defaults 2023-07-12 00:05:27 +02:00
29 changed files with 259 additions and 152 deletions

3
.flake8 Normal file
View File

@ -0,0 +1,3 @@
[flake8]
max-line-length = 88
max-complexity = 7

View File

@ -28,15 +28,13 @@ jobs:
python -m pip install -r requirements_dev.txt python -m pip install -r requirements_dev.txt
- name: Lint with flake8 - name: Lint with flake8
run: | run: |
# stop the build if there are Python syntax errors or undefined names flake8 . --count --statistics
flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
flake8 . --count --exit-zero --max-complexity=10 --max-line-length=88 --statistics
- name: Type check with mypy - name: Type check with mypy
run: | run: |
mypy pyhon/ mypy pyhon/
# - name: Analysing the code with pylint - name: Analysing the code with pylint
# run: | run: |
# pylint --max-line-length 88 $(git ls-files '*.py') pylint $(git ls-files '*.py')
- name: Check black style - name: Check black style
run: | run: |
black . --check black . --check

9
.pylintrc Normal file
View File

@ -0,0 +1,9 @@
[MESSAGES CONTROL]
disable=missing-docstring
[FORMAT]
max-args=6
max-attributes=8
max-line-length=88

View File

@ -11,6 +11,7 @@ from typing import Tuple, Dict, Any
if __name__ == "__main__": if __name__ == "__main__":
sys.path.insert(0, str(Path(__file__).parent.parent)) sys.path.insert(0, str(Path(__file__).parent.parent))
# pylint: disable=wrong-import-position
from pyhon import Hon, HonAPI, diagnose, printer from pyhon import Hon, HonAPI, diagnose, printer
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -30,13 +31,13 @@ def get_arguments() -> Dict[str, Any]:
export.add_argument("--zip", help="create zip archive", action="store_true") export.add_argument("--zip", help="create zip archive", action="store_true")
export.add_argument("--anonymous", help="anonymize data", action="store_true") export.add_argument("--anonymous", help="anonymize data", action="store_true")
export.add_argument("directory", nargs="?", default=Path().cwd()) export.add_argument("directory", nargs="?", default=Path().cwd())
translate = subparser.add_parser( translation = subparser.add_parser(
"translate", help="print available translation keys" "translate", help="print available translation keys"
) )
translate.add_argument( translation.add_argument(
"translate", help="language (de, en, fr...)", metavar="LANGUAGE" "translate", help="language (de, en, fr...)", metavar="LANGUAGE"
) )
translate.add_argument("--json", help="print as json", action="store_true") translation.add_argument("--json", help="print as json", action="store_true")
parser.add_argument( parser.add_argument(
"-i", "--import", help="import pyhon data", nargs="?", default=Path().cwd() "-i", "--import", help="import pyhon data", nargs="?", default=Path().cwd()
) )
@ -91,11 +92,9 @@ async def main() -> None:
data = device.data.copy() data = device.data.copy()
attr = "get" if args.get("all") else "pop" attr = "get" if args.get("all") else "pop"
print( print(
printer.key_print( printer.key_print(getattr(data["attributes"], attr)("parameters"))
data["attributes"].__getattribute__(attr)("parameters")
)
) )
print(printer.key_print(data.__getattribute__(attr)("appliance"))) print(printer.key_print(getattr(data, attr)("appliance")))
print(printer.key_print(data)) print(printer.key_print(data))
print( print(
printer.pretty_print( printer.pretty_print(

View File

@ -21,6 +21,7 @@ if TYPE_CHECKING:
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
# pylint: disable=too-many-public-methods,too-many-instance-attributes
class HonAppliance: class HonAppliance:
_MINIMAL_UPDATE_INTERVAL = 5 # seconds _MINIMAL_UPDATE_INTERVAL = 5 # seconds
@ -188,12 +189,8 @@ class HonAppliance:
async def update(self, force: bool = False) -> None: async def update(self, force: bool = False) -> None:
now = datetime.now() now = datetime.now()
if ( min_age = now - timedelta(seconds=self._MINIMAL_UPDATE_INTERVAL)
force if force or not self._last_update or self._last_update < min_age:
or not self._last_update
or self._last_update
< now - timedelta(seconds=self._MINIMAL_UPDATE_INTERVAL)
):
self._last_update = now self._last_update = now
await self.load_attributes() await self.load_attributes()
self.sync_params_to_command("settings") self.sync_params_to_command("settings")
@ -272,18 +269,23 @@ class HonAppliance:
for command, data in self.commands.items(): for command, data in self.commands.items():
if command == main or target and command not in target: if command == main or target and command not in target:
continue continue
for name, parameter in data.parameters.items():
if base_value := base.parameters.get(name): for name, target_param in data.parameters.items():
if isinstance(base_value, HonParameterRange) and isinstance( if not (base_param := base.parameters.get(name)):
parameter, HonParameterRange return
): self.sync_parameter(base_param, target_param)
parameter.max = base_value.max
parameter.min = base_value.min def sync_parameter(self, main: Parameter, target: Parameter) -> None:
parameter.step = base_value.step if isinstance(main, HonParameterRange) and isinstance(
elif isinstance(parameter, HonParameterRange): target, HonParameterRange
parameter.max = int(base_value.value) ):
parameter.min = int(base_value.value) target.max = main.max
parameter.step = 1 target.min = main.min
elif isinstance(parameter, HonParameterEnum): target.step = main.step
parameter.values = base_value.values elif isinstance(target, HonParameterRange):
parameter.value = base_value.value target.max = int(main.value)
target.min = int(main.value)
target.step = 1
elif isinstance(target, HonParameterEnum):
target.values = main.values
target.value = main.value

View File

@ -1,3 +1,4 @@
# pylint: disable=duplicate-code
from typing import Any, Dict from typing import Any, Dict
from pyhon.appliances.base import ApplianceBase from pyhon.appliances.base import ApplianceBase

View File

@ -1,7 +1,6 @@
from typing import Any, Dict from typing import Any, Dict
from pyhon.appliances.base import ApplianceBase from pyhon.appliances.base import ApplianceBase
from pyhon.parameter.program import HonParameterProgram
class Appliance(ApplianceBase): class Appliance(ApplianceBase):

View File

@ -1,3 +1,4 @@
# pylint: disable=duplicate-code
from typing import Any, Dict from typing import Any, Dict
from pyhon.appliances.base import ApplianceBase from pyhon.appliances.base import ApplianceBase

View File

@ -1,3 +1,4 @@
# pylint: disable=duplicate-code
from typing import Dict, Any from typing import Dict, Any
from pyhon.appliances.base import ApplianceBase from pyhon.appliances.base import ApplianceBase

11
pyhon/appliances/wh.py Normal file
View File

@ -0,0 +1,11 @@
from typing import Any, Dict
from pyhon.appliances.base import ApplianceBase
class Appliance(ApplianceBase):
def attributes(self, data: Dict[str, Any]) -> Dict[str, Any]:
data = super().attributes(data)
data["active"] = data["parameters"]["onOffStatus"] == "1"
return data

View File

@ -1,3 +1,4 @@
# pylint: disable=duplicate-code
from typing import Any, Dict from typing import Any, Dict
from pyhon.appliances.base import ApplianceBase from pyhon.appliances.base import ApplianceBase

View File

@ -1,14 +1,15 @@
import asyncio import asyncio
from contextlib import suppress from contextlib import suppress
from copy import copy from copy import copy
from typing import Dict, Any, Optional, TYPE_CHECKING, List, Collection from typing import Dict, Any, Optional, TYPE_CHECKING, List
from pyhon.commands import HonCommand from pyhon.commands import HonCommand
from pyhon.exceptions import NoAuthenticationException
from pyhon.parameter.fixed import HonParameterFixed from pyhon.parameter.fixed import HonParameterFixed
from pyhon.parameter.program import HonParameterProgram from pyhon.parameter.program import HonParameterProgram
if TYPE_CHECKING: if TYPE_CHECKING:
from pyhon import HonAPI, exceptions from pyhon import HonAPI
from pyhon.appliance import HonAppliance from pyhon.appliance import HonAppliance
@ -16,12 +17,12 @@ class HonCommandLoader:
"""Loads and parses hOn command data""" """Loads and parses hOn command data"""
def __init__(self, api: "HonAPI", appliance: "HonAppliance") -> None: def __init__(self, api: "HonAPI", appliance: "HonAppliance") -> None:
self._api: "HonAPI" = api
self._appliance: "HonAppliance" = appliance
self._api_commands: Dict[str, Any] = {} self._api_commands: Dict[str, Any] = {}
self._favourites: List[Dict[str, Any]] = [] self._favourites: List[Dict[str, Any]] = []
self._command_history: List[Dict[str, Any]] = [] self._command_history: List[Dict[str, Any]] = []
self._commands: Dict[str, HonCommand] = {} self._commands: Dict[str, HonCommand] = {}
self._api: "HonAPI" = api
self._appliance: "HonAppliance" = appliance
self._appliance_data: Dict[str, Any] = {} self._appliance_data: Dict[str, Any] = {}
self._additional_data: Dict[str, Any] = {} self._additional_data: Dict[str, Any] = {}
@ -29,7 +30,7 @@ class HonCommandLoader:
def api(self) -> "HonAPI": def api(self) -> "HonAPI":
"""api connection object""" """api connection object"""
if self._api is None: if self._api is None:
raise exceptions.NoAuthenticationException("Missing hOn login") raise NoAuthenticationException("Missing hOn login")
return self._api return self._api
@property @property

View File

@ -27,17 +27,16 @@ class HonCommand:
categories: Optional[Dict[str, "HonCommand"]] = None, categories: Optional[Dict[str, "HonCommand"]] = None,
category_name: str = "", category_name: str = "",
): ):
self._api: Optional[HonAPI] = appliance.api
self._appliance: "HonAppliance" = appliance
self._name: str = name self._name: str = name
self._api: Optional[HonAPI] = None
self._appliance: "HonAppliance" = appliance
self._categories: Optional[Dict[str, "HonCommand"]] = categories self._categories: Optional[Dict[str, "HonCommand"]] = categories
self._category_name: str = category_name self._category_name: str = category_name
self._description: str = attributes.pop("description", "") self._parameters: Dict[str, Parameter] = {}
self._protocol_type: str = attributes.pop("protocolType", "")
self._parameters: Dict[str, HonParameter] = {}
self._data: Dict[str, Any] = {} self._data: Dict[str, Any] = {}
self._available_settings: Dict[str, HonParameter] = {}
self._rules: List[HonRuleSet] = [] self._rules: List[HonRuleSet] = []
attributes.pop("description", "")
attributes.pop("protocolType", "")
self._load_parameters(attributes) self._load_parameters(attributes)
def __repr__(self) -> str: def __repr__(self) -> str:
@ -49,6 +48,8 @@ class HonCommand:
@property @property
def api(self) -> "HonAPI": def api(self) -> "HonAPI":
if self._api is None and self._appliance:
self._api = self._appliance.api
if self._api is None: if self._api is None:
raise exceptions.NoAuthenticationException("Missing hOn login") raise exceptions.NoAuthenticationException("Missing hOn login")
return self._api return self._api
@ -76,6 +77,14 @@ class HonCommand:
result.setdefault(parameter.group, {})[name] = parameter.intern_value result.setdefault(parameter.group, {})[name] = parameter.intern_value
return result return result
@property
def mandatory_parameter_groups(self) -> Dict[str, Dict[str, Union[str, float]]]:
result: Dict[str, Dict[str, Union[str, float]]] = {}
for name, parameter in self._parameters.items():
if parameter.mandatory:
result.setdefault(parameter.group, {})[name] = parameter.intern_value
return result
@property @property
def parameter_value(self) -> Dict[str, Union[str, float]]: def parameter_value(self) -> Dict[str, Union[str, float]]:
return {n: p.value for n, p in self._parameters.items()} return {n: p.value for n, p in self._parameters.items()}
@ -111,8 +120,21 @@ class HonCommand:
name = "program" if "PROGRAM" in self._category_name else "category" name = "program" if "PROGRAM" in self._category_name else "category"
self._parameters[name] = HonParameterProgram(name, self, "custom") self._parameters[name] = HonParameterProgram(name, self, "custom")
async def send(self) -> bool: async def send(self, only_mandatory: bool = False) -> bool:
params = self.parameter_groups.get("parameters", {}) grouped_params = (
self.mandatory_parameter_groups if only_mandatory else self.parameter_groups
)
params = grouped_params.get("parameters", {})
return await self.send_parameters(params)
async def send_specific(self, param_names: List[str]) -> bool:
params: Dict[str, str | float] = {}
for key, parameter in self._parameters.items():
if key in param_names:
params[key] = parameter.value
return await self.send_parameters(params)
async def send_parameters(self, params: Dict[str, str | float]) -> bool:
ancillary_params = self.parameter_groups.get("ancillaryParameters", {}) ancillary_params = self.parameter_groups.get("ancillaryParameters", {})
ancillary_params.pop("programRules", None) ancillary_params.pop("programRules", None)
self.appliance.sync_command_to_params(self.name) self.appliance.sync_command_to_params(self.name)
@ -169,3 +191,7 @@ class HonCommand:
else: else:
result[name] = parameter result[name] = parameter
return result return result
def reset(self) -> None:
for parameter in self._parameters.values():
parameter.reset()

View File

@ -30,6 +30,14 @@ class HonLoginData:
loaded: Optional[Dict[str, Any]] = None loaded: Optional[Dict[str, Any]] = None
@dataclass
class HonAuthData:
access_token: str = ""
refresh_token: str = ""
cognito_token: str = ""
id_token: str = ""
class HonAuth: class HonAuth:
_TOKEN_EXPIRES_AFTER_HOURS = 8 _TOKEN_EXPIRES_AFTER_HOURS = 8
_TOKEN_EXPIRE_WARNING_HOURS = 7 _TOKEN_EXPIRE_WARNING_HOURS = 7
@ -46,28 +54,25 @@ class HonAuth:
self._login_data = HonLoginData() self._login_data = HonLoginData()
self._login_data.email = email self._login_data.email = email
self._login_data.password = password self._login_data.password = password
self._access_token = ""
self._refresh_token = ""
self._cognito_token = ""
self._id_token = ""
self._device = device self._device = device
self._expires: datetime = datetime.utcnow() self._expires: datetime = datetime.utcnow()
self._auth = HonAuthData()
@property @property
def cognito_token(self) -> str: def cognito_token(self) -> str:
return self._cognito_token return self._auth.cognito_token
@property @property
def id_token(self) -> str: def id_token(self) -> str:
return self._id_token return self._auth.id_token
@property @property
def access_token(self) -> str: def access_token(self) -> str:
return self._access_token return self._auth.access_token
@property @property
def refresh_token(self) -> str: def refresh_token(self) -> str:
return self._refresh_token return self._auth.refresh_token
def _check_token_expiration(self, hours: int) -> bool: def _check_token_expiration(self, hours: int) -> bool:
return datetime.utcnow() >= self._expires + timedelta(hours=hours) return datetime.utcnow() >= self._expires + timedelta(hours=hours)
@ -192,12 +197,12 @@ class HonAuth:
def _parse_token_data(self, text: str) -> bool: def _parse_token_data(self, text: str) -> bool:
if access_token := re.findall("access_token=(.*?)&", text): if access_token := re.findall("access_token=(.*?)&", text):
self._access_token = access_token[0] self._auth.access_token = access_token[0]
if refresh_token := re.findall("refresh_token=(.*?)&", text): if refresh_token := re.findall("refresh_token=(.*?)&", text):
self._refresh_token = refresh_token[0] self._auth.refresh_token = refresh_token[0]
if id_token := re.findall("id_token=(.*?)&", text): if id_token := re.findall("id_token=(.*?)&", text):
self._id_token = id_token[0] self._auth.id_token = id_token[0]
return True if access_token and refresh_token and id_token else False return bool(access_token and refresh_token and id_token)
async def _get_token(self, url: str) -> bool: async def _get_token(self, url: str) -> bool:
async with self._request.get(url) as response: async with self._request.get(url) as response:
@ -229,7 +234,7 @@ class HonAuth:
return True return True
async def _api_auth(self) -> bool: async def _api_auth(self) -> bool:
post_headers = {"id-token": self._id_token} post_headers = {"id-token": self._auth.id_token}
data = self._device.get() data = self._device.get()
async with self._request.post( async with self._request.post(
f"{const.API_URL}/auth/v1/login", headers=post_headers, json=data f"{const.API_URL}/auth/v1/login", headers=post_headers, json=data
@ -239,8 +244,8 @@ class HonAuth:
except json.JSONDecodeError: except json.JSONDecodeError:
await self._error_logger(response) await self._error_logger(response)
return False return False
self._cognito_token = json_data.get("cognitoUser", {}).get("Token", "") self._auth.cognito_token = json_data.get("cognitoUser", {}).get("Token", "")
if not self._cognito_token: if not self._auth.cognito_token:
_LOGGER.error(json_data) _LOGGER.error(json_data)
raise exceptions.HonAuthenticationError() raise exceptions.HonAuthenticationError()
return True return True
@ -262,7 +267,7 @@ class HonAuth:
async def refresh(self) -> bool: async def refresh(self) -> bool:
params = { params = {
"client_id": const.CLIENT_ID, "client_id": const.CLIENT_ID,
"refresh_token": self._refresh_token, "refresh_token": self._auth.refresh_token,
"grant_type": "refresh_token", "grant_type": "refresh_token",
} }
async with self._request.post( async with self._request.post(
@ -273,14 +278,14 @@ class HonAuth:
return False return False
data = await response.json() data = await response.json()
self._expires = datetime.utcnow() self._expires = datetime.utcnow()
self._id_token = data["id_token"] self._auth.id_token = data["id_token"]
self._access_token = data["access_token"] self._auth.access_token = data["access_token"]
return await self._api_auth() return await self._api_auth()
def clear(self) -> None: def clear(self) -> None:
self._session.cookie_jar.clear_domain(const.AUTH_API.split("/")[-2]) self._session.cookie_jar.clear_domain(const.AUTH_API.split("/")[-2])
self._request.called_urls = [] self._request.called_urls = []
self._cognito_token = "" self._auth.cognito_token = ""
self._id_token = "" self._auth.id_token = ""
self._access_token = "" self._auth.access_token = ""
self._refresh_token = "" self._auth.refresh_token = ""

View File

@ -21,7 +21,7 @@ class HonDevice:
return self._os_version return self._os_version
@property @property
def os(self) -> str: def os_type(self) -> str:
return self._os return self._os
@property @property
@ -36,7 +36,7 @@ class HonDevice:
result: Dict[str, str | int] = { result: Dict[str, str | int] = {
"appVersion": self.app_version, "appVersion": self.app_version,
"mobileId": self.mobile_id, "mobileId": self.mobile_id,
"os": self.os, "os": self.os_type,
"osVersion": self.os_version, "osVersion": self.os_version,
"deviceModel": self.device_model, "deviceModel": self.device_model,
} }

View File

@ -2,7 +2,7 @@ import logging
from collections.abc import AsyncIterator from collections.abc import AsyncIterator
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from types import TracebackType from types import TracebackType
from typing import Optional, Dict, Type, Any, Protocol from typing import Optional, Dict, Type, Any
import aiohttp import aiohttp
from typing_extensions import Self from typing_extensions import Self
@ -47,10 +47,11 @@ class ConnectionHandler:
return self return self
@asynccontextmanager @asynccontextmanager
def _intercept( async def _intercept(
self, method: Callback, url: str | URL, *args: Any, **kwargs: Dict[str, Any] self, method: Callback, url: str | URL, *args: Any, **kwargs: Dict[str, Any]
) -> AsyncIterator[aiohttp.ClientResponse]: ) -> AsyncIterator[aiohttp.ClientResponse]:
raise NotImplementedError async with method(url, *args, **kwargs) as response:
yield response
@asynccontextmanager @asynccontextmanager
async def get( async def get(
@ -59,7 +60,8 @@ class ConnectionHandler:
if self._session is None: if self._session is None:
raise exceptions.NoSessionException() raise exceptions.NoSessionException()
response: aiohttp.ClientResponse response: aiohttp.ClientResponse
async with self._intercept(self._session.get, *args, **kwargs) as response: # type: ignore[arg-type] args = self._session.get, *args
async with self._intercept(*args, **kwargs) as response:
yield response yield response
@asynccontextmanager @asynccontextmanager
@ -69,7 +71,8 @@ class ConnectionHandler:
if self._session is None: if self._session is None:
raise exceptions.NoSessionException() raise exceptions.NoSessionException()
response: aiohttp.ClientResponse response: aiohttp.ClientResponse
async with self._intercept(self._session.post, *args, **kwargs) as response: # type: ignore[arg-type] args = self._session.post, *args
async with self._intercept(*args, **kwargs) as response:
yield response yield response
async def close(self) -> None: async def close(self) -> None:

View File

@ -95,11 +95,11 @@ class HonConnectionHandler(ConnectionHandler):
try: try:
await response.json() await response.json()
yield response yield response
except json.JSONDecodeError: except json.JSONDecodeError as exc:
_LOGGER.warning( _LOGGER.warning(
"%s - JsonDecodeError %s - %s", "%s - JsonDecodeError %s - %s",
response.request_info.url, response.request_info.url,
response.status, response.status,
await response.text(), await response.text(),
) )
raise HonAuthenticationError("Decode Error") raise HonAuthenticationError("Decode Error") from exc

View File

@ -2,8 +2,10 @@ AUTH_API = "https://account2.hon-smarthome.com"
API_URL = "https://api-iot.he.services" API_URL = "https://api-iot.he.services"
API_KEY = "GRCqFhC6Gk@ikWXm1RmnSmX1cm,MxY-configuration" API_KEY = "GRCqFhC6Gk@ikWXm1RmnSmX1cm,MxY-configuration"
APP = "hon" APP = "hon"
# All seen id's (different accounts, different devices) are the same, so I guess this hash is static CLIENT_ID = (
CLIENT_ID = "3MVG9QDx8IX8nP5T2Ha8ofvlmjLZl5L_gvfbT9.HJvpHGKoAS_dcMN8LYpTSYeVFCraUnV.2Ag1Ki7m4znVO6" "3MVG9QDx8IX8nP5T2Ha8ofvlmjLZl5L_gvfbT9."
"HJvpHGKoAS_dcMN8LYpTSYeVFCraUnV.2Ag1Ki7m4znVO6"
)
APP_VERSION = "2.1.2" APP_VERSION = "2.1.2"
OS_VERSION = 31 OS_VERSION = 31
OS = "android" OS = "android"

View File

@ -7,9 +7,10 @@ from typing import List, Optional, Dict, Any, Type
from aiohttp import ClientSession from aiohttp import ClientSession
from typing_extensions import Self from typing_extensions import Self
from pyhon import HonAPI, exceptions
from pyhon.appliance import HonAppliance from pyhon.appliance import HonAppliance
from pyhon.connection.api import HonAPI
from pyhon.connection.api import TestAPI from pyhon.connection.api import TestAPI
from pyhon.exceptions import NoAuthenticationException
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -43,7 +44,7 @@ class Hon:
@property @property
def api(self) -> HonAPI: def api(self) -> HonAPI:
if self._api is None: if self._api is None:
raise exceptions.NoAuthenticationException raise NoAuthenticationException
return self._api return self._api
@property @property

View File

@ -7,14 +7,21 @@ if TYPE_CHECKING:
class HonParameter: class HonParameter:
def __init__(self, key: str, attributes: Dict[str, Any], group: str) -> None: def __init__(self, key: str, attributes: Dict[str, Any], group: str) -> None:
self._key = key self._key = key
self._category: str = attributes.get("category", "") self._attributes = attributes
self._typology: str = attributes.get("typology", "") self._category: str = ""
self._mandatory: int = attributes.get("mandatory", 0) self._typology: str = ""
self._mandatory: int = 0
self._value: str | float = "" self._value: str | float = ""
self._group: str = group self._group: str = group
self._triggers: Dict[ self._triggers: Dict[
str, List[Tuple[Callable[["HonRule"], None], "HonRule"]] str, List[Tuple[Callable[["HonRule"], None], "HonRule"]]
] = {} ] = {}
self._set_attributes()
def _set_attributes(self) -> None:
self._category = self._attributes.get("category", "")
self._typology = self._attributes.get("typology", "")
self._mandatory = self._attributes.get("mandatory", 0)
@property @property
def key(self) -> str: def key(self) -> str:
@ -85,3 +92,6 @@ class HonParameter:
param[rule.param_key] = rule.param_data.get("defaultValue", "") param[rule.param_key] = rule.param_data.get("defaultValue", "")
return result return result
def reset(self) -> None:
self._set_attributes()

View File

@ -10,12 +10,19 @@ def clean_value(value: str | float) -> str:
class HonParameterEnum(HonParameter): class HonParameterEnum(HonParameter):
def __init__(self, key: str, attributes: Dict[str, Any], group: str) -> None: def __init__(self, key: str, attributes: Dict[str, Any], group: str) -> None:
super().__init__(key, attributes, group) super().__init__(key, attributes, group)
self._default = attributes.get("defaultValue") self._default: str | float = ""
self._value = self._default or "0" self._value: str | float = ""
self._values: List[str] = attributes.get("enumValues", []) self._values: List[str] = []
self._set_attributes()
if self._default and clean_value(self._default.strip("[]")) not in self.values: if self._default and clean_value(self._default.strip("[]")) not in self.values:
self._values.append(self._default) self._values.append(self._default)
def _set_attributes(self) -> None:
super()._set_attributes()
self._default = self._attributes.get("defaultValue", "")
self._value = self._default or "0"
self._values = self._attributes.get("enumValues", [])
def __repr__(self) -> str: def __repr__(self) -> str:
return f"{self.__class__} (<{self.key}> {self.values})" return f"{self.__class__} (<{self.key}> {self.values})"

View File

@ -6,7 +6,12 @@ from pyhon.parameter.base import HonParameter
class HonParameterFixed(HonParameter): class HonParameterFixed(HonParameter):
def __init__(self, key: str, attributes: Dict[str, Any], group: str) -> None: def __init__(self, key: str, attributes: Dict[str, Any], group: str) -> None:
super().__init__(key, attributes, group) super().__init__(key, attributes, group)
self._value = attributes.get("fixedValue", None) self._value: str | float = ""
self._set_attributes()
def _set_attributes(self) -> None:
super()._set_attributes()
self._value = self._attributes.get("fixedValue", "")
def __repr__(self) -> str: def __repr__(self) -> str:
return f"{self.__class__} (<{self.key}> fixed)" return f"{self.__class__} (<{self.key}> fixed)"

View File

@ -37,17 +37,19 @@ class HonParameterProgram(HonParameterEnum):
@values.setter @values.setter
def values(self, values: List[str]) -> None: def values(self, values: List[str]) -> None:
return raise ValueError("Cant set values {values}")
@property @property
def ids(self) -> Dict[int, str]: def ids(self) -> Dict[int, str]:
values = { values: Dict[int, str] = {}
int(p.parameters["prCode"].value): n for name, parameter in self._programs.items():
for i, (n, p) in enumerate(self._programs.items()) if "iot_" in name:
if "iot_" not in n continue
and p.parameters.get("prCode") if parameter.parameters.get("prCode"):
and not ((fav := p.parameters.get("favourite")) and fav.value == "1") continue
} if (fav := parameter.parameters.get("favourite")) and fav.value == "1":
continue
values[int(parameter.parameters["prCode"].value)] = name
return dict(sorted(values.items())) return dict(sorted(values.items()))
def set_value(self, value: str) -> None: def set_value(self, value: str) -> None:

View File

@ -7,11 +7,20 @@ from pyhon.parameter.base import HonParameter
class HonParameterRange(HonParameter): class HonParameterRange(HonParameter):
def __init__(self, key: str, attributes: Dict[str, Any], group: str) -> None: def __init__(self, key: str, attributes: Dict[str, Any], group: str) -> None:
super().__init__(key, attributes, group) super().__init__(key, attributes, group)
self._min: float = str_to_float(attributes["minimumValue"]) self._min: float = 0
self._max: float = str_to_float(attributes["maximumValue"]) self._max: float = 0
self._step: float = str_to_float(attributes["incrementValue"]) self._step: float = 0
self._default: float = str_to_float(attributes.get("defaultValue", self.min)) self._default: float = 0
self._value: float = self._default self._value: float = 0
self._set_attributes()
def _set_attributes(self) -> None:
super()._set_attributes()
self._min = str_to_float(self._attributes.get("minimumValue", 0))
self._max = str_to_float(self._attributes.get("maximumValue", 0))
self._step = str_to_float(self._attributes.get("incrementValue", 0))
self._default = str_to_float(self._attributes.get("defaultValue", self.min))
self._value = self._default
def __repr__(self) -> str: def __repr__(self) -> str:
return f"{self.__class__} (<{self.key}> [{self.min} - {self.max}])" return f"{self.__class__} (<{self.key}> [{self.min} - {self.max}])"
@ -55,9 +64,8 @@ class HonParameterRange(HonParameter):
self._value = value self._value = value
self.check_trigger(value) self.check_trigger(value)
else: else:
raise ValueError( allowed = f"min {self.min} max {self.max} step {self.step}"
f"Allowed: min {self.min} max {self.max} step {self.step} But was: {value}" raise ValueError(f"Allowed: {allowed} But was: {value}")
)
@property @property
def values(self) -> List[str]: def values(self) -> List[str]:

View File

@ -29,33 +29,26 @@ def pretty_print(
whitespace: str = " ", whitespace: str = " ",
) -> str: ) -> str:
result = "" result = ""
space = whitespace * intend
if isinstance(data, (dict, list)) and key:
result += f"{space}{'- ' if is_list else ''}{key}:\n"
intend += 1
if isinstance(data, list): if isinstance(data, list):
if key:
result += f"{whitespace * intend}{'- ' if is_list else ''}{key}:\n"
intend += 1
for i, value in enumerate(data): for i, value in enumerate(data):
result += pretty_print( result += pretty_print(
value, intend=intend, is_list=True, whitespace=whitespace value, intend=intend, is_list=True, whitespace=whitespace
) )
elif isinstance(data, dict): elif isinstance(data, dict):
if key: for i, (list_key, value) in enumerate(sorted(data.items())):
result += f"{whitespace * intend}{'- ' if is_list else ''}{key}:\n" result += pretty_print(
intend += 1 value,
for i, (key, value) in enumerate(sorted(data.items())): key=list_key,
if is_list and not i: intend=intend + (is_list if i else 0),
result += pretty_print( is_list=is_list and not i,
value, key=key, intend=intend, is_list=True, whitespace=whitespace whitespace=whitespace,
) )
elif is_list:
result += pretty_print(
value, key=key, intend=intend + 1, whitespace=whitespace
)
else:
result += pretty_print(
value, key=key, intend=intend, whitespace=whitespace
)
else: else:
result += f"{whitespace * intend}{'- ' if is_list else ''}{key}{': ' if key else ''}{data}\n" result += f"{space}{'- ' if is_list else ''}{key}{': ' if key else ''}{data}\n"
return result return result

View File

@ -3,6 +3,7 @@ from typing import List, Dict, TYPE_CHECKING, Any, Optional
from pyhon.parameter.enum import HonParameterEnum from pyhon.parameter.enum import HonParameterEnum
from pyhon.parameter.range import HonParameterRange from pyhon.parameter.range import HonParameterRange
from pyhon.typedefs import Parameter
if TYPE_CHECKING: if TYPE_CHECKING:
from pyhon.commands import HonCommand from pyhon.commands import HonCommand
@ -24,6 +25,10 @@ class HonRuleSet:
self._rules: Dict[str, List[HonRule]] = {} self._rules: Dict[str, List[HonRule]] = {}
self._parse_rule(rule) self._parse_rule(rule)
@property
def rules(self) -> Dict[str, List[HonRule]]:
return self._rules
def _parse_rule(self, rule: Dict[str, Any]) -> None: def _parse_rule(self, rule: Dict[str, Any]) -> None:
for param_key, params in rule.items(): for param_key, params in rule.items():
param_key = self._command.appliance.options.get(param_key, param_key) param_key = self._command.appliance.options.get(param_key, param_key)
@ -83,28 +88,42 @@ class HonRuleSet:
for rule in rules: for rule in rules:
self._rules.setdefault(key, []).append(rule) self._rules.setdefault(key, []).append(rule)
def _extra_rules_matches(self, rule: HonRule) -> bool:
if rule.extras:
for key, value in rule.extras.items():
if not self._command.parameters.get(key):
return False
if str(self._command.parameters.get(key)) != str(value):
return False
return True
def _apply_fixed(self, param: Parameter, value: str | float) -> None:
if isinstance(param, HonParameterEnum) and set(param.values) != {str(value)}:
param.values = [str(value)]
param.value = str(value)
elif isinstance(param, HonParameterRange):
param.value = float(value)
return
param.value = str(value)
def _apply_enum(self, param: Parameter, rule: HonRule) -> None:
if not isinstance(param, HonParameterEnum):
return
if enum_values := rule.param_data.get("enumValues"):
param.values = enum_values.split("|")
if default_value := rule.param_data.get("defaultValue"):
param.value = default_value
def _add_trigger(self, parameter: "HonParameter", data: HonRule) -> None: def _add_trigger(self, parameter: "HonParameter", data: HonRule) -> None:
def apply(rule: HonRule) -> None: def apply(rule: HonRule) -> None:
if rule.extras is not None: if not self._extra_rules_matches(rule):
for key, value in rule.extras.items(): return
if str(self._command.parameters.get(key)) != str(value): if not (param := self._command.parameters.get(rule.param_key)):
return return
if param := self._command.parameters.get(rule.param_key): if fixed_value := rule.param_data.get("fixedValue", ""):
if value := rule.param_data.get("fixedValue", ""): self._apply_fixed(param, fixed_value)
if isinstance(param, HonParameterEnum) and set(param.values) != { elif rule.param_data.get("typology") == "enum":
str(value) self._apply_enum(param, rule)
}:
param.values = [str(value)]
elif isinstance(param, HonParameterRange):
param.value = float(value)
return
param.value = str(value)
elif rule.param_data.get("typology") == "enum":
if isinstance(param, HonParameterEnum):
if enum_values := rule.param_data.get("enumValues"):
param.values = enum_values.split("|")
if default_value := rule.param_data.get("defaultValue"):
param.value = default_value
parameter.add_trigger(data.trigger_value, apply, data) parameter.add_trigger(data.trigger_value, apply, data)

View File

@ -11,7 +11,7 @@ if TYPE_CHECKING:
from pyhon.parameter.range import HonParameterRange from pyhon.parameter.range import HonParameterRange
class Callback(Protocol): class Callback(Protocol): # pylint: disable=too-few-public-methods
def __call__( def __call__(
self, url: str | URL, *args: Any, **kwargs: Any self, url: str | URL, *args: Any, **kwargs: Any
) -> aiohttp.client._RequestContextManager: ) -> aiohttp.client._RequestContextManager:

View File

@ -1,4 +1,4 @@
black==23.3.0 black==23.7.0
flake8==6.0.0 flake8==6.0.0
mypy==1.2.0 mypy==1.4.1
pylint==2.17.2 pylint==2.17.4

View File

@ -2,12 +2,12 @@
from setuptools import setup, find_packages from setuptools import setup, find_packages
with open("README.md", "r") as f: with open("README.md", "r", encoding="utf-8") as f:
long_description = f.read() long_description = f.read()
setup( setup(
name="pyhOn", name="pyhOn",
version="0.14.9", version="0.15.0",
author="Andre Basche", author="Andre Basche",
description="Control hOn devices with python", description="Control hOn devices with python",
long_description=long_description, long_description=long_description,