3.4.12 • Published 4 years ago
wy-mongo-es v3.4.12
Mongo-ES
A MongoDB to Elasticsearch connector
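This project is a fork of mongo-es. By default, mapping a MongoDB collection into Elasticsearch requires writing out the mapping for every field, so this fork adds an option to map all fields: the task definition accepts a new parameter, transAll, which maps every field and defaults to false.
In addition, the scan task now fetches data differently: the original wrapped the id as new ObjectID(id), while this fork uses the id directly as a string.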
Installation
npm i -g mongo-es
Usage
Command Line
# normal mode
mongo-es ./config.json
# debug mode, with debug info printed
NODE_ENV=dev mongo-es ./config.json
Programmatically
const fs = require('fs')
const Redis = require('ioredis')
const { Config, Task, run } = require('mongo-es')
const redis = new Redis('localhost')
Task.onSaveCheckpoint((name, checkpoint) => {
  return redis.set(`mongo-es:${name}`, JSON.stringify(checkpoint))
})
// this will overwrite task.from in config file
Task.onLoadCheckpoint((name) => {
  return redis.get(`mongo-es:${name}`).then(JSON.parse)
})
run(new Config(fs.readFileSync('config.json', 'utf8')))
Concepts
Scan phase
Scan the entire database for existing documents.
Tail phase
Tail the oplog for document create, update, and delete operations.
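Because a loaded checkpoint overwrites task.from, a checkpoint presumably records the phase and position a task has reached. As a minimal sketch, the Redis callbacks in the usage example could be swapped for a file-backed store. The directory name and helper names here are hypothetical, and the wiring at the end assumes Task.onSaveCheckpoint and Task.onLoadCheckpoint accept promise-returning functions, as the Redis version suggests.

```javascript
const fs = require('fs')
const os = require('os')
const path = require('path')

// Hypothetical location for checkpoint files; any writable directory works.
const checkpointDir = path.join(os.tmpdir(), 'mongo-es-checkpoints')

function checkpointPath(name) {
  return path.join(checkpointDir, `${name}.json`)
}

// Persist a task's checkpoint as JSON, returning a promise like the Redis version.
function saveCheckpoint(name, checkpoint) {
  return fs.promises
    .mkdir(checkpointDir, { recursive: true })
    .then(() => fs.promises.writeFile(checkpointPath(name), JSON.stringify(checkpoint)))
}

// Resolve to the stored checkpoint, or null when none has been saved yet.
function loadCheckpoint(name) {
  return fs.promises
    .readFile(checkpointPath(name), 'utf8')
    .then(JSON.parse)
    .catch((err) => {
      if (err.code === 'ENOENT') return null
      throw err
    })
}

// Wiring, assuming the Task API shown in the usage example:
// Task.onSaveCheckpoint(saveCheckpoint)
// Task.onLoadCheckpoint(loadCheckpoint)
```

Returning null for a missing file mirrors the Redis version, where a missing key yields null after JSON.parse.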
Configuration
Structure:
{
  "controls": {},
  "mongodb": {},
  "elasticsearch": {},
  "tasks": [
    {
      "extract": {},
      "transform": {},
      "load": {}
    }
  ]
}
controls
mongodbReadCapacity - Max docs read per second (default: 10000). (optional)
elasticsearchBulkInterval - Max bulk interval per request (default: 5000). (optional)
elasticsearchBulkSize - Max bulk size per request (default: 5000). (optional)
indexNameSuffix - Index name suffix, for index version control. (optional)
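To make the defaults concrete, here is a small sketch of how omitted controls could fall back to the documented values. The resolveControls helper is hypothetical and is not part of the package; only the key names and default numbers come from the list above.

```javascript
// Defaults as documented above; indexNameSuffix has no documented default.
const defaultControls = {
  mongodbReadCapacity: 10000,
  elasticsearchBulkInterval: 5000,
  elasticsearchBulkSize: 5000,
}

// Hypothetical helper: overlay user-supplied controls on the documented defaults.
function resolveControls(controls = {}) {
  return { ...defaultControls, ...controls }
}

// Example values are placeholders.
const controls = resolveControls({ mongodbReadCapacity: 2000, indexNameSuffix: '_v2' })
console.log(controls)
```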
mongodb
url - The connection URI string, e.g. mongodb://user:password@localhost:27017/db?replicaSet=rs0. Note: you must use an admin user to access the oplog.
options - Connection settings, see: MongoClient. (optional)
elasticsearch
options - Elasticsearch Config Options, see: Configuration.
indices - If set, indices are created automatically when the program starts, see: Indices Create. (optional)
task.from
phase - scan or tail
time - tail oplog with query: { ts: { $gte: new Timestamp(0, new Date(time).getTime() / 1000) } }
id - scan collection with query { _id: { $gte: id } }. Changed in this fork: id is used directly as the passed-in string rather than ObjectID(id).
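The two starting points can be sketched in plain JavaScript. The helper names and sample values below are hypothetical; the sketch only mirrors the queries quoted above and leaves out the MongoDB driver's Timestamp class so that it runs without dependencies.

```javascript
// Seconds since the epoch for a `time` value: the second argument passed to
// Timestamp in the tail query quoted above.
function oplogStartSeconds(time) {
  return new Date(time).getTime() / 1000
}

// Scan query as described for this fork: the id is passed through as a plain string.
function scanQuery(id) {
  return { _id: { $gte: id } }
}

// Two possible `from` values (placeholders).
const fromTail = { phase: 'tail', time: '2020-01-01T00:00:00Z' }
const fromScan = { phase: 'scan', id: 'user-0001' }

console.log(oplogStartSeconds(fromTail.time))
console.log(scanQuery(fromScan.id))
```

Since MongoDB range comparisons only match values of the same BSON type, a string id in the scan query suits collections whose _id values are stored as strings.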
task.extract
db - Database name.
collection - Collection name in database.
projection - Projection selector, see Projection.
task.transform
mapping - The field mapping from mongodb's collection to elasticsearch's index.
parent - The field in mongodb's collection to use as the _parent in elasticsearch's index. (optional)
transAll - If set, all fields of each document are mapped without listing them in mapping (default: false). (optional)
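The difference between an explicit mapping and transAll can be illustrated with a toy transform. This is not the package's implementation: the transformDoc function is hypothetical, the flat { sourceField: targetField } mapping shape is an assumption, and skipping _id in the copied body is also an assumption.

```javascript
// Illustrative only: not the package's actual transform code.
// Assumes a flat mapping of { sourceField: targetField }.
function transformDoc(doc, transform) {
  const { mapping = {}, transAll = false } = transform
  const out = {}
  if (transAll) {
    // Copy every field; _id is skipped here on the assumption that it is
    // used as the Elasticsearch document id rather than as a body field.
    for (const [key, value] of Object.entries(doc)) {
      if (key !== '_id') out[key] = value
    }
  }
  // Explicitly mapped fields are copied (and renamed) in both modes.
  for (const [source, target] of Object.entries(mapping)) {
    if (source in doc) out[target] = doc[source]
  }
  return out
}

const doc = { _id: '1', name: 'a', age: 3 }
console.log(transformDoc(doc, { mapping: { name: 'title' } }))
console.log(transformDoc(doc, { transAll: true }))
```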
task.load
index - The name of the index.
type - The name of the document type.
body - The request body, see Put Mapping.
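Putting the sections together, a complete configuration might look roughly like the sketch below, which builds the object in JavaScript and prints it as JSON. Only the key names come from the sections above; every value is a placeholder, the Elasticsearch host option is an assumed client setting, and placing from inside the task object next to extract is an assumption based on the task.from heading.

```javascript
// Every value below is a placeholder; only the key names come from the sections above.
const config = {
  controls: {
    mongodbReadCapacity: 10000,
    elasticsearchBulkInterval: 5000,
    elasticsearchBulkSize: 5000,
    indexNameSuffix: '_v1',
  },
  mongodb: {
    url: 'mongodb://user:password@localhost:27017/db?replicaSet=rs0',
  },
  elasticsearch: {
    // Assumed client option; check the Elasticsearch client's Configuration docs.
    options: { host: 'localhost:9200' },
  },
  tasks: [
    {
      // Assumed placement of task.from alongside extract/transform/load.
      from: { phase: 'scan' },
      extract: { db: 'db', collection: 'users', projection: { name: 1, age: 1 } },
      // Whether mapping may be empty when transAll is set is an assumption.
      transform: { mapping: {}, transAll: true },
      load: {
        index: 'users',
        type: 'user',
        body: { properties: { name: { type: 'keyword' }, age: { type: 'integer' } } },
      },
    },
  ],
}

// Print as JSON; the output can be saved as config.json.
console.log(JSON.stringify(config, null, 2))
```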