@coco-sheng/websocket-server v1.0.6
快速开始
websocket-server
是一个简单易用的websocket服务器,基于ws,遵循JSON-RPC 2.0协议。
安装
npm install --save @coco-sheng/websocket-server
示例代码
typing.d.ts
interface SocketAttr {
userId: string;
role: string;
type: string
token: string;
}
server.ts
import { WebsocketServer } from '@coco-sheng/websocket-server';
const port = 3403;
export default new WebsocketServer<SocketAttr>({ port });
start.ts
import server from './server';
server.start();
SocketAttr
是socket连接上的属性,详见socket属性部分。
method
定义method(类似于定义一个api)
method.ts
import server from './server';
server.register('hello', () => {
return 'hello world!';
});
你也可以同时定义多个method
import { MethodFn } from '@coco-sheng/websocket-server';
import server from './server';
server.register({
hello1() {
return 'hello world1!';
},
hello2() {
return {
result: 'hello world2!'
};
},
hello3(){
console.log('hello world3!');
}
});
const testMethod: MethodFn<SocketAttr> = () => {
console.log('test');
};
server.register({ testMethod });
client.ts
import Websocket from 'ws';
const port = 3403;
const client = new Websocket(`ws://localhost:${port}`);
client.on('open',async ()=>{
const result1 = await new Promise(resolve => {
client.send(JSON.stringify({ method: 'hello', id: 1, params: [], jsonrpc: '2.0' }));
client.once('message', data => resolve(JSON.parse(data.toString())));
});
console.log(result1);
// {
// jsonrpc: '2.0',
// id: 1,
// method: 'hello',
// result: 'hello world!'
// }
const result2 = await new Promise(resolve => {
client.send(JSON.stringify({ method: 'hello1', id: 2, params: [], jsonrpc: '2.0' }));
client.once('message', data => resolve(JSON.parse(data.toString())));
});
console.log(result2);
// {
// jsonrpc: '2.0',
// id: 2,
// method: 'hello1',
// result: 'hello world1!'
// }
const result3 = await new Promise(resolve => {
client.send(JSON.stringify({ method: 'hello2', id: 3, params: [], jsonrpc: '2.0' }));
client.once('message', data => resolve(JSON.parse(data.toString())));
});
console.log(result3);
// {
// jsonrpc: '2.0',
// id: 3,
// method: 'hello2',
// result: {
// result: 'hello world2!'
// }
// }
const result4 = await new Promise(resolve => {
client.send(JSON.stringify({ method: 'hello3', id: 4, params: [], jsonrpc: '2.0' }));
client.once('message', data => resolve(JSON.parse(data.toString())));
});
console.log(result4);
// {
// jsonrpc: '2.0',
// id: 4,
// method: 'hello3',
// result: ''
// }
const result5 = await new Promise(resolve => {
client.send(JSON.stringify({ method: 'testMethod', id: 5, params: [], jsonrpc: '2.0' }));
client.once('message', data => resolve(JSON.parse(data.toString())));
});
console.log(result5);
// {
// jsonrpc: '2.0',
// id: 5,
// method: 'testMethod',
// result: ''
// }
});
主动向客户端发送消息
server.register('hello',(_params, socket)=>{
// _params 为method的请求参数
socket.sendout({
method: 'test'
result: 'pending hello'
});
// 或者
socket.send(JSON.stringify({
jsonrpc: '2.0',
method: 'test',
result: 'pending hello'
}));
});
sendout
和send
的区别在于sendout
会把要发送的数据转换为符合jsonrpc2.0
规范的格式,同时也会根据数据压缩配置将数据进行压缩处理;send
则需要手动组装jsonrpc2.0
规范的数据,且不会根据配置压缩数据。
内置method
系统内置了两个method:connect
,ping
client.on('open',async ()=>{
const result1 = await new Promise(resolve => {
client.send(JSON.stringify({ method: 'connect', id: 1, params: [], jsonrpc: '2.0' }));
client.once('message', data => resolve(JSON.parse(data.toString())));
});
console.log(result1);
// {
// jsonrpc: '2.0',
// id: 1,
// method: 'connect',
// result: {
// msg: 'connected',
// session: 'xxxxxxxxxx'
// }
// }
const result2 = await new Promise(resolve => {
client.send(JSON.stringify({ method: 'ping', id: 2, params: [], jsonrpc: '2.0' }));
client.once('message', data => resolve(JSON.parse(data.toString())));
});
console.log(result2);
// {
// jsonrpc: '2.0',
// id: 2,
// method: 'ping',
// result: 'pong'
// }
});
注意:要中断method函数的执行,可以直接
return
,或者throw
一个错误,throw抛出的错误会被系统捕获,并封装为一个符合JSONRPC-2.0
规范的错误数据返回,你也可以定义全局的错误捕获回调函数,来自行处理throw
抛出的错误。
中间件
中间件是一个在请求到达method之前对请求的数据和业务进行处理的函数。该函数如果返回一个Object,则会将该Object的属性挂载到当前socket连接的属性(后续业务可获取)上;返回其它数据结构则不做任何处理,后续业务逻辑也无法获取该返回值。可以定义针对全部method的一个会多个中间件,也可以为某个method定义一个或多个中间件,中间件的执行顺序为中间件定义的代码逻辑顺序。
middleware.ts,定义全局的中间件:
import { MiddlewareFn } from '@coco-sheng/websocket-server';
server.use(()=>{
console.log('this is a middleware for all methods');
});
const mdw1: MiddlewareFn<SocketAttr> = (_params, socket, method) => {
if (method !== 'login' && !socket.getAttr('userId')) {
throw Error('you must login to do this!');
}
console.log('middleware 1 for all methods');
};
const mdw2: MiddlewareFn<SocketAttr> = () =>{
// ...
return { type: '1' };
};
server.use(mdw1, mdw2, ...);
使用return
结束中间件的执行,只是结束了当前中间件的执行,后续的中间件及method依然会执行,要想在中间件中直接结束整个method的业务逻辑,只能通过throw
,throw抛出错误的处理逻辑同method,详见method部分。
定义针对某个method的中间件。
import { MiddlewareFn } from '@coco-sheng/websocket-server';
server.use('login', () => {
console.log('to login method');
// ...
return { role: 'admin' };
// 将会为当前socket设置一个role的属性,值为admin
});
const checkLoginToken: MiddlewareFn<SocketAttr> = (params, socket) => {
// ...
};
const checkPermission: MiddlewareFn<SocketAttr> = () => {
// ...
throw new Error('you are no permission');
};
server.use('login', checkLoginToken, checkPermission);
使用中间件函数的socket参数,也可以在中间件里向客户端主动发送消息。
socket属性
socket的属性即挂在到当前socket连接上的数据。
属性的设置
属性的设置有两种方式,一种是在中间件函数里返回一个Object(详见中间件部分),另一种是调用socket对象本身的属性操作函数,如下:
// 在method中设置
server.register('hello', (_params, socket) => {
socket.setAttr('key','value');
socket.setAttr({
user: '....',
role: 'admin'
});
});
// 在中间件中设置
server.use((_params, socket, method)=>{
console.log('this is a middleware for all methods');
const user = {...};
if(method === 'login' && user.role === 'admin'){
socket.setAttr('role', 'admin');
socket.setAttr({
user: '....',
type: 'password-login'
});
}
});
属性的获取
获取全部属性
const attr = socket.getAttr();
// 全部属性
// {
// ...
// }
获取某个属性
const value = socket.getAttr('key');
// 该属性值
// ...
获取某些属性
const values = socket.getAttr('key1', 'key2', ...);
// key1,key2,...的属性
// {
// key1: ...,
// key2: ...,
// ...
// }
server
获取连接
每一个客户端socket连接都有一个id(这个id通常可以作为socket连接的session标识来使用),是一个随机生成的字符串,在method和中间件中都可以通过socket获取,以method为例:
server.register('hello', (_params, socket) => {
console.log(socket.id);
});
当你知道某个socket连接的id时,可以通过server直接拿到对应的socket对象:
const socketId = 'xxxxxxxxx';
const socket = server.getSocket(socketId);
socket?.sendout({
id: new Date().getTime(),
method: 'notice',
result: 'noticed!'
});
server中所有的socket连接,可通过server.clients
来获取,也可以通过自定义条件的方式来获取某些socket连接:
const socket = server.getSockets((attr:SocketAttr)=>{
if(attr.role === 'admin'){
return true;
}
});
server.getSockets
接受一个回调函数,该函数的入参为某个socket的所有属性,需要返回一个Boolean
值来判断某个soeket连接是否为需要获取的socket对象。
获取连接属性
获取单个socket连接的属性,实际调用的是socket属性里属性的获取函数。
const socketId = 'xxxxxxx';
// 获取全部属性
const attr = server.getSocketAttr(socketId);
// 获取某个属性的值
const value = server.getSocketAttr(socketId, 'key1');
// 获取某些属性的值
const attr = server.getSocketAttr(socketId, 'key1', 'key2', ...);
如果要根据条件筛选socket并获取连接的属性,可以调用server.getSocketsAttr
,如下获取某些socket连接的全部属性值:
const attrs = server.getSocketsAttr((attr: SocketAttr) => {
if(attr.role === 'admin'){
return true;
}
});
同server.getSocketAttr
一样,你也可以获取部分属性值:
// 获取某一个属性的值
const values = server.getSocketsAttr((attr: SocketAttr) => {
if(attr.role === 'admin'){
return true;
}
}, 'key1');
// 获取某几个属性的值
const values = server.getSocketsAttr((attr: SocketAttr) => {
if(attr.role === 'admin'){
return true;
}
}, 'key1', 'key2', ...);
设置连接属性
你也可以通过server
设置某个socket连接的属性值:
const socketId = 'xxxxxxx';
server.setSocketAttr(socketId, {
key1: '1',
key2: '2'
});
回调
server.online
有客户端连接成功的回调函数,可传入多个,按顺序执行
import { OnlineCallbackFn } from '@coco-sheng/websocket-server';
const online1: OnlineCallbackFn = () => {
// ...
};
const online2: OnlineCallbackFn = () => {
// ...
};
server.online(online1, online2);
server.offline
有客户端断开连接时的回调函数,可传入多个,按顺序执行
import { OfflineCallbackFn } from '@coco-sheng/websocket-server';
const offline1: OfflineCallbackFn<SocketAttr> = () => { };
const offline2: OfflineCallbackFn<SocketAttr> = () => { };
server.offline(offline1, offline2);
server.error
捕获到全局错误时的回调,系统内置了一个全局错误处理逻辑,当捕获到错误时,会发送一条符合jsonrpc2.0
规范的错误信息到客户端(客户端已下线除外),当配置了log(详情见扩展配置),该错误信息也会打印到控制台。如果设置server.error
回调函数,则不会执行系统内置的错误逻辑,但是依然会根据log配置打印错误信息,但是server.error
设置的错误回调函数内部的错误并不会再次捕获处理。
import { ErrorCallbackFn } from '@coco-sheng/websocket-server';
const error1: ErrorCallbackFn<SocketAttr> = () => { };
const error2: ErrorCallbackFn<SocketAttr> = () => { };
server.error(error1, error2);
关于ping
服务器端不建议主动去ping客户端,以此对连接进行保活,这样做会消耗服务器性能,所以系统内置了一个ping
的method,当客户端发送ping
的method时,系统会回复一条数据,详见内置method。当然,你也可以通过其它方式实现ping来对连接保活。
扩展配置
new WebsocketServer(config, options);
[options.log]
:Boolean | Function
,默认false
关闭日志打印,当为true
时,将采用内置的log4js
日志配置打印日志;如果为一个函数,则需要返回一个Logger
对象,系统的日志将采用该对象打印。[options.compression]
:zlib
,默认undefined
。当为zlib
时,将对服务器发送到客户端的数据先进行zlib压缩,再发送。
import { WebsocketServer } from '@coco-sheng/websocket-server';
const port = 3403;
export default new WebsocketServer<SocketAttr>({ port }, {
log: () => console,
compression: 'zlib'
});
客户端解压缩示例:
client.on('open',async ()=>{
const result = await new Promise(resolve => {
client.send(JSON.stringify({ method: 'connect', id: 1, params: [], jsonrpc: '2.0' }));
client.once('message', data => resolve(JSON.parse(zlib.inflateSync(data as Buffer).toString())));
});
console.log(result);
// {
// jsonrpc: '2.0',
// id: 2,
// method: 'ping',
// result: 'pong'
// }
});
其它
关于性能
该工程经过了实际项目的检测,在1核CPU1G内存的设备上,能同时维持2万个客户端连接,qps在20到30(根据method业务逻辑的复杂性而定)
暂无其它。