Compare commits

...

3 Commits

Author SHA1 Message Date
e857fe91de Fix login issue 2023-04-10 20:32:35 +02:00
79c9678492 Fix float convertion 2023-04-10 18:48:46 +02:00
7c49589944 Use float for range parameter 2023-04-10 16:59:10 +02:00
4 changed files with 54 additions and 33 deletions

View File

@ -2,7 +2,7 @@ name: Python check
on: on:
push: push:
branches: [ "main", "refactor" ] branches: [ "main" ]
pull_request: pull_request:
branches: [ "main" ] branches: [ "main" ]

View File

@ -2,6 +2,7 @@ import json
import logging import logging
import re import re
import secrets import secrets
import sys
import urllib import urllib
from pprint import pformat from pprint import pformat
from urllib import parse from urllib import parse
@ -10,7 +11,7 @@ from yarl import URL
from pyhon import const from pyhon import const
_LOGGER = logging.getLogger() _LOGGER = logging.getLogger(__name__)
class HonAuth: class HonAuth:
@ -56,19 +57,23 @@ class HonAuth:
params = "&".join([f"{k}={v}" for k, v in params.items()]) params = "&".join([f"{k}={v}" for k, v in params.items()])
async with self._session.get( async with self._session.get(
f"{const.AUTH_API}/services/oauth2/authorize/expid_Login?{params}" f"{const.AUTH_API}/services/oauth2/authorize/expid_Login?{params}"
) as resp: ) as response:
if not (login_url := re.findall("url = '(.+?)'", await resp.text())): _LOGGER.debug("%s - %s", response.status, response.request_info.url)
if not (login_url := re.findall("url = '(.+?)'", await response.text())):
return False return False
async with self._session.get(login_url[0], allow_redirects=False) as redirect1: async with self._session.get(login_url[0], allow_redirects=False) as redirect1:
_LOGGER.debug("%s - %s", redirect1.status, redirect1.request_info.url)
if not (url := redirect1.headers.get("Location")): if not (url := redirect1.headers.get("Location")):
return False return False
async with self._session.get(url, allow_redirects=False) as redirect2: async with self._session.get(url, allow_redirects=False) as redirect2:
_LOGGER.debug("%s - %s", redirect2.status, redirect2.request_info.url)
if not ( if not (
url := redirect2.headers.get("Location") url := redirect2.headers.get("Location")
+ "&System=IoT_Mobile_App&RegistrationSubChannel=hOn" + "&System=IoT_Mobile_App&RegistrationSubChannel=hOn"
): ):
return False return False
async with self._session.get(URL(url, encoded=True)) as login_screen: async with self._session.get(URL(url, encoded=True)) as login_screen:
_LOGGER.debug("%s - %s", login_screen.status, login_screen.request_info.url)
if context := re.findall( if context := re.findall(
'"fwuid":"(.*?)","loaded":(\\{.*?})', await login_screen.text() '"fwuid":"(.*?)","loaded":(\\{.*?})', await login_screen.text()
): ):
@ -117,6 +122,7 @@ class HonAuth:
data="&".join(f"{k}={json.dumps(v)}" for k, v in data.items()), data="&".join(f"{k}={json.dumps(v)}" for k, v in data.items()),
params=params, params=params,
) as response: ) as response:
_LOGGER.debug("%s - %s", response.status, response.request_info.url)
if response.status == 200: if response.status == 200:
try: try:
data = await response.json() data = await response.json()
@ -133,25 +139,31 @@ class HonAuth:
return "" return ""
async def _get_token(self, url): async def _get_token(self, url):
async with self._session.get(url) as resp: async with self._session.get(url) as response:
if resp.status != 200: _LOGGER.debug("%s - %s", response.status, response.request_info.url)
_LOGGER.error("Unable to get token: %s", resp.status) if response.status != 200:
_LOGGER.error("Unable to get token: %s", response.status)
return False return False
url = re.findall("href\\s*=\\s*[\"'](http.+?)[\"']", await resp.text()) url = re.findall("href\\s*=\\s*[\"'](.+?)[\"']", await response.text())
if not url: if not url:
_LOGGER.error("Can't get login url - \n%s", await resp.text()) _LOGGER.error("Can't get login url - \n%s", await response.text())
raise PermissionError raise PermissionError
async with self._session.get(url[0]) as resp: if "ProgressiveLogin" in url[0]:
if resp.status != 200: async with self._session.get(url[0]) as response:
_LOGGER.error("Unable to get token: %s", resp.status) _LOGGER.debug("%s - %s", response.status, response.request_info.url)
if response.status != 200:
_LOGGER.error("Unable to get token: %s", response.status)
return False return False
url = re.findall("href\\s*=\\s*[\"'](.*?)[\"']", await resp.text()) url = re.findall("href\\s*=\\s*[\"'](.*?)[\"']", await response.text())
url = "/".join(const.AUTH_API.split("/")[:-1]) + url[0] url = "/".join(const.AUTH_API.split("/")[:-1]) + url[0]
async with self._session.get(url) as resp: async with self._session.get(url) as response:
if resp.status != 200: _LOGGER.debug("%s - %s", response.status, response.request_info.url)
_LOGGER.error("Unable to connect to the login service: %s", resp.status) if response.status != 200:
_LOGGER.error(
"Unable to connect to the login service: %s", response.status
)
return False return False
text = await resp.text() text = await response.text()
if access_token := re.findall("access_token=(.*?)&", text): if access_token := re.findall("access_token=(.*?)&", text):
self._access_token = access_token[0] self._access_token = access_token[0]
if refresh_token := re.findall("refresh_token=(.*?)&", text): if refresh_token := re.findall("refresh_token=(.*?)&", text):
@ -174,11 +186,12 @@ class HonAuth:
data = self._device.get() data = self._device.get()
async with self._session.post( async with self._session.post(
f"{const.API_URL}/auth/v1/login", headers=post_headers, json=data f"{const.API_URL}/auth/v1/login", headers=post_headers, json=data
) as resp: ) as response:
_LOGGER.debug("%s - %s", response.status, response.request_info.url)
try: try:
json_data = await resp.json() json_data = await response.json()
except json.JSONDecodeError: except json.JSONDecodeError:
_LOGGER.error("No JSON Data after POST: %s", await resp.text()) _LOGGER.error("No JSON Data after POST: %s", await response.text())
return False return False
self._cognito_token = json_data["cognitoUser"]["Token"] self._cognito_token = json_data["cognitoUser"]["Token"]
return True return True
@ -191,10 +204,11 @@ class HonAuth:
} }
async with self._session.post( async with self._session.post(
f"{const.AUTH_API}/services/oauth2/token", params=params f"{const.AUTH_API}/services/oauth2/token", params=params
) as resp: ) as response:
if resp.status >= 400: _LOGGER.debug("%s - %s", response.status, response.request_info.url)
if response.status >= 400:
return False return False
data = await resp.json() data = await response.json()
self._id_token = data["id_token"] self._id_token = data["id_token"]
self._access_token = data["access_token"] self._access_token = data["access_token"]
return True return True

View File

@ -1,6 +1,13 @@
import re import re
def str_to_float(string):
try:
return int(string)
except ValueError:
return float(str(string.replace(",", ".")))
class HonParameter: class HonParameter:
def __init__(self, key, attributes): def __init__(self, key, attributes):
self._key = key self._key = key
@ -51,10 +58,10 @@ class HonParameterFixed(HonParameter):
class HonParameterRange(HonParameter): class HonParameterRange(HonParameter):
def __init__(self, key, attributes): def __init__(self, key, attributes):
super().__init__(key, attributes) super().__init__(key, attributes)
self._min = int(attributes["minimumValue"]) self._min = str_to_float(attributes["minimumValue"])
self._max = int(attributes["maximumValue"]) self._max = str_to_float(attributes["maximumValue"])
self._step = int(attributes["incrementValue"]) self._step = str_to_float(attributes["incrementValue"])
self._default = int(attributes.get("defaultValue", self._min)) self._default = str_to_float(attributes.get("defaultValue", self._min))
self._value = self._default self._value = self._default
def __repr__(self): def __repr__(self):
@ -78,7 +85,7 @@ class HonParameterRange(HonParameter):
@value.setter @value.setter
def value(self, value): def value(self, value):
value = int(value) value = str_to_float(value)
if self._min <= value <= self._max and not value % self._step: if self._min <= value <= self._max and not value % self._step:
self._value = value self._value = value
else: else:

View File

@ -7,7 +7,7 @@ with open("README.md", "r") as f:
setup( setup(
name="pyhOn", name="pyhOn",
version="0.6.0", version="0.6.3",
author="Andre Basche", author="Andre Basche",
description="Control hOn devices with python", description="Control hOn devices with python",
long_description=long_description, long_description=long_description,