Improve rule parsing
This commit is contained in:
parent
9e35dcf9cf
commit
310d1bafd7
@ -66,5 +66,18 @@ class HonParameter:
|
|||||||
def triggers(self):
|
def triggers(self):
|
||||||
result = {}
|
result = {}
|
||||||
for value, rules in self._triggers.items():
|
for value, rules in self._triggers.items():
|
||||||
result[value] = {rule.param_key: rule.param_value for _, rule in rules}
|
for _, rule in rules:
|
||||||
|
if rule.extras:
|
||||||
|
param = result.setdefault(value, {})
|
||||||
|
for extra_key, extra_value in rule.extras.items():
|
||||||
|
param = param.setdefault(extra_key, {}).setdefault(
|
||||||
|
extra_value, {}
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
param = result.setdefault(value, {})
|
||||||
|
if fixed_value := rule.param_data.get("fixedValue"):
|
||||||
|
param[rule.param_key] = fixed_value
|
||||||
|
else:
|
||||||
|
param[rule.param_key] = rule.param_data.get("defaultValue", "")
|
||||||
|
|
||||||
return result
|
return result
|
||||||
|
107
pyhon/rules.py
107
pyhon/rules.py
@ -1,5 +1,5 @@
|
|||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
from typing import List, Dict, TYPE_CHECKING
|
from typing import List, Dict, TYPE_CHECKING, Any, Optional
|
||||||
|
|
||||||
from pyhon.parameter.enum import HonParameterEnum
|
from pyhon.parameter.enum import HonParameterEnum
|
||||||
from pyhon.parameter.range import HonParameterRange
|
from pyhon.parameter.range import HonParameterRange
|
||||||
@ -13,7 +13,8 @@ class HonRule:
|
|||||||
trigger_key: str
|
trigger_key: str
|
||||||
trigger_value: str
|
trigger_value: str
|
||||||
param_key: str
|
param_key: str
|
||||||
param_value: str
|
param_data: Dict[str, Any]
|
||||||
|
extras: Optional[Dict[str, str]] = None
|
||||||
|
|
||||||
|
|
||||||
class HonRuleSet:
|
class HonRuleSet:
|
||||||
@ -23,40 +24,82 @@ class HonRuleSet:
|
|||||||
self._parse_rule(rule)
|
self._parse_rule(rule)
|
||||||
|
|
||||||
def _parse_rule(self, rule):
|
def _parse_rule(self, rule):
|
||||||
for entity_key, params in rule.items():
|
for param_key, params in rule.items():
|
||||||
entity_key = self._command.appliance.options.get(entity_key, entity_key)
|
param_key = self._command.appliance.options.get(param_key, param_key)
|
||||||
for trigger_key, values in params.items():
|
for trigger_key, trigger_data in params.items():
|
||||||
trigger_key = trigger_key.replace("@", "")
|
self._parse_conditions(param_key, trigger_key, trigger_data)
|
||||||
trigger_key = self._command.appliance.options.get(
|
|
||||||
trigger_key, trigger_key
|
def _parse_conditions(self, param_key, trigger_key, trigger_data, extra=None):
|
||||||
)
|
trigger_key = trigger_key.replace("@", "")
|
||||||
for trigger_value, entity_value in values.items():
|
trigger_key = self._command.appliance.options.get(trigger_key, trigger_key)
|
||||||
if entity_value.get("fixedValue") == f"@{entity_key}":
|
for multi_trigger_value, param_data in trigger_data.items():
|
||||||
continue
|
for trigger_value in multi_trigger_value.split("|"):
|
||||||
self._rules.setdefault(trigger_key, []).append(
|
if isinstance(param_data, dict) and "typology" in param_data:
|
||||||
HonRule(
|
self._create_rule(
|
||||||
trigger_key,
|
param_key, trigger_key, trigger_value, param_data, extra
|
||||||
trigger_value,
|
|
||||||
entity_key,
|
|
||||||
entity_value.get("fixedValue"),
|
|
||||||
)
|
|
||||||
)
|
)
|
||||||
|
elif isinstance(param_data, dict):
|
||||||
|
if extra is None:
|
||||||
|
extra = {}
|
||||||
|
extra[trigger_key] = trigger_value
|
||||||
|
for extra_key, extra_data in param_data.items():
|
||||||
|
self._parse_conditions(param_key, extra_key, extra_data, extra)
|
||||||
|
|
||||||
|
def _create_rule(
|
||||||
|
self, param_key, trigger_key, trigger_value, param_data, extras=None
|
||||||
|
):
|
||||||
|
if param_data.get("fixedValue") == f"@{param_key}":
|
||||||
|
return
|
||||||
|
self._rules.setdefault(trigger_key, []).append(
|
||||||
|
HonRule(trigger_key, trigger_value, param_key, param_data, extras)
|
||||||
|
)
|
||||||
|
|
||||||
|
def _duplicate_for_extra_conditions(self):
|
||||||
|
new = {}
|
||||||
|
for rules in self._rules.values():
|
||||||
|
for rule in rules:
|
||||||
|
if rule.extras is None:
|
||||||
|
continue
|
||||||
|
for key, value in rule.extras.items():
|
||||||
|
extras = rule.extras.copy()
|
||||||
|
extras.pop(key)
|
||||||
|
extras[rule.trigger_key] = rule.trigger_value
|
||||||
|
new.setdefault(key, []).append(
|
||||||
|
HonRule(key, value, rule.param_key, rule.param_data, extras)
|
||||||
|
)
|
||||||
|
for key, rules in new.items():
|
||||||
|
for rule in rules:
|
||||||
|
self._rules.setdefault(key, []).append(rule)
|
||||||
|
|
||||||
|
def _add_trigger(self, parameter, data):
|
||||||
|
def apply(rule: HonRule):
|
||||||
|
if rule.extras is not None:
|
||||||
|
for key, value in rule.extras.items():
|
||||||
|
if str(self._command.parameters.get(key)) != str(value):
|
||||||
|
return
|
||||||
|
if param := self._command.parameters.get(rule.param_key):
|
||||||
|
if value := rule.param_data.get("fixedValue", ""):
|
||||||
|
if isinstance(param, HonParameterEnum) and set(param.values) != {
|
||||||
|
str(value)
|
||||||
|
}:
|
||||||
|
param.values = [str(value)]
|
||||||
|
elif isinstance(param, HonParameterRange):
|
||||||
|
param.value = float(value)
|
||||||
|
return
|
||||||
|
param.value = str(value)
|
||||||
|
elif rule.param_data.get("typology") == "enum":
|
||||||
|
if isinstance(param, HonParameterEnum):
|
||||||
|
if enum_values := rule.param_data.get("enumValues"):
|
||||||
|
param.values = enum_values.split("|")
|
||||||
|
if default_value := rule.param_data.get("defaultValue"):
|
||||||
|
param.value = default_value
|
||||||
|
|
||||||
|
parameter.add_trigger(data.trigger_value, apply, data)
|
||||||
|
|
||||||
def patch(self):
|
def patch(self):
|
||||||
|
self._duplicate_for_extra_conditions()
|
||||||
for name, parameter in self._command.parameters.items():
|
for name, parameter in self._command.parameters.items():
|
||||||
if name not in self._rules:
|
if name not in self._rules:
|
||||||
continue
|
continue
|
||||||
for data in self._rules.get(name):
|
for data in self._rules.get(name):
|
||||||
|
self._add_trigger(parameter, data)
|
||||||
def apply(rule):
|
|
||||||
if param := self._command.parameters.get(rule.param_key):
|
|
||||||
if isinstance(param, HonParameterEnum) and set(
|
|
||||||
param.values
|
|
||||||
) != {str(rule.param_value)}:
|
|
||||||
param.values = [str(rule.param_value)]
|
|
||||||
elif isinstance(param, HonParameterRange):
|
|
||||||
param.value = float(rule.param_value)
|
|
||||||
return
|
|
||||||
param.value = str(rule.param_value)
|
|
||||||
|
|
||||||
parameter.add_trigger(data.trigger_value, apply, data)
|
|
||||||
|
Loading…
Reference in New Issue
Block a user