diff --git a/azhdankin/README.md b/azhdankin/README.md
new file mode 100644
index 00000000..834c3368
--- /dev/null
+++ b/azhdankin/README.md
@@ -0,0 +1,25 @@
+# Multi-threaded file uploader
+
+# Overview
+
+This is a Python application that uploads a set of given files to a cloud object storage in parallel through the cloud provider's or a third-party API.
+
+# Features
+
+1. Supports up to hundreds of thousands of files of arbitrary sizes, all inside one directory. The root directory may contain subdirectories.
+2. The object storage container holding the objects is private, and only credential-based access is allowed.
+3. Each object inside the object storage has associated metadata which contains the file size, last modification time, and file permissions.
+
+ The utility is fast (it utilizes the full network bandwidth), consumes little CPU (low enough not to block other processes), and little memory (<25%).
+ It supports GCP Cloud Storage; however, the implementation is modular and object-oriented, so other cloud providers can be added.
+
+# Prerequisites
+ You must have Python 3.8 and the Google Cloud Storage Python client installed.
+
+# To run
+ 1. Clone the git repository. Make sure the create_files.py and upload_files.py file permissions are set to "executable".
+ 2. Run the ./create_files.py utility. This utility will create the files that need to be uploaded to the cloud storage. You can set the
+ number of files to be created as a command line parameter (e.g. ./create_files.py 10000 to create 10000 files).
+ 3. Run ./upload_files.py to upload the files to the Cloud Storage.
+
+# Notes
diff --git a/azhdankin/cloud_storage.py b/azhdankin/cloud_storage.py
new file mode 100644
index 00000000..807a301b
--- /dev/null
+++ b/azhdankin/cloud_storage.py
@@ -0,0 +1,54 @@
+"""
+This module contains the definition of the CloudStorage base class, which acts like an "abstract class" or
+the equivalent of an "interface" for the cloud storage.
+It also contains a cloud storage implementation for GCP, which is used to upload the files.
+"""
+
+from google.cloud import storage
+
+# Base CloudStorage class
+class CloudStorage:
+    def __init__(self, name):
+        self.name = name
+
+    def upload_object(self, object_name, source_file_name):
+        pass
+
+    def __str__(self):
+        return self.name
+
+# Cloud storage implementation for GCP
+class CloudStorageGCP(CloudStorage):
+    def __init__(self, bucket_name, project=None):
+        super().__init__("CloudStorageGCP")
+        self.project = project
+        self.bucket_name = bucket_name
+        self.client = storage.Client(project=self.project)
+
+        # Resolve the reference to the destination bucket
+        self.bucket = self.client.bucket(self.bucket_name)
+
+        # If the target bucket does not exist, it will be created
+        if not self.bucket.exists():
+            self.bucket = self.client.create_bucket(self.bucket_name)
+
+    def upload_object(self, object_name, source_file_name):
+        # Create a new blob object
+        blob = self.bucket.blob(object_name)
+        # Upload the file to the bucket
+        blob.upload_from_filename(source_file_name)
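+
+# A minimal sketch of how another provider could plug into the same interface
+# (not used by the uploader). It assumes the boto3 package is installed and that
+# the target S3 bucket already exists; the class and names here are illustrative.
+class CloudStorageS3(CloudStorage):
+    def __init__(self, bucket_name):
+        super().__init__("CloudStorageS3")
+        self.bucket_name = bucket_name
+        # Imported lazily so the module does not require boto3 unless this class is used
+        import boto3
+        self.client = boto3.client("s3")
+
+    def upload_object(self, object_name, source_file_name):
+        # boto3 uploads the local file to the given bucket under the given key
+        self.client.upload_file(source_file_name, self.bucket_name, object_name)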
diff --git a/azhdankin/create_files.py b/azhdankin/create_files.py
new file mode 100755
index 00000000..64d23e9a
--- /dev/null
+++ b/azhdankin/create_files.py
@@ -0,0 +1,55 @@
+#!/usr/bin/env python3
+""" File creation utility.
+
+This is a utility to create the directory and sample files which
+will be used for transfer to the cloud storage.
+
+The utility takes one command line parameter: the number of files to create.
+If the parameter is not given, it will create 10 files by default.
+
+The file creation is performed by copying the content of ./seed-file.txt
+n times (where n is a randomly generated number in the range from 1 to 100)
+into a destination file, and naming the destination file by appending a
+sequentially incremented number to the base file name.
+
+"""
+import sys
+import os
+import random
+
+# Performs the file creation
+def create_files(path, num_files):
+    # Specify the "seed" for the generated files' content
+    seed_file = "./seed-file.txt"
+    name_prefix = "file-2-upload"
+
+    # Create the destination directory if it does not exist
+    if not os.path.exists(path):
+        os.makedirs(path)
+
+    # Read the seed content which will populate the files to be created
+    with open(seed_file, "r") as seed:
+        seed_content = seed.read()
+
+    # Create the files for upload
+    for target_file_idx in range(0, num_files):
+        # Replicate the seed content a random number of times
+        repeat = random.randint(1, 100)
+        with open(path + name_prefix + str(target_file_idx) + ".txt", "w") as target_file:
+            target_file.write(seed_content * repeat)
+
+
+# Run the script body only when executed directly, so that importing this module
+# (e.g. from the tests) does not create any files as a side effect
+if __name__ == "__main__":
+    # Set the root of the files location
+    path = "./files/"
+
+    # Set the default number of files to be generated
+    num_files = 10
+
+    # Read the number of files to be generated from the command line if provided
+    if len(sys.argv) > 1:
+        num_files = int(sys.argv[1])
+
+    create_files(path, num_files)
diff --git a/azhdankin/seed-file.txt b/azhdankin/seed-file.txt
new file mode 100644
index 00000000..6c1513ee
--- /dev/null
+++ b/azhdankin/seed-file.txt
@@ -0,0 +1,3 @@
+dsgsddhjdnkdhfkdfjhekfndsmcndkfhed
+dhfdhfgefgejfhefhekfekfhekfekf
+jhdfhejfgejfgejgfehgfehgfehgfhegf
diff --git a/azhdankin/test_create_files.py b/azhdankin/test_create_files.py
new file mode 100755
index 00000000..3b8da6a5
--- /dev/null
+++ b/azhdankin/test_create_files.py
@@ -0,0 +1,45 @@
+# Test for the file creation utility.
+import glob
+import os
+from create_files import create_files
+
+# Perform a test of the file creation
+def test_create_files():
+
+    # Set the root of the location for the generated files
+    path = "./test_files/"
+
+    # Set the number of files to be generated
+    num_files = 10
+
+    create_files(path, num_files)
+
+    # Path to the root directory where the created files are located
+    path = "./test_files/*"
+
+    # Initialize the file names list
+    file_list = []
+
+    # Populate the list of the file names that were generated. Currently we
+    # support two levels of directories where the files are located.
+    for entry in glob.iglob(path, recursive=True):
+        if os.path.isfile(entry):
+            file_list.append(entry)
+        else:
+            entry = entry + "/*"
+            for element in glob.iglob(entry, recursive=True):
+                if os.path.isfile(element):
+                    file_list.append(element)
+
+    file_list_len = len(file_list)
+    assert file_list_len == 10
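+
+    # Additional sanity check (a sketch of what could be asserted): each generated
+    # file holds the seed content repeated a whole number of times, so its size
+    # should be a positive multiple of the seed file's size (this assumes text-mode
+    # writes preserve the seed's byte length, as they do on Linux).
+    seed_size = os.path.getsize("./seed-file.txt")
+    for name in file_list:
+        assert os.path.getsize(name) % seed_size == 0
+        assert os.path.getsize(name) >= seed_size
+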
+ +""" +import os +import glob +import time + +from concurrent.futures import ThreadPoolExecutor + +from cloud_storage import CloudStorageGCP +from uploader import FileUploader + +#Path to the root directory where the files to be uploaded are located +path = "./files/*" + +#Initialize the file names list +file_list = [] + +#Populate the list of the file names set for upload. Currently we support two levels of the directories +#where files are located +for entry in glob.iglob(path, recursive=True): + if os.path.isfile(entry): + file_list.append(entry) + else: + entry = entry + "/*" + for element in glob.iglob(entry, recursive=True): + if os.path.isfile(element): + file_list.append(element) + +#Specify the maximum number of the workers that perform files upload simultaneously +MAX_UPLOAD_WORKERS = 100 + +#Calculate the partitioning of the file names list - each partition or chunk will be assigned +#to a single upload worker + +file_list_len = len(file_list) +step = int(file_list_len/MAX_UPLOAD_WORKERS) +remainder = file_list_len%MAX_UPLOAD_WORKERS + +#Initialize a Cloud Storage Provider +storage = CloudStorageGCP("azhdanki-test-bucket1", project='rewotes') + +#Create the Thread Pool which will be used to run the uploader tasks +pool = ThreadPoolExecutor (max_workers=MAX_UPLOAD_WORKERS) + +#Schedule the upload tasks +i=0 +time_start = time.time() + +while i < (file_list_len - remainder): + print(i) + uploader = FileUploader (storage, file_list, i, step) + pool.submit (uploader.run) + i += step + +if remainder > 0: + uploader = FileUploader (storage, file_list, i, remainder) + pool.submit (uploader.run) + +pool.shutdown (wait=True) + +time_end = time.time() +time_delta = time_end - time_start +print ("It took " + str(time_delta) + " seconds to upload " + str(file_list_len) + " files.") + + + diff --git a/azhdankin/uploader.py b/azhdankin/uploader.py new file mode 100644 index 00000000..7347fbb0 --- /dev/null +++ b/azhdankin/uploader.py @@ -0,0 +1,34 @@ +""" +Class performing upload of the files to the cloud storage. +""" + +import os + +class FileUploader: + """ + Constructor is taking the following parameters: + storage - a reference to the instance of CloudStorage object. + a CloudStorage class is a parent class for the cloud storage + provider specific implementations, i.e. CloudStorageGCP, CloudStrageAWS + + files - a reference to the entire list of the file names that need to be uploaded + + start_idx - an index, a "pointer" to the files list specifying where this file uploader + will start + + count - a number which specifies how many files this instance of uploader has to process (upload) + + """ + def __init__(self, storage, files, start_idx, count): + self.storage = storage + self.files = files + self.start = start_idx + self.count = count + + #The method performing the upload of the group of the files to the Cloud storage + def run (self): + for i in range(self.start, self.start + self.count): + object_name = os.path.split(self.files[i])[1] + self.storage.upload_object(object_name, self.files[i]) + +