1.0.11 • Published 5 months ago
@develop-x/nest-service-connector v1.0.11
@develop-x/nest-service-connector
概述
@develop-x/nest-service-connector
是一个 NestJS 包,提供集成了服务发现、熔断器模式和 OpenTelemetry 的 HTTP 客户端功能。它通过内置的弹性模式和可观测性简化微服务间的通信。
安装
npm install @develop-x/nest-service-connector
功能特性
- 服务发现集成: 通过 Consul 自动服务发现
- 熔断器模式: 使用 Opossum 实现内置熔断器,提供弹性保障
- 请求构建器: 链式 API 构建 HTTP 请求
- OpenTelemetry 集成: 服务间调用的自动链路追踪
- 可配置超时: 可自定义请求和熔断器超时时间
- 错误处理: 完善的错误处理和适当的 HTTP 状态码
- 类型安全: 完整的 TypeScript 支持和泛型响应类型
使用方法
模块导入
在应用模块中导入 HttpClientModule
:
import { Module } from '@nestjs/common';
import { HttpClientModule } from '@develop-x/nest-service-connector';
@Module({
imports: [
HttpClientModule.forRoot(
'http://consul:8500', // Consul URL
'your-api-key', // 内部认证 API 密钥
{ // 熔断器选项
timeout: 5000,
errorThresholdPercentage: 50,
resetTimeout: 30000,
}
),
],
})
export class AppModule {}
基本用法
在服务中注入 HttpClientService
:
import { Injectable } from '@nestjs/common';
import { HttpClientService } from '@develop-x/nest-service-connector';
@Injectable()
export class UserService {
constructor(private readonly httpClient: HttpClientService) {}
async getUserById(id: string) {
return this.httpClient
.builder()
.toService('user-service')
.atEndpoint(`/users/${id}`)
.withMethod('GET')
.execute<User>();
}
async createUser(userData: CreateUserDto) {
return this.httpClient
.builder()
.toService('user-service')
.atEndpoint('/users')
.withMethod('POST')
.withBody(userData)
.execute<User>();
}
async updateUser(id: string, userData: UpdateUserDto) {
return this.httpClient
.builder()
.toService('user-service')
.atEndpoint(`/users/${id}`)
.withMethod('PUT')
.withBody(userData)
.execute<User>();
}
async deleteUser(id: string) {
return this.httpClient
.builder()
.toService('user-service')
.atEndpoint(`/users/${id}`)
.withMethod('DELETE')
.execute<void>();
}
}
API 参考
HttpClientModule 配置
forRoot(consulUrl?: string, apiKey?: string, breakerOptions?: BreakerOptions)
使用以下参数配置 HTTP 客户端模块:
参数:
consulUrl?: string
- 服务发现的 Consul 服务器 URLapiKey?: string
- 内部服务认证的 API 密钥breakerOptions?: BreakerOptions
- 熔断器配置
熔断器选项:
interface BreakerOptions {
timeout?: number; // 请求超时时间(毫秒,默认: 5000)
errorThresholdPercentage?: number; // 错误阈值百分比(默认: 50)
resetTimeout?: number; // 重置超时时间(毫秒,默认: 30000)
rollingCountTimeout?: number; // 滚动窗口时间(毫秒,默认: 10000)
rollingCountBuckets?: number; // 桶数量(默认: 10)
name?: string; // 熔断器名称
group?: string; // 熔断器分组
}
HttpClientService
builder()
创建新的 HTTP 请求构建器。
返回: HttpRequestBuilder
- 链式请求构建器
batchExecute(builders: HttpRequestBuilder[])
批量执行多个请求构建器。
参数:
builders: HttpRequestBuilder[]
- 请求构建器数组
返回: Promise<T[]>
- 批量执行结果
HttpRequestBuilder
toService(service: string)
设置目标服务名称。
.toService('user-service')
atEndpoint(endpoint: string)
设置请求端点路径。
.atEndpoint('/api/users/123')
withMethod(method: 'GET' | 'POST' | 'PUT' | 'DELETE')
设置 HTTP 方法。
.withMethod('POST')
withHeaders(headers: Record<string, string>)
设置请求头。
.withHeaders({
'Content-Type': 'application/json',
'Authorization': 'Bearer token'
})
withBody(body: Record<string, any>)
设置请求体(用于 POST、PUT 请求)。
.withBody({ name: 'John Doe', email: 'john@example.com' })
withQuery(query: Record<string, any>)
设置查询参数。
.withQuery({ page: 1, limit: 10, filter: 'active' })
withApiVersion(version: string)
设置 API 版本。
.withApiVersion('2') // 会生成 /v2/endpoint
withFetchOptions(options: RequestInit)
设置额外的 fetch 选项。
.withFetchOptions({ timeout: 10000 })
execute()
执行请求并返回结果。
.execute<ResponseType>(): Promise<ResponseType>
高级用法
自定义请求头和认证
@Injectable()
export class AuthenticatedApiService {
constructor(private readonly httpClient: HttpClientService) {}
async getProtectedData(token: string) {
return this.httpClient
.builder()
.toService('protected-service')
.atEndpoint('/protected/data')
.withMethod('GET')
.withHeaders({
'Authorization': `Bearer ${token}`,
'X-Client-Version': '1.0.0',
})
.execute<ProtectedData>();
}
}
查询参数和过滤
@Injectable()
export class ProductService {
constructor(private readonly httpClient: HttpClientService) {}
async searchProducts(filters: ProductFilters) {
return this.httpClient
.builder()
.toService('product-service')
.atEndpoint('/products/search')
.withMethod('GET')
.withQuery({
category: filters.category,
minPrice: filters.minPrice,
maxPrice: filters.maxPrice,
page: filters.page || 1,
limit: filters.limit || 20,
})
.execute<ProductSearchResult>();
}
}
错误处理
@Injectable()
export class OrderService {
constructor(private readonly httpClient: HttpClientService) {}
async createOrder(orderData: CreateOrderDto) {
try {
return await this.httpClient
.builder()
.toService('order-service')
.atEndpoint('/orders')
.withMethod('POST')
.withBody(orderData)
.withFetchOptions({ timeout: 15000 }) // 15秒超时
.execute<Order>();
} catch (error) {
if (error.status === 400) {
throw new BadRequestException('Invalid order data');
} else if (error.status === 409) {
throw new ConflictException('Order already exists');
} else if (error.message.includes('Service temporarily unavailable')) {
throw new ServiceUnavailableException('Order service is temporarily unavailable');
}
throw error;
}
}
}
批量操作
@Injectable()
export class BulkOperationService {
constructor(private readonly httpClient: HttpClientService) {}
async bulkCreateUsers(users: CreateUserDto[]) {
return this.httpClient
.builder()
.toService('user-service')
.atEndpoint('/users/bulk')
.withMethod('POST')
.withBody({ users })
.withFetchOptions({ timeout: 30000 }) // 批量操作使用更长超时
.execute<BulkCreateResult>();
}
// 使用批量执行
async getMultipleUserData(userIds: string[]) {
const builders = userIds.map(id =>
this.httpClient
.builder()
.toService('user-service')
.atEndpoint(`/users/${id}`)
.withMethod('GET')
);
return this.httpClient.batchExecute<User>(builders);
}
}
API 版本控制
@Injectable()
export class VersionedApiService {
constructor(private readonly httpClient: HttpClientService) {}
async getUserV1(id: string) {
return this.httpClient
.builder()
.toService('user-service')
.atEndpoint(`/users/${id}`)
.withMethod('GET')
.withApiVersion('1') // 调用 /v1/users/123
.execute<UserV1>();
}
async getUserV2(id: string) {
return this.httpClient
.builder()
.toService('user-service')
.atEndpoint(`/users/${id}`)
.withMethod('GET')
.withApiVersion('2') // 调用 /v2/users/123
.execute<UserV2>();
}
}
最佳实践
- 服务命名: 在微服务中使用一致的服务名称
- 超时配置: 为不同操作设置适当的超时时间
- 错误处理: 实现适当的错误处理和回退机制
- 熔断器: 根据 SLA 配置熔断器阈值
- 监控: 监控熔断器指标和服务健康状况
- 缓存: 为频繁访问的数据实现缓存
- 重试逻辑: 考虑为瞬时故障实现重试逻辑
依赖项
@develop-x/nest-consul
: 服务发现的 Consul 集成@opentelemetry/api
: 链路追踪的 OpenTelemetry APIopossum
: 熔断器实现
许可证
ISC
支持
如有问题或疑问,请参考项目仓库或联系开发团队。