Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
221 changes: 180 additions & 41 deletions tapo-cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,22 +96,56 @@ def post(url, data, headers):

# Downloads a file from the Intenetz and decrypts it
def download(url, key_b64, file_path, file_name):
if not os.path.exists(file_path): os.makedirs(file_path)
if not os.path.exists(file_path):
os.makedirs(file_path)

res = requests.get(url)
content = res.content
max_retries = 3
content = None

if key_b64:
key = base64.b64decode(key_b64)
iv = content[:16]
enc_data = content[16:]
cipher = AES.new(key, AES.MODE_CBC, iv)
dec_content = unpad(cipher.decrypt(enc_data), AES.block_size)
else:
dec_content = content
for attempt in range(1, max_retries + 1):
try:
# Add a timeout just so it doesn't hang forever
res = requests.get(url, timeout=120)
res.raise_for_status()
content = res.content
break
except (requests.exceptions.ChunkedEncodingError,
requests.exceptions.ConnectionError) as e:
print(f" Download error for {file_name} (attempt {attempt}/{max_retries}): {e}")
if attempt == max_retries:
print(" Giving up on this file, moving on.")
return False
time.sleep(2 * attempt) # small backoff and retry
except Exception as e:
print(f" Unexpected error for {file_name}: {e}")
return False

if content is None:
# Shouldn't happen, but just in case
print(f" No content for {file_name}, skipping.")
return False

with open(os.path.join(file_path, file_name), 'wb') as file:
file.write(dec_content)
try:
if key_b64:
key = base64.b64decode(key_b64)
iv = content[:16]
enc_data = content[16:]
cipher = AES.new(key, AES.MODE_CBC, iv)
dec_content = unpad(cipher.decrypt(enc_data), AES.block_size)
else:
dec_content = content
except Exception as e:
print(f" Decryption error for {file_name}: {e}")
return False

try:
with open(os.path.join(file_path, file_name), 'wb') as file:
file.write(dec_content)
except Exception as e:
print(f" File write error for {file_name}: {e}")
return False

return True

def probe_endpoint_get(params, endpoint):
token, null, null, app_server_url_get = get_config()
Expand Down Expand Up @@ -140,7 +174,7 @@ def login(username, password):
"""Authenticates a user towards the TP-Link Tapo Cloud."""
terminal_uuid = str(uuid.uuid1()).replace('-','').upper()

url = 'https://n-wap-gw.tplinkcloud.com/api/v2/account/login'
url = 'https://n-euw1-wap-gw.tplinkcloud.com/api/v2/account/login'
content = {"appType":"TP-Link_Tapo_Android","appVersion":"2.12.705","cloudPassword":password,"cloudUserName":username,"platform":"Android 12","refreshTokenNeeded":False,"terminalMeta":"1","terminalName":"Tapo CLI","terminalUUID":terminal_uuid}
content = json.dumps(content)
res = post(url, content, headers_post(content, '/api/v2/account/login'))
Expand All @@ -152,7 +186,7 @@ def login(username, password):
# Login but with extra steps
if 'MFAProcessId' in config:
mfa_process_id = res['result']['MFAProcessId']
url = 'https://n-wap-gw.tplinkcloud.com/api/v2/account/getPushVC4TerminalMFA'
url = 'https://n-euw1-wap-gw.tplinkcloud.com/api/v2/account/getPushVC4TerminalMFA'
content = {"appType":"TP-Link_Tapo_Android","cloudPassword":password,"cloudUserName":username,"terminalUUID":terminal_uuid}
content = json.dumps(content)
res = post(url, content, headers_post(content, '/api/v2/account/getPushVC4TerminalMFA'))
Expand All @@ -162,7 +196,7 @@ def login(username, password):
print('Check your Tapo App for the MFA code!')
mfa_code = str(input('MFA Code (no spaces or dashes): '))

url = 'https://n-wap-gw.tplinkcloud.com/api/v2/account/checkMFACodeAndLogin'
url = 'https://n-euw1-wap-gw.tplinkcloud.com/api/v2/account/checkMFACodeAndLogin'
content = {"appType":"TP-Link_Tapo_Android","cloudUserName":username,"code":mfa_code,"MFAProcessId":mfa_process_id,"MFAType":1,"terminalBindEnabled":True}
content = json.dumps(content)
res = post(url, content, headers_post(content, '/api/v2/account/checkMFACodeAndLogin'))
Expand Down Expand Up @@ -284,19 +318,49 @@ def list_videos(days):

end_unixtime = time.time() + 86400
start_unixtime = end_unixtime - (days + 1) * 86400
end_time = datetime.datetime.utcfromtimestamp(end_unixtime).strftime('%Y-%m-%d 00:00:00')
start_time = datetime.datetime.utcfromtimestamp(start_unixtime).strftime('%Y-%m-%d 00:00:00')
end_time = datetime.datetime.fromtimestamp(end_unixtime, datetime.timezone.utc).strftime('%Y-%m-%d 00:00:00')
start_time = datetime.datetime.fromtimestamp(start_unixtime, datetime.timezone.utc).strftime('%Y-%m-%d 00:00:00')

endpoint = '/v2/videos/list'
page_size = 1000

for dev in devs['deviceList']:
params = 'deviceId=' + dev['deviceId'] + '&page=0&pageSize=3000&order=desc&startTime=' + start_time + '&endTime=' + end_time
videos = probe_endpoint_get(params, endpoint)
print('\nFound ' + str(videos['total']) + ' videos for ' + dev['alias'] + ':')
if 'index' in videos:
for video in videos['index']:
print(video['eventLocalTime'], end = ", ")
print(f"\nListing videos for {dev['alias']}:")

page = 0

while True:
params = (
'deviceId=' + dev['deviceId'] +
'&page=' + str(page) +
'&pageSize=' + str(page_size) +
'&order=desc&startTime=' + start_time +
'&endTime=' + end_time
)

videos = probe_endpoint_get(params, endpoint)

# If the API itself errors, stop for this device
if isinstance(videos, dict) and 'error_code' in videos and videos['error_code'] != 0:
print(f" Error from API: {videos}")
break

entries = videos.get('index', [])
if not entries:
# No more videos on this page -> we are done
break

# Print timestamps for this page
for video in entries:
print(video['eventLocalTime'], end=", ")
#print(video['video'][0]['uri']) # This will print URLs to the videos if you want to download them using another tool, but don't forget to get the AES key from video['video'][0]['decryptionInfo']['key']
if videos['total'] > 0: print('')

# If we got fewer than page_size entries, that was the last page
if len(entries) < page_size:
print("") # newline after the comma-separated list
break

page += 1

@click.command()
@click.option('--days', default=1, prompt="Last X days", help='Last X days which you want to download videos for.')
Expand All @@ -315,39 +379,114 @@ def download_videos(days, path, overwrite):

end_unixtime = time.time() + 86400
start_unixtime = end_unixtime - (days + 1) * 86400
end_time = datetime.datetime.utcfromtimestamp(end_unixtime).strftime('%Y-%m-%d 00:00:00')
start_time = datetime.datetime.utcfromtimestamp(start_unixtime).strftime('%Y-%m-%d 00:00:00')
end_time = datetime.datetime.fromtimestamp(end_unixtime, datetime.timezone.utc).strftime('%Y-%m-%d 00:00:00')
start_time = datetime.datetime.fromtimestamp(start_unixtime, datetime.timezone.utc).strftime('%Y-%m-%d 00:00:00')

result = []
endpoint = '/v2/videos/list'
page_size = 1000

# stats per camera
stats = {}

for dev in devs['deviceList']:
params = 'deviceId=' + dev['deviceId'] + '&page=0&pageSize=3000&order=desc&startTime=' + start_time + '&endTime=' + end_time
videos = probe_endpoint_get(params, endpoint)
print('\nFound ' + str(videos['total']) + ' videos for ' + dev['alias'] + ':')
if 'index' in videos:
for video in videos['index']:
alias = dev['alias']
stats[alias] = {'existing': 0, 'success': 0, 'failed': 0}

print(f"\nDownloading videos for {alias}:")

page = 0

while True:
params = (
'deviceId=' + dev['deviceId'] +
'&page=' + str(page) +
'&pageSize=' + str(page_size) +
'&order=desc&startTime=' + start_time +
'&endTime=' + end_time
)

videos = probe_endpoint_get(params, endpoint)

# If the API itself errors, stop for this device
if isinstance(videos, dict) and 'error_code' in videos and videos['error_code'] != 0:
print(f" Error from API: {videos}")
break

entries = videos.get('index', [])

if not entries:
# No more videos on this page -> we are done
break

for video in entries:
url = video['video'][0]['uri']
key_b64 = False

# Check if the video is encrypted and get the key
if 'encryptionMethod' in video['video'][0]:
method = video['video'][0]['encryptionMethod']
if method != "AES-128-CBC":
print(f"Unsupported encryption method: {method}. Quitting...")
print("Create an issue here: https://github.com/dimme/tapo-cli/issues")
print(f" Unsupported encryption method: {method}. Quitting...")
print(" Create an issue here: https://github.com/dimme/tapo-cli/issues")
exit(1)

key_b64 = video['video'][0]['decryptionInfo']['key']

file_path = path + dev['alias'] + '/' + datetime.datetime.strptime(video['eventLocalTime'], '%Y-%m-%d %H:%M:%S').strftime('%Y-%m-%d') + '/'
file_path = (
path
+ alias + '/'
+ datetime.datetime.strptime(
video['eventLocalTime'],
'%Y-%m-%d %H:%M:%S'
).strftime('%Y-%m-%d')
+ '/'
)
file_name = video['eventLocalTime'].replace(':','-') + '.mp4'
if os.path.exists(file_path + file_name) and overwrite == 0:
print('Already exists ' + file_path + file_name)
result.append({'file': file_path + file_name, 'device': dev['alias'], 'new_video': False, 'video': video})

full_name = file_path + file_name

if os.path.exists(full_name) and overwrite == 0:
print(' Already exists ' + full_name)
stats[alias]['existing'] += 1
result.append({
'file': full_name,
'device': alias,
'new_video': False,
'video': video
})
else:
print('Downloading to ' + file_path + file_name)
download(url, key_b64, file_path, file_name)
result.append({'file': file_path + file_name, 'device': dev['alias'], 'new_video': True, 'video': video})
print(' Downloading to ' + full_name)
ok = download(url, key_b64, file_path, file_name)
if ok:
stats[alias]['success'] += 1
result.append({
'file': full_name,
'device': alias,
'new_video': True,
'video': video
})
else:
stats[alias]['failed'] += 1

# If we got fewer than page_size entries, that was the last page
if len(entries) < page_size:
print("") # newline after the comma-separated list
break

page += 1

# print summary
print("\nDownload summary per camera:")

for alias, s in stats.items():
print(
f" {alias}: "
f"{s['success']} downloaded, "
f"{s['existing']} skipped (already existed), "
f"{s['failed']} failed"
)

return result

tapo.add_command(login, 'login')
Expand Down