Compare commits

..

2 Commits

Author SHA1 Message Date
e857fe91de Fix login issue 2023-04-10 20:32:35 +02:00
79c9678492 Fix float convertion 2023-04-10 18:48:46 +02:00
4 changed files with 46 additions and 29 deletions

View File

@ -2,7 +2,7 @@ name: Python check
on: on:
push: push:
branches: [ "main", "refactor" ] branches: [ "main" ]
pull_request: pull_request:
branches: [ "main" ] branches: [ "main" ]

View File

@ -2,6 +2,7 @@ import json
import logging import logging
import re import re
import secrets import secrets
import sys
import urllib import urllib
from pprint import pformat from pprint import pformat
from urllib import parse from urllib import parse
@ -10,7 +11,7 @@ from yarl import URL
from pyhon import const from pyhon import const
_LOGGER = logging.getLogger() _LOGGER = logging.getLogger(__name__)
class HonAuth: class HonAuth:
@ -56,19 +57,23 @@ class HonAuth:
params = "&".join([f"{k}={v}" for k, v in params.items()]) params = "&".join([f"{k}={v}" for k, v in params.items()])
async with self._session.get( async with self._session.get(
f"{const.AUTH_API}/services/oauth2/authorize/expid_Login?{params}" f"{const.AUTH_API}/services/oauth2/authorize/expid_Login?{params}"
) as resp: ) as response:
if not (login_url := re.findall("url = '(.+?)'", await resp.text())): _LOGGER.debug("%s - %s", response.status, response.request_info.url)
if not (login_url := re.findall("url = '(.+?)'", await response.text())):
return False return False
async with self._session.get(login_url[0], allow_redirects=False) as redirect1: async with self._session.get(login_url[0], allow_redirects=False) as redirect1:
_LOGGER.debug("%s - %s", redirect1.status, redirect1.request_info.url)
if not (url := redirect1.headers.get("Location")): if not (url := redirect1.headers.get("Location")):
return False return False
async with self._session.get(url, allow_redirects=False) as redirect2: async with self._session.get(url, allow_redirects=False) as redirect2:
_LOGGER.debug("%s - %s", redirect2.status, redirect2.request_info.url)
if not ( if not (
url := redirect2.headers.get("Location") url := redirect2.headers.get("Location")
+ "&System=IoT_Mobile_App&RegistrationSubChannel=hOn" + "&System=IoT_Mobile_App&RegistrationSubChannel=hOn"
): ):
return False return False
async with self._session.get(URL(url, encoded=True)) as login_screen: async with self._session.get(URL(url, encoded=True)) as login_screen:
_LOGGER.debug("%s - %s", login_screen.status, login_screen.request_info.url)
if context := re.findall( if context := re.findall(
'"fwuid":"(.*?)","loaded":(\\{.*?})', await login_screen.text() '"fwuid":"(.*?)","loaded":(\\{.*?})', await login_screen.text()
): ):
@ -117,6 +122,7 @@ class HonAuth:
data="&".join(f"{k}={json.dumps(v)}" for k, v in data.items()), data="&".join(f"{k}={json.dumps(v)}" for k, v in data.items()),
params=params, params=params,
) as response: ) as response:
_LOGGER.debug("%s - %s", response.status, response.request_info.url)
if response.status == 200: if response.status == 200:
try: try:
data = await response.json() data = await response.json()
@ -133,25 +139,31 @@ class HonAuth:
return "" return ""
async def _get_token(self, url): async def _get_token(self, url):
async with self._session.get(url) as resp: async with self._session.get(url) as response:
if resp.status != 200: _LOGGER.debug("%s - %s", response.status, response.request_info.url)
_LOGGER.error("Unable to get token: %s", resp.status) if response.status != 200:
_LOGGER.error("Unable to get token: %s", response.status)
return False return False
url = re.findall("href\\s*=\\s*[\"'](http.+?)[\"']", await resp.text()) url = re.findall("href\\s*=\\s*[\"'](.+?)[\"']", await response.text())
if not url: if not url:
_LOGGER.error("Can't get login url - \n%s", await resp.text()) _LOGGER.error("Can't get login url - \n%s", await response.text())
raise PermissionError raise PermissionError
async with self._session.get(url[0]) as resp: if "ProgressiveLogin" in url[0]:
if resp.status != 200: async with self._session.get(url[0]) as response:
_LOGGER.error("Unable to get token: %s", resp.status) _LOGGER.debug("%s - %s", response.status, response.request_info.url)
if response.status != 200:
_LOGGER.error("Unable to get token: %s", response.status)
return False
url = re.findall("href\\s*=\\s*[\"'](.*?)[\"']", await response.text())
url = "/".join(const.AUTH_API.split("/")[:-1]) + url[0]
async with self._session.get(url) as response:
_LOGGER.debug("%s - %s", response.status, response.request_info.url)
if response.status != 200:
_LOGGER.error(
"Unable to connect to the login service: %s", response.status
)
return False return False
url = re.findall("href\\s*=\\s*[\"'](.*?)[\"']", await resp.text()) text = await response.text()
url = "/".join(const.AUTH_API.split("/")[:-1]) + url[0]
async with self._session.get(url) as resp:
if resp.status != 200:
_LOGGER.error("Unable to connect to the login service: %s", resp.status)
return False
text = await resp.text()
if access_token := re.findall("access_token=(.*?)&", text): if access_token := re.findall("access_token=(.*?)&", text):
self._access_token = access_token[0] self._access_token = access_token[0]
if refresh_token := re.findall("refresh_token=(.*?)&", text): if refresh_token := re.findall("refresh_token=(.*?)&", text):
@ -174,11 +186,12 @@ class HonAuth:
data = self._device.get() data = self._device.get()
async with self._session.post( async with self._session.post(
f"{const.API_URL}/auth/v1/login", headers=post_headers, json=data f"{const.API_URL}/auth/v1/login", headers=post_headers, json=data
) as resp: ) as response:
_LOGGER.debug("%s - %s", response.status, response.request_info.url)
try: try:
json_data = await resp.json() json_data = await response.json()
except json.JSONDecodeError: except json.JSONDecodeError:
_LOGGER.error("No JSON Data after POST: %s", await resp.text()) _LOGGER.error("No JSON Data after POST: %s", await response.text())
return False return False
self._cognito_token = json_data["cognitoUser"]["Token"] self._cognito_token = json_data["cognitoUser"]["Token"]
return True return True
@ -191,10 +204,11 @@ class HonAuth:
} }
async with self._session.post( async with self._session.post(
f"{const.AUTH_API}/services/oauth2/token", params=params f"{const.AUTH_API}/services/oauth2/token", params=params
) as resp: ) as response:
if resp.status >= 400: _LOGGER.debug("%s - %s", response.status, response.request_info.url)
if response.status >= 400:
return False return False
data = await resp.json() data = await response.json()
self._id_token = data["id_token"] self._id_token = data["id_token"]
self._access_token = data["access_token"] self._access_token = data["access_token"]
return True return True

View File

@ -2,7 +2,10 @@ import re
def str_to_float(string): def str_to_float(string):
return float(string.replace(",", ".")) try:
return int(string)
except ValueError:
return float(str(string.replace(",", ".")))
class HonParameter: class HonParameter:

View File

@ -7,7 +7,7 @@ with open("README.md", "r") as f:
setup( setup(
name="pyhOn", name="pyhOn",
version="0.6.1", version="0.6.3",
author="Andre Basche", author="Andre Basche",
description="Control hOn devices with python", description="Control hOn devices with python",
long_description=long_description, long_description=long_description,