Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 88 additions & 38 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
const BSON = require('bson')
const { ObjectID } = BSON
const { ObjectId } = BSON
const cbor = require('cbor')
const SubEncoder = require('sub-encoder')

// Version of the indexing algorithm
// Will be incremented for breaking changes
Expand Down Expand Up @@ -67,9 +68,29 @@ class Collection {
constructor (name, bee) {
this.name = name
this.bee = bee
this.docs = bee.sub('doc')
this.idxs = bee.sub('idxs')
this.idx = bee.sub('idx')
this.enc = new SubEncoder()

this.idxEncoding = this.enc.sub('idx')
this.idxsEncoding = this.enc.sub('idxs')
this.docsEncoding = this.enc.sub('docs')

// this.watching()
}

// watching () {
// this.watcher = this.bee.watch({ keyEncoding: this.docsEncoding })
// }

async insertOne (data) {
const doc = await this.insert(data)
return Object.assign(data, doc)
}

async insertMany (data) {
for (const item of data) {
await this.insertOne(item)
}
return data
}

async insert (rawDoc) {
Expand All @@ -78,33 +99,43 @@ class Collection {
if (!doc._id) {
doc = {
...doc,
_id: new ObjectID()
_id: new ObjectId()
}
}

// Get _id as buffer
const key = doc._id.id

const exists = await this.docs.get(key)
const exists = await this.bee.get(key, { keyEncoding: this.docsEncoding })

if (exists) throw new Error('Duplicate Key error, try using .update?')

const value = BSON.serialize(doc)

await this.docs.put(key, value)
await this.bee.put(key, value, { keyEncoding: this.docsEncoding })

const indexes = await this.listIndexes()

for (const { fields, name } of indexes) {
// TODO: Cache index subs
const bee = this.idx.sub(name)

await this._indexDocument(bee, fields, doc)
const enc = this.idxEncoding.sub(name)
await this._indexDocument(enc, fields, doc)
}

return doc
}

async updateOne (query = {}, data, options) {
await this.update(query, data, options)
return this.findOne(query)
}

async updateMany (query = {}, data, options) {
await this.update(query, data, options)
return this.findMany(query)
}

async update (query = {}, update = {}, options = {}) {
const {
upsert = false,
Expand All @@ -130,14 +161,14 @@ class Collection {
const key = doc._id.id
const value = BSON.serialize(newDoc)

await this.docs.put(key, value)
await this.bee.put(key, value, { keyEncoding: this.docsEncoding })

for (const { fields, name } of indexes) {
// TODO: Cache index subs
const bee = this.idx.sub(name)
const enc = this.idxEncoding.sub(name)

await this._deIndexDocument(bee, fields, doc)
await this._indexDocument(bee, fields, newDoc)
await this._deIndexDocument(enc, fields, doc)
await this._indexDocument(enc, fields, newDoc)
}
nModified++
}
Expand All @@ -146,7 +177,7 @@ class Collection {
const initialDoc = {}
for (const queryField of Object.keys(query)) {
const queryValue = query[queryField]
if ('$eq' in queryValue) initialDoc[queryField] = queryValue.$eq
if (isQueryObject(queryValue) && '$eq' in queryValue) initialDoc[queryField] = queryValue.$eq
else if (!isQueryObject(queryValue)) initialDoc[queryField] = queryValue
}

Expand All @@ -165,18 +196,24 @@ class Collection {

async findOne (query = {}) {
const results = await (this.find(query).limit(1))

const [doc] = results
return doc || null
}

if (!doc) throw new Error('not found')

return doc
findMany (query, { sort = null } = {}) {
const q = this.find(query)
return sort ? q.sort(...parseSort(sort)) : q
}

// Build a Cursor for `query`; results are produced when the returned
// cursor is iterated/awaited.
find (query = {}) {
  return new Cursor(query, this)
}

// TODO(review): unimplemented stub — currently returns undefined.
// The commented lines sketch a per-document watch via the docs sub-encoding.
watch (_id, opts) {
  // const key = _id.id
  // return this.docsEncoding.watch()
}

async createIndex (fields, { rebuild = false, version = INDEX_VERSION, ...opts } = {}) {
const name = fields.join(',')
const exists = await this.indexExists(name)
Expand All @@ -197,65 +234,65 @@ class Collection {
opts
}

await this.idxs.put(name, BSON.serialize(index))
await this.bee.put(name, BSON.serialize(index), { keyEncoding: this.idxsEncoding })

await this.reIndex(name)

return name
}

async indexExists (name) {
const exists = await this.idxs.get(name)
const exists = await this.bee.get(name, { keyEncoding: this.idxsEncoding })
return exists !== null
}

async getIndex (name) {
const data = await this.idxs.get(name)
const data = await this.bee.get(name, { keyEncoding: this.idxsEncoding })
if (!data) throw new Error('Invalid index')
return BSON.deserialize(data.value)
}

async reIndex (name) {
const { fields } = await this.getIndex(name)
// TODO: Cache index subs
const bee = this.idx.sub(name)
const enc = this.idxEncoding.sub(name)

for await (const doc of this.find()) {
await this._indexDocument(bee, fields, doc)
await this._indexDocument(enc, fields, doc)
}
}

// This is a private API, don't depend on it
async _indexDocument (bee, fields, doc) {
async _indexDocument (enc, fields, doc) {
if (!hasFields(doc, fields)) return
const idxValue = doc._id.id

const batch = bee.batch()
const batch = this.bee.batch()

for (const flattened of flattenDocument(doc, fields)) {
const idxKey = makeIndexKeyV2(flattened, fields)
await batch.put(idxKey, idxValue)
await batch.put(idxKey, idxValue, { keyEncoding: enc })
}

await batch.flush()
}

async _deIndexDocument (bee, fields, doc) {
async _deIndexDocument (enc, fields, doc) {
if (!hasFields(doc, fields)) return

const batch = bee.batch()
const batch = this.bee.batch()

for (const flattened of flattenDocument(doc, fields)) {
const idxKey = makeIndexKeyV2(flattened, fields)
await batch.del(idxKey)
await batch.del(idxKey, { keyEncoding: enc })
}

await batch.flush()
}

// TODO: Cache indexes?
async listIndexes () {
const stream = this.idxs.createReadStream()
const stream = this.bee.createReadStream({ keyEncoding: this.idxsEncoding })
const indexes = []

for await (const { value } of stream) {
Expand Down Expand Up @@ -401,11 +438,11 @@ class Cursor {
}

async * [Symbol.asyncIterator] () {
if (this.query._id && (this.query._id instanceof ObjectID)) {
if (this.query._id && (this.query._id instanceof ObjectId)) {
// Doc IDs are unique, so we can query against them without doing a search
const key = this.query._id.id

const found = await this.collection.docs.get(key)
const found = await this.collection.bee.get(key, { keyEncoding: this.collection.docsEncoding })

// Exit prematurely

Expand Down Expand Up @@ -495,15 +532,16 @@ class Cursor {
lt[lt.length - 1] = 0xFF
}

const stream = this.collection.idx.sub(index.name).createReadStream(opts)
const enc = this.collection.idxEncoding.sub(index.name)
const stream = this.collection.bee.createReadStream({ keyEncoding: enc, ...opts })

for await (const { key, value: rawId } of stream) {
const keyDoc = makeDocFromIndex(key, index.fields)

// Test the fields against the index to avoid fetching the doc
if (!matchesQuery(keyDoc, subQuery)) continue

const { value: rawDoc } = await this.collection.docs.get(rawId)
const { value: rawDoc } = await this.collection.bee.get(rawId, { keyEncoding: this.collection.docsEncoding })
const doc = BSON.deserialize(rawDoc)

// TODO: Avoid needing to double-process the values
Expand All @@ -514,7 +552,7 @@ class Cursor {
}
} else if (sort === null) {
// If we aren't sorting, and don't have an index, iterate over all docs
const stream = this.collection.docs.createReadStream()
const stream = this.collection.bee.createReadStream({ keyEncoding: this.collection.docsEncoding })

for await (const { value: rawDoc } of stream) {
// TODO: Can we avoid iterating over keys that should be skipped?
Expand All @@ -531,6 +569,18 @@ class Cursor {
}
}

// Parse a sort specifier into a [field, direction] pair, where direction
// is -1 for a leading '-' and 1 otherwise (a leading '+' is also stripped).
// NOTE(review): for an array input the parsed pairs are merged with
// Object.assign, so later pairs overwrite earlier ones — this only yields
// a sensible result for single-element arrays; verify against Cursor#sort
// before relying on multi-key input.
function parseSort (key = '') {
  if (Array.isArray(key)) {
    return Object.assign(...key.map(parseSort))
  }

  const hasSign = /^[+-]/.test(key)
  const sign = hasSign ? key[0] : undefined
  const field = hasSign ? key.substring(1) : key
  return [field, sign === '-' ? -1 : 1]
}

function performUpdate (doc, update) {
if (Array.isArray(update)) {
return update.reduce(performUpdate, doc)
Expand Down Expand Up @@ -773,8 +823,8 @@ function makeIndexKeyV2 (doc, fields, allFields = fields) {
// CBOR encode fields
const keyValues = fields.map((field) => {
const value = doc[field]
// Detect ObjectID
if (value instanceof ObjectID) {
// Detect ObjectId
if (value instanceof ObjectId) {
return value.id
}
return value
Expand Down Expand Up @@ -809,7 +859,7 @@ function makeDocFromIndexV2 (key, fields) {
const field = fields[index] || '_id'
if (Buffer.isBuffer(value) && value.length === 12) {
try {
doc[field] = new ObjectID(value)
doc[field] = new ObjectId(value)
} catch {
doc[field] = value
}
Expand Down
15 changes: 8 additions & 7 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,15 @@
},
"homepage": "https://github.com/RangerMauve/hyperbeedeebee#readme",
"dependencies": {
"bson": "^4.3.0",
"cbor": "^8.1.0"
"bson": "^5.0.0",
"cbor": "^8.1.0",
"sub-encoder": "^2.1.3"
},
"devDependencies": {
"hyperbee": "^1.5.4",
"hypercore": "^9.9.1",
"random-access-memory": "^3.1.2",
"standard": "^16.0.3",
"tape": "^5.2.2"
"hyperbee": "^2.16.0",
"hypercore": "^10.7.0",
"random-access-memory": "^6.1.0",
"standard": "^17.0.0",
"tape": "^5.6.3"
}
}
3 changes: 2 additions & 1 deletion test.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ const HyperbeeDeeBee = require('./')
const { DB } = HyperbeeDeeBee

function getBee () {
return new Hyperbee(new Hypercore(RAM))
const core = new Hypercore(RAM)
return new Hyperbee(core, { extension: false })
}

test('Create a document in a collection', async (t) => {
Expand Down