2.0.1 • Published 5 years ago

level-rpc-stream

Control a level db over a duplex stream. If you're looking for a full network server/client leveldb solution, check out level-rpc-server.

Installation

npm i --save level-rpc-stream

Usage

Examples

Create a LevelRPCStream

import createLevelStream from 'level-rpc-stream'

const db = ... // any level db like require('level')('foo')

// create a duplex rpc stream for interacting with level
const stream = createLevelStream(db)

Perform Level Operations

Send a get request

To perform a get operation on a level db using level-rpc-stream, write an operation object (containing an id, op, and args) to the stream. The stream invokes the equivalent method on the level db instance with the given args.

// Get key data using stream, equivalent to `db.get('key')`
stream.write({
  id: requestId,
  op: OPERATIONS.GET,
  args: ['key'],
})
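Every request shares the same `{ id, op, args }` shape, so a small helper can take care of generating unique ids. The following is a minimal sketch: `buildRequest` is a hypothetical helper, not part of level-rpc-stream, and the string `'get'` is only a placeholder for the real `OPERATIONS.GET` constant.

```javascript
const { randomUUID } = require('node:crypto')

// Hypothetical helper (not part of level-rpc-stream): builds the
// { id, op, args } object that the stream's writable side expects.
function buildRequest (op, ...args) {
  return { id: randomUUID(), op, args }
}

// 'get' is a placeholder; in real code pass OPERATIONS.GET instead.
const request = buildRequest('get', 'key')
console.log(request.args) // [ 'key' ]
```

The resulting object could then be passed to `stream.write(request)`, and `request.id` kept around to match the response.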

Send a get request full example

To receive a response from the level-rpc-stream, pipe it to demux. demux splits the level-rpc-stream's readable stream into multiple substreams. All responses from leveldb promise operations are sent over the "response substream".

import createLevelStream, {
  demux,
  RESPONSE_SUBSTREAM_ID,
  OPERATIONS
} from 'level-rpc-stream'
import { v4 as uuid } from 'uuid'

const db = ... // any level db
const stream = createLevelStream(db)

// perform a `get` operation and receive a response
const requestId = uuid()
// receive response data over the level-rpc-stream substreams
stream.pipe(demux((substream) => {
  // all request responses are emitted over the "response substream"
  if (substream.id === RESPONSE_SUBSTREAM_ID) {
    // substream is the "response substream"
    substream.on('data', (data) => {
      if (data.id === requestId) {
        // received a response for the `get` operation
        // check if the get operation rejected with an error
        if (data.error) {
          // errors will be Error instances
          throw data.error
        }
        // the get operation succeeded and resolved a result
        // if `get` resolves a buffer, the result will be a valid buffer
        console.log(data.result) // <Buffer 68 65 6c 6c 6f 20 77 6f 72 6c 64>
      }
    })
  }
}))

// Get key data using stream, equivalent to `db.get('key')`
stream.write({
  id: requestId, // unique request id string
  op: OPERATIONS.GET,
  args: ['key']
})
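Matching responses to requests by `id` gets repetitive once several requests are in flight. The sketch below shows one way to wrap the pattern in promises. `createRequester` is a hypothetical helper, not part of level-rpc-stream. It assumes only what the example above shows: requests are written as `{ id, op, args }`, and responses arrive on the response substream as objects carrying the same `id` plus either `error` or `result`. The `PassThrough` streams are stand-ins so the sketch runs without a database.

```javascript
const { PassThrough } = require('node:stream')
const { randomUUID } = require('node:crypto')

// Hypothetical helper: `writable` is where requests are written and
// `responses` is the "response substream" obtained from demux.
function createRequester (writable, responses) {
  const pending = new Map() // request id -> { resolve, reject }

  responses.on('data', (data) => {
    const waiter = pending.get(data.id)
    if (!waiter) return // response for a request we did not send
    pending.delete(data.id)
    if (data.error) waiter.reject(data.error)
    else waiter.resolve(data.result)
  })

  return function request (op, ...args) {
    return new Promise((resolve, reject) => {
      const id = randomUUID()
      pending.set(id, { resolve, reject })
      writable.write({ id, op, args })
    })
  }
}

// Stand-ins for the real streams, so the sketch runs on its own.
const fakeWritable = new PassThrough({ objectMode: true })
const fakeResponses = new PassThrough({ objectMode: true })
// Fake server: answers every request with a fixed result.
fakeWritable.on('data', (req) => {
  fakeResponses.write({ id: req.id, result: `value for ${req.args[0]}` })
})

const request = createRequester(fakeWritable, fakeResponses)
const reply = request('get', 'key') // placeholder op; use OPERATIONS.GET
reply.then((result) => console.log(result))
```

With the real module, `writable` would be the level-rpc-stream itself and `responses` the substream whose id equals `RESPONSE_SUBSTREAM_ID`.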

Send a createReadStream request full example

LevelDB supports streaming operations like createReadStream, createKeyStream, and createValueStream. level-rpc-stream supports streaming responses by using substreams. Each streaming response will create a new substream over the level-rpc-stream readable stream.

import createLevelStream, {
  demux,
  RESPONSE_SUBSTREAM_ID,
  OPERATIONS
} from 'level-rpc-stream'
import { v4 as uuid } from 'uuid'

const db = ... // any level db
const stream = createLevelStream(db)

// perform a `createReadStream` operation and receive its stream response
const requestId = uuid()
// receive response data over the level-rpc-stream substreams
stream.pipe(demux((substream) => {
  // the `createReadStream` substream id will match the request's id
  if (substream.id === requestId) {
    // substream is a level db read stream, and will emit all the stream events: 'data', 'end', 'error', etc
    substream.on('data', (data) => {
      console.log(data)
      /*
      {
        key: <Buffer 66 6f 6f>,
        val: <Buffer 68 65 6c 6c 6f 20 77 6f 72 6c 64>
      }
      */
    })
  }
}))

// Write `createReadStream` operation
stream.write({
  id: requestId, // unique request id string
  op: OPERATIONS.RSTREAM,
  args: [{ /* createReadStream options */ }]
})
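Because the substream emits the usual stream events, its entries can be gathered into an array once `'end'` fires. A minimal sketch, assuming the substream behaves like any object-mode readable: `collect` is a hypothetical helper, `Readable.from` stands in for a real substream, and the entries are made up. Note that this buffers the whole result in memory, which gives up the main benefit of streaming for large ranges.

```javascript
const { Readable } = require('node:stream')

// Hypothetical helper: resolves with every chunk a readable stream emits,
// or rejects if the stream emits 'error'.
function collect (substream) {
  return new Promise((resolve, reject) => {
    const chunks = []
    substream.on('data', (chunk) => chunks.push(chunk))
    substream.once('end', () => resolve(chunks))
    substream.once('error', reject)
  })
}

// Stand-in for a createReadStream substream; the entries are made up.
const fakeSubstream = Readable.from([
  { key: 'a', val: '1' },
  { key: 'b', val: '2' }
])
const entries = collect(fakeSubstream)
entries.then((list) => console.log(list.length)) // 2
```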

Documentation

createLevelStream(leveldb)

createLevelStream is the default export of this module. It accepts a leveldb instance and returns a level-rpc-stream (levelStream), which is a duplex stream. Its writable side is an object stream that accepts operation objects, each of which invokes a method on the leveldb instance. Its readable side is a multiplexed stream (it has substreams) from which level's responses and streams are read.

levelStream.write

levelStream.write is used to send operations to leveldb. The supported operations are PUT, GET, DEL, BATCH, RSTREAM (createReadStream), KSTREAM (createKeyStream), VSTREAM (createValueStream), and DSTREAM (destroys a level stream by id). Every operation is formatted as { id, op, args }: id is the request id, op is the operation (use OPERATIONS), and args are the arguments passed to the associated leveldb method.
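To make the `{ id, op, args }` contract concrete, the sketch below shows how such an operation could be mapped onto a method call and turned into an `{ id, result }` or `{ id, error }` response. This is an illustration only, not the library's implementation: `handleOperation`, the `METHOD_BY_OP` table with its string keys, and the in-memory `fakeDb` are all assumptions.

```javascript
// Hypothetical op names; the real values live in the OPERATIONS export.
const METHOD_BY_OP = { put: 'put', get: 'get', del: 'del' }

// In-memory stand-in for a promise-based level db.
const store = new Map()
const fakeDb = {
  put: async (key, value) => { store.set(key, value) },
  get: async (key) => {
    if (!store.has(key)) throw new Error(`Key not found: ${key}`)
    return store.get(key)
  },
  del: async (key) => { store.delete(key) }
}

// Invokes the db method named by `op` with `args` and always resolves
// with a response object tagged with the request id.
async function handleOperation (db, { id, op, args }) {
  try {
    const method = METHOD_BY_OP[op]
    if (!method) throw new Error(`Unknown operation: ${op}`)
    const result = await db[method](...args)
    return { id, result }
  } catch (error) {
    return { id, error }
  }
}

const responses = (async () => [
  await handleOperation(fakeDb, { id: '1', op: 'put', args: ['key', 'val'] }),
  await handleOperation(fakeDb, { id: '2', op: 'get', args: ['key'] }),
  await handleOperation(fakeDb, { id: '3', op: 'get', args: ['missing'] })
])()
responses.then((list) => console.log(list))
```

The third request targets a missing key, so its response carries an `error` rather than a `result`, mirroring the `data.error` check in the get example above.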

Operation Requests Examples

// PUT operation
levelStream.write({
  id: 'some_unique_id',
  op: OPERATIONS.PUT,
  args: ['key', 'val'],
})

// GET operation
levelStream.write({
  id: 'some_unique_id',
  op: OPERATIONS.GET,
  args: ['key'],
})

// DEL operation
levelStream.write({
  id: 'some_unique_id',
  op: OPERATIONS.DEL,
  args: ['key'],
})

// BATCH operation
levelStream.write({
  id: 'some_unique_id',
  op: OPERATIONS.BATCH,
  args: [
    [
      { type: 'del', key: 'key1' },
      { type: 'put', key: 'key2', value: 'value2' },
    ],
  ],
})

// DSTREAM operation (destroys a level stream by id)
levelStream.write({
  id: 'some_unique_id',
  op: OPERATIONS.DSTREAM,
  args: [streamId],
})

Stream Requests Examples

For stream options, check out levelup's docs.

// RSTREAM operation
levelStream.write({
  id: 'some_unique_id',
  op: OPERATIONS.RSTREAM,
  args: [streamOpts],
})

// KSTREAM operation
levelStream.write({
  id: 'some_unique_id',
  op: OPERATIONS.KSTREAM,
  args: [streamOpts],
})

// VSTREAM operation
levelStream.write({
  id: 'some_unique_id',
  op: OPERATIONS.VSTREAM,
  args: [streamOpts],
})

demux

levelStream's readable stream is a multiplexed stream (it has substreams). All operation request responses can be read from the "response substream", whose id is exported as RESPONSE_SUBSTREAM_ID. Stream operations (like RSTREAM) create new substreams on the readable stream. See the examples above: "Send a get request full example" and "Send a createReadStream request full example".
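The callback passed to demux sees every substream, so it usually ends up branching on `substream.id`. The sketch below shows one way to organize that branching. `createSubstreamRouter` is a hypothetical helper, not part of level-rpc-stream; the ids `'response'` and `'request-1'` are placeholders for `RESPONSE_SUBSTREAM_ID` and a real request id, and plain objects stand in for the substreams.

```javascript
// Hypothetical helper: returns a callback suitable for demux that hands
// each substream to the handler registered for its id, if any.
function createSubstreamRouter (fallback = () => {}) {
  const handlers = new Map() // substream id -> handler

  function route (substream) {
    const handler = handlers.get(substream.id) || fallback
    handler(substream)
  }
  // Register a handler before writing the request that creates the substream.
  route.handle = (id, handler) => {
    handlers.set(id, handler)
    return route
  }
  return route
}

// Placeholder ids; real code would use RESPONSE_SUBSTREAM_ID and request ids.
const seen = []
const route = createSubstreamRouter((s) => seen.push(`unhandled:${s.id}`))
route.handle('response', (s) => seen.push(`responses:${s.id}`))
route.handle('request-1', (s) => seen.push(`read stream:${s.id}`))

// Plain objects stand in for the substreams demux would provide.
for (const substream of [{ id: 'response' }, { id: 'request-1' }, { id: 'other' }]) {
  route(substream)
}
console.log(seen)
```

With the real module, the router would be passed to demux in place of the inline callback, as in `stream.pipe(demux(route))`.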

License

MIT