@reconbot/subscriptionless v1.1.1
About
This is a hopefully temporary fork of https://github.com/andyrichardson/subscriptionless. All changes here will eventually be pushed upstream to the primary project or abandoned.
GraphQL subscriptions for AWS Lambda and API Gateway WebSockets.
Have all the functionality of GraphQL subscriptions on a stateful server without the cost.
Note: This project uses the graphql-ws protocol under the hood.
⚠️ Limitations
Seriously, read this first before you even think about using this.
This is Alpha software and should be treated as such.
There are a few noteworthy limitations to the AWS API Gateway WebSocket implementation.
Note: If you work on AWS and want to run through this, hit me up!
Ping/Pong
For whatever reason, AWS API Gateway does not support WebSocket protocol level ping/pong.
This means early detection of unclean client disconnects is near impossible (graphql-ws will not implement subprotocol level ping/pong).
Socket idleness
API Gateway considers an idle connection to be one where no messages have been sent on the socket for a fixed duration (currently 10 minutes).
Again, the WebSocket spec has support for detecting idle connections (ping/pong) but API Gateway doesn't use it. This means, in the case where both parties are connected, and no message is sent on the socket for the defined duration (direction agnostic), API Gateway will close the socket.
A quick fix for this is to set up immediate reconnection on the client side.
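As a rough illustration of immediate client-side reconnection, here is a minimal sketch. `keepConnected` and `SocketLike` are hypothetical names, not part of subscriptionless or graphql-ws, and the retry cap is an assumption added to avoid a tight reconnect loop.

```typescript
// Minimal socket surface this sketch relies on (a subset of the browser WebSocket API).
interface SocketLike {
  addEventListener(type: "close", listener: () => void): void;
}

// Hypothetical helper: opens a socket via `open` and reopens it whenever it closes.
function keepConnected(open: () => SocketLike, maxReconnects = Infinity) {
  let stopped = false;
  let reconnects = 0;

  const connect = (): void => {
    const socket = open();
    socket.addEventListener("close", () => {
      // Give up once stopped or once the (assumed) retry cap is reached.
      if (stopped || reconnects >= maxReconnects) return;
      reconnects += 1;
      connect(); // reconnect immediately, with no backoff delay
    });
  };

  connect();
  return {
    stop: () => {
      stopped = true;
    },
    reconnects: () => reconnects,
  };
}
```

In a browser, `open` would typically create a new WebSocket against the API Gateway endpoint. Active subscriptions would still need to be re-sent after each reconnect.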
Socket errors
API Gateway's current socket closing functionality doesn't support any kind of message/payload. Along with this, graphql-ws won't support error messages.
Because of this limitation, there is no clear way to communicate subprotocol errors to the client. In the case of a subprotocol error the socket will be closed by the server (with no meaningful disconnect payload).
Setup
Create a subscriptionless instance.
import { createInstance } from 'subscriptionless';
const instance = createInstance({
dynamodb,
schema,
});
Export the handler.
export const handler = instance.handler;
Configure API Gateway
Set up API Gateway to route WebSocket events to the exported handler.
Serverless framework example.
functions:
  websocket:
    name: my-subscription-lambda
    handler: ./handler.handler
    events:
      - websocket:
          route: $connect
      - websocket:
          route: $disconnect
      - websocket:
          route: $default
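All three routes point at the same Lambda. API Gateway tags each WebSocket event with `requestContext.routeKey`, so a single handler can tell them apart. The sketch below only illustrates that idea; `phaseOf` and `WebSocketEventLike` are hypothetical and do not reflect how subscriptionless dispatches internally.

```typescript
// Minimal shape of an API Gateway WebSocket event; only the fields used here.
interface WebSocketEventLike {
  requestContext: { routeKey: string; connectionId: string };
  body?: string | null;
}

type Phase = "connect" | "disconnect" | "message";

// Hypothetical helper: maps the route key onto a coarse lifecycle phase.
function phaseOf(event: WebSocketEventLike): Phase {
  switch (event.requestContext.routeKey) {
    case "$connect":
      return "connect";
    case "$disconnect":
      return "disconnect";
    default:
      return "message"; // "$default" and any custom route
  }
}
```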
Create DynamoDB tables for state
In-flight connections and subscriptions need to be persisted.
Use the tableNames argument to override the default table names.
const instance = createInstance({
/* ... */
tableNames: {
connections: 'my_connections',
subscriptions: 'my_subscriptions',
},
});
Serverless framework example.
resources:
  Resources:
    # Table for tracking connections
    connectionsTable:
      Type: AWS::DynamoDB::Table
      Properties:
        TableName: ${self:provider.environment.CONNECTIONS_TABLE}
        AttributeDefinitions:
          - AttributeName: id
            AttributeType: S
        KeySchema:
          - AttributeName: id
            KeyType: HASH
        ProvisionedThroughput:
          ReadCapacityUnits: 1
          WriteCapacityUnits: 1
    # Table for tracking subscriptions
    subscriptionsTable:
      Type: AWS::DynamoDB::Table
      Properties:
        TableName: ${self:provider.environment.SUBSCRIPTIONS_TABLE}
        AttributeDefinitions:
          - AttributeName: id
            AttributeType: S
          - AttributeName: topic
            AttributeType: S
          - AttributeName: connectionId
            AttributeType: S
        KeySchema:
          - AttributeName: id
            KeyType: HASH
          - AttributeName: topic
            KeyType: RANGE
        GlobalSecondaryIndexes:
          - IndexName: ConnectionIndex
            KeySchema:
              - AttributeName: connectionId
                KeyType: HASH
            Projection:
              ProjectionType: ALL
            ProvisionedThroughput:
              ReadCapacityUnits: 1
              WriteCapacityUnits: 1
          - IndexName: TopicIndex
            KeySchema:
              - AttributeName: topic
                KeyType: HASH
            Projection:
              ProjectionType: ALL
            ProvisionedThroughput:
              ReadCapacityUnits: 1
              WriteCapacityUnits: 1
        ProvisionedThroughput:
          ReadCapacityUnits: 1
          WriteCapacityUnits: 1
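The Serverless resources take their table names from the CONNECTIONS_TABLE and SUBSCRIPTIONS_TABLE environment values. One way to feed the same values into the tableNames argument is sketched below. `tableNamesFromEnv` is a hypothetical helper, and the fallback names are an assumption borrowed from the Terraform example in this section, not confirmed library defaults.

```typescript
interface TableNames {
  connections: string;
  subscriptions: string;
}

// Hypothetical helper: derives table names from environment-style key/value pairs.
function tableNamesFromEnv(env: Record<string, string | undefined>): TableNames {
  return {
    // `||` (rather than `??`) so that empty strings also fall back.
    connections: env["CONNECTIONS_TABLE"] || "subscriptionless_connections",
    subscriptions: env["SUBSCRIPTIONS_TABLE"] || "subscriptionless_subscriptions",
  };
}
```

In a Lambda this would typically be called as `tableNamesFromEnv(process.env)` and the result passed as `tableNames` to createInstance.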
Terraform example.
resource "aws_dynamodb_table" "connections-table" {
name = "subscriptionless_connections"
billing_mode = "PROVISIONED"
read_capacity = 1
write_capacity = 1
hash_key = "id"
attribute {
name = "id"
type = "S"
}
}
resource "aws_dynamodb_table" "subscriptions-table" {
name = "subscriptionless_subscriptions"
billing_mode = "PROVISIONED"
read_capacity = 1
write_capacity = 1
hash_key = "id"
range_key = "topic"
attribute {
name = "id"
type = "S"
}
attribute {
name = "topic"
type = "S"
}
attribute {
name = "connectionId"
type = "S"
}
global_secondary_index {
name = "ConnectionIndex"
hash_key = "connectionId"
write_capacity = 1
read_capacity = 1
projection_type = "ALL"
}
global_secondary_index {
name = "TopicIndex"
hash_key = "topic"
write_capacity = 1
read_capacity = 1
projection_type = "ALL"
}
}
Usage
PubSub
subscriptionless uses its own PubSub implementation, which loosely implements the Apollo PubSub interface.
Note: Unlike the Apollo PubSub library, this implementation is (mostly) stateless.
Use the subscribe function to associate incoming subscriptions with a topic.
import { subscribe } from 'subscriptionless/subscribe';
export const resolver = {
  Subscribe: {
    mySubscription: {
      resolve: (event, args, context) => {/* ... */},
      subscribe: subscribe('MY_TOPIC'),
    },
  },
};
Wrap any subscribe function call in a withFilter to provide filter conditions.
Note: If a function is provided, it will be called on subscription start and must return a serializable object.
import { withFilter, subscribe } from 'subscriptionless/subscribe';
// Subscription agnostic filter
withFilter(subscribe('MY_TOPIC'), {
attr1: '`attr1` must have this value',
attr2: {
attr3: 'Nested attributes work fine',
},
});
// Subscription specific filter
withFilter(subscribe('MY_TOPIC'), (root, args, context, info) => ({
userId: args.userId,
}));
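The exact matching rules for filters are not spelled out here. One plausible reading, given that nested attributes are supported, is a deep partial match against the published payload. The sketch below shows that interpretation only; `matchesFilter` is a hypothetical helper and not part of subscriptionless, whose real matching may differ.

```typescript
type Json = string | number | boolean | null | Json[] | { [key: string]: Json };

// Hypothetical helper: true when every attribute in `filter` is present in
// `payload` with an equal value, recursing into nested objects.
function matchesFilter(payload: Json, filter: Json): boolean {
  if (filter === null || typeof filter !== "object" || Array.isArray(filter)) {
    // Primitives and arrays are compared by their JSON form
    // (so key order matters for objects nested inside arrays).
    return JSON.stringify(payload) === JSON.stringify(filter);
  }
  if (payload === null || typeof payload !== "object" || Array.isArray(payload)) {
    return false; // an object filter cannot match a non-object payload
  }
  return Object.keys(filter).every((key) => {
    const expected = filter[key];
    const actual = payload[key];
    return (
      expected !== undefined &&
      actual !== undefined &&
      matchesFilter(actual, expected)
    );
  });
}
```

Under this reading, extra attributes on the payload are ignored and only the attributes named in the filter are compared.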
Join multiple topic subscriptions together using concat.
import { concat, subscribe } from 'subscriptionless/subscribe';
concat(subscribe('TOPIC_1'), subscribe('TOPIC_2'));
Use the publish method on your subscriptionless instance to publish events to active subscriptions.
instance.publish({
  topic: 'MY_TOPIC',
  payload: 'HELLO',
});
Events can come from many sources.
// SNS Event
export const snsHandler = (event) =>
Promise.all(
event.Records.map((r) =>
instance.publish({
topic: r.Sns.TopicArn.substring(r.Sns.TopicArn.lastIndexOf(':') + 1), // Get topic name (e.g. "MY_TOPIC")
payload: JSON.parse(r.Sns.Message),
})
)
);
// Manual Invocation
export const invocationHandler = (payload) =>
instance.publish({ topic: 'MY_TOPIC', payload });
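The SNS handler above derives the topic from the last segment of the topic ARN. That mapping can be pulled out and checked on its own, as in the sketch below. `topicNameFromArn`, `toPublishEvents` and the two interfaces are hypothetical names introduced for illustration; the `{ topic, payload }` shape follows the publish calls above.

```typescript
// Only the SNS record fields used by the handler above.
interface SnsRecordLike {
  Sns: { TopicArn: string; Message: string };
}

interface PublishEvent {
  topic: string;
  payload: unknown;
}

// Returns everything after the last ':' (the whole string if there is none).
function topicNameFromArn(topicArn: string): string {
  return topicArn.substring(topicArn.lastIndexOf(":") + 1);
}

// Maps SNS records onto the event objects passed to instance.publish.
function toPublishEvents(records: SnsRecordLike[]): PublishEvent[] {
  return records.map((r) => ({
    topic: topicNameFromArn(r.Sns.TopicArn),
    payload: JSON.parse(r.Sns.Message), // throws if the message is not valid JSON
  }));
}
```

Note that this assumes every SNS message body is JSON. A plain-text message would make `JSON.parse` throw and reject the whole batch.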
Context
Context values are accessible in all resolver level functions (resolve, subscribe, onSubscribe and onComplete).
Assuming no context argument is provided, the default value is an object containing a connectionParams attribute.
This attribute contains the (optionally parsed) payload from connection_init.
export const resolver = {
Subscribe: {
mySubscription: {
resolve: (event, args, context) => {
console.log(context.connectionParams); // payload from connection_init
},
},
},
};
An object can be provided via the context attribute when calling createInstance.
const instance = createInstance({
/* ... */
context: {
myAttr: 'hello',
},
});
The default values (above) will be appended to this object prior to execution.
A function (optionally async) can be provided via the context attribute when calling createInstance.
The default context value is passed as an argument.
const instance = createInstance({
/* ... */
context: ({ connectionParams }) => ({
myAttr: 'hello',
user: connectionParams.user,
}),
});
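The three context forms (none, object, function) can be summarised in a small sketch. `resolveContext` is a hypothetical helper, not the library's implementation. Two details are assumptions: that the default values win when an object also defines connectionParams, and that a function's return value is used as-is.

```typescript
type DefaultContext = { connectionParams: unknown };

type ContextOption =
  | undefined
  | Record<string, unknown>
  | ((
      defaults: DefaultContext
    ) => Record<string, unknown> | Promise<Record<string, unknown>>);

// Hypothetical helper: resolves the context seen by resolver level functions.
async function resolveContext(
  option: ContextOption,
  connectionParams: unknown
): Promise<Record<string, unknown>> {
  const defaults: DefaultContext = { connectionParams };
  if (option === undefined) {
    return { ...defaults }; // no context provided: defaults only
  }
  if (typeof option === "function") {
    return option(defaults); // sync or async; result used as-is (assumption)
  }
  return { ...option, ...defaults }; // defaults appended to the object
}
```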
Side effects
Side effect handlers can be declared on subscription fields to handle onSubscribe (start) and onComplete (stop) events.
For onSubscribe and onComplete side effects to work, resolvers must be passed to prepareResolvers prior to schema construction.
import { makeExecutableSchema } from '@graphql-tools/schema';
import { prepareResolvers } from 'subscriptionless/subscribe';
const schema = makeExecutableSchema({
  typeDefs,
  resolvers: prepareResolvers(resolvers),
});
export const resolver = {
Subscribe: {
mySubscription: {
resolve: (event, args, context) => {
/* ... */
},
subscribe: subscribe('MY_TOPIC'),
onSubscribe: (root, args) => {
/* Do something on subscription start */
},
onComplete: (root, args) => {
/* Do something on subscription stop */
},
},
},
};
Events
Global events can be provided when calling createInstance to track the execution cycle of the lambda.
Connect (onConnect)
Called when a WebSocket connection is first established.
const instance = createInstance({
/* ... */
onConnect: ({ event }) => {
/* */
},
});
Disconnect (onDisconnect)
Called when a WebSocket connection is disconnected.
const instance = createInstance({
/* ... */
onDisconnect: ({ event }) => {
/* */
},
});
Connection init (onConnectionInit)
onConnectionInit can be used to verify the connection_init payload prior to persistence.
Note: Any sensitive data in the incoming message should be removed at this stage.
const instance = createInstance({
/* ... */
onConnectionInit: ({ message }) => {
const token = message.payload.token;
if (!myValidation(token)) {
throw Error('Token validation failed');
}
// Prevent sensitive data from being written to DB
return {
...message.payload,
token: undefined,
};
},
});
By default, the (optionally parsed) payload will be accessible via context.
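The onConnectionInit example sets `token: undefined`, which leaves the key on the returned object with an undefined value. If you would rather drop sensitive keys entirely before persistence, a variant can be sketched as follows. `omitKeys` is a hypothetical helper, not part of subscriptionless.

```typescript
// Hypothetical helper: returns a shallow copy of `payload` without the given keys.
function omitKeys(
  payload: Record<string, unknown>,
  keys: readonly string[]
): Record<string, unknown> {
  const result: Record<string, unknown> = {};
  for (const key of Object.keys(payload)) {
    if (keys.indexOf(key) === -1) {
      result[key] = payload[key];
    }
  }
  return result;
}
```

Inside onConnectionInit this could be used as `return omitKeys(message.payload, ['token']);` after validation succeeds.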
Subscribe (onSubscribe)
Called when any subscription message is received.
const instance = createInstance({
/* ... */
onSubscribe: ({ event, message }) => {
/* */
},
});
Complete (onComplete)
Called when any complete message is received.
const instance = createInstance({
/* ... */
onComplete: ({ event, message }) => {
/* */
},
});
Error (onError)
Called when any error is encountered.
const instance = createInstance({
/* ... */
onError: (error, context) => {
/* */
},
});