2.0.8 • Published 8 days ago

@voicenter-team/failover-amqp-pool v2.0.8

Weekly downloads
-
License
MIT
Repository
github
Last release
8 days ago

failover-amqp-pool

AMQP pool client

Usage

Publishing strategies:

  • "rr" - round robin (for all available channels)
  • "all" - to all available channels
  • function(msg, channels) {} - callback

Consume

config:

[
  {
    "connection": {
      "host": "127.0.0.1",
      "port": 5672,
      "ssl": false,
      "username": "user",
      "password": "password",
      "vhost": "/",
      "heartbeat": 5
    },
    "channel": {
      "exchange": {
        "name": "TestExchange",
        "type": "fanout",
        "options": {
          "durable": true
        }
      },
      "queue": {
        "name": "TestQueue",
        "options": {
          "exclusive": false,
          "durable": true
        }
      },
      "binding":  {
        "enabled": true,
        "pattern": "routing_key",
        "options": {}
      },
      "prefetch": 5
    }
  },
  {
    "connection": {
      "host": "127.0.0.1",
      "port": 5672,
      "ssl": false,
      "username": "test",
      "password": "password",
      "vhost": "/",
      "heartbeat": 5
    },
    "channel": {
      "exchange": {
        "name": "TestExchange2",
        "type": "fanout",
        "options": {
          "durable": true
        }
      },
      "queue": {
        "name": "TestQueue2",
        "options": {
          "exclusive": false,
          "durable": true
        }
      },
      "binding":  {
        "enabled": true,
        "pattern": "",
        "options": {}
      },
      "prefetch": 5
    }
  },
  {
    "connection": {
      "host": "127.0.0.1",
      "port": 5672,
      "ssl": false,
      "username": "test",
      "password": "password",
      "vhost": "/",
      "heartbeat": 5
    },
    "channel": {
      "exchange": {
        "name": "TestExchange3",
        "type": "fanout",
        "options": {
          "durable": true
        }
      },
      "queue": {
        "name": "TestQueue3",
        "options": {
          "exclusive": false,
          "durable": true
        }
      },
      "binding":  {
        "enabled": true,
        "pattern": "routing_key",
        "options": {}
      },
      "prefetch": 5
    }
  },
  {
    "connection": {
      "host": "127.0.0.1",
      "port": 5672,
      "ssl": false,
      "username": "test",
      "password": "password",
      "vhost": "/",
      "heartbeat": 5
    },
    "channel": {
      "exchange": {
        "name": "TestExchange4",
        "type": "fanout",
        "options": {
          "durable": true
        }
      },
      "queue": {
        "name": "TestQueue4",
        "options": {
          "exclusive": false,
          "durable": true
        }
      },
      "binding":  {
        "enabled": true,
        "pattern": "routing_key",
        "options": {}
      },
      "prefetch": 5
    }
  }
]

code:

const cfg = require('./config1.json');
const AMQPPool = require('./index');

// Counter printed next to each message and bumped after every ack.
let i = 0;

const pool = new AMQPPool(cfg);
pool.start();

// Each 'ready' event delivers a channel: subscribe to its messages, then start consuming.
pool.on('ready', (channel) => {
    // Log and acknowledge a message 500 ms after it arrives.
    const handleMessage = (message) => {
        setTimeout(() => {
            const body = message.content.toString();
            console.log('<< ' + body + " -> " + i);
            channel.ack(message);
            i += 1;
        }, 500);
    };

    channel.on("message", handleMessage);
    channel.consume();
});

Feed

config:

[
  {
    "connection": {
      "host": "127.0.0.1",
      "port": 5672,
      "ssl": false,
      "username": "user",
      "password": "password",
      "vhost": "/",
      "heartbeat": 5
    },
    "channel": {
      "exchange": {
        "name": "TestExchange",
        "type": "fanout",
        "options": {
          "durable": true
        }
      }
    }
  },
  {
    "connection": {
      "host": "127.0.0.1",
      "port": 5672,
      "ssl": false,
      "username": "test",
      "password": "password",
      "vhost": "/",
      "heartbeat": 5
    },
    "channel": {
      "exchange": {
        "name": "TestExchange1",
        "type": "fanout",
        "options": {
          "durable": true
        }
      }
    }
  },
  {
    "connection": {
      "host": "127.0.0.1",
      "port": 5672,
      "ssl": false,
      "username": "test",
      "password": "password",
      "vhost": "/",
      "heartbeat": 5
    },
    "channel": {
      "exchange": {
        "name": "TestExchange2",
        "type": "fanout",
        "options": {
          "durable": true
        }
      }
    }
  },
  {
    "connection": {
      "host": "127.0.0.1",
      "port": 5672,
      "ssl": false,
      "username": "test",
      "password": "password",
      "vhost": "/",
      "heartbeat": 5
    },
    "channel": {
      "exchange": {
        "name": "TestExchange3",
        "type": "fanout",
        "options": {
          "durable": true
        }
      }
    }
  }
]

code:

const cfg = require('./config.json');
const AMQPPool = require('./index');

// Sequence number appended to every message; shared by all three publishers below.
let i = 0;

const pool = new AMQPPool(cfg);
pool.start();

// Once per second, publish through the pool with the built-in "rr" (round robin) strategy.
// NOTE(review): the third argument is presumably the routing key / topic - confirm against the library.
setInterval(() => {
    const properties = {"headers": {"asd": "asd"}};
    pool.publish("Mesage-" + i, 'rr', "pattern", properties);
    i += 1;
}, 1000);

// Once per second, publish with a callback strategy that performs its own round robin.
let rr_i = 0;
setInterval(() => {
    pool.publish("Mesage-" + i, (channels) => {
        // Wrap around once the cursor runs past the last channel.
        rr_i = rr_i >= channels.length ? 0 : rr_i;
        const selected = channels[rr_i];
        rr_i += 1;
        return selected;
    });
    i += 1;
}, 1000);

// For every channel reported ready, publish on it once per second using the "all" strategy.
pool.on('ready', (channel) => {
    setInterval(() => {
        const text = "Mesage-" + i;
        channel.publish(text, "all");
        console.log(text);
        i += 1;
    }, 1000);
});

Transport

const winston = require('winston');
const WinstonAMQPPoolTransport = require("./WinstonAMQPPoolTransport.js");

// Build a winston logger whose only transport forwards log entries to an AMQP pool.
const logger = winston.createLogger({
    level: 'info',
    format: winston.format.json(),
    defaultMeta: {service: 'user-service'},
    transports: [
        new WinstonAMQPPoolTransport({
            filename: 'error.log',
            // Transport-level threshold passed to the transport.
            level: 'error',
            // Pool definition: same connection/channel layout as the feed configuration.
            pool: [{
                "connection": {
                    "host": "127.0.0.1",
                    "port": 5672,
                    "ssl": false,
                    "username": "user",
                    "password": "password",
                    "vhost": "/",
                    "heartbeat": 5,
                    "reconnectInterval": 2000
                },
                "channel": {
                    "exchange": {
                        "name": "TestExchange",
                        "type": "fanout",
                        "options": {
                            "durable": true
                        }
                    }
                }
            }],
            // "all" publishing strategy: send to all available channels.
            strategy: 'all',
            topic: ""
        })
    ]
});

// Emit one error-level entry per second to exercise the transport.
setInterval(() => {
    console.log(1);
    logger.log('error', 'asdasdsadsad');
}, 1000);
2.0.8

8 days ago

2.0.5

12 days ago

2.0.4

12 days ago

2.0.7

12 days ago

2.0.6

12 days ago

2.0.3

15 days ago

2.0.2

16 days ago

2.0.1

16 days ago

2.0.0

16 days ago

1.6.9

7 months ago

1.6.11

7 months ago

1.6.8

7 months ago

1.6.10

7 months ago

1.6.7

7 months ago

1.6.6

7 months ago

1.6.4

11 months ago

1.6.3

11 months ago

1.6.2

11 months ago

1.6.1

11 months ago

1.6.5

10 months ago

1.6.0

1 year ago

1.4.2

2 years ago

1.2.13

2 years ago

1.2.16

2 years ago

1.2.17

2 years ago

1.2.14

2 years ago

1.2.15

2 years ago

1.2.12

2 years ago

1.2.10

2 years ago

1.2.11

2 years ago

1.2.9

3 years ago

1.2.8

3 years ago

1.2.7

3 years ago

1.2.6

3 years ago

1.2.5

3 years ago

1.2.4

3 years ago

1.1.1

3 years ago

1.2.3

3 years ago

1.2.2

3 years ago

1.1.2

3 years ago

1.0.5

3 years ago

1.0.4

3 years ago

1.0.3

3 years ago

1.0.2

3 years ago

1.0.1

3 years ago

1.0.0

3 years ago