Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions embedded/graph/crud/ll_crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -561,15 +561,15 @@ func LLAPILinkCreate(_ sfPlugins.StatefunExecutor, ctx *sfPlugins.StatefunContex

if !forceCreate {
// Check if link with this name already exists --------------
_, err := ctx.Domain.Cache().GetValue(fmt.Sprintf(OutLinkBodyKeyPrefPattern+KeySuff1Pattern, selfID, linkName))
_, err := ctx.Domain.Cache().GetValueJSON(fmt.Sprintf(OutLinkBodyKeyPrefPattern+KeySuff1Pattern, selfID, linkName))
if err == nil {
operationKeysMutexUnlock(ctx)
om.AggregateOpMsg(sfMediators.OpMsgFailed(fmt.Sprintf("link from=%s with name=%s already exists", selfID, linkName))).Reply()
return
}
// ----------------------------------------------------------
// Check if link with this type "type" to "to" already exists
_, err = ctx.Domain.Cache().GetValue(fmt.Sprintf(OutLinkTypeKeyPrefPattern+KeySuff2Pattern, selfID, linkType, toId))
_, err = ctx.Domain.Cache().GetValueJSON(fmt.Sprintf(OutLinkTypeKeyPrefPattern+KeySuff2Pattern, selfID, linkType, toId))
if err == nil {
operationKeysMutexUnlock(ctx)
om.AggregateOpMsg(sfMediators.OpMsgFailed(fmt.Sprintf("link from=%s with name=%s to=%s with type=%s already exists, two vertices can have a link with this type and direction only once", selfID, linkName, toId, linkType))).Reply()
Expand Down
7 changes: 5 additions & 2 deletions statefun/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,8 +351,11 @@
//lg.Logf("---CACHE_KV TF DELETE: %s, %d, %d", key, kvRecordTime, appendFlag)

//system.MsgOnErrorReturn(kv.Delete(entry.Key()))
system.MsgOnErrorReturn(customNatsKv.KVDelete(cs.js, cs.kv, entry.Key()))

if err = customNatsKv.KVDelete(cs.js, cs.kv, entry.Key()); err != nil &&
!(strings.HasPrefix(err.Error(), "nats: sequence ") && strings.Contains(err.Error(), " not found")) {
// Ignore "sequence {SEQ} not found" errors - another runtime already deleted the entry
lg.Logf(lg.ErrorLevel, "Failed to delete key=%s, err=%s", key, err)
}
//cs.rootValue.purgeReady
//if csv := cs.getLastKeyCacheStoreValue(key); csv != nil {
// csv.Purge(true)
Expand Down Expand Up @@ -626,7 +629,7 @@

if keyLastToken, parentCacheStoreValue := cs.getLastKeyTokenAndItsParentCacheStoreValue(key, false); len(keyLastToken) > 0 && parentCacheStoreValue != nil {
if csv, ok := parentCacheStoreValue.LoadChild(keyLastToken); ok {
cacheMiss = false // Value exists in cache - no cache miss then

Check failure on line 632 in statefun/cache/cache.go

View workflow job for this annotation

GitHub Actions / lint

ineffectual assignment to cacheMiss (ineffassign)
csv.RLock("GetValue")
if !csv.ValueExists() { // Value was intentionally deleted and was marked so, no cache miss policy can be applied here
resultError = fmt.Errorf("value for key=%s does not exist", key)
Expand Down
20 changes: 12 additions & 8 deletions statefun/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,14 +244,18 @@ func (r *Runtime) handleSingleInstanceFunctions(ctx context.Context, revisions m
// startFunctionSubscriptions starts the function subscriptions based on the configuration.
func (r *Runtime) startFunctionSubscriptions(ctx context.Context, revisions map[string]uint64) error {
for _, ft := range r.registeredFunctionTypes {
revision, exist := revisions[ft.name]
if !exist {
lg.Logf(lg.WarnLevel, "Function type %s is not registered; skipping", ft.name)
continue
}
if !ft.config.multipleInstancesAllowed && revision == 0 {
lg.Logf(lg.WarnLevel, "Function type %s is already running; skipping", ft.name)
continue
if ft.config.multipleInstancesAllowed {
lg.Logf(lg.DebugLevel, "Function type %s allows multiple instances, subscribing", ft.name)
} else {
revision, exist := revisions[ft.name]
if !exist {
lg.Logf(lg.WarnLevel, "Function type %s is not registered; skipping", ft.name)
continue
}
if revision == 0 {
lg.Logf(lg.WarnLevel, "Function type %s is already running; skipping", ft.name)
continue
}
}

if ft.config.IsSignalProviderAllowed(sfPlugins.JetstreamGlobalSignal) {
Expand Down
Loading