# kafka.do

@drivly/kafka.do v0.0.21

`kafka.do` is a simple API for managing Kafka-based queues. Below are the available endpoints.
## Endpoints

- List all registered servers: `GET /`
- Consume from a queue: `GET /:queue`
- Produce a message to a queue: `GET /:queue/send/:message`
- Send a batch of messages: `POST /:queue/sendBatch` with payload `["message1", "message2"]`
- Acknowledge all messages as consumed: `GET /:queue/ackAll`
- Mark all messages to be retried: `GET /:queue/retryAll`
- Acknowledge a message as consumed: `GET /:queue/ack/:messageId`
- Mark a message to be retried: `GET /:queue/retry/:messageId`
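The endpoints above can be called with any HTTP client. A minimal TypeScript sketch follows; the base URL and helper names are illustrative, not part of the package:

```typescript
// Minimal client sketch for the endpoints above.
// `baseUrl` is illustrative; substitute your kafka.do deployment.

// Join a base URL and path segments, encoding each segment.
export const buildUrl = (baseUrl: string, ...segments: string[]): string =>
  [baseUrl.replace(/\/$/, ''), ...segments.map((s) => encodeURIComponent(s))].join('/')

// Consume from a queue: GET /:queue
export const consume = (baseUrl: string, queue: string) =>
  fetch(buildUrl(baseUrl, queue)).then((res) => res.json())

// Produce a single message: GET /:queue/send/:message
export const send = (baseUrl: string, queue: string, message: string) =>
  fetch(buildUrl(baseUrl, queue, 'send', message))

// Send a batch: POST /:queue/sendBatch with a JSON array body
export const sendBatch = (baseUrl: string, queue: string, messages: string[]) =>
  fetch(buildUrl(baseUrl, queue, 'sendBatch'), {
    method: 'POST',
    headers: { 'content-type': 'application/json' },
    body: JSON.stringify(messages),
  })
```

`buildUrl` encodes each path segment so message text with spaces or slashes survives the round trip.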
## Parameters

Each queue endpoint registers a topic if it is not already registered, and accepts the following parameters to change queue behavior:

- `maxBatchSize`: The maximum number of messages allowed in each batch.
- `maxBatchTimeout`: The maximum number of seconds to wait until a batch is full.
- `maxRetries`: The maximum number of retries for a message if it fails or `retryAll` is invoked.
- `deadLetterQueue`: The name of another queue to send a message to if it fails processing at least `maxRetries` times. If a `deadLetterQueue` is not defined, messages that repeatedly fail processing are eventually discarded. If no queue with the specified name exists, it is created automatically.
- `maxConcurrency`: The maximum number of concurrent consumers allowed to run at once. Leaving this unset means the number of invocations scales to the currently supported maximum.
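Assuming these options are passed as query-string parameters on the queue endpoints (the exact wire format is not documented here, so this is an assumption), building such a request might look like:

```typescript
// Queue-behavior options from the list above.
interface QueueOptions {
  maxBatchSize?: number
  maxBatchTimeout?: number
  maxRetries?: number
  deadLetterQueue?: string
  maxConcurrency?: number
}

// Append the defined options to a queue URL as a query string.
// NOTE: query-string transport is an assumption, not documented behavior.
export const withOptions = (url: string, options: QueueOptions): string => {
  const params = new URLSearchParams()
  for (const [key, value] of Object.entries(options)) {
    if (value !== undefined) params.set(key, String(value))
  }
  const query = params.toString()
  return query ? `${url}?${query}` : url
}
```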
## Cloudflare Worker Queue Compatibility

### Producer

```typescript
import { QueueProducer } from 'kafka.do'

export default {
  fetch: async (req, env, ctx) => {
    QueueProducer('MY_QUEUE', env)
    await env.MY_QUEUE.send({
      url: req.url,
      method: req.method,
      headers: Object.fromEntries(req.headers),
    })
    return new Response('Sent!')
  },
}
```
And you can send multiple messages at once:

```typescript
const sendResultsToQueue = async (results: Array<any>, env: Environment) => {
  const batch: MessageSendRequest[] = results.map((value) => ({
    body: JSON.stringify(value),
  }))
  await env.queue.sendBatch(batch)
}
```
### QueuesContentType

```typescript
type QueuesContentType = 'text' | 'bytes' | 'json' | 'v8'
```
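The content type controls how a message body is serialized. As an illustration only (this heuristic is an assumption, not part of the package), a sender might pick a content type from the body's JavaScript type:

```typescript
type QueuesContentType = 'text' | 'bytes' | 'json' | 'v8'

// Illustrative heuristic: map a message body to a content type.
// Strings go as text, binary buffers as bytes, everything else as JSON.
export const pickContentType = (body: unknown): QueuesContentType => {
  if (typeof body === 'string') return 'text'
  if (body instanceof ArrayBuffer || ArrayBuffer.isView(body)) return 'bytes'
  return 'json'
}
```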
### Consumer

```typescript
import { QueueConsumer } from 'kafka.do'

export default QueueConsumer({
  async queue(batch: MessageBatch, env: Environment, ctx: ExecutionContext): Promise<void> {
    for (const message of batch.messages) {
      console.log('Received', message)
    }
  },
})
```
### MessageBatch

```typescript
interface MessageBatch<Body = unknown> {
  readonly queue: string
  readonly messages: Message<Body>[]
  ackAll(): void
  retryAll(): void
}
```

### Message

```typescript
interface Message<Body = unknown> {
  readonly id: string
  readonly timestamp: Date
  readonly body: Body
  ack(): void
  retry(): void
}
```