如何在 C# 代码中使用 Amazon MSK 将消息发布到 Kafka

发布于 2025-01-15 07:47:38 字数 1858 浏览 3 评论 0原文

我正在创建 ac# 应用程序来向 Kafka 发布消息。在我的应用程序的当前版本中,我使用 docker(confluenceinc-cp 和 confluenceinc-zookeeper)在本地设置集群。然而,为了高效运行 Kafka,我们决定使用 Managed Streaming Apache Kafka (MSK) 来运行云原生 kafka 集群。我们的数据工程团队已经创建了一个集群。已经有一个专用网络(引导服务器)的端点和 Zookeeper 的明文 url。

如何更改 ProducerConfig 以使用 MSK 而不是 localhost

我的生产者代码如下所示:

public class ProducerHostedService : IHostedService
{
    private readonly ILogger<ProducerHostedService> _logger;
    private readonly IProducer<Null, string> _producer;

    public ProducerHostedService(ILogger<ProducerHostedService> logger)
    {
        _logger = logger;
        var config = new ProducerConfig
        {
            //SecurityProtocol = SecurityProtocol.Ssl,
            
            BootstrapServers = "localhost:9092"
        };
        _producer = new ProducerBuilder<Null, string>(config).Build();
    }
    
    public async Task StartAsync(CancellationToken cancellationToken)
    {
        for (var i = 0; i < 100; ++i)
        {
          var order = new OrderRequest()
                      {
                          CustomerId = i,
                          ProductId = i,
                          OrderId = i,
                          Quantity = 1,
                          Status = "New"
                      };
            string message = JsonSerializer.Serialize(order);
            _logger.LogInformation(message);
            // ProduceAsync creates a topic if not exists
            await _producer.ProduceAsync("test", new Message<Null, string>()
            {
                Value = message
            }, cancellationToken);
        }

        _producer.Flush(TimeSpan.FromSeconds(10));
    }

    public Task StopAsync(CancellationToken cancellationToken)
    {
        _producer?.Dispose();
        return Task.CompletedTask;
    }
}

I am creating a c# application to publish a message to Kafka. In the current version of my application I set up cluster locally using docker (confluentinc-cp and confluentinc-zookeeper). However to run Kafka efficiently, we decided to use Managed Streaming Apache Kafka (MSK) to run a Cloud native kafka cluster. There's already a cluster created by our data-engineering team. There's already an endpoint for private network (bootstrap server) and plaintext url for Zookeeper.

How can I change my ProducerConfig to use MSK instead of localhost

My producer code looks like this:

public class ProducerHostedService : IHostedService
{
    private readonly ILogger<ProducerHostedService> _logger;
    private readonly IProducer<Null, string> _producer;

    public ProducerHostedService(ILogger<ProducerHostedService> logger)
    {
        _logger = logger;
        var config = new ProducerConfig
        {
            //SecurityProtocol = SecurityProtocol.Ssl,
            
            BootstrapServers = "localhost:9092"
        };
        _producer = new ProducerBuilder<Null, string>(config).Build();
    }
    
    public async Task StartAsync(CancellationToken cancellationToken)
    {
        for (var i = 0; i < 100; ++i)
        {
          var order = new OrderRequest()
                      {
                          CustomerId = i,
                          ProductId = i,
                          OrderId = i,
                          Quantity = 1,
                          Status = "New"
                      };
            string message = JsonSerializer.Serialize(order);
            _logger.LogInformation(message);
            // ProduceAsync creates a topic if not exists
            await _producer.ProduceAsync("test", new Message<Null, string>()
            {
                Value = message
            }, cancellationToken);
        }

        _producer.Flush(TimeSpan.FromSeconds(10));
    }

    public Task StopAsync(CancellationToken cancellationToken)
    {
        _producer?.Dispose();
        return Task.CompletedTask;
    }
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文