
Commit 693efb8

feat: adding in delta support
1 parent 0666269 commit 693efb8

7 files changed: +408 -47 lines changed

requirements.txt

Lines changed: 1 addition & 0 deletions
@@ -5,3 +5,4 @@ simplejson==3.20.1
 flask==3.1.0
 pyyaml==6.0.2
 pyiceberg[sql-sqlite,pyarrow]==0.8.1
+deltalake==0.25.4

servc/svc/com/bus/rabbitmq.py

Lines changed: 1 addition & 0 deletions
@@ -9,6 +9,7 @@
 import simplejson
 from pika.adapters.asyncio_connection import AsyncioConnection  # type: ignore
 from pika.adapters.blocking_connection import BlockingConnection  # type: ignore
+
 from servc.svc.com.bus import BusComponent, InputProcessor, OnConsuming
 from servc.svc.com.cache.redis import decimal_default
 from servc.svc.io.input import EventPayload, InputPayload, InputType

servc/svc/com/storage/delta.py

Lines changed: 188 additions & 0 deletions
@@ -0,0 +1,188 @@
+import os
+from typing import Any, Dict, List, Tuple
+
+import pyarrow as pa
+from deltalake import DeltaTable, write_deltalake
+from pyarrow import Schema, Table
+
+from servc.svc.com.storage.lake import Lake, LakeTable
+from servc.svc.config import Config
+
+
+class Delta(Lake[DeltaTable]):
+    _storageOptions: Dict[str, str] = {}
+
+    _location_prefix: str
+
+    _table: LakeTable
+
+    def __init__(self, config: Config, table: LakeTable):
+        super().__init__(config, table)
+
+        self._table = table
+
+        catalog_properties_raw = config.get("catalog_properties")
+        if not isinstance(catalog_properties_raw, dict):
+            catalog_properties_raw = {}
+
+        # TODO: make generic for all storage types
+        if catalog_properties_raw.get("type") == "local":
+            self._location_prefix = str(
+                catalog_properties_raw.get("location", "/tmp/delta")
+            )
+            self._storageOptions = {}
+        else:
+            self._location_prefix = os.path.join(
+                str(catalog_properties_raw.get("warehouse")),
+                str(catalog_properties_raw.get("s3.access-key-id")),
+            )
+            self._storageOptions = {
+                "AWS_ACCESS_KEY_ID": str(
+                    catalog_properties_raw.get("s3.access-key-id")
+                ),
+                "AWS_SECRET_ACCESS_KEY": str(
+                    catalog_properties_raw.get("s3.secret-access-key")
+                ),
+                "AWS_ENDPOINT_URL": str(catalog_properties_raw.get("s3.endpoint")),
+                "AWS_ALLOW_HTTP": "true",
+                "aws_conditional_put": "etag",
+            }
+
+    def _connect(self):
+        if self.isOpen:
+            return None
+
+        tablename = self._get_table_name()
+        uri = os.path.join(self._location_prefix, tablename)
+        self._conn = DeltaTable.create(
+            table_uri=uri,
+            name=tablename,
+            schema=self._table["schema"],
+            partition_by=self._table["partitions"],
+            mode="ignore",
+            storage_options=self._storageOptions,
+        )
+
+        return super()._connect()
+
+    def optimize(self):
+        table = self.getConn()
+
+        print("Optimizing", self._get_table_name(), flush=True)
+        table.optimize.compact()
+        table.vacuum()
+        table.cleanup_metadata()
+        table.create_checkpoint()
+
+    def getPartitions(self) -> Dict[str, List[Any]] | None:
+        table = self.getConn()
+
+        partitions: Dict[str, List[Any]] = {}
+        for obj in table.partitions():
+            for key, value in obj.items():
+                if key not in partitions:
+                    partitions[key] = []
+                if value not in partitions[key]:
+                    partitions[key].append(value)
+
+        return partitions
+
+    def getCurrentVersion(self) -> str | None:
+        table = self.getConn()
+        return str(table.version())
+
+    def getVersions(self) -> List[str] | None:
+        return [str(self.getCurrentVersion())]
+
+    def insert(self, data: List[Any]) -> bool:
+        table = self.getConn()
+        write_deltalake(
+            table,
+            data=pa.Table.from_pylist(data, self.getSchema()),
+            storage_options=self._storageOptions,
+            mode="append",
+        )
+        return True
+
+    def _filters(
+        self,
+        partitions: Dict[str, List[Any]] | None = None,
+    ) -> List[Tuple[str, str, Any]] | None:
+        filters: List[Tuple[str, str, Any]] = []
+        if partitions is None:
+            return None
+        for key, value in partitions.items():
+            if len(value) == 1:
+                filters.append((key, "=", value[0]))
+            else:
+                filters.append((key, "in", value))
+        return filters if len(filters) > 0 else None
+
+    def overwrite(
+        self, data: List[Any], partitions: Dict[str, List[Any]] | None = None
+    ) -> bool:
+        table = self.getConn()
+
+        predicate: str | None = None
+        filter = self._filters(partitions)
+        if filter is not None:
+            predicate = " & ".join([" ".join(x) for x in filter])
+
+        write_deltalake(
+            table,
+            data=pa.Table.from_pylist(data, self.getSchema()),
+            storage_options=self._storageOptions,
+            mode="overwrite",
+            predicate=predicate,
+            engine="rust",
+        )
+        return True
+
+    def readRaw(
+        self,
+        columns: List[str],
+        partitions: Dict[str, List[Any]] | None = None,
+        version: str | None = None,
+        options: Any | None = None,
+    ) -> Table:
+        table = self.getConn()
+        if version is not None:
+            table.load_as_version(int(version))
+
+        if options is None or not isinstance(options, dict):
+            options = {}
+
+        rcolumns = columns if columns[0] != "*" else None
+
+        if options.get("filter", None) is not None:
+            return table.to_pyarrow_dataset(
+                partitions=self._filters(partitions),
+            ).to_table(
+                filter=options.get("filter"),
+                columns=rcolumns,
+            )
+        return table.to_pyarrow_table(
+            columns=rcolumns,
+            partitions=self._filters(partitions),
+        )
+
+    def read(
+        self,
+        columns: List[str],
+        partitions: Dict[str, List[Any]] | None = None,
+        version: str | None = None,
+        options: Any | None = None,
+    ) -> Table:
+        return self.readRaw(columns, partitions, version, options)
+
+    def getSchema(self) -> Schema | None:
+        table = self.getConn()
+
+        return table.schema().to_pyarrow()
+
+    def _close(self):
+        if self._isOpen:
+            self._isReady = False
+            self._isOpen = False
+            return True
+        return False
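
For reference, the deltalake flow the new Delta class wraps can be sketched standalone: DeltaTable.create with mode="ignore" creates the table only when it is missing (as in _connect), write_deltalake appends pyarrow data (as in insert), and to_pyarrow_table accepts the same (column, op, value) tuples that _filters builds. The table path and schema below are illustrative assumptions, not part of this commit.

import pyarrow as pa
from deltalake import DeltaTable, write_deltalake

# Illustrative schema and local path; the commit derives these from
# servc's Config and LakeTable instead.
schema = pa.schema([("region", pa.string()), ("amount", pa.int64())])
uri = "/tmp/delta/events"

# mode="ignore" mirrors Delta._connect: create if missing, no-op otherwise.
dt = DeltaTable.create(
    table_uri=uri,
    schema=schema,
    partition_by=["region"],
    mode="ignore",
)

# Append rows built from plain dicts, as Delta.insert does.
write_deltalake(
    dt,
    data=pa.Table.from_pylist(
        [{"region": "eu", "amount": 3}, {"region": "us", "amount": 5}],
        schema,
    ),
    mode="append",
)

# Partition-pruned read using the same tuple filters _filters produces.
print(DeltaTable(uri).to_pyarrow_table(partitions=[("region", "=", "eu")]).to_pylist())

For overwrites, the class joins those tuples into a predicate string via " & ".join(" ".join(x) for x in filters), so partitions {"region": ["eu"]} becomes the predicate "region = eu"; note this join only works when partition values are strings.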

servc/svc/com/storage/iceberg.py

Lines changed: 19 additions & 44 deletions
@@ -15,10 +15,9 @@
 from servc.svc.config import Config
 
 
-class IceBerg(Lake):
+class IceBerg(Lake[Table]):
     # _table
     _catalog: Catalog
-    _ice: Table | None
 
     def __init__(self, config: Config, table: LakeTable | str):
         super().__init__(config, table)
@@ -33,7 +32,6 @@ def __init__(self, config: Config, table: LakeTable | str):
             catalog_name,
             **{**catalog_properties},
         )
-        self._ice = None
 
     def _connect(self):
         if self.isOpen:
@@ -47,7 +45,7 @@ def _connect(self):
         except:
             doesExist = False
         if doesExist:
-            self._ice = self._catalog.load_table(tableName)
+            self._conn = self._catalog.load_table(tableName)
 
         elif not doesExist and isinstance(self._table, str):
             raise Exception(f"Table {tableName} does not exist")
@@ -72,7 +70,7 @@ def _connect(self):
         self._catalog.create_namespace_if_not_exists(self._database)
 
         # TODO: undo this garbage when rest catalog works
-        self._ice = self._catalog.create_table_if_not_exists(
+        self._conn = self._catalog.create_table_if_not_exists(
             tableName,
             self._table["schema"],
             partition_spec=partitionSpec,
@@ -82,9 +80,7 @@ def _connect(self):
             properties=self._table["options"].get("properties", {}),
         )
 
-        self._isReady = self._table is not None
-        self._isOpen = self._table is not None
-        return self._table is not None
+        return super()._connect()
 
     def _close(self):
         if self._isOpen:
@@ -94,13 +90,10 @@ def _close(self):
         return False
 
     def getPartitions(self) -> Dict[str, List[Any]] | None:
-        if not self._isOpen:
-            self._connect()
-        if self._ice is None:
-            raise Exception("Table not connected")
+        table = self.getConn()
 
         partitions: Dict[str, List[Any]] = {}
-        for obj in self._ice.inspect.partitions().to_pylist():
+        for obj in table.inspect.partitions().to_pylist():
             for key, value in obj["partition"].items():
                 field = key.replace("_partition", "")
                 if field not in partitions:
@@ -109,54 +102,39 @@ def getPartitions(self) -> Dict[str, List[Any]] | None:
         return partitions
 
     def getSchema(self) -> Schema | None:
-        if not self._isOpen:
-            self._connect()
-        if self._ice is None:
-            raise Exception("Table not connected")
+        table = self.getConn()
 
-        return self._ice.schema().as_arrow()
+        return table.schema().as_arrow()
 
     def getCurrentVersion(self) -> str | None:
-        if not self._isOpen:
-            self._connect()
-        if self._ice is None:
-            raise Exception("Table not connected")
+        table = self.getConn()
 
-        snapshot = self._ice.current_snapshot()
+        snapshot = table.current_snapshot()
         if snapshot is None:
             return None
         return str(snapshot.snapshot_id)
 
     def getVersions(self) -> List[str] | None:
-        if not self._isOpen:
-            self._connect()
-        if self._ice is None:
-            raise Exception("Table not connected")
+        table = self.getConn()
 
-        snapshots: paTable = self._ice.inspect.snapshots()
+        snapshots: paTable = table.inspect.snapshots()
         chunked = snapshots.column("snapshot_id")
         return [str(x) for x in chunked.to_pylist()]
 
     def insert(self, data: List[Any]) -> bool:
-        if not self._isOpen:
-            self._connect()
-        if self._ice is None:
-            raise Exception("Table not connected")
+        table = self.getConn()
 
-        self._ice.append(pa.Table.from_pylist(data, self.getSchema()))
+        table.append(pa.Table.from_pylist(data, self.getSchema()))
         return True
 
     def overwrite(
         self, data: List[Any], partitions: Dict[str, List[Any]] | None = None
    ) -> bool:
-        if not self._isOpen:
-            self._connect()
-        if self._ice is None:
-            raise Exception("Table not connected")
+        table = self.getConn()
 
         df = pa.Table.from_pylist(data, self.getSchema())
         if partitions is None or len(partitions) == 0:
-            self._ice.overwrite(df)
+            table.overwrite(df)
             return True
 
         # when partitions are provided, we need to filter the data
@@ -168,7 +146,7 @@ def overwrite(
         for i in range(1, len(boolPartition)):
             right_side = And(right_side, boolPartition[i])
 
-        self._ice.overwrite(df, overwrite_filter=right_side)
+        table.overwrite(df, overwrite_filter=right_side)
         return True
 
     def readRaw(
@@ -178,10 +156,7 @@ def readRaw(
         version: str | None = None,
         options: Any | None = None,
     ) -> DataScan:
-        if not self._isOpen:
-            self._connect()
-        if self._ice is None:
-            raise Exception("Table not connected")
+        table = self.getConn()
 
         if options is None:
             options = {}
@@ -197,7 +172,7 @@ def readRaw(
             options.get("row_filter", AlwaysTrue()), right_side
         )
 
-        return self._ice.scan(
+        return table.scan(
             row_filter=options.get("row_filter", AlwaysTrue()),
             selected_fields=tuple(columns),
             limit=options.get("limit", None),
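
Both backends now stash their native handle in self._conn and fetch it through getConn(), which replaces the repeated _isOpen/_ice guard blocks deleted above. The generic Lake base class itself is not part of this diff, so the following is only an inferred sketch of that pattern, with every name taken from the calls the two subclasses make.

# Inferred sketch of the generic connection pattern in Lake[T]; the real
# servc.svc.com.storage.lake module is not shown in this commit, so these
# names are assumptions based on how Delta and IceBerg use the base class.
from typing import Generic, Optional, TypeVar

T = TypeVar("T")


class Lake(Generic[T]):
    _conn: Optional[T] = None
    _isOpen: bool = False
    _isReady: bool = False

    def _connect(self) -> bool:
        # Subclasses assign self._conn first, then delegate here, which is
        # what `return super()._connect()` does in both diffs above.
        self._isReady = self._conn is not None
        self._isOpen = self._conn is not None
        return self._conn is not None

    def getConn(self) -> T:
        # Lazily connect, then return the typed handle or fail loudly,
        # matching the guard blocks this refactor removes.
        if not self._isOpen:
            self._connect()
        if self._conn is None:
            raise Exception("Table not connected")
        return self._conn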
