From fb193dc1e7a45cd899061e8ca0383442453a3eb6 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Wed, 8 May 2024 12:11:29 +0100 Subject: [PATCH 1/5] feat: add block streaming interface --- bitswap-fetcher.js | 20 ++++++ index.d.ts | 140 +++++++++++++++++++++++++-------------- index.js | 57 +++++++++++----- package-lock.json | 19 ++---- package.json | 2 +- test/getPath.test.js | 10 +-- test/streamBlock.test.js | 41 ++++++++++++ test/testmark.test.js | 2 +- 8 files changed, 200 insertions(+), 91 deletions(-) create mode 100644 test/streamBlock.test.js diff --git a/bitswap-fetcher.js b/bitswap-fetcher.js index e4918b2..3eecf3f 100644 --- a/bitswap-fetcher.js +++ b/bitswap-fetcher.js @@ -123,6 +123,26 @@ export class BitswapFetcher { return deferred.promise } + /** + * @param {import('multiformats').UnknownLink} cid + * @param {{ range?: import('./index').Range, signal?: AbortSignal }} [options] + */ + async stream (cid, options) { + const block = await this.get(cid, options) + if (!block) return + + return /** @type {ReadableStream} */ (new ReadableStream({ + pull (controller) { + const { range } = options ?? {} + const bytes = range + ? 
block.bytes.slice(range[0], range[1] && (range[1] + 1)) + : block.bytes + controller.enqueue(bytes) + controller.close() + } + })) + } + /** @type {import('@libp2p/interface-registrar').StreamHandler} */ async handler ({ stream }) { log('incoming stream') diff --git a/index.d.ts b/index.d.ts index 74e99c6..60c1a82 100644 --- a/index.d.ts +++ b/index.d.ts @@ -8,6 +8,8 @@ import type { Stream } from '@libp2p/interface-connection' import type { StreamHandler } from '@libp2p/interface-registrar' import type { PeerId } from '@libp2p/interface-peer-id' +export type { AbortOptions } + export interface BlockDecoders { [code: number]: BlockDecoder } @@ -21,8 +23,16 @@ export interface Block { bytes: Uint8Array } -export interface Blockstore { - get: (cid: UnknownLink, options?: { signal?: AbortSignal }) => Promise +export interface Blockstore extends BlockGetter, BlockStreamer {} + +export interface BlockGetter { + /** Retrieve a block. */ + get: (cid: UnknownLink, options?: AbortOptions) => Promise +} + +export interface BlockStreamer { + /** Stream bytes from a block. */ + stream: (cid: UnknownLink, options?: AbortOptions & RangeOptions) => Promise|undefined> } export interface Network { @@ -63,27 +73,66 @@ export interface DagScopeOptions { } /** - * Specifies a range of bytes. - * - `*` can be substituted for end-of-file - * - `{ from: 0, to: '*' }` is the entire file. - * - Negative numbers can be used for referring to bytes from the end of a file - * - `{ from: -1024, to: '*' }` is the last 1024 bytes of a file. - * - It is also permissible to ask for the range of 500 bytes from the - * beginning of the file to 1000 bytes from the end: `{ from: 499, to: -1000 }` + * An absolute byte range to extract - always an array of two values + * corresponding to the first and last bytes (both inclusive). e.g. 
+ * + * ``` + * [100, 200] + * ``` */ -export interface ByteRange { - /** Byte-offset of the first byte in a range (inclusive) */ - from: number - /** Byte-offset of the last byte in the range (inclusive) */ - to: number|'*' -} +export type AbsoluteRange = [first: number, last: number] + +/** + * An suffix byte range - always an array of one value corresponding to the + * first byte to start extraction from (inclusive). e.g. + * + * ``` + * [900] + * ``` + * + * If it is unknown how large a resource is, the last `n` bytes + * can be requested by specifying a negative value: + * + * ``` + * [-100] + * ``` + */ +export type SuffixRange = [first: number] + +/** + * Byte range to extract - an array of one or two values corresponding to the + * first and last bytes (both inclusive). e.g. + * + * ``` + * [100, 200] + * ``` + * + * Omitting the second value requests all remaining bytes of the resource. e.g. + * + * ``` + * [900] + * ``` + * + * Alternatively, if it's unknown how large a resource is, the last `n` bytes + * can be requested by specifying a negative value: + * + * ``` + * [-100] + * ``` + */ +export type Range = AbsoluteRange | SuffixRange export interface EntityBytesOptions { /** * A specific byte range from the entity. Setting entity bytes implies DAG * scope: entity. */ - entityBytes?: ByteRange + entityBytes?: Range +} + +export interface RangeOptions { + /** Extracts a specific byte range from the resource. */ + range: Range } /** @@ -110,49 +159,42 @@ export interface BlockOrderOptions { order?: BlockOrder } -export interface IDagula { - /** - * Get a complete DAG by root CID. - */ +/** @deprecated Use `BlockService`, `DagService` and `UnixfsService` interface instead. */ +export interface IDagula extends BlockService, DagService, UnixfsService {} + +export interface BlockService { + /** Get a single block. */ + getBlock (cid: UnknownLink|string, options?: AbortOptions): Promise + /** Stream bytes from a single block. 
*/ + streamBlock (cid: UnknownLink|string, options?: AbortOptions & RangeOptions): Promise> +} + +export interface DagService { + /** Get a complete DAG by root CID. */ get (cid: UnknownLink|string, options?: AbortOptions & BlockOrderOptions): AsyncIterableIterator - /** - * Get a DAG for a cid+path. - */ + /** Get a DAG for a cid+path. */ getPath (cidPath: string, options?: AbortOptions & DagScopeOptions & EntityBytesOptions & BlockOrderOptions): AsyncIterableIterator - /** - * Get a single block. - */ - getBlock (cid: UnknownLink|string, options?: AbortOptions): Promise - /** - * Get UnixFS files and directories. - */ +} + +export interface UnixfsService { + /** Get UnixFS files and directories. */ getUnixfs (path: UnknownLink|string, options?: AbortOptions): Promise - /** - * Emit nodes for all path segements and get UnixFS files and directories. - */ + /** Emit nodes for all path segements and get UnixFS files and directories. */ walkUnixfsPath (path: UnknownLink|string, options?: AbortOptions): AsyncIterableIterator } -export declare class Dagula implements IDagula { +export declare class Dagula implements BlockService, DagService, UnixfsService { constructor (blockstore: Blockstore, options?: { decoders?: BlockDecoders, hashers?: MultihashHashers }) - /** - * Get a complete DAG by root CID. - */ + /** Get a complete DAG by root CID. */ get (cid: UnknownLink|string, options?: AbortOptions & BlockOrderOptions): AsyncIterableIterator - /** - * Get a DAG for a cid+path. - */ + /** Get a DAG for a cid+path. */ getPath (cidPath: string, options?: AbortOptions & DagScopeOptions & EntityBytesOptions & BlockOrderOptions): AsyncIterableIterator - /** - * Get a single block. - */ + /** Get a single block. */ getBlock (cid: UnknownLink|string, options?: AbortOptions): Promise - /** - * Get UnixFS files and directories. - */ + /** Stream bytes from a single block. 
*/ + streamBlock (cid: UnknownLink|string, options?: AbortOptions & RangeOptions): Promise> + /** Get UnixFS files and directories. */ getUnixfs (path: UnknownLink|string, options?: AbortOptions): Promise - /** - * Emit nodes for all path segements and get UnixFS files and directories. - */ + /** Emit nodes for all path segements and get UnixFS files and directories. */ walkUnixfsPath (path: UnknownLink|string, options?: AbortOptions): AsyncIterableIterator } diff --git a/index.js b/index.js index 3b1a248..dd5a335 100644 --- a/index.js +++ b/index.js @@ -10,14 +10,15 @@ import { identity } from 'multiformats/hashes/identity' import { depthFirst, breadthFirst } from './traversal.js' /** + * @typedef {import('./index').DagService} DagService * @typedef {{ unixfs?: UnixFS }} LinkFilterContext * @typedef {([name, cid]: [string, import('multiformats').UnknownLink], context: LinkFilterContext) => boolean} LinkFilter - * @typedef {[from: number, to: number]} Range - * @typedef {{ cid: import('multiformats').UnknownLink, range?: Range }} GraphSelector + * @typedef {{ cid: import('multiformats').UnknownLink, range?: import('./index').AbsoluteRange }} GraphSelector */ const log = debug('dagula') +/** @implements {DagService} */ export class Dagula { /** @type {import('./index').Blockstore} */ #blockstore @@ -44,7 +45,7 @@ export class Dagula { * @param {object} [options] * @param {AbortSignal} [options.signal] * @param {import('./index').BlockOrder} [options.order] - * @param {import('./index').ByteRange} [options.entityBytes] + * @param {import('./index').Range} [options.entityBytes] * @param {LinkFilter} [options.filter] */ async * get (cid, options = {}) { @@ -137,7 +138,7 @@ export class Dagula { * 'entity': Mimic gateway semantics: Return All blocks for a multi-block file or just enough blocks to enumerate a dir/map but not the dir contents. * Where path points to a single block file, all three selectors would return the same thing. 
* where path points to a sharded hamt: 'file' returns the blocks of the hamt so the dir can be listed. 'block' returns the root block of the hamt. - * @param {import('./index').ByteRange} [options.entityBytes] + * @param {import('./index').Range} [options.entityBytes] */ async * getPath (cidPath, options = {}) { const dagScope = options.dagScope ?? (options.entityBytes ? 'entity' : 'all') @@ -188,20 +189,20 @@ export class Dagula { if (!base) throw new Error('walkPath did not yield an entry') if (dagScope === 'all' || (dagScope === 'entity' && base.type !== 'directory')) { - /** @type {Range|undefined} */ + /** @type {import('./index').AbsoluteRange|undefined} */ let range if (entityBytes) { const size = Number(base.size) // resolve entity bytes to actual byte offsets range = [ - entityBytes.from < 0 - ? size - 1 + entityBytes.from - : entityBytes.from, - entityBytes.to === '*' + entityBytes[0] < 0 + ? size - 1 + entityBytes[0] + : entityBytes[0], + entityBytes[1] == null ? size - 1 - : entityBytes.to < 0 - ? size - 1 + entityBytes.to - : entityBytes.to + : entityBytes[1] < 0 + ? size - 1 + entityBytes[1] + : entityBytes[1] ] } const selectors = getUnixfsEntryLinkSelectors(base, this.#decoders, range) @@ -238,6 +239,28 @@ export class Dagula { return block } + /** + * @param {import('multiformats').UnknownLink|string} cid + * @param {import('./index').AbortOptions & import('./index').RangeOptions} [options] + */ + async streamBlock (cid, options) { + cid = typeof cid === 'string' ? 
CID.parse(cid) : cid + log('streaming block %s', cid) + if (cid.code === identity.code) { + return new ReadableStream({ + pull (controller) { + controller.enqueue(cid.multihash.digest) + controller.close() + } + }) + } + const readable = await this.#blockstore.stream(cid, options) + if (!readable) { + throw Object.assign(new Error(`peer does not have: ${cid}`), { code: 'ERR_DONT_HAVE' }) + } + return readable + } + /** * @param {string|import('multiformats').UnknownLink} path * @param {{ signal?: AbortSignal }} [options] @@ -347,7 +370,7 @@ function toRanges (blockSizes) { const ranges = [] let offset = 0 for (const size of blockSizes) { - /** @type {Range} */ + /** @type {import('./index').AbsoluteRange} */ const absRange = [offset, offset + size - 1] ranges.push(absRange) offset += size @@ -367,9 +390,9 @@ function toRanges (blockSizes) { * toRelativeRange([100,200], [300,400]): undefined * ``` * - * @param {Range} a - * @param {Range} b - * @returns {Range|undefined} + * @param {import('./index').AbsoluteRange} a + * @param {import('./index').AbsoluteRange} b + * @returns {import('./index').AbsoluteRange|undefined} */ const toRelativeRange = (a, b) => { // starts in range @@ -396,7 +419,7 @@ const toRelativeRange = (a, b) => { * * @param {import('ipfs-unixfs-exporter').UnixFSEntry} entry * @param {import('./index').BlockDecoders} decoders - * @param {Range} [range] + * @param {import('./index').AbsoluteRange} [range] * @returns {GraphSelector[]} */ function getUnixfsEntryLinkSelectors (entry, decoders, range) { diff --git a/package-lock.json b/package-lock.json index c1381ba..7914351 100644 --- a/package-lock.json +++ b/package-lock.json @@ -36,7 +36,7 @@ "p-defer": "^4.0.0", "protobufjs": "^7.0.0", "sade": "^1.8.1", - "streaming-iterables": "^7.0.4", + "streaming-iterables": "^8.0.1", "timeout-abort-controller": "^3.0.0", "varint": "^6.0.0" }, @@ -6658,15 +6658,6 @@ "npm": ">=7.0.0" } }, - "node_modules/miniswap/node_modules/streaming-iterables": { - "version": 
"8.0.1", - "resolved": "https://registry.npmjs.org/streaming-iterables/-/streaming-iterables-8.0.1.tgz", - "integrity": "sha512-yfQdmUB1b+rGLZkD/r6YisT/eNOjZxBAckXKlzYNmRJnwSzHaiScykD8gsQceFcShtK09qAbLhOqvzIpnBPoDQ==", - "dev": true, - "engines": { - "node": ">=18" - } - }, "node_modules/mortice": { "version": "3.0.1", "resolved": "https://registry.npmjs.org/mortice/-/mortice-3.0.1.tgz", @@ -7976,11 +7967,11 @@ "integrity": "sha512-v+dm9bNVfOYsY1OrhaCrmyOcYoSeVvbt+hHZ0Au+T+p1y+0Uyj9aMaGIeUTT6xdpRbWzDeYKvfOslPhggQMcsg==" }, "node_modules/streaming-iterables": { - "version": "7.1.0", - "resolved": "https://registry.npmjs.org/streaming-iterables/-/streaming-iterables-7.1.0.tgz", - "integrity": "sha512-t2KmiLVhqafTRqGefD98s5XAMskfkfprr/BTzPIZz0kWB23iyR7XUkY03yjUf4aZpAuuV2/2SUOVri3LgKuOKw==", + "version": "8.0.1", + "resolved": "https://registry.npmjs.org/streaming-iterables/-/streaming-iterables-8.0.1.tgz", + "integrity": "sha512-yfQdmUB1b+rGLZkD/r6YisT/eNOjZxBAckXKlzYNmRJnwSzHaiScykD8gsQceFcShtK09qAbLhOqvzIpnBPoDQ==", "engines": { - "node": ">=14" + "node": ">=18" } }, "node_modules/streamsearch": { diff --git a/package.json b/package.json index 2a5f141..12b3a58 100644 --- a/package.json +++ b/package.json @@ -44,7 +44,7 @@ "p-defer": "^4.0.0", "protobufjs": "^7.0.0", "sade": "^1.8.1", - "streaming-iterables": "^7.0.4", + "streaming-iterables": "^8.0.1", "timeout-abort-controller": "^3.0.0", "varint": "^6.0.0" }, diff --git a/test/getPath.test.js b/test/getPath.test.js index 7298a5f..66cc774 100644 --- a/test/getPath.test.js +++ b/test/getPath.test.js @@ -11,6 +11,7 @@ import { sha256 } from 'multiformats/hashes/sha2' import { identity } from 'multiformats/hashes/identity' import { CID } from 'multiformats/cid' import * as Block from 'multiformats/block' +import { collect } from 'streaming-iterables' import { Dagula } from '../index.js' import { getLibp2p, fromNetwork } from '../p2p.js' import { startBitswapPeer } from './_libp2p.js' @@ -569,12 +570,3 @@ test('should 
yield intermediate blocks when last path component does not exist', t.is(blocks.length, 1) t.is(blocks[0].cid.toString(), fileLink.cid.toString()) }) - -/** @param {AsyncIterable} source */ -async function collect (source) { - const blocks = [] - for await (const block of source) { - blocks.push(block) - } - return blocks -} diff --git a/test/streamBlock.test.js b/test/streamBlock.test.js new file mode 100644 index 0000000..7db48ee --- /dev/null +++ b/test/streamBlock.test.js @@ -0,0 +1,41 @@ +import test from 'ava' +import { fromString, toString } from 'multiformats/bytes' +import * as raw from 'multiformats/codecs/raw' +import { sha256 } from 'multiformats/hashes/sha2' +import { CID } from 'multiformats/cid' +import { collect } from 'streaming-iterables' +import { getLibp2p, fromNetwork } from '../p2p.js' +import { startBitswapPeer } from './_libp2p.js' + +test('should stream a block', async t => { + const bytes = fromString(`TEST DATA ${Date.now()}`) + const hash = await sha256.digest(bytes) + const cid = CID.create(1, raw.code, hash) + const peer = await startBitswapPeer([{ cid, bytes }]) + + const libp2p = await getLibp2p() + const dagula = await fromNetwork(libp2p, { peer: peer.libp2p.getMultiaddrs()[0] }) + const readable = await dagula.streamBlock(cid) + t.assert(readable) + + // @ts-expect-error one day all things will implement + const chunks = await collect(readable) + t.is(await new Blob(chunks).text(), toString(bytes)) +}) + +test('should stream a byte range from a block', async t => { + const bytes = fromString(`TEST DATA ${Date.now()}`) + const range = /** @type {import('../index').AbsoluteRange} */ ([5, 8]) + const hash = await sha256.digest(bytes) + const cid = CID.create(1, raw.code, hash) + const peer = await startBitswapPeer([{ cid, bytes }]) + + const libp2p = await getLibp2p() + const dagula = await fromNetwork(libp2p, { peer: peer.libp2p.getMultiaddrs()[0] }) + const readable = await dagula.streamBlock(cid, { range }) + t.assert(readable) + + 
// @ts-expect-error one day all things will implement + const chunks = await collect(readable) + t.is(await new Blob(chunks).text(), toString(bytes.slice(range[0], range[1] + 1))) +}) diff --git a/test/testmark.test.js b/test/testmark.test.js index c13ca46..250d38c 100644 --- a/test/testmark.test.js +++ b/test/testmark.test.js @@ -20,7 +20,7 @@ const parseQuery = query => { const entityBytes = url.searchParams.get('entity-bytes') if (entityBytes && entityBytes !== 'null') { const [from, to] = entityBytes.split(':') - options.entityBytes = { from: parseInt(from), to: to === '*' ? to : parseInt(to) } + options.entityBytes = to === '*' ? [parseInt(from)] : [parseInt(from), parseInt(to)] } const cidPath = url.pathname.replace('/ipfs/', '').split('/').map(decodeURIComponent).join('/') return { cidPath, options } From 4e2797d5e534e6a209d3fdd6502d045fb116f341 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Wed, 8 May 2024 12:21:23 +0100 Subject: [PATCH 2/5] fix: implements all the interfaces --- index.js | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/index.js b/index.js index dd5a335..fc910ad 100644 --- a/index.js +++ b/index.js @@ -10,7 +10,9 @@ import { identity } from 'multiformats/hashes/identity' import { depthFirst, breadthFirst } from './traversal.js' /** + * @typedef {import('./index').BlockService} BlockService * @typedef {import('./index').DagService} DagService + * @typedef {import('./index').UnixfsService} UnixfsService * @typedef {{ unixfs?: UnixFS }} LinkFilterContext * @typedef {([name, cid]: [string, import('multiformats').UnknownLink], context: LinkFilterContext) => boolean} LinkFilter * @typedef {{ cid: import('multiformats').UnknownLink, range?: import('./index').AbsoluteRange }} GraphSelector @@ -18,7 +20,11 @@ import { depthFirst, breadthFirst } from './traversal.js' const log = debug('dagula') -/** @implements {DagService} */ +/** + * @implements {BlockService} + * @implements {DagService} + * @implements {UnixfsService} + */ 
export class Dagula { /** @type {import('./index').Blockstore} */ #blockstore From 8c0075d5b4441d49600c2676f5d98526e8569196 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Wed, 8 May 2024 14:59:00 +0100 Subject: [PATCH 3/5] feat: add statBlock method --- bitswap-fetcher.js | 14 ++++++++++++++ index.d.ts | 18 ++++++++++++++++-- index.js | 28 +++++++++++++++------------- test/statBlock.test.js | 20 ++++++++++++++++++++ 4 files changed, 65 insertions(+), 15 deletions(-) create mode 100644 test/statBlock.test.js diff --git a/bitswap-fetcher.js b/bitswap-fetcher.js index 3eecf3f..30c7afa 100644 --- a/bitswap-fetcher.js +++ b/bitswap-fetcher.js @@ -2,6 +2,7 @@ import defer from 'p-defer' import { pipe } from 'it-pipe' import * as lp from 'it-length-prefixed' import { base58btc } from 'multiformats/bases/base58' +import { identity } from 'multiformats/hashes/identity' import debug from 'debug' import { Entry, Message, BlockPresenceType } from './message.js' import * as Prefix from './prefix.js' @@ -91,6 +92,10 @@ export class BitswapFetcher { throw options.signal.reason || abortError() } + if (cid.code === identity.code) { + return { cid, bytes: cid.multihash.digest } + } + // ensure we can hash the data when we receive the block if (!this.#hashers[cid.multihash.code]) { throw new Error(`missing hasher: 0x${cid.multihash.code.toString(16)} for wanted block: ${cid}`) @@ -143,6 +148,15 @@ export class BitswapFetcher { })) } + /** + * @param {import('multiformats').UnknownLink} cid + * @param {{ signal?: AbortSignal }} [options] + */ + async stat (cid, options) { + const block = await this.get(cid, options) + if (block) return { size: block.bytes.length } + } + /** @type {import('@libp2p/interface-registrar').StreamHandler} */ async handler ({ stream }) { log('incoming stream') diff --git a/index.d.ts b/index.d.ts index 60c1a82..6a8de05 100644 --- a/index.d.ts +++ b/index.d.ts @@ -23,7 +23,12 @@ export interface Block { bytes: Uint8Array } -export interface Blockstore extends 
BlockGetter, BlockStreamer {} +export interface BlockStat { + /** Total size in bytes of the block. */ + size: number +} + +export interface Blockstore extends BlockGetter, BlockStreamer, BlockInspecter {} export interface BlockGetter { /** Retrieve a block. */ @@ -35,6 +40,11 @@ export interface BlockStreamer { stream: (cid: UnknownLink, options?: AbortOptions & RangeOptions) => Promise|undefined> } +export interface BlockInspecter { + /** Retrieve information about a block. */ + stat: (cid: UnknownLink, options?: AbortOptions) => Promise +} + export interface Network { dialProtocol (peer: PeerId | Multiaddr, protocols: string | string[], options?: AbortOptions): Promise handle: (protocol: string | string[], handler: StreamHandler) => Promise @@ -132,7 +142,7 @@ export interface EntityBytesOptions { export interface RangeOptions { /** Extracts a specific byte range from the resource. */ - range: Range + range?: Range } /** @@ -165,6 +175,8 @@ export interface IDagula extends BlockService, DagService, UnixfsService {} export interface BlockService { /** Get a single block. */ getBlock (cid: UnknownLink|string, options?: AbortOptions): Promise + /** Retrieve information about a block. */ + statBlock (cid: UnknownLink|string, options?: AbortOptions): Promise /** Stream bytes from a single block. */ streamBlock (cid: UnknownLink|string, options?: AbortOptions & RangeOptions): Promise> } @@ -191,6 +203,8 @@ export declare class Dagula implements BlockService, DagService, UnixfsService { getPath (cidPath: string, options?: AbortOptions & DagScopeOptions & EntityBytesOptions & BlockOrderOptions): AsyncIterableIterator /** Get a single block. */ getBlock (cid: UnknownLink|string, options?: AbortOptions): Promise + /** Retrieve information about a block. */ + statBlock (cid: UnknownLink|string, options?: AbortOptions): Promise /** Stream bytes from a single block. 
*/ streamBlock (cid: UnknownLink|string, options?: AbortOptions & RangeOptions): Promise> /** Get UnixFS files and directories. */ diff --git a/index.js b/index.js index fc910ad..6c7750e 100644 --- a/index.js +++ b/index.js @@ -6,7 +6,6 @@ import { UnixFS } from 'ipfs-unixfs' import { exporter, walkPath } from 'ipfs-unixfs-exporter' import { parallelMap, transform } from 'streaming-iterables' import { Decoders, Hashers } from './defaults.js' -import { identity } from 'multiformats/hashes/identity' import { depthFirst, breadthFirst } from './traversal.js' /** @@ -235,9 +234,6 @@ export class Dagula { async getBlock (cid, options = {}) { cid = typeof cid === 'string' ? CID.parse(cid) : cid log('getting block %s', cid) - if (cid.code === identity.code) { - return { cid, bytes: cid.multihash.digest } - } const block = await this.#blockstore.get(cid, { signal: options.signal }) if (!block) { throw Object.assign(new Error(`peer does not have: ${cid}`), { code: 'ERR_DONT_HAVE' }) @@ -252,21 +248,27 @@ export class Dagula { async streamBlock (cid, options) { cid = typeof cid === 'string' ? CID.parse(cid) : cid log('streaming block %s', cid) - if (cid.code === identity.code) { - return new ReadableStream({ - pull (controller) { - controller.enqueue(cid.multihash.digest) - controller.close() - } - }) - } - const readable = await this.#blockstore.stream(cid, options) + const readable = await this.#blockstore.stream(cid, { signal: options?.signal, range: options?.range }) if (!readable) { throw Object.assign(new Error(`peer does not have: ${cid}`), { code: 'ERR_DONT_HAVE' }) } return readable } + /** + * @param {import('multiformats').UnknownLink|string} cid + * @param {import('./index').AbortOptions} [options] + */ + async statBlock (cid, options) { + cid = typeof cid === 'string' ? 
CID.parse(cid) : cid + log('stat block %s', cid) + const stat = await this.#blockstore.stat(cid, { signal: options?.signal }) + if (!stat) { + throw Object.assign(new Error(`peer does not have: ${cid}`), { code: 'ERR_DONT_HAVE' }) + } + return stat + } + /** * @param {string|import('multiformats').UnknownLink} path * @param {{ signal?: AbortSignal }} [options] diff --git a/test/statBlock.test.js b/test/statBlock.test.js new file mode 100644 index 0000000..f6ff405 --- /dev/null +++ b/test/statBlock.test.js @@ -0,0 +1,20 @@ +import test from 'ava' +import { fromString } from 'multiformats/bytes' +import * as raw from 'multiformats/codecs/raw' +import { sha256 } from 'multiformats/hashes/sha2' +import { CID } from 'multiformats/cid' +import { getLibp2p, fromNetwork } from '../p2p.js' +import { startBitswapPeer } from './_libp2p.js' + +test('should stat a block', async t => { + const bytes = fromString(`TEST DATA ${Date.now()}`) + const hash = await sha256.digest(bytes) + const cid = CID.create(1, raw.code, hash) + const peer = await startBitswapPeer([{ cid, bytes }]) + + const libp2p = await getLibp2p() + const dagula = await fromNetwork(libp2p, { peer: peer.libp2p.getMultiaddrs()[0] }) + const { size } = await dagula.statBlock(cid) + + t.is(size, bytes.length) +}) From ebc5c21da5c9f887cc45ea92f7aec5f59bde8f34 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Mon, 13 May 2024 11:43:35 +0100 Subject: [PATCH 4/5] feat: export separate classes that implement block, dag and unixfs interfaces --- blocks.js | 62 ++++++++ dag.js | 406 +++++++++++++++++++++++++++++++++++++++++++++++++++++ index.d.ts | 8 +- 3 files changed, 472 insertions(+), 4 deletions(-) create mode 100644 blocks.js create mode 100644 dag.js diff --git a/blocks.js b/blocks.js new file mode 100644 index 0000000..0829abf --- /dev/null +++ b/blocks.js @@ -0,0 +1,62 @@ +import debug from 'debug' +import { CID } from 'multiformats/cid' + +const log = debug('dagula:blocks') + +/** + * @typedef 
{import('./index').BlockService} BlockService + * @implements {BlockService} + */ +export class Blocks { + /** @type {import('./index').Blockstore} */ + #blockstore + + /** + * @param {import('./index').BlockGetter & import('./index').BlockStreamer & import('./index').BlockInspecter} blockstore + */ + constructor (blockstore) { + this.#blockstore = blockstore + } + + /** + * @param {import('multiformats').UnknownLink|string} cid + * @param {{ signal?: AbortSignal }} [options] + */ + async getBlock (cid, options = {}) { + cid = typeof cid === 'string' ? CID.parse(cid) : cid + log('getting block %s', cid) + const block = await this.#blockstore.get(cid, { signal: options.signal }) + if (!block) { + throw Object.assign(new Error(`peer does not have: ${cid}`), { code: 'ERR_DONT_HAVE' }) + } + return block + } + + /** + * @param {import('multiformats').UnknownLink|string} cid + * @param {import('./index').AbortOptions & import('./index').RangeOptions} [options] + */ + async streamBlock (cid, options) { + cid = typeof cid === 'string' ? CID.parse(cid) : cid + log('streaming block %s', cid) + const readable = await this.#blockstore.stream(cid, { signal: options?.signal, range: options?.range }) + if (!readable) { + throw Object.assign(new Error(`peer does not have: ${cid}`), { code: 'ERR_DONT_HAVE' }) + } + return readable + } + + /** + * @param {import('multiformats').UnknownLink|string} cid + * @param {import('./index').AbortOptions} [options] + */ + async statBlock (cid, options) { + cid = typeof cid === 'string' ? 
CID.parse(cid) : cid + log('stat block %s', cid) + const stat = await this.#blockstore.stat(cid, { signal: options?.signal }) + if (!stat) { + throw Object.assign(new Error(`peer does not have: ${cid}`), { code: 'ERR_DONT_HAVE' }) + } + return stat + } +} diff --git a/dag.js b/dag.js new file mode 100644 index 0000000..9421a7f --- /dev/null +++ b/dag.js @@ -0,0 +1,406 @@ +import debug from 'debug' +import { CID } from 'multiformats/cid' +import * as dagPB from '@ipld/dag-pb' +import * as Block from 'multiformats/block' +import { UnixFS } from 'ipfs-unixfs' +import { exporter, walkPath } from 'ipfs-unixfs-exporter' +import { parallelMap, transform } from 'streaming-iterables' +import { Decoders, Hashers } from './defaults.js' +import { depthFirst, breadthFirst } from './traversal.js' + +/** + * @typedef {{ unixfs?: UnixFS }} LinkFilterContext + * @typedef {([name, cid]: [string, import('multiformats').UnknownLink], context: LinkFilterContext) => boolean} LinkFilter + * @typedef {{ cid: import('multiformats').UnknownLink, range?: import('./index').AbsoluteRange }} GraphSelector + */ + +const log = debug('dagula') + +/** + * @typedef {import('./index').DagService} DagService + * @implements {DagService} + */ +export class Dag { + /** @type {import('./index').BlockGetter} */ + #blockstore + /** @type {import('./index').BlockDecoders} */ + #decoders + /** @type {import('./index').MultihashHashers} */ + #hashers + + /** + * @param {import('./index').BlockGetter} blockstore + * @param {{ + * decoders?: import('./index').BlockDecoders, + * hashers?: import('./index').MultihashHashers + * }} [options] + */ + constructor (blockstore, options = {}) { + this.#blockstore = blockstore + this.#decoders = options.decoders || Decoders + this.#hashers = options.hashers || Hashers + } + + /** + * @param {import('multiformats').UnknownLink[]|import('multiformats').UnknownLink|string} cid + * @param {object} [options] + * @param {AbortSignal} [options.signal] + * @param 
{import('./index').BlockOrder} [options.order] + * @param {import('./index').Range} [options.entityBytes] + * @param {LinkFilter} [options.filter] + */ + async * getDag (cid, options = {}) { + cid = typeof cid === 'string' ? CID.parse(cid) : cid + log('getting DAG %s', cid) + yield * this.#get((Array.isArray(cid) ? cid : [cid]).map(cid => ({ cid })), options) + } + + /** + * @param {GraphSelector[]} selectors + * @param {object} [options] + * @param {AbortSignal} [options.signal] + * @param {import('./index').BlockOrder} [options.order] + * @param {LinkFilter} [options.filter] + */ + async * #get (selectors, options = {}) { + const order = options.order ?? 'dfs' + + // fn to track which links to follow next + const search = order === 'dfs' ? depthFirst() : breadthFirst() + + // fn to normalize extracting links from different block types + const getLinks = blockLinks(options.filter) + + selectors = search(selectors) + + /** @type {AbortController[]} */ + let aborters = [] + const { signal } = options + signal?.addEventListener('abort', () => aborters.forEach(a => a.abort(signal.reason))) + + while (selectors.length > 0) { + log('fetching %d CIDs', selectors.length) + const parallelFn = order === 'dfs' ? 
parallelMap : transform + const fetchBlocks = parallelFn(selectors.length, async selector => { + if (signal) { + const aborter = new AbortController() + aborters.push(aborter) + const block = await this.#getBlock(selector.cid, { signal: aborter.signal }) + aborters = aborters.filter(a => a !== aborter) + return { selector, block } + } + const block = await this.#getBlock(selector.cid) + return { selector, block } + }) + /** @type {GraphSelector[]} */ + const nextSelectors = [] + for await (const { block: { cid, bytes }, selector } of fetchBlocks(selectors)) { + const decoder = this.#decoders[cid.code] + if (!decoder) { + throw new Error(`unknown codec: 0x${cid.code.toString(16)}`) + } + const hasher = this.#hashers[cid.multihash.code] + if (!hasher) { + throw new Error(`unknown multihash codec: 0x${cid.multihash.code.toString(16)}`) + } + log('decoding block %s', cid) + // bitswap-fetcher _must_ verify hashes on receipt of a block, but we + // cannot guarantee the blockstore passed is a bitswap so cannot use + // createUnsafe here. + const block = await Block.create({ bytes, cid, codec: decoder, hasher }) + yield block + nextSelectors.push(...getLinks(block, selector)) + } + log('%d CIDs in links', nextSelectors.length) + // reduce the next selectors in the links to the ones that should be + // considered for the given DAG traversal method. e.g. if using DFS and + // next selectors has 1 raw block, and 2 non-raw blocks, then the DFS + // search will reduce the next selectors down to just the first 2 items, + // since the second item (the non-raw block) may have links that need to + // be traversed before the others. + selectors = search(nextSelectors) + } + } + + /** + * Yield all blocks traversed to resolve the ipfs path. + * Then use dagScope to determine the set of blocks of the targeted dag to yield. + * Yield all blocks by default. + * Use dagScope: 'block' to yield the terminal block. 
+ * Use dagScope: 'entity' to yield all the blocks of a unixfs file, or enough blocks to list a directory. + * + * @param {string} cidPath + * @param {object} [options] + * @param {AbortSignal} [options.signal] + * @param {import('./index').BlockOrder} [options.order] Specify desired block ordering. `dfs` - Depth First Search, `unk` - unknown ordering. + * @param {import('./index').DagScope} [options.dagScope] control how many layers of the dag are returned + * 'all': return the entire dag starting at path. (default) + * 'block': return the block identified by the path. + * 'entity': Mimic gateway semantics: return all blocks for a multi-block file, or just enough blocks to enumerate a dir/map but not the dir contents. + * Where path points to a single block file, all three scopes would return the same thing. + * Where path points to a sharded hamt: 'entity' returns the blocks of the hamt so the dir can be listed. 'block' returns the root block of the hamt. + * @param {import('./index').Range} [options.entityBytes] + */ + async * getDagByPath (cidPath, options = {}) { + const dagScope = options.dagScope ?? (options.entityBytes ? 'entity' : 'all') + const entityBytes = dagScope === 'entity' ? options.entityBytes : undefined + + /** + * The resolved dag root at the terminus of the cidPath + * @type {import('ipfs-unixfs-exporter').UnixFSEntry?} + */ + let base = null + + /** + * Cache of blocks required to resolve the cidPath + * @type {import('./index').Block[]} + */ + let traversed = [] + + /** + * Adapter for unixfs-exporter to track the blocks it loads as it resolves the path. + * `walkPath` emits a single unixfs entry for multiblock structures, but we need the individual blocks. + * TODO: port logic to @web3-storage/ipfs-path to make this less ugly. 
+ */ + const blockstore = { + /** + * @param {CID} cid + * @param {{ signal?: AbortSignal }} [options] + */ + get: async (cid, options) => { + const block = await this.#getBlock(cid, options) + traversed.push(block) + return block.bytes + } + } + try { + for await (const item of walkPath(cidPath, blockstore, { signal: options.signal })) { + base = item + yield * traversed + traversed = [] + } + } catch (err) { + // yield all traversed blocks even if the path was not found. This allows + // the caller to verify the path does not exist for themselves. + if (err.code === 'ERR_NOT_FOUND') { + yield * traversed + } + throw err + } + if (!base) throw new Error('walkPath did not yield an entry') + + if (dagScope === 'all' || (dagScope === 'entity' && base.type !== 'directory')) { + /** @type {import('./index').AbsoluteRange|undefined} */ + let range + if (entityBytes) { + const size = Number(base.size) + // resolve entity bytes to actual byte offsets + range = [ + entityBytes[0] < 0 + ? size - 1 + entityBytes[0] + : entityBytes[0], + entityBytes[1] == null + ? size - 1 + : entityBytes[1] < 0 + ? size - 1 + entityBytes[1] + : entityBytes[1] + ] + } + const selectors = getUnixfsEntryLinkSelectors(base, this.#decoders, range) + yield * this.#get(selectors, { signal: options.signal, order: options.order }) + } + // non-files, like directories, and IPLD Maps only return blocks necessary for their enumeration + if (dagScope === 'entity' && base.type === 'directory') { + // the single block for the root has already been yielded. + // For a hamt we must fetch all the blocks of the (current) hamt. 
+ if (base.unixfs.type === 'hamt-sharded-directory') { + const padLength = getUnixfsHamtPadLength(base.unixfs.fanout) + const hamtLinks = base.node.Links?.filter(l => l.Name?.length === padLength).map(l => l.Hash) || [] + if (hamtLinks.length) { + yield * this.getDag(hamtLinks, { filter: hamtFilter, signal: options.signal, order: options.order }) + } + } + } + } + + /** + * @param {import('multiformats').UnknownLink|string} cid + * @param {{ signal?: AbortSignal }} [options] + */ + async #getBlock (cid, options = {}) { + cid = typeof cid === 'string' ? CID.parse(cid) : cid + log('getting block %s', cid) + const block = await this.#blockstore.get(cid, { signal: options.signal }) + if (!block) { + throw Object.assign(new Error(`peer does not have: ${cid}`), { code: 'ERR_DONT_HAVE' }) + } + return block + } +} + +/** + * Create a search function that given a decoded Block and selector, will + * return an array of `GraphSelector` of things to fetch next. + * + * @param {LinkFilter} linkFilter + */ +export function blockLinks (linkFilter = () => true) { + /** + * @param {import('multiformats').BlockView} block + * @param {GraphSelector} selector + */ + return function (block, selector) { + if (isDagPB(block)) { + if (selector.range) { + const data = UnixFS.unmarshal(block.value.Data ?? new Uint8Array()) + if (data.type === 'file') { + const ranges = toRanges(data.blockSizes.map(Number)) + /** @type {GraphSelector[]} */ + const selectors = [] + for (let i = 0; i < block.value.Links.length; i++) { + const { Name, Hash } = block.value.Links[i] + if (linkFilter([Name ?? '', Hash], { unixfs: data })) { + const relRange = toRelativeRange(selector.range, ranges[i]) + if (relRange) selectors.push({ cid: block.value.Links[i].Hash, range: relRange }) + } + } + return selectors + } + } + + const filterCtx = { + get unixfs () { + return UnixFS.unmarshal(block.value.Data ?? new Uint8Array()) + } + } + return block.value.Links + .filter(({ Name, Hash }) => linkFilter([Name ?? 
'', Hash], filterCtx)) + .map(l => ({ cid: l.Hash })) + } + + /** @type {GraphSelector[]} */ + const selectors = [] + // links() paths dagPb in the ipld style so name is e.g `Links/0/Hash`, and not what we want here. + for (const link of block.links()) { + if (linkFilter(link, {})) { + selectors.push({ cid: link[1] }) + } + } + return selectors + } +} + +/** + * @param {import('multiformats').BlockView} block + * @returns {block is import('multiformats').BlockView} + */ +const isDagPB = block => block.cid.code === dagPB.code + +/** @type {LinkFilter} */ +export const hamtFilter = ([name], ctx) => ctx.unixfs ? name?.length === getUnixfsHamtPadLength(ctx.unixfs.fanout) : false + +/** + * Converts an array of block sizes to an array of byte ranges. + * @param {number[]} blockSizes + */ +function toRanges (blockSizes) { + const ranges = [] + let offset = 0 + for (const size of blockSizes) { + /** @type {import('./index').AbsoluteRange} */ + const absRange = [offset, offset + size - 1] + ranges.push(absRange) + offset += size + } + return ranges +} + +/** + * Given two absolute ranges `a` and `b`, calculate the intersection and + * convert to a relative range within `b`. 
+ * + * Examples (results are inclusive [first, last] offsets relative to the start of `b`): + * ```js + * toRelativeRange([100,200], [150,300]): [0,50] + * toRelativeRange([100,200], [50,250]): [50,150] + * toRelativeRange([100,200], [25,110]): [75,85] + * toRelativeRange([100,200], [300,400]): undefined + * ``` + * + * @param {import('./index').AbsoluteRange} a + * @param {import('./index').AbsoluteRange} b + * @returns {import('./index').AbsoluteRange|undefined} + */ +const toRelativeRange = (a, b) => { + // starts in range + if (b[0] >= a[0] && b[0] <= a[1]) { + // ends in range + if (b[1] >= a[0] && b[1] <= a[1]) { + return [0, b[1] - b[0]] + // ends out of range + } else { + return [0, a[1] - b[0]] + } + // starts before range, ends in range: both offsets are measured from b[0] + } else if (b[1] >= a[0] && b[1] <= a[1]) { + return [a[0] - b[0], b[1] - b[0]] + // covers whole range: both offsets are measured from b[0] + } else if (b[0] < a[0] && b[1] > a[1]) { + return [a[0] - b[0], a[1] - b[0]] + } +} + +/** + * Get selectors for a UnixFS entry's links with their corresponding relative + * byte ranges. + * + * @param {import('ipfs-unixfs-exporter').UnixFSEntry} entry + * @param {import('./index').BlockDecoders} decoders + * @param {import('./index').AbsoluteRange} [range] + * @returns {GraphSelector[]} + */ +function getUnixfsEntryLinkSelectors (entry, decoders, range) { + if (entry.type === 'file') { + if (range) { + const blockSizes = entry.unixfs.blockSizes.map(s => Number(s)) + /** @type {GraphSelector[]} */ + const selectors = [] + // create selectors, filtering out links that do not contain data in the range. 
+ for (const [i, absRange] of toRanges(blockSizes).entries()) { + if (absRange[0] > range[1]) break + const relRange = toRelativeRange(range, absRange) + if (relRange) selectors.push({ cid: entry.node.Links[i].Hash, range: relRange }) + } + return selectors + } + return entry.node.Links.map(l => ({ cid: l.Hash })) + } + + if (entry.type === 'directory') { + return entry.node.Links.map(l => ({ cid: l.Hash })) + } + + if (entry.type === 'object' || entry.type === 'identity') { + // UnixFSEntry `node` is Uint8Array for objects and identity blocks! + // so we have to decode them again to get the links here. + const decoder = decoders[entry.cid.code] + if (!decoder) { + throw new Error(`unknown codec: 0x${entry.cid.code.toString(16)}`) + } + const decoded = Block.createUnsafe({ bytes: entry.node, cid: entry.cid, codec: decoder }) + const selectors = [] + for (const [, cid] of decoded.links()) { + selectors.push({ cid }) + } + return selectors + } + + // raw! no links! + return [] +} + +/** @param {number|bigint|undefined} fanout */ +function getUnixfsHamtPadLength (fanout) { + if (!fanout) throw new Error('missing fanout') + return (Number(fanout) - 1).toString(16).length +} diff --git a/index.d.ts b/index.d.ts index 6a8de05..b780336 100644 --- a/index.d.ts +++ b/index.d.ts @@ -183,9 +183,9 @@ export interface BlockService { export interface DagService { /** Get a complete DAG by root CID. */ - get (cid: UnknownLink|string, options?: AbortOptions & BlockOrderOptions): AsyncIterableIterator + getDag (cid: UnknownLink|string, options?: AbortOptions & BlockOrderOptions): AsyncIterableIterator /** Get a DAG for a cid+path. 
*/ - getPath (cidPath: string, options?: AbortOptions & DagScopeOptions & EntityBytesOptions & BlockOrderOptions): AsyncIterableIterator + getDagByPath (cidPath: string, options?: AbortOptions & DagScopeOptions & EntityBytesOptions & BlockOrderOptions): AsyncIterableIterator } export interface UnixfsService { @@ -198,9 +198,9 @@ export interface UnixfsService { export declare class Dagula implements BlockService, DagService, UnixfsService { constructor (blockstore: Blockstore, options?: { decoders?: BlockDecoders, hashers?: MultihashHashers }) /** Get a complete DAG by root CID. */ - get (cid: UnknownLink|string, options?: AbortOptions & BlockOrderOptions): AsyncIterableIterator + getDag (cid: UnknownLink|string, options?: AbortOptions & BlockOrderOptions): AsyncIterableIterator /** Get a DAG for a cid+path. */ - getPath (cidPath: string, options?: AbortOptions & DagScopeOptions & EntityBytesOptions & BlockOrderOptions): AsyncIterableIterator + getDagByPath (cidPath: string, options?: AbortOptions & DagScopeOptions & EntityBytesOptions & BlockOrderOptions): AsyncIterableIterator /** Get a single block. */ getBlock (cid: UnknownLink|string, options?: AbortOptions): Promise /** Retrieve information about a block. 
*/ From 063a70c9b05056c1c65b50a21e2a0d2823885a34 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Mon, 13 May 2024 11:45:21 +0100 Subject: [PATCH 5/5] feat: wip DAG service implementation --- dag.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dag.js b/dag.js index 9421a7f..452762a 100644 --- a/dag.js +++ b/dag.js @@ -3,7 +3,7 @@ import { CID } from 'multiformats/cid' import * as dagPB from '@ipld/dag-pb' import * as Block from 'multiformats/block' import { UnixFS } from 'ipfs-unixfs' -import { exporter, walkPath } from 'ipfs-unixfs-exporter' +import { walkPath } from 'ipfs-unixfs-exporter' import { parallelMap, transform } from 'streaming-iterables' import { Decoders, Hashers } from './defaults.js' import { depthFirst, breadthFirst } from './traversal.js'