Skip to content
This repository was archived by the owner on Dec 25, 2025. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 23 additions & 15 deletions sharkiqpy/ayla_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,23 @@
LOGIN_URL,
SHARK_APP_ID,
SHARK_APP_SECRET,
EU_DEVICE_URL,
EU_LOGIN_URL,
EU_SHARK_APP_ID,
EU_SHARK_APP_SECRET
)
from .exc import SharkIqAuthError, SharkIqAuthExpiringError, SharkIqNotAuthedError
from .sharkiq import SharkIqVacuum

_session = None


def get_ayla_api(username: str, password: str, websession: Optional[aiohttp.ClientSession] = None):
def get_ayla_api(username: str, password: str, websession: Optional[aiohttp.ClientSession] = None, europe: bool = False):
"""Get an AylaApi object"""
return AylaApi(username, password, SHARK_APP_ID, SHARK_APP_SECRET, websession=websession)

if europe:
return AylaApi(username, password, EU_SHARK_APP_ID, EU_SHARK_APP_SECRET, websession=websession, europe=europe)
else:
return AylaApi(username, password, SHARK_APP_ID, SHARK_APP_SECRET, websession=websession)

class AylaApi:
"""Simple Ayla Networks API wrapper"""
Expand All @@ -37,7 +43,8 @@ def __init__(
password: str,
app_id: str,
app_secret: str,
websession: Optional[aiohttp.ClientSession] = None):
websession: Optional[aiohttp.ClientSession] = None,
europe: bool = False):
self._email = email
self._password = password
self._access_token = None # type: Optional[str]
Expand All @@ -46,7 +53,8 @@ def __init__(
self._is_authed = False # type: bool
self._app_id = app_id
self._app_secret = app_secret
self.websession = websession
self.websession = websession,
self.europe = europe

def ensure_session(self) -> aiohttp.ClientSession:
"""Ensure that we have an aiohttp ClientSession"""
Expand Down Expand Up @@ -80,27 +88,27 @@ def _set_credentials(self, status_code: int, login_result: Dict):
def sign_in(self):
"""Authenticate to Ayla API synchronously."""
login_data = self._login_data
resp = requests.post(f"{LOGIN_URL:s}/users/sign_in.json", json=login_data)
resp = requests.post(f"{EU_LOGIN_URL if self.europe else LOGIN_URL:s}/users/sign_in.json", json=login_data)
self._set_credentials(resp.status_code, resp.json())

def refresh_auth(self):
"""Refresh the authentication synchronously"""
refresh_data = {"user": {"refresh_token": self._refresh_token}}
resp = requests.post(f"{LOGIN_URL:s}/users/refresh_token.json", json=refresh_data)
resp = requests.post(f"{EU_LOGIN_URL if self.europe else LOGIN_URL:s}/users/refresh_token.json", json=refresh_data)
self._set_credentials(resp.status_code, resp.json())

async def async_sign_in(self):
"""Authenticate to Ayla API synchronously."""
session = self.ensure_session()
login_data = self._login_data
async with session.post(f"{LOGIN_URL:s}/users/sign_in.json", json=login_data) as resp:
async with session.post(f"{EU_LOGIN_URL if self.europe else LOGIN_URL:s}/users/sign_in.json", json=login_data) as resp:
self._set_credentials(resp.status, await resp.json())

async def async_refresh_auth(self):
"""Refresh the authentication synchronously."""
session = self.ensure_session()
refresh_data = {"user": {"refresh_token": self._refresh_token}}
async with session.post(f"{LOGIN_URL:s}/users/refresh_token.json", json=refresh_data) as resp:
async with session.post(f"{EU_LOGIN_URL if self.europe else LOGIN_URL:s}/users/refresh_token.json", json=refresh_data) as resp:
self._set_credentials(resp.status, await resp.json())

@property
Expand All @@ -117,13 +125,13 @@ def _clear_auth(self):

def sign_out(self):
"""Sign out and invalidate the access token"""
requests.post(f"{LOGIN_URL:s}/users/sign_out.json", json=self.sign_out_data)
requests.post(f"{EU_LOGIN_URL if self.europe else LOGIN_URL:s}/users/sign_out.json", json=self.sign_out_data)
self._clear_auth()

async def async_sign_out(self):
"""Sign out and invalidate the access token"""
session = self.ensure_session()
async with session.post(f"{LOGIN_URL:s}/users/sign_out.json", json=self.sign_out_data) as _:
async with session.post(f"{EU_LOGIN_URL if self.europe else LOGIN_URL:s}/users/sign_out.json", json=self.sign_out_data) as _:
pass
self._clear_auth()

Expand Down Expand Up @@ -188,29 +196,29 @@ async def async_request(self, http_method: str, url: str, **kwargs):
return session.request(http_method, url, headers=headers, **kwargs)

def list_devices(self) -> List[Dict]:
resp = self.request("get", f"{DEVICE_URL:s}/apiv1/devices.json")
resp = self.request("get", f"{EU_DEVICE_URL if self.europe else DEVICE_URL:s}/apiv1/devices.json")
devices = resp.json()
if resp.status_code == 401:
raise SharkIqAuthError(devices["error"]["message"])
return [d["device"] for d in devices]

async def async_list_devices(self) -> List[Dict]:
async with await self.async_request("get", f"{DEVICE_URL:s}/apiv1/devices.json") as resp:
async with await self.async_request("get", f"{EU_DEVICE_URL if self.europe else DEVICE_URL:s}/apiv1/devices.json") as resp:
devices = await resp.json()
if resp.status == 401:
raise SharkIqAuthError(devices["error"]["message"])
return [d["device"] for d in devices]

def get_devices(self, update: bool = True) -> List[SharkIqVacuum]:
devices = [SharkIqVacuum(self, d) for d in self.list_devices()]
devices = [SharkIqVacuum(self, d, europe=self.europe) for d in self.list_devices()]
if update:
for device in devices:
device.get_metadata()
device.update()
return devices

async def async_get_devices(self, update: bool = True) -> List[SharkIqVacuum]:
devices = [SharkIqVacuum(self, d) for d in await self.async_list_devices()]
devices = [SharkIqVacuum(self, d, europe=self.europe) for d in await self.async_list_devices()]
if update:
for device in devices:
await device.async_get_metadata()
Expand Down
4 changes: 4 additions & 0 deletions sharkiqpy/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,8 @@
LOGIN_URL = "https://user-field.aylanetworks.com"
SHARK_APP_ID = "Shark-Android-field-id"
SHARK_APP_SECRET = "Shark-Android-field-Wv43MbdXRM297HUHotqe6lU1n-w"
EU_DEVICE_URL = "https://ads-eu.aylanetworks.com"
EU_LOGIN_URL = "https://user-field-eu.aylanetworks.com"
EU_SHARK_APP_ID = "Shark-Android-EUField-Fw-id"
EU_SHARK_APP_SECRET = "Shark-Android-EUField-s-zTykblGJujGcSSTaJaeE4PESI"

13 changes: 7 additions & 6 deletions sharkiqpy/sharkiq.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from datetime import datetime
from pprint import pformat
from typing import Any, Dict, Iterable, List, Optional, Set, Union, TYPE_CHECKING
from .const import DEVICE_URL
from .const import DEVICE_URL, EU_DEVICE_URL
from .exc import SharkIqReadOnlyPropertyError

try:
Expand Down Expand Up @@ -99,7 +99,7 @@ def _clean_property_name(raw_property_name: str) -> str:
class SharkIqVacuum:
"""Shark IQ vacuum entity"""

def __init__(self, ayla_api: "AylaApi", device_dct: Dict):
def __init__(self, ayla_api: "AylaApi", device_dct: Dict, europe: bool = False):
self.ayla_api = ayla_api
self._dsn = device_dct['dsn']
self._key = device_dct['key']
Expand All @@ -109,6 +109,7 @@ def __init__(self, ayla_api: "AylaApi", device_dct: Dict):
self.properties_full = defaultdict(dict) # Using a defaultdict prevents errors before calling `update()`
self.property_values = SharkPropertiesView(self)
self._settable_properties = None # type: Optional[Set]
self.europe = europe

# Properties
self._name = device_dct['product_name']
Expand Down Expand Up @@ -137,7 +138,7 @@ def serial_number(self) -> str:
@property
def metadata_endpoint(self) -> str:
"""Endpoint for device metadata"""
return f'{DEVICE_URL:s}/apiv1/dsns/{self._dsn:s}/data.json'
return f'{EU_DEVICE_URL if self.europe else DEVICE_URL:s}/apiv1/dsns/{self._dsn:s}/data.json'

def _update_metadata(self, metadata: List[Dict]):
data = [d['datum'] for d in metadata if d.get('datum', {}).get('key', '') == 'sharkDeviceMobileData']
Expand All @@ -164,7 +165,7 @@ async def async_get_metadata(self):

def set_property_endpoint(self, property_name) -> str:
"""Get the API endpoint for a given property"""
return f'{DEVICE_URL:s}/apiv1/dsns/{self._dsn:s}/properties/{property_name:s}/datapoints.json'
return f'{EU_DEVICE_URL if self.europe else DEVICE_URL:s}/apiv1/dsns/{self._dsn:s}/properties/{property_name:s}/datapoints.json'

def get_property_value(self, property_name: PropertyName) -> Any:
"""Get the value of a property from the properties dictionary"""
Expand Down Expand Up @@ -202,7 +203,7 @@ async def async_set_property_value(self, property_name: PropertyName, value: Pro
@property
def update_url(self) -> str:
"""API endpoint to fetch updated device information"""
return f'{DEVICE_URL}/apiv1/dsns/{self.serial_number}/properties.json'
return f'{EU_DEVICE_URL if self.europe else DEVICE_URL}/apiv1/dsns/{self.serial_number}/properties.json'

def update(self, property_list: Optional[Iterable[str]] = None):
"""Update the known device state"""
Expand Down Expand Up @@ -297,7 +298,7 @@ def _get_file_property_endpoint(self, property_name: PropertyName) -> str:
property_id = self.properties_full[property_name]['key']
if self.properties_full[property_name].get('base_type') != 'file':
raise ValueError(f'{property_name} is not a file property')
return f'{DEVICE_URL:s}/apiv1/properties/{property_id:d}/datapoints.json'
return f'{EU_DEVICE_URL if self.europe else DEVICE_URL:s}/apiv1/properties/{property_id:d}/datapoints.json'

def get_file_property_url(self, property_name: PropertyName) -> Optional[str]:
"""File properties are versioned and need a special lookup"""
Expand Down