Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions bitswap-fetcher.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import defer from 'p-defer'
import { pipe } from 'it-pipe'
import * as lp from 'it-length-prefixed'
import { base58btc } from 'multiformats/bases/base58'
import { identity } from 'multiformats/hashes/identity'
import debug from 'debug'
import { Entry, Message, BlockPresenceType } from './message.js'
import * as Prefix from './prefix.js'
Expand Down Expand Up @@ -91,6 +92,10 @@ export class BitswapFetcher {
throw options.signal.reason || abortError()
}

if (cid.code === identity.code) {
return { cid, bytes: cid.multihash.digest }
}

// ensure we can hash the data when we receive the block
if (!this.#hashers[cid.multihash.code]) {
throw new Error(`missing hasher: 0x${cid.multihash.code.toString(16)} for wanted block: ${cid}`)
Expand Down Expand Up @@ -123,6 +128,35 @@ export class BitswapFetcher {
return deferred.promise
}

/**
* @param {import('multiformats').UnknownLink} cid
* @param {{ range?: import('./index').Range, signal?: AbortSignal }} [options]
*/
async stream (cid, options) {
const block = await this.get(cid, options)
if (!block) return

return /** @type {ReadableStream<Uint8Array>} */ (new ReadableStream({
pull (controller) {
const { range } = options ?? {}
const bytes = range
? block.bytes.slice(range[0], range[1] && (range[1] + 1))
: block.bytes
controller.enqueue(bytes)
controller.close()
}
}))
}

/**
* @param {import('multiformats').UnknownLink} cid
* @param {{ signal?: AbortSignal }} [options]
*/
async stat (cid, options) {
const block = await this.get(cid, options)
if (block) return { size: block.bytes.length }
}

/** @type {import('@libp2p/interface-registrar').StreamHandler} */
async handler ({ stream }) {
log('incoming stream')
Expand Down
62 changes: 62 additions & 0 deletions blocks.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import debug from 'debug'
import { CID } from 'multiformats/cid'

const log = debug('dagula:blocks')

/**
* @typedef {import('./index').BlockService} BlockService
* @implements {BlockService}
*/
export class Blocks {
/** @type {import('./index').Blockstore} */
#blockstore

/**
* @param {import('./index').BlockGetter & import('./index').BlockStreamer & import('./index').BlockInspecter} blockstore
*/
constructor (blockstore) {
this.#blockstore = blockstore
}

/**
* @param {import('multiformats').UnknownLink|string} cid
* @param {{ signal?: AbortSignal }} [options]
*/
async getBlock (cid, options = {}) {
cid = typeof cid === 'string' ? CID.parse(cid) : cid
log('getting block %s', cid)
const block = await this.#blockstore.get(cid, { signal: options.signal })
if (!block) {
throw Object.assign(new Error(`peer does not have: ${cid}`), { code: 'ERR_DONT_HAVE' })
}
return block
}

/**
* @param {import('multiformats').UnknownLink|string} cid
* @param {import('./index').AbortOptions & import('./index').RangeOptions} [options]
*/
async streamBlock (cid, options) {
cid = typeof cid === 'string' ? CID.parse(cid) : cid
log('streaming block %s', cid)
const readable = await this.#blockstore.stream(cid, { signal: options?.signal, range: options?.range })
if (!readable) {
throw Object.assign(new Error(`peer does not have: ${cid}`), { code: 'ERR_DONT_HAVE' })
}
return readable
}

/**
* @param {import('multiformats').UnknownLink|string} cid
* @param {import('./index').AbortOptions} [options]
*/
async statBlock (cid, options) {
cid = typeof cid === 'string' ? CID.parse(cid) : cid
log('stat block %s', cid)
const stat = await this.#blockstore.stat(cid, { signal: options?.signal })
if (!stat) {
throw Object.assign(new Error(`peer does not have: ${cid}`), { code: 'ERR_DONT_HAVE' })
}
return stat
}
}
Loading