0.0.16 • Published 6 years ago

mysql-binlog-emitter v0.0.16

License
MIT

mysql-binlog-emitter

A Node.js MySQL/MariaDB slave replication event emitter.

This is a high-level implementation of MySQL's binary log protocol. If you are only interested in changes occurring in your database, check out mysql-event-emitter.

It is tested against MariaDB 10.1.31 and binlog protocol version 4.

Install

npm install mysql-binlog-emitter

Setup

Mysql Server

Enable binary log replication in /etc/mysql/my.cnf

[mysqld]
server-id        = 1
log_bin          = /var/log/mysql/mysql-bin
log_bin_index    = /var/log/mysql/mysql-bin.index
binlog-format    = row                              # needed for row events
# Optional
expire_logs_days = 10
max_binlog_size  = 100M

Mysql User

Give your user the privileges needed to read binary logs:

GRANT REPLICATION SLAVE, REPLICATION CLIENT, SELECT ON *.* TO '[USER]'@'[HOST]'

Reset Binary Log

After changing the MySQL configuration you may have to run RESET MASTER once.
To do that, your user needs the RELOAD privilege:

GRANT RELOAD ON *.* TO '[USER]'@'[HOST]'

Usage

const MyBinlogEmitter = require('mysql-binlog-emitter');
const Event = MyBinlogEmitter.Events;

var mbe =  MyBinlogEmitter({
  "mysql": {
    "user": "[USER]",
    "password": "[PWD]"
  }
});

mbe.on(Event.ANY, function(type, ...data){
  console.log(Event.eventName(type));
});

mbe.start();

Options

  • For a single instance no binlog option is needed
  • Any mysql connection and pool option is supported.

Defaults

{
  // mysql pool options
  "mysql": {
    "[socket]": "[SOCKET]",
    "[host]":   "127.0.0.1",
    "user":     "[USER]",
    "password": "[PWD]",
    "[database]": null        // (is not required)
  },
  // binlog options
  "binlog": {
    "slaveId": 1,             // Needs to be counted up if more than one instance is running
    "lastPos": 0,             // Skip all events < lastPos
    "lastTime": 0,            // Timestamp; skip all events < lastTime
    "recoverTimeout": 240,    // Time in ms between reconnection attempts (e.g. on a MySQL server restart)
    "hasChecksum": null,      // Auto detected; Boolean; whether the server uses a checksum table
    "version": 0              // Auto detected (only version 4 is supported right now)
  }
}
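
The slaveId note above implies that every concurrently running emitter needs its own ID. A minimal sketch of building per-instance options follows; `buildOptions` is a hypothetical helper (not part of the package), and the default values are copied from the listing above.

```javascript
// Hypothetical helper, not part of mysql-binlog-emitter: builds an options
// object for the n-th emitter instance (0-based) so each gets its own slaveId.
function buildOptions(mysql, instanceIndex) {
  return {
    // Any mysql connection/pool option may be passed through here.
    mysql: Object.assign({ host: '127.0.0.1' }, mysql),
    binlog: {
      slaveId: 1 + instanceIndex, // counted up per instance, as the defaults note
      recoverTimeout: 240         // default from the listing above
    }
  };
}

const first = buildOptions({ user: '[USER]', password: '[PWD]' }, 0);
const second = buildOptions({ user: '[USER]', password: '[PWD]' }, 1);
console.log(first.binlog.slaveId, second.binlog.slaveId);
```

Each resulting object could then be passed to its own emitter instance in the way shown under Usage.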

Events

No   Name              Data             Description
100  ANY               EventType, Data  Wildcard event; emits all events incl. BinlogEvents
101  CONNECTED
102  DISCONNECTED
103  RECONNECTING
104  RECOVERING
110  BINLOG            Packet           Emits all BinlogEvents, except skipped ones
111  SKIP              Packet           Emits a packet without data when a packet is skipped
120  TIMEOUT
121  END
400  ERROR             Error            Any error
401  ERROR_SQL         Error            SQL error
402  ERROR_COM         Error            SQL Com error
403  ERROR_PARSE       Error, Packet    Binlog parse error
404  ERROR_PARSE_DATA  Error, Packet    Binlog parse data error
405  ERROR_RECOVER     Error            Recover error (will not stop the emitter from recovering)
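
The Data column above lists the arguments a handler receives. As a rough sketch of how handlers might be wired accordingly, the snippet below uses Node's built-in EventEmitter as a stand-in for a running emitter so it can run without a database. The `Codes` object and the `attachLogging` helper are assumptions for illustration, with the numeric codes copied from the table.

```javascript
const { EventEmitter } = require('events');

// Codes copied from the table above; a local stand-in for the package's
// Events export, not the export itself.
const Codes = { CONNECTED: 101, DISCONNECTED: 102, ERROR: 400, ERROR_PARSE: 403 };

// Hypothetical helper: registers handlers whose arguments follow the Data column.
function attachLogging(emitter, log) {
  emitter.on(Codes.CONNECTED, () => log('connected'));
  emitter.on(Codes.DISCONNECTED, () => log('disconnected'));
  emitter.on(Codes.ERROR, (err) => log('error: ' + err.message));
  emitter.on(Codes.ERROR_PARSE, (err, packet) =>
    log('parse error at logPos ' + packet.logPos + ': ' + err.message));
}

// Exercise the handlers with a plain EventEmitter instead of a live server.
const lines = [];
const standIn = new EventEmitter();
attachLogging(standIn, (line) => lines.push(line));
standIn.emit(Codes.CONNECTED);
standIn.emit(Codes.ERROR_PARSE, new Error('bad header'), { logPos: 1234 });
console.log(lines);
```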

BinlogEvents

As described in MySQL Internals Manual > Binlog Event Types

No  Name
0   UNKNOWN_EVENT
1   START_EVENT_V3
2   QUERY_EVENT
3   STOP_EVENT
4   ROTATE_EVENT
5   INTVAR_EVENT
6   LOAD_EVENT
7   SLAVE_EVENT
8   CREATE_FILE_EVENT
9   APPEND_BLOCK_EVENT
10  EXEC_LOAD_EVENT
11  DELETE_FILE_EVENT
12  NEW_LOAD_EVENT
13  RAND_EVENT
14  USER_VAR_EVENT
15  FORMAT_DESCRIPTION_EVENT
16  XID_EVENT
17  BEGIN_LOAD_QUERY_EVENT
18  EXECUTE_LOAD_QUERY_EVENT
19  TABLE_MAP_EVENT
21  WRITE_ROWS_EVENTv0
22  UPDATE_ROWS_EVENTv0
23  DELETE_ROWS_EVENTv0
24  WRITE_ROWS_EVENTv1
25  UPDATE_ROWS_EVENTv1
26  DELETE_ROWS_EVENTv1
27  INCIDENT_EVENT
28  HEARTBEAT_EVENT
29  IGNORABLE_EVENT
30  ROWS_QUERY_EVENT
31  WRITE_ROWS_EVENTv2
32  UPDATE_ROWS_EVENTv2
33  DELETE_ROWS_EVENTv2
34  GTID_EVENT
35  ANONYMOUS_GTID_EVENT
36  PREVIOUS_GTIDS_EVENT

Additional wildcard events

No  Name               Description
37  ROWS_EVENT         Emitted on WRITE/UPDATE/DELETE-ROWS_EVENT
38  WRITE_ROWS_EVENT   Emitted on WRITE_ROWS_EVENT/v0/v1/v2; changes the packet.eventType
39  UPDATE_ROWS_EVENT  Emitted on UPDATE_ROWS_EVENT/v0/v1/v2; changes the packet.eventType
40  DELETE_ROWS_EVENT  Emitted on DELETE_ROWS_EVENT/v0/v1/v2; changes the packet.eventType
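
To make the relationship between the versioned row events and the wildcard events concrete, here is a small lookup sketch. The numeric codes are copied from the two tables above; `wildcardsFor` is a hypothetical function written for illustration and is not claimed to mirror the package's internals.

```javascript
// Codes copied from the tables above; local stand-ins, not the package's exports.
const ROWS_EVENT = 37;
const WILDCARDS = {
  38: [21, 24, 31], // WRITE_ROWS_EVENT  <- v0, v1, v2
  39: [22, 25, 32], // UPDATE_ROWS_EVENT <- v0, v1, v2
  40: [23, 26, 33]  // DELETE_ROWS_EVENT <- v0, v1, v2
};

// Returns the wildcard codes a versioned row event would also be emitted
// under, or an empty array for any other event type.
function wildcardsFor(eventType) {
  for (const [wildcard, versions] of Object.entries(WILDCARDS)) {
    if (versions.includes(eventType)) {
      return [ROWS_EVENT, Number(wildcard)];
    }
  }
  return [];
}

console.log(wildcardsFor(24)); // WRITE_ROWS_EVENTv1
console.log(wildcardsFor(2));  // QUERY_EVENT
```

Listening on a wildcard code therefore saves registering one handler per protocol version, at the cost of packet.eventType no longer carrying the version.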

Packet

A packet consists of an event header (the fields of packet itself) and an event body (packet.data).

BinlogPacket {                        // Packet Head
  timestamp: [UNIX Timestamp w/ ms],  // Log Time
  eventType: [Integer],               // BinlogEvent Type
  serverId: [Integer],                // Slave ID
  eventSize: [Integer],
  flags: [Integer],
  logPos: [Integer],                  // Log Position
  version: [Integer],                 // Protocol Version
  skipped: [Boolean],                 // Whether the packet was skipped
  data: [Object]                      // Packet Body
}

(If packet.data is null the parser for it is not implemented yet)
(Row events do not contain any data yet.)
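
Because packet.data can be null and packets can be flagged as skipped, a handler may want to check both before touching the body. The following is a sketch under that assumption; `classifyPacket` is a hypothetical helper that uses only the fields from the layout above.

```javascript
// Hypothetical helper, not part of the package: classifies a packet using
// only the `skipped` and `data` fields from the packet layout.
function classifyPacket(packet) {
  if (packet.skipped) {
    return 'skipped';  // packet was skipped
  }
  if (packet.data === null || packet.data === undefined) {
    return 'unparsed'; // no parser implemented for this event type yet
  }
  return 'parsed';
}

// Sample packets with made-up values, for illustration only.
const samples = [
  { eventType: 2, logPos: 120, skipped: false, data: { query: 'BEGIN' } },
  { eventType: 24, logPos: 180, skipped: false, data: null },
  { eventType: 16, logPos: 210, skipped: true, data: null }
];
const kinds = samples.map(classifyPacket);
console.log(kinds);
```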

Examples

const MyBinlogEmitter = require('mysql-binlog-emitter');
const Event = MyBinlogEmitter.Events;
const BinlogEvent = MyBinlogEmitter.BinlogEvents;

var mbe = MyBinlogEmitter({
  "mysql": {
    "user": "[USER]",
    "password": "[PWD]"
  }
});

mbe.on(BinlogEvent.WRITE_ROWS_EVENTv1, function(packet){

  console.log(Event.eventName(packet.eventType)); // Event.eventName includes BinlogEvents

  console.log(BinlogEvent.eventName(packet.eventType)); // BinlogEvent.eventName includes BinlogEvents only

  console.log(packet);

});

mbe.start(function(err){
  console.log('started');
});

// get last log position
var pos = mbe.pos;

// get last log time
var time = mbe.time;

// gracefully restart
// packets will be skipped until last pos | time
mbe.restart(function(err){
  console.log('restarted');
});

mbe.stop(function(err){
  console.log('stopped');
});
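
The pos and time properties, together with the lastPos and lastTime options, suggest a way to resume after a process restart. The sketch below persists a checkpoint to a JSON file and feeds it back into the binlog options. `saveCheckpoint` and `loadCheckpoint` are hypothetical helpers, the file location is arbitrary, and a plain object stands in for a running emitter so the snippet works without a database. Whether the emitter resumes exactly as intended depends on how it applies lastPos and lastTime, which is assumed here from the option descriptions.

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');

// Hypothetical helper: stores the emitter's last position and time as JSON.
function saveCheckpoint(file, emitter) {
  const checkpoint = { lastPos: emitter.pos, lastTime: emitter.time };
  fs.writeFileSync(file, JSON.stringify(checkpoint));
}

// Hypothetical helper: reads the checkpoint back, falling back to the
// documented defaults (0 and 0) when the file is missing or unreadable.
function loadCheckpoint(file) {
  try {
    return JSON.parse(fs.readFileSync(file, 'utf8'));
  } catch (err) {
    return { lastPos: 0, lastTime: 0 };
  }
}

// Demo: a plain object with made-up values stands in for a running emitter.
const file = path.join(os.tmpdir(), 'binlog-checkpoint-' + process.pid + '.json');
saveCheckpoint(file, { pos: 4821, time: 1520000000000 });

const options = {
  mysql: { user: '[USER]', password: '[PWD]' },
  binlog: loadCheckpoint(file) // becomes binlog.lastPos / binlog.lastTime
};
console.log(options.binlog);
fs.unlinkSync(file); // clean up the demo file
```

In a real setup the checkpoint would more likely be saved from a BINLOG handler or before calling stop, so that the stored position tracks the last packet actually processed.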

Credits

mysql
Faking a slave: Subscribing to mysql row-based-replication changes
Dive into MySQL replication protocol and go-mysql
ZongJi