Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"python.analysis.extraPaths": [
"./banyan-python",
]
}
3 changes: 3 additions & 0 deletions banyan-polars/banyan_polars/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
__version__ = "0.1.0"

from .api.io import read_csv
File renamed without changes.
75 changes: 75 additions & 0 deletions banyan-polars/banyan_polars/api/df.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import banyan as bn
import polars as pl
from typing_extensions import Self

from ..communication.lazy_aggregation import LazyAggregation
from .utils_constants import AGGREGATION_FUNCTIONS

# class GroupBy:
# def __init__(self, fut: bn.Future):
# self.future = fut

# def __future__(self) -> bn.Future:
# return self.future

# def agg(self, cols):
# return bn.record_task(
# "res",
# LazyAggregation,
# [self, pl.internals.dataframe.groupby.GroupBy.agg, ],
# ["Blocked", "Consolidated", "Grouped"],
# )


def is_aggregation(expr) -> bool:
    """Return True if *expr* textually references a known aggregation function.

    A list of expressions counts as an aggregation only if every element
    does (an empty list therefore counts as one). Plain column-name
    strings are never treated as aggregations.
    """
    if isinstance(expr, list):
        # Recurse element-wise; all sub-expressions must be aggregations.
        for sub_expr in expr:
            if not is_aggregation(sub_expr):
                return False
        return True
    if isinstance(expr, str):
        # Bare column names carry no aggregation.
        return False
    # Fall back to substring matching against the expression's repr.
    # NOTE(review): substring matching can over-match (e.g. "all" inside
    # a longer name) — presumably acceptable as a conservative check.
    rendered = str(expr)
    return any(fn_name in rendered for fn_name in AGGREGATION_FUNCTIONS)


class DataFrame:
    """Distributed wrapper around a future that will hold a polars DataFrame.

    Each operation records a lazy task via ``bn.record_task`` rather than
    executing eagerly.
    """

    def __init__(self, fut: bn.Future):
        # The Banyan future that will eventually materialize the data.
        self.future = fut

    def __future__(self) -> bn.Future:
        """Expose the underlying Banyan future."""
        return self.future

    # def filter(self, expr) -> Self:
    #     return DataFrame(
    #         bn.record_task(
    #             "res",
    #             pl.DataFrame.filter,
    #             [self, expr],
    #             ["Blocked", "Consolidated", "Grouped"],
    #         )
    #     )

    def select(self, expr) -> Self:
        """Record a lazy ``select`` of *expr* over this DataFrame.

        Raises:
            ValueError: if *expr* contains an aggregation function, which
                is not currently supported by ``select``.
        """
        if is_aggregation(expr):
            raise ValueError(
                f"select received expression {str(expr)} that has an aggregation function not currently supported"
            )
        recorded = bn.record_task(
            "res",
            pl.DataFrame.select,
            [self, expr],
            ["Blocked", "Consolidated", "Grouped"],
        )
        return DataFrame(recorded)

    # def groupby(self, cols) -> GroupBy:
    #     keys = [col for col in bn.utils.to_list(cols)]
    #     # TODO: Convert keys to strings if they are columns/expressions
    #     keys_grouping_pts = [bn.pt("Grouped", key=key) for key in keys]
    #     return GroupBy(
    #         bn.record_task(
    #             "res",
    #             pl.DataFrame.groupby,
    #             [self, cols],
    #             ["Blocked", "Consolidated", *keys_grouping_pts],
    #         )
    #     )
16 changes: 16 additions & 0 deletions banyan-polars/banyan_polars/api/io.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import banyan as bn

from ..communication.location_spec import LocationSpec
from . import df


def read_csv(p):
    """Record a lazy CSV read located by path/pattern *p*.

    Returns a distributed ``df.DataFrame`` wrapping the recorded task;
    no data is read eagerly.
    """
    # res = LocationSpec(p) where p: Blocked | Consolidated | Grouped
    task = bn.record_task(
        "res",
        LocationSpec,
        [p, "csv"],
        ["Blocked", "Consolidated", "Grouped"],
    )
    return df.DataFrame(task)
86 changes: 86 additions & 0 deletions banyan-polars/banyan_polars/api/utils_constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# Names of polars expression methods that aggregate, reorder, or otherwise
# depend on the full column, so they cannot be applied per-partition.
# Used by substring matching in `is_aggregation`.
AGGREGATION_FUNCTIONS = [
    "any",
    "all",
    "agg_groups",
    "count",
    "len",
    "slice",
    "append",
    "rechunk",
    "cumsum",
    "cumprod",
    "cummin",
    "cummax",
    "cumcount",
    "dot",
    "mode",
    "sort",
    "top_k",
    "arg_sort",
    "arg_max",
    "arg_min",
    "search_sorted",
    "sort_by",
    "take",
    "shift",
    "shift_and_fill",
    "forward_fill",
    "backward_fill",
    "reverse",
    "std",
    "var",
    "max",
    "min",
    "nan_max",
    "nan_min",
    "mean",
    "median",
    "product",
    "n_unique",
    "arg_unique",
    "unique",
    "first",
    "last",
    "over",
    "is_unique",
    "is_first",
    "is_duplicated",
    "quantile",
    "flatten",
    "explode",
    "take_every",
    "head",
    "tail",
    "limit",
    "interpolate",
    "rolling_min",
    "rolling_max",
    "rolling_mean",
    "rolling_sum",
    "rolling_std",
    "rolling_var",
    "rolling_median",
    "rolling_quantile",
    "rolling_apply",
    "rolling_skew",
    "argsort",
    "rank",
    "diff",
    "pct_change",
    "skew",
    # Fixed typo: the polars method is `kurtosis`, not `curtosis` — the
    # misspelled entry could never match a real expression.
    "kurtosis",
    "lower_bound",
    "upper_bound",
    "reshape",
    "shuffle",
    "sample",
    "ewm_mean",
    "ewm_std",
    "ewm_var",
    "extend_constant",
    "value_counts",
    "unique_counts",
    "entropy",
    "cumulative_eval",
    "list",
]
Empty file.
1 change: 1 addition & 0 deletions banyan-polars/banyan_polars/communication/df.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# TODO: Add `convert_partition_type` implementation here
24 changes: 24 additions & 0 deletions banyan-polars/banyan_polars/communication/lazy_aggregation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
class LazyAggregation:
    """
    Store information to lazily aggregate data across multiple workers when
    the future for the `LazyAggregation` is converted from None to Consolidated
    partition type.
    """

    def __init__(
        self,
        data,
        data_func,
        value_func,
        data_func_args=None,
        value_func_args=None,
    ):
        # data: the partitioned data to aggregate.
        # data_func: applied per-partition; value_func: combines results.
        self.data = data
        self.data_func = data_func
        self.value_func = value_func
        # Bug fix: the original conditions were inverted
        # (`x if x is None else []`), which stored None when the argument
        # was omitted and discarded any explicitly-passed argument list.
        # Default to a fresh list per instance to avoid shared mutable state.
        self.data_func_args = data_func_args if data_func_args is not None else []
        self.value_func_args = (
            value_func_args if value_func_args is not None else []
        )

    # TODO: Add `convert_partition_type` implementation here
6 changes: 6 additions & 0 deletions banyan-polars/banyan_polars/communication/location_spec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
class LocationSpec:
    """Describe where a dataset lives and how it is encoded.

    Note: the ``format`` parameter name shadows the builtin but is kept as
    part of the public constructor signature.
    """

    def __init__(self, pattern, format):
        # pattern: path or glob locating the data; format: e.g. "csv".
        self.pattern = pattern
        self.format = format

    # TODO: Add `convert_partition_type` implementation here
Loading