mongres v0.2.6
Mongres ETL System for Node.js
Synchronize PostgreSQL and MongoDB using Node.js
Mongres uses configurable modules containing database and operation definitions to perform arbitrary ETL (extract, transform, load) procedures. This project is still in development, so core behavior may change without notice. Contributions, comments, and issue reports are always welcome.
Installation
npm install mongres
Command Line Usage
Usage: node mongres.js [options] <file1 [file2 ...]>
-h, --help Display usage help
-v, --version Display version number
-d, --debug Enable debug mode
-q, --quiet Disable verbose output
-p, --period Specify periodic execution (in seconds)
-f, --file Specify module file to load (default)
Database Definitions
Databases can be defined and used almost interchangeably, with only minor differences. The database abstraction layer provides the following methods:
- queryArray (query, options, callback)
  - Executes query using options, if given, and passes the result set as an array to the callback function.
  - query format for MongoDB is {collectionName: {field: value}}
  - query format for PostgreSQL is {text: 'select foo from bar where foo between $1 and $2', values: [1,10]}
  - callback signature is function (error, array)
  - Caution: all records will be loaded into memory. Do not use for very large data sets.
- queryStream (query, options, callback)
  - Executes query using options, if given, and passes a readable result stream to the callback function.
  - query format for MongoDB is {collectionName: {field: value}}
  - query format for PostgreSQL is {text: 'select foo from bar where foo between $1 and $2', values: [1,10]}
  - callback signature is function (error, stream)
- insert (insert, options, callback)
  - Inserts insert into the database using options, if given.
  - insert format is {collectionOrTableName: {field: value}}
  - callback signature is function (error, result) where result is the number of affected records.
- update (query, update, options, callback)
  - Applies update to records matching query using options, if given.
  - query format is {collectionOrTableName: {field: value}}
  - update format is {collectionOrTableName: {field: value}}
  - callback signature is function (error, result) where result is the number of affected records.
- upsert (query, update, options, callback)
  - Applies update to records matching query using options, if given.
  - A new record will be inserted if no matching records are found.
  - query format is {collectionOrTableName: {field: value}}
  - update format is {collectionOrTableName: {field: value}}
  - callback signature is function (error, result) where result is the number of affected records.
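The call shapes listed above can be tried without a live database. The following is a minimal sketch, assuming a hypothetical in-memory stand-in named createStubDb (not part of Mongres) that accepts the MongoDB-style query format and treats options as optional. It only illustrates the documented callback signatures of queryArray and queryStream; the real database layer may behave differently.

```javascript
// Hypothetical in-memory stand-in for a Mongres database object.
// It imitates only the documented call shapes; it is not Mongres code.
const { Readable } = require('stream');

function createStubDb(collections) {
  // Return the records of the collection named in `query` whose fields
  // equal every field given in the filter: {collectionName: {field: value}}
  function find(query) {
    const name = Object.keys(query)[0];
    const filter = query[name];
    return (collections[name] || []).filter((record) =>
      Object.keys(filter).every((field) => record[field] === filter[field])
    );
  }
  return {
    queryArray(query, options, callback) {
      // `options` is optional, so the callback may arrive in its place
      if (typeof options === 'function') callback = options;
      setImmediate(() => callback(null, find(query)));
    },
    queryStream(query, options, callback) {
      if (typeof options === 'function') callback = options;
      // Readable.from yields one object-mode chunk per record
      setImmediate(() => callback(null, Readable.from(find(query))));
    }
  };
}

const db = createStubDb({
  users: [
    { name: 'ann', role: 'admin' },
    { name: 'bob', role: 'user' },
    { name: 'cy', role: 'admin' }
  ]
});

// queryArray: the whole result set arrives at once as an array
db.queryArray({ users: { role: 'admin' } }, function (error, array) {
  if (error) throw error;
  console.log(array.length); // 2
});

// queryStream: records arrive one at a time through a readable stream
db.queryStream({ users: { role: 'admin' } }, function (error, stream) {
  if (error) throw error;
  stream.on('data', (record) => console.log(record.name));
  stream.on('end', () => console.log('done'));
});
```

The array form is convenient for small lookups such as reading a single progress document, while the stream form keeps memory use bounded for large extracts.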
Operation Definitions
The db parameter will be a Database instance configured for the specified database. The registry parameter is a storage object that is shared among all functions for the entire operation. The operation functions are executed in this order:
- init (db, registry, callback) (optional)
  - Executed once and must call the callback method to proceed with the operation.
  - Useful for getting delta starting points or populating initial registry values.
- extract (db, registry, process, done)
  - Executed once, used to extract data from the source database.
  - The process method should be called once for each record.
  - The done method should be called after all records have been processed.
- transform (db, registry, data) (optional)
  - Executed once for each record emitted by extract functions.
  - Used to reshape data before insertion into the target database.
  - Called synchronously, so just return the transformed data.
- load (db, registry, data, callback)
  - Executed once for each record emitted by extract and transform functions.
  - Can be used to populate registry with aggregated or incremental data.
- interval (db, registry, callback) (optional)
  - Executed at regular intervals (every N loaded records) after the load function.
  - Useful for recording incremental progress in case of failures.
- exit (db, registry, callback) (optional)
  - Executed once after all other functions have finished.
  - Useful for cleaning up after operation, and for recording summary data.
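The execution order described above can be sketched with a small runner. This is not the Mongres implementation: runOperation is a hypothetical helper written only to illustrate the documented order for one source and one target database, and it leaves out interval functions. The source and target objects are plain stand-ins for database instances.

```javascript
// Hypothetical runner: shows the documented call order only, not Mongres internals.
function runOperation(op, sourceDb, targetDb, done) {
  const registry = {}; // shared by every function of the operation
  const init = op.init || ((db, reg, cb) => cb());
  const transform = op.transform || ((db, reg, data) => data);
  const exit = op.exit || ((db, reg, cb) => cb());

  init(targetDb, registry, function (err) {
    if (err) return done(err);
    op.extract(
      sourceDb,
      registry,
      // "process": extract calls this once per record
      function (record, cb) {
        const data = transform(sourceDb, registry, record); // synchronous
        op.load(targetDb, registry, data, cb);
      },
      // "done": extract calls this after the last record
      function (err) {
        if (err) return done(err);
        exit(targetDb, registry, (exitErr) => done(exitErr, registry));
      }
    );
  });
}

const calls = [];
const source = { rows: [{ id: 1, name: 'a' }, { id: 2, name: 'b' }] };
const target = { saved: [] };

const op = {
  init(db, registry, cb) { calls.push('init'); registry.count = 0; cb(); },
  extract(db, registry, process, done) {
    calls.push('extract');
    let i = 0;
    // hand records to "process" one at a time, then signal "done"
    (function next(err) {
      if (err || i === db.rows.length) return done(err);
      process(db.rows[i++], next);
    })();
  },
  transform(db, registry, data) {
    calls.push('transform');
    return { id: data.id, name: data.name.toUpperCase() };
  },
  load(db, registry, data, cb) {
    calls.push('load');
    db.saved.push(data);
    registry.count++;
    cb();
  },
  exit(db, registry, cb) { calls.push('exit'); cb(); }
};

runOperation(op, source, target, function (err, registry) {
  if (err) throw err;
  console.log(calls.join(' > '));
  // init > extract > transform > load > transform > load > exit
  console.log(registry.count); // 2
});
```

Because transform is synchronous and returns its result, it sits between process and load without a callback of its own, which matches the signatures listed above.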
Modules
A Mongres module must export a JavaScript object like this:
module.exports = {
db: {
mongo: {
type: 'mongodb',
name: 'test',
host: 'localhost',
port: 27017,
user: 'username',
pass: 'password'
},
postgres: {
type: 'postgresql',
name: 'tnt2',
host: 'localhost',
port: 5432,
user: 'username',
pass: 'password'
}
},
op: {
name: 'Sample Operation',
init: { // init is optional
// read data from "mongo" database into registry
mongo: [
function (db, registry, cb) {
// truncate the collection
db.remove(
{ // query
test: {} // matches all documents
},
cb // callback
);
},
function (db, registry, cb) {
db.queryArray(
{ // query
mongres: {
_id: "test"
}
},
{ // options
limit: 1,
fields: {
_id: 0,
lastDate: 1
}
},
function (err, docs) { // callback
if (err) return cb(err);
var doc = docs && docs.shift() || {};
registry.lastDate = doc.lastDate || new Date(0);
return cb(); // continue to "extract" step
}
);
}
]
},
extract: { // extract is required
// stream data from "postgres" database to load function(s)
postgres: function (db, registry, load, cb) {
db.queryStream(
{ // query
text: " \
SELECT \
GENERATE_SERIES($1::int, $2::int) AS series, $3::timestamp + \
(RANDOM()::NUMERIC(3,2) || ' days')::INTERVAL AS date, \
ARRAY['a','b','c'] AS arr,'{\"a\":{\"b\":\"c\"}}'::json AS obj \
ORDER BY date \
",
values: [1, 1000, registry.lastDate]
},
// options parameter is not required
function (err, stream) { // callback
if (err) return cb(err);
var error = null, procs = 0, limit = 10;
// call "load" method for each record
stream.on('data', function (data) {
procs++; // increment process counter
// pause stream if procs over limit
if (procs >= limit) stream.pause();
load(data, function (err) {
procs--; // decrement process counter
if (err) { // pass the error
stream.emit('error', err);
return stream.emit('end');
}
// resume stream when procs are under limit
if (!err && procs < limit) stream.resume();
});
});
// record errors for 'end' event
stream.on('error', function (err) {
error = err;
});
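// execute callback function when ended; read "error" at that time,
// because cb.bind(this, error) would capture its initial null value
return stream.on('end', function () { cb(error); });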
}
);
}
},
transform: { // transform is optional
// transform data from "postgres" database into a different format
postgres: function (db, registry, data) {
return { // transformed data
_id: data.series,
date: data.date,
arr: data.arr,
obj: data.obj
};
}
},
load: { // load is required
// load data from all sources into "mongo" database
mongo: function (db, registry, data, cb) {
if (!data) return cb('No data was given.');
db.upsert(
{ // query
test: {
_id: data._id
}
},
{ // update
test: data
},
{ // options
w: 1,
journal: true
},
function (err, result) { // callback
if (err) return cb(err);
// determine most recent changed date
if (!registry.lastDate || registry.lastDate < data.date) {
registry.lastDate = data.date;
}
// proceed to next load function(s) or exit
return cb(null, result);
}
);
}
},
interval: { // intervals are optional
100: { // execute every 100 records
// write progress data to "mongo" database
mongo: function (db, registry, cb) {
db.upsert(
{ // query
mongres: {
_id: 'test'
}
},
{ // update
mongres: {
$set: {
lastDate: registry.lastDate
}
}
},
{ // options
w: 1,
journal: true
},
cb // callback
);
}
}
},
exit: { // exit is optional
// write result data to "mongo" database
mongo: function (db, registry, cb) {
db.upsert(
{ // query
mongres: {
_id: 'test'
}
},
{ // update
mongres: {
$set: {
lastDate: registry.lastDate
}
}
},
{ // options
w: 1,
journal: true
},
cb // callback
);
}
}
}
};
Contributing
Contributions, comments, and issue reports are always welcome. Send pull requests to the master branch with your proposed changes or create issues to report bugs.
Testing
Copy tests\db.sample.js to tests\db.js and modify the credentials to match your development environment. You will need one MongoDB and one PostgreSQL database set up and running to complete the tests. To start tests, run npm test in your Mongres directory.