0.0.6 • Published 5 years ago

c4-mq v0.0.6

Weekly downloads
37
License
-
Repository
github
Last release
5 years ago

C4MQ是基于amqp模块构建的MQ操作类(模块),并提供了更高级的封装形式:

  • 提供Publisher和Subscriber定义,建立在Exchange和Queue的基础上;
  • 提供注解方式定义消息处理方法;
  • 支持Subscriber与Publisher之间的自由绑定;
  • Subscriber支持对预读取的消息进行排队处理,确保处理方法的FIFO执行

快速上手示例,参考./src/main.ts

使用MQHandler注解对方法进行标记,提供消息处理方法;

  • 注解项包含:

      {
          PublisherName ?: string,        // Exchange或Publisher的名字
          RoutingKey ?: string,           // routingkey
          MsgType : string                // Message的类型
      }
  • 使用MsgBody注解对消息体中属性进行标记,用于参数绑定;

    • 注解项包含:
    {
      value : string          // 需要绑定的属性名
    }
  • 具体示例参考./src/MQHandlers/Hello.ts

也可以声明对象的方式(JS环境下不能使用注解时),提供消息处理方法:

  • 对象包含:

    {
      publisherName: "要绑定得Exchange(Publisher)名",
      routingKey: "route key,现在支持 * 和 # 通配符",
      subscribeOption: {
        ack: true,        // true为手动ack,建议使用
        prefetchCount: 6  // 预读取数量(提高读取性能),使用手动ack时,不会对可靠性造成影响
      },
      CBs: {  // 消息处理函数
        “msgType 的值,与要处理的msgType值一一对应”: async function(msg: any,
          headers: any,
          deliveryInfo: any,
          ack: any) {
          console.log(msg);
          return true;
          // 可选的返回值有true,false和null
          // true代表消息已经正确处理,rabbitmq可以将该消息删除
          // false表示处理消息时出现错误,且订阅端希望抛弃这条消息(rabbitmq也会将该消息删除)
          // null表示处理消息时出现错误,但是订阅端希望消息得以保留(rabbitmq会将该消息重新入队)
        }
      }
    }
  • 具体示例参考./src/MQHandlers/TestTopic.ts

注意:注解方式和对象方式声明的消息处理方法不能配置时同时加载,加载注解方式声明的handler时,C4MQHelper的配置中handlerType为"standard",使用对象方式声明时,handlerType为undefined

  • C4MQ

    • 说明:用于连接MQ的类,对应RabbitMQ中Channel
    • 路径:./src/C4MQ.ts
    • 成员变量:

      • m_Channel,AMQPClient
      • m_Logger,日志对象(可使用console,推荐使用c4logger)
    • 成员方法:

      • init
      /**
       * 初始化,用于配置Channel和连接MQ
       * @param config ConnectionOptions
       * @param logger 日志对象
       */
      async init(config : ConnectionOptions, logger ?: any)
      • disconnect
      /**
       * 断开连接
       */
      async disconnect()
      • getChannel
      /**
       * 获取mq的Channel对象
       */
      getChannel()
      • isInit
      /**
       * 是否初始化
       */
      isInit()
  • C4Exchange

    • 说明:对RabbitMQ的Exchange的封装
    • 路径:./src/C4Exchange.ts
    • 成员变量:

      • m_Channel,AMQPClient,连接
      • m_Exchange,AMQPExchange
      • m_Name,名字
      • m_Logger,日志对象
    • 成员方法:

      • declared
      /**
       * 声明Exchange
       * @param Conn C4MQ,MQ连接
       * @param name Exchange名
       * @param options ExchangeOptions,配置属性
       * @param logger 日志对象
       */
      async declared(Conn : C4MQ, name : string, options : ExchangeOptions, logger ?: any)
      • publish
      /**
       * 发布消息
       * @param routingKey routingKey
       * @param message message
       * @param options ExchangePublishOptions,配置项
       */
      async publish(routingKey : string, message : Buffer | {}, options : ExchangePublishOptions)
      • destroy
      /**
       * 销毁
       * @param ifUnused 是否在不再使用时销毁
       */
      destroy(ifUnused = true)
      • bindExchange
      /**
       * 绑定到另一个Exchange
       * @param srcExchangeName exchange的名字
       * @param routingKey routingkey
       */
      async bindExchange(srcExchangeName : string, routingKey : string)
      • unbindExchange
      /**
       * 解除绑定
       * @param srcExchangeName exchange名
       * @param routingKey routingKey
       */
      async unbindExchange(srcExchangeName : string, routingKey : string)
      • bindHeaders
      /**
       * 绑定Header
       * @param exchangeName exchange名
       * @param routingKey routingKey
       */
      async bindHeaders(exchangeName : string, routingKey : string)
      • getExchagneInstance
      /**
       * 获取Exchange的实例
       */
      getExchagneInstance()
      • isInit
      /**
       * 是否初始化
       */
      isInit()
      • getName
      /**
       * 获取Exchange名
       */
      getName()
  • C4Queue

    • 说明:对应RabbitMQ的Queue
    • 路径:./src/C4Queue.ts
    • 成员变量:

      • m_Channel,AMQPClient,MQ连接
      • m_Queue,AMQPQueue
      • m_Name,名字
      • m_ConsumerTag,自定义标签
      • m_MsgQueue,用于预取消息的队列
      • m_Logger,日志对象(可使用console,推荐使用c4logger)
    • 成员方法:

      • declared
      /**
       * 声明
       * @param Conn 连接
       * @param name 名字
       * @param options QueueOptions
       * @param logger 日志
       */
      async declared(Conn : C4MQ, name : string, options : QueueOptions, logger ?: any)
      • desroy
      /**
       * 销毁
       * @param options {
       *  ifUnused : boolean,     // 是否在不用后销毁
       *  isEmpty : boolean,      // 是否在空后后销毁
       * }
       */
      desroy(options : { ifUnused : boolean, isEmpty : boolean})
      • bindExchange
      /**
       * 绑定Excahnge
       * @param exchangeName excahnge的名字
       * @param routingKey routingKey
       */
      async bindExchange(exchangeName : string, routingKey : string)
      • unbindExchange
      /**
       * 解绑
       * @param exchangeName exchange的名字
       * @param routingKey routingKey
       */
      unbindExchange(exchangeName : string, routingKey : string)
      • bindHeaders
      /**
       * 绑定Headers
       * @param exchangeName excahnge名字
       * @param routingKey routingKey
       */
      bindHeaders(exchangeName : string, routingKey : string)
      • unbindHeaders
      /**
       * 解绑Headers
       * @param exchangeName exchange名
       * @param routingKey routingKeys
       */
      unbindHeaders(exchangeName : string, routingKey : string)
      • subscribe
      /**
       * 订阅消息
       * @param options 订阅配置
       * @param CB 消息处理方法,如果不设置,则采用内部机制
       * 
       */
      async subscribe(options : SubscribeOptions, CB ?: SubscribeCallback)
      • unsubscribe
      /**
       * 取消订阅
       */
      unsubscribe()
      • peekMsg
      /**
       * 去消息并处理
       * @param CB 消息处理方法
       * 订阅消息时没有设置处理方法,则会进行内部处理,然后使用该方法进行单个消息的处理
       * CB返回true,则消息正常ack
       * CB返回false,则消息丢弃不会重排
       * CB抛异常,则消息重排
       */
      async peekMsg(CB : (message : any,
          headers : {[key : string] : any},
          deliveryInfo : DeliveryInfo,
          ack : Ack) => Promise<boolean> | boolean)
      • getQueueInstance
      /**
       * 获取Queue实例
       */
      getQueueInstance()
      • isInit
      /**
       * 是否已经初始化
       */
      isInit()
      • getName
      /**
       * 获取Queue名
       */
      getName()
  • C4Publisher

    • 说明:对C4Exchange的封装,主要发送消息;
    • 路径:./src/C4Publisher.ts
    • 成员变量:

      • m_Exchange,C4Exchange;
      • m_PublishOption,ExchangePublishOptions;
      • m_RoutingKey,routingkey;
      • m_Logger,日志对象
    • 成员方法:

      • init
      /**
       * 初始化
       * @param Conn C4MQ,MQ连接
       * @param option C4PublisherOption
       * @param logger 日志对象
       */
      async init(Conn : C4MQ, option : C4PublisherOption, logger ?: any)
      • publish
      /**
       * 发布消息
       * @param message 消息
       * @param routingKey routingKey
       * @param option ExchangePublishOptions
       */
       async publish(message : {} | Buffer, routingKey ?: string, option ?: ExchangePublishOptions)
      • isInit
      /**
       * 是否初始化
       */
      isInit()
      • getName
      /**
       * 获取Exchange的名字
       */
      getName()
  • C4Subscriber

  • 说明:对C4Queue的封装,支持注解添加消息处理方法;
  • 路径:./src/C4Subscriber.ts
  • 成员变量:

    • m_Queue,C4Queue;
    • m_DefaultPublisher,默认绑定的Publisher名;
    • m_SubscribeOptions,订阅配置项;
    • m_Logger,日志对象;
    • m_CBs,消息处理方法;
  • 成员方法:

    • init
    /**
     * 初始化
     * @param Conn C4MQ连接,MQ连接
     * @param option C4SubscriberOption
     * @param logger 日志对象
     */
    async init(Conn : C4MQ, option : C4SubscriberOption, logger ?: any)
    • addSubscribe
    /**
     * 添加订阅配置
     * @param option C4SubscribeOption
     */
    addSubscribe(option : C4SubscribeOption)
    • subscribe
    /**
     * 开始订阅
     */
    async subscribe()
    • subscribeEx
    /**
     * 开始订阅(支持 * 和 # 通配符的route key)
     */
    async subscribeEx()
    • __processMsg
    /**
     * 消息处理方法
     */
    async __processMsg()
    • __processMsgEx
    /**
     * 消息处理方法(支持 * 和 # 通配符的route key)
     */
    async __processMsgEx()
    • addMQHandler
    /**
     * 添加消息处理方法
     * @param handlers 方法对象或者加名字
     * 当handlers为名字时,会从
     * './MQHandlers/',
     * './out/MQHandlers/'
     * 两个位置尝试进行搜索对应的文件并加载
     * ts:默认放到在src/MQHandlers目下编写ts代码,
     * 编译后在out目录下自动得到'./out/MQHandlers/'
     * js:工程目录下创建MQHandlers,然后放入编译后的js代码
     */
    async addMQHandler(handlers : Array<any>) : Promise<any>;
    async addMQHandler(handlerPaths : string[]) : Promise<any>;
    async addMQHandler(arg : Array<any | string>)
    • isInit
    /**
     * 是否初始化
     */
    isInit()
    • getName
    /**
     * 获取Queue的名字
     */
    getName()