Level / level

Universal abstract-level database for Node.js and browsers.

Live streaming values from a level #222

Closed: ralphtheninja closed this issue 1 year ago

ralphtheninja commented 2 years ago

@vweevers If I wanted to have some sort of live streaming as keys are inserted into the db, how would I go about doing that with the new level implementation? (i.e. no createReadStream())

vweevers commented 2 years ago

For a quick solution, you could fork level-live and replace its db.createReadStream(opts).on('data', ..) line with const { EntryStream } = require('level-read-stream') and new EntryStream(db, opts).on('data', ..). Alternatively, replace that part with an iterator, or remove it altogether if you're only interested in the live part (which should still work the same because level-live uses db events for that).
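Roughly, that replacement would look like this (a sketch; opts and the handler body are whatever the fork already has):

```js
const { EntryStream } = require('level-read-stream')

// Instead of: db.createReadStream(opts).on('data', ...)
new EntryStream(db, opts).on('data', (entry) => {
  // entry.key and entry.value, same shape as before
})
```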

vweevers commented 2 years ago

It might also be interesting to consider an approach that solely uses async iterators. Along the lines of:

```js
const { on } = require('events')

async function* live (db) {
  // Current entries
  for await (const [key, value] of db.iterator()) {
    yield [key, value]
  }

  // Live entries (this is missing logic for batch and del)
  for await (const [key, value] of on(db, 'put')) {
    yield [key, value]
  }
}
```

ralphtheninja commented 2 years ago

> It might also be interesting to consider an approach that solely uses async iterators.

This looks really clean. I take it async function* is the pattern for returning an async iterator. Yeah, sorry, I have hardly used them before.

vweevers commented 2 years ago

Yeah, that function is an async generator. PS. My example looks too good to be true; it needs some form of buffering to capture events emitted while it's busy iterating db.iterator().
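For example, the buffering could look roughly like this, relying on the fact that events.on attaches its listener and queues events internally as soon as it is called (a sketch, still without batch/del handling):

```js
const { on } = require('events')

async function* live (db) {
  // Attach the 'put' listener up front; events.on queues events
  // until they are consumed, so nothing emitted during the initial
  // iteration is lost
  const puts = on(db, 'put')

  // Current entries
  for await (const [key, value] of db.iterator()) {
    yield [key, value]
  }

  // Buffered and live entries (still missing logic for batch and del;
  // may also yield duplicates unless the iterator reads from a snapshot)
  for await (const [key, value] of puts) {
    yield [key, value]
  }
}
```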

vweevers commented 1 year ago

A couple of things I want to do that will benefit live streams/iterators:

We can then support aligning the encoding of live data with the encoding of the iterator. E.g. if you do live(db, { valueEncoding: 'json' }) but then a db.put(key, value, { valueEncoding: 'buffer' }), the resulting 'batch' event includes the necessary information to transcode from buffer to json. Roughly like so:

```js
const valueEncoding = db.valueEncoding('json')
const iterator = db.iterator({ valueEncoding })

db.on('batch', (operations) => {
  for (const op of operations) {
    if (op.valueEncoding.commonName === valueEncoding.commonName) {
      // Public data matches desired encoding
      const value = op.value
    } else if (op.valueEncoding.format === valueEncoding.commonName) {
      // Private data matches desired encoding
      const value = op.encodedValue
    } else {
      // Decode private data (one of view, buffer, utf8) to match desired encoding
      const transcoder = valueEncoding.createTranscoder(op.valueEncoding.format)
      const value = transcoder.decode(op.encodedValue)
    }
  }
})
```

That last step there (createTranscoder()) will just need a small utility method in level-transcoder:

```js
function createTranscoder (format) {
  if (format === 'view') {
    return this.createViewTranscoder()
  } else if (format === 'buffer') {
    return this.createBufferTranscoder()
  } else if (format === 'utf8') {
    return this.createUTF8Transcoder()
  } else {
    throw new Error('nope')
  }
}
```

vweevers commented 1 year ago

Having that private data also means ltgt logic (like level-live has) can work regardless of the user's choice of encoding, because it can compare buffers and strings rather than public data with arbitrary types.
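To illustrate (with hypothetical inRange and compare helpers, not level-live's actual code): given encoded keys, a range check only ever compares buffers with buffers or strings with strings:

```js
// Hypothetical helpers: filter live operations by their encoded
// (private) keys, which are always buffers, views or strings
function compare (a, b) {
  if (typeof a === 'string') return a < b ? -1 : a > b ? 1 : 0
  return Buffer.compare(a, b)
}

function inRange (encodedKey, { gte, lte } = {}) {
  return (gte === undefined || compare(encodedKey, gte) >= 0) &&
         (lte === undefined || compare(encodedKey, lte) <= 0)
}
```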