From ea55435a60d1d54ac0456db25e922bb8f14ddb01 Mon Sep 17 00:00:00 2001 From: Dominic Cassidy Date: Thu, 19 Feb 2026 22:54:24 +0200 Subject: [PATCH 1/8] Add support for encoding closest nodes in key --- build.js | 22 ++++++++ index.js | 12 +++- lib/connect.js | 20 ++++++- package.json | 1 + spec/hyperschema/index.js | 103 +++++++++++++++++++++++++++++++++++ spec/hyperschema/schema.json | 25 +++++++++ test/cache.js | 50 +++++++++++++++++ 7 files changed, 229 insertions(+), 4 deletions(-) create mode 100644 build.js create mode 100644 spec/hyperschema/index.js create mode 100644 spec/hyperschema/schema.json create mode 100644 test/cache.js diff --git a/build.js b/build.js new file mode 100644 index 00000000..aaffa1b6 --- /dev/null +++ b/build.js @@ -0,0 +1,22 @@ +const Hyperschema = require('hyperschema') + +const schema = Hyperschema.from('./spec/hyperschema') +const ns = schema.namespace('hyperdht') + +ns.register({ + name: 'key', + fields: [ + { + name: 'key', + type: 'fixed32', + required: true + }, + { + name: 'nodes', + type: 'ipv4Address', + array: true + } + ] +}) + +Hyperschema.toDisk(schema) diff --git a/index.js b/index.js index 40c86805..8171cbc2 100644 --- a/index.js +++ b/index.js @@ -12,10 +12,12 @@ const Server = require('./lib/server') const connect = require('./lib/connect') const { FIREWALL, BOOTSTRAP_NODES, KNOWN_NODES, COMMANDS } = require('./lib/constants') const { hash, createKeyPair } = require('./lib/crypto') -const { decode } = require('hypercore-id-encoding') const RawStreamSet = require('./lib/raw-stream-set') const ConnectionPool = require('./lib/connection-pool') const { STREAM_NOT_CONNECTED } = require('./lib/errors') +const { getEncoding } = require('./spec/hyperschema/index.js') + +const KeyEncoding = getEncoding('@hyperdht/key') const DEFAULTS = { ...DHT.DEFAULTS, @@ -77,8 +79,12 @@ class HyperDHT extends DHT { static DEFAULTS = DEFAULTS - connect(remotePublicKey, opts) { - return connect(this, decode(remotePublicKey), opts) + static EncodeKey(key, nodes = []) { + return c.encode(KeyEncoding, { key, nodes }) + } + + connect(remotePublicKey, opts = {}) { + return connect(this, remotePublicKey, opts) } createServer(opts, onconnection) { diff --git a/lib/connect.js b/lib/connect.js index bad3c896..ebb90c9a 100644 --- a/lib/connect.js +++ b/lib/connect.js @@ -28,14 +28,32 @@ const { RELAY_ABORTED, SUSPENDED } = require('./errors') +const { decode } = require('hypercore-id-encoding') +const { getEncoding } = require('../spec/hyperschema/index.js') +const cenc = require('compact-encoding') + +const KeyEncoding = getEncoding('@hyperdht/key') module.exports = function connect(dht, publicKey, opts = {}) { const pool = opts.pool || null + let providedNodes = null + if (b4a.isBuffer(publicKey) && publicKey.length > 32) { + const { key, nodes } = cenc.decode(KeyEncoding, publicKey) + publicKey = key + providedNodes = nodes + } else { + publicKey = decode(publicKey) + } + if (pool && pool.has(publicKey)) return pool.get(publicKey) publicKey = unslab(publicKey) + const relayAddresses = + opts.relayAddresses && providedNodes + ? [...providedNodes, ...opts.relayAddresses] + : providedNodes || opts.relayAddresses || [] const keyPair = opts.keyPair || dht.defaultKeyPair const relayThrough = selectRelay(opts.relayThrough || null) const encryptedSocket = (opts.createSecretStream || defaultCreateSecretStream)(true, null, { @@ -58,7 +76,7 @@ module.exports = function connect(dht, publicKey, opts = {}) { id, dht, session: dht.session(), - relayAddresses: opts.relayAddresses || [], + relayAddresses, remoteRelayAddresses: [], pool, round: 0, diff --git a/package.json b/package.json index 8eb19986..94de8766 100644 --- a/package.json +++ b/package.json @@ -35,6 +35,7 @@ "dht-rpc": "^6.15.1", "hypercore-crypto": "^3.3.0", "hypercore-id-encoding": "^1.2.0", + "hyperschema": "^1.20.1", "noise-curve-ed": "^2.0.0", "noise-handshake": "^4.0.0", "record-cache": "^1.1.1", diff --git a/spec/hyperschema/index.js b/spec/hyperschema/index.js new file mode 100644 index 00000000..4f04e989 --- /dev/null +++ b/spec/hyperschema/index.js @@ -0,0 +1,103 @@ +// This file is autogenerated by the hyperschema compiler +// Schema Version: 1 +/* eslint-disable camelcase */ +/* eslint-disable quotes */ +/* eslint-disable space-before-function-paren */ + +const { c } = require('hyperschema/runtime') + +const VERSION = 1 + +// eslint-disable-next-line no-unused-vars +let version = VERSION + +// @hyperdht/key.nodes +const encoding0_1 = c.array(c.ipv4Address) + +// @hyperdht/key +const encoding0 = { + preencode(state, m) { + c.fixed32.preencode(state, m.key) + state.end++ // max flag is 1 so always one byte + + if (m.nodes) encoding0_1.preencode(state, m.nodes) + }, + encode(state, m) { + const flags = m.nodes ? 1 : 0 + + c.fixed32.encode(state, m.key) + c.uint.encode(state, flags) + + if (m.nodes) encoding0_1.encode(state, m.nodes) + }, + decode(state) { + const r0 = c.fixed32.decode(state) + const flags = c.uint.decode(state) + + return { + key: r0, + nodes: (flags & 1) !== 0 ? encoding0_1.decode(state) : null + } + } +} + +function setVersion(v) { + version = v +} + +function encode(name, value, v = VERSION) { + version = v + return c.encode(getEncoding(name), value) +} + +function decode(name, buffer, v = VERSION) { + version = v + return c.decode(getEncoding(name), buffer) +} + +function getEnum(name) { + switch (name) { + default: + throw new Error('Enum not found ' + name) + } +} + +function getEncoding(name) { + switch (name) { + case '@hyperdht/key': + return encoding0 + default: + throw new Error('Encoder not found ' + name) + } +} + +function getStruct(name, v = VERSION) { + const enc = getEncoding(name) + return { + preencode(state, m) { + version = v + enc.preencode(state, m) + }, + encode(state, m) { + version = v + enc.encode(state, m) + }, + decode(state) { + version = v + return enc.decode(state) + } + } +} + +const resolveStruct = getStruct // compat + +module.exports = { + resolveStruct, + getStruct, + getEnum, + getEncoding, + encode, + decode, + setVersion, + version +} diff --git a/spec/hyperschema/schema.json b/spec/hyperschema/schema.json new file mode 100644 index 00000000..adf9f191 --- /dev/null +++ b/spec/hyperschema/schema.json @@ -0,0 +1,25 @@ +{ + "version": 1, + "schema": [ + { + "name": "key", + "namespace": "hyperdht", + "compact": false, + "flagsPosition": 1, + "fields": [ + { + "name": "key", + "required": true, + "type": "fixed32", + "version": 1 + }, + { + "name": "nodes", + "array": true, + "type": "ipv4Address", + "version": 1 + } + ] + } + ] +} diff --git a/test/cache.js b/test/cache.js new file mode 100644 index 00000000..a7cccf7b --- /dev/null +++ b/test/cache.js @@ -0,0 +1,50 @@ +const test = require('brittle') +const { swarm, toArray } = require('./helpers') +const DHT = require('../') +const HyperDHT = require('../') + +test('createServer + connect - once defaults', async function (t) { + t.plan(2) + + const [a, b] = await swarm(t) + const lc = t.test('socket lifecycle') + + lc.plan(4) + + const server = a.createServer(function (socket) { + lc.pass('server side opened') + + socket.once('end', function () { + lc.pass('server side ended') + socket.end() + }) + }) + + await server.listen() + + const q = b.findPeer(server.publicKey) + const result = await toArray(q) + const target = HyperDHT.EncodeKey(server.publicKey, [result[0].to]) + + console.log(target) + + const socket = b.connect(target) + + socket.once('open', function () { + lc.pass('client side opened') + }) + + socket.once('end', function () { + lc.pass('client side ended') + }) + + socket.end() + + await lc + + server.on('close', function () { + t.pass('server closed') + }) + + await server.close() +}) From 9ead92d64bff92f0e9f955a5c5cf62d70aaf9241 Mon Sep 17 00:00:00 2001 From: Dominic Cassidy Date: Fri, 20 Feb 2026 14:13:32 +0200 Subject: [PATCH 2/8] flip order of providedNodes --- lib/connect.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/connect.js b/lib/connect.js index ebb90c9a..cd1d71dc 100644 --- a/lib/connect.js +++ b/lib/connect.js @@ -52,7 +52,7 @@ module.exports = function connect(dht, publicKey, opts = {}) { const relayAddresses = opts.relayAddresses && providedNodes - ? [...providedNodes, ...opts.relayAddresses] + ? [...opts.relayAddresses, ...providedNodes] : providedNodes || opts.relayAddresses || [] const keyPair = opts.keyPair || dht.defaultKeyPair const relayThrough = selectRelay(opts.relayThrough || null) From bf406c4fbe5f703bfc64e3386dff58653270d8fa Mon Sep 17 00:00:00 2001 From: Dominic Cassidy Date: Fri, 20 Feb 2026 14:18:19 +0200 Subject: [PATCH 3/8] simplify test --- test/cache.js | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/test/cache.js b/test/cache.js index a7cccf7b..221e21a2 100644 --- a/test/cache.js +++ b/test/cache.js @@ -22,11 +22,9 @@ test('createServer + connect - once defaults', async function (t) { await server.listen() - const q = b.findPeer(server.publicKey) - const result = await toArray(q) - const target = HyperDHT.EncodeKey(server.publicKey, [result[0].to]) - - console.log(target) + const target = HyperDHT.EncodeKey(server.publicKey, [ + { host: b.io._boundServerPort, port: b.io._boundServerPort } + ]) const socket = b.connect(target) From 2f73b616f62a180a36aa68b3279b492f1019e197 Mon Sep 17 00:00:00 2001 From: Dominic Cassidy Date: Fri, 20 Feb 2026 14:26:17 +0200 Subject: [PATCH 4/8] tidy --- lib/connect.js | 13 +++++-------- spec/hyperschema/index.js | 14 +++++++------- spec/hyperschema/schema.json | 4 ++-- 3 files changed, 14 insertions(+), 17 deletions(-) diff --git a/lib/connect.js b/lib/connect.js index cd1d71dc..f17cb701 100644 --- a/lib/connect.js +++ b/lib/connect.js @@ -37,14 +37,11 @@ const KeyEncoding = getEncoding('@hyperdht/key') module.exports = function connect(dht, publicKey, opts = {}) { const pool = opts.pool || null - let providedNodes = null - if (b4a.isBuffer(publicKey) && publicKey.length > 32) { - const { key, nodes } = cenc.decode(KeyEncoding, publicKey) - publicKey = key - providedNodes = nodes - } else { - publicKey = decode(publicKey) - } + const { key, nodes: providedNodes } = cenc.decode( + KeyEncoding, + b4a.isBuffer(publicKey) ? publicKey : decode(publicKey) + ) + publicKey = key if (pool && pool.has(publicKey)) return pool.get(publicKey) diff --git a/spec/hyperschema/index.js b/spec/hyperschema/index.js index 4f04e989..2ab92d43 100644 --- a/spec/hyperschema/index.js +++ b/spec/hyperschema/index.js @@ -1,12 +1,12 @@ // This file is autogenerated by the hyperschema compiler -// Schema Version: 1 +// Schema Version: 2 /* eslint-disable camelcase */ /* eslint-disable quotes */ /* eslint-disable space-before-function-paren */ const { c } = require('hyperschema/runtime') -const VERSION = 1 +const VERSION = 2 // eslint-disable-next-line no-unused-vars let version = VERSION @@ -20,23 +20,23 @@ const encoding0 = { c.fixed32.preencode(state, m.key) state.end++ // max flag is 1 so always one byte - if (m.nodes) encoding0_1.preencode(state, m.nodes) + if (version >= 2 && m.nodes) encoding0_1.preencode(state, m.nodes) }, encode(state, m) { - const flags = m.nodes ? 1 : 0 + const flags = version >= 2 && m.nodes ? 1 : 0 c.fixed32.encode(state, m.key) c.uint.encode(state, flags) - if (m.nodes) encoding0_1.encode(state, m.nodes) + if (version >= 2 && m.nodes) encoding0_1.encode(state, m.nodes) }, decode(state) { const r0 = c.fixed32.decode(state) - const flags = c.uint.decode(state) + const flags = state.start < state.end ? c.uint.decode(state) : 0 return { key: r0, - nodes: (flags & 1) !== 0 ? encoding0_1.decode(state) : null + nodes: version >= 2 && (flags & 1) !== 0 ? encoding0_1.decode(state) : null } } } diff --git a/spec/hyperschema/schema.json b/spec/hyperschema/schema.json index adf9f191..662c71c2 100644 --- a/spec/hyperschema/schema.json +++ b/spec/hyperschema/schema.json @@ -1,5 +1,5 @@ { - "version": 1, + "version": 2, "schema": [ { "name": "key", @@ -17,7 +17,7 @@ "name": "nodes", "array": true, "type": "ipv4Address", - "version": 1 + "version": 2 } ] } From 3a0da116fbcb8784b2288c7b4c30283cd24231a1 Mon Sep 17 00:00:00 2001 From: Dominic Cassidy Date: Tue, 10 Mar 2026 23:53:29 +0200 Subject: [PATCH 5/8] Swap to using hyperdht-address --- build.js | 22 -------- index.js | 7 --- lib/connect.js | 13 +---- package.json | 1 + spec/hyperschema/index.js | 103 ----------------------------------- spec/hyperschema/schema.json | 25 --------- test/cache.js | 6 +- 7 files changed, 7 insertions(+), 170 deletions(-) delete mode 100644 build.js delete mode 100644 spec/hyperschema/index.js delete mode 100644 spec/hyperschema/schema.json diff --git a/build.js b/build.js deleted file mode 100644 index aaffa1b6..00000000 --- a/build.js +++ /dev/null @@ -1,22 +0,0 @@ -const Hyperschema = require('hyperschema') - -const schema = Hyperschema.from('./spec/hyperschema') -const ns = schema.namespace('hyperdht') - -ns.register({ - name: 'key', - fields: [ - { - name: 'key', - type: 'fixed32', - required: true - }, - { - name: 'nodes', - type: 'ipv4Address', - array: true - } - ] -}) - -Hyperschema.toDisk(schema) diff --git a/index.js b/index.js index 8171cbc2..f348412a 100644 --- a/index.js +++ b/index.js @@ -15,9 +15,6 @@ const { hash, createKeyPair } = require('./lib/crypto') const RawStreamSet = require('./lib/raw-stream-set') const ConnectionPool = require('./lib/connection-pool') const { STREAM_NOT_CONNECTED } = require('./lib/errors') -const { getEncoding } = require('./spec/hyperschema/index.js') - -const KeyEncoding = getEncoding('@hyperdht/key') const DEFAULTS = { ...DHT.DEFAULTS, @@ -79,10 +76,6 @@ class HyperDHT extends DHT { static DEFAULTS = DEFAULTS - static EncodeKey(key, nodes = []) { - return c.encode(KeyEncoding, { key, nodes }) - } - connect(remotePublicKey, opts = {}) { return connect(this, remotePublicKey, opts) } diff --git a/lib/connect.js b/lib/connect.js index f17cb701..e3f04a20 100644 --- a/lib/connect.js +++ b/lib/connect.js @@ -29,16 +29,12 @@ const { SUSPENDED } = require('./errors') const { decode } = require('hypercore-id-encoding') -const { getEncoding } = require('../spec/hyperschema/index.js') -const cenc = require('compact-encoding') - -const KeyEncoding = getEncoding('@hyperdht/key') +const HyperDHTAddress = require('hyperdht-address') module.exports = function connect(dht, publicKey, opts = {}) { const pool = opts.pool || null - const { key, nodes: providedNodes } = cenc.decode( - KeyEncoding, + const { key, nodes: providedNodes } = HyperDHTAddress.decode( b4a.isBuffer(publicKey) ? publicKey : decode(publicKey) ) publicKey = key @@ -47,10 +43,7 @@ module.exports = function connect(dht, publicKey, opts = {}) { publicKey = unslab(publicKey) - const relayAddresses = - opts.relayAddresses && providedNodes - ? [...opts.relayAddresses, ...providedNodes] - : providedNodes || opts.relayAddresses || [] + const relayAddresses = opts.relayAddresses || providedNodes || [] const keyPair = opts.keyPair || dht.defaultKeyPair const relayThrough = selectRelay(opts.relayThrough || null) const encryptedSocket = (opts.createSecretStream || defaultCreateSecretStream)(true, null, { diff --git a/package.json b/package.json index 94de8766..0dcd243e 100644 --- a/package.json +++ b/package.json @@ -35,6 +35,7 @@ "dht-rpc": "^6.15.1", "hypercore-crypto": "^3.3.0", "hypercore-id-encoding": "^1.2.0", + "hyperdht-address": "^1.0.1", "hyperschema": "^1.20.1", "noise-curve-ed": "^2.0.0", "noise-handshake": "^4.0.0", diff --git a/spec/hyperschema/index.js b/spec/hyperschema/index.js deleted file mode 100644 index 2ab92d43..00000000 --- a/spec/hyperschema/index.js +++ /dev/null @@ -1,103 +0,0 @@ -// This file is autogenerated by the hyperschema compiler -// Schema Version: 2 -/* eslint-disable camelcase */ -/* eslint-disable quotes */ -/* eslint-disable space-before-function-paren */ - -const { c } = require('hyperschema/runtime') - -const VERSION = 2 - -// eslint-disable-next-line no-unused-vars -let version = VERSION - -// @hyperdht/key.nodes -const encoding0_1 = c.array(c.ipv4Address) - -// @hyperdht/key -const encoding0 = { - preencode(state, m) { - c.fixed32.preencode(state, m.key) - state.end++ // max flag is 1 so always one byte - - if (version >= 2 && m.nodes) encoding0_1.preencode(state, m.nodes) - }, - encode(state, m) { - const flags = version >= 2 && m.nodes ? 1 : 0 - - c.fixed32.encode(state, m.key) - c.uint.encode(state, flags) - - if (version >= 2 && m.nodes) encoding0_1.encode(state, m.nodes) - }, - decode(state) { - const r0 = c.fixed32.decode(state) - const flags = state.start < state.end ? c.uint.decode(state) : 0 - - return { - key: r0, - nodes: version >= 2 && (flags & 1) !== 0 ? encoding0_1.decode(state) : null - } - } -} - -function setVersion(v) { - version = v -} - -function encode(name, value, v = VERSION) { - version = v - return c.encode(getEncoding(name), value) -} - -function decode(name, buffer, v = VERSION) { - version = v - return c.decode(getEncoding(name), buffer) -} - -function getEnum(name) { - switch (name) { - default: - throw new Error('Enum not found ' + name) - } -} - -function getEncoding(name) { - switch (name) { - case '@hyperdht/key': - return encoding0 - default: - throw new Error('Encoder not found ' + name) - } -} - -function getStruct(name, v = VERSION) { - const enc = getEncoding(name) - return { - preencode(state, m) { - version = v - enc.preencode(state, m) - }, - encode(state, m) { - version = v - enc.encode(state, m) - }, - decode(state) { - version = v - return enc.decode(state) - } - } -} - -const resolveStruct = getStruct // compat - -module.exports = { - resolveStruct, - getStruct, - getEnum, - getEncoding, - encode, - decode, - setVersion, - version -} diff --git a/spec/hyperschema/schema.json b/spec/hyperschema/schema.json deleted file mode 100644 index 662c71c2..00000000 --- a/spec/hyperschema/schema.json +++ /dev/null @@ -1,25 +0,0 @@ -{ - "version": 2, - "schema": [ - { - "name": "key", - "namespace": "hyperdht", - "compact": false, - "flagsPosition": 1, - "fields": [ - { - "name": "key", - "required": true, - "type": "fixed32", - "version": 1 - }, - { - "name": "nodes", - "array": true, - "type": "ipv4Address", - "version": 2 - } - ] - } - ] -} diff --git a/test/cache.js b/test/cache.js index 221e21a2..2bed57e8 100644 --- a/test/cache.js +++ b/test/cache.js @@ -1,6 +1,6 @@ const test = require('brittle') -const { swarm, toArray } = require('./helpers') -const DHT = require('../') +const { swarm } = require('./helpers') +const HyperDHTAddress = require('hyperdht-address') const HyperDHT = require('../') test('createServer + connect - once defaults', async function (t) { @@ -22,7 +22,7 @@ test('createServer + connect - once defaults', async function (t) { await server.listen() - const target = HyperDHT.EncodeKey(server.publicKey, [ + const target = HyperDHTAddress.encode(server.publicKey, [ { host: b.io._boundServerPort, port: b.io._boundServerPort } ]) From 6e968ccbf0a28d3d80bd77778c701354c5c078a8 Mon Sep 17 00:00:00 2001 From: Dominic Cassidy Date: Wed, 11 Mar 2026 00:14:01 +0200 Subject: [PATCH 6/8] update test --- test/all.js | 1 + test/cache.js | 42 +++++++++++++++++++++++------------------- 2 files changed, 24 insertions(+), 19 deletions(-) diff --git a/test/all.js b/test/all.js index 6c5c24f7..a74f2446 100644 --- a/test/all.js +++ b/test/all.js @@ -8,6 +8,7 @@ async function runTests() { test.pause() await import('./announces.js') + await import('./cache.js') await import('./connections.js') await import('./holepuncher.js') await import('./lifecycle.js') diff --git a/test/cache.js b/test/cache.js index 2bed57e8..a64bc91e 100644 --- a/test/cache.js +++ b/test/cache.js @@ -1,44 +1,48 @@ const test = require('brittle') const { swarm } = require('./helpers') const HyperDHTAddress = require('hyperdht-address') -const HyperDHT = require('../') -test('createServer + connect - once defaults', async function (t) { - t.plan(2) +test('cache - key with nodes', async function (t) { + t.plan(3) const [a, b] = await swarm(t) - const lc = t.test('socket lifecycle') + const ts = t.test('server') - lc.plan(4) + ts.plan(2) const server = a.createServer(function (socket) { - lc.pass('server side opened') + ts.pass('server side opened') socket.once('end', function () { - lc.pass('server side ended') + ts.pass('server side ended') socket.end() }) }) await server.listen() - const target = HyperDHTAddress.encode(server.publicKey, [ - { host: b.io._boundServerPort, port: b.io._boundServerPort } - ]) + { + const tn = t.test('client w/nodes') + tn.plan(2) - const socket = b.connect(target) + const target = HyperDHTAddress.encode(server.publicKey, [ + { host: b.io._boundServerPort, port: b.io._boundServerPort } + ]) - socket.once('open', function () { - lc.pass('client side opened') - }) + const socket = b.connect(target) - socket.once('end', function () { - lc.pass('client side ended') - }) + socket.once('open', function () { + tn.pass('client side opened') + }) + + socket.once('end', function () { + tn.pass('client side ended') + }) - socket.end() + socket.end() - await lc + await tn + } server.on('close', function () { t.pass('server closed') From 1b320fa625354537ca63556540de42c3b0b77700 Mon Sep 17 00:00:00 2001 From: Dominic Cassidy Date: Fri, 27 Mar 2026 11:19:29 +0000 Subject: [PATCH 7/8] review fixes --- index.js | 2 +- lib/connect.js | 4 ++-- package.json | 1 - test/cache.js | 4 +--- 4 files changed, 4 insertions(+), 7 deletions(-) diff --git a/index.js b/index.js index f348412a..31a09481 100644 --- a/index.js +++ b/index.js @@ -76,7 +76,7 @@ class HyperDHT extends DHT { static DEFAULTS = DEFAULTS - connect(remotePublicKey, opts = {}) { + connect(remotePublicKey, opts) { return connect(this, remotePublicKey, opts) } diff --git a/lib/connect.js b/lib/connect.js index e3f04a20..1aba2acb 100644 --- a/lib/connect.js +++ b/lib/connect.js @@ -43,7 +43,7 @@ module.exports = function connect(dht, publicKey, opts = {}) { publicKey = unslab(publicKey) - const relayAddresses = opts.relayAddresses || providedNodes || [] + opts.relayAddresses = opts.relayAddresses || providedNodes || [] const keyPair = opts.keyPair || dht.defaultKeyPair const relayThrough = selectRelay(opts.relayThrough || null) const encryptedSocket = (opts.createSecretStream || defaultCreateSecretStream)(true, null, { @@ -66,7 +66,7 @@ module.exports = function connect(dht, publicKey, opts = {}) { id, dht, session: dht.session(), - relayAddresses, + relayAddresses: opts.relayAddresses, remoteRelayAddresses: [], pool, round: 0, diff --git a/package.json b/package.json index 0dcd243e..1e5cc477 100644 --- a/package.json +++ b/package.json @@ -36,7 +36,6 @@ "hypercore-crypto": "^3.3.0", "hypercore-id-encoding": "^1.2.0", "hyperdht-address": "^1.0.1", - "hyperschema": "^1.20.1", "noise-curve-ed": "^2.0.0", "noise-handshake": "^4.0.0", "record-cache": "^1.1.1", diff --git a/test/cache.js b/test/cache.js index a64bc91e..6909dfbb 100644 --- a/test/cache.js +++ b/test/cache.js @@ -25,9 +25,7 @@ test('cache - key with nodes', async function (t) { const tn = t.test('client w/nodes') tn.plan(2) - const target = HyperDHTAddress.encode(server.publicKey, [ - { host: b.io._boundServerPort, port: b.io._boundServerPort } - ]) + const target = HyperDHTAddress.encode(server.publicKey, server.relayAddresses) const socket = b.connect(target) From f142fba9d142355cb0921cff080f1b1a1eeb1d2c Mon Sep 17 00:00:00 2001 From: Sean Zellmer Date: Wed, 8 Apr 2026 16:18:12 -0500 Subject: [PATCH 8/8] Pre-connect to `closestNodes` in parallel to `FIND_PEER` query (#245) * Use closestNodes to connect in parallel to findPeer query Given `relayAddresses` (referred to as `closestNodes` in `dht-rpc`) are passed when connecting, these nodes could be used to connect through immediately while the normal `FIND_PEER` query is run. The closestNodes are likely candidates for connecting to a specific peer so attempting connection can speed up connection by skipping querying the DHT. To accommodate the extra connection, the total connections allowed is increased by 1 when closestNodes are passed. If a closestNode is returned as part of the findPeer query, then connection is skipped assuming that the preConnect parallel approach will establish the connection. * Add closestNode address to remoteRelayAddresses when attempting In the same way that connectThroughNode may work for a node returned via a `FIND_PEER` query, closestNodes attempted as part of preconnect should also be tracked for caching. * Add closestNode address to remoteRelayAddresses when attempting In the same way that connectThroughNode may work for a node returned via a `FIND_PEER` query, closestNodes attempted as part of preconnect should also be tracked for caching. * Check that `closestNodes` is set before skipping node in `FIND_PEER` `preConnect` can be `true` while `closestNodes` is `null` when the `closestNodes` were set initially but then cleared when the query makes a second attempt. * Filter out double `open` events in server side pool test This test previously wasn't likely to emit two `open` events, but since the pre-connect feature was added, if the client has the servers `id` in it's cache and populates the `closestNodes` with it, it will immediately connect to the server via the relay node simultaneously. This allows the encrypted stream to `open` twice instead of the normal 1 time because the connection to the relay has opened but the connection from the relay to the server will error when a duplicate connection is detected. * Split out logic to detect if `FIND_PEER` node is an existing `closestNode` Moved to a function with a simple for loop instead of using `.find()`. * Use `preConnect` bool to set number of semaphore waits Without this check, 3 query nodes would attempt connecting at once if `closestNodes` is an empty array. Pre-connnect wouldn't run so they should be the same. * Rename `closestNodes` to `relayAddresses` in `findAndConnect()` Often `closestNodes` & `relayAddresses` are the same, but they are distinct. `closestNodes` are DHT nodes that are closest (via XOR metric) to the target. `relayAddresses` are nodes that will serve as the relay node for doing the handshake with a given peer. --- lib/connect.js | 49 +++++++++++++++++++++++++++++++++++++++---------- test/pool.js | 8 ++++++++ 2 files changed, 47 insertions(+), 10 deletions(-) diff --git a/lib/connect.js b/lib/connect.js index 1aba2acb..3dc18fdc 100644 --- a/lib/connect.js +++ b/lib/connect.js @@ -323,36 +323,51 @@ async function holepunch(c, opts) { } } +async function connectThroughNodes(c, addresses, socket) { + for (const address of addresses) { + if (isDone(c) || c.connect) return + + c.remoteRelayAddresses.push(address) + await connectThroughNode(c, address, socket) + } +} + async function findAndConnect(c, opts) { let attempts = 0 - let closestNodes = opts.relayAddresses && opts.relayAddresses.length ? opts.relayAddresses : null + let relayAddresses = + opts.relayAddresses && opts.relayAddresses.length ? opts.relayAddresses : null - if (!closestNodes) { + if (!relayAddresses) { const cachedRelayAddresses = c.dht._relayAddressesCache.get(c.id) - if (cachedRelayAddresses) closestNodes = cachedRelayAddresses + if (cachedRelayAddresses) relayAddresses = cachedRelayAddresses } if (c.dht._persistent) { // check if we know the route ourself... const route = c.dht._router.get(c.target) if (route && route.relay !== null) { - closestNodes = [{ host: route.relay.host, port: route.relay.port }] + relayAddresses = [{ host: route.relay.host, port: route.relay.port }] } } // 2 is how many parallel connect attempts we want to do, we can make this configurable - const sem = new Semaphore(2) + const preConnect = relayAddresses !== null && relayAddresses.length > 0 + const sem = new Semaphore(preConnect ? 3 : 2) const signal = sem.signal.bind(sem) - const tries = closestNodes !== null ? 2 : 1 + const tries = relayAddresses !== null ? 2 : 1 + + if (preConnect) { + await sem.wait() + connectThroughNodes(c, relayAddresses, null).then(signal, signal) + } try { for (let i = 0; i < tries && !isDone(c) && !c.connect; i++) { c.query = c.dht.findPeer(c.target, { hash: false, session: c.session, - closestNodes, - onlyClosestNodes: closestNodes !== null, - retries: closestNodes ? 1 : 3 + nodes: relayAddresses, + retries: 3 }) for await (const data of c.query) { @@ -364,12 +379,18 @@ async function findAndConnect(c, opts) { break } + // Skip node already run via preConnect + if (preConnect && relayAddresses && isRelayAddress(relayAddresses, data)) { + sem.signal() + continue + } + c.remoteRelayAddresses.push(data.from) attempts++ connectThroughNode(c, data.from, null).then(signal, signal) } - closestNodes = null + relayAddresses = null if (attempts > 0) await sem.flush() } @@ -855,4 +876,12 @@ function selectRelay(relayThrough) { return relayThrough } +function isRelayAddress(relayAddresses, data) { + for (const node of relayAddresses) { + if (node.host === data.from.host && node.port === data.from.port) return true + } + + return false +} + function noop() {} diff --git a/test/pool.js b/test/pool.js index bcc8b1c2..6712e1ea 100644 --- a/test/pool.js +++ b/test/pool.js @@ -46,12 +46,16 @@ test('connection pool, server side', async function (t) { const open = t.test('open') open.plan(2) + let atLeastOneOpen = false { const socket = b.connect(server.publicKey) socket .on('open', () => { + if (atLeastOneOpen) return + open.pass('1st stream opened') + atLeastOneOpen = true }) .on('error', () => { open.pass('1st stream errored') @@ -62,7 +66,10 @@ test('connection pool, server side', async function (t) { const socket = b.connect(server.publicKey) socket .on('open', () => { + if (atLeastOneOpen) return + open.pass('2nd stream opened') + atLeastOneOpen = true }) .on('error', () => { open.pass('2nd stream errored') @@ -71,6 +78,7 @@ test('connection pool, server side', async function (t) { } await open + t.ok(atLeastOneOpen, 'verify one client opened') await server.close() })