Build event‑driven applications in TypeScript with a clean, strongly‑typed API. Knot lets you define domain events once and plug in any message broker via adapters.
- Strongly‑typed events: model your domain with typed payloads.
- Simple producer/consumer APIs: minimal surface area, easy to test.
- Broker‑agnostic core: swap Kafka for Redis/NATS/RabbitMQ (coming soon) by changing the adapter.
Install the core and your preferred adapter:

```bash
npm install @knot/core @knot/kafka-adapter
```

Define a domain event by extending `Event` with a typed payload:

```ts
import { Event } from '@knot/core'

type UserCreatedPayload = {
  id: string
  name: string
}

export class UserCreated extends Event<UserCreatedPayload> {
  static aggregateRoot = 'User' // the aggregate this event belongs to
  static routingKey = 'id' // payload field used as the message routing key
}
```

Create a `Knot` instance with the adapter for your broker:

```ts
import { Knot } from '@knot/core'
import { KafkaAdapter } from '@knot/kafka-adapter'
const knot = new Knot({
  adapter: new KafkaAdapter({ brokers: ['localhost:9092'] }),
})
```

Produce an event:

```ts
import { UserCreated } from './events/UserCreated'
const { produce } = knot.producer()
const event = new UserCreated({ id: '42', name: 'Ada' })
await produce({ event })
```

Consume events in a named consumer group:

```ts
import { UserCreated } from './events/UserCreated'
const { consume, start } = knot.consumer('analytics')
// Declare a handler for the events
consume([UserCreated], async (event) => {
  const { id, name } = (event as UserCreated).payload
  console.log(id, name)
})
await start() // starts consuming from the subscribed topics
```

You can also produce several events in one call:

```ts
const event1 = new UserCreated({ id: '1', name: 'Ada' })
const event2 = new UserCreated({ id: '2', name: 'Linus' })
await produce({
  events: [event1, event2],
})
```

Use `consumerGroupNamespace` to prefix consumer group IDs, and pass `fromBeginning` when starting a consumer to replay topics from their earliest offsets:

```ts
import { Knot } from '@knot/core'
import { KafkaAdapter } from '@knot/kafka-adapter'
const knot = new Knot({
  adapter: new KafkaAdapter({ brokers: ['localhost:9092'] }),
  consumerGroupNamespace: 'myapp',
})
const { start, consume } = knot.consumer('analytics')
// consume(...)
await start({ fromBeginning: true }) // starts consuming from the beginning of the topics
```

`consumerGroupNamespace` prefixes your consumer group IDs, which is helpful when several environments run against the same cluster.
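For example, a sketch of two environments sharing one cluster (the broker address and namespace values are illustrative):

```ts
// Distinct namespaces give each environment its own consumer group IDs,
// so staging and production consumers keep separate offsets.
const staging = new Knot({
  adapter: new KafkaAdapter({ brokers: ['localhost:9092'] }),
  consumerGroupNamespace: 'staging',
})
const production = new Knot({
  adapter: new KafkaAdapter({ brokers: ['localhost:9092'] }),
  consumerGroupNamespace: 'production',
})
```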
You can also disable producing globally (useful in dry‑runs):
```ts
const knot = new Knot({
  adapter: new KafkaAdapter({ brokers: ['localhost:9092'] }),
  disableProducer: true,
})
```
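For instance, you might drive the flag from an environment variable (the `DRY_RUN` variable here is hypothetical):

```ts
const knot = new Knot({
  adapter: new KafkaAdapter({ brokers: ['localhost:9092'] }),
  disableProducer: process.env.DRY_RUN === 'true', // hypothetical env flag
})
```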
Provide your own logger by implementing the `Logger` interface and passing it to `Knot`:

```ts
import { Knot, ConsoleLogger } from '@knot/core'
import { KafkaAdapter } from '@knot/kafka-adapter'
const knot = new Knot({
  adapter: new KafkaAdapter({ brokers: ['localhost:9092'] }),
  logger: new ConsoleLogger(), // the default; replace with your own Logger implementation
})
```
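For structured output you could supply your own logger. This is a minimal sketch that assumes `Logger` exposes debug/info/warn/error methods (check `@knot/core` for the actual interface):

```ts
import { Knot, type Logger } from '@knot/core'
import { KafkaAdapter } from '@knot/kafka-adapter'

// Sketch only: assumes the Logger interface exposes debug/info/warn/error.
const jsonLogger = {
  debug: (message: string) => console.debug(JSON.stringify({ level: 'debug', message })),
  info: (message: string) => console.info(JSON.stringify({ level: 'info', message })),
  warn: (message: string) => console.warn(JSON.stringify({ level: 'warn', message })),
  error: (message: string) => console.error(JSON.stringify({ level: 'error', message })),
} as Logger

const knot = new Knot({
  adapter: new KafkaAdapter({ brokers: ['localhost:9092'] }),
  logger: jsonLogger,
})
```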
Configure the Kafka adapter with your brokers; SASL/SSL options map directly to the underlying kafkajs options:

```ts
import { KafkaAdapter } from '@knot/kafka-adapter'
const adapter = new KafkaAdapter({
  brokers: ['localhost:9092'],
})
```
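For example, a secured cluster might look like the sketch below (credentials are placeholders, and it assumes the `ssl` and `sasl` fields sit alongside `brokers` exactly as they do in kafkajs):

```ts
const secureAdapter = new KafkaAdapter({
  brokers: ['broker-1.example.com:9093'],
  ssl: true, // kafkajs TLS option, assumed to pass straight through
  sasl: {
    mechanism: 'plain', // kafkajs SASL/PLAIN configuration
    username: 'my-user',
    password: 'my-secret',
  },
})
```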
Your adapter should expose `producer()` and `consumer({ groupId })` methods compatible with your broker. See `packages/kafka-adapter` for a reference implementation.

The core API:

- `Event<T>`
  - `payload: T`
  - `static aggregateRoot: string`
  - `static routingKey: string`
- `Knot(options)`
  - `adapter: MessageBrokerAdapter`
  - `disableProducer?: boolean`
  - `consumerGroupNamespace?: string`
  - `logger?: Logger`
- `producer()` → `{ produce }`
  - `produce({ event?: Event<T>; events?: Event<T>[] }): Promise<void>`
- `consumer(groupId)` → `{ consume, start }`
  - `consume(events: Array<typeof Event>, handler: (event, topic, partition) => Promise<void> | void): Promise<void>`
  - `start(options?: { fromBeginning?: boolean }): Promise<void>`
Messages are encoded as JSON buffers with the shape `{ payload, metadata: { event, producedAt } }`.
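For illustration, the `UserCreated` event produced earlier would decode to something like the following (the metadata values, including how the event name is recorded, are assumptions):

```ts
// Decoded message body (illustrative values).
const message = {
  payload: { id: '42', name: 'Ada' },
  metadata: { event: 'UserCreated', producedAt: '2024-01-01T00:00:00.000Z' },
}
```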
Run unit tests:

```bash
npm run test:core
```

Run integration tests against Kafka (this starts a local cluster first):

```bash
docker compose -f packages/kafka-adapter/docker-compose.yml up -d
npm run test:integration
```

Run the full test suite:

```bash
npm test
```

Licensed under Apache-2.0.