-
Notifications
You must be signed in to change notification settings - Fork 295
feat: Support Arbitrary Endpoint Matching for Edge Ingest - BED-7451 #2422
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -27,7 +27,6 @@ import ( | |
| "github.com/specterops/bloodhound/packages/go/graphschema/ad" | ||
| "github.com/specterops/bloodhound/packages/go/graphschema/common" | ||
| "github.com/specterops/dawgs/graph" | ||
| "github.com/specterops/dawgs/query" | ||
| "github.com/specterops/dawgs/util" | ||
| ) | ||
|
|
||
|
|
@@ -42,7 +41,7 @@ func IngestRelationships(ingestCtx *IngestContext, sourceKind graph.Kind, relati | |
| errs = util.NewErrorCollector() | ||
| ) | ||
|
|
||
| updates, err := resolveRelationships(ingestCtx, relationships, sourceKind) | ||
| updates, err := buildIngestionUpdateBatch(ingestCtx, relationships, sourceKind) | ||
| if err != nil { | ||
| errs.Add(err) | ||
| } | ||
|
|
@@ -192,202 +191,45 @@ func IngestSessions(batch *IngestContext, sessions []ein.IngestibleSession) erro | |
| return errs.Combined() | ||
| } | ||
|
|
||
| type endpointKey struct { | ||
| Name string | ||
| Kind string | ||
| } | ||
|
|
||
| func addKey(endpoint ein.IngestibleEndpoint, cache map[endpointKey]struct{}) { | ||
| if endpoint.MatchBy != ein.MatchByName { | ||
| return | ||
| } | ||
| key := endpointKey{ | ||
| Name: strings.ToUpper(endpoint.Value), | ||
| } | ||
| if endpoint.Kind != nil { | ||
| key.Kind = endpoint.Kind.String() | ||
| } | ||
| cache[key] = struct{}{} | ||
| } | ||
|
|
||
| // resolveAllEndpointsByName attempts to resolve all unique source and target | ||
| // endpoints from a list of ingestible relationships into their corresponding object IDs. | ||
| // | ||
| // Each endpoint is identified by a Name, (optional) Kind pair. A single batch query is | ||
| // used to resolve all endpoints in one round trip. | ||
| // | ||
| // If multiple nodes match a given Name, Kind pair with conflicting object IDs, | ||
| // the match is considered ambiguous and excluded from the result. This can happen because there are no | ||
| // uniqueness guarantees on a node's `Name` property. | ||
| // | ||
| // Returns a map of resolved object IDs. If no matches are found or the input is empty, an empty map is returned. | ||
| func resolveAllEndpointsByName(batch BatchUpdater, rels []ein.IngestibleRelationship) (map[endpointKey]string, error) { | ||
| // seen deduplicates Name:Kind pairs from the input batch to ensure that each Name:Kind pairs is resolved once. | ||
| seen := map[endpointKey]struct{}{} | ||
|
|
||
| if len(rels) == 0 { | ||
| return map[endpointKey]string{}, nil | ||
| } | ||
|
|
||
| for _, rel := range rels { | ||
| addKey(rel.Source, seen) | ||
| addKey(rel.Target, seen) | ||
| } | ||
| // if nothing to filter, return early | ||
| if len(seen) == 0 { | ||
| return map[endpointKey]string{}, nil | ||
| } | ||
|
|
||
| func buildIngestionUpdateBatch(batch *IngestContext, rels []ein.IngestibleRelationship, sourceKind graph.Kind) ([]graph.RelationshipUpdate, error) { | ||
| var ( | ||
| filters = make([]graph.Criteria, 0, len(seen)) | ||
| buildFilter = func(key endpointKey) graph.Criteria { | ||
| var criteria []graph.Criteria | ||
|
|
||
| criteria = append(criteria, query.Equals(query.NodeProperty(common.Name.String()), key.Name)) | ||
| if key.Kind != "" { | ||
| criteria = append(criteria, query.Kind(query.Node(), graph.StringKind(key.Kind))) | ||
| } | ||
| return query.And(criteria...) | ||
| } | ||
| updates []graph.RelationshipUpdate | ||
| errs = errorlist.NewBuilder() | ||
| ) | ||
|
|
||
| // aggregate all Name:Kind pairs in 1 DAWGs query for 1 round trip | ||
| for key := range seen { | ||
| filters = append(filters, buildFilter(key)) | ||
| } | ||
|
|
||
| var ( | ||
| resolved = map[endpointKey]string{} | ||
| ambiguous = map[endpointKey]bool{} | ||
| ) | ||
|
|
||
| if err := batch.Nodes().Filter(query.Or(filters...)).Fetch( | ||
| func(cursor graph.Cursor[*graph.Node]) error { | ||
|
|
||
| for node := range cursor.Chan() { | ||
| nameVal, _ := node.Properties.Get(common.Name.String()).String() | ||
| objectID, err := node.Properties.Get(string(common.ObjectID)).String() | ||
| if err != nil || objectID == "" { | ||
| slog.Warn("Matched node missing objectid", | ||
| slog.String("name", nameVal), | ||
| slog.Any("kinds", node.Kinds)) | ||
| continue | ||
| } | ||
|
|
||
| // edge case: resolve an empty key to match endpoints that provide no Kind filter | ||
| node.Kinds = append(node.Kinds, graph.EmptyKind) | ||
|
|
||
| // resolve all names found to objectids, | ||
| // record ambiguous matches (when more than one match is found, we cannot disambiguate the requested node and must skip the update) | ||
| for _, kind := range node.Kinds { | ||
| key := endpointKey{Name: strings.ToUpper(nameVal), Kind: kind.String()} | ||
| if existingID, exists := resolved[key]; exists && existingID != objectID { | ||
| ambiguous[key] = true | ||
| } else { | ||
| resolved[key] = objectID | ||
| } | ||
| } | ||
| } | ||
|
|
||
| return nil | ||
| }, | ||
| ); err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| // remove ambiguous matches | ||
| for key := range ambiguous { | ||
| delete(resolved, key) | ||
| } | ||
|
|
||
| return resolved, nil | ||
| } | ||
| for _, rel := range rels { | ||
| rel.RelProps[common.LastSeen.String()] = batch.IngestTime | ||
|
|
||
| // resolveRelationships transforms a list of ingestible relationships into a | ||
| // slice of graph.RelationshipUpdate objects, suitable for ingestion into the | ||
| // graph database. | ||
| // | ||
| // The function resolves all source and target endpoints to their corresponding | ||
| // object IDs if MatchByName is set on an endpoint. Relationships with unresolved | ||
| // or ambiguous endpoints are skipped and logged with a warning. | ||
| // | ||
| // The identityKind parameter determines the identity kind used for both start | ||
| // and end nodes if provided. eg. ad.Base and az.Base are used for *hound collections, and generic ingest has no base kind. | ||
| // | ||
| // Each resolved relationship is stamped with the current UTC timestamp as the "last seen" property. | ||
| // | ||
| // Returns a slice of valid relationship updates or an error if resolution fails. | ||
| func resolveRelationships(batch *IngestContext, rels []ein.IngestibleRelationship, sourceKind graph.Kind) ([]graph.RelationshipUpdate, error) { | ||
| if cache, err := resolveAllEndpointsByName(batch.Batch, rels); err != nil { | ||
| return nil, err | ||
| } else { | ||
| var ( | ||
| updates []graph.RelationshipUpdate | ||
| errs = errorlist.NewBuilder() | ||
| startIdentityProperty = rel.Source.IdentityProperty() | ||
| startKinds = MergeNodeKinds(sourceKind, rel.Source.Kind) | ||
| startProperties = graph.AsProperties(map[string]any{ | ||
| startIdentityProperty: rel.Source.Value, | ||
| common.LastSeen.String(): batch.IngestTime, | ||
| }) | ||
|
|
||
| endIdentityProperty = rel.Target.IdentityProperty() | ||
| endKinds = MergeNodeKinds(sourceKind, rel.Target.Kind) | ||
| endProperties = graph.AsProperties(map[string]any{ | ||
| endIdentityProperty: rel.Target.Value, | ||
| common.LastSeen.String(): batch.IngestTime, | ||
| }) | ||
| ) | ||
|
|
||
| for _, rel := range rels { | ||
| srcID, srcOK := resolveEndpointID(rel.Source, cache) | ||
| targetID, targetOK := resolveEndpointID(rel.Target, cache) | ||
|
|
||
| if !srcOK || !targetOK { | ||
| slog.Warn("Skipping unresolved relationship", | ||
| slog.String("source", rel.Source.Value), | ||
| slog.String("target", rel.Target.Value), | ||
| slog.Bool("resolved_source", srcOK), | ||
| slog.Bool("resolved_target", targetOK), | ||
| slog.String("type", rel.RelType.String())) | ||
| errs.Add( | ||
| IngestUserDataError{ | ||
| Msg: fmt.Sprintf("skipping invalid relationship. unable to resolve endpoints. source: %s, target: %s", rel.Source.Value, rel.Target.Value), | ||
| }, | ||
| ) | ||
| continue | ||
| } | ||
|
|
||
| rel.RelProps[common.LastSeen.String()] = batch.IngestTime | ||
|
|
||
| startKinds := MergeNodeKinds(sourceKind, rel.Source.Kind) | ||
| endKinds := MergeNodeKinds(sourceKind, rel.Target.Kind) | ||
|
|
||
| update := graph.RelationshipUpdate{ | ||
| Start: graph.PrepareNode(graph.AsProperties(graph.PropertyMap{ | ||
| common.ObjectID: srcID, | ||
| common.LastSeen: batch.IngestTime, | ||
| }), startKinds...), | ||
| StartIdentityProperties: []string{common.ObjectID.String()}, | ||
| StartIdentityKind: sourceKind, | ||
| End: graph.PrepareNode(graph.AsProperties(graph.PropertyMap{ | ||
| common.ObjectID: targetID, | ||
| common.LastSeen: batch.IngestTime, | ||
| }), endKinds...), | ||
| EndIdentityKind: sourceKind, | ||
| EndIdentityProperties: []string{common.ObjectID.String()}, | ||
| Relationship: graph.PrepareRelationship(graph.AsProperties(rel.RelProps), rel.RelType), | ||
| } | ||
|
|
||
| updates = append(updates, update) | ||
| update := graph.RelationshipUpdate{ | ||
| Start: graph.PrepareNode(startProperties, startKinds...), | ||
| StartIdentityKind: sourceKind, | ||
| StartIdentityProperties: []string{startIdentityProperty}, | ||
| End: graph.PrepareNode(endProperties, endKinds...), | ||
| EndIdentityKind: sourceKind, | ||
| EndIdentityProperties: []string{endIdentityProperty}, | ||
| Relationship: graph.PrepareRelationship(graph.AsProperties(rel.RelProps), rel.RelType), | ||
| } | ||
|
|
||
| return updates, errs.Build() | ||
| } | ||
| } | ||
|
|
||
| func resolveEndpointID(endpoint ein.IngestibleEndpoint, cache map[endpointKey]string) (string, bool) { | ||
| if endpoint.MatchBy == ein.MatchByName { | ||
| key := endpointKey{ | ||
| Name: strings.ToUpper(endpoint.Value), | ||
| Kind: "", | ||
| } | ||
| if endpoint.Kind != nil { | ||
| key.Kind = endpoint.Kind.String() | ||
| } | ||
| id, ok := cache[key] | ||
| return id, ok | ||
| updates = append(updates, update) | ||
| } | ||
|
|
||
| // Fallback to raw value if matching by ID | ||
| return endpoint.Value, endpoint.Value != "" | ||
| return updates, errs.Build() | ||
| } | ||
|
Comment on lines
+194
to
233
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Line 197 initializes an error builder but no errors are added, and Line 201 can panic if 💡 Proposed fix func buildIngestionUpdateBatch(batch *IngestContext, rels []ein.IngestibleRelationship, sourceKind graph.Kind) ([]graph.RelationshipUpdate, error) {
var (
updates []graph.RelationshipUpdate
errs = errorlist.NewBuilder()
)
for _, rel := range rels {
+ if rel.RelProps == nil {
+ rel.RelProps = make(map[string]any)
+ }
+
+ startIdentityProperty := rel.Source.IdentityProperty()
+ endIdentityProperty := rel.Target.IdentityProperty()
+ if rel.Source.Value == "" || rel.Target.Value == "" || startIdentityProperty == "" || endIdentityProperty == "" {
+ errs.Add(fmt.Errorf("skipping invalid relationship: invalid endpoint identity"))
+ continue
+ }
+
rel.RelProps[common.LastSeen.String()] = batch.IngestTime
var (
- startIdentityProperty = rel.Source.IdentityProperty()
startKinds = MergeNodeKinds(sourceKind, rel.Source.Kind)
startProperties = graph.AsProperties(map[string]any{
startIdentityProperty: rel.Source.Value,
common.LastSeen.String(): batch.IngestTime,
})
- endIdentityProperty = rel.Target.IdentityProperty()
endKinds = MergeNodeKinds(sourceKind, rel.Target.Kind)
endProperties = graph.AsProperties(map[string]any{
endIdentityProperty: rel.Target.Value,
common.LastSeen.String(): batch.IngestTime,
})
)🤖 Prompt for AI Agents |
||
|
|
||
| // MergeNodeKinds combines a source kind with any additional kinds, | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do not uppercase values for
match_by: property.On Line 69 and Line 75, value normalization is unconditional. That mutates property-match values and can cause false misses for case-sensitive properties.
💡 Proposed fix
func ConvertGenericEdge(entity ein.GenericEdge, converted *ConvertedData) error { + startMatchBy := ein.IngestMatchStrategy(entity.Start.MatchBy) + startValue := entity.Start.Value + if startMatchBy != ein.MatchByGenericProperty { + startValue = strings.ToUpper(startValue) + } + + endMatchBy := ein.IngestMatchStrategy(entity.End.MatchBy) + endValue := entity.End.Value + if endMatchBy != ein.MatchByGenericProperty { + endValue = strings.ToUpper(endValue) + } + ingestibleRel := ein.NewIngestibleRelationship( ein.IngestibleEndpoint{ Property: entity.Start.Property, - Value: strings.ToUpper(entity.Start.Value), - MatchBy: ein.IngestMatchStrategy(entity.Start.MatchBy), + Value: startValue, + MatchBy: startMatchBy, Kind: graph.StringKind(entity.Start.Kind), }, ein.IngestibleEndpoint{ Property: entity.End.Property, - Value: strings.ToUpper(entity.End.Value), - MatchBy: ein.IngestMatchStrategy(entity.End.MatchBy), + Value: endValue, + MatchBy: endMatchBy, Kind: graph.StringKind(entity.End.Kind), },🤖 Prompt for AI Agents