From 6616a863a1a5ae50eef80916537f5af0a3d8dae5 Mon Sep 17 00:00:00 2001 From: vnktram Date: Mon, 13 Dec 2021 20:31:23 +0530 Subject: [PATCH 01/14] Consume plane setup --- cmd/service/metro/component.go | 3 + cmd/service/metro/metro.go | 4 +- config/default.toml | 16 ++ config/dev_docker.toml | 15 ++ internal/config/config.go | 2 + internal/consumer/consumer.go | 136 +++++++++++++++ internal/consumer/lifecycle.go | 156 ++++++++++++++++++ internal/consumer/request.go | 101 ++++++++++++ internal/hash/hash.go | 11 ++ internal/subscriber/core.go | 95 ++++++++++- internal/subscriber/model.go | 2 + internal/subscriber/retry/delayconsumer.go | 30 +++- internal/subscriber/subscriber.go | 22 ++- internal/subscription/core.go | 44 ++++- internal/subscription/model.go | 1 + pkg/messagebroker/config.go | 7 +- pkg/messagebroker/kafka.go | 59 ++++++- pkg/messagebroker/messagebroker.go | 7 +- pkg/messagebroker/models.go | 25 ++- pkg/messagebroker/pulsar.go | 27 ++- service/consume-plane/config.go | 28 ++++ service/consume-plane/consumeplaneserver.go | 122 ++++++++++++++ service/consume-plane/server.go | 37 +++++ service/consume-plane/service.go | 122 ++++++++++++++ service/web/service.go | 2 +- service/worker/service.go | 1 + .../messagebroker/messagebroker_kafka_test.go | 3 +- 27 files changed, 1044 insertions(+), 34 deletions(-) create mode 100644 internal/consumer/consumer.go create mode 100644 internal/consumer/lifecycle.go create mode 100644 internal/consumer/request.go create mode 100644 internal/hash/hash.go create mode 100644 service/consume-plane/config.go create mode 100644 service/consume-plane/consumeplaneserver.go create mode 100644 service/consume-plane/server.go create mode 100644 service/consume-plane/service.go diff --git a/cmd/service/metro/component.go b/cmd/service/metro/component.go index 390c3cef..c497b909 100644 --- a/cmd/service/metro/component.go +++ b/cmd/service/metro/component.go @@ -6,6 +6,7 @@ import ( "github.com/razorpay/metro/internal/config" 
"github.com/razorpay/metro/pkg/logger" "github.com/razorpay/metro/service" + consumeplane "github.com/razorpay/metro/service/consume-plane" openapiserver "github.com/razorpay/metro/service/openapi-server" "github.com/razorpay/metro/service/web" "github.com/razorpay/metro/service/worker" @@ -29,6 +30,8 @@ func NewComponent(component string, cfg config.Config) (*Component, error) { svc, err = worker.NewService(&cfg.Worker, &cfg.Registry) case OpenAPIServer: svc, err = openapiserver.NewService(&cfg.OpenAPIServer) + case ConsumePlane: + svc, err = consumeplane.NewService(&cfg.ConsumePlane, &cfg.Registry) } if err != nil { diff --git a/cmd/service/metro/metro.go b/cmd/service/metro/metro.go index 94cc570c..bf640dcc 100644 --- a/cmd/service/metro/metro.go +++ b/cmd/service/metro/metro.go @@ -21,9 +21,11 @@ const ( Worker = "worker" // OpenAPIServer to server swagger docs OpenAPIServer = "openapi-server" + // ConsumePlane component serves as a broker interface + ConsumePlane = "consume-plane" ) -var validComponents = []string{Web, Worker, OpenAPIServer} +var validComponents = []string{Web, Worker, OpenAPIServer, ConsumePlane} var component *Component // isValidComponent validates if the input component is a valid metro component diff --git a/config/default.toml b/config/default.toml index 8e8b54d2..324ab246 100644 --- a/config/default.toml +++ b/config/default.toml @@ -29,6 +29,22 @@ HttpServerAddress = "0.0.0.0:8082" InternalHttpServerAddress = "0.0.0.0:9000" + +[consumePlane] + replicaCount = 1 + ordinalId = 0 + [consumePlane.broker] + variant = "kafka" + [consumePlane.broker.brokerconfig] + brokers = ["localhost:9092"] + enableTLS = false + [consumePlane.broker.brokerconfig.consumePlane] + [consumePlane.interfaces] + [consumePlane.interfaces.api] + GrpcServerAddress = "0.0.0.0:8085" + HttpServerAddress = "0.0.0.0:8086" + InternalHttpServerAddress = "0.0.0.0:9003" + [worker] [worker.broker] variant = "kafka" diff --git a/config/dev_docker.toml b/config/dev_docker.toml 
index 34dc2710..b022571c 100644 --- a/config/dev_docker.toml +++ b/config/dev_docker.toml @@ -28,6 +28,21 @@ HttpServerAddress = "0.0.0.0:8082" InternalHttpServerAddress = "0.0.0.0:9000" +[consumePlane] + replicaCount = 1 + ordinalId = 0 + [consumePlane.broker] + variant = "kafka" + [consumePlane.broker.brokerconfig] + brokers = ["localhost:9092"] + enableTLS = false + [consumePlane.broker.brokerconfig.consumePlane] + [consumePlane.interfaces] + [consumePlane.interfaces.api] + GrpcServerAddress = "0.0.0.0:8085" + HttpServerAddress = "0.0.0.0:8086" + InternalHttpServerAddress = "0.0.0.0:9003" + [worker] [worker.broker] variant = "kafka" diff --git a/internal/config/config.go b/internal/config/config.go index 27cbe612..5ec4b600 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -6,6 +6,7 @@ import ( "github.com/razorpay/metro/pkg/monitoring/sentry" "github.com/razorpay/metro/pkg/registry" "github.com/razorpay/metro/pkg/tracing" + consumeplane "github.com/razorpay/metro/service/consume-plane" openapiserver "github.com/razorpay/metro/service/openapi-server" "github.com/razorpay/metro/service/web" worker "github.com/razorpay/metro/service/worker" @@ -18,6 +19,7 @@ type Config struct { Sentry sentry.Config Web web.Config Worker worker.Config + ConsumePlane consumeplane.Config Registry registry.Config OpenAPIServer openapiserver.Config Admin credentials.Model diff --git a/internal/consumer/consumer.go b/internal/consumer/consumer.go new file mode 100644 index 00000000..515f3ca2 --- /dev/null +++ b/internal/consumer/consumer.go @@ -0,0 +1,136 @@ +package consumer + +import ( + "context" + "fmt" + "io" + "time" + + "github.com/razorpay/metro/internal/subscriber" + "github.com/razorpay/metro/internal/subscription" + "github.com/razorpay/metro/pkg/logger" + "github.com/razorpay/metro/pkg/messagebroker" + metrov1 "github.com/razorpay/metro/rpc/proto/v1" +) + +type IConsumer interface { + Run() error + Stop() + Acknowledge(ctx context.Context, req 
*ParsedAcknowledgeRequest) + ModifyAckDeadline(ctx context.Context, req *ParsedModifyAckDeadlineRequest) + Fetch(ctx context.Context, messageCount int) (*metrov1.PullResponse, error) +} + +type Consumer struct { + computedHash int + subscriberID string + subscription *subscription.Model + subscriberCore subscriber.ICore + subscriptionSubscriber subscriber.ISubscriber + ctx context.Context + errChan chan error +} + +// DefaultNumMessageCount ... +var DefaultNumMessageCount int32 = 10 + +func NewConsumer(ctx context.Context, computedHash int, subscriberID string, subscription *subscription.Model, subCore subscriber.ICore, subs subscriber.ISubscriber) *Consumer { + con := &Consumer{ + ctx: ctx, + computedHash: computedHash, + subscriberID: subscriberID, + subscription: subscription, + subscriptionSubscriber: subs, + errChan: make(chan error), + } + return con +} + +func (c *Consumer) Fetch(ctx context.Context, messageCount int) (*metrov1.PullResponse, error) { + respChan := make(chan *metrov1.PullResponse) + defer close(respChan) + c.subscriptionSubscriber.GetRequestChannel() <- (&subscriber.PullRequest{ + MaxNumOfMessages: int32(messageCount), + RespChan: respChan, + }).WithContext(ctx) + + select { + case resp := <-respChan: + return resp, nil + case <-ctx.Done(): + return &metrov1.PullResponse{}, ctx.Err() + } + +} + +func (c *Consumer) Acknowledge(ctx context.Context, ackMsgs []*subscriber.AckMessage) { + for _, ackMsg := range ackMsgs { + c.subscriptionSubscriber.GetAckChannel() <- ackMsg.WithContext(ctx) + } +} + +func (c *Consumer) ModifyAckDeadline(ctx context.Context, mackMsgs []*subscriber.AckMessage) { + for _, modAckMsg := range mackMsgs { + modAckReq := subscriber.NewModAckMessage(modAckMsg, modAckMsg.Deadline) + modAckReq = modAckReq.WithContext(ctx) + c.subscriptionSubscriber.GetModAckChannel() <- modAckReq + } +} + +func (c *Consumer) Run() error { + // stream ack timeout + streamAckDeadlineSecs := int32(30) // init with some sane value + timeout := 
time.NewTicker(time.Duration(streamAckDeadlineSecs) * time.Second) + for { + select { + case <-c.ctx.Done(): + logger.Ctx(c.ctx).Infow("stopping subscriber from <-s.ctx.Done()") + c.stop() + return c.ctx.Err() + case <-timeout.C: + logger.Ctx(c.ctx).Infow("stopping subscriber from <-timeout.C") + c.stop() + return fmt.Errorf("stream: ack deadline seconds crossed") + case err := <-c.errChan: + logger.Ctx(c.ctx).Infow("stopping subscriber from err := <-s.errChan") + c.stop() + if err == io.EOF { + // return will close stream from server side + logger.Ctx(c.ctx).Errorw("stream: EOF received from client") + } else if err != nil { + logger.Ctx(c.ctx).Errorw("stream: error received from client", "error", err.Error()) + } + return nil + case err := <-c.subscriptionSubscriber.GetErrorChannel(): + // streamManagerSubscriberErrors.WithLabelValues(env, s.subscriberID, s.subscriptionSubscriber.GetSubscriptionName(), err.Error()).Inc() + if messagebroker.IsErrorRecoverable(err) { + // no need to stop the subscriber in such cases. 
just log and return + logger.Ctx(c.ctx).Errorw("subscriber: got recoverable error", err.Error()) + return nil + } + + logger.Ctx(c.ctx).Errorw("subscriber: got un-recoverable error", "error", err.Error()) + logger.Ctx(c.ctx).Infow("stopping subscriber from err := <-s.subscriptionSubscriber.GetErrorChannel()") + c.stop() + return err + + default: + timeout.Reset(time.Duration(streamAckDeadlineSecs) * time.Second) + } + } +} + +func (c *Consumer) stop() { + c.subscriptionSubscriber.Stop() + c.closeSubscriberChannels() + + logger.Ctx(c.ctx).Infow("stopped subscriber...", "subscriberId", c.subscriberID) + +} + +func (s *Consumer) closeSubscriberChannels() { + close(s.errChan) + close(s.subscriptionSubscriber.GetRequestChannel()) + close(s.subscriptionSubscriber.GetAckChannel()) + close(s.subscriptionSubscriber.GetModAckChannel()) +} diff --git a/internal/consumer/lifecycle.go b/internal/consumer/lifecycle.go new file mode 100644 index 00000000..f23c098d --- /dev/null +++ b/internal/consumer/lifecycle.go @@ -0,0 +1,156 @@ +package consumer + +import ( + "context" + "sync" + + "github.com/google/uuid" + "github.com/pkg/errors" + "github.com/razorpay/metro/pkg/logger" + + "github.com/razorpay/metro/internal/brokerstore" + "github.com/razorpay/metro/internal/subscriber" + "github.com/razorpay/metro/internal/subscription" +) + +// ILifecycle interface defines lifecycle manager methods +type ILifecycle interface { + GetConsumer(ctx context.Context, sub string, partition int) (*Consumer, error) + CloseConsumer(ctx context.Context, computedHash int) error + Run() +} + +// Manager ... +type Manager struct { + consumers map[int]*Consumer + subscriptionCore subscription.ICore + cleanupCh chan cleanupMessage + replicas int + ordinalId int + bs brokerstore.IBrokerStore + mutex *sync.Mutex + ctx context.Context +} + +// NewLifecycleManager ... 
+func NewLifecycleManager(ctx context.Context, replicas int, ordinalId int, subscriptionCore subscription.ICore, subscriberCore subscriber.ICore, bs brokerstore.IBrokerStore) (ILifecycle, error) { + mgr := &Manager{ + consumers: make(map[int]*Consumer), + subscriptionCore: subscriptionCore, + bs: bs, + replicas: replicas, + ordinalId: ordinalId, + mutex: &sync.Mutex{}, + ctx: ctx, + } + + allSubs, err := subscriptionCore.List(ctx, subscription.Prefix) + if err != nil { + logger.Ctx(ctx).Errorw("error fetching new subscription list", "error", err) + return nil, err + } + + // Filter Pull Subscriptions + var pullSubs []*subscription.Model + for _, sub := range allSubs { + if !sub.IsPush() { + pullSubs = append(pullSubs, sub) + } + } + + for _, sub := range pullSubs { + subPartitions, err := subscriptionCore.FetchPartitionsForHash(ctx, sub, ordinalId) + if err != nil { + logger.Ctx(ctx).Errorw("Error resolving partitions for subscription", "subscription", sub.Name) + } + for _, partition := range subPartitions { + subscriberID := uuid.New().String() + + var ( + // init these channels and pass to subscriber + // the lifecycle of these channels should be maintain by the user + subscriberRequestCh = make(chan *subscriber.PullRequest) + subscriberAckCh = make(chan *subscriber.AckMessage) + subscriberModAckCh = make(chan *subscriber.ModAckMessage) + ) + computedHash := subscriptionCore.FetchSubscriptionHash(ctx, sub.Name, partition) + subscriber, err := subscriberCore.NewOpinionatedSubscriber(ctx, subscriberID, sub, partition, computedHash, 100, 50, 5000, + subscriberRequestCh, subscriberAckCh, subscriberModAckCh) + if err != nil { + logger.Ctx(ctx).Errorw("lifecyclemanager: failed to create subscriber for subscription-partition", "subscription", sub.Name, "partition", partition) + // Proceed without failing since this requires other subscribers to be setup + } else { + mgr.consumers[computedHash] = &Consumer{ + ctx: ctx, + computedHash: computedHash, + subscriberID: 
subscriberID, + subscription: sub, + subscriberCore: subscriberCore, + subscriptionSubscriber: subscriber, + } + } + } + + } + + // swh := registry.WatchConfig{ + // WatchType: "keyprefix", + // WatchPath: common.GetBasePrefix() + subscription.Prefix, + // Handler: func(ctx context.Context, pairs []registry.Pair) { + // logger.Ctx(ctx).Infow("subscriptions watch handler data", "pairs", pairs) + // sm.subWatchData <- &struct{}{} + // }, + // } + + // logger.Ctx(ctx).Infof("setting watch on subscriptions") + // subWatcher, err = sm.registry.Watch(ctx, &swh) + // if err != nil { + // } + + return mgr, nil +} + +func (s *Manager) Run() { + + for { + select { + case <-s.ctx.Done(): + for _, con := range s.consumers { + con.stop() + } + return + case cleanupMessage := <-s.cleanupCh: + logger.Ctx(s.ctx).Infow("manager: got request to cleanup subscriber", "cleanupMessage", cleanupMessage) + s.mutex.Lock() + //Implement cleanup here + s.mutex.Unlock() + } + } +} + +func (m *Manager) GetConsumer(ctx context.Context, sub string, partition int) (*Consumer, error) { + computedHash := m.subscriptionCore.FetchSubscriptionHash(ctx, sub, partition) + consumer := m.consumers[computedHash] + if consumer == nil { + return nil, errors.Errorf("lifecyclemanager: No active consumer found") + } + return consumer, nil +} + +func (m *Manager) CloseConsumer(ctx context.Context, computedhash int) error { + con := m.consumers[computedhash] + if con != nil { + m.mutex.Lock() + con.stop() + delete(m.consumers, computedhash) + m.mutex.Unlock() + } + + return nil +} + +// cleanupMessage ... 
+type cleanupMessage struct { + subscriberID string + subscription string +} diff --git a/internal/consumer/request.go b/internal/consumer/request.go new file mode 100644 index 00000000..2f20aec5 --- /dev/null +++ b/internal/consumer/request.go @@ -0,0 +1,101 @@ +package consumer + +import ( + "github.com/razorpay/metro/internal/subscriber" + metrov1 "github.com/razorpay/metro/rpc/proto/v1" +) + +type ParsedFetchRequest struct { + Subscription string + MessageCount int + Partition int +} + +type ParsedAcknowledgeRequest struct { + Subscription string + AckIDs []string + AckMessages []*subscriber.AckMessage +} + +type ParsedModifyAckDeadlineRequest struct { + Subscription string + AckIDs []string + AckMessages []*subscriber.AckMessage + ModifyDeadlineMsgIdsWithSecs map[string]int32 +} + +// +func NewParsedFetchRequest(req *metrov1.FetchRequest) (*ParsedFetchRequest, error) { + parsedReq := &ParsedFetchRequest{} + parsedReq.Subscription = req.Subscription + parsedReq.MessageCount = int(req.MaxMessages) + parsedReq.Partition = int(req.Partition) + + return parsedReq, nil +} + +// NewParsedAcknowledgeRequest ... +func NewParsedAcknowledgeRequest(req *metrov1.AcknowledgeRequest) (*ParsedAcknowledgeRequest, error) { + parsedReq := &ParsedAcknowledgeRequest{} + + // TODO : add validations and throw error + parsedReq.Subscription = req.Subscription + if req.AckIds != nil && len(req.AckIds) > 0 { + ackMessages := make([]*subscriber.AckMessage, 0) + parsedReq.AckIDs = req.AckIds + for _, ackID := range req.AckIds { + ackMessage, err := subscriber.ParseAckID(ackID) + if err != nil { + return nil, err + } + ackMessages = append(ackMessages, ackMessage) + } + parsedReq.AckIDs = req.AckIds + parsedReq.AckMessages = ackMessages + } + + return parsedReq, nil +} + +// NewParsedModifyAckDeadlineRequest ... 
+func NewParsedModifyAckDeadlineRequest(req *metrov1.ModifyAckDeadlineRequest) (*ParsedModifyAckDeadlineRequest, error) { + parsedReq := &ParsedModifyAckDeadlineRequest{} + + // TODO : add validations and throw error + parsedReq.Subscription = req.Subscription + if req.AckIds != nil && len(req.AckIds) > 0 { + ackMessages := make([]*subscriber.AckMessage, 0) + modifyDeadlineMsgIdsWithSecs := make(map[string]int32) + parsedReq.AckIDs = req.AckIds + for _, ackID := range req.AckIds { + ackMessage, err := subscriber.ParseAckID(ackID) + if err != nil { + return nil, err + } + ackMessage.Deadline = req.AckDeadlineSeconds + ackMessages = append(ackMessages, ackMessage) + modifyDeadlineMsgIdsWithSecs[ackMessage.MessageID] = req.AckDeadlineSeconds + } + parsedReq.AckIDs = req.AckIds + parsedReq.AckMessages = ackMessages + parsedReq.ModifyDeadlineMsgIdsWithSecs = modifyDeadlineMsgIdsWithSecs + } + + return parsedReq, nil +} + +// requestType specifies the type of proxy request, ack or modack +type requestType int + +const ( + ack requestType = iota + modAck +) + +type proxyRequest struct { + addr string + grpcServerAddr string + ackMsgs []*subscriber.AckMessage + parsedReq *metrov1.PullRequest + requestType requestType +} diff --git a/internal/hash/hash.go b/internal/hash/hash.go new file mode 100644 index 00000000..5617cf97 --- /dev/null +++ b/internal/hash/hash.go @@ -0,0 +1,11 @@ +package hash + +import ( + "hash/fnv" +) + +func ComputeHash(arr []byte) int { + h := fnv.New32a() + h.Write(arr) + return int(h.Sum32()) +} diff --git a/internal/subscriber/core.go b/internal/subscriber/core.go index c4892530..035552d3 100644 --- a/internal/subscriber/core.go +++ b/internal/subscriber/core.go @@ -17,6 +17,8 @@ import ( type ICore interface { NewSubscriber(ctx context.Context, id string, subscription *subscription.Model, timeoutInMs int, maxOutstandingMessages int64, maxOutstandingBytes int64, requestCh chan *PullRequest, ackCh chan *AckMessage, modAckCh chan *ModAckMessage) 
(ISubscriber, error) + NewOpinionatedSubscriber(ctx context.Context, id string, subscription *subscription.Model, partiton int, computedHash int, timeoutInMs int, maxOutstandingMessages int64, maxOutstandingBytes int64, + requestCh chan *PullRequest, ackCh chan *AckMessage, modAckCh chan *ModAckMessage) (ISubscriber, error) } // Core implements ICore @@ -44,7 +46,16 @@ func (c *Core) NewSubscriber(ctx context.Context, groupID := subscription.Name - consumer, err := c.bs.GetConsumer(ctx, messagebroker.ConsumerClientOptions{Topics: []string{subscription.Topic, subscription.GetRetryTopic()}, GroupID: groupID, GroupInstanceID: subscriberID}) + consumer, err := c.bs.GetConsumer(ctx, messagebroker.ConsumerClientOptions{Topics: []messagebroker.TopicPartition{ + { + Topic: subscription.Topic, + Partition: 0, + }, + { + Topic: subscription.GetRetryTopic(), + Partition: 0, + }, + }, GroupID: groupID, GroupInstanceID: subscriberID}) if err != nil { logger.Ctx(ctx).Errorw("subscriber: failed to create consumer", "error", err.Error()) return nil, err @@ -98,3 +109,85 @@ func (c *Core) NewSubscriber(ctx context.Context, go s.Run(subsCtx) return s, nil } + +func (c *Core) NewOpinionatedSubscriber(ctx context.Context, + subscriberID string, + subscription *subscription.Model, + partition int, + computedHash int, + timeoutInMs int, + maxOutstandingMessages int64, + maxOutstandingBytes int64, + requestCh chan *PullRequest, + ackCh chan *AckMessage, + modAckCh chan *ModAckMessage) (ISubscriber, error) { + + groupID := subscription.Name + + consumer, err := c.bs.GetConsumer(ctx, messagebroker.ConsumerClientOptions{Topics: []messagebroker.TopicPartition{ + { + Topic: subscription.Topic, + Partition: partition, + }, + { + Topic: subscription.GetRetryTopic(), + Partition: partition, + }, + }, GroupID: groupID, GroupInstanceID: subscriberID}) + if err != nil { + logger.Ctx(ctx).Errorw("subscriber: failed to create consumer", "error", err.Error()) + return nil, err + } + + subsCtx, 
cancelFunc := context.WithCancel(ctx) + // using the subscriber ctx for retrier as well. This way when the ctx() for subscribers is done, + // all the delay-consumers spawned within retrier would also get marked as done. + var retrier retry.IRetrier + if subscription.RetryPolicy != nil && subscription.DeadLetterPolicy != nil { + + retrier = retry.NewRetrierBuilder(). + WithSubscription(subscription). + WithBrokerStore(c.bs). + WithBackoff(retry.NewExponentialWindowBackoff()). + WithIntervalFinder(retry.NewClosestIntervalWithCeil()). + WithMessageHandler(retry.NewPushToPrimaryRetryTopicHandler(c.bs)). + WithSubscriberID(subscriberID). + Build() + + err = retrier.Start(subsCtx) + if err != nil { + return nil, err + } + } + + s := &OpinionatedSubscriber{ + Subscriber{ + subscription: subscription, + topic: subscription.Topic, + subscriberID: subscriberID, + subscriptionCore: c.subscriptionCore, + offsetCore: c.offsetCore, + requestChan: requestCh, + responseChan: make(chan *metrov1.PullResponse), + errChan: make(chan error, 1000), + closeChan: make(chan struct{}), + ackChan: ackCh, + modAckChan: modAckCh, + deadlineTicker: time.NewTicker(deadlineTickerInterval), + timeoutInMs: timeoutInMs, + consumer: consumer, + cancelFunc: cancelFunc, + maxOutstandingMessages: maxOutstandingMessages, + maxOutstandingBytes: maxOutstandingBytes, + consumedMessageStats: make(map[TopicPartition]*ConsumptionMetadata), + ctx: subsCtx, + bs: c.bs, + retrier: retrier, + }, + partition, + computedHash, + } + + go s.Run(subsCtx) + return s, nil +} diff --git a/internal/subscriber/model.go b/internal/subscriber/model.go index c50ce4fc..b89bc65c 100644 --- a/internal/subscriber/model.go +++ b/internal/subscriber/model.go @@ -15,12 +15,14 @@ import ( "github.com/razorpay/metro/internal/subscriber/customheap" "github.com/razorpay/metro/pkg/messagebroker" "github.com/razorpay/metro/pkg/utils" + metrov1 "github.com/razorpay/metro/rpc/proto/v1" ) // PullRequest ... 
type PullRequest struct { ctx context.Context MaxNumOfMessages int32 + RespChan chan *metrov1.PullResponse } // WithContext can be used to set the current context to the request diff --git a/internal/subscriber/retry/delayconsumer.go b/internal/subscriber/retry/delayconsumer.go index d6f685a9..6dd7a22d 100644 --- a/internal/subscriber/retry/delayconsumer.go +++ b/internal/subscriber/retry/delayconsumer.go @@ -15,6 +15,7 @@ type DelayConsumer struct { ctx context.Context cancelFunc func() doneCh chan struct{} + cleanupCh chan struct{} subs *subscription.Model topic string isPaused bool @@ -32,7 +33,11 @@ func NewDelayConsumer(ctx context.Context, subscriberID string, topic string, su delayCtx, cancel := context.WithCancel(ctx) // only delay-consumer will consume from a subscription specific delay-topic, so can use the same groupID and groupInstanceID consumerOps := messagebroker.ConsumerClientOptions{ - Topics: []string{topic}, + Topics: []messagebroker.TopicPartition{ + { + Topic: topic, + }, + }, GroupID: subs.GetDelayConsumerGroupID(topic), GroupInstanceID: subs.GetDelayConsumerGroupInstanceID(subscriberID, topic), } @@ -71,7 +76,7 @@ func (dc *DelayConsumer) Run(ctx context.Context) { dc.bs.RemoveConsumer(ctx, messagebroker.ConsumerClientOptions{GroupID: dc.subs.GetDelayConsumerGroupID(dc.topic), GroupInstanceID: dc.subs.GetDelayConsumerGroupInstanceID(dc.subscriberID, dc.topic)}) dc.consumer.Close(dc.ctx) return - default: + case <-dc.doneCh: resp, err := dc.consumer.ReceiveMessages(dc.ctx, messagebroker.GetMessagesFromTopicRequest{NumOfMessages: 10, TimeoutMs: int(defaultBrokerOperationsTimeoutMs)}) if err != nil { if !messagebroker.IsErrorRecoverable(err) { @@ -91,6 +96,27 @@ func (dc *DelayConsumer) Run(ctx context.Context) { } } +// Run spawns the delay-consumer +func (dc *DelayConsumer) SetupWatch(ctx context.Context) { + defer close(dc.doneCh) + + logger.Ctx(ctx).Infow("delay-consumer: running", dc.LogFields()...) 
+ for { + select { + case <-dc.ctx.Done(): + logger.Ctx(dc.ctx).Infow("delay-consumer: stopping <-ctx.Done() called", dc.LogFields()...) + dc.bs.RemoveConsumer(ctx, messagebroker.ConsumerClientOptions{GroupID: dc.subs.GetDelayConsumerGroupID(dc.topic), GroupInstanceID: dc.subs.GetDelayConsumerGroupInstanceID(dc.subscriberID, dc.topic)}) + dc.consumer.Close(dc.ctx) + return + case <-dc.cleanupCh: + logger.Ctx(dc.ctx).Infow("delay-consumer: cleanup task on subscription", dc.LogFields()...) + dc.bs.RemoveConsumer(ctx, messagebroker.ConsumerClientOptions{GroupID: dc.subs.GetDelayConsumerGroupID(dc.topic), GroupInstanceID: dc.subs.GetDelayConsumerGroupInstanceID(dc.subscriberID, dc.topic)}) + dc.consumer.Close(dc.ctx) + return + } + } +} + // resumes a paused delay-consumer. Additionally process any previously cached messages in buffer. func (dc *DelayConsumer) resume() { logger.Ctx(dc.ctx).Infow("delay-consumer: resuming", dc.LogFields("error", nil)...) diff --git a/internal/subscriber/subscriber.go b/internal/subscriber/subscriber.go index ce8aad10..4f0b2d14 100644 --- a/internal/subscriber/subscriber.go +++ b/internal/subscriber/subscriber.go @@ -44,6 +44,7 @@ type Subscriber struct { subscription *subscription.Model topic string subscriberID string + partition int subscriptionCore subscription.ICore offsetCore offset.ICore requestChan chan *PullRequest @@ -65,6 +66,12 @@ type Subscriber struct { retrier retry.IRetrier } +type OpinionatedSubscriber struct { + Subscriber + partition int + computedHash int +} + // canConsumeMore looks at sum of all consumed messages in all the active topic partitions and checks threshold func (s *Subscriber) canConsumeMore() bool { totalConsumedMsgsForTopic := 0 @@ -513,7 +520,11 @@ func (s *Subscriber) pull(req *PullRequest) { // Write empty data on the response channel in case of error, this is needed because sender blocks // on the response channel in a goroutine after sending request, error channel is not read until // response 
channel blocking call returns - s.responseChan <- &metrov1.PullResponse{ReceivedMessages: sm} + if req.RespChan != nil { + req.RespChan <- &metrov1.PullResponse{ReceivedMessages: sm} + } else { + s.responseChan <- &metrov1.PullResponse{ReceivedMessages: sm} + } // send error details via error channel s.errChan <- err @@ -548,7 +559,7 @@ func (s *Subscriber) pull(req *PullRequest) { s.consumedMessageStats[tp] = NewConsumptionMetadata() // query and set the max committed offset for each topic partition - resp, err := s.consumer.GetTopicMetadata(ctx, messagebroker.GetTopicMetadataRequest{ + resp, err := s.consumer.GetTopicPartitionMetadata(ctx, messagebroker.GetTopicPartitionMetadataRequest{ Topic: s.topic, Partition: msg.Partition, }) @@ -581,7 +592,12 @@ func (s *Subscriber) pull(req *PullRequest) { if len(sm) > 0 { s.logInMemoryStats(ctx) } - s.responseChan <- &metrov1.PullResponse{ReceivedMessages: sm} + if req.RespChan != nil { + req.RespChan <- &metrov1.PullResponse{ReceivedMessages: sm} + } else { + s.responseChan <- &metrov1.PullResponse{ReceivedMessages: sm} + } + }() } diff --git a/internal/subscription/core.go b/internal/subscription/core.go index ff12cf8e..25c8e04e 100644 --- a/internal/subscription/core.go +++ b/internal/subscription/core.go @@ -2,11 +2,13 @@ package subscription import ( "context" + "strconv" "time" metrov1 "github.com/razorpay/metro/rpc/proto/v1" "github.com/razorpay/metro/internal/common" + "github.com/razorpay/metro/internal/hash" "github.com/razorpay/metro/internal/merror" "github.com/razorpay/metro/internal/project" "github.com/razorpay/metro/internal/topic" @@ -25,18 +27,26 @@ type ICore interface { List(ctx context.Context, prefix string) ([]*Model, error) Get(ctx context.Context, key string) (*Model, error) Migrate(ctx context.Context, names []string) error + FetchPartitionsForHash(ctx context.Context, m *Model, node int) ([]int, error) + FetchSubscriptionHash(ctx context.Context, subName string, partition int) int + 
GetPartition(ctx context.Context, m *Model) int } +const ( + subscriptionHashFormat = "%s-%s-" +) + // Core implements all business logic for a subscription type Core struct { + TotalNodes int repo IRepo projectCore project.ICore topicCore topic.ICore } // NewCore returns an instance of Core -func NewCore(repo IRepo, projectCore project.ICore, topicCore topic.ICore) ICore { - return &Core{repo, projectCore, topicCore} +func NewCore(nodes int, repo IRepo, projectCore project.ICore, topicCore topic.ICore) ICore { + return &Core{nodes, repo, projectCore, topicCore} } // CreateSubscription creates a subscription for a given topic @@ -329,6 +339,36 @@ func createDelayTopics(ctx context.Context, m *Model, topicCore topic.ICore) err return nil } +// FetchPartitionsForHash ... +func (c *Core) FetchPartitionsForHash(ctx context.Context, sub *Model, node int) ([]int, error) { + matchingPartitions := make([]int, 0) + topicModel, err := c.topicCore.Get(ctx, sub.GetTopic()) + if err != nil { + logger.Ctx(ctx).Errorw("fetchpartitionsforhash: failed to fetch topic for subscription", "subscription", sub.Name, "topic", sub.GetTopic()) + return matchingPartitions, err + } + + for i := 0; i < topicModel.NumPartitions; i++ { + computedHash := c.FetchSubscriptionHash(ctx, sub.Name, i) + partitionNode := computedHash % c.TotalNodes + if partitionNode == node { + matchingPartitions = append(matchingPartitions, partitionNode) + } + } + + return matchingPartitions, nil +} + +// FetchSubscriptionHash ... +func (c *Core) FetchSubscriptionHash(ctx context.Context, sub string, partition int) int { + return hash.ComputeHash([]byte(sub + strconv.Itoa(partition))) +} + +// GetPartition ... +func (c *Core) GetPartition(ctx context.Context, m *Model) int { + return m.Partition +} + // Migrate takes care of backfilling subscription topics for existing subscriptions. // This is an idempotent operation that creates topics for each subscription. 
// Migrate can be modified in the future for other use-cases as well.
diff --git a/internal/subscription/model.go b/internal/subscription/model.go
index 4c0e3d7a..96aebcdc 100644
--- a/internal/subscription/model.go
+++ b/internal/subscription/model.go
@@ -19,6 +19,7 @@ type Model struct {
 	Name                     string      `json:"name,omitempty"`
 	Topic                    string      `json:"topic,omitempty"`
 	PushConfig               *PushConfig `json:"push_config,omitempty"`
+	Partition                int         `json:"partition,omitempty"`
 	AckDeadlineSeconds       int32       `json:"ack_deadline_seconds,omitempty"`
 	RetainAckedMessages      bool        `json:"retain_acked_messages,omitempty"`
 	MessageRetentionDuration uint        `json:"message_retention_duration,omitempty"`
diff --git a/pkg/messagebroker/config.go b/pkg/messagebroker/config.go
index a8d723a1..a11da3d3 100644
--- a/pkg/messagebroker/config.go
+++ b/pkg/messagebroker/config.go
@@ -45,10 +45,15 @@ type ConsumerConfig struct {
 // AdminConfig holds configuration for admin APIs
 type AdminConfig struct{}
 
+type TopicPartition struct {
+	Topic     string
+	Partition int
+}
+
 // ConsumerClientOptions holds client specific configuration for consumer
 type ConsumerClientOptions struct {
 	// Specify a list of topics to consume messages from
-	Topics []string
+	Topics []TopicPartition
 	// Specify the subscription name for this consumer. Only used for pulsar
 	Subscription string
 	// A unique string that identifies the consumer group this consumer belongs to.
diff --git a/pkg/messagebroker/kafka.go b/pkg/messagebroker/kafka.go index ead7bcb5..0fe38052 100644 --- a/pkg/messagebroker/kafka.go +++ b/pkg/messagebroker/kafka.go @@ -41,7 +41,7 @@ func newKafkaConsumerClient(ctx context.Context, bConfig *BrokerConfig, options normalizedTopics := make([]string, 0) for _, topic := range options.Topics { - normalizedTopics = append(normalizedTopics, normalizeTopicName(topic)) + normalizedTopics = append(normalizedTopics, normalizeTopicName(topic.Topic)) } err := validateKafkaConsumerBrokerConfig(bConfig) @@ -303,8 +303,51 @@ func (k *KafkaBroker) DeleteTopic(ctx context.Context, request DeleteTopicReques }, nil } -// GetTopicMetadata fetches the given topics metadata stored in the broker -func (k *KafkaBroker) GetTopicMetadata(ctx context.Context, request GetTopicMetadataRequest) (GetTopicMetadataResponse, error) { +// ListTopics fetches all topics from the broker +func (k *KafkaBroker) ListTopics(ctx context.Context) (ListTopicsResponse, error) { + logger.Ctx(ctx).Infow("kafka: list topics request received") + defer func() { + logger.Ctx(ctx).Infow("kafka: list topics request completed") + }() + + span, ctx := opentracing.StartSpanFromContext(ctx, "Kafka.ListTopics") + defer span.Finish() + + messageBrokerOperationCount.WithLabelValues(env, Kafka, "ListTopics").Inc() + + startTime := time.Now() + defer func() { + messageBrokerOperationTimeTaken.WithLabelValues(env, Kafka, "ListTopics").Observe(time.Now().Sub(startTime).Seconds()) + }() + + // TODO : normalize timeouts + resp, err := k.Consumer.GetMetadata(nil, true, 5000) + if err != nil { + messageBrokerOperationError.WithLabelValues(env, Kafka, "ListTopics", err.Error()).Inc() + return ListTopicsResponse{}, err + } + + topics := make(map[string]TopicMetadata) + for _, topic := range resp.Topics { + tmd := TopicMetadata{ + Topic: topic.Topic, + Partitions: make([]PartitionMetadata, 0), + } + for _, part := range topic.Partitions { + pmd := PartitionMetadata{ + ID: int(part.ID), 
+ Leader: int(part.Leader), + } + tmd.Partitions = append(tmd.Partitions, pmd) + } + topics[topic.Topic] = tmd + } + return ListTopicsResponse{topics}, nil + +} + +// GetTopicPartitionMetadata fetches the given topics metadata stored in the broker +func (k *KafkaBroker) GetTopicPartitionMetadata(ctx context.Context, request GetTopicPartitionMetadataRequest) (GetTopicPartitionMetadataResponse, error) { logger.Ctx(ctx).Infow("kafka: get metadata request received", "request", request) defer func() { logger.Ctx(ctx).Infow("kafka: get metadata request completed", "request", request) @@ -313,11 +356,11 @@ func (k *KafkaBroker) GetTopicMetadata(ctx context.Context, request GetTopicMeta span, ctx := opentracing.StartSpanFromContext(ctx, "Kafka.GetMetadata") defer span.Finish() - messageBrokerOperationCount.WithLabelValues(env, Kafka, "GetTopicMetadata").Inc() + messageBrokerOperationCount.WithLabelValues(env, Kafka, "GetTopicPartitionMetadata").Inc() startTime := time.Now() defer func() { - messageBrokerOperationTimeTaken.WithLabelValues(env, Kafka, "GetTopicMetadata").Observe(time.Now().Sub(startTime).Seconds()) + messageBrokerOperationTimeTaken.WithLabelValues(env, Kafka, "GetTopicPartitionMetadata").Observe(time.Now().Sub(startTime).Seconds()) }() topicN := normalizeTopicName(request.Topic) @@ -332,13 +375,13 @@ func (k *KafkaBroker) GetTopicMetadata(ctx context.Context, request GetTopicMeta // TODO : normalize timeouts resp, err := k.Consumer.Committed(tps, 5000) if err != nil { - messageBrokerOperationError.WithLabelValues(env, Kafka, "GetTopicMetadata", err.Error()).Inc() - return GetTopicMetadataResponse{}, err + messageBrokerOperationError.WithLabelValues(env, Kafka, "GetTopicPartitionMetadata", err.Error()).Inc() + return GetTopicPartitionMetadataResponse{}, err } tpStats := resp[0] offset, _ := strconv.ParseInt(tpStats.Offset.String(), 10, 0) - return GetTopicMetadataResponse{ + return GetTopicPartitionMetadataResponse{ Topic: request.Topic, Partition: 
request.Partition, Offset: int32(offset), diff --git a/pkg/messagebroker/messagebroker.go b/pkg/messagebroker/messagebroker.go index 4c63b6ff..ca8308d6 100644 --- a/pkg/messagebroker/messagebroker.go +++ b/pkg/messagebroker/messagebroker.go @@ -51,8 +51,11 @@ type Consumer interface { // CommitByMsgID Commits a message by ID CommitByMsgID(context.Context, CommitOnTopicRequest) (CommitOnTopicResponse, error) - // GetTopicMetadata gets the topic metadata - GetTopicMetadata(context.Context, GetTopicMetadataRequest) (GetTopicMetadataResponse, error) + // GetTopicPartitionMetadata gets the topic metadata + GetTopicPartitionMetadata(context.Context, GetTopicPartitionMetadataRequest) (GetTopicPartitionMetadataResponse, error) + + // ListTopics fetches metadata for all topics + ListTopics(context.Context) (ListTopicsResponse, error) // Pause pause the consumer Pause(context.Context, PauseOnTopicRequest) error diff --git a/pkg/messagebroker/models.go b/pkg/messagebroker/models.go index c2fbd2f7..f69aacf0 100644 --- a/pkg/messagebroker/models.go +++ b/pkg/messagebroker/models.go @@ -101,8 +101,8 @@ type CommitOnTopicRequest struct { ID string } -// GetTopicMetadataRequest ... -type GetTopicMetadataRequest struct { +// GetTopicPartitionMetadataRequest ... +type GetTopicPartitionMetadataRequest struct { Topic string Partition int32 } @@ -117,6 +117,23 @@ type AddTopicPartitionResponse struct { Response interface{} } +// PartitionMetadata holds information about a given topic +type PartitionMetadata struct { + ID int + Leader int +} + +// TopicMetadata holds information about a given topic +type TopicMetadata struct { + Topic string + Partitions []PartitionMetadata +} + +// ListTopicsResponse retuns metadata for all topics in the broker +type ListTopicsResponse struct { + topics map[string]TopicMetadata +} + // DeleteTopicResponse ... 
type DeleteTopicResponse struct { Response interface{} @@ -182,8 +199,8 @@ type CommitOnTopicResponse struct { Response interface{} } -// GetTopicMetadataResponse ... -type GetTopicMetadataResponse struct { +// GetTopicPartitionMetadataResponse ... +type GetTopicPartitionMetadataResponse struct { Topic string Partition int32 Offset int32 diff --git a/pkg/messagebroker/pulsar.go b/pkg/messagebroker/pulsar.go index 110dd15c..dea345bf 100644 --- a/pkg/messagebroker/pulsar.go +++ b/pkg/messagebroker/pulsar.go @@ -50,8 +50,13 @@ func newPulsarConsumerClient(_ context.Context, bConfig *BrokerConfig, options * return nil, err } + topics := make([]string, 0) + + for _, tp := range options.Topics { + topics = append(topics, tp.Topic) + } c, err := client.Subscribe(pulsar.ConsumerOptions{ - Topics: options.Topics, + Topics: topics, SubscriptionName: options.Subscription, Type: pulsar.SubscriptionType(bConfig.Consumer.SubscriptionType), Name: options.GroupInstanceID, @@ -253,26 +258,32 @@ func (p *PulsarBroker) CommitByMsgID(ctx context.Context, request CommitOnTopicR return CommitOnTopicResponse{}, nil } -// GetTopicMetadata ... -func (p *PulsarBroker) GetTopicMetadata(ctx context.Context, request GetTopicMetadataRequest) (GetTopicMetadataResponse, error) { - messageBrokerOperationCount.WithLabelValues(env, Pulsar, "GetTopicMetadata").Inc() +// ListTopics ... +func (p *PulsarBroker) ListTopics(ctx context.Context) (ListTopicsResponse, error) { + // TODO: Implement ListTopics + return ListTopicsResponse{}, nil +} + +// GetTopicPartitionMetadata ... 
+func (p *PulsarBroker) GetTopicPartitionMetadata(ctx context.Context, request GetTopicPartitionMetadataRequest) (GetTopicPartitionMetadataResponse, error) { + messageBrokerOperationCount.WithLabelValues(env, Pulsar, "GetTopicPartitionMetadata").Inc() startTime := time.Now() defer func() { - messageBrokerOperationTimeTaken.WithLabelValues(env, Pulsar, "GetTopicMetadata").Observe(time.Now().Sub(startTime).Seconds()) + messageBrokerOperationTimeTaken.WithLabelValues(env, Pulsar, "GetTopicPartitionMetadata").Observe(time.Now().Sub(startTime).Seconds()) }() pulsarTopic, terr := utils.GetTopicName(request.Topic) if terr != nil { - return GetTopicMetadataResponse{}, terr + return GetTopicPartitionMetadataResponse{}, terr } stats, err := p.Admin.Topics().GetInternalStats(*pulsarTopic) if err != nil { - return GetTopicMetadataResponse{}, err + return GetTopicPartitionMetadataResponse{}, err } - return GetTopicMetadataResponse{ + return GetTopicPartitionMetadataResponse{ Topic: request.Topic, Offset: int32(stats.Cursors[request.Topic].MessagesConsumedCounter), }, nil diff --git a/service/consume-plane/config.go b/service/consume-plane/config.go new file mode 100644 index 00000000..0938b747 --- /dev/null +++ b/service/consume-plane/config.go @@ -0,0 +1,28 @@ +package consume_plane + +import ( + "github.com/razorpay/metro/pkg/messagebroker" +) + +// Config for consumer +type Config struct { + Broker Broker + ReplicaCount int + OrdinalId int + Interfaces struct { + API NetworkInterfaces + } +} + +// Broker Config (Kafka/Pulsar) +type Broker struct { + Variant string // kafka or pulsar + BrokerConfig messagebroker.BrokerConfig +} + +// NetworkInterfaces contains all exposed interfaces +type NetworkInterfaces struct { + GrpcServerAddress string + HTTPServerAddress string + InternalHTTPServerAddress string +} diff --git a/service/consume-plane/consumeplaneserver.go b/service/consume-plane/consumeplaneserver.go new file mode 100644 index 00000000..36c9bbe0 --- /dev/null +++ 
b/service/consume-plane/consumeplaneserver.go @@ -0,0 +1,122 @@ +package consume_plane + +import ( + "context" + + "github.com/opentracing/opentracing-go" + "github.com/razorpay/metro/internal/brokerstore" + "github.com/razorpay/metro/internal/consumer" + "github.com/razorpay/metro/internal/merror" + "github.com/razorpay/metro/internal/subscriber" + "github.com/razorpay/metro/internal/subscription" + "github.com/razorpay/metro/pkg/logger" + metrov1 "github.com/razorpay/metro/rpc/proto/v1" + "github.com/razorpay/metro/service/web/stream" + "google.golang.org/protobuf/types/known/emptypb" +) + +type consumeplaneserver struct { + brokerStore brokerstore.IBrokerStore + subscriptionCore subscription.ICore + subscriberCore subscriber.ICore + manager consumer.ILifecycle +} + +func newConsumePlaneServer(brokerStore brokerstore.IBrokerStore, subscriptionCore subscription.ICore, subscriberCore subscriber.ICore, mgr consumer.ILifecycle) *consumeplaneserver { + + return &consumeplaneserver{ + brokerStore, + subscriptionCore, + subscriberCore, + mgr, + } +} + +// Acknowledge a message +func (s consumeplaneserver) Acknowledge(ctx context.Context, req *metrov1.AcknowledgeRequest) (*emptypb.Empty, error) { + logger.Ctx(ctx).Infow("consumeplaneserver: received request to ack messages", "ack_req", req.String()) + + span, ctx := opentracing.StartSpanFromContext(ctx, "ConsumePlaneServer.Acknowledge", opentracing.Tags{ + "subscription": req.Subscription, + "ack_ids": req.AckIds, + }) + defer span.Finish() + + parsedReq, parseErr := stream.NewParsedAcknowledgeRequest(req) + if parseErr != nil { + logger.Ctx(ctx).Errorw("consumeplaneserver: error is parsing ack request", "request", req, "error", parseErr.Error()) + return nil, merror.ToGRPCError(parseErr) + } + partitionAckMsgs := make(map[int][]*subscriber.AckMessage, 0) + for _, ack := range parsedReq.AckMessages { + partitionAckMsgs[int(ack.Partition)] = append(partitionAckMsgs[int(ack.Partition)], ack) + } + + for partition, ackMsgs 
:= range partitionAckMsgs {
+		consumer, err := s.manager.GetConsumer(ctx, parsedReq.Subscription, partition)
+		if err != nil {
+			logger.Ctx(ctx).Errorw("consumeplaneserver: error in fetching consumer for ack request", "request", req, "error", err.Error())
+			return nil, err
+		}
+		consumer.Acknowledge(ctx, ackMsgs)
+
+	}
+
+	return new(emptypb.Empty), nil
+}
+
+// Pull messages
+func (c consumeplaneserver) Fetch(ctx context.Context, req *metrov1.FetchRequest) (*metrov1.PullResponse, error) {
+	logger.Ctx(ctx).Infow("consumeplaneserver: received request to pull messages", "pull_req", req.String())
+	span, ctx := opentracing.StartSpanFromContext(ctx, "ConsumePlaneServer.Pull", opentracing.Tags{
+		"subscription": req.Subscription,
+	})
+
+	parsedReq, parseErr := consumer.NewParsedFetchRequest(req)
+	if parseErr != nil {
+		logger.Ctx(ctx).Errorw("subscriberserver: error is parsing pull request", "request", req, "error", parseErr.Error())
+		return nil, parseErr
+	}
+	defer span.Finish()
+	consumer, err := c.manager.GetConsumer(ctx, parsedReq.Subscription, parsedReq.Partition)
+	if err != nil {
+		logger.Ctx(ctx).Errorw("subscriberserver: error is fetching consumer for fetch request", "request", req, "error", parseErr.Error())
+		return nil, err
+	}
+	res, err := consumer.Fetch(ctx, int(req.MaxMessages))
+	if err != nil {
+		return &metrov1.PullResponse{}, err
+	}
+	return res, nil
+}
+
+func (s consumeplaneserver) ModifyAckDeadline(ctx context.Context, req *metrov1.ModifyAckDeadlineRequest) (*emptypb.Empty, error) {
+	logger.Ctx(ctx).Infow("consumeplaneserver: received request to modack messages", "mod_ack_req", req.String())
+
+	span, ctx := opentracing.StartSpanFromContext(ctx, "ConsumePlaneServer.ModifyAckDeadline", opentracing.Tags{
+		"subscription": req.Subscription,
+		"ack_ids":      req.AckIds,
+	})
+	defer span.Finish()
+
+	parsedReq, parseErr := consumer.NewParsedModifyAckDeadlineRequest(req)
+	if parseErr != nil {
+		logger.Ctx(ctx).Errorw("consumeplaneserver: error in parsing modack request", "request", req, "error", parseErr.Error())
+		return nil, merror.ToGRPCError(parseErr)
+	}
+	partitionAckMsgs := make(map[int][]*subscriber.AckMessage, 0)
+	for _, ack := range parsedReq.AckMessages {
+		partitionAckMsgs[int(ack.Partition)] = append(partitionAckMsgs[int(ack.Partition)], ack)
+	}
+
+	for partition, ackMsgs := range partitionAckMsgs {
+		consumer, err := s.manager.GetConsumer(ctx, parsedReq.Subscription, partition)
+		if err != nil {
+			logger.Ctx(ctx).Errorw("consumeplaneserver: error in fetching consumer for modack request", "request", req, "error", err.Error())
+			return nil, err
+		}
+		consumer.ModifyAckDeadline(ctx, ackMsgs)
+
+	}
+	return new(emptypb.Empty), nil
+}
diff --git a/service/consume-plane/server.go b/service/consume-plane/server.go
new file mode 100644
index 00000000..30970c7b
--- /dev/null
+++ b/service/consume-plane/server.go
@@ -0,0 +1,37 @@
+package consume_plane
+
+import (
+	"context"
+	"regexp"
+
+	"github.com/razorpay/metro/internal/merror"
+	"github.com/razorpay/metro/pkg/logger"
+	metrov1 "github.com/razorpay/metro/rpc/proto/v1"
+)
+
+var projectIDRegex *regexp.Regexp
+
+var (
+	unknownResourceError     = merror.New(merror.InvalidArgument, "unknown resource type")
+	invalidResourceNameError = merror.New(merror.InvalidArgument, "resource name is invalid")
+)
+
+func init() {
+	// Regex to capture project id from resource names
+	// Resources are in the form /projects/<project-id>/<resource-type>/<resource-id>
+	// projectIDRegex = regexp.MustCompile(`^projects\/([^\/]+)\/.*`)
+}
+
+func getResourceNameFromRequest(ctx context.Context, req interface{}) (string, error) {
+	switch t := req.(type) {
+	case *metrov1.AcknowledgeRequest:
+		return req.(*metrov1.AcknowledgeRequest).Subscription, nil
+	case *metrov1.PullRequest:
+		return req.(*metrov1.PullRequest).Subscription, nil
+	case *metrov1.ModifyAckDeadlineRequest:
+		return req.(*metrov1.ModifyAckDeadlineRequest).Subscription, nil
+	default:
+		logger.Ctx(ctx).Infof("unknown request type: 
%v", t) + return "", unknownResourceError + } +} diff --git a/service/consume-plane/service.go b/service/consume-plane/service.go new file mode 100644 index 00000000..e3580a1b --- /dev/null +++ b/service/consume-plane/service.go @@ -0,0 +1,122 @@ +package consume_plane + +import ( + "context" + + "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" + "golang.org/x/sync/errgroup" + "google.golang.org/grpc" + + "github.com/razorpay/metro/internal/brokerstore" + "github.com/razorpay/metro/internal/consumer" + "github.com/razorpay/metro/internal/health" + "github.com/razorpay/metro/internal/offset" + "github.com/razorpay/metro/internal/project" + "github.com/razorpay/metro/internal/server" + "github.com/razorpay/metro/internal/subscriber" + "github.com/razorpay/metro/internal/subscription" + "github.com/razorpay/metro/internal/topic" + "github.com/razorpay/metro/pkg/logger" + "github.com/razorpay/metro/pkg/messagebroker" + "github.com/razorpay/metro/pkg/registry" + metrov1 "github.com/razorpay/metro/rpc/proto/v1" + _ "github.com/razorpay/metro/statik" // to serve openAPI static assets +) + +// Service for producer +type Service struct { + consumeConfig *Config + registryConfig *registry.Config +} + +// NewService creates an instance of new producer service +func NewService(consumeConfig *Config, registryConfig *registry.Config) (*Service, error) { + return &Service{ + consumeConfig: consumeConfig, + registryConfig: registryConfig, + }, nil +} + +// Start the service +func (svc *Service) Start(ctx context.Context) error { + // Define server handlers + r, err := registry.NewRegistry(svc.registryConfig) + if err != nil { + return err + } + + brokerStore, err := brokerstore.NewBrokerStore(svc.consumeConfig.Broker.Variant, &svc.consumeConfig.Broker.BrokerConfig) + if err != nil { + return err + } + + admin, _ := brokerStore.GetAdmin(ctx, messagebroker.AdminClientOptions{}) + // init broker health checker + brokerHealthChecker := health.NewBrokerHealthChecker(admin) + + // 
register broker and registry health checkers on the health server + healthCore, err := health.NewCore(brokerHealthChecker) + if err != nil { + return err + } + + projectCore := project.NewCore(project.NewRepo(r)) + + topicCore := topic.NewCore(topic.NewRepo(r), projectCore, brokerStore) + + subscriptionCore := subscription.NewCore(svc.consumeConfig.ReplicaCount, subscription.NewRepo(r), projectCore, topicCore) + + subscriberCore := subscriber.NewCore(brokerStore, subscriptionCore, offset.NewCore(offset.NewRepo(r))) + + mgr, err := consumer.NewLifecycleManager(ctx, svc.consumeConfig.ReplicaCount, svc.consumeConfig.OrdinalId, subscriptionCore, subscriberCore, brokerStore) + if err != nil { + logger.Ctx(ctx).Errorw("consumeplaneserver: error setting up lifecycle manager", "error", err.Error()) + } + // initiates a error group + grp, gctx := errgroup.WithContext(ctx) + + grp.Go(func() error { + err := server.RunGRPCServer( + gctx, + svc.consumeConfig.Interfaces.API.GrpcServerAddress, + func(server *grpc.Server) error { + metrov1.RegisterStatusCheckAPIServer(server, health.NewServer(healthCore)) + metrov1.RegisterConsumePlaneServer(server, newConsumePlaneServer(brokerStore, subscriptionCore, subscriberCore, mgr)) + return nil + }, + ) + + return err + }) + + grp.Go(func() error { + err := server.RunHTTPServer( + gctx, + svc.consumeConfig.Interfaces.API.HTTPServerAddress, + func(mux *runtime.ServeMux) error { + err := metrov1.RegisterStatusCheckAPIHandlerFromEndpoint(gctx, mux, svc.consumeConfig.Interfaces.API.GrpcServerAddress, []grpc.DialOption{grpc.WithInsecure()}) + if err != nil { + return err + } + + err = metrov1.RegisterConsumePlaneHandlerFromEndpoint(gctx, mux, svc.consumeConfig.Interfaces.API.GrpcServerAddress, []grpc.DialOption{grpc.WithInsecure()}) + if err != nil { + return err + } + return nil + }) + + return err + }) + + grp.Go(func() error { + err := server.RunInternalHTTPServer(gctx, svc.consumeConfig.Interfaces.API.InternalHTTPServerAddress) + return 
err + }) + + err = grp.Wait() + if err != nil { + logger.Ctx(gctx).Errorf("web service error: %s", err.Error()) + } + return err +} diff --git a/service/web/service.go b/service/web/service.go index c893d5b5..6205f059 100644 --- a/service/web/service.go +++ b/service/web/service.go @@ -71,7 +71,7 @@ func (svc *Service) Start(ctx context.Context) error { topicCore := topic.NewCore(topic.NewRepo(r), projectCore, brokerStore) - subscriptionCore := subscription.NewCore(subscription.NewRepo(r), projectCore, topicCore) + subscriptionCore := subscription.NewCore(0, subscription.NewRepo(r), projectCore, topicCore) credentialsCore := credentials.NewCore(credentials.NewRepo(r), projectCore) diff --git a/service/worker/service.go b/service/worker/service.go index c2c9a33c..caf567d7 100644 --- a/service/worker/service.go +++ b/service/worker/service.go @@ -60,6 +60,7 @@ func NewService(workerConfig *Config, registryConfig *registry.Config) (*Service topicCore := topic.NewCore(topic.NewRepo(reg), projectCore, brokerStore) subscriptionCore := subscription.NewCore( + 1, // Workers do not fetch from consume plane. Hence init nodes as 1. 
subscription.NewRepo(reg), projectCore, topicCore) diff --git a/tests/integration/messagebroker/messagebroker_kafka_test.go b/tests/integration/messagebroker/messagebroker_kafka_test.go index 9b4aeb43..2c172eab 100644 --- a/tests/integration/messagebroker/messagebroker_kafka_test.go +++ b/tests/integration/messagebroker/messagebroker_kafka_test.go @@ -1,3 +1,4 @@ +//go:build integration // +build integration package messagebroker @@ -35,7 +36,7 @@ func Test_CreateValidTopic(t *testing.T) { Topics: []string{topic}, GroupID: "dummy-group-2", }) - metadata, merr := consumer1.GetTopicMetadata(context.Background(), messagebroker.GetTopicMetadataRequest{Topic: topic}) + metadata, merr := consumer1.GetTopicPartitionMetadata(context.Background(), messagebroker.GetTopicPartitionMetadataRequest{Topic: topic}) assert.Nil(t, merr) assert.NotNil(t, metadata) From 0e5ca1d9b5cd66e80d8070dd3a94c376d4282e18 Mon Sep 17 00:00:00 2001 From: vnktram Date: Tue, 14 Dec 2021 00:21:05 +0530 Subject: [PATCH 02/14] Introduce partition aware delay consumers --- config/default.toml | 4 +-- internal/subscriber/core.go | 4 ++- internal/subscriber/retry/builder.go | 7 +++++ internal/subscriber/retry/delayconsumer.go | 34 +++++++++++++-------- internal/subscriber/retry/retrier.go | 3 +- internal/subscriber/subscriber.go | 7 ++++- internal/subscription/delay.go | 4 +-- internal/subscription/model.go | 4 +-- pkg/messagebroker/kafka.go | 16 +++++++--- service/consume-plane/consumeplaneserver.go | 4 +-- service/consume-plane/service.go | 1 - 11 files changed, 59 insertions(+), 29 deletions(-) diff --git a/config/default.toml b/config/default.toml index 324ab246..068b4d97 100644 --- a/config/default.toml +++ b/config/default.toml @@ -41,8 +41,8 @@ [consumePlane.broker.brokerconfig.consumePlane] [consumePlane.interfaces] [consumePlane.interfaces.api] - GrpcServerAddress = "0.0.0.0:8085" - HttpServerAddress = "0.0.0.0:8086" + GrpcServerAddress = "0.0.0.0:8088" + HttpServerAddress = "0.0.0.0:8089" 
InternalHttpServerAddress = "0.0.0.0:9003" [worker] diff --git a/internal/subscriber/core.go b/internal/subscriber/core.go index 035552d3..8948dead 100644 --- a/internal/subscriber/core.go +++ b/internal/subscriber/core.go @@ -2,6 +2,7 @@ package subscriber import ( "context" + "strconv" "time" "github.com/razorpay/metro/internal/brokerstore" @@ -122,7 +123,7 @@ func (c *Core) NewOpinionatedSubscriber(ctx context.Context, ackCh chan *AckMessage, modAckCh chan *ModAckMessage) (ISubscriber, error) { - groupID := subscription.Name + groupID := subscription.Name + "-" + strconv.Itoa(partition) consumer, err := c.bs.GetConsumer(ctx, messagebroker.ConsumerClientOptions{Topics: []messagebroker.TopicPartition{ { @@ -152,6 +153,7 @@ func (c *Core) NewOpinionatedSubscriber(ctx context.Context, WithIntervalFinder(retry.NewClosestIntervalWithCeil()). WithMessageHandler(retry.NewPushToPrimaryRetryTopicHandler(c.bs)). WithSubscriberID(subscriberID). + WithPartition(partition). Build() err = retrier.Start(subsCtx) diff --git a/internal/subscriber/retry/builder.go b/internal/subscriber/retry/builder.go index 142f5b19..c83575a8 100644 --- a/internal/subscriber/retry/builder.go +++ b/internal/subscriber/retry/builder.go @@ -15,6 +15,7 @@ type Builder interface { WithSubscription(subs *subscription.Model) Builder WithMessageHandler(handler MessageHandler) Builder WithSubscriberID(subscriberID string) Builder + WithPartition(partition int) Builder Build() IRetrier } @@ -52,6 +53,12 @@ func (retrier *Retrier) WithSubscription(subs *subscription.Model) Builder { return retrier } +// WithPartition ... +func (retrier *Retrier) WithPartition(partition int) Builder { + retrier.partition = partition + return retrier +} + // WithMessageHandler ... 
func (retrier *Retrier) WithMessageHandler(handler MessageHandler) Builder { retrier.handler = handler diff --git a/internal/subscriber/retry/delayconsumer.go b/internal/subscriber/retry/delayconsumer.go index 6dd7a22d..3d15b4a1 100644 --- a/internal/subscriber/retry/delayconsumer.go +++ b/internal/subscriber/retry/delayconsumer.go @@ -18,6 +18,7 @@ type DelayConsumer struct { cleanupCh chan struct{} subs *subscription.Model topic string + partition int isPaused bool consumer messagebroker.Consumer bs brokerstore.IBrokerStore @@ -28,7 +29,7 @@ type DelayConsumer struct { } // NewDelayConsumer inits a new delay-consumer with the pre-defined message handler -func NewDelayConsumer(ctx context.Context, subscriberID string, topic string, subs *subscription.Model, bs brokerstore.IBrokerStore, handler MessageHandler) (*DelayConsumer, error) { +func NewDelayConsumer(ctx context.Context, subscriberID string, topic string, partition int, subs *subscription.Model, bs brokerstore.IBrokerStore, handler MessageHandler) (*DelayConsumer, error) { delayCtx, cancel := context.WithCancel(ctx) // only delay-consumer will consume from a subscription specific delay-topic, so can use the same groupID and groupInstanceID @@ -38,7 +39,7 @@ func NewDelayConsumer(ctx context.Context, subscriberID string, topic string, su Topic: topic, }, }, - GroupID: subs.GetDelayConsumerGroupID(topic), + GroupID: subs.GetDelayConsumerGroupID(topic, partition), GroupInstanceID: subs.GetDelayConsumerGroupInstanceID(subscriberID, topic), } consumer, err := bs.GetConsumer(ctx, consumerOps) @@ -55,6 +56,7 @@ func NewDelayConsumer(ctx context.Context, subscriberID string, topic string, su ctx: delayCtx, cancelFunc: cancel, topic: topic, + partition: partition, consumer: consumer, subs: subs, bs: bs, @@ -73,7 +75,7 @@ func (dc *DelayConsumer) Run(ctx context.Context) { select { case <-dc.ctx.Done(): logger.Ctx(dc.ctx).Infow("delay-consumer: stopping <-ctx.Done() called", dc.LogFields()...) 
- dc.bs.RemoveConsumer(ctx, messagebroker.ConsumerClientOptions{GroupID: dc.subs.GetDelayConsumerGroupID(dc.topic), GroupInstanceID: dc.subs.GetDelayConsumerGroupInstanceID(dc.subscriberID, dc.topic)}) + dc.bs.RemoveConsumer(ctx, messagebroker.ConsumerClientOptions{GroupID: dc.subs.GetDelayConsumerGroupID(dc.topic, dc.partition), GroupInstanceID: dc.subs.GetDelayConsumerGroupInstanceID(dc.subscriberID, dc.topic)}) dc.consumer.Close(dc.ctx) return case <-dc.doneCh: @@ -105,12 +107,12 @@ func (dc *DelayConsumer) SetupWatch(ctx context.Context) { select { case <-dc.ctx.Done(): logger.Ctx(dc.ctx).Infow("delay-consumer: stopping <-ctx.Done() called", dc.LogFields()...) - dc.bs.RemoveConsumer(ctx, messagebroker.ConsumerClientOptions{GroupID: dc.subs.GetDelayConsumerGroupID(dc.topic), GroupInstanceID: dc.subs.GetDelayConsumerGroupInstanceID(dc.subscriberID, dc.topic)}) + dc.bs.RemoveConsumer(ctx, messagebroker.ConsumerClientOptions{GroupID: dc.subs.GetDelayConsumerGroupID(dc.topic, dc.partition), GroupInstanceID: dc.subs.GetDelayConsumerGroupInstanceID(dc.subscriberID, dc.topic)}) dc.consumer.Close(dc.ctx) return case <-dc.cleanupCh: logger.Ctx(dc.ctx).Infow("delay-consumer: cleanup task on subscription", dc.LogFields()...) - dc.bs.RemoveConsumer(ctx, messagebroker.ConsumerClientOptions{GroupID: dc.subs.GetDelayConsumerGroupID(dc.topic), GroupInstanceID: dc.subs.GetDelayConsumerGroupInstanceID(dc.subscriberID, dc.topic)}) + dc.bs.RemoveConsumer(ctx, messagebroker.ConsumerClientOptions{GroupID: dc.subs.GetDelayConsumerGroupID(dc.topic, dc.partition), GroupInstanceID: dc.subs.GetDelayConsumerGroupInstanceID(dc.subscriberID, dc.topic)}) dc.consumer.Close(dc.ctx) return } @@ -173,13 +175,19 @@ func (dc *DelayConsumer) processMsgs() { return } } else { - // submit to - logger.Ctx(dc.ctx).Infow("delay-consumer: processing message", dc.LogFields("messageID", msg.MessageID)...) 
- err := dc.handler.Do(dc.ctx, msg) - if err != nil { - logger.Ctx(dc.ctx).Errorw("delay-consumer: error in msg handler", - dc.LogFields("messageID", msg.MessageID, "error", err.Error())...) - return + // Only process message if it belongs to the partition that the current subscriber handles + // Since the consumer group ID is unique for each subscrption-partition consumer, + // messages that do not confirm to the current partition will be handled + // by the delay consumer in the corresponding subscription-partition delay consumer + if int(msg.Partition) == dc.partition { + // submit to delay consumer handler + logger.Ctx(dc.ctx).Infow("delay-consumer: processing message", dc.LogFields("messageID", msg.MessageID)...) + err := dc.handler.Do(dc.ctx, msg) + if err != nil { + logger.Ctx(dc.ctx).Errorw("delay-consumer: error in msg handler", + dc.LogFields("messageID", msg.MessageID, "error", err.Error())...) + return + } } } @@ -219,7 +227,7 @@ func (dc *DelayConsumer) LogFields(kv ...interface{}) []interface{} { fields := []interface{}{ "delayConsumerConfig", map[string]interface{}{ "topic": dc.topic, - "groupID": dc.subs.GetDelayConsumerGroupID(dc.topic), + "groupID": dc.subs.GetDelayConsumerGroupID(dc.topic, dc.partition), "groupInstanceID": dc.subs.GetDelayConsumerGroupInstanceID(dc.subscriberID, dc.topic), }, } diff --git a/internal/subscriber/retry/retrier.go b/internal/subscriber/retry/retrier.go index 3e858b0f..8a228919 100644 --- a/internal/subscriber/retry/retrier.go +++ b/internal/subscriber/retry/retrier.go @@ -30,6 +30,7 @@ type Retrier struct { backoff Backoff finder IntervalFinder handler MessageHandler + partition int delayConsumers sync.Map } @@ -37,7 +38,7 @@ type Retrier struct { func (r *Retrier) Start(ctx context.Context) error { // TODO : validate retrier params for nils and substitute with defaults for interval, topic := range r.subs.GetDelayTopicsMap() { - dc, err := NewDelayConsumer(ctx, r.subscriberID, topic, r.subs, r.bs, r.handler) + dc, 
err := NewDelayConsumer(ctx, r.subscriberID, topic, r.partition, r.subs, r.bs, r.handler) if err != nil { return err } diff --git a/internal/subscriber/subscriber.go b/internal/subscriber/subscriber.go index 4f0b2d14..1d503183 100644 --- a/internal/subscriber/subscriber.go +++ b/internal/subscriber/subscriber.go @@ -3,6 +3,7 @@ package subscriber import ( "container/heap" "context" + "errors" "strconv" "time" @@ -593,7 +594,11 @@ func (s *Subscriber) pull(req *PullRequest) { s.logInMemoryStats(ctx) } if req.RespChan != nil { - req.RespChan <- &metrov1.PullResponse{ReceivedMessages: sm} + if req.ctx.Err() != nil && errors.Is(req.ctx.Err(), context.Canceled) { + // Request has been terminated, do not respond on the RespChan + } else { + req.RespChan <- &metrov1.PullResponse{ReceivedMessages: sm} + } } else { s.responseChan <- &metrov1.PullResponse{ReceivedMessages: sm} } diff --git a/internal/subscription/delay.go b/internal/subscription/delay.go index 5d25f66e..ef548ace 100644 --- a/internal/subscription/delay.go +++ b/internal/subscription/delay.go @@ -6,8 +6,8 @@ const delayTopicNameFormat = "%v.delay.%v.seconds" // projects/p1/topics/subs.delay.30.seconds const delayTopicWithProjectNameFormat = "projects/%v/topics/%v.delay.%v.seconds" -// subs.delay.30.seconds-cg -const delayConsumerGroupIDFormat = "%v-cg" +// subs.delay.30.seconds-0-cg +const delayConsumerGroupIDFormat = "%v-%d-cg" // delayTopicName-subscriberID const delayConsumerGroupInstanceIDFormat = "%v-%v" diff --git a/internal/subscription/model.go b/internal/subscription/model.go index 96aebcdc..dd3f1aff 100644 --- a/internal/subscription/model.go +++ b/internal/subscription/model.go @@ -197,8 +197,8 @@ func (m *Model) GetDelay3600secTopic() string { } // GetDelayConsumerGroupID returns the consumer group ID to be used by the delay consumers -func (m *Model) GetDelayConsumerGroupID(delayTopic string) string { - return fmt.Sprintf(delayConsumerGroupIDFormat, delayTopic) +func (m *Model) 
GetDelayConsumerGroupID(delayTopic string, partition int) string { + return fmt.Sprintf(delayConsumerGroupIDFormat, delayTopic, partition) } // GetDelayConsumerGroupInstanceID returns the consumer group ID to be used by the specific delay consumer diff --git a/pkg/messagebroker/kafka.go b/pkg/messagebroker/kafka.go index 0fe38052..6ae483a5 100644 --- a/pkg/messagebroker/kafka.go +++ b/pkg/messagebroker/kafka.go @@ -39,9 +39,14 @@ type KafkaBroker struct { // newKafkaConsumerClient returns a kafka consumer func newKafkaConsumerClient(ctx context.Context, bConfig *BrokerConfig, options *ConsumerClientOptions) (Consumer, error) { - normalizedTopics := make([]string, 0) - for _, topic := range options.Topics { - normalizedTopics = append(normalizedTopics, normalizeTopicName(topic.Topic)) + normalizedTopics := make([]kafkapkg.TopicPartition, 0) + for i := range options.Topics { + brokerTopicName := normalizeTopicName(options.Topics[i].Topic) + kfkTp := kafkapkg.TopicPartition{ + Topic: &brokerTopicName, + Partition: int32(options.Topics[i].Partition), + } + normalizedTopics = append(normalizedTopics, kfkTp) } err := validateKafkaConsumerBrokerConfig(bConfig) @@ -83,7 +88,10 @@ func newKafkaConsumerClient(ctx context.Context, bConfig *BrokerConfig, options return nil, err } - c.SubscribeTopics(normalizedTopics, nil) + err = c.Assign(normalizedTopics) + if err != nil { + return nil, err + } logger.Ctx(ctx).Infow("kafka consumer: initialized", "options", options) diff --git a/service/consume-plane/consumeplaneserver.go b/service/consume-plane/consumeplaneserver.go index 36c9bbe0..2b1cfb37 100644 --- a/service/consume-plane/consumeplaneserver.go +++ b/service/consume-plane/consumeplaneserver.go @@ -80,8 +80,8 @@ func (c consumeplaneserver) Fetch(ctx context.Context, req *metrov1.FetchRequest defer span.Finish() consumer, err := c.manager.GetConsumer(ctx, parsedReq.Subscription, parsedReq.Partition) if err != nil { - logger.Ctx(ctx).Errorw("subscriberserver: error is 
fetching consumer for fetch request", "request", req, "error", parseErr.Error()) - return nil, err + logger.Ctx(ctx).Errorw("subscriberserver: error in fetching consumer for fetch request", "request", req, "error", err.Error()) + return &metrov1.PullResponse{}, err } res, err := consumer.Fetch(ctx, int(req.MaxMessages)) if err != nil { diff --git a/service/consume-plane/service.go b/service/consume-plane/service.go index e3580a1b..e29d532e 100644 --- a/service/consume-plane/service.go +++ b/service/consume-plane/service.go @@ -20,7 +20,6 @@ import ( "github.com/razorpay/metro/pkg/messagebroker" "github.com/razorpay/metro/pkg/registry" metrov1 "github.com/razorpay/metro/rpc/proto/v1" - _ "github.com/razorpay/metro/statik" // to serve openAPI static assets ) // Service for producer From 7187d54b9d8e7f555d7eb5a793219f05594641f8 Mon Sep 17 00:00:00 2001 From: vnktram Date: Wed, 15 Dec 2021 18:47:23 +0530 Subject: [PATCH 03/14] Fix lint checks --- service/consume-plane/config.go | 2 +- service/consume-plane/consumeplaneserver.go | 2 +- service/consume-plane/server.go | 2 +- service/consume-plane/service.go | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/service/consume-plane/config.go b/service/consume-plane/config.go index 0938b747..3f5ba042 100644 --- a/service/consume-plane/config.go +++ b/service/consume-plane/config.go @@ -1,4 +1,4 @@ -package consume_plane +package consumeplane import ( "github.com/razorpay/metro/pkg/messagebroker" diff --git a/service/consume-plane/consumeplaneserver.go b/service/consume-plane/consumeplaneserver.go index 2b1cfb37..a966abc6 100644 --- a/service/consume-plane/consumeplaneserver.go +++ b/service/consume-plane/consumeplaneserver.go @@ -1,4 +1,4 @@ -package consume_plane +package consumeplane import ( "context" diff --git a/service/consume-plane/server.go b/service/consume-plane/server.go index 30970c7b..5fb72c75 100644 --- a/service/consume-plane/server.go +++ b/service/consume-plane/server.go @@ -1,4 +1,4 @@ 
-package consume_plane +package consumeplane import ( "context" diff --git a/service/consume-plane/service.go b/service/consume-plane/service.go index e29d532e..e4e22faa 100644 --- a/service/consume-plane/service.go +++ b/service/consume-plane/service.go @@ -1,4 +1,4 @@ -package consume_plane +package consumeplane import ( "context" From 41f1f2ae20db3d7af815d73d09ff8d208152693a Mon Sep 17 00:00:00 2001 From: vnktram Date: Wed, 15 Dec 2021 19:11:23 +0530 Subject: [PATCH 04/14] Fix lint comments --- config/default.toml | 2 +- config/dev_docker.toml | 2 +- internal/consumer/consumer.go | 19 ++++++++++----- internal/consumer/lifecycle.go | 27 ++++++++++++--------- internal/consumer/request.go | 5 +++- internal/hash/hash.go | 1 + internal/subscriber/core.go | 1 + internal/subscriber/retry/delayconsumer.go | 21 ---------------- internal/subscriber/subscriber.go | 1 + pkg/messagebroker/config.go | 1 + service/consume-plane/config.go | 2 +- service/consume-plane/consumeplaneserver.go | 15 ++++++------ service/consume-plane/service.go | 2 +- 13 files changed, 49 insertions(+), 50 deletions(-) diff --git a/config/default.toml b/config/default.toml index 068b4d97..a896f1fa 100644 --- a/config/default.toml +++ b/config/default.toml @@ -32,7 +32,7 @@ [consumePlane] replicaCount = 1 - ordinalId = 0 + ordinalID = 0 [consumePlane.broker] variant = "kafka" [consumePlane.broker.brokerconfig] diff --git a/config/dev_docker.toml b/config/dev_docker.toml index b022571c..c3ba36c5 100644 --- a/config/dev_docker.toml +++ b/config/dev_docker.toml @@ -30,7 +30,7 @@ [consumePlane] replicaCount = 1 - ordinalId = 0 + ordinalID = 0 [consumePlane.broker] variant = "kafka" [consumePlane.broker.brokerconfig] diff --git a/internal/consumer/consumer.go b/internal/consumer/consumer.go index 515f3ca2..998e2702 100644 --- a/internal/consumer/consumer.go +++ b/internal/consumer/consumer.go @@ -13,14 +13,15 @@ import ( metrov1 "github.com/razorpay/metro/rpc/proto/v1" ) +// IConsumer defines the set of 
methods to access a consumer object type IConsumer interface { Run() error - Stop() Acknowledge(ctx context.Context, req *ParsedAcknowledgeRequest) ModifyAckDeadline(ctx context.Context, req *ParsedModifyAckDeadlineRequest) Fetch(ctx context.Context, messageCount int) (*metrov1.PullResponse, error) } +// Consumer entity represents a single subscription-partition specific client type Consumer struct { computedHash int subscriberID string @@ -34,6 +35,7 @@ type Consumer struct { // DefaultNumMessageCount ... var DefaultNumMessageCount int32 = 10 +// NewConsumer intializes a consumer entity func NewConsumer(ctx context.Context, computedHash int, subscriberID string, subscription *subscription.Model, subCore subscriber.ICore, subs subscriber.ISubscriber) *Consumer { con := &Consumer{ ctx: ctx, @@ -46,6 +48,7 @@ func NewConsumer(ctx context.Context, computedHash int, subscriberID string, sub return con } +// Fetch retrieves messages for a given consumer, it takes ackDeadline, retry and maxMessages into account. func (c *Consumer) Fetch(ctx context.Context, messageCount int) (*metrov1.PullResponse, error) { respChan := make(chan *metrov1.PullResponse) defer close(respChan) @@ -63,12 +66,15 @@ func (c *Consumer) Fetch(ctx context.Context, messageCount int) (*metrov1.PullRe } +// Acknowledge send an ACK for a set of messages func (c *Consumer) Acknowledge(ctx context.Context, ackMsgs []*subscriber.AckMessage) { for _, ackMsg := range ackMsgs { c.subscriptionSubscriber.GetAckChannel() <- ackMsg.WithContext(ctx) } } +// ModifyAckDeadline allows modification of Ack deadline for a messages(s). +// Deadline of 0 indicates a Nack operation. 
func (c *Consumer) ModifyAckDeadline(ctx context.Context, mackMsgs []*subscriber.AckMessage) { for _, modAckMsg := range mackMsgs { modAckReq := subscriber.NewModAckMessage(modAckMsg, modAckMsg.Deadline) @@ -77,6 +83,7 @@ func (c *Consumer) ModifyAckDeadline(ctx context.Context, mackMsgs []*subscriber } } +// Run ensures that the lifecycle of a consumer is instantiated. func (c *Consumer) Run() error { // stream ack timeout streamAckDeadlineSecs := int32(30) // init with some sane value @@ -128,9 +135,9 @@ func (c *Consumer) stop() { } -func (s *Consumer) closeSubscriberChannels() { - close(s.errChan) - close(s.subscriptionSubscriber.GetRequestChannel()) - close(s.subscriptionSubscriber.GetAckChannel()) - close(s.subscriptionSubscriber.GetModAckChannel()) +func (c *Consumer) closeSubscriberChannels() { + close(c.errChan) + close(c.subscriptionSubscriber.GetRequestChannel()) + close(c.subscriptionSubscriber.GetAckChannel()) + close(c.subscriptionSubscriber.GetModAckChannel()) } diff --git a/internal/consumer/lifecycle.go b/internal/consumer/lifecycle.go index f23c098d..ada7907b 100644 --- a/internal/consumer/lifecycle.go +++ b/internal/consumer/lifecycle.go @@ -26,20 +26,20 @@ type Manager struct { subscriptionCore subscription.ICore cleanupCh chan cleanupMessage replicas int - ordinalId int + ordinalID int bs brokerstore.IBrokerStore mutex *sync.Mutex ctx context.Context } // NewLifecycleManager ... 
-func NewLifecycleManager(ctx context.Context, replicas int, ordinalId int, subscriptionCore subscription.ICore, subscriberCore subscriber.ICore, bs brokerstore.IBrokerStore) (ILifecycle, error) { +func NewLifecycleManager(ctx context.Context, replicas int, ordinalID int, subscriptionCore subscription.ICore, subscriberCore subscriber.ICore, bs brokerstore.IBrokerStore) (ILifecycle, error) { mgr := &Manager{ consumers: make(map[int]*Consumer), subscriptionCore: subscriptionCore, bs: bs, replicas: replicas, - ordinalId: ordinalId, + ordinalID: ordinalID, mutex: &sync.Mutex{}, ctx: ctx, } @@ -59,7 +59,7 @@ func NewLifecycleManager(ctx context.Context, replicas int, ordinalId int, subsc } for _, sub := range pullSubs { - subPartitions, err := subscriptionCore.FetchPartitionsForHash(ctx, sub, ordinalId) + subPartitions, err := subscriptionCore.FetchPartitionsForHash(ctx, sub, ordinalID) if err != nil { logger.Ctx(ctx).Errorw("Error resolving partitions for subscription", "subscription", sub.Name) } @@ -93,6 +93,8 @@ func NewLifecycleManager(ctx context.Context, replicas int, ordinalId int, subsc } + // TODO: Implement watch on subscripitons to update active consumers based on watch updates. 
+ // swh := registry.WatchConfig{ // WatchType: "keyprefix", // WatchPath: common.GetBasePrefix() + subscription.Prefix, @@ -110,24 +112,26 @@ func NewLifecycleManager(ctx context.Context, replicas int, ordinalId int, subsc return mgr, nil } -func (s *Manager) Run() { +// Run instantiates the listeners for a lifecycle manager +func (m *Manager) Run() { for { select { - case <-s.ctx.Done(): - for _, con := range s.consumers { + case <-m.ctx.Done(): + for _, con := range m.consumers { con.stop() } return - case cleanupMessage := <-s.cleanupCh: - logger.Ctx(s.ctx).Infow("manager: got request to cleanup subscriber", "cleanupMessage", cleanupMessage) - s.mutex.Lock() + case cleanupMessage := <-m.cleanupCh: + logger.Ctx(m.ctx).Infow("manager: got request to cleanup subscriber", "cleanupMessage", cleanupMessage) + m.mutex.Lock() //Implement cleanup here - s.mutex.Unlock() + m.mutex.Unlock() } } } +// GetConsumer fetches the relevant consumer from the memory map. func (m *Manager) GetConsumer(ctx context.Context, sub string, partition int) (*Consumer, error) { computedHash := m.subscriptionCore.FetchSubscriptionHash(ctx, sub, partition) consumer := m.consumers[computedHash] @@ -137,6 +141,7 @@ func (m *Manager) GetConsumer(ctx context.Context, sub string, partition int) (* return consumer, nil } +// CloseConsumer ensures that the consumer is greacefully exited. func (m *Manager) CloseConsumer(ctx context.Context, computedhash int) error { con := m.consumers[computedhash] if con != nil { diff --git a/internal/consumer/request.go b/internal/consumer/request.go index 2f20aec5..345154fb 100644 --- a/internal/consumer/request.go +++ b/internal/consumer/request.go @@ -5,18 +5,21 @@ import ( metrov1 "github.com/razorpay/metro/rpc/proto/v1" ) +// ParsedFetchRequest ... type ParsedFetchRequest struct { Subscription string MessageCount int Partition int } +// ParsedAcknowledgeRequest ... 
type ParsedAcknowledgeRequest struct { Subscription string AckIDs []string AckMessages []*subscriber.AckMessage } +//ParsedModifyAckDeadlineRequest ... type ParsedModifyAckDeadlineRequest struct { Subscription string AckIDs []string @@ -24,7 +27,7 @@ type ParsedModifyAckDeadlineRequest struct { ModifyDeadlineMsgIdsWithSecs map[string]int32 } -// +// NewParsedFetchRequest ... func NewParsedFetchRequest(req *metrov1.FetchRequest) (*ParsedFetchRequest, error) { parsedReq := &ParsedFetchRequest{} parsedReq.Subscription = req.Subscription diff --git a/internal/hash/hash.go b/internal/hash/hash.go index 5617cf97..5d7beca1 100644 --- a/internal/hash/hash.go +++ b/internal/hash/hash.go @@ -4,6 +4,7 @@ import ( "hash/fnv" ) +// ComputeHash resolves a checksum given a byte array func ComputeHash(arr []byte) int { h := fnv.New32a() h.Write(arr) diff --git a/internal/subscriber/core.go b/internal/subscriber/core.go index 8948dead..b5a5e456 100644 --- a/internal/subscriber/core.go +++ b/internal/subscriber/core.go @@ -111,6 +111,7 @@ func (c *Core) NewSubscriber(ctx context.Context, return s, nil } +// NewOpinionatedSubscriber ... func (c *Core) NewOpinionatedSubscriber(ctx context.Context, subscriberID string, subscription *subscription.Model, diff --git a/internal/subscriber/retry/delayconsumer.go b/internal/subscriber/retry/delayconsumer.go index 3d15b4a1..bf9db546 100644 --- a/internal/subscriber/retry/delayconsumer.go +++ b/internal/subscriber/retry/delayconsumer.go @@ -98,27 +98,6 @@ func (dc *DelayConsumer) Run(ctx context.Context) { } } -// Run spawns the delay-consumer -func (dc *DelayConsumer) SetupWatch(ctx context.Context) { - defer close(dc.doneCh) - - logger.Ctx(ctx).Infow("delay-consumer: running", dc.LogFields()...) - for { - select { - case <-dc.ctx.Done(): - logger.Ctx(dc.ctx).Infow("delay-consumer: stopping <-ctx.Done() called", dc.LogFields()...) 
- dc.bs.RemoveConsumer(ctx, messagebroker.ConsumerClientOptions{GroupID: dc.subs.GetDelayConsumerGroupID(dc.topic, dc.partition), GroupInstanceID: dc.subs.GetDelayConsumerGroupInstanceID(dc.subscriberID, dc.topic)}) - dc.consumer.Close(dc.ctx) - return - case <-dc.cleanupCh: - logger.Ctx(dc.ctx).Infow("delay-consumer: cleanup task on subscription", dc.LogFields()...) - dc.bs.RemoveConsumer(ctx, messagebroker.ConsumerClientOptions{GroupID: dc.subs.GetDelayConsumerGroupID(dc.topic, dc.partition), GroupInstanceID: dc.subs.GetDelayConsumerGroupInstanceID(dc.subscriberID, dc.topic)}) - dc.consumer.Close(dc.ctx) - return - } - } -} - // resumes a paused delay-consumer. Additionally process any previously cached messages in buffer. func (dc *DelayConsumer) resume() { logger.Ctx(dc.ctx).Infow("delay-consumer: resuming", dc.LogFields("error", nil)...) diff --git a/internal/subscriber/subscriber.go b/internal/subscriber/subscriber.go index 1d503183..995ac48f 100644 --- a/internal/subscriber/subscriber.go +++ b/internal/subscriber/subscriber.go @@ -67,6 +67,7 @@ type Subscriber struct { retrier retry.IRetrier } +// OpinionatedSubscriber extends a subscriber entity with the relevant partition and the corresponding hash. 
type OpinionatedSubscriber struct { Subscriber partition int diff --git a/pkg/messagebroker/config.go b/pkg/messagebroker/config.go index a11da3d3..a0af94c1 100644 --- a/pkg/messagebroker/config.go +++ b/pkg/messagebroker/config.go @@ -45,6 +45,7 @@ type ConsumerConfig struct { // AdminConfig holds configuration for admin APIs type AdminConfig struct{} +// TopicPartition represents a topic and one of it's partitions type TopicPartition struct { Topic string Partition int diff --git a/service/consume-plane/config.go b/service/consume-plane/config.go index 3f5ba042..ebf2632c 100644 --- a/service/consume-plane/config.go +++ b/service/consume-plane/config.go @@ -8,7 +8,7 @@ import ( type Config struct { Broker Broker ReplicaCount int - OrdinalId int + OrdinalID int Interfaces struct { API NetworkInterfaces } diff --git a/service/consume-plane/consumeplaneserver.go b/service/consume-plane/consumeplaneserver.go index a966abc6..1f44026e 100644 --- a/service/consume-plane/consumeplaneserver.go +++ b/service/consume-plane/consumeplaneserver.go @@ -33,7 +33,7 @@ func newConsumePlaneServer(brokerStore brokerstore.IBrokerStore, subscriptionCor } // Acknowledge a message -func (s consumeplaneserver) Acknowledge(ctx context.Context, req *metrov1.AcknowledgeRequest) (*emptypb.Empty, error) { +func (c consumeplaneserver) Acknowledge(ctx context.Context, req *metrov1.AcknowledgeRequest) (*emptypb.Empty, error) { logger.Ctx(ctx).Infow("consumeplaneserver: received request to ack messages", "ack_req", req.String()) span, ctx := opentracing.StartSpanFromContext(ctx, "ConsumePlaneServer.Acknowledge", opentracing.Tags{ @@ -53,7 +53,7 @@ func (s consumeplaneserver) Acknowledge(ctx context.Context, req *metrov1.Acknow } for partition, ackMsgs := range partitionAckMsgs { - consumer, err := s.manager.GetConsumer(ctx, parsedReq.Subscription, partition) + consumer, err := c.manager.GetConsumer(ctx, parsedReq.Subscription, partition) if err != nil { logger.Ctx(ctx).Errorw("consumeplaneserver: 
error is fetching consumer for fetch request", "request", req, "error", parseErr.Error()) return nil, err @@ -65,7 +65,7 @@ func (s consumeplaneserver) Acknowledge(ctx context.Context, req *metrov1.Acknow return new(emptypb.Empty), nil } -// Pull messages +// Fetch messages from the topic-partition func (c consumeplaneserver) Fetch(ctx context.Context, req *metrov1.FetchRequest) (*metrov1.PullResponse, error) { logger.Ctx(ctx).Infow("consumeplaneserver: received request to pull messages", "pull_req", req.String()) span, ctx := opentracing.StartSpanFromContext(ctx, "ConsumePlaneServer.Pull", opentracing.Tags{ @@ -74,13 +74,13 @@ func (c consumeplaneserver) Fetch(ctx context.Context, req *metrov1.FetchRequest parsedReq, parseErr := consumer.NewParsedFetchRequest(req) if parseErr != nil { - logger.Ctx(ctx).Errorw("subscriberserver: error is parsing pull request", "request", req, "error", parseErr.Error()) + logger.Ctx(ctx).Errorw("consumeplaneserver: error is parsing pull request", "request", req, "error", parseErr.Error()) return nil, parseErr } defer span.Finish() consumer, err := c.manager.GetConsumer(ctx, parsedReq.Subscription, parsedReq.Partition) if err != nil { - logger.Ctx(ctx).Errorw("subscriberserver: error in fetching consumer for fetch request", "request", req, "error", err.Error()) + logger.Ctx(ctx).Errorw("consumeplaneserver: error in fetching consumer for fetch request", "request", req, "error", err.Error()) return &metrov1.PullResponse{}, err } res, err := consumer.Fetch(ctx, int(req.MaxMessages)) @@ -90,7 +90,8 @@ func (c consumeplaneserver) Fetch(ctx context.Context, req *metrov1.FetchRequest return res, nil } -func (s consumeplaneserver) ModifyAckDeadline(ctx context.Context, req *metrov1.ModifyAckDeadlineRequest) (*emptypb.Empty, error) { +// ModifyAckDeadline updates dealine for the given ackMessages +func (c consumeplaneserver) ModifyAckDeadline(ctx context.Context, req *metrov1.ModifyAckDeadlineRequest) (*emptypb.Empty, error) { 
logger.Ctx(ctx).Infow("consumeplaneserver: received request to modack messages", "mod_ack_req", req.String()) span, ctx := opentracing.StartSpanFromContext(ctx, "ConsumePlaneServer.ModifyAckDeadline", opentracing.Tags{ @@ -110,7 +111,7 @@ } for partition, ackMsgs := range partitionAckMsgs { - consumer, err := s.manager.GetConsumer(ctx, parsedReq.Subscription, partition) + consumer, err := c.manager.GetConsumer(ctx, parsedReq.Subscription, partition) if err != nil { logger.Ctx(ctx).Errorw("consumeplaneserver: error is fetching consumer for fetch request", "request", req, "error", parseErr.Error()) return nil, err diff --git a/service/consume-plane/service.go b/service/consume-plane/service.go index e4e22faa..1fbeec3c 100644 --- a/service/consume-plane/service.go +++ b/service/consume-plane/service.go @@ -67,7 +67,7 @@ func (svc *Service) Start(ctx context.Context) error { subscriberCore := subscriber.NewCore(brokerStore, subscriptionCore, offset.NewCore(offset.NewRepo(r))) - mgr, err := consumer.NewLifecycleManager(ctx, svc.consumeConfig.ReplicaCount, svc.consumeConfig.OrdinalId, subscriptionCore, subscriberCore, brokerStore) + mgr, err := consumer.NewLifecycleManager(ctx, svc.consumeConfig.ReplicaCount, svc.consumeConfig.OrdinalID, subscriptionCore, subscriberCore, brokerStore) if err != nil { logger.Ctx(ctx).Errorw("consumeplaneserver: error setting up lifecycle manager", "error", err.Error()) } From 6cb111493d20d72f217ecda8b633eb9f88c83c6a Mon Sep 17 00:00:00 2001 From: vnktram Date: Wed, 22 Dec 2021 16:31:39 +0530 Subject: [PATCH 05/14] Support for watcher on subscription updates --- internal/consumer/lifecycle.go | 235 ++++++++++++++++++++++--------- internal/migration/migration.go | 0 metro-proto | 2 +- service/consume-plane/service.go | 6 +- 4 files changed, 173 insertions(+), 70 deletions(-) create mode 100644 internal/migration/migration.go diff --git 
a/internal/consumer/lifecycle.go b/internal/consumer/lifecycle.go index ada7907b..ca5b6515 100644 --- a/internal/consumer/lifecycle.go +++ b/internal/consumer/lifecycle.go @@ -7,8 +7,11 @@ import ( "github.com/google/uuid" "github.com/pkg/errors" "github.com/razorpay/metro/pkg/logger" + "github.com/razorpay/metro/pkg/registry" + "golang.org/x/sync/errgroup" "github.com/razorpay/metro/internal/brokerstore" + "github.com/razorpay/metro/internal/common" "github.com/razorpay/metro/internal/subscriber" "github.com/razorpay/metro/internal/subscription" ) @@ -17,118 +20,210 @@ import ( type ILifecycle interface { GetConsumer(ctx context.Context, sub string, partition int) (*Consumer, error) CloseConsumer(ctx context.Context, computedHash int) error - Run() + Run() error } // Manager ... type Manager struct { consumers map[int]*Consumer subscriptionCore subscription.ICore + subscriberCore subscriber.ICore + subCache []*subscription.Model cleanupCh chan cleanupMessage + registry registry.IRegistry replicas int ordinalID int bs brokerstore.IBrokerStore mutex *sync.Mutex ctx context.Context + subWatchData chan *struct{} } // NewLifecycleManager ... 
-func NewLifecycleManager(ctx context.Context, replicas int, ordinalID int, subscriptionCore subscription.ICore, subscriberCore subscriber.ICore, bs brokerstore.IBrokerStore) (ILifecycle, error) { +func NewLifecycleManager(ctx context.Context, replicas int, ordinalID int, subscriptionCore subscription.ICore, subscriberCore subscriber.ICore, bs brokerstore.IBrokerStore, r registry.IRegistry) (ILifecycle, error) { + + allSubs, err := subscriptionCore.List(ctx, subscription.Prefix) + if err != nil { + logger.Ctx(ctx).Errorw("error fetching new subscription list", "error", err) + return nil, err + } + + // Filter Pull Subscriptions + var pullSubs []*subscription.Model + for _, sub := range allSubs { + if !sub.IsPush() { + pullSubs = append(pullSubs, sub) + } + } + mgr := &Manager{ consumers: make(map[int]*Consumer), subscriptionCore: subscriptionCore, + subscriberCore: subscriberCore, + registry: r, bs: bs, replicas: replicas, ordinalID: ordinalID, mutex: &sync.Mutex{}, ctx: ctx, + subCache: pullSubs, + subWatchData: make(chan *struct{}), } - allSubs, err := subscriptionCore.List(ctx, subscription.Prefix) + return mgr, nil +} + +// Run instantiates the listeners for a lifecycle manager +func (m *Manager) Run() error { + + // Start consumers + m.refreshConsumers() + + var subWatcher registry.IWatcher + + // TODO: Implement watch on subscripitons to update active consumers based on watch updates. 
+ + swh := registry.WatchConfig{ + WatchType: "keyprefix", + WatchPath: common.GetBasePrefix() + subscription.Prefix, + Handler: func(ctx context.Context, pairs []registry.Pair) { + logger.Ctx(ctx).Infow("subscriptions watch handler data", "pairs", pairs) + m.subWatchData <- &struct{}{} + }, + } + + logger.Ctx(m.ctx).Infof("setting watch on subscriptions") + subWatcher, err := m.registry.Watch(m.ctx, &swh) if err != nil { - logger.Ctx(ctx).Errorw("error fetching new subscription list", "error", err) - return nil, err + return err } - // Filter Pull Subscriptions - var pullSubs []*subscription.Model - for _, sub := range allSubs { - if !sub.IsPush() { - pullSubs = append(pullSubs, sub) + leadgrp, gctx := errgroup.WithContext(m.ctx) + + // watch the Subscriptions path for new subscriptions and rebalance + leadgrp.Go(func() error { + watchErr := subWatcher.StartWatch() + close(m.subWatchData) + return watchErr + }) + + // wait for done channel to be closed and stop watches if received done + leadgrp.Go(func() error { + <-gctx.Done() + + if subWatcher != nil { + subWatcher.StopWatch() + } + + logger.Ctx(gctx).Info("scheduler group context done") + return gctx.Err() + }) + for { + select { + case <-m.ctx.Done(): + for h := range m.consumers { + delete(m.consumers, h) + } + return nil + case val := <-m.subWatchData: + if val == nil { + continue + } + allSubs, serr := m.subscriptionCore.List(gctx, subscription.Prefix) + if serr != nil { + logger.Ctx(gctx).Errorw("error fetching new subscription list", "error", serr) + return err + } + // Filter Push Subscriptions + var newSubs []*subscription.Model + for _, sub := range allSubs { + if !sub.IsPush() { + newSubs = append(newSubs, sub) + } + } + + m.subCache = newSubs + m.refreshConsumers() + case cleanupMessage := <-m.cleanupCh: + logger.Ctx(m.ctx).Infow("lifecyclemanager: got request to cleanup subscriber", "cleanupMessage", cleanupMessage) + m.CloseConsumer(m.ctx, cleanupMessage.computedHash) } } +} + +func (m *Manager) 
refreshConsumers() { - for _, sub := range pullSubs { - subPartitions, err := subscriptionCore.FetchPartitionsForHash(ctx, sub, ordinalID) + logger.Ctx(m.ctx).Infow("lifecyclemanager: Consumer refresh intiated") + existingConsumers := make(map[int]SubPartition) + consumersToAdd := make(map[int]SubPartition) + + for _, sub := range m.subCache { + subPart, err := m.subscriptionCore.FetchPartitionsForHash(m.ctx, sub, m.ordinalID) if err != nil { - logger.Ctx(ctx).Errorw("Error resolving partitions for subscription", "subscription", sub.Name) + logger.Ctx(m.ctx).Errorw("lifecyclemanager: Error resolving partitions for subscription", "subscription", sub.Name) } - for _, partition := range subPartitions { - subscriberID := uuid.New().String() - - var ( - // init these channels and pass to subscriber - // the lifecycle of these channels should be maintain by the user - subscriberRequestCh = make(chan *subscriber.PullRequest) - subscriberAckCh = make(chan *subscriber.AckMessage) - subscriberModAckCh = make(chan *subscriber.ModAckMessage) - ) - computedHash := subscriptionCore.FetchSubscriptionHash(ctx, sub.Name, partition) - subscriber, err := subscriberCore.NewOpinionatedSubscriber(ctx, subscriberID, sub, partition, computedHash, 100, 50, 5000, - subscriberRequestCh, subscriberAckCh, subscriberModAckCh) - if err != nil { - logger.Ctx(ctx).Errorw("lifecyclemanager: failed to create subscriber for subscription-partition", "subscription", sub.Name, "partition", partition) - // Proceed without failing since this requires other subscribers to be setup + for _, partition := range subPart { + computedHash := m.subscriptionCore.FetchSubscriptionHash(m.ctx, sub.Name, partition) + if _, ok := m.consumers[computedHash]; ok { + existingConsumers[computedHash] = SubPartition{ + subscription: sub, + partition: partition, + } } else { - mgr.consumers[computedHash] = &Consumer{ - ctx: ctx, - computedHash: computedHash, - subscriberID: subscriberID, - subscription: sub, - subscriberCore: 
subscriberCore, - subscriptionSubscriber: subscriber, + consumersToAdd[computedHash] = SubPartition{ + subscription: sub, + partition: partition, } } } - } - // TODO: Implement watch on subscripitons to update active consumers based on watch updates. + logger.Ctx(m.ctx).Infow("lifecyclemanager: Consumer update status", "existing", len(existingConsumers), "new", len(consumersToAdd)) + // Gracefully shutdown reassigned consumers + for h := range m.consumers { + if _, ok := m.consumers[h]; !ok { + m.cleanupCh <- cleanupMessage{ + computedHash: h, + } + } - // swh := registry.WatchConfig{ - // WatchType: "keyprefix", - // WatchPath: common.GetBasePrefix() + subscription.Prefix, - // Handler: func(ctx context.Context, pairs []registry.Pair) { - // logger.Ctx(ctx).Infow("subscriptions watch handler data", "pairs", pairs) - // sm.subWatchData <- &struct{}{} - // }, - // } + } + for hash, sp := range consumersToAdd { + consumer, err := m.CreateConsumer(m.ctx, sp.subscription, sp.partition, hash) + if err != nil { + // Proceed without failing since this requires other subscribers to be setup + logger.Ctx(m.ctx).Errorw("lifecyclemanager: failed to create consumer for subscription-partition assignment", "subscription", sp.subscription.Name, "partition", sp.partition) + } else { + logger.Ctx(m.ctx).Infow("lifecyclemanager: Successfully added consumer", "consumerHash", hash, "subscription", sp.subscription.Name, "partition", sp.partition) + m.consumers[hash] = consumer + } - // logger.Ctx(ctx).Infof("setting watch on subscriptions") - // subWatcher, err = sm.registry.Watch(ctx, &swh) - // if err != nil { - // } + } - return mgr, nil } -// Run instantiates the listeners for a lifecycle manager -func (m *Manager) Run() { +func (m *Manager) CreateConsumer(ctx context.Context, sub *subscription.Model, partition int, hash int) (*Consumer, error) { + subscriberID := uuid.New().String() - for { - select { - case <-m.ctx.Done(): - for _, con := range m.consumers { - con.stop() - } - 
return - case cleanupMessage := <-m.cleanupCh: - logger.Ctx(m.ctx).Infow("manager: got request to cleanup subscriber", "cleanupMessage", cleanupMessage) - m.mutex.Lock() - //Implement cleanup here - m.mutex.Unlock() - } + subscriberRequestCh := make(chan *subscriber.PullRequest) + subscriberAckCh := make(chan *subscriber.AckMessage) + subscriberModAckCh := make(chan *subscriber.ModAckMessage) + + subscriber, err := m.subscriberCore.NewOpinionatedSubscriber(ctx, subscriberID, sub, partition, hash, 100, 50, 5000, + subscriberRequestCh, subscriberAckCh, subscriberModAckCh) + if err != nil { + logger.Ctx(ctx).Errorw("lifecyclemanager: failed to create subscriber for subscription-partition", "subscription", sub.Name, "partition", partition) + return nil, err } + return &Consumer{ + ctx: m.ctx, + computedHash: hash, + subscriberID: subscriberID, + subscription: sub, + subscriberCore: m.subscriberCore, + subscriptionSubscriber: subscriber, + }, nil + } // GetConsumer fetches the relevant consumer from the memory map. @@ -156,6 +251,10 @@ func (m *Manager) CloseConsumer(ctx context.Context, computedhash int) error { // cleanupMessage ... 
type cleanupMessage struct { - subscriberID string - subscription string + computedHash int +} + +type SubPartition struct { + subscription *subscription.Model + partition int } diff --git a/internal/migration/migration.go b/internal/migration/migration.go new file mode 100644 index 00000000..e69de29b diff --git a/metro-proto b/metro-proto index 337b7974..b627bce0 160000 --- a/metro-proto +++ b/metro-proto @@ -1 +1 @@ -Subproject commit 337b7974cb5ff0e561ff3c22f15dc4ca755cd592 +Subproject commit b627bce018dc6962b977423d6ddd382284b287b4 diff --git a/service/consume-plane/service.go b/service/consume-plane/service.go index 1fbeec3c..bddca0dc 100644 --- a/service/consume-plane/service.go +++ b/service/consume-plane/service.go @@ -67,13 +67,17 @@ func (svc *Service) Start(ctx context.Context) error { subscriberCore := subscriber.NewCore(brokerStore, subscriptionCore, offset.NewCore(offset.NewRepo(r))) - mgr, err := consumer.NewLifecycleManager(ctx, svc.consumeConfig.ReplicaCount, svc.consumeConfig.OrdinalID, subscriptionCore, subscriberCore, brokerStore) + mgr, err := consumer.NewLifecycleManager(ctx, svc.consumeConfig.ReplicaCount, svc.consumeConfig.OrdinalID, subscriptionCore, subscriberCore, brokerStore, r) if err != nil { logger.Ctx(ctx).Errorw("consumeplaneserver: error setting up lifecycle manager", "error", err.Error()) } // initiates a error group grp, gctx := errgroup.WithContext(ctx) + grp.Go(func() error { + mgr.Run() + return nil + }) grp.Go(func() error { err := server.RunGRPCServer( gctx, From b3d888a75c3d7facb3519903a9fad67d8bdb0381 Mon Sep 17 00:00:00 2001 From: vnktram Date: Wed, 22 Dec 2021 16:59:54 +0530 Subject: [PATCH 06/14] Remove unused file --- internal/migration/migration.go | 0 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 internal/migration/migration.go diff --git a/internal/migration/migration.go b/internal/migration/migration.go deleted file mode 100644 index e69de29b..00000000 From 
82697fe74eccbfa8a43709ba85cde82cb9d1341f Mon Sep 17 00:00:00 2001 From: vnktram Date: Wed, 22 Dec 2021 17:04:20 +0530 Subject: [PATCH 07/14] Fix lint issues --- internal/consumer/lifecycle.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/internal/consumer/lifecycle.go b/internal/consumer/lifecycle.go index ca5b6515..6cd0839d 100644 --- a/internal/consumer/lifecycle.go +++ b/internal/consumer/lifecycle.go @@ -154,8 +154,8 @@ func (m *Manager) Run() error { func (m *Manager) refreshConsumers() { logger.Ctx(m.ctx).Infow("lifecyclemanager: Consumer refresh intiated") - existingConsumers := make(map[int]SubPartition) - consumersToAdd := make(map[int]SubPartition) + existingConsumers := make(map[int]subPartition) + consumersToAdd := make(map[int]subPartition) for _, sub := range m.subCache { subPart, err := m.subscriptionCore.FetchPartitionsForHash(m.ctx, sub, m.ordinalID) @@ -165,12 +165,12 @@ func (m *Manager) refreshConsumers() { for _, partition := range subPart { computedHash := m.subscriptionCore.FetchSubscriptionHash(m.ctx, sub.Name, partition) if _, ok := m.consumers[computedHash]; ok { - existingConsumers[computedHash] = SubPartition{ + existingConsumers[computedHash] = subPartition{ subscription: sub, partition: partition, } } else { - consumersToAdd[computedHash] = SubPartition{ + consumersToAdd[computedHash] = subPartition{ subscription: sub, partition: partition, } @@ -202,6 +202,7 @@ func (m *Manager) refreshConsumers() { } +// CreateConsumer sets up the underlying subscriber and returns the consumer object func (m *Manager) CreateConsumer(ctx context.Context, sub *subscription.Model, partition int, hash int) (*Consumer, error) { subscriberID := uuid.New().String() @@ -254,7 +255,7 @@ type cleanupMessage struct { computedHash int } -type SubPartition struct { +type subPartition struct { subscription *subscription.Model partition int } From 9e47ce2a730e30faf62fcb8d14adde5e1d7d90b8 Mon Sep 17 00:00:00 2001 From: vnktram Date: 
Wed, 22 Dec 2021 17:21:45 +0530 Subject: [PATCH 08/14] Fix unit tests delayconsumer.go --- internal/subscriber/retry/delayconsumer_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/internal/subscriber/retry/delayconsumer_test.go b/internal/subscriber/retry/delayconsumer_test.go index b54e5052..5ac29578 100644 --- a/internal/subscriber/retry/delayconsumer_test.go +++ b/internal/subscriber/retry/delayconsumer_test.go @@ -1,3 +1,4 @@ +//go:build unit // +build unit package retry @@ -55,7 +56,7 @@ func TestDelayConsumer_Run(t *testing.T) { mockConsumer.EXPECT().Resume(gomock.AssignableToTypeOf(ctx), gomock.Any()).Return(nil).AnyTimes() // initialize delay consumer - dc, err := NewDelayConsumer(ctx, subscriberID, "t1", subs, mockBrokerStore, mockHandler) + dc, err := NewDelayConsumer(ctx, subscriberID, "t1", 0, subs, mockBrokerStore, mockHandler) assert.NotNil(t, dc) assert.Nil(t, err) assert.NotNil(t, dc.LogFields()) From cdedd54fdf79150290dd25acced88d73b39653c9 Mon Sep 17 00:00:00 2001 From: vnktram Date: Wed, 22 Dec 2021 17:52:29 +0530 Subject: [PATCH 09/14] Fix unit tests --- internal/hash/hash_test.go | 36 +++++++++++++++++++++++++++ internal/subscription/core_test.go | 7 +++--- internal/subscription/model_test.go | 2 +- pkg/messagebroker/kafka_test.go | 8 +++++- pkg/messagebroker/validations_test.go | 17 ++++++++++--- 5 files changed, 62 insertions(+), 8 deletions(-) create mode 100644 internal/hash/hash_test.go diff --git a/internal/hash/hash_test.go b/internal/hash/hash_test.go new file mode 100644 index 00000000..e5da0c37 --- /dev/null +++ b/internal/hash/hash_test.go @@ -0,0 +1,36 @@ +package hash + +import "testing" + +func TestComputeHash(t *testing.T) { + type args struct { + arr []byte + } + tests := []struct { + name string + args args + want int + }{ + { + name: "test string 1", + args: args{ + arr: []byte("abcdef-123"), + }, + want: 2675134171, + }, + { + name: "test string 2", + args: args{ + arr: []byte("lorem ipsum dolor sit 
amet"), + }, + want: 2828033737, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := ComputeHash(tt.args.arr); got != tt.want { + t.Errorf("ComputeHash() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/internal/subscription/core_test.go b/internal/subscription/core_test.go index 574509de..c12fed5d 100644 --- a/internal/subscription/core_test.go +++ b/internal/subscription/core_test.go @@ -1,3 +1,4 @@ +//go:build unit // +build unit package subscription @@ -20,7 +21,7 @@ func TestSubscriptionCore_UpdateSubscription(t *testing.T) { mockTopicCore := tCore.NewMockICore(ctrl) mockRepo := repo.NewMockIRepo(ctrl) - core := NewCore(mockRepo, mockProjectCore, mockTopicCore) + core := NewCore(1, mockRepo, mockProjectCore, mockTopicCore) sub := Model{ Name: "projects/project123/subscriptions/subscription123", Topic: "projects/project123/subscriptions/subscription123", @@ -45,7 +46,7 @@ func TestSubscriptionCore_UpdateSubscriptionNotExists(t *testing.T) { mockTopicCore := tCore.NewMockICore(ctrl) mockRepo := repo.NewMockIRepo(ctrl) - core := NewCore(mockRepo, mockProjectCore, mockTopicCore) + core := NewCore(1, mockRepo, mockProjectCore, mockTopicCore) sub := Model{ Name: "projects/project123/subscriptions/subscription123", Topic: "projects/project123/subscriptions/subscription123", @@ -70,7 +71,7 @@ func TestSubscriptionCore_UpdateSubscriptionProjectNotExists(t *testing.T) { mockTopicCore := tCore.NewMockICore(ctrl) mockRepo := repo.NewMockIRepo(ctrl) - core := NewCore(mockRepo, mockProjectCore, mockTopicCore) + core := NewCore(1, mockRepo, mockProjectCore, mockTopicCore) sub := Model{ Name: "projects/project123/subscriptions/subscription123", Topic: "projects/project123/subscriptions/subscription123", diff --git a/internal/subscription/model_test.go b/internal/subscription/model_test.go index ce42e026..9c756f4a 100644 --- a/internal/subscription/model_test.go +++ b/internal/subscription/model_test.go @@ -79,6 +79,6 @@ func 
Test_Model(t *testing.T) { assert.Equal(t, 8, len(dSubscription.GetDelayTopicsMap())) assert.Equal(t, []string{"projects/test-project/topics/test-subscription.delay.5.seconds", "projects/test-project/topics/test-subscription.delay.30.seconds", "projects/test-project/topics/test-subscription.delay.60.seconds", "projects/test-project/topics/test-subscription.delay.150.seconds", "projects/test-project/topics/test-subscription.delay.300.seconds", "projects/test-project/topics/test-subscription.delay.600.seconds", "projects/test-project/topics/test-subscription.delay.1800.seconds", "projects/test-project/topics/test-subscription.delay.3600.seconds"}, delayTopics) - assert.Equal(t, "test-subscription.delay.5.seconds-cg", dSubscription.GetDelayConsumerGroupID("test-subscription.delay.5.seconds")) + assert.Equal(t, "test-subscription.delay.5.seconds-0-cg", dSubscription.GetDelayConsumerGroupID("test-subscription.delay.5.seconds", 0)) assert.Equal(t, "test-subscription.delay.5.seconds-subscriberID", dSubscription.GetDelayConsumerGroupInstanceID("subscriberID", "test-subscription.delay.5.seconds")) } diff --git a/pkg/messagebroker/kafka_test.go b/pkg/messagebroker/kafka_test.go index 33a22542..740414b1 100644 --- a/pkg/messagebroker/kafka_test.go +++ b/pkg/messagebroker/kafka_test.go @@ -1,3 +1,4 @@ +//go:build unit // +build unit package messagebroker @@ -60,7 +61,12 @@ func getValidBrokerConfig() *BrokerConfig { func getValidConsumerClientOptions() *ConsumerClientOptions { return &ConsumerClientOptions{ - Topics: []string{"t1"}, + Topics: []TopicPartition{ + { + Topic: "t1", + Partition: 0, + }, + }, Subscription: "s1", GroupID: "sg1", GroupInstanceID: "sg_i1", diff --git a/pkg/messagebroker/validations_test.go b/pkg/messagebroker/validations_test.go index a30aef6e..29bc6370 100644 --- a/pkg/messagebroker/validations_test.go +++ b/pkg/messagebroker/validations_test.go @@ -1,3 +1,4 @@ +//go:build unit // +build unit package messagebroker @@ -10,7 +11,12 @@ import ( func 
Test_validateKafkaConsumerClientConfig(t *testing.T) { op1 := ConsumerClientOptions{ - Topics: []string{"t1"}, + Topics: []TopicPartition{ + { + Topic: "t1", + Partition: 0, + }, + }, GroupID: "g1", } @@ -18,10 +24,15 @@ func Test_validateKafkaConsumerClientConfig(t *testing.T) { ops := []ConsumerClientOptions{ { - Topics: []string{"t1"}, + Topics: []TopicPartition{ + { + Topic: "t1", + Partition: 0, + }, + }, GroupID: "", }, { - Topics: []string{}, + Topics: []TopicPartition{}, GroupID: "g3", }, } From bf629b9c32fb083ab3cdffa339d981433b121f46 Mon Sep 17 00:00:00 2001 From: vnktram Date: Thu, 23 Dec 2021 08:39:13 +0530 Subject: [PATCH 10/14] Fix integration tests --- .../messagebroker/messagebroker_kafka_test.go | 21 ++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/tests/integration/messagebroker/messagebroker_kafka_test.go b/tests/integration/messagebroker/messagebroker_kafka_test.go index 2c172eab..f1c51150 100644 --- a/tests/integration/messagebroker/messagebroker_kafka_test.go +++ b/tests/integration/messagebroker/messagebroker_kafka_test.go @@ -33,7 +33,12 @@ func Test_CreateValidTopic(t *testing.T) { // create consumer to fetch topic metadata consumer1, err := messagebroker.NewConsumerClient(context.Background(), "kafka", getKafkaBrokerConfig(), &messagebroker.ConsumerClientOptions{ - Topics: []string{topic}, + Topics: []TopicPartition{ + { + Topic: topic, + Partition: partition, + }, + }, GroupID: "dummy-group-2", }) metadata, merr := consumer1.GetTopicPartitionMetadata(context.Background(), messagebroker.GetTopicPartitionMetadataRequest{Topic: topic}) @@ -150,7 +155,12 @@ func Test_ProduceAndConsumeMessagesInDetail(t *testing.T) { // now consume the messages and assert the message ids generated in the previous step consumer1, err := messagebroker.NewConsumerClient(context.Background(), "kafka", getKafkaBrokerConfig(), &messagebroker.ConsumerClientOptions{ - Topics: []string{topic}, + Topics: []TopicPartition{ + { + Topic: 
topic, + Partition: 0, + } + }, GroupID: "dummy-group-1", }) @@ -175,7 +185,12 @@ func Test_ProduceAndConsumeMessagesInDetail(t *testing.T) { // spwan a new consumer and try to re-receive after commit and make sure no new messages are available consumer3, err := messagebroker.NewConsumerClient(context.Background(), "kafka", getKafkaBrokerConfig(), &messagebroker.ConsumerClientOptions{ - Topics: []string{topic}, + Topics: []TopicPartition{ + { + Topic:topic, + Partition: 0, + }, + }, GroupID: "dummy-group-1", }) From 9f51228670ae33fe64bffbacd5b22f287f6260ca Mon Sep 17 00:00:00 2001 From: vnktram Date: Thu, 23 Dec 2021 09:55:07 +0530 Subject: [PATCH 11/14] Fix integration test --- .../messagebroker/messagebroker_kafka_test.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/tests/integration/messagebroker/messagebroker_kafka_test.go b/tests/integration/messagebroker/messagebroker_kafka_test.go index f1c51150..093038ec 100644 --- a/tests/integration/messagebroker/messagebroker_kafka_test.go +++ b/tests/integration/messagebroker/messagebroker_kafka_test.go @@ -33,10 +33,10 @@ func Test_CreateValidTopic(t *testing.T) { // create consumer to fetch topic metadata consumer1, err := messagebroker.NewConsumerClient(context.Background(), "kafka", getKafkaBrokerConfig(), &messagebroker.ConsumerClientOptions{ - Topics: []TopicPartition{ + Topics: []messagebroker.TopicPartition{ { - Topic: topic, - Partition: partition, + Topic: topic, + Partition: 0, }, }, GroupID: "dummy-group-2", @@ -155,11 +155,11 @@ func Test_ProduceAndConsumeMessagesInDetail(t *testing.T) { // now consume the messages and assert the message ids generated in the previous step consumer1, err := messagebroker.NewConsumerClient(context.Background(), "kafka", getKafkaBrokerConfig(), &messagebroker.ConsumerClientOptions{ - Topics: []TopicPartition{ - { - Topic: topic, + Topics: []messagebroker.TopicPartition{ + { + Topic: topic, Partition: 0, - } + }, }, GroupID: 
"dummy-group-1", }) @@ -185,9 +185,9 @@ func Test_ProduceAndConsumeMessagesInDetail(t *testing.T) { // spwan a new consumer and try to re-receive after commit and make sure no new messages are available consumer3, err := messagebroker.NewConsumerClient(context.Background(), "kafka", getKafkaBrokerConfig(), &messagebroker.ConsumerClientOptions{ - Topics: []TopicPartition{ + Topics: []messagebroker.TopicPartition{ { - Topic:topic, + Topic: topic, Partition: 0, }, }, From 36ad18640bd52b0e1b68d259b212951e22132e2f Mon Sep 17 00:00:00 2001 From: vnktram Date: Tue, 28 Dec 2021 13:21:02 +0530 Subject: [PATCH 12/14] Fix Assign vs Subscribe bug --- internal/consumer/lifecycle.go | 5 ++-- pkg/messagebroker/kafka.go | 45 +++++++++++++++++++++++++--------- 2 files changed, 36 insertions(+), 14 deletions(-) diff --git a/internal/consumer/lifecycle.go b/internal/consumer/lifecycle.go index 6cd0839d..7266fc95 100644 --- a/internal/consumer/lifecycle.go +++ b/internal/consumer/lifecycle.go @@ -210,8 +210,9 @@ func (m *Manager) CreateConsumer(ctx context.Context, sub *subscription.Model, p subscriberAckCh := make(chan *subscriber.AckMessage) subscriberModAckCh := make(chan *subscriber.ModAckMessage) - subscriber, err := m.subscriberCore.NewOpinionatedSubscriber(ctx, subscriberID, sub, partition, hash, 100, 50, 5000, - subscriberRequestCh, subscriberAckCh, subscriberModAckCh) + subscriber, err := m.subscriberCore.NewSubscriber(ctx, subscriberID, sub, 100, 50, 5000, subscriberRequestCh, subscriberAckCh, subscriberModAckCh) + // subscriber, err := m.subscriberCore.NewOpinionatedSubscriber(ctx, subscriberID, sub, partition, hash, 100, 50, 5000, + // subscriberRequestCh, subscriberAckCh, subscriberModAckCh) if err != nil { logger.Ctx(ctx).Errorw("lifecyclemanager: failed to create subscriber for subscription-partition", "subscription", sub.Name, "partition", partition) return nil, err diff --git a/pkg/messagebroker/kafka.go b/pkg/messagebroker/kafka.go index 6ae483a5..b1b00b03 100644 --- 
a/pkg/messagebroker/kafka.go +++ b/pkg/messagebroker/kafka.go @@ -39,14 +39,24 @@ type KafkaBroker struct { // newKafkaConsumerClient returns a kafka consumer func newKafkaConsumerClient(ctx context.Context, bConfig *BrokerConfig, options *ConsumerClientOptions) (Consumer, error) { - normalizedTopics := make([]kafkapkg.TopicPartition, 0) - for i := range options.Topics { - brokerTopicName := normalizeTopicName(options.Topics[i].Topic) - kfkTp := kafkapkg.TopicPartition{ - Topic: &brokerTopicName, - Partition: int32(options.Topics[i].Partition), - } - normalizedTopics = append(normalizedTopics, kfkTp) + //////////////////////////////////////////////////////////////////////////////////////////////////////////// + // ** This section is deferred until ingestion/subscription topics are in place. + // ** Explicit Partition Assignment not to be undertaken since consumer groups are no longer honored. + // + // normalizedTopics := make([]kafkapkg.TopicPartition, 0) + // for i := range options.Topics { + // brokerTopicName := normalizeTopicName(options.Topics[i].Topic) + // kfkTp := kafkapkg.TopicPartition{ + // Topic: &brokerTopicName, + // Partition: int32(options.Topics[i].Partition), + // } + // normalizedTopics = append(normalizedTopics, kfkTp) + // } + //////////////////////////////////////////////////////////////////////////////////////////////////////////// + + normalizedTopics := make([]string, 0) + for _, topic := range options.Topics { + normalizedTopics = append(normalizedTopics, normalizeTopicName(topic.Topic)) } err := validateKafkaConsumerBrokerConfig(bConfig) @@ -88,10 +98,21 @@ func newKafkaConsumerClient(ctx context.Context, bConfig *BrokerConfig, options return nil, err } - err = c.Assign(normalizedTopics) - if err != nil { - return nil, err - } + // With partition specific consumers in place at the consume plane this would still work given + // only one partition per topic is currently supported. 
+ // This behaviour is not guaranteed with multiple partitions per primary topic. + c.SubscribeTopics(normalizedTopics, nil) + + ////////////////////////////////////////////////////// + // *** Warning *** DO NOT explicitly assign topic partitions to consumers + // unless subscriptions have dedicated topics. This will result in consumers being shared across subscribers + // and messages being consumed erratically. + // + // err = c.Assign(normalizedTopics) + // if err != nil { + // return nil, err + // } + ////////////////////////////////////////////////////// logger.Ctx(ctx).Infow("kafka consumer: initialized", "options", options) From e5f1d1d6664b2e6969c8a211b80c943928a20e6e Mon Sep 17 00:00:00 2001 From: vnktram Date: Tue, 4 Jan 2022 11:38:21 +0530 Subject: [PATCH 13/14] Fix consumer restarts --- internal/consumer/lifecycle.go | 27 +++++++++++++++++++++++---- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/internal/consumer/lifecycle.go b/internal/consumer/lifecycle.go index 7266fc95..3107120f 100644 --- a/internal/consumer/lifecycle.go +++ b/internal/consumer/lifecycle.go @@ -29,7 +29,8 @@ type Manager struct { subscriptionCore subscription.ICore subscriberCore subscriber.ICore subCache []*subscription.Model - cleanupCh chan cleanupMessage + cleanupCh chan consumerIdentifier + recoveryCh chan consumerIdentifier registry registry.IRegistry replicas int ordinalID int @@ -62,6 +63,8 @@ func NewLifecycleManager(ctx context.Context, replicas int, ordinalID int, subsc subscriberCore: subscriberCore, registry: r, bs: bs, + cleanupCh: make(chan consumerIdentifier), + recoveryCh: make(chan consumerIdentifier), replicas: replicas, ordinalID: ordinalID, mutex: &sync.Mutex{}, @@ -144,6 +147,22 @@ func (m *Manager) Run() error { m.subCache = newSubs m.refreshConsumers() + case recoveryMessage := <-m.recoveryCh: + logger.Ctx(m.ctx).Infow("lifecyclemanager: consumer encountered irrecoverable error, attempting to recover", "recoveryMessage", recoveryMessage) + if 
existingCon, ok := m.consumers[recoveryMessage.computedHash]; ok { + + sub := existingCon.subscription + part := existingCon.subscription.Partition + + m.CloseConsumer(m.ctx, recoveryMessage.computedHash) + + con, err := m.CreateConsumer(m.ctx, sub, part, recoveryMessage.computedHash) + if err != nil { + logger.Ctx(m.ctx).Errorw("lifecyclemanager: failed to restore consumer", "subscription", sub.Name, "partition", part) + } + m.consumers[recoveryMessage.computedHash] = con + } + case cleanupMessage := <-m.cleanupCh: logger.Ctx(m.ctx).Infow("lifecyclemanager: got request to cleanup subscriber", "cleanupMessage", cleanupMessage) m.CloseConsumer(m.ctx, cleanupMessage.computedHash) @@ -182,7 +201,7 @@ func (m *Manager) refreshConsumers() { // Gracefully shutdown reassigned consumers for h := range m.consumers { if _, ok := m.consumers[h]; !ok { - m.cleanupCh <- cleanupMessage{ + m.cleanupCh <- consumerIdentifier{ computedHash: h, } } @@ -251,8 +270,8 @@ func (m *Manager) CloseConsumer(ctx context.Context, computedhash int) error { return nil } -// cleanupMessage ... -type cleanupMessage struct { +// consumerIdentifier ... 
+type consumerIdentifier struct { computedHash int } From 853402c842742d88cb898791069c54e334f7197a Mon Sep 17 00:00:00 2001 From: Venkat Ram Date: Wed, 12 Jan 2022 14:46:35 +0530 Subject: [PATCH 14/14] Consume/web integration (#293) * Integrate consume plane with web pods * Remove unused file * Fix lint issues * Proto commit --- config/default.toml | 11 + config/dev_docker.toml | 2 + config/func.toml | 2 + config/perf.toml | 2 + config/stage.toml | 2 + go.mod | 1 + internal/subscription/core.go | 13 +- service/consume-plane/consumeplaneserver.go | 3 +- service/web/config.go | 8 +- service/web/service.go | 9 +- service/web/stream/request.go | 16 ++ service/web/subscriberserver.go | 231 ++++++++++++++------ 12 files changed, 220 insertions(+), 80 deletions(-) diff --git a/config/default.toml b/config/default.toml index a896f1fa..47aa86d3 100644 --- a/config/default.toml +++ b/config/default.toml @@ -17,6 +17,8 @@ errorLevel = 1 [web] + replicaCount = 1 + consumePlaneDeployment = "metro.svc.cluster.local" [web.broker] variant = "kafka" [web.broker.brokerconfig] @@ -28,6 +30,15 @@ GrpcServerAddress = "0.0.0.0:8081" HttpServerAddress = "0.0.0.0:8082" InternalHttpServerAddress = "0.0.0.0:9000" + [web.httpclientconfig] + connectTimeoutMs = 2000 + connKeepAliveMs = 0 + expectContinueTimeoutMs = 0 + idleConnTimeoutMs = 60000 + maxAllIdleConns = 1000 + maxHostIdleConns = 1000 + responseHeaderTimeoutMs = 25000 + tlsHandshakeTimeoutMs = 2000 [consumePlane] diff --git a/config/dev_docker.toml b/config/dev_docker.toml index c3ba36c5..41461b33 100644 --- a/config/dev_docker.toml +++ b/config/dev_docker.toml @@ -17,6 +17,8 @@ errorLevel = 1 [web] + replicaCount = 1 + consumePlaneDeployment = "metro.svc.cluster.local" [web.broker] variant = "kafka" [web.broker.brokerconfig] diff --git a/config/func.toml b/config/func.toml index a1d2563b..2ea22f4b 100644 --- a/config/func.toml +++ b/config/func.toml @@ -17,6 +17,8 @@ errorLevel = 1 [web] + replicaCount = 1 + consumePlaneDeployment = 
"int.perf.razorpay.in" [web.broker] variant = "kafka" [web.broker.brokerconfig] diff --git a/config/perf.toml b/config/perf.toml index d83fb95c..23d13fc0 100644 --- a/config/perf.toml +++ b/config/perf.toml @@ -17,6 +17,8 @@ errorLevel = 1 [web] + replicaCount = 1 + consumePlaneDeployment = "int.perf.razorpay.in" [web.broker] variant = "kafka" [web.broker.brokerconfig] diff --git a/config/stage.toml b/config/stage.toml index 043de590..c452935c 100644 --- a/config/stage.toml +++ b/config/stage.toml @@ -17,6 +17,8 @@ errorLevel = 1 [web] + replicaCount = 1 + consumePlaneDeployment = "int.stage.razorpay.in" [web.broker] variant = "kafka" [web.broker.brokerconfig] diff --git a/go.mod b/go.mod index 874f5d32..30055de7 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,7 @@ require ( github.com/dvsekhvalnov/jose2go v0.0.0-20201001154944-b09cfaf05951 // indirect github.com/fatih/color v1.10.0 // indirect github.com/getsentry/sentry-go v0.11.0 + github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/mock v1.6.0 github.com/golang/protobuf v1.5.2 diff --git a/internal/subscription/core.go b/internal/subscription/core.go index 25c8e04e..9780c54a 100644 --- a/internal/subscription/core.go +++ b/internal/subscription/core.go @@ -29,7 +29,6 @@ type ICore interface { Migrate(ctx context.Context, names []string) error FetchPartitionsForHash(ctx context.Context, m *Model, node int) ([]int, error) FetchSubscriptionHash(ctx context.Context, subName string, partition int) int - GetPartition(ctx context.Context, m *Model) int } const ( @@ -348,6 +347,13 @@ func (c *Core) FetchPartitionsForHash(ctx context.Context, sub *Model, node int) return matchingPartitions, err } + if c.TotalNodes == 1 { + partitions := make([]int, 0, topicModel.NumPartitions) + for i := 0; i < topicModel.NumPartitions; i++ 
{ computedHash := c.FetchSubscriptionHash(ctx, sub.Name, i) partitionNode := computedHash % c.TotalNodes @@ -364,11 +370,6 @@ func (c *Core) FetchSubscriptionHash(ctx context.Context, sub string, partition return hash.ComputeHash([]byte(sub + strconv.Itoa(partition))) } -// GetPartition ... -func (c *Core) GetPartition(ctx context.Context, m *Model) int { - return m.Partition -} - // Migrate takes care of backfilling subscription topics for existing subscriptions. // This is an idempotent operation that creates topics for each subscription. // Migrate can be modified in the future for other use-cases as well. diff --git a/service/consume-plane/consumeplaneserver.go b/service/consume-plane/consumeplaneserver.go index 1f44026e..882ac7fb 100644 --- a/service/consume-plane/consumeplaneserver.go +++ b/service/consume-plane/consumeplaneserver.go @@ -80,11 +80,12 @@ func (c consumeplaneserver) Fetch(ctx context.Context, req *metrov1.FetchRequest defer span.Finish() consumer, err := c.manager.GetConsumer(ctx, parsedReq.Subscription, parsedReq.Partition) if err != nil { - logger.Ctx(ctx).Errorw("consumeplaneserver: error in fetching consumer for fetch request", "request", req, "error", err.Error()) + logger.Ctx(ctx).Errorw("consumeplaneserver: error in fetching consumer for fetch request", "subscription", parsedReq.Subscription) return &metrov1.PullResponse{}, err } res, err := consumer.Fetch(ctx, int(req.MaxMessages)) if err != nil { + logger.Ctx(ctx).Errorw("consumeplaneserver: Failed to fetch messages", "err", err.Error()) return &metrov1.PullResponse{}, err } return res, nil diff --git a/service/web/config.go b/service/web/config.go index 89a4c28c..a65d5f4b 100644 --- a/service/web/config.go +++ b/service/web/config.go @@ -1,13 +1,17 @@ package web import ( + "github.com/razorpay/metro/pkg/httpclient" "github.com/razorpay/metro/pkg/messagebroker" ) // Config for producer type Config struct { - Broker Broker - Interfaces struct { + Broker Broker + ReplicaCount int + 
ConsumePlaneAddress string + HTTPClientConfig httpclient.Config + Interfaces struct { API NetworkInterfaces } } diff --git a/service/web/service.go b/service/web/service.go index 6205f059..789fc8c4 100644 --- a/service/web/service.go +++ b/service/web/service.go @@ -11,7 +11,6 @@ import ( "github.com/razorpay/metro/internal/credentials" "github.com/razorpay/metro/internal/health" "github.com/razorpay/metro/internal/interceptors" - "github.com/razorpay/metro/internal/offset" "github.com/razorpay/metro/internal/project" "github.com/razorpay/metro/internal/publisher" "github.com/razorpay/metro/internal/server" @@ -21,7 +20,6 @@ import ( "github.com/razorpay/metro/pkg/messagebroker" "github.com/razorpay/metro/pkg/registry" metrov1 "github.com/razorpay/metro/rpc/proto/v1" - "github.com/razorpay/metro/service/web/stream" _ "github.com/razorpay/metro/statik" // to serve openAPI static assets ) @@ -77,9 +75,8 @@ func (svc *Service) Start(ctx context.Context) error { publisherCore := publisher.NewCore(brokerStore) - offsetCore := offset.NewCore(offset.NewRepo(r)) - - streamManager := stream.NewStreamManager(ctx, subscriptionCore, offsetCore, brokerStore, svc.webConfig.Interfaces.API.GrpcServerAddress) + replicaCount := svc.webConfig.ReplicaCount + consumePlaneDeployment := svc.webConfig.ConsumePlaneAddress // initiates a error group grp, gctx := errgroup.WithContext(ctx) @@ -92,7 +89,7 @@ func (svc *Service) Start(ctx context.Context) error { metrov1.RegisterStatusCheckAPIServer(server, health.NewServer(healthCore)) metrov1.RegisterPublisherServer(server, newPublisherServer(projectCore, brokerStore, topicCore, credentialsCore, publisherCore)) metrov1.RegisterAdminServiceServer(server, newAdminServer(svc.admin, projectCore, subscriptionCore, topicCore, credentialsCore, brokerStore)) - metrov1.RegisterSubscriberServer(server, newSubscriberServer(projectCore, brokerStore, subscriptionCore, credentialsCore, streamManager)) + metrov1.RegisterSubscriberServer(server, 
newSubscriberServer(projectCore, brokerStore, subscriptionCore, topicCore, credentialsCore, replicaCount, consumePlaneDeployment, &svc.webConfig.HTTPClientConfig)) return nil }, getInterceptors()..., diff --git a/service/web/stream/request.go b/service/web/stream/request.go index 6bfed2d4..137013c5 100644 --- a/service/web/stream/request.go +++ b/service/web/stream/request.go @@ -16,6 +16,12 @@ type ParsedStreamingPullRequest struct { ModifyDeadlineMsgIdsWithSecs map[string]int32 } +// ParsedPullRequest ... +type ParsedPullRequest struct { + Subscription string + MaxMessages int +} + // HasSubscription ... func (r *ParsedStreamingPullRequest) HasSubscription() bool { return len(strings.Trim(r.Subscription, " ")) > 0 @@ -57,6 +63,16 @@ func NewParsedStreamingPullRequest(req *metrov1.StreamingPullRequest) (*ParsedSt return parsedReq, nil } +// NewParsedPullRequest ... +func NewParsedPullRequest(req *metrov1.PullRequest) (*ParsedPullRequest, error) { + parsedReq := &ParsedPullRequest{ + Subscription: req.Subscription, + MaxMessages: int(req.MaxMessages), + } + + return parsedReq, nil +} + // NewParsedAcknowledgeRequest ... 
func NewParsedAcknowledgeRequest(req *metrov1.AcknowledgeRequest) (*ParsedStreamingPullRequest, error) { parsedReq := &ParsedStreamingPullRequest{} diff --git a/service/web/subscriberserver.go b/service/web/subscriberserver.go index 5af744a8..3191d5b8 100644 --- a/service/web/subscriberserver.go +++ b/service/web/subscriberserver.go @@ -1,8 +1,12 @@ package web import ( + "bytes" "context" + "fmt" + "net/http" + "github.com/gogo/protobuf/jsonpb" "github.com/imdario/mergo" "github.com/mennanov/fmutils" "github.com/opentracing/opentracing-go" @@ -11,23 +15,63 @@ import ( "github.com/razorpay/metro/internal/merror" "github.com/razorpay/metro/internal/project" "github.com/razorpay/metro/internal/subscription" + "github.com/razorpay/metro/internal/topic" + "github.com/razorpay/metro/pkg/httpclient" "github.com/razorpay/metro/pkg/logger" metrov1 "github.com/razorpay/metro/rpc/proto/v1" "github.com/razorpay/metro/service/web/stream" - "golang.org/x/sync/errgroup" "google.golang.org/protobuf/types/known/emptypb" ) +const ( + consumePlaneAddressFormat string = "https://%s-%d.%s" + consumePlaneDeployment string = "metro-consume-plane" + acknowledgeRequestFormat string = "%s/v1/%s:acknowledge" + fetchRequestFormat string = "%s/v1/%s:fetch" + modAckRequestFormat string = "%s/v1/%s:modifyAckDeadline" +) + type subscriberserver struct { projectCore project.ICore brokerStore brokerstore.IBrokerStore subscriptionCore subscription.ICore + topicCore topic.ICore credentialCore credentials.ICore - psm stream.IManager + replicaCount int + consumeNodes map[int]string + marshaler jsonpb.Marshaler + unmarshaler jsonpb.Unmarshaler + httpClient *http.Client } -func newSubscriberServer(projectCore project.ICore, brokerStore brokerstore.IBrokerStore, subscriptionCore subscription.ICore, credentialCore credentials.ICore, psm stream.IManager) *subscriberserver { - return &subscriberserver{projectCore, brokerStore, subscriptionCore, credentialCore, psm} +func newSubscriberServer( + projectCore 
project.ICore,
+	brokerStore brokerstore.IBrokerStore,
+	subscriptionCore subscription.ICore,
+	topicCore topic.ICore,
+	credentialCore credentials.ICore,
+	replicaCount int,
+	consumePlaneAddress string,
+	config *httpclient.Config,
+) *subscriberserver {
+	consumeNodes := make(map[int]string, 0)
+	for i := 0; i < replicaCount; i++ {
+		consumeNodes[i] = fmt.Sprintf(consumePlaneAddressFormat, consumePlaneDeployment, i, consumePlaneAddress)
+	}
+	marshaler := jsonpb.Marshaler{
+		EnumsAsInts:  false,
+		EmitDefaults: false,
+		Indent:       "",
+		OrigName:     false,
+		AnyResolver:  nil,
+	}
+
+	unmarshaler := jsonpb.Unmarshaler{
+		AllowUnknownFields: true,
+		AnyResolver:        nil,
+	}
+	httpClient := httpclient.NewClient(config)
+	return &subscriberserver{projectCore, brokerStore, subscriptionCore, topicCore, credentialCore, replicaCount, consumeNodes, marshaler, unmarshaler, httpClient}
 }
 
 // CreateSubscription to create a new subscription
@@ -101,12 +145,43 @@ func (s subscriberserver) Acknowledge(ctx context.Context, req *metrov1.Acknowle
 		logger.Ctx(ctx).Errorw("subscriberserver: error is parsing ack request", "request", req, "error", parseErr.Error())
 		return nil, merror.ToGRPCError(parseErr)
 	}
-
-	err := s.psm.Acknowledge(ctx, parsedReq)
-	if err != nil {
-		return nil, merror.ToGRPCError(err)
+	AckIdsByPartitions := make(map[int32][]string)
+	for _, ack := range parsedReq.AckMessages {
+		AckIdsByPartitions[ack.Partition] = append(AckIdsByPartitions[ack.Partition], ack.AckID)
 	}
+	for part, msgs := range AckIdsByPartitions {
+		hash := s.subscriptionCore.FetchSubscriptionHash(ctx, req.Subscription, int(part))
+
+		if node, ok := s.consumeNodes[hash%s.replicaCount]; ok {
+			var b []byte
+			byteBuffer := bytes.NewBuffer(b)
+			body := &metrov1.AcknowledgeRequest{
+				Subscription: req.Subscription,
+				AckIds:       msgs,
+			}
+			err := s.marshaler.Marshal(byteBuffer, body)
+			if err != nil {
+				logger.Ctx(ctx).Errorw("subscriberserver: Failed to marshal upstream request", "error", err.Error(), "subscription",
parsedReq.Subscription)
+			}
+			cpReq, err := http.NewRequestWithContext(ctx, "POST", fmt.Sprintf(acknowledgeRequestFormat, node, parsedReq.Subscription), byteBuffer)
+
+			cpRes, err := s.httpClient.Do(cpReq)
+			if err != nil {
+				logger.Ctx(ctx).Errorw("subscriberserver: Failed to send request to consume plane", "error", err.Error())
+			} else { cpRes.Body.Close() }
+
+		} else {
+			logger.Ctx(ctx).Errorw("subscriberserver: Failed to fetch consume plane for partition",
+				"subscription", parsedReq.Subscription,
+				"partition", part,
+				"hash", hash,
+				"replicaCount", s.replicaCount,
+			)
+			continue
+		}
+
+	}
 	return new(emptypb.Empty), nil
 }
 
@@ -118,73 +193,66 @@ func (s subscriberserver) Pull(ctx context.Context, req *metrov1.PullRequest) (*
 		"subscription": req.Subscription,
 	})
 	defer span.Finish()
-
-	// non streaming pull not to be supported
-	/*
-		res, err := s.subscriberCore.Pull(ctx, &subscriber.PullRequest{req.Subscription, 0, 0}, 2, xid.New().String()) // TODO: fix
-		if err != nil {
-			logger.Ctx(ctx).Errorw("pull response errored", "msg", err.Error())
-			return nil, merror.ToGRPCError(err)
-		}
-		return &metrov1.PullResponse{ReceivedMessages: res.ReceivedMessages}, nil
-
-	*/
-	return &metrov1.PullResponse{}, nil
-}
-
-// StreamingPull ...
-func (s subscriberserver) StreamingPull(server metrov1.Subscriber_StreamingPullServer) error {
-	// TODO: check if the requested subscription is push based and handle it the way pubsub does
-	ctx := server.Context()
-	errGroup := new(errgroup.Group)
-
-	// the first request reaching this server path would always be to establish a new stream.
- // once established the active stream server instance will be held in pullstream and - // periodically polled for new requests - req, err := server.Recv() - if err != nil { - return err - } - - parsedReq, parseErr := stream.NewParsedStreamingPullRequest(req) + parsedReq, parseErr := stream.NewParsedPullRequest(req) if parseErr != nil { logger.Ctx(ctx).Errorw("subscriberserver: error is parsing pull request", "request", req, "error", parseErr.Error()) - return nil + return &metrov1.PullResponse{}, parseErr } - // request to init a new stream - if parsedReq.HasSubscription() { - err := s.psm.CreateNewStream(server, parsedReq, errGroup) + // Restricting to one partition till dynamic re-partitioning is in place. + // This helps us avoid two lookups for now (subscription lookup, topic lookup) + hash := s.subscriptionCore.FetchSubscriptionHash(ctx, req.Subscription, 0) + + if node, ok := s.consumeNodes[hash%s.replicaCount]; ok { + var b []byte + byteBuffer := bytes.NewBuffer(b) + body := &metrov1.FetchRequest{ + Subscription: parsedReq.Subscription, + Partition: 0, + MaxMessages: int32(parsedReq.MaxMessages), + } + err := s.marshaler.Marshal(byteBuffer, body) if err != nil { - return merror.ToGRPCError(err) + logger.Ctx(ctx).Errorw("subscriberserver: Failed to marshal upstream request", "error", err.Error(), "subscription", parsedReq.Subscription) } - } else { - return merror.New(merror.InvalidArgument, "subscription name empty").ToGRPCError() - } - // ack and modack here for the first time - // later it happens in stream handler - if parsedReq.HasModifyAcknowledgement() { - // request to modify acknowledgement deadlines - // Nack indicated by modifying the deadline to zero - // https://github.com/googleapis/google-cloud-go/blob/pubsub/v1.10.0/pubsub/iterator.go#L348 - err := s.psm.ModifyAcknowledgement(ctx, parsedReq) + cpReq, err := http.NewRequestWithContext(ctx, "POST", fmt.Sprintf(fetchRequestFormat, node, parsedReq.Subscription), byteBuffer) if err != nil { - 
return merror.ToGRPCError(err)
+			logger.Ctx(ctx).Errorw("subscriberserver: Failed to create request object for consume plane", "error", err.Error(), "url", fmt.Sprintf(fetchRequestFormat, node, parsedReq.Subscription))
 		}
-	}
-	if parsedReq.HasAcknowledgement() {
-		// request to acknowledge existing messages
-		err := s.psm.Acknowledge(ctx, parsedReq)
+		cpRes, err := s.httpClient.Do(cpReq)
 		if err != nil {
-			return merror.ToGRPCError(err)
+			logger.Ctx(ctx).Errorw("subscriberserver: Failed to send request to consume plane", "error", err.Error(), "url", fmt.Sprintf(fetchRequestFormat, node, parsedReq.Subscription)); return nil, merror.ToGRPCError(err)
 		}
+		defer cpRes.Body.Close()
+		if cpRes.StatusCode == http.StatusOK {
+			resp := &metrov1.PullResponse{}
+
+			err = s.unmarshaler.Unmarshal(cpRes.Body, resp)
+			if err != nil {
+				logger.Ctx(ctx).Errorw("subscriberserver: Failed to Unmarshal response", "error", err.Error())
+			}
+			return resp, nil
+
+		}
+	} else {
+		logger.Ctx(ctx).Errorw("subscriberserver: Failed to fetch consume plane for partition",
+			"subscription", parsedReq.Subscription,
+			"hash", hash,
+			"replicaCount", s.replicaCount,
+		)
+	}
-	if err := errGroup.Wait(); err != nil {
-		return err
-	}
-	return nil
+
+	return &metrov1.PullResponse{}, nil
+}
+
+// StreamingPull ...
+func (s subscriberserver) StreamingPull(server metrov1.Subscriber_StreamingPullServer) error { + // TODO: Implement streaming pull with state management + + return http.ErrNotSupported } // DeleteSubscription deletes a subscription @@ -222,9 +290,42 @@ func (s subscriberserver) ModifyAckDeadline(ctx context.Context, req *metrov1.Mo logger.Ctx(ctx).Errorw("subscriberserver: error is parsing modack request", "request", req, "error", parseErr.Error()) return nil, merror.ToGRPCError(parseErr) } - err := s.psm.ModifyAcknowledgement(ctx, parsedReq) - if err != nil { - return nil, merror.ToGRPCError(err) + AckIdsByPartitions := make(map[int32][]string) + for _, ack := range parsedReq.AckMessages { + AckIdsByPartitions[ack.Partition] = append(AckIdsByPartitions[ack.Partition], ack.AckID) + } + + for part, msgs := range AckIdsByPartitions { + hash := s.subscriptionCore.FetchSubscriptionHash(ctx, parsedReq.Subscription, int(part)) + if node, ok := s.consumeNodes[hash%s.replicaCount]; ok { + var b []byte + byteBuffer := bytes.NewBuffer(b) + body := &metrov1.ModifyAckDeadlineRequest{ + Subscription: req.Subscription, + AckIds: msgs, + AckDeadlineSeconds: req.AckDeadlineSeconds, + } + err := s.marshaler.Marshal(byteBuffer, body) + if err != nil { + logger.Ctx(ctx).Errorw("subscriberserver: Failed to marshal upstream request", "error", err.Error(), "subscription", parsedReq.Subscription) + } + cpReq, err := http.NewRequestWithContext(ctx, "POST", fmt.Sprintf(modAckRequestFormat, node, parsedReq.Subscription), byteBuffer) + + _, err = s.httpClient.Do(cpReq) + if err != nil { + logger.Ctx(ctx).Errorw("subscriberserver: Failed to send request to consume plane", "error", err.Error()) + } + + } else { + logger.Ctx(ctx).Errorw("subscriberserver: Failed to fetch consume plane for partition", + "subscription", parsedReq.Subscription, + "partition", part, + "hash", hash, + "replicaCount", s.replicaCount, + ) + continue + } + } return new(emptypb.Empty), nil }