csp-api-common-libraries v0.0.1
Introduction to NPM packages
There are two ways to use libraries:
- Referencing the module name (requires an index.ts which exports all the files):
```typescript
import { Injectable } from '@nestjs/common';
```
- Using the full path:
```typescript
import { Injectable } from '@nestjs/common/decorators/core/injectable.decorator';
```
When using typescript, we need to split source and compiled files:
- Source files → src
- Compiled files → dist
If we export all files, exported names must not collide.
- Invalid example (two files exporting the same name):
  - JSONSchema.ts → export const ValidationResult
  - SchemaValidator.ts → export const ValidationResult
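As a sketch of how such a collision can be resolved, the barrel index.ts can alias one of the re-exports. The module names come from the example above; the alias name is made up, and the two modules are simulated in a single file for illustration:

```typescript
// JSONSchema.ts and SchemaValidator.ts both export `ValidationResult`,
// so a barrel index.ts must alias at least one of them on re-export, e.g.:
//   export { ValidationResult } from './JSONSchema';
//   export { ValidationResult as SchemaValidationResult } from './SchemaValidator';
// Simulated in one file:
const JSONSchema = { ValidationResult: { source: 'json-schema' } };
const SchemaValidator = { ValidationResult: { source: 'schema-validator' } };

// Consumers then import each value under a distinct name:
const { ValidationResult } = JSONSchema;
const { ValidationResult: SchemaValidationResult } = SchemaValidator;

console.log(ValidationResult.source);       // → json-schema
console.log(SchemaValidationResult.source); // → schema-validator
```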
- Recommended: Include a LICENSE file
Lerna
Commands:
- "lerna bootstrap" will take care of installing the dependencies of all packages
- "lerna run" will ensure steps are executed in the right order
- Example: "lerna run build" will compile modules without dependencies first
Versioning:
- Can version all packages together or independently
- All together → any change will update the version of all packages
- Independent → only changed packages and their dependents will be versioned
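As an illustration, the versioning mode is chosen in lerna.json: a fixed version string versions all packages together, while "independent" versions each package on its own. A minimal sketch (the packages glob is an assumption):

```json
{
  "packages": ["packages/*"],
  "version": "independent"
}
```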
- Versions will create a tag on Bitbucket
Publish:
- Uses lerna version underneath, so it will update the version number before publishing
Local development
Inside csp-api-common-libraries project
- lerna bootstrap will take care of linking the projects
- You may need to rebuild dependencies when making changes, since we use TypeScript
External projects (backend, source, data-capture..):
- Load project from local file:
"@csp-api-common-libraries/core": "file:../csp-api-common-libraries/packages/core"
How to create a new version of the library?
Pre-requisites:
- Code is on master branch
- The team has agreed on which version will be deployed
Version number: choose one of the following. Check https://docs.npmjs.com/about-semantic-versioning#incrementing-semantic-versions-in-published-packages
- Major: 1.0.0
- Minor: 0.1.0
- Patch: 0.0.1
- Prerelease: 0.0.1-alpha.0
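To make the increments above concrete, here is a minimal sketch of how each bump type changes a version. This is simplified: real tooling (e.g. the semver package used by npm) also handles prereleases and build metadata:

```typescript
type BumpType = 'major' | 'minor' | 'patch';

// Simplified semver bump: prerelease/build suffixes are not handled here.
function bump(version: string, type: BumpType): string {
  const [major, minor, patch] = version.split('.').map(Number);
  switch (type) {
    case 'major': return `${major + 1}.0.0`;
    case 'minor': return `${major}.${minor + 1}.0`;
    case 'patch': return `${major}.${minor}.${patch + 1}`;
  }
}

console.log(bump('1.2.3', 'major')); // → 2.0.0
console.log(bump('1.2.3', 'minor')); // → 1.3.0
console.log(bump('1.2.3', 'patch')); // → 1.2.4
```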
Publication Job: publish-csp-api-common-libraries
Parameters:
- Version: will apply the selected version change.
- If no value is given, there will be no version change
- Publish: YES/NO
- If YES, will publish the final version (after the versioning step, if selected) to the adidas private NPM repository
- If a package version already exists, it will be skipped
Functionalities
## Kafka
### Kafka producer
Example of how to inject a Kafka producer using ConfigLoader. This allows mocking the producer based on the config property useMockProducer:
```typescript
const createKafkaConfig = (kafkaConfig: AppConfig['kafka']): Array<DefaultKafkaProducerModuleOption> => {
  const { topic, clientId, brokerList, schemaRegistry } = kafkaConfig;
  return [
    {
      name: KAFKA_PRODUCER_NAME,
      options: {
        client: {
          ssl: kafkaConfig.ssl && {
            ca: kafkaConfig.ssl.fdpTruststoreCa,
            cert: kafkaConfig.ssl.fdpSecret,
            key: kafkaConfig.ssl.fdpKey
          },
          clientId,
          brokers: brokerList
        },
        serializer: new KafkaAvroRequestSerializer({
          config: {
            host: schemaRegistry.host
          },
          schemas: [
            {
              topic,
              keySuffix: schemaRegistry.keySuffix,
              valueSuffix: schemaRegistry.valueSuffix
            }
          ]
        })
      }
    }
  ];
};

const createMockConfig = (): Array<MockKafkaProducerModuleOption> => [
  {
    name: KAFKA_PRODUCER_NAME,
    mockedProducer: true
  }
];

const kafkaModuleConfig = {
  useFactory(configLoader: ConfigLoader<AppConfig>): Array<KafkaProducerModuleOption> {
    const kafkaConfig = configLoader.getConfig().kafka;
    return kafkaConfig.useMockProducer ? createMockConfig() : createKafkaConfig(kafkaConfig);
  },
  inject: [ConfigLoader]
} as KafkaModuleOptionsAsync;
```
```typescript
@Module({
  imports: [KafkaProducerModule.registerAsync(kafkaModuleConfig)],
  // ...
})
export class MyModule {}
```
Then you just need to inject it in your service:
```typescript
import { Inject, Injectable } from '@nestjs/common';
import { RecordMetadata } from 'kafkajs';
import { KafkaProducerService } from '@csp-api-common-libraries/core/dist/services/kafka/producer/services/KafkaProducerService';

@Injectable()
export class SourceKafkaProducerService {
  private readonly topic = 'MY_TOPIC';

  public constructor(
    @Inject(KAFKA_PRODUCER_NAME) private readonly kafkaService: KafkaProducerService
  ) {}

  // ...

  public async sendData(data: any): Promise<Array<RecordMetadata>> {
    const message = {
      topic: this.topic,
      messages: [
        {
          key: data.key,
          value: data.value
        }
      ]
    };
    return this.kafkaService.send(message);
  }
}
```
### Kafka consumer
To create a Kafka consumer, we first need to register one or several consumers using KafkaConsumerModule. If there is only one consumer, the name parameter is optional.
Example of how to inject a Kafka consumer without ConfigLoader (ConfigLoader can also be used, similarly to the Kafka producer example above):
```typescript
KafkaConsumerModule.register([
  {
    name: optional_name, // mandatory if more than one consumer
    options: {
      client: {
        clientId: 'myClient',
        brokers: ['localhost:9092']
      },
      consumer: {
        groupId: 'myGroup',
        allowAutoTopicCreation: false
      },
      deserializer: new KafkaAvroResponseDeserializer({
        host: 'http://localhost:8081'
      })
    }
  }
]);
```
Then you need to define a controller (annotated with @Controller) which will process the data from the consumer.
You can use two annotations to configure the consumer:
```typescript
@Topic(topic: string, fromBeginning?: boolean, consumerName?: string)
@Pattern(matcher?: Matcher, mapper?: Mapper)
```
#### Topic
The Topic annotation can be used on a controller or on a method. If more than one consumer has been defined, consumerName MUST be provided to link the controller to the consumer.
If the Topic annotation is placed on a controller, then a Pattern annotation needs to be placed on at least one of the controller's methods.
#### Pattern
The Pattern annotation is used to:
- Without parameters, mark a method as the handler for all messages received from the topic
- With a Matcher, select which messages will be processed by this method
- With a Mapper, call the method with a transformed version of the message instead of the raw Kafka message
A matcher is any class implementing the following interface, returning true for the messages that we want to process:
```typescript
export interface Matcher<P = any, K = any> {
  match(message: KafkaResponse<K, P>): boolean;
}
```
A mapper is any class implementing the following interface; it receives the raw Kafka message and transforms it into a friendlier object:
```typescript
export interface Mapper<P = any, K = any, R = any> {
  transform(message: KafkaResponse<K, P>): R;
}
```
Example of usage:
Controller:
```typescript
import { Pattern } from '@csp-api-common-libraries/nestjs/src/services/kafka/decorator/EventDecorator';
import { Topic } from '@csp-api-common-libraries/nestjs/src/services/kafka/decorator/TopicDecorator';
import { Controller } from '@nestjs/common';
import { AccountRequestDto } from './dto/AccountRequestDto';
import { PartnershipAccountMapper } from './PartnershipAccountMapper';
import { PartnershipAccountMatcher } from './PartnershipAccountMatcher';

@Controller()
@Topic('ca_consumer_anp.anp.event.consumer_activity')
export class PartnerController {
  @Pattern(new PartnershipAccountMatcher(), new PartnershipAccountMapper())
  public handleCreateSource(account: AccountRequestDto): any {
    // Do something here
  }
}
```
Matcher:
```typescript
import { KafkaResponse, Matcher } from '@csp-api-common-libraries/core/dist/services/kafka/consumer/ConsumerInterfaces';
import { validPartnershipEvents } from '../constants/PartnershipModuleConstants';

export class PartnershipAccountMatcher implements Matcher {
  public match(message: KafkaResponse<string, Record<'type', string>>): boolean {
    return Object.values(validPartnershipEvents).includes(message.response.type as validPartnershipEvents);
  }
}
```
Mapper (using the BaseMapper functionality is also recommended):
```typescript
import { KafkaResponse, Mapper } from '@csp-api-common-libraries/core/dist/services/kafka/consumer/ConsumerInterfaces';
import { BaseMapper } from '@csp-api-common-libraries/core/dist/utils/mapper/BaseMapper';
import { AccountRequestDto } from './dto/AccountRequestDto';

export class PartnershipAccountMapper extends BaseMapper implements Mapper {
  private static readonly MAP_TABLE = {
    'response.data.account.email': 'email',
    'response.touchpointSourceId.sourceId': 'sourceId'
  };

  public transform(message: KafkaResponse<string, Record<string, unknown>>): AccountRequestDto {
    return BaseMapper.mapObject(message, PartnershipAccountMapper.MAP_TABLE);
  }
}
```
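To illustrate how a MAP_TABLE of dot-paths can be applied, here is a minimal standalone sketch of dot-path mapping. It is illustrative only: the real BaseMapper.mapObject implementation may differ, and the message payload below is made up:

```typescript
// Resolve a dot-separated path like 'response.data.account.email' on an object.
function getByPath(source: Record<string, any>, path: string): unknown {
  return path.split('.').reduce<any>((node, key) => node?.[key], source);
}

// Build a flat object by reading each source path and writing it to the target key.
function mapObject(source: Record<string, any>, table: Record<string, string>): Record<string, unknown> {
  const result: Record<string, unknown> = {};
  for (const [fromPath, toKey] of Object.entries(table)) {
    result[toKey] = getByPath(source, fromPath);
  }
  return result;
}

// Hypothetical consumed message shaped like the paths in MAP_TABLE above:
const message = {
  response: {
    data: { account: { email: 'user@example.com' } },
    touchpointSourceId: { sourceId: 'SRC-1' }
  }
};

const dto = mapObject(message, {
  'response.data.account.email': 'email',
  'response.touchpointSourceId.sourceId': 'sourceId'
});
console.log(dto); // → { email: 'user@example.com', sourceId: 'SRC-1' }
```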