2.0.1 • Published 3 years ago

@thebestco/pubsub v2.0.1

License
MIT

PubSub is a simple promise-driven, websocket-powered messaging platform.

The purpose of the platform is to enable instant messaging between clients, users and the server.

The platform has two components: the server and the client.

PubSub


To install it (for now):

yarn add git+https://c3e176e0fde947bc235315a72116a90b7134f7cc:x-oauth-basic@github.com/phaistos-networks/pubsub.git

Entities

PubSub is built around 3 distinct entities:

  • User: A user can have many clients associated with them
  • Client: A tab/connection for a user
  • Server: The server

Check out the associated native events for those entities here.

You can subscribe to those events from the client using the subscribe method or handle them in the backend using the handler. See below for more on those.


Server

The server handles incoming messages (actions) and broadcasts them to clients, pushes them to other users, and/or handles them itself.

Creating a PubSub server is quite simple.

createServer(options)

  • options
    • port {Number} The port the server listens on
    • verifyClient {Function} A method to verify the client - check here for more.
    • handler {Function} See below
    • expressUse {Object} Optionally define an object (route => middleware) to use with the established express server, e.g. peerjs

import createServer from '../server';
import handler from './handler';
import verifyClient from './verifyClient';

const server = createServer({
  port: 4001,
  handler, // optional
  verifyClient, // optional
});
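
The expressUse option maps routes to middleware. A minimal sketch of how such a map could be mounted, assuming the server simply calls app.use(route, middleware) for each entry. The mountMiddleware helper and the fake app are hypothetical and not part of PubSub:

```javascript
// Hypothetical helper: mount every (route => middleware) entry on an
// Express-style app. Assumes the app exposes use(route, middleware).
function mountMiddleware(app, expressUse = {}) {
  Object.entries(expressUse).forEach(([route, middleware]) => {
    app.use(route, middleware);
  });
  return app;
}

// Stand-in for an Express app that only records what was mounted.
function createFakeApp() {
  const mounted = [];
  return {
    mounted,
    use(route, middleware) {
      mounted.push({ route, middleware });
    },
  };
}

const peerMiddleware = (req, res, next) => next();
const app = mountMiddleware(createFakeApp(), { '/peerjs': peerMiddleware });
console.log(app.mounted.map(entry => entry.route)); // [ '/peerjs' ]
```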

Note that verifyClient is called only after an initial check that the query contains the userId has passed.
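
The order of those two checks can be sketched as follows. This assumes verifyClient follows the synchronous verifyClient(info) form of the ws library, where info carries req and secure. The helper names getUserId and withUserIdCheck are hypothetical:

```javascript
// Hypothetical pre-check: does the upgrade request's query carry a userId?
function getUserId(requestUrl) {
  // A base is needed because request URLs are relative ("/?userId=7").
  const { searchParams } = new URL(requestUrl, 'ws://localhost');
  const userId = searchParams.get('userId');
  return userId ? userId : null;
}

// Wrap a user-supplied verifyClient so it runs only when a userId is present.
function withUserIdCheck(verifyClient = () => true) {
  return info => getUserId(info.req.url) !== null && verifyClient(info);
}

const verify = withUserIdCheck(info => info.secure);
console.log(verify({ req: { url: '/?userId=42' }, secure: true })); // true
console.log(verify({ req: { url: '/' }, secure: true })); // false
```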

handler is a function that receives an options object, acts on it, and returns a response if needed.

The options object includes:

  • action {Object} The data passed to the server by the client (action object)
  • wsServer {Object} The reference to the websocket server
  • client {Object} The client from whom the message is sent
  • user {Object} The associated user to the client
  • users {Map} The reference to users struct
  • sendToClient {Function} Reference
  • broadcast {Function} Reference

Example:

export default ({ wsServer, action, users }) => {
  const { type } = action;

  switch (type) {
    case 'TEST_ASYNC_RESPONSE':
      // A promise is sent back to the client once it resolves
      return new Promise(resolve => setTimeout(() => resolve('yay!'), 1000));

    case 'DO_SOMETHING_ON_THE_SERVER':
      // imperative stuff here
      return true;

    case 'FETCH_RPC': {
      // fetchRPC is assumed to be defined elsewhere in the application
      const { method, args } = action;
      return fetchRPC(method, ...args);
    }

    default:
      // Unhandled actions return nothing
      break;
  }
};

Each incoming message is marked with a unique messageId by the client, which makes it easy both to apply optimistic updates and to return a response for a given message.

The return value of a handled action is passed back to the client directly and automatically.

If the return value is a promise, its resolved value is passed back once the promise resolves.
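
A minimal sketch of that flow, not the actual server code. The dispatch helper, the { messageId, response } reply shape, and the choice to send nothing when the handler returns undefined are all assumptions:

```javascript
// Hypothetical dispatcher: run the handler and send its (possibly async)
// return value back to the client, tagged with the original messageId.
async function dispatch(handler, { action, messageId }, reply) {
  // Promise.resolve accepts plain values and promises alike.
  const response = await Promise.resolve(handler({ action }));
  if (response !== undefined) {
    reply({ messageId, response });
  }
  return response;
}

const handler = ({ action }) => {
  switch (action.type) {
    case 'TEST_ASYNC_RESPONSE':
      return new Promise(resolve => setTimeout(() => resolve('yay!'), 10));
    case 'DO_SOMETHING_ON_THE_SERVER':
      return true;
    default:
      return undefined;
  }
};

const sent = [];
const reply = message => sent.push(message);
const done = Promise.all([
  dispatch(handler, { action: { type: 'TEST_ASYNC_RESPONSE' }, messageId: 1 }, reply),
  dispatch(handler, { action: { type: 'DO_SOMETHING_ON_THE_SERVER' }, messageId: 2 }, reply),
  dispatch(handler, { action: { type: 'UNKNOWN' }, messageId: 3 }, reply),
]).then(() => sent);
```

Because every reply carries the messageId, the client can match responses to requests even when they arrive out of order.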

server API

.disconnect

Disconnects the server.

.stats

Returns an object containing handy information about users, clients and more.

Those stats are also available from the server's URI under /stats, e.g. for BestPrice: https://pubsub.bestprice.gr:4001/stats

User model

  • clients {Set} Set of connected clients
  • activeClient {Object} The current active client for connected user
  • status
    • status {String} OFFLINE, ONLINE, AWAY
    • previousStatus {String}
    • onlineSinceTs {Number}
    • lastOnlineTs {Number}
    • awaySinceTs {Number}
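
As an illustration only, the model above could be represented like this. The createUser and setStatus helpers are hypothetical, and the way the timestamps are updated on each transition is an assumption:

```javascript
// Hypothetical factory mirroring the documented User model fields.
function createUser() {
  return {
    clients: new Set(),
    activeClient: null,
    status: {
      status: 'OFFLINE',
      previousStatus: null,
      onlineSinceTs: null,
      lastOnlineTs: null,
      awaySinceTs: null,
    },
  };
}

// Assumed timestamp semantics; the real server may update them differently.
function setStatus(user, nextStatus, now = Date.now()) {
  const s = user.status;
  if (s.status === nextStatus) return user;
  if (s.status === 'ONLINE') s.lastOnlineTs = now;
  s.previousStatus = s.status;
  s.status = nextStatus;
  if (nextStatus === 'ONLINE') s.onlineSinceTs = now;
  if (nextStatus === 'AWAY') s.awaySinceTs = now;
  return user;
}

const user = createUser();
setStatus(user, 'ONLINE', 1000);
setStatus(user, 'AWAY', 5000);
console.log(user.status);
```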

Client

Creating a client is also easy.

createClient (options)

  • options
    • endPoint {String} The endpoint, e.g. wss://pubsub.bestprice.gr:443
    • userId {String|Number} The unique user id, e.g. the BestPrice user id or guestId

import createClient from 'pubsub';

const client = createClient({
  userId: window.USER.id,
  endPoint: 'wss://pubsub.bestprice.gr:443',
});

client API

.broadcast (action)

  • action {Object} An action message object (type, ...)

Broadcasts an action message to all other connected clients of the user.

Returns a promise that is resolved when the action reaches the server - or rejected if not.

NOTE: If the promise is rejected, the rejection value is an object (error, action, retry), where retry is a method you can call to send the action again.

This is the case for all send methods.

broadcast({
  type: 'SHORTLIST_UPDATE',
  state: stateToJSON(store.getState()),
});
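
One way to use the retry method from a rejection, sketched under the assumption that retry() returns a new promise with the same resolve/reject behavior. The sendWithRetry helper and the flaky fake send method are hypothetical:

```javascript
// Hypothetical helper: call a send method and retry on rejection, using the
// retry function from the documented rejection object (error, action, retry).
async function sendWithRetry(send, action, maxRetries = 2) {
  let attempt = send(action);
  for (let retries = 0; ; retries += 1) {
    try {
      return await attempt;
    } catch (rejection) {
      if (retries >= maxRetries || typeof rejection.retry !== 'function') {
        throw rejection;
      }
      attempt = rejection.retry();
    }
  }
}

// Fake send method that fails a fixed number of times before succeeding.
function createFlakySend(failures) {
  let remaining = failures;
  const send = action => {
    if (remaining > 0) {
      remaining -= 1;
      return Promise.reject({
        error: new Error('offline'),
        action,
        retry: () => send(action),
      });
    }
    return Promise.resolve('delivered');
  };
  return send;
}

const result = sendWithRetry(createFlakySend(2), { type: 'SHORTLIST_UPDATE' });
result.then(value => console.log(value)); // delivered
```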

.sendToUser (action, to)

  • action {Object} An action object (type, ...)
  • to {Number|Array} Recipient(s) userIds

Sends an action message to the user(s) given in to.

Returns a promise that is resolved when the action reaches the server, or rejected if not.

sendToUser({
  type: 'CHAT_MESSAGE',
  message: 'Hey dudes!',
}, [44, 12]);
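
On the server side, delivery to the recipients might look roughly like this. It is a sketch, assuming users is a Map of userId to user, user.clients is a Set, and each client exposes a send(payload) method as a ws socket does. The deliverToUsers helper is hypothetical:

```javascript
// Hypothetical server-side delivery for sendToUser.
function deliverToUsers(users, action, to) {
  // `to` may be a single userId or an array of userIds.
  const recipients = Array.isArray(to) ? to : [to];
  let delivered = 0;
  recipients.forEach(userId => {
    const user = users.get(userId);
    if (!user) return; // unknown or offline user: nothing to deliver
    user.clients.forEach(client => {
      client.send(JSON.stringify(action));
      delivered += 1;
    });
  });
  return delivered;
}

const inbox = [];
const fakeClient = name => ({ send: payload => inbox.push({ name, payload }) });
const users = new Map([
  [44, { clients: new Set([fakeClient('44-tab-1'), fakeClient('44-tab-2')]) }],
  [12, { clients: new Set([fakeClient('12-tab-1')]) }],
]);

const action = { type: 'CHAT_MESSAGE', message: 'Hey dudes!' };
const count = deliverToUsers(users, action, [44, 12, 99]);
console.log(count); // 3
```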

.sendToServer (action)

  • action {Object} An action object (type, ...)

Sends an action payload to the server.

Unlike broadcast and sendToUser, this does not involve messaging or broadcasting to other clients.

Use this method to call backend methods and receive their responses.

Note that you can also subscribe to this kind of event the same way you do with message actions.

Returns a promise that is resolved when the action reaches the server, or rejected if not.

The argument passed to the resolved promise will be the response by the server, if any.

const stats = await sendToServer({
  type: 'GET_STATS',
});
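
How a response finds its way back to the right promise can be sketched with a map of pending messageIds. This is an illustration, not the client's actual code. The createRequester helper, the { messageId, action } wire format, and the fake transport are assumptions, and rejection handling is left out:

```javascript
// Hypothetical client-side bookkeeping: tag each outgoing action with a
// messageId and resolve the matching promise when the response arrives.
function createRequester(transmit) {
  const pending = new Map();
  let nextId = 0;

  return {
    send(action) {
      nextId += 1;
      const messageId = nextId;
      return new Promise(resolve => {
        pending.set(messageId, resolve);
        transmit({ messageId, action });
      });
    },
    // Call this for every response message received from the server.
    receive({ messageId, response }) {
      const resolve = pending.get(messageId);
      if (!resolve) return false;
      pending.delete(messageId);
      resolve(response);
      return true;
    },
  };
}

// Fake transport: the "server" answers GET_STATS asynchronously.
const requester = createRequester(({ messageId, action }) => {
  const response = action.type === 'GET_STATS' ? { users: 1 } : undefined;
  setTimeout(() => requester.receive({ messageId, response }), 0);
});

const stats = requester.send({ type: 'GET_STATS' });
stats.then(value => console.log(value));
```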

.subscribe(type, callback)

  • type {String} The event type to subscribe to. Can be a message action or a system one.
  • callback {Function} The callback to be added to the listener pool

Returns an unsubscribe method that removes the callback from the listener pool, the way it's done in Redux.

subscribe('SHORTLIST_UPDATE', ({ state }) => {
  console.log('new state!', state);
});
subscribe('STATUS_CHANGE', ({ status }) => {
  console.log('new status %s', status);
});
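
The Redux-style pattern of subscribe returning an unsubscribe function can be sketched as follows. The createListenerPool helper and its emit method are hypothetical and only illustrate the pattern:

```javascript
// Hypothetical listener pool: subscribe returns an unsubscribe function.
function createListenerPool() {
  const listeners = new Map(); // type => Set of callbacks

  function subscribe(type, callback) {
    if (!listeners.has(type)) listeners.set(type, new Set());
    listeners.get(type).add(callback);
    return function unsubscribe() {
      listeners.get(type).delete(callback);
    };
  }

  // Deliver an action to every callback subscribed to its type.
  function emit(action) {
    const pool = listeners.get(action.type);
    if (pool) pool.forEach(callback => callback(action));
  }

  return { subscribe, emit };
}

const pool = createListenerPool();
const seen = [];
const unsubscribe = pool.subscribe('STATUS_CHANGE', ({ status }) => seen.push(status));

pool.emit({ type: 'STATUS_CHANGE', status: 'AWAY' });
unsubscribe();
pool.emit({ type: 'STATUS_CHANGE', status: 'ONLINE' }); // no longer delivered
console.log(seen); // [ 'AWAY' ]
```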

.getStatus()

Returns the current user status.

Values are: CONNECTING, OFFLINE, AWAY, ONLINE

const status = getStatus();

.onStatusChange

A wrapper around a subscribe call for the STATUS_CHANGE event.

client.onStatusChange = callback => subscribe('STATUS_CHANGE', callback);

.isConnected

Returns true if the status is ONLINE or AWAY.
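
The rule is small enough to sketch directly. To keep the example self-contained, this version takes the status as an argument, which is an assumption about shape only:

```javascript
// Sketch of the documented rule: connected means ONLINE or AWAY.
const CONNECTED_STATUSES = new Set(['ONLINE', 'AWAY']);
const isConnected = status => CONNECTED_STATUSES.has(status);

console.log(['CONNECTING', 'OFFLINE', 'AWAY', 'ONLINE'].filter(isConnected));
// [ 'AWAY', 'ONLINE' ]
```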


Develop

  1. Clone the repo
  2. yarn start
  3. yarn deploy when done

A test client is in the repo, pointing to the local BestPrice PubSub server.

You can use it to test and develop both PubSub itself and the BestPrice PubSub server.


BestPrice PubSub server

For convenience, the BestPrice PubSub server is hosted in this repo.

Security related

We authenticate and verify requests sent to the server by:

  • Checking that the origin matches the BestPrice origin.
  • Checking that the request was sent from a secure (https) environment.

Additional checks will be added in the near future.
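
The two checks above could be expressed in a verifyClient function along these lines. This is a sketch, assuming the info object of the ws library (origin, secure, req). The allowed origin is a placeholder, not the real BestPrice origin:

```javascript
// Hypothetical verifyClient combining the two documented checks.
const ALLOWED_ORIGINS = new Set(['https://www.example.com']); // placeholder origin

function verifyClient(info) {
  const originAllowed = ALLOWED_ORIGINS.has(info.origin);
  return originAllowed && info.secure === true;
}

console.log(verifyClient({ origin: 'https://www.example.com', secure: true })); // true
console.log(verifyClient({ origin: 'https://evil.example', secure: true })); // false
console.log(verifyClient({ origin: 'https://www.example.com', secure: false })); // false
```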

Debugging

We use the marvelous pm2 in order to manage the server. Learn about pm2 here.

The server is active in staging on port 4001, at /home/system/Web/BestPrice/PubSub, and it's restarted when new releases are pushed.

Some handy pm2 calls to check on the app:

  • yarn pm2 monit
  • yarn pm2 ls bp-pubsub
  • yarn pm2 show bp-pubsub

Deployment

~Jenkins does the whole thing for you.~

  1. A push to GitHub takes place
  2. A hook triggers a build on any available Jenkins executor
  3. The build results are pushed to Staging
  4. ~pm2 restarts the server~ You have to restart the service manually; it now runs under systemd: systemctl start pubsub