Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 87 additions & 17 deletions merkledag/merkledag.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,13 @@ func (n *dagService) Session(ctx context.Context) ipld.NodeGetter {

// FetchGraph fetches all nodes that are children of the given node
func FetchGraph(ctx context.Context, root *cid.Cid, serv ipld.DAGService) error {
return FetchGraphToDepth(ctx, root, -1, serv)
}

// FetchGraphToDepth fetches all nodes that are children to the given node
// down to the given depth. Depth 0 means "only fetch root". Depth 1 means
// fetch root and its direct children. And so on...
func FetchGraphToDepth(ctx context.Context, root *cid.Cid, depth int, serv ipld.DAGService) error {
var ng ipld.NodeGetter = serv
ds, ok := serv.(*dagService)
if ok {
Expand All @@ -167,7 +174,7 @@ func FetchGraph(ctx context.Context, root *cid.Cid, serv ipld.DAGService) error

v, _ := ctx.Value(progressContextKey).(*ProgressTracker)
if v == nil {
return EnumerateChildrenAsync(ctx, GetLinksDirect(ng), root, cid.NewSet().Visit)
return EnumerateChildrenAsyncToDepth(ctx, GetLinksDirect(ng), root, depth, cid.NewSet().Visit)
}
set := cid.NewSet()
visit := func(c *cid.Cid) bool {
Expand All @@ -177,7 +184,7 @@ func FetchGraph(ctx context.Context, root *cid.Cid, serv ipld.DAGService) error
}
return false
}
return EnumerateChildrenAsync(ctx, GetLinksDirect(ng), root, visit)
return EnumerateChildrenAsyncToDepth(ctx, GetLinksDirect(ng), root, depth, visit)
}

// GetMany gets many nodes from the DAG at once.
Expand Down Expand Up @@ -255,14 +262,36 @@ func GetLinksWithDAG(ng ipld.NodeGetter) GetLinks {
// unseen children to the passed in set.
// TODO: parallelize to avoid disk latency perf hits?
func EnumerateChildren(ctx context.Context, getLinks GetLinks, root *cid.Cid, visit func(*cid.Cid) bool) error {
return EnumerateChildrenToDepth(ctx, getLinks, root, -1, visit)
}

// EnumerateChildrenToDepth walks the dag below the given root to the given
// depth. The root is at level 0, the children are at level 1 and so on.
// Thus, setting depth to two, will walk the root, the children, and the
// children of the children.
// Setting depth to a negative number will walk the full tree.
func EnumerateChildrenToDepth(ctx context.Context, getLinks GetLinks, root *cid.Cid, depth int, visit func(*cid.Cid) bool) error {
if depth == 0 {
// Root nodes are not marked as visited (enumerate CHILDREN)
return nil
}

if depth > 0 {
depth--
}

links, err := getLinks(ctx, root)
if err != nil {
return err
}
for _, lnk := range links {
c := lnk.Cid
if visit(c) {
err = EnumerateChildren(ctx, getLinks, c, visit)
if visit(c) || depth > 0 {
// Note, when enumerating to a depth, we must revisit
// children until depth is 0, even if they were visited
// before, since we cannot ensure the previous visit
// explored as deep as this one.
err = EnumerateChildrenToDepth(ctx, getLinks, c, depth, visit)
if err != nil {
return err
}
Expand Down Expand Up @@ -306,8 +335,26 @@ var FetchGraphConcurrency = 8
//
// NOTE: It *does not* make multiple concurrent calls to the passed `visit` function.
func EnumerateChildrenAsync(ctx context.Context, getLinks GetLinks, c *cid.Cid, visit func(*cid.Cid) bool) error {
feed := make(chan *cid.Cid)
out := make(chan []*ipld.Link)
return EnumerateChildrenAsyncToDepth(ctx, getLinks, c, -1, visit)
}

// EnumerateChildrenAsyncToDepth is equivalent to EnumerateChildrenToDepth *except* that
// it fetches children in parallel.
//
// NOTE: It *does not* make multiple concurrent calls to the passed `visit` function.
func EnumerateChildrenAsyncToDepth(ctx context.Context, getLinks GetLinks, c *cid.Cid, depth int, visit func(*cid.Cid) bool) error {
type cidDepth struct {
c *cid.Cid
depth int
}

type linksDepth struct {
links []*ipld.Link
depth int
}

feed := make(chan *cidDepth)
out := make(chan *linksDepth)
done := make(chan struct{})

var setlk sync.Mutex
Expand All @@ -320,19 +367,36 @@ func EnumerateChildrenAsync(ctx context.Context, getLinks GetLinks, c *cid.Cid,
for i := 0; i < FetchGraphConcurrency; i++ {
go func() {
for ic := range feed {
depth := ic.depth

setlk.Lock()
shouldVisit := visit(ic)
// we should always visit children when
// depth > 0, as even if they were visited
// before, we cannot ensure that the branch
// was explored to the needed depth.
shouldVisit := visit(ic.c) || depth > 0
setlk.Unlock()

if shouldVisit {
links, err := getLinks(ctx, ic)
switch {
case depth == 0:
case shouldVisit:
if depth > 0 {
depth--
}

links, err := getLinks(ctx, ic.c)
if err != nil {
errChan <- err
return
}

outLinks := &linksDepth{
links: links,
depth: depth,
}

select {
case out <- links:
case out <- outLinks:
case <-fetchersCtx.Done():
return
}
Expand All @@ -347,10 +411,13 @@ func EnumerateChildrenAsync(ctx context.Context, getLinks GetLinks, c *cid.Cid,
defer close(feed)

send := feed
var todobuffer []*cid.Cid
var todobuffer []*cidDepth
var inProgress int

next := c
next := &cidDepth{
c: c,
depth: depth,
}
for {
select {
case send <- next:
Expand All @@ -367,13 +434,17 @@ func EnumerateChildrenAsync(ctx context.Context, getLinks GetLinks, c *cid.Cid,
if inProgress == 0 && next == nil {
return nil
}
case links := <-out:
for _, lnk := range links {
case outLinks := <-out:
for _, lnk := range outLinks.links {
cd := &cidDepth{
c: lnk.Cid,
depth: outLinks.depth,
}
if next == nil {
next = lnk.Cid
next = cd
send = feed
} else {
todobuffer = append(todobuffer, lnk.Cid)
todobuffer = append(todobuffer, cd)
}
}
case err := <-errChan:
Expand All @@ -383,7 +454,6 @@ func EnumerateChildrenAsync(ctx context.Context, getLinks GetLinks, c *cid.Cid,
return ctx.Err()
}
}

}

var _ ipld.LinkGetter = &dagService{}
Expand Down
151 changes: 137 additions & 14 deletions merkledag/merkledag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,36 @@ func TestBatchFetchDupBlock(t *testing.T) {
runBatchFetchTest(t, read)
}

// makeDepthTestingGraph makes a small DAG with two levels. The level-two
// nodes are both children of the root and of one of the level 1 nodes.
// This is meant to test the enumerate*ToDepth functions.
func makeDepthTestingGraph(t *testing.T, ds ipld.DAGService) ipld.Node {
root := NodeWithData(nil)
l11 := NodeWithData([]byte("leve1_node1"))
l12 := NodeWithData([]byte("leve1_node1"))
l21 := NodeWithData([]byte("leve2_node1"))
l22 := NodeWithData([]byte("leve2_node2"))
l23 := NodeWithData([]byte("leve2_node3"))

l11.AddNodeLink(l21.Cid().String(), l21)
l11.AddNodeLink(l23.Cid().String(), l22)
l11.AddNodeLink(l23.Cid().String(), l23)

root.AddNodeLink(l11.Cid().String(), l11)
root.AddNodeLink(l12.Cid().String(), l12)
root.AddNodeLink(l23.Cid().String(), l23)

ctx := context.Background()
for _, n := range []ipld.Node{l23, l22, l21, l12, l11, root} {
err := ds.Add(ctx, n)
if err != nil {
t.Fatal(err)
}
}

return root
}

// makeTestDAG creates a simple DAG from the data in a reader.
// First, a node is created from each 512 bytes of data from the reader
// (like a the Size chunker would do). Then all nodes are added as children
Expand Down Expand Up @@ -293,6 +323,22 @@ func TestFetchGraph(t *testing.T) {
}
}

// Check that all children of root are in the given set and in the datastore
func traverseAndCheck(t *testing.T, root ipld.Node, ds ipld.DAGService, set *cid.Set) {
// traverse dag and check
for _, lnk := range root.Links() {
c := lnk.Cid
if !set.Has(c) {
t.Fatal("missing key in set! ", lnk.Cid.String())
}
child, err := ds.Get(context.Background(), c)
if err != nil {
t.Fatal(err)
}
traverseAndCheck(t, child, ds, set)
}
}

func TestEnumerateChildren(t *testing.T) {
bsi := bstest.Mocks(1)
ds := NewDAGService(bsi[0])
Expand All @@ -307,23 +353,100 @@ func TestEnumerateChildren(t *testing.T) {
t.Fatal(err)
}

var traverse func(n ipld.Node)
traverse = func(n ipld.Node) {
// traverse dag and check
for _, lnk := range n.Links() {
c := lnk.Cid
if !set.Has(c) {
t.Fatal("missing key in set! ", lnk.Cid.String())
}
child, err := ds.Get(context.Background(), c)
if err != nil {
t.Fatal(err)
}
traverse(child)
traverseAndCheck(t, root, ds, set)
}

func TestEnumerateChildrenToDepth(t *testing.T) {
bsi := bstest.Mocks(1)
ds := NewDAGService(bsi[0])
root := makeDepthTestingGraph(t, ds)

type testcase struct {
depth int
expectedLen int
}

tests := []testcase{
testcase{1, 3},
testcase{0, 0},
testcase{-1, 5},
}

testF := func(t *testing.T, tc testcase) {
set := cid.NewSet()
err := EnumerateChildrenToDepth(
context.Background(),
ds.GetLinks,
root.Cid(),
tc.depth,
set.Visit,
)
if err != nil {
t.Fatal(err)
}
if l := len(set.Keys()); l != tc.expectedLen {
t.Errorf("expected %d keys and got %d", tc.expectedLen, l)
}
}

traverse(root)
for _, tc := range tests {
testF(t, tc)
}
}

func TestEnumerateChildrenAsync(t *testing.T) {
bsi := bstest.Mocks(1)
ds := NewDAGService(bsi[0])

read := io.LimitReader(u.NewTimeSeededRand(), 1024*1024)
root := makeTestDAG(t, read, ds)

set := cid.NewSet()

err := EnumerateChildrenAsync(context.Background(), ds.GetLinks, root.Cid(), set.Visit)
if err != nil {
t.Fatal(err)
}

traverseAndCheck(t, root, ds, set)
}

func TestEnumerateChildrenAsyncToDepth(t *testing.T) {
bsi := bstest.Mocks(1)
ds := NewDAGService(bsi[0])
root := makeDepthTestingGraph(t, ds)

type testcase struct {
depth int
expectedLen int
}

tests := []testcase{
testcase{1, 4},
testcase{0, 1},
testcase{-1, 6},
}

testF := func(t *testing.T, tc testcase) {
set := cid.NewSet()
err := EnumerateChildrenAsyncToDepth(
context.Background(),
ds.GetLinks,
root.Cid(),
tc.depth,
set.Visit,
)
if err != nil {
t.Fatal(err)
}
if l := len(set.Keys()); l != tc.expectedLen {
t.Errorf("expected %d keys and got %d", tc.expectedLen, l)
}
}

for _, tc := range tests {
testF(t, tc)
}
}

func TestFetchFailure(t *testing.T) {
Expand Down