1.5.2 • Published 12 months ago

@targetprocess/balancer-core v1.5.2

Weekly downloads
171
License
ISC
Repository
-
Last release
12 months ago

Balancer core

Usage sample

import {
  MessageBalancer,
  MessageCache,
  MessageStorage,
  Db,
  migrateDb,
  createMethodPerLevelLoggerAdapter,
  DefaultMessageBalancerDiagnosticsAdapter,
  DefaultMessageCacheDiagnosticsAdapter,
  DefaultMessageStorageDiagnosticsAdapter,
  ProcessMessageBatchResultItem
} from '@targetprocess/balancer-core'
import {makeLogger} from 'loggerism'
import {Pool} from 'pg'
import * as promClient from 'prom-client'

// Balancer whose message properties may carry `data` and a numeric `retry` value.
type MessageBalancerA = MessageBalancer<{data?: string; retry?: number}>

// Balancer whose message properties may carry only `data`.
type MessageBalancerB = MessageBalancer<{data?: string}>

/**
 * Usage walkthrough: stores one message in each balancer, then processes the
 * next message from balancer A with the single-message API and the next batch
 * from balancer B with the batch API (`maxBatchSize: 10`).
 *
 * Both processing calls are awaited so that any rejection propagates to the
 * caller of `main` instead of becoming an unhandled promise rejection.
 */
async function main() {
  const [balancerA, balancerB] = await createAndInitBalancers()

  await balancerA.storeMessage({
    partitionKey: 'partition#1',
    content: Buffer.alloc(128),
    properties: {data: 'some arbitrary data'}
  })

  await balancerB.storeMessage({
    partitionKey: 'partition#2',
    content: Buffer.alloc(128),
    properties: {data: 'some arbitrary data'}
  })

  await balancerA.processNextMessage(async message => {
    try {
      const {partitionGroup, partitionKey} = message
      console.log(`Processed message from partition "${partitionGroup}/${partitionKey}"`)
      return {type: 'Ok'}
    } catch {
      // Count the failed attempt; a missing counter is treated as zero.
      const properties = {
        ...message.properties,
        retry: (message.properties?.retry ?? 0) + 1
      }
      // Push message back with updated properties
      return {type: 'Requeue', update: {properties}}
    }
  })

  // Using batch API
  await balancerB.processNextMessageBatch(
    async messages => {
      // One result entry per message, matched back by `messageId`.
      const results: ProcessMessageBatchResultItem<{data?: string}>[] = []

      for (const message of messages) {
        try {
          const {partitionGroup, partitionKey} = message
          console.log(`Processed message from partition "${partitionGroup}/${partitionKey}"`)
          results.push({messageId: message.messageId, type: 'Ok'})
        } catch {
          // Push message back with no properties update
          results.push({messageId: message.messageId, type: 'Requeue'})
        }
      }

      return results
    },
    {maxBatchSize: 10}
  )
}

/**
 * Builds the Postgres pool, runs `migrateDb`, wires up one shared
 * `MessageStorage` and `MessageCache`, and creates two balancers on top of
 * them: one for partition group "A" and one for partition group "B".
 *
 * Configuration is read from environment variables
 * (`POSTGRES_CONNECTION_STRING`, `POSTGRES_POOL_MAX`, `LOG_LEVEL`,
 * `MESSAGE_CACHE_MAX_MESSAGE_SIZE`, `MESSAGE_CACHE_MAX_SIZE`,
 * `PARTITION_SIZE_LIMIT`).
 *
 * @returns A tuple `[balancerA, balancerB]`, both already initialised via `init()`.
 */
async function createAndInitBalancers(): Promise<[MessageBalancerA, MessageBalancerB]> {
  // Environment values are strings; convert the pool size to a number and
  // leave it undefined when the variable is not set.
  const poolMax = process.env.POSTGRES_POOL_MAX
  const pool = new Pool({
    connectionString: process.env.POSTGRES_CONNECTION_STRING,
    max: poolMax === undefined ? undefined : Number(poolMax)
  })

  pool.on('error', error => {
    // Handle error here
    console.error(error)
  })

  await migrateDb({pool})

  const logger = createMethodPerLevelLoggerAdapter(
    makeLogger({
      logLevel: process.env.LOG_LEVEL,
      handleExceptions: false
    })
  )

  // Storage and cache are created once and passed to both balancers below.
  const db = new Db({pool})
  const storage = new MessageStorage({
    db,
    diagnostics: new DefaultMessageStorageDiagnosticsAdapter({
      logger,
      createMessagesDurationMetric: summaryMetric('create_messages_duration_in_ms'),
      updateMessagesDurationMetric: summaryMetric('update_messages_duration_in_ms'),
      removeMessagesDurationMetric: summaryMetric('remove_messages_duration_in_ms'),
      readMessagesDurationMetric: summaryMetric('read_messages_duration_in_ms')
    })
  })
  const cache = new MessageCache({
    maxMessageSize: Number(process.env.MESSAGE_CACHE_MAX_MESSAGE_SIZE),
    maxSize: Number(process.env.MESSAGE_CACHE_MAX_SIZE),
    diagnostics: new DefaultMessageCacheDiagnosticsAdapter({
      logger,
      messageCountMetric: gaugeMetric('cache_message_count'),
      messageSizeMetric: gaugeMetric('cache_message_size')
    })
  })
  const balancerA = new MessageBalancer<{data?: string; retry?: number}>({
    partitionGroup: 'A',
    lockPartition: true,
    storage,
    cache,
    diagnostics: new DefaultMessageBalancerDiagnosticsAdapter({
      logger,
      endToEndMessageProcessingDurationMetric: summaryMetric('balancer_a_end_to_end_processing_duration_in_ms'),
      centrifugePartitionCountMetric: gaugeMetric('balancer_a_centrifuge_partition_count'),
      centrifugeMessageCountMetric: gaugeMetric('balancer_a_centrifuge_message_count')
    })
  })
  const balancerB = new MessageBalancer<{data?: string}>({
    partitionGroup: 'B',
    partitionSizeLimit: Number(process.env.PARTITION_SIZE_LIMIT),
    lockPartition: true,
    storage,
    cache,
    diagnostics: new DefaultMessageBalancerDiagnosticsAdapter({
      logger,
      endToEndMessageProcessingDurationMetric: summaryMetric('balancer_b_end_to_end_processing_duration_in_ms'),
      centrifugePartitionCountMetric: gaugeMetric('balancer_b_centrifuge_partition_count'),
      centrifugeMessageCountMetric: gaugeMetric('balancer_b_centrifuge_message_count')
    })
  })

  await balancerA.init()
  await balancerB.init()

  // Return each balancer in its own slot so callers receive both A and B.
  return [balancerA, balancerB]
}

/**
 * Creates a prom-client `Summary` metric with the given name, using the
 * sample's fixed percentiles and sliding-window settings.
 *
 * @param name Metric name passed through to prom-client.
 */
function summaryMetric(name: string) {
  const windowInSeconds = 10 * 60
  const configuration = {
    name,
    help: 'Write it yourself',
    percentiles: [0.1, 0.5, 0.9, 0.99],
    maxAgeSeconds: windowInSeconds,
    ageBuckets: 10
  }

  return new promClient.Summary(configuration)
}

/**
 * Creates a prom-client `Gauge` metric with the given name and a placeholder
 * help text.
 *
 * @param name Metric name passed through to prom-client.
 */
function gaugeMetric(name: string) {
  const configuration = {name, help: 'Write it yourself'}

  return new promClient.Gauge(configuration)
}

// Entry point: log any failure and mark the process as failed rather than
// leaving the rejected promise unhandled.
main().catch(error => {
  console.error(error)
  process.exitCode = 1
})
1.5.2

12 months ago

1.5.1

2 years ago

1.5.0

2 years ago

1.4.0

2 years ago

1.3.1

2 years ago

1.2.0

2 years ago

1.3.0

2 years ago

1.1.1

2 years ago

1.1.0

2 years ago

1.0.1

2 years ago

1.0.0

2 years ago

1.1.2

2 years ago

0.7.0

3 years ago

0.6.0

4 years ago

0.5.2

4 years ago

0.5.1

4 years ago

0.5.0

4 years ago

0.4.1

4 years ago

0.4.0

4 years ago

0.3.3

4 years ago

0.3.2

4 years ago

0.3.1

4 years ago

0.3.0

4 years ago

0.2.0

4 years ago

0.0.10

4 years ago

0.0.11

4 years ago

0.1.0

4 years ago

0.0.9

4 years ago

0.0.8

4 years ago

0.0.7

4 years ago

0.0.6

4 years ago

0.0.5

4 years ago

0.0.4

4 years ago

0.0.3

4 years ago

0.0.2

4 years ago

0.0.1

4 years ago