diff --git a/bitswap-fetcher.js b/bitswap-fetcher.js index e4918b2..30c7afa 100644 --- a/bitswap-fetcher.js +++ b/bitswap-fetcher.js @@ -2,6 +2,7 @@ import defer from 'p-defer' import { pipe } from 'it-pipe' import * as lp from 'it-length-prefixed' import { base58btc } from 'multiformats/bases/base58' +import { identity } from 'multiformats/hashes/identity' import debug from 'debug' import { Entry, Message, BlockPresenceType } from './message.js' import * as Prefix from './prefix.js' @@ -91,6 +92,10 @@ export class BitswapFetcher { throw options.signal.reason || abortError() } + if (cid.code === identity.code) { + return { cid, bytes: cid.multihash.digest } + } + // ensure we can hash the data when we receive the block if (!this.#hashers[cid.multihash.code]) { throw new Error(`missing hasher: 0x${cid.multihash.code.toString(16)} for wanted block: ${cid}`) @@ -123,6 +128,35 @@ export class BitswapFetcher { return deferred.promise } + /** + * @param {import('multiformats').UnknownLink} cid + * @param {{ range?: import('./index').Range, signal?: AbortSignal }} [options] + */ + async stream (cid, options) { + const block = await this.get(cid, options) + if (!block) return + + return /** @type {ReadableStream} */ (new ReadableStream({ + pull (controller) { + const { range } = options ?? {} + const bytes = range + ? block.bytes.slice(range[0], range[1] && (range[1] + 1)) + : block.bytes + controller.enqueue(bytes) + controller.close() + } + })) + } + + /** + * @param {import('multiformats').UnknownLink} cid + * @param {{ signal?: AbortSignal }} [options] + */ + async stat (cid, options) { + const block = await this.get(cid, options) + if (block) return { size: block.bytes.length } + } + /** @type {import('@libp2p/interface-registrar').StreamHandler} */ async handler ({ stream }) { log('incoming stream') diff --git a/blocks.js b/blocks.js new file mode 100644 index 0000000..0829abf --- /dev/null +++ b/blocks.js @@ -0,0 +1,62 @@ +import debug from 'debug' +import { CID } from 'multiformats/cid' + +const log = debug('dagula:blocks') + +/** + * @typedef {import('./index').BlockService} BlockService + * @implements {BlockService} + */ +export class Blocks { + /** @type {import('./index').Blockstore} */ + #blockstore + + /** + * @param {import('./index').BlockGetter & import('./index').BlockStreamer & import('./index').BlockInspecter} blockstore + */ + constructor (blockstore) { + this.#blockstore = blockstore + } + + /** + * @param {import('multiformats').UnknownLink|string} cid + * @param {{ signal?: AbortSignal }} [options] + */ + async getBlock (cid, options = {}) { + cid = typeof cid === 'string' ? CID.parse(cid) : cid + log('getting block %s', cid) + const block = await this.#blockstore.get(cid, { signal: options.signal }) + if (!block) { + throw Object.assign(new Error(`peer does not have: ${cid}`), { code: 'ERR_DONT_HAVE' }) + } + return block + } + + /** + * @param {import('multiformats').UnknownLink|string} cid + * @param {import('./index').AbortOptions & import('./index').RangeOptions} [options] + */ + async streamBlock (cid, options) { + cid = typeof cid === 'string' ? CID.parse(cid) : cid + log('streaming block %s', cid) + const readable = await this.#blockstore.stream(cid, { signal: options?.signal, range: options?.range }) + if (!readable) { + throw Object.assign(new Error(`peer does not have: ${cid}`), { code: 'ERR_DONT_HAVE' }) + } + return readable + } + + /** + * @param {import('multiformats').UnknownLink|string} cid + * @param {import('./index').AbortOptions} [options] + */ + async statBlock (cid, options) { + cid = typeof cid === 'string' ? CID.parse(cid) : cid + log('stat block %s', cid) + const stat = await this.#blockstore.stat(cid, { signal: options?.signal }) + if (!stat) { + throw Object.assign(new Error(`peer does not have: ${cid}`), { code: 'ERR_DONT_HAVE' }) + } + return stat + } +} diff --git a/dag.js b/dag.js new file mode 100644 index 0000000..452762a --- /dev/null +++ b/dag.js @@ -0,0 +1,406 @@ +import debug from 'debug' +import { CID } from 'multiformats/cid' +import * as dagPB from '@ipld/dag-pb' +import * as Block from 'multiformats/block' +import { UnixFS } from 'ipfs-unixfs' +import { walkPath } from 'ipfs-unixfs-exporter' +import { parallelMap, transform } from 'streaming-iterables' +import { Decoders, Hashers } from './defaults.js' +import { depthFirst, breadthFirst } from './traversal.js' + +/** + * @typedef {{ unixfs?: UnixFS }} LinkFilterContext + * @typedef {([name, cid]: [string, import('multiformats').UnknownLink], context: LinkFilterContext) => boolean} LinkFilter + * @typedef {{ cid: import('multiformats').UnknownLink, range?: import('./index').AbsoluteRange }} GraphSelector + */ + +const log = debug('dagula') + +/** + * @typedef {import('./index').DagService} DagService + * @implements {DagService} + */ +export class Dag { + /** @type {import('./index').BlockGetter} */ + #blockstore + /** @type {import('./index').BlockDecoders} */ + #decoders + /** @type {import('./index').MultihashHashers} */ + #hashers + + /** + * @param {import('./index').BlockGetter} blockstore + * @param {{ + * decoders?: import('./index').BlockDecoders, + * hashers?: import('./index').MultihashHashers + * }} [options] + */ + constructor (blockstore, options = {}) { + this.#blockstore = blockstore + this.#decoders = options.decoders || Decoders + this.#hashers = options.hashers || Hashers + } + + /** + * @param {import('multiformats').UnknownLink[]|import('multiformats').UnknownLink|string} cid + * @param {object} [options] + * @param {AbortSignal} [options.signal] + * @param {import('./index').BlockOrder} [options.order] + * @param {import('./index').Range} [options.entityBytes] + * @param {LinkFilter} [options.filter] + */ + async * getDag (cid, options = {}) { + cid = typeof cid === 'string' ? CID.parse(cid) : cid + log('getting DAG %s', cid) + yield * this.#get((Array.isArray(cid) ? cid : [cid]).map(cid => ({ cid })), options) + } + + /** + * @param {GraphSelector[]} selectors + * @param {object} [options] + * @param {AbortSignal} [options.signal] + * @param {import('./index').BlockOrder} [options.order] + * @param {LinkFilter} [options.filter] + */ + async * #get (selectors, options = {}) { + const order = options.order ?? 'dfs' + + // fn to track which links to follow next + const search = order === 'dfs' ? depthFirst() : breadthFirst() + + // fn to normalize extracting links from different block types + const getLinks = blockLinks(options.filter) + + selectors = search(selectors) + + /** @type {AbortController[]} */ + let aborters = [] + const { signal } = options + signal?.addEventListener('abort', () => aborters.forEach(a => a.abort(signal.reason))) + + while (selectors.length > 0) { + log('fetching %d CIDs', selectors.length) + const parallelFn = order === 'dfs' ? parallelMap : transform + const fetchBlocks = parallelFn(selectors.length, async selector => { + if (signal) { + const aborter = new AbortController() + aborters.push(aborter) + const block = await this.#getBlock(selector.cid, { signal: aborter.signal }) + aborters = aborters.filter(a => a !== aborter) + return { selector, block } + } + const block = await this.#getBlock(selector.cid) + return { selector, block } + }) + /** @type {GraphSelector[]} */ + const nextSelectors = [] + for await (const { block: { cid, bytes }, selector } of fetchBlocks(selectors)) { + const decoder = this.#decoders[cid.code] + if (!decoder) { + throw new Error(`unknown codec: 0x${cid.code.toString(16)}`) + } + const hasher = this.#hashers[cid.multihash.code] + if (!hasher) { + throw new Error(`unknown multihash codec: 0x${cid.multihash.code.toString(16)}`) + } + log('decoding block %s', cid) + // bitswap-fetcher _must_ verify hashes on receipt of a block, but we + // cannot guarantee the blockstore passed is a bitswap so cannot use + // createUnsafe here. + const block = await Block.create({ bytes, cid, codec: decoder, hasher }) + yield block + nextSelectors.push(...getLinks(block, selector)) + } + log('%d CIDs in links', nextSelectors.length) + // reduce the next selectors in the links to the ones that should be + // considered for the given DAG traversal method. e.g. if using DFS and + // next selectors has 1 raw block, and 2 non-raw blocks, then the DFS + // search will reduce the next selectors down to just the first 2 items, + // since the second item (the non-raw block) may have links that need to + // be traversed before the others. + selectors = search(nextSelectors) + } + } + + /** + * Yield all blocks traversed to resolve the ipfs path. + * Then use dagScope to determine the set of blocks of the targeted dag to yield. + * Yield all blocks by default. + * Use dagScope: 'block' to yield the terminal block. + * Use dagScope: 'entity' to yield all the blocks of a unixfs file, or enough blocks to list a directory. + * + * @param {string} cidPath + * @param {object} [options] + * @param {AbortSignal} [options.signal] + * @param {import('./index').BlockOrder} [options.order] Specify desired block ordering. `dfs` - Depth First Search, `unk` - unknown ordering. + * @param {import('./index').DagScope} [options.dagScope] control how many layers of the dag are returned + * 'all': return the entire dag starting at path. (default) + * 'block': return the block identified by the path. + * 'entity': Mimic gateway semantics: Return All blocks for a multi-block file or just enough blocks to enumerate a dir/map but not the dir contents. + * Where path points to a single block file, all three selectors would return the same thing. + * where path points to a sharded hamt: 'file' returns the blocks of the hamt so the dir can be listed. 'block' returns the root block of the hamt. + * @param {import('./index').Range} [options.entityBytes] + */ + async * getDagByPath (cidPath, options = {}) { + const dagScope = options.dagScope ?? (options.entityBytes ? 'entity' : 'all') + const entityBytes = dagScope === 'entity' ? options.entityBytes : undefined + + /** + * The resolved dag root at the terminus of the cidPath + * @type {import('ipfs-unixfs-exporter').UnixFSEntry?} + */ + let base = null + + /** + * Cache of blocks required to resolve the cidPath + * @type {import('./index').Block[]} + */ + let traversed = [] + + /** + * Adapter for unixfs-exporter to track the blocks it loads as it resolves the path. + * `walkPath` emits a single unixfs entry for multiblock structures, but we need the individual blocks. + * TODO: port logic to @web3-storage/ipfs-path to make this less ugly. + */ + const blockstore = { + /** + * @param {CID} cid + * @param {{ signal?: AbortSignal }} [options] + */ + get: async (cid, options) => { + const block = await this.#getBlock(cid, options) + traversed.push(block) + return block.bytes + } + } + try { + for await (const item of walkPath(cidPath, blockstore, { signal: options.signal })) { + base = item + yield * traversed + traversed = [] + } + } catch (err) { + // yield all traversed blocks even if the path was not found. This allows + // the caller to verify the path does not exist for themselves. + if (err.code === 'ERR_NOT_FOUND') { + yield * traversed + } + throw err + } + if (!base) throw new Error('walkPath did not yield an entry') + + if (dagScope === 'all' || (dagScope === 'entity' && base.type !== 'directory')) { + /** @type {import('./index').AbsoluteRange|undefined} */ + let range + if (entityBytes) { + const size = Number(base.size) + // resolve entity bytes to actual byte offsets + range = [ + entityBytes[0] < 0 + ? size - 1 + entityBytes[0] + : entityBytes[0], + entityBytes[1] == null + ? size - 1 + : entityBytes[1] < 0 + ? size - 1 + entityBytes[1] + : entityBytes[1] + ] + } + const selectors = getUnixfsEntryLinkSelectors(base, this.#decoders, range) + yield * this.#get(selectors, { signal: options.signal, order: options.order }) + } + // non-files, like directories, and IPLD Maps only return blocks necessary for their enumeration + if (dagScope === 'entity' && base.type === 'directory') { + // the single block for the root has already been yielded. + // For a hamt we must fetch all the blocks of the (current) hamt. + if (base.unixfs.type === 'hamt-sharded-directory') { + const padLength = getUnixfsHamtPadLength(base.unixfs.fanout) + const hamtLinks = base.node.Links?.filter(l => l.Name?.length === padLength).map(l => l.Hash) || [] + if (hamtLinks.length) { + yield * this.getDag(hamtLinks, { filter: hamtFilter, signal: options.signal, order: options.order }) + } + } + } + } + + /** + * @param {import('multiformats').UnknownLink|string} cid + * @param {{ signal?: AbortSignal }} [options] + */ + async #getBlock (cid, options = {}) { + cid = typeof cid === 'string' ? CID.parse(cid) : cid + log('getting block %s', cid) + const block = await this.#blockstore.get(cid, { signal: options.signal }) + if (!block) { + throw Object.assign(new Error(`peer does not have: ${cid}`), { code: 'ERR_DONT_HAVE' }) + } + return block + } +} + +/** + * Create a search function that given a decoded Block and selector, will + * return an array of `GraphSelector` of things to fetch next. + * + * @param {LinkFilter} linkFilter + */ +export function blockLinks (linkFilter = () => true) { + /** + * @param {import('multiformats').BlockView} block + * @param {GraphSelector} selector + */ + return function (block, selector) { + if (isDagPB(block)) { + if (selector.range) { + const data = UnixFS.unmarshal(block.value.Data ?? new Uint8Array()) + if (data.type === 'file') { + const ranges = toRanges(data.blockSizes.map(Number)) + /** @type {GraphSelector[]} */ + const selectors = [] + for (let i = 0; i < block.value.Links.length; i++) { + const { Name, Hash } = block.value.Links[i] + if (linkFilter([Name ?? '', Hash], { unixfs: data })) { + const relRange = toRelativeRange(selector.range, ranges[i]) + if (relRange) selectors.push({ cid: block.value.Links[i].Hash, range: relRange }) + } + } + return selectors + } + } + + const filterCtx = { + get unixfs () { + return UnixFS.unmarshal(block.value.Data ?? new Uint8Array()) + } + } + return block.value.Links + .filter(({ Name, Hash }) => linkFilter([Name ?? '', Hash], filterCtx)) + .map(l => ({ cid: l.Hash })) + } + + /** @type {GraphSelector[]} */ + const selectors = [] + // links() paths dagPb in the ipld style so name is e.g `Links/0/Hash`, and not what we want here. + for (const link of block.links()) { + if (linkFilter(link, {})) { + selectors.push({ cid: link[1] }) + } + } + return selectors + } +} + +/** + * @param {import('multiformats').BlockView} block + * @returns {block is import('multiformats').BlockView} + */ +const isDagPB = block => block.cid.code === dagPB.code + +/** @type {LinkFilter} */ +export const hamtFilter = ([name], ctx) => ctx.unixfs ? name?.length === getUnixfsHamtPadLength(ctx.unixfs.fanout) : false + +/** + * Converts an array of block sizes to an array of byte ranges. + * @param {number[]} blockSizes + */ +function toRanges (blockSizes) { + const ranges = [] + let offset = 0 + for (const size of blockSizes) { + /** @type {import('./index').AbsoluteRange} */ + const absRange = [offset, offset + size - 1] + ranges.push(absRange) + offset += size + } + return ranges +} + +/** + * Given two absolute ranges `a` and `b`, calculate the intersection and + * convert to a relative range within `b`. + * + * Examples: + * ```js + * toRelativeRange([100,200], [150,300]): [0,50] + * toRelativeRange([100,200], [50,250]): [50,100] + * toRelativeRange([100,200], [25,110]): [75,10] + * toRelativeRange([100,200], [300,400]): undefined + * ``` + * + * @param {import('./index').AbsoluteRange} a + * @param {import('./index').AbsoluteRange} b + * @returns {import('./index').AbsoluteRange|undefined} + */ +const toRelativeRange = (a, b) => { + // starts in range + if (b[0] >= a[0] && b[0] <= a[1]) { + // ends in range + if (b[1] >= a[0] && b[1] <= a[1]) { + return [0, b[1] - b[0]] + // ends out of range + } else { + return [0, a[1] - b[0]] + } + // ends in range + } else if (b[1] >= a[0] && b[1] <= a[1]) { + return [a[0] - b[0], b[1] - a[0]] + // covers whole range + } else if (b[0] < a[0] && b[1] > a[1]) { + return [a[0] - b[0], a[1] - a[0]] + } +} + +/** + * Get selectors for a UnixFS entry's links with their corresponding relative + * byte ranges. + * + * @param {import('ipfs-unixfs-exporter').UnixFSEntry} entry + * @param {import('./index').BlockDecoders} decoders + * @param {import('./index').AbsoluteRange} [range] + * @returns {GraphSelector[]} + */ +function getUnixfsEntryLinkSelectors (entry, decoders, range) { + if (entry.type === 'file') { + if (range) { + const blockSizes = entry.unixfs.blockSizes.map(s => Number(s)) + /** @type {GraphSelector[]} */ + const selectors = [] + // create selectors, filtering out links that do not contain data in the range. + for (const [i, absRange] of toRanges(blockSizes).entries()) { + if (absRange[0] > range[1]) break + const relRange = toRelativeRange(range, absRange) + if (relRange) selectors.push({ cid: entry.node.Links[i].Hash, range: relRange }) + } + return selectors + } + return entry.node.Links.map(l => ({ cid: l.Hash })) + } + + if (entry.type === 'directory') { + return entry.node.Links.map(l => ({ cid: l.Hash })) + } + + if (entry.type === 'object' || entry.type === 'identity') { + // UnixFSEntry `node` is Uint8Array for objects and identity blocks! + // so we have to decode them again to get the links here. + const decoder = decoders[entry.cid.code] + if (!decoder) { + throw new Error(`unknown codec: 0x${entry.cid.code.toString(16)}`) + } + const decoded = Block.createUnsafe({ bytes: entry.node, cid: entry.cid, codec: decoder }) + const selectors = [] + for (const [, cid] of decoded.links()) { + selectors.push({ cid }) + } + return selectors + } + + // raw! no links! + return [] +} + +/** @param {number|bigint|undefined} fanout */ +function getUnixfsHamtPadLength (fanout) { + if (!fanout) throw new Error('missing fanout') + return (Number(fanout) - 1).toString(16).length +} diff --git a/index.d.ts b/index.d.ts index 74e99c6..b780336 100644 --- a/index.d.ts +++ b/index.d.ts @@ -8,6 +8,8 @@ import type { Stream } from '@libp2p/interface-connection' import type { StreamHandler } from '@libp2p/interface-registrar' import type { PeerId } from '@libp2p/interface-peer-id' +export type { AbortOptions } + export interface BlockDecoders { [code: number]: BlockDecoder } @@ -21,8 +23,26 @@ export interface Block { bytes: Uint8Array } -export interface Blockstore { - get: (cid: UnknownLink, options?: { signal?: AbortSignal }) => Promise +export interface BlockStat { + /** Total size in bytes of the block. */ + size: number +} + +export interface Blockstore extends BlockGetter, BlockStreamer, BlockInspecter {} + +export interface BlockGetter { + /** Retrieve a block. */ + get: (cid: UnknownLink, options?: AbortOptions) => Promise +} + +export interface BlockStreamer { + /** Stream bytes from a block. */ + stream: (cid: UnknownLink, options?: AbortOptions & RangeOptions) => Promise|undefined> +} + +export interface BlockInspecter { + /** Retrieve information about a block. */ + stat: (cid: UnknownLink, options?: AbortOptions) => Promise } export interface Network { @@ -63,27 +83,66 @@ export interface DagScopeOptions { } /** - * Specifies a range of bytes. - * - `*` can be substituted for end-of-file - * - `{ from: 0, to: '*' }` is the entire file. - * - Negative numbers can be used for referring to bytes from the end of a file - * - `{ from: -1024, to: '*' }` is the last 1024 bytes of a file. - * - It is also permissible to ask for the range of 500 bytes from the - * beginning of the file to 1000 bytes from the end: `{ from: 499, to: -1000 }` + * An absolute byte range to extract - always an array of two values + * corresponding to the first and last bytes (both inclusive). e.g. + * + * ``` + * [100, 200] + * ``` */ -export interface ByteRange { - /** Byte-offset of the first byte in a range (inclusive) */ - from: number - /** Byte-offset of the last byte in the range (inclusive) */ - to: number|'*' -} +export type AbsoluteRange = [first: number, last: number] + +/** + * An suffix byte range - always an array of one value corresponding to the + * first byte to start extraction from (inclusive). e.g. + * + * ``` + * [900] + * ``` + * + * If it is unknown how large a resource is, the last `n` bytes + * can be requested by specifying a negative value: + * + * ``` + * [-100] + * ``` + */ +export type SuffixRange = [first: number] + +/** + * Byte range to extract - an array of one or two values corresponding to the + * first and last bytes (both inclusive). e.g. + * + * ``` + * [100, 200] + * ``` + * + * Omitting the second value requests all remaining bytes of the resource. e.g. + * + * ``` + * [900] + * ``` + * + * Alternatively, if it's unknown how large a resource is, the last `n` bytes + * can be requested by specifying a negative value: + * + * ``` + * [-100] + * ``` + */ +export type Range = AbsoluteRange | SuffixRange export interface EntityBytesOptions { /** * A specific byte range from the entity. Setting entity bytes implies DAG * scope: entity. */ - entityBytes?: ByteRange + entityBytes?: Range +} + +export interface RangeOptions { + /** Extracts a specific byte range from the resource. */ + range?: Range } /** @@ -110,49 +169,46 @@ export interface BlockOrderOptions { order?: BlockOrder } -export interface IDagula { - /** - * Get a complete DAG by root CID. - */ - get (cid: UnknownLink|string, options?: AbortOptions & BlockOrderOptions): AsyncIterableIterator - /** - * Get a DAG for a cid+path. - */ - getPath (cidPath: string, options?: AbortOptions & DagScopeOptions & EntityBytesOptions & BlockOrderOptions): AsyncIterableIterator - /** - * Get a single block. - */ +/** @deprecated Use `BlockService`, `DagService` and `UnixfsService` interface instead. */ +export interface IDagula extends BlockService, DagService, UnixfsService {} + +export interface BlockService { + /** Get a single block. */ getBlock (cid: UnknownLink|string, options?: AbortOptions): Promise - /** - * Get UnixFS files and directories. - */ + /** Retrieve information about a block. */ + statBlock (cid: UnknownLink|string, options?: AbortOptions): Promise + /** Stream bytes from a single block. */ + streamBlock (cid: UnknownLink|string, options?: AbortOptions & RangeOptions): Promise> +} + +export interface DagService { + /** Get a complete DAG by root CID. */ + getDag (cid: UnknownLink|string, options?: AbortOptions & BlockOrderOptions): AsyncIterableIterator + /** Get a DAG for a cid+path. */ + getDagByPath (cidPath: string, options?: AbortOptions & DagScopeOptions & EntityBytesOptions & BlockOrderOptions): AsyncIterableIterator +} + +export interface UnixfsService { + /** Get UnixFS files and directories. */ getUnixfs (path: UnknownLink|string, options?: AbortOptions): Promise - /** - * Emit nodes for all path segements and get UnixFS files and directories. - */ + /** Emit nodes for all path segements and get UnixFS files and directories. */ walkUnixfsPath (path: UnknownLink|string, options?: AbortOptions): AsyncIterableIterator } -export declare class Dagula implements IDagula { +export declare class Dagula implements BlockService, DagService, UnixfsService { constructor (blockstore: Blockstore, options?: { decoders?: BlockDecoders, hashers?: MultihashHashers }) - /** - * Get a complete DAG by root CID. - */ - get (cid: UnknownLink|string, options?: AbortOptions & BlockOrderOptions): AsyncIterableIterator - /** - * Get a DAG for a cid+path. - */ - getPath (cidPath: string, options?: AbortOptions & DagScopeOptions & EntityBytesOptions & BlockOrderOptions): AsyncIterableIterator - /** - * Get a single block. - */ + /** Get a complete DAG by root CID. */ + getDag (cid: UnknownLink|string, options?: AbortOptions & BlockOrderOptions): AsyncIterableIterator + /** Get a DAG for a cid+path. */ + getDagByPath (cidPath: string, options?: AbortOptions & DagScopeOptions & EntityBytesOptions & BlockOrderOptions): AsyncIterableIterator + /** Get a single block. */ getBlock (cid: UnknownLink|string, options?: AbortOptions): Promise - /** - * Get UnixFS files and directories. - */ + /** Retrieve information about a block. */ + statBlock (cid: UnknownLink|string, options?: AbortOptions): Promise + /** Stream bytes from a single block. */ + streamBlock (cid: UnknownLink|string, options?: AbortOptions & RangeOptions): Promise> + /** Get UnixFS files and directories. */ getUnixfs (path: UnknownLink|string, options?: AbortOptions): Promise - /** - * Emit nodes for all path segements and get UnixFS files and directories. - */ + /** Emit nodes for all path segements and get UnixFS files and directories. */ walkUnixfsPath (path: UnknownLink|string, options?: AbortOptions): AsyncIterableIterator } diff --git a/index.js b/index.js index 3b1a248..6c7750e 100644 --- a/index.js +++ b/index.js @@ -6,18 +6,24 @@ import { UnixFS } from 'ipfs-unixfs' import { exporter, walkPath } from 'ipfs-unixfs-exporter' import { parallelMap, transform } from 'streaming-iterables' import { Decoders, Hashers } from './defaults.js' -import { identity } from 'multiformats/hashes/identity' import { depthFirst, breadthFirst } from './traversal.js' /** + * @typedef {import('./index').BlockService} BlockService + * @typedef {import('./index').DagService} DagService + * @typedef {import('./index').UnixfsService} UnixfsService * @typedef {{ unixfs?: UnixFS }} LinkFilterContext * @typedef {([name, cid]: [string, import('multiformats').UnknownLink], context: LinkFilterContext) => boolean} LinkFilter - * @typedef {[from: number, to: number]} Range - * @typedef {{ cid: import('multiformats').UnknownLink, range?: Range }} GraphSelector + * @typedef {{ cid: import('multiformats').UnknownLink, range?: import('./index').AbsoluteRange }} GraphSelector */ const log = debug('dagula') +/** + * @implements {BlockService} + * @implements {DagService} + * @implements {UnixfsService} + */ export class Dagula { /** @type {import('./index').Blockstore} */ #blockstore @@ -44,7 +50,7 @@ export class Dagula { * @param {object} [options] * @param {AbortSignal} [options.signal] * @param {import('./index').BlockOrder} [options.order] - * @param {import('./index').ByteRange} [options.entityBytes] + * @param {import('./index').Range} [options.entityBytes] * @param {LinkFilter} [options.filter] */ async * get (cid, options = {}) { @@ -137,7 +143,7 @@ export class Dagula { * 'entity': Mimic gateway semantics: Return All blocks for a multi-block file or just enough blocks to enumerate a dir/map but not the dir contents. * Where path points to a single block file, all three selectors would return the same thing. * where path points to a sharded hamt: 'file' returns the blocks of the hamt so the dir can be listed. 'block' returns the root block of the hamt. - * @param {import('./index').ByteRange} [options.entityBytes] + * @param {import('./index').Range} [options.entityBytes] */ async * getPath (cidPath, options = {}) { const dagScope = options.dagScope ?? (options.entityBytes ? 'entity' : 'all') @@ -188,20 +194,20 @@ export class Dagula { if (!base) throw new Error('walkPath did not yield an entry') if (dagScope === 'all' || (dagScope === 'entity' && base.type !== 'directory')) { - /** @type {Range|undefined} */ + /** @type {import('./index').AbsoluteRange|undefined} */ let range if (entityBytes) { const size = Number(base.size) // resolve entity bytes to actual byte offsets range = [ - entityBytes.from < 0 - ? size - 1 + entityBytes.from - : entityBytes.from, - entityBytes.to === '*' + entityBytes[0] < 0 + ? size - 1 + entityBytes[0] + : entityBytes[0], + entityBytes[1] == null ? size - 1 - : entityBytes.to < 0 - ? size - 1 + entityBytes.to - : entityBytes.to + : entityBytes[1] < 0 + ? size - 1 + entityBytes[1] + : entityBytes[1] ] } const selectors = getUnixfsEntryLinkSelectors(base, this.#decoders, range) @@ -228,9 +234,6 @@ export class Dagula { async getBlock (cid, options = {}) { cid = typeof cid === 'string' ? CID.parse(cid) : cid log('getting block %s', cid) - if (cid.code === identity.code) { - return { cid, bytes: cid.multihash.digest } - } const block = await this.#blockstore.get(cid, { signal: options.signal }) if (!block) { throw Object.assign(new Error(`peer does not have: ${cid}`), { code: 'ERR_DONT_HAVE' }) @@ -238,6 +241,34 @@ export class Dagula { return block } + /** + * @param {import('multiformats').UnknownLink|string} cid + * @param {import('./index').AbortOptions & import('./index').RangeOptions} [options] + */ + async streamBlock (cid, options) { + cid = typeof cid === 'string' ? CID.parse(cid) : cid + log('streaming block %s', cid) + const readable = await this.#blockstore.stream(cid, { signal: options?.signal, range: options?.range }) + if (!readable) { + throw Object.assign(new Error(`peer does not have: ${cid}`), { code: 'ERR_DONT_HAVE' }) + } + return readable + } + + /** + * @param {import('multiformats').UnknownLink|string} cid + * @param {import('./index').AbortOptions} [options] + */ + async statBlock (cid, options) { + cid = typeof cid === 'string' ? CID.parse(cid) : cid + log('stat block %s', cid) + const stat = await this.#blockstore.stat(cid, { signal: options?.signal }) + if (!stat) { + throw Object.assign(new Error(`peer does not have: ${cid}`), { code: 'ERR_DONT_HAVE' }) + } + return stat + } + /** * @param {string|import('multiformats').UnknownLink} path * @param {{ signal?: AbortSignal }} [options] @@ -347,7 +378,7 @@ function toRanges (blockSizes) { const ranges = [] let offset = 0 for (const size of blockSizes) { - /** @type {Range} */ + /** @type {import('./index').AbsoluteRange} */ const absRange = [offset, offset + size - 1] ranges.push(absRange) offset += size @@ -367,9 +398,9 @@ function toRanges (blockSizes) { * toRelativeRange([100,200], [300,400]): undefined * ``` * - * @param {Range} a - * @param {Range} b - * @returns {Range|undefined} + * @param {import('./index').AbsoluteRange} a + * @param {import('./index').AbsoluteRange} b + * @returns {import('./index').AbsoluteRange|undefined} */ const toRelativeRange = (a, b) => { // starts in range @@ -396,7 +427,7 @@ const toRelativeRange = (a, b) => { * * @param {import('ipfs-unixfs-exporter').UnixFSEntry} entry * @param {import('./index').BlockDecoders} decoders - * @param {Range} [range] + * @param {import('./index').AbsoluteRange} [range] * @returns {GraphSelector[]} */ function getUnixfsEntryLinkSelectors (entry, decoders, range) { diff --git a/package-lock.json b/package-lock.json index c1381ba..7914351 100644 --- a/package-lock.json +++ b/package-lock.json @@ -36,7 +36,7 @@ "p-defer": "^4.0.0", "protobufjs": "^7.0.0", "sade": "^1.8.1", - "streaming-iterables": "^7.0.4", + "streaming-iterables": "^8.0.1", "timeout-abort-controller": "^3.0.0", "varint": "^6.0.0" }, @@ -6658,15 +6658,6 @@ "npm": ">=7.0.0" } }, - "node_modules/miniswap/node_modules/streaming-iterables": { - "version": "8.0.1", - "resolved": "https://registry.npmjs.org/streaming-iterables/-/streaming-iterables-8.0.1.tgz", - "integrity": "sha512-yfQdmUB1b+rGLZkD/r6YisT/eNOjZxBAckXKlzYNmRJnwSzHaiScykD8gsQceFcShtK09qAbLhOqvzIpnBPoDQ==", - "dev": true, - "engines": { - "node": ">=18" - } - }, "node_modules/mortice": { "version": "3.0.1", "resolved": "https://registry.npmjs.org/mortice/-/mortice-3.0.1.tgz", @@ -7976,11 +7967,11 @@ "integrity": "sha512-v+dm9bNVfOYsY1OrhaCrmyOcYoSeVvbt+hHZ0Au+T+p1y+0Uyj9aMaGIeUTT6xdpRbWzDeYKvfOslPhggQMcsg==" }, "node_modules/streaming-iterables": { - "version": "7.1.0", - "resolved": "https://registry.npmjs.org/streaming-iterables/-/streaming-iterables-7.1.0.tgz", - "integrity": "sha512-t2KmiLVhqafTRqGefD98s5XAMskfkfprr/BTzPIZz0kWB23iyR7XUkY03yjUf4aZpAuuV2/2SUOVri3LgKuOKw==", + "version": "8.0.1", + "resolved": "https://registry.npmjs.org/streaming-iterables/-/streaming-iterables-8.0.1.tgz", + "integrity": "sha512-yfQdmUB1b+rGLZkD/r6YisT/eNOjZxBAckXKlzYNmRJnwSzHaiScykD8gsQceFcShtK09qAbLhOqvzIpnBPoDQ==", "engines": { - "node": ">=14" + "node": ">=18" } }, "node_modules/streamsearch": { diff --git a/package.json b/package.json index 2a5f141..12b3a58 100644 --- a/package.json +++ b/package.json @@ -44,7 +44,7 @@ "p-defer": "^4.0.0", "protobufjs": "^7.0.0", "sade": "^1.8.1", - "streaming-iterables": "^7.0.4", + "streaming-iterables": "^8.0.1", "timeout-abort-controller": "^3.0.0", "varint": "^6.0.0" }, diff --git a/test/getPath.test.js b/test/getPath.test.js index 7298a5f..66cc774 100644 --- a/test/getPath.test.js +++ b/test/getPath.test.js @@ -11,6 +11,7 @@ import { sha256 } from 'multiformats/hashes/sha2' import { identity } from 'multiformats/hashes/identity' import { CID } from 'multiformats/cid' import * as Block from 'multiformats/block' +import { collect } from 'streaming-iterables' import { Dagula } from '../index.js' import { getLibp2p, fromNetwork } from '../p2p.js' import { startBitswapPeer } from './_libp2p.js' @@ -569,12 +570,3 @@ test('should yield intermediate blocks when last path component does not exist', t.is(blocks.length, 1) t.is(blocks[0].cid.toString(), fileLink.cid.toString()) }) - -/** @param {AsyncIterable} source */ -async function collect (source) { - const blocks = [] - for await (const block of source) { - blocks.push(block) - } - return blocks -} diff --git a/test/statBlock.test.js b/test/statBlock.test.js new file mode 100644 index 0000000..f6ff405 --- /dev/null +++ b/test/statBlock.test.js @@ -0,0 +1,20 @@ +import test from 'ava' +import { fromString } from 'multiformats/bytes' +import * as raw from 'multiformats/codecs/raw' +import { sha256 } from 'multiformats/hashes/sha2' +import { CID } from 'multiformats/cid' +import { getLibp2p, fromNetwork } from '../p2p.js' +import { startBitswapPeer } from './_libp2p.js' + +test('should stat a block', async t => { + const bytes = fromString(`TEST DATA ${Date.now()}`) + const hash = await sha256.digest(bytes) + const cid = CID.create(1, raw.code, hash) + const peer = await startBitswapPeer([{ cid, bytes }]) + + const libp2p = await getLibp2p() + const dagula = await fromNetwork(libp2p, { peer: peer.libp2p.getMultiaddrs()[0] }) + const { size } = await dagula.statBlock(cid) + + t.is(size, bytes.length) +}) diff --git a/test/streamBlock.test.js b/test/streamBlock.test.js new file mode 100644 index 0000000..7db48ee --- /dev/null +++ b/test/streamBlock.test.js @@ -0,0 +1,41 @@ +import test from 'ava' +import { fromString, toString } from 'multiformats/bytes' +import * as raw from 'multiformats/codecs/raw' +import { sha256 } from 'multiformats/hashes/sha2' +import { CID } from 'multiformats/cid' +import { collect } from 'streaming-iterables' +import { getLibp2p, fromNetwork } from '../p2p.js' +import { startBitswapPeer } from './_libp2p.js' + +test('should stream a block', async t => { + const bytes = fromString(`TEST DATA ${Date.now()}`) + const hash = await sha256.digest(bytes) + const cid = CID.create(1, raw.code, hash) + const peer = await startBitswapPeer([{ cid, bytes }]) + + const libp2p = await getLibp2p() + const dagula = await fromNetwork(libp2p, { peer: peer.libp2p.getMultiaddrs()[0] }) + const readable = await dagula.streamBlock(cid) + t.assert(readable) + + // @ts-expect-error one day all things will implement + const chunks = await collect(readable) + t.is(await new Blob(chunks).text(), toString(bytes)) +}) + +test('should stream a byte range from a block', async t => { + const bytes = fromString(`TEST DATA ${Date.now()}`) + const range = /** @type {import('../index').AbsoluteRange} */ ([5, 8]) + const hash = await sha256.digest(bytes) + const cid = CID.create(1, raw.code, hash) + const peer = await startBitswapPeer([{ cid, bytes }]) + + const libp2p = await getLibp2p() + const dagula = await fromNetwork(libp2p, { peer: peer.libp2p.getMultiaddrs()[0] }) + const readable = await dagula.streamBlock(cid, { range }) + t.assert(readable) + + // @ts-expect-error one day all things will implement + const chunks = await collect(readable) + t.is(await new Blob(chunks).text(), toString(bytes.slice(range[0], range[1] + 1))) +}) diff --git a/test/testmark.test.js b/test/testmark.test.js index c13ca46..250d38c 100644 --- a/test/testmark.test.js +++ b/test/testmark.test.js @@ -20,7 +20,7 @@ const parseQuery = query => { const entityBytes = url.searchParams.get('entity-bytes') if (entityBytes && entityBytes !== 'null') { const [from, to] = entityBytes.split(':') - options.entityBytes = { from: parseInt(from), to: to === '*' ? to : parseInt(to) } + options.entityBytes = to === '*' ? [parseInt(from)] : [parseInt(from), parseInt(to)] } const cidPath = url.pathname.replace('/ipfs/', '').split('/').map(decodeURIComponent).join('/') return { cidPath, options }