scala连接kafka失败 如何解决?

发布于 2022-09-30 23:10:20 字数 1342 浏览 79 评论 0

按照官方文档的示例,写了以下程序:

def main(args: Array[String]):Unit = {

    val sparkConf: SparkConf = new SparkConf().setMaster("local[4]").setAppName("ylcx_miniApp_mall_recommendation");

    val streamingContext: StreamingContext = new StreamingContext(sparkConf,Seconds(5));

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "X.X.X.X:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "user",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topics = Array("mallRec")

    val stream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    stream.map(record => println(record.key+' '+record.value))
    stream.print()
    streamingContext.start();
    streamingContext.awaitTermination();
  }

运行以后会有以下错误:

21/09/28 12:18:26 WARN NetworkClient: [Consumer clientId=consumer-user-1, groupId=user] Bootstrap broker X.X.X.X:9092 (id: -1 rack: null) disconnected

安全组端口已经确认打开了

如何解决?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文