44
55import asyncio
66import logging
7- from typing import cast
7+ from threading import Thread
8+ from typing import Any , cast
89
9- from aiobotocore .client import AioBaseClient as BotoClient
1010from aiobotocore .session import AioSession
1111from botocore .exceptions import ClientError , ConnectionError , ParamValidationError
1212
2525 DOMAIN ,
2626)
2727
28- type S3CompatibleConfigEntry = ConfigEntry [BotoClient ]
29-
30-
3128_LOGGER = logging .getLogger (__name__ )
3229
3330
34- def _verify_s3_credentials (
35- endpoint_url : str | None ,
36- access_key : str ,
37- secret_key : str ,
38- region : str ,
39- bucket : str ,
40- ) -> None :
41- """Verify S3 credentials in executor to avoid blocking the event loop.
31+ class S3ClientWrapper :
32+ """Wrapper for S3 client that runs all operations in a dedicated worker thread.
4233
43- This creates a temporary client in a separate thread with its own event loop.
44- All blocking operations (botocore data loading , SSL certificate loading)
45- happen here, warming the OS-level caches for subsequent client creation .
34+ This avoids blocking the main Home Assistant event loop with botocore's
35+ synchronous I/O operations (listdir, file reads , SSL certificate loading).
36+ All S3 operations are dispatched to a worker thread with its own event loop .
4637 """
4738
48- async def _verify () -> None :
39+ def __init__ (
40+ self ,
41+ endpoint_url : str | None ,
42+ access_key : str ,
43+ secret_key : str ,
44+ region : str ,
45+ bucket : str ,
46+ ) -> None :
47+ """Initialize the wrapper and start the worker thread."""
48+ self ._endpoint_url = endpoint_url
49+ self ._access_key = access_key
50+ self ._secret_key = secret_key
51+ self ._region = region
52+ self ._bucket = bucket
53+ self ._client : Any = None
54+ self ._loop : asyncio .AbstractEventLoop | None = None
55+ self ._thread : Thread | None = None
56+ self ._started = False
57+
58+ def _run_worker_loop (self ) -> None :
59+ """Run the worker event loop in a dedicated thread."""
60+ self ._loop = asyncio .new_event_loop ()
61+ asyncio .set_event_loop (self ._loop )
62+ self ._loop .run_forever ()
63+
64+ async def _create_client (self ) -> None :
65+ """Create the S3 client (runs in worker thread)."""
4966 session = AioSession ()
50- async with session .create_client (
67+ # pylint: disable-next=unnecessary-dunder-call
68+ self ._client = await session .create_client (
5169 "s3" ,
52- endpoint_url = endpoint_url ,
53- aws_secret_access_key = secret_key ,
54- aws_access_key_id = access_key ,
55- region_name = region ,
56- ) as client :
57- await client .head_bucket (Bucket = bucket )
70+ endpoint_url = self ._endpoint_url ,
71+ aws_secret_access_key = self ._secret_key ,
72+ aws_access_key_id = self ._access_key ,
73+ region_name = self ._region ,
74+ ).__aenter__ ()
75+ # Verify credentials and warm SSL context
76+ await self ._client .head_bucket (Bucket = self ._bucket )
77+
78+ async def _close_client (self ) -> None :
79+ """Close the S3 client (runs in worker thread)."""
80+ if self ._client :
81+ await self ._client .__aexit__ (None , None , None )
82+ self ._client = None
83+
84+ def start (self ) -> None :
85+ """Start the worker thread and create the client.
86+
87+ This method blocks until the client is ready.
88+ Should be called from an executor via hass.async_add_executor_job.
89+ """
90+ if self ._started :
91+ return
92+
93+ # Start worker thread
94+ self ._thread = Thread (target = self ._run_worker_loop , daemon = True )
95+ self ._thread .start ()
96+
97+ # Wait for loop to be ready
98+ while self ._loop is None :
99+ pass
100+
101+ # Create client in worker thread
102+ future = asyncio .run_coroutine_threadsafe (self ._create_client (), self ._loop )
103+ future .result () # Block until client is created
104+ self ._started = True
105+
106+ def stop (self ) -> None :
107+ """Stop the worker thread and close the client.
108+
109+ Should be called from an executor via hass.async_add_executor_job.
110+ """
111+ if not self ._started or self ._loop is None :
112+ return
113+
114+ # Close client in worker thread
115+ future = asyncio .run_coroutine_threadsafe (self ._close_client (), self ._loop )
116+ future .result ()
117+
118+ # Stop the loop
119+ self ._loop .call_soon_threadsafe (self ._loop .stop )
120+ if self ._thread :
121+ self ._thread .join (timeout = 5 )
122+ self ._started = False
123+
124+ async def _dispatch [T ](self , coro : Any ) -> T :
125+ """Dispatch a coroutine to the worker thread and await its result."""
126+ if self ._loop is None :
127+ raise RuntimeError ("Worker loop not started" )
128+ future = asyncio .run_coroutine_threadsafe (coro , self ._loop )
129+ return await asyncio .wrap_future (future )
58130
59- asyncio .run (_verify ())
131+ async def head_bucket (self , ** kwargs : Any ) -> dict [str , Any ]:
132+ """Check if a bucket exists and is accessible."""
133+ return await self ._dispatch (self ._client .head_bucket (** kwargs ))
134+
135+ async def list_objects_v2 (self , ** kwargs : Any ) -> dict [str , Any ]:
136+ """List objects in a bucket."""
137+ return await self ._dispatch (self ._client .list_objects_v2 (** kwargs ))
138+
139+ async def get_object (self , ** kwargs : Any ) -> dict [str , Any ]:
140+ """Get an object from a bucket."""
141+ return await self ._dispatch (self ._client .get_object (** kwargs ))
142+
143+ async def put_object (self , ** kwargs : Any ) -> dict [str , Any ]:
144+ """Put an object into a bucket."""
145+ return await self ._dispatch (self ._client .put_object (** kwargs ))
146+
147+ async def delete_object (self , ** kwargs : Any ) -> dict [str , Any ]:
148+ """Delete an object from a bucket."""
149+ return await self ._dispatch (self ._client .delete_object (** kwargs ))
150+
151+ async def create_multipart_upload (self , ** kwargs : Any ) -> dict [str , Any ]:
152+ """Initiate a multipart upload."""
153+ return await self ._dispatch (self ._client .create_multipart_upload (** kwargs ))
154+
155+ async def upload_part (self , ** kwargs : Any ) -> dict [str , Any ]:
156+ """Upload a part in a multipart upload."""
157+ return await self ._dispatch (self ._client .upload_part (** kwargs ))
158+
159+ async def complete_multipart_upload (self , ** kwargs : Any ) -> dict [str , Any ]:
160+ """Complete a multipart upload."""
161+ return await self ._dispatch (self ._client .complete_multipart_upload (** kwargs ))
162+
163+ async def abort_multipart_upload (self , ** kwargs : Any ) -> dict [str , Any ]:
164+ """Abort a multipart upload."""
165+ return await self ._dispatch (self ._client .abort_multipart_upload (** kwargs ))
166+
167+
168+ type S3CompatibleConfigEntry = ConfigEntry [S3ClientWrapper ]
60169
61170
62171async def async_setup_entry (hass : HomeAssistant , entry : S3CompatibleConfigEntry ) -> bool :
@@ -65,29 +174,18 @@ async def async_setup_entry(hass: HomeAssistant, entry: S3CompatibleConfigEntry)
65174 data = cast (dict , entry .data )
66175 region = data .get (CONF_REGION , DEFAULT_REGION )
67176
177+ # Create wrapper that will run S3 client in dedicated worker thread
178+ wrapper = S3ClientWrapper (
179+ endpoint_url = data .get (CONF_ENDPOINT_URL ),
180+ access_key = data [CONF_ACCESS_KEY_ID ],
181+ secret_key = data [CONF_SECRET_ACCESS_KEY ],
182+ region = region ,
183+ bucket = data [CONF_BUCKET ],
184+ )
185+
68186 try :
69- # Verify credentials in executor to avoid blocking the event loop
70- # with botocore's synchronous I/O operations (listdir, file reads, SSL loading)
71- await hass .async_add_executor_job (
72- _verify_s3_credentials ,
73- data .get (CONF_ENDPOINT_URL ),
74- data [CONF_ACCESS_KEY_ID ],
75- data [CONF_SECRET_ACCESS_KEY ],
76- region ,
77- data [CONF_BUCKET ],
78- )
79-
80- # Create the actual client for runtime use
81- # OS-level caches are now warm from the verification step
82- session = AioSession ()
83- # pylint: disable-next=unnecessary-dunder-call
84- client = await session .create_client (
85- "s3" ,
86- endpoint_url = data .get (CONF_ENDPOINT_URL ),
87- aws_secret_access_key = data [CONF_SECRET_ACCESS_KEY ],
88- aws_access_key_id = data [CONF_ACCESS_KEY_ID ],
89- region_name = region ,
90- ).__aenter__ ()
187+ # Start wrapper in executor - all blocking I/O happens in worker thread
188+ await hass .async_add_executor_job (wrapper .start )
91189 except ClientError as err :
92190 raise ConfigEntryAuthFailed (
93191 translation_domain = DOMAIN ,
@@ -111,7 +209,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: S3CompatibleConfigEntry)
111209 translation_key = "cannot_connect" ,
112210 ) from err
113211
114- entry .runtime_data = client
212+ entry .runtime_data = wrapper
115213
116214 def notify_backup_listeners () -> None :
117215 for listener in hass .data .get (DATA_BACKUP_AGENT_LISTENERS , []):
@@ -124,6 +222,6 @@ def notify_backup_listeners() -> None:
124222
async def async_unload_entry(hass: HomeAssistant, entry: S3CompatibleConfigEntry) -> bool:
    """Unload a config entry by shutting down its S3 worker thread."""
    # wrapper.stop() blocks on thread join, so run it in an executor to keep
    # the event loop responsive.
    await hass.async_add_executor_job(entry.runtime_data.stop)
    return True
0 commit comments