1.0.55 • Published 5 months ago

@hkube/producer-consumer v1.0.55

Weekly downloads
48
License
ISC
Repository
-
Last release
5 months ago

Producer consumer

Build Status Coverage Status

A producer/consumer message queue based on Redis, built for Node.js.

Installation

$ npm install @hkube/producer-consumer

Basic usage

Producer

const { Producer } = require('@hkube/producer-consumer');
const options = {
    job: {
        type: 'test-job',
        data: { action: 'bla' },
    }
}
const producer = new Producer(options);
const job = await producer.createJob(options);

Consumer

const { Consumer } = require('@hkube/producer-consumer');
const options = {
    job: {
        type: 'test-job'
    }
}
const consumer = new Consumer(options);
consumer.on('job', (job) => {
    // do some work...
    job.done(null, {result: true}); // success

    // or
    job.done(new Error('oopps..')); // failed
});
consumer.register(options);

Schema

The createJob method validates the options against the following schema:

const schema = {
    "properties": {
        "job": {
            "type": "object",
            "properties": {
                "type": {
                    "type": "string",
                    "description": "the job type"
                },
                "waitingTimeout": {
                    "type": "integer",
                    "description": "time to wait before the job is active/failed/completed"
                },
                "resolveOnStart": {
                    "type": "boolean",
                    "description": "should resolve when the job is in active state"
                },
                "resolveOnComplete": {
                    "type": "boolean",
                    "description": "should resolve when the job is in completed state"
                }
            }
        },
        "queue": {
            "type": "object",
            "properties": {
                "priority": {
                    "type": "integer",
                    "description": "ranges from 1 (highest) to MAX_INT"
                },
                "delay": {
                    "type": "integer",
                    "description": "milliseconds to wait until this job can be processed."
                },
                "timeout": {
                    "type": "integer",
                    "description": "milliseconds after which the job should fail with a timeout error"
                },
                "attempts": {
                    "type": "integer",
                    "description": "total number of attempts to try the job until it completes"
                },
                "removeOnComplete": {
                    "type": "boolean",
                    "description": "If true, removes the job when it successfully completes",
                    "default": false
                },
                "removeOnFail": {
                    "type": "boolean",
                    "description": "If true, removes the job when it fails after all attempts",
                    "default": false
                }
            }
        },
        "setting": {
            "type": "object",
            "properties": {
                "prefix": {
                    "type": "string",
                    "default": "queue",
                    "description": "prefix for all queue keys"
                },
                "redis": {
                    "type": "object",
                    "properties": {
                        "host": {
                            "type": "string",
                            "default": "localhost"
                        },
                        "port": {
                            "type": "integer",
                            "default": 6379
                        }
                    }
                }
            }
        }
    }
}

Events

const { Producer } = require('@hkube/producer-consumer');
producer.on('job-failed', (jobId, err) => {       
}).on('job-completed', (jobId, result) => {           
}).on('job-active', (jobId) => {             
});
producer.createJob(options);

const options = {
    job: {
        type: 'test-job',
        data: { action: 'bla' },
    }
}
const producer = new Producer(options);
const job = await producer.createJob(options);

Full Detailed Example

const { producer } = require('@hkube/producer-consumer');
const options = {
    job: {
        resolveOnStart: false,
        resolveOnComplete: false,
        type: 'test-job',
        data: { action: 'bla' },
        waitingTimeout: 5000
    },
    queue: {
        priority: 1,
        delay: 1000,
        timeout: 5000,
        attempts: 3,
        removeOnComplete: true,
        removeOnFail: false
    },
    setting: {
        prefix: 'sf-queue',
        redis: {
            host: '127.0.0.1',
            port: 6379,
            cluster: true,
            sentinel: false
        }
    }
}

const job = await producer.createJob(options);

License

MIT

1.0.55

5 months ago

1.0.54

6 months ago

1.0.53

6 months ago

1.0.52

3 years ago

1.0.51

3 years ago

1.0.49

4 years ago

1.0.50

4 years ago

1.0.48

4 years ago

1.0.47

4 years ago

1.0.46

5 years ago

1.0.45

5 years ago

1.0.44

6 years ago

1.0.43

6 years ago

1.0.42

6 years ago

1.0.41

6 years ago

1.0.40

6 years ago

1.0.39

6 years ago

1.0.38

6 years ago

1.0.37

6 years ago

1.0.36

6 years ago

1.0.34

6 years ago

1.0.33

7 years ago

1.0.32

7 years ago

1.0.31

7 years ago

1.0.30

7 years ago

1.0.29

7 years ago

1.0.28

7 years ago

1.0.27

7 years ago

1.0.26

7 years ago

1.0.25

7 years ago

1.0.24

7 years ago

1.0.23

7 years ago

1.0.22

7 years ago

1.0.21

7 years ago

1.0.20

7 years ago

1.0.19

7 years ago

1.0.18

7 years ago

1.0.17

7 years ago

1.0.16

7 years ago

1.0.15

7 years ago

1.0.14

7 years ago

1.0.13

7 years ago

1.0.12

7 years ago

1.0.11

7 years ago