diff --git a/bitswap-fetcher.js b/bitswap-fetcher.js index e4918b2..30c7afa 100644 --- a/bitswap-fetcher.js +++ b/bitswap-fetcher.js @@ -2,6 +2,7 @@ import defer from 'p-defer' import { pipe } from 'it-pipe' import * as lp from 'it-length-prefixed' import { base58btc } from 'multiformats/bases/base58' +import { identity } from 'multiformats/hashes/identity' import debug from 'debug' import { Entry, Message, BlockPresenceType } from './message.js' import * as Prefix from './prefix.js' @@ -91,6 +92,10 @@ export class BitswapFetcher { throw options.signal.reason || abortError() } + if (cid.code === identity.code) { + return { cid, bytes: cid.multihash.digest } + } + // ensure we can hash the data when we receive the block if (!this.#hashers[cid.multihash.code]) { throw new Error(`missing hasher: 0x${cid.multihash.code.toString(16)} for wanted block: ${cid}`) @@ -123,6 +128,35 @@ export class BitswapFetcher { return deferred.promise } + /** + * @param {import('multiformats').UnknownLink} cid + * @param {{ range?: import('./index').Range, signal?: AbortSignal }} [options] + */ + async stream (cid, options) { + const block = await this.get(cid, options) + if (!block) return + + return /** @type {ReadableStream} */ (new ReadableStream({ + pull (controller) { + const { range } = options ?? {} + const bytes = range + ? 
block.bytes.slice(range[0], range[1] && (range[1] + 1)) + : block.bytes + controller.enqueue(bytes) + controller.close() + } + })) + } + + /** + * @param {import('multiformats').UnknownLink} cid + * @param {{ signal?: AbortSignal }} [options] + */ + async stat (cid, options) { + const block = await this.get(cid, options) + if (block) return { size: block.bytes.length } + } + /** @type {import('@libp2p/interface-registrar').StreamHandler} */ async handler ({ stream }) { log('incoming stream') diff --git a/index.d.ts b/index.d.ts index 74e99c6..6a8de05 100644 --- a/index.d.ts +++ b/index.d.ts @@ -8,6 +8,8 @@ import type { Stream } from '@libp2p/interface-connection' import type { StreamHandler } from '@libp2p/interface-registrar' import type { PeerId } from '@libp2p/interface-peer-id' +export type { AbortOptions } + export interface BlockDecoders { [code: number]: BlockDecoder } @@ -21,8 +23,26 @@ export interface Block { bytes: Uint8Array } -export interface Blockstore { - get: (cid: UnknownLink, options?: { signal?: AbortSignal }) => Promise +export interface BlockStat { + /** Total size in bytes of the block. */ + size: number +} + +export interface Blockstore extends BlockGetter, BlockStreamer, BlockInspecter {} + +export interface BlockGetter { + /** Retrieve a block. */ + get: (cid: UnknownLink, options?: AbortOptions) => Promise +} + +export interface BlockStreamer { + /** Stream bytes from a block. */ + stream: (cid: UnknownLink, options?: AbortOptions & RangeOptions) => Promise|undefined> +} + +export interface BlockInspecter { + /** Retrieve information about a block. */ + stat: (cid: UnknownLink, options?: AbortOptions) => Promise } export interface Network { @@ -63,27 +83,66 @@ export interface DagScopeOptions { } /** - * Specifies a range of bytes. - * - `*` can be substituted for end-of-file - * - `{ from: 0, to: '*' }` is the entire file. 
- * Negative numbers can be used for referring to bytes from the end of a file - * - `{ from: -1024, to: '*' }` is the last 1024 bytes of a file. - * - It is also permissible to ask for the range of 500 bytes from the - * beginning of the file to 1000 bytes from the end: `{ from: 499, to: -1000 }` + * An absolute byte range to extract - always an array of two values + * corresponding to the first and last bytes (both inclusive). e.g. + * + * ``` + * [100, 200] + * ``` */ -export interface ByteRange { - /** Byte-offset of the first byte in a range (inclusive) */ - from: number - /** Byte-offset of the last byte in the range (inclusive) */ - to: number|'*' -} +export type AbsoluteRange = [first: number, last: number] + +/** + * A suffix byte range - always an array of one value corresponding to the + * first byte to start extraction from (inclusive). e.g. + * + * ``` + * [900] + * ``` + * + * If it is unknown how large a resource is, the last `n` bytes + * can be requested by specifying a negative value: + * + * ``` + * [-100] + * ``` + */ +export type SuffixRange = [first: number] + +/** + * Byte range to extract - an array of one or two values corresponding to the + * first and last bytes (both inclusive). e.g. + * + * ``` + * [100, 200] + * ``` + * + * Omitting the second value requests all remaining bytes of the resource. e.g. + * + * ``` + * [900] + * ``` + * + * Alternatively, if it's unknown how large a resource is, the last `n` bytes + * can be requested by specifying a negative value: + * + * ``` + * [-100] + * ``` + */ +export type Range = AbsoluteRange | SuffixRange export interface EntityBytesOptions { /** * A specific byte range from the entity. Setting entity bytes implies DAG * scope: entity. */ - entityBytes?: ByteRange + entityBytes?: Range +} + +export interface RangeOptions { + /** Extracts a specific byte range from the resource. 
*/ + range?: Range } /** @@ -110,49 +169,46 @@ export interface BlockOrderOptions { order?: BlockOrder } -export interface IDagula { - /** - * Get a complete DAG by root CID. - */ +/** @deprecated Use `BlockService`, `DagService` and `UnixfsService` interfaces instead. */ +export interface IDagula extends BlockService, DagService, UnixfsService {} + +export interface BlockService { + /** Get a single block. */ + getBlock (cid: UnknownLink|string, options?: AbortOptions): Promise + /** Retrieve information about a block. */ + statBlock (cid: UnknownLink|string, options?: AbortOptions): Promise + /** Stream bytes from a single block. */ + streamBlock (cid: UnknownLink|string, options?: AbortOptions & RangeOptions): Promise> +} + +export interface DagService { + /** Get a complete DAG by root CID. */ get (cid: UnknownLink|string, options?: AbortOptions & BlockOrderOptions): AsyncIterableIterator - /** - * Get a DAG for a cid+path. - */ + /** Get a DAG for a cid+path. */ getPath (cidPath: string, options?: AbortOptions & DagScopeOptions & EntityBytesOptions & BlockOrderOptions): AsyncIterableIterator - /** - * Get a single block. - */ - getBlock (cid: UnknownLink|string, options?: AbortOptions): Promise - /** - * Get UnixFS files and directories. - */ +} + +export interface UnixfsService { + /** Get UnixFS files and directories. */ getUnixfs (path: UnknownLink|string, options?: AbortOptions): Promise - /** - * Emit nodes for all path segements and get UnixFS files and directories. - */ + /** Emit nodes for all path segments and get UnixFS files and directories. */ walkUnixfsPath (path: UnknownLink|string, options?: AbortOptions): AsyncIterableIterator } -export declare class Dagula implements IDagula { +export declare class Dagula implements BlockService, DagService, UnixfsService { constructor (blockstore: Blockstore, options?: { decoders?: BlockDecoders, hashers?: MultihashHashers }) - /** - * Get a complete DAG by root CID. - */ + /** Get a complete DAG by root CID. 
*/ get (cid: UnknownLink|string, options?: AbortOptions & BlockOrderOptions): AsyncIterableIterator - /** - * Get a DAG for a cid+path. - */ + /** Get a DAG for a cid+path. */ getPath (cidPath: string, options?: AbortOptions & DagScopeOptions & EntityBytesOptions & BlockOrderOptions): AsyncIterableIterator - /** - * Get a single block. - */ + /** Get a single block. */ getBlock (cid: UnknownLink|string, options?: AbortOptions): Promise - /** - * Get UnixFS files and directories. - */ + /** Retrieve information about a block. */ + statBlock (cid: UnknownLink|string, options?: AbortOptions): Promise + /** Stream bytes from a single block. */ + streamBlock (cid: UnknownLink|string, options?: AbortOptions & RangeOptions): Promise> + /** Get UnixFS files and directories. */ getUnixfs (path: UnknownLink|string, options?: AbortOptions): Promise - /** - * Emit nodes for all path segements and get UnixFS files and directories. - */ + /** Emit nodes for all path segments and get UnixFS files and directories. 
*/ walkUnixfsPath (path: UnknownLink|string, options?: AbortOptions): AsyncIterableIterator } diff --git a/index.js b/index.js index 3b1a248..6c7750e 100644 --- a/index.js +++ b/index.js @@ -6,18 +6,24 @@ import { UnixFS } from 'ipfs-unixfs' import { exporter, walkPath } from 'ipfs-unixfs-exporter' import { parallelMap, transform } from 'streaming-iterables' import { Decoders, Hashers } from './defaults.js' -import { identity } from 'multiformats/hashes/identity' import { depthFirst, breadthFirst } from './traversal.js' /** + * @typedef {import('./index').BlockService} BlockService + * @typedef {import('./index').DagService} DagService + * @typedef {import('./index').UnixfsService} UnixfsService * @typedef {{ unixfs?: UnixFS }} LinkFilterContext * @typedef {([name, cid]: [string, import('multiformats').UnknownLink], context: LinkFilterContext) => boolean} LinkFilter - * @typedef {[from: number, to: number]} Range - * @typedef {{ cid: import('multiformats').UnknownLink, range?: Range }} GraphSelector + * @typedef {{ cid: import('multiformats').UnknownLink, range?: import('./index').AbsoluteRange }} GraphSelector */ const log = debug('dagula') +/** + * @implements {BlockService} + * @implements {DagService} + * @implements {UnixfsService} + */ export class Dagula { /** @type {import('./index').Blockstore} */ #blockstore @@ -44,7 +50,7 @@ export class Dagula { * @param {object} [options] * @param {AbortSignal} [options.signal] * @param {import('./index').BlockOrder} [options.order] - * @param {import('./index').ByteRange} [options.entityBytes] + * @param {import('./index').Range} [options.entityBytes] * @param {LinkFilter} [options.filter] */ async * get (cid, options = {}) { @@ -137,7 +143,7 @@ export class Dagula { * 'entity': Mimic gateway semantics: Return All blocks for a multi-block file or just enough blocks to enumerate a dir/map but not the dir contents. * Where path points to a single block file, all three selectors would return the same thing. 
* where path points to a sharded hamt: 'file' returns the blocks of the hamt so the dir can be listed. 'block' returns the root block of the hamt. - * @param {import('./index').ByteRange} [options.entityBytes] + * @param {import('./index').Range} [options.entityBytes] */ async * getPath (cidPath, options = {}) { const dagScope = options.dagScope ?? (options.entityBytes ? 'entity' : 'all') @@ -188,20 +194,20 @@ export class Dagula { if (!base) throw new Error('walkPath did not yield an entry') if (dagScope === 'all' || (dagScope === 'entity' && base.type !== 'directory')) { - /** @type {Range|undefined} */ + /** @type {import('./index').AbsoluteRange|undefined} */ let range if (entityBytes) { const size = Number(base.size) // resolve entity bytes to actual byte offsets range = [ - entityBytes.from < 0 - ? size - 1 + entityBytes.from - : entityBytes.from, - entityBytes.to === '*' + entityBytes[0] < 0 + ? size - 1 + entityBytes[0] + : entityBytes[0], + entityBytes[1] == null ? size - 1 - : entityBytes.to < 0 - ? size - 1 + entityBytes.to - : entityBytes.to + : entityBytes[1] < 0 + ? size - 1 + entityBytes[1] + : entityBytes[1] ] } const selectors = getUnixfsEntryLinkSelectors(base, this.#decoders, range) @@ -228,9 +234,6 @@ export class Dagula { async getBlock (cid, options = {}) { cid = typeof cid === 'string' ? CID.parse(cid) : cid log('getting block %s', cid) - if (cid.code === identity.code) { - return { cid, bytes: cid.multihash.digest } - } const block = await this.#blockstore.get(cid, { signal: options.signal }) if (!block) { throw Object.assign(new Error(`peer does not have: ${cid}`), { code: 'ERR_DONT_HAVE' }) @@ -238,6 +241,34 @@ export class Dagula { return block } + /** + * @param {import('multiformats').UnknownLink|string} cid + * @param {import('./index').AbortOptions & import('./index').RangeOptions} [options] + */ + async streamBlock (cid, options) { + cid = typeof cid === 'string' ? 
CID.parse(cid) : cid + log('streaming block %s', cid) + const readable = await this.#blockstore.stream(cid, { signal: options?.signal, range: options?.range }) + if (!readable) { + throw Object.assign(new Error(`peer does not have: ${cid}`), { code: 'ERR_DONT_HAVE' }) + } + return readable + } + + /** + * @param {import('multiformats').UnknownLink|string} cid + * @param {import('./index').AbortOptions} [options] + */ + async statBlock (cid, options) { + cid = typeof cid === 'string' ? CID.parse(cid) : cid + log('stat block %s', cid) + const stat = await this.#blockstore.stat(cid, { signal: options?.signal }) + if (!stat) { + throw Object.assign(new Error(`peer does not have: ${cid}`), { code: 'ERR_DONT_HAVE' }) + } + return stat + } + /** * @param {string|import('multiformats').UnknownLink} path * @param {{ signal?: AbortSignal }} [options] @@ -347,7 +378,7 @@ function toRanges (blockSizes) { const ranges = [] let offset = 0 for (const size of blockSizes) { - /** @type {Range} */ + /** @type {import('./index').AbsoluteRange} */ const absRange = [offset, offset + size - 1] ranges.push(absRange) offset += size @@ -367,9 +398,9 @@ function toRanges (blockSizes) { * toRelativeRange([100,200], [300,400]): undefined * ``` * - * @param {Range} a - * @param {Range} b - * @returns {Range|undefined} + * @param {import('./index').AbsoluteRange} a + * @param {import('./index').AbsoluteRange} b + * @returns {import('./index').AbsoluteRange|undefined} */ const toRelativeRange = (a, b) => { // starts in range @@ -396,7 +427,7 @@ const toRelativeRange = (a, b) => { * * @param {import('ipfs-unixfs-exporter').UnixFSEntry} entry * @param {import('./index').BlockDecoders} decoders - * @param {Range} [range] + * @param {import('./index').AbsoluteRange} [range] * @returns {GraphSelector[]} */ function getUnixfsEntryLinkSelectors (entry, decoders, range) { diff --git a/package-lock.json b/package-lock.json index c1381ba..7914351 100644 --- a/package-lock.json +++ b/package-lock.json @@ 
-36,7 +36,7 @@ "p-defer": "^4.0.0", "protobufjs": "^7.0.0", "sade": "^1.8.1", - "streaming-iterables": "^7.0.4", + "streaming-iterables": "^8.0.1", "timeout-abort-controller": "^3.0.0", "varint": "^6.0.0" }, @@ -6658,15 +6658,6 @@ "npm": ">=7.0.0" } }, - "node_modules/miniswap/node_modules/streaming-iterables": { - "version": "8.0.1", - "resolved": "https://registry.npmjs.org/streaming-iterables/-/streaming-iterables-8.0.1.tgz", - "integrity": "sha512-yfQdmUB1b+rGLZkD/r6YisT/eNOjZxBAckXKlzYNmRJnwSzHaiScykD8gsQceFcShtK09qAbLhOqvzIpnBPoDQ==", - "dev": true, - "engines": { - "node": ">=18" - } - }, "node_modules/mortice": { "version": "3.0.1", "resolved": "https://registry.npmjs.org/mortice/-/mortice-3.0.1.tgz", @@ -7976,11 +7967,11 @@ "integrity": "sha512-v+dm9bNVfOYsY1OrhaCrmyOcYoSeVvbt+hHZ0Au+T+p1y+0Uyj9aMaGIeUTT6xdpRbWzDeYKvfOslPhggQMcsg==" }, "node_modules/streaming-iterables": { - "version": "7.1.0", - "resolved": "https://registry.npmjs.org/streaming-iterables/-/streaming-iterables-7.1.0.tgz", - "integrity": "sha512-t2KmiLVhqafTRqGefD98s5XAMskfkfprr/BTzPIZz0kWB23iyR7XUkY03yjUf4aZpAuuV2/2SUOVri3LgKuOKw==", + "version": "8.0.1", + "resolved": "https://registry.npmjs.org/streaming-iterables/-/streaming-iterables-8.0.1.tgz", + "integrity": "sha512-yfQdmUB1b+rGLZkD/r6YisT/eNOjZxBAckXKlzYNmRJnwSzHaiScykD8gsQceFcShtK09qAbLhOqvzIpnBPoDQ==", "engines": { - "node": ">=14" + "node": ">=18" } }, "node_modules/streamsearch": { diff --git a/package.json b/package.json index 2a5f141..12b3a58 100644 --- a/package.json +++ b/package.json @@ -44,7 +44,7 @@ "p-defer": "^4.0.0", "protobufjs": "^7.0.0", "sade": "^1.8.1", - "streaming-iterables": "^7.0.4", + "streaming-iterables": "^8.0.1", "timeout-abort-controller": "^3.0.0", "varint": "^6.0.0" }, diff --git a/test/getPath.test.js b/test/getPath.test.js index 7298a5f..66cc774 100644 --- a/test/getPath.test.js +++ b/test/getPath.test.js @@ -11,6 +11,7 @@ import { sha256 } from 'multiformats/hashes/sha2' import { identity } from 
'multiformats/hashes/identity' import { CID } from 'multiformats/cid' import * as Block from 'multiformats/block' +import { collect } from 'streaming-iterables' import { Dagula } from '../index.js' import { getLibp2p, fromNetwork } from '../p2p.js' import { startBitswapPeer } from './_libp2p.js' @@ -569,12 +570,3 @@ test('should yield intermediate blocks when last path component does not exist', t.is(blocks.length, 1) t.is(blocks[0].cid.toString(), fileLink.cid.toString()) }) - -/** @param {AsyncIterable} source */ -async function collect (source) { - const blocks = [] - for await (const block of source) { - blocks.push(block) - } - return blocks -} diff --git a/test/statBlock.test.js b/test/statBlock.test.js new file mode 100644 index 0000000..f6ff405 --- /dev/null +++ b/test/statBlock.test.js @@ -0,0 +1,20 @@ +import test from 'ava' +import { fromString } from 'multiformats/bytes' +import * as raw from 'multiformats/codecs/raw' +import { sha256 } from 'multiformats/hashes/sha2' +import { CID } from 'multiformats/cid' +import { getLibp2p, fromNetwork } from '../p2p.js' +import { startBitswapPeer } from './_libp2p.js' + +test('should stat a block', async t => { + const bytes = fromString(`TEST DATA ${Date.now()}`) + const hash = await sha256.digest(bytes) + const cid = CID.create(1, raw.code, hash) + const peer = await startBitswapPeer([{ cid, bytes }]) + + const libp2p = await getLibp2p() + const dagula = await fromNetwork(libp2p, { peer: peer.libp2p.getMultiaddrs()[0] }) + const { size } = await dagula.statBlock(cid) + + t.is(size, bytes.length) +}) diff --git a/test/streamBlock.test.js b/test/streamBlock.test.js new file mode 100644 index 0000000..7db48ee --- /dev/null +++ b/test/streamBlock.test.js @@ -0,0 +1,41 @@ +import test from 'ava' +import { fromString, toString } from 'multiformats/bytes' +import * as raw from 'multiformats/codecs/raw' +import { sha256 } from 'multiformats/hashes/sha2' +import { CID } from 'multiformats/cid' +import { collect } from 
'streaming-iterables' +import { getLibp2p, fromNetwork } from '../p2p.js' +import { startBitswapPeer } from './_libp2p.js' + +test('should stream a block', async t => { + const bytes = fromString(`TEST DATA ${Date.now()}`) + const hash = await sha256.digest(bytes) + const cid = CID.create(1, raw.code, hash) + const peer = await startBitswapPeer([{ cid, bytes }]) + + const libp2p = await getLibp2p() + const dagula = await fromNetwork(libp2p, { peer: peer.libp2p.getMultiaddrs()[0] }) + const readable = await dagula.streamBlock(cid) + t.assert(readable) + + // @ts-expect-error one day all things will implement + const chunks = await collect(readable) + t.is(await new Blob(chunks).text(), toString(bytes)) +}) + +test('should stream a byte range from a block', async t => { + const bytes = fromString(`TEST DATA ${Date.now()}`) + const range = /** @type {import('../index').AbsoluteRange} */ ([5, 8]) + const hash = await sha256.digest(bytes) + const cid = CID.create(1, raw.code, hash) + const peer = await startBitswapPeer([{ cid, bytes }]) + + const libp2p = await getLibp2p() + const dagula = await fromNetwork(libp2p, { peer: peer.libp2p.getMultiaddrs()[0] }) + const readable = await dagula.streamBlock(cid, { range }) + t.assert(readable) + + // @ts-expect-error one day all things will implement + const chunks = await collect(readable) + t.is(await new Blob(chunks).text(), toString(bytes.slice(range[0], range[1] + 1))) +}) diff --git a/test/testmark.test.js b/test/testmark.test.js index c13ca46..250d38c 100644 --- a/test/testmark.test.js +++ b/test/testmark.test.js @@ -20,7 +20,7 @@ const parseQuery = query => { const entityBytes = url.searchParams.get('entity-bytes') if (entityBytes && entityBytes !== 'null') { const [from, to] = entityBytes.split(':') - options.entityBytes = { from: parseInt(from), to: to === '*' ? to : parseInt(to) } + options.entityBytes = to === '*' ? 
[parseInt(from)] : [parseInt(from), parseInt(to)] } const cidPath = url.pathname.replace('/ipfs/', '').split('/').map(decodeURIComponent).join('/') return { cidPath, options }