@ceramicnetwork/transport-subject v0.3.0
TransportSubject
Message-based communication transport as a RxJS Subject
Installation
npm install @ceramicnetwork/transport-subjectUsage
import { TransportSubject } from '@ceramicnetwork/transport-subject'
import { Subscriber, interval } from 'rxjs'
import { map } from 'rxjs'
type Message = { type: string }
class MyTransport extends TransportSubject<Message> {
constructor(time = 1000) {
const source = interval(time).map(() => ({ type: 'ping' }))
const sink = new Subscriber((message) => {
console.log('send message', message)
})
super(source, sink)
}
}
const transport = new MyTransport()
transport.subscribe((message) => {
console.log('received message', message)
})
transport.next({ type: 'pong' })Types
Wrapped
type Wrapped<Message, Namespace extends string> = { __tw: true; msg: Message; ns: Namespace }Wrapper
type Wrapper<MsgIn, MsgOut, WrappedOut> = {
wrap: (msg: MsgOut) => WrappedOut
unwrap: (input: any) => MsgIn
}UnwrapOperatorOptions
type UnwrapOperatorOptions = {
onInvalidInput?: (input: unknown, error: Error) => void
throwWhenInvalid?: boolean
}API
TransportSubject class
Extends RxJS Subject class
Type parameters
MsgIn: the type of the messages coming in from thesourceMsgOut = MsgIn: the type of the messages going out to thesink
new TransportSubject()
Arguments
.next()
Arguments
message: MsgOut
Returns void
createWrap()
Type parameters
MsgOutNamespace extends string = string
Arguments
namespace: Namespace
Returns (msg: MsgOut) => Wrapped<MsgOut, Namespace>, see Wrapped type
createUnwrap()
Type parameters
MsgInNamespace extends string = string
Arguments
namespace: Namespace
Returns (input: any) => MsgIn
createWrapper()
Combines createWrap() and createUnwrap()
Type parameters
MsgIn: the type of the messages coming in from the returned transportMsgOut = MsgIn: the type of the messages pushed to the returned transportNamespace extends string = string
Arguments
namespace: Namespace
Returns Wrapper<MsgIn, MsgOut, Wrapped<MsgOut, Namespace>>, see Wrapper and Wrapped types
createUnwrapOperator()
Type parameters
WrappedIn: the type of the messages coming in from the inputsourceMsgIn: the type of the messages coming in from the returned observable
Arguments
unwrap: (input: any) => MsgInoptions?: UnwrapOperatorOptions = {}
Returns OperatorFunction<WrappedIn, MsgIn>
createWrapObserver()
Type parameters
MsgOut: the type of the messages pushed to the returned observerWrappedOut: the type of the messages going out to the inputsink
Arguments
sink: Observer<WrappedOut>wrap: (msg: MsgOut) => WrappedOut
Returns Observer<MsgOut>
createWrappedTransport()
Combines createUnwrapObservable() and createWrapObserver() in a TransportSubject
Type parameters
MsgIn: the type of the messages coming in from the returned transportMsgOut: the type of the messages pushed to the returned transportWrappedIn: the type of the messages coming in from the inputtransportsourceWrappedOut = WrappedIn: the type of the messages going out to the inputtransportsink
Arguments
transport: TransportSubject<WrappedIn, WrappedOut>wrapper: MessageWrapper<MsgIn, MsgOut, WrappedOut>options?: UnwrapObservableOptions = {}
Returns TransportSubject<MsgIn, MsgOut>
createNamepacedTransport()
Combines createWrappedTransport() and createWrapper()
Type parameters
MsgIn: the type of the messages coming in from the returned transportMsgOut = MsgIn: the type of the messages pushed to the returned transportNamespace extends string = string
Arguments
transport TransportSubject<Wrapped<MsgIn, Namespace>, Wrapped<MsgOut, Namespace>>>namespace: Namespaceoptions?: UnwrapOperatorOptions = {}
Returns TransportSubject<MsgIn, MsgOut>
License
Apache-2.0 OR MIT