Skip to content

Commit 28a20ff

Browse files
committed
feat: DB & Event listener integration
1 parent 8ed8670 commit 28a20ff

File tree

8 files changed

+1152
-346
lines changed

8 files changed

+1152
-346
lines changed

backend/package.json

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,15 @@
2020
"blockstore-core": "^5.0.4",
2121
"datastore-core": "^10.0.4",
2222
"dotenv": "^17.2.1",
23+
"ethers": "^6.15.0",
2324
"fastify": "^5.5.0",
2425
"kubo-rpc-client": "^5.2.0",
26+
"pg": "^8.16.3",
2527
"pinata": "^2.4.9"
2628
},
2729
"devDependencies": {
2830
"@types/node": "^24.3.0",
31+
"@types/pg": "^8.15.5",
2932
"ts-node": "^10.9.2",
3033
"ts-node-dev": "^2.0.0",
3134
"typescript": "^5.9.2"

backend/src/app.ts

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,11 @@
11
import Fastify from "fastify"
22
import fastifySwagger from '@fastify/swagger'
33
import fastifySwaggerUi from '@fastify/swagger-ui'
4-
import dotenv from 'dotenv'
54

5+
import Db from './db/Db.ts'
6+
import EventListener from './listener/Listener.ts'
67
import taskRoutes from './routes/taskRoutes.ts';
78

8-
// Load env variables
9-
dotenv.config()
10-
119

1210
async function buildApp() {
1311
const app = Fastify({ logger: true })
@@ -28,6 +26,15 @@ async function buildApp() {
2826
uiConfig: { docExpansion: 'full', deepLinking: false }
2927
})
3028

29+
// Shared DB
30+
const db = new Db()
31+
await db.init()
32+
33+
// Start smart contract event listener
34+
const el = new EventListener(db)
35+
el.startListener().catch(err => {
36+
app.log.error(err)
37+
})
3138

3239
// Register routes
3340
app.register(taskRoutes, { prefix: '/api' })

backend/src/db/Db.ts

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/**
2+
* @file Db.ts
3+
* @brief Postgres database interface
4+
* @date 04.08.2025
5+
* @author Michal Ľaš
6+
*/
7+
8+
import dotenv from 'dotenv'
9+
import pg from 'pg';
10+
const { Pool } = pg;
11+
12+
// Load env variables
13+
dotenv.config();
14+
15+
export default class Db {
16+
private pool: pg.Pool;
17+
18+
constructor() {
19+
this.pool = new Pool({
20+
user: process.env.DATABASE_USER,
21+
password: process.env.DATABASE_PWD,
22+
host: process.env.DATABASE_HOST,
23+
database: process.env.DATABASE_NAME,
24+
port: parseInt(process.env.DATABASE_PORT!),
25+
});
26+
}
27+
28+
async init() {
29+
await this.pool.query(`
30+
CREATE TABLE IF NOT EXISTS tasks (
31+
task_id BIGINT PRIMARY KEY,
32+
difficulty TEXT NOT NULL,
33+
time TIMESTAMP NOT NULL,
34+
tags TEXT[] NOT NULL,
35+
uri TEXT NOT NULL,
36+
tx_hash TEXT NOT NULL
37+
);
38+
`)
39+
40+
await this.pool.query(`
41+
CREATE TABLE IF NOT EXISTS processed_blocks (
42+
id SERIAL PRIMARY KEY,
43+
last_processed_block BIGINT NOT NULL
44+
);
45+
`)
46+
47+
// ensure one row exists
48+
const res = await this.pool.query(`SELECT COUNT(*)::int AS count FROM processed_blocks;`)
49+
if (res.rows[0].count === 0) {
50+
await this.pool.query(`INSERT INTO processed_blocks (last_processed_block) VALUES (0);`)
51+
}
52+
}
53+
54+
async saveTask(task: {
55+
taskId: number;
56+
difficulty: string;
57+
time: number;
58+
tags: string[];
59+
uri: string;
60+
txHash: string;
61+
}) {
62+
await this.pool.query(
63+
`INSERT INTO tasks (task_id, difficulty, created_at, tags, uri, tx_hash, active)
64+
VALUES ($1, $2, to_timestamp($3), $4, $5, $6, true)
65+
ON CONFLICT (task_id) DO UPDATE
66+
SET difficulty = EXCLUDED.difficulty,
67+
created_at = EXCLUDED.created_at,
68+
tags = EXCLUDED.tags,
69+
uri = EXCLUDED.uri,
70+
tx_hash = EXCLUDED.tx_hash,
71+
active = true`,
72+
[task.taskId, task.difficulty, task.time, task.tags, task.uri, task.txHash]
73+
);
74+
}
75+
76+
async deactivateTask(taskId: number, txHash: string) {
77+
await this.pool.query(
78+
`UPDATE tasks SET active = false, tx_hash = $2 WHERE task_id = $1`,
79+
[taskId, txHash]
80+
);
81+
}
82+
83+
async getLastProcessedBlock(): Promise<number | null> {
84+
const res = await this.pool.query(`SELECT last_processed_block FROM sync_state ORDER BY id DESC LIMIT 1`);
85+
return res.rows.length > 0 ? Number(res.rows[0].last_processed_block) : null;
86+
}
87+
88+
async updateLastProcessedBlock(blockNumber: number) {
89+
await this.pool.query(
90+
`INSERT INTO sync_state (last_processed_block)
91+
VALUES ($1)`,
92+
[blockNumber]
93+
);
94+
}
95+
}

backend/src/db/db.sql

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
-- tasks table
2+
CREATE TABLE tasks (
3+
id SERIAL PRIMARY KEY,
4+
task_id BIGINT UNIQUE NOT NULL,
5+
difficulty TEXT NOT NULL,
6+
created_at TIMESTAMPTZ NOT NULL,
7+
tags TEXT[] NOT NULL,
8+
uri TEXT NOT NULL,
9+
tx_hash TEXT NOT NULL,
10+
active BOOLEAN DEFAULT true
11+
);
12+
13+
CREATE INDEX idx_tasks_difficulty ON tasks(difficulty);
14+
CREATE INDEX idx_tasks_tags ON tasks USING GIN (tags);
15+
CREATE INDEX idx_tasks_active ON tasks(active);
16+
17+
-- sync state (only 1 row expected)
18+
CREATE TABLE sync_state (
19+
id SERIAL PRIMARY KEY,
20+
last_processed_block BIGINT NOT NULL
21+
);

backend/src/listener/Listener.ts

Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,211 @@
1+
/**
2+
* @file Listener.ts
3+
* @brief Listens to TaskManager contract events and processes them.
4+
* Supports both local (Anvil) and production (Infura) modes.
5+
* Fetches past events for backfilling and listens for new events.
6+
* @date 04.08.2025
7+
* @author Michal Ľaš
8+
*/
9+
10+
import { ethers } from 'ethers';
11+
import dotenv from 'dotenv'
12+
import TaskManagerAbi from './abi/TaskManagerAbi.json' with {type: 'json'}; // compiled ABI
13+
import Db from '../db/Db.js';
14+
15+
// Load env variables
16+
dotenv.config();
17+
18+
19+
export default class EventListener {
20+
21+
private MODE
22+
private INFURA_API_KEY
23+
private CONTRACT_ADDRESS
24+
private START_BLOCK: number
25+
private FINALITY_BLOCKS: number
26+
private initialized
27+
private provider!: ethers.Provider
28+
private contract!: ethers.Contract
29+
private db: Db
30+
// Queue of events per block
31+
private eventQueue: Map<number, { blockHash: string, handlers: (() => Promise<void>)[] }[]> = new Map();
32+
33+
34+
constructor(db: Db) {
35+
this.db = db
36+
this.initialized = false
37+
this.MODE = process.env.MODE || 'local'
38+
this.INFURA_API_KEY = process.env.INFURA_API_KEY
39+
this.CONTRACT_ADDRESS = process.env.CONTRACT_ADDRESS
40+
this.START_BLOCK = parseInt(process.env.START_BLOCK || "0") // block when contract deployed
41+
this.FINALITY_BLOCKS = parseInt(process.env.FINALITY_BLOCKS || "5")
42+
this._validateConfig()
43+
}
44+
45+
/// Validate necessary config variables
46+
private _validateConfig(): void {
47+
if (this.CONTRACT_ADDRESS === undefined) {
48+
throw new Error('Missing CONTRACT_ADDRESS in .env')
49+
} else if (this.MODE === undefined) {
50+
throw new Error('Missing MODE in .env')
51+
} else if (this.MODE === 'prod' && this.INFURA_API_KEY === undefined) {
52+
throw new Error('Missing INFURA_API_KEY in .env')
53+
}
54+
}
55+
56+
/// Initialization of provider and contract
57+
private _initialize(): void {
58+
if (!this.initialized) {
59+
this.provider = this._getProvider()
60+
this.contract = new ethers.Contract(this.CONTRACT_ADDRESS!, TaskManagerAbi, this.provider);
61+
if (!this.contract) {
62+
throw new Error('Contract initialization failure')
63+
}
64+
}
65+
}
66+
67+
// Provider factory
68+
private _getProvider(): ethers.JsonRpcProvider | ethers.InfuraWebSocketProvider {
69+
if (this.MODE === 'local') {
70+
// Anvil runs on 127.0.0.1:8545
71+
return new ethers.JsonRpcProvider('http://127.0.0.1:8545')
72+
} else {
73+
if (!this.INFURA_API_KEY) {
74+
throw new Error('Missing INFURA_API_KEY in .env')
75+
}
76+
return ethers.InfuraProvider.getWebSocketProvider('sepolia', this.INFURA_API_KEY)
77+
}
78+
}
79+
80+
// Fetch past events from fromBlock to toBlock
81+
private async _fetchPastEvents(fromBlock: number, toBlock: number): Promise<void> {
82+
this._initialize()
83+
// Build filter for a specific event
84+
const filterCreated = this.contract.filters.TaskCreated!()
85+
const logsCreated = await this.contract.queryFilter(filterCreated, fromBlock, toBlock)
86+
87+
logsCreated.forEach(async (log: any) => {
88+
const { taskId, difficulty, time, tags, URI, event} = log.args;
89+
console.log("Past TaskCreated:", {
90+
taskId: taskId.toString(),
91+
difficulty: difficulty,
92+
time: Number(time),
93+
tags: tags,
94+
uri: URI,
95+
txHash: event.log.transactionHash,
96+
blockNumber: event.log.blockNumber
97+
})
98+
// Save to Postgres
99+
await this.db.saveTask({
100+
taskId: Number(taskId),
101+
difficulty,
102+
time: Number(time),
103+
tags,
104+
uri: URI,
105+
txHash: event.log.transactionHash
106+
});
107+
});
108+
109+
const filterDeactivated = this.contract.filters.TaskDeactivated!()
110+
const logsDeactivated = await this.contract!.queryFilter(filterDeactivated, fromBlock, toBlock)
111+
logsDeactivated.forEach(async (log: any) => {
112+
const { taskId, event} = log.args;
113+
console.log("Past TaskDeactivated:", {
114+
taskId: taskId.toString(),
115+
txHash: event.log.transactionHash,
116+
})
117+
await this.db.deactivateTask(Number(taskId), event.log.transactionHash)
118+
})
119+
120+
await this.db.updateLastProcessedBlock(toBlock);
121+
}
122+
123+
// Backfill past events from the deployed block to the latest block
124+
private async _backfill() {
125+
this._initialize()
126+
const currentBlock = await this.provider.getBlockNumber();
127+
let fromBlock = (await this.db.getLastProcessedBlock()) ?? this.START_BLOCK;
128+
if (fromBlock > currentBlock) fromBlock = currentBlock;
129+
await this._fetchPastEvents(fromBlock, currentBlock - this.FINALITY_BLOCKS);
130+
}
131+
132+
/// Adds emmited event into the processing queue - handler executes the action once after the blockNumber fill be considerd final
133+
private _queueEvent(blockNumber: number, blockHash: string, handler: () => Promise<void>) {
134+
if (!this.eventQueue.has(blockNumber)) {
135+
this.eventQueue.set(blockNumber, []);
136+
}
137+
this.eventQueue.get(blockNumber)!.push({ blockHash, handlers: [handler] });
138+
}
139+
140+
/// Process events that are considered final
141+
private async _processFinalizedBlocks(finalizedBlock: number) {
142+
for (let [blockNumber, entries] of this.eventQueue) {
143+
if (blockNumber <= finalizedBlock) {
144+
const canonicalBlock = await this.provider.getBlock(blockNumber);
145+
if (!canonicalBlock) continue;
146+
147+
for (let { blockHash, handlers } of entries) {
148+
if (canonicalBlock.hash === blockHash) {
149+
for (const h of handlers) await h();
150+
await this.db.updateLastProcessedBlock(blockNumber);
151+
} else {
152+
console.warn(`⚠️ Ignored orphaned block ${blockNumber} (hash=${blockHash})`);
153+
}
154+
}
155+
this.eventQueue.delete(blockNumber);
156+
}
157+
}
158+
}
159+
160+
// Start listening for events
161+
public async startListener() {
162+
this._backfill() // this._initialize() called inside _backfill()
163+
console.log(`Listening for TaskManager events in ${this.MODE} mode...`)
164+
165+
// Event: TaskCreated
166+
this.contract!.on("TaskCreated", (taskId, difficulty, time, tags, URI, event) => {
167+
console.log("TaskCreated Event:")
168+
console.log({
169+
taskId: taskId.toString(),
170+
difficulty: difficulty,
171+
time: Number(time),
172+
tags: tags,
173+
uri: URI,
174+
txHash: event.log.transactionHash,
175+
blockNumber: event.log.blockNumber
176+
})
177+
// Add event to queue
178+
this._queueEvent(event.log.blockNumber, event.log.blockHash, async () => {
179+
await this.db.saveTask({
180+
taskId: Number(taskId),
181+
difficulty,
182+
time: Number(time),
183+
tags,
184+
uri: URI,
185+
txHash: event.log.transactionHash,
186+
});
187+
})
188+
})
189+
190+
// Event: TaskDeactivated
191+
this.contract!.on("TaskDeactivated", (taskId, event) => {
192+
console.log("TaskDeactivated Event:")
193+
console.log({
194+
taskId: taskId.toString(),
195+
txHash: event.log.transactionHash,
196+
})
197+
// Add event to queue
198+
this._queueEvent(event.log.blockNumber, event.log.blockHash, async () => {
199+
await this.db.deactivateTask(Number(taskId), event.log.transactionHash);
200+
});
201+
})
202+
203+
this.provider!.on("block", async (blockNumber) => {
204+
console.log(`New block mined: ${blockNumber}`)
205+
const finalized = blockNumber - this.FINALITY_BLOCKS;
206+
if (finalized > 0) {
207+
await this._processFinalizedBlocks(finalized);
208+
}
209+
})
210+
}
211+
}

0 commit comments

Comments
 (0)