From 12304f7b42d30ef37c6ca60a7c3c7895cf673f4b Mon Sep 17 00:00:00 2001 From: Jesus Pulido Date: Tue, 16 Dec 2025 12:53:14 -0700 Subject: [PATCH 01/16] added duckdb to sync --- dsi/core.py | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/dsi/core.py b/dsi/core.py index 55466739..7e0957a5 100644 --- a/dsi/core.py +++ b/dsi/core.py @@ -1496,12 +1496,22 @@ def index(self, local_loc, remote_loc, isVerbose=False): print("trying db: ", f) assert os.path.exists(f) - fnull = open(os.devnull, 'w') - with redirect_stdout(fnull): - t.load_module('backend','Sqlite','back-write', filename=f) - + # Detect to see which reader we should use + with open(f, 'rb') as dbfile: + # Detect sqlite + header_bytes = dbfile.read(16) + if header_bytes == b'SQLite format 3\x00': + fnull = open(os.devnull, 'w') + with redirect_stdout(fnull): + t.load_module('backend','Sqlite','back-write', filename=f) + + # Detect duckdb + dbfile.seek(0) + full_header = dbfile.read(12) + if full_header[8:12] == b'DUCK': + t.load_module('backend','DuckDB','back-write', filename=f) except Exception as err: - print(f"Database {f} not found") + print(f"Invalid Database {f}: {err}") raise # See if filesystem exists From 0960e82098855bb8aa10fc5d696312775b6bb386 Mon Sep 17 00:00:00 2001 From: Jesus Pulido Date: Tue, 16 Dec 2025 12:53:41 -0700 Subject: [PATCH 02/16] added suppress --- dsi/core.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/dsi/core.py b/dsi/core.py index 7e0957a5..4f4ee218 100644 --- a/dsi/core.py +++ b/dsi/core.py @@ -1509,7 +1509,9 @@ def index(self, local_loc, remote_loc, isVerbose=False): dbfile.seek(0) full_header = dbfile.read(12) if full_header[8:12] == b'DUCK': - t.load_module('backend','DuckDB','back-write', filename=f) + fnull = open(os.devnull, 'w') + with redirect_stdout(fnull): + t.load_module('backend','DuckDB','back-write', filename=f) except Exception as err: print(f"Invalid Database {f}: {err}") raise From 71b0aae1d705e8bb6f4d9a84bf9b5dce34afeee6 Mon Sep 17 00:00:00 2001 From: Jesus Pulido Date: Tue, 16 Dec 2025 18:52:53 -0700 Subject: [PATCH 03/16] added checks for data move on duckdb --- dsi/backends/duckdb.py | 4 +-- dsi/core.py | 59 ++++++++++++++++++++++++++++-------------- 2 files changed, 42 insertions(+), 21 deletions(-) diff --git a/dsi/backends/duckdb.py b/dsi/backends/duckdb.py index b319fcd4..83241b9c 100644 --- a/dsi/backends/duckdb.py +++ b/dsi/backends/duckdb.py @@ -323,10 +323,10 @@ def ingest_artifacts(self, collection, isVerbose=False): try: self.cur.execute("COMMIT") - self.cur.execute("CHECKPOINT") + self.cur.execute("FORCE CHECKPOINT") except duckdb.Error as e: self.cur.execute("ROLLBACK") - self.cur.execute("CHECKPOINT") + self.cur.execute("FORCE CHECKPOINT") return (duckdb.Error, e) diff --git a/dsi/core.py b/dsi/core.py index 4f4ee218..17af7a68 100644 --- a/dsi/core.py +++ b/dsi/core.py @@ -1489,6 +1489,8 @@ def index(self, local_loc, remote_loc, isVerbose=False): t = Terminal() f = self.project_name+".db" + isDuckDB=False + isSQLite=False try: #f = os.path.join((local_loc, str(self.project_name+".db") )) #f = local_loc+"/"+self.project_name+".db" @@ -1501,21 +1503,25 @@ def index(self, local_loc, remote_loc, isVerbose=False): # Detect sqlite header_bytes = dbfile.read(16) if header_bytes == b'SQLite format 3\x00': - fnull = open(os.devnull, 'w') - with redirect_stdout(fnull): - t.load_module('backend','Sqlite','back-write', filename=f) - + isSQLite=True # Detect duckdb dbfile.seek(0) full_header = dbfile.read(12) if full_header[8:12] == b'DUCK': - fnull = open(os.devnull, 'w') - with redirect_stdout(fnull): - t.load_module('backend','DuckDB','back-write', filename=f) + isDuckDB=True except Exception as err: - print(f"Invalid Database {f}: {err}") + print(f"Database {f} not found: {err}") raise + fnull = open(os.devnull, 'w') + with redirect_stdout(fnull): + if isSQLite: + t.load_module('backend','DuckDB','back-write', filename=f) + elif isDuckDB: + t.load_module('backend','DuckDB','back-write', filename=f) + else: + assert True, "Unsupported Database type!" + # See if filesystem exists fs_t = t.get_table("filesystem") if fs_t.empty: @@ -1543,25 +1549,40 @@ def copy(self, tool="copy", isVerbose=False, **kwargs): # See if FS table has been created t = Terminal() - f = self.project_name - if ".db" not in f: - f = self.project_name+".db" - - + f = self.project_name+".db" + isDuckDB=False + isSQLite=False try: #f = os.path.join((local_loc, str(self.project_name+".db") )) - #f = self.local_location+"/"+self.project_name+".db" + #f = local_loc+"/"+self.project_name+".db" if isVerbose: print("trying db: ", f) assert os.path.exists(f) - fnull = open(os.devnull, 'w') - with redirect_stdout(fnull): - t.load_module('backend','Sqlite','back-read', filename=f) - except Exception: - print(f"Database {f} not found") + # Detect to see which reader we should use + with open(f, 'rb') as dbfile: + # Detect sqlite + header_bytes = dbfile.read(16) + if header_bytes == b'SQLite format 3\x00': + isSQLite=True + # Detect duckdb + dbfile.seek(0) + full_header = dbfile.read(12) + if full_header[8:12] == b'DUCK': + isDuckDB=True + except Exception as err: + print(f"Database {f} not found: {err}") raise + fnull = open(os.devnull, 'w') + with redirect_stdout(fnull): + if isSQLite: + t.load_module('backend','DuckDB','back-write', filename=f) + elif isDuckDB: + t.load_module('backend','DuckDB','back-write', filename=f) + else: + assert True, "Unsupported Database type!" + # See if filesystem exists fs_t = t.get_table("filesystem") if fs_t.empty: From 4d46aab3f4b388c0e04fd9fee15ba509e7bac4d8 Mon Sep 17 00:00:00 2001 From: Jesus Pulido Date: Fri, 9 Jan 2026 14:22:01 -0700 Subject: [PATCH 04/16] added debug info in verbose --- dsi/core.py | 29 +++++++++++++++++++++++------ 1 file changed, 23 insertions(+), 6 deletions(-) diff --git a/dsi/core.py b/dsi/core.py index 17af7a68..a93206cc 100644 --- a/dsi/core.py +++ b/dsi/core.py @@ -14,6 +14,7 @@ import tarfile import subprocess import uuid +import time from contextlib import redirect_stdout class Terminal(): @@ -1419,7 +1420,10 @@ def index(self, local_loc, remote_loc, isVerbose=False): if isVerbose: print("loc: "+local_loc+ " rem: "+remote_loc) # Data Crawl and gather metadata of local location - file_list = self.dircrawl(local_loc) + file_list = self.dircrawl(local_loc, isVerbose) + + if isVerbose: + print("Collected "+str(len(file_list))+" files.") self.remote_location = remote_loc self.local_location = local_loc @@ -1473,6 +1477,8 @@ def index(self, local_loc, remote_loc, isVerbose=False): # Future: iterate through remote/server list here, for now::: remote_list = [ os.path.join(remote_loc,self.project_name) ] for remote in remote_list: + if isVerbose: + print(f"Testing access to '{remote}' directory.") try: # Try for file permissions if os.path.exists(remote): # Check if exists print(f"The directory '{remote}' already exists remotely.") @@ -1538,6 +1544,9 @@ def index(self, local_loc, remote_loc, isVerbose=False): self.file_list = file_list self.rfile_list = rfile_list + if isVerbose: + print("DSI Index complete!") + def move(self, tool="copy", isVerbose=False, **kwargs): self.copy(tool,isVerbose,kwargs) @@ -1728,7 +1737,7 @@ def copy(self, tool="copy", isVerbose=False, **kwargs): raise TypeError(f"Data movement format not supported:, Type: {tool}") - def dircrawl(self,filepath): + def dircrawl(self,filepath, verbose=False): """ Crawls the root 'filepath' directory and returns files @@ -1736,17 +1745,25 @@ def dircrawl(self,filepath): `return`: returns crawled file-list """ + start_time = time.perf_counter() + file_list = [] for root, dirs, files in os.walk(filepath): #if os.path.basename(filepath) != 'tmp': # Lets skip some files # continue + if verbose: + print(f"Crawling directory: {root}") + print(f" Found {len(files)} files, {len(dirs)} subdirectories") for f in files: # Appent root-level files file_list.append(os.path.join(root, f)) - for d in dirs: # Recursively dive into directories - sub_list = self.dircrawl(os.path.join(root, d)) - for sf in sub_list: - file_list.append(sf) + + elapsed = time.perf_counter() - start_time + + if verbose: + print(f"\nFinished crawling: {filepath}") + print(f"Total files found: {len(file_list)}") + print(f"Runtime: {elapsed:.2f} seconds") return file_list From 67762c3fbdf1fba20d1180383da9dfaa9d82c185 Mon Sep 17 00:00:00 2001 From: Jesus Pulido Date: Tue, 13 Jan 2026 12:22:17 -0700 Subject: [PATCH 05/16] added more verbose debug text --- dsi/core.py | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/dsi/core.py b/dsi/core.py index a93206cc..202460e1 100644 --- a/dsi/core.py +++ b/dsi/core.py @@ -1423,7 +1423,7 @@ def index(self, local_loc, remote_loc, isVerbose=False): file_list = self.dircrawl(local_loc, isVerbose) if isVerbose: - print("Collected "+str(len(file_list))+" files.") + print("Crawled "+str(len(file_list))+" files.") self.remote_location = remote_loc self.local_location = local_loc @@ -1449,6 +1449,8 @@ def index(self, local_loc, remote_loc, isVerbose=False): st_dict['uuid'] = [] st_dict['file_remote'] = [] + if isVerbose: + print("Collection object [", end="") for file in file_list: rel_file = os.path.relpath(file,local_loc) #rel path #utils.isgroupreadable(file) # quick test @@ -1471,7 +1473,15 @@ def index(self, local_loc, remote_loc, isVerbose=False): st_dict['uuid'].append(self.gen_uuid(st)) st_dict['file_remote'].append(rfilepath) st_list.append(st) + if isVerbose: + last = -10 + progress = int(len(st_list) / len(file_list) * 100) + if progress % 10 == 0 and progress != last: + print(".", end="") + last = progress + if isVerbose: + print("] Collection object created.") # Test remote location validity, try to check access # Future: iterate through remote/server list here, for now::: @@ -1501,7 +1511,7 @@ def index(self, local_loc, remote_loc, isVerbose=False): #f = os.path.join((local_loc, str(self.project_name+".db") )) #f = local_loc+"/"+self.project_name+".db" if isVerbose: - print("trying db: ", f) + print("Trying to open db: ", f) assert os.path.exists(f) # Detect to see which reader we should use @@ -1538,6 +1548,12 @@ def index(self, local_loc, remote_loc, isVerbose=False): with redirect_stdout(fnull): t.load_module('plugin', "Dict", "reader", collection=st_dict, table_name="filesystem") t.artifact_handler(interaction_type='ingest') + else: + # Do nothing for now to prevent a re-index, + if isVerbose: + print("Error: Filesystem table already exists! DSI Index skipped.") + return + t.close() From 05ba1beb5113c94a69a5eed0c1cca71461761293 Mon Sep 17 00:00:00 2001 From: Jesus Pulido Date: Thu, 15 Jan 2026 10:50:24 -0700 Subject: [PATCH 06/16] dircrawl2 and more verbosity added --- dsi/core.py | 50 +++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 43 insertions(+), 7 deletions(-) diff --git a/dsi/core.py b/dsi/core.py index 202460e1..9147fe7f 100644 --- a/dsi/core.py +++ b/dsi/core.py @@ -15,6 +15,8 @@ import subprocess import uuid import time +import itertools +from typing import Iterator from contextlib import redirect_stdout class Terminal(): @@ -1419,11 +1421,17 @@ def index(self, local_loc, remote_loc, isVerbose=False): """ if isVerbose: print("loc: "+local_loc+ " rem: "+remote_loc) + + # Warn about relative paths (..) may not work + # Data Crawl and gather metadata of local location - file_list = self.dircrawl(local_loc, isVerbose) + file_list = self.dircrawl2(local_loc, isVerbose) + if isVerbose: - print("Crawled "+str(len(file_list))+" files.") + file_list, tmp = itertools.tee(file_list) + file_len=sum(1 for _ in tmp) + print("Crawled "+str(file_len)+" files.") self.remote_location = remote_loc self.local_location = local_loc @@ -1451,6 +1459,8 @@ def index(self, local_loc, remote_loc, isVerbose=False): if isVerbose: print("Collection object [", end="") + last = -10 + for file in file_list: rel_file = os.path.relpath(file,local_loc) #rel path #utils.isgroupreadable(file) # quick test @@ -1474,14 +1484,13 @@ def index(self, local_loc, remote_loc, isVerbose=False): st_dict['file_remote'].append(rfilepath) st_list.append(st) if isVerbose: - last = -10 - progress = int(len(st_list) / len(file_list) * 100) - if progress % 10 == 0 and progress != last: + progress = int(len(st_list) / file_len * 100) + if progress % 5 == 0 and progress != last: print(".", end="") last = progress if isVerbose: - print("] Collection object created.") + print(f"] Collection object created with {len(st_list)} entries.") # Test remote location validity, try to check access # Future: iterate through remote/server list here, for now::: @@ -1520,11 +1529,16 @@ def index(self, local_loc, remote_loc, isVerbose=False): header_bytes = dbfile.read(16) if header_bytes == b'SQLite format 3\x00': isSQLite=True + if isVerbose: + print("Detected SQLiteDB.") + # Detect duckdb dbfile.seek(0) full_header = dbfile.read(12) if full_header[8:12] == b'DUCK': isDuckDB=True + if isVerbose: + print("Detected DuckDB.") except Exception as err: print(f"Database {f} not found: {err}") raise @@ -1782,7 +1796,29 @@ def dircrawl(self,filepath, verbose=False): print(f"Runtime: {elapsed:.2f} seconds") return file_list - + + def dircrawl2(self, root: str, verbose: bool = False) -> Iterator[str]: + start = time.perf_counter() + + # iterative stack avoids deep recursion limits + stack = [root] + while stack: + path = stack.pop() + try: + with os.scandir(path) as it: + for entry in it: + # follow_symlinks=False avoids surprises and extra stat calls + if entry.is_dir(follow_symlinks=False): + stack.append(entry.path) + elif entry.is_file(follow_symlinks=False): + yield entry.path + except (PermissionError, FileNotFoundError): + # permissions/races happen a lot at scale + continue + + if verbose: + print(f"Runtime: {time.perf_counter() - start:.2f} seconds") + def get(self, project_name = "Project"): ''' Helper function that searches remote location based on project name, and retrieves From 6dc3e21b344c3d0e7956ea891b97a68cdd146fb3 Mon Sep 17 00:00:00 2001 From: Jesus Pulido Date: Thu, 15 Jan 2026 10:57:15 -0700 Subject: [PATCH 07/16] added verbose text --- dsi/core.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/dsi/core.py b/dsi/core.py index 9147fe7f..8467d290 100644 --- a/dsi/core.py +++ b/dsi/core.py @@ -1797,11 +1797,11 @@ def dircrawl(self,filepath, verbose=False): return file_list - def dircrawl2(self, root: str, verbose: bool = False) -> Iterator[str]: + def dircrawl2(self, filepath: str, verbose: bool = False) -> Iterator[str]: start = time.perf_counter() # iterative stack avoids deep recursion limits - stack = [root] + stack = [filepath] while stack: path = stack.pop() try: @@ -1817,6 +1817,7 @@ def dircrawl2(self, root: str, verbose: bool = False) -> Iterator[str]: continue if verbose: + print(f"\nFinished crawling: {filepath}") print(f"Runtime: {time.perf_counter() - start:.2f} seconds") def get(self, project_name = "Project"): From e5057f5b1cc1b54aaa7f3ca9bd41401cd02142a4 Mon Sep 17 00:00:00 2001 From: Jesus Pulido Date: Thu, 15 Jan 2026 16:03:43 -0700 Subject: [PATCH 08/16] added more prints --- dsi/core.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/dsi/core.py b/dsi/core.py index 8467d290..a54dbc76 100644 --- a/dsi/core.py +++ b/dsi/core.py @@ -1485,7 +1485,8 @@ def index(self, local_loc, remote_loc, isVerbose=False): st_list.append(st) if isVerbose: progress = int(len(st_list) / file_len * 100) - if progress % 5 == 0 and progress != last: + # Print progress bar every 2% + if progress % 2 == 0 and progress != last: print(".", end="") last = progress @@ -1809,6 +1810,8 @@ def dircrawl2(self, filepath: str, verbose: bool = False) -> Iterator[str]: for entry in it: # follow_symlinks=False avoids surprises and extra stat calls if entry.is_dir(follow_symlinks=False): + if verbose: + print(f"Crawling directory: {entry.path}") stack.append(entry.path) elif entry.is_file(follow_symlinks=False): yield entry.path From 0cb3f1df260315aadb8130ad3dbe013ab038aefe Mon Sep 17 00:00:00 2001 From: Jesus Pulido Date: Tue, 20 Jan 2026 15:08:54 -0700 Subject: [PATCH 09/16] disabled duckdb for test --- dsi/core.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dsi/core.py b/dsi/core.py index a54dbc76..0557906c 100644 --- a/dsi/core.py +++ b/dsi/core.py @@ -1549,7 +1549,8 @@ def index(self, local_loc, remote_loc, isVerbose=False): if isSQLite: t.load_module('backend','DuckDB','back-write', filename=f) elif isDuckDB: - t.load_module('backend','DuckDB','back-write', filename=f) + assert True, "Duckdb indexing support not available for dsi." + #t.load_module('backend','DuckDB','back-write', filename=f) else: assert True, "Unsupported Database type!" From 9139422a85f772d130807f98ecc1ef2ba3b4410d Mon Sep 17 00:00:00 2001 From: Vedant Iyer Date: Thu, 22 Jan 2026 14:34:13 -0700 Subject: [PATCH 10/16] Update core.py --- dsi/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dsi/core.py b/dsi/core.py index 0557906c..d75a6330 100644 --- a/dsi/core.py +++ b/dsi/core.py @@ -1547,7 +1547,7 @@ def index(self, local_loc, remote_loc, isVerbose=False): fnull = open(os.devnull, 'w') with redirect_stdout(fnull): if isSQLite: - t.load_module('backend','DuckDB','back-write', filename=f) + t.load_module('backend','Sqlite','back-write', filename=f) elif isDuckDB: assert True, "Duckdb indexing support not available for dsi." #t.load_module('backend','DuckDB','back-write', filename=f) From a863ca7e50bc2bfe5d74173d01d9efe2235accc8 Mon Sep 17 00:00:00 2001 From: Jesus Pulido Date: Thu, 22 Jan 2026 15:19:27 -0700 Subject: [PATCH 11/16] reenabled duckdb support --- dsi/core.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/dsi/core.py b/dsi/core.py index d75a6330..75bbf556 100644 --- a/dsi/core.py +++ b/dsi/core.py @@ -1549,8 +1549,7 @@ def index(self, local_loc, remote_loc, isVerbose=False): if isSQLite: t.load_module('backend','Sqlite','back-write', filename=f) elif isDuckDB: - assert True, "Duckdb indexing support not available for dsi." - #t.load_module('backend','DuckDB','back-write', filename=f) + t.load_module('backend','DuckDB','back-write', filename=f) else: assert True, "Unsupported Database type!" From 7e957c36d841f87fce1b5c1853a0c599c4b93b7e Mon Sep 17 00:00:00 2001 From: Jesus Pulido Date: Thu, 22 Jan 2026 15:43:56 -0700 Subject: [PATCH 12/16] added conduit checks before moving data --- dsi/core.py | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/dsi/core.py b/dsi/core.py index 75bbf556..0e6502dc 100644 --- a/dsi/core.py +++ b/dsi/core.py @@ -1669,6 +1669,33 @@ def copy(self, tool="copy", isVerbose=False, **kwargs): # Data movement via Conduit env = os.environ.copy() + # Test Kerberos + if isVerbose: + print( "Testing: klist") + cmd = ['klist'] + process = subprocess.Popen(cmd, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, encoding='latin-1') + stdout, stderr = process.communicate() + returncode = process.returncode + + if "No credentials" in stdout: + print("Kerberos authentication error: No credentials found. Please type 'conduit get' to reissue a ticket.") + assert True, print("Kerberos message: " + str(stdout)) + + # Test Conduit status + if isVerbose: + print( "Testing Conduit: conduit get") + cmd = ['/usr/projects/systems/conduit/bin/conduit-cmd','--config','/usr/projects/systems/conduit/conf/conduit-cmd-config.yaml','get'] + process = subprocess.Popen(cmd, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, encoding='latin-1') + stdout, stderr = process.communicate() + returncode = process.returncode + + if "TRANSFER_ID" in stdout: + if isVerbose: + print("Testing Conduit: conduit is authenticated.") + else: + assert True, print("Conduit Error: " + str(stdout)) + + # Check remote access for permissions and create folder if not os.path.exists(self.remote_location): if isVerbose: print( " mkdir " + self.remote_location) From 5b4dcb1b97f1e266db51398b10e57d90a7aade07 Mon Sep 17 00:00:00 2001 From: Vedant Iyer Date: Thu, 22 Jan 2026 18:49:37 -0700 Subject: [PATCH 13/16] Update core.py --- dsi/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dsi/core.py b/dsi/core.py index 0e6502dc..523680b6 100644 --- a/dsi/core.py +++ b/dsi/core.py @@ -1617,7 +1617,7 @@ def copy(self, tool="copy", isVerbose=False, **kwargs): fnull = open(os.devnull, 'w') with redirect_stdout(fnull): if isSQLite: - t.load_module('backend','DuckDB','back-write', filename=f) + t.load_module('backend','Sqlite','back-write', filename=f) elif isDuckDB: t.load_module('backend','DuckDB','back-write', filename=f) else: From 4a2ae1087e12825401689ba95b12dfc5136e9990 Mon Sep 17 00:00:00 2001 From: Vedant Iyer Date: Fri, 23 Jan 2026 18:55:39 -0500 Subject: [PATCH 14/16] fixed sqlite/duckdb col type assignment error --- dsi/backends/duckdb.py | 19 ++++++++++--------- dsi/backends/sqlite.py | 19 ++++++++++--------- 2 files changed, 20 insertions(+), 18 deletions(-) diff --git a/dsi/backends/duckdb.py b/dsi/backends/duckdb.py index 83241b9c..40d9ec51 100644 --- a/dsi/backends/duckdb.py +++ b/dsi/backends/duckdb.py @@ -75,13 +75,13 @@ def sql_type(self, input_list): if all(isinstance(x, int) for x in input_list if x is not None): if any(x < DUCKDB_BIGINT_MIN or x > DUCKDB_BIGINT_MAX for x in input_list if x is not None): - return " DOUBLE" + return " DOUBLE", [float(x) for x in input_list] elif any(x < DUCKDB_INT_MIN or x > DUCKDB_INT_MAX for x in input_list if x is not None): - return " BIGINT" - return " INTEGER" + return " BIGINT", input_list + return " INTEGER", input_list elif all(isinstance(x, float) for x in input_list if x is not None): - return " DOUBLE" - return " VARCHAR" + return " DOUBLE", input_list + return " VARCHAR", [str(x) for x in input_list] def duckdb_compatible_name(self, name): if (name.startswith('"') and name.endswith('"')) or (name.lower() not in self.duckdb_keywords and name.isidentifier()): @@ -119,7 +119,7 @@ def ingest_table_helper(self, types, foreign_query = None, isVerbose=False): for col in diff_cols: if col.lower() in [c.lower() for c in query_cols]: return (ValueError, "Cannot have duplicate column names") - temp_name = col + self.sql_type(types.properties[col]) + temp_name = col + self.sql_type(types.properties[col])[0] try: self.cur.execute(f"ALTER TABLE {types.name} ADD COLUMN {temp_name};") except duckdb.Error as e: @@ -269,12 +269,13 @@ def ingest_artifacts(self, collection, isVerbose=False): primaryTuple = artifacts[dsi_name]['primary_key'][foreignIndex] foreign_query += f", FOREIGN KEY ({sql_key}) REFERENCES {primaryTuple[0]} ({primaryTuple[1]})" - types.properties[sql_key] = tableData[key] + col_type, col_list = self.sql_type(tableData[key]) + types.properties[sql_key] = col_list if dsi_name in artifacts.keys() and comboTuple in artifacts[dsi_name]["primary_key"]: - types.unit_keys.append(sql_key + self.sql_type(tableData[key]) + " PRIMARY KEY") + types.unit_keys.append(sql_key + col_type + " PRIMARY KEY") else: - types.unit_keys.append(sql_key + self.sql_type(tableData[key])) + types.unit_keys.append(sql_key + col_type) error = self.ingest_table_helper(types, foreign_query) if error is not None: diff --git a/dsi/backends/sqlite.py b/dsi/backends/sqlite.py index 5872fe75..972a92e0 100644 --- a/dsi/backends/sqlite.py +++ b/dsi/backends/sqlite.py @@ -88,11 +88,11 @@ def sql_type(self, input_list): if all(isinstance(x, int) for x in input_list if x is not None): if any(x < SQLITE_INT_MIN or x > SQLITE_INT_MAX for x in input_list if x is not None): - return " FLOAT" - return " INTEGER" + return " FLOAT", [float(x) for x in input_list] + return " INTEGER", input_list elif all(isinstance(x, float) for x in input_list if x is not None): - return " FLOAT" - return " VARCHAR" + return " FLOAT", input_list + return " VARCHAR", [str(x) for x in input_list] def sqlite_compatible_name(self, name): if (name.startswith('"') and name.endswith('"')) or (name.upper() not in self.sqlite_keywords and name.isidentifier()): @@ -127,7 +127,7 @@ def ingest_table_helper(self, types, foreign_query = None, isVerbose=False): for col in diff_cols: if col.lower() in [c.lower() for c in query_cols]: return (ValueError, "Cannot have duplicate column names") - temp_name = col + self.sql_type(types.properties[col]) + temp_name = col + self.sql_type(types.properties[col])[0] try: self.cur.execute(f"ALTER TABLE {types.name} ADD COLUMN {temp_name};") except sqlite3.Error as e: @@ -287,12 +287,13 @@ def ingest_artifacts(self, collection, isVerbose=False): primaryTuple = artifacts[dsi_name]['primary_key'][foreignIndex] foreign_query += f", FOREIGN KEY ({sql_key}) REFERENCES {primaryTuple[0]} ({primaryTuple[1]})" - types.properties[sql_key] = tableData[key] - + col_type, col_list = self.sql_type(tableData[key]) + types.properties[sql_key] = col_list + if dsi_name in artifacts.keys() and comboTuple in artifacts[dsi_name]["primary_key"]: - types.unit_keys.append(sql_key + self.sql_type(tableData[key]) + " PRIMARY KEY") + types.unit_keys.append(sql_key + col_type + " PRIMARY KEY") else: - types.unit_keys.append(sql_key + self.sql_type(tableData[key])) + types.unit_keys.append(sql_key + col_type) error = self.ingest_table_helper(types, foreign_query) if error is not None: From ff892fb4309d5d29658bd9f7174bcb5ca7fc7b42 Mon Sep 17 00:00:00 2001 From: Vedant Iyer Date: Tue, 27 Jan 2026 15:56:55 -0500 Subject: [PATCH 15/16] merged main into issue208 --- dsi/backends/duckdb.py | 110 ++++++++++++++++++++++++++++++++++++++++- dsi/backends/sqlite.py | 4 +- 2 files changed, 110 insertions(+), 4 deletions(-) diff --git a/dsi/backends/duckdb.py b/dsi/backends/duckdb.py index 40d9ec51..1bd8248b 100644 --- a/dsi/backends/duckdb.py +++ b/dsi/backends/duckdb.py @@ -56,6 +56,88 @@ def __init__(self, filename): filtered_df = keywords_df[keywords_df['keyword_category'] != 'unreserved'] self.duckdb_keywords = filtered_df["keyword_name"].tolist() + # def sql_type_helper(self, input_list, recursive = True): + # """ + # **Internal use only. Do not call** + + # Helper function that evaluates a list for ints, floats and strings. Can be called recursively in sql_type() + + # `input_list` : list + # A list of values to analyze for type compatibility. + + # `recursive` : bool, default=True + # Boolean indicating whether the return object is just the column type string or is a tuple + + # `return`: str or (str, list) + # If a string, it represents the inferred DuckDB data type for the input list. + # If a tuple, it is the (DuckDB data type for the input list, input list with any type changes) + # """ + # DUCKDB_BIGINT_MIN = -9223372036854775808 + # DUCKDB_BIGINT_MAX = 9223372036854775807 + # DUCKDB_INT_MIN = -2147483648 + # DUCKDB_INT_MAX = 2147483647 + + # if all(isinstance(x, int) for x in input_list if x is not None): + # if any(x < DUCKDB_BIGINT_MIN or x > DUCKDB_BIGINT_MAX for x in input_list if x is not None): + # if recursive: + # return " DOUBLE" + # return " DOUBLE", [float(x) if x is not None else x for x in input_list] + # elif any(x < DUCKDB_INT_MIN or x > DUCKDB_INT_MAX for x in input_list if x is not None): + # if recursive: + # return " BIGINT" + # return " BIGINT", input_list + # if recursive: + # return " INTEGER" + # return " INTEGER", input_list + # elif all(isinstance(x, float) for x in input_list if x is not None): + # if recursive: + # return " DOUBLE" + # return " DOUBLE", input_list + # if recursive: + # return " VARCHAR" + # return " VARCHAR", [str(x) if x is not None else x for x in input_list] + + # def sql_type(self, input_list): + # """ + # **Internal use only. Do not call** + + # Evaluates a list and returns the predicted compatible DuckDB Type + + # `input_list` : list + # A list of values to analyze for type compatibility. + + # `return`: str + # A string representing the inferred DuckDB data type for the input list. + # """ + # if all(isinstance(x, dict) for x in input_list if x is not None): + # # Find the superset of all keys in the list of dicts. + # all_keys = set() + # for x in input_list: + # all_keys = all_keys | set(x.keys()) + + # # Recursively find types of each field in the dict. + # type_list = "" + # for parent_key, child_key in all_keys: + # col_type, col_list = self.sql_type(input_list=input_list[parent_key][child_key], recursive=False) + # input_list[parent_key][child_key] = col_list + # type_list += f"{child_key} {col_type}, " + + # # Return STRUCT type with the column types found above. + # return f" STRUCT({type_list[:-2]})", input_list + # elif all(isinstance(x, list) for x in input_list if x is not None): + # # Find type of list elements by recursively calling this self.sql_type(). + # type_list = [self.sql_type_helper(input_list=l, recursive=True) for l in input_list if l is not None] + + # # If all types are the same (i.e., list is homogeneous), add to table as a list. + # # Otherwise, fallback on VARCHAR. + # if all(t == type_list[0] for t in type_list): + # return f"{type_list[0]}[]", input_list + # else: + # return " VARCHAR", [str(x) if x is not None else x for x in input_list] + # else: + # return self.sql_type_helper(input_list, recursive=False) + + def sql_type(self, input_list): """ **Internal use only. Do not call** @@ -75,13 +157,37 @@ def sql_type(self, input_list): if all(isinstance(x, int) for x in input_list if x is not None): if any(x < DUCKDB_BIGINT_MIN or x > DUCKDB_BIGINT_MAX for x in input_list if x is not None): - return " DOUBLE", [float(x) for x in input_list] + return " DOUBLE", [float(x) if x is not None else x for x in input_list] elif any(x < DUCKDB_INT_MIN or x > DUCKDB_INT_MAX for x in input_list if x is not None): return " BIGINT", input_list return " INTEGER", input_list elif all(isinstance(x, float) for x in input_list if x is not None): return " DOUBLE", input_list - return " VARCHAR", [str(x) for x in input_list] + elif all(isinstance(x, dict) for x in input_list if x is not None): + # Find the superset of all keys in the list of dicts. + all_keys = set() + for x in input_list: + all_keys = all_keys | set(x.keys()) + + # Recursively find types of each field in the dict. + type_list = "" + for k in all_keys: + t = self.sql_type([l[k] for l in input_list]) + type_list += f"{k} {t}, " + + # Return STRUCT type with the column types found above. + return f" STRUCT({type_list[:-2]})", input_list + elif all(isinstance(x, list) for x in input_list if x is not None): + # Find type of list elements by recursively calling this self.sql_type(). + type_list = [self.sql_type(input_list=l) for l in input_list] + + # If all types are the same (i.e., list is homogeneous), add to table as a list. + # Otherwise, fallback on VARCHAR. + if all(t == type_list[0] for t in type_list): + return f"{type_list[0]}[]", input_list + else: + return " VARCHAR", [str(x) if x is not None else x for x in input_list] + return " VARCHAR", [str(x) if x is not None else x for x in input_list] def duckdb_compatible_name(self, name): if (name.startswith('"') and name.endswith('"')) or (name.lower() not in self.duckdb_keywords and name.isidentifier()): diff --git a/dsi/backends/sqlite.py b/dsi/backends/sqlite.py index 972a92e0..a051775f 100644 --- a/dsi/backends/sqlite.py +++ b/dsi/backends/sqlite.py @@ -88,11 +88,11 @@ def sql_type(self, input_list): if all(isinstance(x, int) for x in input_list if x is not None): if any(x < SQLITE_INT_MIN or x > SQLITE_INT_MAX for x in input_list if x is not None): - return " FLOAT", [float(x) for x in input_list] + return " FLOAT", [float(x) if x is not None else x for x in input_list] return " INTEGER", input_list elif all(isinstance(x, float) for x in input_list if x is not None): return " FLOAT", input_list - return " VARCHAR", [str(x) for x in input_list] + return " VARCHAR", [str(x) if x is not None else x for x in input_list] def sqlite_compatible_name(self, name): if (name.startswith('"') and name.endswith('"')) or (name.upper() not in self.sqlite_keywords and name.isidentifier()): From ed27b1aadfe8dd6b37079548efddae9e38412610 Mon Sep 17 00:00:00 2001 From: Jesus Pulido Date: Tue, 27 Jan 2026 14:34:41 -0700 Subject: [PATCH 16/16] updated user example, added move example --- examples/user/10.move.py | 11 +++++++++++ examples/user/2a.read.py | 2 +- 2 files changed, 12 insertions(+), 1 deletion(-) create mode 100644 examples/user/10.move.py diff --git a/examples/user/10.move.py b/examples/user/10.move.py new file mode 100644 index 00000000..fb3f25ea --- /dev/null +++ b/examples/user/10.move.py @@ -0,0 +1,11 @@ +from dsi.core import Sync + +# Origin location of data +local_files = "/Users/Shared/dsi/examples/clover3d/" +# Remote Location where database and data will be moved +remote_path = "/Users/Shared/staging/" + +# Create Sync type with project database name +s = Sync("clover3d") +s.index(local_files,remote_path,True) +s.copy("cp",True) \ No newline at end of file diff --git a/examples/user/2a.read.py b/examples/user/2a.read.py index b2760dca..c67ab933 100644 --- a/examples/user/2a.read.py +++ b/examples/user/2a.read.py @@ -1,7 +1,7 @@ # examples/user/2a.read.py from dsi.dsi import DSI -read_dsi = DSI("data.db") # Target a backend, defaults to SQLite if not defined +read_dsi = DSI("clover3d.db") # Target a backend, defaults to SQLite if not defined #dsi.read(path, reader) read_dsi.read("../clover3d/", 'Cloverleaf') # Read data into memory