From 1c9074e774cf486f412898854198c84bb93f3023 Mon Sep 17 00:00:00 2001 From: Joris Limousin <1956842+JorisLimousin@users.noreply.github.com> Date: Sat, 15 Nov 2025 20:40:34 +0000 Subject: [PATCH 1/4] Fixing errors and cleaning up a bit --- requirements.txt | 1 + tapo-cli.py | 223 ++++++++++++++++++++++++++++++++++++++--------- 2 files changed, 185 insertions(+), 39 deletions(-) diff --git a/requirements.txt b/requirements.txt index 227e98f..aa2adc9 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,3 +3,4 @@ requests urllib3 datetime pycryptodome +Crypto diff --git a/tapo-cli.py b/tapo-cli.py index 5394263..0b88d36 100755 --- a/tapo-cli.py +++ b/tapo-cli.py @@ -96,22 +96,56 @@ def post(url, data, headers): # Downloads a file from the Intenetz and decrypts it def download(url, key_b64, file_path, file_name): - if not os.path.exists(file_path): os.makedirs(file_path) + if not os.path.exists(file_path): + os.makedirs(file_path) - res = requests.get(url) - content = res.content + max_retries = 3 + content = None - if key_b64: - key = base64.b64decode(key_b64) - iv = content[:16] - enc_data = content[16:] - cipher = AES.new(key, AES.MODE_CBC, iv) - dec_content = unpad(cipher.decrypt(enc_data), AES.block_size) - else: - dec_content = content + for attempt in range(1, max_retries + 1): + try: + # Add a timeout just so it doesn't hang forever + res = requests.get(url, timeout=120) + res.raise_for_status() + content = res.content + break + except (requests.exceptions.ChunkedEncodingError, + requests.exceptions.ConnectionError) as e: + print(f" Download error for {file_name} (attempt {attempt}/{max_retries}): {e}") + if attempt == max_retries: + print(" Giving up on this file, moving on.") + return False + time.sleep(2 * attempt) # small backoff and retry + except Exception as e: + print(f" Unexpected error for {file_name}: {e}") + return False + + if content is None: + # Shouldn't happen, but just in case + print(f" No content for {file_name}, skipping.") + return False + + try: + if key_b64: + key = base64.b64decode(key_b64) + iv = content[:16] + enc_data = content[16:] + cipher = AES.new(key, AES.MODE_CBC, iv) + dec_content = unpad(cipher.decrypt(enc_data), AES.block_size) + else: + dec_content = content + except Exception as e: + print(f" Decryption error for {file_name}: {e}") + return False - with open(os.path.join(file_path, file_name), 'wb') as file: - file.write(dec_content) + try: + with open(os.path.join(file_path, file_name), 'wb') as file: + file.write(dec_content) + except Exception as e: + print(f" File write error for {file_name}: {e}") + return False + + return True def probe_endpoint_get(params, endpoint): token, null, null, app_server_url_get = get_config() @@ -284,19 +318,54 @@ def list_videos(days): end_unixtime = time.time() + 86400 start_unixtime = end_unixtime - (days + 1) * 86400 - end_time = datetime.datetime.utcfromtimestamp(end_unixtime).strftime('%Y-%m-%d 00:00:00') - start_time = datetime.datetime.utcfromtimestamp(start_unixtime).strftime('%Y-%m-%d 00:00:00') + end_time = datetime.datetime.fromtimestamp(end_unixtime, datetime.timezone.utc).strftime('%Y-%m-%d 00:00:00') + start_time = datetime.datetime.fromtimestamp(start_unixtime, datetime.timezone.utc).strftime('%Y-%m-%d 00:00:00') endpoint = '/v2/videos/list' + page_size = 1000 # was 3000 + for dev in devs['deviceList']: - params = 'deviceId=' + dev['deviceId'] + '&page=0&pageSize=3000&order=desc&startTime=' + start_time + '&endTime=' + end_time - videos = probe_endpoint_get(params, endpoint) - print('\nFound ' + str(videos['total']) + ' videos for ' + dev['alias'] + ':') - if 'index' in videos: - for video in videos['index']: - print(video['eventLocalTime'], end = ", ") - #print(video['video'][0]['uri']) # This will print URLs to the videos if you want to download them using another tool, but don't forget to get the AES key from video['video'][0]['decryptionInfo']['key'] - if videos['total'] > 0: print('') + print(f"\nListing videos for {dev['alias']}:") + + page = 0 + total_printed = 0 + + while True: + print("page: " + str(page)) + + params = ( + 'deviceId=' + dev['deviceId'] + + '&page=' + str(page) + + '&pageSize=' + str(page_size) + + '&order=desc&startTime=' + start_time + + '&endTime=' + end_time + ) + + videos = probe_endpoint_get(params, endpoint) + + # If the API itself errors, stop for this device + if isinstance(videos, dict) and 'error_code' in videos and videos['error_code'] != 0: + print(f" Error from API: {videos}") + break + + entries = videos.get('index', []) + if not entries: + # No more videos on this page -> we are done + if total_printed == 0: + print(" No videos found.") + break + + # Print timestamps for this page + for video in entries: + # print(video['eventLocalTime'], end=", ") + total_printed += 1 + print(len(entries)) + # If we got fewer than page_size entries, that was the last page + if len(entries) < page_size: + print("") # newline after the comma-separated list + break + + page += 1 @click.command() @click.option('--days', default=1, prompt="Last X days", help='Last X days which you want to download videos for.') @@ -315,17 +384,48 @@ def download_videos(days, path, overwrite): end_unixtime = time.time() + 86400 start_unixtime = end_unixtime - (days + 1) * 86400 - end_time = datetime.datetime.utcfromtimestamp(end_unixtime).strftime('%Y-%m-%d 00:00:00') - start_time = datetime.datetime.utcfromtimestamp(start_unixtime).strftime('%Y-%m-%d 00:00:00') + end_time = datetime.datetime.fromtimestamp(end_unixtime, datetime.timezone.utc).strftime('%Y-%m-%d 00:00:00') + start_time = datetime.datetime.fromtimestamp(start_unixtime, datetime.timezone.utc).strftime('%Y-%m-%d 00:00:00') result = [] endpoint = '/v2/videos/list' + page_size = 1000 # was 3000 + + # stats per camera + stats = {} + for dev in devs['deviceList']: - params = 'deviceId=' + dev['deviceId'] + '&page=0&pageSize=3000&order=desc&startTime=' + start_time + '&endTime=' + end_time - videos = probe_endpoint_get(params, endpoint) - print('\nFound ' + str(videos['total']) + ' videos for ' + dev['alias'] + ':') - if 'index' in videos: - for video in videos['index']: + alias = dev['alias'] + stats[alias] = {'existing': 0, 'success': 0, 'failed': 0} + + print(f"\nDownloading videos for {alias}:") + + page = 0 + + while True: + params = ( + 'deviceId=' + dev['deviceId'] + + '&page=' + str(page) + + '&pageSize=' + str(page_size) + + '&order=desc&startTime=' + start_time + + '&endTime=' + end_time + ) + + videos = probe_endpoint_get(params, endpoint) + + # If the API itself errors, stop for this device + if isinstance(videos, dict) and 'error_code' in videos and videos['error_code'] != 0: + print(f" Error from API: {videos}") + break + + entries = videos.get('index', []) + + if not entries: + # No more videos on this page -> we are done + print(" No videos found.") + break + + for video in entries: url = video['video'][0]['uri'] key_b64 = False @@ -333,21 +433,66 @@ def download_videos(days, path, overwrite): if 'encryptionMethod' in video['video'][0]: method = video['video'][0]['encryptionMethod'] if method != "AES-128-CBC": - print(f"Unsupported encryption method: {method}. Quitting...") - print("Create an issue here: https://github.com/dimme/tapo-cli/issues") + print(f" Unsupported encryption method: {method}. Quitting...") + print(" Create an issue here: https://github.com/dimme/tapo-cli/issues") exit(1) key_b64 = video['video'][0]['decryptionInfo']['key'] - file_path = path + dev['alias'] + '/' + datetime.datetime.strptime(video['eventLocalTime'], '%Y-%m-%d %H:%M:%S').strftime('%Y-%m-%d') + '/' + file_path = ( + path + + alias + '/' + + datetime.datetime.strptime( + video['eventLocalTime'], + '%Y-%m-%d %H:%M:%S' + ).strftime('%Y-%m-%d') + + '/' + ) file_name = video['eventLocalTime'].replace(':','-') + '.mp4' - if os.path.exists(file_path + file_name) and overwrite == 0: - print('Already exists ' + file_path + file_name) - result.append({'file': file_path + file_name, 'device': dev['alias'], 'new_video': False, 'video': video}) + + full_name = file_path + file_name + + if os.path.exists(full_name) and overwrite == 0: + print(' Already exists ' + full_name) + stats[alias]['existing'] += 1 + result.append({ + 'file': full_name, + 'device': alias, + 'new_video': False, + 'video': video + }) else: - print('Downloading to ' + file_path + file_name) - download(url, key_b64, file_path, file_name) - result.append({'file': file_path + file_name, 'device': dev['alias'], 'new_video': True, 'video': video}) + print(' Downloading to ' + full_name) + ok = download(url, key_b64, file_path, file_name) + if ok: + stats[alias]['success'] += 1 + result.append({ + 'file': full_name, + 'device': alias, + 'new_video': True, + 'video': video + }) + else: + stats[alias]['failed'] += 1 + + # If we got fewer than page_size entries, that was the last page + if len(entries) < page_size: + print("") # newline after the comma-separated list + break + + page += 1 + + # print summary + print("\nDownload summary per camera:") + + for alias, s in stats.items(): + print( + f" {alias}: " + f"{s['success']} downloaded, " + f"{s['existing']} skipped (already existed), " + f"{s['failed']} failed" + ) + return result tapo.add_command(login, 'login') From b9c2ad35ae910df2fa935de7b1f26f4ab172b0c5 Mon Sep 17 00:00:00 2001 From: Joris Limousin <1956842+JorisLimousin@users.noreply.github.com> Date: Sat, 15 Nov 2025 20:39:03 +0000 Subject: [PATCH 2/4] Tidying things up --- tapo-cli.py | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/tapo-cli.py b/tapo-cli.py index 0b88d36..5340108 100755 --- a/tapo-cli.py +++ b/tapo-cli.py @@ -322,17 +322,14 @@ def list_videos(days): start_time = datetime.datetime.fromtimestamp(start_unixtime, datetime.timezone.utc).strftime('%Y-%m-%d 00:00:00') endpoint = '/v2/videos/list' - page_size = 1000 # was 3000 + page_size = 1000 for dev in devs['deviceList']: print(f"\nListing videos for {dev['alias']}:") page = 0 - total_printed = 0 while True: - print("page: " + str(page)) - params = ( 'deviceId=' + dev['deviceId'] + '&page=' + str(page) + @@ -351,15 +348,13 @@ def list_videos(days): entries = videos.get('index', []) if not entries: # No more videos on this page -> we are done - if total_printed == 0: - print(" No videos found.") break # Print timestamps for this page for video in entries: - # print(video['eventLocalTime'], end=", ") - total_printed += 1 - print(len(entries)) + print(video['eventLocalTime'], end=", ") + #print(video['video'][0]['uri']) # This will print URLs to the videos if you want to download them using another tool, but don't forget to get the AES key from video['video'][0]['decryptionInfo']['key'] + # If we got fewer than page_size entries, that was the last page if len(entries) < page_size: print("") # newline after the comma-separated list @@ -389,7 +384,7 @@ def download_videos(days, path, overwrite): result = [] endpoint = '/v2/videos/list' - page_size = 1000 # was 3000 + page_size = 1000 # stats per camera stats = {} @@ -422,7 +417,6 @@ def download_videos(days, path, overwrite): if not entries: # No more videos on this page -> we are done - print(" No videos found.") break for video in entries: From 25beab5f8d07c6bd55d19e7c9640147cf6afaa0a Mon Sep 17 00:00:00 2001 From: Joris Limousin <1956842+JorisLimousin@users.noreply.github.com> Date: Sat, 15 Nov 2025 20:45:49 +0000 Subject: [PATCH 3/4] Changing requirements back to what they were --- requirements.txt | 1 - 1 file changed, 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index aa2adc9..227e98f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,4 +3,3 @@ requests urllib3 datetime pycryptodome -Crypto From 64183297b9b22075100a8162550a6ce4835bda59 Mon Sep 17 00:00:00 2001 From: Joris Limousin <1956842+JorisLimousin@users.noreply.github.com> Date: Thu, 5 Feb 2026 13:44:55 +0400 Subject: [PATCH 4/4] Replacing old broken URLs with the new functional ones --- tapo-cli.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tapo-cli.py b/tapo-cli.py index 5394263..44e0112 100755 --- a/tapo-cli.py +++ b/tapo-cli.py @@ -140,7 +140,7 @@ def login(username, password): """Authenticates a user towards the TP-Link Tapo Cloud.""" terminal_uuid = str(uuid.uuid1()).replace('-','').upper() - url = 'https://n-wap-gw.tplinkcloud.com/api/v2/account/login' + url = 'https://n-euw1-wap-gw.tplinkcloud.com/api/v2/account/login' content = {"appType":"TP-Link_Tapo_Android","appVersion":"2.12.705","cloudPassword":password,"cloudUserName":username,"platform":"Android 12","refreshTokenNeeded":False,"terminalMeta":"1","terminalName":"Tapo CLI","terminalUUID":terminal_uuid} content = json.dumps(content) res = post(url, content, headers_post(content, '/api/v2/account/login')) @@ -152,7 +152,7 @@ def login(username, password): # Login but with extra steps if 'MFAProcessId' in config: mfa_process_id = res['result']['MFAProcessId'] - url = 'https://n-wap-gw.tplinkcloud.com/api/v2/account/getPushVC4TerminalMFA' + url = 'https://n-euw1-wap-gw.tplinkcloud.com/api/v2/account/getPushVC4TerminalMFA' content = {"appType":"TP-Link_Tapo_Android","cloudPassword":password,"cloudUserName":username,"terminalUUID":terminal_uuid} content = json.dumps(content) res = post(url, content, headers_post(content, '/api/v2/account/getPushVC4TerminalMFA')) @@ -162,7 +162,7 @@ def login(username, password): print('Check your Tapo App for the MFA code!') mfa_code = str(input('MFA Code (no spaces or dashes): ')) - url = 'https://n-wap-gw.tplinkcloud.com/api/v2/account/checkMFACodeAndLogin' + url = 'https://n-euw1-wap-gw.tplinkcloud.com/api/v2/account/checkMFACodeAndLogin' content = {"appType":"TP-Link_Tapo_Android","cloudUserName":username,"code":mfa_code,"MFAProcessId":mfa_process_id,"MFAType":1,"terminalBindEnabled":True} content = json.dumps(content) res = post(url, content, headers_post(content, '/api/v2/account/checkMFACodeAndLogin'))