Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,5 +44,5 @@ command to refresh the credentials (stored on AWS) without scheduling location f
## Local Debug

- After you have executed `python manage.py refresh-credentials` you have one minute before the credentials expire
- Run `python manage.py fetch-locations --trackers E0D4FA128FA9,EC3987ECAA50,CDAA0CCF4128,EDDC7DA1A247,D173D540749D --limit 1000 --hours-ago 48`
- Run `python manage.py fetch-locations --trackers E0D4FA128FA9,EC3987ECAA50,CDAA0CCF4128,EDDC7DA1A247,D173D540749D --limit 1000 --minutes-ago 15`
to fetch the locations of specific trackers
178 changes: 157 additions & 21 deletions app/apple_fetch.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,36 +36,172 @@ def is_success(self) -> bool:


def apple_fetch(security_headers: dict, ids, minutes_ago: int = 15) -> ResponseDto:
    """Fetch locations from the Apple API for *ids* over the last *minutes_ago* minutes.

    Chooses a batching strategy based on the lookback window, then merges all
    successful responses into a single ResponseDto.

    Raises:
        AppleAuthCredentialsExpired: when any request comes back HTTP 401.

    Returns:
        ResponseDto: merged results on success, or an error DTO carrying the
        first failed response's status code and reason.
    """
    logger.info("Fetching locations from Apple API for %s IDs with %d minutes lookback", len(ids), minutes_ago)
    start_date = unix_epoch() - minutes_ago * 60
    end_date = unix_epoch()

    # Short windows fit in one time range per request; long windows are split
    # per-ID and per-day to keep each request small.
    if is_short_time_range(start_date, end_date):
        logger.info("Using ID-only batching strategy (time range < 20 minutes)")
        responses = process_with_id_batching_only(security_headers, ids, start_date, end_date)
    else:
        logger.info("Using ID+time batching strategy (time range >= 20 minutes)")
        responses = process_with_id_and_time_batching(security_headers, ids, start_date, end_date)

    failed_response = find_first_failed_response(responses)
    if failed_response:
        if failed_response.status_code == 401:
            raise AppleAuthCredentialsExpired(failed_response.reason)

        logger.error('Error from Apple API: %s %s', failed_response.status_code, failed_response.reason)
        return ResponseDto(error=failed_response.reason, statusCode=str(failed_response.status_code))

    logger.info("Successfully completed all API requests, merging results")
    return merge_successful_responses(responses)


def is_short_time_range(start_date: int, end_date: int) -> bool:
    """Return True when the span between the two epoch timestamps is under 20 minutes."""
    SHORT_RANGE_THRESHOLD_SECONDS = 20 * 60
    return end_date - start_date < SHORT_RANGE_THRESHOLD_SECONDS


def process_with_id_batching_only(security_headers: dict, ids: list, start_date: int, end_date: int) -> list:
    """Short-window strategy: query the whole time range, batching only the IDs (10 per request)."""
    batches = create_id_batches(ids, batch_size=10)
    logger.info("Created %d ID batches of size 10", len(batches))
    return fetch_all_id_batches(security_headers, batches, start_date, end_date)


def process_with_id_and_time_batching(security_headers: dict, ids: list, start_date: int, end_date: int) -> list:
    """Long-window strategy: one ID per request, crossed with day-sized time chunks."""
    single_id_batches = create_id_batches(ids, batch_size=1)
    day_chunks = create_daily_time_chunks(start_date, end_date)
    request_count = len(single_id_batches) * len(day_chunks)
    logger.info("Created %d ID batches (size 1) × %d time chunks = %d total requests",
                len(single_id_batches), len(day_chunks), request_count)
    return fetch_all_batch_combinations(security_headers, single_id_batches, day_chunks)


def create_id_batches(ids: list, batch_size: int) -> list[list]:
    """Split *ids* into consecutive sublists of at most *batch_size* elements each."""
    batches = []
    for offset in range(0, len(ids), batch_size):
        batches.append(ids[offset:offset + batch_size])
    return batches


def create_hourly_time_chunks(start_date: int, end_date: int) -> list[tuple[int, int]]:
    """Split [start_date, end_date) into consecutive chunks of at most one hour.

    The last chunk is clipped to end_date; an empty range yields no chunks.
    """
    ONE_HOUR = 3600
    return [
        (chunk_start, min(chunk_start + ONE_HOUR, end_date))
        for chunk_start in range(start_date, end_date, ONE_HOUR)
    ]


def create_daily_time_chunks(start_date: int, end_date: int) -> list[tuple[int, int]]:
    """Split [start_date, end_date) into consecutive chunks of at most one day.

    The last chunk is clipped to end_date; an empty range yields no chunks.
    """
    ONE_DAY = 86400
    return [
        (chunk_start, min(chunk_start + ONE_DAY, end_date))
        for chunk_start in range(start_date, end_date, ONE_DAY)
    ]


def fetch_all_id_batches(security_headers: dict, id_batches: list[list], start_date: int, end_date: int) -> list:
    """Issue one fetch per ID batch across the full time range, stopping at the first failure.

    Returns the raw responses collected so far — the failed response, if any, is included
    so the caller can inspect its status code.
    """
    collected = []
    for position, batch in enumerate(id_batches, 1):
        logger.info("Processing ID batch %d/%d (IDs: %s)", position, len(id_batches), len(batch))
        result = _acsnservice_fetch(security_headers, batch, start_date, end_date)
        collected.append(result)

        if not status_code_success(result.status_code):
            logger.warning("Request failed with status %d, stopping batch processing", result.status_code)
            break

    logger.info("Completed %d ID batch requests", len(collected))
    return collected


def fetch_all_batch_combinations(security_headers: dict, id_batches: list[list],
                                 time_chunks: list[tuple[int, int]]) -> list:
    """Fetch every (ID batch × time chunk) combination, aborting on the first failed response.

    Returns the raw responses collected so far — the failed response, if any, is included.
    """
    collected = []
    expected_total = len(id_batches) * len(time_chunks)
    request_counter = 0

    for batch_no, batch in enumerate(id_batches, 1):
        logger.info("Processing ID batch %d/%d", batch_no, len(id_batches))

        for chunk_no, (window_start, window_end) in enumerate(time_chunks, 1):
            request_counter += 1
            logger.info(" Time chunk %d/%d (request %d/%d)",
                        chunk_no, len(time_chunks), request_counter, expected_total)
            result = _acsnservice_fetch(security_headers, batch, window_start, window_end)
            collected.append(result)

            if not status_code_success(result.status_code):
                logger.warning("Request failed with status %d, stopping batch processing", result.status_code)
                return collected

    logger.info("Completed all %d combined batch requests", expected_total)
    return collected


def find_first_failed_response(responses: list):
    """Return the first response whose status code is not a success, or None if all succeeded."""
    return next(
        (candidate for candidate in responses if not status_code_success(candidate.status_code)),
        None,
    )


def merge_successful_responses(responses: list) -> ResponseDto:
    """Collapse a list of raw API responses into one ResponseDto.

    Assumes the caller has already screened out failures (via find_first_failed_response);
    an empty list yields an empty success DTO.
    """
    if not responses:
        logger.warning("No responses to merge")
        return create_empty_response_dto()

    if len(responses) == 1:
        only_dto = ResponseDto(**responses[0].json())
        logger.info("Single response with %d results", len(only_dto.results))
        return only_dto

    combined = extract_and_combine_all_results(responses)
    logger.info("Merged %d responses into %d total results", len(responses), len(combined))
    return create_merged_response_dto(combined)


def extract_and_combine_all_results(responses: list) -> list[AppleLocation]:
    """Concatenate the `results` payloads of every successful response, in order."""
    merged: list[AppleLocation] = []
    for raw in responses:
        if not status_code_success(raw.status_code):
            continue  # skip failed responses; their bodies carry no results
        merged.extend(ResponseDto(**raw.json()).results)
    return merged


def create_empty_response_dto() -> ResponseDto:
    """Build a successful ResponseDto carrying no results."""
    return ResponseDto(statusCode="200", results=[])


def create_merged_response_dto(results: list[AppleLocation]) -> ResponseDto:
    """Wrap an already-merged result list in a successful ResponseDto."""
    return ResponseDto(statusCode="200", results=results)


def _acsnservice_fetch(security_headers, ids, startdate, enddate):
    """Fetch from Apple's acsnservice.

    Args:
        security_headers: auth headers for the gateway request.
        ids: tracker IDs to query.
        startdate / enddate: unix-epoch seconds; converted to milliseconds for the API.

    Returns:
        The raw requests.Response from the gateway (caller checks status_code).
    """
    return requestSession.post(
        "https://gateway.icloud.com/acsnservice/fetch",
        headers=security_headers,
        json={
            "search": [
                {
                    "startDate": date_milliseconds(startdate),
                    "endDate": date_milliseconds(enddate),
                    "ids": ids,
                }
            ]
        },
        timeout=60,
    )
2 changes: 1 addition & 1 deletion app/sqs.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def schedule_device_location_metadata_enrichment(
message = {
"page": page,
"limit": batch_size,
"hours_ago": 1,
"minutes_ago": 15,
}
message_group_id = f'page-processing-group_{str(uuid.uuid4())}'

Expand Down
4 changes: 2 additions & 2 deletions entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def fetch_locations_and_report(event, context):
try:
page = int(message_body['page'])
limit = int(message_body['limit'])
hours_ago = int(message_body.get('hours_ago', 1))
minutes_ago = int(message_body.get('minutes_ago', 1))
except (ValueError, TypeError):
logger.error(f"Invalid page value: {message_body['page']}. Must be an integer.")
return {
Expand All @@ -99,7 +99,7 @@ def fetch_locations_and_report(event, context):
security_headers,
page=page,
limit=limit,
hours_ago=hours_ago)
minutes_ago=15)

return {
"statusCode": 200,
Expand Down
12 changes: 6 additions & 6 deletions manage.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,27 @@ def cli():

@cli.command()
@click.option(
'--trackers', '-t', default='E0D4FA128FA9,EC3987ECAA50,CDAA0CCF4128,EDDC7DA1A247,D173D540749D',
help='Comma-separated list of trackers to fetch locations for'
'--trackers', '-t', default='',
help='Comma-separated list of trackers. E.g: E0D4FA128FA9,EC3987ECAA50,CDAA0CCF4128,EDDC7DA1A247,D173D540749D'
)
@click.option('--limit', '-l', default=2500, help='Number of locations to fetch')
@click.option('--page', '-p', default=0, help='Page number for pagination')
@click.option('--hours-ago', '-ha', default=24, help='Number of hours ago to fetch locations for')
@click.option('--minutes-ago', '-ma', default=24, help='Number of minutes ago to fetch locations for')
@click.option('--send-reports', '-s', is_flag=True, default=False, help='Whether to send reports')
def fetch_locations(
trackers: str,
limit: int,
page: int,
send_reports: bool,
hours_ago: int
minutes_ago: int
) -> None:
tracker_ids = set(trackers.split(','))
tracker_ids = set(trackers.split(',')) if trackers else None
resolve_locations(
tracker_ids=tracker_ids,
limit=limit,
page=page,
send_reports=send_reports,
minutes=hours_ago * 60,
minutes_ago=minutes_ago,
print_report=True,
)

Expand Down