cypress-commands-kafka v2.1.7
To get started :
npm install -D cypress-commands-kafka
Cypress configuration :
In your config file add :
const { CypressKafka } = require("cypress-commands-kafka")
Kafka and Schema registry configuration :
The first argument is the Kafka configuration and the second is the schema registry configuration. Both accept whatever kafkaJs and its Confluent schema registry client accept. Read the following documentation for more information : https://kafka.js.org/docs/configuration https://kafkajs.github.io/confluent-schema-registry/docs/usage
The Kafka configuration is required, the schema registry configuration is optional.
Example of a configuration with kafka and schema registry :
const cypressKafka = new CypressKafka(
{
clientId: process.env.KAFKA_CLIENT_ID,
brokers: [process.env.KAFKA_BROKERS],
ssl: true,
sasl: {
mechanism: "plain",
username: process.env.KAFKA_SASL_USERNAME,
password: process.env.KAFKA_SASL_PASSWORD,
},
},
{
host: process.env.KAFKA_SCHEMA_REGISTRY_HOST,
auth: {
username: process.env.KAFKA_SCHEMA_REGISTRY_USERNAME,
password: process.env.KAFKA_SCHEMA_REGISTRY_PASSWORD,
},
}
);
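kafkaJs expects brokers to be an array of host:port strings, so brokers: [process.env.KAFKA_BROKERS] only works when the variable holds a single broker. A minimal sketch, assuming KAFKA_BROKERS may hold a comma-separated list. parseBrokers is a hypothetical helper, not part of cypress-commands-kafka :

```javascript
// Hypothetical helper (not part of cypress-commands-kafka): turns
// "host1:9092, host2:9092" into ["host1:9092", "host2:9092"].
function parseBrokers(value) {
  if (!value) {
    throw new Error("KAFKA_BROKERS is not set");
  }
  return value
    .split(",")
    .map((broker) => broker.trim())
    .filter((broker) => broker.length > 0);
}

// Usage in the Kafka configuration:
//   brokers: parseBrokers(process.env.KAFKA_BROKERS),
```

Failing early on a missing variable gives a clearer error than a connection failure later in the test run.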
The tasks, to register with on("task", { ... }) in setupNodeEvents :
async produceEvent(fileName) {
return cypressKafka.produceEvent(fileName);
},
async consumeEvent(topics) {
const groupId = "your-group-id"
return cypressKafka.consumeEvent(topics, groupId);
},
async getConsumedEvent(topic) {
return cypressKafka.getConsumedEvent(topic);
},
async cleanAndDisconnectConsumer() {
return cypressKafka.cleanAndDisconnectConsumer();
}
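The task handlers above have to be registered with Cypress before cy.task can call them. A minimal sketch, assuming Cypress 10+ where tasks are registered from setupNodeEvents. registerKafkaTasks is a hypothetical helper, not part of the package. It also converts an undefined result to null, because Cypress fails a task that resolves to undefined (whether the library's methods ever do so is an assumption) :

```javascript
// Hypothetical helper: registers the four tasks on Cypress' `on` function.
// `cypressKafka` is the CypressKafka instance from the configuration,
// `groupId` is the consumer group id to use.
function registerKafkaTasks(on, cypressKafka, groupId) {
  // Cypress rejects a task that resolves to undefined, so fall back to null.
  const orNull = async (result) => (await result) ?? null;

  on("task", {
    produceEvent: (fileName) => orNull(cypressKafka.produceEvent(fileName)),
    consumeEvent: (topics) => orNull(cypressKafka.consumeEvent(topics, groupId)),
    getConsumedEvent: (topic) => orNull(cypressKafka.getConsumedEvent(topic)),
    cleanAndDisconnectConsumer: () => orNull(cypressKafka.cleanAndDisconnectConsumer()),
  });
}

// In cypress.config.js, call it from setupNodeEvents:
//   setupNodeEvents(on, config) {
//     registerKafkaTasks(on, cypressKafka, "your-group-id");
//     return config;
//   }
```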
In your index.d.ts, add the following inside the Cypress Chainable interface :
produceEvent(data: string): Chainable<void>;
consumeEvent(topic: Array<string>): Chainable<void>;
cleanAndDisconnectConsumer(): Chainable<void>;
getConsumedEvent(topic: string): Chainable<any[]>;
In your commands file add :
Cypress.Commands.add("produceEvent", (data: string) => {
cy.task("produceEvent", data);
});
Cypress.Commands.add("consumeEvent", (topic: Array<string>) => {
cy.task("consumeEvent", topic);
});
Cypress.Commands.add("getConsumedEvent", (topic: string) => {
cy.task("getConsumedEvent", topic);
});
Cypress.Commands.add("cleanAndDisconnectConsumer", () => {
cy.task("cleanAndDisconnectConsumer");
});
How to use :
To produce an event
To call it in your test :
cy.produceEvent(`cypress/fixtures/${Cypress.env('KAFKA_ENV')}/myFile.json`);
You need to give the path to a fixture. Here the path also contains an environment variable, so the same test can run against SIT or UAT.
The fixture will look like this :
[
{
"key": {
"myKey": "myKeyValue",
"myKey2": "myKeyValue"
},
"headers": {
"event_type": "myHeader"
},
"payload": {
"MyPayload1": "MyPayloadValue1",
"MyPayload2": "MyPayloadValue2",
"MyPayload3": "MyPayloadValue3"
},
"topic": "THE TOPIC TO SEND THE MESSAGE TO",
"subject": "THE SCHEMA REGISTRY SUBJECT FOR THE PAYLOAD OF THE TOPIC",
"subjectKey": "THE SCHEMA REGISTRY SUBJECT FOR THE KEY OF THE TOPIC"
},
{
"key": {
"myKey": "anotherExample"
},
"payload": {
"MyPayload1": "anotherExample",
"MyPayload2": "anotherExample",
"MyPayload3": "anotherExample"
},
"topic": "anotherExample"
}
]
You can send one or more messages, to one topic or to several topics at the same time. The only required fields are the topic and the payload. The first message in the example goes to a topic which uses Avro for both the key and the payload, so it has a subject and a subjectKey. The second message goes to a topic that does not use Avro, so it has neither.
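Since only the topic and the payload are required, a fixture can be checked before it is sent. A minimal sketch of such a check. validateFixtureMessages is a hypothetical helper, not part of cypress-commands-kafka, and it only covers the two required fields :

```javascript
// Hypothetical helper (not part of cypress-commands-kafka): returns the list
// of problems found in a parsed fixture. An empty list means every message
// has the two required fields.
function validateFixtureMessages(messages) {
  if (!Array.isArray(messages)) {
    return ["the fixture must be a JSON array of messages"];
  }
  const problems = [];
  messages.forEach((message, index) => {
    if (message === null || typeof message !== "object") {
      problems.push(`message ${index}: must be an object`);
      return;
    }
    if (typeof message.topic !== "string" || message.topic.length === 0) {
      problems.push(`message ${index}: "topic" is required`);
    }
    if (message.payload === undefined || message.payload === null) {
      problems.push(`message ${index}: "payload" is required`);
    }
  });
  return problems;
}
```

Note that the fixture is plain JSON, so a trailing comma after the last property of an object makes the file unparseable.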
Always wait a bit ( cy.wait(3000) ? ) after the events are sent, to give them time to arrive and be consumed by your application.
To consume an event
First, don't forget to set const groupId = "your-group-id" to the right group id in your config file.
cy.consumeEvent(["Topic1", "Topic2"]);
This listens to the topics given in the array, and you can listen to one or more topics at the same time. Messages are consumed from the moment the consumeEvent task is called. So if you want to test that an event is emitted when you do something on your website, you need to start consuming BEFORE the action that triggers the event.
Every consumed event is stored in a variable, so to test it you'll need the next command.
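Because the order matters, a test that consumes an event could be laid out as follows. This is only a sketch: the page URL and the selector are placeholders, the 3 second wait is a guess to tune for your application, and the function name is hypothetical :

```javascript
// Body of a test, written as a named function so it can be passed to it().
// "/my-page" and "[data-cy=submit]" are placeholders for your own page.
function eventIsPublishedAfterSubmit() {
  // 1. Start consuming BEFORE triggering the action.
  cy.consumeEvent(["Topic1"]);

  // 2. Do the action that should make the application emit an event.
  cy.visit("/my-page");
  cy.get("[data-cy=submit]").click();

  // 3. Give the event some time to arrive, then read what was consumed.
  cy.wait(3000);
  cy.getConsumedEvent("Topic1").then((messages) => {
    expect(messages).to.not.be.empty;
  });
}

// In a spec file:
//   it("publishes an event on submit", eventIsPublishedAfterSubmit);
```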
To check / test your consumed event
cy.getConsumedEvent("Mytopic")
This command returns the consumed events for one topic. So if you listened to 2 topics you could do :
cy.getConsumedEvent("myTopic1").then((messages) => {
//A filter to get the right message by the key in this case
const filteredMessages = messages.filter(message => message.key === "myKeyValue");
filteredMessages.forEach(msg => {
//The test to check the payload content and see if it was right
expect(msg.payload.myId).to.equal("wow good job");
});
});
cy.getConsumedEvent("myTopic2").then((messages) => {
//A filter to get the right message by the id in the payload in this case
const filteredMessages = messages.filter(message => message.payload.myId === "myIdValueInPayload");
filteredMessages.forEach(msg => {
//The test to check the key content and see if it was right
expect(msg.key.myKeyName).to.equal("123465");
});
});
With those 2 examples, I leave the rest to your imagination...
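One thing to watch for with the filter + forEach pattern: if no message matches the filter, the forEach body never runs and the test passes without checking anything. A minimal sketch of a guard against that. requireMatches is a hypothetical helper, not part of cypress-commands-kafka :

```javascript
// Hypothetical helper: like Array.prototype.filter, but throws when nothing
// matches, so a missing event fails the test instead of passing silently.
function requireMatches(messages, predicate, description) {
  const matches = messages.filter(predicate);
  if (matches.length === 0) {
    throw new Error(`no consumed message matched: ${description}`);
  }
  return matches;
}

// Usage inside a test:
//   cy.getConsumedEvent("myTopic1").then((messages) => {
//     requireMatches(messages, (m) => m.key === "myKeyValue", "key myKeyValue")
//       .forEach((msg) => expect(msg.payload.myId).to.equal("wow good job"));
//   });
```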
To disconnect the consumer after your test
In my case I just do this :
afterEach(() => {
cy.cleanAndDisconnectConsumer();
});
It disconnects the consumer and empties the variable that stored the consumed messages, so I just do it after every test.
Side notes :
This package is a work in progress, so be mindful that some things might change.
Working on it :
- Check if it works with tests that are running in parallel
- Rename my variables to make them easier to understand