Scala火花流。获取任务而不是序列化错误|如何在Scala Spark流中序列化用户定义的类/功能
大家好,我正在与Kafka + Spark Streaming一起使用JSON图书馆。 我需要弄平传入的复杂Kafka消息(JSON String)。 示例传入KAFKA消息 """{"name" : "s…
在多种条件下滤除火花数据帧行的更好/有效的方法
我在下面有一个数据框, id pub_date version unique_id c_id p_id type source lni001 20220301 1 64WP-UI-POLI 002 P02 org internet lni001 202203…
Scala 3:为内类定义一个类型实例
我正在尝试定义内部类的类型实例。 作为一个最小的示例,我的代码看起来像这样: trait Typeclass[T] class Outer: class Inner 我想定义 typeclass …
选择并使用适当的解码器作为新项目的解码器,同时保留错误累积
如果我有一些逻辑需要分歧的程度,因此需要选择一组可能使用的解码器中的1组,则可能会出现此问题。 考虑此代码段: import cats.implicits.{catsSynt…
Scala添加_2 s在元组列表中
我在Scala中有以下可变的hashmap: HashMap((b,3), (c,4), (a,8), (a,2)) 需要转换为以下内容: HashMap((b,3), (c,4), (a,10)) 我需要诸如dearsbykey…
获取案例类参数类型作为HLIST
我正在尝试使用无形的 这用于生成 foo foo foo case class Foo(x: Int, y: String) class Context { val random = new Random() } def genInt(context…
相当于火花数据框的“ takewhile”
我有一个看起来像这样的数据框: scala> val df = Seq((1,.5), (2,.3), (3,.9), (4,.0), (5,.6), (6,.0)).toDF("id", "x") scala> df.show() +---+---…
Scala测试:如何在不进行硬编码的情况下安全地断言异常消息?
我有以下代码,该代码曾在Spark DataFrame中用于(SHA)哈希列: import org.apache.spark.sql.DataFrame import org.apache.spark.sql.functions.{sh…
如何从DataStream Scala+ apache flink
我正在从Confluent的Kafka主题中获得AVRO的回应,当我想应对响应时,我正面临问题。不了解我应该如何定义avro deserializer并在阅读时使用在kafka源中…
Spark RDD地图如何到Cassandra桌子?
我是Spark的新手,最近我看到一个代码将RDD格式的数据保存到Cassandra Table。但是我无法弄清楚它如何进行列映射。它都不使用案例类,也指定了以下代…
我可以使用Typeclasses获得非混合子类,共享联合收割机的实现以及返回子类类型吗?
猫和狗都是宠物,因此有一个年龄。我想定义一个选择方法,给定两只猫或两只狗,都会选择最古老的宠物,但是当cat 和同时选择哪种宠物出现狗。 用类型…
在DataFrame中动态替换嵌套字段名称中的特殊字符--- Spark Scala
我有一个事件作为JSON,它将转换为数据框架进行处理。这些事件深深地嵌套了,最近我收到的事件具有具有小数数字的字段名称,例如0.5,0.75。 这导致我…