1.0.11 • Published 5 months ago

@develop-x/nest-service-connector v1.0.11

Weekly downloads
-
License
ISC
Repository
-
Last release
5 months ago

@develop-x/nest-service-connector

概述

@develop-x/nest-service-connector 是一个 NestJS 包,提供集成了服务发现、熔断器模式和 OpenTelemetry 的 HTTP 客户端功能。它通过内置的弹性模式和可观测性简化微服务间的通信。

安装

npm install @develop-x/nest-service-connector

功能特性

  • 服务发现集成: 通过 Consul 自动服务发现
  • 熔断器模式: 使用 Opossum 实现内置熔断器,提供弹性保障
  • 请求构建器: 链式 API 构建 HTTP 请求
  • OpenTelemetry 集成: 服务间调用的自动链路追踪
  • 可配置超时: 可自定义请求和熔断器超时时间
  • 错误处理: 完善的错误处理和适当的 HTTP 状态码
  • 类型安全: 完整的 TypeScript 支持和泛型响应类型

使用方法

模块导入

在应用模块中导入 HttpClientModule

import { Module } from '@nestjs/common';
import { HttpClientModule } from '@develop-x/nest-service-connector';

@Module({
  imports: [
    HttpClientModule.forRoot(
      'http://consul:8500',  // Consul URL
      'your-api-key',        // 内部认证 API 密钥
      {                      // 熔断器选项
        timeout: 5000,
        errorThresholdPercentage: 50,
        resetTimeout: 30000,
      }
    ),
  ],
})
export class AppModule {}

基本用法

在服务中注入 HttpClientService

import { Injectable } from '@nestjs/common';
import { HttpClientService } from '@develop-x/nest-service-connector';

@Injectable()
export class UserService {
  constructor(private readonly httpClient: HttpClientService) {}

  async getUserById(id: string) {
    return this.httpClient
      .builder()
      .toService('user-service')
      .atEndpoint(`/users/${id}`)
      .withMethod('GET')
      .execute<User>();
  }

  async createUser(userData: CreateUserDto) {
    return this.httpClient
      .builder()
      .toService('user-service')
      .atEndpoint('/users')
      .withMethod('POST')
      .withBody(userData)
      .execute<User>();
  }

  async updateUser(id: string, userData: UpdateUserDto) {
    return this.httpClient
      .builder()
      .toService('user-service')
      .atEndpoint(`/users/${id}`)
      .withMethod('PUT')
      .withBody(userData)
      .execute<User>();
  }

  async deleteUser(id: string) {
    return this.httpClient
      .builder()
      .toService('user-service')
      .atEndpoint(`/users/${id}`)
      .withMethod('DELETE')
      .execute<void>();
  }
}

API 参考

HttpClientModule 配置

forRoot(consulUrl?: string, apiKey?: string, breakerOptions?: BreakerOptions)

使用以下参数配置 HTTP 客户端模块:

参数:

  • consulUrl?: string - 服务发现的 Consul 服务器 URL
  • apiKey?: string - 内部服务认证的 API 密钥
  • breakerOptions?: BreakerOptions - 熔断器配置

熔断器选项:

interface BreakerOptions {
  timeout?: number;                    // 请求超时时间(毫秒,默认: 5000)
  errorThresholdPercentage?: number;   // 错误阈值百分比(默认: 50)
  resetTimeout?: number;               // 重置超时时间(毫秒,默认: 30000)
  rollingCountTimeout?: number;        // 滚动窗口时间(毫秒,默认: 10000)
  rollingCountBuckets?: number;        // 桶数量(默认: 10)
  name?: string;                       // 熔断器名称
  group?: string;                      // 熔断器分组
}

HttpClientService

builder()

创建新的 HTTP 请求构建器。

返回: HttpRequestBuilder - 链式请求构建器

batchExecute(builders: HttpRequestBuilder[])

批量执行多个请求构建器。

参数:

  • builders: HttpRequestBuilder[] - 请求构建器数组

返回: Promise<T[]> - 批量执行结果

HttpRequestBuilder

toService(service: string)

设置目标服务名称。

.toService('user-service')

atEndpoint(endpoint: string)

设置请求端点路径。

.atEndpoint('/api/users/123')

withMethod(method: 'GET' | 'POST' | 'PUT' | 'DELETE')

设置 HTTP 方法。

.withMethod('POST')

withHeaders(headers: Record<string, string>)

设置请求头。

.withHeaders({
  'Content-Type': 'application/json',
  'Authorization': 'Bearer token'
})

withBody(body: Record<string, any>)

设置请求体(用于 POST、PUT 请求)。

.withBody({ name: 'John Doe', email: 'john@example.com' })

withQuery(query: Record<string, any>)

设置查询参数。

.withQuery({ page: 1, limit: 10, filter: 'active' })

withApiVersion(version: string)

设置 API 版本。

.withApiVersion('2')  // 会生成 /v2/endpoint

withFetchOptions(options: RequestInit)

设置额外的 fetch 选项。

.withFetchOptions({ timeout: 10000 })

execute()

执行请求并返回结果。

.execute<ResponseType>(): Promise<ResponseType>

高级用法

自定义请求头和认证

@Injectable()
export class AuthenticatedApiService {
  constructor(private readonly httpClient: HttpClientService) {}

  async getProtectedData(token: string) {
    return this.httpClient
      .builder()
      .toService('protected-service')
      .atEndpoint('/protected/data')
      .withMethod('GET')
      .withHeaders({
        'Authorization': `Bearer ${token}`,
        'X-Client-Version': '1.0.0',
      })
      .execute<ProtectedData>();
  }
}

查询参数和过滤

@Injectable()
export class ProductService {
  constructor(private readonly httpClient: HttpClientService) {}

  async searchProducts(filters: ProductFilters) {
    return this.httpClient
      .builder()
      .toService('product-service')
      .atEndpoint('/products/search')
      .withMethod('GET')
      .withQuery({
        category: filters.category,
        minPrice: filters.minPrice,
        maxPrice: filters.maxPrice,
        page: filters.page || 1,
        limit: filters.limit || 20,
      })
      .execute<ProductSearchResult>();
  }
}

错误处理

@Injectable()
export class OrderService {
  constructor(private readonly httpClient: HttpClientService) {}

  async createOrder(orderData: CreateOrderDto) {
    try {
      return await this.httpClient
        .builder()
        .toService('order-service')
        .atEndpoint('/orders')
        .withMethod('POST')
        .withBody(orderData)
        .withFetchOptions({ timeout: 15000 }) // 15秒超时
        .execute<Order>();
    } catch (error) {
      if (error.status === 400) {
        throw new BadRequestException('Invalid order data');
      } else if (error.status === 409) {
        throw new ConflictException('Order already exists');
      } else if (error.message.includes('Service temporarily unavailable')) {
        throw new ServiceUnavailableException('Order service is temporarily unavailable');
      }
      throw error;
    }
  }
}

批量操作

@Injectable()
export class BulkOperationService {
  constructor(private readonly httpClient: HttpClientService) {}

  async bulkCreateUsers(users: CreateUserDto[]) {
    return this.httpClient
      .builder()
      .toService('user-service')
      .atEndpoint('/users/bulk')
      .withMethod('POST')
      .withBody({ users })
      .withFetchOptions({ timeout: 30000 }) // 批量操作使用更长超时
      .execute<BulkCreateResult>();
  }

  // 使用批量执行
  async getMultipleUserData(userIds: string[]) {
    const builders = userIds.map(id =>
      this.httpClient
        .builder()
        .toService('user-service')
        .atEndpoint(`/users/${id}`)
        .withMethod('GET')
    );

    return this.httpClient.batchExecute<User>(builders);
  }
}

API 版本控制

@Injectable()
export class VersionedApiService {
  constructor(private readonly httpClient: HttpClientService) {}

  async getUserV1(id: string) {
    return this.httpClient
      .builder()
      .toService('user-service')
      .atEndpoint(`/users/${id}`)
      .withMethod('GET')
      .withApiVersion('1')  // 调用 /v1/users/123
      .execute<UserV1>();
  }

  async getUserV2(id: string) {
    return this.httpClient
      .builder()
      .toService('user-service')
      .atEndpoint(`/users/${id}`)
      .withMethod('GET')
      .withApiVersion('2')  // 调用 /v2/users/123
      .execute<UserV2>();
  }
}

最佳实践

  1. 服务命名: 在微服务中使用一致的服务名称
  2. 超时配置: 为不同操作设置适当的超时时间
  3. 错误处理: 实现适当的错误处理和回退机制
  4. 熔断器: 根据 SLA 配置熔断器阈值
  5. 监控: 监控熔断器指标和服务健康状况
  6. 缓存: 为频繁访问的数据实现缓存
  7. 重试逻辑: 考虑为瞬时故障实现重试逻辑

依赖项

  • @develop-x/nest-consul: 服务发现的 Consul 集成
  • @opentelemetry/api: 链路追踪的 OpenTelemetry API
  • opossum: 熔断器实现

许可证

ISC

支持

如有问题或疑问,请参考项目仓库或联系开发团队。

1.0.11

5 months ago

1.0.9

5 months ago

1.0.8

5 months ago

1.0.7

5 months ago

1.0.6

5 months ago

1.0.4

5 months ago

1.0.3

5 months ago

1.0.2

5 months ago

1.0.1

5 months ago

1.0.0

5 months ago