diff --git a/dsi/backends/duckdb.py b/dsi/backends/duckdb.py index 8b6566fe..df4b14bc 100644 --- a/dsi/backends/duckdb.py +++ b/dsi/backends/duckdb.py @@ -56,6 +56,88 @@ def __init__(self, filename): filtered_df = keywords_df[keywords_df['keyword_category'] != 'unreserved'] self.duckdb_keywords = filtered_df["keyword_name"].tolist() + # def sql_type_helper(self, input_list, recursive = True): + # """ + # **Internal use only. Do not call** + + # Helper function that evaluates a list for ints, floats and strings. Can be called recursively in sql_type() + + # `input_list` : list + # A list of values to analyze for type compatibility. + + # `recursive` : bool, default=True + # Boolean indicating whether the return object is just the column type string or is a tuple + + # `return`: str or (str, list) + # If a string, it represents the inferred DuckDB data type for the input list. + # If a tuple, it is the (DuckDB data type for the input list, input list with any type changes) + # """ + # DUCKDB_BIGINT_MIN = -9223372036854775808 + # DUCKDB_BIGINT_MAX = 9223372036854775807 + # DUCKDB_INT_MIN = -2147483648 + # DUCKDB_INT_MAX = 2147483647 + + # if all(isinstance(x, int) for x in input_list if x is not None): + # if any(x < DUCKDB_BIGINT_MIN or x > DUCKDB_BIGINT_MAX for x in input_list if x is not None): + # if recursive: + # return " DOUBLE" + # return " DOUBLE", [float(x) if x is not None else x for x in input_list] + # elif any(x < DUCKDB_INT_MIN or x > DUCKDB_INT_MAX for x in input_list if x is not None): + # if recursive: + # return " BIGINT" + # return " BIGINT", input_list + # if recursive: + # return " INTEGER" + # return " INTEGER", input_list + # elif all(isinstance(x, float) for x in input_list if x is not None): + # if recursive: + # return " DOUBLE" + # return " DOUBLE", input_list + # if recursive: + # return " VARCHAR" + # return " VARCHAR", [str(x) if x is not None else x for x in input_list] + + # def sql_type(self, input_list): + # """ + # **Internal use only. Do not call** + + # Evaluates a list and returns the predicted compatible DuckDB Type + + # `input_list` : list + # A list of values to analyze for type compatibility. + + # `return`: str + # A string representing the inferred DuckDB data type for the input list. + # """ + # if all(isinstance(x, dict) for x in input_list if x is not None): + # # Find the superset of all keys in the list of dicts. + # all_keys = set() + # for x in input_list: + # all_keys = all_keys | set(x.keys()) + + # # Recursively find types of each field in the dict. + # type_list = "" + # for parent_key, child_key in all_keys: + # col_type, col_list = self.sql_type(input_list=input_list[parent_key][child_key], recursive=False) + # input_list[parent_key][child_key] = col_list + # type_list += f"{child_key} {col_type}, " + + # # Return STRUCT type with the column types found above. + # return f" STRUCT({type_list[:-2]})", input_list + # elif all(isinstance(x, list) for x in input_list if x is not None): + # # Find type of list elements by recursively calling this self.sql_type(). + # type_list = [self.sql_type_helper(input_list=l, recursive=True) for l in input_list if l is not None] + + # # If all types are the same (i.e., list is homogeneous), add to table as a list. + # # Otherwise, fallback on VARCHAR. + # if all(t == type_list[0] for t in type_list): + # return f"{type_list[0]}[]", input_list + # else: + # return " VARCHAR", [str(x) if x is not None else x for x in input_list] + # else: + # return self.sql_type_helper(input_list, recursive=False) + + def sql_type(self, input_list): """ **Internal use only. Do not call** @@ -75,12 +157,12 @@ def sql_type(self, input_list): if all(isinstance(x, int) for x in input_list if x is not None): if any(x < DUCKDB_BIGINT_MIN or x > DUCKDB_BIGINT_MAX for x in input_list if x is not None): - return " DOUBLE" + return " DOUBLE", [float(x) if x is not None else x for x in input_list] elif any(x < DUCKDB_INT_MIN or x > DUCKDB_INT_MAX for x in input_list if x is not None): - return " BIGINT" - return " INTEGER" + return " BIGINT", input_list + return " INTEGER", input_list elif all(isinstance(x, float) for x in input_list if x is not None): - return " DOUBLE" + return " DOUBLE", input_list elif all(isinstance(x, dict) for x in input_list if x is not None): # Find the superset of all keys in the list of dicts. all_keys = set() @@ -94,7 +176,7 @@ def sql_type(self, input_list): type_list += f"{k} {t}, " # Return STRUCT type with the column types found above. - return f" STRUCT({type_list[:-2]})" + return f" STRUCT({type_list[:-2]})", input_list elif all(isinstance(x, list) for x in input_list if x is not None): # Find type of list elements by recursively calling this self.sql_type(). type_list = [self.sql_type(input_list=l) for l in input_list] @@ -102,10 +184,10 @@ def sql_type(self, input_list): # If all types are the same (i.e., list is homogeneous), add to table as a list. # Otherwise, fallback on VARCHAR. if all(t == type_list[0] for t in type_list): - return type_list[0] + "[]" + return f"{type_list[0]}[]", input_list else: - return " VARCHAR" - return " VARCHAR" + return " VARCHAR", [str(x) if x is not None else x for x in input_list] + return " VARCHAR", [str(x) if x is not None else x for x in input_list] def duckdb_compatible_name(self, name): if (name.startswith('"') and name.endswith('"')) or (name.lower() not in self.duckdb_keywords and name.isidentifier()): @@ -143,7 +225,7 @@ def ingest_table_helper(self, types, foreign_query = None, isVerbose=False): for col in diff_cols: if col.lower() in [c.lower() for c in query_cols]: return (ValueError, "Cannot have duplicate column names") - temp_name = col + self.sql_type(types.properties[col]) + temp_name = col + self.sql_type(types.properties[col])[0] try: self.cur.execute(f"ALTER TABLE {types.name} ADD COLUMN {temp_name};") except duckdb.Error as e: @@ -295,12 +377,13 @@ def ingest_artifacts(self, collection, isVerbose=False): primary_col = self.duckdb_compatible_name(re.sub(r'[\r\n]+', ' ', primaryTuple[1].replace('-', '_'))) foreign_query += f", FOREIGN KEY ({sql_key}) REFERENCES {primary_table} ({primary_col})" - types.properties[sql_key] = tableData[key] + col_type, col_list = self.sql_type(tableData[key]) + types.properties[sql_key] = col_list if dsi_name in artifacts.keys() and comboTuple in artifacts[dsi_name]["primary_key"]: - types.unit_keys.append(sql_key + self.sql_type(tableData[key]) + " PRIMARY KEY") + types.unit_keys.append(sql_key + col_type + " PRIMARY KEY") else: - types.unit_keys.append(sql_key + self.sql_type(tableData[key])) + types.unit_keys.append(sql_key + col_type) error = self.ingest_table_helper(types, foreign_query) if error is not None: @@ -349,10 +432,10 @@ def ingest_artifacts(self, collection, isVerbose=False): try: self.cur.execute("COMMIT") - self.cur.execute("CHECKPOINT") + self.cur.execute("FORCE CHECKPOINT") except duckdb.Error as e: self.cur.execute("ROLLBACK") - self.cur.execute("CHECKPOINT") + self.cur.execute("FORCE CHECKPOINT") return (duckdb.Error, e) diff --git a/dsi/backends/sqlite.py b/dsi/backends/sqlite.py index 112e9a0b..58c06561 100644 --- a/dsi/backends/sqlite.py +++ b/dsi/backends/sqlite.py @@ -88,11 +88,11 @@ def sql_type(self, input_list): if all(isinstance(x, int) for x in input_list if x is not None): if any(x < SQLITE_INT_MIN or x > SQLITE_INT_MAX for x in input_list if x is not None): - return " FLOAT" - return " INTEGER" + return " FLOAT", [float(x) if x is not None else x for x in input_list] + return " INTEGER", input_list elif all(isinstance(x, float) for x in input_list if x is not None): - return " FLOAT" - return " VARCHAR" + return " FLOAT", input_list + return " VARCHAR", [str(x) if x is not None else x for x in input_list] def sqlite_compatible_name(self, name): if (name.startswith('"') and name.endswith('"')) or (name.upper() not in self.sqlite_keywords and name.isidentifier()): @@ -127,7 +127,7 @@ def ingest_table_helper(self, types, foreign_query = None, isVerbose=False): for col in diff_cols: if col.lower() in [c.lower() for c in query_cols]: return (ValueError, "Cannot have duplicate column names") - temp_name = col + self.sql_type(types.properties[col]) + temp_name = col + self.sql_type(types.properties[col])[0] try: self.cur.execute(f"ALTER TABLE {types.name} ADD COLUMN {temp_name};") except sqlite3.Error as e: @@ -289,12 +289,13 @@ def ingest_artifacts(self, collection, isVerbose=False): primary_col = self.sqlite_compatible_name(re.sub(r'[\r\n]+', ' ', primaryTuple[1].replace('-', '_'))) foreign_query += f", FOREIGN KEY ({sql_key}) REFERENCES {primary_table} ({primary_col})" - types.properties[sql_key] = tableData[key] - + col_type, col_list = self.sql_type(tableData[key]) + types.properties[sql_key] = col_list + if dsi_name in artifacts.keys() and comboTuple in artifacts[dsi_name]["primary_key"]: - types.unit_keys.append(sql_key + self.sql_type(tableData[key]) + " PRIMARY KEY") + types.unit_keys.append(sql_key + col_type + " PRIMARY KEY") else: - types.unit_keys.append(sql_key + self.sql_type(tableData[key])) + types.unit_keys.append(sql_key + col_type) error = self.ingest_table_helper(types, foreign_query) if error is not None: diff --git a/dsi/core.py b/dsi/core.py index aa0b515e..44b8ac17 100644 --- a/dsi/core.py +++ b/dsi/core.py @@ -14,6 +14,9 @@ import tarfile import subprocess import uuid +import time +import itertools +from typing import Iterator from contextlib import redirect_stdout class Terminal(): @@ -1428,8 +1431,17 @@ def index(self, local_loc, remote_loc, isVerbose=False): """ if isVerbose: print("loc: "+local_loc+ " rem: "+remote_loc) + + # Warn about relative paths (..) may not work + # Data Crawl and gather metadata of local location - file_list = self.dircrawl(local_loc) + file_list = self.dircrawl2(local_loc, isVerbose) + + + if isVerbose: + file_list, tmp = itertools.tee(file_list) + file_len=sum(1 for _ in tmp) + print("Crawled "+str(file_len)+" files.") self.remote_location = remote_loc self.local_location = local_loc @@ -1455,6 +1467,10 @@ def index(self, local_loc, remote_loc, isVerbose=False): st_dict['uuid'] = [] st_dict['file_remote'] = [] + if isVerbose: + print("Collection object [", end="") + last = -10 + for file in file_list: rel_file = os.path.relpath(file,local_loc) #rel path #utils.isgroupreadable(file) # quick test @@ -1477,12 +1493,22 @@ def index(self, local_loc, remote_loc, isVerbose=False): st_dict['uuid'].append(self.gen_uuid(st)) st_dict['file_remote'].append(rfilepath) st_list.append(st) + if isVerbose: + progress = int(len(st_list) / file_len * 100) + # Print progress bar every 2% + if progress % 2 == 0 and progress != last: + print(".", end="") + last = progress + if isVerbose: + print(f"] Collection object created with {len(st_list)} entries.") # Test remote location validity, try to check access # Future: iterate through remote/server list here, for now::: remote_list = [ os.path.join(remote_loc,self.project_name) ] for remote in remote_list: + if isVerbose: + print(f"Testing access to '{remote}' directory.") try: # Try for file permissions if os.path.exists(remote): # Check if exists print(f"The directory '{remote}' already exists remotely.") @@ -1499,21 +1525,44 @@ def index(self, local_loc, remote_loc, isVerbose=False): t = Terminal() f = self.project_name+".db" + isDuckDB=False + isSQLite=False try: #f = os.path.join((local_loc, str(self.project_name+".db") )) #f = local_loc+"/"+self.project_name+".db" if isVerbose: - print("trying db: ", f) + print("Trying to open db: ", f) assert os.path.exists(f) - fnull = open(os.devnull, 'w') - with redirect_stdout(fnull): - t.load_module('backend','Sqlite','back-write', filename=f) - + # Detect to see which reader we should use + with open(f, 'rb') as dbfile: + # Detect sqlite + header_bytes = dbfile.read(16) + if header_bytes == b'SQLite format 3\x00': + isSQLite=True + if isVerbose: + print("Detected SQLiteDB.") + + # Detect duckdb + dbfile.seek(0) + full_header = dbfile.read(12) + if full_header[8:12] == b'DUCK': + isDuckDB=True + if isVerbose: + print("Detected DuckDB.") except Exception as err: - print(f"Database {f} not found") + print(f"Database {f} not found: {err}") raise + fnull = open(os.devnull, 'w') + with redirect_stdout(fnull): + if isSQLite: + t.load_module('backend','Sqlite','back-write', filename=f) + elif isDuckDB: + t.load_module('backend','DuckDB','back-write', filename=f) + else: + assert True, "Unsupported Database type!" + # See if filesystem exists fs_t = t.get_table("filesystem") if fs_t.empty: @@ -1524,12 +1573,21 @@ def index(self, local_loc, remote_loc, isVerbose=False): with redirect_stdout(fnull): t.load_module('plugin', "Dict", "reader", collection=st_dict, table_name="filesystem") t.artifact_handler(interaction_type='ingest') + else: + # Do nothing for now to prevent a re-index, + if isVerbose: + print("Error: Filesystem table already exists! DSI Index skipped.") + return + t.close() self.file_list = file_list self.rfile_list = rfile_list + if isVerbose: + print("DSI Index complete!") + def move(self, tool="copy", isVerbose=False, **kwargs): self.copy(tool,isVerbose,kwargs) @@ -1541,25 +1599,40 @@ def copy(self, tool="copy", isVerbose=False, **kwargs): # See if FS table has been created t = Terminal() - f = self.project_name - if ".db" not in f: - f = self.project_name+".db" - - + f = self.project_name+".db" + isDuckDB=False + isSQLite=False try: #f = os.path.join((local_loc, str(self.project_name+".db") )) - #f = self.local_location+"/"+self.project_name+".db" + #f = local_loc+"/"+self.project_name+".db" if isVerbose: print("trying db: ", f) assert os.path.exists(f) - fnull = open(os.devnull, 'w') - with redirect_stdout(fnull): - t.load_module('backend','Sqlite','back-read', filename=f) - except Exception: - print(f"Database {f} not found") + # Detect to see which reader we should use + with open(f, 'rb') as dbfile: + # Detect sqlite + header_bytes = dbfile.read(16) + if header_bytes == b'SQLite format 3\x00': + isSQLite=True + # Detect duckdb + dbfile.seek(0) + full_header = dbfile.read(12) + if full_header[8:12] == b'DUCK': + isDuckDB=True + except Exception as err: + print(f"Database {f} not found: {err}") raise + fnull = open(os.devnull, 'w') + with redirect_stdout(fnull): + if isSQLite: + t.load_module('backend','Sqlite','back-write', filename=f) + elif isDuckDB: + t.load_module('backend','DuckDB','back-write', filename=f) + else: + assert True, "Unsupported Database type!" + # See if filesystem exists fs_t = t.get_table("filesystem") if fs_t.empty: @@ -1606,6 +1679,33 @@ def copy(self, tool="copy", isVerbose=False, **kwargs): # Data movement via Conduit env = os.environ.copy() + # Test Kerberos + if isVerbose: + print( "Testing: klist") + cmd = ['klist'] + process = subprocess.Popen(cmd, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, encoding='latin-1') + stdout, stderr = process.communicate() + returncode = process.returncode + + if "No credentials" in stdout: + print("Kerberos authentication error: No credentials found. Please type 'conduit get' to reissue a ticket.") + assert True, print("Kerberos message: " + str(stdout)) + + # Test Conduit status + if isVerbose: + print( "Testing Conduit: conduit get") + cmd = ['/usr/projects/systems/conduit/bin/conduit-cmd','--config','/usr/projects/systems/conduit/conf/conduit-cmd-config.yaml','get'] + process = subprocess.Popen(cmd, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, encoding='latin-1') + stdout, stderr = process.communicate() + returncode = process.returncode + + if "TRANSFER_ID" in stdout: + if isVerbose: + print("Testing Conduit: conduit is authenticated.") + else: + assert True, print("Conduit Error: " + str(stdout)) + + # Check remote access for permissions and create folder if not os.path.exists(self.remote_location): if isVerbose: print( " mkdir " + self.remote_location) @@ -1705,7 +1805,7 @@ def copy(self, tool="copy", isVerbose=False, **kwargs): raise TypeError(f"Data movement format not supported:, Type: {tool}") - def dircrawl(self,filepath): + def dircrawl(self,filepath, verbose=False): """ Crawls the root 'filepath' directory and returns files @@ -1713,20 +1813,53 @@ def dircrawl(self,filepath): `return`: returns crawled file-list """ + start_time = time.perf_counter() + file_list = [] for root, dirs, files in os.walk(filepath): #if os.path.basename(filepath) != 'tmp': # Lets skip some files # continue + if verbose: + print(f"Crawling directory: {root}") + print(f" Found {len(files)} files, {len(dirs)} subdirectories") for f in files: # Appent root-level files file_list.append(os.path.join(root, f)) - for d in dirs: # Recursively dive into directories - sub_list = self.dircrawl(os.path.join(root, d)) - for sf in sub_list: - file_list.append(sf) + + elapsed = time.perf_counter() - start_time + + if verbose: + print(f"\nFinished crawling: {filepath}") + print(f"Total files found: {len(file_list)}") + print(f"Runtime: {elapsed:.2f} seconds") return file_list - + + def dircrawl2(self, filepath: str, verbose: bool = False) -> Iterator[str]: + start = time.perf_counter() + + # iterative stack avoids deep recursion limits + stack = [filepath] + while stack: + path = stack.pop() + try: + with os.scandir(path) as it: + for entry in it: + # follow_symlinks=False avoids surprises and extra stat calls + if entry.is_dir(follow_symlinks=False): + if verbose: + print(f"Crawling directory: {entry.path}") + stack.append(entry.path) + elif entry.is_file(follow_symlinks=False): + yield entry.path + except (PermissionError, FileNotFoundError): + # permissions/races happen a lot at scale + continue + + if verbose: + print(f"\nFinished crawling: {filepath}") + print(f"Runtime: {time.perf_counter() - start:.2f} seconds") + def get(self, project_name = "Project"): ''' Helper function that searches remote location based on project name, and retrieves diff --git a/examples/user/10.move.py b/examples/user/10.move.py new file mode 100644 index 00000000..fb3f25ea --- /dev/null +++ b/examples/user/10.move.py @@ -0,0 +1,11 @@ +from dsi.core import Sync + +# Origin location of data +local_files = "/Users/Shared/dsi/examples/clover3d/" +# Remote Location where database and data will be moved +remote_path = "/Users/Shared/staging/" + +# Create Sync type with project database name +s = Sync("clover3d") +s.index(local_files,remote_path,True) +s.copy("cp",True) \ No newline at end of file diff --git a/examples/user/2a.read.py b/examples/user/2a.read.py index b2760dca..c67ab933 100644 --- a/examples/user/2a.read.py +++ b/examples/user/2a.read.py @@ -1,7 +1,7 @@ # examples/user/2a.read.py from dsi.dsi import DSI -read_dsi = DSI("data.db") # Target a backend, defaults to SQLite if not defined +read_dsi = DSI("clover3d.db") # Target a backend, defaults to SQLite if not defined #dsi.read(path, reader) read_dsi.read("../clover3d/", 'Cloverleaf') # Read data into memory