@aarek/nest-stan
Table of Contents
- Description
- Usage
  - Install
  - Module Initialization
  - Handling Messages
  - Publishing Messages
  - Connection Status
- ToDo
Description
Wrapper around node-nats-streaming for NestJS.
Usage
Install
npm install --save @aarek/nest-stan
or
yarn add @aarek/nest-stan
Module initialization
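import { Module } from '@nestjs/common';
import { NestStanModule } from '@aarek/nest-stan';

// Static configuration: connection options are passed in directly.
@Module({
  imports: [
    NestStanModule.forRoot({
      clientId: 'client-id-test1',
      clusterId: 'test-cluster',
      stanOptions: {
        url: 'nats://localhost:4222',
      },
    }),
  ],
})
export class AppModule {}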
or
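// Async configuration: ConfigModule and ConfigService are application-defined;
// ConfigService is expected to expose a nestStanConfig() method.
@Module({
  imports: [
    NestStanModule.forRootAsync({
      imports: [ConfigModule],
      inject: [ConfigService],
      useFactory: (config: ConfigService) => config.nestStanConfig(),
    }),
  ],
})
export class AppModule {}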
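The forRootAsync example relies on a nestStanConfig() method that the snippet does not show. Below is a minimal sketch of such a method, assuming the options come from environment variables. NestStanOptionsLike only mirrors the fields used in the forRoot example, and the variable names STAN_CLIENT_ID, STAN_CLUSTER_ID, and STAN_URL are hypothetical.

```typescript
// Hypothetical shape mirroring the fields passed to forRoot above;
// it is not imported from @aarek/nest-stan.
interface NestStanOptionsLike {
  clientId: string;
  clusterId: string;
  stanOptions: { url: string };
}

type Env = Record<string, string | undefined>;

// Builds the options object from an environment-like map, falling back to
// the literal values from the forRoot example when a variable is unset.
function nestStanConfigFromEnv(env: Env): NestStanOptionsLike {
  return {
    clientId: env['STAN_CLIENT_ID'] ?? 'client-id-test1',
    clusterId: env['STAN_CLUSTER_ID'] ?? 'test-cluster',
    stanOptions: { url: env['STAN_URL'] ?? 'nats://localhost:4222' },
  };
}

// A ConfigService-style class exposing nestStanConfig(), without Nest decorators.
class EnvConfigService {
  private readonly env: Env;

  constructor(env: Env) {
    this.env = env;
  }

  nestStanConfig(): NestStanOptionsLike {
    return nestStanConfigFromEnv(this.env);
  }
}
```

In a real application the map would typically be process.env, and the class would be registered as a provider in ConfigModule.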
Handling messages
Nest Stan gives you two decorators: StanSubscribe and AsyncStanSubscribe. AsyncStanSubscribe automatically acknowledges the message once the Promise returned by the handler resolves successfully. Both decorators accept a subject as the first argument and an optional OptionsBuilder as the second. An OptionsBuilder is a function that receives the node-nats-streaming SubscriptionOptions and should return SubscriptionOptions.
For example:
import {
  AsyncStanSubscribe,
  IMessageHandlerContext,
  IStanSubscriber,
  StanSubscribe,
} from '@aarek/nest-stan';

// Message is an application-defined payload type.
@StanSubscribe('subject', options =>
  options.setStartAtTimeDelta(30 * 1000),
)
export class Subscriber implements IStanSubscriber<Message> {
  handle(message: Message, context: IMessageHandlerContext): void {
    // Handle message
    return;
  }
}

// The message is acknowledged automatically after handle() resolves.
@AsyncStanSubscribe('subject', options =>
  options
    .setStartAtTimeDelta(30 * 1000)
    .setDurableName(AsyncSubscriber.name),
)
export class AsyncSubscriber implements IStanSubscriber<Message> {
  async handle(message: Message, context: IMessageHandlerContext): Promise<void> {
    // Handle message
    return;
  }
}
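The practical difference between the two decorators is when the message gets acknowledged. The following is a minimal sketch of the acknowledge-after-resolve behaviour described above. It is not the library's implementation; AckableMessage and withAutoAck are hypothetical names introduced only for illustration.

```typescript
// Minimal stand-in for a streaming message; only ack() matters here.
interface AckableMessage<T> {
  data: T;
  ack(): void;
}

type AsyncHandler<T> = (data: T) => Promise<void>;

// Wraps an async handler so the message is acknowledged only when the
// handler's promise resolves. On rejection the message is left unacknowledged.
function withAutoAck<T>(handler: AsyncHandler<T>) {
  return async (message: AckableMessage<T>): Promise<boolean> => {
    try {
      await handler(message.data);
    } catch {
      return false; // not acknowledged; the server may redeliver it later
    }
    message.ack();
    return true;
  };
}
```

Acknowledging only after success means a failing handler does not silently drop the message, at the cost of possible redelivery, so handlers should be safe to run more than once.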
Publishing messages
To publish messages, first import NestStanModule for the subjects you want to publish to:
import { Module } from '@nestjs/common';
import { NestStanModule } from '@aarek/nest-stan';

@Module({
  imports: [NestStanModule.forSubjects(['subject'])],
  controllers: [TestController],
  providers: [],
})
export class TestModule {}
Nest Stan provides the InjectPublisher decorator for injecting publishers:
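import { Controller, Post } from '@nestjs/common';
import { InjectPublisher, IStanPublisher } from '@aarek/nest-stan';

@Controller()
class TestController {
  constructor(
    // The subject must match one passed to NestStanModule.forSubjects().
    @InjectPublisher('subject') private readonly publisher: IStanPublisher<Message>,
  ) {}

  @Post()
  async publishMessage() {
    const message: Message = {
      foo: 'bar',
    };
    await this.publisher.publish(message);
  }
}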
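Because the controller only depends on a publish(message) call, it can be exercised without a running server by substituting a fake publisher. The sketch below assumes nothing about IStanPublisher beyond that one method; PublisherLike, InMemoryPublisher, FooMessage, and TestControllerLike are hypothetical names used only for illustration.

```typescript
// Hypothetical local interface: it only assumes the publish(message) call
// used in the controller above, not the full IStanPublisher contract.
interface PublisherLike<T> {
  publish(message: T): Promise<unknown>;
}

// Payload shape taken from the controller example ({ foo: 'bar' }).
interface FooMessage {
  foo: string;
}

// Records published messages instead of sending them to a server.
class InMemoryPublisher<T> implements PublisherLike<T> {
  readonly published: T[] = [];

  async publish(message: T): Promise<void> {
    this.published.push(message);
  }
}

// Plain class standing in for the controller, without Nest decorators.
class TestControllerLike {
  private readonly publisher: PublisherLike<FooMessage>;

  constructor(publisher: PublisherLike<FooMessage>) {
    this.publisher = publisher;
  }

  async publishMessage(): Promise<void> {
    await this.publisher.publish({ foo: 'bar' });
  }
}
```

In a Nest test module the same fake could presumably be supplied in place of the injected publisher, though the exact provider token used by InjectPublisher is not documented here.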
Connection status
To check the current connection status, use InjectConnectionStatusIndicator:
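import { Injectable } from '@nestjs/common';
import { HealthIndicator, HealthIndicatorResult } from '@nestjs/terminus';
import {
  ConnectionStatus,
  ConnectionStatusIndicator,
  InjectConnectionStatusIndicator,
} from '@aarek/nest-stan';

@Injectable()
export class StanConnectionIndicator extends HealthIndicator {
  constructor(
    @InjectConnectionStatusIndicator() private readonly connectionStatus: ConnectionStatusIndicator,
  ) {
    super();
  }

  // Reports "up" only while the client is connected; includes the last error.
  async isHealthy(key: string): Promise<HealthIndicatorResult> {
    const isHealthy = this.connectionStatus.getStatus() === ConnectionStatus.CONNECTED;
    const lastError = this.connectionStatus.lastError();
    return this.getStatus(key, isHealthy, { lastError });
  }
}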
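The mapping from connection status to a health entry can be sketched without any framework code. This is an illustration under assumptions: only CONNECTED appears in the example above, so the DISCONNECTED value is hypothetical, and the up/down result shape is modelled on what NestJS health indicators commonly return rather than taken from this library.

```typescript
// Hypothetical status values; only CONNECTED appears in the example above.
type StatusLike = 'CONNECTED' | 'DISCONNECTED';

interface HealthResultLike {
  [key: string]: { status: 'up' | 'down'; lastError?: string };
}

// Maps a connection status and the last error to a health-check entry.
function toHealthResult(key: string, status: StatusLike, lastError?: Error): HealthResultLike {
  const isHealthy = status === 'CONNECTED';
  return {
    [key]: {
      status: isHealthy ? 'up' : 'down',
      // Report only the message so the result stays JSON-serializable.
      ...(lastError ? { lastError: lastError.message } : {}),
    },
  };
}
```

Reducing the error to its message is a design choice for this sketch; the original example passes the lastError value through unchanged.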
ToDo
- [ ] Add linter
- [ ] Add class documentation
- [ ] Set up GitHub Actions
- [ ] Collect coverage
- [ ] Provide example app