Changes from all commits (67 commits)
34c9b2b
(#11) Debezium Engine
AntiO2 Oct 5, 2025
f94acc9
Storage
AntiO2 Oct 6, 2025
88feebf
Monitor
AntiO2 Oct 7, 2025
a191ea0
10.7
AntiO2 Oct 8, 2025
2c5a6a1
Add Web Monitor
AntiO2 Oct 8, 2025
6e67f8d
Refactor
AntiO2 Oct 10, 2025
ad3e847
Refactor
AntiO2 Oct 11, 2025
2681b1c
Fix: Provider doesn't start
AntiO2 Oct 11, 2025
411f700
hehe
AntiO2 Oct 12, 2025
dd2c4a0
Merge branch 'dev/fileio' of github.com:AntiO2/pixels-sink into dev/f…
AntiO2 Oct 12, 2025
b383177
fix small bug
AntiO2 Oct 13, 2025
e1a76da
Feat: add bucket
AntiO2 Oct 14, 2025
3911a79
Retina Writer: Handle Pk ChangeEvent
AntiO2 Oct 15, 2025
f25442a
Handle Retina Service Error
AntiO2 Oct 16, 2025
85b1d02
update retina rpc
AntiO2 Oct 16, 2025
d924bf3
transaction close
AntiO2 Nov 10, 2025
f1464e0
add trace log
AntiO2 Nov 10, 2025
a9795ea
feature: limit active txns
AntiO2 Nov 11, 2025
1b62822
feature: record freshness stats
AntiO2 Nov 11, 2025
8610c8d
feature: record rps stats
AntiO2 Nov 11, 2025
a67dcac
modify config
AntiO2 Nov 14, 2025
83d885f
Enhance Freshness Record
AntiO2 Nov 14, 2025
bbe60d7
Add RateLimiter
AntiO2 Nov 14, 2025
31cbc13
Check Queue
AntiO2 Nov 14, 2025
8d0eea7
Fix: freshness report path
AntiO2 Nov 14, 2025
4ebddb4
Perf Freshness
AntiO2 Nov 15, 2025
0b4bf48
feature: record freshness stats
AntiO2 Nov 19, 2025
30b7224
Freshness
AntiO2 Nov 23, 2025
8d8047d
support multiple nodes
AntiO2 Nov 26, 2025
21a0ddb
docs: Overview
AntiO2 Nov 26, 2025
b8cc525
Add FreshnessClient
AntiO2 Nov 27, 2025
7b8359a
Stats
AntiO2 Nov 28, 2025
383d4e4
fix freshness client
AntiO2 Nov 28, 2025
6d86f97
fix sinkcontext
AntiO2 Nov 28, 2025
26880b5
update ratelimiter
AntiO2 Nov 28, 2025
cf86855
RateLimit Transaction
AntiO2 Nov 28, 2025
e4b2669
Refine NoneWriter And ProtoWriter
AntiO2 Nov 28, 2025
5c3162b
Support Single Table Transaction
AntiO2 Dec 3, 2025
343cb1b
Implement of FlinkWriter and RPC Server (#14)
Rolland1944 Dec 3, 2025
a1aefb1
Change License to AGPL
AntiO2 Dec 4, 2025
9e8b462
small fix
AntiO2 Dec 7, 2025
6329209
IDEA FILE
AntiO2 Dec 8, 2025
508e5aa
Trans Issue
AntiO2 Dec 9, 2025
0b8e444
Verbose Freshness
AntiO2 Dec 9, 2025
a07b759
Embed Freshness
AntiO2 Dec 11, 2025
53a74a6
Enhance Freshness & TableWriter
AntiO2 Dec 16, 2025
f82964a
Freshness Snapshot Query
AntiO2 Dec 17, 2025
83dacac
Improve FreshnessClient
AntiO2 Dec 18, 2025
a56ad3c
Feature: Flink & Retina Implement Abstract Bucket Writer
AntiO2 Dec 18, 2025
fb29502
comment
AntiO2 Dec 18, 2025
5bd3860
update perf scripts
AntiO2 Dec 18, 2025
7fbf966
Reformat Code
AntiO2 Dec 24, 2025
a14e12f
Reformat Code
AntiO2 Dec 24, 2025
b7bca9c
Test class add license header
AntiO2 Dec 24, 2025
8b66731
Add log for Flink Server
AntiO2 Jan 1, 2026
e5b092a
Enhance Flow Control
AntiO2 Jan 4, 2026
381d708
Fit CHBenchmark
AntiO2 Jan 25, 2026
076f9bb
Format Source Code
AntiO2 Jan 25, 2026
ca568ab
Exp Backup
AntiO2 Mar 5, 2026
9185f56
Update Flink Config
AntiO2 Mar 6, 2026
135a8c7
lance
AntiO2 Mar 11, 2026
63d7586
Remove data
AntiO2 Mar 11, 2026
a425b78
Update docs and dev environment
AntiO2 Mar 11, 2026
5c03520
Add scripts README and update configuration docs
AntiO2 Mar 11, 2026
224032d
Add AGPL headers to scripts
AntiO2 Mar 11, 2026
131a706
Remove .idea from version control
AntiO2 Mar 11, 2026
c08f49f
Translate script comments to English
AntiO2 Mar 11, 2026
19 changes: 18 additions & 1 deletion .gitignore
@@ -22,7 +22,6 @@
# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
hs_err_pid*

.idea
build
target
.classpath*
@@ -48,3 +47,21 @@ resources/*.xml
*.so
*.o
.vscode
.idea

data/
tmp/

!requirements.txt
freshness*.png
rate*.png
resulti7i/
result1k2_2/
result_lance
cluster*.png
pixels-sink.out
/*.png
/*.csv
*logs/*.out
*logs/*.png
*logs/**/*.png
50 changes: 48 additions & 2 deletions README.md
@@ -1,4 +1,50 @@
# pixels-sink
The data sink for Pixels. It ingests data into Pixels in a streaming manner.

> This project is under development.
Pixels Sink is the data sink service for Pixels. It ingests Debezium-format change events from multiple sources (the Debezium engine, Kafka, or storage files), converts them into Pixels events, and writes them to a configured sink (Retina, CSV, Proto, Flink, or none).

This project is under active development.

## Docs

- Architecture and pipeline overview: [docs/overview.md](docs/overview.md)
- Transaction handling: [docs/transaction.md](docs/transaction.md)
- Usage guide: [docs/usage.md](docs/usage.md)
- Configuration reference: [docs/configuration.md](docs/configuration.md)
- Local dev environment (Docker): [develop/README.md](develop/README.md)

## Quick Start

### Requirements
- [Pixels](https://github.com/pixelsdb/pixels)
- Java 17
- Maven 3.9+
- Source and sink dependencies based on your configuration
- Kafka broker if `sink.datasource=kafka`
- Debezium + database access if `sink.datasource=engine`
- Retina service if `sink.mode=retina`
- Trino if freshness checking is enabled and uses Trino

### Build
```bash
mvn -q -DskipTests package
```

### Run (Script)
```bash
./pixels-sink [config.properties]
```

The script reads JVM options from `conf/jvm.conf` and, by default, uses the properties file configured inside `./pixels-sink`; passing a path on the command line overrides that default.

### Run (IDE)
- Main class: `io.pixelsdb.pixels.sink.PixelsSinkApp`
- Program arguments: `-c conf/pixels-sink.aws.properties`

## Configuration
- Sample configs are in `conf/`.
- Start with `conf/pixels-sink.aws.properties` and adjust.
- See [docs/configuration.md](docs/configuration.md) for a full key reference and guidance.
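
As a hedged illustration, a minimal configuration for a CSV sink fed from storage might look like the fragment below. The keys are taken from the sample configs added in this PR; the values are placeholders, not recommendations:

```properties
# Illustrative minimal config (keys from conf/pixels-sink.aws.properties)
# engine | kafka | storage
sink.datasource=storage
# retina | csv | proto | flink | none
sink.mode=csv
sink.csv.path=./data
sink.csv.enable_header=false
```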

## Monitoring
- Enable Prometheus metrics with `sink.monitor.enable=true`.
- Metrics endpoint listens on `sink.monitor.port` (default `9464`).
26 changes: 26 additions & 0 deletions conf/jvm.conf
@@ -0,0 +1,26 @@
-server
-XX:+AlwaysPreTouch
-Dfile.encoding=UTF-8
-Duser.timezone=UTC

-Xms8g
-Xmx60g

-XX:+UseG1GC
-XX:MaxGCPauseMillis=200
-XX:InitiatingHeapOccupancyPercent=35
-XX:+ParallelRefProcEnabled
-XX:+UnlockExperimentalVMOptions
-XX:+TrustFinalNonStaticFields
-XX:+DisableExplicitGC

-Xss512k


-XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=/var/log/java/java_heapdump.hprof
-XX:+ExitOnOutOfMemoryError

-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:10086
--add-opens=java.base/sun.nio.ch=ALL-UNNAMED
--add-opens=java.base/java.nio=ALL-UNNAMED
118 changes: 118 additions & 0 deletions conf/pixels-sink.aws.properties
@@ -0,0 +1,118 @@
# engine | kafka | storage
sink.datasource=storage
# -1 means no limit; currently only implemented in retina sink mode
sink.datasource.rate.limit=200000
sink.monitor.report.file=/home/ubuntu/pixels-sink/result1k2_feb/rate_8192tile_3.csv
sink.monitor.freshness.file=/home/ubuntu/pixels-sink/result1k2_feb/fresh_8192tile_3.csv
# rate limiter implementation: guava or semaphore
sink.datasource.rate.limit.type=guava
# Sink Config: retina | csv | proto | flink | none
sink.mode=retina
sink.retina.client=1
# max in-flight RPCs
sink.retina.rpc.limit=100000
sink.retina.trans.limit=10000000
sink.retina.log.queue=false
sink.retina.trans.request.batch=true
sink.commit.method=async
sink.retina.trans.request.batch.size=100
## batch or single or record; batch is recommended. record is faster but lacks ACID guarantees
sink.trans.mode=batch
sink.monitor.report.enable=true
# Trino for freshness queries
trino.url=jdbc:trino://realtime-pixels-coordinator:8080/pixels/pixels_bench
# trino.url=jdbc:trino://realtime-pixels-coordinator:8080/pixels/pixels_bench_sf10x
trino.user=pixels
trino.password=password
trino.parallel=1
# row or txn or embed
sink.monitor.freshness.level=embed
sink.monitor.freshness.verbose=true
sink.monitor.freshness.embed.warmup=10
sink.monitor.freshness.embed.static=false
sink.monitor.freshness.embed.snapshot=true
sink.monitor.freshness.embed.tablelist=loantrans
sink.monitor.freshness.embed.delay=10
# sink.monitor.freshness.embed.tablelist=savingaccount
sink.monitor.freshness.timestamp=true
sink.storage.loop=true
# Kafka Config
bootstrap.servers=realtime-kafka-2:29092
group.id=3078
auto.offset.reset=earliest
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
#value.deserializer=io.pixelsdb.pixels.writer.deserializer.RowChangeEventAvroDeserializer
value.deserializer=io.pixelsdb.pixels.sink.event.deserializer.RowChangeEventJsonDeserializer
# Topic & Database Config
topic.prefix=postgresql.oltp_server
consumer.capture_database=pixels_bench_sf1x
consumer.include_tables=
sink.csv.path=./data
sink.csv.enable_header=false
## Retina Config
sink.retina.embedded=false
# stub or stream
sink.retina.mode=stream
#writer.retina.mode=stub
sink.remote.host=localhost
sink.remote.port=29422
sink.timeout.ms=5000
sink.flush.interval.ms=5
sink.flush.batch.size=100
sink.max.retries=3
## writer commit
# sync or async
sink.commit.batch.size=100
sink.commit.batch.worker=4
sink.commit.batch.delay=10
## Proto Config
sink.proto.dir=file:///home/ubuntu/disk2/hybench/
sink.proto.data=hybench1000_4
# sink.proto.data=hybench100_3
# sink.proto.data=hybench100_4
# sink.proto.data=hybench10_10
sink.proto.maxRecords=100000
## Flink Config
sink.flink.server.port=9091
## Schema Registry
sink.registry.url=http://localhost:8080/apis/registry/v2
# Transaction Config
transaction.topic.suffix=transaction
#transaction.topic.value.deserializer=io.pixelsdb.pixels.writer.deserializer.TransactionAvroMessageDeserializer
transaction.topic.value.deserializer=io.pixelsdb.pixels.sink.event.deserializer.TransactionJsonMessageDeserializer
sink.trans.batch.size=100

# Sink Metrics
sink.monitor.enable=true
sink.monitor.port=9464
sink.monitor.report.interval=1000
sink.monitor.freshness.interval=1000

# Interact with other rpc
sink.rpc.enable=true
sink.rpc.mock.delay=20
# debezium engine config
debezium.name=testEngine
debezium.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.provide.transaction.metadata=true
debezium.offset.storage=org.apache.kafka.connect.storage.FileOffsetBackingStore
debezium.offset.storage.file.filename=/tmp/offsets.dat
debezium.offset.flush.interval.ms=60000
debezium.schema.history.internal=io.debezium.storage.file.history.FileSchemaHistory
debezium.schema.history.internal.file.filename=/tmp/schemahistory.dat
debezium.database.hostname=realtime-pg-2
debezium.database.port=5432
debezium.database.user=pixels
debezium.database.password=pixels_realtime_crud
debezium.database.dbname=pixels_bench
debezium.plugin.name=pgoutput
debezium.database.server.id=1
debezium.schema.include.list=public
debezium.snapshot.mode=never
debezium.key.converter=org.apache.kafka.connect.json.JsonConverter
debezium.value.converter=org.apache.kafka.connect.json.JsonConverter
debezium.topic.prefix=postgresql.oltp_server
debezium.transforms=topicRouting
debezium.transforms.topicRouting.type=org.apache.kafka.connect.transforms.RegexRouter
debezium.transforms.topicRouting.regex=postgresql\\.oltp_server\\.public\\.(.*)
debezium.transforms.topicRouting.replacement=postgresql.oltp_server.pixels_bench_sf10x.$1
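
The `topicRouting` transform above uses Kafka Connect's `RegexRouter` to rewrite topic names; the doubled backslashes are Java-properties escaping for `\.`. A quick `sed` sketch of the same substitution (illustrative only, with a hypothetical `orders` table name):

```shell
# The RegexRouter rewrite from the config above, expressed as a sed substitution
echo "postgresql.oltp_server.public.orders" \
  | sed -E 's/postgresql\.oltp_server\.public\.(.*)/postgresql.oltp_server.pixels_bench_sf10x.\1/'
# prints postgresql.oltp_server.pixels_bench_sf10x.orders
```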
115 changes: 115 additions & 0 deletions conf/pixels-sink.ch.properties
@@ -0,0 +1,115 @@
# engine | kafka | storage
sink.datasource=storage
sink.mode=retina
#sink.datasource=engine
#sink.mode=proto
sink.proto.data=CH10K_2
# -1 means no limit; currently only implemented in retina sink mode
sink.datasource.rate.limit=80000
sink.monitor.report.file=/home/ubuntu/pixels-sink/result_ch/rate_100K.csv
sink.monitor.freshness.file=/home/ubuntu/pixels-sink/result_ch/fresh_100K.csv
# rate limiter implementation: guava or semaphore
sink.datasource.rate.limit.type=guava
sink.retina.client=1
# max in-flight RPCs
sink.retina.rpc.limit=1000000
sink.retina.trans.limit=1000000
sink.retina.log.queue=false
sink.retina.trans.request.batch=false
sink.commit.method=sync
sink.retina.trans.request.batch.size=1000
## batch or single or record; batch is recommended. record is faster but lacks ACID guarantees
sink.trans.mode=batch
# sink.trans.mode=record
sink.monitor.report.enable=true
# Trino for freshness queries
trino.url=jdbc:trino://realtime-pixels-coordinator:8080/pixels/pixels_bench
trino.user=pixels
trino.password=password
trino.parallel=1
# row or txn or embed
sink.monitor.freshness.level=embed
sink.monitor.freshness.verbose=true
sink.monitor.freshness.embed.warmup=10
sink.monitor.freshness.embed.static=false
sink.monitor.freshness.embed.snapshot=true
sink.monitor.freshness.embed.tablelist=stock
sink.monitor.freshness.embed.delay=10
sink.monitor.freshness.timestamp=true
sink.storage.loop=true
# Kafka Config
bootstrap.servers=realtime-kafka-2:29092
group.id=3078
auto.offset.reset=earliest
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
#value.deserializer=io.pixelsdb.pixels.writer.deserializer.RowChangeEventAvroDeserializer
value.deserializer=io.pixelsdb.pixels.sink.event.deserializer.RowChangeEventJsonDeserializer
# Topic & Database Config
topic.prefix=postgresql.oltp_server
consumer.capture_database=pixels_bench_sf1x
consumer.include_tables=
sink.csv.path=./data
sink.csv.enable_header=false
## Retina Config
sink.retina.embedded=false
# stub or stream
sink.retina.mode=stream
#writer.retina.mode=stub
sink.remote.host=localhost
sink.remote.port=29422
sink.timeout.ms=5000
sink.flush.interval.ms=10
sink.flush.batch.size=200
sink.max.retries=3
## writer commit
# sync or async
sink.commit.batch.size=1000
sink.commit.batch.worker=8
sink.commit.batch.delay=1000
## Proto Config
sink.proto.dir=file:///home/ubuntu/disk2/chbench/
sink.proto.maxRecords=100000
## Flink Config
sink.flink.server.port=9091
## Schema Registry
sink.registry.url=http://localhost:8080/apis/registry/v2
# Transaction Config
transaction.topic.suffix=transaction
#transaction.topic.value.deserializer=io.pixelsdb.pixels.writer.deserializer.TransactionAvroMessageDeserializer
transaction.topic.value.deserializer=io.pixelsdb.pixels.sink.event.deserializer.TransactionJsonMessageDeserializer
sink.trans.batch.size=100

# Sink Metrics
sink.monitor.enable=true
sink.monitor.port=9464
sink.monitor.report.interval=1000
sink.monitor.freshness.interval=1000

# Interact with other rpc
sink.rpc.enable=true
sink.rpc.mock.delay=20
# debezium engine config
debezium.name=testEngine
debezium.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.provide.transaction.metadata=true
debezium.offset.storage=org.apache.kafka.connect.storage.FileOffsetBackingStore
debezium.offset.storage.file.filename=/tmp/offsets.dat
debezium.offset.flush.interval.ms=60000
debezium.schema.history.internal=io.debezium.storage.file.history.FileSchemaHistory
debezium.schema.history.internal.file.filename=/tmp/schemahistory.dat
debezium.database.hostname=realtime-pg-2
debezium.database.port=5432
debezium.database.user=pixels
debezium.database.password=pixels_realtime_crud
debezium.database.dbname=pixels_bench
debezium.plugin.name=pgoutput
debezium.database.server.id=1
debezium.schema.include.list=tpcch
debezium.snapshot.mode=never
debezium.key.converter=org.apache.kafka.connect.json.JsonConverter
debezium.value.converter=org.apache.kafka.connect.json.JsonConverter
debezium.topic.prefix=postgresql.oltp_server
debezium.transforms=topicRouting
debezium.transforms.topicRouting.type=org.apache.kafka.connect.transforms.RegexRouter
debezium.transforms.topicRouting.regex=postgresql\\.oltp_server\\.public\\.(.*)
debezium.transforms.topicRouting.replacement=postgresql.oltp_server.pixels_bench_sf10x.$1
45 changes: 45 additions & 0 deletions conf/pixels-sink.flink.properties
@@ -0,0 +1,45 @@
# engine | kafka | storage
sink.datasource=storage
# -1 means no limit; currently only implemented in retina sink mode
sink.datasource.rate.limit=50000
# Sink Config: retina | csv | proto | flink | none
sink.mode=flink
sink.commit.batch.size=20
## batch or single or record; batch is recommended. record is faster but lacks ACID guarantees
sink.trans.mode=batch
sink.monitor.report.enable=true
sink.monitor.report.file=/home/ubuntu/pixels-sink/result_lance/rate_test.csv
sink.monitor.freshness.file=/home/ubuntu/pixels-sink/result_lance/fresh_test.csv
# Trino for freshness queries
trino.url=jdbc:trino://realtime-pixels-coordinator:8080/lance/default
trino.user=pixels
trino.password=password
trino.parallel=1
# row or txn or embed
sink.monitor.freshness.level=embed
sink.monitor.freshness.embed.warmup=10
sink.monitor.freshness.embed.static=false
sink.monitor.freshness.embed.snapshot=false
sink.monitor.freshness.embed.tablelist=stock
sink.monitor.freshness.verbose=true
sink.monitor.freshness.timestamp=true
sink.storage.loop=true

sink.remote.host=localhost
sink.remote.port=29422
sink.timeout.ms=5000
sink.flush.interval.ms=50
sink.flush.batch.size=10
sink.max.retries=3

## Proto Config
# sink.proto.data=hybench1000_4
sink.proto.data=CH10K_2
## Flink Config
sink.flink.server.port=9091

# Sink Metrics
sink.monitor.enable=true
sink.monitor.port=9465
sink.monitor.report.interval=10000
sink.monitor.freshness.interval=1000