diff --git a/cmd/service/metro/component.go b/cmd/service/metro/component.go index 76afdfdf..564cb724 100644 --- a/cmd/service/metro/component.go +++ b/cmd/service/metro/component.go @@ -6,6 +6,7 @@ import ( "github.com/razorpay/metro/internal/config" "github.com/razorpay/metro/pkg/logger" "github.com/razorpay/metro/service" + consumeplane "github.com/razorpay/metro/service/consume-plane" openapiserver "github.com/razorpay/metro/service/openapi-server" "github.com/razorpay/metro/service/web" "github.com/razorpay/metro/service/worker" @@ -29,6 +30,8 @@ func NewComponent(component string, cfg config.Config) (*Component, error) { svc, err = worker.NewService(&cfg.Worker, &cfg.Registry, &cfg.Cache) case OpenAPIServer: svc, err = openapiserver.NewService(&cfg.OpenAPIServer) + case ConsumePlane: + svc, err = consumeplane.NewService(&cfg.ConsumePlane, &cfg.Registry) } if err != nil { diff --git a/cmd/service/metro/metro.go b/cmd/service/metro/metro.go index 94cc570c..bf640dcc 100644 --- a/cmd/service/metro/metro.go +++ b/cmd/service/metro/metro.go @@ -21,9 +21,11 @@ const ( Worker = "worker" // OpenAPIServer to server swagger docs OpenAPIServer = "openapi-server" + // ConsumePlane component serves as a broker interface + ConsumePlane = "consume-plane" ) -var validComponents = []string{Web, Worker, OpenAPIServer} +var validComponents = []string{Web, Worker, OpenAPIServer, ConsumePlane} var component *Component // isValidComponent validates if the input component is a valid metro component diff --git a/config/default.toml b/config/default.toml index c60cc350..f2a3d410 100644 --- a/config/default.toml +++ b/config/default.toml @@ -17,6 +17,8 @@ errorLevel = 1 [web] + replicaCount = 1 + consumePlaneDeployment = "metro.svc.cluster.local" [web.broker] variant = "kafka" [web.broker.brokerconfig] @@ -28,6 +30,31 @@ GrpcServerAddress = "0.0.0.0:8081" HttpServerAddress = "0.0.0.0:8082" InternalHttpServerAddress = "0.0.0.0:9000" + [web.httpclientconfig] + connectTimeoutMs = 2000 + 
connKeepAliveMs = 0 + expectContinueTimeoutMs = 0 + idleConnTimeoutMs = 60000 + maxAllIdleConns = 1000 + maxHostIdleConns = 1000 + responseHeaderTimeoutMs = 25000 + tlsHandshakeTimeoutMs = 2000 + + +[consumePlane] + replicaCount = 1 + ordinalID = 0 + [consumePlane.broker] + variant = "kafka" + [consumePlane.broker.brokerconfig] + brokers = ["localhost:9092"] + enableTLS = false + [consumePlane.broker.brokerconfig.consumePlane] + [consumePlane.interfaces] + [consumePlane.interfaces.api] + GrpcServerAddress = "0.0.0.0:8088" + HttpServerAddress = "0.0.0.0:8089" + InternalHttpServerAddress = "0.0.0.0:9003" [worker] [worker.broker] diff --git a/config/dev_docker.toml b/config/dev_docker.toml index 29ee5c56..947232af 100644 --- a/config/dev_docker.toml +++ b/config/dev_docker.toml @@ -17,6 +17,8 @@ errorLevel = 1 [web] + replicaCount = 1 + consumePlaneDeployment = "metro.svc.cluster.local" [web.broker] variant = "kafka" [web.broker.brokerconfig] @@ -28,6 +30,21 @@ HttpServerAddress = "0.0.0.0:8082" InternalHttpServerAddress = "0.0.0.0:9000" +[consumePlane] + replicaCount = 1 + ordinalID = 0 + [consumePlane.broker] + variant = "kafka" + [consumePlane.broker.brokerconfig] + brokers = ["localhost:9092"] + enableTLS = false + [consumePlane.broker.brokerconfig.consumePlane] + [consumePlane.interfaces] + [consumePlane.interfaces.api] + GrpcServerAddress = "0.0.0.0:8085" + HttpServerAddress = "0.0.0.0:8086" + InternalHttpServerAddress = "0.0.0.0:9003" + [worker] [worker.broker] variant = "kafka" diff --git a/config/func.toml b/config/func.toml index de7115ee..6e602bb6 100644 --- a/config/func.toml +++ b/config/func.toml @@ -17,6 +17,8 @@ errorLevel = 1 [web] + replicaCount = 1 + consumePlaneDeployment = "int.perf.razorpay.in" [web.broker] variant = "kafka" [web.broker.brokerconfig] diff --git a/config/perf.toml b/config/perf.toml index 736f13e2..4a076b72 100644 --- a/config/perf.toml +++ b/config/perf.toml @@ -17,6 +17,8 @@ errorLevel = 1 [web] + replicaCount = 1 + 
consumePlaneDeployment = "int.perf.razorpay.in" [web.broker] variant = "kafka" [web.broker.brokerconfig] diff --git a/config/stage.toml b/config/stage.toml index 63962e97..7ce508f4 100644 --- a/config/stage.toml +++ b/config/stage.toml @@ -17,6 +17,8 @@ errorLevel = 1 [web] + replicaCount = 1 + consumePlaneDeployment = "int.stage.razorpay.in" [web.broker] variant = "kafka" [web.broker.brokerconfig] diff --git a/go.mod b/go.mod index b20a4b87..fc480d76 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,7 @@ require ( github.com/dvsekhvalnov/jose2go v0.0.0-20201001154944-b09cfaf05951 // indirect github.com/fatih/color v1.10.0 // indirect github.com/getsentry/sentry-go v0.11.0 + github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/mock v1.6.0 github.com/golang/protobuf v1.5.2 diff --git a/internal/config/config.go b/internal/config/config.go index 781fa60e..065b5e42 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -7,6 +7,7 @@ import ( "github.com/razorpay/metro/pkg/monitoring/sentry" "github.com/razorpay/metro/pkg/registry" "github.com/razorpay/metro/pkg/tracing" + consumeplane "github.com/razorpay/metro/service/consume-plane" openapiserver "github.com/razorpay/metro/service/openapi-server" "github.com/razorpay/metro/service/web" worker "github.com/razorpay/metro/service/worker" @@ -19,6 +20,7 @@ type Config struct { Sentry sentry.Config Web web.Config Worker worker.Config + ConsumePlane consumeplane.Config Registry registry.Config Cache cache.Config OpenAPIServer openapiserver.Config diff --git a/internal/consumer/consumer.go b/internal/consumer/consumer.go new file mode 100644 index 00000000..998e2702 --- /dev/null +++ b/internal/consumer/consumer.go @@ -0,0 +1,143 @@ +package consumer + +import ( + "context" + "fmt" + "io" + "time" + + "github.com/razorpay/metro/internal/subscriber" + "github.com/razorpay/metro/internal/subscription" + 
"github.com/razorpay/metro/pkg/logger" + "github.com/razorpay/metro/pkg/messagebroker" + metrov1 "github.com/razorpay/metro/rpc/proto/v1" +) + +// IConsumer defines the set of methods to access a consumer object +type IConsumer interface { + Run() error + Acknowledge(ctx context.Context, req *ParsedAcknowledgeRequest) + ModifyAckDeadline(ctx context.Context, req *ParsedModifyAckDeadlineRequest) + Fetch(ctx context.Context, messageCount int) (*metrov1.PullResponse, error) +} + +// Consumer entity represents a single subscription-partition specific client +type Consumer struct { + computedHash int + subscriberID string + subscription *subscription.Model + subscriberCore subscriber.ICore + subscriptionSubscriber subscriber.ISubscriber + ctx context.Context + errChan chan error +} + +// DefaultNumMessageCount ... +var DefaultNumMessageCount int32 = 10 + +// NewConsumer intializes a consumer entity +func NewConsumer(ctx context.Context, computedHash int, subscriberID string, subscription *subscription.Model, subCore subscriber.ICore, subs subscriber.ISubscriber) *Consumer { + con := &Consumer{ + ctx: ctx, + computedHash: computedHash, + subscriberID: subscriberID, + subscription: subscription, + subscriptionSubscriber: subs, + errChan: make(chan error), + } + return con +} + +// Fetch retrieves messages for a given consumer, it takes ackDeadline, retry and maxMessages into account. 
+func (c *Consumer) Fetch(ctx context.Context, messageCount int) (*metrov1.PullResponse, error) { + respChan := make(chan *metrov1.PullResponse) + defer close(respChan) + c.subscriptionSubscriber.GetRequestChannel() <- (&subscriber.PullRequest{ + MaxNumOfMessages: int32(messageCount), + RespChan: respChan, + }).WithContext(ctx) + + select { + case resp := <-respChan: + return resp, nil + case <-ctx.Done(): + return &metrov1.PullResponse{}, ctx.Err() + } + +} + +// Acknowledge send an ACK for a set of messages +func (c *Consumer) Acknowledge(ctx context.Context, ackMsgs []*subscriber.AckMessage) { + for _, ackMsg := range ackMsgs { + c.subscriptionSubscriber.GetAckChannel() <- ackMsg.WithContext(ctx) + } +} + +// ModifyAckDeadline allows modification of Ack deadline for a messages(s). +// Deadline of 0 indicates a Nack operation. +func (c *Consumer) ModifyAckDeadline(ctx context.Context, mackMsgs []*subscriber.AckMessage) { + for _, modAckMsg := range mackMsgs { + modAckReq := subscriber.NewModAckMessage(modAckMsg, modAckMsg.Deadline) + modAckReq = modAckReq.WithContext(ctx) + c.subscriptionSubscriber.GetModAckChannel() <- modAckReq + } +} + +// Run ensures that the lifecycle of a consumer is instantiated. 
+func (c *Consumer) Run() error { + // stream ack timeout + streamAckDeadlineSecs := int32(30) // init with some sane value + timeout := time.NewTicker(time.Duration(streamAckDeadlineSecs) * time.Second) + for { + select { + case <-c.ctx.Done(): + logger.Ctx(c.ctx).Infow("stopping subscriber from <-s.ctx.Done()") + c.stop() + return c.ctx.Err() + case <-timeout.C: + logger.Ctx(c.ctx).Infow("stopping subscriber from <-timeout.C") + c.stop() + return fmt.Errorf("stream: ack deadline seconds crossed") + case err := <-c.errChan: + logger.Ctx(c.ctx).Infow("stopping subscriber from err := <-s.errChan") + c.stop() + if err == io.EOF { + // return will close stream from server side + logger.Ctx(c.ctx).Errorw("stream: EOF received from client") + } else if err != nil { + logger.Ctx(c.ctx).Errorw("stream: error received from client", "error", err.Error()) + } + return nil + case err := <-c.subscriptionSubscriber.GetErrorChannel(): + // streamManagerSubscriberErrors.WithLabelValues(env, s.subscriberID, s.subscriptionSubscriber.GetSubscriptionName(), err.Error()).Inc() + if messagebroker.IsErrorRecoverable(err) { + // no need to stop the subscriber in such cases. 
just log and return + logger.Ctx(c.ctx).Errorw("subscriber: got recoverable error", err.Error()) + return nil + } + + logger.Ctx(c.ctx).Errorw("subscriber: got un-recoverable error", "error", err.Error()) + logger.Ctx(c.ctx).Infow("stopping subscriber from err := <-s.subscriptionSubscriber.GetErrorChannel()") + c.stop() + return err + + default: + timeout.Reset(time.Duration(streamAckDeadlineSecs) * time.Second) + } + } +} + +func (c *Consumer) stop() { + c.subscriptionSubscriber.Stop() + c.closeSubscriberChannels() + + logger.Ctx(c.ctx).Infow("stopped subscriber...", "subscriberId", c.subscriberID) + +} + +func (c *Consumer) closeSubscriberChannels() { + close(c.errChan) + close(c.subscriptionSubscriber.GetRequestChannel()) + close(c.subscriptionSubscriber.GetAckChannel()) + close(c.subscriptionSubscriber.GetModAckChannel()) +} diff --git a/internal/consumer/lifecycle.go b/internal/consumer/lifecycle.go new file mode 100644 index 00000000..3107120f --- /dev/null +++ b/internal/consumer/lifecycle.go @@ -0,0 +1,281 @@ +package consumer + +import ( + "context" + "sync" + + "github.com/google/uuid" + "github.com/pkg/errors" + "github.com/razorpay/metro/pkg/logger" + "github.com/razorpay/metro/pkg/registry" + "golang.org/x/sync/errgroup" + + "github.com/razorpay/metro/internal/brokerstore" + "github.com/razorpay/metro/internal/common" + "github.com/razorpay/metro/internal/subscriber" + "github.com/razorpay/metro/internal/subscription" +) + +// ILifecycle interface defines lifecycle manager methods +type ILifecycle interface { + GetConsumer(ctx context.Context, sub string, partition int) (*Consumer, error) + CloseConsumer(ctx context.Context, computedHash int) error + Run() error +} + +// Manager ... 
+type Manager struct { + consumers map[int]*Consumer + subscriptionCore subscription.ICore + subscriberCore subscriber.ICore + subCache []*subscription.Model + cleanupCh chan consumerIdentifier + recoveryCh chan consumerIdentifier + registry registry.IRegistry + replicas int + ordinalID int + bs brokerstore.IBrokerStore + mutex *sync.Mutex + ctx context.Context + subWatchData chan *struct{} +} + +// NewLifecycleManager ... +func NewLifecycleManager(ctx context.Context, replicas int, ordinalID int, subscriptionCore subscription.ICore, subscriberCore subscriber.ICore, bs brokerstore.IBrokerStore, r registry.IRegistry) (ILifecycle, error) { + + allSubs, err := subscriptionCore.List(ctx, subscription.Prefix) + if err != nil { + logger.Ctx(ctx).Errorw("error fetching new subscription list", "error", err) + return nil, err + } + + // Filter Pull Subscriptions + var pullSubs []*subscription.Model + for _, sub := range allSubs { + if !sub.IsPush() { + pullSubs = append(pullSubs, sub) + } + } + + mgr := &Manager{ + consumers: make(map[int]*Consumer), + subscriptionCore: subscriptionCore, + subscriberCore: subscriberCore, + registry: r, + bs: bs, + cleanupCh: make(chan consumerIdentifier), + recoveryCh: make(chan consumerIdentifier), + replicas: replicas, + ordinalID: ordinalID, + mutex: &sync.Mutex{}, + ctx: ctx, + subCache: pullSubs, + subWatchData: make(chan *struct{}), + } + + return mgr, nil +} + +// Run instantiates the listeners for a lifecycle manager +func (m *Manager) Run() error { + + // Start consumers + m.refreshConsumers() + + var subWatcher registry.IWatcher + + // TODO: Implement watch on subscripitons to update active consumers based on watch updates. 
+ + swh := registry.WatchConfig{ + WatchType: "keyprefix", + WatchPath: common.GetBasePrefix() + subscription.Prefix, + Handler: func(ctx context.Context, pairs []registry.Pair) { + logger.Ctx(ctx).Infow("subscriptions watch handler data", "pairs", pairs) + m.subWatchData <- &struct{}{} + }, + } + + logger.Ctx(m.ctx).Infof("setting watch on subscriptions") + subWatcher, err := m.registry.Watch(m.ctx, &swh) + if err != nil { + return err + } + + leadgrp, gctx := errgroup.WithContext(m.ctx) + + // watch the Subscriptions path for new subscriptions and rebalance + leadgrp.Go(func() error { + watchErr := subWatcher.StartWatch() + close(m.subWatchData) + return watchErr + }) + + // wait for done channel to be closed and stop watches if received done + leadgrp.Go(func() error { + <-gctx.Done() + + if subWatcher != nil { + subWatcher.StopWatch() + } + + logger.Ctx(gctx).Info("scheduler group context done") + return gctx.Err() + }) + for { + select { + case <-m.ctx.Done(): + for h := range m.consumers { + delete(m.consumers, h) + } + return nil + case val := <-m.subWatchData: + if val == nil { + continue + } + allSubs, serr := m.subscriptionCore.List(gctx, subscription.Prefix) + if serr != nil { + logger.Ctx(gctx).Errorw("error fetching new subscription list", "error", serr) + return err + } + // Filter Push Subscriptions + var newSubs []*subscription.Model + for _, sub := range allSubs { + if !sub.IsPush() { + newSubs = append(newSubs, sub) + } + } + + m.subCache = newSubs + m.refreshConsumers() + case recoveryMessage := <-m.recoveryCh: + logger.Ctx(m.ctx).Infow("lifecyclemanager: consumer encountered irrecoverable error, attempting to recover", "recoveryMessage", recoveryMessage) + if existingCon, ok := m.consumers[recoveryMessage.computedHash]; ok { + + sub := existingCon.subscription + part := existingCon.subscription.Partition + + m.CloseConsumer(m.ctx, recoveryMessage.computedHash) + + con, err := m.CreateConsumer(m.ctx, sub, part, recoveryMessage.computedHash) + if 
err != nil { + logger.Ctx(m.ctx).Errorw("lifecyclemanager: failed to restore consumer", "subscription", sub.Name, "partition", part) + } + m.consumers[recoveryMessage.computedHash] = con + } + + case cleanupMessage := <-m.cleanupCh: + logger.Ctx(m.ctx).Infow("lifecyclemanager: got request to cleanup subscriber", "cleanupMessage", cleanupMessage) + m.CloseConsumer(m.ctx, cleanupMessage.computedHash) + } + } +} + +func (m *Manager) refreshConsumers() { + + logger.Ctx(m.ctx).Infow("lifecyclemanager: Consumer refresh intiated") + existingConsumers := make(map[int]subPartition) + consumersToAdd := make(map[int]subPartition) + + for _, sub := range m.subCache { + subPart, err := m.subscriptionCore.FetchPartitionsForHash(m.ctx, sub, m.ordinalID) + if err != nil { + logger.Ctx(m.ctx).Errorw("lifecyclemanager: Error resolving partitions for subscription", "subscription", sub.Name) + } + for _, partition := range subPart { + computedHash := m.subscriptionCore.FetchSubscriptionHash(m.ctx, sub.Name, partition) + if _, ok := m.consumers[computedHash]; ok { + existingConsumers[computedHash] = subPartition{ + subscription: sub, + partition: partition, + } + } else { + consumersToAdd[computedHash] = subPartition{ + subscription: sub, + partition: partition, + } + } + } + } + + logger.Ctx(m.ctx).Infow("lifecyclemanager: Consumer update status", "existing", len(existingConsumers), "new", len(consumersToAdd)) + // Gracefully shutdown reassigned consumers + for h := range m.consumers { + if _, ok := m.consumers[h]; !ok { + m.cleanupCh <- consumerIdentifier{ + computedHash: h, + } + } + + } + for hash, sp := range consumersToAdd { + consumer, err := m.CreateConsumer(m.ctx, sp.subscription, sp.partition, hash) + if err != nil { + // Proceed without failing since this requires other subscribers to be setup + logger.Ctx(m.ctx).Errorw("lifecyclemanager: failed to create consumer for subscription-partition assignment", "subscription", sp.subscription.Name, "partition", sp.partition) + } 
else { + logger.Ctx(m.ctx).Infow("lifecyclemanager: Successfully added consumer", "consumerHash", hash, "subscription", sp.subscription.Name, "partition", sp.partition) + m.consumers[hash] = consumer + } + + } + +} + +// CreateConsumer sets up the underlying subscriber and returns the consumer object +func (m *Manager) CreateConsumer(ctx context.Context, sub *subscription.Model, partition int, hash int) (*Consumer, error) { + subscriberID := uuid.New().String() + + subscriberRequestCh := make(chan *subscriber.PullRequest) + subscriberAckCh := make(chan *subscriber.AckMessage) + subscriberModAckCh := make(chan *subscriber.ModAckMessage) + + subscriber, err := m.subscriberCore.NewSubscriber(ctx, subscriberID, sub, 100, 50, 5000, subscriberRequestCh, subscriberAckCh, subscriberModAckCh) + // subscriber, err := m.subscriberCore.NewOpinionatedSubscriber(ctx, subscriberID, sub, partition, hash, 100, 50, 5000, + // subscriberRequestCh, subscriberAckCh, subscriberModAckCh) + if err != nil { + logger.Ctx(ctx).Errorw("lifecyclemanager: failed to create subscriber for subscription-partition", "subscription", sub.Name, "partition", partition) + return nil, err + } + return &Consumer{ + ctx: m.ctx, + computedHash: hash, + subscriberID: subscriberID, + subscription: sub, + subscriberCore: m.subscriberCore, + subscriptionSubscriber: subscriber, + }, nil + +} + +// GetConsumer fetches the relevant consumer from the memory map. +func (m *Manager) GetConsumer(ctx context.Context, sub string, partition int) (*Consumer, error) { + computedHash := m.subscriptionCore.FetchSubscriptionHash(ctx, sub, partition) + consumer := m.consumers[computedHash] + if consumer == nil { + return nil, errors.Errorf("lifecyclemanager: No active consumer found") + } + return consumer, nil +} + +// CloseConsumer ensures that the consumer is greacefully exited. 
+func (m *Manager) CloseConsumer(ctx context.Context, computedhash int) error { + con := m.consumers[computedhash] + if con != nil { + m.mutex.Lock() + con.stop() + delete(m.consumers, computedhash) + m.mutex.Unlock() + } + + return nil +} + +// consumerIdentifier ... +type consumerIdentifier struct { + computedHash int +} + +type subPartition struct { + subscription *subscription.Model + partition int +} diff --git a/internal/consumer/request.go b/internal/consumer/request.go new file mode 100644 index 00000000..345154fb --- /dev/null +++ b/internal/consumer/request.go @@ -0,0 +1,104 @@ +package consumer + +import ( + "github.com/razorpay/metro/internal/subscriber" + metrov1 "github.com/razorpay/metro/rpc/proto/v1" +) + +// ParsedFetchRequest ... +type ParsedFetchRequest struct { + Subscription string + MessageCount int + Partition int +} + +// ParsedAcknowledgeRequest ... +type ParsedAcknowledgeRequest struct { + Subscription string + AckIDs []string + AckMessages []*subscriber.AckMessage +} + +//ParsedModifyAckDeadlineRequest ... +type ParsedModifyAckDeadlineRequest struct { + Subscription string + AckIDs []string + AckMessages []*subscriber.AckMessage + ModifyDeadlineMsgIdsWithSecs map[string]int32 +} + +// NewParsedFetchRequest ... +func NewParsedFetchRequest(req *metrov1.FetchRequest) (*ParsedFetchRequest, error) { + parsedReq := &ParsedFetchRequest{} + parsedReq.Subscription = req.Subscription + parsedReq.MessageCount = int(req.MaxMessages) + parsedReq.Partition = int(req.Partition) + + return parsedReq, nil +} + +// NewParsedAcknowledgeRequest ... 
+func NewParsedAcknowledgeRequest(req *metrov1.AcknowledgeRequest) (*ParsedAcknowledgeRequest, error) { + parsedReq := &ParsedAcknowledgeRequest{} + + // TODO : add validations and throw error + parsedReq.Subscription = req.Subscription + if req.AckIds != nil && len(req.AckIds) > 0 { + ackMessages := make([]*subscriber.AckMessage, 0) + parsedReq.AckIDs = req.AckIds + for _, ackID := range req.AckIds { + ackMessage, err := subscriber.ParseAckID(ackID) + if err != nil { + return nil, err + } + ackMessages = append(ackMessages, ackMessage) + } + parsedReq.AckIDs = req.AckIds + parsedReq.AckMessages = ackMessages + } + + return parsedReq, nil +} + +// NewParsedModifyAckDeadlineRequest ... +func NewParsedModifyAckDeadlineRequest(req *metrov1.ModifyAckDeadlineRequest) (*ParsedModifyAckDeadlineRequest, error) { + parsedReq := &ParsedModifyAckDeadlineRequest{} + + // TODO : add validations and throw error + parsedReq.Subscription = req.Subscription + if req.AckIds != nil && len(req.AckIds) > 0 { + ackMessages := make([]*subscriber.AckMessage, 0) + modifyDeadlineMsgIdsWithSecs := make(map[string]int32) + parsedReq.AckIDs = req.AckIds + for _, ackID := range req.AckIds { + ackMessage, err := subscriber.ParseAckID(ackID) + if err != nil { + return nil, err + } + ackMessage.Deadline = req.AckDeadlineSeconds + ackMessages = append(ackMessages, ackMessage) + modifyDeadlineMsgIdsWithSecs[ackMessage.MessageID] = req.AckDeadlineSeconds + } + parsedReq.AckIDs = req.AckIds + parsedReq.AckMessages = ackMessages + parsedReq.ModifyDeadlineMsgIdsWithSecs = modifyDeadlineMsgIdsWithSecs + } + + return parsedReq, nil +} + +// requestType specifies the type of proxy request, ack or modack +type requestType int + +const ( + ack requestType = iota + modAck +) + +type proxyRequest struct { + addr string + grpcServerAddr string + ackMsgs []*subscriber.AckMessage + parsedReq *metrov1.PullRequest + requestType requestType +} diff --git a/internal/hash/hash.go b/internal/hash/hash.go new file mode 
100644 index 00000000..5d7beca1 --- /dev/null +++ b/internal/hash/hash.go @@ -0,0 +1,12 @@ +package hash + +import ( + "hash/fnv" +) + +// ComputeHash resolves a checksum given a byte array +func ComputeHash(arr []byte) int { + h := fnv.New32a() + h.Write(arr) + return int(h.Sum32()) +} diff --git a/internal/hash/hash_test.go b/internal/hash/hash_test.go new file mode 100644 index 00000000..e5da0c37 --- /dev/null +++ b/internal/hash/hash_test.go @@ -0,0 +1,36 @@ +package hash + +import "testing" + +func TestComputeHash(t *testing.T) { + type args struct { + arr []byte + } + tests := []struct { + name string + args args + want int + }{ + { + name: "test string 1", + args: args{ + arr: []byte("abcdef-123"), + }, + want: 2675134171, + }, + { + name: "test string 2", + args: args{ + arr: []byte("lorem ipsum dolor sit amet"), + }, + want: 2828033737, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := ComputeHash(tt.args.arr); got != tt.want { + t.Errorf("ComputeHash() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/internal/subscriber/core.go b/internal/subscriber/core.go index ec6cc3a3..2a78bbb9 100644 --- a/internal/subscriber/core.go +++ b/internal/subscriber/core.go @@ -17,6 +17,8 @@ import ( type ICore interface { NewSubscriber(ctx context.Context, id string, subscription *subscription.Model, timeoutInMs int, maxOutstandingMessages int64, maxOutstandingBytes int64, requestCh chan *PullRequest, ackCh chan *AckMessage, modAckCh chan *ModAckMessage) (ISubscriber, error) + NewOpinionatedSubscriber(ctx context.Context, id string, subscription *subscription.Model, partiton int, computedHash int, timeoutInMs int, maxOutstandingMessages int64, maxOutstandingBytes int64, + requestCh chan *PullRequest, ackCh chan *AckMessage, modAckCh chan *ModAckMessage) (ISubscriber, error) } // Core implements ICore @@ -42,7 +44,7 @@ func (c *Core) NewSubscriber(ctx context.Context, ackCh chan *AckMessage, modAckCh chan *ModAckMessage) 
(ISubscriber, error) { - consumer, err := NewConsumerManager(ctx, c.bs, timeoutInMs, subscriberID, subscription.Name, subscription.Topic, subscription.GetRetryTopic()) + consumer, err := NewConsumerManager(ctx, c.bs, timeoutInMs, subscriberID, subscription.Name, subscription.Topic, subscription.GetRetryTopic(), 0) if err != nil { logger.Ctx(ctx).Errorw("subscriber: failed to create consumer", "error", err.Error()) return nil, err @@ -123,3 +125,76 @@ func (c *Core) NewSubscriber(ctx context.Context, go s.Run(subsCtx) return s, nil } + +// NewOpinionatedSubscriber ... +func (c *Core) NewOpinionatedSubscriber(ctx context.Context, + subscriberID string, + subscription *subscription.Model, + partition int, + computedHash int, + timeoutInMs int, + maxOutstandingMessages int64, + maxOutstandingBytes int64, + requestCh chan *PullRequest, + ackCh chan *AckMessage, + modAckCh chan *ModAckMessage) (ISubscriber, error) { + + consumer, err := NewConsumerManager(ctx, c.bs, timeoutInMs, subscriberID, subscription.Name, subscription.Topic, subscription.GetRetryTopic()) + if err != nil { + logger.Ctx(ctx).Errorw("subscriber: failed to create consumer", "error", err.Error()) + return nil, err + } + + subsCtx, cancelFunc := context.WithCancel(ctx) + // using the subscriber ctx for retrier as well. This way when the ctx() for subscribers is done, + // all the delay-consumers spawned within retrier would also get marked as done. + var retrier retry.IRetrier + if subscription.RetryPolicy != nil && subscription.DeadLetterPolicy != nil { + + retrier = retry.NewRetrierBuilder(). + WithSubscription(subscription). + WithBrokerStore(c.bs). + WithBackoff(retry.NewExponentialWindowBackoff()). + WithIntervalFinder(retry.NewClosestIntervalWithCeil()). + WithMessageHandler(retry.NewPushToPrimaryRetryTopicHandler(c.bs)). + WithSubscriberID(subscriberID). + WithPartition(partition). 
+ Build() + + err = retrier.Start(subsCtx) + if err != nil { + return nil, err + } + } + + s := &OpinionatedSubscriber{ + Subscriber{ + subscription: subscription, + topic: subscription.Topic, + subscriberID: subscriberID, + subscriptionCore: c.subscriptionCore, + offsetCore: c.offsetCore, + requestChan: requestCh, + responseChan: make(chan *metrov1.PullResponse), + errChan: make(chan error, 1000), + closeChan: make(chan struct{}), + ackChan: ackCh, + modAckChan: modAckCh, + deadlineTicker: time.NewTicker(deadlineTickerInterval), + timeoutInMs: timeoutInMs, + consumer: consumer, + cancelFunc: cancelFunc, + maxOutstandingMessages: maxOutstandingMessages, + maxOutstandingBytes: maxOutstandingBytes, + consumedMessageStats: make(map[TopicPartition]*ConsumptionMetadata), + ctx: subsCtx, + bs: c.bs, + retrier: retrier, + }, + partition, + computedHash, + } + + go s.Run(subsCtx) + return s, nil +} diff --git a/internal/subscriber/model.go b/internal/subscriber/model.go index 7c9bdf21..645948f3 100644 --- a/internal/subscriber/model.go +++ b/internal/subscriber/model.go @@ -15,12 +15,14 @@ import ( "github.com/razorpay/metro/internal/subscriber/customheap" "github.com/razorpay/metro/pkg/messagebroker" "github.com/razorpay/metro/pkg/utils" + metrov1 "github.com/razorpay/metro/rpc/proto/v1" ) // PullRequest ... 
type PullRequest struct { ctx context.Context MaxNumOfMessages int32 + RespChan chan *metrov1.PullResponse } // WithContext can be used to set the current context to the request diff --git a/internal/subscriber/retry/builder.go b/internal/subscriber/retry/builder.go index 142f5b19..c83575a8 100644 --- a/internal/subscriber/retry/builder.go +++ b/internal/subscriber/retry/builder.go @@ -15,6 +15,7 @@ type Builder interface { WithSubscription(subs *subscription.Model) Builder WithMessageHandler(handler MessageHandler) Builder WithSubscriberID(subscriberID string) Builder + WithPartition(partition int) Builder Build() IRetrier } @@ -52,6 +53,12 @@ func (retrier *Retrier) WithSubscription(subs *subscription.Model) Builder { return retrier } +// WithPartition ... +func (retrier *Retrier) WithPartition(partition int) Builder { + retrier.partition = partition + return retrier +} + // WithMessageHandler ... func (retrier *Retrier) WithMessageHandler(handler MessageHandler) Builder { retrier.handler = handler diff --git a/internal/subscriber/retry/delayconsumer.go b/internal/subscriber/retry/delayconsumer.go index 7608fd99..141f26bc 100644 --- a/internal/subscriber/retry/delayconsumer.go +++ b/internal/subscriber/retry/delayconsumer.go @@ -15,8 +15,10 @@ type DelayConsumer struct { ctx context.Context cancelFunc func() doneCh chan struct{} + cleanupCh chan struct{} subs *subscription.Model topic string + partition int isPaused bool consumer messagebroker.Consumer bs brokerstore.IBrokerStore @@ -27,13 +29,17 @@ type DelayConsumer struct { } // NewDelayConsumer inits a new delay-consumer with the pre-defined message handler -func NewDelayConsumer(ctx context.Context, subscriberID string, topic string, subs *subscription.Model, bs brokerstore.IBrokerStore, handler MessageHandler) (*DelayConsumer, error) { +func NewDelayConsumer(ctx context.Context, subscriberID string, topic string, partition int, subs *subscription.Model, bs brokerstore.IBrokerStore, handler MessageHandler) 
(*DelayConsumer, error) { delayCtx, cancel := context.WithCancel(ctx) // only delay-consumer will consume from a subscription specific delay-topic, so can use the same groupID and groupInstanceID consumerOps := messagebroker.ConsumerClientOptions{ - Topics: []string{topic}, - GroupID: subs.GetDelayConsumerGroupID(topic), + Topics: []messagebroker.TopicPartition{ + { + Topic: topic, + }, + }, + GroupID: subs.GetDelayConsumerGroupID(topic, partition), GroupInstanceID: subs.GetDelayConsumerGroupInstanceID(subscriberID, topic), } consumer, err := bs.GetConsumer(ctx, consumerOps) @@ -50,6 +56,7 @@ func NewDelayConsumer(ctx context.Context, subscriberID string, topic string, su ctx: delayCtx, cancelFunc: cancel, topic: topic, + partition: partition, consumer: consumer, subs: subs, bs: bs, @@ -68,10 +75,10 @@ func (dc *DelayConsumer) Run(ctx context.Context) { select { case <-dc.ctx.Done(): logger.Ctx(dc.ctx).Infow("delay-consumer: stopping <-ctx.Done() called", dc.LogFields()...) - dc.bs.RemoveConsumer(ctx, messagebroker.ConsumerClientOptions{GroupID: dc.subs.GetDelayConsumerGroupID(dc.topic), GroupInstanceID: dc.subs.GetDelayConsumerGroupInstanceID(dc.subscriberID, dc.topic)}) + dc.bs.RemoveConsumer(ctx, messagebroker.ConsumerClientOptions{GroupID: dc.subs.GetDelayConsumerGroupID(dc.topic, dc.partition), GroupInstanceID: dc.subs.GetDelayConsumerGroupInstanceID(dc.subscriberID, dc.topic)}) dc.consumer.Close(dc.ctx) return - default: + case <-dc.doneCh: resp, err := dc.consumer.ReceiveMessages(dc.ctx, messagebroker.GetMessagesFromTopicRequest{NumOfMessages: 10, TimeoutMs: int(defaultBrokerOperationsTimeoutMs)}) if err != nil { if !messagebroker.IsErrorRecoverable(err) { @@ -147,13 +154,19 @@ func (dc *DelayConsumer) processMsgs() { return } } else { - // submit to - logger.Ctx(dc.ctx).Infow("delay-consumer: processing message", dc.LogFields("messageID", msg.MessageID)...) 
- err := dc.handler.Do(dc.ctx, msg) - if err != nil { - logger.Ctx(dc.ctx).Errorw("delay-consumer: error in msg handler", - dc.LogFields("messageID", msg.MessageID, "error", err.Error())...) - return + // Only process message if it belongs to the partition that the current subscriber handles + // Since the consumer group ID is unique for each subscrption-partition consumer, + // messages that do not confirm to the current partition will be handled + // by the delay consumer in the corresponding subscription-partition delay consumer + if int(msg.Partition) == dc.partition { + // submit to delay consumer handler + logger.Ctx(dc.ctx).Infow("delay-consumer: processing message", dc.LogFields("messageID", msg.MessageID)...) + err := dc.handler.Do(dc.ctx, msg) + if err != nil { + logger.Ctx(dc.ctx).Errorw("delay-consumer: error in msg handler", + dc.LogFields("messageID", msg.MessageID, "error", err.Error())...) + return + } } } @@ -193,7 +206,7 @@ func (dc *DelayConsumer) LogFields(kv ...interface{}) []interface{} { fields := []interface{}{ "delayConsumerConfig", map[string]interface{}{ "topic": dc.topic, - "groupID": dc.subs.GetDelayConsumerGroupID(dc.topic), + "groupID": dc.subs.GetDelayConsumerGroupID(dc.topic, dc.partition), "groupInstanceID": dc.subs.GetDelayConsumerGroupInstanceID(dc.subscriberID, dc.topic), }, } diff --git a/internal/subscriber/retry/delayconsumer_test.go b/internal/subscriber/retry/delayconsumer_test.go index a429f6b7..ca57dec2 100644 --- a/internal/subscriber/retry/delayconsumer_test.go +++ b/internal/subscriber/retry/delayconsumer_test.go @@ -1,3 +1,4 @@ +//go:build unit // +build unit package retry @@ -55,7 +56,7 @@ func TestDelayConsumer_Run(t *testing.T) { mockConsumer.EXPECT().Resume(gomock.AssignableToTypeOf(ctx), gomock.Any()).Return(nil).AnyTimes() // initialize delay consumer - dc, err := NewDelayConsumer(ctx, subscriberID, "t1", subs, mockBrokerStore, mockHandler) + dc, err := NewDelayConsumer(ctx, subscriberID, "t1", 0, subs, 
mockBrokerStore, mockHandler) assert.NotNil(t, dc) assert.Nil(t, err) assert.NotNil(t, dc.LogFields()) diff --git a/internal/subscriber/retry/retrier.go b/internal/subscriber/retry/retrier.go index c77c963f..d2030b3a 100644 --- a/internal/subscriber/retry/retrier.go +++ b/internal/subscriber/retry/retrier.go @@ -30,6 +30,7 @@ type Retrier struct { backoff Backoff finder IntervalFinder handler MessageHandler + partition int delayConsumers sync.Map } @@ -37,7 +38,7 @@ type Retrier struct { func (r *Retrier) Start(ctx context.Context) error { // TODO : validate retrier params for nils and substitute with defaults for interval, topic := range r.subs.GetDelayTopicsMap() { - dc, err := NewDelayConsumer(ctx, r.subscriberID, topic, r.subs, r.bs, r.handler) + dc, err := NewDelayConsumer(ctx, r.subscriberID, topic, r.partition, r.subs, r.bs, r.handler) if err != nil { return err } diff --git a/internal/subscriber/subscriber.go b/internal/subscriber/subscriber.go index 29c2679e..0791b033 100644 --- a/internal/subscriber/subscriber.go +++ b/internal/subscriber/subscriber.go @@ -48,6 +48,7 @@ type Implementation interface { type Subscriber struct { subscription *subscription.Model topic string + partition int subscriberID string requestChan chan *PullRequest responseChan chan *metrov1.PullResponse diff --git a/internal/subscription/core.go b/internal/subscription/core.go index ff12cf8e..9780c54a 100644 --- a/internal/subscription/core.go +++ b/internal/subscription/core.go @@ -2,11 +2,13 @@ package subscription import ( "context" + "strconv" "time" metrov1 "github.com/razorpay/metro/rpc/proto/v1" "github.com/razorpay/metro/internal/common" + "github.com/razorpay/metro/internal/hash" "github.com/razorpay/metro/internal/merror" "github.com/razorpay/metro/internal/project" "github.com/razorpay/metro/internal/topic" @@ -25,18 +27,25 @@ type ICore interface { List(ctx context.Context, prefix string) ([]*Model, error) Get(ctx context.Context, key string) (*Model, error) Migrate(ctx 
context.Context, names []string) error + FetchPartitionsForHash(ctx context.Context, m *Model, node int) ([]int, error) + FetchSubscriptionHash(ctx context.Context, subName string, partition int) int } +const ( + subscriptionHashFormat = "%s-%s-" +) + // Core implements all business logic for a subscription type Core struct { + TotalNodes int repo IRepo projectCore project.ICore topicCore topic.ICore } // NewCore returns an instance of Core -func NewCore(repo IRepo, projectCore project.ICore, topicCore topic.ICore) ICore { - return &Core{repo, projectCore, topicCore} +func NewCore(nodes int, repo IRepo, projectCore project.ICore, topicCore topic.ICore) ICore { + return &Core{nodes, repo, projectCore, topicCore} } // CreateSubscription creates a subscription for a given topic @@ -329,6 +338,38 @@ func createDelayTopics(ctx context.Context, m *Model, topicCore topic.ICore) err return nil } +// FetchPartitionsForHash ... +func (c *Core) FetchPartitionsForHash(ctx context.Context, sub *Model, node int) ([]int, error) { + matchingPartitions := make([]int, 0) + topicModel, err := c.topicCore.Get(ctx, sub.GetTopic()) + if err != nil { + logger.Ctx(ctx).Errorw("fetchpartitionsforhash: failed to fetch topic for subscription", "subscription", sub.Name, "topic", sub.GetTopic()) + return matchingPartitions, err + } + + if c.TotalNodes == 1 { + partitions := make([]int, 0, topicModel.NumPartitions) + for i := 0; i < topicModel.NumPartitions; i++ { + partitions = append(partitions, i) + } + return partitions, nil + } + for i := 0; i < topicModel.NumPartitions; i++ { + computedHash := c.FetchSubscriptionHash(ctx, sub.Name, i) + partitionNode := computedHash % c.TotalNodes + if partitionNode == node { + matchingPartitions = append(matchingPartitions, i) + } + } + + return matchingPartitions, nil +} + +// FetchSubscriptionHash ... 
+func (c *Core) FetchSubscriptionHash(ctx context.Context, sub string, partition int) int { + return hash.ComputeHash([]byte(sub + strconv.Itoa(partition))) +} + // Migrate takes care of backfilling subscription topics for existing subscriptions. // This is an idempotent operation that creates topics for each subscription. // Migrate can be modified in the future for other use-cases as well. diff --git a/internal/subscription/core_test.go b/internal/subscription/core_test.go index 574509de..c12fed5d 100644 --- a/internal/subscription/core_test.go +++ b/internal/subscription/core_test.go @@ -1,3 +1,4 @@ +//go:build unit // +build unit package subscription @@ -20,7 +21,7 @@ func TestSubscriptionCore_UpdateSubscription(t *testing.T) { mockTopicCore := tCore.NewMockICore(ctrl) mockRepo := repo.NewMockIRepo(ctrl) - core := NewCore(mockRepo, mockProjectCore, mockTopicCore) + core := NewCore(1, mockRepo, mockProjectCore, mockTopicCore) sub := Model{ Name: "projects/project123/subscriptions/subscription123", Topic: "projects/project123/subscriptions/subscription123", @@ -45,7 +46,7 @@ func TestSubscriptionCore_UpdateSubscriptionNotExists(t *testing.T) { mockTopicCore := tCore.NewMockICore(ctrl) mockRepo := repo.NewMockIRepo(ctrl) - core := NewCore(mockRepo, mockProjectCore, mockTopicCore) + core := NewCore(1, mockRepo, mockProjectCore, mockTopicCore) sub := Model{ Name: "projects/project123/subscriptions/subscription123", Topic: "projects/project123/subscriptions/subscription123", @@ -70,7 +71,7 @@ func TestSubscriptionCore_UpdateSubscriptionProjectNotExists(t *testing.T) { mockTopicCore := tCore.NewMockICore(ctrl) mockRepo := repo.NewMockIRepo(ctrl) - core := NewCore(mockRepo, mockProjectCore, mockTopicCore) + core := NewCore(1, mockRepo, mockProjectCore, mockTopicCore) sub := Model{ Name: "projects/project123/subscriptions/subscription123", Topic: "projects/project123/subscriptions/subscription123", diff --git a/internal/subscription/delay.go 
b/internal/subscription/delay.go index 5d25f66e..ef548ace 100644 --- a/internal/subscription/delay.go +++ b/internal/subscription/delay.go @@ -6,8 +6,8 @@ const delayTopicNameFormat = "%v.delay.%v.seconds" // projects/p1/topics/subs.delay.30.seconds const delayTopicWithProjectNameFormat = "projects/%v/topics/%v.delay.%v.seconds" -// subs.delay.30.seconds-cg -const delayConsumerGroupIDFormat = "%v-cg" +// subs.delay.30.seconds-0-cg +const delayConsumerGroupIDFormat = "%v-%d-cg" // delayTopicName-subscriberID const delayConsumerGroupInstanceIDFormat = "%v-%v" diff --git a/internal/subscription/model.go b/internal/subscription/model.go index eba8112b..8c41a0fe 100644 --- a/internal/subscription/model.go +++ b/internal/subscription/model.go @@ -19,6 +19,7 @@ type Model struct { common.BaseModel Name string `json:"name,omitempty"` Topic string `json:"topic,omitempty"` + Partition int `json:"partition,omitempty"` PushConfig *PushConfig `json:"push_config,omitempty"` AckDeadlineSeconds int32 `json:"ack_deadline_seconds,omitempty"` RetainAckedMessages bool `json:"retain_acked_messages,omitempty"` @@ -202,8 +203,8 @@ func (m *Model) GetDelay3600secTopic() string { } // GetDelayConsumerGroupID returns the consumer group ID to be used by the delay consumers -func (m *Model) GetDelayConsumerGroupID(delayTopic string) string { - return fmt.Sprintf(delayConsumerGroupIDFormat, delayTopic) +func (m *Model) GetDelayConsumerGroupID(delayTopic string, partition int) string { + return fmt.Sprintf(delayConsumerGroupIDFormat, delayTopic, partition) } // GetDelayConsumerGroupInstanceID returns the consumer group ID to be used by the specific delay consumer diff --git a/internal/subscription/model_test.go b/internal/subscription/model_test.go index ce42e026..9c756f4a 100644 --- a/internal/subscription/model_test.go +++ b/internal/subscription/model_test.go @@ -79,6 +79,6 @@ func Test_Model(t *testing.T) { assert.Equal(t, 8, len(dSubscription.GetDelayTopicsMap())) assert.Equal(t, 
[]string{"projects/test-project/topics/test-subscription.delay.5.seconds", "projects/test-project/topics/test-subscription.delay.30.seconds", "projects/test-project/topics/test-subscription.delay.60.seconds", "projects/test-project/topics/test-subscription.delay.150.seconds", "projects/test-project/topics/test-subscription.delay.300.seconds", "projects/test-project/topics/test-subscription.delay.600.seconds", "projects/test-project/topics/test-subscription.delay.1800.seconds", "projects/test-project/topics/test-subscription.delay.3600.seconds"}, delayTopics) - assert.Equal(t, "test-subscription.delay.5.seconds-cg", dSubscription.GetDelayConsumerGroupID("test-subscription.delay.5.seconds")) + assert.Equal(t, "test-subscription.delay.5.seconds-0-cg", dSubscription.GetDelayConsumerGroupID("test-subscription.delay.5.seconds", 0)) assert.Equal(t, "test-subscription.delay.5.seconds-subscriberID", dSubscription.GetDelayConsumerGroupInstanceID("subscriberID", "test-subscription.delay.5.seconds")) } diff --git a/metro-proto b/metro-proto index 337b7974..b627bce0 160000 --- a/metro-proto +++ b/metro-proto @@ -1 +1 @@ -Subproject commit 337b7974cb5ff0e561ff3c22f15dc4ca755cd592 +Subproject commit b627bce018dc6962b977423d6ddd382284b287b4 diff --git a/pkg/messagebroker/config.go b/pkg/messagebroker/config.go index a8d723a1..a0af94c1 100644 --- a/pkg/messagebroker/config.go +++ b/pkg/messagebroker/config.go @@ -45,10 +45,16 @@ type ConsumerConfig struct { // AdminConfig holds configuration for admin APIs type AdminConfig struct{} +// TopicPartition represents a topic and one of it's partitions +type TopicPartition struct { + Topic string + Partition int +} + // ConsumerClientOptions holds client specific configuration for consumer type ConsumerClientOptions struct { // Specify a list of topics to consume messages from - Topics []string + Topics []TopicPartition // Specify the subscription name for this consumer. 
Only used for pulsar Subscription string // A unique string that identifies the consumer group this consumer belongs to. diff --git a/pkg/messagebroker/kafka.go b/pkg/messagebroker/kafka.go index 1129438a..0d356909 100644 --- a/pkg/messagebroker/kafka.go +++ b/pkg/messagebroker/kafka.go @@ -39,9 +39,24 @@ type KafkaBroker struct { // newKafkaConsumerClient returns a kafka consumer func newKafkaConsumerClient(ctx context.Context, bConfig *BrokerConfig, options *ConsumerClientOptions) (Consumer, error) { + //////////////////////////////////////////////////////////////////////////////////////////////////////////// + // ** This section is deferred until ingestion/subscription topics are in place. + // ** Explicit Partition Assignment not to be undertaken since consumer groups are no longer honored. + // + // normalizedTopics := make([]kafkapkg.TopicPartition, 0) + // for i := range options.Topics { + // brokerTopicName := normalizeTopicName(options.Topics[i].Topic) + // kfkTp := kafkapkg.TopicPartition{ + // Topic: &brokerTopicName, + // Partition: int32(options.Topics[i].Partition), + // } + // normalizedTopics = append(normalizedTopics, kfkTp) + // } + //////////////////////////////////////////////////////////////////////////////////////////////////////////// + normalizedTopics := make([]string, 0) for _, topic := range options.Topics { - normalizedTopics = append(normalizedTopics, normalizeTopicName(topic)) + normalizedTopics = append(normalizedTopics, normalizeTopicName(topic.Topic)) } err := validateKafkaConsumerBrokerConfig(bConfig) @@ -83,8 +98,22 @@ func newKafkaConsumerClient(ctx context.Context, bConfig *BrokerConfig, options return nil, err } + // With partition specific consumers in place at the consume plane this would still work given + // only one partition per topic is currently supported. + // This behaviour is not guaranteed with multiple partitions per primary topic. 
c.SubscribeTopics(normalizedTopics, nil) + ////////////////////////////////////////////////////// + // *** Warning *** DO NOT explicitly assign topic partitions to consumers + // unless subscriptions have dedicated topics. This will result in consumers being shared across subscribers + // and messages being consumed erratically. + // + // err = c.Assign(normalizedTopics) + // if err != nil { + // return nil, err + // } + ////////////////////////////////////////////////////// + logger.Ctx(ctx).Infow("kafka consumer: initialized", "options", options) return &KafkaBroker{ @@ -303,8 +332,51 @@ func (k *KafkaBroker) DeleteTopic(ctx context.Context, request DeleteTopicReques }, nil } -// GetTopicMetadata fetches the given topics metadata stored in the broker -func (k *KafkaBroker) GetTopicMetadata(ctx context.Context, request GetTopicMetadataRequest) (GetTopicMetadataResponse, error) { +// ListTopics fetches all topics from the broker +func (k *KafkaBroker) ListTopics(ctx context.Context) (ListTopicsResponse, error) { + logger.Ctx(ctx).Infow("kafka: list topics request received") + defer func() { + logger.Ctx(ctx).Infow("kafka: list topics request completed") + }() + + span, ctx := opentracing.StartSpanFromContext(ctx, "Kafka.ListTopics") + defer span.Finish() + + messageBrokerOperationCount.WithLabelValues(env, Kafka, "ListTopics").Inc() + + startTime := time.Now() + defer func() { + messageBrokerOperationTimeTaken.WithLabelValues(env, Kafka, "ListTopics").Observe(time.Now().Sub(startTime).Seconds()) + }() + + // TODO : normalize timeouts + resp, err := k.Consumer.GetMetadata(nil, true, 5000) + if err != nil { + messageBrokerOperationError.WithLabelValues(env, Kafka, "ListTopics", err.Error()).Inc() + return ListTopicsResponse{}, err + } + + topics := make(map[string]TopicMetadata) + for _, topic := range resp.Topics { + tmd := TopicMetadata{ + Topic: topic.Topic, + Partitions: make([]PartitionMetadata, 0), + } + for _, part := range topic.Partitions { + pmd := 
PartitionMetadata{ + ID: int(part.ID), + Leader: int(part.Leader), + } + tmd.Partitions = append(tmd.Partitions, pmd) + } + topics[topic.Topic] = tmd + } + return ListTopicsResponse{topics}, nil + +} + +// GetTopicPartitionMetadata fetches the given topics metadata stored in the broker +func (k *KafkaBroker) GetTopicPartitionMetadata(ctx context.Context, request GetTopicPartitionMetadataRequest) (GetTopicPartitionMetadataResponse, error) { logger.Ctx(ctx).Infow("kafka: get metadata request received", "request", request) defer func() { logger.Ctx(ctx).Infow("kafka: get metadata request completed", "request", request) @@ -313,11 +385,11 @@ func (k *KafkaBroker) GetTopicMetadata(ctx context.Context, request GetTopicMeta span, ctx := opentracing.StartSpanFromContext(ctx, "Kafka.GetMetadata") defer span.Finish() - messageBrokerOperationCount.WithLabelValues(env, Kafka, "GetTopicMetadata").Inc() + messageBrokerOperationCount.WithLabelValues(env, Kafka, "GetTopicPartitionMetadata").Inc() startTime := time.Now() defer func() { - messageBrokerOperationTimeTaken.WithLabelValues(env, Kafka, "GetTopicMetadata").Observe(time.Now().Sub(startTime).Seconds()) + messageBrokerOperationTimeTaken.WithLabelValues(env, Kafka, "GetTopicPartitionMetadata").Observe(time.Now().Sub(startTime).Seconds()) }() topicN := normalizeTopicName(request.Topic) @@ -332,13 +404,13 @@ func (k *KafkaBroker) GetTopicMetadata(ctx context.Context, request GetTopicMeta // TODO : normalize timeouts resp, err := k.Consumer.Committed(tps, 5000) if err != nil { - messageBrokerOperationError.WithLabelValues(env, Kafka, "GetTopicMetadata", err.Error()).Inc() - return GetTopicMetadataResponse{}, err + messageBrokerOperationError.WithLabelValues(env, Kafka, "GetTopicPartitionMetadata", err.Error()).Inc() + return GetTopicPartitionMetadataResponse{}, err } tpStats := resp[0] offset, _ := strconv.ParseInt(tpStats.Offset.String(), 10, 0) - return GetTopicMetadataResponse{ + return GetTopicPartitionMetadataResponse{ 
Topic: request.Topic, Partition: request.Partition, Offset: int32(offset), diff --git a/pkg/messagebroker/kafka_test.go b/pkg/messagebroker/kafka_test.go index 33a22542..740414b1 100644 --- a/pkg/messagebroker/kafka_test.go +++ b/pkg/messagebroker/kafka_test.go @@ -1,3 +1,4 @@ +//go:build unit // +build unit package messagebroker @@ -60,7 +61,12 @@ func getValidBrokerConfig() *BrokerConfig { func getValidConsumerClientOptions() *ConsumerClientOptions { return &ConsumerClientOptions{ - Topics: []string{"t1"}, + Topics: []TopicPartition{ + { + Topic: "t1", + Partition: 0, + }, + }, Subscription: "s1", GroupID: "sg1", GroupInstanceID: "sg_i1", diff --git a/pkg/messagebroker/messagebroker.go b/pkg/messagebroker/messagebroker.go index 4c63b6ff..ca8308d6 100644 --- a/pkg/messagebroker/messagebroker.go +++ b/pkg/messagebroker/messagebroker.go @@ -51,8 +51,11 @@ type Consumer interface { // CommitByMsgID Commits a message by ID CommitByMsgID(context.Context, CommitOnTopicRequest) (CommitOnTopicResponse, error) - // GetTopicMetadata gets the topic metadata - GetTopicMetadata(context.Context, GetTopicMetadataRequest) (GetTopicMetadataResponse, error) + // GetTopicPartitionMetadata gets the topic metadata + GetTopicPartitionMetadata(context.Context, GetTopicPartitionMetadataRequest) (GetTopicPartitionMetadataResponse, error) + + // ListTopics fetches metadata for all topics + ListTopics(context.Context) (ListTopicsResponse, error) // Pause pause the consumer Pause(context.Context, PauseOnTopicRequest) error diff --git a/pkg/messagebroker/models.go b/pkg/messagebroker/models.go index d29309ed..abdc440f 100644 --- a/pkg/messagebroker/models.go +++ b/pkg/messagebroker/models.go @@ -107,8 +107,8 @@ type CommitOnTopicRequest struct { ID string } -// GetTopicMetadataRequest ... -type GetTopicMetadataRequest struct { +// GetTopicPartitionMetadataRequest ... 
+type GetTopicPartitionMetadataRequest struct { Topic string Partition int32 } @@ -123,6 +123,23 @@ type AddTopicPartitionResponse struct { Response interface{} } +// PartitionMetadata holds information about a given topic +type PartitionMetadata struct { + ID int + Leader int +} + +// TopicMetadata holds information about a given topic +type TopicMetadata struct { + Topic string + Partitions []PartitionMetadata +} + +// ListTopicsResponse retuns metadata for all topics in the broker +type ListTopicsResponse struct { + topics map[string]TopicMetadata +} + // DeleteTopicResponse ... type DeleteTopicResponse struct { Response interface{} @@ -194,8 +211,8 @@ type CommitOnTopicResponse struct { Response interface{} } -// GetTopicMetadataResponse ... -type GetTopicMetadataResponse struct { +// GetTopicPartitionMetadataResponse ... +type GetTopicPartitionMetadataResponse struct { Topic string Partition int32 Offset int32 diff --git a/pkg/messagebroker/pulsar.go b/pkg/messagebroker/pulsar.go index 7bf0a0ee..7ace95e8 100644 --- a/pkg/messagebroker/pulsar.go +++ b/pkg/messagebroker/pulsar.go @@ -50,8 +50,13 @@ func newPulsarConsumerClient(_ context.Context, bConfig *BrokerConfig, options * return nil, err } + topics := make([]string, 0) + + for _, tp := range options.Topics { + topics = append(topics, tp.Topic) + } c, err := client.Subscribe(pulsar.ConsumerOptions{ - Topics: options.Topics, + Topics: topics, SubscriptionName: options.Subscription, Type: pulsar.SubscriptionType(bConfig.Consumer.SubscriptionType), Name: options.GroupInstanceID, @@ -253,26 +258,32 @@ func (p *PulsarBroker) CommitByMsgID(ctx context.Context, request CommitOnTopicR return CommitOnTopicResponse{}, nil } -// GetTopicMetadata ... -func (p *PulsarBroker) GetTopicMetadata(ctx context.Context, request GetTopicMetadataRequest) (GetTopicMetadataResponse, error) { - messageBrokerOperationCount.WithLabelValues(env, Pulsar, "GetTopicMetadata").Inc() +// ListTopics ... 
+func (p *PulsarBroker) ListTopics(ctx context.Context) (ListTopicsResponse, error) { + // TODO: Implement ListTopics + return ListTopicsResponse{}, nil +} + +// GetTopicPartitionMetadata ... +func (p *PulsarBroker) GetTopicPartitionMetadata(ctx context.Context, request GetTopicPartitionMetadataRequest) (GetTopicPartitionMetadataResponse, error) { + messageBrokerOperationCount.WithLabelValues(env, Pulsar, "GetTopicPartitionMetadata").Inc() startTime := time.Now() defer func() { - messageBrokerOperationTimeTaken.WithLabelValues(env, Pulsar, "GetTopicMetadata").Observe(time.Now().Sub(startTime).Seconds()) + messageBrokerOperationTimeTaken.WithLabelValues(env, Pulsar, "GetTopicPartitionMetadata").Observe(time.Now().Sub(startTime).Seconds()) }() pulsarTopic, terr := utils.GetTopicName(request.Topic) if terr != nil { - return GetTopicMetadataResponse{}, terr + return GetTopicPartitionMetadataResponse{}, terr } stats, err := p.Admin.Topics().GetInternalStats(*pulsarTopic) if err != nil { - return GetTopicMetadataResponse{}, err + return GetTopicPartitionMetadataResponse{}, err } - return GetTopicMetadataResponse{ + return GetTopicPartitionMetadataResponse{ Topic: request.Topic, Offset: int32(stats.Cursors[request.Topic].MessagesConsumedCounter), }, nil diff --git a/pkg/messagebroker/validations_test.go b/pkg/messagebroker/validations_test.go index a30aef6e..29bc6370 100644 --- a/pkg/messagebroker/validations_test.go +++ b/pkg/messagebroker/validations_test.go @@ -1,3 +1,4 @@ +//go:build unit // +build unit package messagebroker @@ -10,7 +11,12 @@ import ( func Test_validateKafkaConsumerClientConfig(t *testing.T) { op1 := ConsumerClientOptions{ - Topics: []string{"t1"}, + Topics: []TopicPartition{ + { + Topic: "t1", + Partition: 0, + }, + }, GroupID: "g1", } @@ -18,10 +24,15 @@ func Test_validateKafkaConsumerClientConfig(t *testing.T) { ops := []ConsumerClientOptions{ { - Topics: []string{"t1"}, + Topics: []TopicPartition{ + { + Topic: "t1", + Partition: 0, + }, + }, 
GroupID: "", }, { - Topics: []string{}, + Topics: []TopicPartition{}, GroupID: "g3", }, } diff --git a/service/consume-plane/config.go b/service/consume-plane/config.go new file mode 100644 index 00000000..ebf2632c --- /dev/null +++ b/service/consume-plane/config.go @@ -0,0 +1,28 @@ +package consumeplane + +import ( + "github.com/razorpay/metro/pkg/messagebroker" +) + +// Config for consumer +type Config struct { + Broker Broker + ReplicaCount int + OrdinalID int + Interfaces struct { + API NetworkInterfaces + } +} + +// Broker Config (Kafka/Pulsar) +type Broker struct { + Variant string // kafka or pulsar + BrokerConfig messagebroker.BrokerConfig +} + +// NetworkInterfaces contains all exposed interfaces +type NetworkInterfaces struct { + GrpcServerAddress string + HTTPServerAddress string + InternalHTTPServerAddress string +} diff --git a/service/consume-plane/consumeplaneserver.go b/service/consume-plane/consumeplaneserver.go new file mode 100644 index 00000000..882ac7fb --- /dev/null +++ b/service/consume-plane/consumeplaneserver.go @@ -0,0 +1,124 @@ +package consumeplane + +import ( + "context" + + "github.com/opentracing/opentracing-go" + "github.com/razorpay/metro/internal/brokerstore" + "github.com/razorpay/metro/internal/consumer" + "github.com/razorpay/metro/internal/merror" + "github.com/razorpay/metro/internal/subscriber" + "github.com/razorpay/metro/internal/subscription" + "github.com/razorpay/metro/pkg/logger" + metrov1 "github.com/razorpay/metro/rpc/proto/v1" + "github.com/razorpay/metro/service/web/stream" + "google.golang.org/protobuf/types/known/emptypb" +) + +type consumeplaneserver struct { + brokerStore brokerstore.IBrokerStore + subscriptionCore subscription.ICore + subscriberCore subscriber.ICore + manager consumer.ILifecycle +} + +func newConsumePlaneServer(brokerStore brokerstore.IBrokerStore, subscriptionCore subscription.ICore, subscriberCore subscriber.ICore, mgr consumer.ILifecycle) *consumeplaneserver { + + return &consumeplaneserver{ 
+ brokerStore, + subscriptionCore, + subscriberCore, + mgr, + } +} + +// Acknowledge a message +func (c consumeplaneserver) Acknowledge(ctx context.Context, req *metrov1.AcknowledgeRequest) (*emptypb.Empty, error) { + logger.Ctx(ctx).Infow("consumeplaneserver: received request to ack messages", "ack_req", req.String()) + + span, ctx := opentracing.StartSpanFromContext(ctx, "ConsumePlaneServer.Acknowledge", opentracing.Tags{ + "subscription": req.Subscription, + "ack_ids": req.AckIds, + }) + defer span.Finish() + + parsedReq, parseErr := stream.NewParsedAcknowledgeRequest(req) + if parseErr != nil { + logger.Ctx(ctx).Errorw("consumeplaneserver: error is parsing ack request", "request", req, "error", parseErr.Error()) + return nil, merror.ToGRPCError(parseErr) + } + partitionAckMsgs := make(map[int][]*subscriber.AckMessage, 0) + for _, ack := range parsedReq.AckMessages { + partitionAckMsgs[int(ack.Partition)] = append(partitionAckMsgs[int(ack.Partition)], ack) + } + + for partition, ackMsgs := range partitionAckMsgs { + consumer, err := c.manager.GetConsumer(ctx, parsedReq.Subscription, partition) + if err != nil { + logger.Ctx(ctx).Errorw("consumeplaneserver: error is fetching consumer for fetch request", "request", req, "error", err.Error()) + return nil, err + } + consumer.Acknowledge(ctx, ackMsgs) + + } + + return new(emptypb.Empty), nil +} + +// Fetch messages from the topic-partition +func (c consumeplaneserver) Fetch(ctx context.Context, req *metrov1.FetchRequest) (*metrov1.PullResponse, error) { + logger.Ctx(ctx).Infow("consumeplaneserver: received request to pull messages", "pull_req", req.String()) + span, ctx := opentracing.StartSpanFromContext(ctx, "ConsumePlaneServer.Pull", opentracing.Tags{ + "subscription": req.Subscription, + }) + + parsedReq, parseErr := consumer.NewParsedFetchRequest(req) + if parseErr != nil { + logger.Ctx(ctx).Errorw("consumeplaneserver: error is parsing pull request", "request", req, "error", parseErr.Error()) + return nil, 
parseErr + } + defer span.Finish() + consumer, err := c.manager.GetConsumer(ctx, parsedReq.Subscription, parsedReq.Partition) + if err != nil { + logger.Ctx(ctx).Errorw("consumeplaneserver: error in fetching consumer for fetch request", "subscription", parsedReq.Subscription) + return &metrov1.PullResponse{}, err + } + res, err := consumer.Fetch(ctx, int(req.MaxMessages)) + if err != nil { + logger.Ctx(ctx).Errorw("consumeplaneserver: Failed to fetch messages", "err", err.Error()) + return &metrov1.PullResponse{}, err + } + return res, nil +} + +// ModifyAckDeadline updates deadline for the given ackMessages +func (c consumeplaneserver) ModifyAckDeadline(ctx context.Context, req *metrov1.ModifyAckDeadlineRequest) (*emptypb.Empty, error) { + logger.Ctx(ctx).Infow("consumeplaneserver: received request to modack messages", "mod_ack_req", req.String()) + + span, ctx := opentracing.StartSpanFromContext(ctx, "ConsumePlaneServer.ModifyAckDeadline", opentracing.Tags{ + "subscription": req.Subscription, + "ack_ids": req.AckIds, + }) + defer span.Finish() + + parsedReq, parseErr := consumer.NewParsedModifyAckDeadlineRequest(req) + if parseErr != nil { + logger.Ctx(ctx).Errorw("consumeplaneserver: error is parsing modack request", "request", req, "error", parseErr.Error()) + return nil, merror.ToGRPCError(parseErr) + } + partitionAckMsgs := make(map[int][]*subscriber.AckMessage, 0) + for _, ack := range parsedReq.AckMessages { + partitionAckMsgs[int(ack.Partition)] = append(partitionAckMsgs[int(ack.Partition)], ack) + } + + for partition, ackMsgs := range partitionAckMsgs { + consumer, err := c.manager.GetConsumer(ctx, parsedReq.Subscription, partition) + if err != nil { + logger.Ctx(ctx).Errorw("consumeplaneserver: error is fetching consumer for fetch request", "request", req, "error", err.Error()) + return nil, err + } + consumer.ModifyAckDeadline(ctx, ackMsgs) + + } + return new(emptypb.Empty), nil +} diff --git a/service/consume-plane/server.go 
b/service/consume-plane/server.go new file mode 100644 index 00000000..5fb72c75 --- /dev/null +++ b/service/consume-plane/server.go @@ -0,0 +1,37 @@ +package consumeplane + +import ( + "context" + "regexp" + + "github.com/razorpay/metro/internal/merror" + "github.com/razorpay/metro/pkg/logger" + metrov1 "github.com/razorpay/metro/rpc/proto/v1" +) + +var projectIDRegex *regexp.Regexp + +var ( + unknownResourceError = merror.New(merror.InvalidArgument, "unknown resource type") + invalidResourceNameError = merror.New(merror.InvalidArgument, "resource name is invalid") +) + +func init() { + // Regex to capture project id from resource names + // Resources are in the form /projects/// + // projectIDRegex = regexp.MustCompile(`^projects\/([^\/]+)\/.*`) +} + +func getResourceNameFromRequest(ctx context.Context, req interface{}) (string, error) { + switch t := req.(type) { + case *metrov1.AcknowledgeRequest: + return req.(*metrov1.AcknowledgeRequest).Subscription, nil + case *metrov1.PullRequest: + return req.(*metrov1.PullRequest).Subscription, nil + case *metrov1.ModifyAckDeadlineRequest: + return req.(*metrov1.ModifyAckDeadlineRequest).Subscription, nil + default: + logger.Ctx(ctx).Infof("unknown request type: %v", t) + return "", unknownResourceError + } +} diff --git a/service/consume-plane/service.go b/service/consume-plane/service.go new file mode 100644 index 00000000..bddca0dc --- /dev/null +++ b/service/consume-plane/service.go @@ -0,0 +1,125 @@ +package consumeplane + +import ( + "context" + + "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" + "golang.org/x/sync/errgroup" + "google.golang.org/grpc" + + "github.com/razorpay/metro/internal/brokerstore" + "github.com/razorpay/metro/internal/consumer" + "github.com/razorpay/metro/internal/health" + "github.com/razorpay/metro/internal/offset" + "github.com/razorpay/metro/internal/project" + "github.com/razorpay/metro/internal/server" + "github.com/razorpay/metro/internal/subscriber" + 
"github.com/razorpay/metro/internal/subscription" + "github.com/razorpay/metro/internal/topic" + "github.com/razorpay/metro/pkg/logger" + "github.com/razorpay/metro/pkg/messagebroker" + "github.com/razorpay/metro/pkg/registry" + metrov1 "github.com/razorpay/metro/rpc/proto/v1" +) + +// Service for producer +type Service struct { + consumeConfig *Config + registryConfig *registry.Config +} + +// NewService creates an instance of new producer service +func NewService(consumeConfig *Config, registryConfig *registry.Config) (*Service, error) { + return &Service{ + consumeConfig: consumeConfig, + registryConfig: registryConfig, + }, nil +} + +// Start the service +func (svc *Service) Start(ctx context.Context) error { + // Define server handlers + r, err := registry.NewRegistry(svc.registryConfig) + if err != nil { + return err + } + + brokerStore, err := brokerstore.NewBrokerStore(svc.consumeConfig.Broker.Variant, &svc.consumeConfig.Broker.BrokerConfig) + if err != nil { + return err + } + + admin, _ := brokerStore.GetAdmin(ctx, messagebroker.AdminClientOptions{}) + // init broker health checker + brokerHealthChecker := health.NewBrokerHealthChecker(admin) + + // register broker and registry health checkers on the health server + healthCore, err := health.NewCore(brokerHealthChecker) + if err != nil { + return err + } + + projectCore := project.NewCore(project.NewRepo(r)) + + topicCore := topic.NewCore(topic.NewRepo(r), projectCore, brokerStore) + + subscriptionCore := subscription.NewCore(svc.consumeConfig.ReplicaCount, subscription.NewRepo(r), projectCore, topicCore) + + subscriberCore := subscriber.NewCore(brokerStore, subscriptionCore, offset.NewCore(offset.NewRepo(r))) + + mgr, err := consumer.NewLifecycleManager(ctx, svc.consumeConfig.ReplicaCount, svc.consumeConfig.OrdinalID, subscriptionCore, subscriberCore, brokerStore, r) + if err != nil { + logger.Ctx(ctx).Errorw("consumeplaneserver: error setting up lifecycle manager", "error", err.Error()) + } + // 
initiates a error group + grp, gctx := errgroup.WithContext(ctx) + + grp.Go(func() error { + mgr.Run() + return nil + }) + grp.Go(func() error { + err := server.RunGRPCServer( + gctx, + svc.consumeConfig.Interfaces.API.GrpcServerAddress, + func(server *grpc.Server) error { + metrov1.RegisterStatusCheckAPIServer(server, health.NewServer(healthCore)) + metrov1.RegisterConsumePlaneServer(server, newConsumePlaneServer(brokerStore, subscriptionCore, subscriberCore, mgr)) + return nil + }, + ) + + return err + }) + + grp.Go(func() error { + err := server.RunHTTPServer( + gctx, + svc.consumeConfig.Interfaces.API.HTTPServerAddress, + func(mux *runtime.ServeMux) error { + err := metrov1.RegisterStatusCheckAPIHandlerFromEndpoint(gctx, mux, svc.consumeConfig.Interfaces.API.GrpcServerAddress, []grpc.DialOption{grpc.WithInsecure()}) + if err != nil { + return err + } + + err = metrov1.RegisterConsumePlaneHandlerFromEndpoint(gctx, mux, svc.consumeConfig.Interfaces.API.GrpcServerAddress, []grpc.DialOption{grpc.WithInsecure()}) + if err != nil { + return err + } + return nil + }) + + return err + }) + + grp.Go(func() error { + err := server.RunInternalHTTPServer(gctx, svc.consumeConfig.Interfaces.API.InternalHTTPServerAddress) + return err + }) + + err = grp.Wait() + if err != nil { + logger.Ctx(gctx).Errorf("web service error: %s", err.Error()) + } + return err +} diff --git a/service/web/config.go b/service/web/config.go index 89a4c28c..a65d5f4b 100644 --- a/service/web/config.go +++ b/service/web/config.go @@ -1,13 +1,17 @@ package web import ( + "github.com/razorpay/metro/pkg/httpclient" "github.com/razorpay/metro/pkg/messagebroker" ) // Config for producer type Config struct { - Broker Broker - Interfaces struct { + Broker Broker + ReplicaCount int + ConsumePlaneAddress string + HTTPClientConfig httpclient.Config + Interfaces struct { API NetworkInterfaces } } diff --git a/service/web/service.go b/service/web/service.go index baef4b9b..0ea3b475 100644 --- 
a/service/web/service.go +++ b/service/web/service.go @@ -11,7 +11,6 @@ import ( "github.com/razorpay/metro/internal/credentials" "github.com/razorpay/metro/internal/health" "github.com/razorpay/metro/internal/interceptors" - "github.com/razorpay/metro/internal/offset" "github.com/razorpay/metro/internal/project" "github.com/razorpay/metro/internal/publisher" "github.com/razorpay/metro/internal/server" @@ -22,7 +21,6 @@ import ( "github.com/razorpay/metro/pkg/messagebroker" "github.com/razorpay/metro/pkg/registry" metrov1 "github.com/razorpay/metro/rpc/proto/v1" - "github.com/razorpay/metro/service/web/stream" _ "github.com/razorpay/metro/statik" // to serve openAPI static assets ) @@ -82,15 +80,14 @@ func (svc *Service) Start(ctx context.Context) error { topicCore := topic.NewCore(topic.NewRepo(r), projectCore, brokerStore) - subscriptionCore := subscription.NewCore(subscription.NewRepo(r), projectCore, topicCore) + subscriptionCore := subscription.NewCore(0, subscription.NewRepo(r), projectCore, topicCore) credentialsCore := credentials.NewCore(credentials.NewRepo(r), projectCore) publisherCore := publisher.NewCore(brokerStore) - offsetCore := offset.NewCore(offset.NewRepo(r)) - - streamManager := stream.NewStreamManager(ctx, subscriptionCore, offsetCore, brokerStore, svc.webConfig.Interfaces.API.GrpcServerAddress) + replicaCount := svc.webConfig.ReplicaCount + consumePlaneDeployment := svc.webConfig.ConsumePlaneAddress // initiates a error group grp, gctx := errgroup.WithContext(ctx) @@ -103,7 +100,7 @@ func (svc *Service) Start(ctx context.Context) error { metrov1.RegisterStatusCheckAPIServer(server, health.NewServer(healthCore)) metrov1.RegisterPublisherServer(server, newPublisherServer(projectCore, brokerStore, topicCore, credentialsCore, publisherCore)) metrov1.RegisterAdminServiceServer(server, newAdminServer(svc.admin, projectCore, subscriptionCore, topicCore, credentialsCore, brokerStore)) - metrov1.RegisterSubscriberServer(server, 
newSubscriberServer(projectCore, brokerStore, subscriptionCore, credentialsCore, streamManager)) + metrov1.RegisterSubscriberServer(server, newSubscriberServer(projectCore, brokerStore, subscriptionCore, topicCore, credentialsCore, replicaCount, consumePlaneDeployment, &svc.webConfig.HTTPClientConfig)) return nil }, getInterceptors()..., diff --git a/service/web/stream/request.go b/service/web/stream/request.go index 258e8d3e..31eafe65 100644 --- a/service/web/stream/request.go +++ b/service/web/stream/request.go @@ -18,6 +18,12 @@ type ParsedStreamingPullRequest struct { ModifyDeadlineMsgIdsWithSecs map[string]int32 } +// ParsedPullRequest ... +type ParsedPullRequest struct { + Subscription string + MaxMessages int +} + // HasSubscription ... func (r *ParsedStreamingPullRequest) HasSubscription() bool { return len(strings.Trim(r.Subscription, " ")) > 0 @@ -71,6 +77,16 @@ func NewParsedStreamingPullRequest(req *metrov1.StreamingPullRequest) (*ParsedSt return parsedReq, nil } +// NewParsedPullRequest ... +func NewParsedPullRequest(req *metrov1.PullRequest) (*ParsedPullRequest, error) { + parsedReq := &ParsedPullRequest{ + Subscription: req.Subscription, + MaxMessages: int(req.MaxMessages), + } + + return parsedReq, nil +} + // NewParsedAcknowledgeRequest ... 
func NewParsedAcknowledgeRequest(req *metrov1.AcknowledgeRequest) (*ParsedStreamingPullRequest, error) { parsedReq := &ParsedStreamingPullRequest{} diff --git a/service/web/subscriberserver.go b/service/web/subscriberserver.go index 5af744a8..3191d5b8 100644 --- a/service/web/subscriberserver.go +++ b/service/web/subscriberserver.go @@ -1,8 +1,12 @@ package web import ( + "bytes" "context" + "fmt" + "net/http" + "github.com/gogo/protobuf/jsonpb" "github.com/imdario/mergo" "github.com/mennanov/fmutils" "github.com/opentracing/opentracing-go" @@ -11,23 +15,63 @@ import ( "github.com/razorpay/metro/internal/merror" "github.com/razorpay/metro/internal/project" "github.com/razorpay/metro/internal/subscription" + "github.com/razorpay/metro/internal/topic" + "github.com/razorpay/metro/pkg/httpclient" "github.com/razorpay/metro/pkg/logger" metrov1 "github.com/razorpay/metro/rpc/proto/v1" "github.com/razorpay/metro/service/web/stream" - "golang.org/x/sync/errgroup" "google.golang.org/protobuf/types/known/emptypb" ) +const ( + consumePlaneAddressFormat string = "https://%s-%d.%s" + consumePlaneDeployment string = "metro-consume-plane" + acknowledgeRequestFormat string = "%s/v1/%s:acknowledge" + fetchRequestFormat string = "%s/v1/%s:fetch" + modAckRequestFormat string = "%s/v1/%s:modifyAckDeadline" +) + type subscriberserver struct { projectCore project.ICore brokerStore brokerstore.IBrokerStore subscriptionCore subscription.ICore + topicCore topic.ICore credentialCore credentials.ICore - psm stream.IManager + replicaCount int + consumeNodes map[int]string + marshaler jsonpb.Marshaler + unmarshaler jsonpb.Unmarshaler + httpClient *http.Client } -func newSubscriberServer(projectCore project.ICore, brokerStore brokerstore.IBrokerStore, subscriptionCore subscription.ICore, credentialCore credentials.ICore, psm stream.IManager) *subscriberserver { - return &subscriberserver{projectCore, brokerStore, subscriptionCore, credentialCore, psm} +func newSubscriberServer( + projectCore 
project.ICore, + brokerStore brokerstore.IBrokerStore, + subscriptionCore subscription.ICore, + topicCore topic.ICore, + credentialCore credentials.ICore, + replicaCount int, + consumePlaneAddress string, + config *httpclient.Config, +) *subscriberserver { + consumeNodes := make(map[int]string, 0) + for i := 0; i < replicaCount; i++ { + consumeNodes[i] = fmt.Sprintf(consumePlaneAddressFormat, consumePlaneDeployment, i, consumePlaneAddress) + } + marshaler := jsonpb.Marshaler{ + EnumsAsInts: false, + EmitDefaults: false, + Indent: "", + OrigName: false, + AnyResolver: nil, + } + + unmarshaler := jsonpb.Unmarshaler{ + AllowUnknownFields: true, + AnyResolver: nil, + } + httpClient := httpclient.NewClient(config) + return &subscriberserver{projectCore, brokerStore, subscriptionCore, topicCore, credentialCore, replicaCount, consumeNodes, marshaler, unmarshaler, httpClient} } // CreateSubscription to create a new subscription @@ -101,12 +145,43 @@ func (s subscriberserver) Acknowledge(ctx context.Context, req *metrov1.Acknowle logger.Ctx(ctx).Errorw("subscriberserver: error in parsing ack request", "request", req, "error", parseErr.Error()) return nil, merror.ToGRPCError(parseErr) } - - err := s.psm.Acknowledge(ctx, parsedReq) - if err != nil { - return nil, merror.ToGRPCError(err) + AckIdsByPartitions := make(map[int32][]string) + for _, ack := range parsedReq.AckMessages { + AckIdsByPartitions[ack.Partition] = append(AckIdsByPartitions[ack.Partition], ack.AckID) } + for part, msgs := range AckIdsByPartitions { + hash := s.subscriptionCore.FetchSubscriptionHash(ctx, req.Subscription, int(part)) + + if node, ok := s.consumeNodes[hash%s.replicaCount]; ok { + var b []byte + byteBuffer := bytes.NewBuffer(b) + body := &metrov1.AcknowledgeRequest{ + Subscription: req.Subscription, + AckIds: msgs, + } + err := s.marshaler.Marshal(byteBuffer, body) + if err != nil { + logger.Ctx(ctx).Errorw("subscriberserver: Failed to marshal upstream request", "error", err.Error(), "subscription", 
parsedReq.Subscription) + } + cpReq, err := http.NewRequestWithContext(ctx, "POST", fmt.Sprintf(acknowledgeRequestFormat, node, parsedReq.Subscription), byteBuffer) + if err != nil { + logger.Ctx(ctx).Errorw("subscriberserver: Failed to create request object for consume plane", "error", err.Error()) + continue + } + cpRes, err := s.httpClient.Do(cpReq) + if err != nil { + logger.Ctx(ctx).Errorw("subscriberserver: Failed to send request to consume plane", "error", err.Error()) + continue + } + cpRes.Body.Close() + + } else { + logger.Ctx(ctx).Errorw("subscriberserver: Failed to fetch consume plane for partition", + "subscription", parsedReq.Subscription, + "partition", part, + "hash", hash, + "replicaCount", s.replicaCount, + ) + continue + } + + } return new(emptypb.Empty), nil } @@ -118,73 +193,66 @@ func (s subscriberserver) Pull(ctx context.Context, req *metrov1.PullRequest) (* "subscription": req.Subscription, }) defer span.Finish() - - // non streaming pull not to be supported - /* - res, err := s.subscriberCore.Pull(ctx, &subscriber.PullRequest{req.Subscription, 0, 0}, 2, xid.New().String()) // TODO: fix - if err != nil { - logger.Ctx(ctx).Errorw("pull response errored", "msg", err.Error()) - return nil, merror.ToGRPCError(err) - } - return &metrov1.PullResponse{ReceivedMessages: res.ReceivedMessages}, nil - - */ - return &metrov1.PullResponse{}, nil -} - -// StreamingPull ... -func (s subscriberserver) StreamingPull(server metrov1.Subscriber_StreamingPullServer) error { - // TODO: check if the requested subscription is push based and handle it the way pubsub does - ctx := server.Context() - errGroup := new(errgroup.Group) - - // the first request reaching this server path would always be to establish a new stream. 
- // once established the active stream server instance will be held in pullstream and - // periodically polled for new requests - req, err := server.Recv() - if err != nil { - return err - } - - parsedReq, parseErr := stream.NewParsedStreamingPullRequest(req) + parsedReq, parseErr := stream.NewParsedPullRequest(req) if parseErr != nil { logger.Ctx(ctx).Errorw("subscriberserver: error is parsing pull request", "request", req, "error", parseErr.Error()) - return nil + return &metrov1.PullResponse{}, parseErr } - // request to init a new stream - if parsedReq.HasSubscription() { - err := s.psm.CreateNewStream(server, parsedReq, errGroup) + // Restricting to one partition till dynamic re-partitioning is in place. + // This helps us avoid two lookups for now (subscription lookup, topic lookup) + hash := s.subscriptionCore.FetchSubscriptionHash(ctx, req.Subscription, 0) + + if node, ok := s.consumeNodes[hash%s.replicaCount]; ok { + var b []byte + byteBuffer := bytes.NewBuffer(b) + body := &metrov1.FetchRequest{ + Subscription: parsedReq.Subscription, + Partition: 0, + MaxMessages: int32(parsedReq.MaxMessages), + } + err := s.marshaler.Marshal(byteBuffer, body) if err != nil { - return merror.ToGRPCError(err) + logger.Ctx(ctx).Errorw("subscriberserver: Failed to marshal upstream request", "error", err.Error(), "subscription", parsedReq.Subscription) } - } else { - return merror.New(merror.InvalidArgument, "subscription name empty").ToGRPCError() - } - // ack and modack here for the first time - // later it happens in stream handler - if parsedReq.HasModifyAcknowledgement() { - // request to modify acknowledgement deadlines - // Nack indicated by modifying the deadline to zero - // https://github.com/googleapis/google-cloud-go/blob/pubsub/v1.10.0/pubsub/iterator.go#L348 - err := s.psm.ModifyAcknowledgement(ctx, parsedReq) + cpReq, err := http.NewRequestWithContext(ctx, "POST", fmt.Sprintf(fetchRequestFormat, node, parsedReq.Subscription), byteBuffer) if err != nil { - 
return merror.ToGRPCError(err) + logger.Ctx(ctx).Errorw("subscriberserver: Failed to create request object for consume plane", "error", err.Error(), "url", fmt.Sprintf(fetchRequestFormat, node, parsedReq.Subscription)) + return &metrov1.PullResponse{}, err } - } - if parsedReq.HasAcknowledgement() { - // request to acknowledge existing messages - err := s.psm.Acknowledge(ctx, parsedReq) + cpRes, err := s.httpClient.Do(cpReq) if err != nil { - return merror.ToGRPCError(err) + logger.Ctx(ctx).Errorw("subscriberserver: Failed to send request to consume plane", "error", err.Error(), "url", fmt.Sprintf(fetchRequestFormat, node, parsedReq.Subscription)) + return &metrov1.PullResponse{}, err } + defer cpRes.Body.Close() + if cpRes.StatusCode == http.StatusOK { + resp := &metrov1.PullResponse{} + + err = s.unmarshaler.Unmarshal(cpRes.Body, resp) + if err != nil { + logger.Ctx(ctx).Errorw("subscriberserver: Failed to Unmarshal response", "error", err.Error()) + } + return resp, nil + + } + } else { + logger.Ctx(ctx).Errorw("subscriberserver: Failed to fetch consume plane for partition", + "subscription", parsedReq.Subscription, + "hash", hash, + "replicaCount", s.replicaCount, + ) - } - if err := errGroup.Wait(); err != nil { - return err - } - return nil + + return &metrov1.PullResponse{}, nil +} + +// StreamingPull ... 
+func (s subscriberserver) StreamingPull(server metrov1.Subscriber_StreamingPullServer) error { + // TODO: Implement streaming pull with state management + + return http.ErrNotSupported } // DeleteSubscription deletes a subscription @@ -222,9 +290,42 @@ func (s subscriberserver) ModifyAckDeadline(ctx context.Context, req *metrov1.Mo logger.Ctx(ctx).Errorw("subscriberserver: error is parsing modack request", "request", req, "error", parseErr.Error()) return nil, merror.ToGRPCError(parseErr) } - err := s.psm.ModifyAcknowledgement(ctx, parsedReq) - if err != nil { - return nil, merror.ToGRPCError(err) + AckIdsByPartitions := make(map[int32][]string) + for _, ack := range parsedReq.AckMessages { + AckIdsByPartitions[ack.Partition] = append(AckIdsByPartitions[ack.Partition], ack.AckID) + } + + for part, msgs := range AckIdsByPartitions { + hash := s.subscriptionCore.FetchSubscriptionHash(ctx, parsedReq.Subscription, int(part)) + if node, ok := s.consumeNodes[hash%s.replicaCount]; ok { + var b []byte + byteBuffer := bytes.NewBuffer(b) + body := &metrov1.ModifyAckDeadlineRequest{ + Subscription: req.Subscription, + AckIds: msgs, + AckDeadlineSeconds: req.AckDeadlineSeconds, + } + err := s.marshaler.Marshal(byteBuffer, body) + if err != nil { + logger.Ctx(ctx).Errorw("subscriberserver: Failed to marshal upstream request", "error", err.Error(), "subscription", parsedReq.Subscription) + } + cpReq, err := http.NewRequestWithContext(ctx, "POST", fmt.Sprintf(modAckRequestFormat, node, parsedReq.Subscription), byteBuffer) + + _, err = s.httpClient.Do(cpReq) + if err != nil { + logger.Ctx(ctx).Errorw("subscriberserver: Failed to send request to consume plane", "error", err.Error()) + } + + } else { + logger.Ctx(ctx).Errorw("subscriberserver: Failed to fetch consume plane for partition", + "subscription", parsedReq.Subscription, + "partition", part, + "hash", hash, + "replicaCount", s.replicaCount, + ) + continue + } + } return new(emptypb.Empty), nil } diff --git 
a/service/worker/service.go b/service/worker/service.go index 8729db62..bdd5165a 100644 --- a/service/worker/service.go +++ b/service/worker/service.go @@ -67,6 +67,7 @@ func NewService(workerConfig *Config, registryConfig *registry.Config, cacheConf topicCore := topic.NewCore(topic.NewRepo(reg), projectCore, brokerStore) subscriptionCore := subscription.NewCore( + 1, // Workers do not fetch from consume plane. Hence init nodes as 1. subscription.NewRepo(reg), projectCore, topicCore) diff --git a/tests/integration/messagebroker/messagebroker_kafka_test.go b/tests/integration/messagebroker/messagebroker_kafka_test.go index 7f4dbcc4..1f889ef1 100644 --- a/tests/integration/messagebroker/messagebroker_kafka_test.go +++ b/tests/integration/messagebroker/messagebroker_kafka_test.go @@ -1,3 +1,4 @@ +//go:build integration // +build integration package messagebroker @@ -32,10 +33,15 @@ func Test_CreateValidTopic(t *testing.T) { // create consumer to fetch topic metadata consumer1, err := messagebroker.NewConsumerClient(context.Background(), "kafka", getKafkaBrokerConfig(), &messagebroker.ConsumerClientOptions{ - Topics: []string{topic}, + Topics: []messagebroker.TopicPartition{ + { + Topic: topic, + Partition: 0, + }, + }, GroupID: "dummy-group-2", }) - metadata, merr := consumer1.GetTopicMetadata(context.Background(), messagebroker.GetTopicMetadataRequest{Topic: topic}) + metadata, merr := consumer1.GetTopicPartitionMetadata(context.Background(), messagebroker.GetTopicPartitionMetadataRequest{Topic: topic}) assert.Nil(t, merr) assert.NotNil(t, metadata) @@ -149,7 +155,12 @@ func Test_ProduceAndConsumeMessagesInDetail(t *testing.T) { // now consume the messages and assert the message ids generated in the previous step consumer1, err := messagebroker.NewConsumerClient(context.Background(), "kafka", getKafkaBrokerConfig(), &messagebroker.ConsumerClientOptions{ - Topics: []string{topic}, + Topics: []messagebroker.TopicPartition{ + { + Topic: topic, + Partition: 0, + }, + }, 
GroupID: "dummy-group-1", }) @@ -174,7 +185,12 @@ func Test_ProduceAndConsumeMessagesInDetail(t *testing.T) { // spwan a new consumer and try to re-receive after commit and make sure no new messages are available consumer3, err := messagebroker.NewConsumerClient(context.Background(), "kafka", getKafkaBrokerConfig(), &messagebroker.ConsumerClientOptions{ - Topics: []string{topic}, + Topics: []messagebroker.TopicPartition{ + { + Topic: topic, + Partition: 0, + }, + }, GroupID: "dummy-group-1", })