1.0.1 • Published 5 years ago

schema-registry-avro v1.0.1

License
ISC

Schema Registry

What is a Schema Registry?

According to the Confluent documentation (https://docs.confluent.io/current/schema-registry/index.html), a schema registry stores a versioned history of your Avro schemas.

How does it work?

You need at least one producer and one consumer using Kafka. When you produce a message, you should encode it with Avro.

The first time you produce a message with a given Avro schema, the schema must be posted to its subject in the schema registry. The registry returns a unique id if this is the first schema for that subject or if the new schema is compatible with the previous one (see https://docs.confluent.io/current/schema-registry/avro.html#compatibility-types).
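The registration step can be sketched as a plain HTTP request builder. This is a minimal sketch, assuming the Confluent REST route POST /subjects/{subject}/versions; the helper name buildRegisterRequest, the subject name my-topic-value and the User schema are hypothetical and are not part of this package.

```javascript
// Hypothetical helper (not part of schema-registry-avro): builds the HTTP
// request that registers a schema under a subject. It does not send it.
function buildRegisterRequest(registryUrl, subject, schema) {
  const baseUrl = registryUrl.replace(/\/+$/, ''); // drop trailing slashes
  return {
    method: 'POST',
    url: `${baseUrl}/subjects/${encodeURIComponent(subject)}/versions`,
    headers: { 'Content-Type': 'application/vnd.schemaregistry.v1+json' },
    // The registry expects the Avro schema as a JSON string in the "schema" field.
    body: JSON.stringify({ schema: JSON.stringify(schema) }),
  };
}

// Illustrative schema and subject name, chosen for this example only.
const userSchema = {
  type: 'record',
  name: 'User',
  fields: [{ name: 'name', type: 'string' }],
};

const request = buildRegisterRequest(
  'http://schema-registry:8081',
  'my-topic-value',
  userSchema
);
console.log(request.method, request.url);
```

When the registration is accepted, the registry answers with a JSON body containing the schema id, for example {"id": 12}.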

When encoding a message for use with a schema registry, you need to prefix it with a single byte, called the magic byte, followed by the id of the schema from the schema registry as a 4-byte integer. The result looks like this:

|------------|------------|------------|------------|------------|----------------------
| magic byte |                schema id (4 bytes)                | your encoded message
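The layout above can be sketched with a Node.js Buffer. This is a minimal sketch, assuming the Confluent wire format (magic byte set to 0, schema id written as a big-endian 32-bit integer); the helper name frameMessage is hypothetical, and the payload bytes are the Avro encoding of the string "foo".

```javascript
// Assumption: the Confluent wire format uses 0 as the magic byte.
const MAGIC_BYTE = 0;

// Hypothetical helper: prefixes an Avro-encoded payload with the 5-byte header.
function frameMessage(schemaId, avroPayload) {
  const header = Buffer.alloc(5);
  header.writeUInt8(MAGIC_BYTE, 0); // byte 0: magic byte
  header.writeInt32BE(schemaId, 1); // bytes 1-4: schema id, big-endian
  return Buffer.concat([header, avroPayload]);
}

// 0x06 is the zigzag-encoded length 3, followed by the bytes of "foo".
const framed = frameMessage(12, Buffer.from([0x06, 0x66, 0x6f, 0x6f]));
console.log(framed.toString('hex')); // 000000000c06666f6f
```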

When your consumer receives a message from Kafka, it first needs to read the schema id from this header. For example, it reads the id 12.

It can then fetch the schema that was used to encode the message from the schema registry, using that id.
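The consumer side can be sketched in the same way. This is a minimal sketch, assuming the Confluent wire format (magic byte 0, big-endian schema id) and the REST route GET /schemas/ids/{id}; the helper names parseFramedMessage and schemaUrl are hypothetical and do not describe this package's internals.

```javascript
// Hypothetical helper: splits a framed message into schema id and Avro payload.
function parseFramedMessage(buffer) {
  if (buffer.length < 5 || buffer.readUInt8(0) !== 0) {
    throw new Error('Message is not framed with a magic byte and a schema id');
  }
  return {
    schemaId: buffer.readInt32BE(1), // bytes 1-4: schema id, big-endian
    payload: buffer.subarray(5),     // remaining bytes: Avro-encoded message
  };
}

// Hypothetical helper: URL of the route that returns a schema by its id.
function schemaUrl(registryUrl, schemaId) {
  return `${registryUrl.replace(/\/+$/, '')}/schemas/ids/${schemaId}`;
}

const parsed = parseFramedMessage(Buffer.from('000000000c06666f6f', 'hex'));
console.log(parsed.schemaId); // 12
console.log(schemaUrl('http://schema-registry:8081', parsed.schemaId));
```

Since a schema id always refers to the same schema, a consumer can cache the schema by id after the first request instead of querying the registry for every message.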

All routes of the schema registry are documented in the Confluent documentation: https://docs.confluent.io/current/schema-registry/develop/api.html

Environment config

Access to the schema registry using Basic HTTP auth

  • SCHEMA_REGISTRY_URL: URL of the schema registry. Default: http://schema-registry:8081
  • SCHEMA_REGISTRY_USERNAME: Username for Basic HTTP auth. Default: none
  • SCHEMA_REGISTRY_PASSWORD: Password for Basic HTTP auth. Default: none
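The variables above could be turned into a base URL and a Basic auth header along these lines. This is a minimal sketch of how such settings are commonly consumed, not the package's actual code; the helper name loadRegistryConfig and the alice/secret credentials are hypothetical.

```javascript
// Hypothetical helper: reads the registry settings from an env-like object
// and applies the documented defaults.
function loadRegistryConfig(env = process.env) {
  const config = {
    url: env.SCHEMA_REGISTRY_URL || 'http://schema-registry:8081',
    headers: {},
  };
  const username = env.SCHEMA_REGISTRY_USERNAME;
  const password = env.SCHEMA_REGISTRY_PASSWORD;
  // Basic HTTP auth: "Basic " followed by base64("username:password").
  if (username && password) {
    const token = Buffer.from(`${username}:${password}`).toString('base64');
    config.headers.Authorization = `Basic ${token}`;
  }
  return config;
}

// Illustrative credentials only.
const config = loadRegistryConfig({
  SCHEMA_REGISTRY_USERNAME: 'alice',
  SCHEMA_REGISTRY_PASSWORD: 'secret',
});
console.log(config.url);
console.log(config.headers.Authorization);
```

When no username or password is set, the sketch sends no Authorization header, which matches the "none" defaults listed above.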

How to use it?

Decode your message

From your consumer:

const SchemaRegistry = require('schema-registry-avro');

const onMessage = async ({ topic, partition, message }) => {
  try {
    const mess = await SchemaRegistry.decode(message.value)
  } catch(e) {
    console.log(e)
  }