Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion embedded/graph/crud/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func RegisterAllFunctionTypes(runtime *statefun.Runtime) {
statefun.NewFunctionType(runtime, "functions.graph.api.link.delete", LLAPILinkDelete, *statefun.NewFunctionTypeConfig().SetAllowedRequestProviders(sfPlugins.AutoRequestSelect))
statefun.NewFunctionType(runtime, "functions.graph.api.link.read", LLAPILinkRead, *statefun.NewFunctionTypeConfig().SetAllowedRequestProviders(sfPlugins.AutoRequestSelect))

if runtime.Domain.Name() == runtime.Domain.HubDomainName() {
if runtime.Domain.Name() == runtime.Domain.LocalHubDomainName() {
runtime.RegisterOnAfterStartFunction(cmdbSchemaPrepare, false)
}
}
18 changes: 9 additions & 9 deletions embedded/graph/crud/hl_crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func replyWithoutOpStack(om *sfMediators.OpMediator, ctx *sfPlugins.StatefunCont
*/
func CreateType(_ sfPlugins.StatefunExecutor, ctx *sfPlugins.StatefunContextProcessor) {
selfID := getOriginalID(ctx.Self.ID)
typesVertexId := ctx.Domain.CreateObjectIDWithHubDomain(BUILT_IN_TYPES, false)
typesVertexId := ctx.Domain.CreateObjectIDWithLocalHubDomain(BUILT_IN_TYPES, false)

if typeOperationRedirectedToHub(ctx) {
return
Expand Down Expand Up @@ -182,7 +182,7 @@ func ReadType(_ sfPlugins.StatefunExecutor, ctx *sfPlugins.StatefunContextProces
om.AggregateOpMsg(m)

vertexIsType := false
typesVertexId := ctx.Domain.CreateObjectIDWithHubDomain(BUILT_IN_TYPES, false)
typesVertexId := ctx.Domain.CreateObjectIDWithLocalHubDomain(BUILT_IN_TYPES, false)
for i := 0; i < m.Data.GetByPath("links.in").ArraySize(); i++ {
fromId := m.Data.GetByPath("links.in").ArrayElement(i).GetByPath("from").AsStringDefault("")
if fromId == typesVertexId {
Expand Down Expand Up @@ -235,8 +235,8 @@ func CreateObject(_ sfPlugins.StatefunExecutor, ctx *sfPlugins.StatefunContextPr
return
}

originType = ctx.Domain.CreateObjectIDWithHubDomain(originType, true)
builtInObjectsVertexId := ctx.Domain.CreateObjectIDWithHubDomain(BUILT_IN_OBJECTS, false)
originType = ctx.Domain.CreateObjectIDWithLocalHubDomain(originType, true)
builtInObjectsVertexId := ctx.Domain.CreateObjectIDWithLocalHubDomain(BUILT_IN_OBJECTS, false)

opTime := ctx.Payload.GetByPath("op_time").AsNumericDefault(-1)
ctx.Payload.SetByPath("op_time", easyjson.NewJSON(opTime))
Expand Down Expand Up @@ -412,7 +412,7 @@ func ReadObject(_ sfPlugins.StatefunExecutor, ctx *sfPlugins.StatefunContextProc

vertexIsObject := false
typeBidirectionalLink := false
objectsVertexId := ctx.Domain.CreateObjectIDWithHubDomain(BUILT_IN_OBJECTS, false)
objectsVertexId := ctx.Domain.CreateObjectIDWithLocalHubDomain(BUILT_IN_OBJECTS, false)
for i := 0; i < m.Data.GetByPath("links.in").ArraySize(); i++ {
fromId := m.Data.GetByPath("links.in").ArrayElement(i).GetByPath("from").AsStringDefault("")
if fromId == objectsVertexId {
Expand Down Expand Up @@ -481,7 +481,7 @@ func CreateTypesLink(_ sfPlugins.StatefunExecutor, ctx *sfPlugins.StatefunContex
om.AggregateOpMsg(sfMediators.OpMsgFailed("'to' undefined")).Reply()
return
}
toType = ctx.Domain.CreateObjectIDWithHubDomain(toType, true)
toType = ctx.Domain.CreateObjectIDWithLocalHubDomain(toType, true)

link := easyjson.NewJSONObject()
link.SetByPath("to", easyjson.NewJSON(toType))
Expand Down Expand Up @@ -524,7 +524,7 @@ func UpdateTypesLink(_ sfPlugins.StatefunExecutor, ctx *sfPlugins.StatefunContex
om.AggregateOpMsg(sfMediators.OpMsgFailed("'to' undefined")).Reply()
return
}
toType = ctx.Domain.CreateObjectIDWithHubDomain(toType, true)
toType = ctx.Domain.CreateObjectIDWithLocalHubDomain(toType, true)

link := ctx.Payload.Clone()
link.SetByPath("to", easyjson.NewJSON(toType))
Expand Down Expand Up @@ -574,7 +574,7 @@ func DeleteTypesLink(_ sfPlugins.StatefunExecutor, ctx *sfPlugins.StatefunContex
om.AggregateOpMsg(sfMediators.OpMsgFailed("'to' undefined")).Reply()
return
}
toType = ctx.Domain.CreateObjectIDWithHubDomain(toType, true)
toType = ctx.Domain.CreateObjectIDWithLocalHubDomain(toType, true)

operationKeysMutexLock(ctx, []string{selfID, toType}, true)

Expand Down Expand Up @@ -640,7 +640,7 @@ func ReadTypesLink(_ sfPlugins.StatefunExecutor, ctx *sfPlugins.StatefunContextP
om.AggregateOpMsg(sfMediators.OpMsgFailed("'to' undefined")).Reply()
return
}
toType = ctx.Domain.CreateObjectIDWithHubDomain(toType, true)
toType = ctx.Domain.CreateObjectIDWithLocalHubDomain(toType, true)

payload := easyjson.NewJSONObject()
payload.SetByPath("to", easyjson.NewJSON(toType))
Expand Down
8 changes: 4 additions & 4 deletions embedded/graph/crud/hl_crud_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ const (
)

func typeOperationRedirectedToHub(ctx *sfPlugins.StatefunContextProcessor) bool {
if ctx.Domain.Name() != ctx.Domain.HubDomainName() {
if ctx.Domain.Name() != ctx.Domain.LocalHubDomainName() {
om := sfMediators.NewOpMediator(ctx)
selfID := getOriginalID(ctx.Self.ID)
idOnHub := ctx.Domain.CreateObjectIDWithHubDomain(selfID, true)
idOnHub := ctx.Domain.CreateObjectIDWithLocalHubDomain(selfID, true)
om.AggregateOpMsg(sfMediators.OpMsgFromSfReply(ctx.Request(sfPlugins.AutoRequestSelect, ctx.Self.Typename, idOnHub, ctx.Payload, ctx.Options))).Reply()
return true
}
Expand Down Expand Up @@ -195,13 +195,13 @@ func cmdbSchemaPrepare(ctx context.Context, runtime *statefun.Runtime) error {
v := easyjson.NewJSONObject()
v.SetByPath("to", easyjson.NewJSON(BUILT_IN_TYPES))
v.SetByPath("type", easyjson.NewJSON(TYPES_TYPELINK))
v.SetByPath("name", easyjson.NewJSON(runtime.Domain.CreateObjectIDWithHubDomain(BUILT_IN_TYPES, false)))
v.SetByPath("name", easyjson.NewJSON(runtime.Domain.CreateObjectIDWithLocalHubDomain(BUILT_IN_TYPES, false)))
system.MsgOnErrorReturn(runtime.Request(sfPlugins.AutoRequestSelect, "functions.graph.api.link.create", BUILT_IN_ROOT, &v, nil))

v = easyjson.NewJSONObject()
v.SetByPath("to", easyjson.NewJSON(BUILT_IN_OBJECTS))
v.SetByPath("type", easyjson.NewJSON(OBJECTS_TYPELINK))
v.SetByPath("name", easyjson.NewJSON(runtime.Domain.CreateObjectIDWithHubDomain(BUILT_IN_OBJECTS, false)))
v.SetByPath("name", easyjson.NewJSON(runtime.Domain.CreateObjectIDWithLocalHubDomain(BUILT_IN_OBJECTS, false)))
system.MsgOnErrorReturn(runtime.Request(sfPlugins.AutoRequestSelect, "functions.graph.api.link.create", BUILT_IN_ROOT, &v, nil))
// ----------------------------------------------------

Expand Down
12 changes: 6 additions & 6 deletions embedded/graph/crud/hl_polytype_crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func TypeSetSubType(_ sfPlugins.StatefunExecutor, ctx *sfPlugins.StatefunContext
om.AggregateOpMsg(sfMediators.OpMsgFailed("'sub_type' undefined")).Reply()
return
}
childTypeWithDomain := ctx.Domain.CreateObjectIDWithHubDomain(childType, true)
childTypeWithDomain := ctx.Domain.CreateObjectIDWithLocalHubDomain(childType, true)

operationKeysMutexLock(ctx, []string{selfID, childTypeWithDomain}, true)
defer operationKeysMutexUnlock(ctx)
Expand Down Expand Up @@ -82,7 +82,7 @@ func TypeRemoveSubType(_ sfPlugins.StatefunExecutor, ctx *sfPlugins.StatefunCont
om.AggregateOpMsg(sfMediators.OpMsgFailed("'sub_type' undefined")).Reply()
return
}
childTypeWithDomain := ctx.Domain.CreateObjectIDWithHubDomain(childType, true)
childTypeWithDomain := ctx.Domain.CreateObjectIDWithLocalHubDomain(childType, true)

operationKeysMutexLock(ctx, []string{selfID, childTypeWithDomain}, true)
defer operationKeysMutexUnlock(ctx)
Expand Down Expand Up @@ -133,8 +133,8 @@ func CreateObjectsLinkFromSuperTypes(_ sfPlugins.StatefunExecutor, ctx *sfPlugin
fromObjectClaimType := ctx.Payload.GetByPath("from_super_type").AsStringDefault("")
toObjectClaimType := ctx.Payload.GetByPath("to_super_type").AsStringDefault("")

fromObjectClaimType = ctx.Domain.CreateObjectIDWithHubDomain(fromObjectClaimType, true)
toObjectClaimType = ctx.Domain.CreateObjectIDWithHubDomain(toObjectClaimType, true)
fromObjectClaimType = ctx.Domain.CreateObjectIDWithLocalHubDomain(fromObjectClaimType, true)
toObjectClaimType = ctx.Domain.CreateObjectIDWithLocalHubDomain(toObjectClaimType, true)

objectLinkType := isObjectLinkPermittedForClaimedTypes(ctx, selfID, objectToID, fromObjectClaimType, toObjectClaimType)
if len(objectLinkType) == 0 {
Expand Down Expand Up @@ -185,8 +185,8 @@ func DeleteObjectsLinkFromSuperTypes(_ sfPlugins.StatefunExecutor, ctx *sfPlugin
fromObjectClaimType := ctx.Payload.GetByPath("from_super_type").AsStringDefault("")
toObjectClaimType := ctx.Payload.GetByPath("to_super_type").AsStringDefault("")

fromObjectClaimType = ctx.Domain.CreateObjectIDWithHubDomain(fromObjectClaimType, true)
toObjectClaimType = ctx.Domain.CreateObjectIDWithHubDomain(toObjectClaimType, true)
fromObjectClaimType = ctx.Domain.CreateObjectIDWithLocalHubDomain(fromObjectClaimType, true)
toObjectClaimType = ctx.Domain.CreateObjectIDWithLocalHubDomain(toObjectClaimType, true)

objectLinkType := isObjectLinkPermittedForClaimedTypes(ctx, selfID, objectToID, fromObjectClaimType, toObjectClaimType)
if len(objectLinkType) == 0 {
Expand Down
8 changes: 4 additions & 4 deletions embedded/graph/crud/hl_polytype_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,15 @@ func getObjectAllTypesBaseAndParents(ctx *sfPlugins.StatefunContextProcessor, ob
return
}

targetObjectType = ctx.Domain.CreateObjectIDWithHubDomain(targetObjectType, true)
targetObjectType = ctx.Domain.CreateObjectIDWithLocalHubDomain(targetObjectType, true)
result[targetObjectType] = struct{}{}

om := sfMediators.OpMsgFromSfReply(ctx.Request(sfPlugins.AutoRequestSelect, "functions.cmdb.api.type.read", makeSequenceFreeParentBasedID(ctx, targetObjectType), injectParentHoldsLocks(ctx, nil), nil))
if om.Data.PathExists("body.cache.parent_types") {
parentTypes := om.Data.GetByPath("body.cache.parent_types")
for i := 0; i < parentTypes.ArraySize(); i++ {
parentType := parentTypes.ArrayElement(i).AsStringDefault("")
parentType = ctx.Domain.CreateObjectIDWithHubDomain(parentType, true)
parentType = ctx.Domain.CreateObjectIDWithLocalHubDomain(parentType, true)
if len(parentType) > 0 {
result[parentType] = struct{}{}
}
Expand Down Expand Up @@ -172,7 +172,7 @@ func PolyTypeGoalFinalize(ctx *sfPlugins.StatefunContextProcessor, typesToRefres
}

func UpdateTypeModelVersion(ctx *sfPlugins.StatefunContextProcessor) {
typesVertexId := ctx.Domain.CreateObjectIDWithHubDomain(BUILT_IN_TYPES, false)
typesVertexId := ctx.Domain.CreateObjectIDWithLocalHubDomain(BUILT_IN_TYPES, false)

payload := easyjson.NewJSONObject()
payload.SetByPath("body.version", easyjson.NewJSON(system.GetUniqueStrID()))
Expand Down Expand Up @@ -230,7 +230,7 @@ func getTypeCacheVersionAndGlobalVersion(ctx *sfPlugins.StatefunContextProcessor
typeCacheVersion = som1.Data.GetByPath("body.cache.version").AsStringDefault("")
}

typesVertexId := ctx.Domain.CreateObjectIDWithHubDomain(BUILT_IN_TYPES, false)
typesVertexId := ctx.Domain.CreateObjectIDWithLocalHubDomain(BUILT_IN_TYPES, false)
som2 := sfMediators.OpMsgFromSfReply(ctx.Request(sfPlugins.AutoRequestSelect, "functions.graph.api.vertex.read", makeSequenceFreeParentBasedID(ctx, typesVertexId), injectParentHoldsLocks(ctx, nil), nil))
if som2.Status == sfMediators.SYNC_OP_STATUS_OK {
typeModelVersion = som2.Data.GetByPath("body.version").AsStringDefault("")
Expand Down
2 changes: 1 addition & 1 deletion embedded/graph/debug/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ func LLAPIImportGraph(executor sfPlugins.StatefunExecutor, ctx *sfPlugins.Statef
defer wg.Done()
for i := range vertexIdx {
n := graph.Nodes[i]
uuid := ctx.Domain.CreateObjectIDWithHubDomain(n.Id, true)
uuid := ctx.Domain.CreateObjectIDWithLocalHubDomain(n.Id, true)

if err := dbc.Graph.VertexDelete(uuid); err != nil {
system.MsgOnErrorReturn(err)
Expand Down
72 changes: 45 additions & 27 deletions statefun/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@
)

type Domain struct {
hubDomainName string
centralHubDomainName string
localHubDomainName string
name string
weakClusterDomains map[string]struct{}
weakClusterDomainsMutex sync.Mutex
Expand All @@ -65,43 +66,58 @@
maxAge time.Duration
}

func NewDomain(nc *nats.Conn, js nats.JetStreamContext, desiredHubDomainName string, ftSC, sysSC, kvSC streamConfig) (dm *Domain, e error) {
func NewDomain(nc *nats.Conn, js nats.JetStreamContext, desiredCentralHubDomainName, desiredLocalHubDomainName string, ftSC, sysSC, kvSC streamConfig) (dm *Domain, e error) {
accInfo, err := js.AccountInfo()
if err != nil {
return nil, err
}

hubDomainName := desiredHubDomainName
centralHubDomainName := desiredCentralHubDomainName
localHubDomainName := desiredLocalHubDomainName
thisDomainName := accInfo.Domain
if thisDomainName == "" {
if hubDomainName == "" {
thisDomainName = DefaultHubDomainName
hubDomainName = DefaultHubDomainName
} else {
thisDomainName = hubDomainName
if centralHubDomainName == "" {
centralHubDomainName = DefaultCentralHubDomainName
}
if localHubDomainName == "" {
localHubDomainName = centralHubDomainName
}
thisDomainName = localHubDomainName
} else {
if hubDomainName == "" {
hubDomainName = thisDomainName
if centralHubDomainName == "" {
centralHubDomainName = thisDomainName
}
if localHubDomainName == "" {
localHubDomainName = thisDomainName
}
}

domain := &Domain{
hubDomainName: hubDomainName,
name: thisDomainName,
weakClusterDomains: map[string]struct{}{thisDomainName: {}},
nc: nc,
js: js,
ftSC: ftSC,
sysSC: sysSC,
kvSC: kvSC,
centralHubDomainName: centralHubDomainName,
localHubDomainName: localHubDomainName,
name: thisDomainName,
weakClusterDomains: map[string]struct{}{thisDomainName: {}},
nc: nc,
js: js,
ftSC: ftSC,
sysSC: sysSC,
kvSC: kvSC,
}

return domain, nil
}

// Deprecated
func (dm *Domain) HubDomainName() string {
return dm.hubDomainName
return dm.localHubDomainName
}

func (dm *Domain) CentralHubDomainName() string {
return dm.centralHubDomainName
}

func (dm *Domain) LocalHubDomainName() string {
return dm.localHubDomainName
}

func (dm *Domain) Name() string {
Expand All @@ -117,7 +133,7 @@
dm.weakClusterDomainsMutex.Lock()
defer dm.weakClusterDomainsMutex.Unlock()

weakClusterUniqueDomainNamesIncludingThis := []string{}
weakClusterUniqueDomainNamesIncludingThis := make([]string, 0, len(dm.weakClusterDomains))
for k := range dm.weakClusterDomains {
weakClusterUniqueDomainNamesIncludingThis = append(weakClusterUniqueDomainNamesIncludingThis, k)
}
Expand Down Expand Up @@ -250,8 +266,13 @@
return dm.CreateObjectIDWithDomain(dm.name, objectID, domainReplace)
}

// Deprecated
func (dm *Domain) CreateObjectIDWithHubDomain(objectID string, domainReplace bool) string {
return dm.CreateObjectIDWithDomain(dm.hubDomainName, objectID, domainReplace)
return dm.CreateObjectIDWithDomain(dm.localHubDomainName, objectID, domainReplace)
}

func (dm *Domain) CreateObjectIDWithLocalHubDomain(objectID string, domainReplace bool) string {
return dm.CreateObjectIDWithDomain(dm.localHubDomainName, objectID, domainReplace)
}

func (dm *Domain) start(cacheConfig *cache.Config, createDomainRouters bool) error {
Expand All @@ -274,15 +295,12 @@
if err != nil {
return err
}
kvExists = true

Check failure on line 298 in statefun/domain.go

View workflow job for this annotation

GitHub Actions / lint

ineffectual assignment to kvExists (ineffassign)
}
if !kvExists {
return fmt.Errorf("Nats KV was not inited")
}
// --------------------------------------------------------------

if createDomainRouters {
if dm.hubDomainName == dm.name {
if dm.centralHubDomainName == dm.name {
if err := dm.createHubSignalStream(); err != nil {
return err
}
Expand Down Expand Up @@ -348,14 +366,14 @@

func (dm *Domain) createIngresSignalStream() error {
var ss *nats.StreamSource
if dm.hubDomainName == dm.name {
if dm.centralHubDomainName == dm.name {
ss = &nats.StreamSource{
Name: hubEventStreamName,
FilterSubject: fmt.Sprintf(FromGlobalSignalTmpl, dm.name, ">"),
}
} else {
ext := &nats.ExternalStream{
APIPrefix: fmt.Sprintf(streamPrefix, dm.hubDomainName),
APIPrefix: fmt.Sprintf(streamPrefix, dm.centralHubDomainName),
}
ss = &nats.StreamSource{
Name: hubEventStreamName,
Expand Down
2 changes: 1 addition & 1 deletion statefun/function_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ func (ft *FunctionType) getObjectImplTypes(id string) ([]string, error) {
parentTypes := response.GetByPath("data.body.cache.parent_types")
for i := 0; i < parentTypes.ArraySize(); i++ {
parentType := parentTypes.ArrayElement(i).AsStringDefault("")
parentType = ft.runtime.Domain.CreateObjectIDWithHubDomain(parentType, true)
parentType = ft.runtime.Domain.CreateObjectIDWithLocalHubDomain(parentType, true)
if len(parentType) > 0 {
result[parentType] = struct{}{}
}
Expand Down
5 changes: 5 additions & 0 deletions statefun/plugins/plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,19 @@ type SyncReply struct {
}

type Domain interface {
//Deprecated
HubDomainName() string
CentralHubDomainName() string
LocalHubDomainName() string
Name() string
Cache() *cache.Store
GetDomainFromObjectID(objectID string) string
GetObjectIDWithoutDomain(objectID string) string
CreateObjectIDWithDomain(domain string, objectID string, domainReplace bool) string
CreateObjectIDWithThisDomain(objectID string, domainReplace bool) string
//Deprecated
CreateObjectIDWithHubDomain(objectID string, domainReplace bool) string
CreateObjectIDWithLocalHubDomain(objectID string, domainReplace bool) string
// Get all domains in weak cluster including this one
GetWeakClusterDomains() []string
// Set all domains in weak cluster (this domain name will also be included automatically if not defined)
Expand Down
5 changes: 3 additions & 2 deletions statefun/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,12 @@ func NewRuntime(config RuntimeConfig) (*Runtime, error) {
maxAge: config.kvStreamMaxAge,
}

r.Domain, err = NewDomain(r.nc, r.js, config.desiredHUBDomainName, ftStreamConfig, sysStreamConfig, kvStreamConfig)
r.Domain, err = NewDomain(r.nc, r.js, config.desiredCentralHUBDomainName, config.desiredLocalHUBDomainName, ftStreamConfig, sysStreamConfig, kvStreamConfig)
if err != nil {
return nil, err
}
r.config.desiredHUBDomainName = r.Domain.hubDomainName
r.config.desiredCentralHUBDomainName = r.Domain.centralHubDomainName
r.config.desiredLocalHUBDomainName = r.Domain.localHubDomainName

return r, nil
}
Expand Down
Loading
Loading