Compare commits

...

4 Commits

Author SHA1 Message Date
310d1bafd7 Improve rule parsing 2023-06-10 06:47:37 +02:00
9e35dcf9cf Don't send optional program rules 2023-06-10 06:40:55 +02:00
f9d0fa4ae8 Add wine cellar 2023-06-09 22:52:33 +02:00
11988c73a6 Fix command parameter issue hon#63 2023-06-09 02:09:20 +02:00
9 changed files with 113 additions and 39 deletions

View File

@ -302,8 +302,6 @@ class HonAppliance:
"statistics": self.statistics,
"additional_data": self._additional_data,
}
if self._extra and data.get("attributes"):
data = self._extra.data(data)
if command_only:
data.pop("attributes")
data.pop("appliance")

5
pyhon/appliances/wc.py Normal file
View File

@ -0,0 +1,5 @@
from pyhon.appliances.base import ApplianceBase
class Appliance(ApplianceBase):
pass

View File

@ -2,6 +2,7 @@ import logging
from typing import Optional, Dict, Any, List, TYPE_CHECKING, Union
from pyhon import exceptions
from pyhon.exceptions import ApiError
from pyhon.parameter.base import HonParameter
from pyhon.parameter.enum import HonParameterEnum
from pyhon.parameter.fixed import HonParameterFixed
@ -110,10 +111,14 @@ class HonCommand:
async def send(self) -> bool:
params = self.parameter_groups.get("parameters", {})
ancillary_params = self.parameter_groups.get("ancillaryParameters", {})
ancillary_params.pop("programRules", None)
self.appliance.sync_to_params(self.name)
return await self.api.send_command(
result = await self.api.send_command(
self._appliance, self._name, params, ancillary_params
)
if not result:
raise ApiError("Can't send command")
return result
@property
def categories(self) -> Dict[str, "HonCommand"]:

View File

@ -1,6 +1,7 @@
import json
import logging
from datetime import datetime
from pprint import pformat
from typing import Dict, Optional
from aiohttp import ClientSession
@ -188,6 +189,7 @@ class HonAPI:
if json_data.get("payload", {}).get("resultCode") == "0":
return True
_LOGGER.error(await response.text())
_LOGGER.error("%s - Payload:\n%s", url, pformat(data))
return False
async def appliance_configuration(self) -> Dict:

View File

@ -12,3 +12,7 @@ class NoSessionException(Exception):
class NoAuthenticationException(Exception):
pass
class ApiError(Exception):
pass

View File

@ -28,8 +28,8 @@ class HonParameter:
self.check_trigger(value)
@property
def intern_value(self) -> str | float:
return str(self._value) if self._value is not None else ""
def intern_value(self) -> str:
return str(self.value)
@property
def values(self) -> List[str]:
@ -66,5 +66,18 @@ class HonParameter:
def triggers(self):
result = {}
for value, rules in self._triggers.items():
result[value] = {rule.param_key: rule.param_value for _, rule in rules}
for _, rule in rules:
if rule.extras:
param = result.setdefault(value, {})
for extra_key, extra_value in rule.extras.items():
param = param.setdefault(extra_key, {}).setdefault(
extra_value, {}
)
else:
param = result.setdefault(value, {})
if fixed_value := rule.param_data.get("fixedValue"):
param[rule.param_key] = fixed_value
else:
param[rule.param_key] = rule.param_data.get("defaultValue", "")
return result

View File

@ -27,6 +27,10 @@ class HonParameterEnum(HonParameter):
def values(self, values) -> None:
self._values = values
@property
def intern_value(self) -> str:
return str(self._value) if self._value is not None else str(self.values[0])
@property
def value(self) -> str | float:
return clean_value(self._value) if self._value is not None else self.values[0]

View File

@ -1,5 +1,5 @@
from dataclasses import dataclass
from typing import List, Dict, TYPE_CHECKING
from typing import List, Dict, TYPE_CHECKING, Any, Optional
from pyhon.parameter.enum import HonParameterEnum
from pyhon.parameter.range import HonParameterRange
@ -13,7 +13,8 @@ class HonRule:
trigger_key: str
trigger_value: str
param_key: str
param_value: str
param_data: Dict[str, Any]
extras: Optional[Dict[str, str]] = None
class HonRuleSet:
@ -23,40 +24,82 @@ class HonRuleSet:
self._parse_rule(rule)
def _parse_rule(self, rule):
for entity_key, params in rule.items():
entity_key = self._command.appliance.options.get(entity_key, entity_key)
for trigger_key, values in params.items():
for param_key, params in rule.items():
param_key = self._command.appliance.options.get(param_key, param_key)
for trigger_key, trigger_data in params.items():
self._parse_conditions(param_key, trigger_key, trigger_data)
def _parse_conditions(self, param_key, trigger_key, trigger_data, extra=None):
trigger_key = trigger_key.replace("@", "")
trigger_key = self._command.appliance.options.get(
trigger_key, trigger_key
trigger_key = self._command.appliance.options.get(trigger_key, trigger_key)
for multi_trigger_value, param_data in trigger_data.items():
for trigger_value in multi_trigger_value.split("|"):
if isinstance(param_data, dict) and "typology" in param_data:
self._create_rule(
param_key, trigger_key, trigger_value, param_data, extra
)
for trigger_value, entity_value in values.items():
if entity_value.get("fixedValue") == f"@{entity_key}":
continue
elif isinstance(param_data, dict):
if extra is None:
extra = {}
extra[trigger_key] = trigger_value
for extra_key, extra_data in param_data.items():
self._parse_conditions(param_key, extra_key, extra_data, extra)
def _create_rule(
self, param_key, trigger_key, trigger_value, param_data, extras=None
):
if param_data.get("fixedValue") == f"@{param_key}":
return
self._rules.setdefault(trigger_key, []).append(
HonRule(
trigger_key,
trigger_value,
entity_key,
entity_value.get("fixedValue"),
)
HonRule(trigger_key, trigger_value, param_key, param_data, extras)
)
def _duplicate_for_extra_conditions(self):
new = {}
for rules in self._rules.values():
for rule in rules:
if rule.extras is None:
continue
for key, value in rule.extras.items():
extras = rule.extras.copy()
extras.pop(key)
extras[rule.trigger_key] = rule.trigger_value
new.setdefault(key, []).append(
HonRule(key, value, rule.param_key, rule.param_data, extras)
)
for key, rules in new.items():
for rule in rules:
self._rules.setdefault(key, []).append(rule)
def _add_trigger(self, parameter, data):
def apply(rule: HonRule):
if rule.extras is not None:
for key, value in rule.extras.items():
if str(self._command.parameters.get(key)) != str(value):
return
if param := self._command.parameters.get(rule.param_key):
if value := rule.param_data.get("fixedValue", ""):
if isinstance(param, HonParameterEnum) and set(param.values) != {
str(value)
}:
param.values = [str(value)]
elif isinstance(param, HonParameterRange):
param.value = float(value)
return
param.value = str(value)
elif rule.param_data.get("typology") == "enum":
if isinstance(param, HonParameterEnum):
if enum_values := rule.param_data.get("enumValues"):
param.values = enum_values.split("|")
if default_value := rule.param_data.get("defaultValue"):
param.value = default_value
parameter.add_trigger(data.trigger_value, apply, data)
def patch(self):
self._duplicate_for_extra_conditions()
for name, parameter in self._command.parameters.items():
if name not in self._rules:
continue
for data in self._rules.get(name):
def apply(rule):
if param := self._command.parameters.get(rule.param_key):
if isinstance(param, HonParameterEnum) and set(
param.values
) != {str(rule.param_value)}:
param.values = [str(rule.param_value)]
elif isinstance(param, HonParameterRange):
param.value = float(rule.param_value)
return
param.value = str(rule.param_value)
parameter.add_trigger(data.trigger_value, apply, data)
self._add_trigger(parameter, data)

View File

@ -7,7 +7,7 @@ with open("README.md", "r") as f:
setup(
name="pyhOn",
version="0.12.2",
version="0.13.0",
author="Andre Basche",
description="Control hOn devices with python",
long_description=long_description,