-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathgee_folder_manager.py
More file actions
104 lines (80 loc) · 3.29 KB
/
gee_folder_manager.py
File metadata and controls
104 lines (80 loc) · 3.29 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
"""Manage google drive programatically"""
import argparse
import logging
import os
import sys
from googleapiclient.errors import HttpError
from googleapiclient.discovery import build
from google.oauth2.service_account import Credentials
# Root logging: WARNING and above, written to stdout. Each record shows the
# timestamp, the relative creation time, the level, the logger name and the
# function/line that emitted it.
logging.basicConfig(
    stream=sys.stdout,
    format=(
        '%(asctime)s (%(relativeCreated)d) %(levelname)s %(name)s'
        ' [%(funcName)s:%(lineno)d] %(message)s'),
    level=logging.WARNING)

# This module's own logger is set to DEBUG, independent of the root level.
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)
def delete_file_callback(request_id, response, exception):
    """Report the outcome of a single delete request issued through a batch.

    The signature (request_id, response, exception) is the one used for
    batch callbacks in this file.

    Args:
        request_id: identifier of the request inside the batch.
        response: body returned for the request; empty when the delete
            succeeded without returning any content.
        exception: the error raised for this request, or ``None`` when the
            request succeeded.
    """
    if exception is not None:
        print(f"Failed to delete file (request_id: {request_id}): {exception}")
    elif not response:
        # A successful delete carries no body, so "no exception and nothing
        # in the response" is the normal success case, not an anomaly.
        print(f"Deleted file (request_id: {request_id})")
    elif isinstance(response, dict) and 'id' in response:
        print(f"Deleted file: {response['id']}")
    else:
        print(f"Unexpected response for request_id {request_id}: {response}")
def delete_all_files_in_folder(folder_id):
    """Permanently delete every non-trashed item directly inside a Drive folder.

    Authenticates with the service-account key file named by the
    ``GEE_KEY_PATH`` environment variable, lists all children of
    ``folder_id`` (following pagination), and deletes them in batches.
    Per-file outcomes are reported through ``delete_file_callback``.
    ``HttpError`` raised while listing or executing a batch is caught and
    printed rather than propagated.

    Args:
        folder_id: Drive ID of the folder whose contents should be deleted.

    Raises:
        KeyError: if ``GEE_KEY_PATH`` is not set in the environment.
    """
    # Drive accepts at most this many calls in one batch request.
    max_batch_size = 100

    gee_key_path = os.environ['GEE_KEY_PATH']
    scopes = ['https://www.googleapis.com/auth/drive']
    # Authenticate the service account
    credentials = Credentials.from_service_account_file(
        gee_key_path, scopes=scopes)
    service = build('drive', 'v3', credentials=credentials)

    failed_request_ids = []

    def _track_result(request_id, response, exception):
        # Remember failures so the final summary is truthful, then delegate
        # the per-file reporting to the module-level callback.
        if exception is not None:
            failed_request_ids.append(request_id)
        delete_file_callback(request_id, response, exception)

    # Backslashes and single quotes must be escaped inside a Drive query
    # string literal; otherwise a stray quote would break the query.
    escaped_folder_id = folder_id.replace('\\', '\\\\').replace("'", "\\'")
    query = f"'{escaped_folder_id}' in parents and trashed = false"

    try:
        # Collect every ID first: files().list is paginated, and deleting
        # while paging could invalidate the page token.
        file_ids = []
        page_token = None
        while True:
            response = service.files().list(
                q=query,
                fields='nextPageToken, files(id)',
                pageSize=1000,
                pageToken=page_token).execute()
            file_ids.extend(
                entry['id'] for entry in response.get('files', []))
            page_token = response.get('nextPageToken')
            if not page_token:
                break

        for start in range(0, len(file_ids), max_batch_size):
            batch = service.new_batch_http_request(callback=_track_result)
            for file_id in file_ids[start:start + max_batch_size]:
                # Use the file ID as the request ID so callback messages
                # identify which file each result belongs to.
                batch.add(
                    service.files().delete(fileId=file_id),
                    request_id=file_id)
            batch.execute()

        print("Batch deletion completed.")
        if failed_request_ids:
            print(f"{len(failed_request_ids)} file(s) could not be deleted.")
        else:
            print("All files in the folder have been deleted.")
    except HttpError as error:
        print(f"An error occurred: {error}")
def make_shared_folder(folder_name, share_with_email='richpsharp@gmail.com'):
    """Create a folder in the service account's Drive and share it.

    Authenticates with the service-account key file named by the
    ``GEE_KEY_PATH`` environment variable, creates a folder called
    ``folder_name``, then grants ``share_with_email`` the ``writer`` role
    on it.

    Args:
        folder_name: display name of the folder to create.
        share_with_email: e-mail address of the Google account that is
            given write access to the new folder. Defaults to the address
            this script previously hard-coded.

    Returns:
        The Drive ID of the newly created folder.

    Raises:
        KeyError: if ``GEE_KEY_PATH`` is not set in the environment.

    Errors raised by the Drive client calls are not caught here.
    """
    # Path to your service account key file
    gee_key_path = os.environ['GEE_KEY_PATH']
    scopes = ['https://www.googleapis.com/auth/drive']

    # Authenticate the service account
    credentials = Credentials.from_service_account_file(
        gee_key_path, scopes=scopes)
    drive_service = build('drive', 'v3', credentials=credentials)

    # Create a folder in the service account's Drive
    folder_metadata = {
        'name': folder_name,
        'mimeType': 'application/vnd.google-apps.folder',
    }
    folder = drive_service.files().create(
        body=folder_metadata, fields='id').execute()
    folder_id = folder.get('id')
    print(f"Folder created with ID: {folder_id}")

    # Share the folder with the requested account.
    user_permission = {
        'type': 'user',
        'role': 'writer',
        'emailAddress': share_with_email,
    }
    drive_service.permissions().create(
        fileId=folder_id,
        body=user_permission,
        fields='id',
    ).execute()
    print("Folder shared successfully!")
    return folder_id
def main():
    """Parse command-line arguments and run the requested folder action(s).

    ``--delete FOLDER_ID`` deletes the files inside the given folder and
    ``--create FOLDER_NAME`` creates and shares a new folder; both may be
    given in one invocation, in which case the delete runs first. When
    neither option is supplied the usage text is printed and nothing else
    happens.
    """
    parser = argparse.ArgumentParser(description='folder manager')
    parser.add_argument(
        '--delete', metavar='FOLDER_ID', type=str,
        help='Delete every file inside the folder with this ID.')
    parser.add_argument(
        '--create', metavar='FOLDER_NAME', type=str,
        help='Specify the name of the folder to create.')
    args = parser.parse_args()

    if not (args.delete or args.create):
        # Nothing was requested: show how to use the tool instead of
        # silently reporting success.
        parser.print_help()
        return

    if args.delete:
        delete_all_files_in_folder(args.delete)
    if args.create:
        make_shared_folder(args.create)
    LOGGER.info('all done')


if __name__ == '__main__':
    main()