Compare commits
2 Commits
Author | SHA1 | Date | |
---|---|---|---|
84000fb76c | |||
26528a7310 |
@ -32,7 +32,7 @@ class HonAppliance:
|
||||
self._api: Optional[HonAPI] = api
|
||||
self._appliance_model: Dict = {}
|
||||
|
||||
self._commands: Dict[str, HonCommand] = {}
|
||||
self._commands: Dict = {}
|
||||
self._statistics: Dict = {}
|
||||
self._attributes: Dict = {}
|
||||
self._zone: int = zone
|
||||
@ -112,7 +112,7 @@ class HonAppliance:
|
||||
return self._appliance_model.get("options", {})
|
||||
|
||||
@property
|
||||
def commands(self) -> Dict[str, HonCommand]:
|
||||
def commands(self):
|
||||
return self._commands
|
||||
|
||||
@property
|
||||
@ -329,9 +329,7 @@ class HonAppliance:
|
||||
self.attributes["parameters"][key] = str(new.intern_value)
|
||||
|
||||
def sync_command(self, main, target=None) -> None:
|
||||
base: Optional[HonCommand] = self.commands.get(main)
|
||||
if not base:
|
||||
return
|
||||
base: HonCommand = self.commands.get(main)
|
||||
for command, data in self.commands.items():
|
||||
if command == main or target and command not in target:
|
||||
continue
|
||||
|
@ -4,7 +4,7 @@ from pyhon.appliances.base import ApplianceBase
|
||||
class Appliance(ApplianceBase):
|
||||
def attributes(self, data):
|
||||
data = super().attributes(data)
|
||||
if data.get("lastConnEvent", {}).get("category", "") == "DISCONNECTED":
|
||||
if data["lastConnEvent"]["category"] == "DISCONNECTED":
|
||||
data["parameters"]["machMode"] = "0"
|
||||
data["active"] = bool(data.get("activity"))
|
||||
return data
|
||||
|
@ -4,7 +4,7 @@ from pyhon.appliances.base import ApplianceBase
|
||||
class Appliance(ApplianceBase):
|
||||
def attributes(self, data):
|
||||
data = super().attributes(data)
|
||||
if data.get("lastConnEvent", {}).get("category", "") == "DISCONNECTED":
|
||||
if data["lastConnEvent"]["category"] == "DISCONNECTED":
|
||||
data["parameters"]["temp"] = "0"
|
||||
data["parameters"]["onOffStatus"] = "0"
|
||||
data["parameters"]["remoteCtrValid"] = "0"
|
||||
|
@ -5,7 +5,7 @@ from pyhon.parameter.fixed import HonParameterFixed
|
||||
class Appliance(ApplianceBase):
|
||||
def attributes(self, data):
|
||||
data = super().attributes(data)
|
||||
if data.get("lastConnEvent", {}).get("category", "") == "DISCONNECTED":
|
||||
if data["lastConnEvent"]["category"] == "DISCONNECTED":
|
||||
data["parameters"]["machMode"] = "0"
|
||||
data["active"] = bool(data.get("activity"))
|
||||
data["pause"] = data["parameters"]["machMode"] == "3"
|
||||
|
@ -4,7 +4,7 @@ from pyhon.appliances.base import ApplianceBase
|
||||
class Appliance(ApplianceBase):
|
||||
def attributes(self, data):
|
||||
data = super().attributes(data)
|
||||
if data.get("lastConnEvent", {}).get("category", "") == "DISCONNECTED":
|
||||
if data["lastConnEvent"]["category"] == "DISCONNECTED":
|
||||
data["parameters"]["machMode"] = "0"
|
||||
data["active"] = bool(data.get("activity"))
|
||||
data["pause"] = data["parameters"]["machMode"] == "3"
|
||||
|
@ -4,7 +4,7 @@ from pyhon.appliances.base import ApplianceBase
|
||||
class Appliance(ApplianceBase):
|
||||
def attributes(self, data):
|
||||
data = super().attributes(data)
|
||||
if data.get("lastConnEvent", {}).get("category", "") == "DISCONNECTED":
|
||||
if data["lastConnEvent"]["category"] == "DISCONNECTED":
|
||||
data["parameters"]["machMode"] = "0"
|
||||
data["active"] = bool(data.get("activity"))
|
||||
data["pause"] = data["parameters"]["machMode"] == "3"
|
||||
|
@ -2,7 +2,7 @@ import logging
|
||||
from typing import Optional, Dict, Any, List, TYPE_CHECKING, Union
|
||||
|
||||
from pyhon import exceptions
|
||||
from pyhon.exceptions import ApiError, NoAuthenticationException
|
||||
from pyhon.exceptions import ApiError
|
||||
from pyhon.parameter.base import HonParameter
|
||||
from pyhon.parameter.enum import HonParameterEnum
|
||||
from pyhon.parameter.fixed import HonParameterFixed
|
||||
@ -113,16 +113,11 @@ class HonCommand:
|
||||
ancillary_params = self.parameter_groups.get("ancillaryParameters", {})
|
||||
ancillary_params.pop("programRules", None)
|
||||
self.appliance.sync_to_params(self.name)
|
||||
try:
|
||||
result = await self.api.send_command(
|
||||
self._appliance, self._name, params, ancillary_params
|
||||
)
|
||||
if not result:
|
||||
_LOGGER.error(result)
|
||||
raise ApiError("Can't send command")
|
||||
except NoAuthenticationException:
|
||||
_LOGGER.error("No Authentication")
|
||||
return False
|
||||
return result
|
||||
|
||||
@property
|
||||
|
@ -66,18 +66,5 @@ class HonParameter:
|
||||
def triggers(self):
|
||||
result = {}
|
||||
for value, rules in self._triggers.items():
|
||||
for _, rule in rules:
|
||||
if rule.extras:
|
||||
param = result.setdefault(value, {})
|
||||
for extra_key, extra_value in rule.extras.items():
|
||||
param = param.setdefault(extra_key, {}).setdefault(
|
||||
extra_value, {}
|
||||
)
|
||||
else:
|
||||
param = result.setdefault(value, {})
|
||||
if fixed_value := rule.param_data.get("fixedValue"):
|
||||
param[rule.param_key] = fixed_value
|
||||
else:
|
||||
param[rule.param_key] = rule.param_data.get("defaultValue", "")
|
||||
|
||||
result[value] = {rule.param_key: rule.param_value for _, rule in rules}
|
||||
return result
|
||||
|
101
pyhon/rules.py
101
pyhon/rules.py
@ -1,5 +1,5 @@
|
||||
from dataclasses import dataclass
|
||||
from typing import List, Dict, TYPE_CHECKING, Any, Optional
|
||||
from typing import List, Dict, TYPE_CHECKING
|
||||
|
||||
from pyhon.parameter.enum import HonParameterEnum
|
||||
from pyhon.parameter.range import HonParameterRange
|
||||
@ -13,8 +13,7 @@ class HonRule:
|
||||
trigger_key: str
|
||||
trigger_value: str
|
||||
param_key: str
|
||||
param_data: Dict[str, Any]
|
||||
extras: Optional[Dict[str, str]] = None
|
||||
param_value: str
|
||||
|
||||
|
||||
class HonRuleSet:
|
||||
@ -24,82 +23,40 @@ class HonRuleSet:
|
||||
self._parse_rule(rule)
|
||||
|
||||
def _parse_rule(self, rule):
|
||||
for param_key, params in rule.items():
|
||||
param_key = self._command.appliance.options.get(param_key, param_key)
|
||||
for trigger_key, trigger_data in params.items():
|
||||
self._parse_conditions(param_key, trigger_key, trigger_data)
|
||||
|
||||
def _parse_conditions(self, param_key, trigger_key, trigger_data, extra=None):
|
||||
for entity_key, params in rule.items():
|
||||
entity_key = self._command.appliance.options.get(entity_key, entity_key)
|
||||
for trigger_key, values in params.items():
|
||||
trigger_key = trigger_key.replace("@", "")
|
||||
trigger_key = self._command.appliance.options.get(trigger_key, trigger_key)
|
||||
for multi_trigger_value, param_data in trigger_data.items():
|
||||
for trigger_value in multi_trigger_value.split("|"):
|
||||
if isinstance(param_data, dict) and "typology" in param_data:
|
||||
self._create_rule(
|
||||
param_key, trigger_key, trigger_value, param_data, extra
|
||||
trigger_key = self._command.appliance.options.get(
|
||||
trigger_key, trigger_key
|
||||
)
|
||||
elif isinstance(param_data, dict):
|
||||
if extra is None:
|
||||
extra = {}
|
||||
extra[trigger_key] = trigger_value
|
||||
for extra_key, extra_data in param_data.items():
|
||||
self._parse_conditions(param_key, extra_key, extra_data, extra)
|
||||
|
||||
def _create_rule(
|
||||
self, param_key, trigger_key, trigger_value, param_data, extras=None
|
||||
):
|
||||
if param_data.get("fixedValue") == f"@{param_key}":
|
||||
return
|
||||
self._rules.setdefault(trigger_key, []).append(
|
||||
HonRule(trigger_key, trigger_value, param_key, param_data, extras)
|
||||
)
|
||||
|
||||
def _duplicate_for_extra_conditions(self):
|
||||
new = {}
|
||||
for rules in self._rules.values():
|
||||
for rule in rules:
|
||||
if rule.extras is None:
|
||||
for trigger_value, entity_value in values.items():
|
||||
if entity_value.get("fixedValue") == f"@{entity_key}":
|
||||
continue
|
||||
for key, value in rule.extras.items():
|
||||
extras = rule.extras.copy()
|
||||
extras.pop(key)
|
||||
extras[rule.trigger_key] = rule.trigger_value
|
||||
new.setdefault(key, []).append(
|
||||
HonRule(key, value, rule.param_key, rule.param_data, extras)
|
||||
self._rules.setdefault(trigger_key, []).append(
|
||||
HonRule(
|
||||
trigger_key,
|
||||
trigger_value,
|
||||
entity_key,
|
||||
entity_value.get("fixedValue"),
|
||||
)
|
||||
)
|
||||
for key, rules in new.items():
|
||||
for rule in rules:
|
||||
self._rules.setdefault(key, []).append(rule)
|
||||
|
||||
def _add_trigger(self, parameter, data):
|
||||
def apply(rule: HonRule):
|
||||
if rule.extras is not None:
|
||||
for key, value in rule.extras.items():
|
||||
if str(self._command.parameters.get(key)) != str(value):
|
||||
return
|
||||
if param := self._command.parameters.get(rule.param_key):
|
||||
if value := rule.param_data.get("fixedValue", ""):
|
||||
if isinstance(param, HonParameterEnum) and set(param.values) != {
|
||||
str(value)
|
||||
}:
|
||||
param.values = [str(value)]
|
||||
elif isinstance(param, HonParameterRange):
|
||||
param.value = float(value)
|
||||
return
|
||||
param.value = str(value)
|
||||
elif rule.param_data.get("typology") == "enum":
|
||||
if isinstance(param, HonParameterEnum):
|
||||
if enum_values := rule.param_data.get("enumValues"):
|
||||
param.values = enum_values.split("|")
|
||||
if default_value := rule.param_data.get("defaultValue"):
|
||||
param.value = default_value
|
||||
|
||||
parameter.add_trigger(data.trigger_value, apply, data)
|
||||
|
||||
def patch(self):
|
||||
self._duplicate_for_extra_conditions()
|
||||
for name, parameter in self._command.parameters.items():
|
||||
if name not in self._rules:
|
||||
continue
|
||||
for data in self._rules.get(name):
|
||||
self._add_trigger(parameter, data)
|
||||
|
||||
def apply(rule):
|
||||
if param := self._command.parameters.get(rule.param_key):
|
||||
if isinstance(param, HonParameterEnum) and set(
|
||||
param.values
|
||||
) != {str(rule.param_value)}:
|
||||
param.values = [str(rule.param_value)]
|
||||
elif isinstance(param, HonParameterRange):
|
||||
param.value = float(rule.param_value)
|
||||
return
|
||||
param.value = str(rule.param_value)
|
||||
|
||||
parameter.add_trigger(data.trigger_value, apply, data)
|
||||
|
Reference in New Issue
Block a user