Produce & consume messages against an AWS MSK (IAM-auth) cluster, supporting optional cross-account AssumeRole. Node.js 20+ recommended.
npm install --omit=dev
kafkajskafkajs-msk-iam-authentication-mechanism(addsAWS_MSK_IAMSASL mechanism for MSK IAM)- AWS SDK v3 credential providers + STS (for optional AssumeRole)
Two subcommands: produce and consume.
--ssl(default true) Set--ssl=falseto attempt PLAINTEXT (debug only; AWS_MSK_IAM normally requires TLS listener).
Single message:
node index.js produce \
--brokers "b-1.example.amazonaws.com:9098,b-2.example.amazonaws.com:9098" \
--topic prism.raw.gap.school \
--region eu-west-1 \
--message "probe" \
--assumeRoleArn $REMOTE_KAFKA_ROLE_ARN
Burst 10 messages (100ms interval):
node index.js produce \
--brokers "$BROKERS" \
--topic prism.raw.gap.school \
--region $DEST_KAFKA_REGION \
--count 10 \
--intervalMs 100
Stream from latest:
node index.js consume \
--brokers "$BROKERS" \
--topic prism.raw.gap.school \
--region $DEST_KAFKA_REGION \
--groupId test-consumer
Read first 5 from beginning then exit:
node index.js consume \
--brokers "$BROKERS" \
--topic prism.raw.gap.school \
--region $DEST_KAFKA_REGION \
--fromBeginning \
--limit 5
--brokersComma list of bootstrap brokers (IAM/SASL_SSL port, usually 9098). Use theIamorIamTlsline fromaws kafka get-bootstrap-brokers.--regionKafka cluster AWS region (SigV4 signing region).--assumeRoleArn(optional) Remote role to assume. If omitted uses base creds (IRSA / env / shared config).--clientIdKafka client id (defaultkafka-test-tool).--logLevelinfo|warn|error|debug|nothing.
Produce-specific:
--messageBody template (string) used for each message.--countNumber of messages (default 1).--intervalMsDelay between sends.
Consume-specific:
--groupIdConsumer group (defaultkafka-test-tool-group).--fromBeginningStart at earliest offset.--limitStop after N messages (0 = keep running).
- Base credentials (IRSA / env / profile) acquired via provider chain.
- If
--assumeRoleArnsupplied, STS AssumeRole executed and short‑lived credentials cached until near expiration. - KafkaJS IAM mechanism signs each request (SigV4) with current credentials.
- Ensure MSK cluster has IAM authentication enabled and you are using the IAM listener port.
- Role policy must include necessary
kafka-cluster:*permissions for topics/consumer groups. - Clock skew can break SigV4; container time must be in sync.
- Error
UNSUPPORTED_SASL_MECHANISMusually means wrong listener/port or cluster not IAM-enabled.
npm run produce -- --brokers "$BROKERS" --topic prism.raw.gap.school --region $DEST_KAFKA_REGION --message test
npm run consume -- --brokers "$BROKERS" --topic prism.raw.gap.school --region $DEST_KAFKA_REGION
Cross-account example:
npm run consume -- --brokers b-3-public.kafka-prism-dev.j4p0qp.c3.kafka.eu-west-1.amazonaws.com:9198 \
--assumeRoleArn "arn:aws:iam::291654376946:role/ef-studio-prism-elive-integration" \
--topic "prism.raw.gap.school" \
--region "eu-west-1"
npm run consume -- --brokers b-3-public.kafka-prism-dev.j4p0qp.c3.kafka.eu-west-1.amazonaws.com:9198 --assumeRoleArn arn:aws:iam::291654376946:role/ef-studio-prism-elive-integration --topic prism.raw.gap.school --region eu-west-1 --limit 1 --logLevel debug --groupId external.elive.kafka-test-tool-group
node index.js consume
--brokers "kafka-proxy-1.shared.svc.cluster.local:9093,kafka-proxy-2.shared.svc.cluster.local:9093,kafka-proxy-3.shared.svc.cluster.local:9093"
--topic prism.raw.catalyst.elive
--region eu-west-1
--groupId external.elive.prism.kafka.proxy
--clientId el-prism-kafka-proxy
--assumeRoleArn arn:aws:iam::291654376946:role/ef-studio-prism-elive-integration
--logLevel debug
--limit 1
node index.js consume
--brokers "kafka-proxy-1.shared.svc.cluster.local:9093"
--topic prism.raw.catalyst.elive
--region eu-west-1
--groupId external.elive.prism.kafka.proxy-test
--clientId el-prism-kafka-proxy
--assumeRoleArn arn:aws:iam::291654376946:role/ef-studio-prism-elive-integration
--logLevel debug
--limit 1
node index.js consume --brokers "b-2-public.kafka-prism-dev.j4p0qp.c3.kafka.eu-west-1.amazonaws.com:9198,b-1-public.kafka-prism-dev.j4p0qp.c3.kafka.eu-west-1.amazonaws.com:9198,b-3-public.kafka-prism-dev.j4p0qp.c3.kafka.eu-west-1.amazonaws.com:9198" --topic prism.raw.catalyst.elive --region eu-west-1 --groupId external.elive.prism.kafka.proxy.tool --clientId el-prism-kafka-proxy --assumeRoleArn arn:aws:iam::291654376946:role/ef-studio-prism-elive-integration --logLevel debug --limit 1 --ssl=true
Ctrl+C triggers graceful disconnect.
npm run consume -- --brokers b-3-public.kafka-prism-dev.j4p0qp.c3.kafka.eu-west-1.amazonaws.com:9198 --assumeRoleArn arn:aws:iam::291654376946:role/ef-studio-prism-elive-integration --topic prism.raw.catalyst.elive --region eu-west-1 --limit 1 --logLevel debug