1.0.1 • Published 7 months ago

pubsubplus-opentelemetry-js-integration v1.0.1

Weekly downloads
-
License
Apache-2.0
Repository
github
Last release
7 months ago

Solace PubSub+ OpenTelemetry Integration for Solace JS API

NPM Published Version Apache License

This module provides manual instrumentation for the solclientjs (Solclientjs) API.

Compatible with OpenTelemetry JS API and SDK 1.0+.

Installation

npm install --save pubsubplus-opentelemetry-js-integration

Supported Versions

  • OpenTelemetry SDK: >=0.5.5
  • Node: 16+

Usage

Solace PubSub+ OpenTelemetry Integration for Solace JS API allows the user to manually collect trace data and export them to the backend of choice, to give observability to distributed systems when working with the Solace PubSub+ broker.

                                       |

Manual instrumentation example: inject context with a setter on publish:

import * as solace from 'solclientjs';
import * as opentelemetry from '@opentelemetry/api';
import {
    context,
    propagation,
    trace,
} from '@opentelemetry/api';
import {
    CompositePropagator,
    W3CTraceContextPropagator,
    W3CBaggagePropagator,
} from '@opentelemetry/core';
import { BasicTracerProvider } from '@opentelemetry/sdk-trace-base';
import {
    SemanticAttributes,
    MessagingDestinationKindValues,
} from '@opentelemetry/semantic-conventions';
import { SolaceW3CTextMapSetter, Version } from 'pubsubplus-opentelemetry-js-integration';


// example of a manually instrumented publish function
const publishMessage = async function (
    solaceSession: solace.Session,
    queueName: string,
    solaceMessage: solace.Message|undefined
) {
    // check whether valid message present
    if(!solaceMessage) {
        console.log('No valid Solace message found!');
        return; // skip instrumentation if no message
    }

    // set global propagator
    // setup composite propagator
    const compositePropagator = new CompositePropagator({
        propagators: [
            new W3CBaggagePropagator(),
            new W3CTraceContextPropagator(),
        ],
    });
    propagation.setGlobalPropagator(compositePropagator);

    // Create a provider for activating and tracking spans
    const tracerProvider = new BasicTracerProvider();

    // Register the tracer
    tracerProvider.register();

    // Get a tracer
    const tracer = opentelemetry.trace.getTracer('solace-pubsub-publisher-test', '1.0.0');

    // uses currently active context
    let ctx = context.active();

    // create a new publish span
    const span = tracer.startSpan(queueName + ' send', { kind: opentelemetry.SpanKind.CLIENT }, ctx);

    // Create a new context from the current context which has the span "active"
    ctx = trace.setSpan(ctx, span);

    if(span.isRecording()) {
        span.setAttribute(SemanticAttributes.MESSAGING_SYSTEM, 'SolacePubSub+');
        span.setAttribute(SemanticAttributes.MESSAGING_DESTINATION, queueName);
        span.setAttribute(SemanticAttributes.MESSAGING_DESTINATION_KIND, MessagingDestinationKindValues.TOPIC);
        span.setAttribute(SemanticAttributes.MESSAGING_PROTOCOL, 'SMF');
        span.setAttribute(SemanticAttributes.MESSAGING_PROTOCOL_VERSION, '1.0');
        span.setAttribute('messaging.api.lang', 'nodejs');
        span.setAttribute(SemanticAttributes.MESSAGING_OPERATION, 'send');
    }

    // added baggage entries just for api demo purposes
    const baggage = propagation.createBaggage({
        "baggageEntry1": {
          value: 'test1',
          metadata: undefined,
        },
        "baggageEntry2": {
          value: 'test2',
          metadata: undefined,
      },
    });
    ctx = propagation.setBaggage(ctx, baggage);
 
    // new instance of the SolaceW3CTextMapSetter to be used for context/baggage injection
    const setter = new SolaceW3CTextMapSetter();

    // inject context using w3c context propagator and SolaceW3CTextMapSetter
    propagation.inject(ctx, solaceMessage, setter);

    try {
        console.log('Sending message to queue "' + queueName + '"...');
        solaceMessage.setDestination(solace.SolclientFactory.createDurableQueueDestination(queueName));
        solaceMessage.setDeliveryMode(solace.MessageDeliveryModeType.PERSISTENT);

        // dump message before sending to broker here
        // console.log('SOLACE MESSAGE DUMP: ', solaceMessage.dump(solace.MessageDumpFlag.MSGDUMP_FULL), '\n');

        // use publisher session to publish message to Queue on Solace broker
        solaceSession.send(solaceMessage);

        // Message published.
        span.setStatus({
            code: opentelemetry.SpanStatusCode.OK,
            message: 'Message Sent'
        });
 
    } catch (error: any) {
        // log publisher error
        span.recordException(error);
        span.setStatus({
            code: opentelemetry.SpanStatusCode.ERROR,
            message: String(error)
        });
    }
 
    span.end();
}

// publish instrumented message to the broker
(async function instrument() {
    // Initialize factory with the most recent API defaults
    const factoryProps: solace.SolclientFactoryProperties = new solace.SolclientFactoryProperties();
    factoryProps.profile = solace.SolclientFactoryProfiles.version10_5;
    solace.SolclientFactory.init(factoryProps);

    // enable logging to JavaScript console at TRACE level
    // NOTICE: works only with "solclientjs-debug.js"
    // also the "pubsubplus-opentelemetry-js-integration" module uses the log level set here
    solace.SolclientFactory.setLogLevel(solace.LogLevel.TRACE);

    console.log(
        '\n'
        + '\n>>>>>>>>>>>>[ Start Pubsubplus Opentelemetry JS Integration Version Logging]>>>>>>>>>>>>>>>>'
        + `\nVersion: ${Version.version}`
        + `\nDate: ${Version.date}`
        + `\nTarget: ${JSON.stringify(Version.target)}`
        + `\nFormattedDate: ${Version.formattedDate}`
        + `\nSummary: ${Version.summary}`
        + `\ntoString: ${Version.toString()}`
        + '\n>>>>>>>>>>>>[ End Pubsubplus Opentelemetry JS Integration Version Logging]>>>>>>>>>>>>>>>>>>'
        + '\n\n'
    );

    const pubSession: solace.Session|null = await createPubSession();

    // wait for about 4 sec for session to come up
    setTimeout(async () => {
        console.log('Session 4 seconds TimeOut ended, sending message now...');
        // if created session
        if(pubSession) {
            const message = await createMessage(); // create new Solace.Message instance here
            await publishMessage(pubSession, QUEUE_NAME_ON_BROKER, message); // publish with OTEL support
            // wait for about 5 sec, then close, destroy and clean up session resources
            setTimeout(async () => {
                await disconnectSession(pubSession); // disconnect session after 5 seconds 
            }, 5000);
        }
    }, 4000);

})();

Manual instrumentation example: Fetch context with getter for message processing on subscribe:

import * as solace from 'solclientjs';
import * as opentelemetry from '@opentelemetry/api';
import { propagation } from '@opentelemetry/api';
import {
    CompositePropagator,
    W3CTraceContextPropagator,
    W3CBaggagePropagator,
} from '@opentelemetry/core';
import { BasicTracerProvider } from '@opentelemetry/sdk-trace-base';
import {
    SemanticAttributes,
    MessagingOperationValues,
    MessagingDestinationKindValues,
} from '@opentelemetry/semantic-conventions';
import { SolaceW3CTextMapGetter, Version } from 'pubsubplus-opentelemetry-js-integration';


// example of a manually instrumented consume function
const consumeMessages = async function (
    solaceSession: solace.Session,
    queueName: string
) {
    // check whether valid session
    if(!solaceSession) {
        // failed to start the queue consumer because not connected to Solace message router.
        console.log('Failed to start the queue consumer because not connected to Solace router');
        return; // skip instrumentation if no valid session
    }

    try {
        // Create a message consumer
        const messageConsumer = solaceSession.createMessageConsumer({
            queueDescriptor: { name: queueName, type: solace.QueueType.QUEUE },
            acknowledgeMode: solace.MessageConsumerAcknowledgeMode.CLIENT, // Enabling Client ack
        });
        // Define message consumer event listeners:
        messageConsumer.on(solace.MessageConsumerEventName.UP, function () {
            // consumer is up
            console.log('consumer is up');
        });
        messageConsumer.on(solace.MessageConsumerEventName.CONNECT_FAILED_ERROR, function () {
            // connection failed, consumer is not connected
            console.log('connection failed, consumer is not connected');
        });
        messageConsumer.on(solace.MessageConsumerEventName.DOWN, function () {
            // consumer is now down
            console.log('consumer is now down');
        });
        messageConsumer.on(solace.MessageConsumerEventName.DOWN_ERROR, function () {
            // consumer is down due to error situation
            console.log('consumer is down due to error situation');
        });
        // register callback based message listener
        messageConsumer.on(solace.MessageConsumerEventName.MESSAGE, function (message: solace.Message) {
            // dump received message from broker here
            // console.log('RECEIVED SOLACE MESSAGE DUMP: ', message.dump(solace.MessageDumpFlag.MSGDUMP_FULL), '\n');

            // set as root context
            opentelemetry.ROOT_CONTEXT

            // Get a tracer
            const tracer = opentelemetry.trace.getTracer('example-basic-tracer-node', '1.0.0');

            // set global propagator
            // setup composite propagator
            const compositePropagator = new CompositePropagator({
                propagators: [
                    new W3CBaggagePropagator(),
                    new W3CTraceContextPropagator(),
                ],
            });
            propagation.setGlobalPropagator(compositePropagator);

            // Use to extract the span context and baggage from the Solace message
            const getter = new SolaceW3CTextMapGetter();

            // extract context using w3c context propagator and Solace getter
            const parentContext = propagation.extract(opentelemetry.ROOT_CONTEXT, message, getter);

            const baggage = propagation.getBaggage(parentContext);
            // dump received baggage extracted from Solace message
            // console.log('RECEIVED PROPAGATION BAGGAGE: ', baggage);

            // Create New 'process' Span which could have a linked remote parent
            let span = tracer.startSpan(
                MessagingOperationValues.PROCESS,
                {
                    kind: opentelemetry.SpanKind.CLIENT
                },
                parentContext || undefined
            );  

            const messageDestination = message.getDestination();
            if(messageDestination) {
                span.setAttribute(
                    SemanticAttributes.MESSAGING_DESTINATION,
                    (messageDestination as solace.Destination).getName()
                );
            }
            span.setAttribute(SemanticAttributes.MESSAGING_SYSTEM,'SolacePubSub+');
            span.setAttribute(
                SemanticAttributes.MESSAGING_DESTINATION_KIND,
                MessagingDestinationKindValues.QUEUE
            );
            span.setAttribute(SemanticAttributes.MESSAGING_PROTOCOL, 'SMF');
            span.setAttribute(SemanticAttributes.MESSAGING_PROTOCOL_VERSION, '1.0');
            span.setAttribute(SemanticAttributes.MESSAGING_OPERATION, 'process');

            try {
                // processMessage(message); // - this is a made up client method for message processing, it can throw an error

                // doMoreWork(message); // - this is another made up client method

                span.setStatus({
                    code: opentelemetry.SpanStatusCode.OK,
                    message: 'Message processed'
                });

                // Need to explicitly ack otherwise it will not be deleted from the message router
                message.acknowledge();

            }  catch (error: any) {
                // log processing error
                span.recordException(error);
                span.setStatus({
                    code: opentelemetry.SpanStatusCode.ERROR,
                    message: String(error)
                });
            }

            span.end();

        });

        // Connect the message consumer
        messageConsumer.connect();
    } catch (error: any) {
        // failed to connect
        console.log('Failed to connect: ', error);
    }
};

// consume instrumented message(s) from the broker
(async function instrument() {
    // Initialize factory with the most recent API defaults
    const factoryProps: solace.SolclientFactoryProperties = new solace.SolclientFactoryProperties();
    factoryProps.profile = solace.SolclientFactoryProfiles.version10_5;
    solace.SolclientFactory.init(factoryProps);

    // enable logging to JavaScript console at TRACE level
    // NOTICE: works only with "solclientjs-debug.js"
    // also the "pubsubplus-opentelemetry-js-integration" module uses the log level set here
    solace.SolclientFactory.setLogLevel(solace.LogLevel.TRACE);

    console.log(
        '\n'
        + '\n>>>>>>>>>>>>[ Start Pubsubplus Opentelemetry JS Integration Version Logging]>>>>>>>>>>>>>>>>'
        + `\nVersion: ${Version.version}`
        + `\nDate: ${Version.date}`
        + `\nTarget: ${JSON.stringify(Version.target)}`
        + `\nFormattedDate: ${Version.formattedDate}`
        + `\nSummary: ${Version.summary}`
        + `\ntoString: ${Version.toString()}`
        + '\n>>>>>>>>>>>>[ End Pubsubplus Opentelemetry JS Integration Version Logging]>>>>>>>>>>>>>>>>>>'
        + '\n\n'
    );

    const consumerSession: solace.Session|null = await createConsumerSession();
    // if created session
    if(consumerSession) {
        await consumeMessages(consumerSession, QUEUE_NAME_ON_BROKER);
        // wait for about 10 sec, then close, destroy and clean up session resources
        setTimeout(async () => {
            await disconnectSession(consumerSession); // disconnect session after 10 seconds
        }, 10000);
    }

})();
                                       |

Useful links

License

Apache 2.0 - See LICENSE for more information.