yodoya-message-adapter v1.0.0
yodoya-message-adapter
Yodoya internal mesage adapter
Usage:
bindIncoming
bindIncoming binds the wiring socket (whether it be nanomsg, or kafka) to
a local observable stream (for an example Kefir). The function also takes an
option object whose members are the following:
messageName is a string of data name to be bound to on event handler.
filter is a function to specify whether to emit a decoded message to the obs
stream.
getPayload is a function to obtain payload from a received data.
For an example, for usage with kafka-node and Kefir:
const messageFilter = function (message) {
return message.topic === 'mytopic'
}
const bindReqOptions = {
messageName: 'message',
filter: messageFilter,
getPayload: function (buf) {
return buf.value
}
}
let kstream = Kefir.stream(function (_emitter) {
kemitter = _emitter
adapter.bindIncoming(consumer, kemitter, bindReqOptions, (err, res) => {
if (err) return done(err)
})
})sendEvent
sendEvent sends out Activetick data through some wire after encoding
it with a AT specific protocol buffer schema. The function also takes in
an option object:
formatPayload is a function to format a message intended to be sent out
given that outbound message may encapsulate the encoded data. For an example,
in Kafka, outbound message needs to specify topic and further encode the
protobuf encoded data into binary buffer.
sendData is a function that defines whether wiring library's send method
is synchronous or asynchronous.
For an example, for usage with kafka-node:
const makeOutboundMsg = function ( encodedData ) {
let message = {
topic: 'mytopic',
messages: Buffer.from(encodedData)
}
return [message]
}
const respOptions = {
formatPayload: makeOutboundMsg,
sendData: function (socket, payload, cb) {
return socket.send(payload, cb)
}
}
adapter.sendEvent(producer, response, respOptions, (err, res) => {
if (err) return done(err)
})8 years ago