diff --git a/index.js b/index.js index 707aac0..717d586 100644 --- a/index.js +++ b/index.js @@ -1,6 +1,7 @@ const BSON = require('bson') -const { ObjectID } = BSON +const { ObjectId } = BSON const cbor = require('cbor') +const SubEncoder = require('sub-encoder') // Version of the indexing algorithm // Will be incremented for breaking changes @@ -67,9 +68,29 @@ class Collection { constructor (name, bee) { this.name = name this.bee = bee - this.docs = bee.sub('doc') - this.idxs = bee.sub('idxs') - this.idx = bee.sub('idx') + this.enc = new SubEncoder() + + this.idxEncoding = this.enc.sub('idx') + this.idxsEncoding = this.enc.sub('idxs') + this.docsEncoding = this.enc.sub('docs') + + // this.watching() + } + + // watching () { + // this.watcher = this.bee.watch({ keyEncoding: this.docsEncoding }) + // } + + async insertOne (data) { + const doc = await this.insert(data) + return Object.assign(data, doc) + } + + async insertMany (data) { + for (const item of data) { + await this.insertOne(item) + } + return data } async insert (rawDoc) { @@ -78,33 +99,43 @@ class Collection { if (!doc._id) { doc = { ...doc, - _id: new ObjectID() + _id: new ObjectId() } } // Get _id as buffer const key = doc._id.id - const exists = await this.docs.get(key) + const exists = await this.bee.get(key, { keyEncoding: this.docsEncoding }) if (exists) throw new Error('Duplicate Key error, try using .update?') const value = BSON.serialize(doc) - await this.docs.put(key, value) + await this.bee.put(key, value, { keyEncoding: this.docsEncoding }) const indexes = await this.listIndexes() for (const { fields, name } of indexes) { // TODO: Cache index subs - const bee = this.idx.sub(name) - await this._indexDocument(bee, fields, doc) + const enc = this.idxEncoding.sub(name) + await this._indexDocument(enc, fields, doc) } return doc } + async updateOne (query = {}, data, options) { + await this.update(query, data, options) + return this.findOne(query) + } + + async updateMany (query = {}, data, options) { + await this.update(query, data, options) + return this.findMany(query) + } + async update (query = {}, update = {}, options = {}) { const { upsert = false, @@ -130,14 +161,14 @@ class Collection { const key = doc._id.id const value = BSON.serialize(newDoc) - await this.docs.put(key, value) + await this.bee.put(key, value, { keyEncoding: this.docsEncoding }) for (const { fields, name } of indexes) { // TODO: Cache index subs - const bee = this.idx.sub(name) + const enc = this.idxEncoding.sub(name) - await this._deIndexDocument(bee, fields, doc) - await this._indexDocument(bee, fields, newDoc) + await this._deIndexDocument(enc, fields, doc) + await this._indexDocument(enc, fields, newDoc) } nModified++ } @@ -146,7 +177,7 @@ class Collection { const initialDoc = {} for (const queryField of Object.keys(query)) { const queryValue = query[queryField] - if ('$eq' in queryValue) initialDoc[queryField] = queryValue.$eq + if (isQueryObject(queryValue) && '$eq' in queryValue) initialDoc[queryField] = queryValue.$eq else if (!isQueryObject(queryValue)) initialDoc[queryField] = queryValue } @@ -165,18 +196,24 @@ class Collection { async findOne (query = {}) { const results = await (this.find(query).limit(1)) - const [doc] = results + return doc || null + } - if (!doc) throw new Error('not found') - - return doc + findMany (query, { sort = null } = {}) { + const q = this.find(query) + return sort ? q.sort(...parseSort(sort)) : q } find (query = {}) { return new Cursor(query, this) } + watch (_id, opts) { + // const key = _id.id + // return this.docsEncoding.watch() + } + async createIndex (fields, { rebuild = false, version = INDEX_VERSION, ...opts } = {}) { const name = fields.join(',') const exists = await this.indexExists(name) @@ -197,7 +234,7 @@ class Collection { opts } - await this.idxs.put(name, BSON.serialize(index)) + await this.bee.put(name, BSON.serialize(index), { keyEncoding: this.idxsEncoding }) await this.reIndex(name) @@ -205,12 +242,12 @@ class Collection { } async indexExists (name) { - const exists = await this.idxs.get(name) + const exists = await this.bee.get(name, { keyEncoding: this.idxsEncoding }) return exists !== null } async getIndex (name) { - const data = await this.idxs.get(name) + const data = await this.bee.get(name, { keyEncoding: this.idxsEncoding }) if (!data) throw new Error('Invalid index') return BSON.deserialize(data.value) } @@ -218,36 +255,36 @@ class Collection { async reIndex (name) { const { fields } = await this.getIndex(name) // TODO: Cache index subs - const bee = this.idx.sub(name) + const enc = this.idxEncoding.sub(name) for await (const doc of this.find()) { - await this._indexDocument(bee, fields, doc) + await this._indexDocument(enc, fields, doc) } } // This is a private API, don't depend on it - async _indexDocument (bee, fields, doc) { + async _indexDocument (enc, fields, doc) { if (!hasFields(doc, fields)) return const idxValue = doc._id.id - const batch = bee.batch() + const batch = this.bee.batch() for (const flattened of flattenDocument(doc, fields)) { const idxKey = makeIndexKeyV2(flattened, fields) - await batch.put(idxKey, idxValue) + await batch.put(idxKey, idxValue, { keyEncoding: enc }) } await batch.flush() } - async _deIndexDocument (bee, fields, doc) { + async _deIndexDocument (enc, fields, doc) { if (!hasFields(doc, fields)) return - const batch = bee.batch() + const batch = this.bee.batch() for (const flattened of flattenDocument(doc, fields)) { const idxKey = makeIndexKeyV2(flattened, fields) - await batch.del(idxKey) + await batch.del(idxKey, { keyEncoding: enc }) } await batch.flush() @@ -255,7 +292,7 @@ class Collection { // TODO: Cache indexes? async listIndexes () { - const stream = this.idxs.createReadStream() + const stream = this.bee.createReadStream({ keyEncoding: this.idxsEncoding }) const indexes = [] for await (const { value } of stream) { @@ -401,11 +438,11 @@ class Cursor { } async * [Symbol.asyncIterator] () { - if (this.query._id && (this.query._id instanceof ObjectID)) { + if (this.query._id && (this.query._id instanceof ObjectId)) { // Doc IDs are unique, so we can query against them without doing a search const key = this.query._id.id - const found = await this.collection.docs.get(key) + const found = await this.collection.bee.get(key, { keyEncoding: this.collection.docsEncoding }) // Exit premaurely @@ -495,7 +532,8 @@ class Cursor { lt[lt.length - 1] = 0xFF } - const stream = this.collection.idx.sub(index.name).createReadStream(opts) + const enc = this.collection.idxEncoding.sub(index.name) + const stream = this.collection.bee.createReadStream({ keyEncoding: enc, ...opts }) for await (const { key, value: rawId } of stream) { const keyDoc = makeDocFromIndex(key, index.fields) @@ -503,7 +541,7 @@ class Cursor { // Test the fields agains the index to avoid fetching the doc if (!matchesQuery(keyDoc, subQuery)) continue - const { value: rawDoc } = await this.collection.docs.get(rawId) + const { value: rawDoc } = await this.collection.bee.get(rawId, { keyEncoding: this.collection.docsEncoding }) const doc = BSON.deserialize(rawDoc) // TODO: Avoid needing to double-process the values @@ -514,7 +552,7 @@ class Cursor { } } else if (sort === null) { // If we aren't sorting, and don't have an index, iterate over all docs - const stream = this.collection.docs.createReadStream() + const stream = this.collection.bee.createReadStream({ keyEncoding: this.collection.docsEncoding }) for await (const { value: rawDoc } of stream) { // TODO: Can we avoid iterating over keys that should be skipped? @@ -531,6 +569,18 @@ class Cursor { } } +function parseSort (key = '') { + if (Array.isArray(key)) { + const keys = key.map(parseSort) + return Object.assign(...keys) + } + + const [sign] = key.match(/^[+-]/) || [] + if (sign) key = key.substring(1) + const dir = sign === '-' ? -1 : 1 + return [key, dir] +} + function performUpdate (doc, update) { if (Array.isArray(update)) { return update.reduce(performUpdate, doc) @@ -773,8 +823,8 @@ function makeIndexKeyV2 (doc, fields, allFields = fields) { // CBOR encode fields const keyValues = fields.map((field) => { const value = doc[field] - // Detect ObjectID - if (value instanceof ObjectID) { + // Detect ObjectId + if (value instanceof ObjectId) { return value.id } return value @@ -809,7 +859,7 @@ function makeDocFromIndexV2 (key, fields) { const field = fields[index] || '_id' if (Buffer.isBuffer(value) && value.length === 12) { try { - doc[field] = new ObjectID(value) + doc[field] = new ObjectId(value) } catch { doc[field] = value } diff --git a/package.json b/package.json index 96d65f9..a9eeb05 100644 --- a/package.json +++ b/package.json @@ -25,14 +25,15 @@ }, "homepage": "https://github.com/RangerMauve/hyperbeedeebee#readme", "dependencies": { - "bson": "^4.3.0", - "cbor": "^8.1.0" + "bson": "^5.0.0", + "cbor": "^8.1.0", + "sub-encoder": "^2.1.3" }, "devDependencies": { - "hyperbee": "^1.5.4", - "hypercore": "^9.9.1", - "random-access-memory": "^3.1.2", - "standard": "^16.0.3", - "tape": "^5.2.2" + "hyperbee": "^2.16.0", + "hypercore": "^10.7.0", + "random-access-memory": "^6.1.0", + "standard": "^17.0.0", + "tape": "^5.6.3" } } diff --git a/test.js b/test.js index 56ae983..6972f11 100644 --- a/test.js +++ b/test.js @@ -6,7 +6,8 @@ const HyperbeeDeeBee = require('./') const { DB } = HyperbeeDeeBee function getBee () { - return new Hyperbee(new Hypercore(RAM)) + const core = new Hypercore(RAM) + return new Hyperbee(core, { extension: false }) } test('Create a document in a collection', async (t) => {