Skip to content

feat: Add Azure Queue Storage seedwork with type-safe operations and blob audit logging#161

Draft
Copilot wants to merge 3 commits intomainfrom
copilot/implement-azure-queue-support
Draft

feat: Add Azure Queue Storage seedwork with type-safe operations and blob audit logging#161
Copilot wants to merge 3 commits intomainfrom
copilot/implement-azure-queue-support

Conversation

Copy link
Contributor

Copilot AI commented Feb 6, 2026

Overview

Implements Phase 1 of Azure Queue Storage support: foundational @cellix/queue-storage-seedwork package providing type-safe queue operations with automatic blob audit logging and JSON schema validation.

Remaining phases (service integration, Cellix API extension, PoCs) detailed in QUEUE_STORAGE_IMPLEMENTATION_SUMMARY.md.

Implementation

Type-Safe Message Infrastructure

Generic message envelope with strict typing throughout—no any types:

interface QueueMessageEnvelope<TPayload> {
  messageId: string;
  timestamp: string;
  correlationId?: string;
  queueName: string;
  direction: 'inbound' | 'outbound';
  payload: TPayload;
  metadata?: Record<string, string>;
}

Base Queue Operations

BaseQueueSender

  • Validates payload against JSON schema before send
  • Encodes as base64 JSON for Azure compatibility
  • Logs to queue-messages/outbound/{ISO8601-timestamp}.json
  • OpenTelemetry spans for distributed tracing

BaseQueueReceiver

  • Decodes and validates received messages
  • Logs to queue-messages/inbound/{ISO8601-timestamp}.json
  • Manages message deletion and visibility timeouts

Blob Audit Logging

Fire-and-forget pattern ensures logging doesn't block queue operations. Each blob includes:

  • Metadata: queue name, direction, message ID, timestamp
  • Tags: configurable per queue for filtering/categorization
  • Full message envelope as JSON

Schema Validation

AJV-based validation with per-queue schema registration:

const schema: JSONSchemaType<MyPayload> = {
  type: 'object',
  properties: {
    id: { type: 'string' },
    value: { type: 'number' },
  },
  required: ['id', 'value'],
};

class MyQueueSender extends BaseQueueSender<MyPayload> {
  constructor(config, logger, validator) {
    super(config, {
      queueName: 'my-queue',
      direction: 'outbound',
      payloadSchema: schema,
      blobLogging: {
        tags: { type: 'domain-event' }
      }
    });
  }
}

Usage

// Send
const sender = new MyQueueSender(...);
await sender.ensureQueue();
const result = await sender.sendMessage(
  { id: '123', value: 42 },
  'correlation-id',
  { source: 'api' }
);
// Auto-logged to: queue-messages/outbound/2026-02-06T22:30:00.123Z.json

// Receive
const receiver = new MyQueueReceiver(...);
const messages = await receiver.receiveMessages({ maxMessages: 10 });
for (const { message, messageId, popReceipt } of messages) {
  await processMessage(message.payload);
  await receiver.deleteMessage(messageId, popReceipt);
}
// Auto-logged to: queue-messages/inbound/2026-02-06T22:30:01.456Z.json

Local Development

Azurite emulator support with default connection string documented in README.

Dependencies

  • @azure/storage-queue@^12.26.0
  • @azure/storage-blob@^12.25.0
  • @opentelemetry/api@^1.9.0
  • ajv@^8.17.1

Next Steps

See QUEUE_STORAGE_IMPLEMENTATION_SUMMARY.md for Phases 2-6:

  • Application service (@ocom/service-queue-storage)
  • Cellix queue trigger API
  • Outbound/inbound queue PoCs
  • Integration testing

Estimated 8-13 hours to complete.

Original prompt

This section details on the original issue you should resolve

<issue_title>Implement Azure Queue Storage support in Cellix with type-safe, logged queue sender/receiver and proof-of-concept</issue_title>
<issue_description>## Overview

The Cellix framework must provide robust, first-class support for Azure Queue Storage, enabling reusable queuing and logging for distributed communications between services. This work introduces foundational packages and application integration—including legacy-based queue sender/receiver abstractions, application-configurable service registration, and functional business examples.

Built-in Logging to Blob Storage

  • Every message sent or received (either direction) must be uploaded to Azure Blob Storage in a container queue-messages:
    • Messages sent: stored under queue-messages/outbound/
    • Messages received: stored under queue-messages/inbound/
    • File name: current timestamp (UTC, ISO8601, ms precision), e.g. 2026-02-07T14:42:03.123Z.json
  • Blob Metadata and Tagging:
    • Each file must be tagged and must have blob metadata for queue name and message direction.
    • Developers must be able to configure additional metadata/tags per queue at the application layer (e.g., custom tags per message type / queue).
  • Logging must be reliable, atomic, and must not block the send/receive pipeline (logging should not prevent the queue operation from completing; errors must be handled robustly and traced).
  • Documentation must include instructions for local Azurite-based development (storage emulator).

Implementation expectations (legacy parity + improvements)

  • Abstractions, sender/receiver, and service interface must provide at least the same feature completeness, reliability, and error handling as the legacy efdo implementation.
  • @cellix/queue-storage-seedwork must enforce proper type safety using generic typings and runtime guarantees:
    • No any for generic queue message/payload plumbing.
    • Prefer unknown + validation + typed narrowing where needed.
    • Prefer generics and discriminated unions for message envelopes and payload types.

Deliverables & Structure

1) @cellix/queue-storage-seedwork

Create a new framework seedwork package containing reusable queue storage infrastructure code:

2) @ocom/service-queue-storage

Create an Owner Community application-specific package that:

  • Depends on @cellix/queue-storage-seedwork.
  • Maintains Owner Community’s queue configuration:
    • queue names
    • direction (inbound/outbound)
    • schemas
    • logging metadata/tags configuration
  • On startup, registers all configured queues for sending and receiving.
  • Adheres to Cellix infrastructure service standards (startup/shutdown lifecycle, DI registration patterns).

3) Extend Cellix fluent startup API to support queue triggers

In @ocom/api (and/or Cellix core where appropriate), expose a fluent, chained startup API to register Azure Functions queue handlers similarly to how HTTP handlers are registered today.

Proof-of-concept scenarios (MUST be implemented in Owner Community)

These examples are required to prove the design works with what is already functional in the repo and to provide contributors a working reference.

Outbound queue example: community-created

  • On community creation, an existing integration event handler for CommunityCreatedEvent (already firing in the domain) must send a queue message to the outbound queue community-created.
  • The message contract should align with the actual event and include relevant fields (e.g., communityId, name, createdAt, etc.).
  • The send must:
    • be type-safe (generic typed payload)
    • be schema-validated at runtime
    • log the sent message as JSON to blob storage under queue-messages/outbound/ with configured tags/metadata

Inbound queue example: member

  • Create an inbound queue member that accepts a payload:
    • memberId: string (required; objectId)
    • Select a few sensible fields from the member schema which can be used on the member message payload for the queue handler to update those fields in database when it processes a message. It doesn't matter what fields you pick, ...

💬 We'd love your input! Share your thoughts on Copilot coding agent in our 2 minute survey.

Copilot AI added 2 commits February 6, 2026 22:26
…alidation, and blob logging

- Create type-safe queue message infrastructure with generic envelopes
- Implement BaseQueueSender with automatic validation and blob logging
- Implement BaseQueueReceiver with message processing and blob logging
- Add MessageLogger for automatic audit trail to blob storage
- Add SchemaValidator with AJV for runtime type validation
- Add OpenTelemetry tracing integration
- Include comprehensive README and usage examples
- Add unit tests for schema validator
- Update pnpm workspace catalog with Azure SDK versions

Blob logging structure:
- Outbound: queue-messages/outbound/{timestamp}.json
- Inbound: queue-messages/inbound/{timestamp}.json
- Metadata/tags: queue name, direction, message ID, custom tags

Supports:
- Type-safe generic payloads
- Correlation IDs for distributed tracing
- Custom metadata per message
- Configurable blob logging per queue
- Local development with Azurite emulator

Part of: Azure Queue Storage support implementation
See: QUEUE_STORAGE_IMPLEMENTATION_SUMMARY.md for full implementation plan
Copilot AI changed the title [WIP] Add Azure Queue Storage support in Cellix framework feat: Add Azure Queue Storage seedwork with type-safe operations and blob audit logging Feb 6, 2026
Copilot AI requested a review from nnoce14 February 6, 2026 22:32
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Implement Azure Queue Storage support in Cellix with type-safe, logged queue sender/receiver and proof-of-concept

2 participants