@aarek/nest-stan

Description

Wrapper around node-nats-streaming for NestJS.

Usage

Install

npm install --save @aarek/nest-stan

or

yarn add @aarek/nest-stan

Module initialization

@Module({
  imports: [
    NestStanModule.forRoot({
      clientId: 'client-id-test1',
      clusterId: 'test-cluster',
      stanOptions: {
        url: 'nats://localhost:4222'
      }
    })
  ]
})
export class AppModule {}

or

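@Module({
  imports: [
    NestStanModule.forRootAsync({
      imports: [ConfigModule],
      inject: [ConfigService],
      // nestStanConfig() is an application-defined method; it should return the options otherwise passed to forRoot
      useFactory: (config: ConfigService) => config.nestStanConfig(),
    }),
  ]
})
export class AppModule {}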
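
The forRootAsync example relies on an application-defined ConfigService that exposes nestStanConfig(). A minimal sketch of such a service is shown below, assuming the configuration comes from environment-style variables. The variable names (STAN_CLIENT_ID, STAN_CLUSTER_ID, STAN_URL), the NestStanConfigSketch interface and the fallback values are hypothetical; only the clientId, clusterId and stanOptions.url fields are taken from the forRoot example.

```typescript
// Hypothetical shape mirroring the options passed to NestStanModule.forRoot.
interface NestStanConfigSketch {
  clientId: string;
  clusterId: string;
  stanOptions: { url: string };
}

// Application-defined service, not part of @aarek/nest-stan.
// In a real NestJS app it would be decorated with @Injectable()
// and exported from ConfigModule.
class ConfigService {
  private readonly env: Record<string, string | undefined>;

  constructor(env: Record<string, string | undefined>) {
    this.env = env;
  }

  nestStanConfig(): NestStanConfigSketch {
    return {
      // Fall back to the values from the forRoot example when a variable is unset.
      clientId: this.env['STAN_CLIENT_ID'] ?? 'client-id-test1',
      clusterId: this.env['STAN_CLUSTER_ID'] ?? 'test-cluster',
      stanOptions: { url: this.env['STAN_URL'] ?? 'nats://localhost:4222' },
    };
  }
}

// Only the URL is overridden here; the other fields use the fallbacks.
const sketchConfig = new ConfigService({ STAN_URL: 'nats://nats:4222' }).nestStanConfig();
```

Taking the environment as a constructor argument, rather than reading a global directly, keeps the sketch easy to exercise with different values.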

Handling messages

Nest Stan gives you two decorators, StanSubscribe and AsyncStanSubscribe. AsyncStanSubscribe automatically acknowledges the message once the Promise returned by the handler resolves successfully. Both decorators accept a subject as the first argument and an optional OptionsBuilder as the second. OptionsBuilder is a function that receives a node-nats-streaming SubscriptionOptions object and should return SubscriptionOptions.

For example:

@StanSubscribe('subject', options =>
  options.setStartAtTimeDelta(30 * 1000),
)
export class Subscriber implements IStanSubscriber<Message> {
  handle(message: Message, context: IMessageHandlerContext): void {
    // Handle message
    return;
  }
}

@AsyncStanSubscribe('subject', options =>
  options
    .setStartAtTimeDelta(30 * 1000)
    .setDurableName(AsyncSubscriber.name),
)
export class AsyncSubscriber implements IStanSubscriber<Message> {
  async handle(message: Message, context: IMessageHandlerContext): Promise<void> {
    // Handle message
    return;
  }
}
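
The acknowledgement behavior described for AsyncStanSubscribe can be pictured with a small stand-alone sketch. This is not the library's implementation: AckableMessage, RecordingMessage and runWithAutoAck are hypothetical names, and the only behavior taken from the text is that the message is acknowledged after the handler's Promise resolves successfully. The sketch additionally assumes that a rejected handler leaves the message unacknowledged.

```typescript
// Hypothetical stand-in for a message that can be acknowledged
// (node-nats-streaming messages expose an ack() method).
interface AckableMessage<T> {
  data: T;
  ack(): void;
}

// Runs the handler and acknowledges only if its Promise resolves.
// Resolves to true when the message was acknowledged.
async function runWithAutoAck<T>(
  message: AckableMessage<T>,
  handler: (data: T) => Promise<void>,
): Promise<boolean> {
  try {
    await handler(message.data);
  } catch {
    // Assumption: a failed handler means no ack.
    return false;
  }
  message.ack();
  return true;
}

// Test double that records whether ack() was called.
class RecordingMessage<T> implements AckableMessage<T> {
  acked = false;
  data: T;

  constructor(data: T) {
    this.data = data;
  }

  ack(): void {
    this.acked = true;
  }
}

const demoMessage = new RecordingMessage({ foo: 'bar' });
runWithAutoAck(demoMessage, async () => {
  // Handle message
}).then(acked => console.log('acknowledged:', acked));
```

With the plain StanSubscribe decorator no such step is described, so acknowledgement there is presumably left to the handler.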

Publishing messages

To publish messages, first import NestStanModule for the subjects you want to publish to:

@Module({
  imports: [NestStanModule.forSubjects(['subject'])],
  controllers: [TestController],
  providers: [],
})
export class TestModule {}

Nest Stan provides the InjectPublisher decorator for injecting publishers:

@Controller()
class TestController {
  constructor(
    @InjectPublisher('subject') private readonly publisher: IStanPublisher<Message>,
  ) {}

  @Post()
  async publishMessage() {
    const message: Message = {
      foo: 'bar',
    };

    await this.publisher.publish(message);
  }
}
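
Because the controller depends only on an injected publisher, the publishing logic can be exercised without a running NATS Streaming server. The sketch below assumes the publisher can be reduced to a single publish(message) method returning a Promise, which is the only call the controller makes. FooMessage, PublisherLike, InMemoryPublisher and FooPublishingService are hypothetical names and not part of @aarek/nest-stan.

```typescript
// Payload type matching the { foo: 'bar' } message in the controller example.
interface FooMessage {
  foo: string;
}

// Hypothetical minimal view of a publisher: only the publish() call used by the controller.
interface PublisherLike<T> {
  publish(message: T): Promise<unknown>;
}

// In-memory stand-in that records messages instead of sending them.
class InMemoryPublisher<T> implements PublisherLike<T> {
  readonly sent: T[] = [];

  async publish(message: T): Promise<void> {
    this.sent.push(message);
  }
}

// Same logic as TestController.publishMessage, minus the NestJS decorators.
class FooPublishingService {
  private readonly publisher: PublisherLike<FooMessage>;

  constructor(publisher: PublisherLike<FooMessage>) {
    this.publisher = publisher;
  }

  async publishMessage(): Promise<void> {
    const message: FooMessage = { foo: 'bar' };
    await this.publisher.publish(message);
  }
}
```

In a unit test, an InMemoryPublisher instance would be passed where the injected publisher normally goes, and its sent array inspected afterwards.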

Connection status

To check the current connection status, inject the status indicator with InjectConnectionStatusIndicator:

@Injectable()
export class StanConnectionIndicator extends HealthIndicator {
  constructor(
    @InjectConnectionStatusIndicator() private readonly connectionStatus: ConnectionStatusIndicator
  ) {
    super();
  }

  async isHealthy(key: string): Promise<HealthIndicatorResult> {
    const isHealthy = this.connectionStatus.getStatus() === ConnectionStatus.CONNECTED;
    const lastError = this.connectionStatus.lastError();

    return this.getStatus(key, isHealthy, { lastError });
  }
}
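
For orientation, the decision made in isHealthy can be written as a pure function. The sketch below is self-contained and rests on several assumptions: ConnectionStatusSketch is a hypothetical stand-in in which only CONNECTED comes from the example, the error is reduced to its message for readability, and the returned object imitates the { [key]: { status: 'up' | 'down', ... } } shape produced by @nestjs/terminus health indicators.

```typescript
// Hypothetical stand-in for ConnectionStatus; only CONNECTED appears in the example.
type ConnectionStatusSketch = 'CONNECTED' | 'DISCONNECTED';

// Imitates the result shape of @nestjs/terminus health indicators.
type HealthResultSketch = Record<string, { status: 'up' | 'down'; lastError?: string }>;

function toHealthResult(
  key: string,
  connectionStatus: ConnectionStatusSketch,
  lastError?: Error,
): HealthResultSketch {
  const isHealthy = connectionStatus === 'CONNECTED';
  return {
    [key]: {
      status: isHealthy ? 'up' : 'down',
      // Report only the message so the result stays JSON-serializable.
      ...(lastError ? { lastError: lastError.message } : {}),
    },
  };
}

const healthyResult = toHealthResult('stan', 'CONNECTED');
const unhealthyResult = toHealthResult('stan', 'DISCONNECTED', new Error('connection lost'));
```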

ToDo

  • [ ] Add linter
  • [ ] Add classes documentation
  • [ ] Implement GitHub Actions
  • [ ] Collect coverage
  • [ ] Provide example app