From 8e34775bad2ac3c20d391f3984a5bf41f7245b1f Mon Sep 17 00:00:00 2001 From: Hector Sanjuan Date: Mon, 18 Jun 2018 15:56:54 +0200 Subject: [PATCH] Feat: FetchGraphToDepth() to fetch a graph to a given depth This comes in the context of #5133. It enables Merkledag to fetch DAGs down to a given depth. Note that actual usage of depth is expected to be 1, or 2 (and not an arbitrarily high value), thus I have opted to not complicate things with branch-pruning optimizations. They can be introduced if they are ever needed at another point in time. License: MIT Signed-off-by: Hector Sanjuan --- merkledag/merkledag.go | 104 +++++++++++++++++++++---- merkledag/merkledag_test.go | 151 ++++++++++++++++++++++++++++++++---- 2 files changed, 224 insertions(+), 31 deletions(-) diff --git a/merkledag/merkledag.go b/merkledag/merkledag.go index a59e3469e4b..fbb529b0d32 100644 --- a/merkledag/merkledag.go +++ b/merkledag/merkledag.go @@ -159,6 +159,13 @@ func (n *dagService) Session(ctx context.Context) ipld.NodeGetter { // FetchGraph fetches all nodes that are children of the given node func FetchGraph(ctx context.Context, root *cid.Cid, serv ipld.DAGService) error { + return FetchGraphToDepth(ctx, root, -1, serv) +} + +// FetchGraphToDepth fetches all nodes that are children to the given node +// down to the given depth. Depth 0 means "only fetch root". Depth 1 means +// fetch root and its direct children. And so on... +func FetchGraphToDepth(ctx context.Context, root *cid.Cid, depth int, serv ipld.DAGService) error { var ng ipld.NodeGetter = serv ds, ok := serv.(*dagService) if ok { @@ -167,7 +174,7 @@ func FetchGraph(ctx context.Context, root *cid.Cid, serv ipld.DAGService) error v, _ := ctx.Value(progressContextKey).(*ProgressTracker) if v == nil { - return EnumerateChildrenAsync(ctx, GetLinksDirect(ng), root, cid.NewSet().Visit) + return EnumerateChildrenAsyncToDepth(ctx, GetLinksDirect(ng), root, depth, cid.NewSet().Visit) } set := cid.NewSet() visit := func(c *cid.Cid) bool { @@ -177,7 +184,7 @@ func FetchGraph(ctx context.Context, root *cid.Cid, serv ipld.DAGService) error } return false } - return EnumerateChildrenAsync(ctx, GetLinksDirect(ng), root, visit) + return EnumerateChildrenAsyncToDepth(ctx, GetLinksDirect(ng), root, depth, visit) } // GetMany gets many nodes from the DAG at once. @@ -255,14 +262,36 @@ func GetLinksWithDAG(ng ipld.NodeGetter) GetLinks { // unseen children to the passed in set. // TODO: parallelize to avoid disk latency perf hits? func EnumerateChildren(ctx context.Context, getLinks GetLinks, root *cid.Cid, visit func(*cid.Cid) bool) error { + return EnumerateChildrenToDepth(ctx, getLinks, root, -1, visit) +} + +// EnumerateChildrenToDepth walks the dag below the given root to the given +// depth. The root is at level 0, the children are at level 1 and so on. +// Thus, setting depth to two, will walk the root, the children, and the +// children of the children. +// Setting depth to a negative number will walk the full tree. +func EnumerateChildrenToDepth(ctx context.Context, getLinks GetLinks, root *cid.Cid, depth int, visit func(*cid.Cid) bool) error { + if depth == 0 { + // Root nodes are not marked as visited (enumerate CHILDREN) + return nil + } + + if depth > 0 { + depth-- + } + links, err := getLinks(ctx, root) if err != nil { return err } for _, lnk := range links { c := lnk.Cid - if visit(c) { - err = EnumerateChildren(ctx, getLinks, c, visit) + if visit(c) || depth > 0 { + // Note, when enumerating to a depth, we must revisit + // children until depth is 0, even if they were visited + // before, since we cannot ensure the previous visit + // explored as deep as this one. + err = EnumerateChildrenToDepth(ctx, getLinks, c, depth, visit) if err != nil { return err } @@ -306,8 +335,26 @@ var FetchGraphConcurrency = 8 // // NOTE: It *does not* make multiple concurrent calls to the passed `visit` function. func EnumerateChildrenAsync(ctx context.Context, getLinks GetLinks, c *cid.Cid, visit func(*cid.Cid) bool) error { - feed := make(chan *cid.Cid) - out := make(chan []*ipld.Link) + return EnumerateChildrenAsyncToDepth(ctx, getLinks, c, -1, visit) +} + +// EnumerateChildrenAsyncToDepth is equivalent to EnumerateChildrenToDepth *except* that +// it fetches children in parallel. +// +// NOTE: It *does not* make multiple concurrent calls to the passed `visit` function. +func EnumerateChildrenAsyncToDepth(ctx context.Context, getLinks GetLinks, c *cid.Cid, depth int, visit func(*cid.Cid) bool) error { + type cidDepth struct { + c *cid.Cid + depth int + } + + type linksDepth struct { + links []*ipld.Link + depth int + } + + feed := make(chan *cidDepth) + out := make(chan *linksDepth) done := make(chan struct{}) var setlk sync.Mutex @@ -320,19 +367,36 @@ func EnumerateChildrenAsync(ctx context.Context, getLinks GetLinks, c *cid.Cid, for i := 0; i < FetchGraphConcurrency; i++ { go func() { for ic := range feed { + depth := ic.depth + setlk.Lock() - shouldVisit := visit(ic) + // we should always visit children when + // depth > 0, as even if they were visited + // before, we cannot ensure that the branch + // was explored to the needed depth. + shouldVisit := visit(ic.c) || depth > 0 setlk.Unlock() - if shouldVisit { - links, err := getLinks(ctx, ic) + switch { + case depth == 0: + case shouldVisit: + if depth > 0 { + depth-- + } + + links, err := getLinks(ctx, ic.c) if err != nil { errChan <- err return } + outLinks := &linksDepth{ + links: links, + depth: depth, + } + select { - case out <- links: + case out <- outLinks: case <-fetchersCtx.Done(): return } @@ -347,10 +411,13 @@ func EnumerateChildrenAsync(ctx context.Context, getLinks GetLinks, c *cid.Cid, defer close(feed) send := feed - var todobuffer []*cid.Cid + var todobuffer []*cidDepth var inProgress int - next := c + next := &cidDepth{ + c: c, + depth: depth, + } for { select { case send <- next: @@ -367,13 +434,17 @@ func EnumerateChildrenAsync(ctx context.Context, getLinks GetLinks, c *cid.Cid, if inProgress == 0 && next == nil { return nil } - case links := <-out: - for _, lnk := range links { + case outLinks := <-out: + for _, lnk := range outLinks.links { + cd := &cidDepth{ + c: lnk.Cid, + depth: outLinks.depth, + } if next == nil { - next = lnk.Cid + next = cd send = feed } else { - todobuffer = append(todobuffer, lnk.Cid) + todobuffer = append(todobuffer, cd) } } case err := <-errChan: @@ -383,7 +454,6 @@ func EnumerateChildrenAsync(ctx context.Context, getLinks GetLinks, c *cid.Cid, return ctx.Err() } } - } var _ ipld.LinkGetter = &dagService{} diff --git a/merkledag/merkledag_test.go b/merkledag/merkledag_test.go index 460df81be76..f520f7cdf7e 100644 --- a/merkledag/merkledag_test.go +++ b/merkledag/merkledag_test.go @@ -126,6 +126,36 @@ func TestBatchFetchDupBlock(t *testing.T) { runBatchFetchTest(t, read) } +// makeDepthTestingGraph makes a small DAG with two levels. The level-two +// nodes are both children of the root and of one of the level 1 nodes. +// This is meant to test the enumerate*ToDepth functions. +func makeDepthTestingGraph(t *testing.T, ds ipld.DAGService) ipld.Node { + root := NodeWithData(nil) + l11 := NodeWithData([]byte("leve1_node1")) + l12 := NodeWithData([]byte("leve1_node1")) + l21 := NodeWithData([]byte("leve2_node1")) + l22 := NodeWithData([]byte("leve2_node2")) + l23 := NodeWithData([]byte("leve2_node3")) + + l11.AddNodeLink(l21.Cid().String(), l21) + l11.AddNodeLink(l23.Cid().String(), l22) + l11.AddNodeLink(l23.Cid().String(), l23) + + root.AddNodeLink(l11.Cid().String(), l11) + root.AddNodeLink(l12.Cid().String(), l12) + root.AddNodeLink(l23.Cid().String(), l23) + + ctx := context.Background() + for _, n := range []ipld.Node{l23, l22, l21, l12, l11, root} { + err := ds.Add(ctx, n) + if err != nil { + t.Fatal(err) + } + } + + return root +} + // makeTestDAG creates a simple DAG from the data in a reader. // First, a node is created from each 512 bytes of data from the reader // (like a the Size chunker would do). Then all nodes are added as children @@ -293,6 +323,22 @@ func TestFetchGraph(t *testing.T) { } } +// Check that all children of root are in the given set and in the datastore +func traverseAndCheck(t *testing.T, root ipld.Node, ds ipld.DAGService, set *cid.Set) { + // traverse dag and check + for _, lnk := range root.Links() { + c := lnk.Cid + if !set.Has(c) { + t.Fatal("missing key in set! ", lnk.Cid.String()) + } + child, err := ds.Get(context.Background(), c) + if err != nil { + t.Fatal(err) + } + traverseAndCheck(t, child, ds, set) + } +} + func TestEnumerateChildren(t *testing.T) { bsi := bstest.Mocks(1) ds := NewDAGService(bsi[0]) @@ -307,23 +353,100 @@ func TestEnumerateChildren(t *testing.T) { t.Fatal(err) } - var traverse func(n ipld.Node) - traverse = func(n ipld.Node) { - // traverse dag and check - for _, lnk := range n.Links() { - c := lnk.Cid - if !set.Has(c) { - t.Fatal("missing key in set! ", lnk.Cid.String()) - } - child, err := ds.Get(context.Background(), c) - if err != nil { - t.Fatal(err) - } - traverse(child) + traverseAndCheck(t, root, ds, set) +} + +func TestEnumerateChildrenToDepth(t *testing.T) { + bsi := bstest.Mocks(1) + ds := NewDAGService(bsi[0]) + root := makeDepthTestingGraph(t, ds) + + type testcase struct { + depth int + expectedLen int + } + + tests := []testcase{ + testcase{1, 3}, + testcase{0, 0}, + testcase{-1, 5}, + } + + testF := func(t *testing.T, tc testcase) { + set := cid.NewSet() + err := EnumerateChildrenToDepth( + context.Background(), + ds.GetLinks, + root.Cid(), + tc.depth, + set.Visit, + ) + if err != nil { + t.Fatal(err) + } + if l := len(set.Keys()); l != tc.expectedLen { + t.Errorf("expected %d keys and got %d", tc.expectedLen, l) } } - traverse(root) + for _, tc := range tests { + testF(t, tc) + } +} + +func TestEnumerateChildrenAsync(t *testing.T) { + bsi := bstest.Mocks(1) + ds := NewDAGService(bsi[0]) + + read := io.LimitReader(u.NewTimeSeededRand(), 1024*1024) + root := makeTestDAG(t, read, ds) + + set := cid.NewSet() + + err := EnumerateChildrenAsync(context.Background(), ds.GetLinks, root.Cid(), set.Visit) + if err != nil { + t.Fatal(err) + } + + traverseAndCheck(t, root, ds, set) +} + +func TestEnumerateChildrenAsyncToDepth(t *testing.T) { + bsi := bstest.Mocks(1) + ds := NewDAGService(bsi[0]) + root := makeDepthTestingGraph(t, ds) + + type testcase struct { + depth int + expectedLen int + } + + tests := []testcase{ + testcase{1, 4}, + testcase{0, 1}, + testcase{-1, 6}, + } + + testF := func(t *testing.T, tc testcase) { + set := cid.NewSet() + err := EnumerateChildrenAsyncToDepth( + context.Background(), + ds.GetLinks, + root.Cid(), + tc.depth, + set.Visit, + ) + if err != nil { + t.Fatal(err) + } + if l := len(set.Keys()); l != tc.expectedLen { + t.Errorf("expected %d keys and got %d", tc.expectedLen, l) + } + } + + for _, tc := range tests { + testF(t, tc) + } } func TestFetchFailure(t *testing.T) {