verymodel-riak v0.12.1
verymodel-riak
Riak extensions for VeryModel
- Author: Aaron McCall aaron@andyet.net
- Version: 0.12.1
- License: MIT
Examples
Using only default functionality
var VeryRiakModel = require('verymodel-riak').VeryRiakModel;
Define our fields
var MyDef = {
first_name: {},
last_name: { index: true },
name: {
private: true,
derive: function () { return this.first_name + ' ' + this.last_name; }
},
age: { index: { integer: true } },
gender: { index: true, private: true },
city: {},
state: { index: true},
zip: { index: { integer: true } },
model: { default: 'person', required: true, private: true, static: true},
favorite_foods: { index: { isArray: true } }
};
Define our options
var MyOptions = {
allKey: 'model',
bucket: "test:bucket"
};
Init our model factory
var MyModel = new VeryRiakModel(MyDef, MyOptions);
Create a model instance
var myInstance = MyModel.create({
first_name: 'Bill',
last_name: 'Jones',
age: 40,
gender: 'm',
city: 'Atlanta',
state: 'GA',
zip: 30303,
favorite_foods: ['pizza', 'fried chicken', 'applesauce', 'cake']
});
myInstance.save(); // stores the instance's data to Riak
myInstance.indexes will return:
[
{key: 'last_name_bin', value: 'Jones'},
{key: 'age_int', value: 40},
{key: 'gender_bin', value: 'm'},
{key: 'state_bin', value: 'GA'},
{key: 'zip_int', value: 30303},
{key: 'model_bin', value: 'person'},
{key: 'favorite_foods_bin', value: 'pizza'},
{key: 'favorite_foods_bin', value: 'fried chicken'},
{key: 'favorite_foods_bin', value: 'applesauce'},
{key: 'favorite_foods_bin', value: 'cake'}
]
myInstance.value will return:
{
first_name: 'Bill',
last_name: 'Jones',
city: 'Atlanta',
state: 'GA',
zip: 30303
}
Defaults
var _ = require('underscore');
var indexes = require('./indexes');
var request = require('./request_helpers.js');
var streams = require('./streams');
module.exports = {
definition
definition: {
id: The "id" field is where we'll store the Riak key. By default it will just be an unvalidated, public field
id: {},
indexes: We'll need a way to retrieve all of the fields that should be indexed, so by default that will be all of the fields defined as indexes
indexes: {
private: true,
derive: indexes.derive
},
value: By default, all non-private fields that aren't the id are expected to be the value payload
value: {
private: true,
derive: function () {
var model = this.__verymeta.model;
if (!model.options.values) model.options.values = _.compact(_.map(model.definition, function (def, key) {
var isKeyField = key === model.options.keyField;
var isAllKey = key === model.options.allKey;
By default we'll use all of the public fields except id
return (def.private || isKeyField || isAllKey) ? false : key;
}));
return _.pick(this, model.options.values);
}
}
},
model methods
methods: {
getAllKey: Allows us to share a bucket between different model types
getAllKey: function () {
if (this.options.allKey === '$bucket') {
return {key: this.options.allKey, def: {default: this.getBucket()}};
}
var allKeyDef = this.options.allKey && this.definition[this.options.allKey],
default + required ensures that the allKey is always populated private ensures it's not stored as part of the object's data static ensures that the default value is not overwritten
allKeyIsValid = allKeyDef && (allKeyDef.default &&
allKeyDef.required && allKeyDef.static);
if (allKeyDef && allKeyIsValid) {
return { key: this.options.allKey + '_bin', def: allKeyDef};
}
},
getBucket: Returns default bucket optionally appending an additional namespace
getBucket: function (append) {
var bucket = this.options.bucket;
if (!bucket && this.options.riak) bucket = this.options.riak.bucket;
if (bucket && !append) return bucket;
if (bucket && append) return [bucket, append].join(this.options.namespaceSeparator||"::");
throw new Error('Please set a Riak bucket via options.bucket');
},
getLogger: Returns specified logger if defined or creates one and returns it
getLogger: function _logger() {
return this.options.logger || (this.options.logger = require('bucker').createNullLogger());
},
getRequest: Builds Riak request objects. Signature varies according to type and developer preference. Supported request types are: del, get, index, mapreduce, and search.
- Build request to get a single object by its key:
- type and key:
model.getRequest('get', 'my-riak-key')
or - type and object:
model.getRequest('get', {key: 'my-riak-key'})
- type and key:
- Build request to get a list of keys for an index exact match:
- type, index, key:
model.getRequest('index', 'my_index_bin', 'foo')
or - type and object:
model.getRequest('index', {index: 'my_index_bin', key: 'foo'})
- type, index, key:
- Build request to get a list of keys via an index range search:
- type, index, min, max:
model.getRequest('index', 'my_index_bin', 'bar', 'foo')
or - type and object:
model.getRequest('index', {index: 'my_index_bin', range_min: 'bar', range_max: 'foo'})
- type, index, min, max:
- Build request for mapreduce:
- type, index, key, query array:
model.getRequest('mapreduce', 'my_index', 'my_key', […map/reduce phases…])
or - type and object:
model.getRequest('mapreduce', {inputs: …my inputs…, query: […map/reduce phases…]})
or - type, inputs array, query array:
model.getRequest('mapreduce', […my inputs…], […map/reduce phases…])
- type, index, key, query array:
- Build request to search:
- type, index, q:
model.getRequest('search', 'my_index', 'name:Bill')
or - type and object:
model.getRequest('search', {index: 'my_index', q: 'name:Bill'})
- type, index, q:
- Finally, any type of request can be created according to the following format:
model.getRequest({ type: 'index', options: {index: 'my_index_bin', key: 'foo'}})
where type is any one of the types listed and options
getRequest: function (type) {
if (_.isObject(type) && type.type && type.options) {
return request[type.type](this, [type.options]);
}
return request[type](this, _.rest(arguments));
},
_indexQuery: Index query wrapper that returns a stream
_indexQuery: function () {
var args = _.rest(arguments, 0);
args.unshift('index');
var request = this.getRequest.apply(this, args);
Return the readable stream
return this.getClient().getIndex(request);
},
all: Streams or calls back with all instances of this model that are stored in Riak or an index-filtered set of them, depending on whether filtering args are passed. If the first argument is a function, it will be called with the result.
all: function () {
var args = _.rest(arguments, 0),
streaming = typeof args[0] !== 'function',
cb = !streaming ? args[0] : null,
requestArgs = _.rest(args, (streaming ? 0 : 1)),
bucket;
if (args.length > 1 && typeof args[1] === 'object') {
bucket = args[1].bucket;
}
var logger = this.getLogger();
logger.debug('query prepared: %j, streaming: %s', requestArgs, streaming);
All stream handling is done via a Transform stream that receives our key stream and transmits instances
var stream = this._indexQuery.apply(this, requestArgs),
streamOpts = {model: this, bucket: bucket};
return stream.pipe(new streams.KeyToValueStream(_.defaults({}, streamOpts)))
.pipe(new streams.InstanceStream(_.defaults({callback: cb}, streamOpts)));
},
find: Simplifies index lookups and can be called with the values or options object signatures.
- values: find('index', 'key_or_min', 'max',)
- options object (shown with range query—-substitute key for range_min/range_max for exact match): find({index: 'index', range_min: 'min', range_max: 'max'}, function (err, instance))
find: function (index) {
var args = _.rest(arguments),
hasCb = typeof _.last(args) === 'function',
cb = hasCb && args.pop();
if (typeof index === 'string' && !index.match(/_(bin|int)$/)) {
index = indexes.getName(index, this.definition[index]);
}
args.unshift(index);
if (cb) {
args.unshift(cb);
}
return this.all.apply(this, args);
},
find: searches for matching indexesToData: Reformats indexes from Riak so that they can be applied to model instances
indexesToData: indexes.rehydrate,
replyToData: Reformats riak reply into the appropriate format to feed into an instance's loadData method
replyToData: function (reply) {
this.getLogger().debug('Function [replyToData]: %j', reply);
if (!reply || !reply.content) {
return {};
}
var content = (reply.content.length > 1) ? this.options.resolveSiblings(reply.content) : reply.content[0];
reformat our data for VeryModel
var indexes = {};
indexes = this.indexesToData(content.indexes);
var data = _.extend(content.value, indexes);
if (reply.key) {
data[this.options.keyField] = reply.key;
}
if (reply.vclock) {
data.vclock = reply.vclock;
}
return data;
},
_getQuery: function (id, bucket, cb) {
var reqArgs = ['get', id];
if (typeof bucket === 'function' && typeof cb === 'undefined') {
cb = bucket;
bucket = undefined;
}
if (bucket) reqArgs.push(bucket);
this.getClient().get(this.getRequest.apply(this, reqArgs), cb);
},
_getInstance: function (id, reply, bucket) {
Resolve siblings, if necessary, or just grab our content
var data = this.replyToData(reply);
data[this.options.keyField] = id;
if (bucket && this.definition.bucket) data.bucket = bucket;
var instance = this.create(data);
if (bucket && !instance.bucket) instance.bucket = bucket;
return instance;
},
load: Load an object's data from Riak and creates a model instance from it.
load: function (id, bucket, cb) {
var self = this;
if (typeof bucket === 'function' && typeof cb === 'undefined') {
cb = bucket;
bucket = undefined;
}
this._getQuery(id, bucket, function (err, reply) {
if (err||_.isEmpty(reply)) return cb(err||new Error('No matching key found.'));
self._last = self._getInstance(id, reply, bucket);
Override default toJSON method to make more Hapi compatible
if (typeof cb === 'function') {
cb(null, self._last);
}
});
},
remove: Remove an instance from Riak
delete: function (id, bucket, cb) {
var self = this;
if (typeof bucket === 'function' && typeof cb === 'undefined') {
cb = bucket;
bucket = undefined;
}
this.getLogger().debug('request to delete account(%s)', id);
this.getClient().del(this.getRequest('del', id, bucket), function (err) {
if (err) return cb(err);
self.getLogger().debug('successfully deleted account');
cb();
});
},
backwards compatibility
remove: module.exports.methods.delete
},
options
options: {
- Default allKey is Riak's magic 'give me all the keys' index
allKey: '$bucket',
- Default key field is id
keyField: 'id',
- pagination is on by default to prevent overloading the server
max_results: 100,
paginate: true,
- Default sibling handler is "last one wins"
resolveSiblings: function (siblings) {
return _.max(siblings, function (sibling) {
return parseFloat(sibling.last_mod + '.' + sibling.last_mod_usecs);
});
}
},
instanceMethods
instanceMethods: {
wrapper around this instance's
delete: function (callback) {},
prepare: Prepare a Riak request object from this instance.
prepare: function () {
var content = {
value: JSON.stringify(this.value),
content_type: 'application/json'
};
var indexes = this.indexes;
if (indexes.length) content.indexes = indexes;
var payload = {
content: content,
bucket: this.getBucket(),
return_body: true
};
if (this.id) {
payload.key = this.id;
}
if (this.vclock) {
payload.vclock = this.vclock;
}
return payload;
},
save: Put this instance to Riak. May be called with an options object, currently only for the purpose of passing { validate: false } to bypass validation
save: function (cb, opts) {
if (!opts || opts.validate !== false) {
var errors = this.doValidate();
if (errors.length) return cb(errors);
}
var self = this;
var logger = this.getLogger();
var payload = this.prepare();
logger.debug('save payload: %j', payload);
this.getClient().put(payload, function (err, reply) {
logger.debug('riak put %s', (err == null ? 'succeeded' : ('failed: ' + err)));
if (!err) {
if (!self.id && reply.key) self.id = reply.key;
if (reply.vclock) self.vclock = reply.vclock;
}
if (reply.content.length > 1 && typeof cb !== 'boolean') {
self.loadData(self.__verymeta.model.replyToData(reply));
The boolean arg prevents a race condition when reply.content.length continues to be > 1
self.save(true);
}
if (typeof cb === 'function') {
cb(err, self);
}
});
},
getClient: Proxy method to get the Riak client from model
getClient: function () { return this.__verymeta.model.getClient(); },
getBucket: return instance-level bucket property or fall back to model's getBucket
getBucket: function () {
return (typeof this.bucket !== 'undefined') ? this.bucket : this.__verymeta.model.getBucket();
},
getLogger: return this instance's logger (if defined) or fall back to model's getLogger
getLogger: function () {
return (typeof this.logger !== 'undefined') ? this.logger : this.__verymeta.model.getLogger();
}
}
};
Add some logging
// Replaces obj[name] with a version that emits a debug log line through
// this.getLogger() on every call and then runs the original function with the
// same `this` and arguments. Properties that are not functions are left alone.
var logify = function (obj, name) {
    var original = obj[name];
    if (typeof original !== 'function') {
        return;
    }
    obj[name] = function () {
        this.getLogger().debug('Function [%s]', name);
        return original.apply(this, arguments);
    };
};
// Wrap the model-level entry points so each call is logged.
['load', 'remove', 'find', 'all'].forEach(function (methodName) {
    logify(module.exports.methods, methodName);
});
// Wrap the instance-level persistence methods the same way.
['save', 'prepare'].forEach(function (methodName) {
    logify(module.exports.instanceMethods, methodName);
});
Acknowledgements
- First and foremost, thanks to @fritzy for making VeryModel without which verymodel-riak would be non-existent or pointless.
- Contributors:
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago