diff --git a/tapo-cli.py b/tapo-cli.py index 5394263..43a4162 100755 --- a/tapo-cli.py +++ b/tapo-cli.py @@ -96,22 +96,56 @@ def post(url, data, headers): # Downloads a file from the Intenetz and decrypts it def download(url, key_b64, file_path, file_name): - if not os.path.exists(file_path): os.makedirs(file_path) + if not os.path.exists(file_path): + os.makedirs(file_path) - res = requests.get(url) - content = res.content + max_retries = 3 + content = None - if key_b64: - key = base64.b64decode(key_b64) - iv = content[:16] - enc_data = content[16:] - cipher = AES.new(key, AES.MODE_CBC, iv) - dec_content = unpad(cipher.decrypt(enc_data), AES.block_size) - else: - dec_content = content + for attempt in range(1, max_retries + 1): + try: + # Add a timeout just so it doesn't hang forever + res = requests.get(url, timeout=120) + res.raise_for_status() + content = res.content + break + except (requests.exceptions.ChunkedEncodingError, + requests.exceptions.ConnectionError) as e: + print(f" Download error for {file_name} (attempt {attempt}/{max_retries}): {e}") + if attempt == max_retries: + print(" Giving up on this file, moving on.") + return False + time.sleep(2 * attempt) # small backoff and retry + except Exception as e: + print(f" Unexpected error for {file_name}: {e}") + return False + + if content is None: + # Shouldn't happen, but just in case + print(f" No content for {file_name}, skipping.") + return False - with open(os.path.join(file_path, file_name), 'wb') as file: - file.write(dec_content) + try: + if key_b64: + key = base64.b64decode(key_b64) + iv = content[:16] + enc_data = content[16:] + cipher = AES.new(key, AES.MODE_CBC, iv) + dec_content = unpad(cipher.decrypt(enc_data), AES.block_size) + else: + dec_content = content + except Exception as e: + print(f" Decryption error for {file_name}: {e}") + return False + + try: + with open(os.path.join(file_path, file_name), 'wb') as file: + file.write(dec_content) + except Exception as e: + print(f" File write error for {file_name}: {e}") + return False + + return True def probe_endpoint_get(params, endpoint): token, null, null, app_server_url_get = get_config() @@ -140,7 +174,7 @@ def login(username, password): """Authenticates a user towards the TP-Link Tapo Cloud.""" terminal_uuid = str(uuid.uuid1()).replace('-','').upper() - url = 'https://n-wap-gw.tplinkcloud.com/api/v2/account/login' + url = 'https://n-euw1-wap-gw.tplinkcloud.com/api/v2/account/login' content = {"appType":"TP-Link_Tapo_Android","appVersion":"2.12.705","cloudPassword":password,"cloudUserName":username,"platform":"Android 12","refreshTokenNeeded":False,"terminalMeta":"1","terminalName":"Tapo CLI","terminalUUID":terminal_uuid} content = json.dumps(content) res = post(url, content, headers_post(content, '/api/v2/account/login')) @@ -152,7 +186,7 @@ def login(username, password): # Login but with extra steps if 'MFAProcessId' in config: mfa_process_id = res['result']['MFAProcessId'] - url = 'https://n-wap-gw.tplinkcloud.com/api/v2/account/getPushVC4TerminalMFA' + url = 'https://n-euw1-wap-gw.tplinkcloud.com/api/v2/account/getPushVC4TerminalMFA' content = {"appType":"TP-Link_Tapo_Android","cloudPassword":password,"cloudUserName":username,"terminalUUID":terminal_uuid} content = json.dumps(content) res = post(url, content, headers_post(content, '/api/v2/account/getPushVC4TerminalMFA')) @@ -162,7 +196,7 @@ def login(username, password): print('Check your Tapo App for the MFA code!') mfa_code = str(input('MFA Code (no spaces or dashes): ')) - url = 'https://n-wap-gw.tplinkcloud.com/api/v2/account/checkMFACodeAndLogin' + url = 'https://n-euw1-wap-gw.tplinkcloud.com/api/v2/account/checkMFACodeAndLogin' content = {"appType":"TP-Link_Tapo_Android","cloudUserName":username,"code":mfa_code,"MFAProcessId":mfa_process_id,"MFAType":1,"terminalBindEnabled":True} content = json.dumps(content) res = post(url, content, headers_post(content, '/api/v2/account/checkMFACodeAndLogin')) @@ -284,19 +318,49 @@ def list_videos(days): end_unixtime = time.time() + 86400 start_unixtime = end_unixtime - (days + 1) * 86400 - end_time = datetime.datetime.utcfromtimestamp(end_unixtime).strftime('%Y-%m-%d 00:00:00') - start_time = datetime.datetime.utcfromtimestamp(start_unixtime).strftime('%Y-%m-%d 00:00:00') + end_time = datetime.datetime.fromtimestamp(end_unixtime, datetime.timezone.utc).strftime('%Y-%m-%d 00:00:00') + start_time = datetime.datetime.fromtimestamp(start_unixtime, datetime.timezone.utc).strftime('%Y-%m-%d 00:00:00') endpoint = '/v2/videos/list' + page_size = 1000 + for dev in devs['deviceList']: - params = 'deviceId=' + dev['deviceId'] + '&page=0&pageSize=3000&order=desc&startTime=' + start_time + '&endTime=' + end_time - videos = probe_endpoint_get(params, endpoint) - print('\nFound ' + str(videos['total']) + ' videos for ' + dev['alias'] + ':') - if 'index' in videos: - for video in videos['index']: - print(video['eventLocalTime'], end = ", ") + print(f"\nListing videos for {dev['alias']}:") + + page = 0 + + while True: + params = ( + 'deviceId=' + dev['deviceId'] + + '&page=' + str(page) + + '&pageSize=' + str(page_size) + + '&order=desc&startTime=' + start_time + + '&endTime=' + end_time + ) + + videos = probe_endpoint_get(params, endpoint) + + # If the API itself errors, stop for this device + if isinstance(videos, dict) and 'error_code' in videos and videos['error_code'] != 0: + print(f" Error from API: {videos}") + break + + entries = videos.get('index', []) + if not entries: + # No more videos on this page -> we are done + break + + # Print timestamps for this page + for video in entries: + print(video['eventLocalTime'], end=", ") #print(video['video'][0]['uri']) # This will print URLs to the videos if you want to download them using another tool, but don't forget to get the AES key from video['video'][0]['decryptionInfo']['key'] - if videos['total'] > 0: print('') + + # If we got fewer than page_size entries, that was the last page + if len(entries) < page_size: + print("") # newline after the comma-separated list + break + + page += 1 @click.command() @click.option('--days', default=1, prompt="Last X days", help='Last X days which you want to download videos for.') @@ -315,17 +379,47 @@ def download_videos(days, path, overwrite): end_unixtime = time.time() + 86400 start_unixtime = end_unixtime - (days + 1) * 86400 - end_time = datetime.datetime.utcfromtimestamp(end_unixtime).strftime('%Y-%m-%d 00:00:00') - start_time = datetime.datetime.utcfromtimestamp(start_unixtime).strftime('%Y-%m-%d 00:00:00') + end_time = datetime.datetime.fromtimestamp(end_unixtime, datetime.timezone.utc).strftime('%Y-%m-%d 00:00:00') + start_time = datetime.datetime.fromtimestamp(start_unixtime, datetime.timezone.utc).strftime('%Y-%m-%d 00:00:00') result = [] endpoint = '/v2/videos/list' + page_size = 1000 + + # stats per camera + stats = {} + for dev in devs['deviceList']: - params = 'deviceId=' + dev['deviceId'] + '&page=0&pageSize=3000&order=desc&startTime=' + start_time + '&endTime=' + end_time - videos = probe_endpoint_get(params, endpoint) - print('\nFound ' + str(videos['total']) + ' videos for ' + dev['alias'] + ':') - if 'index' in videos: - for video in videos['index']: + alias = dev['alias'] + stats[alias] = {'existing': 0, 'success': 0, 'failed': 0} + + print(f"\nDownloading videos for {alias}:") + + page = 0 + + while True: + params = ( + 'deviceId=' + dev['deviceId'] + + '&page=' + str(page) + + '&pageSize=' + str(page_size) + + '&order=desc&startTime=' + start_time + + '&endTime=' + end_time + ) + + videos = probe_endpoint_get(params, endpoint) + + # If the API itself errors, stop for this device + if isinstance(videos, dict) and 'error_code' in videos and videos['error_code'] != 0: + print(f" Error from API: {videos}") + break + + entries = videos.get('index', []) + + if not entries: + # No more videos on this page -> we are done + break + + for video in entries: url = video['video'][0]['uri'] key_b64 = False @@ -333,21 +427,66 @@ def download_videos(days, path, overwrite): if 'encryptionMethod' in video['video'][0]: method = video['video'][0]['encryptionMethod'] if method != "AES-128-CBC": - print(f"Unsupported encryption method: {method}. Quitting...") - print("Create an issue here: https://github.com/dimme/tapo-cli/issues") + print(f" Unsupported encryption method: {method}. Quitting...") + print(" Create an issue here: https://github.com/dimme/tapo-cli/issues") exit(1) key_b64 = video['video'][0]['decryptionInfo']['key'] - file_path = path + dev['alias'] + '/' + datetime.datetime.strptime(video['eventLocalTime'], '%Y-%m-%d %H:%M:%S').strftime('%Y-%m-%d') + '/' + file_path = ( + path + + alias + '/' + + datetime.datetime.strptime( + video['eventLocalTime'], + '%Y-%m-%d %H:%M:%S' + ).strftime('%Y-%m-%d') + + '/' + ) file_name = video['eventLocalTime'].replace(':','-') + '.mp4' - if os.path.exists(file_path + file_name) and overwrite == 0: - print('Already exists ' + file_path + file_name) - result.append({'file': file_path + file_name, 'device': dev['alias'], 'new_video': False, 'video': video}) + + full_name = file_path + file_name + + if os.path.exists(full_name) and overwrite == 0: + print(' Already exists ' + full_name) + stats[alias]['existing'] += 1 + result.append({ + 'file': full_name, + 'device': alias, + 'new_video': False, + 'video': video + }) else: - print('Downloading to ' + file_path + file_name) - download(url, key_b64, file_path, file_name) - result.append({'file': file_path + file_name, 'device': dev['alias'], 'new_video': True, 'video': video}) + print(' Downloading to ' + full_name) + ok = download(url, key_b64, file_path, file_name) + if ok: + stats[alias]['success'] += 1 + result.append({ + 'file': full_name, + 'device': alias, + 'new_video': True, + 'video': video + }) + else: + stats[alias]['failed'] += 1 + + # If we got fewer than page_size entries, that was the last page + if len(entries) < page_size: + print("") # newline after the comma-separated list + break + + page += 1 + + # print summary + print("\nDownload summary per camera:") + + for alias, s in stats.items(): + print( + f" {alias}: " + f"{s['success']} downloaded, " + f"{s['existing']} skipped (already existed), " + f"{s['failed']} failed" + ) + return result tapo.add_command(login, 'login')