Kafka Streams join of two specific Avro objects

Published 2025-01-22 11:27:42


Basic task

I have 2 identical streams in Kafka in Avro format. I'm trying to do a basic left join of those 2 streams.

Keys

For keys in both topics I'm using the timestamp rounded to milliseconds, since both streams carry data originating from IoT devices that generate a measurement exactly every 20 ms, and both devices are synchronized to UTC time.
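Since both devices emit exactly every 20 ms and the key is the timestamp rounded to that grid, the key derivation can be sketched like this (plain Java; the helper name and the ISO-string key format are my own assumptions, not from the original code):

```java
import java.time.Instant;

public class KeyRounding {
    // Round an epoch-millis timestamp down to the nearest 20 ms boundary,
    // so measurements from both devices taken in the same 20 ms slot
    // produce the same string key.
    static String roundedKey(long epochMillis) {
        long bucket = epochMillis - (epochMillis % 20);
        return Instant.ofEpochMilli(bucket).toString();
    }

    public static void main(String[] args) {
        // Two timestamps 13 ms apart fall into the same 20 ms bucket.
        System.out.println(roundedKey(1700000000000L));
        System.out.println(roundedKey(1700000000013L));
    }
}
```

Any key derived consistently on both producers works; the important part is that records from the same 20 ms slot end up with byte-identical keys, otherwise the stream-stream join window never matches them.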

Done so far

I've been able to produce a Kafka Streams application that transforms one stream into a topic by following this tutorial, but unfortunately a basic stream-stream join tutorial doesn't exist on the Confluent Developer page.

Avro Java serialization classes
I've generated 3 SpecificAvroSerde classes based on the 2 inputs and the output.
Although the input streams are identical, I've created separate schemas/classes in case the streams have different schemas in the future.
The Avro Java classes are generated at build time without problems.

So this is the schema of the input, output, and joined streams:

 {
    "namespace": "pmu.serialization.avro",
    "name": "RawPMU_214",
    "type": "record",
    "fields": [
        {"name": "pmu_id", "type": "int"},
        {"name": "time", "type": "string"},
        {"name": "time_rounded", "type": "string"},
        {"name": "stream_id", "type": "int"},
        {"name": "stat", "type": "string"},
        {"name": "ph_i1_r", "type": "float"},
        {"name": "ph_i1_j", "type": "float"},
        {"name": "ph_i2_r", "type": "float"},
        {"name": "ph_i2_j", "type": "float"},
        {"name": "ph_i3_r", "type": "float"},
        {"name": "ph_i3_j", "type": "float"},
        {"name": "ph_v4_r", "type": "float"},
        {"name": "ph_v4_j", "type": "float"},
        {"name": "ph_v5_r", "type": "float"},
        {"name": "ph_v5_j", "type": "float"},
        {"name": "ph_v6_r", "type": "float"},
        {"name": "ph_v6_j", "type": "float"},
        {"name": "ph_7_r", "type": "float"},
        {"name": "ph_7_j", "type": "float"},
        {"name": "ph_8_r", "type": "float"},
        {"name": "ph_8_j", "type": "float"},
        {"name": "analog", "type": "string"},
        {"name": "digital", "type": "string"},
        {"name": "frequency", "type": "float"},
        {"name": "rocof", "type": "int"},
        {"name": "orderCount", "type": "int"}
    ]
 }

Code

The key problem is that I don't know how to properly implement this part with the ValueJoiner:

    KStream<String, RawPMU_Joined> joinedPMU = rawPMUs_214.join(rawPMUs_218,
            (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue, /* ValueJoiner */

I tried various answers, but I haven't really found any example of full Java code for a stream-stream join with SpecificAvroSerde.

Full code at this point:

package io.confluent.developer;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

import pmu.serialization.avro.RawPMU_214;
import pmu.serialization.avro.RawPMU_218;
import pmu.serialization.avro.RawPMU_Joined;

import java.time.Duration;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class TransformStream_join {

    public Topology buildTopology(Properties allProps) {
        final StreamsBuilder builder = new StreamsBuilder();

        // Define input PMU topics
        final String inputPMU_01 = allProps.getProperty("input.topic.pmu1");
        final String inputPMU_02 = allProps.getProperty("input.topic.pmu2");
        final String outputTopic = allProps.getProperty("output.topic.name");

        KStream<String, RawPMU_214> rawPMUs_214 = builder.stream(inputPMU_01);
        KStream<String, RawPMU_218> rawPMUs_218 = builder.stream(inputPMU_02);

        KStream<String, RawPMU_Joined> joinedPMU = rawPMUs_214.join(rawPMUs_218,
                (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue, /* ValueJoiner */
                JoinWindows.of(Duration.ofMillis(20)),
                Joined.with(
                        Serdes.String(),
                        raw_pmu214AvroSerde(allProps),
                        raw_pmu218AvroSerde(allProps))
                );

        joinedPMU.to(outputTopic, Produced.with(Serdes.String(), raw_outAvroSerde(allProps)));
        return builder.build();
    }

    private SpecificAvroSerde<RawPMU_214> raw_pmu214AvroSerde(Properties allProps) {
        SpecificAvroSerde<RawPMU_214> raw_pmu214AvroSerde = new SpecificAvroSerde<>();
        raw_pmu214AvroSerde.configure((Map)allProps, false);
        return raw_pmu214AvroSerde;
    }

    private SpecificAvroSerde<RawPMU_218> raw_pmu218AvroSerde(Properties allProps) {
        SpecificAvroSerde<RawPMU_218> raw_pmu218AvroSerde = new SpecificAvroSerde<>();
        raw_pmu218AvroSerde.configure((Map)allProps, false);
        return raw_pmu218AvroSerde;
    }

    private SpecificAvroSerde<RawPMU_Joined> raw_outAvroSerde(Properties allProps) {
        SpecificAvroSerde<RawPMU_Joined> raw_outAvroSerde = new SpecificAvroSerde<>();
        raw_outAvroSerde.configure((Map)allProps, false);
        return raw_outAvroSerde;
    }

    public void createTopics(Properties allProps) {
        AdminClient client = AdminClient.create(allProps);

        List<NewTopic> topics = new ArrayList<>();

        topics.add(new NewTopic(
                allProps.getProperty("input.topic.pmu1"),
                Integer.parseInt(allProps.getProperty("input.topic.partitions")),
                Short.parseShort(allProps.getProperty("input.topic.replication.factor"))));

        topics.add(new NewTopic(
                allProps.getProperty("input.topic.pmu2"),
                Integer.parseInt(allProps.getProperty("input.topic.partitions")),
                Short.parseShort(allProps.getProperty("input.topic.replication.factor"))));

        topics.add(new NewTopic(
                allProps.getProperty("output.topic.name"),
                Integer.parseInt(allProps.getProperty("output.topic.partitions")),
                Short.parseShort(allProps.getProperty("output.topic.replication.factor"))));

        client.createTopics(topics);
        client.close();
    }

    public Properties loadEnvProperties(String fileName) throws IOException {
        Properties allProps = new Properties();
        FileInputStream input = new FileInputStream(fileName);
        allProps.load(input);
        input.close();

        return allProps;
    }

    public static void main(String[] args) throws Exception {
        if (args.length < 1) {
            throw new IllegalArgumentException("This program takes one argument: the path to an environment configuration file.");
        }

        TransformStream_join ts = new TransformStream_join();
        Properties allProps = ts.loadEnvProperties(args[0]);
        allProps.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        allProps.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
        Topology topology = ts.buildTopology(allProps);

        ts.createTopics(allProps);

        final KafkaStreams streams = new KafkaStreams(topology, allProps);
        final CountDownLatch latch = new CountDownLatch(1);

        // Attach shutdown handler to catch Control-C.
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close(Duration.ofSeconds(5));
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
}

EDIT

KStream join:
I've simplified the join code since I created a joiner class:

KStream<String, RawPMU_Joined> joinedPMU = pmu214Stream.join(pmu218Stream, pmuJoiner,
        JoinWindows.of(Duration.ofMillis(20)),
        Joined.with(
                Serdes.String(),
                raw_pmu214AvroSerde(allProps),
                raw_pmu218AvroSerde(allProps))
);

PMUJoiner class

package io.confluent.developer;
import org.apache.kafka.streams.kstream.ValueJoiner;
import pmu.serialization.avro.RawPMU_214; 
import pmu.serialization.avro.RawPMU_218; 
import pmu.serialization.avro.RawPMU_Joined; 
public class PMUJoiner implements ValueJoiner<RawPMU_218, RawPMU_214, RawPMU_Joined> {

    public RawPMU_Joined apply(RawPMU_218 pmu218Stream, RawPMU_214 pmu214Stream) {
        return RawPMU_Joined.newBuilder()
                // PMU 218
                .setTimeRounded1(pmu218Stream.getTimeRounded())
                .setOrderCount1(pmu218Stream.getOrderCount())
                .setPhI1R1(pmu218Stream.getPhI1R())
                .setPhI1J1(pmu218Stream.getPhI1J())
                .setPhI2R1(pmu218Stream.getPhI2R())
                .setPhI2J1(pmu218Stream.getPhI2J())
                .setPhI3R1(pmu218Stream.getPhI3R())
                .setPhI3J1(pmu218Stream.getPhI3J())
                .setPhV4R1(pmu218Stream.getPhV4R())
                .setPhV4J1(pmu218Stream.getPhV4J())
                .setPhV5R1(pmu218Stream.getPhV5R())
                .setPhV5J1(pmu218Stream.getPhV5J())
                .setPhV6R1(pmu218Stream.getPhV6R())
                .setPhV6J1(pmu218Stream.getPhV6J())
                .setPh7R1(pmu218Stream.getPh7R())
                .setPh7J1(pmu218Stream.getPh7J())
                .setPh8R1(pmu218Stream.getPh8R())
                .setPh8J1(pmu218Stream.getPh8J())
                //PMU 214
                .setTimeRounded2(pmu214Stream.getTimeRounded())
                .setOrderCount2(pmu214Stream.getOrderCount())
                .setPhI1R2(pmu214Stream.getPhI1R())
                .setPhI1J2(pmu214Stream.getPhI1J())
                .setPhI2R2(pmu214Stream.getPhI2R())
                .setPhI2J2(pmu214Stream.getPhI2J())
                .setPhI3R2(pmu214Stream.getPhI3R())
                .setPhI3J2(pmu214Stream.getPhI3J())
                .setPhV4R2(pmu214Stream.getPhV4R())
                .setPhV4J2(pmu214Stream.getPhV4J())
                .setPhV5R2(pmu214Stream.getPhV5R())
                .setPhV5J2(pmu214Stream.getPhV5J())
                .setPhV6R2(pmu214Stream.getPhV6R())
                .setPhV6J2(pmu214Stream.getPhV6J())
                .setPh7R2(pmu214Stream.getPh7R())
                .setPh7J2(pmu214Stream.getPh7J())
                .setPh8R2(pmu214Stream.getPh8R())
                .setPh8J2(pmu214Stream.getPh8J())
                .build();
    }
}

Error

...pmuStream01/src/main/java/io/confluent/developer/JoinPMUStreams.java:46:
error: no suitable method found for
join(org.apache.kafka.streams.kstream.KStream<java.lang.String,pmu.serialization.avro.RawPMU_218>,io.confluent.developer.PMUJoiner,org.apache.kafka.streams.kstream.JoinWindows,org.apache.kafka.streams.kstream.Joined<java.lang.String,pmu.serialization.avro.RawPMU_214,pmu.serialization.avro.RawPMU_218>)
KStream<String, RawPMU_Joined> joinedPMU = pmu214Stream.join(pmu218Stream,
^
method org.apache.kafka.streams.kstream.KStream.<VO,VR>join(org.apache.kafka.streams.kstream.KStream<java.lang.String,VO>,org.apache.kafka.streams.kstream.ValueJoiner<?
super pmu.serialization.avro.RawPMU_214,? super VO,? extends
VR>,org.apache.kafka.streams.kstream.JoinWindows) is not applicable
(cannot infer type-variable(s) VO,VR
(actual and formal argument lists differ in length))
method org.apache.kafka.streams.kstream.KStream.<VO,VR>join(org.apache.kafka.streams.kstream.KStream<java.lang.String,VO>,org.apache.kafka.streams.kstream.ValueJoiner<?
super pmu.serialization.avro.RawPMU_214,? super VO,? extends
VR>,org.apache.kafka.streams.kstream.JoinWindows,org.apache.kafka.streams.kstream.Joined<java.lang.String,pmu.serialization.avro.RawPMU_214,VO>)
is not applicable
(cannot infer type-variable(s) VO,VR
(argument mismatch; io.confluent.developer.PMUJoiner cannot be converted to org.apache.kafka.streams.kstream.ValueJoiner<? super
pmu.serialization.avro.RawPMU_214,? super VO,? extends VR>))
method org.apache.kafka.streams.kstream.KStream.<VO,VR>join(org.apache.kafka.streams.kstream.KStream<java.lang.String,VO>,org.apache.kafka.streams.kstream.ValueJoiner<?
super pmu.serialization.avro.RawPMU_214,? super VO,? extends
VR>,org.apache.kafka.streams.kstream.JoinWindows,org.apache.kafka.streams.kstream.StreamJoined<java.lang.String,pmu.serialization.avro.RawPMU_214,VO>)
is not applicable
(cannot infer type-variable(s) VO,VR
(argument mismatch; io.confluent.developer.PMUJoiner cannot be converted to org.apache.kafka.streams.kstream.ValueJoiner<? super
pmu.serialization.avro.RawPMU_214,? super VO,? extends VR>))
method org.apache.kafka.streams.kstream.KStream.<VT,VR>join(org.apache.kafka.streams.kstream.KTable<java.lang.String,VT>,org.apache.kafka.streams.kstream.ValueJoiner<?
super pmu.serialization.avro.RawPMU_214,? super VT,? extends VR>) is
not applicable
(cannot infer type-variable(s) VT,VR
(actual and formal argument lists differ in length))
method org.apache.kafka.streams.kstream.KStream.<VT,VR>join(org.apache.kafka.streams.kstream.KTable<java.lang.String,VT>,org.apache.kafka.streams.kstream.ValueJoiner<?
super pmu.serialization.avro.RawPMU_214,? super VT,? extends
VR>,org.apache.kafka.streams.kstream.Joined<java.lang.String,pmu.serialization.avro.RawPMU_214,VT>)
is not applicable
(cannot infer type-variable(s) VT,VR
(actual and formal argument lists differ in length))
method org.apache.kafka.streams.kstream.KStream.<GK,GV,RV>join(org.apache.kafka.streams.kstream.GlobalKTable<GK,GV>,org.apache.kafka.streams.kstream.KeyValueMapper<?
super java.lang.String,? super pmu.serialization.avro.RawPMU_214,?
extends GK>,org.apache.kafka.streams.kstream.ValueJoiner<? super
pmu.serialization.avro.RawPMU_214,? super GV,? extends RV>) is not
applicable
(cannot infer type-variable(s) GK,GV,RV
(actual and formal argument lists differ in length))
method org.apache.kafka.streams.kstream.KStream.<GK,GV,RV>join(org.apache.kafka.streams.kstream.GlobalKTable<GK,GV>,org.apache.kafka.streams.kstream.KeyValueMapper<?
super java.lang.String,? super pmu.serialization.avro.RawPMU_214,?
extends GK>,org.apache.kafka.streams.kstream.ValueJoiner<? super
pmu.serialization.avro.RawPMU_214,? super GV,? extends
RV>,org.apache.kafka.streams.kstream.Named) is not applicable
(cannot infer type-variable(s) GK,GV,RV
(argument mismatch; org.apache.kafka.streams.kstream.KStream<java.lang.String,pmu.serialization.avro.RawPMU_218>
cannot be converted to
org.apache.kafka.streams.kstream.GlobalKTable<GK,GV>))

I don't know why that's happening, since I believe I've properly supplied all arguments with the correct return types.


Comments (1)

黑寡妇 2025-01-29 11:27:42


I'd suggest starting with this - a joiner function that accepts two Avro objects and returns a third (optionally, Avro) one.

(leftValue, rightValue) -> {
    RawPMU_Joined j = new RawPMU_Joined();
    j.set...
    return j;
}

There are generic Avro examples you can follow in the confluent-examples repo on GitHub; there shouldn't need to be one for specific records, as it's just a different object you're returning. However, it wouldn't be a string.
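The type parameters also have to line up with the call site: for `pmu214Stream.join(pmu218Stream, ...)` the joiner must be a `ValueJoiner<RawPMU_214, RawPMU_218, RawPMU_Joined>` (the first type parameter matches the stream `join()` is called on, the second the stream passed in), whereas the question's `PMUJoiner` declares them the other way around, which is what the long compiler error is complaining about. A self-contained sketch of the lambda pattern, with a stand-in interface and POJOs in place of the real Kafka Streams API and the generated Avro classes (all names here are illustrative, not from the real code):

```java
// Stand-ins for the generated Avro classes (illustrative only, two fields each).
class RawPmu214 { float phI1R; String timeRounded; RawPmu214(float r, String t) { phI1R = r; timeRounded = t; } }
class RawPmu218 { float phI1R; String timeRounded; RawPmu218(float r, String t) { phI1R = r; timeRounded = t; } }
class RawPmuJoined { float phI1R1, phI1R2; String timeRounded1, timeRounded2; }

// Same shape as org.apache.kafka.streams.kstream.ValueJoiner.
interface Joiner<V1, V2, VR> { VR apply(V1 left, V2 right); }

public class JoinerSketch {
    // V1 = value type of the stream join() is called ON (214),
    // V2 = value type of the stream passed IN (218).
    static final Joiner<RawPmu214, RawPmu218, RawPmuJoined> PMU_JOINER = (left, right) -> {
        RawPmuJoined j = new RawPmuJoined();
        j.timeRounded1 = left.timeRounded;
        j.phI1R1 = left.phI1R;
        j.timeRounded2 = right.timeRounded;
        j.phI1R2 = right.phI1R;
        return j;
    };

    public static void main(String[] args) {
        RawPmuJoined j = PMU_JOINER.apply(
                new RawPmu214(1.5f, "t0"), new RawPmu218(2.5f, "t0"));
        System.out.println(j.phI1R1 + " " + j.phI1R2);
    }
}
```

With the real classes, the same lambda body would use `RawPMU_Joined.newBuilder()...build()` instead of field assignments; the point is only the ordering of the joiner's type parameters relative to the two streams.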
