Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class HyperDHT extends DHT {
...this.stats
}
this.rawStreams = new RawStreamSet(this)
this.plugins = new Map()

this._router = new Router(this, router)
this._socketPool = new SocketPool(this, opts.host || '0.0.0.0')
Expand Down Expand Up @@ -127,6 +128,7 @@ class HyperDHT extends DHT {
}
this._router.destroy()
if (this._persistent) this._persistent.destroy()
for (const [_, plugin] of this.plugins) plugin.destroy()
await this.rawStreams.clear()
await this._socketPool.destroy()
await super.destroy()
Expand Down Expand Up @@ -436,6 +438,10 @@ class HyperDHT extends DHT {
this._persistent.onimmutableget(req)
return true
}
case COMMANDS.PLUGIN_PERSISTENT: {
this._persistent.onplugin(req)
return true
}
}

return false
Expand Down Expand Up @@ -510,6 +516,10 @@ class HyperDHT extends DHT {
from
)
}

register(name, plugin) {
this.plugins.set(name, plugin)
}
}

HyperDHT.BOOTSTRAP = BOOTSTRAP_NODES
Expand Down
29 changes: 19 additions & 10 deletions lib/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ const COMMANDS = (exports.COMMANDS = {
MUTABLE_PUT: 6,
MUTABLE_GET: 7,
IMMUTABLE_PUT: 8,
IMMUTABLE_GET: 9
IMMUTABLE_GET: 9,
PLUGIN_PERSISTENT: 10
})

exports.BOOTSTRAP_NODES = global.Pear?.config.dht?.bootstrap || [
Expand Down Expand Up @@ -39,19 +40,27 @@ exports.ERROR = {
SEQ_TOO_LOW: 17
}

const [NS_ANNOUNCE, NS_UNANNOUNCE, NS_MUTABLE_PUT, NS_PEER_HANDSHAKE, NS_PEER_HOLEPUNCH] =
crypto.namespace('hyperswarm/dht', [
COMMANDS.ANNOUNCE,
COMMANDS.UNANNOUNCE,
COMMANDS.MUTABLE_PUT,
COMMANDS.PEER_HANDSHAKE,
COMMANDS.PEER_HOLEPUNCH
])
const [
NS_ANNOUNCE,
NS_UNANNOUNCE,
NS_MUTABLE_PUT,
NS_PEER_HANDSHAKE,
NS_PEER_HOLEPUNCH,
NS_PLUGIN_PERSISTENT
] = crypto.namespace('hyperswarm/dht', [
COMMANDS.ANNOUNCE,
COMMANDS.UNANNOUNCE,
COMMANDS.MUTABLE_PUT,
COMMANDS.PEER_HANDSHAKE,
COMMANDS.PEER_HOLEPUNCH,
COMMANDS.PLUGIN_PERSISTENT
])

exports.NS = {
ANNOUNCE: NS_ANNOUNCE,
UNANNOUNCE: NS_UNANNOUNCE,
MUTABLE_PUT: NS_MUTABLE_PUT,
PEER_HANDSHAKE: NS_PEER_HANDSHAKE,
PEER_HOLEPUNCH: NS_PEER_HOLEPUNCH
PEER_HOLEPUNCH: NS_PEER_HOLEPUNCH,
PLUGIN_PERSISTENT: NS_PLUGIN_PERSISTENT
}
20 changes: 20 additions & 0 deletions lib/messages.js
Original file line number Diff line number Diff line change
Expand Up @@ -419,3 +419,23 @@ exports.mutableGetResponse = {
}
}
}

exports.pluginRequest = {
preencode(state, m) {
c.string.preencode(state, m.plugin)
c.int.preencode(state, m.command)
c.buffer.preencode(state, m.payload)
},
encode(state, m) {
c.string.encode(state, m.plugin)
c.int.encode(state, m.command)
c.buffer.encode(state, m.payload)
},
decode(state) {
return {
plugin: c.string.decode(state),
command: c.int.decode(state),
payload: c.buffer.decode(state)
}
}
}
14 changes: 14 additions & 0 deletions lib/persistent.js
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,20 @@ module.exports = class Persistent {
req.reply(null)
}

onplugin(req) {
if (!req.value) return

const plugreq = decode(m.pluginRequest, req.value)
if (plugreq === null) return

const { plugin, command, payload } = plugreq

const p = this.dht.plugins.get(plugin)
if (!p) return

p.onrequest(req)
}

destroy() {
this.records.destroy()
this.refreshes.destroy()
Expand Down
13 changes: 13 additions & 0 deletions lib/plugin.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
module.exports = class Plugin {
constructor(name) {
this.name = name
}

onrequest(req) {
throw new Error('onrequest() must be implemented')
}

destroy() {
throw new Error('destroy() must be implemented')
}
}
1 change: 1 addition & 0 deletions test/all.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ async function runTests() {
await import('./pool.js')
await import('./relaying.js')
await import('./storing.js')
await import('./plugins.js')

test.resume()
}
173 changes: 173 additions & 0 deletions test/plugins.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
const test = require('brittle')
const c = require('compact-encoding')
const sodium = require('sodium-universal')
const HyperDHT = require('../')
const { swarm } = require('./helpers')
const m = require('../lib/messages')
const { COMMANDS: HYPERDHT_COMMANDS } = require('../lib/constants')
const DHTPlugin = require('../lib/plugin')

test('plugin put - get', async function (t) {
const PLUGIN_COMMANDS = {
PUT: 0,
GET: 1
}

function mapTest(node) {
return node
}

class TestPlugin extends DHTPlugin {
constructor(dht) {
super('testplugin')

this.dht = dht
this.data = new Map()
}

onrequest(req) {
if (!req.value) return

let plugreq
try {
plugreq = c.decode(m.pluginRequest, req.value)
} catch {
return
}

const { plugin, command, payload } = plugreq

switch (command) {
case PLUGIN_COMMANDS.PUT: {
this.onput(req)
return true
}
case PLUGIN_COMMANDS.GET: {
this.onget(req)
return true
}
}

return false
}

destroy() {
// Do nothing
}

onput(req) {
if (!req.target || !req.token || !req.value) return

let val
try {
const { plugin, command, payload } = c.decode(m.pluginRequest, req.value)
val = payload
} catch {
return req.reply(null)
}

const k = req.target.toString('hex')
this.data.set(k, val)
req.reply(null)
}

onget(req) {
if (!req.target) return
const k = req.target.toString('hex')
req.reply(this.data.get(k) || null)
}

async put(val) {
const putReq = c.encode(m.pluginRequest, {
plugin: this.name,
command: PLUGIN_COMMANDS.PUT,
payload: Buffer.from(val)
})

const opts = {
map: mapTest,
commit(reply, dht) {
return dht.request(
{
token: reply.token,
target,
command: HYPERDHT_COMMANDS.PLUGIN_PERSISTENT,
value: putReq
},
reply.from
)
}
}

const target = Buffer.alloc(32)
sodium.crypto_generichash(target, Buffer.from(val))

const getReq = c.encode(m.pluginRequest, {
plugin: this.name,
command: PLUGIN_COMMANDS.GET,
payload: null
})

const query = this.dht.query(
{
target,
command: HYPERDHT_COMMANDS.PLUGIN_PERSISTENT,
value: getReq
},
opts
)

await query.finished()
return { target, closestNodes: query.closestNodes }
}

async get(target) {
const opts = { map: mapTest }

const req = c.encode(m.pluginRequest, {
plugin: this.name,
command: PLUGIN_COMMANDS.GET,
payload: null
})

const query = this.dht.query(
{
target,
command: HYPERDHT_COMMANDS.PLUGIN_PERSISTENT,
value: req
},
opts
)

for await (const node of query) {
if (node.value !== null) return node
}

return null
}
}

const { nodes } = await swarm(t, 100)
const pluginClients = []

for (const node of nodes) {
const p = new TestPlugin(node)
pluginClients.push(p)
node.register(p.name, p)
}

const put = await pluginClients[30].put('myTestValue')

t.is(put.target.length, 32)

const res = await pluginClients[30].get(put.target)
const { value } = res

t.is(value.toString(), 'myTestValue')
t.is(typeof res.from, 'object')
t.is(typeof res.from.host, 'string')
t.is(typeof res.from.port, 'number')
t.is(typeof res.to, 'object')
t.is(typeof res.to.host, 'string')
t.is(typeof res.to.port, 'number')
})
Loading