Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,5 +44,5 @@ command to refresh the credentials (stored on AWS) without scheduling location f
## Local Debug

- After you have executed `python manage.py refresh-credentials` you have one minute before the credentials expire
- Run `python manage.py fetch-locations --trackers E0D4FA128FA9,EC3987ECAA50,CDAA0CCF4128,EDDC7DA1A247,D173D540749D --limit 1000 --hours-ago 48`
- Run `python manage.py fetch-locations --trackers E0D4FA128FA9,EC3987ECAA50,CDAA0CCF4128,EDDC7DA1A247,D173D540749D --limit 1000 --minutes_ago 15`
to fetch the locations of specific trackers
256 changes: 227 additions & 29 deletions app/apple_fetch.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,40 @@
"""
Fetch from Apple's acsnservice
"""
import asyncio
import datetime
import json
import logging
from requests import Session
import time
from collections import deque

from requests import Session
from app.credentials.base import CredentialsService
from app.exceptions import AppleAuthCredentialsExpired
from app.helpers import status_code_success
from app.date import unix_epoch, date_milliseconds
from pydantic import BaseModel, Field

requestSession = Session()
from app.settings import settings
from typing import TypedDict

import aiohttp

logger = logging.getLogger(__name__)


class CredentialsExpired(Exception):
    """Raised when repeated credential refreshes still yield 401s from Apple."""


class AppleHTTPResponse(BaseModel):
    """Minimal, picklable snapshot of an HTTP response (status + raw body).

    Built from aiohttp responses so the rest of the pipeline does not have to
    hold live response objects.
    """
    status_code: int  # HTTP status code of the upstream call
    text: str  # raw response body as text

    def json(self):
        """Parse the raw body as JSON; raises json.JSONDecodeError on non-JSON bodies."""
        return json.loads(self.text)


class AppleLocation(BaseModel):
date_published: int = Field(alias="datePublished")
payload: str
Expand All @@ -35,37 +57,213 @@ def is_success(self) -> bool:
return self.statusCode == "200"


def apple_fetch(security_headers: dict, ids, minutes_ago: int = 15) -> ResponseDto:
logger.info("Fetching locations from Apple API for %s", ids)
startdate = unix_epoch() - minutes_ago * 60
enddate = unix_epoch()
def apple_fetch(credentials_service: CredentialsService, ids: list[str], minutes_ago: int = 15) -> ResponseDto:
    """Fetch locations for *ids* from Apple's acsnservice over the last *minutes_ago* minutes.

    Args:
        credentials_service: provider of the security headers sent to Apple.
        ids: tracker/device identifiers to query.
        minutes_ago: lookback window in minutes.

    Returns:
        A single ResponseDto with the merged results of all batched requests.

    Raises:
        CredentialsExpired: when credential refresh attempts are exhausted
            (propagated from try_fetch_payloads).
    """
    logger.info("Fetching locations from Apple API for %s IDs with %d minutes lookback", len(ids), minutes_ago)
    # Derive both bounds from one clock read so the window is exactly minutes_ago long.
    end_date = unix_epoch()
    start_date = end_date - minutes_ago * 60

    if is_short_time_range(start_date, end_date):
        logger.info("Using ID-only batching strategy (time range < 20 minutes)")
        # Short windows: a single time chunk per device is enough.
        time_chunk_size = None
    else:
        logger.info("Using ID+time batching strategy (time range >= 20 minutes)")
        # 3600 (seconds in an hour) * 24 (hours in a day) = seconds in a day.
        time_chunk_size = 3600 * 24

    payloads = generate_request_payloads(
        device_ids=ids, start_date=start_date, end_date=end_date,
        device_batch_size=1, time_chunk_size=time_chunk_size,
    )

    responses = []
    chunks = split_chunks(payloads, 20)
    for i, payload_chunk in enumerate(chunks, start=1):
        logger.info("[%d/%d] Processing requests chunk", i, len(chunks))
        responses.extend(
            asyncio.run(try_fetch_payloads(credentials_service, payload_chunk, max_attempts_per_payload=2))
        )

    return merge_successful_responses(responses)

return ResponseDto(**response.json())

def is_short_time_range(start_date: int, end_date: int) -> bool:
    """Return True when the [start_date, end_date] window spans fewer than 20 minutes."""
    short_range_seconds = 20 * 60
    return end_date - start_date < short_range_seconds

def _acsnservice_fetch(security_headers, ids, startdate, enddate):
"""Fetch from Apple's acsnservice"""
data = {
"search": [
{
"startDate": date_milliseconds(startdate),
"endDate": date_milliseconds(enddate),
"ids": ids,
}
]

def build_acsnservice_payload(ids: list[str], start_date: int, end_date: int) -> dict:
    """Build one acsnservice search entry for the given IDs and epoch-second window."""
    payload = {
        "startDate": date_milliseconds(start_date),
        "endDate": date_milliseconds(end_date),
        "ids": ids,
    }
    return payload
return requestSession.post(
"https://gateway.icloud.com/acsnservice/fetch",
headers=security_headers,
json=data,
timeout=60,
)


def split_chunks(ids: list, batch_size: int) -> list[list]:
    """Split *ids* into consecutive sublists of at most *batch_size* items each."""
    chunks = []
    start = 0
    while start < len(ids):
        chunks.append(ids[start:start + batch_size])
        start += batch_size
    return chunks


def create_time_chunks(start_date: int, end_date: int, time_chunk_size_in_seconds: int) -> list[tuple[int, int]]:
    """Partition [start_date, end_date) into consecutive (start, end) chunks.

    The last chunk is truncated to end exactly at *end_date*. An empty list is
    returned when start_date >= end_date.

    Raises:
        ValueError: when time_chunk_size_in_seconds is not positive (the old
            code would loop forever in that case).
    """
    if time_chunk_size_in_seconds <= 0:
        raise ValueError("time_chunk_size_in_seconds must be positive")

    chunks = []
    current_start = start_date

    while current_start < end_date:
        current_end = min(current_start + time_chunk_size_in_seconds, end_date)
        chunks.append((current_start, current_end))
        current_start = current_end

    return chunks


def generate_request_payloads(device_ids: list[str], start_date: int, end_date: int, device_batch_size: int = 20, time_chunk_size: int | None = None) -> list[dict]:
    """Build one acsnservice payload per (device batch, time chunk) combination.

    Args:
        device_ids: tracker identifiers to query.
        start_date: window start, epoch seconds.
        end_date: window end, epoch seconds.
        device_batch_size: maximum number of IDs per payload.
        time_chunk_size: optional chunk length in seconds; when None the whole
            window is queried as a single time chunk.

    Returns:
        Payload dicts accepted by the acsnservice fetch endpoint.
    """
    id_batches = split_chunks(device_ids, batch_size=device_batch_size)
    logger.info(
        "Broke down %d devices into %d batches of %d devices each",
        len(device_ids), len(id_batches), device_batch_size,
    )

    if time_chunk_size is None:
        time_chunks = [(start_date, end_date)]
    else:
        time_chunks = create_time_chunks(start_date, end_date, time_chunk_size)
        logger.info(
            "Broke down time range into %d chunks of %d seconds each",
            len(time_chunks), time_chunk_size,
        )

    # Cartesian product of ID batches and time chunks.
    payloads = [
        build_acsnservice_payload(device_id_batch, chunk_start, chunk_end)
        for device_id_batch in id_batches
        for chunk_start, chunk_end in time_chunks
    ]

    logger.info("Created %d payloads", len(payloads))
    return payloads


async def try_fetch_payloads(
    credentials_service: CredentialsService, payloads: list[dict], max_attempts_per_payload: int = 3,
    max_credentials_attempts: int = 10, wait_time_for_credentials_attempt: int = 1
) -> list:
    """Fetch all payloads concurrently, retrying failures and refreshing credentials on 401.

    Args:
        credentials_service: source of the security headers sent to Apple.
        payloads: acsnservice payload dicts (keys: "ids", "startDate", "endDate").
        max_attempts_per_payload: total attempts allowed per payload.
        max_credentials_attempts: credential refreshes allowed before giving up.
        wait_time_for_credentials_attempt: seconds to wait before each refresh.

    Returns:
        AppleHTTPResponse objects collected: successes plus failed responses
        that exhausted their retries (payloads whose requests kept raising
        exceptions are dropped, with an error log).

    Raises:
        CredentialsExpired: when the credential refresh budget is exhausted.
    """
    def _key(payload: dict) -> str:
        # Stable identity for a payload so attempts can be counted across passes.
        return " ".join(payload["ids"]) + str(payload["startDate"]) + str(payload["endDate"])

    responses = []
    queue = deque(payloads)
    attempts = {}
    credentials_attempts = 0

    security_headers = credentials_service \
        .get_credentials() \
        .model_dump(mode='json', by_alias=True)

    while queue:
        tasks = [
            _async_acsnservice_fetch(security_headers, payload["ids"], payload["startDate"], payload["endDate"])
            for payload in queue
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        any_401 = False
        retry_queue = deque()

        for payload, response in zip(queue, results):
            key = _key(payload)
            attempts[key] = attempts.get(key, 0) + 1
            # Fix: count every attempt and compare with "<" so each payload is
            # tried at most max_attempts_per_payload times (the previous "<="
            # on a lagging counter allowed extra attempts).
            can_retry = attempts[key] < max_attempts_per_payload

            if isinstance(response, Exception):
                logger.warning("Caught exception during Apple request: %s", response)
                if can_retry:
                    retry_queue.append(payload)
                else:
                    logger.error("Dropping payload after %d failed attempts", attempts[key])
                continue

            if not status_code_success(response.status_code):
                logger.warning("Received %s (Full response: `%s`)", response.status_code, response.text)
                if response.status_code == 401:
                    any_401 = True
                if can_retry:
                    retry_queue.append(payload)
                    continue
            # Success, or a failure that exhausted its retries: keep the response.
            responses.append(response)

        queue = retry_queue

        if any_401:
            logger.info(
                "Got 401 - waiting for %d seconds and fetching credentials again",
                wait_time_for_credentials_attempt,
            )
            # Fix: asyncio.sleep instead of time.sleep so the event loop is not blocked.
            await asyncio.sleep(wait_time_for_credentials_attempt)

            if credentials_attempts == max_credentials_attempts:
                logger.error(
                    "Credential fetching retries exceeded (max retries: %d) - exiting early",
                    max_credentials_attempts,
                )
                raise CredentialsExpired(f"Credential fetching retries exceeded (max retries: {max_credentials_attempts})")

            credentials_attempts += 1

            security_headers = credentials_service \
                .get_credentials() \
                .model_dump(mode='json', by_alias=True)

    logger.info("%d/%d responses retrieved", len(responses), len(payloads))

    return responses


def merge_successful_responses(responses: list[AppleHTTPResponse]) -> ResponseDto:
    """Merge the successful responses into a single ResponseDto.

    Returns an empty (statusCode "200") DTO when there is nothing to merge.

    Fix: the single-response fast path previously parsed the body without
    checking the status, so a lone failed response (possibly with a non-JSON
    body) could crash json.loads; now only a successful single response takes
    the fast path, and everything else goes through the success-filtering merge.
    """
    if not responses:
        logger.warning("No responses to merge")
        return create_empty_response_dto()

    if len(responses) == 1 and status_code_success(responses[0].status_code):
        response_dto = ResponseDto(**responses[0].json())
        logger.info("Single response with %d results", len(response_dto.results))
        return response_dto

    all_results = extract_and_combine_all_results(responses)
    logger.info("Merged %d responses into %d total results", len(responses), len(all_results))
    return create_merged_response_dto(all_results)


def extract_and_combine_all_results(responses: list[AppleHTTPResponse]) -> list[AppleLocation]:
    """Concatenate the results of every successful response, skipping failures."""
    combined = []
    for response in responses:
        if not status_code_success(response.status_code):
            continue
        dto = ResponseDto(**response.json())
        combined.extend(dto.results)
    return combined


def create_empty_response_dto() -> ResponseDto:
    """Return a successful DTO carrying no results."""
    return ResponseDto(statusCode="200", results=[])


def create_merged_response_dto(results: list[AppleLocation]) -> ResponseDto:
    """Wrap pre-merged results in a successful DTO."""
    return ResponseDto(statusCode="200", results=results)


async def _async_acsnservice_fetch(security_headers, ids, startdate, enddate) -> AppleHTTPResponse:
    """POST one acsnservice search request and capture its status and body.

    A fresh aiohttp session (with the security headers and a 60s total timeout)
    is opened per call; the body times are converted to milliseconds.
    """
    body = {
        "search": [
            {
                "startDate": date_milliseconds(startdate),
                "endDate": date_milliseconds(enddate),
                "ids": ids,
            }
        ]
    }
    timeout = aiohttp.ClientTimeout(total=60)
    async with aiohttp.ClientSession(headers=security_headers, timeout=timeout) as session:
        response = await session.post(
            "https://gateway.icloud.com/acsnservice/fetch",
            json=body,
        )
        return AppleHTTPResponse(status_code=response.status, text=await response.text())
50 changes: 50 additions & 0 deletions app/credentials/api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
from pydantic import BaseModel
import os
from requests import Session

from app.credentials.base import CredentialsService
from app.models import ICloudCredentials
from app.settings import settings

requestSession = Session()


class APICredentialsService(CredentialsService):
    """CredentialsService backed by the remote credentials HTTP API.

    Authenticates every call with an `x-api-key` header.
    """

    def __init__(
        self,
        api_key: str,
        base_url: str = 'https://ghfbaqjy00.execute-api.eu-central-1.amazonaws.com/prod/credentials',
        default_client: str = 'space-invader-mac'
    ):
        self.base_url = base_url
        self.default_client = default_client
        self.api_key = api_key

    def get_credentials(self) -> ICloudCredentials:
        """Fetch the stored iCloud credentials for the default client.

        Raises:
            RuntimeError: when the API responds with a non-200 status.
        """
        response = requestSession.get(
            url=f'{self.base_url}/{self.default_client}',
            headers={
                'x-api-key': self.api_key
            },
            # Fix: without a timeout, requests can hang forever on a stuck API.
            timeout=30,
        )
        if response.status_code != 200:
            # RuntimeError instead of bare Exception; still caught by `except Exception`.
            raise RuntimeError(f"Failed to retrieve credentials: {response.status_code} - {response.text}")
        return ICloudCredentials(**response.json())

    def update_credentials(self, credentials: ICloudCredentials, schedule_data_fetching: bool = True) -> None:
        """Store new credentials, optionally (re)scheduling data fetching.

        Raises:
            RuntimeError: when the API responds with a non-200 status.
        """
        response = requestSession.put(
            url=f'{self.base_url}/{self.default_client}',
            headers={
                'x-api-key': self.api_key
            },
            json={
                'headers': credentials.model_dump(by_alias=True),
                'schedule_data_fetching': schedule_data_fetching
            },
            # Fix: without a timeout, requests can hang forever on a stuck API.
            timeout=30,
        )
        if response.status_code != 200:
            raise RuntimeError(f"Failed to update credentials: {response.status_code} - {response.text}")


# Module-level singleton used by the rest of the app; the API key comes from settings.
api_credentials_service = APICredentialsService(api_key=settings.CREDENTIALS_API_KEY)

13 changes: 13 additions & 0 deletions app/credentials/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import abc

from app.models import ICloudCredentials


class CredentialsService(abc.ABC):
    """Abstract interface for reading and persisting iCloud credentials.

    Fix: the class now inherits abc.ABC — without it, @abc.abstractmethod has
    no effect and the "abstract" base could be instantiated with no-op methods.
    Annotations are quoted so this module stays importable on its own.
    """

    @abc.abstractmethod
    def update_credentials(self, credentials: "ICloudCredentials") -> None:
        """Persist the given credentials."""

    @abc.abstractmethod
    def get_credentials(self) -> "ICloudCredentials | None":
        """Return the stored credentials, or None when none are available."""
Loading