From 4954b31d415279d1a2544927d7476fedcfb11c47 Mon Sep 17 00:00:00 2001 From: atauov Date: Tue, 9 Dec 2025 11:01:58 +0500 Subject: [PATCH 1/8] ph1 --- statefun/cache/cache.go | 101 ++++++++++--- statefun/domain.go | 83 +++++++++- statefun/wal.go | 325 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 481 insertions(+), 28 deletions(-) create mode 100644 statefun/wal.go diff --git a/statefun/cache/cache.go b/statefun/cache/cache.go index 57c971c..ba1c525 100644 --- a/statefun/cache/cache.go +++ b/statefun/cache/cache.go @@ -295,6 +295,8 @@ type Store struct { transactionsMutex *sync.Mutex getKeysByPatternFromKVMutex *sync.Mutex + transactionGenerator TransactionGenerator + //write barrier state backupBarrierTimestamp int64 backupBarrierStatus int32 // 0=unlocked, 1=locking, 2=locked @@ -303,6 +305,7 @@ type Store struct { } func NewCacheStore(ctx context.Context, cacheConfig *Config, js nats.JetStreamContext, kv nats.KeyValue) *Store { + le := lg.GetLogger() var inited atomic.Bool initChan := make(chan bool) cs := Store{ @@ -364,7 +367,7 @@ func NewCacheStore(ctx context.Context, cacheConfig *Config, js nats.JetStreamCo if json, ok := easyjson.JSONFromBytes(valueBytes[9:]); ok { cs.SetValueJSON(key, &json, false, kvRecordTime, "") } else { - lg.Logf(lg.ErrorLevel, "Failed to parse JSON for key=%s", key) + le.Errorf(ctx, "Failed to parse JSON for key=%s", key) } default: // Someone else (other module) deleted a key from the cache @@ -403,7 +406,7 @@ func NewCacheStore(ctx context.Context, cacheConfig *Config, js nats.JetStreamCo // Deletion notify - omitting cause value must already be deleted from the cache } else { //lg.Logf("---CACHE_KV !T!F: %s", key) - lg.Logf(lg.ErrorLevel, "storeUpdatesHandler: received value without time and append flag!") + le.Error(ctx, "storeUpdatesHandler: received value without time and append flag!") } } else { if inited.CompareAndSwap(false, true) { @@ -414,15 +417,14 @@ func NewCacheStore(ctx context.Context, cacheConfig *Config, js nats.JetStreamCo } system.MsgOnErrorReturn(w.Stop()) } else { - lg.Logf(lg.ErrorLevel, "storeUpdatesHandler kv.Watch error %s", err) + le.Errorf(ctx, "storeUpdatesHandler kv.Watch error %s", err) } } - kvLazyWriter := func(cs *Store) { + kvLazyWriterWithWAL := func(cs *Store) { system.GlobalPrometrics.GetRoutinesCounter().Started("cache.kvLazyWriter") defer system.GlobalPrometrics.GetRoutinesCounter().Stopped("cache.kvLazyWriter") shutdownStatus := shutdownStatusNone for { - //start := time.Now() iterationSyncedCount := 0 traverseCount := 0 @@ -430,11 +432,11 @@ func NewCacheStore(ctx context.Context, cacheConfig *Config, js nats.JetStreamCo case <-cs.ctx.Done(): switch shutdownStatus { case shutdownStatusNone: - lg.GetLogger().Debugf(ctx, "cache got shutdown sinal") + le.Debugf(ctx, "cache got shutdown sinal") shutdownStatus = shutdownStatusWaiting case shutdownStatusWaiting: case shutdownStatusReady: - lg.GetLogger().Debugf(ctx, "cache synced, ready for shutdown") + le.Debugf(ctx, "cache synced, ready for shutdown") close(cs.Synced) return } @@ -445,11 +447,17 @@ func NewCacheStore(ctx context.Context, cacheConfig *Config, js nats.JetStreamCo if backupBarrierTimestamp == 0 { backupBarrierTimestamp = system.GetCurrentTimeNs() system.MsgOnErrorReturn(cs.updateBackupBarrierWithTimestamp(backupBarrierTimestamp)) - lg.Logf(lg.InfoLevel, "set barrier timestamp: %d", backupBarrierTimestamp) + le.Infof(ctx, "set barrier timestamp: %d", backupBarrierTimestamp) } } allBeforeBackupBarrierSynced := true + txID := "" + if 
cs.transactionGenerator != nil { + txID = cs.transactionGenerator.GenerateTransactionID() + } + opsCount := 0 + cacheStoreValueStack := []*StoreValue{cs.rootValue} suffixPathsStack := []string{""} depthsStack := []int{0} @@ -498,6 +506,7 @@ func NewCacheStore(ctx context.Context, cacheConfig *Config, js nats.JetStreamCo } var valueUpdateTime int64 = 0 + var opType OpType csvChild.Lock("kvLazyWriter") if csvChild.syncNeeded { valueUpdateTime = csvChild.valueUpdateTime @@ -514,14 +523,16 @@ func NewCacheStore(ctx context.Context, cacheConfig *Config, js nats.JetStreamCo flag = FlagJSONAppend dataBytes = csvChild.value.(*easyjson.JSON).ToBytes() default: - lg.Logf(lg.ErrorLevel, "Unknown type for key=%s, value=%v", newSuffix, csvChild.value) + le.Errorf(ctx, "Unknown type for key=%s, value=%v", newSuffix, csvChild.value) csvChild.Unlock("kvLazyWriter") return true } header := append(timeBytes, flag) finalBytes = append(header, dataBytes...) + opType = OpTypePUT } else { finalBytes = append(timeBytes, FlagDeleted) // Add delete flag "0" + opType = OpTypeDelete } } else { if csvChild.valueUpdateTime > 0 && csvChild.valueUpdateTime <= cs.lruTresholdTime && csvChild.purgeState == 0 { // Older than or equal to specific time @@ -535,27 +546,39 @@ func NewCacheStore(ctx context.Context, cacheConfig *Config, js nats.JetStreamCo } csvChild.Unlock("kvLazyWriter") - // Putting value into KV store ------------------ + // Putting value into WAL stream ------------------ if csvChild.syncNeeded { iterationSyncedCount++ keyStr := key.(string) - ///_, putErr := kv.Put(cs.toStoreKey(newSuffix), finalBytes) if err := cs.checkBackupBarrierInfoBeforeWrite(valueUpdateTime); err != nil { - lg.Logf(lg.TraceLevel, "==============skipping write for key=%s due to barrier: %v", keyStr, err) + le.Tracef(ctx, "==============skipping write for key=%s due to barrier: %v", keyStr, err) return true } - _, putErr := customNatsKv.KVPut(cs.js, kv, cs.toStoreKey(newSuffix), finalBytes) - if putErr == nil { - csvChild.Lock("kvLazyWriter") - if valueUpdateTime == csvChild.valueUpdateTime { - csvChild.syncNeeded = false + if cs.transactionGenerator != nil { + if err := cs.transactionGenerator.PublishOperation(txID, valueUpdateTime, opType, cs.toStoreKey(newSuffix), finalBytes); err != nil { + le.Errorf(ctx, "Store kvLazyWriter cannot publish WAL operation for key=%s: %s", keyStr, err) + } else { + opsCount++ + csvChild.Lock("kvLazyWriter") + if valueUpdateTime == csvChild.valueUpdateTime { + csvChild.syncNeeded = false + } + csvChild.Unlock("kvLazyWriter") } - csvChild.Unlock("kvLazyWriter") } else { - lg.Logf(lg.ErrorLevel, "Store kvLazyWriter cannot update key=%s\n: %s", keyStr, putErr) + _, putErr := customNatsKv.KVPut(cs.js, cs.kv, cs.toStoreKey(newSuffix), finalBytes) + if putErr == nil { + csvChild.Lock("kvLazyWriter") + if valueUpdateTime == csvChild.valueUpdateTime { + csvChild.syncNeeded = false + } + csvChild.Unlock("kvLazyWriter") + } else { + le.Errorf(ctx, "Store kvLazyWriter cannot update key=%s\n: %s", keyStr, putErr) + } } } // ---------------------------------------------- @@ -580,9 +603,23 @@ func NewCacheStore(ctx context.Context, cacheConfig *Config, js nats.JetStreamCo } } - if backupBarrierStatus == BackupBarrierStatusLocking && allBeforeBackupBarrierSynced { - cs.markCacheReadyForBackup() + if txID != "" && opsCount > 0 { + le.Infof(ctx, "Transaction %s: traversed=%d nodes, opsCount=%d", txID, traverseCount, opsCount) + } + + if cs.transactionGenerator != nil && opsCount > 0 { + le.Infof(ctx, "Publishing WAL 
commit for tx=%s with %d operations", txID, opsCount) + if err := cs.transactionGenerator.PublishCommit(txID); err != nil { + le.Errorf(ctx, "Store kvLazyWriter cannot publish WAL commit for tx=%s: %s", txID, err) + } } + + if backupBarrierStatus == BackupBarrierStatusLocking { + if allBeforeBackupBarrierSynced { + cs.markCacheReadyForBackup() + } + } + if shutdownStatus == shutdownStatusWaiting && iterationSyncedCount == 0 { shutdownStatus = shutdownStatusReady } @@ -628,7 +665,7 @@ func NewCacheStore(ctx context.Context, cacheConfig *Config, js nats.JetStreamCo } go storeUpdatesHandler(&cs) - go kvLazyWriter(&cs) + go kvLazyWriterWithWAL(&cs) <-initChan return &cs } @@ -1266,3 +1303,23 @@ func (cs *Store) toStoreKey(key string) string { func (cs *Store) fromStoreKey(key string) string { return strings.Replace(key, cs.cacheConfig.kvStorePrefix+".", "", 1) } + +// WAL +type OpType = string + +const ( + OpTypePUT = "PUT" + OpTypeDelete = "DELETE" +) + +const ConsistencyKey = "__kv_consistent__" + +type TransactionGenerator interface { + PublishOperation(txID string, opTime int64, opType OpType, key string, value []byte) error + PublishCommit(txID string) error + GenerateTransactionID() string +} + +func (cs *Store) SetTransactionGenerator(tg TransactionGenerator) { + cs.transactionGenerator = tg +} diff --git a/statefun/domain.go b/statefun/domain.go index a74da36..f3f901e 100644 --- a/statefun/domain.go +++ b/statefun/domain.go @@ -278,9 +278,7 @@ func (dm *Domain) start(ctx context.Context, cacheConfig *cache.Config, createDo } kvExists = true } - if !kvExists { - return fmt.Errorf("Nats KV was not inited") - } + // -------------------------------------------------------------- if createDomainRouters { @@ -307,11 +305,52 @@ func (dm *Domain) start(ctx context.Context, cacheConfig *cache.Config, createDo if err := dm.createTraceStream(); err != nil { return err } + if err := dm.createWALOperationsStream(); err != nil { + return err + } + if err := dm.createWALCommitsStream(); err != nil { + return err + } + } + + le := lg.GetLogger() + + le.Debug(ctx, "Starting Transaction Committer...") + if err := dm.TransactionCommitter(ctx); err != nil { + return err + } + + le.Debug(ctx, "Waiting for KV to become consistent...") + startTime := time.Now() + lastLogTime := startTime + logInterval := 5 * time.Second + + for { + consistent, err := dm.isKVConsistent() + if err != nil { + le.Errorf(ctx, "Failed to check KV consistency: %s", err) + time.Sleep(1 * time.Second) + continue + } + if consistent { + elapsed := time.Since(startTime) + le.Debugf(ctx, "KV is consistent (waited %v), proceeding with cache initialization", elapsed.Round(time.Millisecond)) + break + } + + if time.Since(lastLogTime) >= logInterval { + elapsed := time.Since(startTime) + le.Debugf(ctx, "Still waiting for KV consistency... 
(elapsed: %v)", elapsed.Round(time.Second)) + lastLogTime = time.Now() + } + + time.Sleep(100 * time.Millisecond) } - lg.Logln(lg.TraceLevel, "Initializing the cache store...") + le.Trace(ctx, "Initializing the cache store...") dm.cache = cache.NewCacheStore(ctx, cacheConfig, dm.js, dm.kv) - lg.Logln(lg.TraceLevel, "Cache store inited!") + dm.cache.SetTransactionGenerator(dm) + le.Trace(ctx, "Cache store inited!") return nil } @@ -410,6 +449,26 @@ func (dm *Domain) createTraceStream() error { return dm.createStreamIfNotExists(sc) } +func (dm *Domain) createWALOperationsStream() error { + sc := &nats.StreamConfig{ + Name: WALOperationsStreamName, + Subjects: []string{fmt.Sprintf("wal.ops.%s.>", dm.kv.Bucket())}, + Retention: nats.WorkQueuePolicy, + MaxAge: 24 * time.Hour, + } + return dm.createStreamIfNotExists(sc) +} + +func (dm *Domain) createWALCommitsStream() error { + sc := &nats.StreamConfig{ + Name: WALCommitsStreamName, + Subjects: []string{fmt.Sprintf(WALCommitsSubject, dm.kv.Bucket())}, + Retention: nats.WorkQueuePolicy, + MaxAge: 24 * time.Hour, + } + return dm.createStreamIfNotExists(sc) +} + func (dm *Domain) createStreamIfNotExists(sc *nats.StreamConfig) error { // Create streams if does not exist ------------------------------ /* Each stream contains a single subject (topic). @@ -475,7 +534,7 @@ func (dm *Domain) createRouter(sourceStreamName string, subject string, tsc targ case domainEgressStreamName: // Default logic - infinite republishing case domainIngressStreamName: - // Send message to DLQ without retryAdd commentMore actions + // Send message to DLQ without retry if err == nil { lg.Logf(lg.DebugLevel, "Domain (domain=%s) router with sourceStreamName=%s republished message to DLQ", dm.name, sourceStreamName) system.MsgOnErrorReturn(msg.Ack()) @@ -515,3 +574,15 @@ func dlqMsgBuilder(subject, stream, domain, errorMsg string, data []byte) *nats. 
return dlqMsg } + +func (dm *Domain) PublishOperation(txID string, opTime int64, opType cache.OpType, key string, value []byte) error { + return dm.publishWALOperation(txID, opTime, opType, key, value) +} + +func (dm *Domain) PublishCommit(txID string) error { + return dm.publishWALCommit(txID) +} + +func (dm *Domain) GenerateTransactionID() string { + return GenerateTransactionID() +} diff --git a/statefun/wal.go b/statefun/wal.go new file mode 100644 index 0000000..1b56c26 --- /dev/null +++ b/statefun/wal.go @@ -0,0 +1,325 @@ +package statefun + +import ( + "context" + "errors" + "fmt" + "strconv" + "strings" + "sync" + "time" + + customNatsKv "github.com/foliagecp/sdk/embedded/nats/kv" + "github.com/foliagecp/sdk/statefun/cache" + lg "github.com/foliagecp/sdk/statefun/logger" + "github.com/foliagecp/sdk/statefun/system" + "github.com/nats-io/nats.go" +) + +const ( + WALOperationsStreamName = "wal_operations" + WALCommitsStreamName = "wal_commits" + WALOperationsSubject = "wal.ops.%s.%s" + WALCommitsSubject = "wal.commits.%s" + CommitterDurableName = "TRANSACTION_COMMITTER_CONSUMER" +) + +func (dm *Domain) TransactionCommitter(ctx context.Context) error { + go func() { + if err := dm.runTransactionCommitter(ctx); err != nil { + lg.GetLogger().Debugf(ctx, "TransactionCommitter error: %s", err) + } + }() + return nil +} + +func (dm *Domain) runTransactionCommitter(ctx context.Context) error { + commitSubject := fmt.Sprintf(WALCommitsSubject, dm.kv.Bucket()) + consumerName := CommitterDurableName + "-" + dm.kv.Bucket() + + lg.GetLogger().Debugf(ctx, "TransactionCommitter starting for bucket=%s, subject=%s", dm.kv.Bucket(), commitSubject) + + if err := dm.setKVConsistent(false); err != nil { + lg.GetLogger().Debugf(ctx, "Failed to set KV as inconsistent: %s", err) + return err + } + lg.GetLogger().Debugf(ctx, "KV marked as inconsistent, will process pending transactions") + + processedTxs := sync.Map{} + + _, err := dm.js.AddConsumer(WALCommitsStreamName, &nats.ConsumerConfig{ + Name: consumerName, + Durable: consumerName, + DeliverSubject: consumerName, + DeliverGroup: consumerName + "-group", + FilterSubject: commitSubject, + AckPolicy: nats.AckExplicitPolicy, + AckWait: 30 * time.Second, + MaxDeliver: 5, + }) + if err != nil && !errors.Is(err, nats.ErrConsumerNameAlreadyInUse) { + lg.GetLogger().Debugf(ctx, "TransactionCommitter failed to create consumer: %s", err) + return err + } + + lg.GetLogger().Debugf(ctx, "TransactionCommitter consumer created/exists: %s", consumerName) + + pendingCount := dm.countPendingCommits(consumerName) + lg.GetLogger().Debugf(ctx, "Found %d pending transactions to process", pendingCount) + + if pendingCount == 0 { + lg.GetLogger().Debugf(ctx, "No pending transactions, marking KV as consistent immediately") + if err = dm.setKVConsistent(true); err != nil { + lg.GetLogger().Debugf(ctx, "Failed to set KV as consistent: %s", err) + return err + } + } + + processedCount := 0 + processingTxs := sync.Map{} + + _, err = dm.js.QueueSubscribe( + commitSubject, + consumerName+"-group", + func(msg *nats.Msg) { + txID := msg.Header.Get("tx_id") + if txID == "" { + lg.GetLogger().Debugf(ctx, "TransactionCommitter: received commit without tx_id") + system.MsgOnErrorReturn(msg.Ack()) + return + } + + if _, processing := processingTxs.LoadOrStore(txID, true); processing { + lg.GetLogger().Debugf(ctx, "TransactionCommitter: transaction %s is already being processed, ignoring duplicate", txID) + return + } + defer processingTxs.Delete(txID) + + if _, processed := 
processedTxs.Load(txID); processed { + lg.GetLogger().Debugf(ctx, "TransactionCommitter: transaction %s already processed, Ack() final", txID) + system.MsgOnErrorReturn(msg.Ack()) + processedTxs.Delete(txID) + + processedCount++ + if pendingCount > 0 && processedCount >= pendingCount { + lg.GetLogger().Debugf(ctx, "All %d pending transactions processed, marking KV as consistent", pendingCount) + if err := dm.setKVConsistent(true); err != nil { + lg.GetLogger().Debugf(ctx, "Failed to set KV as consistent: %s", err) + } + pendingCount = 0 + } + + return + } + + lg.GetLogger().Debugf(ctx, "TransactionCommitter: first time processing commit for tx_id=%s, Nak()", txID) + system.MsgOnErrorReturn(msg.Nak()) + + if err = dm.applyTransactionOperations(ctx, txID); err != nil { + lg.GetLogger().Debugf(ctx, "TransactionCommitter: failed to apply transaction %s: %s", txID, err) + return + } + + lg.GetLogger().Debugf(ctx, "TransactionCommitter: successfully applied transaction %s, marked as processed", txID) + processedTxs.Store(txID, true) + }, + nats.Bind(WALCommitsStreamName, consumerName), + nats.ManualAck(), + ) + + if err != nil { + return err + } + + <-ctx.Done() + return nil +} + +func (dm *Domain) applyTransactionOperations(ctx context.Context, txID string) error { + opsSubject := fmt.Sprintf(WALOperationsSubject, dm.kv.Bucket(), txID) + consumerName := "TX_OPS_" + dm.kv.Bucket() + "_" + txID + + lg.GetLogger().Debugf(ctx, "applyTransactionOperations: processing tx_id=%s, subject=%s", txID, opsSubject) + + info, err := dm.js.ConsumerInfo(WALOperationsStreamName, consumerName) + if err != nil || info == nil { + lg.GetLogger().Debugf(ctx, "Consumer %s not found, creating new one", consumerName) + + consumerConfig := &nats.ConsumerConfig{ + Name: consumerName, + Durable: consumerName, + FilterSubject: opsSubject, + AckPolicy: nats.AckExplicitPolicy, + AckWait: 10 * time.Second, + MaxDeliver: 3, + } + + if _, err = dm.js.AddConsumer(WALOperationsStreamName, consumerConfig); err != nil { + lg.GetLogger().Debugf(ctx, "Failed to create consumer: %s", err) + return fmt.Errorf("failed to create operations consumer: %w", err) + } + lg.GetLogger().Debugf(ctx, "Created durable consumer %s", consumerName) + } else { + lg.GetLogger().Debugf(ctx, "Reusing existing consumer %s", consumerName) + } + + sub, err := dm.js.PullSubscribe(opsSubject, consumerName) + if err != nil { + lg.GetLogger().Debugf(ctx, "Failed to create subscription: %s", err) + return fmt.Errorf("failed to subscribe to operations: %w", err) + } + lg.GetLogger().Debugf(ctx, "Created subscription for consumer %s, checking if valid...", consumerName) + + if !sub.IsValid() { + lg.GetLogger().Debugf(ctx, "Subscription is INVALID immediately after creation!") + return fmt.Errorf("subscription invalid after creation") + } + lg.GetLogger().Debugf(ctx, "Subscription is valid, proceeding with Fetch") + + defer func() { + lg.GetLogger().Debugf(ctx, "Unsubscribing from consumer %s", consumerName) + system.MsgOnErrorReturn(sub.Unsubscribe()) + }() + + totalOps := 0 + + for { + lg.GetLogger().Debugf(ctx, "Attempting to Fetch up to 100 messages, subscription valid=%v", sub.IsValid()) + msgs, err := sub.Fetch(100, nats.MaxWait(1*time.Second)) + if err != nil { + if errors.Is(err, nats.ErrTimeout) { + lg.GetLogger().Debugf(ctx, "applyTransactionOperations: finished, processed %d operations for tx_id=%s", totalOps, txID) + break + } + return fmt.Errorf("failed to fetch operations: %w", err) + } + + if len(msgs) == 0 { + lg.GetLogger().Debugf(ctx, 
"applyTransactionOperations: no more messages, processed %d operations for tx_id=%s", totalOps, txID) + break + } + + lg.GetLogger().Debugf(ctx, "applyTransactionOperations: fetched %d operations for tx_id=%s", len(msgs), txID) + + for _, msg := range msgs { + opType := msg.Header.Get("op_type") + key := msg.Header.Get("key") + + if key == "" { + lg.GetLogger().Debugf(ctx, "Operation without key in transaction %s", txID) + system.MsgOnErrorReturn(msg.Ack()) + continue + } + + var kvErr error + switch opType { + case cache.OpTypePUT: + lg.GetLogger().Debugf(ctx, "Applying PUT for key=%s in tx=%s", key, txID) + _, kvErr = customNatsKv.KVPut(dm.js, dm.kv, key, msg.Data) + case cache.OpTypeDelete: + lg.GetLogger().Debugf(ctx, "Applying DELETE for key=%s in tx=%s", key, txID) + kvErr = customNatsKv.KVDelete(dm.js, dm.kv, key) + if kvErr != nil { + errMsg := kvErr.Error() + if errors.Is(kvErr, nats.ErrKeyNotFound) || + strings.Contains(errMsg, "message not found") || + strings.Contains(errMsg, "key not found") { + lg.GetLogger().Debugf(ctx, "Key %s already deleted, ignoring error: %s", key, errMsg) + kvErr = nil + } + } + default: + lg.GetLogger().Debugf(ctx, "Unknown operation type %s in transaction %s", opType, txID) + system.MsgOnErrorReturn(msg.Ack()) + continue + } + + if kvErr != nil { + lg.GetLogger().Debugf(ctx, "Failed to apply operation to KV: %s", kvErr) + system.MsgOnErrorReturn(msg.Nak()) + return fmt.Errorf("failed to apply operation: %w", kvErr) + } + + totalOps++ + system.MsgOnErrorReturn(msg.Ack()) + } + } + + lg.GetLogger().Debugf(ctx, "applyTransactionOperations: all operations processed, deleting consumer %s", consumerName) + system.MsgOnErrorReturn(dm.js.DeleteConsumer(WALOperationsStreamName, consumerName)) + + return nil +} + +func (dm *Domain) setKVConsistent(consistent bool) error { + value := []byte("false") + if consistent { + value = []byte("true") + } + + _, err := customNatsKv.KVPut(dm.js, dm.kv, cache.ConsistencyKey, value) + return err +} + +func (dm *Domain) isKVConsistent() (bool, error) { + entry, err := dm.kv.Get(cache.ConsistencyKey) + if err != nil { + if errors.Is(err, nats.ErrKeyNotFound) { + return false, nil + } + return false, err + } + + return string(entry.Value()) == "true", nil +} + +func (dm *Domain) countPendingCommits(consumerName string) int { + info, err := dm.js.ConsumerInfo(WALCommitsStreamName, consumerName) + if err != nil { + lg.GetLogger().Debugf(context.TODO(), "Failed to get consumer info: %s", err) + return 0 + } + + pending := int(info.NumPending) + lg.GetLogger().Debugf(context.TODO(), "Consumer %s has %d pending messages", consumerName, pending) + return pending +} + +func GenerateTransactionID() string { + return fmt.Sprintf("%d", time.Now().UnixNano()) +} + +func (dm *Domain) publishWALOperation(txID string, opTime int64, opType cache.OpType, key string, value []byte) error { + subject := fmt.Sprintf(WALOperationsSubject, dm.kv.Bucket(), txID) + + msg := nats.NewMsg(subject) + msg.Header.Set("tx_id", txID) + msg.Header.Set("op_time", strconv.FormatInt(opTime, 10)) + msg.Header.Set("op_type", opType) + msg.Header.Set("key", key) + msg.Data = value + + lg.GetLogger().Debugf(context.TODO(), "::::::::::Publishing WAL operation: tx=%s, type=%s, key=%s, subject=%s", txID, opType, key, subject) + + if _, err := dm.js.PublishMsg(msg); err != nil { + lg.GetLogger().Errorf(context.TODO(), "Failed to publish WAL operation: %s", err) + return err + } + return nil +} + +func (dm *Domain) publishWALCommit(txID string) error { + subject := 
fmt.Sprintf(WALCommitsSubject, dm.kv.Bucket()) + + msg := nats.NewMsg(subject) + msg.Header.Set("tx_id", txID) + msg.Header.Set("commit_time", strconv.FormatInt(time.Now().UnixNano(), 10)) + + lg.GetLogger().Debugf(context.TODO(), "::::::::::Publishing WAL commit: tx=%s, subject=%s", txID, subject) + + if _, err := dm.js.PublishMsg(msg); err != nil { + lg.GetLogger().Errorf(context.TODO(), "::::::::::Failed to publish WAL commit: %s", err) + return err + } + return nil +} From 687b08e03d85253fecdddf4e88d20623266f8c91 Mon Sep 17 00:00:00 2001 From: atauov Date: Mon, 15 Dec 2025 16:09:26 +0500 Subject: [PATCH 2/8] change Nak()->InProgress() | each runtime is running TransactionCommitter phase 1 WAL transactions --- statefun/cache/cache.go | 443 +++++++++++++++++---------------- statefun/cache/cache_config.go | 3 + statefun/domain.go | 127 +++++++--- statefun/runtime.go | 108 +++++--- statefun/wal.go | 182 ++++++++------ 5 files changed, 507 insertions(+), 356 deletions(-) diff --git a/statefun/cache/cache.go b/statefun/cache/cache.go index ba1c525..854e180 100644 --- a/statefun/cache/cache.go +++ b/statefun/cache/cache.go @@ -304,6 +304,170 @@ type Store struct { Synced chan struct{} } +type traverseResult struct { + opsCount int + traverseCount int + iterationSyncedCount int + lruTimes []int64 + allBeforeBackupBarrierSynced bool +} + +func (cs *Store) traverseCacheForTransaction( + txID string, + barrierTime int64, + ignoreBarrier bool, +) *traverseResult { + le := lg.GetLogger() + result := &traverseResult{ + opsCount: 0, + traverseCount: 0, + iterationSyncedCount: 0, + lruTimes: []int64{}, + allBeforeBackupBarrierSynced: true, + } + + var backupBarrierTimestamp int64 + var backupBarrierStatus int32 + if !ignoreBarrier { + backupBarrierTimestamp, backupBarrierStatus = cs.getBackupBarrierState() + } + + cacheStoreValueStack := []*StoreValue{cs.rootValue} + suffixPathsStack := []string{""} + depthsStack := []int{0} + + for len(cacheStoreValueStack) > 0 { + lastID := len(cacheStoreValueStack) - 1 + + currentStoreValue := cacheStoreValueStack[lastID] + result.traverseCount++ + + currentStoreValue.RLock("kvLazyWriter") + result.lruTimes = append(result.lruTimes, currentStoreValue.valueUpdateTime) + currentStoreValue.RUnlock("kvLazyWriter") + + currentSuffix := suffixPathsStack[lastID] + currentDepth := depthsStack[lastID] + + cacheStoreValueStack = cacheStoreValueStack[:lastID] + suffixPathsStack = suffixPathsStack[:lastID] + depthsStack = depthsStack[:lastID] + + noChildren := true + currentStoreValue.Range(func(key, value interface{}) bool { + noChildren = false + + var newSuffix string + if currentDepth == 0 { + newSuffix = currentSuffix + key.(string) + } else { + newSuffix = currentSuffix + "." 
+ key.(string) + } + + csvChild := value.(*StoreValue) + + if backupBarrierStatus == BackupBarrierStatusLocking { + ve, vut, sn, sw := csvChild.syncState() + if ve && vut > 0 && vut <= backupBarrierTimestamp { + if sn || !sw { + result.allBeforeBackupBarrierSynced = false + } + } + } + + var valueUpdateTime int64 = 0 + var opType OpType + var finalBytes []byte = nil + + csvChild.Lock("kvLazyWriter") + if csvChild.syncNeeded { + valueUpdateTime = csvChild.valueUpdateTime + timeBytes := make([]byte, 8) + binary.BigEndian.PutUint64(timeBytes, uint64(valueUpdateTime)) + if csvChild.valueExists { + var flag uint8 + var dataBytes []byte + switch csvChild.valueType { + case typeByteArray: + flag = FlagBytesAppend + dataBytes = csvChild.value.([]byte) + case typeJson: + flag = FlagJSONAppend + dataBytes = csvChild.value.(*easyjson.JSON).ToBytes() + default: + le.Errorf(cs.ctx, "Unknown type for key=%s, value=%v", newSuffix, csvChild.value) + csvChild.Unlock("kvLazyWriter") + cacheStoreValueStack = append(cacheStoreValueStack, value.(*StoreValue)) + suffixPathsStack = append(suffixPathsStack, newSuffix) + depthsStack = append(depthsStack, currentDepth+1) + return true + } + header := append(timeBytes, flag) + finalBytes = append(header, dataBytes...) + opType = OpTypePUT + } else { + finalBytes = append(timeBytes, FlagDeleted) + opType = OpTypeDelete + } + } else { + if csvChild.valueUpdateTime > 0 && csvChild.valueUpdateTime <= cs.lruTresholdTime && csvChild.purgeState == 0 { + currentStoreValue.ConsistencyLoss(system.GetCurrentTimeNs()) + csvChild.TryPurgeReady() + csvChild.TryPurgeConfirm() + } + } + csvChild.Unlock("kvLazyWriter") + + if finalBytes != nil { + result.iterationSyncedCount++ + keyStr := key.(string) + + if !ignoreBarrier { + if valueUpdateTime > barrierTime { + cacheStoreValueStack = append(cacheStoreValueStack, value.(*StoreValue)) + suffixPathsStack = append(suffixPathsStack, newSuffix) + depthsStack = append(depthsStack, currentDepth+1) + return true + } + + if err := cs.checkBackupBarrierInfoBeforeWrite(valueUpdateTime); err != nil { + le.Tracef(cs.ctx, "skipping write for key=%s due to barrier: %v", keyStr, err) + cacheStoreValueStack = append(cacheStoreValueStack, value.(*StoreValue)) + suffixPathsStack = append(suffixPathsStack, newSuffix) + depthsStack = append(depthsStack, currentDepth+1) + return true + } + } + + if err := cs.transactionGenerator.PublishOperation(txID, valueUpdateTime, opType, cs.toStoreKey(newSuffix), finalBytes); err != nil { + le.Errorf(cs.ctx, "Store kvLazyWriter cannot publish WAL operation for key=%s: %s", keyStr, err) + } else { + result.opsCount++ + csvChild.Lock("kvLazyWriter") + if valueUpdateTime == csvChild.valueUpdateTime { + csvChild.syncNeeded = false + } + csvChild.Unlock("kvLazyWriter") + } + } + + cacheStoreValueStack = append(cacheStoreValueStack, value.(*StoreValue)) + suffixPathsStack = append(suffixPathsStack, newSuffix) + depthsStack = append(depthsStack, currentDepth+1) + + time.Sleep(time.Duration(cs.cacheConfig.lazyWriterValueProcessDelayMkS) * time.Microsecond) + + return true + }) + + if noChildren { + currentStoreValue.collectGarbage() + } + } + + return result +} + func NewCacheStore(ctx context.Context, cacheConfig *Config, js nats.JetStreamContext, kv nats.KeyValue) *Store { le := lg.GetLogger() var inited atomic.Bool @@ -423,246 +587,94 @@ func NewCacheStore(ctx context.Context, cacheConfig *Config, js nats.JetStreamCo kvLazyWriterWithWAL := func(cs *Store) { 
system.GlobalPrometrics.GetRoutinesCounter().Started("cache.kvLazyWriter") defer system.GlobalPrometrics.GetRoutinesCounter().Stopped("cache.kvLazyWriter") + shutdownStatus := shutdownStatusNone for { - iterationSyncedCount := 0 - traverseCount := 0 - - select { - case <-cs.ctx.Done(): - switch shutdownStatus { - case shutdownStatusNone: - le.Debugf(ctx, "cache got shutdown sinal") - shutdownStatus = shutdownStatusWaiting - case shutdownStatusWaiting: - case shutdownStatusReady: - le.Debugf(ctx, "cache synced, ready for shutdown") - close(cs.Synced) - return - } - default: + if shutdownStatus == shutdownStatusReady { + le.Debugf(ctx, "cache synced, ready for shutdown") + close(cs.Synced) + return } - backupBarrierTimestamp, backupBarrierStatus := cs.getBackupBarrierState() - if backupBarrierStatus == BackupBarrierStatusLocking { - if backupBarrierTimestamp == 0 { - backupBarrierTimestamp = system.GetCurrentTimeNs() - system.MsgOnErrorReturn(cs.updateBackupBarrierWithTimestamp(backupBarrierTimestamp)) - le.Infof(ctx, "set barrier timestamp: %d", backupBarrierTimestamp) - } - } - allBeforeBackupBarrierSynced := true - txID := "" - if cs.transactionGenerator != nil { - txID = cs.transactionGenerator.GenerateTransactionID() + if cs.transactionGenerator == nil { + le.Debugf(ctx, "WAL is not ready, skip this iteration") + time.Sleep(100 * time.Millisecond) + continue } - opsCount := 0 - - cacheStoreValueStack := []*StoreValue{cs.rootValue} - suffixPathsStack := []string{""} - depthsStack := []int{0} - lruTimes := []int64{} + if shutdownStatus == shutdownStatusNone { + barrierTime := system.GetCurrentTimeNs() + txID := fmt.Sprintf("%d", barrierTime) - for len(cacheStoreValueStack) > 0 { - lastID := len(cacheStoreValueStack) - 1 - - currentStoreValue := cacheStoreValueStack[lastID] - traverseCount++ //Debug - - currentStoreValue.RLock("kvLazyWriter") - lruTimes = append(lruTimes, currentStoreValue.valueUpdateTime) - currentStoreValue.RUnlock("kvLazyWriter") - - currentSuffix := suffixPathsStack[lastID] - currentDepth := depthsStack[lastID] - - cacheStoreValueStack = cacheStoreValueStack[:lastID] - suffixPathsStack = suffixPathsStack[:lastID] - depthsStack = depthsStack[:lastID] - - noChildred := true - currentStoreValue.Range(func(key, value interface{}) bool { - noChildred = false - - var newSuffix string - if currentDepth == 0 { - newSuffix = currentSuffix + key.(string) - } else { - newSuffix = currentSuffix + "." + key.(string) - } - - var finalBytes []byte = nil - - csvChild := value.(*StoreValue) - - if backupBarrierStatus == BackupBarrierStatusLocking { - ve, vut, sn, sw := csvChild.syncState() - if ve && vut > 0 && vut <= backupBarrierTimestamp { - if sn || !sw { - allBeforeBackupBarrierSynced = false - } - } - } - - var valueUpdateTime int64 = 0 - var opType OpType - csvChild.Lock("kvLazyWriter") - if csvChild.syncNeeded { - valueUpdateTime = csvChild.valueUpdateTime - timeBytes := make([]byte, 8) - binary.BigEndian.PutUint64(timeBytes, uint64(valueUpdateTime)) - if csvChild.valueExists { - var flag uint8 - var dataBytes []byte - switch csvChild.valueType { - case typeByteArray: - flag = FlagBytesAppend - dataBytes = csvChild.value.([]byte) - case typeJson: - flag = FlagJSONAppend - dataBytes = csvChild.value.(*easyjson.JSON).ToBytes() - default: - le.Errorf(ctx, "Unknown type for key=%s, value=%v", newSuffix, csvChild.value) - csvChild.Unlock("kvLazyWriter") - return true - } - header := append(timeBytes, flag) - finalBytes = append(header, dataBytes...) 
- opType = OpTypePUT - } else { - finalBytes = append(timeBytes, FlagDeleted) // Add delete flag "0" - opType = OpTypeDelete - } - } else { - if csvChild.valueUpdateTime > 0 && csvChild.valueUpdateTime <= cs.lruTresholdTime && csvChild.purgeState == 0 { // Older than or equal to specific time - // currentStoreValue locked by range no locking/unlocking needed - currentStoreValue.ConsistencyLoss(system.GetCurrentTimeNs()) - //lg.Logf("Consistency lost for key=\"%s\" store", currentStoreValue.GetFullKeyString()) - //lg.Logln("Purging: " + newSuffix) - csvChild.TryPurgeReady() - csvChild.TryPurgeConfirm() - } - } - csvChild.Unlock("kvLazyWriter") - - // Putting value into WAL stream ------------------ - if csvChild.syncNeeded { - iterationSyncedCount++ - - keyStr := key.(string) - - if err := cs.checkBackupBarrierInfoBeforeWrite(valueUpdateTime); err != nil { - le.Tracef(ctx, "==============skipping write for key=%s due to barrier: %v", keyStr, err) - return true - } + waitTimer := time.NewTimer(time.Duration(cacheConfig.walTransactionWaitPeriodMS) * time.Millisecond) + select { + case <-cs.ctx.Done(): + waitTimer.Stop() + le.Debugf(ctx, "cache got shutdown signal") + shutdownStatus = shutdownStatusWaiting + continue + case <-waitTimer.C: + } - if cs.transactionGenerator != nil { - if err := cs.transactionGenerator.PublishOperation(txID, valueUpdateTime, opType, cs.toStoreKey(newSuffix), finalBytes); err != nil { - le.Errorf(ctx, "Store kvLazyWriter cannot publish WAL operation for key=%s: %s", keyStr, err) - } else { - opsCount++ - csvChild.Lock("kvLazyWriter") - if valueUpdateTime == csvChild.valueUpdateTime { - csvChild.syncNeeded = false - } - csvChild.Unlock("kvLazyWriter") - } - } else { - _, putErr := customNatsKv.KVPut(cs.js, cs.kv, cs.toStoreKey(newSuffix), finalBytes) - if putErr == nil { - csvChild.Lock("kvLazyWriter") - if valueUpdateTime == csvChild.valueUpdateTime { - csvChild.syncNeeded = false - } - csvChild.Unlock("kvLazyWriter") - } else { - le.Errorf(ctx, "Store kvLazyWriter cannot update key=%s\n: %s", keyStr, putErr) - } - } + backupBarrierTimestamp, backupBarrierStatus := cs.getBackupBarrierState() + if backupBarrierStatus == BackupBarrierStatusLocking { + if backupBarrierTimestamp == 0 { + backupBarrierTimestamp = system.GetCurrentTimeNs() + system.MsgOnErrorReturn(cs.updateBackupBarrierWithTimestamp(backupBarrierTimestamp)) + le.Infof(ctx, "set barrier timestamp: %d", backupBarrierTimestamp) } - // ---------------------------------------------- + } - cacheStoreValueStack = append(cacheStoreValueStack, value.(*StoreValue)) - suffixPathsStack = append(suffixPathsStack, newSuffix) - depthsStack = append(depthsStack, currentDepth+1) + result := cs.traverseCacheForTransaction(txID, barrierTime, false) - select { - case <-cs.ctx.Done(): - if shutdownStatus == shutdownStatusNone { - shutdownStatus = shutdownStatusWaiting - } - default: - time.Sleep(time.Duration(cacheConfig.lazyWriterValueProcessDelayMkS) * time.Microsecond) + if result.opsCount > 0 { + le.Tracef(ctx, "Transaction %s: traversed=%d nodes, opsCount=%d", txID, result.traverseCount, result.opsCount) + le.Tracef(ctx, "Publishing WAL commit for tx=%s with %d operations", txID, result.opsCount) + if err := cs.transactionGenerator.PublishCommit(txID); err != nil { + le.Errorf(ctx, "Store kvLazyWriter cannot publish WAL commit for tx=%s: %s", txID, err) } - return true - }) - - if noChildred { - currentStoreValue.collectGarbage() } - } - if txID != "" && opsCount > 0 { - le.Infof(ctx, "Transaction %s: traversed=%d 
nodes, opsCount=%d", txID, traverseCount, opsCount) - } - - if cs.transactionGenerator != nil && opsCount > 0 { - le.Infof(ctx, "Publishing WAL commit for tx=%s with %d operations", txID, opsCount) - if err := cs.transactionGenerator.PublishCommit(txID); err != nil { - le.Errorf(ctx, "Store kvLazyWriter cannot publish WAL commit for tx=%s: %s", txID, err) + if backupBarrierStatus == BackupBarrierStatusLocking { + if result.allBeforeBackupBarrierSynced { + cs.markCacheReadyForBackup() + } } - } - if backupBarrierStatus == BackupBarrierStatusLocking { - if allBeforeBackupBarrierSynced { - cs.markCacheReadyForBackup() + sort.Slice(result.lruTimes, func(i, j int) bool { return result.lruTimes[i] > result.lruTimes[j] }) + if len(result.lruTimes) > cacheConfig.lruSize { + cs.lruTresholdTime = result.lruTimes[cacheConfig.lruSize-1] + } else if len(result.lruTimes) > 0 { + cs.lruTresholdTime = result.lruTimes[len(result.lruTimes)-1] } - } - if shutdownStatus == shutdownStatusWaiting && iterationSyncedCount == 0 { - shutdownStatus = shutdownStatusReady - } - sort.Slice(lruTimes, func(i, j int) bool { return lruTimes[i] > lruTimes[j] }) - if len(lruTimes) > cacheConfig.lruSize { - cs.lruTresholdTime = lruTimes[cacheConfig.lruSize-1] - } else { - cs.lruTresholdTime = lruTimes[len(lruTimes)-1] - } + cs.valuesInCache = len(result.lruTimes) - /*// Debug info ----------------------------------------------------- - if cs.valuesInCache != len(lruTimes) { - cmpr := []bool{} - for i := 0; i < len(lruTimes); i++ { - cmpr = append(cmpr, lruTimes[i] > 0 && lruTimes[i] <= cs.lruTresholdTime) + if gaugeVec, err := system.GlobalPrometrics.EnsureGaugeVecSimple("cache_values", "", []string{"id"}); err == nil { + gaugeVec.With(prometheus.Labels{"id": cs.cacheConfig.id}).Set(float64(cs.valuesInCache)) } - lg.Logf("LEFT IN CACHE: %d (%d) - %s %s", len(lruTimes), cs.lruTresholdTime, fmt.Sprintln(cmpr), fmt.Sprintln(lruTimes)) + time.Sleep(time.Duration(cacheConfig.lazyWriterRepeatDelayMkS) * time.Microsecond) // Prevents too many locks and prevents too much processor time consumption } - // ----------------------------------------------------------------*/ - cs.valuesInCache = len(lruTimes) + if shutdownStatus == shutdownStatusWaiting { + txID := cs.transactionGenerator.GenerateTransactionID() + le.Debugf(ctx, "Final shutdown transaction %s - collecting all pending operations", txID) - if gaugeVec, err := system.GlobalPrometrics.EnsureGaugeVecSimple("cache_values", "", []string{"id"}); err == nil { - gaugeVec.With(prometheus.Labels{"id": cs.cacheConfig.id}).Set(float64(cs.valuesInCache)) - } - /*// Debug info ----------------------------------------------------- + result := cs.traverseCacheForTransaction(txID, 0, true) - iterationElapsed := time.Since(start) + if result.opsCount > 0 { + le.Debugf(ctx, "Final transaction %s: traversed=%d nodes, opsCount=%d", txID, result.traverseCount, result.opsCount) + le.Debugf(ctx, "Publishing final WAL commit for tx=%s with %d operations", txID, result.opsCount) + if err := cs.transactionGenerator.PublishCommit(txID); err != nil { + le.Errorf(ctx, "Store kvLazyWriter cannot publish final WAL commit for tx=%s: %s", txID, err) + } + } - if iterationSyncedCount > 0 { - speed := float64(iterationSyncedCount) / iterationElapsed.Seconds() - lg.GetLogger().Debugf(cs.ctx, - "::::::::::::::: Iteration: synced %d values (traversed %d nodes) in %v speed: %.2f writes/sec", - iterationSyncedCount, traverseCount, iterationElapsed, speed) - } else { - //lg.GetLogger().Debugf(cs.ctx, 
"::::::::::::::: Iteration: synced 0 values (traversed %d nodes) in %v", - // traverseCount, iterationElapsed) + if result.iterationSyncedCount == 0 { + shutdownStatus = shutdownStatusReady + } } - // ----------------------------------------------------------------*/ - - time.Sleep(time.Duration(cacheConfig.lazyWriterRepeatDelayMkS) * time.Microsecond) // Prevents too many locks and prevents too much processor time consumption } - } go storeUpdatesHandler(&cs) go kvLazyWriterWithWAL(&cs) @@ -1304,7 +1316,8 @@ func (cs *Store) fromStoreKey(key string) string { return strings.Replace(key, cs.cacheConfig.kvStorePrefix+".", "", 1) } -// WAL +// -------- WAL transactions --------- + type OpType = string const ( @@ -1323,3 +1336,5 @@ type TransactionGenerator interface { func (cs *Store) SetTransactionGenerator(tg TransactionGenerator) { cs.transactionGenerator = tg } + +// ----------------------------------- diff --git a/statefun/cache/cache_config.go b/statefun/cache/cache_config.go index f8812d1..009aae5 100644 --- a/statefun/cache/cache_config.go +++ b/statefun/cache/cache_config.go @@ -6,6 +6,7 @@ const ( LevelSubscriptionNotificationsBufferMaxSize = 30000 // ~16Mb: elemenets := 16 * 1024 * 1024 / (64 + 512), where 512 - avg value size, 64 - avg key size LazyWriterValueProcessDelayMkS = 500 LazyWriterRepeatDelayMkS = 100000 + WalTransactionWaitPeriodMS = 5000 ) type Config struct { @@ -15,6 +16,7 @@ type Config struct { levelSubscriptionNotificationsBufferMaxSize int lazyWriterValueProcessDelayMkS int lazyWriterRepeatDelayMkS int + walTransactionWaitPeriodMS int } func NewCacheConfig(id string) *Config { @@ -25,6 +27,7 @@ func NewCacheConfig(id string) *Config { levelSubscriptionNotificationsBufferMaxSize: LevelSubscriptionNotificationsBufferMaxSize, lazyWriterValueProcessDelayMkS: LazyWriterValueProcessDelayMkS, lazyWriterRepeatDelayMkS: LazyWriterRepeatDelayMkS, + walTransactionWaitPeriodMS: WalTransactionWaitPeriodMS, } } diff --git a/statefun/domain.go b/statefun/domain.go index 65edfdf..ed3e26c 100644 --- a/statefun/domain.go +++ b/statefun/domain.go @@ -59,6 +59,8 @@ type Domain struct { kv nats.KeyValue cache *cache.Store + + shutdown chan struct{} } type streamConfig struct { @@ -99,6 +101,7 @@ func NewDomain(nc *nats.Conn, js nats.JetStreamContext, desiredHubDomainName str sysSC: sysSC, kvSC: kvSC, traceSC: traceSC, + shutdown: make(chan struct{}), } return domain, nil @@ -317,44 +320,106 @@ func (dm *Domain) start(ctx context.Context, cacheConfig *cache.Config, createDo le := lg.GetLogger() - le.Debug(ctx, "Starting Transaction Committer...") + le.Trace(ctx, "Initializing the cache store...") + dm.cache = cache.NewCacheStore(ctx, cacheConfig, dm.js, dm.kv) + dm.shutdown = make(chan struct{}) + dm.cache.SetTransactionGenerator(dm) + if err := dm.TransactionCommitter(ctx); err != nil { return err } + if err := dm.checkKvConsistency(ctx); err != nil { + return err + } + le.Trace(ctx, "Cache store inited!") + + return nil +} + +func (dm *Domain) checkKvConsistency(ctx context.Context) error { + consumerName := CommitterDurableName + "-" + dm.kv.Bucket() + + const ( + checkInterval = 100 * time.Millisecond + waitTimeout = 30 * time.Second + ) + + timeout := time.NewTimer(waitTimeout) + defer timeout.Stop() - le.Debug(ctx, "Waiting for KV to become consistent...") - startTime := time.Now() - lastLogTime := startTime - logInterval := 5 * time.Second + ticker := time.NewTicker(checkInterval) + defer ticker.Stop() + + lg.GetLogger().Tracef(ctx, "Waiting for KV consistency ...") for { - 
consistent, err := dm.isKVConsistent() - if err != nil { - le.Errorf(ctx, "Failed to check KV consistency: %s", err) - time.Sleep(1 * time.Second) - continue - } - if consistent { - elapsed := time.Since(startTime) - le.Debugf(ctx, "KV is consistent (waited %v), proceeding with cache initialization", elapsed.Round(time.Millisecond)) - break - } + select { + case <-ctx.Done(): + return ctx.Err() + + case <-timeout.C: + return fmt.Errorf("timeout waiting for KV consistency") + + case <-ticker.C: + info, err := dm.js.ConsumerInfo(WALCommitsStreamName, consumerName) + if err != nil { + lg.GetLogger().Debugf(ctx, + "Failed to get consumer info: %s", err, + ) + continue + } - if time.Since(lastLogTime) >= logInterval { - elapsed := time.Since(startTime) - le.Debugf(ctx, "Still waiting for KV consistency... (elapsed: %v)", elapsed.Round(time.Second)) - lastLogTime = time.Now() - } + noPendingWork := + info.NumPending == 0 && + info.NumAckPending == 0 - time.Sleep(100 * time.Millisecond) - } + consistent, err := dm.isKVConsistent() + if err != nil { + lg.GetLogger().Errorf(ctx, + "Failed to check consistency: %s", err, + ) + continue + } - le.Trace(ctx, "Initializing the cache store...") - dm.cache = cache.NewCacheStore(ctx, cacheConfig, dm.js, dm.kv) - dm.cache.SetTransactionGenerator(dm) - le.Trace(ctx, "Cache store inited!") + if !noPendingWork { + lg.GetLogger().Tracef(ctx, + "Transactions in progress: pending=%d, ackPending=%d", + info.NumPending, info.NumAckPending, + ) + + if consistent { + if err := dm.setKVConsistent(false); err != nil { + return fmt.Errorf( + "failed to set KV inconsistent: %w", err, + ) + } + } - return nil + if !timeout.Stop() { + select { + case <-timeout.C: + default: + } + } + timeout.Reset(waitTimeout) + continue + } + + if !consistent { + lg.GetLogger().Tracef(ctx, + "No pending work, setting KV consistent", + ) + if err := dm.setKVConsistent(true); err != nil { + return fmt.Errorf( + "failed to set KV consistent: %w", err, + ) + } + } + + lg.GetLogger().Tracef(ctx, "KV is consistent") + return nil + } + } } func (dm *Domain) createIngressRouter() error { @@ -462,7 +527,7 @@ func (dm *Domain) createTraceStream() error { func (dm *Domain) createWALOperationsStream() error { sc := &nats.StreamConfig{ Name: WALOperationsStreamName, - Subjects: []string{fmt.Sprintf("wal.ops.%s.>", dm.kv.Bucket())}, + Subjects: []string{WALOperationsSubject}, Retention: nats.WorkQueuePolicy, MaxAge: 24 * time.Hour, } @@ -472,7 +537,7 @@ func (dm *Domain) createWALOperationsStream() error { func (dm *Domain) createWALCommitsStream() error { sc := &nats.StreamConfig{ Name: WALCommitsStreamName, - Subjects: []string{fmt.Sprintf(WALCommitsSubject, dm.kv.Bucket())}, + Subjects: []string{WALCommitsSubject}, Retention: nats.WorkQueuePolicy, MaxAge: 24 * time.Hour, } @@ -594,5 +659,5 @@ func (dm *Domain) PublishCommit(txID string) error { } func (dm *Domain) GenerateTransactionID() string { - return GenerateTransactionID() + return generateTransactionID() } diff --git a/statefun/runtime.go b/statefun/runtime.go index 4c24950..beab758 100644 --- a/statefun/runtime.go +++ b/statefun/runtime.go @@ -6,6 +6,7 @@ import ( "errors" "os" "os/signal" + "strings" "sync" "sync/atomic" "syscall" @@ -57,6 +58,7 @@ type Runtime struct { functionsStopCh chan struct{} wg sync.WaitGroup afterStartFunctionsWaitGroup sync.WaitGroup + once sync.Once } // NewRuntime initializes a new Runtime instance with the given configuration. 
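Note on the TransactionGenerator wiring introduced above: the cache no longer writes to KV directly; it hands every PUT/DELETE to whatever implements cache.TransactionGenerator, and in this patch the Domain itself is plugged in via dm.cache.SetTransactionGenerator(dm), publishing to the WAL streams. The sketch below is not part of the patch: it is a minimal, hypothetical in-memory implementation of that interface (names like memoryTxGenerator and recordedOp are invented here) that just buffers operations per transaction until commit, which can be useful for reasoning about the contract or for unit tests without NATS.

package example

// Hypothetical in-memory cache.TransactionGenerator: buffers operations per
// transaction and "commits" them locally instead of publishing to the NATS
// WAL streams. Illustration only; the real generator in this patch is *Domain.

import (
	"fmt"
	"sync"
	"time"

	"github.com/foliagecp/sdk/statefun/cache"
)

type recordedOp struct {
	OpTime int64
	OpType cache.OpType
	Key    string
	Value  []byte
}

type memoryTxGenerator struct {
	mu        sync.Mutex
	pending   map[string][]recordedOp // ops buffered per txID until PublishCommit
	committed map[string][]recordedOp // ops made visible by PublishCommit
}

func newMemoryTxGenerator() *memoryTxGenerator {
	return &memoryTxGenerator{
		pending:   map[string][]recordedOp{},
		committed: map[string][]recordedOp{},
	}
}

// GenerateTransactionID mirrors the patch's approach of using a nanosecond
// timestamp as the transaction identifier.
func (g *memoryTxGenerator) GenerateTransactionID() string {
	return fmt.Sprintf("%d", time.Now().UnixNano())
}

// PublishOperation records one PUT/DELETE for txID; the real implementation
// publishes a message with tx_id/op_time/op_type/key headers to wal.ops.<bucket>.<txID>.
func (g *memoryTxGenerator) PublishOperation(txID string, opTime int64, opType cache.OpType, key string, value []byte) error {
	g.mu.Lock()
	defer g.mu.Unlock()
	g.pending[txID] = append(g.pending[txID], recordedOp{opTime, opType, key, value})
	return nil
}

// PublishCommit makes the buffered operations visible, analogous to the
// commit message on wal.commits.<bucket> that the committer reacts to.
func (g *memoryTxGenerator) PublishCommit(txID string) error {
	g.mu.Lock()
	defer g.mu.Unlock()
	g.committed[txID] = g.pending[txID]
	delete(g.pending, txID)
	return nil
}

Usage would be the same call the patch makes for the Domain, e.g. cs.SetTransactionGenerator(newMemoryTxGenerator()); until a generator is set, the lazy writer skips its WAL iteration, as seen in kvLazyWriterWithWAL.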
@@ -151,6 +153,7 @@ func (r *Runtime) RegisterOnAfterStartFunction(f OnAfterStartFunction, async boo func (r *Runtime) Start(ctx context.Context, cacheConfig *cache.Config) error { logger := lg.GetLogger() phaseOneContext, cancelPhaseOneContext := context.WithCancel(context.Background()) + _ = phaseOneContext phaseTwoContext, cancelPhaseTwoContext := context.WithCancel(context.Background()) phaseThreeContext, cancelPhaseThreeContext := context.WithCancel(context.Background()) @@ -158,10 +161,18 @@ func (r *Runtime) Start(ctx context.Context, cacheConfig *cache.Config) error { sig := make(chan os.Signal, 1) signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP) <-sig + if !r.config.isActiveInstance { + logger.Debugf(ctx, "Runtime is not active. Shutting down immediately") + r.Shutdown() + cancelPhaseOneContext() + cancelPhaseTwoContext() + cancelPhaseThreeContext() + return + } startShutdown := time.Now() r.shutdownPhase.Store(ShutdownPhaseOne) - lg.GetLogger().Debugf(ctx, "Received shutdown signal, shutting down gracefully...") - lg.GetLogger().Debugf(ctx, "Shutdown phase 1") + logger.Debugf(ctx, "Received shutdown signal, shutting down gracefully...") + logger.Debugf(ctx, "Shutdown phase 1") cancelPhaseOneContext() timeout := time.NewTimer(10 * time.Second) @@ -175,26 +186,35 @@ func (r *Runtime) Start(ctx context.Context, cacheConfig *cache.Config) error { select { case <-timeout.C: - lg.GetLogger().Debugf(ctx, "AfterStart functions timed out") + logger.Debugf(ctx, "AfterStart functions timed out") case <-done: - lg.GetLogger().Debugf(ctx, "AfterStart functions completed") + logger.Debugf(ctx, "AfterStart functions completed") } r.drainSignalSubscriptions() r.shutdownPhase.Store(ShutdownPhaseTwo) - lg.GetLogger().Debugf(ctx, "Shutdown phase 2") + + logger.Debugf(ctx, "Shutdown phase 2") <-r.functionsStopCh cancelPhaseTwoContext() - lg.GetLogger().Debugf(ctx, "Shutdown phase 3") + logger.Debugf(ctx, "Shutdown phase 3") <-r.Domain.cache.Synced + if r.config.isActiveInstance { + logger.Debugf(ctx, "Shutdown - waiting for transaction committer") + <-r.Domain.shutdown + } + r.Shutdown() - lg.GetLogger().Debugf(ctx, "Shutdown took %v s", time.Since(startShutdown)) + + cancelPhaseThreeContext() + + logger.Debugf(ctx, "Shutdown took %v s", time.Since(startShutdown)) } go gracefulShutdownFunc() @@ -237,7 +257,7 @@ func (r *Runtime) Start(ctx context.Context, cacheConfig *cache.Config) error { // Handle single-instance functions. singleInstanceFunctionRevisions := make(map[string]uint64) - if err := r.handleSingleInstanceFunctions(phaseOneContext, singleInstanceFunctionRevisions); err != nil { + if err := r.handleSingleInstanceFunctions(phaseThreeContext, singleInstanceFunctionRevisions); err != nil { return err } @@ -255,16 +275,13 @@ func (r *Runtime) Start(ctx context.Context, cacheConfig *cache.Config) error { // Set Runtime ready r.isReady = true - // Run after-start functions. - r.runAfterStartFunctions(phaseOneContext) - // Wait for shutdown signal. <-r.shutdown - cancelPhaseThreeContext() // Perform cleanup. 
logger.Info(ctx, "Shutting down...") + // Wait for last goroutines waitCh := make(chan struct{}) go func() { r.wg.Wait() @@ -517,8 +534,10 @@ func (r *Runtime) singleInstanceFunctionLocksUpdater(ctx context.Context, revisi //release all functions releaseAllLocks := func(ctx context.Context, runtime *Runtime, revisions map[string]uint64) { - for ftName, revID := range revisions { - system.MsgOnErrorReturn(KeyMutexUnlock(ctx, runtime, system.GetHashStr(ftName), revID)) + if runtime.config.isActiveInstance { + for ftName, revID := range revisions { + system.MsgOnErrorReturn(KeyMutexUnlock(ctx, runtime, system.GetHashStr(ftName), revID)) + } } } defer releaseAllLocks(ctx, r, revisions) @@ -536,40 +555,67 @@ func (r *Runtime) singleInstanceFunctionLocksUpdater(ctx context.Context, revisi if r.config.isActiveInstance { newRevID, err := KeyMutexLockUpdate(ctx, r, system.GetHashStr(RuntimeName), r.config.activeRevID) if err != nil { - lg.Logf(lg.ErrorLevel, "KeyMutexLockUpdate failed for %s: %v", RuntimeName, err) + lg.Logf(lg.WarnLevel, "Lost active lock for %s: %v", RuntimeName, err) + r.config.isActiveInstance = false + r.config.activeRevID = 0 + continue } else { r.config.activeRevID = newRevID } } else { newRevID, err := KeyMutexLock(ctx, r, system.GetHashStr(RuntimeName), true) if err == nil { + lg.Logf(lg.DebugLevel, "Passive instance becoming active") r.config.isActiveInstance = true r.config.activeRevID = newRevID - subscribeRequired = true + + if err := r.Domain.checkKvConsistency(ctx); err != nil { + lg.Logf(lg.ErrorLevel, "Failed to become active: %v", err) + system.MsgOnErrorReturn(KeyMutexUnlock(ctx, r, system.GetHashStr(RuntimeName), newRevID)) + r.config.isActiveInstance = false + } else { + subscribeRequired = true + } } else if !errors.Is(err, ErrMutexLocked) { lg.Logf(lg.ErrorLevel, "KeyMutexLock failed for %s: %v", RuntimeName, err) - return + continue } } + } else { + r.config.isActiveInstance = true + } + + tryLock := func(ftName string) { + newRevID, err := KeyMutexLock(ctx, r, system.GetHashStr(ftName), true) + if err == nil { + subscribeRequired = true + revisions[ftName] = newRevID + lg.Logf(lg.TraceLevel, "KeyMutexLock succeeded for %s", ftName) + } } if r.config.isActiveInstance { + r.once.Do(func() { + lg.GetLogger().Debugf(ctx, "runtime is active, run afterStartFunctions") + r.runAfterStartFunctions(ctx) + }) + for ftName, revID := range revisions { - if revID != 0 { - newRevID, err := KeyMutexLockUpdate(ctx, r, system.GetHashStr(ftName), revID) - if err != nil { - lg.Logf(lg.ErrorLevel, "KeyMutexLockUpdate failed for %s: %v", ftName, err) - } else { - revisions[ftName] = newRevID - } - } else { - newRevID, err := KeyMutexLock(ctx, r, system.GetHashStr(ftName), true) - if err == nil { - subscribeRequired = true - revisions[ftName] = newRevID - lg.Logf(lg.DebugLevel, "KeyMutexLock succeeded for %s", ftName) - } + if revID == 0 { + tryLock(ftName) + continue + } + newRevID, err := KeyMutexLockUpdate(ctx, r, system.GetHashStr(ftName), revID) + if err == nil { + revisions[ftName] = newRevID + continue + } + if strings.Contains(err.Error(), "already unlocked") { + revisions[ftName] = 0 + tryLock(ftName) + continue } + lg.Logf(lg.ErrorLevel, "KeyMutexLockUpdate failed for %s: %v", ftName, err) } } diff --git a/statefun/wal.go b/statefun/wal.go index 1b56c26..2534b7c 100644 --- a/statefun/wal.go +++ b/statefun/wal.go @@ -6,7 +6,6 @@ import ( "fmt" "strconv" "strings" - "sync" "time" customNatsKv "github.com/foliagecp/sdk/embedded/nats/kv" @@ -19,33 +18,33 @@ import ( const ( 
WALOperationsStreamName = "wal_operations" WALCommitsStreamName = "wal_commits" - WALOperationsSubject = "wal.ops.%s.%s" - WALCommitsSubject = "wal.commits.%s" + WALOperationsSubject = "wal.ops.*.*" + WALCommitsSubject = "wal.commits.*" CommitterDurableName = "TRANSACTION_COMMITTER_CONSUMER" ) func (dm *Domain) TransactionCommitter(ctx context.Context) error { + ready := make(chan struct{}) go func() { - if err := dm.runTransactionCommitter(ctx); err != nil { - lg.GetLogger().Debugf(ctx, "TransactionCommitter error: %s", err) + if err := dm.runTransactionCommitter(ctx, ready); err != nil { + lg.GetLogger().Errorf(ctx, "TransactionCommitter error: %s", err) } }() + <-ready return nil } -func (dm *Domain) runTransactionCommitter(ctx context.Context) error { - commitSubject := fmt.Sprintf(WALCommitsSubject, dm.kv.Bucket()) +func (dm *Domain) runTransactionCommitter(ctx context.Context, ready chan struct{}) error { + commitSubject := fmt.Sprintf("wal.commits.%s", dm.kv.Bucket()) consumerName := CommitterDurableName + "-" + dm.kv.Bucket() - lg.GetLogger().Debugf(ctx, "TransactionCommitter starting for bucket=%s, subject=%s", dm.kv.Bucket(), commitSubject) + lg.GetLogger().Tracef(ctx, "TransactionCommitter starting for bucket=%s, subject=%s", dm.kv.Bucket(), commitSubject) if err := dm.setKVConsistent(false); err != nil { - lg.GetLogger().Debugf(ctx, "Failed to set KV as inconsistent: %s", err) + lg.GetLogger().Errorf(ctx, "ailed to set KV as inconsistent: %s", err) return err } - lg.GetLogger().Debugf(ctx, "KV marked as inconsistent, will process pending transactions") - - processedTxs := sync.Map{} + lg.GetLogger().Tracef(ctx, "KV marked as inconsistent, will process pending transactions") _, err := dm.js.AddConsumer(WALCommitsStreamName, &nats.ConsumerConfig{ Name: consumerName, @@ -56,27 +55,27 @@ func (dm *Domain) runTransactionCommitter(ctx context.Context) error { AckPolicy: nats.AckExplicitPolicy, AckWait: 30 * time.Second, MaxDeliver: 5, + MaxAckPending: 1, }) if err != nil && !errors.Is(err, nats.ErrConsumerNameAlreadyInUse) { - lg.GetLogger().Debugf(ctx, "TransactionCommitter failed to create consumer: %s", err) + lg.GetLogger().Errorf(ctx, "TransactionCommitter failed to create consumer: %s", err) return err } - lg.GetLogger().Debugf(ctx, "TransactionCommitter consumer created/exists: %s", consumerName) + lg.GetLogger().Tracef(ctx, "TransactionCommitter consumer created/exists: %s", consumerName) pendingCount := dm.countPendingCommits(consumerName) - lg.GetLogger().Debugf(ctx, "Found %d pending transactions to process", pendingCount) + lg.GetLogger().Tracef(ctx, "Found %d pending transactions to process", pendingCount) if pendingCount == 0 { - lg.GetLogger().Debugf(ctx, "No pending transactions, marking KV as consistent immediately") + lg.GetLogger().Tracef(ctx, "No pending transactions, marking KV as consistent immediately") if err = dm.setKVConsistent(true); err != nil { - lg.GetLogger().Debugf(ctx, "Failed to set KV as consistent: %s", err) + lg.GetLogger().Errorf(ctx, "Failed to set KV as consistent: %s", err) return err } } processedCount := 0 - processingTxs := sync.Map{} _, err = dm.js.QueueSubscribe( commitSubject, @@ -84,44 +83,30 @@ func (dm *Domain) runTransactionCommitter(ctx context.Context) error { func(msg *nats.Msg) { txID := msg.Header.Get("tx_id") if txID == "" { - lg.GetLogger().Debugf(ctx, "TransactionCommitter: received commit without tx_id") + lg.GetLogger().Error(ctx, "TransactionCommitter: received commit without tx_id") system.MsgOnErrorReturn(msg.Ack()) 
return } - if _, processing := processingTxs.LoadOrStore(txID, true); processing { - lg.GetLogger().Debugf(ctx, "TransactionCommitter: transaction %s is already being processed, ignoring duplicate", txID) - return - } - defer processingTxs.Delete(txID) - - if _, processed := processedTxs.Load(txID); processed { - lg.GetLogger().Debugf(ctx, "TransactionCommitter: transaction %s already processed, Ack() final", txID) - system.MsgOnErrorReturn(msg.Ack()) - processedTxs.Delete(txID) - - processedCount++ - if pendingCount > 0 && processedCount >= pendingCount { - lg.GetLogger().Debugf(ctx, "All %d pending transactions processed, marking KV as consistent", pendingCount) - if err := dm.setKVConsistent(true); err != nil { - lg.GetLogger().Debugf(ctx, "Failed to set KV as consistent: %s", err) - } - pendingCount = 0 - } + lg.GetLogger().Tracef(ctx, "TransactionCommitter: processing commit for tx_id=%s", txID) + if err = dm.applyTransactionOperations(ctx, txID); err != nil { + lg.GetLogger().Errorf(ctx, "TransactionCommitter: failed to apply transaction %s: %s", txID, err) + system.MsgOnErrorReturn(msg.Nak()) return } - lg.GetLogger().Debugf(ctx, "TransactionCommitter: first time processing commit for tx_id=%s, Nak()", txID) - system.MsgOnErrorReturn(msg.Nak()) + lg.GetLogger().Tracef(ctx, "TransactionCommitter: transaction %s completed, Ack()", txID) + system.MsgOnErrorReturn(msg.Ack()) - if err = dm.applyTransactionOperations(ctx, txID); err != nil { - lg.GetLogger().Debugf(ctx, "TransactionCommitter: failed to apply transaction %s: %s", txID, err) - return + processedCount++ + if pendingCount > 0 && processedCount >= pendingCount { + lg.GetLogger().Debugf(ctx, "TransactionCommitter: all %d pending transactions processed, marking KV as consistent", pendingCount) + if err = dm.setKVConsistent(true); err != nil { + lg.GetLogger().Errorf(ctx, "TransactionCommitter: failed to set KV as consistent: %s", err) + } + pendingCount = 0 } - - lg.GetLogger().Debugf(ctx, "TransactionCommitter: successfully applied transaction %s, marked as processed", txID) - processedTxs.Store(txID, true) }, nats.Bind(WALCommitsStreamName, consumerName), nats.ManualAck(), @@ -130,20 +115,67 @@ func (dm *Domain) runTransactionCommitter(ctx context.Context) error { if err != nil { return err } + close(ready) + select { + case <-dm.cache.Synced: + if err = dm.setKVConsistent(false); err != nil { + lg.GetLogger().Errorf(ctx, "Failed to set KV as inconsistent: %s", err) + } - <-ctx.Done() - return nil + lg.GetLogger().Trace(ctx, "Cache synced, recounting pending transactions for shutdown") + + finalPendingCount := dm.countPendingCommits(consumerName) + lg.GetLogger().Tracef(ctx, "Final pending transactions count: %d", finalPendingCount) + + if finalPendingCount == 0 { + lg.GetLogger().Trace(ctx, "No final pending transactions, shutdown complete") + close(dm.shutdown) + return nil + } + + checkInterval := 200 * time.Millisecond + logInterval := 5 * time.Second + maxWaitTime := 30 * time.Second + startWait := time.Now() + lastLogTime := startWait + + for { + currentPending := dm.countPendingCommits(consumerName) + + if time.Since(lastLogTime) >= logInterval { + lg.GetLogger().Tracef(ctx, "Shutdown: waiting for transactions, pending=%d, elapsed=%v", + currentPending, time.Since(startWait).Round(time.Second)) + lastLogTime = time.Now() + } + + if currentPending == 0 { + lg.GetLogger().Tracef(ctx, "All transactions processed in %v, shutdown complete", + time.Since(startWait).Round(time.Millisecond)) + close(dm.shutdown) + return nil + 
} + + if time.Since(startWait) > maxWaitTime { + lg.GetLogger().Warnf(ctx, "Shutdown timeout reached after %v with %d pending transactions", + maxWaitTime, currentPending) + close(dm.shutdown) + return nil + } + + time.Sleep(checkInterval) + } + } } func (dm *Domain) applyTransactionOperations(ctx context.Context, txID string) error { - opsSubject := fmt.Sprintf(WALOperationsSubject, dm.kv.Bucket(), txID) + opsSubject := fmt.Sprintf("wal.ops.%s.%s", dm.kv.Bucket(), txID) consumerName := "TX_OPS_" + dm.kv.Bucket() + "_" + txID - lg.GetLogger().Debugf(ctx, "applyTransactionOperations: processing tx_id=%s, subject=%s", txID, opsSubject) + lg.GetLogger().Tracef(ctx, "applyTransactionOperations: processing tx_id=%s, subject=%s", txID, opsSubject) info, err := dm.js.ConsumerInfo(WALOperationsStreamName, consumerName) if err != nil || info == nil { - lg.GetLogger().Debugf(ctx, "Consumer %s not found, creating new one", consumerName) + lg.GetLogger().Errorf(ctx, "Consumer %s not found, creating new one", consumerName) consumerConfig := &nats.ConsumerConfig{ Name: consumerName, @@ -155,58 +187,53 @@ func (dm *Domain) applyTransactionOperations(ctx context.Context, txID string) e } if _, err = dm.js.AddConsumer(WALOperationsStreamName, consumerConfig); err != nil { - lg.GetLogger().Debugf(ctx, "Failed to create consumer: %s", err) + lg.GetLogger().Errorf(ctx, "Failed to create consumer: %s", err) return fmt.Errorf("failed to create operations consumer: %w", err) } - lg.GetLogger().Debugf(ctx, "Created durable consumer %s", consumerName) + lg.GetLogger().Tracef(ctx, "Created durable consumer %s", consumerName) } else { - lg.GetLogger().Debugf(ctx, "Reusing existing consumer %s", consumerName) + lg.GetLogger().Tracef(ctx, "Reusing existing consumer %s", consumerName) } sub, err := dm.js.PullSubscribe(opsSubject, consumerName) if err != nil { - lg.GetLogger().Debugf(ctx, "Failed to create subscription: %s", err) + lg.GetLogger().Errorf(ctx, "Failed to create subscription: %s", err) return fmt.Errorf("failed to subscribe to operations: %w", err) } - lg.GetLogger().Debugf(ctx, "Created subscription for consumer %s, checking if valid...", consumerName) if !sub.IsValid() { - lg.GetLogger().Debugf(ctx, "Subscription is INVALID immediately after creation!") return fmt.Errorf("subscription invalid after creation") } - lg.GetLogger().Debugf(ctx, "Subscription is valid, proceeding with Fetch") defer func() { - lg.GetLogger().Debugf(ctx, "Unsubscribing from consumer %s", consumerName) system.MsgOnErrorReturn(sub.Unsubscribe()) }() totalOps := 0 for { - lg.GetLogger().Debugf(ctx, "Attempting to Fetch up to 100 messages, subscription valid=%v", sub.IsValid()) msgs, err := sub.Fetch(100, nats.MaxWait(1*time.Second)) if err != nil { if errors.Is(err, nats.ErrTimeout) { - lg.GetLogger().Debugf(ctx, "applyTransactionOperations: finished, processed %d operations for tx_id=%s", totalOps, txID) + lg.GetLogger().Tracef(ctx, "applyTransactionOperations: finished, processed %d operations for tx_id=%s", totalOps, txID) break } return fmt.Errorf("failed to fetch operations: %w", err) } if len(msgs) == 0 { - lg.GetLogger().Debugf(ctx, "applyTransactionOperations: no more messages, processed %d operations for tx_id=%s", totalOps, txID) + lg.GetLogger().Tracef(ctx, "applyTransactionOperations: no more messages, processed %d operations for tx_id=%s", totalOps, txID) break } - lg.GetLogger().Debugf(ctx, "applyTransactionOperations: fetched %d operations for tx_id=%s", len(msgs), txID) + //lg.GetLogger().Tracef(ctx, 
"applyTransactionOperations: fetched %d operations for tx_id=%s", len(msgs), txID) for _, msg := range msgs { opType := msg.Header.Get("op_type") key := msg.Header.Get("key") if key == "" { - lg.GetLogger().Debugf(ctx, "Operation without key in transaction %s", txID) + lg.GetLogger().Errorf(ctx, "Operation without key in transaction %s", txID) system.MsgOnErrorReturn(msg.Ack()) continue } @@ -214,28 +241,25 @@ func (dm *Domain) applyTransactionOperations(ctx context.Context, txID string) e var kvErr error switch opType { case cache.OpTypePUT: - lg.GetLogger().Debugf(ctx, "Applying PUT for key=%s in tx=%s", key, txID) _, kvErr = customNatsKv.KVPut(dm.js, dm.kv, key, msg.Data) case cache.OpTypeDelete: - lg.GetLogger().Debugf(ctx, "Applying DELETE for key=%s in tx=%s", key, txID) kvErr = customNatsKv.KVDelete(dm.js, dm.kv, key) if kvErr != nil { errMsg := kvErr.Error() if errors.Is(kvErr, nats.ErrKeyNotFound) || strings.Contains(errMsg, "message not found") || strings.Contains(errMsg, "key not found") { - lg.GetLogger().Debugf(ctx, "Key %s already deleted, ignoring error: %s", key, errMsg) kvErr = nil } } default: - lg.GetLogger().Debugf(ctx, "Unknown operation type %s in transaction %s", opType, txID) + lg.GetLogger().Tracef(ctx, "Unknown operation type %s in transaction %s", opType, txID) system.MsgOnErrorReturn(msg.Ack()) continue } if kvErr != nil { - lg.GetLogger().Debugf(ctx, "Failed to apply operation to KV: %s", kvErr) + lg.GetLogger().Errorf(ctx, "Failed to apply operation to KV: %s", kvErr) system.MsgOnErrorReturn(msg.Nak()) return fmt.Errorf("failed to apply operation: %w", kvErr) } @@ -245,7 +269,7 @@ func (dm *Domain) applyTransactionOperations(ctx context.Context, txID string) e } } - lg.GetLogger().Debugf(ctx, "applyTransactionOperations: all operations processed, deleting consumer %s", consumerName) + lg.GetLogger().Tracef(ctx, "applyTransactionOperations: all operations processed, deleting consumer %s", consumerName) system.MsgOnErrorReturn(dm.js.DeleteConsumer(WALOperationsStreamName, consumerName)) return nil @@ -276,21 +300,21 @@ func (dm *Domain) isKVConsistent() (bool, error) { func (dm *Domain) countPendingCommits(consumerName string) int { info, err := dm.js.ConsumerInfo(WALCommitsStreamName, consumerName) if err != nil { - lg.GetLogger().Debugf(context.TODO(), "Failed to get consumer info: %s", err) + lg.GetLogger().Errorf(context.TODO(), "Failed to get consumer info: %s", err) return 0 } pending := int(info.NumPending) - lg.GetLogger().Debugf(context.TODO(), "Consumer %s has %d pending messages", consumerName, pending) + lg.GetLogger().Tracef(context.TODO(), "Consumer %s has %d pending messages", consumerName, pending) return pending } -func GenerateTransactionID() string { - return fmt.Sprintf("%d", time.Now().UnixNano()) +func generateTransactionID() string { + return fmt.Sprintf("%d", system.GetCurrentTimeNs()) } func (dm *Domain) publishWALOperation(txID string, opTime int64, opType cache.OpType, key string, value []byte) error { - subject := fmt.Sprintf(WALOperationsSubject, dm.kv.Bucket(), txID) + subject := fmt.Sprintf("wal.ops.%s.%s", dm.kv.Bucket(), txID) msg := nats.NewMsg(subject) msg.Header.Set("tx_id", txID) @@ -299,8 +323,6 @@ func (dm *Domain) publishWALOperation(txID string, opTime int64, opType cache.Op msg.Header.Set("key", key) msg.Data = value - lg.GetLogger().Debugf(context.TODO(), "::::::::::Publishing WAL operation: tx=%s, type=%s, key=%s, subject=%s", txID, opType, key, subject) - if _, err := dm.js.PublishMsg(msg); err != nil { 
lg.GetLogger().Errorf(context.TODO(), "Failed to publish WAL operation: %s", err) return err @@ -309,16 +331,16 @@ func (dm *Domain) publishWALOperation(txID string, opTime int64, opType cache.Op } func (dm *Domain) publishWALCommit(txID string) error { - subject := fmt.Sprintf(WALCommitsSubject, dm.kv.Bucket()) + subject := fmt.Sprintf("wal.commits.%s", dm.kv.Bucket()) msg := nats.NewMsg(subject) msg.Header.Set("tx_id", txID) msg.Header.Set("commit_time", strconv.FormatInt(time.Now().UnixNano(), 10)) - lg.GetLogger().Debugf(context.TODO(), "::::::::::Publishing WAL commit: tx=%s, subject=%s", txID, subject) + lg.GetLogger().Tracef(context.TODO(), "Publishing WAL commit: tx=%s, subject=%s", txID, subject) if _, err := dm.js.PublishMsg(msg); err != nil { - lg.GetLogger().Errorf(context.TODO(), "::::::::::Failed to publish WAL commit: %s", err) + lg.GetLogger().Errorf(context.TODO(), "Failed to publish WAL commit: %s", err) return err } return nil From 35917fc8b1c0c7b302c61d4fafd77319c1a0f9b8 Mon Sep 17 00:00:00 2001 From: atauov Date: Wed, 17 Dec 2025 22:51:06 +0500 Subject: [PATCH 3/8] not need a sleep --- statefun/cache/cache.go | 2 -- statefun/wal.go | 5 ++--- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/statefun/cache/cache.go b/statefun/cache/cache.go index 854e180..1f90bbd 100644 --- a/statefun/cache/cache.go +++ b/statefun/cache/cache.go @@ -455,8 +455,6 @@ func (cs *Store) traverseCacheForTransaction( suffixPathsStack = append(suffixPathsStack, newSuffix) depthsStack = append(depthsStack, currentDepth+1) - time.Sleep(time.Duration(cs.cacheConfig.lazyWriterValueProcessDelayMkS) * time.Microsecond) - return true }) diff --git a/statefun/wal.go b/statefun/wal.go index 2534b7c..a07c0a4 100644 --- a/statefun/wal.go +++ b/statefun/wal.go @@ -41,7 +41,6 @@ func (dm *Domain) runTransactionCommitter(ctx context.Context, ready chan struct lg.GetLogger().Tracef(ctx, "TransactionCommitter starting for bucket=%s, subject=%s", dm.kv.Bucket(), commitSubject) if err := dm.setKVConsistent(false); err != nil { - lg.GetLogger().Errorf(ctx, "ailed to set KV as inconsistent: %s", err) return err } lg.GetLogger().Tracef(ctx, "KV marked as inconsistent, will process pending transactions") @@ -175,7 +174,7 @@ func (dm *Domain) applyTransactionOperations(ctx context.Context, txID string) e info, err := dm.js.ConsumerInfo(WALOperationsStreamName, consumerName) if err != nil || info == nil { - lg.GetLogger().Errorf(ctx, "Consumer %s not found, creating new one", consumerName) + lg.GetLogger().Tracef(ctx, "Consumer %s not found, creating new one", consumerName) consumerConfig := &nats.ConsumerConfig{ Name: consumerName, @@ -233,7 +232,7 @@ func (dm *Domain) applyTransactionOperations(ctx context.Context, txID string) e key := msg.Header.Get("key") if key == "" { - lg.GetLogger().Errorf(ctx, "Operation without key in transaction %s", txID) + lg.GetLogger().Warnf(ctx, "Operation without key in transaction %s", txID) system.MsgOnErrorReturn(msg.Ack()) continue } From 87e552e2fc789ef3369b84dabef1be538136eec5 Mon Sep 17 00:00:00 2001 From: atauov Date: Thu, 18 Dec 2025 00:46:30 +0500 Subject: [PATCH 4/8] update gracefully shutdown --- statefun/io.go | 4 +-- statefun/runtime.go | 78 ++++++++++++++++++++++++++++++--------------- 2 files changed, 55 insertions(+), 27 deletions(-) diff --git a/statefun/io.go b/statefun/io.go index 900c369..e7d3bb6 100644 --- a/statefun/io.go +++ b/statefun/io.go @@ -385,7 +385,7 @@ func (r *Runtime) Signal(signalProvider sfPlugins.SignalProvider, typename strin if 
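For reference, the WAL records published here are ordinary JetStream messages: operation metadata travels in NATS headers (tx_id, op_type, key) on the wal.ops.<bucket>.<txID> subject, and the commit marker goes to wal.commits.<bucket> with tx_id and commit_time headers. A standalone sketch of that layout, assuming a reachable NATS server and that the WAL streams already exist; the bucket name, key and payload below are placeholders:

package main

import (
	"fmt"
	"log"
	"strconv"
	"time"

	"github.com/nats-io/nats.go"
)

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Drain()

	js, err := nc.JetStream()
	if err != nil {
		log.Fatal(err)
	}

	bucket, txID := "example_bucket", strconv.FormatInt(time.Now().UnixNano(), 10)

	// One operation inside transaction txID: headers carry what the committer
	// reads back (op_type, key), the payload carries the encoded cache record.
	op := nats.NewMsg(fmt.Sprintf("wal.ops.%s.%s", bucket, txID))
	op.Header.Set("tx_id", txID)
	op.Header.Set("op_type", "PUT")
	op.Header.Set("key", "some.cache.key")
	op.Data = []byte("encoded-record-bytes")
	if _, err := js.PublishMsg(op); err != nil {
		log.Fatal(err)
	}

	// The commit marker for the whole transaction goes to a separate subject.
	commit := nats.NewMsg(fmt.Sprintf("wal.commits.%s", bucket))
	commit.Header.Set("tx_id", txID)
	commit.Header.Set("commit_time", strconv.FormatInt(time.Now().UnixNano(), 10))
	if _, err := js.PublishMsg(commit); err != nil {
		log.Fatal(err)
	}
}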
!r.isReady { return fmt.Errorf("can not send signal - runtime has not started yet") } - if r.shutdownPhase.Load() != ShutdownPhaseNone { + if r.gs.phase() != ShutdownPhaseNone { return fmt.Errorf("can not send signal - runtime is shutting down") } return r.signal(signalProvider, "ingress", "signal", typename, r.Domain.GetValidObjectId(id), payload, options, nil) @@ -395,7 +395,7 @@ func (r *Runtime) Request(requestProvider sfPlugins.RequestProvider, typename st if !r.isReady { return nil, fmt.Errorf("can not send request - runtime has not started yet") } - if r.shutdownPhase.Load() != ShutdownPhaseNone { + if r.gs.phase() != ShutdownPhaseNone { return nil, fmt.Errorf("can not send request - runtime is shutting down") } return r.request(requestProvider, "ingress", "request", typename, r.Domain.GetValidObjectId(id), payload, options, nil, timeout...) diff --git a/statefun/runtime.go b/statefun/runtime.go index beab758..974f4f3 100644 --- a/statefun/runtime.go +++ b/statefun/runtime.go @@ -54,13 +54,25 @@ type Runtime struct { isReady bool shutdown chan struct{} - shutdownPhase atomic.Uint32 + gs *GracefullyShutdown functionsStopCh chan struct{} wg sync.WaitGroup afterStartFunctionsWaitGroup sync.WaitGroup once sync.Once } +type GracefullyShutdown struct { + Phase atomic.Uint32 + + cancelPhaseOne context.CancelFunc + cancelPhaseTwo context.CancelFunc + cancelPhaseThree context.CancelFunc + + CtxPhaseOne context.Context + CtxPhaseTwo context.Context + CtxPhaseThree context.Context +} + // NewRuntime initializes a new Runtime instance with the given configuration. func NewRuntime(config RuntimeConfig) (*Runtime, error) { r := &Runtime{ @@ -71,7 +83,6 @@ func NewRuntime(config RuntimeConfig) (*Runtime, error) { shutdown: make(chan struct{}), functionsStopCh: make(chan struct{}), } - r.shutdownPhase.Store(ShutdownPhaseNone) natsOpts := nats.GetDefaultOptions() natsOpts.Url = r.config.natsURL @@ -152,10 +163,7 @@ func (r *Runtime) RegisterOnAfterStartFunction(f OnAfterStartFunction, async boo // It also handles graceful shutdown via context.Context. func (r *Runtime) Start(ctx context.Context, cacheConfig *cache.Config) error { logger := lg.GetLogger() - phaseOneContext, cancelPhaseOneContext := context.WithCancel(context.Background()) - _ = phaseOneContext - phaseTwoContext, cancelPhaseTwoContext := context.WithCancel(context.Background()) - phaseThreeContext, cancelPhaseThreeContext := context.WithCancel(context.Background()) + r.gs = NewGracefullyShutdown(context.Background()) gracefulShutdownFunc := func() { sig := make(chan os.Signal, 1) @@ -163,18 +171,15 @@ func (r *Runtime) Start(ctx context.Context, cacheConfig *cache.Config) error { <-sig if !r.config.isActiveInstance { logger.Debugf(ctx, "Runtime is not active. 
Shutting down immediately") + r.gs.cancelAllContexts() r.Shutdown() - cancelPhaseOneContext() - cancelPhaseTwoContext() - cancelPhaseThreeContext() return } startShutdown := time.Now() - r.shutdownPhase.Store(ShutdownPhaseOne) + r.gs.setPhase(ShutdownPhaseOne) logger.Debugf(ctx, "Received shutdown signal, shutting down gracefully...") logger.Debugf(ctx, "Shutdown phase 1") - cancelPhaseOneContext() - + r.gs.cancelPhaseOne() timeout := time.NewTimer(10 * time.Second) defer timeout.Stop() @@ -193,13 +198,13 @@ func (r *Runtime) Start(ctx context.Context, cacheConfig *cache.Config) error { r.drainSignalSubscriptions() - r.shutdownPhase.Store(ShutdownPhaseTwo) + r.gs.setPhase(ShutdownPhaseTwo) logger.Debugf(ctx, "Shutdown phase 2") <-r.functionsStopCh - cancelPhaseTwoContext() + r.gs.cancelPhaseTwo() logger.Debugf(ctx, "Shutdown phase 3") @@ -209,11 +214,8 @@ func (r *Runtime) Start(ctx context.Context, cacheConfig *cache.Config) error { logger.Debugf(ctx, "Shutdown - waiting for transaction committer") <-r.Domain.shutdown } - r.Shutdown() - - cancelPhaseThreeContext() - + r.gs.cancelPhaseThree() logger.Debugf(ctx, "Shutdown took %v s", time.Since(startShutdown)) } @@ -232,7 +234,7 @@ func (r *Runtime) Start(ctx context.Context, cacheConfig *cache.Config) error { } // Start the domain. - if err := r.Domain.start(phaseTwoContext, cacheConfig, r.config.handlesDomainRouters); err != nil { + if err := r.Domain.start(r.gs.CtxPhaseTwo, cacheConfig, r.config.handlesDomainRouters); err != nil { return err } @@ -257,7 +259,7 @@ func (r *Runtime) Start(ctx context.Context, cacheConfig *cache.Config) error { // Handle single-instance functions. singleInstanceFunctionRevisions := make(map[string]uint64) - if err := r.handleSingleInstanceFunctions(phaseThreeContext, singleInstanceFunctionRevisions); err != nil { + if err := r.handleSingleInstanceFunctions(r.gs.CtxPhaseThree, singleInstanceFunctionRevisions); err != nil { return err } @@ -270,7 +272,7 @@ func (r *Runtime) Start(ctx context.Context, cacheConfig *cache.Config) error { // Start garbage collector. r.wg.Add(1) - go r.runGarbageCollector(phaseThreeContext) + go r.runGarbageCollector(r.gs.CtxPhaseThree) // Set Runtime ready r.isReady = true @@ -328,6 +330,32 @@ func (r *Runtime) stopRequestSubscriptions() { } } +func NewGracefullyShutdown(rootCtx context.Context) *GracefullyShutdown { + gs := &GracefullyShutdown{} + + gs.setPhase(ShutdownPhaseNone) + gs.CtxPhaseOne, gs.cancelPhaseOne = context.WithCancel(rootCtx) + gs.CtxPhaseTwo, gs.cancelPhaseTwo = context.WithCancel(rootCtx) + gs.CtxPhaseThree, gs.cancelPhaseThree = context.WithCancel(rootCtx) + + return gs +} + +func (gs *GracefullyShutdown) setPhase(phase ShutdownPhase) { + gs.Phase.Store(phase) +} + +func (gs *GracefullyShutdown) phase() ShutdownPhase { + return gs.Phase.Load() +} + +// cancelAllContexts force stops passive runtime +func (gs *GracefullyShutdown) cancelAllContexts() { + gs.cancelPhaseOne() + gs.cancelPhaseTwo() + gs.cancelPhaseThree() +} + // Shutdown stops the runtime. 
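The three phase contexts introduced here let long-running goroutines be stopped in stages instead of all at once. A minimal sketch of that pattern with illustrative worker names, not code from the patch:

package main

import (
	"context"
	"fmt"
	"time"
)

// runWorker does periodic work until its phase context is cancelled.
func runWorker(ctx context.Context, name string) {
	ticker := time.NewTicker(20 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			fmt.Println(name, "stopped")
			return
		case <-ticker.C:
			// one unit of work
		}
	}
}

func main() {
	root := context.Background()
	phase1, cancelPhase1 := context.WithCancel(root)
	phase2, cancelPhase2 := context.WithCancel(root)

	go runWorker(phase1, "after-start hooks") // stopped by phase 1
	go runWorker(phase2, "domain routines")   // stopped by phase 2

	time.Sleep(50 * time.Millisecond)
	cancelPhase1() // phase 1: stop after-start work, stop accepting new signals
	time.Sleep(50 * time.Millisecond)
	cancelPhase2() // phase 2: stop domain-level routines
	time.Sleep(50 * time.Millisecond)
}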
func (r *Runtime) Shutdown() { close(r.shutdown) @@ -474,7 +502,7 @@ func (r *Runtime) collectGarbage() { lg.Logf(lg.ErrorLevel, "Error ensuring GaugeVec: %v", err) } - isShutdown := r.shutdownPhase.Load() == ShutdownPhaseTwo + isShutdown := r.gs.phase() == ShutdownPhaseTwo functionsReadyForStop := isShutdown for _, ft := range r.registeredFunctionTypes { @@ -502,7 +530,7 @@ func (r *Runtime) collectGarbage() { for _, ft := range r.registeredFunctionTypes { ft.stopRequestSubscription() } - r.shutdownPhase.Store(ShutdownPhaseThree) + r.gs.setPhase(ShutdownPhaseThree) r.functionsStopCh <- struct{}{} } @@ -595,9 +623,9 @@ func (r *Runtime) singleInstanceFunctionLocksUpdater(ctx context.Context, revisi } if r.config.isActiveInstance { - r.once.Do(func() { + r.once.Do(func() { //TODO need change logic for active->passive->active lg.GetLogger().Debugf(ctx, "runtime is active, run afterStartFunctions") - r.runAfterStartFunctions(ctx) + r.runAfterStartFunctions(r.gs.CtxPhaseOne) }) for ftName, revID := range revisions { From 8a254387ce891707f12b65e5569fb60b912f48b5 Mon Sep 17 00:00:00 2001 From: atauov Date: Fri, 19 Dec 2025 14:26:51 +0500 Subject: [PATCH 5/8] Fix active->passive failover --- statefun/runtime.go | 38 +++++++++++++++++++++++++++++++------- 1 file changed, 31 insertions(+), 7 deletions(-) diff --git a/statefun/runtime.go b/statefun/runtime.go index 974f4f3..db31015 100644 --- a/statefun/runtime.go +++ b/statefun/runtime.go @@ -58,7 +58,8 @@ type Runtime struct { functionsStopCh chan struct{} wg sync.WaitGroup afterStartFunctionsWaitGroup sync.WaitGroup - once sync.Once + afterStartRunning atomic.Bool + activeInstanceMu sync.RWMutex } type GracefullyShutdown struct { @@ -453,6 +454,13 @@ func (r *Runtime) startFunctionSubscriptions(ctx context.Context, revisions map[ return nil } +func (r *Runtime) stopFunctionSubscriptions(ctx context.Context) { + for _, ft := range r.registeredFunctionTypes { + ft.stopSignalSubscription() + ft.stopRequestSubscription() + } +} + // runAfterStartFunctions executes the registered OnAfterStart functions. 
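Replacing sync.Once with an atomic.Bool is what makes the active -> passive -> active cycle of this patch possible: the flag is compare-and-swapped on activation and cleared again on demotion, so the after-start hooks can run once per activation rather than once per process. A small illustrative sketch (the types and method names below are not from the patch):

package main

import (
	"fmt"
	"sync/atomic"
)

type instance struct {
	afterStartRunning atomic.Bool
}

// activate runs the after-start hooks only if they are not already running.
func (i *instance) activate() {
	if i.afterStartRunning.CompareAndSwap(false, true) {
		fmt.Println("running after-start hooks")
	}
}

// demote models the active->passive transition: subscriptions are stopped,
// the phase-one context is cancelled, and the flag is re-armed.
func (i *instance) demote() {
	i.afterStartRunning.Store(false)
	fmt.Println("became passive")
}

func main() {
	var inst instance
	inst.activate() // runs hooks
	inst.activate() // no-op: already running
	inst.demote()
	inst.activate() // runs hooks again after re-activation
}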
func (r *Runtime) runAfterStartFunctions(ctx context.Context) { for _, fnWithMode := range r.onAfterStartFunctionsWithMode { @@ -583,9 +591,16 @@ func (r *Runtime) singleInstanceFunctionLocksUpdater(ctx context.Context, revisi if r.config.isActiveInstance { newRevID, err := KeyMutexLockUpdate(ctx, r, system.GetHashStr(RuntimeName), r.config.activeRevID) if err != nil { - lg.Logf(lg.WarnLevel, "Lost active lock for %s: %v", RuntimeName, err) + lg.Logf(lg.WarnLevel, "Active instance lost lock, becoming passive") + r.activeInstanceMu.Lock() r.config.isActiveInstance = false r.config.activeRevID = 0 + r.activeInstanceMu.Unlock() + r.stopFunctionSubscriptions(ctx) + if r.afterStartRunning.Load() { + r.gs.cancelPhaseOne() + r.afterStartRunning.Store(false) + } continue } else { r.config.activeRevID = newRevID @@ -594,14 +609,21 @@ func (r *Runtime) singleInstanceFunctionLocksUpdater(ctx context.Context, revisi newRevID, err := KeyMutexLock(ctx, r, system.GetHashStr(RuntimeName), true) if err == nil { lg.Logf(lg.DebugLevel, "Passive instance becoming active") + r.activeInstanceMu.Lock() r.config.isActiveInstance = true r.config.activeRevID = newRevID + r.activeInstanceMu.Unlock() if err := r.Domain.checkKvConsistency(ctx); err != nil { lg.Logf(lg.ErrorLevel, "Failed to become active: %v", err) system.MsgOnErrorReturn(KeyMutexUnlock(ctx, r, system.GetHashStr(RuntimeName), newRevID)) + + r.activeInstanceMu.Lock() r.config.isActiveInstance = false + r.config.activeRevID = 0 + r.activeInstanceMu.Unlock() } else { + r.gs.CtxPhaseOne, r.gs.cancelPhaseOne = context.WithCancel(context.Background()) subscribeRequired = true } } else if !errors.Is(err, ErrMutexLocked) { @@ -623,11 +645,13 @@ func (r *Runtime) singleInstanceFunctionLocksUpdater(ctx context.Context, revisi } if r.config.isActiveInstance { - r.once.Do(func() { //TODO need change logic for active->passive->active - lg.GetLogger().Debugf(ctx, "runtime is active, run afterStartFunctions") - r.runAfterStartFunctions(r.gs.CtxPhaseOne) - }) - + if r.config.isActiveInstance { + if r.afterStartRunning.CompareAndSwap(false, true) { + r.gs.CtxPhaseOne, r.gs.cancelPhaseOne = context.WithCancel(context.Background()) + lg.GetLogger().Debugf(ctx, "runtime is active, run afterStartFunctions") + r.runAfterStartFunctions(r.gs.CtxPhaseOne) + } + } for ftName, revID := range revisions { if revID == 0 { tryLock(ftName) From 84a37bd211aa142bcec9a07227541d8e85132c6a Mon Sep 17 00:00:00 2001 From: atauov Date: Fri, 19 Dec 2025 14:27:18 +0500 Subject: [PATCH 6/8] Decrease AckWait --- statefun/wal.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/statefun/wal.go b/statefun/wal.go index a07c0a4..453ef37 100644 --- a/statefun/wal.go +++ b/statefun/wal.go @@ -52,7 +52,7 @@ func (dm *Domain) runTransactionCommitter(ctx context.Context, ready chan struct DeliverGroup: consumerName + "-group", FilterSubject: commitSubject, AckPolicy: nats.AckExplicitPolicy, - AckWait: 30 * time.Second, + AckWait: 5 * time.Second, MaxDeliver: 5, MaxAckPending: 1, }) From 658b1bdcd701c7b702e34e9a62a8b079b03ea253 Mon Sep 17 00:00:00 2001 From: atauov Date: Thu, 5 Feb 2026 16:14:22 +0500 Subject: [PATCH 7/8] Replicate WAL streams (system-streams) --- statefun/domain.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/statefun/domain.go b/statefun/domain.go index 7773b85..3dde5ef 100644 --- a/statefun/domain.go +++ b/statefun/domain.go @@ -560,6 +560,7 @@ func (dm *Domain) createWALOperationsStream() error { Subjects: []string{WALOperationsSubject}, Retention: 
nats.WorkQueuePolicy, MaxAge: 24 * time.Hour, + Replicas: dm.sysSC.replicasCount, } return dm.createStreamIfNotExists(sc) } @@ -570,6 +571,7 @@ func (dm *Domain) createWALCommitsStream() error { Subjects: []string{WALCommitsSubject}, Retention: nats.WorkQueuePolicy, MaxAge: 24 * time.Hour, + Replicas: dm.sysSC.replicasCount, } return dm.createStreamIfNotExists(sc) } From f031136b9b66001c1df24c102a813e695e0b9667 Mon Sep 17 00:00:00 2001 From: atauov Date: Mon, 16 Feb 2026 17:12:25 +0500 Subject: [PATCH 8/8] WAL phase 1, ready for HA (active-passive) --- statefun/cache/cache.go | 552 +++++++++++++++------------------ statefun/cache/cache_config.go | 3 + statefun/domain.go | 147 ++++++++- statefun/io.go | 4 +- statefun/runtime.go | 236 +++++++++++--- statefun/wal.go | 387 +++++++++++++++++++++++ 6 files changed, 976 insertions(+), 353 deletions(-) create mode 100644 statefun/wal.go diff --git a/statefun/cache/cache.go b/statefun/cache/cache.go index 57c971c..1c99e6d 100644 --- a/statefun/cache/cache.go +++ b/statefun/cache/cache.go @@ -295,6 +295,8 @@ type Store struct { transactionsMutex *sync.Mutex getKeysByPatternFromKVMutex *sync.Mutex + transactionGenerator TransactionGenerator + //write barrier state backupBarrierTimestamp int64 backupBarrierStatus int32 // 0=unlocked, 1=locking, 2=locked @@ -302,7 +304,170 @@ type Store struct { Synced chan struct{} } +type traverseResult struct { + opsCount int + traverseCount int + iterationSyncedCount int + lruTimes []int64 + allBeforeBackupBarrierSynced bool +} + +func (cs *Store) traverseCacheForTransaction( + txID string, + barrierTime int64, + ignoreBarrier bool, +) *traverseResult { + le := lg.GetLogger() + result := &traverseResult{ + opsCount: 0, + traverseCount: 0, + iterationSyncedCount: 0, + lruTimes: []int64{}, + allBeforeBackupBarrierSynced: true, + } + + var backupBarrierTimestamp int64 + var backupBarrierStatus int32 + if !ignoreBarrier { + backupBarrierTimestamp, backupBarrierStatus = cs.getBackupBarrierState() + } + + cacheStoreValueStack := []*StoreValue{cs.rootValue} + suffixPathsStack := []string{""} + depthsStack := []int{0} + + for len(cacheStoreValueStack) > 0 { + lastID := len(cacheStoreValueStack) - 1 + + currentStoreValue := cacheStoreValueStack[lastID] + result.traverseCount++ + + currentStoreValue.RLock("kvLazyWriter") + result.lruTimes = append(result.lruTimes, currentStoreValue.valueUpdateTime) + currentStoreValue.RUnlock("kvLazyWriter") + + currentSuffix := suffixPathsStack[lastID] + currentDepth := depthsStack[lastID] + + cacheStoreValueStack = cacheStoreValueStack[:lastID] + suffixPathsStack = suffixPathsStack[:lastID] + depthsStack = depthsStack[:lastID] + + noChildren := true + currentStoreValue.Range(func(key, value interface{}) bool { + noChildren = false + + var newSuffix string + if currentDepth == 0 { + newSuffix = currentSuffix + key.(string) + } else { + newSuffix = currentSuffix + "." 
+ key.(string) + } + + csvChild := value.(*StoreValue) + + if backupBarrierStatus == BackupBarrierStatusLocking { + ve, vut, sn, sw := csvChild.syncState() + if ve && vut > 0 && vut <= backupBarrierTimestamp { + if sn || !sw { + result.allBeforeBackupBarrierSynced = false + } + } + } + + var valueUpdateTime int64 = 0 + var opType OpType + var finalBytes []byte = nil + + csvChild.Lock("kvLazyWriter") + if csvChild.syncNeeded { + valueUpdateTime = csvChild.valueUpdateTime + timeBytes := make([]byte, 8) + binary.BigEndian.PutUint64(timeBytes, uint64(valueUpdateTime)) + if csvChild.valueExists { + var flag uint8 + var dataBytes []byte + switch csvChild.valueType { + case typeByteArray: + flag = FlagBytesAppend + dataBytes = csvChild.value.([]byte) + case typeJson: + flag = FlagJSONAppend + dataBytes = csvChild.value.(*easyjson.JSON).ToBytes() + default: + le.Errorf(cs.ctx, "Unknown type for key=%s, value=%v", newSuffix, csvChild.value) + csvChild.Unlock("kvLazyWriter") + cacheStoreValueStack = append(cacheStoreValueStack, value.(*StoreValue)) + suffixPathsStack = append(suffixPathsStack, newSuffix) + depthsStack = append(depthsStack, currentDepth+1) + return true + } + header := append(timeBytes, flag) + finalBytes = append(header, dataBytes...) + opType = OpTypePUT + } else { + finalBytes = append(timeBytes, FlagDeleted) + opType = OpTypeDelete + } + } else { + if csvChild.valueUpdateTime > 0 && csvChild.valueUpdateTime <= cs.lruTresholdTime && csvChild.purgeState == 0 { + currentStoreValue.ConsistencyLoss(system.GetCurrentTimeNs()) + csvChild.TryPurgeReady() + csvChild.TryPurgeConfirm() + } + } + csvChild.Unlock("kvLazyWriter") + + if finalBytes != nil { + result.iterationSyncedCount++ + keyStr := key.(string) + + if !ignoreBarrier { + if valueUpdateTime > barrierTime { + cacheStoreValueStack = append(cacheStoreValueStack, value.(*StoreValue)) + suffixPathsStack = append(suffixPathsStack, newSuffix) + depthsStack = append(depthsStack, currentDepth+1) + return true + } + + if err := cs.checkBackupBarrierInfoBeforeWrite(valueUpdateTime); err != nil { + le.Tracef(cs.ctx, "skipping write for key=%s due to barrier: %v", keyStr, err) + cacheStoreValueStack = append(cacheStoreValueStack, value.(*StoreValue)) + suffixPathsStack = append(suffixPathsStack, newSuffix) + depthsStack = append(depthsStack, currentDepth+1) + return true + } + } + + if err := cs.transactionGenerator.PublishOperation(txID, valueUpdateTime, opType, cs.toStoreKey(newSuffix), finalBytes); err != nil { + le.Errorf(cs.ctx, "Store kvLazyWriter cannot publish WAL operation for key=%s: %s", keyStr, err) + } else { + result.opsCount++ + csvChild.Lock("kvLazyWriter") + if valueUpdateTime == csvChild.valueUpdateTime { + csvChild.syncNeeded = false + } + csvChild.Unlock("kvLazyWriter") + } + } + + cacheStoreValueStack = append(cacheStoreValueStack, value.(*StoreValue)) + suffixPathsStack = append(suffixPathsStack, newSuffix) + depthsStack = append(depthsStack, currentDepth+1) + + return true + }) + + if noChildren { + currentStoreValue.collectGarbage() + } + } + + return result +} + func NewCacheStore(ctx context.Context, cacheConfig *Config, js nats.JetStreamContext, kv nats.KeyValue) *Store { + le := lg.GetLogger() var inited atomic.Bool initChan := make(chan bool) cs := Store{ @@ -334,8 +499,6 @@ func NewCacheStore(ctx context.Context, cacheConfig *Config, js nats.JetStreamCo cs.ctx, cs.cancel = context.WithCancel(ctx) - system.MsgOnErrorReturn(cs.clearBackupBarrier()) - storeUpdatesHandler := func(cs *Store) { 
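The byte layout built during the traversal is the same one the readers parse elsewhere in this file: 8 bytes of big-endian update time, one flag byte, then the raw value, with a bare 9-byte record acting as a delete marker. A standalone sketch of that encoding; the flag constants below are stand-in values, not the package's FlagBytesAppend/FlagDeleted:

package main

import (
	"encoding/binary"
	"fmt"
)

// Stand-in flag values; the real package defines FlagDeleted, FlagBytesAppend,
// FlagJSONAppend with its own constants.
const (
	flagDeleted     byte = 0
	flagBytesAppend byte = 1
)

// encodeRecord builds: 8-byte big-endian update time | 1 flag byte | value.
func encodeRecord(updateTimeNs int64, flag byte, value []byte) []byte {
	buf := make([]byte, 8, 9+len(value))
	binary.BigEndian.PutUint64(buf, uint64(updateTimeNs))
	buf = append(buf, flag)
	return append(buf, value...)
}

// decodeRecord is the reverse; a 9-byte record with no payload is a tombstone.
func decodeRecord(rec []byte) (updateTimeNs int64, flag byte, value []byte, ok bool) {
	if len(rec) < 9 {
		return 0, 0, nil, false
	}
	return int64(binary.BigEndian.Uint64(rec[:8])), rec[8], rec[9:], true
}

func main() {
	rec := encodeRecord(1700000000000000000, flagBytesAppend, []byte("hello"))
	t, f, v, _ := decodeRecord(rec)
	fmt.Println(t, f, string(v), "tombstone:", len(encodeRecord(t, flagDeleted, nil)) == 9)
}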
system.GlobalPrometrics.GetRoutinesCounter().Started("cache.storeUpdatesHandler") defer system.GlobalPrometrics.GetRoutinesCounter().Stopped("cache.storeUpdatesHandler") @@ -364,7 +527,7 @@ func NewCacheStore(ctx context.Context, cacheConfig *Config, js nats.JetStreamCo if json, ok := easyjson.JSONFromBytes(valueBytes[9:]); ok { cs.SetValueJSON(key, &json, false, kvRecordTime, "") } else { - lg.Logf(lg.ErrorLevel, "Failed to parse JSON for key=%s", key) + le.Errorf(ctx, "Failed to parse JSON for key=%s", key) } default: // Someone else (other module) deleted a key from the cache @@ -403,7 +566,7 @@ func NewCacheStore(ctx context.Context, cacheConfig *Config, js nats.JetStreamCo // Deletion notify - omitting cause value must already be deleted from the cache } else { //lg.Logf("---CACHE_KV !T!F: %s", key) - lg.Logf(lg.ErrorLevel, "storeUpdatesHandler: received value without time and append flag!") + le.Error(ctx, "storeUpdatesHandler: received value without time and append flag!") } } else { if inited.CompareAndSwap(false, true) { @@ -414,221 +577,103 @@ func NewCacheStore(ctx context.Context, cacheConfig *Config, js nats.JetStreamCo } system.MsgOnErrorReturn(w.Stop()) } else { - lg.Logf(lg.ErrorLevel, "storeUpdatesHandler kv.Watch error %s", err) + le.Errorf(ctx, "storeUpdatesHandler kv.Watch error %s", err) } } - kvLazyWriter := func(cs *Store) { + kvLazyWriterWithWAL := func(cs *Store) { system.GlobalPrometrics.GetRoutinesCounter().Started("cache.kvLazyWriter") defer system.GlobalPrometrics.GetRoutinesCounter().Stopped("cache.kvLazyWriter") + shutdownStatus := shutdownStatusNone for { - //start := time.Now() - iterationSyncedCount := 0 - traverseCount := 0 - - select { - case <-cs.ctx.Done(): - switch shutdownStatus { - case shutdownStatusNone: - lg.GetLogger().Debugf(ctx, "cache got shutdown sinal") - shutdownStatus = shutdownStatusWaiting - case shutdownStatusWaiting: - case shutdownStatusReady: - lg.GetLogger().Debugf(ctx, "cache synced, ready for shutdown") - close(cs.Synced) - return - } - default: - } - backupBarrierTimestamp, backupBarrierStatus := cs.getBackupBarrierState() - if backupBarrierStatus == BackupBarrierStatusLocking { - if backupBarrierTimestamp == 0 { - backupBarrierTimestamp = system.GetCurrentTimeNs() - system.MsgOnErrorReturn(cs.updateBackupBarrierWithTimestamp(backupBarrierTimestamp)) - lg.Logf(lg.InfoLevel, "set barrier timestamp: %d", backupBarrierTimestamp) - } + if shutdownStatus == shutdownStatusReady { + le.Debugf(ctx, "cache synced, ready for shutdown") + close(cs.Synced) + return } - allBeforeBackupBarrierSynced := true - - cacheStoreValueStack := []*StoreValue{cs.rootValue} - suffixPathsStack := []string{""} - depthsStack := []int{0} - - lruTimes := []int64{} - - for len(cacheStoreValueStack) > 0 { - lastID := len(cacheStoreValueStack) - 1 - currentStoreValue := cacheStoreValueStack[lastID] - traverseCount++ //Debug - - currentStoreValue.RLock("kvLazyWriter") - lruTimes = append(lruTimes, currentStoreValue.valueUpdateTime) - currentStoreValue.RUnlock("kvLazyWriter") - - currentSuffix := suffixPathsStack[lastID] - currentDepth := depthsStack[lastID] - - cacheStoreValueStack = cacheStoreValueStack[:lastID] - suffixPathsStack = suffixPathsStack[:lastID] - depthsStack = depthsStack[:lastID] - - noChildred := true - currentStoreValue.Range(func(key, value interface{}) bool { - noChildred = false - - var newSuffix string - if currentDepth == 0 { - newSuffix = currentSuffix + key.(string) - } else { - newSuffix = currentSuffix + "." 
+ key.(string) - } - - var finalBytes []byte = nil + if cs.transactionGenerator == nil { + le.Debugf(ctx, "WAL is not ready, skip this iteration") + time.Sleep(100 * time.Millisecond) + continue + } - csvChild := value.(*StoreValue) + if shutdownStatus == shutdownStatusNone { + barrierTime := system.GetCurrentTimeNs() + txID := fmt.Sprintf("%d", barrierTime) - if backupBarrierStatus == BackupBarrierStatusLocking { - ve, vut, sn, sw := csvChild.syncState() - if ve && vut > 0 && vut <= backupBarrierTimestamp { - if sn || !sw { - allBeforeBackupBarrierSynced = false - } - } - } + waitTimer := time.NewTimer(time.Duration(cacheConfig.walTransactionWaitPeriodMS) * time.Millisecond) + select { + case <-cs.ctx.Done(): + waitTimer.Stop() + le.Debugf(ctx, "cache got shutdown signal") + shutdownStatus = shutdownStatusWaiting + continue + case <-waitTimer.C: + } - var valueUpdateTime int64 = 0 - csvChild.Lock("kvLazyWriter") - if csvChild.syncNeeded { - valueUpdateTime = csvChild.valueUpdateTime - timeBytes := make([]byte, 8) - binary.BigEndian.PutUint64(timeBytes, uint64(valueUpdateTime)) - if csvChild.valueExists { - var flag uint8 - var dataBytes []byte - switch csvChild.valueType { - case typeByteArray: - flag = FlagBytesAppend - dataBytes = csvChild.value.([]byte) - case typeJson: - flag = FlagJSONAppend - dataBytes = csvChild.value.(*easyjson.JSON).ToBytes() - default: - lg.Logf(lg.ErrorLevel, "Unknown type for key=%s, value=%v", newSuffix, csvChild.value) - csvChild.Unlock("kvLazyWriter") - return true - } - header := append(timeBytes, flag) - finalBytes = append(header, dataBytes...) - } else { - finalBytes = append(timeBytes, FlagDeleted) // Add delete flag "0" - } - } else { - if csvChild.valueUpdateTime > 0 && csvChild.valueUpdateTime <= cs.lruTresholdTime && csvChild.purgeState == 0 { // Older than or equal to specific time - // currentStoreValue locked by range no locking/unlocking needed - currentStoreValue.ConsistencyLoss(system.GetCurrentTimeNs()) - //lg.Logf("Consistency lost for key=\"%s\" store", currentStoreValue.GetFullKeyString()) - //lg.Logln("Purging: " + newSuffix) - csvChild.TryPurgeReady() - csvChild.TryPurgeConfirm() - } + backupBarrierTimestamp, backupBarrierStatus := cs.getBackupBarrierState() + if backupBarrierStatus == BackupBarrierStatusLocking { + if backupBarrierTimestamp == 0 { + backupBarrierTimestamp = system.GetCurrentTimeNs() + system.MsgOnErrorReturn(cs.updateBackupBarrierWithTimestamp(backupBarrierTimestamp)) + le.Infof(ctx, "set barrier timestamp: %d", backupBarrierTimestamp) } - csvChild.Unlock("kvLazyWriter") - - // Putting value into KV store ------------------ - if csvChild.syncNeeded { - iterationSyncedCount++ - - keyStr := key.(string) - ///_, putErr := kv.Put(cs.toStoreKey(newSuffix), finalBytes) + } - if err := cs.checkBackupBarrierInfoBeforeWrite(valueUpdateTime); err != nil { - lg.Logf(lg.TraceLevel, "==============skipping write for key=%s due to barrier: %v", keyStr, err) - return true - } + result := cs.traverseCacheForTransaction(txID, barrierTime, false) - _, putErr := customNatsKv.KVPut(cs.js, kv, cs.toStoreKey(newSuffix), finalBytes) - if putErr == nil { - csvChild.Lock("kvLazyWriter") - if valueUpdateTime == csvChild.valueUpdateTime { - csvChild.syncNeeded = false - } - csvChild.Unlock("kvLazyWriter") - } else { - lg.Logf(lg.ErrorLevel, "Store kvLazyWriter cannot update key=%s\n: %s", keyStr, putErr) - } + if result.opsCount > 0 { + le.Tracef(ctx, "Transaction %s: traversed=%d nodes, opsCount=%d", txID, result.traverseCount, result.opsCount) 
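The lazy writer now batches dirty cache nodes into one WAL transaction per iteration, sleeping walTransactionWaitPeriodMS between passes and cutting the wait short on shutdown for a final flush. A simplified sketch of that loop shape, where collectDirty and publish are illustrative stand-ins for the traversal and the commit publish:

package main

import (
	"context"
	"fmt"
	"time"
)

// runBatcher waits one window per iteration (cut short by ctx), collects dirty
// entries and publishes them as a single transaction; on shutdown it does one
// final flush before returning.
func runBatcher(ctx context.Context, window time.Duration,
	collectDirty func() int, publish func(txID string, ops int)) {
	for {
		timer := time.NewTimer(window)
		select {
		case <-ctx.Done():
			timer.Stop()
			if n := collectDirty(); n > 0 {
				publish(fmt.Sprintf("%d", time.Now().UnixNano()), n) // final flush
			}
			return
		case <-timer.C:
		}
		if n := collectDirty(); n > 0 {
			publish(fmt.Sprintf("%d", time.Now().UnixNano()), n)
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	go runBatcher(ctx, 30*time.Millisecond,
		func() int { return 2 },
		func(txID string, ops int) { fmt.Printf("commit tx=%s ops=%d\n", txID, ops) })
	time.Sleep(100 * time.Millisecond)
	cancel()
	time.Sleep(50 * time.Millisecond)
}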
+ le.Tracef(ctx, "Publishing WAL commit for tx=%s with %d operations", txID, result.opsCount) + if err := cs.transactionGenerator.PublishCommit(txID); err != nil { + le.Errorf(ctx, "Store kvLazyWriter cannot publish WAL commit for tx=%s: %s", txID, err) } - // ---------------------------------------------- - - cacheStoreValueStack = append(cacheStoreValueStack, value.(*StoreValue)) - suffixPathsStack = append(suffixPathsStack, newSuffix) - depthsStack = append(depthsStack, currentDepth+1) + } - select { - case <-cs.ctx.Done(): - if shutdownStatus == shutdownStatusNone { - shutdownStatus = shutdownStatusWaiting - } - default: - time.Sleep(time.Duration(cacheConfig.lazyWriterValueProcessDelayMkS) * time.Microsecond) + if backupBarrierStatus == BackupBarrierStatusLocking { + if result.allBeforeBackupBarrierSynced { + cs.markCacheReadyForBackup() } - return true - }) + } - if noChildred { - currentStoreValue.collectGarbage() + sort.Slice(result.lruTimes, func(i, j int) bool { return result.lruTimes[i] > result.lruTimes[j] }) + if len(result.lruTimes) > cacheConfig.lruSize { + cs.lruTresholdTime = result.lruTimes[cacheConfig.lruSize-1] + } else if len(result.lruTimes) > 0 { + cs.lruTresholdTime = result.lruTimes[len(result.lruTimes)-1] } - } - if backupBarrierStatus == BackupBarrierStatusLocking && allBeforeBackupBarrierSynced { - cs.markCacheReadyForBackup() - } - if shutdownStatus == shutdownStatusWaiting && iterationSyncedCount == 0 { - shutdownStatus = shutdownStatusReady - } - sort.Slice(lruTimes, func(i, j int) bool { return lruTimes[i] > lruTimes[j] }) - if len(lruTimes) > cacheConfig.lruSize { - cs.lruTresholdTime = lruTimes[cacheConfig.lruSize-1] - } else { - cs.lruTresholdTime = lruTimes[len(lruTimes)-1] - } + cs.valuesInCache = len(result.lruTimes) - /*// Debug info ----------------------------------------------------- - if cs.valuesInCache != len(lruTimes) { - cmpr := []bool{} - for i := 0; i < len(lruTimes); i++ { - cmpr = append(cmpr, lruTimes[i] > 0 && lruTimes[i] <= cs.lruTresholdTime) + if gaugeVec, err := system.GlobalPrometrics.EnsureGaugeVecSimple("cache_values", "", []string{"id"}); err == nil { + gaugeVec.With(prometheus.Labels{"id": cs.cacheConfig.id}).Set(float64(cs.valuesInCache)) } - lg.Logf("LEFT IN CACHE: %d (%d) - %s %s", len(lruTimes), cs.lruTresholdTime, fmt.Sprintln(cmpr), fmt.Sprintln(lruTimes)) + time.Sleep(time.Duration(cacheConfig.lazyWriterRepeatDelayMkS) * time.Microsecond) // Prevents too many locks and prevents too much processor time consumption } - // ----------------------------------------------------------------*/ - cs.valuesInCache = len(lruTimes) + if shutdownStatus == shutdownStatusWaiting { + txID := cs.transactionGenerator.GenerateTransactionID() + le.Debugf(ctx, "Final shutdown transaction %s - collecting all pending operations", txID) - if gaugeVec, err := system.GlobalPrometrics.EnsureGaugeVecSimple("cache_values", "", []string{"id"}); err == nil { - gaugeVec.With(prometheus.Labels{"id": cs.cacheConfig.id}).Set(float64(cs.valuesInCache)) - } - /*// Debug info ----------------------------------------------------- + result := cs.traverseCacheForTransaction(txID, 0, true) - iterationElapsed := time.Since(start) + if result.opsCount > 0 { + le.Debugf(ctx, "Final transaction %s: traversed=%d nodes, opsCount=%d", txID, result.traverseCount, result.opsCount) + le.Debugf(ctx, "Publishing final WAL commit for tx=%s with %d operations", txID, result.opsCount) + if err := cs.transactionGenerator.PublishCommit(txID); err != nil { + le.Errorf(ctx, "Store 
kvLazyWriter cannot publish final WAL commit for tx=%s: %s", txID, err) + } + } - if iterationSyncedCount > 0 { - speed := float64(iterationSyncedCount) / iterationElapsed.Seconds() - lg.GetLogger().Debugf(cs.ctx, - "::::::::::::::: Iteration: synced %d values (traversed %d nodes) in %v speed: %.2f writes/sec", - iterationSyncedCount, traverseCount, iterationElapsed, speed) - } else { - //lg.GetLogger().Debugf(cs.ctx, "::::::::::::::: Iteration: synced 0 values (traversed %d nodes) in %v", - // traverseCount, iterationElapsed) + if result.iterationSyncedCount == 0 { + shutdownStatus = shutdownStatusReady + } } - // ----------------------------------------------------------------*/ - - time.Sleep(time.Duration(cacheConfig.lazyWriterRepeatDelayMkS) * time.Microsecond) // Prevents too many locks and prevents too much processor time consumption } - } go storeUpdatesHandler(&cs) - go kvLazyWriter(&cs) + go kvLazyWriterWithWAL(&cs) <-initChan return &cs } @@ -678,11 +723,8 @@ func (cs *Store) GetValue(key string) ([]byte, error) { var result []byte = nil var resultError error = nil - //cacheMiss := true - if keyLastToken, parentCacheStoreValue := cs.getLastKeyTokenAndItsParentCacheStoreValue(key, false); len(keyLastToken) > 0 && parentCacheStoreValue != nil { if csv, ok := parentCacheStoreValue.LoadChild(keyLastToken); ok { - //cacheMiss = false // Value exists in cache - no cache miss then csv.RLock("GetValue") if !csv.ValueExists() { // Value was intentionally deleted and was marked so, no cache miss policy can be applied here resultError = fmt.Errorf("value for key=%s does not exist", key) @@ -704,41 +746,7 @@ func (cs *Store) GetValue(key string) ([]byte, error) { } // Cache miss ----------------------------------------- - if result == nil { - resultError = fmt.Errorf("Value for for key=%s does not exist", key) - } - - /*if cacheMiss { - if entry, err := customNatsKv.KVGet(cs.js, cs.kv, cs.toStoreKey(key)); err == nil { - key := cs.fromStoreKey(entry.Key()) - valueBytes := entry.Value() - result = valueBytes[9:] - - if len(valueBytes) >= 9 { // Updated or deleted value exists in KV store - appendFlag := valueBytes[8] - kvRecordTime := int64(binary.BigEndian.Uint64(valueBytes[:8])) - switch appendFlag { - case FlagAppendOld, FlagBytesAppend: - cs.SetValue(key, result, false, kvRecordTime, "") - case FlagJSONAppend: - if json, ok := easyjson.JSONFromBytes(result); ok { - cs.SetValueJSON(key, &json, false, kvRecordTime, "") - lg.Logf(lg.WarnLevel, "Value for key=%s is JSON in KV, use GetValueJSON method", key) - resultError = nil - } else { - resultError = fmt.Errorf("failed to parse JSON for key=%s", key) - } - default: - resultError = fmt.Errorf("unknown flag %d for key=%s", appendFlag, key) - } - } else { - resultError = fmt.Errorf("invalid KV entry (%v) for key=%s", valueBytes, key) - } - } else { - resultError = err - } - }*/ - // ---------------------------------------------------- + resultError = fmt.Errorf("value for for key=%s does not exist", key) return result, resultError } @@ -774,50 +782,7 @@ func (cs *Store) GetValueJSON(key string) (*easyjson.JSON, error) { } // ---------------------Cache miss-------------------------- - if result == nil { - resultError = fmt.Errorf("Value for for key=%s does not exist", key) - } - - /*if entry, err := customNatsKv.KVGet(cs.js, cs.kv, cs.toStoreKey(key)); err == nil { - key := cs.fromStoreKey(entry.Key()) - valueBytes := entry.Value() - - if len(valueBytes) >= 9 { - flag := valueBytes[8] - timestamp := 
int64(binary.BigEndian.Uint64(valueBytes[:8])) - data := valueBytes[9:] - - switch flag { - case FlagJSONAppend: - if json, ok := easyjson.JSONFromBytes(data); ok { - cs.SetValueJSON(key, &json, false, timestamp, "") - result = &json - resultError = nil - } else { - resultError = fmt.Errorf("failed to parse JSON for key=%s", key) - } - - case FlagAppendOld, FlagBytesAppend: - if json, ok := easyjson.JSONFromBytes(data); ok { - cs.SetValue(key, data, false, timestamp, "") - result = &json - resultError = nil - } else { - resultError = fmt.Errorf("value for key=%s is not JSON", key) - } - - case FlagDeleted, FlagDeletedOld: - resultError = fmt.Errorf("value for key=%s was deleted", key) - - default: - resultError = fmt.Errorf("unknown flag %d for key=%s", flag, key) - } - } else { - resultError = fmt.Errorf("invalid KV entry for key=%s", key) - } - } else { - resultError = err - }*/ + resultError = fmt.Errorf("value for for key=%s does not exist", key) return result, resultError } @@ -855,30 +820,6 @@ func (cs *Store) TransactionEnd(transactionID string) { } } -/*func (cs *Store) SetValueIfEquals(key string, newValue []byte, updateInKV bool, customSetTime int64, compareValue []byte) bool { - if customSetTime < 0 { - customSetTime = GetCurrentTimeNs() - } - if keyLastToken, parentCacheStoreValue := cs.getLastKeyTokenAndItsParentCacheStoreValue(key, true); len(keyLastToken) > 0 && parentCacheStoreValue != nil { - parentCacheStoreValue.Lock("SetValueIfEquals parent") - defer parentCacheStoreValue.Unlock("SetValueIfEquals parent") - - var csvUpdate *StoreValue = nil - if csv, ok := parentCacheStoreValue.LoadChild(keyLastToken, false); ok { - if currentByteValue, ok := csv.value.([]byte); ok && bytes.Equal(currentByteValue, compareValue) { - csv.Put(newValue, updateInKV, customSetTime) - return true - } - return false - } else { - csvUpdate = &StoreValue{value: newValue, storeMutex: &sync.Mutex{}, store: make(map[interface{}]*StoreValue), storeConsistencyWithKVLossTime: 0, valueExists: true, purgeState: 0, syncNeeded: updateInKV, syncedWithKV: !updateInKV, valueUpdateTime: customSetTime} - parentCacheStoreValue.StoreChild(keyLastToken, csvUpdate) - return true - } - } - return false -}*/ - func (cs *Store) SetValueIfDoesNotExist(key string, newValue []byte, updateInKV bool, customSetTime int64) bool { if keyLastToken, parent := cs.getLastKeyTokenAndItsParentCacheStoreValue(key, true); len(keyLastToken) > 0 && parent != nil { candidate := &StoreValue{ @@ -1266,3 +1207,26 @@ func (cs *Store) toStoreKey(key string) string { func (cs *Store) fromStoreKey(key string) string { return strings.Replace(key, cs.cacheConfig.kvStorePrefix+".", "", 1) } + +// -------- WAL transactions --------- + +type OpType = string + +const ( + OpTypePUT = "PUT" + OpTypeDelete = "DELETE" +) + +const ConsistencyKey = "__kv_consistent__" + +type TransactionGenerator interface { + PublishOperation(txID string, opTime int64, opType OpType, key string, value []byte) error + PublishCommit(txID string) error + GenerateTransactionID() string +} + +func (cs *Store) SetTransactionGenerator(tg TransactionGenerator) { + cs.transactionGenerator = tg +} + +// ----------------------------------- diff --git a/statefun/cache/cache_config.go b/statefun/cache/cache_config.go index f8812d1..009aae5 100644 --- a/statefun/cache/cache_config.go +++ b/statefun/cache/cache_config.go @@ -6,6 +6,7 @@ const ( LevelSubscriptionNotificationsBufferMaxSize = 30000 // ~16Mb: elemenets := 16 * 1024 * 1024 / (64 + 512), where 512 - avg value size, 64 - avg 
key size LazyWriterValueProcessDelayMkS = 500 LazyWriterRepeatDelayMkS = 100000 + WalTransactionWaitPeriodMS = 5000 ) type Config struct { @@ -15,6 +16,7 @@ type Config struct { levelSubscriptionNotificationsBufferMaxSize int lazyWriterValueProcessDelayMkS int lazyWriterRepeatDelayMkS int + walTransactionWaitPeriodMS int } func NewCacheConfig(id string) *Config { @@ -25,6 +27,7 @@ func NewCacheConfig(id string) *Config { levelSubscriptionNotificationsBufferMaxSize: LevelSubscriptionNotificationsBufferMaxSize, lazyWriterValueProcessDelayMkS: LazyWriterValueProcessDelayMkS, lazyWriterRepeatDelayMkS: LazyWriterRepeatDelayMkS, + walTransactionWaitPeriodMS: WalTransactionWaitPeriodMS, } } diff --git a/statefun/domain.go b/statefun/domain.go index 61c5e7d..6fee757 100644 --- a/statefun/domain.go +++ b/statefun/domain.go @@ -59,6 +59,8 @@ type Domain struct { kv nats.KeyValue cache *cache.Store + + shutdown chan struct{} } type streamConfig struct { @@ -310,9 +312,7 @@ func (dm *Domain) start(ctx context.Context, cacheConfig *cache.Config, createDo } kvExists = true } - if !kvExists { - return fmt.Errorf("Nats KV was not inited") - } + // -------------------------------------------------------------- if createDomainRouters { @@ -339,15 +339,108 @@ func (dm *Domain) start(ctx context.Context, cacheConfig *cache.Config, createDo if err := dm.createTraceStream(); err != nil { return err } + if err := dm.createWALOperationsStream(); err != nil { + return err + } + if err := dm.createWALCommitsStream(); err != nil { + return err + } } - lg.Logln(lg.TraceLevel, "Initializing the cache store...") + le := lg.GetLogger() + + le.Trace(ctx, "Initializing the cache store...") dm.cache = cache.NewCacheStore(ctx, cacheConfig, dm.js, dm.kv) - lg.Logln(lg.TraceLevel, "Cache store inited!") + dm.cache.SetTransactionGenerator(dm) + + if err := dm.TransactionCommitter(ctx); err != nil { + return err + } + if err := dm.checkKvConsistency(ctx); err != nil { + return err + } + le.Trace(ctx, "Cache store inited!") return nil } +func (dm *Domain) checkKvConsistency(ctx context.Context) error { + consumerName := CommitterDurableName + "-" + dm.kv.Bucket() + + const ( + checkInterval = 100 * time.Millisecond + waitTimeout = 30 * time.Second + ) + + timeout := time.NewTimer(waitTimeout) + defer timeout.Stop() + + ticker := time.NewTicker(checkInterval) + defer ticker.Stop() + + lg.Logln(lg.TraceLevel, "Waiting for KV consistency ...") + + for { + select { + case <-ctx.Done(): + return ctx.Err() + + case <-timeout.C: + return fmt.Errorf("timeout waiting for KV consistency") + + case <-ticker.C: + info, err := dm.js.ConsumerInfo(WALCommitsStreamName, consumerName) + if err != nil { + lg.Logf(lg.DebugLevel, "Failed to get consumer info: %s", err) + continue + } + + noPendingWork := + info.NumPending == 0 && + info.NumAckPending == 0 + + consistent, err := dm.isKVConsistent() + if err != nil { + lg.Logf(lg.ErrorLevel, "Failed to check consistency: %s", err) + continue + } + + if !noPendingWork { + lg.Logf(lg.TraceLevel, "Transactions in progress: pending=%d, ackPending=%d", info.NumPending, info.NumAckPending) + + if consistent { + if err := dm.setKVConsistent(false); err != nil { + return fmt.Errorf( + "failed to set KV inconsistent: %w", err, + ) + } + } + + if !timeout.Stop() { + select { + case <-timeout.C: + default: + } + } + timeout.Reset(waitTimeout) + continue + } + + if !consistent { + lg.Logln(lg.TraceLevel, "No pending work, setting KV consistent") + if err := dm.setKVConsistent(true); err != nil { + return 
fmt.Errorf( + "failed to set KV consistent: %w", err, + ) + } + } + + lg.Logln(lg.TraceLevel, "KV is consistent") + return nil + } + } +} + func (dm *Domain) createIngressRouter() error { targetSubjectCalculator := func(msg *nats.Msg) (string, error) { return fmt.Sprintf(DomainIngressSubjectsTmpl, dm.name, msg.Subject), nil @@ -450,6 +543,28 @@ func (dm *Domain) createTraceStream() error { return dm.createStreamIfNotExists(sc) } +func (dm *Domain) createWALOperationsStream() error { + sc := &nats.StreamConfig{ + Name: WALOperationsStreamName, + Subjects: []string{WALOperationsSubject}, + Retention: nats.WorkQueuePolicy, + MaxAge: 24 * time.Hour, + Replicas: dm.sysSC.replicasCount, + } + return dm.createStreamIfNotExists(sc) +} + +func (dm *Domain) createWALCommitsStream() error { + sc := &nats.StreamConfig{ + Name: WALCommitsStreamName, + Subjects: []string{WALCommitsSubject}, + Retention: nats.WorkQueuePolicy, + MaxAge: 24 * time.Hour, + Replicas: dm.sysSC.replicasCount, + } + return dm.createStreamIfNotExists(sc) +} + func (dm *Domain) createStreamIfNotExists(sc *nats.StreamConfig) error { // Create streams if does not exist ------------------------------ /* Each stream contains a single subject (topic). @@ -515,7 +630,7 @@ func (dm *Domain) createRouter(sourceStreamName string, subject string, tsc targ case domainEgressStreamName: // Default logic - infinite republishing case domainIngressStreamName: - // Send message to DLQ without retryAdd commentMore actions + // Send message to DLQ without retry if err == nil { lg.Logf(lg.DebugLevel, "Domain (domain=%s) router with sourceStreamName=%s republished message to DLQ", dm.name, sourceStreamName) system.MsgOnErrorReturn(msg.Ack()) @@ -555,3 +670,23 @@ func dlqMsgBuilder(subject, stream, domain, errorMsg string, data []byte) *nats. return dlqMsg } + +func (dm *Domain) PublishOperation(txID string, opTime int64, opType cache.OpType, key string, value []byte) error { + return dm.publishWALOperation(txID, opTime, opType, key, value) +} + +func (dm *Domain) PublishCommit(txID string) error { + return dm.publishWALCommit(txID) +} + +func (dm *Domain) GenerateTransactionID() string { + return generateTransactionID() +} + +func (dm *Domain) isBackupBarrierActive() bool { + entry, err := dm.kv.Get(cache.BackupBarrierLockKey) + if err != nil { + return false + } + return len(entry.Value()) > 0 +} diff --git a/statefun/io.go b/statefun/io.go index 900c369..44d52ed 100644 --- a/statefun/io.go +++ b/statefun/io.go @@ -385,7 +385,7 @@ func (r *Runtime) Signal(signalProvider sfPlugins.SignalProvider, typename strin if !r.isReady { return fmt.Errorf("can not send signal - runtime has not started yet") } - if r.shutdownPhase.Load() != ShutdownPhaseNone { + if r.gs.currentPhase() != ShutdownPhaseNone { return fmt.Errorf("can not send signal - runtime is shutting down") } return r.signal(signalProvider, "ingress", "signal", typename, r.Domain.GetValidObjectId(id), payload, options, nil) @@ -395,7 +395,7 @@ func (r *Runtime) Request(requestProvider sfPlugins.RequestProvider, typename st if !r.isReady { return nil, fmt.Errorf("can not send request - runtime has not started yet") } - if r.shutdownPhase.Load() != ShutdownPhaseNone { + if r.gs.currentPhase() != ShutdownPhaseNone { return nil, fmt.Errorf("can not send request - runtime is shutting down") } return r.request(requestProvider, "ingress", "request", typename, r.Domain.GetValidObjectId(id), payload, options, nil, timeout...) 
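The Domain methods above are what satisfy the cache package's TransactionGenerator interface; that interface is all the cache store needs in order to emit WAL traffic. A hypothetical in-memory implementation of the same method set, the kind of stub that could stand in for the NATS-backed Domain in a unit test (nothing below is from the patch; since OpType is a plain string alias, string parameters line up with the interface):

package main

import (
	"fmt"
	"sync"
	"time"
)

type op struct {
	opType string
	key    string
	opTime int64
	value  []byte
}

// fakeWAL is a hypothetical in-memory TransactionGenerator: it records
// operations per transaction and remembers which transactions were committed.
type fakeWAL struct {
	mu      sync.Mutex
	ops     map[string][]op
	commits []string
}

func newFakeWAL() *fakeWAL { return &fakeWAL{ops: map[string][]op{}} }

func (f *fakeWAL) GenerateTransactionID() string {
	return fmt.Sprintf("%d", time.Now().UnixNano())
}

func (f *fakeWAL) PublishOperation(txID string, opTime int64, opType, key string, value []byte) error {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.ops[txID] = append(f.ops[txID], op{opType: opType, key: key, opTime: opTime, value: value})
	return nil
}

func (f *fakeWAL) PublishCommit(txID string) error {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.commits = append(f.commits, txID)
	return nil
}

func main() {
	w := newFakeWAL()
	txID := w.GenerateTransactionID()
	_ = w.PublishOperation(txID, time.Now().UnixNano(), "PUT", "a.b.c", []byte("v"))
	_ = w.PublishCommit(txID)
	fmt.Println("ops:", len(w.ops[txID]), "commits:", len(w.commits))
}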
diff --git a/statefun/runtime.go b/statefun/runtime.go index 9e2b49a..d7bd9af 100644 --- a/statefun/runtime.go +++ b/statefun/runtime.go @@ -54,10 +54,25 @@ type Runtime struct { isReady bool shutdown chan struct{} - shutdownPhase atomic.Uint32 + gs *GracefulShutdown functionsStopCh chan struct{} wg sync.WaitGroup afterStartFunctionsWaitGroup sync.WaitGroup + afterStartRunning atomic.Bool + activeInstanceMu sync.RWMutex +} + +type GracefulShutdown struct { + phase atomic.Uint32 + mu sync.RWMutex + + cancelPhaseOne context.CancelFunc + cancelPhaseTwo context.CancelFunc + cancelPhaseThree context.CancelFunc + + ctxPhaseOne context.Context + ctxPhaseTwo context.Context + ctxPhaseThree context.Context } // NewRuntime initializes a new Runtime instance with the given configuration. @@ -70,7 +85,6 @@ func NewRuntime(config RuntimeConfig) (*Runtime, error) { shutdown: make(chan struct{}), functionsStopCh: make(chan struct{}), } - r.shutdownPhase.Store(ShutdownPhaseNone) natsOpts := nats.GetDefaultOptions() natsOpts.Servers = strings.Split(r.config.natsURL, ",") @@ -151,20 +165,23 @@ func (r *Runtime) RegisterOnAfterStartFunction(f OnAfterStartFunction, async boo // It also handles graceful shutdown via context.Context. func (r *Runtime) Start(ctx context.Context, cacheConfig *cache.Config) error { logger := lg.GetLogger() - phaseOneContext, cancelPhaseOneContext := context.WithCancel(context.Background()) - phaseTwoContext, cancelPhaseTwoContext := context.WithCancel(context.Background()) - phaseThreeContext, cancelPhaseThreeContext := context.WithCancel(context.Background()) + r.gs = NewGracefulShutdown(context.Background()) gracefulShutdownFunc := func() { sig := make(chan os.Signal, 1) signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP) <-sig + if !r.config.isActiveInstance { + logger.Debugf(ctx, "Runtime is not active. 
Shutting down immediately") + r.gs.cancelAllContexts() + r.Shutdown() + return + } startShutdown := time.Now() - r.shutdownPhase.Store(ShutdownPhaseOne) - lg.GetLogger().Debugf(ctx, "Received shutdown signal, shutting down gracefully...") - lg.GetLogger().Debugf(ctx, "Shutdown phase 1") - cancelPhaseOneContext() - + r.gs.setPhase(ShutdownPhaseOne) + logger.Debugf(ctx, "Received shutdown signal, shutting down gracefully...") + logger.Debugf(ctx, "Shutdown currentPhase 1") + r.gs.cancelPhaseOne() timeout := time.NewTimer(10 * time.Second) defer timeout.Stop() @@ -176,26 +193,32 @@ func (r *Runtime) Start(ctx context.Context, cacheConfig *cache.Config) error { select { case <-timeout.C: - lg.GetLogger().Debugf(ctx, "AfterStart functions timed out") + logger.Debugf(ctx, "AfterStart functions timed out") case <-done: - lg.GetLogger().Debugf(ctx, "AfterStart functions completed") + logger.Debugf(ctx, "AfterStart functions completed") } r.drainSignalSubscriptions() - r.shutdownPhase.Store(ShutdownPhaseTwo) - lg.GetLogger().Debugf(ctx, "Shutdown phase 2") + r.gs.setPhase(ShutdownPhaseTwo) + + logger.Debugf(ctx, "Shutdown currentPhase 2") <-r.functionsStopCh - cancelPhaseTwoContext() + r.gs.cancelPhaseTwo() - lg.GetLogger().Debugf(ctx, "Shutdown phase 3") + logger.Debugf(ctx, "Shutdown currentPhase 3") <-r.Domain.cache.Synced + if r.config.isActiveInstance { + logger.Debugf(ctx, "Shutdown - waiting for transaction committer") + <-r.Domain.shutdown + } r.Shutdown() - lg.GetLogger().Debugf(ctx, "Shutdown took %v s", time.Since(startShutdown)) + r.gs.cancelPhaseThree() + logger.Debugf(ctx, "Shutdown took %v s", time.Since(startShutdown)) } go gracefulShutdownFunc() @@ -213,7 +236,7 @@ func (r *Runtime) Start(ctx context.Context, cacheConfig *cache.Config) error { } // Start the domain. - if err := r.Domain.start(phaseTwoContext, cacheConfig, r.config.handlesDomainRouters); err != nil { + if err := r.Domain.start(r.gs.ctxPhaseTwo, cacheConfig, r.config.handlesDomainRouters); err != nil { return err } @@ -238,7 +261,7 @@ func (r *Runtime) Start(ctx context.Context, cacheConfig *cache.Config) error { // Handle single-instance functions. singleInstanceFunctionRevisions := make(map[string]uint64) - if err := r.handleSingleInstanceFunctions(phaseOneContext, singleInstanceFunctionRevisions); err != nil { + if err := r.handleSingleInstanceFunctions(r.gs.ctxPhaseThree, singleInstanceFunctionRevisions); err != nil { return err } @@ -251,21 +274,18 @@ func (r *Runtime) Start(ctx context.Context, cacheConfig *cache.Config) error { // Start garbage collector. r.wg.Add(1) - go r.runGarbageCollector(phaseThreeContext) + go r.runGarbageCollector(r.gs.ctxPhaseThree) // Set Runtime ready r.isReady = true - // Run after-start functions. - r.runAfterStartFunctions(phaseOneContext) - // Wait for shutdown signal. <-r.shutdown - cancelPhaseThreeContext() // Perform cleanup. 
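Phase 1 waits for the after-start functions, but only up to a fixed timeout so a stuck hook cannot block shutdown forever. A minimal sketch of that WaitGroup-with-timeout step; the helper name is illustrative:

package main

import (
	"fmt"
	"sync"
	"time"
)

// waitWithTimeout waits for wg, but gives up after d so shutdown can proceed.
func waitWithTimeout(wg *sync.WaitGroup, d time.Duration) bool {
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		return true // everything finished in time
	case <-time.After(d):
		return false // timed out; continue shutting down anyway
	}
}

func main() {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() { defer wg.Done(); time.Sleep(20 * time.Millisecond) }()
	fmt.Println("finished in time:", waitWithTimeout(&wg, time.Second))
}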
@@ -251,21 +274,18 @@ func (r *Runtime) Start(ctx context.Context, cacheConfig *cache.Config) error {
 
 	// Start garbage collector.
 	r.wg.Add(1)
-	go r.runGarbageCollector(phaseThreeContext)
+	go r.runGarbageCollector(r.gs.ctxPhaseThree)
 
 	// Set Runtime ready
 	r.isReady = true
 
-	// Run after-start functions.
-	r.runAfterStartFunctions(phaseOneContext)
-
 	// Wait for shutdown signal.
 	<-r.shutdown
-	cancelPhaseThreeContext()
 
 	// Perform cleanup.
 	logger.Info(ctx, "Shutting down...")
+	// Wait for last goroutines
 	waitCh := make(chan struct{})
 	go func() {
 		r.wg.Wait()
@@ -312,6 +332,46 @@ func (r *Runtime) stopRequestSubscriptions() {
 	}
 }
 
+func NewGracefulShutdown(rootCtx context.Context) *GracefulShutdown {
+	gs := &GracefulShutdown{}
+
+	gs.setPhase(ShutdownPhaseNone)
+	gs.mu.Lock()
+	gs.ctxPhaseOne, gs.cancelPhaseOne = context.WithCancel(rootCtx)
+	gs.ctxPhaseTwo, gs.cancelPhaseTwo = context.WithCancel(rootCtx)
+	gs.ctxPhaseThree, gs.cancelPhaseThree = context.WithCancel(rootCtx)
+	gs.mu.Unlock()
+
+	return gs
+}
+
+func (gs *GracefulShutdown) setPhase(phase ShutdownPhase) {
+	gs.phase.Store(phase)
+}
+
+func (gs *GracefulShutdown) currentPhase() ShutdownPhase {
+	return gs.phase.Load()
+}
+
+func (gs *GracefulShutdown) phaseOneCtx() context.Context {
+	gs.mu.RLock()
+	defer gs.mu.RUnlock()
+	return gs.ctxPhaseOne
+}
+
+func (gs *GracefulShutdown) resetPhaseOneCtx() {
+	gs.mu.Lock()
+	defer gs.mu.Unlock()
+	gs.ctxPhaseOne, gs.cancelPhaseOne = context.WithCancel(context.Background())
+}
+
+// cancelAllContexts force stops passive runtime
+func (gs *GracefulShutdown) cancelAllContexts() {
+	gs.cancelPhaseOne()
+	gs.cancelPhaseTwo()
+	gs.cancelPhaseThree()
+}
+
 // Shutdown stops the runtime.
 func (r *Runtime) Shutdown() {
 	close(r.shutdown)
@@ -409,6 +469,13 @@ func (r *Runtime) startFunctionSubscriptions(ctx context.Context, revisions map[
 	return nil
 }
 
+func (r *Runtime) stopFunctionSubscriptions(ctx context.Context) {
+	for _, ft := range r.registeredFunctionTypes {
+		ft.stopSignalSubscription()
+		ft.stopRequestSubscription()
+	}
+}
+
 // runAfterStartFunctions executes the registered OnAfterStart functions.
 func (r *Runtime) runAfterStartFunctions(ctx context.Context) {
 	for _, fnWithMode := range r.onAfterStartFunctionsWithMode {
@@ -458,7 +525,7 @@ func (r *Runtime) collectGarbage() {
 		lg.Logf(lg.ErrorLevel, "Error ensuring GaugeVec: %v", err)
 	}
 
-	isShutdown := r.shutdownPhase.Load() == ShutdownPhaseTwo
+	isShutdown := r.gs.currentPhase() == ShutdownPhaseTwo
 	functionsReadyForStop := isShutdown
 
 	for _, ft := range r.registeredFunctionTypes {
@@ -486,7 +553,7 @@ func (r *Runtime) collectGarbage() {
 		for _, ft := range r.registeredFunctionTypes {
 			ft.stopRequestSubscription()
 		}
-		r.shutdownPhase.Store(ShutdownPhaseThree)
+		r.gs.setPhase(ShutdownPhaseThree)
 		r.functionsStopCh <- struct{}{}
 	}
 
@@ -518,12 +585,31 @@ func (r *Runtime) singleInstanceFunctionLocksUpdater(ctx context.Context, revisi
 	//release all functions
 	releaseAllLocks := func(ctx context.Context, runtime *Runtime, revisions map[string]uint64) {
-		for ftName, revID := range revisions {
-			system.MsgOnErrorReturn(KeyMutexUnlock(ctx, runtime, system.GetHashStr(ftName), revID))
+		if runtime.config.isActiveInstance {
+			for ftName, revID := range revisions {
+				system.MsgOnErrorReturn(KeyMutexUnlock(ctx, runtime, system.GetHashStr(ftName), revID))
+			}
 		}
 	}
 
 	defer releaseAllLocks(ctx, r, revisions)
 
+	// Channel for async KV consistency check result (non-nil = check in progress)
+	var kvConsistencyCheck chan error
+
+	becomePassive := func(cause string) {
+		lg.Logf(lg.WarnLevel, "%s, becoming passive", cause)
+		r.activeInstanceMu.Lock()
+		r.config.isActiveInstance = false
+		r.config.activeRevID = 0
+		r.activeInstanceMu.Unlock()
+		r.stopFunctionSubscriptions(ctx)
+		if r.afterStartRunning.Load() {
+			r.gs.cancelPhaseOne()
+			r.afterStartRunning.Store(false)
+		}
+		kvConsistencyCheck = nil
+	}
+
 	for {
 		select {
 		case <-ctx.Done():
@@ -531,46 +617,94 @@ func (r *Runtime) singleInstanceFunctionLocksUpdater(ctx context.Context, revisi
 		case <-r.shutdown:
 			return
 		case <-ticker.C:
-			subscribeRequired := false //if true, need to subscribe on all functions
+			subscribeRequired := false
 
 			if r.config.activePassiveMode {
-				if r.config.isActiveInstance {
+				// Refresh runtime lock if held (active or activating)
+				if r.config.activeRevID != 0 {
 					newRevID, err := KeyMutexLockUpdate(ctx, r, system.GetHashStr(RuntimeName), r.config.activeRevID)
 					if err != nil {
-						lg.Logf(lg.ErrorLevel, "KeyMutexLockUpdate failed for %s: %v", RuntimeName, err)
-					} else {
-						r.config.activeRevID = newRevID
+						becomePassive("Lost runtime lock")
+						continue
+					}
+					r.config.activeRevID = newRevID
+				}
+
+				if r.config.isActiveInstance {
+					// Already active — nothing to do
+				} else if kvConsistencyCheck != nil {
+					// Activating: consistency check in progress, lock is being refreshed above
+					select {
+					case err := <-kvConsistencyCheck:
+						kvConsistencyCheck = nil
+						if err != nil {
+							lg.Logf(lg.ErrorLevel, "KV consistency check failed: %v", err)
+							system.MsgOnErrorReturn(KeyMutexUnlock(ctx, r, system.GetHashStr(RuntimeName), r.config.activeRevID))
+							r.activeInstanceMu.Lock()
+							r.config.isActiveInstance = false
+							r.config.activeRevID = 0
+							r.activeInstanceMu.Unlock()
+						} else {
+							lg.Logf(lg.DebugLevel, "KV consistent, fully active now")
+							r.activeInstanceMu.Lock()
+							r.config.isActiveInstance = true
+							r.activeInstanceMu.Unlock()
+							r.gs.resetPhaseOneCtx()
+							subscribeRequired = true
+						}
+					default:
+						// Still checking — lock was refreshed above, just wait
 					}
-				} else {
+				} else if r.config.activeRevID == 0 {
+					// Passive: try to acquire lock
 					newRevID, err := KeyMutexLock(ctx, r, system.GetHashStr(RuntimeName), true)
 					if err == nil {
-						r.config.isActiveInstance = true
+						lg.Logf(lg.DebugLevel, "Passive instance acquired lock, checking KV consistency")
 						r.config.activeRevID = newRevID
-						subscribeRequired = true
+						ch := make(chan error, 1)
+						kvConsistencyCheck = ch
+						go func() {
+							ch <- r.Domain.checkKvConsistency(ctx)
+						}()
 					} else if !errors.Is(err, ErrMutexLocked) {
 						lg.Logf(lg.ErrorLevel, "KeyMutexLock failed for %s: %v", RuntimeName, err)
-						return
 					}
 				}
+			} else {
+				r.config.isActiveInstance = true
+			}
+
+			tryLock := func(ftName string) {
+				newRevID, err := KeyMutexLock(ctx, r, system.GetHashStr(ftName), true)
+				if err == nil {
+					subscribeRequired = true
+					revisions[ftName] = newRevID
+					lg.Logf(lg.TraceLevel, "KeyMutexLock succeeded for %s", ftName)
+				}
 			}
 
 			if r.config.isActiveInstance {
+				if r.afterStartRunning.CompareAndSwap(false, true) {
+					lg.GetLogger().Debugf(ctx, "runtime is active, run afterStartFunctions")
+					r.runAfterStartFunctions(r.gs.phaseOneCtx())
+				}
+
 				for ftName, revID := range revisions {
-					if revID != 0 {
-						newRevID, err := KeyMutexLockUpdate(ctx, r, system.GetHashStr(ftName), revID)
-						if err != nil {
-							lg.Logf(lg.ErrorLevel, "KeyMutexLockUpdate failed for %s: %v", ftName, err)
-						} else {
-							revisions[ftName] = newRevID
-						}
-					} else {
-						newRevID, err := KeyMutexLock(ctx, r, system.GetHashStr(ftName), true)
-						if err == nil {
-							subscribeRequired = true
-							revisions[ftName] = newRevID
-							lg.Logf(lg.DebugLevel, "KeyMutexLock succeeded for %s", ftName)
-						}
+					if revID == 0 {
+						tryLock(ftName)
+						continue
+					}
+					newRevID, err := KeyMutexLockUpdate(ctx, r, system.GetHashStr(ftName), revID)
+					if err == nil {
+						revisions[ftName] = newRevID
+						continue
+					}
+					if strings.Contains(err.Error(), "already unlocked") {
+						revisions[ftName] = 0
+						tryLock(ftName)
+						continue
+					}
+					lg.Logf(lg.ErrorLevel, "KeyMutexLockUpdate failed for %s: %v", ftName, err)
 				}
 			}
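The reworked lock updater never blocks the ticker loop on the KV consistency check: the check runs in a goroutine that writes its result into a buffered channel, and each tick polls that channel with a non-blocking select while the runtime lock keeps being refreshed. A standalone sketch of that polling pattern (illustrative, with hypothetical names standing in for Domain.checkKvConsistency and the lock refresh):

package main

import (
	"fmt"
	"time"
)

// checkConsistency stands in for Domain.checkKvConsistency (hypothetical stub).
func checkConsistency() error {
	time.Sleep(350 * time.Millisecond) // simulate a slow KV scan
	return nil
}

func main() {
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()

	var pending chan error // non-nil means a check is in progress

	for range ticker.C {
		if pending == nil {
			// Kick off the check once; the buffered channel lets the goroutine exit freely.
			ch := make(chan error, 1)
			pending = ch
			go func() { ch <- checkConsistency() }()
		}

		select {
		case err := <-pending:
			if err != nil {
				fmt.Println("check failed, staying passive:", err)
			} else {
				fmt.Println("KV consistent, becoming fully active")
			}
			return
		default:
			// Still checking: keep refreshing the lock on the next tick.
			fmt.Println("still checking, refreshing lock")
		}
	}
}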
diff --git a/statefun/wal.go b/statefun/wal.go
new file mode 100644
index 0000000..ce32a83
--- /dev/null
+++ b/statefun/wal.go
@@ -0,0 +1,387 @@
+package statefun
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"strconv"
+	"strings"
+	"time"
+
+	customNatsKv "github.com/foliagecp/sdk/embedded/nats/kv"
+	"github.com/foliagecp/sdk/statefun/cache"
+	lg "github.com/foliagecp/sdk/statefun/logger"
+	"github.com/foliagecp/sdk/statefun/system"
+	"github.com/nats-io/nats.go"
+)
+
+const (
+	WALOperationsStreamName = "wal_operations"
+	WALCommitsStreamName    = "wal_commits"
+	WALOperationsSubject    = "wal.ops.*.*"
+	WALCommitsSubject       = "wal.commits.*"
+	CommitterDurableName    = "TRANSACTION_COMMITTER_CONSUMER"
+
+	// --- Commit consumer (TransactionCommitter) ---
+
+	// How long NATS waits for Ack before redelivering a commit message.
+	commitAckWait = 5 * time.Second
+
+	// How many times NATS retries an unacked commit before sending to DLQ.
+	commitMaxDeliver = 5
+
+	// --- Operations consumer (per-transaction) ---
+
+	// How long NATS waits for Ack on each operation batch.
+	opsAckWait = 10 * time.Second
+
+	// Fewer retries than commits: if a single op fails 3 times, the KV is likely
+	// unavailable and the whole transaction will be retried via commit redelivery.
+	opsMaxDeliver = 3
+
+	// Number of operation messages to fetch per pull batch.
+	// 100 balances throughput (fewer round-trips) vs memory (buffered messages).
+	opsFetchBatchSize = 100
+
+	// How long to wait for a batch before returning what's available.
+	opsFetchTimeout = 1 * time.Second
+
+	// --- Shutdown drain ---
+
+	// How often to poll for remaining pending commits during shutdown.
+	shutdownPollInterval = 200 * time.Millisecond
+
+	// How often to log "still waiting" during shutdown drain.
+	shutdownLogInterval = 5 * time.Second
+
+	// Max time to wait for pending transactions during shutdown.
+	// After this, shutdown proceeds with data loss warning.
+	shutdownMaxWait = 30 * time.Second
+)
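wal.go introduces two JetStream streams: wal_operations carries the per-transaction operation batches on wal.ops.<bucket>.<txID>, and wal_commits carries one commit marker per transaction on wal.commits.<bucket>. A small sketch of how those subjects are composed (the bucket name and transaction ID below are made-up example values; the real code derives them from dm.kv.Bucket() and the timestamp-based transaction ID):

package main

import "fmt"

func main() {
	// Hypothetical example values for illustration only.
	bucket := "store_kv"
	txID := "1733745600000000000"

	// Matches the WALOperationsSubject wildcard "wal.ops.*.*".
	opsSubject := fmt.Sprintf("wal.ops.%s.%s", bucket, txID)
	// Matches the WALCommitsSubject wildcard "wal.commits.*".
	commitSubject := fmt.Sprintf("wal.commits.%s", bucket)

	fmt.Println(opsSubject)    // wal.ops.store_kv.1733745600000000000
	fmt.Println(commitSubject) // wal.commits.store_kv
}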
+
+func (dm *Domain) TransactionCommitter(ctx context.Context) error {
+	ready := make(chan struct{})
+	go func() {
+		if err := dm.runTransactionCommitter(ctx, ready); err != nil {
+			lg.Logf(lg.ErrorLevel, "TransactionCommitter error: %s", err)
+		}
+	}()
+	<-ready
+	return nil
+}
+
+func (dm *Domain) runTransactionCommitter(ctx context.Context, ready chan struct{}) error {
+	dm.shutdown = make(chan struct{})
+	defer close(dm.shutdown)
+
+	commitSubject := fmt.Sprintf("wal.commits.%s", dm.kv.Bucket())
+	consumerName := CommitterDurableName + "-" + dm.kv.Bucket()
+
+	lg.Logf(lg.TraceLevel, "TransactionCommitter starting for bucket=%s, subject=%s", dm.kv.Bucket(), commitSubject)
+
+	if err := dm.setKVConsistent(false); err != nil {
+		return err
+	}
+	lg.Logln(lg.TraceLevel, "KV marked as inconsistent, will process pending transactions")
+
+	_, err := dm.js.AddConsumer(WALCommitsStreamName, &nats.ConsumerConfig{
+		Name:           consumerName,
+		Durable:        consumerName,
+		DeliverSubject: consumerName,
+		DeliverGroup:   consumerName + "-group",
+		FilterSubject:  commitSubject,
+		AckPolicy:      nats.AckExplicitPolicy,
+		AckWait:        commitAckWait,
+		MaxDeliver:     commitMaxDeliver,
+		MaxAckPending:  1,
+	})
+	if err != nil && !errors.Is(err, nats.ErrConsumerNameAlreadyInUse) {
+		lg.Logf(lg.ErrorLevel, "TransactionCommitter failed to create consumer: %s", err)
+		return err
+	}
+
+	lg.Logf(lg.TraceLevel, "TransactionCommitter consumer created/exists: %s", consumerName)
+
+	pendingCount := dm.countPendingCommits(consumerName)
+	lg.Logf(lg.TraceLevel, "Found %d pending transactions to process", pendingCount)
+
+	if pendingCount == 0 {
+		lg.Logln(lg.TraceLevel, "No pending transactions, marking KV as consistent immediately")
+		if err = dm.setKVConsistent(true); err != nil {
+			lg.Logf(lg.ErrorLevel, "Failed to set KV as consistent: %s", err)
+			return err
+		}
+	}
+
+	// Safe without mutex: MaxAckPending=1 guarantees sequential processing
+	processedCount := 0
+
+	_, err = dm.js.QueueSubscribe(
+		commitSubject,
+		consumerName+"-group",
+		func(msg *nats.Msg) {
+			txID := msg.Header.Get("tx_id")
+			if txID == "" {
+				lg.Logln(lg.ErrorLevel, "TransactionCommitter: received commit without tx_id")
+				system.MsgOnErrorReturn(msg.Ack())
+				return
+			}
+
+			lg.Logf(lg.TraceLevel, "TransactionCommitter: processing commit for tx_id=%s", txID)
+
+			// Backup barrier pause
+			for dm.isBackupBarrierActive() {
+				lg.Logln(lg.WarnLevel, "Backup barrier active, pausing commit apply")
+				time.Sleep(200 * time.Millisecond)
+			}
+
+			if err := dm.applyTransactionOperations(ctx, txID); err != nil {
+				lg.Logf(lg.ErrorLevel, "TransactionCommitter: failed to apply transaction %s: %s", txID, err)
+				system.MsgOnErrorReturn(msg.Nak())
+				return
+			}
+
+			lg.Logf(lg.TraceLevel, "TransactionCommitter: transaction %s completed, Ack()", txID)
+			system.MsgOnErrorReturn(msg.Ack())
+
+			processedCount++
+			if pendingCount > 0 && processedCount >= pendingCount {
+				lg.Logf(lg.DebugLevel, "TransactionCommitter: all %d pending transactions processed, marking KV as consistent", pendingCount)
+				if err = dm.setKVConsistent(true); err != nil {
+					lg.Logf(lg.ErrorLevel, "TransactionCommitter: failed to set KV as consistent: %s", err)
+				}
+				pendingCount = 0
+			}
+		},
+		nats.Bind(WALCommitsStreamName, consumerName),
+		nats.ManualAck(),
+	)
+
+	if err != nil {
+		return err
+	}
+
+	close(ready)
+
+	<-dm.cache.Synced
+
+	if err = dm.setKVConsistent(false); err != nil {
+		lg.Logf(lg.ErrorLevel, "Failed to set KV as inconsistent: %s", err)
+	}
+
+	lg.Logf(lg.TraceLevel, "Cache synced, recounting pending transactions for shutdown")
+
+	finalPendingCount := dm.countPendingCommits(consumerName)
+	lg.Logf(lg.TraceLevel, "Final pending transactions count: %d", finalPendingCount)
+
+	if finalPendingCount == 0 {
+		lg.Logf(lg.TraceLevel, "No final pending transactions, shutdown complete")
+		return nil
+	}
+
+	startWait := time.Now()
+	lastLogTime := startWait
+
+	for {
+		currentPending := dm.countPendingCommits(consumerName)
+
+		if time.Since(lastLogTime) >= shutdownLogInterval {
+			lg.Logf(lg.TraceLevel, "Shutdown: waiting for transactions, pending=%d, elapsed=%v",
+				currentPending, time.Since(startWait).Round(time.Second))
+			lastLogTime = time.Now()
+		}
+
+		if currentPending == 0 {
+			lg.Logf(lg.TraceLevel, "All transactions processed in %v, shutdown complete",
+				time.Since(startWait).Round(time.Millisecond))
+			return nil
+		}
+
+		if time.Since(startWait) > shutdownMaxWait {
+			lg.Logf(lg.WarnLevel, "Shutdown timeout reached after %v with %d pending transactions",
+				shutdownMaxWait, currentPending)
+			return nil
+		}
+
+		time.Sleep(shutdownPollInterval)
+	}
+}
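The committer applies commits strictly one at a time (MaxAckPending: 1), Naks a failed apply so NATS redelivers it up to commitMaxDeliver times, and on shutdown drains the remaining commits with a bounded poll loop. A standalone sketch of that drain loop under the same timing constants, with a hypothetical stub in place of Domain.countPendingCommits:

package main

import (
	"fmt"
	"time"
)

// Illustrative constants mirroring shutdownPollInterval, shutdownLogInterval, shutdownMaxWait.
const (
	pollInterval = 200 * time.Millisecond
	logInterval  = 5 * time.Second
	maxWait      = 30 * time.Second
)

// countPending stands in for Domain.countPendingCommits (hypothetical stub that drains to zero).
var left = 3

func countPending() int {
	if left > 0 {
		left--
	}
	return left
}

func main() {
	start := time.Now()
	lastLog := start

	for {
		pending := countPending()

		if pending == 0 {
			fmt.Println("all commits drained in", time.Since(start).Round(time.Millisecond))
			return
		}
		if time.Since(start) > maxWait {
			fmt.Println("giving up with", pending, "commits still pending")
			return
		}
		if time.Since(lastLog) >= logInterval {
			fmt.Println("still waiting, pending =", pending)
			lastLog = time.Now()
		}
		time.Sleep(pollInterval)
	}
}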
+
+func (dm *Domain) applyTransactionOperations(ctx context.Context, txID string) error {
+	opsSubject := fmt.Sprintf("wal.ops.%s.%s", dm.kv.Bucket(), txID)
+	consumerName := "TX_OPS_" + dm.kv.Bucket() + "_" + txID
+
+	lg.Logf(lg.TraceLevel, "applyTransactionOperations: processing tx_id=%s, subject=%s", txID, opsSubject)
+
+	info, err := dm.js.ConsumerInfo(WALOperationsStreamName, consumerName)
+	if err != nil || info == nil {
+		lg.Logf(lg.TraceLevel, "Consumer %s not found, creating new one", consumerName)
+
+		consumerConfig := &nats.ConsumerConfig{
+			Name:          consumerName,
+			Durable:       consumerName,
+			FilterSubject: opsSubject,
+			AckPolicy:     nats.AckExplicitPolicy,
+			AckWait:       opsAckWait,
+			MaxDeliver:    opsMaxDeliver,
+		}
+
+		if _, err = dm.js.AddConsumer(WALOperationsStreamName, consumerConfig); err != nil {
+			lg.Logf(lg.ErrorLevel, "Failed to create consumer: %s", err)
+			return fmt.Errorf("failed to create operations consumer: %w", err)
+		}
+		lg.Logf(lg.TraceLevel, "Created durable consumer %s", consumerName)
+	} else {
+		lg.Logf(lg.TraceLevel, "Reusing existing consumer %s", consumerName)
+	}
+
+	sub, err := dm.js.PullSubscribe(opsSubject, consumerName)
+	if err != nil {
+		lg.Logf(lg.ErrorLevel, "Failed to create subscription: %s", err)
+		return fmt.Errorf("failed to subscribe to operations: %w", err)
+	}
+
+	if !sub.IsValid() {
+		return fmt.Errorf("subscription invalid after creation")
+	}
+
+	defer func() {
+		system.MsgOnErrorReturn(sub.Unsubscribe())
+	}()
+
+	totalOps := 0
+
+	for {
+		msgs, err := sub.Fetch(opsFetchBatchSize, nats.MaxWait(opsFetchTimeout))
+		if err != nil {
+			if errors.Is(err, nats.ErrTimeout) {
+				lg.Logf(lg.TraceLevel, "applyTransactionOperations: finished, processed %d operations for tx_id=%s", totalOps, txID)
+				break
+			}
+			return fmt.Errorf("failed to fetch operations: %w", err)
+		}
+
+		if len(msgs) == 0 {
+			lg.Logf(lg.TraceLevel, "applyTransactionOperations: no more messages, processed %d operations for tx_id=%s", totalOps, txID)
+			break
+		}
+
+		//lg.GetLogger().Tracef(ctx, "applyTransactionOperations: fetched %d operations for tx_id=%s", len(msgs), txID)
+
+		for _, msg := range msgs {
+			opType := msg.Header.Get("op_type")
+			key := msg.Header.Get("key")
+
+			if key == "" {
+				lg.Logf(lg.WarnLevel, "Operation without key in transaction %s", txID)
+				system.MsgOnErrorReturn(msg.Ack())
+				continue
+			}
+
+			var kvErr error
+			switch opType {
+			case cache.OpTypePUT:
+				_, kvErr = customNatsKv.KVPut(dm.js, dm.kv, key, msg.Data)
+			case cache.OpTypeDelete:
+				kvErr = customNatsKv.KVDelete(dm.js, dm.kv, key)
+				if kvErr != nil {
+					errMsg := kvErr.Error()
+					if errors.Is(kvErr, nats.ErrKeyNotFound) ||
+						strings.Contains(errMsg, "message not found") ||
+						strings.Contains(errMsg, "key not found") {
+						kvErr = nil
+					}
+				}
+			default:
+				lg.Logf(lg.TraceLevel, "Unknown operation type %s in transaction %s", opType, txID)
+				system.MsgOnErrorReturn(msg.Ack())
+				continue
+			}
+
+			if kvErr != nil {
+				lg.Logf(lg.ErrorLevel, "Failed to apply operation to KV: %s", kvErr)
+				system.MsgOnErrorReturn(msg.Nak())
+				return fmt.Errorf("failed to apply operation: %w", kvErr)
+			}
+
+			totalOps++
+			system.MsgOnErrorReturn(msg.Ack())
+		}
+	}
+
+	lg.Logf(lg.TraceLevel, "applyTransactionOperations: all operations processed, deleting consumer %s", consumerName)
+	system.MsgOnErrorReturn(dm.js.DeleteConsumer(WALOperationsStreamName, consumerName))
+
+	return nil
+}
+
+func (dm *Domain) setKVConsistent(consistent bool) error {
+	value := []byte("false")
+	if consistent {
+		value = []byte("true")
+	}
+
+	_, err := customNatsKv.KVPut(dm.js, dm.kv, cache.ConsistencyKey, value)
+	return err
+}
+
+func (dm *Domain) isKVConsistent() (bool, error) {
+	entry, err := dm.kv.Get(cache.ConsistencyKey)
+	if err != nil {
+		if errors.Is(err, nats.ErrKeyNotFound) {
+			return false, nil
+		}
+		return false, err
+	}
+
+	return string(entry.Value()) == "true", nil
+}
+
+func (dm *Domain) countPendingCommits(consumerName string) int {
+	info, err := dm.js.ConsumerInfo(WALCommitsStreamName, consumerName)
+	if err != nil {
+		lg.Logf(lg.ErrorLevel, "Failed to get consumer info: %s", err)
+		return 0
+	}
+
+	pending := int(info.NumPending)
+	lg.Logf(lg.TraceLevel, "Consumer %s has %d pending messages", consumerName, pending)
+	return pending
+}
+
+func generateTransactionID() string {
+	return fmt.Sprintf("%d", system.GetCurrentTimeNs())
+}
+
+func (dm *Domain) publishWALOperation(txID string, opTime int64, opType cache.OpType, key string, value []byte) error {
+	subject := fmt.Sprintf("wal.ops.%s.%s", dm.kv.Bucket(), txID)
+
+	msg := nats.NewMsg(subject)
+	msg.Header.Set("tx_id", txID)
+	msg.Header.Set("op_time", strconv.FormatInt(opTime, 10))
+	msg.Header.Set("op_type", opType)
+	msg.Header.Set("key", key)
+	msg.Data = value
+
+	if _, err := dm.js.PublishMsg(msg); err != nil {
+		lg.Logf(lg.ErrorLevel, "Failed to publish WAL operation: %s", err)
+		return err
+	}
+	return nil
+}
+
+func (dm *Domain) publishWALCommit(txID string) error {
+	subject := fmt.Sprintf("wal.commits.%s", dm.kv.Bucket())
+
+	msg := nats.NewMsg(subject)
+	msg.Header.Set("tx_id", txID)
+	msg.Header.Set("commit_time", strconv.FormatInt(time.Now().UnixNano(), 10))
+
+	lg.Logf(lg.TraceLevel, "Publishing WAL commit: tx=%s, subject=%s", txID, subject)
+
+	if _, err := dm.js.PublishMsg(msg); err != nil {
+		lg.Logf(lg.ErrorLevel, "Failed to publish WAL commit: %s", err)
+		return err
+	}
+	return nil
+}
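Each WAL operation is a plain NATS message whose headers carry the transaction ID, operation time, operation type, and KV key, with the encoded value in the body; applyTransactionOperations reads the same headers back when replaying. A minimal sketch of that round trip, built in memory without a JetStream connection (subject, key, and the literal "PUT" value are illustrative; the real code passes cache.OpTypePUT):

package main

import (
	"fmt"
	"strconv"
	"time"

	"github.com/nats-io/nats.go"
)

func main() {
	// Build an operation message the way publishWALOperation does (illustrative values).
	msg := nats.NewMsg("wal.ops.store_kv.1733745600000000000")
	msg.Header.Set("tx_id", "1733745600000000000")
	msg.Header.Set("op_time", strconv.FormatInt(time.Now().UnixNano(), 10))
	msg.Header.Set("op_type", "PUT") // "PUT" stands in for cache.OpTypePUT; the real constant lives in the cache package
	msg.Header.Set("key", "store.some.object.key")
	msg.Data = []byte(`{"field":"value"}`)

	// Read it back the way applyTransactionOperations does.
	fmt.Println("op_type:", msg.Header.Get("op_type"))
	fmt.Println("key:", msg.Header.Get("key"))
	fmt.Println("payload bytes:", len(msg.Data))
}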