Skip to content
This repository was archived by the owner on Aug 19, 2024. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,7 @@ docs/_build/

# PyBuilder
target/


#ide files
.vscode/
161 changes: 50 additions & 111 deletions datatables/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,15 @@
import re
import inspect


BOOLEAN_FIELDS = (
"search.regex", "searchable", "orderable", "regex"
)


DataColumn = namedtuple("DataColumn", ("name", "model_name", "filter"))


class DataTablesError(ValueError):
pass


class DataTable(object):
def __init__(self, params, model, query, columns):
self.params = params
Expand All @@ -27,97 +23,77 @@ def __init__(self, params, model, query, columns):
self.column_search_func = lambda mc, qs, s: qs

for col in columns:
name, model_name, filter_func = None, None, None

if isinstance(col, DataColumn):
self.columns.append(col)
continue
elif isinstance(col, tuple):
# col is either 1. (name, model_name), 2. (name, filter) or 3. (name, model_name, filter)
if len(col) == 3:
name, model_name, filter_func = col
elif len(col) == 2:
# Work out the second argument. If it is a function then it's type 2, else it is type 1.
if callable(col[1]):
name, filter_func = col
model_name = name
else:
name, model_name = col
name, model_name = col if not callable(col[1]) else (col[0], col[0], col[1])
else:
raise ValueError("Columns must be a tuple of 2 to 3 elements")
d = DataColumn(name=name, model_name=model_name, filter=filter_func)
self.columns.append(d)
else:
# It's just a string
name, model_name = col, col
d = DataColumn(name=name, model_name=model_name, filter=None)
self.columns.append(d)

d = DataColumn(name=name, model_name=model_name, filter=filter_func)
self.columns.append(d)
self.columns_dict[d.name] = d

self.join_related_models()

def join_related_models(self):
for column in (col for col in self.columns if "." in col.model_name):
self.query = self.query.join(column.model_name.split(".")[0], aliased=True)

def query_into_dict(self, key_start):
returner = defaultdict(dict)

# Matches columns[number][key] with an [optional_value] on the end
pattern = "{}(?:\[(\d+)\])?\[(\w+)\](?:\[(\w+)\])?".format(key_start)

columns = (param for param in self.params if re.match(pattern, param))

for param in columns:

column_id, key, optional_subkey = re.search(pattern, param).groups()

if column_id is None:
returner[key] = self.coerce_value(key, self.params[param])
elif optional_subkey is None:
returner[int(column_id)][key] = self.coerce_value(key, self.params[param])
else:
# Oh baby a triple
subdict = returner[int(column_id)].setdefault(key, {})
subdict[optional_subkey] = self.coerce_value("{}.{}".format(key, optional_subkey),
self.params[param])

pattern = re.compile(f"{key_start}(?:\[(\d+)\])?\[(\w+)\](?:\[(\w+)\])?")

for param, value in self.params.items():
match = pattern.match(param)
if match:
column_id, key, optional_subkey = match.groups()
if column_id is None:
returner[key] = self.coerce_value(key, value)
elif optional_subkey is None:
returner[int(column_id)][key] = self.coerce_value(key, value)
else:
subdict = returner[int(column_id)].setdefault(key, {})
subdict[optional_subkey] = self.coerce_value(f"{key}.{optional_subkey}", value)

return dict(returner)

@staticmethod
def coerce_value(key, value):
if key in BOOLEAN_FIELDS:
return value == "true"
try:
return int(value)
except ValueError:
if key in BOOLEAN_FIELDS:
return value == "true"

return value
return value

def get_integer_param(self, param_name):
if param_name not in self.params:
raise DataTablesError("Parameter {} is missing".format(param_name))

try:
return int(self.params[param_name])
except ValueError:
raise DataTablesError("Parameter {} is invalid".format(param_name))
except (KeyError, ValueError):
raise DataTablesError(f"Parameter {param_name} is missing or invalid")

def add_data(self, **kwargs):
self.data.update(**kwargs)
self.data.update(kwargs)

def json(self):
try:
return self._json()
except DataTablesError as e:
return {
"error": str(e)
}
return {"error": str(e)}

def get_column(self, column):
if "." in column.model_name:
column_path = column.model_name.split(".")
relationship = getattr(self.model, column_path[0])
model_column = getattr(relationship.property.mapper.entity, column_path[1])
else:
model_column = getattr(self.model, column.model_name)

column_path = column.model_name.split(".")
model_column = getattr(self.model, column_path[0])
for path_part in column_path[1:]:
model_column = getattr(model_column.property.mapper.entity, path_part)
return model_column

def searchable(self, func):
Expand All @@ -138,43 +114,22 @@ def _json(self):
query = self.query
total_records = query.count()

if callable(self.search_func) and search.get("value", None):
if callable(self.search_func) and search.get("value"):
query = self.search_func(query, search["value"])

for column_data in columns.values():
search_value = column_data["search"]["value"]
if (
not column_data["searchable"]
or not search_value
or not callable(self.column_search_func)
):
continue

column_name = column_data["data"]
column = self.columns_dict[column_name]

model_column = self.get_column(column)

query = self.column_search_func(model_column, query, str(search_value))
if column_data.get("searchable") and search_value and callable(self.column_search_func):
column = self.columns_dict[column_data["data"]]
model_column = self.get_column(column)
query = self.column_search_func(model_column, query, str(search_value))

for order in ordering.values():
direction, column = order["dir"], order["column"]

if column not in columns:
raise DataTablesError("Cannot order {}: column not found".format(column))

if not columns[column]["orderable"]:
continue

column_name = columns[column]["data"]
column = self.columns_dict[column_name]

model_column = self.get_column(column)

if isinstance(model_column, property):
raise DataTablesError("Cannot order by column {} as it is a property".format(column.model_name))

query = query.order_by(model_column.desc() if direction == "desc" else model_column.asc())
column_data = columns.get(order["column"])
if column_data and column_data.get("orderable"):
column = self.columns_dict[column_data["data"]]
model_column = self.get_column(column)
query = query.order_by(model_column.desc() if order["dir"] == "desc" else model_column.asc())

filtered_records = query.count()

Expand All @@ -185,34 +140,18 @@ def _json(self):
"draw": draw,
"recordsTotal": total_records,
"recordsFiltered": filtered_records,
"data": [
self.output_instance(instance) for instance in query.all()
]
"data": [self.output_instance(instance) for instance in query.all()]
}

def output_instance(self, instance):
returner = {
key.name: self.get_value(key, instance) for key in self.columns
}

returner = {key.name: self.get_value(key, instance) for key in self.columns}
if self.data:
returner["DT_RowData"] = {
k: v(instance) for k, v in self.data.items()
}

returner["DT_RowData"] = {k: v(instance) for k, v in self.data.items()}
return returner

def get_value(self, key, instance):
attr = key.model_name
if "." in attr:
tmp_list=attr.split(".")
attr=tmp_list[-1]
for sub in tmp_list[:-1]:
instance = getattr(instance, sub)

if key.filter is not None:
r = key.filter(instance)
else:
r = getattr(instance, attr)

return r() if inspect.isroutine(r) else r
for sub in attr.split(".")[:-1]:
instance = getattr(instance, sub)
value = key.filter(instance) if key.filter else getattr(instance, attr.split(".")[-1])
return value() if inspect.isroutine(value) else value