Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 97 additions & 14 deletions dsi/backends/duckdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,88 @@ def __init__(self, filename):
filtered_df = keywords_df[keywords_df['keyword_category'] != 'unreserved']
self.duckdb_keywords = filtered_df["keyword_name"].tolist()

# def sql_type_helper(self, input_list, recursive = True):
# """
# **Internal use only. Do not call**

# Helper function that evaluates a list for ints, floats and strings. Can be called recursively in sql_type()

# `input_list` : list
# A list of values to analyze for type compatibility.

# `recursive` : bool, default=True
# If True, return only the column type string; if False, return a (column type string, converted input list) tuple.

# `return`: str or (str, list)
# If a string, it represents the inferred DuckDB data type for the input list.
# If a tuple, it is the (DuckDB data type for the input list, input list with any type changes)
# """
# DUCKDB_BIGINT_MIN = -9223372036854775808
# DUCKDB_BIGINT_MAX = 9223372036854775807
# DUCKDB_INT_MIN = -2147483648
# DUCKDB_INT_MAX = 2147483647

# if all(isinstance(x, int) for x in input_list if x is not None):
# if any(x < DUCKDB_BIGINT_MIN or x > DUCKDB_BIGINT_MAX for x in input_list if x is not None):
# if recursive:
# return " DOUBLE"
# return " DOUBLE", [float(x) if x is not None else x for x in input_list]
# elif any(x < DUCKDB_INT_MIN or x > DUCKDB_INT_MAX for x in input_list if x is not None):
# if recursive:
# return " BIGINT"
# return " BIGINT", input_list
# if recursive:
# return " INTEGER"
# return " INTEGER", input_list
# elif all(isinstance(x, float) for x in input_list if x is not None):
# if recursive:
# return " DOUBLE"
# return " DOUBLE", input_list
# if recursive:
# return " VARCHAR"
# return " VARCHAR", [str(x) if x is not None else x for x in input_list]

# def sql_type(self, input_list):
# """
# **Internal use only. Do not call**

# Evaluates a list and returns the predicted compatible DuckDB Type

# `input_list` : list
# A list of values to analyze for type compatibility.

# `return`: (str, list)
# A tuple of (inferred DuckDB data type for the input list, input list with any type changes).
# """
# if all(isinstance(x, dict) for x in input_list if x is not None):
# # Find the superset of all keys in the list of dicts.
# all_keys = set()
# for x in input_list:
# all_keys = all_keys | set(x.keys())

# # Recursively find types of each field in the dict.
# type_list = ""
# for parent_key, child_key in all_keys:
# col_type, col_list = self.sql_type(input_list=input_list[parent_key][child_key], recursive=False)
# input_list[parent_key][child_key] = col_list
# type_list += f"{child_key} {col_type}, "

# # Return STRUCT type with the column types found above.
# return f" STRUCT({type_list[:-2]})", input_list
# elif all(isinstance(x, list) for x in input_list if x is not None):
# # Find the type of each inner list's elements by calling self.sql_type_helper().
# type_list = [self.sql_type_helper(input_list=l, recursive=True) for l in input_list if l is not None]

# # If all types are the same (i.e., the list is homogeneous), add to the table as a list.
# # Otherwise, fall back to VARCHAR.
# if all(t == type_list[0] for t in type_list):
# return f"{type_list[0]}[]", input_list
# else:
# return " VARCHAR", [str(x) if x is not None else x for x in input_list]
# else:
# return self.sql_type_helper(input_list, recursive=False)


def sql_type(self, input_list):
"""
**Internal use only. Do not call**
Expand All @@ -75,12 +157,12 @@ def sql_type(self, input_list):

if all(isinstance(x, int) for x in input_list if x is not None):
if any(x < DUCKDB_BIGINT_MIN or x > DUCKDB_BIGINT_MAX for x in input_list if x is not None):
return " DOUBLE"
return " DOUBLE", [float(x) if x is not None else x for x in input_list]
elif any(x < DUCKDB_INT_MIN or x > DUCKDB_INT_MAX for x in input_list if x is not None):
return " BIGINT"
return " INTEGER"
return " BIGINT", input_list
return " INTEGER", input_list
elif all(isinstance(x, float) for x in input_list if x is not None):
return " DOUBLE"
return " DOUBLE", input_list
elif all(isinstance(x, dict) for x in input_list if x is not None):
# Find the superset of all keys in the list of dicts.
all_keys = set()
Expand All @@ -94,18 +176,18 @@ def sql_type(self, input_list):
type_list += f"{k} {t}, "

# Return STRUCT type with the column types found above.
return f" STRUCT({type_list[:-2]})"
return f" STRUCT({type_list[:-2]})", input_list
elif all(isinstance(x, list) for x in input_list if x is not None):
# Find the type of each inner list's elements by recursively calling self.sql_type().
type_list = [self.sql_type(input_list=l) for l in input_list]

# If all types are the same (i.e., the list is homogeneous), add to the table as a list.
# Otherwise, fall back to VARCHAR.
if all(t == type_list[0] for t in type_list):
return type_list[0] + "[]"
return f"{type_list[0]}[]", input_list
else:
return " VARCHAR"
return " VARCHAR"
return " VARCHAR", [str(x) if x is not None else x for x in input_list]
return " VARCHAR", [str(x) if x is not None else x for x in input_list]

def duckdb_compatible_name(self, name):
if (name.startswith('"') and name.endswith('"')) or (name.lower() not in self.duckdb_keywords and name.isidentifier()):
Expand Down Expand Up @@ -143,7 +225,7 @@ def ingest_table_helper(self, types, foreign_query = None, isVerbose=False):
for col in diff_cols:
if col.lower() in [c.lower() for c in query_cols]:
return (ValueError, "Cannot have duplicate column names")
temp_name = col + self.sql_type(types.properties[col])
temp_name = col + self.sql_type(types.properties[col])[0]
try:
self.cur.execute(f"ALTER TABLE {types.name} ADD COLUMN {temp_name};")
except duckdb.Error as e:
Expand Down Expand Up @@ -295,12 +377,13 @@ def ingest_artifacts(self, collection, isVerbose=False):
primary_col = self.duckdb_compatible_name(re.sub(r'[\r\n]+', ' ', primaryTuple[1].replace('-', '_')))
foreign_query += f", FOREIGN KEY ({sql_key}) REFERENCES {primary_table} ({primary_col})"

types.properties[sql_key] = tableData[key]
col_type, col_list = self.sql_type(tableData[key])
types.properties[sql_key] = col_list

if dsi_name in artifacts.keys() and comboTuple in artifacts[dsi_name]["primary_key"]:
types.unit_keys.append(sql_key + self.sql_type(tableData[key]) + " PRIMARY KEY")
types.unit_keys.append(sql_key + col_type + " PRIMARY KEY")
else:
types.unit_keys.append(sql_key + self.sql_type(tableData[key]))
types.unit_keys.append(sql_key + col_type)

error = self.ingest_table_helper(types, foreign_query)
if error is not None:
Expand Down Expand Up @@ -349,10 +432,10 @@ def ingest_artifacts(self, collection, isVerbose=False):

try:
self.cur.execute("COMMIT")
self.cur.execute("CHECKPOINT")
self.cur.execute("FORCE CHECKPOINT")
except duckdb.Error as e:
self.cur.execute("ROLLBACK")
self.cur.execute("CHECKPOINT")
self.cur.execute("FORCE CHECKPOINT")
return (duckdb.Error, e)


Expand Down
19 changes: 10 additions & 9 deletions dsi/backends/sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,11 @@ def sql_type(self, input_list):

if all(isinstance(x, int) for x in input_list if x is not None):
if any(x < SQLITE_INT_MIN or x > SQLITE_INT_MAX for x in input_list if x is not None):
return " FLOAT"
return " INTEGER"
return " FLOAT", [float(x) if x is not None else x for x in input_list]
return " INTEGER", input_list
elif all(isinstance(x, float) for x in input_list if x is not None):
return " FLOAT"
return " VARCHAR"
return " FLOAT", input_list
return " VARCHAR", [str(x) if x is not None else x for x in input_list]

def sqlite_compatible_name(self, name):
if (name.startswith('"') and name.endswith('"')) or (name.upper() not in self.sqlite_keywords and name.isidentifier()):
Expand Down Expand Up @@ -127,7 +127,7 @@ def ingest_table_helper(self, types, foreign_query = None, isVerbose=False):
for col in diff_cols:
if col.lower() in [c.lower() for c in query_cols]:
return (ValueError, "Cannot have duplicate column names")
temp_name = col + self.sql_type(types.properties[col])
temp_name = col + self.sql_type(types.properties[col])[0]
try:
self.cur.execute(f"ALTER TABLE {types.name} ADD COLUMN {temp_name};")
except sqlite3.Error as e:
Expand Down Expand Up @@ -289,12 +289,13 @@ def ingest_artifacts(self, collection, isVerbose=False):
primary_col = self.sqlite_compatible_name(re.sub(r'[\r\n]+', ' ', primaryTuple[1].replace('-', '_')))
foreign_query += f", FOREIGN KEY ({sql_key}) REFERENCES {primary_table} ({primary_col})"

types.properties[sql_key] = tableData[key]

col_type, col_list = self.sql_type(tableData[key])
types.properties[sql_key] = col_list

if dsi_name in artifacts.keys() and comboTuple in artifacts[dsi_name]["primary_key"]:
types.unit_keys.append(sql_key + self.sql_type(tableData[key]) + " PRIMARY KEY")
types.unit_keys.append(sql_key + col_type + " PRIMARY KEY")
else:
types.unit_keys.append(sql_key + self.sql_type(tableData[key]))
types.unit_keys.append(sql_key + col_type)

error = self.ingest_table_helper(types, foreign_query)
if error is not None:
Expand Down
Loading