1.0.7 • Published 7 months ago

@team_seki/centrifugo-plugin v1.0.7

Weekly downloads
-
License
-
Repository
-
Last release
7 months ago

centrifugo-plugin

This library was generated with Nx.

Building

Run nx build centrifugo-plugin to build the library.

Running unit tests

Run nx test centrifugo-plugin to execute the unit tests via Jest.

How to use it

Server-side code

Your backend application should provide an endpoint to generate authentication tokens. In your service layer you should add a method for publishing messages to channels.

// [Controller.ts]

import { Body, Controller, Post } from '@nestjs/common';

import { Service } from './Service';

interface IPublish {
  channel: string;
  message: string;
}

@Controller('centrifuge')
export class DefaultController {
  constructor(private readonly service: Service) {}

  @Post('/token')
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  create(@Body() user:any) {
    return this.service.createToken(user);
  }

  // example
  @Post('/publish')
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  publish(@Body() data:IPublish) {
    return this.service.publish(data.channel, data.message);
  }
}
// [Service.ts]
import { Injectable, Logger } from '@nestjs/common';
import { CentrifugoAuthClient, CentrifugoClient, CentrifugoPlugin } from "@team_seki/centrifugo-plugin"

const prefix = '[centrifuge-service]'

interface IUser {
  id: string
  name: string
}

const TOKEN_EXPIRATION_IN_SECONDS = 60 * 60 * 3; // 3 hours

@Injectable()
export class Service {
  private authClient: CentrifugoAuthClient
  private client: CentrifugoClient

  constructor() {
    const centrifugoPlugin = new CentrifugoPlugin()

    this.authClient = centrifugoPlugin.getAuthClient()
    this.client = centrifugoPlugin.getClient()
  }

  /**
   * Create centrifuge token
   * @param user User data
   * @returns {Promise<UI.IJwtToken>}
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  async createToken(user: IUser): Promise<any> {
    try {
      return await this.authClient.createToken({
          userId: user.id,
          extraInfo: {
            userName: user.name
          }
        }, TOKEN_EXPIRATION_IN_SECONDS
      )
    } catch (error) {
      const message = 'Unexpected error creating centrifuge token'
      Logger.debug(`${prefix} ${message}`, { user })
      Logger.error(`${prefix} ${message}`, error)

      throw error
    }
  }

  // publish example
  async publish(channel: string, message:string): Promise<void> {
    this.client.publish(channel, message)
  }
}

export default Service

Client-side code using websocket

private request = axios.create({
  baseURL: 'http://localhost:8080',
  timeout: 10000,
});

// eslint-disable-next-line @typescript-eslint/no-explicit-any
private intervalId: any;

override state: IState = {
  users: []
};

override async componentDidMount() {
  const wsEndpoint = "ws://localhost:8000/connection/websocket";
  const centrifuge = new Centrifuge(wsEndpoint, {
    getToken: this.getToken // function for getting/refreshing the token
  });

  centrifuge.on('connecting', function (ctx) {
    console.log(`connecting: ${ctx.code}, ${ctx.reason}`);
  }).on('connected', function (ctx) {
    console.log(`connected over ${ctx.transport}`);
  }).on('disconnected', function (ctx) {
    console.log(`disconnected: ${ctx.code}, ${ctx.reason}`);
  }).connect();

  // subscribe to channel
  const sub = centrifuge.newSubscription("users");

  sub.on('publication', (ctx) => {
    this.setState(({ users:newUser }) => {
      newUser.push(ctx.data);
      console.log(`receiving message: ${JSON.stringify(ctx.data)}`)
      return { users: newUser }
    })
  }).on('subscribing', function (ctx) {
    console.log(`subscribing: ${ctx.code}, ${ctx.reason}`);
  }).on('subscribed', function (ctx) {
    console.log('subscribed', ctx);
  }).on('unsubscribed', function (ctx) {
    console.log(`unsubscribed: ${ctx.code}, ${ctx.reason}`);
  }).subscribe();
  this.intervalId = setInterval(this.publish, 5000);
}

// get an authentication token
getToken = async ():Promise<string> => {
  const response = await this.request.post('/centrifuge/token',{ id: 'admin', name: '123'});
  return response.data.access_token;
}

// publish example
publish = async () => {
  if(this.state.users.length < 10) {
    await this.request.post('/centrifuge/publish', {
      channel: 'users',
      message: `user_${new Date().getTime()}`
    });
  } else {
    clearInterval(this.intervalId);
  }
}

Client-side code using SSE

import React from 'react'
import axios from 'axios'

interface IProps {}
interface IState {}

export default class Main extends React.Component<IProps, IState> {

  centrifugoSseUrl = 'http://localhost:8000/connection/uni_sse'

  backendUrl = 'http://localhost:8080'

  override async componentDidMount() {
    const url = new URL(this.centrifugoSseUrl)
    const token = await this.getToken()

    url.searchParams.append("cf_connect", JSON.stringify({
      'token': token,
      'subs': {
        'notifications': {}, // subscription to 'notifications' channel
        'tasks': {} // subscription to 'tasks' channel
      }
    }))

    const eventSource = new EventSource(url)

    eventSource.onopen = () => {
      console.log('onopen')
    }

    eventSource.onmessage = (event) => {
      console.log(`receiving data: ${event.data}`)
    }

    eventSource.onerror = (event) => {
      console.log(`Receiving error: ${JSON.stringify(event)}`)
      // TODO: reconnect when the token expires
    }
  }

  // get an authentication token
  getToken = async (): Promise<string> => {
    const request = axios.create({
      baseURL: this.backendUrl,
      timeout: 5000
    })
    const user = { id: '123', name: 'admin'}
    const response = await request.post('/centrifuge/token', user)
    return response.data.access_token
  }

  override render() {
    return (
      ...
    )
  }
}