11"""
22Fetch from Apple's acsnservice
33"""
4+ import asyncio
5+ import datetime
6+ import json
47import logging
5- from requests import Session
8+ import time
9+ from collections import deque
610
11+ from requests import Session
12+ from app .credentials .base import CredentialsService
713from app .exceptions import AppleAuthCredentialsExpired
814from app .helpers import status_code_success
915from app .date import unix_epoch , date_milliseconds
1016from pydantic import BaseModel , Field
1117
12- requestSession = Session ()
18+ from app .settings import settings
19+ from typing import TypedDict
20+
21+ import aiohttp
22+
1323logger = logging .getLogger (__name__ )
1424
1525
26+ class CredentialsExpired (Exception ):
27+ pass
28+
29+
30+ class AppleHTTPResponse (BaseModel ):
31+ status_code : int
32+ text : str
33+
34+ def json (self ):
35+ return json .loads (self .text )
36+
37+
1638class AppleLocation (BaseModel ):
1739 date_published : int = Field (alias = "datePublished" )
1840 payload : str
@@ -35,37 +57,213 @@ def is_success(self) -> bool:
3557 return self .statusCode == "200"
3658
3759
38- def apple_fetch (security_headers : dict , ids , minutes_ago : int = 15 ) -> ResponseDto :
39- logger .info ("Fetching locations from Apple API for %s" , ids )
40- startdate = unix_epoch () - minutes_ago * 60
41- enddate = unix_epoch ()
60+ def apple_fetch (credentials_service : CredentialsService , ids : list [ str ] , minutes_ago : int = 15 ) -> ResponseDto :
61+ logger .info ("Fetching locations from Apple API for %s IDs with %d minutes lookback " , len ( ids ), minutes_ago )
62+ start_date = unix_epoch () - minutes_ago * 60
63+ end_date = unix_epoch ()
4264
43- response = _acsnservice_fetch (security_headers , ids , startdate , enddate )
65+ if is_short_time_range (start_date , end_date ):
66+ logger .info ("Using ID-only batching strategy (time range < 20 minutes)" )
67+ payloads = generate_request_payloads (
68+ device_ids = ids , start_date = start_date , end_date = end_date , device_batch_size = 1 , time_chunk_size = None
69+ )
70+ else :
71+ logger .info ("Using ID+time batching strategy (time range >= 20 minutes)" )
72+ # 3600 (seconds in an hour) * 24(hours in a day) = seconds in a day
73+ payloads = generate_request_payloads (
74+ device_ids = ids , start_date = start_date , end_date = end_date , device_batch_size = 1 , time_chunk_size = 3600 * 24
75+ )
4476
45- if not status_code_success (response .status_code ):
46- if response .status_code == 401 :
47- raise AppleAuthCredentialsExpired (response .reason )
77+ responses = []
78+ chunks = split_chunks (payloads , 20 )
79+ for i , payload_chunk in enumerate (chunks ):
80+ logger .info (f"[{ i + 1 } /{ len (chunks )} ] Processing requests chunk" )
81+ responses .extend (asyncio .run (try_fetch_payloads (credentials_service , payload_chunk , max_attempts_per_payload = 2 )))
4882
49- logger .error ('Error from Apple API: %s %s' , response .status_code , response .reason )
50- return ResponseDto (error = response .reason , statusCode = str (response .status_code ))
83+ return merge_successful_responses (responses )
5184
52- return ResponseDto (** response .json ())
5385
86+ def is_short_time_range (start_date : int , end_date : int ) -> bool :
87+ twenty_minutes_in_seconds = 20 * 60
88+ return (end_date - start_date ) < twenty_minutes_in_seconds
5489
55- def _acsnservice_fetch (security_headers , ids , startdate , enddate ):
56- """Fetch from Apple's acsnservice"""
57- data = {
58- "search" : [
59- {
60- "startDate" : date_milliseconds (startdate ),
61- "endDate" : date_milliseconds (enddate ),
62- "ids" : ids ,
63- }
64- ]
90+
91+ def build_acsnservice_payload (ids : list [str ], start_date : int , end_date : int ) -> dict :
92+ return {
93+ "startDate" : date_milliseconds (start_date ),
94+ "endDate" : date_milliseconds (end_date ),
95+ "ids" : ids ,
6596 }
66- return requestSession .post (
67- "https://gateway.icloud.com/acsnservice/fetch" ,
68- headers = security_headers ,
69- json = data ,
70- timeout = 60 ,
71- )
97+
98+
99+ def split_chunks (ids : list , batch_size : int ) -> list [list ]:
100+ return [ids [i :i + batch_size ] for i in range (0 , len (ids ), batch_size )]
101+
102+
103+ def create_time_chunks (start_date : int , end_date : int , time_chunk_size_in_seconds : int ) -> list [tuple [int , int ]]:
104+ chunks = []
105+ current_start = start_date
106+
107+ while current_start < end_date :
108+ current_end = min (current_start + time_chunk_size_in_seconds , end_date )
109+ chunks .append ((current_start , current_end ))
110+ current_start = current_end
111+
112+ return chunks
113+
114+
115+ def generate_request_payloads (device_ids : list [str ], start_date : int , end_date : int , device_batch_size : int = 20 , time_chunk_size : int = None ):
116+ payloads = []
117+ id_batches = split_chunks (device_ids , batch_size = device_batch_size )
118+ logger .info (f"Broke down { len (device_ids )} devices into { len (id_batches )} batches of { device_batch_size } devices each" )
119+
120+ time_chunks = [(start_date , end_date )]
121+
122+ if time_chunk_size is not None :
123+ time_chunks = create_time_chunks (start_date , end_date , time_chunk_size )
124+ logger .info (f"Broke down time range into { len (time_chunks )} chunks of { time_chunk_size } seconds each" )
125+
126+ payloads = []
127+ for device_id_batch in id_batches :
128+ payloads .extend (
129+ [
130+ build_acsnservice_payload (device_id_batch , time_chunk [0 ], time_chunk [1 ])
131+ for time_chunk in time_chunks
132+ ]
133+ )
134+
135+ logger .info (f"Created { len (payloads )} payloads" )
136+ return payloads
137+
138+
139+ async def try_fetch_payloads (
140+ credentials_service : CredentialsService , payloads : list [dict ], max_attempts_per_payload : int = 3 ,
141+ max_credentials_attempts : int = 10 , wait_time_for_credentials_attempt : int = 1
142+ ) -> list :
143+ responses = []
144+
145+ queue = deque (payloads )
146+ attempts = {}
147+
148+ failed_payloads = 0
149+ successful_payloads = 0
150+ credentials_attempts = 0
151+
152+ idx = 0
153+
154+ security_headers = credentials_service \
155+ .get_credentials ()\
156+ .model_dump (mode = 'json' , by_alias = True )
157+
158+ while len (queue ) != 0 :
159+ tasks = [
160+ _async_acsnservice_fetch (security_headers , payload ["ids" ], payload ["startDate" ], payload ["endDate" ])
161+ for payload in queue
162+ ]
163+ keys = [
164+ " " .join (payload ["ids" ]) + str (payload ["startDate" ]) + str (payload ["endDate" ]) for payload in queue
165+ ]
166+
167+ out = await asyncio .gather (* tasks , return_exceptions = True )
168+
169+ any_401 = False
170+ new_queue = []
171+
172+ for payload , response in zip (queue , out ):
173+ key = " " .join (payload ["ids" ]) + str (payload ["startDate" ]) + str (payload ["endDate" ])
174+
175+ if isinstance (response , Exception ):
176+ logger .warning (f"Caught exception during Apple request: { response } " )
177+ if attempts .get (key , 0 ) <= max_attempts_per_payload :
178+ attempts [key ] = attempts .get (key , 0 ) + 1
179+ new_queue .append (payload )
180+
181+ continue
182+
183+ if not status_code_success (response .status_code ):
184+ logger .warning (f"Received { response .status_code } (Full response: `{ response .text } `)" )
185+
186+ if response .status_code == 401 :
187+ any_401 = True
188+
189+ if attempts .get (key , 0 ) <= max_attempts_per_payload :
190+ attempts [key ] = attempts .get (key , 0 ) + 1
191+ new_queue .append (payload )
192+ else :
193+ responses .append (response )
194+
195+ queue = new_queue
196+
197+ if any_401 :
198+ logger .info (
199+ f"Got 401 - waiting for { wait_time_for_credentials_attempt } seconds and fetching credentials again"
200+ )
201+ time .sleep (wait_time_for_credentials_attempt )
202+
203+ if credentials_attempts == max_credentials_attempts :
204+ logger .error (
205+ f"Credential fetching retries exceeded (max retries: { max_credentials_attempts } ) - exiting early"
206+ )
207+ raise CredentialsExpired (f"Credential fetching retries exceeded (max retries: { max_credentials_attempts } " )
208+
209+ credentials_attempts += 1
210+
211+ security_headers = credentials_service \
212+ .get_credentials () \
213+ .model_dump (mode = 'json' , by_alias = True )
214+
215+ logger .info (f"{ len (responses )} /{ len (payloads )} responses retrieved" )
216+
217+ return responses
218+
219+
220+ def merge_successful_responses (responses : list [AppleHTTPResponse ]) -> ResponseDto :
221+ if not responses :
222+ logger .warning ("No responses to merge" )
223+ return create_empty_response_dto ()
224+
225+ if len (responses ) == 1 :
226+ response_dto = ResponseDto (** responses [0 ].json ())
227+ logger .info ("Single response with %d results" , len (response_dto .results ))
228+ return response_dto
229+
230+ all_results = extract_and_combine_all_results (responses )
231+ logger .info ("Merged %d responses into %d total results" , len (responses ), len (all_results ))
232+ return create_merged_response_dto (all_results )
233+
234+
235+ def extract_and_combine_all_results (responses : list [AppleHTTPResponse ]) -> list [AppleLocation ]:
236+ combined_results = []
237+ for response in responses :
238+ if status_code_success (response .status_code ):
239+ response_data = response .json ()
240+ response_dto = ResponseDto (** response_data )
241+ combined_results .extend (response_dto .results )
242+ return combined_results
243+
244+
245+ def create_empty_response_dto () -> ResponseDto :
246+ return ResponseDto (results = [], statusCode = "200" )
247+
248+
249+ def create_merged_response_dto (results : list [AppleLocation ]) -> ResponseDto :
250+ return ResponseDto (results = results , statusCode = "200" )
251+
252+
253+ async def _async_acsnservice_fetch (security_headers , ids , startdate , enddate ) -> AppleHTTPResponse :
254+ async with aiohttp .ClientSession (headers = security_headers , timeout = aiohttp .ClientTimeout (total = 60 )) as session :
255+ out = await session .post (
256+ "https://gateway.icloud.com/acsnservice/fetch" ,
257+ json = {
258+ "search" : [
259+ {
260+ "startDate" : date_milliseconds (startdate ),
261+ "endDate" : date_milliseconds (enddate ),
262+ "ids" : ids ,
263+ }
264+ ]
265+ },
266+ )
267+
268+ r = AppleHTTPResponse (status_code = out .status , text = await out .text ())
269+ return r
0 commit comments