0.1.7 • Published 15 days ago

frakture-workerbots v0.1.7

Frakture Worker Bots

Documentation

An overview of FraktureBots and their inner workings and standard implementations.

Method Naming Conventions

There are several prefixes/postfixes that should be kept in mind when building out standard Frakture (and bot specific) methods:

  • get* implies getting something from another system (a CRM, FTP, etc.), usually in the form of a file or stream. Examples are getPeopleFile and getTransactionFile. The postfix should indicate the resulting object (stream or file).
  • list* is a version of get that returns an array of objects, without writing them to a file. Examples of this are listPeople, listMessages, listTransactions, etc.
  • load* implies loading a table in the frakture warehouse or mongo database (almost always the warehouse) using data from an external system - it usually makes an implicit call to one of the get* or list* functions. (This does not include the warehouse methods loadTable and loadTableFromQuery). Examples are loadPeople, loadMessages, etc.
  • load*File is a version of load* that expects the get* method to have been called already, and usually has an option for filename. These methods are used internally by load* methods, and should almost never be called explicitly.
  • build* methods are used to load tables within Frakture based on other tables that are already in Frakture (this makes them distinct from load methods).
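The relationship between the list* and load* prefixes can be sketched roughly as follows. This is a minimal illustration, not library code: ExampleBot, its hard-coded records, and the table name example_person are all hypothetical, and a real load* method would hand the records to the warehouse instead of just counting them.

```javascript
// Hypothetical bot illustrating the list*/load* naming split.
// Nothing here is part of frakture-workerbots; the data source is a stub.
function ExampleBot() {
  this.calls = []; // records which methods ran, for illustration only
}

// list*: returns an array of objects, without writing them to a file
ExampleBot.prototype.listPeople = function(options, callback) {
  this.calls.push('listPeople');
  callback(null, [{remote_person_id: 1}, {remote_person_id: 2}]);
};

// load*: loads a table, making an implicit call to the list* (or get*) method
ExampleBot.prototype.loadPeople = function(options, callback) {
  this.calls.push('loadPeople');
  this.listPeople(options, (e, people) => {
    if (e) return callback(e);
    // a real bot would load the records into the warehouse here
    callback(null, {table: 'example_person', records: people.length});
  });
};

const bot = new ExampleBot();
let loaded;
bot.loadPeople({}, (e, t) => { loaded = t; });
```

Callers only invoke loadPeople; the list* call happens inside it, which is why load*File and similar helpers rarely need to be called explicitly.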

Table Naming Conventions

  • Non-global tables should always have the bot prefix (defaults to the bot_id, but can be overridden), and the object name should always be singular (e.g. bot_id_person).
  • Get the bot prefix using this.getTablePrefix().
  • Access the global table prefix using this.getGlobalTablePrefix(options,(e,global_table_prefix) => {...})
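A minimal sketch of building a bot-specific table name from the prefix might look like this. The bot object here is a stand-in, tableName is a hypothetical helper, and it is an assumption that getTablePrefix() returns the prefix without a trailing underscore.

```javascript
// Stand-in for a bot; in a real bot getTablePrefix() comes from the framework.
const bot = {
  bot_id: 'crm_abc',
  // assumption: the prefix is returned without a trailing underscore
  getTablePrefix() { return this.bot_id; },
  // hypothetical helper, not part of the library: prefix + singular object name
  tableName(objectName) { return `${this.getTablePrefix()}_${objectName}`; }
};

const personTable = bot.tableName('person');
```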

Standard Output, Option Structures, and Callbacks

There are a few standard conventions to follow in option and output structures.

table

Most DBBot methods working with a specific table will have an option table, while many will have a sql option if they are interacting with a query.

If a Bot method results in a table being loaded, the result of the method should include a field table and the number of rows loaded as records. These are already included in the results of loadTable and loadTableFromQuery, so if your bot method uses these methods, you can probably use that result as the final result.

filename

Bot methods that work on a file should have an option field filename which is the path to the file being used or modified.

Bot methods that produce a file should have a result field filename which is the final file. Most FileManipulatorBot bot methods follow this paradigm.

callback

The callback interface of Bot methods is:

(e,r,m) => { ... }

Where e is the error (if one occurred), r is the result (if the method completed successfully), and m is the "modification" to send to the job framework.

Most methods will only use the (e,r) syntax - those that have a result after a single run should stick to this form.

If you want to save state to the job framework between runs and signal the framework to call the method again later (if, for instance, the bot will start a job and then wait for it), specify m as follows:

{
  status: 'pending', // signals framework to try running this later
  start_after_timestamp, // when the job should run, typically written as js.getRelativeDate("+5m") or similar
  options: {
    // modifications to the job options that will be passed to the method on the next run
  }
}
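To make the (e,r,m) contract more concrete, here is a toy sketch of how a framework could react to a pending modification. runJob and waitForThing are hypothetical names, the real job framework waits until start_after_timestamp instead of looping immediately, and the assumption that m.options is merged into the previous options is ours.

```javascript
// Toy stand-in for the job framework: re-runs a method while it reports 'pending'.
// It ignores start_after_timestamp and loops immediately, purely to show how
// the modified options are carried over into the next run.
function runJob(method, options, callback, maxRuns = 10) {
  let runs = 0;
  const attempt = (opts) => {
    runs += 1;
    method(opts, (e, r, m) => {
      if (e) return callback(e);
      if (m && m.status === 'pending') {
        if (runs >= maxRuns) return callback(new Error('too many runs'));
        // assumption: m.options is merged into the options for the next run
        return attempt(Object.assign({}, opts, m.options));
      }
      callback(null, r, runs);
    });
  };
  attempt(options);
}

// Hypothetical bot method: asks to be called again until it has polled 3 times.
function waitForThing(options, callback) {
  const polls = (options.polls || 0) + 1;
  if (polls < 3) {
    return callback(null, null, {
      status: 'pending',
      // stand-in for js.getRelativeDate("+5m")
      start_after_timestamp: new Date(Date.now() + 5 * 60 * 1000),
      options: {polls}
    });
  }
  callback(null, {polls});
}

let outcome;
runJob(waitForThing, {}, (e, r, runs) => { outcome = {e, r, runs}; });
```

The method itself stays stateless: everything it needs on the next run travels through the options field of the modification.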

Parameter Naming Conventions

Bot methods that take an object and callback should always have the signature:

function(options,callback) {
}

Callbacks passed in should always use e to refer to the first (error) parameter, and m to refer to the third (modification) parameter if present.

The result parameter should be representative of what is returned, with the following common parameter names as examples:

t should be used to represent the result of a function that loads a table and has a table result field:

dbbot.loadTable({...}, (e,t) => ...)

f should be used to represent the result of a function that writes a file and has a filename result field:

filebot.objectStreamToFile({...}, (e,f) => ...)

s should be used to represent the result of a function that creates a stream and has a stream result field.

Warehouse Bot

DBBot is the overarching implementation for interacting with a warehouse.

A Bot can get access to its warehouse bot using the following call:

this.getWarehouseBot({}, (e,dbbot) => {
  if(e) return callback(e);
  ...
  dbbot.loadQuery(...);
})

The most commonly used methods are:

loadTable: Used to load a file into a table (a temporary table if not specified). It has the following interface:

dbbot.loadTable({
  filename: `...`, // csv, txt, xls supported. a required field
  table: `table_name`, // not required
  upsert: (true|false),
  ignore_dupes: (true|false)
}, (e,t) => {
  ...
});

loadTableFromQuery: Used to load the result of a sql query into a table (a temporary table if not specified):

dbbot.loadTableFromQuery({
  sql: `...`, // required
  table: `table_name`, // not required
  upsert: (true|false),
  ignore_dupes: (true|false)
}, (e,t) => {
  ...
});

Both loadTable and loadTableFromQuery have the same result structure:

{
  table, // name of the table loaded into
  records // the number of records that were loaded into the table
}
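Because the result already carries table and records, a bot method can often pass it straight through to its own callback. The sketch below shows this with stubs: the getWarehouseBot and loadTable implementations only imitate the documented callbacks, loadPeopleFile and example_person are hypothetical, and the record count of 42 is a made-up stub value.

```javascript
// Stub bot: getWarehouseBot and loadTable below only imitate the documented callbacks.
const bot = {
  getWarehouseBot(options, callback) {
    callback(null, {
      loadTable(opts, cb) {
        if (!opts.filename) return cb(new Error('filename is required'));
        // 42 is a made-up value standing in for the number of rows loaded
        cb(null, {table: opts.table || 'temp_table', records: 42});
      }
    });
  },

  // hypothetical load*File method
  loadPeopleFile(options, callback) {
    this.getWarehouseBot({}, (e, dbbot) => {
      if (e) return callback(e);
      // the loadTable result already has table and records, so pass it through
      dbbot.loadTable({
        filename: options.filename,
        table: 'example_person',
        upsert: true
      }, callback);
    });
  }
};

let t, err;
bot.loadPeopleFile({filename: '/tmp/people.csv'}, (e, r) => { t = r; });
bot.loadPeopleFile({}, (e) => { err = e; });
```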

runQuery: Used to run a query - the results are provided as the result in the callback. Specifying target_format will cause the results to be written to a file instead.

dbbot.runQuery({
  sql: `...`, // required,
  target_format: undefined | 'csv' | 'txt' // if specified, the results will be written to file
}, (e,r) => {
  ...
});

If target_format is specified, the result will be an object with field filename, containing the name of the results file. Otherwise, r will be an array of results.
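Code that calls runQuery with an optional target_format has to cope with both result shapes. A small sketch of one way to do that follows; describeQueryResult is a hypothetical helper and not part of DBBot.

```javascript
// Hypothetical helper that distinguishes the two result shapes of runQuery.
function describeQueryResult(r) {
  // no target_format: r is an array of result rows
  if (Array.isArray(r)) return {kind: 'rows', count: r.length};
  // target_format given: r is an object with the name of the results file
  if (r && r.filename) return {kind: 'file', filename: r.filename};
  return {kind: 'unknown'};
}

const fromRows = describeQueryResult([{id: 1}, {id: 2}]);
const fromFile = describeQueryResult({filename: '/tmp/results.csv'});
```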

FileManipulatorBot

FileManipulatorBot provides methods for manipulating files on the local file system. Most of its methods take a filename (possibly multiple) as a parameter and will result in an object that itself has a filename containing the result of the method.

Access FileManipulatorBot in a bot method using: this.getFilebot() (synchronously) or this.getFilebot({}, (e,fbot) => {...}) (asynchronously).

objectStreamToFile: Used to write a stream (or array) to a file.

this.getFilebot().objectStreamToFile({
  stream: [...], // array or object stream (required)
  target_format: 'csv' | 'txt' // not required
}, (e,f) => {
})

The result f will have the format:

{
  filename: ' ... ' // the name of the resulting file
}

transform: Used to create a new file from the contents of another file, using the transform method provided.

this.getFilebot().transform({
  filename: '...', // required,
  transform: (o,enc,cb) => {
    ...
    // cb _must_ be called
  }, // transform method, called per row. required
  target_format: undefined | 'csv' | 'txt' // specifies the format of the resulting file
}, (e,f) => {
})

The result f will have the following structure:

{
  filename, // the name of the resulting file, if there were records
  records, // the number of records in the resulting file
  no_data // will be `true` if there were no records to write
}

The method provided to transform has the following interface:

(o,enc,cb) => {
}

where o is the record currently being transformed, enc is the "encoding" (meaningless in most contexts), and cb is used to signal the end of the transformation. cb(e,r) takes up to two parameters:

cb() signals to the transform that there was no result - this record will be ignored, and no row will be written to file

cb(e) signals to transform that there was an error - the final callback will be called with the error provided as the first parameter

cb(null, {...}) will write the value provided as the second parameter to the result file
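The three cb cases can be sketched with a transform function and a toy runner. Only onlyDonors resembles what a bot author would write; applyTransform is a hypothetical in-memory stand-in for the real file-based transform, it assumes cb is called synchronously, and the amount field is invented for the example.

```javascript
// The transform function itself: the part a bot author would write.
const onlyDonors = (o, enc, cb) => {
  if (o.amount === undefined) return cb(new Error('missing amount')); // error: aborts
  if (o.amount <= 0) return cb(); // no result: the row is skipped
  cb(null, {id: o.id, amount_cents: Math.round(o.amount * 100)}); // row is written
};

// Toy stand-in for the file-based transform, working on an in-memory array.
// Assumes the transform function calls cb synchronously.
function applyTransform(records, transform, callback) {
  const out = [];
  let failed = false;
  for (const o of records) {
    transform(o, 'utf8', (e, r) => {
      if (e) { failed = true; return callback(e); }
      if (r !== undefined) out.push(r);
    });
    if (failed) return; // stop after the first error
  }
  callback(null, {records: out.length, no_data: out.length === 0, rows: out});
}

let result;
applyTransform(
  [{id: 1, amount: 5}, {id: 2, amount: 0}, {id: 3, amount: 1.5}],
  onlyDonors,
  (e, r) => { result = r; }
);
```

The second record is dropped because the function calls cb() with no arguments, so only two rows reach the output.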

BaseBot Common Tools

fraktureAutoInject: Recreates the behavior of async.autoInject while making use of the Frakture checkpoint/modification infrastructure. It isolates the calls so that they can call checkpoint internally without breaking the overall structure of the job. Unlike the original autoInject, functions passed to this method must use the function(...) {} syntax rather than the array or () => {} syntax, and must have two final arguments, opts and cb. opts holds the options that have previously been set using checkpoint or modify (it's an empty object at first). cb is the callback used to indicate that the function is done, and to specify the error, the result, or the modification.

writeStreamToTempFile: Creates a temporary file and writes the contents of a text stream to that file. Used as:

this.writeStreamToTempFile({
  stream: request({...}),
  target_format: 'csv|txt'
}, (e,f) => {
  if(e) return callback(e);
  debug(f.filename);
  ....
})

Export Job Pattern

Many systems support "exporting" data. This usually (but not always) requires starting a job, waiting for that job to complete, and then pulling the results. While a bot could continue running while it waits for the data, this would consume resources that should remain available to the system. Instead, it should use the checkpoint/modification system.

The body of these functions should look something like:

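// `export` is a reserved word, so it is attached as a property rather than declared by name
Bot.prototype.export = function(options,callback) {
  // later runs: the job was already started, so check on it
  if(options.external_job_id) return this.getJobStatus(options, (e,r) => {
    if(e) return callback(e);
    if(r.job_still_running) return callback(null, null, {status:'pending', start_after_timestamp: js.getRelativeDate("+1m")});
    this.getJobResults(r, callback);
  });

  // first run: start the job and save its id in the options for the next run
  this.startJob(options,(e,r) => {
    if(e) return callback(e);
    const {external_job_id} = r;
    callback(null, null, {status: 'pending', start_after_timestamp:js.getRelativeDate("+1m"), options: {external_job_id}});
  });
};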

Where the external_job_id is the unique identifier of the job within the external system. Note - this can make use of fraktureAutoInject - like so:

this.fraktureAutoInject({
  job: function(opts,cb) {
    this.startJob(options, (e,r) => cb(e,r))
  },
  results: function(job,opts,cb) {
    this.getJobStatus(job, (e,status) => {
      if(e) return cb(e);
      if(status.is_still_running) return cb(null,null, {status:'pending' ...});
      this.getJobResults(job, cb);
    })
  }
}, (e,r,m) => {
  if(e||m) return callback(e,null,m);
  callback(null, r.results);
});

Channel Bot Implementations

Channel Bots typically have a number of submodules representing different datatypes within an object. These types will usually map to a Frakture standard data type (although some systems have additional types). A bot will represent these submodules in its metadata - for example:

{
  metadata: {
    auth_fields: {
      ...
    }
  },
  
  People: require('./People'),
  Transactions: require('./Transactions'),
  ...
}

The above shows how to include the People and Transactions submodules for a Bot (assuming the files exist).

Message Submodules

Message Submodules should inherit from MessageBase:

const util = require('util');
const MessageBase = require('frakture-workerbots')('MessageBase');
function Bot(bot) { MessageBase.call(this,bot); }
util.inherits(Bot, MessageBase);

And implement the following methods:

listMessages: Called by loadMessages:

Bot.prototype.listMessages = function(options, callback) {
  ...
};

Which should result in an object of the form:

{
  messages: [...], // array of message objects
  records: ..., // number of records
}

Each item in messages should be an object that maps to the Frakture standard (and can have additional, object structured data). It will ultimately be stored in mongo, and then mapped from mongo to the message table in the warehouse by the loadMessages call.

Important elements on the message objects are:

{
  remote_id // the unique identifier for the message in the source system
}
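A listMessages implementation might be sketched as follows. The MessageBase inheritance is left out so the example runs on its own, fetchCampaigns is a hypothetical stand-in for a call to the source system, and the subject field is only illustrative and not confirmed to be part of the Frakture standard.

```javascript
// Hypothetical message submodule; inheritance from MessageBase is omitted
// so that the sketch is self-contained.
function Bot() {}

// Stand-in for a call to the source system (hypothetical).
Bot.prototype.fetchCampaigns = function(options, callback) {
  callback(null, [{id: 'c1', subject: 'Hello'}, {id: 'c2', subject: 'Update'}]);
};

Bot.prototype.listMessages = function(options, callback) {
  this.fetchCampaigns(options, (e, campaigns) => {
    if (e) return callback(e);
    // remote_id carries the source system's unique identifier;
    // subject is an illustrative extra field
    const messages = campaigns.map(c => ({remote_id: c.id, subject: c.subject}));
    callback(null, {messages, records: messages.length});
  });
};

let listed;
new Bot().listMessages({}, (e, r) => { listed = r; });
```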

People Submodules

People Submodules should inherit from PersonBase:

const util = require('util');
const PersonBase = require('frakture-workerbots')('PersonBase');
function Bot(bot) { PersonBase.call(this,bot); }
util.inherits(Bot,PersonBase);

And should implement the following methods (depending on what the system supports):

getPeopleFile: Called by loadPeople. It should have the following interface:

Bot.prototype.getPeopleFile = function(options, callback) {
  ...
};

Where options will contain a subset of the following:

{
  start, // date or relative date ("2020-01-01", or "-1y", etc)
  end, // date or relative date ("2020-01-01", or "-1y", or "now")
  custom_fields // optional list of custom fields to narrow the result to
}

The method should ideally result in an object of the form:

{
  filename, // the resulting filename
  records, // the number of records
  no_data // true if there was no filename, or if the number of records is 0
}

Where the records follow the Frakture standard:

{
  remote_person_id,
  last_modified,
  date_created,
  ...
}

The records should be filtered to only those that were modified (or created) within the start/end range. However, not all systems support this. If the underlying system only supports date_created and not last_modified, filter on that instead.
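When the source system cannot filter by date itself, the range check can be applied to the records before they are written to the file. The sketch below is one possible approach: filterByDateRange is a hypothetical helper, and it assumes that start and end have already been resolved to absolute dates (relative values such as "-1y" would need converting first).

```javascript
// Hypothetical helper: keeps records modified (or, failing that, created)
// within [start, end]. Expects absolute dates, not relative ones like "-1y".
function filterByDateRange(records, start, end) {
  const from = new Date(start).getTime();
  const to = new Date(end).getTime();
  return records.filter(r => {
    // fall back to date_created when the system has no last_modified
    const stamp = new Date(r.last_modified || r.date_created).getTime();
    // records with neither date give NaN and are dropped by the comparison
    return stamp >= from && stamp <= to;
  });
}

const kept = filterByDateRange([
  {remote_person_id: 1, last_modified: '2020-03-01'},
  {remote_person_id: 2, date_created: '2019-06-01'},
  {remote_person_id: 3, date_created: '2020-12-31'},
  {remote_person_id: 4}
], '2020-01-01', '2020-12-31');
```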

pushPerson: For those systems that support creating or updating person records.

Once implemented, this method should not be called directly, but managed using queuedPushPerson, which manages the records using a queue table.

Common Libraries

request

request - used to interact with web services, websites, apis, etc

async

async - used to manage control flow using callback based asynchronous functions