From 6a516a051271df48a860f3d4d464f19ecaeaebc7 Mon Sep 17 00:00:00 2001 From: konard Date: Sun, 18 Jan 2026 22:01:11 +0100 Subject: [PATCH 1/5] Initial commit with task details Adding CLAUDE.md with task information for AI processing. This file will be removed when the task is complete. Issue: https://github.com/link-foundation/links-queue/issues/22 --- CLAUDE.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 CLAUDE.md diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..ed0d0f8 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,5 @@ +Issue to solve: https://github.com/link-foundation/links-queue/issues/22 +Your prepared branch: issue-22-50458fb05c5c +Your prepared working directory: /tmp/gh-issue-solver-1768770069758 + +Proceed. From f571249d57e55c2c953064632153a9a227132e6c Mon Sep 17 00:00:00 2001 From: konard Date: Sun, 18 Jan 2026 22:15:29 +0100 Subject: [PATCH 2/5] Add basic examples for JS and Rust (1-4) Implement the first four examples demonstrating real-world use cases: JavaScript examples: - 01-hello-world: Basic queue operations (enqueue, dequeue, acknowledge) - 02-link-relationships: Nested links, universal links, graph structures - 03-producer-consumer: Multiple producers/consumers with async processing - 04-deduplication: MemoryLinkStore automatic deduplication Rust examples: - 01-hello-world: Basic MemoryQueue usage with tokio async - 02-link-relationships: Nested links, LinkPattern matching, graph ops - 03-producer-consumer: Concurrent producers/consumers with Arc - 04-deduplication: Type-generic deduplication with MemoryLinkStore Each example includes: - Runnable code demonstrating the concept - README.md with expected output and key concepts Closes #22 (partial - basic examples 1-4) Co-Authored-By: Claude Opus 4.5 --- js/examples/01-hello-world/README.md | 68 +++++ js/examples/01-hello-world/index.js | 107 ++++++++ js/examples/02-link-relationships/README.md | 98 ++++++++ js/examples/02-link-relationships/index.js | 202 +++++++++++++++ js/examples/03-producer-consumer/README.md | 97 ++++++++ js/examples/03-producer-consumer/index.js | 175 +++++++++++++ js/examples/04-deduplication/README.md | 123 ++++++++++ js/examples/04-deduplication/index.js | 206 ++++++++++++++++ rust/Cargo.toml | 16 ++ rust/examples/01-hello-world/README.md | 61 +++++ rust/examples/01-hello-world/main.rs | 97 ++++++++ rust/examples/02-link-relationships/README.md | 92 +++++++ rust/examples/02-link-relationships/main.rs | 227 +++++++++++++++++ rust/examples/03-producer-consumer/README.md | 97 ++++++++ rust/examples/03-producer-consumer/main.rs | 221 +++++++++++++++++ rust/examples/04-deduplication/README.md | 115 +++++++++ rust/examples/04-deduplication/main.rs | 232 ++++++++++++++++++ 17 files changed, 2234 insertions(+) create mode 100644 js/examples/01-hello-world/README.md create mode 100644 js/examples/01-hello-world/index.js create mode 100644 js/examples/02-link-relationships/README.md create mode 100644 js/examples/02-link-relationships/index.js create mode 100644 js/examples/03-producer-consumer/README.md create mode 100644 js/examples/03-producer-consumer/index.js create mode 100644 js/examples/04-deduplication/README.md create mode 100644 js/examples/04-deduplication/index.js create mode 100644 rust/examples/01-hello-world/README.md create mode 100644 rust/examples/01-hello-world/main.rs create mode 100644 rust/examples/02-link-relationships/README.md create mode 100644 rust/examples/02-link-relationships/main.rs create mode 100644 rust/examples/03-producer-consumer/README.md create mode 100644 rust/examples/03-producer-consumer/main.rs create mode 100644 rust/examples/04-deduplication/README.md create mode 100644 rust/examples/04-deduplication/main.rs diff --git a/js/examples/01-hello-world/README.md b/js/examples/01-hello-world/README.md new file mode 100644 index 0000000..c40b35c --- /dev/null +++ b/js/examples/01-hello-world/README.md @@ -0,0 +1,68 @@ +# Hello World Example + +This example demonstrates the most basic usage of Links Queue: simple enqueue and dequeue operations. + +## Key Concepts + +- **Link**: The fundamental data unit with `id`, `source`, and `target` fields +- **LinksQueue**: A FIFO queue that holds links for processing +- **Enqueue/Dequeue**: Standard queue operations to add and remove items +- **Acknowledge**: Confirm that a dequeued item was processed successfully + +## Running the Example + +```bash +# Node.js +node examples/01-hello-world/index.js + +# Bun +bun examples/01-hello-world/index.js + +# Deno +deno run examples/01-hello-world/index.js +``` + +## Expected Output + +``` +=== Links Queue: Hello World === + +Created links: + Greeting: { id: 1, source: 'hello', target: 'world' } + Message: { id: 2, source: 'links', target: 'queue' } + Data: { id: 3, source: 42, target: 100 } + +--- Enqueuing links --- +Enqueued link 1 at position 0 +Enqueued link 2 at position 1 +Enqueued link 3 at position 2 + +Queue stats after enqueueing: + Depth: 3 + Enqueued: 3 + +--- Dequeuing and processing --- +Processing: hello -> world +Acknowledged link 1 +Processing: links -> queue +Acknowledged link 2 +Processing: 42 -> 100 +Acknowledged link 3 + +Final queue stats: + Depth: 0 + Enqueued: 3 + Dequeued: 3 + Acknowledged: 3 + +Dequeue from empty queue: null + +=== Hello World Complete! === +``` + +## What This Example Shows + +1. **Creating Links**: Links can have various types for source and target (strings, numbers) +2. **Queue Setup**: How to create a `LinksQueue` with a `MemoryLinkStore` backend +3. **FIFO Processing**: Items are processed in the order they were enqueued +4. **Acknowledgment Pattern**: The dequeue-process-acknowledge workflow for reliable processing diff --git a/js/examples/01-hello-world/index.js b/js/examples/01-hello-world/index.js new file mode 100644 index 0000000..70d13f7 --- /dev/null +++ b/js/examples/01-hello-world/index.js @@ -0,0 +1,107 @@ +/** + * Hello World Example for links-queue + * + * This example demonstrates basic enqueue/dequeue operations using + * the links-queue library. It shows how to: + * - Create a queue + * - Enqueue links (messages) + * - Dequeue links + * - Acknowledge processed items + * + * Run with any runtime: + * - Node.js: node examples/01-hello-world/index.js + * - Bun: bun examples/01-hello-world/index.js + * - Deno: deno run examples/01-hello-world/index.js + */ + +import { createLink, MemoryLinkStore, LinksQueue } from '../../src/index.js'; + +async function main() { + console.log('=== Links Queue: Hello World ===\n'); + + // Create a MemoryLinkStore for storing links + const store = new MemoryLinkStore(); + + // Create a queue with basic configuration + const queue = new LinksQueue({ + name: 'hello-world', + store, + options: { + visibilityTimeout: 30, // seconds + retryLimit: 3, + }, + }); + + // Create some links to enqueue + // Links are the fundamental data unit - they have id, source, and target + const greetingLink = createLink(1, 'hello', 'world'); + const messageLink = createLink(2, 'links', 'queue'); + const dataLink = createLink(3, 42, 100); + + console.log('Created links:'); + console.log(' Greeting:', greetingLink); + console.log(' Message:', messageLink); + console.log(' Data:', dataLink); + + // Enqueue links + console.log('\n--- Enqueuing links ---'); + const result1 = await queue.enqueue(greetingLink); + console.log(`Enqueued link 1 at position ${result1.position}`); + + const result2 = await queue.enqueue(messageLink); + console.log(`Enqueued link 2 at position ${result2.position}`); + + const result3 = await queue.enqueue(dataLink); + console.log(`Enqueued link 3 at position ${result3.position}`); + + // Check queue stats + const stats = queue.getStats(); + console.log('\nQueue stats after enqueueing:'); + console.log(` Depth: ${stats.depth}`); + console.log(` Enqueued: ${stats.enqueued}`); + + // Dequeue and process links + console.log('\n--- Dequeuing and processing ---'); + + // Dequeue first link + const item1 = await queue.dequeue(); + if (item1) { + console.log(`Processing: ${item1.source} -> ${item1.target}`); + // Acknowledge successful processing + await queue.acknowledge(item1.id); + console.log(`Acknowledged link ${item1.id}`); + } + + // Dequeue second link + const item2 = await queue.dequeue(); + if (item2) { + console.log(`Processing: ${item2.source} -> ${item2.target}`); + await queue.acknowledge(item2.id); + console.log(`Acknowledged link ${item2.id}`); + } + + // Dequeue third link + const item3 = await queue.dequeue(); + if (item3) { + console.log(`Processing: ${item3.source} -> ${item3.target}`); + await queue.acknowledge(item3.id); + console.log(`Acknowledged link ${item3.id}`); + } + + // Final stats + const finalStats = queue.getStats(); + console.log('\nFinal queue stats:'); + console.log(` Depth: ${finalStats.depth}`); + console.log(` Enqueued: ${finalStats.enqueued}`); + console.log(` Dequeued: ${finalStats.dequeued}`); + console.log(` Acknowledged: ${finalStats.acknowledged}`); + + // Try to dequeue from empty queue + const empty = await queue.dequeue(); + console.log(`\nDequeue from empty queue: ${empty}`); // null + + console.log('\n=== Hello World Complete! ==='); +} + +// Run the example +main().catch(console.error); diff --git a/js/examples/02-link-relationships/README.md b/js/examples/02-link-relationships/README.md new file mode 100644 index 0000000..e9b5348 --- /dev/null +++ b/js/examples/02-link-relationships/README.md @@ -0,0 +1,98 @@ +# Link Relationships Example + +This example demonstrates nested links and graph structures - the unique link-based data model that distinguishes Links Queue from traditional message queues. + +## Key Concepts + +- **Links as Relations**: Each link represents a directed relationship from source to target +- **Nested Links**: Source and target can themselves be links, enabling hierarchical data +- **Universal Links**: Links with additional values for n-ary relationships +- **Graph Structures**: Multiple links form a graph that can be traversed and queried + +## Running the Example + +```bash +# Node.js +node examples/02-link-relationships/index.js + +# Bun +bun examples/02-link-relationships/index.js + +# Deno +deno run examples/02-link-relationships/index.js +``` + +## Expected Output + +``` +=== Links Queue: Link Relationships === + +--- Part 1: Basic Links as Relations --- + +Alice knows Bob: { id: 1, source: 'alice', target: 'bob' } +Alice person link: { id: 2, source: 100, target: 'alice' } +Alice name link: { id: 3, source: { id: 2, source: 100, target: 'alice' }, target: 'Alice Smith' } + +--- Part 2: Nested Links (Recursive Structures) --- + +Nested link (statement about relationship): + Base: { id: 10, source: 'alice', target: 'bob' } + Meta: { id: 11, source: { id: 10, source: 'alice', target: 'bob' }, target: 2024 } + Established relationship: alice -> bob + In year: 2024 + +--- Part 3: Universal Links (Multiple Values) --- + +Universal link (n-ary relation): { id: 20, source: 'Earth', target: 'orbits', values: [ 'Sun', 0.999, 'astronomy' ] } + Subject: Earth + Predicate: orbits + Additional values: [ 'Sun', 0.999, 'astronomy' ] + +--- Part 4: Graph Structures --- + +Social graph edges: + alice -> bob + bob -> charlie + alice -> charlie + charlie -> alice + +Alice is connected to: + bob + charlie + +Charlie receives connections from: + bob + alice + +--- Part 5: Processing Graph Operations via Queue --- + +Queued: add_edge node1->node2 +Queued: add_edge node2->node3 +Queued: query {"source":"node1","target":{}} +Queued: remove_edge node1->node2 + +Processing operations: + ADD: node1 -> node2 + ADD: node2 -> node3 + QUERY: source=node1 + REMOVE: node1 -> node2 + +=== Link Relationships Complete! === +``` + +## What This Example Shows + +1. **Binary Relations**: Simple A -> B relationships (social connections, edges) +2. **Typed Links**: Using numeric IDs to represent types and properties +3. **Meta-Links**: Links about other links (provenance, timestamps) +4. **N-ary Relations**: Universal links with values array for complex relations +5. **Graph Building**: Constructing and querying graph structures +6. **Queue Integration**: Processing graph operations through a queue + +## Use Cases + +- Knowledge graphs and semantic data +- Social network relationships +- Provenance tracking (who said what, when) +- Complex event processing with metadata +- Graph database operations via message queue diff --git a/js/examples/02-link-relationships/index.js b/js/examples/02-link-relationships/index.js new file mode 100644 index 0000000..efe0544 --- /dev/null +++ b/js/examples/02-link-relationships/index.js @@ -0,0 +1,202 @@ +/** + * Link Relationships Example for links-queue + * + * This example demonstrates nested links and graph structures. + * Links Queue uses a unique link-based data model where: + * - Each link has an id, source, and target + * - Source and target can be IDs or nested links + * - This allows representing complex relationships and graph structures + * + * Run with any runtime: + * - Node.js: node examples/02-link-relationships/index.js + * - Bun: bun examples/02-link-relationships/index.js + * - Deno: deno run examples/02-link-relationships/index.js + */ + +import { + createLink, + MemoryLinkStore, + LinksQueue, + getLinkId, + isLink, + matchesPattern, + Any, +} from '../../src/index.js'; + +// Part 1: Basic Links as Relations +function demoBasicRelations() { + console.log('--- Part 1: Basic Links as Relations ---\n'); + + // Links naturally represent relationships (edges in a graph) + const alice = 'alice'; + const bob = 'bob'; + const knowsRelation = createLink(1, alice, bob); + console.log('Alice knows Bob:', knowsRelation); + + // Representing a typed relationship + const personType = 100; + const alicePerson = createLink(2, personType, alice); + const aliceName = createLink(3, alicePerson, 'Alice Smith'); + console.log('Alice person link:', alicePerson); + console.log('Alice name link:', aliceName); + + return { alice, bob }; +} + +// Part 2: Nested Links (Recursive Structures) +function demoNestedLinks(alice, bob) { + console.log('\n--- Part 2: Nested Links (Recursive Structures) ---\n'); + + // First, the base relationship + const aliceKnowsBob = createLink(10, alice, bob); + + // Then, a link about that relationship (meta-link) + const establishedIn = createLink(11, aliceKnowsBob, 2024); + console.log('Nested link (statement about relationship):'); + console.log(' Base:', aliceKnowsBob); + console.log(' Meta:', establishedIn); + + // Accessing nested data + if (isLink(establishedIn.source)) { + console.log( + ' Established relationship:', + establishedIn.source.source, + '->', + establishedIn.source.target + ); + console.log(' In year:', establishedIn.target); + } +} + +// Part 3: Universal Links (Multiple Values) +function demoUniversalLinks() { + console.log('\n--- Part 3: Universal Links (Multiple Values) ---\n'); + + // Universal links for n-ary relations + const subject = 'Earth'; + const predicate = 'orbits'; + const object = 'Sun'; + const confidence = 0.999; + const source = 'astronomy'; + + const universalLink = createLink(20, subject, predicate, [ + object, + confidence, + source, + ]); + console.log('Universal link (n-ary relation):', universalLink); + console.log(' Subject:', universalLink.source); + console.log(' Predicate:', universalLink.target); + console.log(' Additional values:', universalLink.values); +} + +// Part 4: Graph Structures +function demoGraphStructures() { + console.log('\n--- Part 4: Graph Structures ---\n'); + + const users = { alice: 'alice', bob: 'bob', charlie: 'charlie' }; + + const graph = [ + createLink(30, users.alice, users.bob), + createLink(31, users.bob, users.charlie), + createLink(32, users.alice, users.charlie), + createLink(33, users.charlie, users.alice), + ]; + + console.log('Social graph edges:'); + graph.forEach((link) => { + console.log(` ${link.source} -> ${link.target}`); + }); + + // Find all connections from Alice + const aliceConnections = graph.filter((link) => + matchesPattern(link, { source: users.alice }) + ); + console.log('\nAlice is connected to:'); + aliceConnections.forEach((link) => { + console.log(` ${link.target}`); + }); + + // Find all connections to Charlie + const charlieIncoming = graph.filter((link) => + matchesPattern(link, { target: users.charlie }) + ); + console.log('\nCharlie receives connections from:'); + charlieIncoming.forEach((link) => { + console.log(` ${link.source}`); + }); +} + +// Part 5: Processing Graph Operations via Queue +async function demoQueueOperations() { + console.log('\n--- Part 5: Processing Graph Operations via Queue ---\n'); + + const store = new MemoryLinkStore(); + const queue = new LinksQueue({ name: 'graph-operations', store }); + + const ADD_EDGE = 'add_edge'; + const REMOVE_EDGE = 'remove_edge'; + const QUERY = 'query'; + + // Queue up some graph operations + const operations = [ + createLink(100, ADD_EDGE, createLink(101, 'node1', 'node2')), + createLink(102, ADD_EDGE, createLink(103, 'node2', 'node3')), + createLink(104, QUERY, { source: 'node1', target: Any }), + createLink(105, REMOVE_EDGE, createLink(106, 'node1', 'node2')), + ]; + + // Enqueue operations + for (const op of operations) { + await queue.enqueue(op); + const opType = op.source; + const opData = isLink(op.target) + ? `${op.target.source}->${op.target.target}` + : JSON.stringify(op.target); + console.log(`Queued: ${opType} ${opData}`); + } + + // Process operations + console.log('\nProcessing operations:'); + let item; + while ((item = await queue.dequeue()) !== null) { + const operation = item.source; + const payload = item.target; + + switch (operation) { + case ADD_EDGE: + console.log( + ` ADD: ${getLinkId(payload.source)} -> ${getLinkId(payload.target)}` + ); + break; + case REMOVE_EDGE: + console.log( + ` REMOVE: ${getLinkId(payload.source)} -> ${getLinkId(payload.target)}` + ); + break; + case QUERY: + console.log(` QUERY: source=${payload.source}`); + break; + default: + console.log(` UNKNOWN: ${operation}`); + } + + await queue.acknowledge(item.id); + } +} + +// Main function +async function main() { + console.log('=== Links Queue: Link Relationships ===\n'); + + const { alice, bob } = demoBasicRelations(); + demoNestedLinks(alice, bob); + demoUniversalLinks(); + demoGraphStructures(); + await demoQueueOperations(); + + console.log('\n=== Link Relationships Complete! ==='); +} + +// Run the example +main().catch(console.error); diff --git a/js/examples/03-producer-consumer/README.md b/js/examples/03-producer-consumer/README.md new file mode 100644 index 0000000..f2979c8 --- /dev/null +++ b/js/examples/03-producer-consumer/README.md @@ -0,0 +1,97 @@ +# Producer-Consumer Example + +This example demonstrates the classic producer-consumer pattern using Links Queue as a work queue. Multiple producers generate tasks, and multiple consumers process them concurrently. + +## Key Concepts + +- **Work Queue**: A shared queue where producers enqueue work items for consumers +- **Competing Consumers**: Multiple consumers process items from the same queue +- **Acknowledgment**: Consumers confirm successful processing before items are removed +- **Load Balancing**: Work is naturally distributed among available consumers + +## Running the Example + +```bash +# Node.js +node examples/03-producer-consumer/index.js + +# Bun +bun examples/03-producer-consumer/index.js + +# Deno +deno run examples/03-producer-consumer/index.js +``` + +## Expected Output + +``` +=== Links Queue: Producer-Consumer Pattern === + +Configuration: + Producers: 2 + Consumers: 3 + Tasks per producer: 5 + Total tasks: 10 + +Producer 1: Starting, will create 5 tasks +Consumer 1: Starting +Consumer 2: Starting +Consumer 3: Starting +Producer 2: Starting, will create 5 tasks +Producer 1: Enqueued task 1-0 at position 0 +Consumer 1: Processing task 1-0 (type: task-type-1) +Producer 2: Enqueued task 2-0 at position 0 +Consumer 2: Processing task 2-0 (type: task-type-1) +... + +[Monitor] Queue depth: 3, Enqueued: 8, Dequeued: 5, Acked: 3, In-flight: 2 + +... + +--- All producers finished --- + +... + +=== Final Results === +Total tasks enqueued: 10 +Total tasks dequeued: 10 +Total tasks acknowledged: 10 +Tasks processed per consumer: 4, 3, 3 +Total processed: 10 + +=== Producer-Consumer Complete! === +``` + +## What This Example Shows + +1. **Multiple Producers**: Two producers creating tasks concurrently +2. **Multiple Consumers**: Three consumers processing tasks in parallel +3. **Fair Distribution**: Work is distributed among available consumers +4. **Monitoring**: Real-time queue statistics during processing +5. **Graceful Shutdown**: Waiting for queue to drain before stopping + +## Pattern Details + +### Task Structure + +Tasks are represented as links: + +- `id`: Unique task identifier +- `source`: Task type/category for routing or prioritization +- `target`: Task payload (can be any serializable data) + +### Processing Flow + +1. **Producer** creates a task link and enqueues it +2. **Consumer** dequeues a task (task becomes "in-flight") +3. Consumer processes the task +4. Consumer acknowledges the task (removes from in-flight tracking) +5. If acknowledgment fails, task can be requeued after visibility timeout + +### Use Cases + +- Background job processing +- Task distribution across workers +- Request buffering and load leveling +- Microservice communication +- Event processing pipelines diff --git a/js/examples/03-producer-consumer/index.js b/js/examples/03-producer-consumer/index.js new file mode 100644 index 0000000..1cecfa3 --- /dev/null +++ b/js/examples/03-producer-consumer/index.js @@ -0,0 +1,175 @@ +/** + * Producer-Consumer Example for links-queue + * + * This example demonstrates the classic producer-consumer pattern using + * Links Queue as a work queue. Multiple producers generate tasks and + * multiple consumers process them concurrently. + * + * Run with any runtime: + * - Node.js: node examples/03-producer-consumer/index.js + * - Bun: bun examples/03-producer-consumer/index.js + * - Deno: deno run examples/03-producer-consumer/index.js + */ + +import { + createLink, + MemoryLinkStore, + LinksQueue, + delay, +} from '../../src/index.js'; + +// Simulated processing time (50-200ms) +const simulateWork = () => delay(Math.floor(Math.random() * 150) + 50); + +/** + * Producer: generates tasks and enqueues them + */ +async function producer(queue, producerId, taskCount) { + console.log( + `Producer ${producerId}: Starting, will create ${taskCount} tasks` + ); + + for (let i = 0; i < taskCount; i++) { + // Create a task link + // source = task type/category + // target = task payload + const taskLink = createLink( + `${producerId}-${i}`, // unique task ID + `task-type-${(i % 3) + 1}`, // task type (1, 2, or 3) + { data: `payload-${producerId}-${i}`, priority: i % 5 } // task data + ); + + const result = await queue.enqueue(taskLink); + console.log( + `Producer ${producerId}: Enqueued task ${taskLink.id} at position ${result.position}` + ); + + // Small delay between producing tasks + await delay(10); + } + + console.log(`Producer ${producerId}: Finished producing`); +} + +/** + * Consumer: processes tasks from the queue + */ +async function consumer(queue, consumerId, stopSignal) { + console.log(`Consumer ${consumerId}: Starting`); + let processed = 0; + + while (!stopSignal.stopped) { + const task = await queue.dequeue(); + + if (task === null) { + // No tasks available, wait a bit + await delay(50); + continue; + } + + console.log( + `Consumer ${consumerId}: Processing task ${task.id} (type: ${task.source})` + ); + + // Simulate processing work + await simulateWork(); + + // Acknowledge successful processing + await queue.acknowledge(task.id); + processed++; + console.log(`Consumer ${consumerId}: Completed task ${task.id}`); + } + + console.log( + `Consumer ${consumerId}: Stopped after processing ${processed} tasks` + ); + return processed; +} + +/** + * Monitor: periodically reports queue status + */ +async function monitor(queue, stopSignal) { + while (!stopSignal.stopped) { + const stats = queue.getStats(); + console.log( + `\n[Monitor] Queue depth: ${stats.depth}, ` + + `Enqueued: ${stats.enqueued}, Dequeued: ${stats.dequeued}, ` + + `Acked: ${stats.acknowledged}, In-flight: ${stats.inFlight}\n` + ); + await delay(500); + } +} + +async function main() { + console.log('=== Links Queue: Producer-Consumer Pattern ===\n'); + + // Create queue + const store = new MemoryLinkStore(); + const queue = new LinksQueue({ + name: 'work-queue', + store, + options: { + visibilityTimeout: 60, + retryLimit: 3, + }, + }); + + // Stop signal for consumers + const stopSignal = { stopped: false }; + + // Configuration + const PRODUCER_COUNT = 2; + const CONSUMER_COUNT = 3; + const TASKS_PER_PRODUCER = 5; + + console.log(`Configuration:`); + console.log(` Producers: ${PRODUCER_COUNT}`); + console.log(` Consumers: ${CONSUMER_COUNT}`); + console.log(` Tasks per producer: ${TASKS_PER_PRODUCER}`); + console.log(` Total tasks: ${PRODUCER_COUNT * TASKS_PER_PRODUCER}\n`); + + // Start monitor in background (intentionally not awaited) + monitor(queue, stopSignal); + + // Start consumers + const consumerPromises = []; + for (let i = 1; i <= CONSUMER_COUNT; i++) { + consumerPromises.push(consumer(queue, i, stopSignal)); + } + + // Start producers + const producerPromises = []; + for (let i = 1; i <= PRODUCER_COUNT; i++) { + producerPromises.push(producer(queue, i, TASKS_PER_PRODUCER)); + } + + // Wait for all producers to finish + await Promise.all(producerPromises); + console.log('\n--- All producers finished ---\n'); + + // Wait for queue to drain + while (queue.getStats().depth > 0 || queue.getStats().inFlight > 0) { + await delay(100); + } + + // Stop consumers + stopSignal.stopped = true; + await delay(100); // Allow consumers to exit + + // Final results + const totalProcessed = await Promise.all(consumerPromises); + const stats = queue.getStats(); + + console.log('\n=== Final Results ==='); + console.log(`Total tasks enqueued: ${stats.enqueued}`); + console.log(`Total tasks dequeued: ${stats.dequeued}`); + console.log(`Total tasks acknowledged: ${stats.acknowledged}`); + console.log(`Tasks processed per consumer: ${totalProcessed.join(', ')}`); + console.log(`Total processed: ${totalProcessed.reduce((a, b) => a + b, 0)}`); + + console.log('\n=== Producer-Consumer Complete! ==='); +} + +// Run the example +main().catch(console.error); diff --git a/js/examples/04-deduplication/README.md b/js/examples/04-deduplication/README.md new file mode 100644 index 0000000..a238d3e --- /dev/null +++ b/js/examples/04-deduplication/README.md @@ -0,0 +1,123 @@ +# Deduplication Example + +This example demonstrates automatic link deduplication - one of the unique features of Links Queue. When you create links with identical source and target, the system automatically returns the existing link instead of creating duplicates. + +## Key Concepts + +- **Automatic Deduplication**: Links with identical source and target share the same ID +- **Content-Addressed Storage**: Link identity is based on content, not creation order +- **Type-Aware**: Different types (number, string, bigint) are tracked separately +- **Universal Links Exception**: Links with values are not deduplicated + +## Running the Example + +```bash +# Node.js +node examples/04-deduplication/index.js + +# Bun +bun examples/04-deduplication/index.js + +# Deno +deno run examples/04-deduplication/index.js +``` + +## Expected Output + +``` +=== Links Queue: Deduplication === + +--- Part 1: Basic Deduplication --- + +Created link 1: { id: 1, source: 'hello', target: 'world' } + ID: 1 + +Created link 2 (same source/target): { id: 1, source: 'hello', target: 'world' } + ID: 1 + +Are they the same link? true +Total links in store: 1 + +Created link 3 (different target): { id: 2, source: 'hello', target: 'universe' } + ID: 2 +Total links after link3: 2 + +--- Part 2: Deduplication with Different Types --- + +Number links deduplicated: true +String links deduplicated: true +BigInt links deduplicated: true +Number vs String (different): true +Total unique links: 5 + +--- Part 3: Nested Link Deduplication --- + +Inner links are same: true +Outer links with same nested source are same: true +Total links (1 inner + 1 outer): 2 + +--- Part 4: Universal Links (No Deduplication) --- + +Universal link 1: { id: 1, source: 'subject', target: 'predicate', values: ['object1'] } +Universal link 2: { id: 2, source: 'subject', target: 'predicate', values: ['object1'] } +Same ID? false +Total universal links: 2 + +--- Part 5: Practical Use Case - Event Deduplication --- + +Processing events with deduplication: + ENQUEUED: user_login - user123 (id: 1) + SKIPPED (duplicate): user_login - user123 + ENQUEUED: user_logout - user123 (id: 2) + ENQUEUED: user_login - user456 (id: 3) + SKIPPED (duplicate): user_login - user123 + +Queue stats: + Events in queue: 3 + Unique events: 3 + Duplicates filtered: 2 + +--- Part 6: Pattern-Based Duplicate Check --- + +Existing API calls: 3 +API call to /users already tracked (id: 1) + +=== Deduplication Complete! === +``` + +## What This Example Shows + +1. **Basic Deduplication**: Same source+target = same link ID +2. **Type Awareness**: Number 1 vs String "1" are different values +3. **Nested Deduplication**: Deduplication works with nested links +4. **Universal Links**: Links with values are NOT deduplicated (by design) +5. **Event Deduplication**: Practical pattern for filtering duplicate events +6. **Existence Checking**: Using patterns to check before creating + +## Benefits of Deduplication + +### Memory Efficiency + +- No duplicate data stored +- Constant-time lookup for existing links +- Natural data normalization + +### Idempotent Operations + +- Creating the same link twice is safe +- No need for external deduplication logic +- Simplifies retry logic + +### Graph Consistency + +- Edge relationships are unique +- Natural prevention of duplicate relationships +- Consistent graph structure + +## Use Cases + +- **Event Processing**: Automatically filter duplicate events +- **API Idempotency**: Ensure operations are safe to retry +- **Graph Building**: Prevent duplicate edges in a graph +- **Caching**: Content-addressed caching of relationships +- **Data Normalization**: Automatic structural sharing diff --git a/js/examples/04-deduplication/index.js b/js/examples/04-deduplication/index.js new file mode 100644 index 0000000..3debaa2 --- /dev/null +++ b/js/examples/04-deduplication/index.js @@ -0,0 +1,206 @@ +/** + * Deduplication Example for links-queue + * + * This example demonstrates automatic link deduplication - one of the + * unique features of Links Queue. When you create links with identical + * source and target, the system automatically returns the existing link + * instead of creating duplicates. + * + * Run with any runtime: + * - Node.js: node examples/04-deduplication/index.js + * - Bun: bun examples/04-deduplication/index.js + * - Deno: deno run examples/04-deduplication/index.js + */ + +import { MemoryLinkStore, LinksQueue, createLink } from '../../src/index.js'; + +// Part 1: Basic Deduplication +async function demoBasicDeduplication() { + console.log('--- Part 1: Basic Deduplication ---\n'); + + const store = new MemoryLinkStore(); + + // Create a link + const link1 = await store.create('hello', 'world'); + console.log('Created link 1:', link1); + console.log(' ID:', link1.id); + + // Create another link with the same source and target + const link2 = await store.create('hello', 'world'); + console.log('\nCreated link 2 (same source/target):', link2); + console.log(' ID:', link2.id); + + // They are the same link! + console.log('\nAre they the same link?', link1.id === link2.id); + + // Total count should be 1 + console.log('Total links in store:', await store.count()); + + // Different source/target creates a new link + const link3 = await store.create('hello', 'universe'); + console.log('\nCreated link 3 (different target):', link3); + console.log(' ID:', link3.id); + console.log('Total links after link3:', await store.count()); +} + +// Part 2: Type-aware Deduplication +async function demoTypeDeduplication() { + console.log('\n--- Part 2: Deduplication with Different Types ---\n'); + + const store = new MemoryLinkStore(); + + // Number IDs + const numLink1 = await store.create(1, 2); + const numLink2 = await store.create(1, 2); + console.log('Number links deduplicated:', numLink1.id === numLink2.id); + + // String IDs + const strLink1 = await store.create('a', 'b'); + const strLink2 = await store.create('a', 'b'); + console.log('String links deduplicated:', strLink1.id === strLink2.id); + + // BigInt IDs + const bigLink1 = await store.create(1n, 2n); + const bigLink2 = await store.create(1n, 2n); + console.log('BigInt links deduplicated:', bigLink1.id === bigLink2.id); + + // Type matters! Number 1 vs String "1" are different + const num1 = await store.create(1, 2); + const str1 = await store.create('1', '2'); + console.log('Number vs String (different):', num1.id !== str1.id); + + console.log('Total unique links:', await store.count()); +} + +// Part 3: Nested Link Deduplication +async function demoNestedDeduplication() { + console.log('\n--- Part 3: Nested Link Deduplication ---\n'); + + const store = new MemoryLinkStore(); + + // Create inner links + const inner1 = await store.create(10, 20); + const inner2 = await store.create(10, 20); + console.log('Inner links are same:', inner1.id === inner2.id); + + // Create outer links using the inner link as source + const outer1 = await store.create(inner1, 30); + const outer2 = await store.create(inner2, 30); + console.log( + 'Outer links with same nested source are same:', + outer1.id === outer2.id + ); + console.log('Total links (1 inner + 1 outer):', await store.count()); +} + +// Part 4: Universal Links (No Deduplication) +async function demoUniversalLinks() { + console.log('\n--- Part 4: Universal Links (No Deduplication) ---\n'); + + const store = new MemoryLinkStore(); + + // Universal links with values are NOT deduplicated + const universal1 = await store.createWithValues('subject', 'predicate', [ + 'object1', + ]); + const universal2 = await store.createWithValues('subject', 'predicate', [ + 'object1', + ]); + + console.log('Universal link 1:', universal1); + console.log('Universal link 2:', universal2); + console.log('Same ID?', universal1.id === universal2.id); + console.log('Total universal links:', await store.count()); +} + +// Part 5: Event Deduplication Use Case +async function demoEventDeduplication() { + console.log('\n--- Part 5: Practical Use Case - Event Deduplication ---\n'); + + const eventStore = new MemoryLinkStore(); + const eventQueue = new LinksQueue({ name: 'events', store: eventStore }); + + const events = [ + { type: 'user_login', userId: 'user123' }, + { type: 'user_login', userId: 'user123' }, // Duplicate! + { type: 'user_logout', userId: 'user123' }, + { type: 'user_login', userId: 'user456' }, + { type: 'user_login', userId: 'user123' }, // Duplicate! + ]; + + console.log('Processing events with deduplication:'); + + const processedIds = new Set(); + for (const event of events) { + const eventLink = await eventStore.create(event.type, event.userId); + + if (processedIds.has(eventLink.id)) { + console.log(` SKIPPED (duplicate): ${event.type} - ${event.userId}`); + continue; + } + + const completeLink = createLink( + eventLink.id, + eventLink.source, + eventLink.target + ); + await eventQueue.enqueue(completeLink); + processedIds.add(eventLink.id); + console.log( + ` ENQUEUED: ${event.type} - ${event.userId} (id: ${eventLink.id})` + ); + } + + console.log('\nQueue stats:'); + const stats = eventQueue.getStats(); + console.log(` Events in queue: ${stats.depth}`); + console.log(` Unique events: ${processedIds.size}`); + console.log(` Duplicates filtered: ${events.length - processedIds.size}`); +} + +// Part 6: Pattern-Based Duplicate Check +async function demoPatternCheck() { + console.log('\n--- Part 6: Pattern-Based Duplicate Check ---\n'); + + const store = new MemoryLinkStore(); + + await store.create('api-call', '/users'); + await store.create('api-call', '/products'); + await store.create('api-call', '/orders'); + await store.create('database-query', 'SELECT * FROM users'); + + const existingApiCalls = await store.find({ source: 'api-call' }); + console.log('Existing API calls:', existingApiCalls.length); + + const newEndpoint = '/users'; + const existing = await store.find({ + source: 'api-call', + target: newEndpoint, + }); + + if (existing.length > 0) { + console.log( + `API call to ${newEndpoint} already tracked (id: ${existing[0].id})` + ); + } else { + const newLink = await store.create('api-call', newEndpoint); + console.log(`New API call tracked: ${newLink.id}`); + } +} + +// Main function +async function main() { + console.log('=== Links Queue: Deduplication ===\n'); + + await demoBasicDeduplication(); + await demoTypeDeduplication(); + await demoNestedDeduplication(); + await demoUniversalLinks(); + await demoEventDeduplication(); + await demoPatternCheck(); + + console.log('\n=== Deduplication Complete! ==='); +} + +// Run the example +main().catch(console.error); diff --git a/rust/Cargo.toml b/rust/Cargo.toml index a278e01..5db8004 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -19,6 +19,22 @@ path = "src/lib.rs" name = "links-queue" path = "src/main.rs" +[[example]] +name = "01-hello-world" +path = "examples/01-hello-world/main.rs" + +[[example]] +name = "02-link-relationships" +path = "examples/02-link-relationships/main.rs" + +[[example]] +name = "03-producer-consumer" +path = "examples/03-producer-consumer/main.rs" + +[[example]] +name = "04-deduplication" +path = "examples/04-deduplication/main.rs" + [dependencies] tokio = { version = "1.0", features = ["rt-multi-thread", "macros", "time", "process", "io-util", "sync", "net", "signal"] } diff --git a/rust/examples/01-hello-world/README.md b/rust/examples/01-hello-world/README.md new file mode 100644 index 0000000..18c84cc --- /dev/null +++ b/rust/examples/01-hello-world/README.md @@ -0,0 +1,61 @@ +# Hello World Example (Rust) + +This example demonstrates the most basic usage of Links Queue: simple enqueue and dequeue operations. + +## Key Concepts + +- **Link**: The fundamental data unit with `id`, `source`, and `target` fields +- **MemoryQueue**: An in-memory FIFO queue that holds links for processing +- **Enqueue/Dequeue**: Standard queue operations to add and remove items +- **Acknowledge**: Confirm that a dequeued item was processed successfully + +## Running the Example + +```bash +cargo run --example 01-hello-world +``` + +## Expected Output + +``` +=== Links Queue: Hello World === + +Created links: + Greeting: Link { id: 1, source: Id(100), target: Id(200), values: None } + Message: Link { id: 2, source: Id(300), target: Id(400), values: None } + Data: Link { id: 3, source: Id(42), target: Id(100), values: None } + +--- Enqueuing links --- +Enqueued link 1 at position 0 +Enqueued link 2 at position 1 +Enqueued link 3 at position 2 + +Queue stats after enqueueing: + Depth: 3 + Enqueued: 3 + +--- Dequeuing and processing --- +Processing: 100 -> 200 +Acknowledged link 1 +Processing: 300 -> 400 +Acknowledged link 2 +Processing: 42 -> 100 +Acknowledged link 3 + +Final queue stats: + Depth: 0 + Enqueued: 3 + Dequeued: 3 + Acknowledged: 3 + +Dequeue from empty queue: None + +=== Hello World Complete! === +``` + +## What This Example Shows + +1. **Creating Links**: Links use numeric IDs for source and target in Rust +2. **Queue Setup**: How to create a `MemoryQueue` with `QueueOptions` +3. **FIFO Processing**: Items are processed in the order they were enqueued +4. **Acknowledgment Pattern**: The dequeue-process-acknowledge workflow for reliable processing diff --git a/rust/examples/01-hello-world/main.rs b/rust/examples/01-hello-world/main.rs new file mode 100644 index 0000000..7bf3752 --- /dev/null +++ b/rust/examples/01-hello-world/main.rs @@ -0,0 +1,97 @@ +//! Hello World Example for links-queue +//! +//! This example demonstrates basic enqueue/dequeue operations using +//! the links-queue library. It shows how to: +//! - Create a queue +//! - Enqueue links (messages) +//! - Dequeue links +//! - Acknowledge processed items +//! +//! Run with: `cargo run --example 01-hello-world` + +use links_queue::{Link, LinkRef, MemoryQueue, Queue, QueueOptions}; + +#[tokio::main] +async fn main() -> Result<(), Box> { + println!("=== Links Queue: Hello World ===\n"); + + // Create a queue with basic configuration + let queue = MemoryQueue::::new( + "hello-world", + QueueOptions::new() + .with_visibility_timeout(30) + .with_retry_limit(3), + ); + + // Create some links to enqueue + // Links are the fundamental data unit - they have id, source, and target + let greeting_link = Link::new(1, LinkRef::Id(100), LinkRef::Id(200)); // "hello" -> "world" as IDs + let message_link = Link::new(2, LinkRef::Id(300), LinkRef::Id(400)); // "links" -> "queue" + let data_link = Link::new(3, LinkRef::Id(42), LinkRef::Id(100)); // numeric data + + println!("Created links:"); + println!(" Greeting: {greeting_link:?}"); + println!(" Message: {message_link:?}"); + println!(" Data: {data_link:?}"); + + // Enqueue links + println!("\n--- Enqueuing links ---"); + let result1 = queue.enqueue(greeting_link.clone()).await?; + println!("Enqueued link 1 at position {}", result1.position); + + let result2 = queue.enqueue(message_link.clone()).await?; + println!("Enqueued link 2 at position {}", result2.position); + + let result3 = queue.enqueue(data_link.clone()).await?; + println!("Enqueued link 3 at position {}", result3.position); + + // Check queue stats + let stats = queue.stats(); + println!("\nQueue stats after enqueueing:"); + println!(" Depth: {}", stats.depth); + println!(" Enqueued: {}", stats.enqueued); + + // Dequeue and process links + println!("\n--- Dequeuing and processing ---"); + + // Dequeue first link + if let Some(item) = queue.dequeue().await? { + println!("Processing: {} -> {}", item.source_id(), item.target_id()); + // Acknowledge successful processing + queue.acknowledge(item.id).await?; + println!("Acknowledged link {}", item.id); + } + + // Dequeue second link + if let Some(item) = queue.dequeue().await? { + println!("Processing: {} -> {}", item.source_id(), item.target_id()); + queue.acknowledge(item.id).await?; + println!("Acknowledged link {}", item.id); + } + + // Dequeue third link + if let Some(item) = queue.dequeue().await? { + println!("Processing: {} -> {}", item.source_id(), item.target_id()); + queue.acknowledge(item.id).await?; + println!("Acknowledged link {}", item.id); + } + + // Final stats + let final_stats = queue.stats(); + println!("\nFinal queue stats:"); + println!(" Depth: {}", final_stats.depth); + println!(" Enqueued: {}", final_stats.enqueued); + println!(" Dequeued: {}", final_stats.dequeued); + println!(" Acknowledged: {}", final_stats.acknowledged); + + // Try to dequeue from empty queue + let empty = queue.dequeue().await?; + println!( + "\nDequeue from empty queue: {}", + if empty.is_none() { "None" } else { "Some" } + ); + + println!("\n=== Hello World Complete! ==="); + + Ok(()) +} diff --git a/rust/examples/02-link-relationships/README.md b/rust/examples/02-link-relationships/README.md new file mode 100644 index 0000000..117e0cf --- /dev/null +++ b/rust/examples/02-link-relationships/README.md @@ -0,0 +1,92 @@ +# Link Relationships Example (Rust) + +This example demonstrates nested links and graph structures - the unique link-based data model that distinguishes Links Queue from traditional message queues. + +## Key Concepts + +- **Links as Relations**: Each link represents a directed relationship from source to target +- **Nested Links**: Source and target can themselves be links, enabling hierarchical data +- **Universal Links**: Links with additional values for n-ary relationships +- **Graph Structures**: Multiple links form a graph that can be traversed and queried + +## Running the Example + +```bash +cargo run --example 02-link-relationships +``` + +## Expected Output + +``` +=== Links Queue: Link Relationships === + +--- Part 1: Basic Links as Relations --- + +Alice knows Bob: Link { id: 100, source: Id(1), target: Id(2), values: None } +Alice person link: Link { id: 101, source: Id(1000), target: Id(1), values: None } +Alice name link: Link { id: 102, source: Link(...), target: Id(1001), values: None } + +--- Part 2: Nested Links (Recursive Structures) --- + +Nested link (statement about relationship): + Base: Link { id: 10, source: Id(1), target: Id(2), values: None } + Meta: Link { id: 11, source: Link(...), target: Id(2024), values: None } + Established relationship: 1 -> 2 + In year: 2024 + +--- Part 3: Universal Links (Multiple Values) --- + +Universal link (n-ary relation): Link { id: 200, source: Id(10), target: Id(20), values: Some([...]) } + Subject: 10 + Predicate: 20 + Has values: true + Values: [30, 99, 40] + +--- Part 4: Graph Structures --- + +Social graph edges: + 1 -> 2 + 2 -> 3 + 1 -> 3 + 3 -> 1 + +Alice (1) is connected to: + 2 + 3 + +Charlie (3) receives connections from: + 2 + 1 + +--- Part 5: Processing Graph Operations via Queue --- + +Queued: ADD_EDGE 100->200 +Queued: ADD_EDGE 200->300 +Queued: QUERY node 100 +Queued: REMOVE_EDGE 100->200 + +Processing operations: + ADD: 100 -> 200 + ADD: 200 -> 300 + QUERY: from node 100 + REMOVE: 100 -> 200 + +=== Link Relationships Complete! === +``` + +## What This Example Shows + +1. **Binary Relations**: Simple A -> B relationships (social connections, edges) +2. **Typed Links**: Using numeric IDs to represent types and properties +3. **Meta-Links**: Links about other links (provenance, timestamps) +4. **N-ary Relations**: Universal links with values for complex relations +5. **Graph Building**: Constructing and querying graph structures +6. **Queue Integration**: Processing graph operations through a queue + +## Use Cases + +- Knowledge graphs and semantic data +- Social network relationships +- Provenance tracking (who said what, when) +- Complex event processing with metadata +- Graph database operations via message queue diff --git a/rust/examples/02-link-relationships/main.rs b/rust/examples/02-link-relationships/main.rs new file mode 100644 index 0000000..7306b83 --- /dev/null +++ b/rust/examples/02-link-relationships/main.rs @@ -0,0 +1,227 @@ +//! Link Relationships Example for links-queue +//! +//! This example demonstrates nested links and graph structures. +//! Links Queue uses a unique link-based data model where: +//! - Each link has an id, source, and target +//! - Source and target can be IDs or nested links +//! - This allows representing complex relationships and graph structures +//! +//! Run with: `cargo run --example 02-link-relationships` + +use links_queue::{Link, LinkPattern, LinkRef, MemoryQueue, Queue, QueueOptions}; + +#[tokio::main] +async fn main() -> Result<(), Box> { + println!("=== Links Queue: Link Relationships ===\n"); + + // ========================================================================= + // Part 1: Basic Links as Relations + // ========================================================================= + + println!("--- Part 1: Basic Links as Relations ---\n"); + + // Links naturally represent relationships (edges in a graph) + // Link structure: (id: source -> target) + + // Example: Representing "Alice knows Bob" using IDs + // Let's say: Alice = 1, Bob = 2 + let alice_id: u64 = 1; + let bob_id: u64 = 2; + let knows_relation = Link::new(100, LinkRef::Id(alice_id), LinkRef::Id(bob_id)); + println!("Alice knows Bob: {knows_relation:?}"); + + // Example: Representing a typed relationship + // "Person Alice has Name property" + // Type IDs: Person = 1000, Name = 1001 + let person_type: u64 = 1000; + let name_property: u64 = 1001; + let alice_person = Link::new(101, LinkRef::Id(person_type), LinkRef::Id(alice_id)); + let alice_name = Link::new(102, LinkRef::link(alice_person.clone()), LinkRef::Id(name_property)); + println!("Alice person link: {alice_person:?}"); + println!("Alice name link: {alice_name:?}"); + + // ========================================================================= + // Part 2: Nested Links (Recursive Structures) + // ========================================================================= + + println!("\n--- Part 2: Nested Links (Recursive Structures) ---\n"); + + // Links can contain other links as source or target + // This allows representing complex, hierarchical data + + // Example: Representing a statement about a relationship + // "The fact that Alice knows Bob was established in 2024" + + // First, the base relationship + let alice_knows_bob = Link::new(10, LinkRef::Id(alice_id), LinkRef::Id(bob_id)); + + // Then, a link about that relationship (meta-link) + let year_2024: u64 = 2024; + let established_in = Link::new(11, LinkRef::link(alice_knows_bob.clone()), LinkRef::Id(year_2024)); + println!("Nested link (statement about relationship):"); + println!(" Base: {alice_knows_bob:?}"); + println!(" Meta: {established_in:?}"); + + // Accessing nested data + if established_in.source.is_link() { + if let Some(inner) = established_in.source.as_link() { + println!( + " Established relationship: {} -> {}", + inner.source_id(), + inner.target_id() + ); + } + } + println!(" In year: {}", established_in.target_id()); + + // ========================================================================= + // Part 3: Universal Links (Multiple Values) + // ========================================================================= + + println!("\n--- Part 3: Universal Links (Multiple Values) ---\n"); + + // Universal links extend beyond binary relationships + // They can have additional values array for n-ary relations + + // Example: RDF-like triple with additional metadata + // (Subject, Predicate, Object1, Object2, ...) + // Let's represent: Earth(10) orbits(20) Sun(30) with confidence(99) + let subject: u64 = 10; // Earth + let predicate: u64 = 20; // orbits + let object: u64 = 30; // Sun + let confidence: u64 = 99; // confidence score + let source_id: u64 = 40; // data source + + let universal_link = Link::with_values( + 200, + LinkRef::Id(subject), + LinkRef::Id(predicate), + vec![LinkRef::Id(object), LinkRef::Id(confidence), LinkRef::Id(source_id)], + ); + println!("Universal link (n-ary relation): {universal_link:?}"); + println!(" Subject: {}", universal_link.source_id()); + println!(" Predicate: {}", universal_link.target_id()); + println!(" Has values: {}", universal_link.has_values()); + if let Some(values) = &universal_link.values { + println!( + " Values: {:?}", + values.iter().map(links_queue::LinkRef::get_id).collect::>() + ); + } + + // ========================================================================= + // Part 4: Graph Structures + // ========================================================================= + + println!("\n--- Part 4: Graph Structures ---\n"); + + // Building a simple social graph + // Users: alice=1, bob=2, charlie=3 + let charlie_id: u64 = 3; + + let graph = vec![ + Link::new(30, LinkRef::Id(alice_id), LinkRef::Id(bob_id)), // Alice -> Bob + Link::new(31, LinkRef::Id(bob_id), LinkRef::Id(charlie_id)), // Bob -> Charlie + Link::new(32, LinkRef::Id(alice_id), LinkRef::Id(charlie_id)), // Alice -> Charlie + Link::new(33, LinkRef::Id(charlie_id), LinkRef::Id(alice_id)), // Charlie -> Alice (cycle) + ]; + + println!("Social graph edges:"); + for link in &graph { + println!(" {} -> {}", link.source_id(), link.target_id()); + } + + // Find all connections from Alice using pattern matching + let alice_pattern = LinkPattern::with_source(LinkRef::Id(alice_id)); + let alice_connections: Vec<_> = graph.iter().filter(|l| alice_pattern.matches(l)).collect(); + println!("\nAlice (1) is connected to:"); + for link in &alice_connections { + println!(" {}", link.target_id()); + } + + // Find all connections to Charlie + let charlie_incoming_pattern = LinkPattern::with_target(LinkRef::Id(charlie_id)); + let charlie_incoming: Vec<_> = graph + .iter() + .filter(|l| charlie_incoming_pattern.matches(l)) + .collect(); + println!("\nCharlie (3) receives connections from:"); + for link in &charlie_incoming { + println!(" {}", link.source_id()); + } + + // ========================================================================= + // Part 5: Using Links in a Queue + // ========================================================================= + + println!("\n--- Part 5: Processing Graph Operations via Queue ---\n"); + + let queue = MemoryQueue::::new("graph-operations", QueueOptions::default()); + + // Define operation types as IDs + const ADD_EDGE: u64 = 1; + const REMOVE_EDGE: u64 = 2; + const QUERY: u64 = 3; + + // Queue up some graph operations as links + // Operation link: (op_id: operation_type -> edge_link) + let edge1 = Link::new(101, LinkRef::Id(100), LinkRef::Id(200)); // node100 -> node200 + let edge2 = Link::new(102, LinkRef::Id(200), LinkRef::Id(300)); // node200 -> node300 + + let operations = vec![ + Link::new(1001, LinkRef::Id(ADD_EDGE), LinkRef::link(edge1.clone())), + Link::new(1002, LinkRef::Id(ADD_EDGE), LinkRef::link(edge2.clone())), + Link::new(1003, LinkRef::Id(QUERY), LinkRef::Id(100)), // Query from node 100 + Link::new(1004, LinkRef::Id(REMOVE_EDGE), LinkRef::link(edge1)), + ]; + + // Enqueue operations + for op in &operations { + queue.enqueue(op.clone()).await?; + let op_type = match op.source_id() { + ADD_EDGE => "ADD_EDGE", + REMOVE_EDGE => "REMOVE_EDGE", + QUERY => "QUERY", + _ => "UNKNOWN", + }; + let op_data = if op.target.is_link() { + if let Some(edge) = op.target.as_link() { + format!("{}->{}", edge.source_id(), edge.target_id()) + } else { + "?".to_string() + } + } else { + format!("node {}", op.target_id()) + }; + println!("Queued: {op_type} {op_data}"); + } + + // Process operations + println!("\nProcessing operations:"); + while let Some(item) = queue.dequeue().await? { + let operation = item.source_id(); + match operation { + ADD_EDGE => { + if let Some(edge) = item.target.as_link() { + println!(" ADD: {} -> {}", edge.source_id(), edge.target_id()); + } + } + REMOVE_EDGE => { + if let Some(edge) = item.target.as_link() { + println!(" REMOVE: {} -> {}", edge.source_id(), edge.target_id()); + } + } + QUERY => { + println!(" QUERY: from node {}", item.target_id()); + } + _ => { + println!(" UNKNOWN: {operation}"); + } + } + queue.acknowledge(item.id).await?; + } + + println!("\n=== Link Relationships Complete! ==="); + + Ok(()) +} diff --git a/rust/examples/03-producer-consumer/README.md b/rust/examples/03-producer-consumer/README.md new file mode 100644 index 0000000..a5e3b8a --- /dev/null +++ b/rust/examples/03-producer-consumer/README.md @@ -0,0 +1,97 @@ +# Producer-Consumer Example (Rust) + +This example demonstrates the classic producer-consumer pattern using Links Queue as a work queue. Multiple producers generate tasks, and multiple consumers process them concurrently using Tokio async runtime. + +## Key Concepts + +- **Work Queue**: A shared queue where producers enqueue work items for consumers +- **Competing Consumers**: Multiple consumers process items from the same queue +- **Acknowledgment**: Consumers confirm successful processing before items are removed +- **Load Balancing**: Work is naturally distributed among available consumers +- **Async Concurrency**: Uses Tokio for efficient concurrent processing + +## Running the Example + +```bash +cargo run --example 03-producer-consumer +``` + +## Expected Output + +``` +=== Links Queue: Producer-Consumer Pattern === + +Configuration: + Producers: 2 + Consumers: 3 + Tasks per producer: 5 + Total tasks: 10 + +Producer 1: Starting, will create 5 tasks +Consumer 1: Starting +Consumer 2: Starting +Consumer 3: Starting +Producer 2: Starting, will create 5 tasks +Producer 1: Enqueued task 1000 at position 0 +Consumer 1: Processing task 1000 (type: 1) +Producer 2: Enqueued task 2000 at position 0 +Consumer 2: Processing task 2000 (type: 1) +... + +[Monitor] Queue depth: 3, Enqueued: 8, Dequeued: 5, Acked: 3, In-flight: 2 + +... + +--- All producers finished --- + +... + +=== Final Results === +Total tasks enqueued: 10 +Total tasks dequeued: 10 +Total tasks acknowledged: 10 +Tasks processed per consumer: [4, 3, 3] +Total processed: 10 + +=== Producer-Consumer Complete! === +``` + +## What This Example Shows + +1. **Multiple Producers**: Two producers creating tasks concurrently +2. **Multiple Consumers**: Three consumers processing tasks in parallel +3. **Fair Distribution**: Work is distributed among available consumers +4. **Monitoring**: Real-time queue statistics during processing +5. **Graceful Shutdown**: Waiting for queue to drain before stopping + +## Pattern Details + +### Task Structure + +Tasks are represented as links: +- `id`: Unique task identifier +- `source`: Task type/category (1, 2, or 3) +- `target`: Task payload data + +### Processing Flow + +1. **Producer** creates a task link and enqueues it +2. **Consumer** dequeues a task (task becomes "in-flight") +3. Consumer processes the task (simulated work) +4. Consumer acknowledges the task (removes from in-flight tracking) +5. If acknowledgment fails, task can be requeued after visibility timeout + +### Concurrency Model + +- Uses `Arc` for shared ownership across tasks +- `AtomicBool` for stop signal coordination +- `AtomicUsize` for thread-safe processed count +- Tokio tasks for async execution of producers and consumers + +## Use Cases + +- Background job processing +- Task distribution across workers +- Request buffering and load leveling +- Microservice communication +- Event processing pipelines diff --git a/rust/examples/03-producer-consumer/main.rs b/rust/examples/03-producer-consumer/main.rs new file mode 100644 index 0000000..47eb3ce --- /dev/null +++ b/rust/examples/03-producer-consumer/main.rs @@ -0,0 +1,221 @@ +//! Producer-Consumer Example for links-queue +//! +//! This example demonstrates the classic producer-consumer pattern using +//! Links Queue as a work queue. Multiple producers generate tasks and +//! multiple consumers process them concurrently. +//! +//! Run with: `cargo run --example 03-producer-consumer` + +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::Arc; +use std::time::Duration; +use tokio::time::sleep; + +use links_queue::{Link, LinkRef, MemoryQueue, Queue, QueueOptions}; + +/// Simulated processing time (50-200ms) +async fn simulate_work() { + let delay_ms = 50 + rand_u64() % 150; + sleep(Duration::from_millis(delay_ms)).await; +} + +/// Simple pseudo-random number generator (for demo purposes) +fn rand_u64() -> u64 { + use std::time::SystemTime; + let now = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_nanos() as u64; + // Simple LCG + now.wrapping_mul(6364136223846793005).wrapping_add(1442695040888963407) % 256 +} + +/// Producer: generates tasks and enqueues them +async fn producer( + queue: Arc>, + producer_id: u32, + task_count: u32, +) -> Result<(), Box> { + println!("Producer {producer_id}: Starting, will create {task_count} tasks"); + + for i in 0..task_count { + // Create a task link + // id = unique task ID + // source = task type (1, 2, or 3) + // target = task payload (producer_id * 1000 + task_number) + let task_id = u64::from(producer_id) * 1000 + u64::from(i); + let task_type = (i % 3) + 1; // task type 1, 2, or 3 + let payload = u64::from(producer_id) * 10000 + u64::from(i); + + let task_link = Link::new(task_id, LinkRef::Id(u64::from(task_type)), LinkRef::Id(payload)); + + let result = queue.enqueue(task_link).await?; + println!( + "Producer {}: Enqueued task {} at position {}", + producer_id, task_id, result.position + ); + + // Small delay between producing tasks + sleep(Duration::from_millis(10)).await; + } + + println!("Producer {producer_id}: Finished producing"); + Ok(()) +} + +/// Consumer: processes tasks from the queue +async fn consumer( + queue: Arc>, + consumer_id: u32, + stop_signal: Arc, + processed_count: Arc, +) -> u32 { + println!("Consumer {consumer_id}: Starting"); + let mut processed = 0u32; + + while !stop_signal.load(Ordering::Relaxed) { + match queue.dequeue().await { + Ok(Some(task)) => { + println!( + "Consumer {}: Processing task {} (type: {})", + consumer_id, + task.id, + task.source_id() + ); + + // Simulate processing work + simulate_work().await; + + // Acknowledge successful processing + if let Err(e) = queue.acknowledge(task.id).await { + eprintln!("Consumer {}: Failed to acknowledge task {}: {}", consumer_id, task.id, e); + } else { + processed += 1; + processed_count.fetch_add(1, Ordering::Relaxed); + println!("Consumer {}: Completed task {}", consumer_id, task.id); + } + } + Ok(None) => { + // No tasks available, wait a bit + sleep(Duration::from_millis(50)).await; + } + Err(e) => { + eprintln!("Consumer {consumer_id}: Error dequeuing: {e}"); + sleep(Duration::from_millis(100)).await; + } + } + } + + println!("Consumer {consumer_id}: Stopped after processing {processed} tasks"); + processed +} + +/// Monitor: periodically reports queue status +async fn monitor(queue: Arc>, stop_signal: Arc) { + while !stop_signal.load(Ordering::Relaxed) { + let stats = queue.stats(); + println!( + "\n[Monitor] Queue depth: {}, Enqueued: {}, Dequeued: {}, Acked: {}, In-flight: {}\n", + stats.depth, stats.enqueued, stats.dequeued, stats.acknowledged, stats.in_flight + ); + sleep(Duration::from_millis(500)).await; + } +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + println!("=== Links Queue: Producer-Consumer Pattern ===\n"); + + // Create queue + let queue = Arc::new(MemoryQueue::::new( + "work-queue", + QueueOptions::new() + .with_visibility_timeout(60) + .with_retry_limit(3), + )); + + // Stop signal for consumers + let stop_signal = Arc::new(AtomicBool::new(false)); + let total_processed = Arc::new(AtomicUsize::new(0)); + + // Configuration + const PRODUCER_COUNT: u32 = 2; + const CONSUMER_COUNT: u32 = 3; + const TASKS_PER_PRODUCER: u32 = 5; + + println!("Configuration:"); + println!(" Producers: {PRODUCER_COUNT}"); + println!(" Consumers: {CONSUMER_COUNT}"); + println!(" Tasks per producer: {TASKS_PER_PRODUCER}"); + println!(" Total tasks: {}\n", PRODUCER_COUNT * TASKS_PER_PRODUCER); + + // Start monitor + let monitor_queue = Arc::clone(&queue); + let monitor_stop = Arc::clone(&stop_signal); + let monitor_handle = tokio::spawn(async move { + monitor(monitor_queue, monitor_stop).await; + }); + + // Start consumers + let mut consumer_handles = Vec::new(); + for i in 1..=CONSUMER_COUNT { + let q = Arc::clone(&queue); + let stop = Arc::clone(&stop_signal); + let processed = Arc::clone(&total_processed); + consumer_handles.push(tokio::spawn(async move { + consumer(q, i, stop, processed).await + })); + } + + // Start producers + let mut producer_handles = Vec::new(); + for i in 1..=PRODUCER_COUNT { + let q = Arc::clone(&queue); + producer_handles.push(tokio::spawn(async move { + producer(q, i, TASKS_PER_PRODUCER).await + })); + } + + // Wait for all producers to finish + for handle in producer_handles { + handle.await??; + } + println!("\n--- All producers finished ---\n"); + + // Wait for queue to drain + let expected_total = (PRODUCER_COUNT * TASKS_PER_PRODUCER) as usize; + while total_processed.load(Ordering::Relaxed) < expected_total { + sleep(Duration::from_millis(100)).await; + } + + // Stop consumers and monitor + stop_signal.store(true, Ordering::Relaxed); + sleep(Duration::from_millis(100)).await; // Allow consumers to exit + + // Wait for consumer handles + let mut consumer_results = Vec::new(); + for handle in consumer_handles { + consumer_results.push(handle.await?); + } + + // Wait for monitor + let _ = monitor_handle.await; + + // Final results + let stats = queue.stats(); + println!("\n=== Final Results ==="); + println!("Total tasks enqueued: {}", stats.enqueued); + println!("Total tasks dequeued: {}", stats.dequeued); + println!("Total tasks acknowledged: {}", stats.acknowledged); + println!( + "Tasks processed per consumer: {consumer_results:?}" + ); + println!( + "Total processed: {}", + consumer_results.iter().map(|&x| x as usize).sum::() + ); + + println!("\n=== Producer-Consumer Complete! ==="); + + Ok(()) +} diff --git a/rust/examples/04-deduplication/README.md b/rust/examples/04-deduplication/README.md new file mode 100644 index 0000000..7fc8467 --- /dev/null +++ b/rust/examples/04-deduplication/README.md @@ -0,0 +1,115 @@ +# Deduplication Example (Rust) + +This example demonstrates automatic link deduplication - one of the unique features of Links Queue. When you create links with identical source and target, the system automatically returns the existing link instead of creating duplicates. + +## Key Concepts + +- **Automatic Deduplication**: Links with identical source and target share the same ID +- **Content-Addressed Storage**: Link identity is based on content, not creation order +- **Type-Generic**: Works with any numeric type implementing `LinkType` (u32, u64, usize, etc.) +- **Universal Links**: Deduplication also considers values array for universal links + +## Running the Example + +```bash +cargo run --example 04-deduplication +``` + +## Expected Output + +``` +=== Links Queue: Deduplication === + +--- Part 1: Basic Deduplication --- + +Created link 1: Link { id: 1, source: Id(100), target: Id(200), values: None } + ID: 1 + +Created link 2 (same source/target): Link { id: 1, source: Id(100), target: Id(200), values: None } + ID: 1 + +Are they the same link? true +Total links in store: 1 + +Created link 3 (different target): Link { id: 2, source: Id(100), target: Id(300), values: None } + ID: 2 +Total links after link3: 2 + +--- Part 2: Deduplication with Different ID Types --- + +u32 links deduplicated: true +u64 links deduplicated: true +usize links deduplicated: true +Total unique links in each store: u32=1, u64=1, usize=1 + +--- Part 3: Nested Link Deduplication --- + +Inner links are same: true +Outer links with same nested source are same: true +Total links (1 inner + 1 outer): 2 + +--- Part 4: Universal Links (Deduplication Includes Values) --- + +Universal link 1: Link { id: 1, source: Id(100), target: Id(200), values: Some([Id(300), Id(400)]) } +Universal link 2: Link { id: 1, source: Id(100), target: Id(200), values: Some([Id(300), Id(400)]) } +Same ID (deduplication includes values): true + +Universal link 3 (different values): Link { id: 2, source: Id(100), target: Id(200), values: Some([Id(300), Id(500)]) } +Different from link 1: true +Total universal links: 2 + +--- Part 5: Practical Use Case - Event Deduplication --- + +Processing events with deduplication: + ENQUEUED: LOGIN - user 123 (id: 1) + SKIPPED (duplicate): LOGIN - user 123 + ENQUEUED: LOGOUT - user 123 (id: 2) + ENQUEUED: LOGIN - user 456 (id: 3) + SKIPPED (duplicate): LOGIN - user 123 + +Queue stats: + Events in queue: 3 + Unique events: 3 + Duplicates filtered: 2 + +--- Part 6: Pattern-Based Duplicate Check --- + +Existing API calls: 3 +API call to endpoint 100 already tracked (id: 1) + +=== Deduplication Complete! === +``` + +## What This Example Shows + +1. **Basic Deduplication**: Same source+target = same link ID +2. **Type Flexibility**: Works with u32, u64, usize, etc. +3. **Nested Deduplication**: Deduplication works with nested links +4. **Universal Links**: Values are included in deduplication comparison +5. **Event Deduplication**: Practical pattern for filtering duplicate events +6. **Existence Checking**: Using patterns to check before creating + +## Benefits of Deduplication + +### Memory Efficiency +- No duplicate data stored +- Constant-time lookup for existing links +- Natural data normalization + +### Idempotent Operations +- Creating the same link twice is safe +- No need for external deduplication logic +- Simplifies retry logic + +### Graph Consistency +- Edge relationships are unique +- Natural prevention of duplicate relationships +- Consistent graph structure + +## Use Cases + +- **Event Processing**: Automatically filter duplicate events +- **API Idempotency**: Ensure operations are safe to retry +- **Graph Building**: Prevent duplicate edges in a graph +- **Caching**: Content-addressed caching of relationships +- **Data Normalization**: Automatic structural sharing diff --git a/rust/examples/04-deduplication/main.rs b/rust/examples/04-deduplication/main.rs new file mode 100644 index 0000000..6e96cee --- /dev/null +++ b/rust/examples/04-deduplication/main.rs @@ -0,0 +1,232 @@ +//! Deduplication Example for links-queue +//! +//! This example demonstrates automatic link deduplication - one of the +//! unique features of Links Queue. When you create links with identical +//! source and target, the system automatically returns the existing link +//! instead of creating duplicates. +//! +//! Run with: `cargo run --example 04-deduplication` + +use links_queue::{Link, LinkPattern, LinkRef, LinkStore, MemoryLinkStore, MemoryQueue, Queue, QueueOptions}; + +#[tokio::main] +async fn main() -> Result<(), Box> { + println!("=== Links Queue: Deduplication ===\n"); + + // ========================================================================= + // Part 1: Basic Deduplication in MemoryLinkStore + // ========================================================================= + + println!("--- Part 1: Basic Deduplication ---\n"); + + let mut store = MemoryLinkStore::::new(); + + // Create a link + let link1 = store.create(LinkRef::Id(100), LinkRef::Id(200))?; + println!("Created link 1: {link1:?}"); + println!(" ID: {}", link1.id); + + // Create another link with the same source and target + let link2 = store.create(LinkRef::Id(100), LinkRef::Id(200))?; + println!("\nCreated link 2 (same source/target): {link2:?}"); + println!(" ID: {}", link2.id); + + // They are the same link! + println!("\nAre they the same link? {}", link1.id == link2.id); // true + + // Total count should be 1 + let count = store.total_count(); + println!("Total links in store: {count}"); // 1 + + // Different source/target creates a new link + let link3 = store.create(LinkRef::Id(100), LinkRef::Id(300))?; + println!("\nCreated link 3 (different target): {link3:?}"); + println!(" ID: {}", link3.id); // Different ID + + let final_count = store.total_count(); + println!("Total links after link3: {final_count}"); // 2 + + // ========================================================================= + // Part 2: Deduplication with Different ID Types + // ========================================================================= + + println!("\n--- Part 2: Deduplication with Different ID Types ---\n"); + + // u32 IDs + let mut store32 = MemoryLinkStore::::new(); + let u32_link1 = store32.create(LinkRef::Id(1), LinkRef::Id(2))?; + let u32_link2 = store32.create(LinkRef::Id(1), LinkRef::Id(2))?; + println!("u32 links deduplicated: {}", u32_link1.id == u32_link2.id); // true + + // u64 IDs + let mut store64 = MemoryLinkStore::::new(); + let u64_link1 = store64.create(LinkRef::Id(1), LinkRef::Id(2))?; + let u64_link2 = store64.create(LinkRef::Id(1), LinkRef::Id(2))?; + println!("u64 links deduplicated: {}", u64_link1.id == u64_link2.id); // true + + // usize IDs + let mut store_usize = MemoryLinkStore::::new(); + let usize_link1 = store_usize.create(LinkRef::Id(1), LinkRef::Id(2))?; + let usize_link2 = store_usize.create(LinkRef::Id(1), LinkRef::Id(2))?; + println!("usize links deduplicated: {}", usize_link1.id == usize_link2.id); // true + + println!("Total unique links in each store: u32={}, u64={}, usize={}", + store32.total_count(), + store64.total_count(), + store_usize.total_count() + ); + + // ========================================================================= + // Part 3: Nested Link Deduplication + // ========================================================================= + + println!("\n--- Part 3: Nested Link Deduplication ---\n"); + + let mut store3 = MemoryLinkStore::::new(); + + // Create inner links + let inner1 = store3.create(LinkRef::Id(10), LinkRef::Id(20))?; + let inner2 = store3.create(LinkRef::Id(10), LinkRef::Id(20))?; // Same, deduplicated + + println!("Inner links are same: {}", inner1.id == inner2.id); // true + + // Create outer links using the inner link as source + let outer1 = store3.create(LinkRef::link(inner1.clone()), LinkRef::Id(30))?; + let outer2 = store3.create(LinkRef::link(inner2.clone()), LinkRef::Id(30))?; + + println!("Outer links with same nested source are same: {}", outer1.id == outer2.id); // true + println!("Total links (1 inner + 1 outer): {}", store3.total_count()); // 2 + + // ========================================================================= + // Part 4: Universal Links - Deduplication Includes Values + // ========================================================================= + + println!("\n--- Part 4: Universal Links (Deduplication Includes Values) ---\n"); + + let mut store4 = MemoryLinkStore::::new(); + + // Universal links with same values are deduplicated + let universal1 = store4.create_with_values( + LinkRef::Id(100), + LinkRef::Id(200), + vec![LinkRef::Id(300), LinkRef::Id(400)], + )?; + let universal2 = store4.create_with_values( + LinkRef::Id(100), + LinkRef::Id(200), + vec![LinkRef::Id(300), LinkRef::Id(400)], + )?; + + println!("Universal link 1: {universal1:?}"); + println!("Universal link 2: {universal2:?}"); + println!("Same ID (deduplication includes values): {}", universal1.id == universal2.id); // true + + // Different values = different link + let universal3 = store4.create_with_values( + LinkRef::Id(100), + LinkRef::Id(200), + vec![LinkRef::Id(300), LinkRef::Id(500)], // Different value! + )?; + println!("\nUniversal link 3 (different values): {universal3:?}"); + println!("Different from link 1: {}", universal1.id != universal3.id); // true + + println!("Total universal links: {}", store4.total_count()); // 2 + + // ========================================================================= + // Part 5: Practical Use Case - Event Deduplication + // ========================================================================= + + println!("\n--- Part 5: Practical Use Case - Event Deduplication ---\n"); + + let mut event_store = MemoryLinkStore::::new(); + let queue = MemoryQueue::::new("events", QueueOptions::default()); + + // Define event types + const USER_LOGIN: u64 = 1; + const USER_LOGOUT: u64 = 2; + + // Users + const USER_123: u64 = 123; + const USER_456: u64 = 456; + + // Simulate receiving duplicate events + let events = vec![ + (USER_LOGIN, USER_123), + (USER_LOGIN, USER_123), // Duplicate! + (USER_LOGOUT, USER_123), + (USER_LOGIN, USER_456), + (USER_LOGIN, USER_123), // Duplicate! + ]; + + println!("Processing events with deduplication:"); + + let mut processed_ids = std::collections::HashSet::new(); + for (event_type, user_id) in &events { + // Create a link representing the event + // Using source=type, target=userId creates unique event signatures + let event_link = event_store.create(LinkRef::Id(*event_type), LinkRef::Id(*user_id))?; + + // Check if we've already processed this exact event + if processed_ids.contains(&event_link.id) { + let event_name = if *event_type == USER_LOGIN { "LOGIN" } else { "LOGOUT" }; + println!(" SKIPPED (duplicate): {event_name} - user {user_id}"); + continue; + } + + // New event - enqueue and track + let complete_link = Link::new(event_link.id, event_link.source.clone(), event_link.target.clone()); + queue.enqueue(complete_link).await?; + processed_ids.insert(event_link.id); + let event_name = if *event_type == USER_LOGIN { "LOGIN" } else { "LOGOUT" }; + println!(" ENQUEUED: {} - user {} (id: {})", event_name, user_id, event_link.id); + } + + println!("\nQueue stats:"); + let stats = queue.stats(); + println!(" Events in queue: {}", stats.depth); + println!(" Unique events: {}", processed_ids.len()); + println!(" Duplicates filtered: {}", events.len() - processed_ids.len()); + + // ========================================================================= + // Part 6: Pattern-Based Deduplication Check + // ========================================================================= + + println!("\n--- Part 6: Pattern-Based Duplicate Check ---\n"); + + let mut store5 = MemoryLinkStore::::new(); + + // Create some links representing API calls + const API_CALL: u64 = 1; + const DB_QUERY: u64 = 2; + const USERS_ENDPOINT: u64 = 100; + const PRODUCTS_ENDPOINT: u64 = 101; + const ORDERS_ENDPOINT: u64 = 102; + + store5.create(LinkRef::Id(API_CALL), LinkRef::Id(USERS_ENDPOINT))?; + store5.create(LinkRef::Id(API_CALL), LinkRef::Id(PRODUCTS_ENDPOINT))?; + store5.create(LinkRef::Id(API_CALL), LinkRef::Id(ORDERS_ENDPOINT))?; + store5.create(LinkRef::Id(DB_QUERY), LinkRef::Id(USERS_ENDPOINT))?; + + // Check if a specific pattern exists (deduplication check) + let api_call_pattern = LinkPattern::with_source(LinkRef::Id(API_CALL)); + let existing_api_calls = store5.find(&api_call_pattern); + println!("Existing API calls: {}", existing_api_calls.len()); + + // Before creating a new link, check if it exists + let new_endpoint = USERS_ENDPOINT; // This already exists! + let existing = store5.find(&LinkPattern::with_source_target( + LinkRef::Id(API_CALL), + LinkRef::Id(new_endpoint), + )); + + if existing.is_empty() { + let new_link = store5.create(LinkRef::Id(API_CALL), LinkRef::Id(new_endpoint))?; + println!("New API call tracked: {}", new_link.id); + } else { + println!("API call to endpoint {} already tracked (id: {})", new_endpoint, existing[0].id); + } + + println!("\n=== Deduplication Complete! ==="); + + Ok(()) +} From a56a1bafa1268700c3882f1d7200373087d3db6f Mon Sep 17 00:00:00 2001 From: konard Date: Sun, 18 Jan 2026 22:16:51 +0100 Subject: [PATCH 3/5] Revert "Initial commit with task details" This reverts commit 6a516a051271df48a860f3d4d464f19ecaeaebc7. --- CLAUDE.md | 5 ----- 1 file changed, 5 deletions(-) delete mode 100644 CLAUDE.md diff --git a/CLAUDE.md b/CLAUDE.md deleted file mode 100644 index ed0d0f8..0000000 --- a/CLAUDE.md +++ /dev/null @@ -1,5 +0,0 @@ -Issue to solve: https://github.com/link-foundation/links-queue/issues/22 -Your prepared branch: issue-22-50458fb05c5c -Your prepared working directory: /tmp/gh-issue-solver-1768770069758 - -Proceed. From 64ef554493264d7db1dd434f845dc2e44b63988a Mon Sep 17 00:00:00 2001 From: konard Date: Mon, 19 Jan 2026 11:40:00 +0100 Subject: [PATCH 4/5] Fix Rust code formatting in examples Run cargo fmt to fix formatting issues that were causing the CI lint check to fail. Co-Authored-By: Claude Opus 4.5 --- rust/examples/02-link-relationships/main.rs | 27 +++++++--- rust/examples/03-producer-consumer/main.rs | 19 ++++--- rust/examples/04-deduplication/main.rs | 59 ++++++++++++++++----- 3 files changed, 80 insertions(+), 25 deletions(-) diff --git a/rust/examples/02-link-relationships/main.rs b/rust/examples/02-link-relationships/main.rs index 7306b83..bf7b002 100644 --- a/rust/examples/02-link-relationships/main.rs +++ b/rust/examples/02-link-relationships/main.rs @@ -36,7 +36,11 @@ async fn main() -> Result<(), Box> { let person_type: u64 = 1000; let name_property: u64 = 1001; let alice_person = Link::new(101, LinkRef::Id(person_type), LinkRef::Id(alice_id)); - let alice_name = Link::new(102, LinkRef::link(alice_person.clone()), LinkRef::Id(name_property)); + let alice_name = Link::new( + 102, + LinkRef::link(alice_person.clone()), + LinkRef::Id(name_property), + ); println!("Alice person link: {alice_person:?}"); println!("Alice name link: {alice_name:?}"); @@ -57,7 +61,11 @@ async fn main() -> Result<(), Box> { // Then, a link about that relationship (meta-link) let year_2024: u64 = 2024; - let established_in = Link::new(11, LinkRef::link(alice_knows_bob.clone()), LinkRef::Id(year_2024)); + let established_in = Link::new( + 11, + LinkRef::link(alice_knows_bob.clone()), + LinkRef::Id(year_2024), + ); println!("Nested link (statement about relationship):"); println!(" Base: {alice_knows_bob:?}"); println!(" Meta: {established_in:?}"); @@ -96,7 +104,11 @@ async fn main() -> Result<(), Box> { 200, LinkRef::Id(subject), LinkRef::Id(predicate), - vec![LinkRef::Id(object), LinkRef::Id(confidence), LinkRef::Id(source_id)], + vec![ + LinkRef::Id(object), + LinkRef::Id(confidence), + LinkRef::Id(source_id), + ], ); println!("Universal link (n-ary relation): {universal_link:?}"); println!(" Subject: {}", universal_link.source_id()); @@ -105,7 +117,10 @@ async fn main() -> Result<(), Box> { if let Some(values) = &universal_link.values { println!( " Values: {:?}", - values.iter().map(links_queue::LinkRef::get_id).collect::>() + values + .iter() + .map(links_queue::LinkRef::get_id) + .collect::>() ); } @@ -120,8 +135,8 @@ async fn main() -> Result<(), Box> { let charlie_id: u64 = 3; let graph = vec![ - Link::new(30, LinkRef::Id(alice_id), LinkRef::Id(bob_id)), // Alice -> Bob - Link::new(31, LinkRef::Id(bob_id), LinkRef::Id(charlie_id)), // Bob -> Charlie + Link::new(30, LinkRef::Id(alice_id), LinkRef::Id(bob_id)), // Alice -> Bob + Link::new(31, LinkRef::Id(bob_id), LinkRef::Id(charlie_id)), // Bob -> Charlie Link::new(32, LinkRef::Id(alice_id), LinkRef::Id(charlie_id)), // Alice -> Charlie Link::new(33, LinkRef::Id(charlie_id), LinkRef::Id(alice_id)), // Charlie -> Alice (cycle) ]; diff --git a/rust/examples/03-producer-consumer/main.rs b/rust/examples/03-producer-consumer/main.rs index 47eb3ce..b618f46 100644 --- a/rust/examples/03-producer-consumer/main.rs +++ b/rust/examples/03-producer-consumer/main.rs @@ -27,7 +27,9 @@ fn rand_u64() -> u64 { .unwrap() .as_nanos() as u64; // Simple LCG - now.wrapping_mul(6364136223846793005).wrapping_add(1442695040888963407) % 256 + now.wrapping_mul(6364136223846793005) + .wrapping_add(1442695040888963407) + % 256 } /// Producer: generates tasks and enqueues them @@ -47,7 +49,11 @@ async fn producer( let task_type = (i % 3) + 1; // task type 1, 2, or 3 let payload = u64::from(producer_id) * 10000 + u64::from(i); - let task_link = Link::new(task_id, LinkRef::Id(u64::from(task_type)), LinkRef::Id(payload)); + let task_link = Link::new( + task_id, + LinkRef::Id(u64::from(task_type)), + LinkRef::Id(payload), + ); let result = queue.enqueue(task_link).await?; println!( @@ -88,7 +94,10 @@ async fn consumer( // Acknowledge successful processing if let Err(e) = queue.acknowledge(task.id).await { - eprintln!("Consumer {}: Failed to acknowledge task {}: {}", consumer_id, task.id, e); + eprintln!( + "Consumer {}: Failed to acknowledge task {}: {}", + consumer_id, task.id, e + ); } else { processed += 1; processed_count.fetch_add(1, Ordering::Relaxed); @@ -207,9 +216,7 @@ async fn main() -> Result<(), Box> { println!("Total tasks enqueued: {}", stats.enqueued); println!("Total tasks dequeued: {}", stats.dequeued); println!("Total tasks acknowledged: {}", stats.acknowledged); - println!( - "Tasks processed per consumer: {consumer_results:?}" - ); + println!("Tasks processed per consumer: {consumer_results:?}"); println!( "Total processed: {}", consumer_results.iter().map(|&x| x as usize).sum::() diff --git a/rust/examples/04-deduplication/main.rs b/rust/examples/04-deduplication/main.rs index 6e96cee..21d8e25 100644 --- a/rust/examples/04-deduplication/main.rs +++ b/rust/examples/04-deduplication/main.rs @@ -7,7 +7,9 @@ //! //! Run with: `cargo run --example 04-deduplication` -use links_queue::{Link, LinkPattern, LinkRef, LinkStore, MemoryLinkStore, MemoryQueue, Queue, QueueOptions}; +use links_queue::{ + Link, LinkPattern, LinkRef, LinkStore, MemoryLinkStore, MemoryQueue, Queue, QueueOptions, +}; #[tokio::main] async fn main() -> Result<(), Box> { @@ -68,9 +70,13 @@ async fn main() -> Result<(), Box> { let mut store_usize = MemoryLinkStore::::new(); let usize_link1 = store_usize.create(LinkRef::Id(1), LinkRef::Id(2))?; let usize_link2 = store_usize.create(LinkRef::Id(1), LinkRef::Id(2))?; - println!("usize links deduplicated: {}", usize_link1.id == usize_link2.id); // true + println!( + "usize links deduplicated: {}", + usize_link1.id == usize_link2.id + ); // true - println!("Total unique links in each store: u32={}, u64={}, usize={}", + println!( + "Total unique links in each store: u32={}, u64={}, usize={}", store32.total_count(), store64.total_count(), store_usize.total_count() @@ -94,7 +100,10 @@ async fn main() -> Result<(), Box> { let outer1 = store3.create(LinkRef::link(inner1.clone()), LinkRef::Id(30))?; let outer2 = store3.create(LinkRef::link(inner2.clone()), LinkRef::Id(30))?; - println!("Outer links with same nested source are same: {}", outer1.id == outer2.id); // true + println!( + "Outer links with same nested source are same: {}", + outer1.id == outer2.id + ); // true println!("Total links (1 inner + 1 outer): {}", store3.total_count()); // 2 // ========================================================================= @@ -119,7 +128,10 @@ async fn main() -> Result<(), Box> { println!("Universal link 1: {universal1:?}"); println!("Universal link 2: {universal2:?}"); - println!("Same ID (deduplication includes values): {}", universal1.id == universal2.id); // true + println!( + "Same ID (deduplication includes values): {}", + universal1.id == universal2.id + ); // true // Different values = different link let universal3 = store4.create_with_values( @@ -152,10 +164,10 @@ async fn main() -> Result<(), Box> { // Simulate receiving duplicate events let events = vec![ (USER_LOGIN, USER_123), - (USER_LOGIN, USER_123), // Duplicate! + (USER_LOGIN, USER_123), // Duplicate! (USER_LOGOUT, USER_123), (USER_LOGIN, USER_456), - (USER_LOGIN, USER_123), // Duplicate! + (USER_LOGIN, USER_123), // Duplicate! ]; println!("Processing events with deduplication:"); @@ -168,24 +180,42 @@ async fn main() -> Result<(), Box> { // Check if we've already processed this exact event if processed_ids.contains(&event_link.id) { - let event_name = if *event_type == USER_LOGIN { "LOGIN" } else { "LOGOUT" }; + let event_name = if *event_type == USER_LOGIN { + "LOGIN" + } else { + "LOGOUT" + }; println!(" SKIPPED (duplicate): {event_name} - user {user_id}"); continue; } // New event - enqueue and track - let complete_link = Link::new(event_link.id, event_link.source.clone(), event_link.target.clone()); + let complete_link = Link::new( + event_link.id, + event_link.source.clone(), + event_link.target.clone(), + ); queue.enqueue(complete_link).await?; processed_ids.insert(event_link.id); - let event_name = if *event_type == USER_LOGIN { "LOGIN" } else { "LOGOUT" }; - println!(" ENQUEUED: {} - user {} (id: {})", event_name, user_id, event_link.id); + let event_name = if *event_type == USER_LOGIN { + "LOGIN" + } else { + "LOGOUT" + }; + println!( + " ENQUEUED: {} - user {} (id: {})", + event_name, user_id, event_link.id + ); } println!("\nQueue stats:"); let stats = queue.stats(); println!(" Events in queue: {}", stats.depth); println!(" Unique events: {}", processed_ids.len()); - println!(" Duplicates filtered: {}", events.len() - processed_ids.len()); + println!( + " Duplicates filtered: {}", + events.len() - processed_ids.len() + ); // ========================================================================= // Part 6: Pattern-Based Deduplication Check @@ -223,7 +253,10 @@ async fn main() -> Result<(), Box> { let new_link = store5.create(LinkRef::Id(API_CALL), LinkRef::Id(new_endpoint))?; println!("New API call tracked: {}", new_link.id); } else { - println!("API call to endpoint {} already tracked (id: {})", new_endpoint, existing[0].id); + println!( + "API call to endpoint {} already tracked (id: {})", + new_endpoint, existing[0].id + ); } println!("\n=== Deduplication Complete! ==="); From ed885cbc747f37ed1f2139f15e03ede89d9dfafb Mon Sep 17 00:00:00 2001 From: konard Date: Mon, 19 Jan 2026 11:45:22 +0100 Subject: [PATCH 5/5] Fix clippy warnings in Rust examples - Move const declarations to module level to satisfy clippy::items_after_statements lint - Use map_or_else instead of if let/else for Option handling - Add underscores to long numeric literals for readability - Allow clippy::cast_possible_truncation for intentional u128 to u64 cast - Rename store4 to store_universal to avoid similar_names lint Co-Authored-By: Claude Opus 4.5 --- rust/examples/02-link-relationships/main.rs | 19 +++++----- rust/examples/03-producer-consumer/main.rs | 16 +++++---- rust/examples/04-deduplication/main.rs | 39 +++++++++++---------- 3 files changed, 38 insertions(+), 36 deletions(-) diff --git a/rust/examples/02-link-relationships/main.rs b/rust/examples/02-link-relationships/main.rs index bf7b002..ef5c6e3 100644 --- a/rust/examples/02-link-relationships/main.rs +++ b/rust/examples/02-link-relationships/main.rs @@ -10,6 +10,11 @@ use links_queue::{Link, LinkPattern, LinkRef, MemoryQueue, Queue, QueueOptions}; +// Define operation types as IDs (moved to module level to satisfy clippy) +const ADD_EDGE: u64 = 1; +const REMOVE_EDGE: u64 = 2; +const QUERY: u64 = 3; + #[tokio::main] async fn main() -> Result<(), Box> { println!("=== Links Queue: Link Relationships ===\n"); @@ -173,11 +178,6 @@ async fn main() -> Result<(), Box> { let queue = MemoryQueue::::new("graph-operations", QueueOptions::default()); - // Define operation types as IDs - const ADD_EDGE: u64 = 1; - const REMOVE_EDGE: u64 = 2; - const QUERY: u64 = 3; - // Queue up some graph operations as links // Operation link: (op_id: operation_type -> edge_link) let edge1 = Link::new(101, LinkRef::Id(100), LinkRef::Id(200)); // node100 -> node200 @@ -200,11 +200,10 @@ async fn main() -> Result<(), Box> { _ => "UNKNOWN", }; let op_data = if op.target.is_link() { - if let Some(edge) = op.target.as_link() { - format!("{}->{}", edge.source_id(), edge.target_id()) - } else { - "?".to_string() - } + op.target.as_link().map_or_else( + || "?".to_string(), + |edge| format!("{}->{}", edge.source_id(), edge.target_id()), + ) } else { format!("node {}", op.target_id()) }; diff --git a/rust/examples/03-producer-consumer/main.rs b/rust/examples/03-producer-consumer/main.rs index b618f46..07f8c37 100644 --- a/rust/examples/03-producer-consumer/main.rs +++ b/rust/examples/03-producer-consumer/main.rs @@ -13,6 +13,11 @@ use tokio::time::sleep; use links_queue::{Link, LinkRef, MemoryQueue, Queue, QueueOptions}; +// Configuration constants (moved to module level to satisfy clippy) +const PRODUCER_COUNT: u32 = 2; +const CONSUMER_COUNT: u32 = 3; +const TASKS_PER_PRODUCER: u32 = 5; + /// Simulated processing time (50-200ms) async fn simulate_work() { let delay_ms = 50 + rand_u64() % 150; @@ -20,15 +25,17 @@ async fn simulate_work() { } /// Simple pseudo-random number generator (for demo purposes) +#[allow(clippy::cast_possible_truncation)] fn rand_u64() -> u64 { use std::time::SystemTime; + // Note: truncation from u128 is intentional for randomness let now = SystemTime::now() .duration_since(SystemTime::UNIX_EPOCH) .unwrap() .as_nanos() as u64; // Simple LCG - now.wrapping_mul(6364136223846793005) - .wrapping_add(1442695040888963407) + now.wrapping_mul(6_364_136_223_846_793_005) + .wrapping_add(1_442_695_040_888_963_407) % 256 } @@ -147,11 +154,6 @@ async fn main() -> Result<(), Box> { let stop_signal = Arc::new(AtomicBool::new(false)); let total_processed = Arc::new(AtomicUsize::new(0)); - // Configuration - const PRODUCER_COUNT: u32 = 2; - const CONSUMER_COUNT: u32 = 3; - const TASKS_PER_PRODUCER: u32 = 5; - println!("Configuration:"); println!(" Producers: {PRODUCER_COUNT}"); println!(" Consumers: {CONSUMER_COUNT}"); diff --git a/rust/examples/04-deduplication/main.rs b/rust/examples/04-deduplication/main.rs index 21d8e25..101b38b 100644 --- a/rust/examples/04-deduplication/main.rs +++ b/rust/examples/04-deduplication/main.rs @@ -11,6 +11,21 @@ use links_queue::{ Link, LinkPattern, LinkRef, LinkStore, MemoryLinkStore, MemoryQueue, Queue, QueueOptions, }; +// Event types (moved to module level to satisfy clippy) +const USER_LOGIN: u64 = 1; +const USER_LOGOUT: u64 = 2; + +// User IDs +const USER_123: u64 = 123; +const USER_456: u64 = 456; + +// API constants +const API_CALL: u64 = 1; +const DB_QUERY: u64 = 2; +const USERS_ENDPOINT: u64 = 100; +const PRODUCTS_ENDPOINT: u64 = 101; +const ORDERS_ENDPOINT: u64 = 102; + #[tokio::main] async fn main() -> Result<(), Box> { println!("=== Links Queue: Deduplication ===\n"); @@ -112,15 +127,15 @@ async fn main() -> Result<(), Box> { println!("\n--- Part 4: Universal Links (Deduplication Includes Values) ---\n"); - let mut store4 = MemoryLinkStore::::new(); + let mut store_universal = MemoryLinkStore::::new(); // Universal links with same values are deduplicated - let universal1 = store4.create_with_values( + let universal1 = store_universal.create_with_values( LinkRef::Id(100), LinkRef::Id(200), vec![LinkRef::Id(300), LinkRef::Id(400)], )?; - let universal2 = store4.create_with_values( + let universal2 = store_universal.create_with_values( LinkRef::Id(100), LinkRef::Id(200), vec![LinkRef::Id(300), LinkRef::Id(400)], @@ -134,7 +149,7 @@ async fn main() -> Result<(), Box> { ); // true // Different values = different link - let universal3 = store4.create_with_values( + let universal3 = store_universal.create_with_values( LinkRef::Id(100), LinkRef::Id(200), vec![LinkRef::Id(300), LinkRef::Id(500)], // Different value! @@ -142,7 +157,7 @@ async fn main() -> Result<(), Box> { println!("\nUniversal link 3 (different values): {universal3:?}"); println!("Different from link 1: {}", universal1.id != universal3.id); // true - println!("Total universal links: {}", store4.total_count()); // 2 + println!("Total universal links: {}", store_universal.total_count()); // 2 // ========================================================================= // Part 5: Practical Use Case - Event Deduplication @@ -153,14 +168,6 @@ async fn main() -> Result<(), Box> { let mut event_store = MemoryLinkStore::::new(); let queue = MemoryQueue::::new("events", QueueOptions::default()); - // Define event types - const USER_LOGIN: u64 = 1; - const USER_LOGOUT: u64 = 2; - - // Users - const USER_123: u64 = 123; - const USER_456: u64 = 456; - // Simulate receiving duplicate events let events = vec![ (USER_LOGIN, USER_123), @@ -226,12 +233,6 @@ async fn main() -> Result<(), Box> { let mut store5 = MemoryLinkStore::::new(); // Create some links representing API calls - const API_CALL: u64 = 1; - const DB_QUERY: u64 = 2; - const USERS_ENDPOINT: u64 = 100; - const PRODUCTS_ENDPOINT: u64 = 101; - const ORDERS_ENDPOINT: u64 = 102; - store5.create(LinkRef::Id(API_CALL), LinkRef::Id(USERS_ENDPOINT))?; store5.create(LinkRef::Id(API_CALL), LinkRef::Id(PRODUCTS_ENDPOINT))?; store5.create(LinkRef::Id(API_CALL), LinkRef::Id(ORDERS_ENDPOINT))?;