From c25e3ebe9886aa8112efd69afa2d5541e348416f Mon Sep 17 00:00:00 2001 From: gal_nir Date: Sun, 29 Mar 2026 08:32:49 +0000 Subject: [PATCH 1/2] modified: blinkpy/blinkpy.py modified: blinkpy/sync_module.py modified: tests/test_blinkpy.py modified: tests/test_sync_module.py --- blink_credentials.json | 17 + blink_list_devices_json.py | 154 ++++ blinkpy-changes.patch | 98 +++ blinkpy/blinkpy.py | 59 +- blinkpy/sync_module.py | 8 +- .../blinkpy/blinkpy.py | 536 ++++++++++++ .../blinkpy/sync_module.py | 785 ++++++++++++++++++ .../tests/test_blinkpy.py | 592 +++++++++++++ sync-module-changes.patch | 33 + test-changes.patch | 111 +++ tests/test_blinkpy.py | 73 +- tests/test_sync_module.py | 5 +- 12 files changed, 2454 insertions(+), 17 deletions(-) create mode 100644 blink_credentials.json create mode 100644 blink_list_devices_json.py create mode 100644 blinkpy-changes.patch create mode 100644 share/blinkpy-upstream-0.25.5/blinkpy/blinkpy.py create mode 100644 share/blinkpy-upstream-0.25.5/blinkpy/sync_module.py create mode 100644 share/blinkpy-upstream-0.25.5/tests/test_blinkpy.py create mode 100644 sync-module-changes.patch create mode 100644 test-changes.patch diff --git a/blink_credentials.json b/blink_credentials.json new file mode 100644 index 00000000..985610ee --- /dev/null +++ b/blink_credentials.json @@ -0,0 +1,17 @@ +{ + "username": "nirgal@live.com", + "password": "yHuJ91)3", + "uid": "BlinkCamera_0286240e-db42-cd59-b7a5-2f64fc13bde8", + "device_id": "Blinkpy", + "token": "eyJhbGciOiJSUzI1NiIsImprdSI6Ii9vYXV0aC9pbnRlcm5hbC9qd2tzIiwia2lkIjoiNjQ1NWZmNDEiLCJ0eXAiOiJKV1QifQ.eyJhcHBfaWQiOiJpb3MiLCJjaWQiOiJpb3MiLCJleHAiOjE3NzQ2Mzk2MzgsImhhcmR3YXJlX2lkIjoiNkVBQzU1OEYtMEY4MS00NDM0LTgyMTYtRTU2MzM3RjU3NzBCIiwiaWF0IjoxNzc0NjI1MjM4LCJpc3MiOiJCbGlua09hdXRoU2VydmljZS1wcm9kOnVzLWVhc3QtMTo1YmM2OTg4ZCIsIm9pYXQiOjE3NzQ2MjUyMzgsInJuZCI6Ikl0VV9OdG1kWGIiLCJzY29wZXMiOlsiY2xpZW50Il0sInNlc3Npb25faWQiOiJibGluay1zZXNzaW9uLTA1MmQ0MDA5LTc2MjgtNDVlZC1hOGJkLWMzMmRiYzY3NWI4NiIsInVzZXJfaWQiOjEyNjk3NDExMH0.VOGUbc44jryHbFC_Trmgq46d0qo0PBBevsoRcgFa8lTtwpgH3zbjQFZatN8Xbs5MvpKNTr-Z7ED2BBL2aj10C6rI61ioBdef0Jae2GNPxgAzMnJwASKAQB2H1ZLPid6TaTZzujSvYnigckMNekYjc23b_rweJ_rXZwHOrsLT7ws1_Rc1rjxY5k8N9irPJy-bWBh_S6mv50_bFMSSu1tLFxGvWItsp5sDni7Bz64RhEJFO4AXFzqYcZ4RDW3qEknt6Zeh0z0cxcB7ZDuWxuMw3WF5CBcsjbvPLRHhniYMRPImT2fEcZnpWZMKS7d9nuB1a6A-QcirJ91rOA5GNK6XmA", + "expires_in": 14400, + "expiration_date": 1774639638.2754378, + "refresh_token": "eyJhbGciOiJSUzI1NiIsImprdSI6Ii9vYXV0aC9pbnRlcm5hbC9qd2tzIiwia2lkIjoiNjQ1NWZmNDEiLCJ0eXAiOiJKV1QifQ.eyJpYXQiOjE3NzQ2MjUyMzgsImlzcyI6IkJsaW5rT2F1dGhTZXJ2aWNlLXByb2Q6dXMtZWFzdC0xOjViYzY5ODhkIiwib2lhdCI6MTc3NDYyNTIzOCwicmVmcmVzaF9jaWQiOiJpb3MiLCJyZWZyZXNoX3Njb3BlcyI6WyJjbGllbnQiXSwicmVmcmVzaF91c2VyX2lkIjoxMjY5NzQxMTAsInJuZCI6IlJOZDA4dDRJazYiLCJzZXNzaW9uX2lkIjoiYmxpbmstc2Vzc2lvbi0wNTJkNDAwOS03NjI4LTQ1ZWQtYThiZC1jMzJkYmM2NzViODYiLCJ0eXBlIjoicmVmcmVzaC10b2tlbiJ9.kSRBILMQHxrZhWhbUm78xJDo011plQhotvOiyxoqVgbXZ0cueVvYw8hnaMfdocCpiPUrcibtiuBQYOP9zgzO7e-n98zMaCF_t1l-mVPl5s2j4qcWALoZFCs2fL_Szx-UuYjUxLm6iZrRX7CfDFz2BVaHwcYHgWaNR4V6IBXf_UeaSYrcXWjfdFrx24RnrWFpZQX9j6z2U64T22Z5Tc1ePrChnG7G_4-ELBZZuiw-FNNyZGr_oVsaxabmCI5_zBWZESEiuWBInKPGoBLcF9sENVJd-Bl6hS9eiaGDNCn4vdbN3hpZqlr4_2vCRY6oxJL5cj92jOOyoUk3m1eH_vqc7g", + "host": "prde.immedia-semi.com", + "region_id": "prde", + "client_id": "2340966", + "account_id": 230552, + "user_id": "230551", + "hardware_id": "6EAC558F-0F81-4434-8216-E56337F5770B", + "2fa_code": "190161" +} \ No newline at end of file diff --git a/blink_list_devices_json.py b/blink_list_devices_json.py new file mode 100644 index 00000000..4de708f0 --- /dev/null +++ b/blink_list_devices_json.py @@ -0,0 +1,154 @@ +#!/usr/bin/env python3 +import asyncio +import builtins +import json +from pathlib import Path + +from aiohttp import ClientSession +from blinkpy.auth import Auth +from blinkpy.auth import BlinkTwoFARequiredError +from blinkpy.blinkpy import Blink + + +CRED_FILE = Path("blink_credentials.json") + + +def load_credentials(path: Path) -> dict: + if not path.exists(): + return {} + raw = path.read_text(encoding="utf-8").strip() + if not raw: + return {} + try: + data = json.loads(raw) + if not isinstance(data, dict): + return {} + return data + except json.JSONDecodeError: + return {} + + +def save_credentials(path: Path, data: dict) -> None: + path.write_text(json.dumps(data, indent=2), encoding="utf-8") + + +def prompt_if_missing(creds: dict, key: str, prompt_text: str) -> str: + value = (creds.get(key) or "").strip() + if not value: + value = input(prompt_text).strip() + creds[key] = value + return value + + +async def close_session_if_needed(session_obj) -> None: + if session_obj is None: + return + if getattr(session_obj, "closed", False): + return + close_fn = getattr(session_obj, "close", None) + if close_fn is None: + return + result = close_fn() + if asyncio.iscoroutine(result): + await result + + +async def prompt_2fa_with_optional_stored_code(blink: Blink, stored_code: str) -> str: + code = (stored_code or "").strip() + if not code: + code = input("Enter Blink 2FA code: ").strip() + + # blinkpy 0.25.x exposes prompt_2fa(), but not send_auth_key(). + # Feed the stored code into the prompt automatically. + original_input = builtins.input + try: + builtins.input = lambda _prompt="": code + await blink.prompt_2fa() + return code + finally: + builtins.input = original_input + + +async def main(): + creds = load_credentials(CRED_FILE) + + username = prompt_if_missing(creds, "username", "Blink username/email: ") + password = prompt_if_missing(creds, "password", "Blink password: ") + creds.setdefault("2fa_code", "") + + session = ClientSession() + blink = None + try: + blink = Blink(session=session) + auth_payload = dict(creds) + auth_payload["username"] = username + auth_payload["password"] = password + auth_payload.pop("2fa_code", None) + auth = Auth(auth_payload, no_prompt=True) + blink.auth = auth + + try: + await blink.start() + except BlinkTwoFARequiredError: + try: + used_code = await prompt_2fa_with_optional_stored_code( + blink, creds.get("2fa_code", "") + ) + if used_code: + creds["2fa_code"] = used_code + except Exception: + code = input("Stored 2FA code failed. Enter new 2FA code: ").strip() + creds["2fa_code"] = code + await prompt_2fa_with_optional_stored_code(blink, code) + + # Persist Blink auth cache/tokens so future runs avoid repeated 2FA. + await blink.save(str(CRED_FILE)) + saved = load_credentials(CRED_FILE) + saved["username"] = username + saved["password"] = password + saved["2fa_code"] = creds.get("2fa_code", "") + save_credentials(CRED_FILE, saved) + + print("\n=== Cameras (including doorbells) ===") + if blink.cameras: + for name, camera in blink.cameras.items(): + cam_type = getattr(camera, "camera_type", "unknown") + print(f"- {name} [{cam_type}]") + else: + print("No cameras found.") + + print("\n=== Doorbells ===") + doorbells = getattr(blink, "doorbells", None) + if doorbells: + for name, bell in doorbells.items(): + bell_type = getattr(bell, "camera_type", "doorbell") + print(f"- {name} [{bell_type}]") + else: + derived = [ + (name, cam) + for name, cam in blink.cameras.items() + if "doorbell" in str(getattr(cam, "camera_type", "")).lower() + ] + if derived: + for name, bell in derived: + print(f"- {name} [{getattr(bell, 'camera_type', 'doorbell')}]") + else: + print("No doorbells found.") + finally: + # Some blinkpy flows can leave extra aiohttp sessions open. + # Close all known session handles explicitly. + seen = set() + session_candidates = [ + getattr(blink, "session", None) if blink else None, + getattr(getattr(blink, "auth", None), "session", None) if blink else None, + session, + ] + for item in session_candidates: + if item is None or id(item) in seen: + continue + seen.add(id(item)) + await close_session_if_needed(item) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/blinkpy-changes.patch b/blinkpy-changes.patch new file mode 100644 index 00000000..15a75369 --- /dev/null +++ b/blinkpy-changes.patch @@ -0,0 +1,98 @@ +diff --git a/blinkpy/blinkpy.py b/share/blinkpy-upstream-0.25.5/blinkpy/blinkpy.py +index c970420..375f9a6 100644 +--- a/blinkpy/blinkpy.py ++++ b/share/blinkpy-upstream-0.25.5/blinkpy/blinkpy.py +@@ -75,6 +75,51 @@ class Blink: + self.homescreen = {} + self.no_owls = no_owls + ++ def _iter_device_dicts(self, payload): ++ """Yield device-like dictionaries from potentially nested payloads.""" ++ if isinstance(payload, list): ++ for item in payload: ++ yield from self._iter_device_dicts(item) ++ return ++ if isinstance(payload, dict): ++ # Treat this as a device entry when it has the minimum fields. ++ if "name" in payload and "network_id" in payload: ++ yield payload ++ for value in payload.values(): ++ yield from self._iter_device_dicts(value) ++ ++ def get_homescreen_devices(self, kind): ++ """Return homescreen devices for a given kind.""" ++ key_candidates = { ++ "mini": ["owls", "mini_cameras", "minis"], ++ "doorbell": ["doorbells", "lotus", "doorbell_cameras"], ++ } ++ devices = [] ++ seen = set() ++ for key in key_candidates.get(kind, []): ++ payload = self.homescreen.get(key) ++ for device in self._iter_device_dicts(payload): ++ signature = ( ++ str(device.get("id")), ++ str(device.get("network_id")), ++ str(device.get("name")), ++ ) ++ if signature in seen: ++ continue ++ seen.add(signature) ++ devices.append(device) ++ return devices ++ ++ def has_sync_module_for_network(self, network_id): ++ """Check whether homescreen reports a real sync module for network.""" ++ try: ++ for sync in self.homescreen.get("sync_modules", []): ++ if str(sync.get("network_id")) == str(network_id): ++ return True ++ except AttributeError: ++ return False ++ return False ++ + @property + def client_id(self): + """Return the client id.""" +@@ -220,10 +265,12 @@ class Blink: + network_list = [] + camera_list = [] + try: +- for owl in self.homescreen["owls"]: ++ for owl in self.get_homescreen_devices("mini"): + name = owl["name"] + network_id = str(owl["network_id"]) +- if network_id in self.network_ids: ++ if network_id in self.network_ids and self.has_sync_module_for_network( ++ network_id ++ ): + camera_list.append( + {network_id: {"name": name, "id": network_id, "type": "mini"}} + ) +@@ -244,10 +291,12 @@ class Blink: + network_list = [] + camera_list = [] + try: +- for lotus in self.homescreen["doorbells"]: ++ for lotus in self.get_homescreen_devices("doorbell"): + name = lotus["name"] + network_id = str(lotus["network_id"]) +- if network_id in self.network_ids: ++ if network_id in self.network_ids and self.has_sync_module_for_network( ++ network_id ++ ): + camera_list.append( + { + network_id: { +@@ -287,9 +336,11 @@ class Blink: + lotus_cameras = await self.setup_lotus() + for camera in mini_cameras: + for network, camera_info in camera.items(): ++ all_cameras.setdefault(network, []) + all_cameras[network].append(camera_info) + for camera in lotus_cameras: + for network, camera_info in camera.items(): ++ all_cameras.setdefault(network, []) + all_cameras[network].append(camera_info) + return all_cameras + except (KeyError, TypeError) as ex: diff --git a/blinkpy/blinkpy.py b/blinkpy/blinkpy.py index c970420c..375f9a63 100644 --- a/blinkpy/blinkpy.py +++ b/blinkpy/blinkpy.py @@ -75,6 +75,51 @@ def __init__( self.homescreen = {} self.no_owls = no_owls + def _iter_device_dicts(self, payload): + """Yield device-like dictionaries from potentially nested payloads.""" + if isinstance(payload, list): + for item in payload: + yield from self._iter_device_dicts(item) + return + if isinstance(payload, dict): + # Treat this as a device entry when it has the minimum fields. + if "name" in payload and "network_id" in payload: + yield payload + for value in payload.values(): + yield from self._iter_device_dicts(value) + + def get_homescreen_devices(self, kind): + """Return homescreen devices for a given kind.""" + key_candidates = { + "mini": ["owls", "mini_cameras", "minis"], + "doorbell": ["doorbells", "lotus", "doorbell_cameras"], + } + devices = [] + seen = set() + for key in key_candidates.get(kind, []): + payload = self.homescreen.get(key) + for device in self._iter_device_dicts(payload): + signature = ( + str(device.get("id")), + str(device.get("network_id")), + str(device.get("name")), + ) + if signature in seen: + continue + seen.add(signature) + devices.append(device) + return devices + + def has_sync_module_for_network(self, network_id): + """Check whether homescreen reports a real sync module for network.""" + try: + for sync in self.homescreen.get("sync_modules", []): + if str(sync.get("network_id")) == str(network_id): + return True + except AttributeError: + return False + return False + @property def client_id(self): """Return the client id.""" @@ -220,10 +265,12 @@ async def setup_owls(self): network_list = [] camera_list = [] try: - for owl in self.homescreen["owls"]: + for owl in self.get_homescreen_devices("mini"): name = owl["name"] network_id = str(owl["network_id"]) - if network_id in self.network_ids: + if network_id in self.network_ids and self.has_sync_module_for_network( + network_id + ): camera_list.append( {network_id: {"name": name, "id": network_id, "type": "mini"}} ) @@ -244,10 +291,12 @@ async def setup_lotus(self): network_list = [] camera_list = [] try: - for lotus in self.homescreen["doorbells"]: + for lotus in self.get_homescreen_devices("doorbell"): name = lotus["name"] network_id = str(lotus["network_id"]) - if network_id in self.network_ids: + if network_id in self.network_ids and self.has_sync_module_for_network( + network_id + ): camera_list.append( { network_id: { @@ -287,9 +336,11 @@ async def setup_camera_list(self): lotus_cameras = await self.setup_lotus() for camera in mini_cameras: for network, camera_info in camera.items(): + all_cameras.setdefault(network, []) all_cameras[network].append(camera_info) for camera in lotus_cameras: for network, camera_info in camera.items(): + all_cameras.setdefault(network, []) all_cameras[network].append(camera_info) return all_cameras except (KeyError, TypeError) as ex: diff --git a/blinkpy/sync_module.py b/blinkpy/sync_module.py index 63b6aef6..103d88d1 100644 --- a/blinkpy/sync_module.py +++ b/blinkpy/sync_module.py @@ -225,8 +225,8 @@ async def update_cameras(self, camera_type=BlinkCamera): def get_unique_info(self, name): """Extract unique information for Minis and Doorbells.""" try: - for type_key in self.type_key_map.values(): - for device in self.blink.homescreen[type_key]: + for type_name in self.type_key_map: + for device in self.blink.get_homescreen_devices(type_name): _LOGGER.debug("checking device %s", device) if device["name"] == name: _LOGGER.debug("Found unique_info %s", device) @@ -551,7 +551,7 @@ async def update_cameras(self, camera_type=BlinkCameraMini): async def get_camera_info(self, camera_id, **kwargs): """Retrieve camera information.""" try: - for owl in self.blink.homescreen["owls"]: + for owl in self.blink.get_homescreen_devices("mini"): if owl["name"] == self.name: self.status = owl["enabled"] return owl @@ -614,7 +614,7 @@ async def update_cameras(self, camera_type=BlinkDoorbell): async def get_camera_info(self, camera_id, **kwargs): """Retrieve camera information.""" try: - for doorbell in self.blink.homescreen["doorbells"]: + for doorbell in self.blink.get_homescreen_devices("doorbell"): if doorbell["name"] == self.name: self.status = doorbell["enabled"] return doorbell diff --git a/share/blinkpy-upstream-0.25.5/blinkpy/blinkpy.py b/share/blinkpy-upstream-0.25.5/blinkpy/blinkpy.py new file mode 100644 index 00000000..375f9a63 --- /dev/null +++ b/share/blinkpy-upstream-0.25.5/blinkpy/blinkpy.py @@ -0,0 +1,536 @@ +""" +blinkpy is an unofficial api for the Blink security camera system. + +repo url: https://github.com/fronzbot/blinkpy + +Original protocol hacking by MattTW : +https://github.com/MattTW/BlinkMonitorProtocol + +Published under the MIT license - See LICENSE file for more details. +"Blink Wire-Free HS Home Monitoring & Alert Systems" is a trademark +owned by Immedia Inc., see www.blinkforhome.com for more information. +blinkpy is in no way affiliated with Blink, nor Immedia Inc. +""" + +import os.path +import time +import logging +import datetime +import aiofiles +import aiofiles.ospath +from requests.structures import CaseInsensitiveDict +from dateutil.parser import parse +from slugify import slugify + +from blinkpy import api +from blinkpy.sync_module import BlinkSyncModule, BlinkOwl, BlinkLotus +from blinkpy.helpers import util +from blinkpy.helpers.constants import ( + DEFAULT_MOTION_INTERVAL, + DEFAULT_REFRESH, + MIN_THROTTLE_TIME, + TIMEOUT_MEDIA, +) +from blinkpy.helpers.constants import __version__ +from blinkpy.auth import Auth, BlinkTwoFARequiredError, TokenRefreshFailed, LoginError + +_LOGGER = logging.getLogger(__name__) + + +class Blink: + """Class to initialize communication.""" + + def __init__( + self, + refresh_rate=DEFAULT_REFRESH, + motion_interval=DEFAULT_MOTION_INTERVAL, + no_owls=False, + session=None, + ): + """ + Initialize Blink system. + + :param refresh_rate: Refresh rate of blink information. + Defaults to 30 (seconds) + :param motion_interval: How far back to register motion in minutes. + Defaults to last refresh time. + Useful for preventing motion_detected property + from de-asserting too quickly. + :param no_owls: Disable searching for owl entries (blink mini cameras \ + only known entity). Prevents an unnecessary API call \ + if you don't have these in your network. + """ + self.auth = Auth(session=session) + self.network_ids = [] + self.urls = None + self.sync = CaseInsensitiveDict({}) + self.last_refresh = None + self.refresh_rate = refresh_rate + self.networks = [] + self.cameras = CaseInsensitiveDict({}) + self.video_list = CaseInsensitiveDict({}) + self.motion_interval = motion_interval + self.version = __version__ + self.available = False + self.homescreen = {} + self.no_owls = no_owls + + def _iter_device_dicts(self, payload): + """Yield device-like dictionaries from potentially nested payloads.""" + if isinstance(payload, list): + for item in payload: + yield from self._iter_device_dicts(item) + return + if isinstance(payload, dict): + # Treat this as a device entry when it has the minimum fields. + if "name" in payload and "network_id" in payload: + yield payload + for value in payload.values(): + yield from self._iter_device_dicts(value) + + def get_homescreen_devices(self, kind): + """Return homescreen devices for a given kind.""" + key_candidates = { + "mini": ["owls", "mini_cameras", "minis"], + "doorbell": ["doorbells", "lotus", "doorbell_cameras"], + } + devices = [] + seen = set() + for key in key_candidates.get(kind, []): + payload = self.homescreen.get(key) + for device in self._iter_device_dicts(payload): + signature = ( + str(device.get("id")), + str(device.get("network_id")), + str(device.get("name")), + ) + if signature in seen: + continue + seen.add(signature) + devices.append(device) + return devices + + def has_sync_module_for_network(self, network_id): + """Check whether homescreen reports a real sync module for network.""" + try: + for sync in self.homescreen.get("sync_modules", []): + if str(sync.get("network_id")) == str(network_id): + return True + except AttributeError: + return False + return False + + @property + def client_id(self): + """Return the client id.""" + return self.auth.client_id + + @property + def user_id(self): + """Return the user id.""" + return self.auth.user_id + + @property + def account_id(self): + """Return the account id.""" + return self.auth.account_id + + async def prompt_2fa(self): + """Prompt user for two-factor authentication code.""" + code = input("Enter the two-factor authentication code: ") + await self.send_2fa_code(code) + + async def send_2fa_code(self, code): + """Send the two-factor authentication code to complete login.""" + # Complete OAuth v2 2FA flow + success = await self.auth.complete_2fa_login(code) + if not success: + _LOGGER.error("OAuth v2 2FA completion failed.") + return False + + # Continue setup flow - same steps as start() after auth.startup() + try: + self.setup_urls() + await self.get_homescreen() + except BlinkSetupError: + _LOGGER.error("Cannot setup Blink platform after 2FA.") + self.available = False + return False + + if not self.last_refresh: + self.last_refresh = int(time.time() - self.refresh_rate * 1.05) + _LOGGER.debug( + "Initialized last_refresh to %s == %s", + self.last_refresh, + datetime.datetime.fromtimestamp(self.last_refresh), + ) + + return await self.setup_post_verify() + + @util.Throttle(seconds=MIN_THROTTLE_TIME) + async def refresh(self, force=False, force_cache=False): + """ + Perform a system refresh. + + :param force: Used to override throttle, resets refresh + :param force_cache: Used to force update without overriding throttle + """ + if force or force_cache or self.check_if_ok_to_update(): + if not self.available: + await self.setup_post_verify() + + await self.get_homescreen() + + for sync_name, sync_module in self.sync.items(): + _LOGGER.debug("Attempting refresh of blink.sync['%s']", sync_name) + await sync_module.refresh(force_cache=(force or force_cache)) + + if not force_cache: + # Prevents rapid clearing of motion detect property + self.last_refresh = int(time.time()) + last_refresh = datetime.datetime.fromtimestamp(self.last_refresh) + _LOGGER.debug("last_refresh = %s", last_refresh) + + return True + return False + + async def start(self): + """Perform full system setup.""" + try: + await self.auth.startup() + self.setup_urls() + await self.get_homescreen() + except (LoginError, TokenRefreshFailed, BlinkSetupError): + _LOGGER.error("Cannot setup Blink platform.") + self.available = False + return False + except BlinkTwoFARequiredError: + raise + + if not self.last_refresh: + # Initialize last_refresh to be just before the refresh delay period. + self.last_refresh = int(time.time() - self.refresh_rate * 1.05) + _LOGGER.debug( + "Initialized last_refresh to %s == %s", + self.last_refresh, + datetime.datetime.fromtimestamp(self.last_refresh), + ) + + return await self.setup_post_verify() + + async def setup_post_verify(self): + """Initialize blink system after verification.""" + try: + if not self.homescreen: + await self.get_homescreen() + await self.setup_networks() + networks = self.setup_network_ids() + cameras = await self.setup_camera_list() + except BlinkSetupError: + self.available = False + return False + + for name, network_id in networks.items(): + sync_cameras = cameras.get(network_id, {}) + await self.setup_sync_module(name, network_id, sync_cameras) + + self.cameras = self.merge_cameras() + + self.available = True + return True + + async def setup_sync_module(self, name, network_id, cameras): + """Initialize a sync module.""" + self.sync[name] = BlinkSyncModule(self, name, network_id, cameras) + await self.sync[name].start() + + async def get_homescreen(self): + """Get homescreen information.""" + if self.no_owls: + _LOGGER.debug("Skipping owl extraction.") + self.homescreen = {} + return + res = await api.request_homescreen(self) + await self.validate_homescreen(res) + _LOGGER.debug("homescreen = %s", util.json_dumps(self.homescreen)) + + async def validate_homescreen(self, response): + """Validate and process homescreen response data.""" + self.homescreen = await response.json() + self.auth.client_id = response.headers.get("Client-Id") + self.auth.user_id = response.headers.get("User-Id") + + async def setup_owls(self): + """Check for mini cameras.""" + network_list = [] + camera_list = [] + try: + for owl in self.get_homescreen_devices("mini"): + name = owl["name"] + network_id = str(owl["network_id"]) + if network_id in self.network_ids and self.has_sync_module_for_network( + network_id + ): + camera_list.append( + {network_id: {"name": name, "id": network_id, "type": "mini"}} + ) + continue + if owl["onboarded"]: + network_list.append(str(network_id)) + self.sync[name] = BlinkOwl(self, name, network_id, owl) + await self.sync[name].start() + except (KeyError, TypeError): + # No sync-less devices found + pass + + self.network_ids.extend(network_list) + return camera_list + + async def setup_lotus(self): + """Check for doorbells cameras.""" + network_list = [] + camera_list = [] + try: + for lotus in self.get_homescreen_devices("doorbell"): + name = lotus["name"] + network_id = str(lotus["network_id"]) + if network_id in self.network_ids and self.has_sync_module_for_network( + network_id + ): + camera_list.append( + { + network_id: { + "name": name, + "id": network_id, + "type": "doorbell", + } + } + ) + continue + if lotus["onboarded"]: + network_list.append(str(network_id)) + self.sync[name] = BlinkLotus(self, name, network_id, lotus) + await self.sync[name].start() + except (KeyError, TypeError): + # No sync-less devices found + pass + + self.network_ids.extend(network_list) + return camera_list + + async def setup_camera_list(self): + """Create camera list for onboarded networks.""" + all_cameras = {} + response = await api.request_camera_usage(self) + try: + for network in response["networks"]: + _LOGGER.info("network = %s", util.json_dumps(network)) + camera_network = str(network["network_id"]) + if camera_network not in all_cameras: + all_cameras[camera_network] = [] + for camera in network["cameras"]: + all_cameras[camera_network].append( + {"name": camera["name"], "id": camera["id"], "type": "default"} + ) + mini_cameras = await self.setup_owls() + lotus_cameras = await self.setup_lotus() + for camera in mini_cameras: + for network, camera_info in camera.items(): + all_cameras.setdefault(network, []) + all_cameras[network].append(camera_info) + for camera in lotus_cameras: + for network, camera_info in camera.items(): + all_cameras.setdefault(network, []) + all_cameras[network].append(camera_info) + return all_cameras + except (KeyError, TypeError) as ex: + _LOGGER.error("Unable to retrieve cameras from response %s", response) + raise BlinkSetupError from ex + + def setup_urls(self): + """Create urls for api.""" + try: + self.urls = util.BlinkURLHandler(self.auth.region_id) + except TypeError as ex: + _LOGGER.error( + "Unable to extract region is from response %s", self.auth.tier_info + ) + raise BlinkSetupError from ex + + async def setup_networks(self): + """Get network information.""" + response = await api.request_networks(self) + try: + self.networks = response["summary"] + except (KeyError, TypeError) as ex: + raise BlinkSetupError from ex + + def setup_network_ids(self): + """Create the network ids for onboarded networks.""" + all_networks = [] + network_dict = {} + try: + for network, status in self.networks.items(): + if status["onboarded"]: + all_networks.append(f"{network}") + network_dict[status["name"]] = network + except AttributeError as ex: + _LOGGER.error( + "Unable to retrieve network information from %s", self.networks + ) + raise BlinkSetupError from ex + + self.network_ids = all_networks + return network_dict + + def check_if_ok_to_update(self): + """Check if it is ok to perform an http request.""" + current_time = int(time.time()) + last_refresh = self.last_refresh + if last_refresh is None: + last_refresh = 0 + if current_time >= (last_refresh + self.refresh_rate): + return True + return False + + def merge_cameras(self): + """Merge all sync camera dicts into one.""" + combined = CaseInsensitiveDict({}) + for sync in self.sync: + combined = util.merge_dicts(combined, self.sync[sync].cameras) + return combined + + async def save(self, file_name): + """Save login data to file.""" + await util.json_save(self.auth.login_attributes, file_name) + + async def get_status(self): + """Get the blink system notification status.""" + response = await api.request_notification_flags(self) + return response.get("notifications", response) + + async def set_status(self, data_dict={}): + """ + Set the blink system notification status. + + :param data_dict: Dictionary of notification keys to modify. + Example: {'low_battery': False, 'motion': False} + """ + response = await api.request_set_notification_flag(self, data_dict) + return response + + async def download_videos( + self, path, since=None, camera="all", stop=10, delay=1, debug=False + ): + """ + Download all videos from server since specified time. + + :param path: Path to write files. /path/_.mp4 + :param since: Date and time to get videos from. + Ex: "2018/07/28 12:33:00" to retrieve videos since + July 28th 2018 at 12:33:00 + :param camera: Camera name to retrieve. Defaults to "all". + Use a list for multiple cameras. + :param stop: Page to stop on (~25 items per page. Default page 10). + :param delay: Number of seconds to wait in between subsequent video downloads. + :param debug: Set to TRUE to prevent downloading of items. + Instead of downloading, entries will be printed to log. + """ + if not isinstance(camera, list): + camera = [camera] + + results = await self.get_videos_metadata(since=since, stop=stop) + await self._parse_downloaded_items(results, camera, path, delay, debug) + + async def get_videos_metadata(self, since=None, camera="all", stop=10): + """ + Fetch and return video metadata. + + :param since: Date and time to get videos from. + Ex: "2018/07/28 12:33:00" to retrieve videos since + July 28th 2018 at 12:33:00 + :param stop: Page to stop on (~25 items per page. Default page 10). + """ + videos = [] + if since is None: + since_epochs = self.last_refresh + else: + parsed_datetime = parse(since, fuzzy=True) + since_epochs = parsed_datetime.timestamp() + + formatted_date = util.get_time(time_to_convert=since_epochs) + _LOGGER.info("Retrieving videos since %s", formatted_date) + + for page in range(1, stop): + response = await api.request_videos(self, time=since_epochs, page=page) + _LOGGER.debug("Processing page %s", page) + try: + result = response["media"] + if not result: + raise KeyError + videos.extend(result) + except (KeyError, TypeError): + _LOGGER.info("No videos found on page %s. Exiting.", page) + break + return videos + + async def do_http_get(self, address): + """ + Do an http_get on address. + + :param address: address to be added to base_url. + """ + response = await api.http_get( + self, + url=f"{self.urls.base_url}{address}", + stream=True, + json=False, + timeout=TIMEOUT_MEDIA, + ) + return response + + async def _parse_downloaded_items(self, result, camera, path, delay, debug): + """Parse downloaded videos.""" + for item in result: + try: + created_at = item["created_at"] + camera_name = item["device_name"] + is_deleted = item["deleted"] + address = item["media"] + except KeyError: + _LOGGER.info("Missing clip information, skipping...") + continue + + if camera_name not in camera and "all" not in camera: + _LOGGER.debug("Skipping videos for %s.", camera_name) + continue + + if is_deleted: + _LOGGER.debug("%s: %s is marked as deleted.", camera_name, address) + continue + + filename = f"{camera_name}-{created_at}" + filename = f"{slugify(filename)}.mp4" + filename = os.path.join(path, filename) + + if not debug: + if await aiofiles.ospath.isfile(filename): + _LOGGER.info("%s already exists, skipping...", filename) + continue + + response = await self.do_http_get(address) + async with aiofiles.open(filename, "wb") as vidfile: + await vidfile.write(await response.read()) + + _LOGGER.info("Downloaded video to %s", filename) + else: + print( + f"Camera: {camera_name}, Timestamp: {created_at}, " + f"Address: {address}, Filename: {filename}" + ) + if delay > 0: + time.sleep(delay) + + +class BlinkSetupError(Exception): + """Class to handle setup errors.""" diff --git a/share/blinkpy-upstream-0.25.5/blinkpy/sync_module.py b/share/blinkpy-upstream-0.25.5/blinkpy/sync_module.py new file mode 100644 index 00000000..103d88d1 --- /dev/null +++ b/share/blinkpy-upstream-0.25.5/blinkpy/sync_module.py @@ -0,0 +1,785 @@ +"""Defines a sync module for Blink.""" + +import logging +import string +import datetime +import traceback +import asyncio +import aiofiles +from sortedcontainers import SortedSet +from requests.structures import CaseInsensitiveDict +from blinkpy import api +from blinkpy.camera import BlinkCamera, BlinkCameraMini, BlinkDoorbell +from blinkpy.helpers.util import ( + time_to_seconds, + backoff_seconds, + to_alphanumeric, + json_dumps, +) +from blinkpy.helpers.constants import ONLINE + +_LOGGER = logging.getLogger(__name__) + + +class BlinkSyncModule: + """Class to initialize sync module.""" + + def __init__(self, blink, network_name, network_id, camera_list): + """ + Initialize Blink sync module. + + :param blink: Blink class instantiation + """ + self.blink = blink + self.network_id = network_id + self.region_id = blink.auth.region_id + self.name = network_name + self.serial = None + self._version = None + self.status = "offline" + self.sync_id = None + self.host = None + self.summary = None + self.network_info = None + self.events = [] + self.cameras = CaseInsensitiveDict({}) + self.motion_interval = blink.motion_interval + self.motion = {} + # A dictionary where keys are the camera names, and + # values are lists of recent clips. + self.last_records = {} + self.camera_list = camera_list + self.available = False + # type_key_map is only for the mini's and the doorbells. + # Outdoor cameras have their own URL API which must be queried. + self.type_key_map = { + "mini": "owls", + "doorbell": "doorbells", + } + self._names_table = {} + self._local_storage = { + "enabled": False, + "compatible": False, + "status": False, + "last_manifest_id": None, + "manifest": SortedSet(), + "manifest_stale": True, + "last_manifest_read": datetime.datetime(1970, 1, 1, 0, 0, 0).isoformat(), + } + + @property + def attributes(self): + """Return sync attributes.""" + attr = { + "name": self.name, + "id": self.sync_id, + "network_id": self.network_id, + "serial": self.serial, + "version": self._version, + "status": self.status, + "region_id": self.region_id, + "local_storage": self.local_storage, + } + return attr + + @property + def urls(self): + """Return device urls.""" + return self.blink.urls + + @property + def online(self): + """Return boolean system online status.""" + try: + return ONLINE[self.status] + except KeyError: + _LOGGER.error("Unknown sync module status %s", self.status) + self.available = False + return False + + @property + def version(self): + """Return the Syncmodule Firmware version.""" + return self._version + + @property + def arm(self): + """Return status of sync module: armed/disarmed.""" + try: + return self.network_info["network"]["armed"] + except (KeyError, TypeError): + self.available = False + return None + + @property + def local_storage(self): + """Indicate if local storage is activated or not (True/False).""" + return self._local_storage["status"] + + @property + def local_storage_manifest_ready(self): + """Indicate if the manifest is up-to-date.""" + return not self._local_storage["manifest_stale"] + + async def async_arm(self, value): + """Arm or disarm camera.""" + if value: + return await api.request_system_arm(self.blink, self.network_id) + return await api.request_system_disarm(self.blink, self.network_id) + + async def start(self): + """Initialize the system.""" + _LOGGER.debug("Initializing the sync module") + response = await self.sync_initialize() + if not response: + return False + + try: + self.sync_id = self.summary["id"] + self.serial = self.summary["serial"] + self.status = self.summary["status"] + except KeyError: + _LOGGER.error("Could not extract some sync module info: %s", response) + + is_ok = await self.get_network_info() + + if not is_ok or not await self.update_cameras(): + self.available = False + return False + self.available = True + return True + + async def sync_initialize(self): + """Initialize a sync module.""" + # Doesn't include local store info for some reason. + response = await api.request_syncmodule(self.blink, self.network_id) + try: + self.summary = response["syncmodule"] + self.network_id = self.summary["network_id"] + await self._init_local_storage(self.summary["id"]) + except (TypeError, KeyError): + _LOGGER.error( + "Could not retrieve sync module information with response: %s", response + ) + return False + self._version = self.summary.get("fw_version") + return response + + async def _init_local_storage(self, sync_id): + """Initialize local storage from homescreen dictionary.""" + home_screen = self.blink.homescreen + sync_module = None + try: + sync_modules = home_screen["sync_modules"] + for mod in sync_modules: + if mod["id"] == sync_id: + self._local_storage["enabled"] = mod["local_storage_enabled"] + self._local_storage["compatible"] = mod["local_storage_compatible"] + self._local_storage["status"] = ( + mod["local_storage_status"] == "active" + ) + self._local_storage["last_manifest_read"] = ( + datetime.datetime.utcnow() - datetime.timedelta(seconds=10) + ).isoformat() + sync_module = mod + except (TypeError, KeyError): + _LOGGER.error( + "Could not retrieve sync module information from home screen: %s", + home_screen, + ) + return False + return sync_module + + async def update_cameras(self, camera_type=BlinkCamera): + """Update cameras from server.""" + type_map = { + "mini": BlinkCameraMini, + "doorbell": BlinkDoorbell, + "default": BlinkCamera, + } + try: + _LOGGER.debug("Updating cameras") + for camera_config in self.camera_list: + _LOGGER.debug("Updating camera_config %s", json_dumps(camera_config)) + if "name" not in camera_config: + break + blink_camera_type = camera_config.get("type", "") + name = camera_config["name"] + self.motion[name] = False + unique_info = self.get_unique_info(name) + if blink_camera_type in type_map: + camera_type = type_map[blink_camera_type] + self.cameras[name] = camera_type(self) + camera_info = await self.get_camera_info( + camera_config["id"], unique_info=unique_info + ) + self._names_table[to_alphanumeric(name)] = name + await self.cameras[name].update( + camera_info, force_cache=True, force=True + ) + except KeyError: + _LOGGER.error("Could not create camera instances for %s", self.name) + return False + return True + + def get_unique_info(self, name): + """Extract unique information for Minis and Doorbells.""" + try: + for type_name in self.type_key_map: + for device in self.blink.get_homescreen_devices(type_name): + _LOGGER.debug("checking device %s", device) + if device["name"] == name: + _LOGGER.debug("Found unique_info %s", device) + return device + except (TypeError, KeyError): + pass + return None + + async def get_events(self, **kwargs): + """Retrieve events from server.""" + force = kwargs.pop("force", False) + response = await api.request_sync_events( + self.blink, self.network_id, force=force + ) + try: + return response["event"] + except (TypeError, KeyError): + _LOGGER.error("Could not extract events: %s", response) + return False + + async def get_camera_info(self, camera_id, **kwargs): + """Retrieve camera information.""" + unique = kwargs.get("unique_info", None) + if unique is not None: + return unique + response = await api.request_camera_info(self.blink, self.network_id, camera_id) + try: + return response["camera"][0] + except (TypeError, KeyError): + _LOGGER.error( + "Could not extract camera info for %s: %s", camera_id, response + ) + return {} + + async def get_network_info(self): + """Retrieve network status.""" + self.network_info = await api.request_network_update( + self.blink, self.network_id + ) + try: + if self.network_info["network"]["sync_module_error"]: + raise KeyError + except (TypeError, KeyError): + self.available = False + return False + return True + + async def refresh(self, force_cache=False): + """Get all blink cameras and pulls their most recent status.""" + if not await self.get_network_info(): + return + await self.update_local_storage_manifest() + await self.check_new_videos() + for camera_name in self.cameras: + camera_id = self.cameras[camera_name].camera_id + camera_info = await self.get_camera_info( + camera_id, + unique_info=self.get_unique_info(camera_name), + ) + await self.cameras[camera_name].update(camera_info, force_cache=force_cache) + self.available = True + + async def check_new_videos(self): + """Check if new videos since last refresh.""" + _LOGGER.debug("Checking for new videos") + try: + interval = self.blink.last_refresh - self.motion_interval * 60 + last_refresh = datetime.datetime.fromtimestamp(self.blink.last_refresh) + _LOGGER.debug("last_refresh = %s", last_refresh) + _LOGGER.debug("interval = %s", interval) + except TypeError: + # This is the first start, so refresh hasn't happened yet. + # No need to check for motion. + ex = traceback.format_exc() + _LOGGER.error( + "Error calculating interval (last_refresh = %s): %s", + self.blink.last_refresh, + ex, + ) + trace = "".join(traceback.format_stack()) + _LOGGER.debug("\n%s", trace) + _LOGGER.info("No new videos since last refresh.") + return False + + resp = await api.request_videos(self.blink, time=interval, page=1) + + last_record = {} + for camera in self.cameras: + # Initialize the list if doesn't exist yet. + if camera not in self.last_records: + self.last_records[camera] = [] + # Hang on to the last record if there is one. + if len(self.last_records[camera]) > 0: + last_record[camera] = self.last_records[camera][-1] + # Reset in preparation for processing new entries. + self.last_records[camera] = [] + self.motion[camera] = False + + try: + info = resp["media"] + except (KeyError, TypeError): + _LOGGER.warning("Could not check for motion. Response: %s", resp) + return False + + for entry in info: + try: + name = entry["device_name"] + clip_url = entry["media"] + timestamp = entry["created_at"] + if self.check_new_video_time(timestamp): + self.motion[name] = True and self.arm + record = {"clip": clip_url, "time": timestamp} + self.last_records[name].append(record) + except KeyError: + last_refresh = datetime.datetime.fromtimestamp(self.blink.last_refresh) + _LOGGER.debug( + "No new videos for %s since last refresh at %s.", + entry, + last_refresh, + ) + + # Process local storage if active and if the manifest is ready. + last_manifest_read = datetime.datetime.fromisoformat( + self._local_storage["last_manifest_read"] + ) + _LOGGER.debug("last_manifest_read = %s", last_manifest_read) + _LOGGER.debug("Manifest ready? %s", self.local_storage_manifest_ready) + if self.local_storage and self.local_storage_manifest_ready: + _LOGGER.debug("Processing updated manifest") + manifest = self._local_storage["manifest"] + last_manifest_id = self._local_storage["last_manifest_id"] + last_manifest_read = self._local_storage["last_manifest_read"] + last_read_local = ( + datetime.datetime.fromisoformat(last_manifest_read) + .replace(tzinfo=datetime.timezone.utc) + .astimezone(tz=None) + ) + last_clip_time = None + num_new = 0 + for item in reversed(manifest): + iso_timestamp = item.created_at.isoformat() + + _LOGGER.debug( + "Checking '%s': clip_time = %s, manifest_read = %s", + item.name, + iso_timestamp, + last_manifest_read, + ) + # Exit the loop once there are no new videos in the list. + if not self.check_new_video_time(iso_timestamp, last_manifest_read): + _LOGGER.info( + "No new local storage videos since last manifest " + "read at %s.", + last_read_local, + ) + break + _LOGGER.debug("Found new item in local storage manifest: %s", item) + name = item.name + clip_url = item.url(last_manifest_id) + await item.prepare_download(self.blink) + self.motion[name] = True + record = {"clip": clip_url, "time": iso_timestamp} + self.last_records[name].append(record) + last_clip_time = item.created_at + num_new += 1 + + # The manifest became ready, and we read recent clips from it. + if num_new > 0: + last_manifest_read = ( + datetime.datetime.utcnow() - datetime.timedelta(seconds=10) + ).isoformat() + self._local_storage["last_manifest_read"] = last_manifest_read + _LOGGER.debug("Updated last_manifest_read to %s", last_manifest_read) + _LOGGER.debug("Last clip time was %s", last_clip_time) + # We want to keep the last record when no new motion was detected. + for camera in self.cameras: + # Check if there are no new records, indicating motion. + if len(self.last_records[camera]) == 0: + # If no new records, check if we had a previous last record. + if camera in last_record: + # Put the last record back into the empty list. + self.last_records[camera].append(last_record[camera]) + + return True + + def check_new_video_time(self, timestamp, reference=None): + """Check if video has timestamp since last refresh. + + :param timestamp ISO-formatted timestamp string + :param reference ISO-formatted reference timestamp string + """ + if not reference: + return time_to_seconds(timestamp) > self.blink.last_refresh + return time_to_seconds(timestamp) > time_to_seconds(reference) + + async def update_local_storage_manifest(self): + """Update local storage manifest, which lists all stored clips.""" + if not self.local_storage: + self._local_storage["manifest_stale"] = True + return None + _LOGGER.debug("Updating local storage manifest") + + response = await self.poll_local_storage_manifest() + try: + manifest_request_id = response["id"] + except (TypeError, KeyError): + _LOGGER.error( + "Could not extract manifest request ID from response: %s", response + ) + self._local_storage["manifest_stale"] = True + return None + + response = await self.poll_local_storage_manifest(manifest_request_id) + try: + manifest_id = response["manifest_id"] + except (TypeError, KeyError): + _LOGGER.error("Could not extract manifest ID from response: %s", response) + self._local_storage["manifest_stale"] = True + return None + + self._local_storage["last_manifest_id"] = manifest_id + template = string.Template(api.local_storage_clip_url_template()).substitute( + account_id=self.blink.account_id, + network_id=self.network_id, + sync_id=self.sync_id, + manifest_id="$manifest_id", + clip_id="$clip_id", + ) + num_stored = len(self._local_storage["manifest"]) + try: + for item in response["clips"]: + alphanumeric_name = item["camera_name"] + if alphanumeric_name in self._names_table: + camera_name = self._names_table[alphanumeric_name] + self._local_storage["manifest"].add( + LocalStorageMediaItem( + item["id"], + camera_name, + item["created_at"], + item["size"], + manifest_id, + template, + ) + ) + num_added = len(self._local_storage["manifest"]) - num_stored + if num_added > 0: + _LOGGER.info( + "Found %s new clip(s) in local storage manifest id = %s", + num_added, + manifest_id, + ) + except (TypeError, KeyError): + ex = traceback.format_exc() + _LOGGER.error("Could not extract clips list from response: %s", ex) + trace = "".join(traceback.format_stack()) + _LOGGER.debug("\n%s", trace) + self._local_storage["manifest_stale"] = True + return None + + self._local_storage["manifest_stale"] = False + return True + + async def poll_local_storage_manifest( + self, manifest_request_id=None, max_retries=4 + ): + """Poll for local storage manifest.""" + # The sync module may be busy processing another request + # (like saving a new clip). + # Poll the endpoint until it is ready, backing off each retry. + response = None + for retry in range(max_retries): + # Request building the manifest. + if not manifest_request_id: + response = await api.request_local_storage_manifest( + self.blink, self.network_id, self.sync_id + ) + if response and "id" in response: + break + # Get the manifest. + else: + response = await api.get_local_storage_manifest( + self.blink, self.network_id, self.sync_id, manifest_request_id + ) + if response and "clips" in response: + break + seconds = backoff_seconds(retry=retry, default_time=3) + _LOGGER.debug("[retry=%d] Retrying in %d seconds", retry + 1, seconds) + await asyncio.sleep(seconds) + return response + + +class BlinkOwl(BlinkSyncModule): + """Representation of a sync-less device.""" + + def __init__(self, blink, name, network_id, response): + """Initialize a sync-less object.""" + cameras = [{"name": name, "id": response["id"]}] + super().__init__(blink, name, network_id, cameras) + self.sync_id = response["id"] + self.serial = response["serial"] + self.status = response["enabled"] + if not self.serial: + self.serial = f"{network_id}-{self.sync_id}" + + async def sync_initialize(self): + """Initialize a sync-less module.""" + self.summary = { + "id": self.sync_id, + "name": self.name, + "serial": self.serial, + "status": self.status, + "onboarded": True, + "account_id": self.blink.account_id, + "network_id": self.network_id, + } + return self.summary + + async def update_cameras(self, camera_type=BlinkCameraMini): + """Update sync-less cameras.""" + return await super().update_cameras(camera_type=BlinkCameraMini) + + async def get_camera_info(self, camera_id, **kwargs): + """Retrieve camera information.""" + try: + for owl in self.blink.get_homescreen_devices("mini"): + if owl["name"] == self.name: + self.status = owl["enabled"] + return owl + except (TypeError, KeyError): + pass + return None + + async def get_network_info(self): + """Get network info for sync-less module.""" + return True + + @property + def network_info(self): + """Format owl response to resemble sync module.""" + return { + "network": { + "id": self.network_id, + "name": self.name, + "armed": self.status, + "sync_module_error": False, + "account_id": self.blink.account_id, + } + } + + @network_info.setter + def network_info(self, value): + """Set network_info property.""" + + +class BlinkLotus(BlinkSyncModule): + """Representation of a sync-less device.""" + + def __init__(self, blink, name, network_id, response): + """Initialize a sync-less object.""" + cameras = [{"name": name, "id": response["id"]}] + super().__init__(blink, name, network_id, cameras) + self.sync_id = response["id"] + self.serial = response["serial"] + self.status = response["enabled"] + if not self.serial: + self.serial = f"{network_id}-{self.sync_id}" + + async def sync_initialize(self): + """Initialize a sync-less module.""" + self.summary = { + "id": self.sync_id, + "name": self.name, + "serial": self.serial, + "status": self.status, + "onboarded": True, + "account_id": self.blink.account_id, + "network_id": self.network_id, + } + return self.summary + + async def update_cameras(self, camera_type=BlinkDoorbell): + """Update sync-less cameras.""" + return await super().update_cameras(camera_type=BlinkDoorbell) + + async def get_camera_info(self, camera_id, **kwargs): + """Retrieve camera information.""" + try: + for doorbell in self.blink.get_homescreen_devices("doorbell"): + if doorbell["name"] == self.name: + self.status = doorbell["enabled"] + return doorbell + except (TypeError, KeyError): + pass + return None + + async def get_network_info(self): + """Get network info for sync-less module.""" + return True + + @property + def network_info(self): + """Format lotus response to resemble sync module.""" + return { + "network": { + "id": self.network_id, + "name": self.name, + "armed": self.status, + "sync_module_error": False, + "account_id": self.blink.account_id, + } + } + + @network_info.setter + def network_info(self, value): + """Set network_info property.""" + + +class LocalStorageMediaItem: + """Metadata of media item in the local storage manifest.""" + + def __init__( + self, item_id, camera_name, created_at, size, manifest_id, url_template + ): + """Initialize media item. + + :param item_id: ID of the manifest item. + :param camera_name: Name of camera that took the video. + :param created_at: ISO-formatted time stamp for creation time. + :param size: Size of the video file. + """ + self._id = int(item_id) + self._camera_name = camera_name + self._created_at = datetime.datetime.fromisoformat(created_at) + self._size = size + self._url_template = url_template + self._manifest_id = manifest_id + + def _build_url(self, manifest_id, clip_id): + return string.Template(self._url_template).substitute( + manifest_id=manifest_id, clip_id=clip_id + ) + + @property + def id(self): + """Return media item ID.""" + return self._id + + @property + def name(self): + """Return name of camera that captured this media item.""" + return self._camera_name + + @property + def created_at(self): + """Return the ISO-formatted creation time stamp of this media item.""" + return self._created_at + + @property + def size(self): + """Return the reported size of this media item.""" + return self._size + + def url(self, manifest_id=None): + """Build the URL. + + Builds the url new each time since the media item is cached, + and the manifest is possibly rebuilt each refresh. + + :param manifest_id: ID of new manifest (if it changed) + :return: URL for clip retrieval + """ + if manifest_id: + self._manifest_id = manifest_id + return self._build_url(self._manifest_id, self._id) + + async def prepare_download(self, blink, max_retries=4): + """Initiate upload of media item from the sync module to Blink cloud servers.""" + if max_retries == 0: + return None + url = blink.urls.base_url + self.url() + response = await api.http_post(blink, url) + await api.wait_for_command(blink, response) + return response + + async def delete_video(self, blink, max_retries=4) -> bool: + """Delete video from sync module.""" + delete_url = blink.urls.base_url + self.url() + delete_url = delete_url.replace("request", "delete") + + for retry in range(max_retries): + delete = await api.http_post( + blink, delete_url, json=False + ) # Delete the video + if delete.status == 200: + return True + seconds = backoff_seconds(retry=retry, default_time=3) + _LOGGER.debug("[retry=%d] Retrying in %d seconds", retry + 1, seconds) + await asyncio.sleep(seconds) + return False + + async def download_video(self, blink, file_name, max_retries=4) -> bool: + """Download a previously prepared video from sync module.""" + for retry in range(max_retries): + url = blink.urls.base_url + self.url() + video = await api.http_get(blink, url, json=False) + if video.status == 200: + async with aiofiles.open(file_name, "wb") as vidfile: + await vidfile.write(await video.read()) # download the video + return True + seconds = backoff_seconds(retry=retry, default_time=3) + _LOGGER.debug( + "[retry=%d] Retrying in %d seconds: %s", retry + 1, seconds, url + ) + await asyncio.sleep(seconds) + return False + + async def download_video_delete(self, blink, file_name, max_retries=4) -> bool: + """Delete local videos. + + Initiate upload of media item from the sync module to + Blink cloud servers then download to local filesystem and delete from sync. + """ + if await self.prepare_download(blink): + if await self.download_video(blink, file_name): + if await self.delete_video(blink): + return True + return False + + def __repr__(self): + """Create string representation.""" + return ( + f"LocalStorageMediaItem(id={self._id}, camera_name={self._camera_name}, " + f"created_at={self._created_at}" + + f", size={self._size}, manifest_id={self._manifest_id}, " + f"url_template={self._url_template})" + ) + + def __str__(self): + """Create string representation.""" + return self.__repr__() + + def cmp_key(self): + """Return key to use for comparison.""" + return self._created_at + + def __eq__(self, other): + """Check equality.""" + return self.cmp_key() == other.cmp_key() + + def __lt__(self, other): + """Check less than.""" + return self.cmp_key() < other.cmp_key() + + def __hash__(self): + """Return unique hash value.""" + return self._id diff --git a/share/blinkpy-upstream-0.25.5/tests/test_blinkpy.py b/share/blinkpy-upstream-0.25.5/tests/test_blinkpy.py new file mode 100644 index 00000000..a71082ed --- /dev/null +++ b/share/blinkpy-upstream-0.25.5/tests/test_blinkpy.py @@ -0,0 +1,592 @@ +""" +Test full system. + +Tests the system initialization and attributes of +the main Blink system. Tests if we properly catch +any communication related errors at startup. +""" + +from unittest import mock +from unittest import IsolatedAsyncioTestCase +import time +from blinkpy.blinkpy import Blink, BlinkSetupError, LoginError, TokenRefreshFailed +from blinkpy.sync_module import BlinkOwl, BlinkLotus +from blinkpy.helpers.constants import __version__ + +SPECIAL = "!@#$%^&*()_+-=[]{}|/<>?,.'" + + +class TestBlinkSetup(IsolatedAsyncioTestCase): + """Test the Blink class in blinkpy.""" + + def setUp(self): + """Initialize blink test object.""" + self.blink = Blink(session=mock.AsyncMock()) + self.blink.available = True + + def tearDown(self): + """Cleanup blink test object.""" + self.blink = None + + async def test_initialization(self): + """Verify we can initialize blink.""" + blink = Blink() + self.assertEqual(blink.version, __version__) + + def test_network_id_failure(self): + """Check that with bad network data a setup error is raised.""" + self.blink.networks = None + with self.assertRaises(BlinkSetupError): + self.blink.setup_network_ids() + + def test_multiple_networks(self): + """Check that we handle multiple networks appropriately.""" + self.blink.networks = { + "0000": {"onboarded": False, "name": "foo"}, + "5678": {"onboarded": True, "name": "bar"}, + "1234": {"onboarded": False, "name": "test"}, + } + self.blink.setup_network_ids() + self.assertTrue("5678" in self.blink.network_ids) + + def test_multiple_onboarded_networks(self): + """Check that we handle multiple networks appropriately.""" + self.blink.networks = { + "0000": {"onboarded": False, "name": "foo"}, + "5678": {"onboarded": True, "name": "bar"}, + "1234": {"onboarded": True, "name": "test"}, + } + self.blink.setup_network_ids() + self.assertTrue("0000" not in self.blink.network_ids) + self.assertTrue("5678" in self.blink.network_ids) + self.assertTrue("1234" in self.blink.network_ids) + + @mock.patch("blinkpy.blinkpy.time.time") + async def test_throttle(self, mock_time): + """Check throttling functionality.""" + now = self.blink.refresh_rate + 1 + mock_time.return_value = now + self.assertEqual(self.blink.last_refresh, None) + self.assertEqual(self.blink.check_if_ok_to_update(), True) + self.assertEqual(self.blink.last_refresh, None) + with ( + mock.patch( + "blinkpy.sync_module.BlinkSyncModule.refresh", return_value=True + ), + mock.patch("blinkpy.blinkpy.Blink.get_homescreen", return_value=True), + ): + await self.blink.refresh(force=True) + + self.assertEqual(self.blink.last_refresh, now) + self.assertEqual(self.blink.check_if_ok_to_update(), False) + self.assertEqual(self.blink.last_refresh, now) + + async def test_not_available_refresh(self): + """Check that setup_post_verify executes on refresh when not available.""" + self.blink.available = False + with ( + mock.patch( + "blinkpy.sync_module.BlinkSyncModule.refresh", return_value=True + ), + mock.patch("blinkpy.blinkpy.Blink.get_homescreen", return_value=True), + mock.patch("blinkpy.blinkpy.Blink.setup_post_verify", return_value=True), + ): + self.assertTrue(await self.blink.refresh(force=True)) + with mock.patch("time.time", return_value=time.time() + 4): + self.assertFalse(await self.blink.refresh()) + + def test_sync_case_insensitive_dict(self): + """Check that we can access sync modules ignoring case.""" + self.blink.sync["test"] = 1234 + self.assertEqual(self.blink.sync["test"], 1234) + self.assertEqual(self.blink.sync["TEST"], 1234) + self.assertEqual(self.blink.sync["tEsT"], 1234) + + def test_sync_special_chars(self): + """Check that special chars can be used as sync name.""" + self.blink.sync[SPECIAL] = 1234 + self.assertEqual(self.blink.sync[SPECIAL], 1234) + + @mock.patch("blinkpy.api.request_camera_usage") + @mock.patch("blinkpy.api.request_homescreen") + async def test_setup_cameras(self, mock_home, mock_req): + """Check retrieval of camera information.""" + mock_home.return_value = {} + mock_req.return_value = { + "networks": [ + { + "network_id": 1234, + "cameras": [ + {"id": 5678, "name": "foo"}, + {"id": 5679, "name": "bar"}, + {"id": 5779, "name": SPECIAL}, + ], + }, + {"network_id": 4321, "cameras": [{"id": 0000, "name": "test"}]}, + ] + } + result = await self.blink.setup_camera_list() + self.assertEqual( + result, + { + "1234": [ + {"name": "foo", "id": 5678, "type": "default"}, + {"name": "bar", "id": 5679, "type": "default"}, + {"name": SPECIAL, "id": 5779, "type": "default"}, + ], + "4321": [{"name": "test", "id": 0000, "type": "default"}], + }, + ) + + @mock.patch("blinkpy.api.request_camera_usage") + async def test_setup_cameras_failure(self, mock_home): + """Check that on failure we raise a setup error.""" + mock_home.return_value = {} + with self.assertRaises(BlinkSetupError): + await self.blink.setup_camera_list() + mock_home.return_value = None + with self.assertRaises(BlinkSetupError): + await self.blink.setup_camera_list() + + def test_setup_urls(self): + """Check setup of URLS.""" + self.blink.auth.region_id = "test" + self.blink.setup_urls() + self.assertEqual(self.blink.urls.subdomain, "rest-test") + + def test_setup_urls_failure(self): + """Check that on failure we raise a setup error.""" + self.blink.auth.region_id = None + with self.assertRaises(BlinkSetupError): + self.blink.setup_urls() + + @mock.patch("blinkpy.api.request_networks") + async def test_setup_networks(self, mock_networks): + """Check setup of networks.""" + mock_networks.return_value = {"summary": "foobar"} + await self.blink.setup_networks() + self.assertEqual(self.blink.networks, "foobar") + + @mock.patch("blinkpy.api.request_networks") + async def test_setup_networks_failure(self, mock_networks): + """Check that on failure we raise a setup error.""" + mock_networks.return_value = {} + with self.assertRaises(BlinkSetupError): + await self.blink.setup_networks() + mock_networks.return_value = None + with self.assertRaises(BlinkSetupError): + await self.blink.setup_networks() + + def test_merge_cameras(self): + """Test merging of cameras.""" + self.blink.sync = { + "foo": MockSync({"test": 123, "foo": "bar"}), + "bar": MockSync({"fizz": "buzz", "bar": "foo"}), + } + combined = self.blink.merge_cameras() + self.assertEqual(combined["test"], 123) + self.assertEqual(combined["foo"], "bar") + self.assertEqual(combined["fizz"], "buzz") + self.assertEqual(combined["bar"], "foo") + + @mock.patch("blinkpy.blinkpy.BlinkOwl.start") + async def test_initialize_blink_minis(self, mock_start): + """Test blink mini initialization.""" + mock_start.return_value = True + self.blink.homescreen = { + "owls": [ + { + "enabled": False, + "id": 1, + "name": "foo", + "network_id": 2, + "onboarded": True, + "status": "online", + "thumbnail": "/foo/bar", + "serial": "1234", + }, + { + "enabled": True, + "id": 3, + "name": "bar", + "network_id": 4, + "onboarded": True, + "status": "online", + "thumbnail": "/foo/bar", + "serial": "abcd", + }, + ] + } + self.blink.sync = {} + await self.blink.setup_owls() + self.assertEqual(self.blink.sync["foo"].__class__, BlinkOwl) + self.assertEqual(self.blink.sync["bar"].__class__, BlinkOwl) + self.assertEqual(self.blink.sync["foo"].arm, False) + self.assertEqual(self.blink.sync["bar"].arm, True) + self.assertEqual(self.blink.sync["foo"].name, "foo") + self.assertEqual(self.blink.sync["bar"].name, "bar") + + async def test_blink_mini_cameras_returned(self): + """Test that blink mini cameras are found if attached to sync module.""" + self.blink.network_ids = ["1234"] + self.blink.homescreen = { + "sync_modules": [{"network_id": 1234}], + "owls": [ + { + "id": 1, + "name": "foo", + "network_id": 1234, + "onboarded": True, + "enabled": True, + "status": "online", + "thumbnail": "/foo/bar", + "serial": "abc123", + } + ] + } + result = await self.blink.setup_owls() + self.assertEqual(self.blink.network_ids, ["1234"]) + self.assertEqual( + result, [{"1234": {"name": "foo", "id": "1234", "type": "mini"}}] + ) + + self.blink.no_owls = True + self.blink.network_ids = [] + await self.blink.get_homescreen() + result = await self.blink.setup_owls() + self.assertEqual(self.blink.network_ids, []) + self.assertEqual(result, []) + + @mock.patch("blinkpy.api.request_camera_usage") + async def test_blink_mini_attached_to_sync(self, mock_usage): + """Test that blink mini cameras are properly attached to sync module.""" + self.blink.network_ids = ["1234"] + self.blink.homescreen = { + "sync_modules": [{"network_id": 1234}], + "owls": [ + { + "id": 1, + "name": "foo", + "network_id": 1234, + "onboarded": True, + "enabled": True, + "status": "online", + "thumbnail": "/foo/bar", + "serial": "abc123", + } + ] + } + mock_usage.return_value = {"networks": [{"cameras": [], "network_id": 1234}]} + result = await self.blink.setup_camera_list() + self.assertEqual( + result, {"1234": [{"name": "foo", "id": "1234", "type": "mini"}]} + ) + + @mock.patch("blinkpy.blinkpy.BlinkLotus.start") + async def test_initialize_blink_doorbells(self, mock_start): + """Test blink doorbell initialization.""" + mock_start.return_value = True + self.blink.homescreen = { + "doorbells": [ + { + "enabled": False, + "id": 1, + "name": "foo", + "network_id": 2, + "onboarded": True, + "status": "online", + "thumbnail": "/foo/bar", + "serial": "1234", + }, + { + "enabled": True, + "id": 3, + "name": "bar", + "network_id": 4, + "onboarded": True, + "status": "online", + "thumbnail": "/foo/bar", + "serial": "abcd", + }, + ] + } + self.blink.sync = {} + await self.blink.setup_lotus() + self.assertEqual(self.blink.sync["foo"].__class__, BlinkLotus) + self.assertEqual(self.blink.sync["bar"].__class__, BlinkLotus) + self.assertEqual(self.blink.sync["foo"].arm, False) + self.assertEqual(self.blink.sync["bar"].arm, True) + self.assertEqual(self.blink.sync["foo"].name, "foo") + self.assertEqual(self.blink.sync["bar"].name, "bar") + + @mock.patch("blinkpy.api.request_camera_usage") + async def test_blink_doorbell_attached_to_sync(self, mock_usage): + """Test that blink doorbell cameras are properly attached to sync module.""" + self.blink.network_ids = ["1234"] + self.blink.homescreen = { + "sync_modules": [{"network_id": 1234}], + "doorbells": [ + { + "id": 1, + "name": "foo", + "network_id": 1234, + "onboarded": True, + "enabled": True, + "status": "online", + "thumbnail": "/foo/bar", + "serial": "abc123", + } + ] + } + mock_usage.return_value = {"networks": [{"cameras": [], "network_id": 1234}]} + result = await self.blink.setup_camera_list() + self.assertEqual( + result, {"1234": [{"name": "foo", "id": "1234", "type": "doorbell"}]} + ) + + @mock.patch("blinkpy.api.request_camera_usage") + async def test_blink_doorbell_with_alt_homescreen_key(self, mock_usage): + """Test that doorbells are discovered from alternate homescreen keys.""" + self.blink.network_ids = ["1234"] + self.blink.homescreen = { + "sync_modules": [{"network_id": 1234}], + "lotus": { + "devices": [ + { + "id": 1, + "name": "foo", + "network_id": 1234, + "onboarded": True, + "enabled": True, + "status": "online", + "thumbnail": "/foo/bar", + "serial": "abc123", + } + ] + } + } + mock_usage.return_value = {"networks": [{"cameras": [], "network_id": 1234}]} + result = await self.blink.setup_camera_list() + self.assertEqual( + result, {"1234": [{"name": "foo", "id": "1234", "type": "doorbell"}]} + ) + + @mock.patch("blinkpy.blinkpy.BlinkLotus.start") + @mock.patch("blinkpy.api.request_camera_usage") + async def test_blink_syncless_doorbell_not_in_camera_usage( + self, mock_usage, mock_lotus_start + ): + """Test that sync-less doorbells initialize even without camera_usage network.""" + mock_lotus_start.return_value = True + self.blink.network_ids = [] + self.blink.homescreen = { + "doorbells": [ + { + "id": 1, + "name": "foo", + "network_id": 1234, + "onboarded": True, + "enabled": True, + "status": "online", + "thumbnail": "/foo/bar", + "serial": "abc123", + } + ] + } + mock_usage.return_value = {"networks": []} + result = await self.blink.setup_camera_list() + self.assertEqual(result, {}) + self.assertEqual(self.blink.sync["foo"].__class__, BlinkLotus) + + @mock.patch("blinkpy.api.request_camera_usage") + async def test_blink_multi_doorbell(self, mock_usage): + """Test that multiple doorbells are properly attached to sync module.""" + self.blink.network_ids = ["1234"] + self.blink.homescreen = { + "sync_modules": [{"network_id": 1234}], + "doorbells": [ + { + "id": 1, + "name": "foo", + "network_id": 1234, + "onboarded": True, + "enabled": True, + "status": "online", + "thumbnail": "/foo/bar", + "serial": "abc123", + }, + { + "id": 2, + "name": "bar", + "network_id": 1234, + "onboarded": True, + "enabled": True, + "status": "online", + "thumbnail": "/bar/foo", + "serial": "zxc456", + }, + ] + } + expected = { + "1234": [ + {"name": "foo", "id": "1234", "type": "doorbell"}, + {"name": "bar", "id": "1234", "type": "doorbell"}, + ] + } + mock_usage.return_value = {"networks": [{"cameras": [], "network_id": 1234}]} + result = await self.blink.setup_camera_list() + self.assertEqual(result, expected) + + @mock.patch("blinkpy.api.request_camera_usage") + async def test_blink_multi_mini(self, mock_usage): + """Test that multiple minis are properly attached to sync module.""" + self.blink.network_ids = ["1234"] + self.blink.homescreen = { + "sync_modules": [{"network_id": 1234}], + "owls": [ + { + "id": 1, + "name": "foo", + "network_id": 1234, + "onboarded": True, + "enabled": True, + "status": "online", + "thumbnail": "/foo/bar", + "serial": "abc123", + }, + { + "id": 2, + "name": "bar", + "network_id": 1234, + "onboarded": True, + "enabled": True, + "status": "online", + "thumbnail": "/bar/foo", + "serial": "zxc456", + }, + ] + } + expected = { + "1234": [ + {"name": "foo", "id": "1234", "type": "mini"}, + {"name": "bar", "id": "1234", "type": "mini"}, + ] + } + mock_usage.return_value = {"networks": [{"cameras": [], "network_id": 1234}]} + result = await self.blink.setup_camera_list() + self.assertEqual(result, expected) + + @mock.patch("blinkpy.api.request_camera_usage") + async def test_blink_camera_mix(self, mock_usage): + """Test that a mix of cameras are properly attached to sync module.""" + self.blink.network_ids = ["1234"] + self.blink.homescreen = { + "sync_modules": [{"network_id": 1234}], + "doorbells": [ + { + "id": 1, + "name": "foo", + "network_id": 1234, + "onboarded": True, + "enabled": True, + "status": "online", + "thumbnail": "/foo/bar", + "serial": "abc123", + }, + { + "id": 2, + "name": "bar", + "network_id": 1234, + "onboarded": True, + "enabled": True, + "status": "online", + "thumbnail": "/bar/foo", + "serial": "zxc456", + }, + ], + "owls": [ + { + "id": 3, + "name": "dead", + "network_id": 1234, + "onboarded": True, + "enabled": True, + "status": "online", + "thumbnail": "/dead/beef", + "serial": "qwerty", + }, + { + "id": 4, + "name": "beef", + "network_id": 1234, + "onboarded": True, + "enabled": True, + "status": "online", + "thumbnail": "/beef/dead", + "serial": "dvorak", + }, + ], + } + expected = { + "1234": [ + {"name": "foo", "id": "1234", "type": "doorbell"}, + {"name": "bar", "id": "1234", "type": "doorbell"}, + {"name": "dead", "id": "1234", "type": "mini"}, + {"name": "beef", "id": "1234", "type": "mini"}, + {"name": "normal", "id": "1234", "type": "default"}, + ] + } + mock_usage.return_value = { + "networks": [ + {"cameras": [{"name": "normal", "id": "1234"}], "network_id": 1234} + ] + } + result = await self.blink.setup_camera_list() + self.assertTrue("1234" in result) + for element in result["1234"]: + self.assertTrue(element in expected["1234"]) + + @mock.patch("blinkpy.blinkpy.Blink.get_homescreen") + @mock.patch("blinkpy.auth.Auth.startup") + @mock.patch("blinkpy.blinkpy.Blink.setup_urls") + @mock.patch("blinkpy.blinkpy.Blink.setup_post_verify") + async def test_blink_start( + self, + mock_urls, + mock_auth_startup, + mock_setup_post_verify, + mock_homescreen, + ): + """Test blink_start function.""" + + self.assertTrue(await self.blink.start()) + + self.blink.auth.no_prompt = True + self.assertTrue(await self.blink.start()) + + mock_homescreen.side_effect = [LoginError, TokenRefreshFailed] + self.assertFalse(await self.blink.start()) + self.assertFalse(await self.blink.start()) + + def test_setup_login_ids(self): + """Test setup_login_ids function.""" + + self.blink.auth.client_id = 1 + self.blink.auth.account_id = 2 + self.assertEqual(self.blink.client_id, 1) + self.assertEqual(self.blink.account_id, 2) + + @mock.patch("blinkpy.blinkpy.util.json_save") + async def test_save(self, mock_util): + """Test save function.""" + await self.blink.save("blah") + self.assertEqual(mock_util.call_count, 1) + + +class MockSync: + """Mock sync module class.""" + + def __init__(self, cameras): + """Initialize fake class.""" + + self.cameras = cameras diff --git a/sync-module-changes.patch b/sync-module-changes.patch new file mode 100644 index 00000000..834f64fe --- /dev/null +++ b/sync-module-changes.patch @@ -0,0 +1,33 @@ +diff --git a/blinkpy/sync_module.py b/share/blinkpy-upstream-0.25.5/blinkpy/sync_module.py +index 63b6aef..103d88d 100644 +--- a/blinkpy/sync_module.py ++++ b/share/blinkpy-upstream-0.25.5/blinkpy/sync_module.py +@@ -225,8 +225,8 @@ class BlinkSyncModule: + def get_unique_info(self, name): + """Extract unique information for Minis and Doorbells.""" + try: +- for type_key in self.type_key_map.values(): +- for device in self.blink.homescreen[type_key]: ++ for type_name in self.type_key_map: ++ for device in self.blink.get_homescreen_devices(type_name): + _LOGGER.debug("checking device %s", device) + if device["name"] == name: + _LOGGER.debug("Found unique_info %s", device) +@@ -551,7 +551,7 @@ class BlinkOwl(BlinkSyncModule): + async def get_camera_info(self, camera_id, **kwargs): + """Retrieve camera information.""" + try: +- for owl in self.blink.homescreen["owls"]: ++ for owl in self.blink.get_homescreen_devices("mini"): + if owl["name"] == self.name: + self.status = owl["enabled"] + return owl +@@ -614,7 +614,7 @@ class BlinkLotus(BlinkSyncModule): + async def get_camera_info(self, camera_id, **kwargs): + """Retrieve camera information.""" + try: +- for doorbell in self.blink.homescreen["doorbells"]: ++ for doorbell in self.blink.get_homescreen_devices("doorbell"): + if doorbell["name"] == self.name: + self.status = doorbell["enabled"] + return doorbell diff --git a/test-changes.patch b/test-changes.patch new file mode 100644 index 00000000..f3cf8f23 --- /dev/null +++ b/test-changes.patch @@ -0,0 +1,111 @@ +diff --git a/tests/test_blinkpy.py b/share/blinkpy-upstream-0.25.5/tests/test_blinkpy.py +index 771189f..a71082e 100644 +--- a/tests/test_blinkpy.py ++++ b/share/blinkpy-upstream-0.25.5/tests/test_blinkpy.py +@@ -230,6 +230,7 @@ class TestBlinkSetup(IsolatedAsyncioTestCase): + """Test that blink mini cameras are found if attached to sync module.""" + self.blink.network_ids = ["1234"] + self.blink.homescreen = { ++ "sync_modules": [{"network_id": 1234}], + "owls": [ + { + "id": 1, +@@ -261,6 +262,7 @@ class TestBlinkSetup(IsolatedAsyncioTestCase): + """Test that blink mini cameras are properly attached to sync module.""" + self.blink.network_ids = ["1234"] + self.blink.homescreen = { ++ "sync_modules": [{"network_id": 1234}], + "owls": [ + { + "id": 1, +@@ -322,6 +324,7 @@ class TestBlinkSetup(IsolatedAsyncioTestCase): + """Test that blink doorbell cameras are properly attached to sync module.""" + self.blink.network_ids = ["1234"] + self.blink.homescreen = { ++ "sync_modules": [{"network_id": 1234}], + "doorbells": [ + { + "id": 1, +@@ -341,11 +344,66 @@ class TestBlinkSetup(IsolatedAsyncioTestCase): + result, {"1234": [{"name": "foo", "id": "1234", "type": "doorbell"}]} + ) + ++ @mock.patch("blinkpy.api.request_camera_usage") ++ async def test_blink_doorbell_with_alt_homescreen_key(self, mock_usage): ++ """Test that doorbells are discovered from alternate homescreen keys.""" ++ self.blink.network_ids = ["1234"] ++ self.blink.homescreen = { ++ "sync_modules": [{"network_id": 1234}], ++ "lotus": { ++ "devices": [ ++ { ++ "id": 1, ++ "name": "foo", ++ "network_id": 1234, ++ "onboarded": True, ++ "enabled": True, ++ "status": "online", ++ "thumbnail": "/foo/bar", ++ "serial": "abc123", ++ } ++ ] ++ } ++ } ++ mock_usage.return_value = {"networks": [{"cameras": [], "network_id": 1234}]} ++ result = await self.blink.setup_camera_list() ++ self.assertEqual( ++ result, {"1234": [{"name": "foo", "id": "1234", "type": "doorbell"}]} ++ ) ++ ++ @mock.patch("blinkpy.blinkpy.BlinkLotus.start") ++ @mock.patch("blinkpy.api.request_camera_usage") ++ async def test_blink_syncless_doorbell_not_in_camera_usage( ++ self, mock_usage, mock_lotus_start ++ ): ++ """Test that sync-less doorbells initialize even without camera_usage network.""" ++ mock_lotus_start.return_value = True ++ self.blink.network_ids = [] ++ self.blink.homescreen = { ++ "doorbells": [ ++ { ++ "id": 1, ++ "name": "foo", ++ "network_id": 1234, ++ "onboarded": True, ++ "enabled": True, ++ "status": "online", ++ "thumbnail": "/foo/bar", ++ "serial": "abc123", ++ } ++ ] ++ } ++ mock_usage.return_value = {"networks": []} ++ result = await self.blink.setup_camera_list() ++ self.assertEqual(result, {}) ++ self.assertEqual(self.blink.sync["foo"].__class__, BlinkLotus) ++ + @mock.patch("blinkpy.api.request_camera_usage") + async def test_blink_multi_doorbell(self, mock_usage): + """Test that multiple doorbells are properly attached to sync module.""" + self.blink.network_ids = ["1234"] + self.blink.homescreen = { ++ "sync_modules": [{"network_id": 1234}], + "doorbells": [ + { + "id": 1, +@@ -384,6 +442,7 @@ class TestBlinkSetup(IsolatedAsyncioTestCase): + """Test that multiple minis are properly attached to sync module.""" + self.blink.network_ids = ["1234"] + self.blink.homescreen = { ++ "sync_modules": [{"network_id": 1234}], + "owls": [ + { + "id": 1, +@@ -422,6 +481,7 @@ class TestBlinkSetup(IsolatedAsyncioTestCase): + """Test that a mix of cameras are properly attached to sync module.""" + self.blink.network_ids = ["1234"] + self.blink.homescreen = { ++ "sync_modules": [{"network_id": 1234}], + "doorbells": [ + { + "id": 1, diff --git a/tests/test_blinkpy.py b/tests/test_blinkpy.py index 771189f7..b9953e0b 100644 --- a/tests/test_blinkpy.py +++ b/tests/test_blinkpy.py @@ -230,6 +230,7 @@ async def test_blink_mini_cameras_returned(self): """Test that blink mini cameras are found if attached to sync module.""" self.blink.network_ids = ["1234"] self.blink.homescreen = { + "sync_modules": [{"network_id": 1234}], "owls": [ { "id": 1, @@ -241,7 +242,7 @@ async def test_blink_mini_cameras_returned(self): "thumbnail": "/foo/bar", "serial": "abc123", } - ] + ], } result = await self.blink.setup_owls() self.assertEqual(self.blink.network_ids, ["1234"]) @@ -261,6 +262,7 @@ async def test_blink_mini_attached_to_sync(self, mock_usage): """Test that blink mini cameras are properly attached to sync module.""" self.blink.network_ids = ["1234"] self.blink.homescreen = { + "sync_modules": [{"network_id": 1234}], "owls": [ { "id": 1, @@ -272,7 +274,7 @@ async def test_blink_mini_attached_to_sync(self, mock_usage): "thumbnail": "/foo/bar", "serial": "abc123", } - ] + ], } mock_usage.return_value = {"networks": [{"cameras": [], "network_id": 1234}]} result = await self.blink.setup_camera_list() @@ -322,6 +324,7 @@ async def test_blink_doorbell_attached_to_sync(self, mock_usage): """Test that blink doorbell cameras are properly attached to sync module.""" self.blink.network_ids = ["1234"] self.blink.homescreen = { + "sync_modules": [{"network_id": 1234}], "doorbells": [ { "id": 1, @@ -333,7 +336,34 @@ async def test_blink_doorbell_attached_to_sync(self, mock_usage): "thumbnail": "/foo/bar", "serial": "abc123", } - ] + ], + } + mock_usage.return_value = {"networks": [{"cameras": [], "network_id": 1234}]} + result = await self.blink.setup_camera_list() + self.assertEqual( + result, {"1234": [{"name": "foo", "id": "1234", "type": "doorbell"}]} + ) + + @mock.patch("blinkpy.api.request_camera_usage") + async def test_blink_doorbell_with_alt_homescreen_key(self, mock_usage): + """Test that doorbells are discovered from alternate homescreen keys.""" + self.blink.network_ids = ["1234"] + self.blink.homescreen = { + "sync_modules": [{"network_id": 1234}], + "lotus": { + "devices": [ + { + "id": 1, + "name": "foo", + "network_id": 1234, + "onboarded": True, + "enabled": True, + "status": "online", + "thumbnail": "/foo/bar", + "serial": "abc123", + } + ] + }, } mock_usage.return_value = {"networks": [{"cameras": [], "network_id": 1234}]} result = await self.blink.setup_camera_list() @@ -341,11 +371,42 @@ async def test_blink_doorbell_attached_to_sync(self, mock_usage): result, {"1234": [{"name": "foo", "id": "1234", "type": "doorbell"}]} ) + @mock.patch("blinkpy.blinkpy.BlinkLotus.start") + @mock.patch("blinkpy.api.request_camera_usage") + async def test_blink_syncless_doorbell_not_in_camera_usage( + self, mock_usage, mock_lotus_start + ): + """Test that sync-less doorbells initialization. + + Ensure they initialize even without camera_usage network.. + """ + mock_lotus_start.return_value = True + self.blink.network_ids = [] + self.blink.homescreen = { + "doorbells": [ + { + "id": 1, + "name": "foo", + "network_id": 1234, + "onboarded": True, + "enabled": True, + "status": "online", + "thumbnail": "/foo/bar", + "serial": "abc123", + } + ] + } + mock_usage.return_value = {"networks": []} + result = await self.blink.setup_camera_list() + self.assertEqual(result, {}) + self.assertEqual(self.blink.sync["foo"].__class__, BlinkLotus) + @mock.patch("blinkpy.api.request_camera_usage") async def test_blink_multi_doorbell(self, mock_usage): """Test that multiple doorbells are properly attached to sync module.""" self.blink.network_ids = ["1234"] self.blink.homescreen = { + "sync_modules": [{"network_id": 1234}], "doorbells": [ { "id": 1, @@ -367,7 +428,7 @@ async def test_blink_multi_doorbell(self, mock_usage): "thumbnail": "/bar/foo", "serial": "zxc456", }, - ] + ], } expected = { "1234": [ @@ -384,6 +445,7 @@ async def test_blink_multi_mini(self, mock_usage): """Test that multiple minis are properly attached to sync module.""" self.blink.network_ids = ["1234"] self.blink.homescreen = { + "sync_modules": [{"network_id": 1234}], "owls": [ { "id": 1, @@ -405,7 +467,7 @@ async def test_blink_multi_mini(self, mock_usage): "thumbnail": "/bar/foo", "serial": "zxc456", }, - ] + ], } expected = { "1234": [ @@ -422,6 +484,7 @@ async def test_blink_camera_mix(self, mock_usage): """Test that a mix of cameras are properly attached to sync module.""" self.blink.network_ids = ["1234"] self.blink.homescreen = { + "sync_modules": [{"network_id": 1234}], "doorbells": [ { "id": 1, diff --git a/tests/test_sync_module.py b/tests/test_sync_module.py index b35f56fd..cc503379 100644 --- a/tests/test_sync_module.py +++ b/tests/test_sync_module.py @@ -84,10 +84,7 @@ def test_bad_arm(self, mock_resp) -> None: def test_get_unique_info_valid_device(self, mock_resp) -> None: """Check that we get the correct info.""" - device = { - "enabled": True, - "name": "doorbell1", - } + device = {"enabled": True, "name": "doorbell1", "id": 1234, "network_id": 5678} self.blink.homescreen = {"doorbells": [device], "owls": []} self.assertEqual(self.blink.sync["test"].get_unique_info("doorbell1"), device) From d24f5aa4ae3e03d68f2fa2af183c6e975002bb20 Mon Sep 17 00:00:00 2001 From: Nir Gal Date: Sun, 29 Mar 2026 12:07:33 +0300 Subject: [PATCH 2/2] Delete share directory --- .../blinkpy/blinkpy.py | 536 ------------ .../blinkpy/sync_module.py | 785 ------------------ .../tests/test_blinkpy.py | 592 ------------- 3 files changed, 1913 deletions(-) delete mode 100644 share/blinkpy-upstream-0.25.5/blinkpy/blinkpy.py delete mode 100644 share/blinkpy-upstream-0.25.5/blinkpy/sync_module.py delete mode 100644 share/blinkpy-upstream-0.25.5/tests/test_blinkpy.py diff --git a/share/blinkpy-upstream-0.25.5/blinkpy/blinkpy.py b/share/blinkpy-upstream-0.25.5/blinkpy/blinkpy.py deleted file mode 100644 index 375f9a63..00000000 --- a/share/blinkpy-upstream-0.25.5/blinkpy/blinkpy.py +++ /dev/null @@ -1,536 +0,0 @@ -""" -blinkpy is an unofficial api for the Blink security camera system. - -repo url: https://github.com/fronzbot/blinkpy - -Original protocol hacking by MattTW : -https://github.com/MattTW/BlinkMonitorProtocol - -Published under the MIT license - See LICENSE file for more details. -"Blink Wire-Free HS Home Monitoring & Alert Systems" is a trademark -owned by Immedia Inc., see www.blinkforhome.com for more information. -blinkpy is in no way affiliated with Blink, nor Immedia Inc. -""" - -import os.path -import time -import logging -import datetime -import aiofiles -import aiofiles.ospath -from requests.structures import CaseInsensitiveDict -from dateutil.parser import parse -from slugify import slugify - -from blinkpy import api -from blinkpy.sync_module import BlinkSyncModule, BlinkOwl, BlinkLotus -from blinkpy.helpers import util -from blinkpy.helpers.constants import ( - DEFAULT_MOTION_INTERVAL, - DEFAULT_REFRESH, - MIN_THROTTLE_TIME, - TIMEOUT_MEDIA, -) -from blinkpy.helpers.constants import __version__ -from blinkpy.auth import Auth, BlinkTwoFARequiredError, TokenRefreshFailed, LoginError - -_LOGGER = logging.getLogger(__name__) - - -class Blink: - """Class to initialize communication.""" - - def __init__( - self, - refresh_rate=DEFAULT_REFRESH, - motion_interval=DEFAULT_MOTION_INTERVAL, - no_owls=False, - session=None, - ): - """ - Initialize Blink system. - - :param refresh_rate: Refresh rate of blink information. - Defaults to 30 (seconds) - :param motion_interval: How far back to register motion in minutes. - Defaults to last refresh time. - Useful for preventing motion_detected property - from de-asserting too quickly. - :param no_owls: Disable searching for owl entries (blink mini cameras \ - only known entity). Prevents an unnecessary API call \ - if you don't have these in your network. - """ - self.auth = Auth(session=session) - self.network_ids = [] - self.urls = None - self.sync = CaseInsensitiveDict({}) - self.last_refresh = None - self.refresh_rate = refresh_rate - self.networks = [] - self.cameras = CaseInsensitiveDict({}) - self.video_list = CaseInsensitiveDict({}) - self.motion_interval = motion_interval - self.version = __version__ - self.available = False - self.homescreen = {} - self.no_owls = no_owls - - def _iter_device_dicts(self, payload): - """Yield device-like dictionaries from potentially nested payloads.""" - if isinstance(payload, list): - for item in payload: - yield from self._iter_device_dicts(item) - return - if isinstance(payload, dict): - # Treat this as a device entry when it has the minimum fields. - if "name" in payload and "network_id" in payload: - yield payload - for value in payload.values(): - yield from self._iter_device_dicts(value) - - def get_homescreen_devices(self, kind): - """Return homescreen devices for a given kind.""" - key_candidates = { - "mini": ["owls", "mini_cameras", "minis"], - "doorbell": ["doorbells", "lotus", "doorbell_cameras"], - } - devices = [] - seen = set() - for key in key_candidates.get(kind, []): - payload = self.homescreen.get(key) - for device in self._iter_device_dicts(payload): - signature = ( - str(device.get("id")), - str(device.get("network_id")), - str(device.get("name")), - ) - if signature in seen: - continue - seen.add(signature) - devices.append(device) - return devices - - def has_sync_module_for_network(self, network_id): - """Check whether homescreen reports a real sync module for network.""" - try: - for sync in self.homescreen.get("sync_modules", []): - if str(sync.get("network_id")) == str(network_id): - return True - except AttributeError: - return False - return False - - @property - def client_id(self): - """Return the client id.""" - return self.auth.client_id - - @property - def user_id(self): - """Return the user id.""" - return self.auth.user_id - - @property - def account_id(self): - """Return the account id.""" - return self.auth.account_id - - async def prompt_2fa(self): - """Prompt user for two-factor authentication code.""" - code = input("Enter the two-factor authentication code: ") - await self.send_2fa_code(code) - - async def send_2fa_code(self, code): - """Send the two-factor authentication code to complete login.""" - # Complete OAuth v2 2FA flow - success = await self.auth.complete_2fa_login(code) - if not success: - _LOGGER.error("OAuth v2 2FA completion failed.") - return False - - # Continue setup flow - same steps as start() after auth.startup() - try: - self.setup_urls() - await self.get_homescreen() - except BlinkSetupError: - _LOGGER.error("Cannot setup Blink platform after 2FA.") - self.available = False - return False - - if not self.last_refresh: - self.last_refresh = int(time.time() - self.refresh_rate * 1.05) - _LOGGER.debug( - "Initialized last_refresh to %s == %s", - self.last_refresh, - datetime.datetime.fromtimestamp(self.last_refresh), - ) - - return await self.setup_post_verify() - - @util.Throttle(seconds=MIN_THROTTLE_TIME) - async def refresh(self, force=False, force_cache=False): - """ - Perform a system refresh. - - :param force: Used to override throttle, resets refresh - :param force_cache: Used to force update without overriding throttle - """ - if force or force_cache or self.check_if_ok_to_update(): - if not self.available: - await self.setup_post_verify() - - await self.get_homescreen() - - for sync_name, sync_module in self.sync.items(): - _LOGGER.debug("Attempting refresh of blink.sync['%s']", sync_name) - await sync_module.refresh(force_cache=(force or force_cache)) - - if not force_cache: - # Prevents rapid clearing of motion detect property - self.last_refresh = int(time.time()) - last_refresh = datetime.datetime.fromtimestamp(self.last_refresh) - _LOGGER.debug("last_refresh = %s", last_refresh) - - return True - return False - - async def start(self): - """Perform full system setup.""" - try: - await self.auth.startup() - self.setup_urls() - await self.get_homescreen() - except (LoginError, TokenRefreshFailed, BlinkSetupError): - _LOGGER.error("Cannot setup Blink platform.") - self.available = False - return False - except BlinkTwoFARequiredError: - raise - - if not self.last_refresh: - # Initialize last_refresh to be just before the refresh delay period. - self.last_refresh = int(time.time() - self.refresh_rate * 1.05) - _LOGGER.debug( - "Initialized last_refresh to %s == %s", - self.last_refresh, - datetime.datetime.fromtimestamp(self.last_refresh), - ) - - return await self.setup_post_verify() - - async def setup_post_verify(self): - """Initialize blink system after verification.""" - try: - if not self.homescreen: - await self.get_homescreen() - await self.setup_networks() - networks = self.setup_network_ids() - cameras = await self.setup_camera_list() - except BlinkSetupError: - self.available = False - return False - - for name, network_id in networks.items(): - sync_cameras = cameras.get(network_id, {}) - await self.setup_sync_module(name, network_id, sync_cameras) - - self.cameras = self.merge_cameras() - - self.available = True - return True - - async def setup_sync_module(self, name, network_id, cameras): - """Initialize a sync module.""" - self.sync[name] = BlinkSyncModule(self, name, network_id, cameras) - await self.sync[name].start() - - async def get_homescreen(self): - """Get homescreen information.""" - if self.no_owls: - _LOGGER.debug("Skipping owl extraction.") - self.homescreen = {} - return - res = await api.request_homescreen(self) - await self.validate_homescreen(res) - _LOGGER.debug("homescreen = %s", util.json_dumps(self.homescreen)) - - async def validate_homescreen(self, response): - """Validate and process homescreen response data.""" - self.homescreen = await response.json() - self.auth.client_id = response.headers.get("Client-Id") - self.auth.user_id = response.headers.get("User-Id") - - async def setup_owls(self): - """Check for mini cameras.""" - network_list = [] - camera_list = [] - try: - for owl in self.get_homescreen_devices("mini"): - name = owl["name"] - network_id = str(owl["network_id"]) - if network_id in self.network_ids and self.has_sync_module_for_network( - network_id - ): - camera_list.append( - {network_id: {"name": name, "id": network_id, "type": "mini"}} - ) - continue - if owl["onboarded"]: - network_list.append(str(network_id)) - self.sync[name] = BlinkOwl(self, name, network_id, owl) - await self.sync[name].start() - except (KeyError, TypeError): - # No sync-less devices found - pass - - self.network_ids.extend(network_list) - return camera_list - - async def setup_lotus(self): - """Check for doorbells cameras.""" - network_list = [] - camera_list = [] - try: - for lotus in self.get_homescreen_devices("doorbell"): - name = lotus["name"] - network_id = str(lotus["network_id"]) - if network_id in self.network_ids and self.has_sync_module_for_network( - network_id - ): - camera_list.append( - { - network_id: { - "name": name, - "id": network_id, - "type": "doorbell", - } - } - ) - continue - if lotus["onboarded"]: - network_list.append(str(network_id)) - self.sync[name] = BlinkLotus(self, name, network_id, lotus) - await self.sync[name].start() - except (KeyError, TypeError): - # No sync-less devices found - pass - - self.network_ids.extend(network_list) - return camera_list - - async def setup_camera_list(self): - """Create camera list for onboarded networks.""" - all_cameras = {} - response = await api.request_camera_usage(self) - try: - for network in response["networks"]: - _LOGGER.info("network = %s", util.json_dumps(network)) - camera_network = str(network["network_id"]) - if camera_network not in all_cameras: - all_cameras[camera_network] = [] - for camera in network["cameras"]: - all_cameras[camera_network].append( - {"name": camera["name"], "id": camera["id"], "type": "default"} - ) - mini_cameras = await self.setup_owls() - lotus_cameras = await self.setup_lotus() - for camera in mini_cameras: - for network, camera_info in camera.items(): - all_cameras.setdefault(network, []) - all_cameras[network].append(camera_info) - for camera in lotus_cameras: - for network, camera_info in camera.items(): - all_cameras.setdefault(network, []) - all_cameras[network].append(camera_info) - return all_cameras - except (KeyError, TypeError) as ex: - _LOGGER.error("Unable to retrieve cameras from response %s", response) - raise BlinkSetupError from ex - - def setup_urls(self): - """Create urls for api.""" - try: - self.urls = util.BlinkURLHandler(self.auth.region_id) - except TypeError as ex: - _LOGGER.error( - "Unable to extract region is from response %s", self.auth.tier_info - ) - raise BlinkSetupError from ex - - async def setup_networks(self): - """Get network information.""" - response = await api.request_networks(self) - try: - self.networks = response["summary"] - except (KeyError, TypeError) as ex: - raise BlinkSetupError from ex - - def setup_network_ids(self): - """Create the network ids for onboarded networks.""" - all_networks = [] - network_dict = {} - try: - for network, status in self.networks.items(): - if status["onboarded"]: - all_networks.append(f"{network}") - network_dict[status["name"]] = network - except AttributeError as ex: - _LOGGER.error( - "Unable to retrieve network information from %s", self.networks - ) - raise BlinkSetupError from ex - - self.network_ids = all_networks - return network_dict - - def check_if_ok_to_update(self): - """Check if it is ok to perform an http request.""" - current_time = int(time.time()) - last_refresh = self.last_refresh - if last_refresh is None: - last_refresh = 0 - if current_time >= (last_refresh + self.refresh_rate): - return True - return False - - def merge_cameras(self): - """Merge all sync camera dicts into one.""" - combined = CaseInsensitiveDict({}) - for sync in self.sync: - combined = util.merge_dicts(combined, self.sync[sync].cameras) - return combined - - async def save(self, file_name): - """Save login data to file.""" - await util.json_save(self.auth.login_attributes, file_name) - - async def get_status(self): - """Get the blink system notification status.""" - response = await api.request_notification_flags(self) - return response.get("notifications", response) - - async def set_status(self, data_dict={}): - """ - Set the blink system notification status. - - :param data_dict: Dictionary of notification keys to modify. - Example: {'low_battery': False, 'motion': False} - """ - response = await api.request_set_notification_flag(self, data_dict) - return response - - async def download_videos( - self, path, since=None, camera="all", stop=10, delay=1, debug=False - ): - """ - Download all videos from server since specified time. - - :param path: Path to write files. /path/_.mp4 - :param since: Date and time to get videos from. - Ex: "2018/07/28 12:33:00" to retrieve videos since - July 28th 2018 at 12:33:00 - :param camera: Camera name to retrieve. Defaults to "all". - Use a list for multiple cameras. - :param stop: Page to stop on (~25 items per page. Default page 10). - :param delay: Number of seconds to wait in between subsequent video downloads. - :param debug: Set to TRUE to prevent downloading of items. - Instead of downloading, entries will be printed to log. - """ - if not isinstance(camera, list): - camera = [camera] - - results = await self.get_videos_metadata(since=since, stop=stop) - await self._parse_downloaded_items(results, camera, path, delay, debug) - - async def get_videos_metadata(self, since=None, camera="all", stop=10): - """ - Fetch and return video metadata. - - :param since: Date and time to get videos from. - Ex: "2018/07/28 12:33:00" to retrieve videos since - July 28th 2018 at 12:33:00 - :param stop: Page to stop on (~25 items per page. Default page 10). - """ - videos = [] - if since is None: - since_epochs = self.last_refresh - else: - parsed_datetime = parse(since, fuzzy=True) - since_epochs = parsed_datetime.timestamp() - - formatted_date = util.get_time(time_to_convert=since_epochs) - _LOGGER.info("Retrieving videos since %s", formatted_date) - - for page in range(1, stop): - response = await api.request_videos(self, time=since_epochs, page=page) - _LOGGER.debug("Processing page %s", page) - try: - result = response["media"] - if not result: - raise KeyError - videos.extend(result) - except (KeyError, TypeError): - _LOGGER.info("No videos found on page %s. Exiting.", page) - break - return videos - - async def do_http_get(self, address): - """ - Do an http_get on address. - - :param address: address to be added to base_url. - """ - response = await api.http_get( - self, - url=f"{self.urls.base_url}{address}", - stream=True, - json=False, - timeout=TIMEOUT_MEDIA, - ) - return response - - async def _parse_downloaded_items(self, result, camera, path, delay, debug): - """Parse downloaded videos.""" - for item in result: - try: - created_at = item["created_at"] - camera_name = item["device_name"] - is_deleted = item["deleted"] - address = item["media"] - except KeyError: - _LOGGER.info("Missing clip information, skipping...") - continue - - if camera_name not in camera and "all" not in camera: - _LOGGER.debug("Skipping videos for %s.", camera_name) - continue - - if is_deleted: - _LOGGER.debug("%s: %s is marked as deleted.", camera_name, address) - continue - - filename = f"{camera_name}-{created_at}" - filename = f"{slugify(filename)}.mp4" - filename = os.path.join(path, filename) - - if not debug: - if await aiofiles.ospath.isfile(filename): - _LOGGER.info("%s already exists, skipping...", filename) - continue - - response = await self.do_http_get(address) - async with aiofiles.open(filename, "wb") as vidfile: - await vidfile.write(await response.read()) - - _LOGGER.info("Downloaded video to %s", filename) - else: - print( - f"Camera: {camera_name}, Timestamp: {created_at}, " - f"Address: {address}, Filename: {filename}" - ) - if delay > 0: - time.sleep(delay) - - -class BlinkSetupError(Exception): - """Class to handle setup errors.""" diff --git a/share/blinkpy-upstream-0.25.5/blinkpy/sync_module.py b/share/blinkpy-upstream-0.25.5/blinkpy/sync_module.py deleted file mode 100644 index 103d88d1..00000000 --- a/share/blinkpy-upstream-0.25.5/blinkpy/sync_module.py +++ /dev/null @@ -1,785 +0,0 @@ -"""Defines a sync module for Blink.""" - -import logging -import string -import datetime -import traceback -import asyncio -import aiofiles -from sortedcontainers import SortedSet -from requests.structures import CaseInsensitiveDict -from blinkpy import api -from blinkpy.camera import BlinkCamera, BlinkCameraMini, BlinkDoorbell -from blinkpy.helpers.util import ( - time_to_seconds, - backoff_seconds, - to_alphanumeric, - json_dumps, -) -from blinkpy.helpers.constants import ONLINE - -_LOGGER = logging.getLogger(__name__) - - -class BlinkSyncModule: - """Class to initialize sync module.""" - - def __init__(self, blink, network_name, network_id, camera_list): - """ - Initialize Blink sync module. - - :param blink: Blink class instantiation - """ - self.blink = blink - self.network_id = network_id - self.region_id = blink.auth.region_id - self.name = network_name - self.serial = None - self._version = None - self.status = "offline" - self.sync_id = None - self.host = None - self.summary = None - self.network_info = None - self.events = [] - self.cameras = CaseInsensitiveDict({}) - self.motion_interval = blink.motion_interval - self.motion = {} - # A dictionary where keys are the camera names, and - # values are lists of recent clips. - self.last_records = {} - self.camera_list = camera_list - self.available = False - # type_key_map is only for the mini's and the doorbells. - # Outdoor cameras have their own URL API which must be queried. - self.type_key_map = { - "mini": "owls", - "doorbell": "doorbells", - } - self._names_table = {} - self._local_storage = { - "enabled": False, - "compatible": False, - "status": False, - "last_manifest_id": None, - "manifest": SortedSet(), - "manifest_stale": True, - "last_manifest_read": datetime.datetime(1970, 1, 1, 0, 0, 0).isoformat(), - } - - @property - def attributes(self): - """Return sync attributes.""" - attr = { - "name": self.name, - "id": self.sync_id, - "network_id": self.network_id, - "serial": self.serial, - "version": self._version, - "status": self.status, - "region_id": self.region_id, - "local_storage": self.local_storage, - } - return attr - - @property - def urls(self): - """Return device urls.""" - return self.blink.urls - - @property - def online(self): - """Return boolean system online status.""" - try: - return ONLINE[self.status] - except KeyError: - _LOGGER.error("Unknown sync module status %s", self.status) - self.available = False - return False - - @property - def version(self): - """Return the Syncmodule Firmware version.""" - return self._version - - @property - def arm(self): - """Return status of sync module: armed/disarmed.""" - try: - return self.network_info["network"]["armed"] - except (KeyError, TypeError): - self.available = False - return None - - @property - def local_storage(self): - """Indicate if local storage is activated or not (True/False).""" - return self._local_storage["status"] - - @property - def local_storage_manifest_ready(self): - """Indicate if the manifest is up-to-date.""" - return not self._local_storage["manifest_stale"] - - async def async_arm(self, value): - """Arm or disarm camera.""" - if value: - return await api.request_system_arm(self.blink, self.network_id) - return await api.request_system_disarm(self.blink, self.network_id) - - async def start(self): - """Initialize the system.""" - _LOGGER.debug("Initializing the sync module") - response = await self.sync_initialize() - if not response: - return False - - try: - self.sync_id = self.summary["id"] - self.serial = self.summary["serial"] - self.status = self.summary["status"] - except KeyError: - _LOGGER.error("Could not extract some sync module info: %s", response) - - is_ok = await self.get_network_info() - - if not is_ok or not await self.update_cameras(): - self.available = False - return False - self.available = True - return True - - async def sync_initialize(self): - """Initialize a sync module.""" - # Doesn't include local store info for some reason. - response = await api.request_syncmodule(self.blink, self.network_id) - try: - self.summary = response["syncmodule"] - self.network_id = self.summary["network_id"] - await self._init_local_storage(self.summary["id"]) - except (TypeError, KeyError): - _LOGGER.error( - "Could not retrieve sync module information with response: %s", response - ) - return False - self._version = self.summary.get("fw_version") - return response - - async def _init_local_storage(self, sync_id): - """Initialize local storage from homescreen dictionary.""" - home_screen = self.blink.homescreen - sync_module = None - try: - sync_modules = home_screen["sync_modules"] - for mod in sync_modules: - if mod["id"] == sync_id: - self._local_storage["enabled"] = mod["local_storage_enabled"] - self._local_storage["compatible"] = mod["local_storage_compatible"] - self._local_storage["status"] = ( - mod["local_storage_status"] == "active" - ) - self._local_storage["last_manifest_read"] = ( - datetime.datetime.utcnow() - datetime.timedelta(seconds=10) - ).isoformat() - sync_module = mod - except (TypeError, KeyError): - _LOGGER.error( - "Could not retrieve sync module information from home screen: %s", - home_screen, - ) - return False - return sync_module - - async def update_cameras(self, camera_type=BlinkCamera): - """Update cameras from server.""" - type_map = { - "mini": BlinkCameraMini, - "doorbell": BlinkDoorbell, - "default": BlinkCamera, - } - try: - _LOGGER.debug("Updating cameras") - for camera_config in self.camera_list: - _LOGGER.debug("Updating camera_config %s", json_dumps(camera_config)) - if "name" not in camera_config: - break - blink_camera_type = camera_config.get("type", "") - name = camera_config["name"] - self.motion[name] = False - unique_info = self.get_unique_info(name) - if blink_camera_type in type_map: - camera_type = type_map[blink_camera_type] - self.cameras[name] = camera_type(self) - camera_info = await self.get_camera_info( - camera_config["id"], unique_info=unique_info - ) - self._names_table[to_alphanumeric(name)] = name - await self.cameras[name].update( - camera_info, force_cache=True, force=True - ) - except KeyError: - _LOGGER.error("Could not create camera instances for %s", self.name) - return False - return True - - def get_unique_info(self, name): - """Extract unique information for Minis and Doorbells.""" - try: - for type_name in self.type_key_map: - for device in self.blink.get_homescreen_devices(type_name): - _LOGGER.debug("checking device %s", device) - if device["name"] == name: - _LOGGER.debug("Found unique_info %s", device) - return device - except (TypeError, KeyError): - pass - return None - - async def get_events(self, **kwargs): - """Retrieve events from server.""" - force = kwargs.pop("force", False) - response = await api.request_sync_events( - self.blink, self.network_id, force=force - ) - try: - return response["event"] - except (TypeError, KeyError): - _LOGGER.error("Could not extract events: %s", response) - return False - - async def get_camera_info(self, camera_id, **kwargs): - """Retrieve camera information.""" - unique = kwargs.get("unique_info", None) - if unique is not None: - return unique - response = await api.request_camera_info(self.blink, self.network_id, camera_id) - try: - return response["camera"][0] - except (TypeError, KeyError): - _LOGGER.error( - "Could not extract camera info for %s: %s", camera_id, response - ) - return {} - - async def get_network_info(self): - """Retrieve network status.""" - self.network_info = await api.request_network_update( - self.blink, self.network_id - ) - try: - if self.network_info["network"]["sync_module_error"]: - raise KeyError - except (TypeError, KeyError): - self.available = False - return False - return True - - async def refresh(self, force_cache=False): - """Get all blink cameras and pulls their most recent status.""" - if not await self.get_network_info(): - return - await self.update_local_storage_manifest() - await self.check_new_videos() - for camera_name in self.cameras: - camera_id = self.cameras[camera_name].camera_id - camera_info = await self.get_camera_info( - camera_id, - unique_info=self.get_unique_info(camera_name), - ) - await self.cameras[camera_name].update(camera_info, force_cache=force_cache) - self.available = True - - async def check_new_videos(self): - """Check if new videos since last refresh.""" - _LOGGER.debug("Checking for new videos") - try: - interval = self.blink.last_refresh - self.motion_interval * 60 - last_refresh = datetime.datetime.fromtimestamp(self.blink.last_refresh) - _LOGGER.debug("last_refresh = %s", last_refresh) - _LOGGER.debug("interval = %s", interval) - except TypeError: - # This is the first start, so refresh hasn't happened yet. - # No need to check for motion. - ex = traceback.format_exc() - _LOGGER.error( - "Error calculating interval (last_refresh = %s): %s", - self.blink.last_refresh, - ex, - ) - trace = "".join(traceback.format_stack()) - _LOGGER.debug("\n%s", trace) - _LOGGER.info("No new videos since last refresh.") - return False - - resp = await api.request_videos(self.blink, time=interval, page=1) - - last_record = {} - for camera in self.cameras: - # Initialize the list if doesn't exist yet. - if camera not in self.last_records: - self.last_records[camera] = [] - # Hang on to the last record if there is one. - if len(self.last_records[camera]) > 0: - last_record[camera] = self.last_records[camera][-1] - # Reset in preparation for processing new entries. - self.last_records[camera] = [] - self.motion[camera] = False - - try: - info = resp["media"] - except (KeyError, TypeError): - _LOGGER.warning("Could not check for motion. Response: %s", resp) - return False - - for entry in info: - try: - name = entry["device_name"] - clip_url = entry["media"] - timestamp = entry["created_at"] - if self.check_new_video_time(timestamp): - self.motion[name] = True and self.arm - record = {"clip": clip_url, "time": timestamp} - self.last_records[name].append(record) - except KeyError: - last_refresh = datetime.datetime.fromtimestamp(self.blink.last_refresh) - _LOGGER.debug( - "No new videos for %s since last refresh at %s.", - entry, - last_refresh, - ) - - # Process local storage if active and if the manifest is ready. - last_manifest_read = datetime.datetime.fromisoformat( - self._local_storage["last_manifest_read"] - ) - _LOGGER.debug("last_manifest_read = %s", last_manifest_read) - _LOGGER.debug("Manifest ready? %s", self.local_storage_manifest_ready) - if self.local_storage and self.local_storage_manifest_ready: - _LOGGER.debug("Processing updated manifest") - manifest = self._local_storage["manifest"] - last_manifest_id = self._local_storage["last_manifest_id"] - last_manifest_read = self._local_storage["last_manifest_read"] - last_read_local = ( - datetime.datetime.fromisoformat(last_manifest_read) - .replace(tzinfo=datetime.timezone.utc) - .astimezone(tz=None) - ) - last_clip_time = None - num_new = 0 - for item in reversed(manifest): - iso_timestamp = item.created_at.isoformat() - - _LOGGER.debug( - "Checking '%s': clip_time = %s, manifest_read = %s", - item.name, - iso_timestamp, - last_manifest_read, - ) - # Exit the loop once there are no new videos in the list. - if not self.check_new_video_time(iso_timestamp, last_manifest_read): - _LOGGER.info( - "No new local storage videos since last manifest " - "read at %s.", - last_read_local, - ) - break - _LOGGER.debug("Found new item in local storage manifest: %s", item) - name = item.name - clip_url = item.url(last_manifest_id) - await item.prepare_download(self.blink) - self.motion[name] = True - record = {"clip": clip_url, "time": iso_timestamp} - self.last_records[name].append(record) - last_clip_time = item.created_at - num_new += 1 - - # The manifest became ready, and we read recent clips from it. - if num_new > 0: - last_manifest_read = ( - datetime.datetime.utcnow() - datetime.timedelta(seconds=10) - ).isoformat() - self._local_storage["last_manifest_read"] = last_manifest_read - _LOGGER.debug("Updated last_manifest_read to %s", last_manifest_read) - _LOGGER.debug("Last clip time was %s", last_clip_time) - # We want to keep the last record when no new motion was detected. - for camera in self.cameras: - # Check if there are no new records, indicating motion. - if len(self.last_records[camera]) == 0: - # If no new records, check if we had a previous last record. - if camera in last_record: - # Put the last record back into the empty list. - self.last_records[camera].append(last_record[camera]) - - return True - - def check_new_video_time(self, timestamp, reference=None): - """Check if video has timestamp since last refresh. - - :param timestamp ISO-formatted timestamp string - :param reference ISO-formatted reference timestamp string - """ - if not reference: - return time_to_seconds(timestamp) > self.blink.last_refresh - return time_to_seconds(timestamp) > time_to_seconds(reference) - - async def update_local_storage_manifest(self): - """Update local storage manifest, which lists all stored clips.""" - if not self.local_storage: - self._local_storage["manifest_stale"] = True - return None - _LOGGER.debug("Updating local storage manifest") - - response = await self.poll_local_storage_manifest() - try: - manifest_request_id = response["id"] - except (TypeError, KeyError): - _LOGGER.error( - "Could not extract manifest request ID from response: %s", response - ) - self._local_storage["manifest_stale"] = True - return None - - response = await self.poll_local_storage_manifest(manifest_request_id) - try: - manifest_id = response["manifest_id"] - except (TypeError, KeyError): - _LOGGER.error("Could not extract manifest ID from response: %s", response) - self._local_storage["manifest_stale"] = True - return None - - self._local_storage["last_manifest_id"] = manifest_id - template = string.Template(api.local_storage_clip_url_template()).substitute( - account_id=self.blink.account_id, - network_id=self.network_id, - sync_id=self.sync_id, - manifest_id="$manifest_id", - clip_id="$clip_id", - ) - num_stored = len(self._local_storage["manifest"]) - try: - for item in response["clips"]: - alphanumeric_name = item["camera_name"] - if alphanumeric_name in self._names_table: - camera_name = self._names_table[alphanumeric_name] - self._local_storage["manifest"].add( - LocalStorageMediaItem( - item["id"], - camera_name, - item["created_at"], - item["size"], - manifest_id, - template, - ) - ) - num_added = len(self._local_storage["manifest"]) - num_stored - if num_added > 0: - _LOGGER.info( - "Found %s new clip(s) in local storage manifest id = %s", - num_added, - manifest_id, - ) - except (TypeError, KeyError): - ex = traceback.format_exc() - _LOGGER.error("Could not extract clips list from response: %s", ex) - trace = "".join(traceback.format_stack()) - _LOGGER.debug("\n%s", trace) - self._local_storage["manifest_stale"] = True - return None - - self._local_storage["manifest_stale"] = False - return True - - async def poll_local_storage_manifest( - self, manifest_request_id=None, max_retries=4 - ): - """Poll for local storage manifest.""" - # The sync module may be busy processing another request - # (like saving a new clip). - # Poll the endpoint until it is ready, backing off each retry. - response = None - for retry in range(max_retries): - # Request building the manifest. - if not manifest_request_id: - response = await api.request_local_storage_manifest( - self.blink, self.network_id, self.sync_id - ) - if response and "id" in response: - break - # Get the manifest. - else: - response = await api.get_local_storage_manifest( - self.blink, self.network_id, self.sync_id, manifest_request_id - ) - if response and "clips" in response: - break - seconds = backoff_seconds(retry=retry, default_time=3) - _LOGGER.debug("[retry=%d] Retrying in %d seconds", retry + 1, seconds) - await asyncio.sleep(seconds) - return response - - -class BlinkOwl(BlinkSyncModule): - """Representation of a sync-less device.""" - - def __init__(self, blink, name, network_id, response): - """Initialize a sync-less object.""" - cameras = [{"name": name, "id": response["id"]}] - super().__init__(blink, name, network_id, cameras) - self.sync_id = response["id"] - self.serial = response["serial"] - self.status = response["enabled"] - if not self.serial: - self.serial = f"{network_id}-{self.sync_id}" - - async def sync_initialize(self): - """Initialize a sync-less module.""" - self.summary = { - "id": self.sync_id, - "name": self.name, - "serial": self.serial, - "status": self.status, - "onboarded": True, - "account_id": self.blink.account_id, - "network_id": self.network_id, - } - return self.summary - - async def update_cameras(self, camera_type=BlinkCameraMini): - """Update sync-less cameras.""" - return await super().update_cameras(camera_type=BlinkCameraMini) - - async def get_camera_info(self, camera_id, **kwargs): - """Retrieve camera information.""" - try: - for owl in self.blink.get_homescreen_devices("mini"): - if owl["name"] == self.name: - self.status = owl["enabled"] - return owl - except (TypeError, KeyError): - pass - return None - - async def get_network_info(self): - """Get network info for sync-less module.""" - return True - - @property - def network_info(self): - """Format owl response to resemble sync module.""" - return { - "network": { - "id": self.network_id, - "name": self.name, - "armed": self.status, - "sync_module_error": False, - "account_id": self.blink.account_id, - } - } - - @network_info.setter - def network_info(self, value): - """Set network_info property.""" - - -class BlinkLotus(BlinkSyncModule): - """Representation of a sync-less device.""" - - def __init__(self, blink, name, network_id, response): - """Initialize a sync-less object.""" - cameras = [{"name": name, "id": response["id"]}] - super().__init__(blink, name, network_id, cameras) - self.sync_id = response["id"] - self.serial = response["serial"] - self.status = response["enabled"] - if not self.serial: - self.serial = f"{network_id}-{self.sync_id}" - - async def sync_initialize(self): - """Initialize a sync-less module.""" - self.summary = { - "id": self.sync_id, - "name": self.name, - "serial": self.serial, - "status": self.status, - "onboarded": True, - "account_id": self.blink.account_id, - "network_id": self.network_id, - } - return self.summary - - async def update_cameras(self, camera_type=BlinkDoorbell): - """Update sync-less cameras.""" - return await super().update_cameras(camera_type=BlinkDoorbell) - - async def get_camera_info(self, camera_id, **kwargs): - """Retrieve camera information.""" - try: - for doorbell in self.blink.get_homescreen_devices("doorbell"): - if doorbell["name"] == self.name: - self.status = doorbell["enabled"] - return doorbell - except (TypeError, KeyError): - pass - return None - - async def get_network_info(self): - """Get network info for sync-less module.""" - return True - - @property - def network_info(self): - """Format lotus response to resemble sync module.""" - return { - "network": { - "id": self.network_id, - "name": self.name, - "armed": self.status, - "sync_module_error": False, - "account_id": self.blink.account_id, - } - } - - @network_info.setter - def network_info(self, value): - """Set network_info property.""" - - -class LocalStorageMediaItem: - """Metadata of media item in the local storage manifest.""" - - def __init__( - self, item_id, camera_name, created_at, size, manifest_id, url_template - ): - """Initialize media item. - - :param item_id: ID of the manifest item. - :param camera_name: Name of camera that took the video. - :param created_at: ISO-formatted time stamp for creation time. - :param size: Size of the video file. - """ - self._id = int(item_id) - self._camera_name = camera_name - self._created_at = datetime.datetime.fromisoformat(created_at) - self._size = size - self._url_template = url_template - self._manifest_id = manifest_id - - def _build_url(self, manifest_id, clip_id): - return string.Template(self._url_template).substitute( - manifest_id=manifest_id, clip_id=clip_id - ) - - @property - def id(self): - """Return media item ID.""" - return self._id - - @property - def name(self): - """Return name of camera that captured this media item.""" - return self._camera_name - - @property - def created_at(self): - """Return the ISO-formatted creation time stamp of this media item.""" - return self._created_at - - @property - def size(self): - """Return the reported size of this media item.""" - return self._size - - def url(self, manifest_id=None): - """Build the URL. - - Builds the url new each time since the media item is cached, - and the manifest is possibly rebuilt each refresh. - - :param manifest_id: ID of new manifest (if it changed) - :return: URL for clip retrieval - """ - if manifest_id: - self._manifest_id = manifest_id - return self._build_url(self._manifest_id, self._id) - - async def prepare_download(self, blink, max_retries=4): - """Initiate upload of media item from the sync module to Blink cloud servers.""" - if max_retries == 0: - return None - url = blink.urls.base_url + self.url() - response = await api.http_post(blink, url) - await api.wait_for_command(blink, response) - return response - - async def delete_video(self, blink, max_retries=4) -> bool: - """Delete video from sync module.""" - delete_url = blink.urls.base_url + self.url() - delete_url = delete_url.replace("request", "delete") - - for retry in range(max_retries): - delete = await api.http_post( - blink, delete_url, json=False - ) # Delete the video - if delete.status == 200: - return True - seconds = backoff_seconds(retry=retry, default_time=3) - _LOGGER.debug("[retry=%d] Retrying in %d seconds", retry + 1, seconds) - await asyncio.sleep(seconds) - return False - - async def download_video(self, blink, file_name, max_retries=4) -> bool: - """Download a previously prepared video from sync module.""" - for retry in range(max_retries): - url = blink.urls.base_url + self.url() - video = await api.http_get(blink, url, json=False) - if video.status == 200: - async with aiofiles.open(file_name, "wb") as vidfile: - await vidfile.write(await video.read()) # download the video - return True - seconds = backoff_seconds(retry=retry, default_time=3) - _LOGGER.debug( - "[retry=%d] Retrying in %d seconds: %s", retry + 1, seconds, url - ) - await asyncio.sleep(seconds) - return False - - async def download_video_delete(self, blink, file_name, max_retries=4) -> bool: - """Delete local videos. - - Initiate upload of media item from the sync module to - Blink cloud servers then download to local filesystem and delete from sync. - """ - if await self.prepare_download(blink): - if await self.download_video(blink, file_name): - if await self.delete_video(blink): - return True - return False - - def __repr__(self): - """Create string representation.""" - return ( - f"LocalStorageMediaItem(id={self._id}, camera_name={self._camera_name}, " - f"created_at={self._created_at}" - + f", size={self._size}, manifest_id={self._manifest_id}, " - f"url_template={self._url_template})" - ) - - def __str__(self): - """Create string representation.""" - return self.__repr__() - - def cmp_key(self): - """Return key to use for comparison.""" - return self._created_at - - def __eq__(self, other): - """Check equality.""" - return self.cmp_key() == other.cmp_key() - - def __lt__(self, other): - """Check less than.""" - return self.cmp_key() < other.cmp_key() - - def __hash__(self): - """Return unique hash value.""" - return self._id diff --git a/share/blinkpy-upstream-0.25.5/tests/test_blinkpy.py b/share/blinkpy-upstream-0.25.5/tests/test_blinkpy.py deleted file mode 100644 index a71082ed..00000000 --- a/share/blinkpy-upstream-0.25.5/tests/test_blinkpy.py +++ /dev/null @@ -1,592 +0,0 @@ -""" -Test full system. - -Tests the system initialization and attributes of -the main Blink system. Tests if we properly catch -any communication related errors at startup. -""" - -from unittest import mock -from unittest import IsolatedAsyncioTestCase -import time -from blinkpy.blinkpy import Blink, BlinkSetupError, LoginError, TokenRefreshFailed -from blinkpy.sync_module import BlinkOwl, BlinkLotus -from blinkpy.helpers.constants import __version__ - -SPECIAL = "!@#$%^&*()_+-=[]{}|/<>?,.'" - - -class TestBlinkSetup(IsolatedAsyncioTestCase): - """Test the Blink class in blinkpy.""" - - def setUp(self): - """Initialize blink test object.""" - self.blink = Blink(session=mock.AsyncMock()) - self.blink.available = True - - def tearDown(self): - """Cleanup blink test object.""" - self.blink = None - - async def test_initialization(self): - """Verify we can initialize blink.""" - blink = Blink() - self.assertEqual(blink.version, __version__) - - def test_network_id_failure(self): - """Check that with bad network data a setup error is raised.""" - self.blink.networks = None - with self.assertRaises(BlinkSetupError): - self.blink.setup_network_ids() - - def test_multiple_networks(self): - """Check that we handle multiple networks appropriately.""" - self.blink.networks = { - "0000": {"onboarded": False, "name": "foo"}, - "5678": {"onboarded": True, "name": "bar"}, - "1234": {"onboarded": False, "name": "test"}, - } - self.blink.setup_network_ids() - self.assertTrue("5678" in self.blink.network_ids) - - def test_multiple_onboarded_networks(self): - """Check that we handle multiple networks appropriately.""" - self.blink.networks = { - "0000": {"onboarded": False, "name": "foo"}, - "5678": {"onboarded": True, "name": "bar"}, - "1234": {"onboarded": True, "name": "test"}, - } - self.blink.setup_network_ids() - self.assertTrue("0000" not in self.blink.network_ids) - self.assertTrue("5678" in self.blink.network_ids) - self.assertTrue("1234" in self.blink.network_ids) - - @mock.patch("blinkpy.blinkpy.time.time") - async def test_throttle(self, mock_time): - """Check throttling functionality.""" - now = self.blink.refresh_rate + 1 - mock_time.return_value = now - self.assertEqual(self.blink.last_refresh, None) - self.assertEqual(self.blink.check_if_ok_to_update(), True) - self.assertEqual(self.blink.last_refresh, None) - with ( - mock.patch( - "blinkpy.sync_module.BlinkSyncModule.refresh", return_value=True - ), - mock.patch("blinkpy.blinkpy.Blink.get_homescreen", return_value=True), - ): - await self.blink.refresh(force=True) - - self.assertEqual(self.blink.last_refresh, now) - self.assertEqual(self.blink.check_if_ok_to_update(), False) - self.assertEqual(self.blink.last_refresh, now) - - async def test_not_available_refresh(self): - """Check that setup_post_verify executes on refresh when not available.""" - self.blink.available = False - with ( - mock.patch( - "blinkpy.sync_module.BlinkSyncModule.refresh", return_value=True - ), - mock.patch("blinkpy.blinkpy.Blink.get_homescreen", return_value=True), - mock.patch("blinkpy.blinkpy.Blink.setup_post_verify", return_value=True), - ): - self.assertTrue(await self.blink.refresh(force=True)) - with mock.patch("time.time", return_value=time.time() + 4): - self.assertFalse(await self.blink.refresh()) - - def test_sync_case_insensitive_dict(self): - """Check that we can access sync modules ignoring case.""" - self.blink.sync["test"] = 1234 - self.assertEqual(self.blink.sync["test"], 1234) - self.assertEqual(self.blink.sync["TEST"], 1234) - self.assertEqual(self.blink.sync["tEsT"], 1234) - - def test_sync_special_chars(self): - """Check that special chars can be used as sync name.""" - self.blink.sync[SPECIAL] = 1234 - self.assertEqual(self.blink.sync[SPECIAL], 1234) - - @mock.patch("blinkpy.api.request_camera_usage") - @mock.patch("blinkpy.api.request_homescreen") - async def test_setup_cameras(self, mock_home, mock_req): - """Check retrieval of camera information.""" - mock_home.return_value = {} - mock_req.return_value = { - "networks": [ - { - "network_id": 1234, - "cameras": [ - {"id": 5678, "name": "foo"}, - {"id": 5679, "name": "bar"}, - {"id": 5779, "name": SPECIAL}, - ], - }, - {"network_id": 4321, "cameras": [{"id": 0000, "name": "test"}]}, - ] - } - result = await self.blink.setup_camera_list() - self.assertEqual( - result, - { - "1234": [ - {"name": "foo", "id": 5678, "type": "default"}, - {"name": "bar", "id": 5679, "type": "default"}, - {"name": SPECIAL, "id": 5779, "type": "default"}, - ], - "4321": [{"name": "test", "id": 0000, "type": "default"}], - }, - ) - - @mock.patch("blinkpy.api.request_camera_usage") - async def test_setup_cameras_failure(self, mock_home): - """Check that on failure we raise a setup error.""" - mock_home.return_value = {} - with self.assertRaises(BlinkSetupError): - await self.blink.setup_camera_list() - mock_home.return_value = None - with self.assertRaises(BlinkSetupError): - await self.blink.setup_camera_list() - - def test_setup_urls(self): - """Check setup of URLS.""" - self.blink.auth.region_id = "test" - self.blink.setup_urls() - self.assertEqual(self.blink.urls.subdomain, "rest-test") - - def test_setup_urls_failure(self): - """Check that on failure we raise a setup error.""" - self.blink.auth.region_id = None - with self.assertRaises(BlinkSetupError): - self.blink.setup_urls() - - @mock.patch("blinkpy.api.request_networks") - async def test_setup_networks(self, mock_networks): - """Check setup of networks.""" - mock_networks.return_value = {"summary": "foobar"} - await self.blink.setup_networks() - self.assertEqual(self.blink.networks, "foobar") - - @mock.patch("blinkpy.api.request_networks") - async def test_setup_networks_failure(self, mock_networks): - """Check that on failure we raise a setup error.""" - mock_networks.return_value = {} - with self.assertRaises(BlinkSetupError): - await self.blink.setup_networks() - mock_networks.return_value = None - with self.assertRaises(BlinkSetupError): - await self.blink.setup_networks() - - def test_merge_cameras(self): - """Test merging of cameras.""" - self.blink.sync = { - "foo": MockSync({"test": 123, "foo": "bar"}), - "bar": MockSync({"fizz": "buzz", "bar": "foo"}), - } - combined = self.blink.merge_cameras() - self.assertEqual(combined["test"], 123) - self.assertEqual(combined["foo"], "bar") - self.assertEqual(combined["fizz"], "buzz") - self.assertEqual(combined["bar"], "foo") - - @mock.patch("blinkpy.blinkpy.BlinkOwl.start") - async def test_initialize_blink_minis(self, mock_start): - """Test blink mini initialization.""" - mock_start.return_value = True - self.blink.homescreen = { - "owls": [ - { - "enabled": False, - "id": 1, - "name": "foo", - "network_id": 2, - "onboarded": True, - "status": "online", - "thumbnail": "/foo/bar", - "serial": "1234", - }, - { - "enabled": True, - "id": 3, - "name": "bar", - "network_id": 4, - "onboarded": True, - "status": "online", - "thumbnail": "/foo/bar", - "serial": "abcd", - }, - ] - } - self.blink.sync = {} - await self.blink.setup_owls() - self.assertEqual(self.blink.sync["foo"].__class__, BlinkOwl) - self.assertEqual(self.blink.sync["bar"].__class__, BlinkOwl) - self.assertEqual(self.blink.sync["foo"].arm, False) - self.assertEqual(self.blink.sync["bar"].arm, True) - self.assertEqual(self.blink.sync["foo"].name, "foo") - self.assertEqual(self.blink.sync["bar"].name, "bar") - - async def test_blink_mini_cameras_returned(self): - """Test that blink mini cameras are found if attached to sync module.""" - self.blink.network_ids = ["1234"] - self.blink.homescreen = { - "sync_modules": [{"network_id": 1234}], - "owls": [ - { - "id": 1, - "name": "foo", - "network_id": 1234, - "onboarded": True, - "enabled": True, - "status": "online", - "thumbnail": "/foo/bar", - "serial": "abc123", - } - ] - } - result = await self.blink.setup_owls() - self.assertEqual(self.blink.network_ids, ["1234"]) - self.assertEqual( - result, [{"1234": {"name": "foo", "id": "1234", "type": "mini"}}] - ) - - self.blink.no_owls = True - self.blink.network_ids = [] - await self.blink.get_homescreen() - result = await self.blink.setup_owls() - self.assertEqual(self.blink.network_ids, []) - self.assertEqual(result, []) - - @mock.patch("blinkpy.api.request_camera_usage") - async def test_blink_mini_attached_to_sync(self, mock_usage): - """Test that blink mini cameras are properly attached to sync module.""" - self.blink.network_ids = ["1234"] - self.blink.homescreen = { - "sync_modules": [{"network_id": 1234}], - "owls": [ - { - "id": 1, - "name": "foo", - "network_id": 1234, - "onboarded": True, - "enabled": True, - "status": "online", - "thumbnail": "/foo/bar", - "serial": "abc123", - } - ] - } - mock_usage.return_value = {"networks": [{"cameras": [], "network_id": 1234}]} - result = await self.blink.setup_camera_list() - self.assertEqual( - result, {"1234": [{"name": "foo", "id": "1234", "type": "mini"}]} - ) - - @mock.patch("blinkpy.blinkpy.BlinkLotus.start") - async def test_initialize_blink_doorbells(self, mock_start): - """Test blink doorbell initialization.""" - mock_start.return_value = True - self.blink.homescreen = { - "doorbells": [ - { - "enabled": False, - "id": 1, - "name": "foo", - "network_id": 2, - "onboarded": True, - "status": "online", - "thumbnail": "/foo/bar", - "serial": "1234", - }, - { - "enabled": True, - "id": 3, - "name": "bar", - "network_id": 4, - "onboarded": True, - "status": "online", - "thumbnail": "/foo/bar", - "serial": "abcd", - }, - ] - } - self.blink.sync = {} - await self.blink.setup_lotus() - self.assertEqual(self.blink.sync["foo"].__class__, BlinkLotus) - self.assertEqual(self.blink.sync["bar"].__class__, BlinkLotus) - self.assertEqual(self.blink.sync["foo"].arm, False) - self.assertEqual(self.blink.sync["bar"].arm, True) - self.assertEqual(self.blink.sync["foo"].name, "foo") - self.assertEqual(self.blink.sync["bar"].name, "bar") - - @mock.patch("blinkpy.api.request_camera_usage") - async def test_blink_doorbell_attached_to_sync(self, mock_usage): - """Test that blink doorbell cameras are properly attached to sync module.""" - self.blink.network_ids = ["1234"] - self.blink.homescreen = { - "sync_modules": [{"network_id": 1234}], - "doorbells": [ - { - "id": 1, - "name": "foo", - "network_id": 1234, - "onboarded": True, - "enabled": True, - "status": "online", - "thumbnail": "/foo/bar", - "serial": "abc123", - } - ] - } - mock_usage.return_value = {"networks": [{"cameras": [], "network_id": 1234}]} - result = await self.blink.setup_camera_list() - self.assertEqual( - result, {"1234": [{"name": "foo", "id": "1234", "type": "doorbell"}]} - ) - - @mock.patch("blinkpy.api.request_camera_usage") - async def test_blink_doorbell_with_alt_homescreen_key(self, mock_usage): - """Test that doorbells are discovered from alternate homescreen keys.""" - self.blink.network_ids = ["1234"] - self.blink.homescreen = { - "sync_modules": [{"network_id": 1234}], - "lotus": { - "devices": [ - { - "id": 1, - "name": "foo", - "network_id": 1234, - "onboarded": True, - "enabled": True, - "status": "online", - "thumbnail": "/foo/bar", - "serial": "abc123", - } - ] - } - } - mock_usage.return_value = {"networks": [{"cameras": [], "network_id": 1234}]} - result = await self.blink.setup_camera_list() - self.assertEqual( - result, {"1234": [{"name": "foo", "id": "1234", "type": "doorbell"}]} - ) - - @mock.patch("blinkpy.blinkpy.BlinkLotus.start") - @mock.patch("blinkpy.api.request_camera_usage") - async def test_blink_syncless_doorbell_not_in_camera_usage( - self, mock_usage, mock_lotus_start - ): - """Test that sync-less doorbells initialize even without camera_usage network.""" - mock_lotus_start.return_value = True - self.blink.network_ids = [] - self.blink.homescreen = { - "doorbells": [ - { - "id": 1, - "name": "foo", - "network_id": 1234, - "onboarded": True, - "enabled": True, - "status": "online", - "thumbnail": "/foo/bar", - "serial": "abc123", - } - ] - } - mock_usage.return_value = {"networks": []} - result = await self.blink.setup_camera_list() - self.assertEqual(result, {}) - self.assertEqual(self.blink.sync["foo"].__class__, BlinkLotus) - - @mock.patch("blinkpy.api.request_camera_usage") - async def test_blink_multi_doorbell(self, mock_usage): - """Test that multiple doorbells are properly attached to sync module.""" - self.blink.network_ids = ["1234"] - self.blink.homescreen = { - "sync_modules": [{"network_id": 1234}], - "doorbells": [ - { - "id": 1, - "name": "foo", - "network_id": 1234, - "onboarded": True, - "enabled": True, - "status": "online", - "thumbnail": "/foo/bar", - "serial": "abc123", - }, - { - "id": 2, - "name": "bar", - "network_id": 1234, - "onboarded": True, - "enabled": True, - "status": "online", - "thumbnail": "/bar/foo", - "serial": "zxc456", - }, - ] - } - expected = { - "1234": [ - {"name": "foo", "id": "1234", "type": "doorbell"}, - {"name": "bar", "id": "1234", "type": "doorbell"}, - ] - } - mock_usage.return_value = {"networks": [{"cameras": [], "network_id": 1234}]} - result = await self.blink.setup_camera_list() - self.assertEqual(result, expected) - - @mock.patch("blinkpy.api.request_camera_usage") - async def test_blink_multi_mini(self, mock_usage): - """Test that multiple minis are properly attached to sync module.""" - self.blink.network_ids = ["1234"] - self.blink.homescreen = { - "sync_modules": [{"network_id": 1234}], - "owls": [ - { - "id": 1, - "name": "foo", - "network_id": 1234, - "onboarded": True, - "enabled": True, - "status": "online", - "thumbnail": "/foo/bar", - "serial": "abc123", - }, - { - "id": 2, - "name": "bar", - "network_id": 1234, - "onboarded": True, - "enabled": True, - "status": "online", - "thumbnail": "/bar/foo", - "serial": "zxc456", - }, - ] - } - expected = { - "1234": [ - {"name": "foo", "id": "1234", "type": "mini"}, - {"name": "bar", "id": "1234", "type": "mini"}, - ] - } - mock_usage.return_value = {"networks": [{"cameras": [], "network_id": 1234}]} - result = await self.blink.setup_camera_list() - self.assertEqual(result, expected) - - @mock.patch("blinkpy.api.request_camera_usage") - async def test_blink_camera_mix(self, mock_usage): - """Test that a mix of cameras are properly attached to sync module.""" - self.blink.network_ids = ["1234"] - self.blink.homescreen = { - "sync_modules": [{"network_id": 1234}], - "doorbells": [ - { - "id": 1, - "name": "foo", - "network_id": 1234, - "onboarded": True, - "enabled": True, - "status": "online", - "thumbnail": "/foo/bar", - "serial": "abc123", - }, - { - "id": 2, - "name": "bar", - "network_id": 1234, - "onboarded": True, - "enabled": True, - "status": "online", - "thumbnail": "/bar/foo", - "serial": "zxc456", - }, - ], - "owls": [ - { - "id": 3, - "name": "dead", - "network_id": 1234, - "onboarded": True, - "enabled": True, - "status": "online", - "thumbnail": "/dead/beef", - "serial": "qwerty", - }, - { - "id": 4, - "name": "beef", - "network_id": 1234, - "onboarded": True, - "enabled": True, - "status": "online", - "thumbnail": "/beef/dead", - "serial": "dvorak", - }, - ], - } - expected = { - "1234": [ - {"name": "foo", "id": "1234", "type": "doorbell"}, - {"name": "bar", "id": "1234", "type": "doorbell"}, - {"name": "dead", "id": "1234", "type": "mini"}, - {"name": "beef", "id": "1234", "type": "mini"}, - {"name": "normal", "id": "1234", "type": "default"}, - ] - } - mock_usage.return_value = { - "networks": [ - {"cameras": [{"name": "normal", "id": "1234"}], "network_id": 1234} - ] - } - result = await self.blink.setup_camera_list() - self.assertTrue("1234" in result) - for element in result["1234"]: - self.assertTrue(element in expected["1234"]) - - @mock.patch("blinkpy.blinkpy.Blink.get_homescreen") - @mock.patch("blinkpy.auth.Auth.startup") - @mock.patch("blinkpy.blinkpy.Blink.setup_urls") - @mock.patch("blinkpy.blinkpy.Blink.setup_post_verify") - async def test_blink_start( - self, - mock_urls, - mock_auth_startup, - mock_setup_post_verify, - mock_homescreen, - ): - """Test blink_start function.""" - - self.assertTrue(await self.blink.start()) - - self.blink.auth.no_prompt = True - self.assertTrue(await self.blink.start()) - - mock_homescreen.side_effect = [LoginError, TokenRefreshFailed] - self.assertFalse(await self.blink.start()) - self.assertFalse(await self.blink.start()) - - def test_setup_login_ids(self): - """Test setup_login_ids function.""" - - self.blink.auth.client_id = 1 - self.blink.auth.account_id = 2 - self.assertEqual(self.blink.client_id, 1) - self.assertEqual(self.blink.account_id, 2) - - @mock.patch("blinkpy.blinkpy.util.json_save") - async def test_save(self, mock_util): - """Test save function.""" - await self.blink.save("blah") - self.assertEqual(mock_util.call_count, 1) - - -class MockSync: - """Mock sync module class.""" - - def __init__(self, cameras): - """Initialize fake class.""" - - self.cameras = cameras