-
Notifications
You must be signed in to change notification settings - Fork 7
Expand file tree
/
Copy pathr2.py
More file actions
executable file
·49 lines (42 loc) · 1.74 KB
/
r2.py
File metadata and controls
executable file
·49 lines (42 loc) · 1.74 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import time
import logging
import aioboto3
from io import BytesIO
import json
logger = logging.getLogger(__name__)
class R2Storage:
"""
Simple Cloudflare r2 storage connection
:param endpoint_url: Your Cloudflare endpoint url
:type endpoint_url: string
:param key_id: Your key id
:type key_id: string
:param access_key: Your access key
:type access_key: string
:param bucket: R2 bucket name
:type bucket: string
:param bucket_url: R2 bucket public url
:type bucket_url: string
"""
def __init__(self, endpoint_url, key_id, access_key, bucket, bucket_url):
self.session = aioboto3.Session(key_id, access_key)
self.endpoint_url = endpoint_url
self.bucket = bucket
self.bucket_url = bucket_url
async def upload_file(self, file, path):
async with self.session.client("s3", endpoint_url=self.endpoint_url) as client:
start = time.perf_counter()
if isinstance(file, str):
# If the file is a string path, open the file and read it into a BytesIO object
with open(file, "rb") as f:
file_obj = BytesIO(f.read())
await client.put_object(Bucket=self.bucket, Key=path, Body=file_obj)
else:
# If file is already a file-like object, upload directly
await client.put_object(Bucket=self.bucket, Key=path, Body=file)
end = time.perf_counter()
logger.info(f"Uploaded file in {end - start:.2f} seconds.")
return self.bucket_url + "/" + path
async def upload_json(self, data: dict, path: str):
file_obj = BytesIO(json.dumps(data, default=str).encode("utf-8"))
return await self.upload_file(file_obj, path)