Compare commits

...

4 Commits

Author SHA1 Message Date
a957d7ac0f Fix error for zone devices 2023-04-15 21:58:20 +02:00
f54b7b2dbf Add mypy checks 2023-04-15 15:57:36 +02:00
b6ca12ebff Use dataclass for login data 2023-04-15 14:37:27 +02:00
4a0ee8569b Refactor, add hon auth handler 2023-04-15 14:22:04 +02:00
12 changed files with 363 additions and 285 deletions

View File

@ -25,12 +25,15 @@ jobs:
run: |
python -m pip install --upgrade pip
python -m pip install -r requirements.txt
python -m pip install flake8 pylint black
python -m pip install flake8 pylint black mypy
- name: Lint with flake8
run: |
# stop the build if there are Python syntax errors or undefined names
flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
flake8 . --count --exit-zero --max-complexity=10 --max-line-length=88 --statistics
- name: Type check with mypy
run: |
mypy pyhon/
# - name: Analysing the code with pylint
# run: |
# pylint --max-line-length 88 $(git ls-files '*.py')

View File

@ -1,23 +1,29 @@
import importlib
from contextlib import suppress
from typing import Optional, Dict
from typing import Optional, Dict, Any
from typing import TYPE_CHECKING
from pyhon import helper
from pyhon.commands import HonCommand
from pyhon.parameter import HonParameterFixed
if TYPE_CHECKING:
from pyhon import HonAPI
class HonAppliance:
def __init__(self, api, info: Dict, zone: int = 0) -> None:
def __init__(
self, api: Optional["HonAPI"], info: Dict[str, Any], zone: int = 0
) -> None:
if attributes := info.get("attributes"):
info["attributes"] = {v["parName"]: v["parValue"] for v in attributes}
self._info = info
self._api = api
self._appliance_model = {}
self._info: Dict = info
self._api: Optional[HonAPI] = api
self._appliance_model: Dict = {}
self._commands = {}
self._statistics = {}
self._attributes = {}
self._commands: Dict = {}
self._statistics: Dict = {}
self._attributes: Dict = {}
self._zone = zone
try:
@ -58,14 +64,18 @@ class HonAppliance:
@property
def appliance_model_id(self) -> str:
return self._info.get("applianceModelId")
return self._info.get("applianceModelId", "")
@property
def appliance_type(self) -> str:
return self._info.get("applianceTypeName")
return self._info.get("applianceTypeName", "")
@property
def mac_address(self) -> str:
return self.info.get("macAddress", "")
@property
def unique_id(self) -> str:
return self._check_name_zone("macAddress", frontend=False)
@property

View File

@ -9,7 +9,8 @@ from aiohttp import ClientSession
from pyhon import const, exceptions
from pyhon.appliance import HonAppliance
from pyhon.connection.auth import HonAuth
from pyhon.connection.handler import HonConnectionHandler, HonAnonymousConnectionHandler
from pyhon.connection.handler.hon import HonConnectionHandler
from pyhon.connection.handler.anonym import HonAnonymousConnectionHandler
_LOGGER = logging.getLogger()

View File

@ -3,52 +3,66 @@ import logging
import re
import secrets
import urllib
from contextlib import suppress
from dataclasses import dataclass
from datetime import datetime, timedelta
from pprint import pformat
from typing import List, Tuple
from typing import Dict, Optional, List
from urllib import parse
from urllib.parse import quote
from aiohttp import ClientResponse
from yarl import URL
from pyhon import const, exceptions
from pyhon.connection.handler.auth import HonAuthConnectionHandler
_LOGGER = logging.getLogger(__name__)
@dataclass
class HonLoginData:
url: str = ""
email: str = ""
password: str = ""
fw_uid: str = ""
loaded: Optional[Dict] = None
class HonAuth:
_TOKEN_EXPIRES_AFTER_HOURS = 8
_TOKEN_EXPIRE_WARNING_HOURS = 7
def __init__(self, session, email, password, device) -> None:
self._session = session
self._email = email
self._password = password
self._request = HonAuthConnectionHandler(session)
self._login_data = HonLoginData()
self._login_data.email = email
self._login_data.password = password
self._access_token = ""
self._refresh_token = ""
self._cognito_token = ""
self._id_token = ""
self._device = device
self._called_urls: List[Tuple[int, str]] = []
self._expires: datetime = datetime.utcnow()
@property
def cognito_token(self):
def cognito_token(self) -> str:
return self._cognito_token
@property
def id_token(self):
def id_token(self) -> str:
return self._id_token
@property
def access_token(self):
def access_token(self) -> str:
return self._access_token
@property
def refresh_token(self):
def refresh_token(self) -> str:
return self._refresh_token
def _check_token_expiration(self, hours):
def _check_token_expiration(self, hours: int) -> bool:
return datetime.utcnow() >= self._expires + timedelta(hours=hours)
@property
@ -59,34 +73,39 @@ class HonAuth:
def token_expires_soon(self) -> bool:
return self._check_token_expiration(self._TOKEN_EXPIRE_WARNING_HOURS)
async def _error_logger(self, response, fail=True):
result = "hOn Authentication Error\n"
for i, (status, url) in enumerate(self._called_urls):
result += f" {i + 1: 2d} {status} - {url}\n"
result += f"ERROR - {response.status} - {response.request_info.url}\n"
result += f"{15 * '='} Response {15 * '='}\n{await response.text()}\n{40 * '='}"
_LOGGER.error(result)
async def _error_logger(self, response: ClientResponse, fail: bool = True) -> None:
output = "hOn Authentication Error\n"
for i, (status, url) in enumerate(self._request.called_urls):
output += f" {i + 1: 2d} {status} - {url}\n"
output += f"ERROR - {response.status} - {response.request_info.url}\n"
output += f"{15 * '='} Response {15 * '='}\n{await response.text()}\n{40 * '='}"
_LOGGER.error(output)
if fail:
raise exceptions.HonAuthenticationError("Can't login")
async def _load_login(self):
@staticmethod
def _generate_nonce() -> str:
nonce = secrets.token_hex(16)
nonce = f"{nonce[:8]}-{nonce[8:12]}-{nonce[12:16]}-{nonce[16:20]}-{nonce[20:]}"
return f"{nonce[:8]}-{nonce[8:12]}-{nonce[12:16]}-{nonce[16:20]}-{nonce[20:]}"
async def _load_login(self) -> bool:
login_url = await self._introduce()
login_url = await self._handle_redirects(login_url)
return await self._login_url(login_url)
async def _introduce(self) -> str:
redirect_uri = urllib.parse.quote(f"{const.APP}://mobilesdk/detect/oauth/done")
params = {
"response_type": "token+id_token",
"client_id": const.CLIENT_ID,
"redirect_uri": urllib.parse.quote(
f"{const.APP}://mobilesdk/detect/oauth/done"
),
"redirect_uri": redirect_uri,
"display": "touch",
"scope": "api openid refresh_token web",
"nonce": nonce,
"nonce": self._generate_nonce(),
}
params = "&".join([f"{k}={v}" for k, v in params.items()])
async with self._session.get(
f"{const.AUTH_API}/services/oauth2/authorize/expid_Login?{params}"
) as response:
self._called_urls.append((response.status, response.request_info.url))
params_encode = "&".join([f"{k}={v}" for k, v in params.items()])
url = f"{const.AUTH_API}/services/oauth2/authorize/expid_Login?{params_encode}"
async with self._request.get(url) as response:
text = await response.text()
self._expires = datetime.utcnow()
if not (login_url := re.findall("url = '(.+?)'", text)):
@ -94,90 +113,77 @@ class HonAuth:
self._parse_token_data(text)
raise exceptions.HonNoAuthenticationNeeded()
await self._error_logger(response)
return False
async with self._session.get(login_url[0], allow_redirects=False) as redirect1:
self._called_urls.append((redirect1.status, redirect1.request_info.url))
if not (url := redirect1.headers.get("Location")):
await self._error_logger(redirect1)
return False
async with self._session.get(url, allow_redirects=False) as redirect2:
self._called_urls.append((redirect2.status, redirect2.request_info.url))
if not (
url := redirect2.headers.get("Location")
+ "&System=IoT_Mobile_App&RegistrationSubChannel=hOn"
):
await self._error_logger(redirect2)
return False
async with self._session.get(
URL(url, encoded=True), headers={"user-agent": const.USER_AGENT}
) as login_screen:
self._called_urls.append(
(login_screen.status, login_screen.request_info.url)
)
if context := re.findall(
'"fwuid":"(.*?)","loaded":(\\{.*?})', await login_screen.text()
):
return login_url[0]
async def _manual_redirect(self, url: str) -> str:
async with self._request.get(url, allow_redirects=False) as response:
if not (new_location := response.headers.get("Location", "")):
await self._error_logger(response)
return new_location
async def _handle_redirects(self, login_url) -> str:
redirect1 = await self._manual_redirect(login_url)
redirect2 = await self._manual_redirect(redirect1)
return f"{redirect2}&System=IoT_Mobile_App&RegistrationSubChannel=hOn"
async def _login_url(self, login_url: str) -> bool:
headers = {"user-agent": const.USER_AGENT}
url = URL(login_url, encoded=True)
async with self._request.get(url, headers=headers) as response:
text = await response.text()
if context := re.findall('"fwuid":"(.*?)","loaded":(\\{.*?})', text):
fw_uid, loaded_str = context[0]
loaded = json.loads(loaded_str)
login_url = login_url[0].replace(
self._login_data.fw_uid = fw_uid
self._login_data.loaded = json.loads(loaded_str)
self._login_data.url = login_url.replace(
"/".join(const.AUTH_API.split("/")[:-1]), ""
)
return fw_uid, loaded, login_url
await self._error_logger(login_screen)
return True
await self._error_logger(response)
return False
async def _login(self, fw_uid, loaded, login_url):
data = {
"message": {
"actions": [
{
"id": "79;a",
"descriptor": "apex://LightningLoginCustomController/ACTION$login",
"callingDescriptor": "markup://c:loginForm",
"params": {
"username": quote(self._email),
"password": quote(self._password),
"startUrl": parse.unquote(
login_url.split("startURL=")[-1]
).split("%3D")[0],
},
}
]
async def _login(self) -> str:
start_url = self._login_data.url.rsplit("startURL=", maxsplit=1)[-1]
start_url = parse.unquote(start_url).split("%3D")[0]
action = {
"id": "79;a",
"descriptor": "apex://LightningLoginCustomController/ACTION$login",
"callingDescriptor": "markup://c:loginForm",
"params": {
"username": quote(self._login_data.email),
"password": quote(self._login_data.password),
"startUrl": start_url,
},
}
data = {
"message": {"actions": [action]},
"aura.context": {
"mode": "PROD",
"fwuid": fw_uid,
"fwuid": self._login_data.fw_uid,
"app": "siteforce:loginApp2",
"loaded": loaded,
"loaded": self._login_data.loaded,
"dn": [],
"globals": {},
"uad": False,
},
"aura.pageURI": login_url,
"aura.pageURI": self._login_data.url,
"aura.token": None,
}
params = {"r": 3, "other.LightningLoginCustom.login": 1}
async with self._session.post(
async with self._request.post(
const.AUTH_API + "/s/sfsites/aura",
headers={"Content-Type": "application/x-www-form-urlencoded"},
data="&".join(f"{k}={json.dumps(v)}" for k, v in data.items()),
params=params,
) as response:
self._called_urls.append((response.status, response.request_info.url))
if response.status == 200:
try:
data = await response.json()
return data["events"][0]["attributes"]["values"]["url"]
except json.JSONDecodeError:
pass
except KeyError:
_LOGGER.error(
"Can't get login url - %s", pformat(await response.json())
)
with suppress(json.JSONDecodeError, KeyError):
result = await response.json()
return result["events"][0]["attributes"]["values"]["url"]
await self._error_logger(response)
return ""
def _parse_token_data(self, text):
def _parse_token_data(self, text: str) -> None:
if access_token := re.findall("access_token=(.*?)&", text):
self._access_token = access_token[0]
if refresh_token := re.findall("refresh_token=(.*?)&", text):
@ -185,39 +191,39 @@ class HonAuth:
if id_token := re.findall("id_token=(.*?)&", text):
self._id_token = id_token[0]
async def _get_token(self, url):
async with self._session.get(url) as response:
self._called_urls.append((response.status, response.request_info.url))
async def _get_token(self, url: str) -> bool:
async with self._request.get(url) as response:
if response.status != 200:
await self._error_logger(response)
return False
url = re.findall("href\\s*=\\s*[\"'](.+?)[\"']", await response.text())
if not url:
url_search = re.findall(
"href\\s*=\\s*[\"'](.+?)[\"']", await response.text()
)
if not url_search:
await self._error_logger(response)
return False
if "ProgressiveLogin" in url[0]:
async with self._session.get(url[0]) as response:
self._called_urls.append((response.status, response.request_info.url))
if "ProgressiveLogin" in url_search[0]:
async with self._request.get(url_search[0]) as response:
if response.status != 200:
await self._error_logger(response)
return False
url = re.findall("href\\s*=\\s*[\"'](.*?)[\"']", await response.text())
url = "/".join(const.AUTH_API.split("/")[:-1]) + url[0]
async with self._session.get(url) as response:
self._called_urls.append((response.status, response.request_info.url))
url_search = re.findall(
"href\\s*=\\s*[\"'](.*?)[\"']", await response.text()
)
url = "/".join(const.AUTH_API.split("/")[:-1]) + url_search[0]
async with self._request.get(url) as response:
if response.status != 200:
await self._error_logger(response)
return False
self._parse_token_data(await response.text())
return True
async def _api_auth(self):
async def _api_auth(self) -> bool:
post_headers = {"id-token": self._id_token}
data = self._device.get()
async with self._session.post(
async with self._request.post(
f"{const.API_URL}/auth/v1/login", headers=post_headers, json=data
) as response:
self._called_urls.append((response.status, response.request_info.url))
try:
json_data = await response.json()
except json.JSONDecodeError:
@ -226,12 +232,12 @@ class HonAuth:
self._cognito_token = json_data["cognitoUser"]["Token"]
return True
async def authenticate(self):
async def authenticate(self) -> None:
self.clear()
try:
if not (login_site := await self._load_login()):
if not await self._load_login():
raise exceptions.HonAuthenticationError("Can't open login page")
if not (url := await self._login(*login_site)):
if not (url := await self._login()):
raise exceptions.HonAuthenticationError("Can't login")
if not await self._get_token(url):
raise exceptions.HonAuthenticationError("Can't get token")
@ -240,16 +246,15 @@ class HonAuth:
except exceptions.HonNoAuthenticationNeeded:
return
async def refresh(self):
async def refresh(self) -> bool:
params = {
"client_id": const.CLIENT_ID,
"refresh_token": self._refresh_token,
"grant_type": "refresh_token",
}
async with self._session.post(
async with self._request.post(
f"{const.AUTH_API}/services/oauth2/token", params=params
) as response:
self._called_urls.append((response.status, response.request_info.url))
if response.status >= 400:
await self._error_logger(response, fail=False)
return False
@ -259,9 +264,9 @@ class HonAuth:
self._access_token = data["access_token"]
return await self._api_auth()
def clear(self):
def clear(self) -> None:
self._session.cookie_jar.clear_domain(const.AUTH_API.split("/")[-2])
self._called_urls = []
self._request.called_urls = []
self._cognito_token = ""
self._id_token = ""
self._access_token = ""

View File

@ -1,159 +0,0 @@
import json
from collections.abc import Generator, AsyncIterator, Coroutine
from contextlib import asynccontextmanager
from typing import Optional, Callable, Dict
from typing_extensions import Self
import aiohttp
from pyhon import const, exceptions
from pyhon.connection.auth import HonAuth, _LOGGER
from pyhon.connection.device import HonDevice
from pyhon.exceptions import HonAuthenticationError
class HonBaseConnectionHandler:
_HEADERS: Dict = {
"user-agent": const.USER_AGENT,
"Content-Type": "application/json",
}
def __init__(self, session: Optional[aiohttp.ClientSession] = None) -> None:
self._create_session: bool = session is None
self._session: Optional[aiohttp.ClientSession] = session
self._auth: Optional[HonAuth] = None
async def __aenter__(self) -> Self:
return await self.create()
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
await self.close()
@property
def auth(self) -> Optional[HonAuth]:
return self._auth
async def create(self) -> Self:
if self._create_session:
self._session = aiohttp.ClientSession()
return self
@asynccontextmanager
def _intercept(self, method: Callable, *args, loop: int = 0, **kwargs):
raise NotImplementedError
@asynccontextmanager
async def get(self, *args, **kwargs) -> AsyncIterator[Callable]:
if self._session is None:
raise exceptions.NoSessionException()
response: Callable
async with self._intercept(self._session.get, *args, **kwargs) as response:
yield response
@asynccontextmanager
async def post(self, *args, **kwargs) -> AsyncIterator[Callable]:
if self._session is None:
raise exceptions.NoSessionException()
response: Callable
async with self._intercept(self._session.post, *args, **kwargs) as response:
yield response
async def close(self) -> None:
if self._create_session and self._session is not None:
await self._session.close()
class HonConnectionHandler(HonBaseConnectionHandler):
def __init__(
self, email: str, password: str, session: Optional[aiohttp.ClientSession] = None
) -> None:
super().__init__(session=session)
self._device: HonDevice = HonDevice()
self._email: str = email
self._password: str = password
if not self._email:
raise HonAuthenticationError("An email address must be specified")
if not self._password:
raise HonAuthenticationError("A password address must be specified")
@property
def device(self) -> HonDevice:
return self._device
async def create(self) -> Self:
await super().create()
self._auth: HonAuth = HonAuth(
self._session, self._email, self._password, self._device
)
return self
async def _check_headers(self, headers: Dict) -> Dict:
if not (self._auth.cognito_token and self._auth.id_token):
await self._auth.authenticate()
headers["cognito-token"] = self._auth.cognito_token
headers["id-token"] = self._auth.id_token
return self._HEADERS | headers
@asynccontextmanager
async def _intercept(
self, method: Callable, *args, loop: int = 0, **kwargs
) -> AsyncIterator:
kwargs["headers"] = await self._check_headers(kwargs.get("headers", {}))
async with method(*args, **kwargs) as response:
if (
self._auth.token_expires_soon or response.status in [401, 403]
) and loop == 0:
_LOGGER.info("Try refreshing token...")
await self._auth.refresh()
async with self._intercept(
method, *args, loop=loop + 1, **kwargs
) as result:
yield result
elif (
self._auth.token_is_expired or response.status in [401, 403]
) and loop == 1:
_LOGGER.warning(
"%s - Error %s - %s",
response.request_info.url,
response.status,
await response.text(),
)
await self.create()
async with self._intercept(
method, *args, loop=loop + 1, **kwargs
) as result:
yield result
elif loop >= 2:
_LOGGER.error(
"%s - Error %s - %s",
response.request_info.url,
response.status,
await response.text(),
)
raise HonAuthenticationError("Login failure")
else:
try:
await response.json()
yield response
except json.JSONDecodeError:
_LOGGER.warning(
"%s - JsonDecodeError %s - %s",
response.request_info.url,
response.status,
await response.text(),
)
raise HonAuthenticationError("Decode Error")
class HonAnonymousConnectionHandler(HonBaseConnectionHandler):
_HEADERS: Dict = HonBaseConnectionHandler._HEADERS | {"x-api-key": const.API_KEY}
@asynccontextmanager
async def _intercept(
self, method: Callable, *args, loop: int = 0, **kwargs
) -> AsyncIterator:
kwargs["headers"] = kwargs.pop("headers", {}) | self._HEADERS
async with method(*args, **kwargs) as response:
if response.status == 403:
_LOGGER.error("Can't authenticate anymore")
yield response

View File

View File

@ -0,0 +1,21 @@
import logging
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from typing import Callable, Dict
from pyhon import const
from pyhon.connection.handler.base import ConnectionHandler
_LOGGER = logging.getLogger(__name__)
class HonAnonymousConnectionHandler(ConnectionHandler):
_HEADERS: Dict = ConnectionHandler._HEADERS | {"x-api-key": const.API_KEY}
@asynccontextmanager
async def _intercept(self, method: Callable, *args, **kwargs) -> AsyncIterator:
kwargs["headers"] = kwargs.pop("headers", {}) | self._HEADERS
async with method(*args, **kwargs) as response:
if response.status == 403:
_LOGGER.error("Can't authenticate anymore")
yield response

View File

@ -0,0 +1,36 @@
import logging
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from typing import Optional, Callable, List, Tuple
import aiohttp
from pyhon import const
from pyhon.connection.handler.base import ConnectionHandler
_LOGGER = logging.getLogger(__name__)
class HonAuthConnectionHandler(ConnectionHandler):
_HEADERS = {"user-agent": const.USER_AGENT}
def __init__(self, session: Optional[aiohttp.ClientSession] = None) -> None:
super().__init__(session)
self._called_urls: List[Tuple[int, str]] = []
@property
def called_urls(self) -> List[Tuple[int, str]]:
return self._called_urls
@called_urls.setter
def called_urls(self, called_urls: List[Tuple[int, str]]) -> None:
self._called_urls = called_urls
@asynccontextmanager
async def _intercept(
self, method: Callable, *args, loop: int = 0, **kwargs
) -> AsyncIterator:
kwargs["headers"] = kwargs.pop("headers", {}) | self._HEADERS
async with method(*args, **kwargs) as response:
self._called_urls.append((response.status, response.request_info.url))
yield response

View File

@ -0,0 +1,57 @@
import logging
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from typing import Optional, Callable, Dict, Any
import aiohttp
from typing_extensions import Self
from pyhon import const, exceptions
_LOGGER = logging.getLogger(__name__)
class ConnectionHandler:
_HEADERS: Dict = {
"user-agent": const.USER_AGENT,
"Content-Type": "application/json",
}
def __init__(self, session: Optional[aiohttp.ClientSession] = None) -> None:
self._create_session: bool = session is None
self._session: Optional[aiohttp.ClientSession] = session
async def __aenter__(self) -> Self:
return await self.create()
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
await self.close()
async def create(self) -> Self:
if self._create_session:
self._session = aiohttp.ClientSession()
return self
@asynccontextmanager
def _intercept(self, method: Callable, *args, loop: int = 0, **kwargs):
raise NotImplementedError
@asynccontextmanager
async def get(self, *args, **kwargs) -> AsyncIterator[aiohttp.ClientResponse]:
if self._session is None:
raise exceptions.NoSessionException()
response: aiohttp.ClientResponse
async with self._intercept(self._session.get, *args, **kwargs) as response:
yield response
@asynccontextmanager
async def post(self, *args, **kwargs) -> AsyncIterator[aiohttp.ClientResponse]:
if self._session is None:
raise exceptions.NoSessionException()
response: aiohttp.ClientResponse
async with self._intercept(self._session.post, *args, **kwargs) as response:
yield response
async def close(self) -> None:
if self._create_session and self._session is not None:
await self._session.close()

View File

@ -0,0 +1,102 @@
import json
import logging
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from typing import Optional, Callable, Dict
import aiohttp
from typing_extensions import Self
from pyhon.connection.auth import HonAuth
from pyhon.connection.device import HonDevice
from pyhon.connection.handler.base import ConnectionHandler
from pyhon.exceptions import HonAuthenticationError, NoAuthenticationException
_LOGGER = logging.getLogger(__name__)
class HonConnectionHandler(ConnectionHandler):
def __init__(
self, email: str, password: str, session: Optional[aiohttp.ClientSession] = None
) -> None:
super().__init__(session=session)
self._device: HonDevice = HonDevice()
self._email: str = email
self._password: str = password
if not self._email:
raise HonAuthenticationError("An email address must be specified")
if not self._password:
raise HonAuthenticationError("A password address must be specified")
self._auth: Optional[HonAuth] = None
@property
def auth(self) -> HonAuth:
if self._auth is None:
raise NoAuthenticationException()
return self._auth
@property
def device(self) -> HonDevice:
return self._device
async def create(self) -> Self:
await super().create()
self._auth = HonAuth(self._session, self._email, self._password, self._device)
return self
async def _check_headers(self, headers: Dict) -> Dict:
if not (self.auth.cognito_token and self.auth.id_token):
await self.auth.authenticate()
headers["cognito-token"] = self.auth.cognito_token
headers["id-token"] = self.auth.id_token
return self._HEADERS | headers
@asynccontextmanager
async def _intercept(
self, method: Callable, *args, loop: int = 0, **kwargs
) -> AsyncIterator:
kwargs["headers"] = await self._check_headers(kwargs.get("headers", {}))
async with method(*args, **kwargs) as response:
if (
self.auth.token_expires_soon or response.status in [401, 403]
) and loop == 0:
_LOGGER.info("Try refreshing token...")
await self.auth.refresh()
async with self._intercept(
method, *args, loop=loop + 1, **kwargs
) as result:
yield result
elif (
self.auth.token_is_expired or response.status in [401, 403]
) and loop == 1:
_LOGGER.warning(
"%s - Error %s - %s",
response.request_info.url,
response.status,
await response.text(),
)
await self.create()
async with self._intercept(
method, *args, loop=loop + 1, **kwargs
) as result:
yield result
elif loop >= 2:
_LOGGER.error(
"%s - Error %s - %s",
response.request_info.url,
response.status,
await response.text(),
)
raise HonAuthenticationError("Login failure")
else:
try:
await response.json()
yield response
except json.JSONDecodeError:
_LOGGER.warning(
"%s - JsonDecodeError %s - %s",
response.request_info.url,
response.status,
await response.text(),
)
raise HonAuthenticationError("Decode Error")

View File

@ -1,5 +1,6 @@
import asyncio
from typing import List, Optional, Dict
import copy
from typing import List, Optional, Dict, Any
from typing_extensions import Self
from aiohttp import ClientSession
@ -39,8 +40,8 @@ class Hon:
def appliances(self) -> List[HonAppliance]:
return self._appliances
async def _create_appliance(self, appliance: Dict, zone=0) -> None:
appliance = HonAppliance(self._api, appliance, zone=zone)
async def _create_appliance(self, appliance_data: Dict[str, Any], zone=0) -> None:
appliance = HonAppliance(self._api, appliance_data, zone=zone)
if appliance.mac_address is None:
return
await asyncio.gather(
@ -53,9 +54,10 @@ class Hon:
self._appliances.append(appliance)
async def setup(self):
appliance: Dict
for appliance in (await self._api.load_appliances())["payload"]["appliances"]:
for zone in range(int(appliance.get("zone", "0"))):
await self._create_appliance(appliance, zone=zone + 1)
await self._create_appliance(appliance.copy(), zone=zone + 1)
await self._create_appliance(appliance)
async def close(self):

View File

@ -7,7 +7,7 @@ with open("README.md", "r") as f:
setup(
name="pyhOn",
version="0.8.0b2",
version="0.8.0b3",
author="Andre Basche",
description="Control hOn devices with python",
long_description=long_description,