在Springboot应用程序中使用KAFKA的AVRO模式对象,然后将消息存储在数据库中时,我需要映射类吗?

发布于 2025-01-24 16:09:27 字数 3963 浏览 0 评论 0 原文

我有一个Springboot应用程序,该应用程序将听一个Kafka主题,该主题传递了符合Avro模式的消息。 Springboot应用程序将负责从Kafka获取这些消息,并将它们存储在Postgres数据库中。 KAFKA消息上的字段将几乎1比1映射到数据库列。

现在,我们拥有一个映射类别的映射类别,该类别将AVRO对象进行化,并将其映射到POJO,然后使用该POJO将其映射到DB。这一切都像现在写的那样感觉非常手动,我想我只是想知道它是否真的需要,或者是否有一些弹簧/AVRO自动魔术方法?这是我们的映射器类中的代码,可以使您对我所指的内容有所了解:

public class AppointmentMapper {

    final static String NO_DESCRIPTION="";
    final static String NO_FACILITY="";

    public static AppointmentDTO mapToApi(Appointment appointment){
        return AppointmentDTO.builder()
                .status(appointment.getStatus())
                .consumerId(appointment.getConsumerId())
                .providerId(appointment.getProviderId())
                .startDate(appointment.getStartDate())
                .endDate(appointment.getEndDate())
                .duration(appointment.getDuration())
                .businessId(appointment.getBusinessId())
                .appointmentId(appointment.getAppointmentId())
                .facilityId(appointment.getFacilityId())
                .description(appointment.getDescription())
                .officeId(appointment.getOfficeId())
                .pmsId(appointment.getPmsId())
                .build();
    }

    public static Appointment mapToDb(com.kafka.avro.model.appointment.Appointment avroAppointment) {
        Appointment appointment = new Appointment();
        appointment.setAppointmentId(avroAppointment.getBase().getGuid());
        appointment.setConsumerId(avroAppointment.getConsumerGuid().orElse(null));
        if(avroAppointment.getStatus().isPresent()){
            AppointmentStatus status = AppointmentStatus.valueOf(avroAppointment.getStatus().get().name());
            appointment.setStatus(status);
        }else{
            appointment.setStatus(AppointmentStatus.UNSPECIFIED);
        }
        appointment.setPmsId(avroAppointment.getBase().getPmsId());
        appointment.setDescription(avroAppointment.getDescription().orElse(NO_DESCRIPTION));
        appointment.setOfficeId(avroAppointment.getOfficeId().orElse(null));
        appointment.setProviderId(avroAppointment.getProvider().orElse(null));
        appointment.setBusinessId(avroAppointment.getBase().getBusinessGuid());
        avroAppointment.getAppointmentCodes().ifPresent(appointmentCodes -> appointmentCodes.forEach(appointmentCode -> appointment.getAppointmentProcedures().add(mapToProcedure(appointmentCode))));
        avroAppointment.getDate().ifPresent(instant ->{
            ZonedDateTime startDate = ZonedDateTime.ofInstant(instant, ZoneId.of("UTC"));
            ZonedDateTime endDate = null;
            int duration = avroAppointment.getDuration().orElse(0);
            appointment.setDuration(duration);
            appointment.setStartDate(startDate);
            if(duration>0){
                endDate = startDate.plus(duration, TimeUnit.HOURS.toChronoUnit());
            }
            appointment.setEndDate(endDate);
        });
        appointment.setFacilityId(avroAppointment.getFacilityId().orElse(NO_FACILITY));
        return appointment;
    }

    public static Appointment mapToDb(AppointmentDTO appointmentDTO) {
       Appointment appointment = new Appointment();
       appointment.setAppointmentId(appointmentDTO.getAppointmentId());
       appointment.setEndDate(appointmentDTO.getEndDate());
       appointment.setStartDate(appointmentDTO.getStartDate());
       appointment.setPmsId(appointmentDTO.getPmsId());
       appointment.setFacilityId(appointmentDTO.getFacilityId());
       appointment.setBusinessId(appointmentDTO.getBusinessId());
       appointment.setConsumerId(appointmentDTO.getConsumerId());
       appointment.setDuration(appointmentDTO.getDuration());
       appointment.setOfficeId(appointmentDTO.getOfficeId());
       appointment.setStatus(appointmentDTO.getStatus());
       appointment.setProviderId(appointmentDTO.getProviderId());
       return appointment;
    }
}

I have a springboot application that will be listening to a Kafka topic which is passing messages that conform to an avro schema. The springboot app will be responsible for taking these messages from kafka and storing them in a postgres database. The fields on the kafka message map almost 1 to 1 to the database columns.

The way it's coding right now we have a mapper class that will deserialized avro object and map that to a POJO and then use that POJO to map to the db. This all feels very manual the way it's written right now and I guess I'm just wondering if it's actually needed or if there is some Spring/Avro auto-magical way of doing this? This is the code in our mapper class to give you some sense of what I'm referring to:

public class AppointmentMapper {

    final static String NO_DESCRIPTION="";
    final static String NO_FACILITY="";

    public static AppointmentDTO mapToApi(Appointment appointment){
        return AppointmentDTO.builder()
                .status(appointment.getStatus())
                .consumerId(appointment.getConsumerId())
                .providerId(appointment.getProviderId())
                .startDate(appointment.getStartDate())
                .endDate(appointment.getEndDate())
                .duration(appointment.getDuration())
                .businessId(appointment.getBusinessId())
                .appointmentId(appointment.getAppointmentId())
                .facilityId(appointment.getFacilityId())
                .description(appointment.getDescription())
                .officeId(appointment.getOfficeId())
                .pmsId(appointment.getPmsId())
                .build();
    }

    public static Appointment mapToDb(com.kafka.avro.model.appointment.Appointment avroAppointment) {
        Appointment appointment = new Appointment();
        appointment.setAppointmentId(avroAppointment.getBase().getGuid());
        appointment.setConsumerId(avroAppointment.getConsumerGuid().orElse(null));
        if(avroAppointment.getStatus().isPresent()){
            AppointmentStatus status = AppointmentStatus.valueOf(avroAppointment.getStatus().get().name());
            appointment.setStatus(status);
        }else{
            appointment.setStatus(AppointmentStatus.UNSPECIFIED);
        }
        appointment.setPmsId(avroAppointment.getBase().getPmsId());
        appointment.setDescription(avroAppointment.getDescription().orElse(NO_DESCRIPTION));
        appointment.setOfficeId(avroAppointment.getOfficeId().orElse(null));
        appointment.setProviderId(avroAppointment.getProvider().orElse(null));
        appointment.setBusinessId(avroAppointment.getBase().getBusinessGuid());
        avroAppointment.getAppointmentCodes().ifPresent(appointmentCodes -> appointmentCodes.forEach(appointmentCode -> appointment.getAppointmentProcedures().add(mapToProcedure(appointmentCode))));
        avroAppointment.getDate().ifPresent(instant ->{
            ZonedDateTime startDate = ZonedDateTime.ofInstant(instant, ZoneId.of("UTC"));
            ZonedDateTime endDate = null;
            int duration = avroAppointment.getDuration().orElse(0);
            appointment.setDuration(duration);
            appointment.setStartDate(startDate);
            if(duration>0){
                endDate = startDate.plus(duration, TimeUnit.HOURS.toChronoUnit());
            }
            appointment.setEndDate(endDate);
        });
        appointment.setFacilityId(avroAppointment.getFacilityId().orElse(NO_FACILITY));
        return appointment;
    }

    public static Appointment mapToDb(AppointmentDTO appointmentDTO) {
       Appointment appointment = new Appointment();
       appointment.setAppointmentId(appointmentDTO.getAppointmentId());
       appointment.setEndDate(appointmentDTO.getEndDate());
       appointment.setStartDate(appointmentDTO.getStartDate());
       appointment.setPmsId(appointmentDTO.getPmsId());
       appointment.setFacilityId(appointmentDTO.getFacilityId());
       appointment.setBusinessId(appointmentDTO.getBusinessId());
       appointment.setConsumerId(appointmentDTO.getConsumerId());
       appointment.setDuration(appointmentDTO.getDuration());
       appointment.setOfficeId(appointmentDTO.getOfficeId());
       appointment.setStatus(appointmentDTO.getStatus());
       appointment.setProviderId(appointmentDTO.getProviderId());
       return appointment;
    }
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

寄居者 2025-01-31 16:09:27

这样做的一些弹簧/AVRO自动魔术方法?

简短答案 - 是的,但是只有使用与模式注册表相反。这利用Connect API的内部架构/结构模型来创建数据库DDL语句。它不仅适用于Avro。

如果主题中的数据未完全映射到数据库中所需的格式,则您仍然可以使用Spring-Kafka / Spring-Cloud Streams消费&amp;将数据映射到连接接收器最终将写入的输出主题。或者,您可以在Connect API中使用内置 /自定义< / code>转换< / code>。


否则,如果您坚持使用Spring和您自己的代码,则需要(重新)自己构建该ORM层(以及任何数据库客户端处理,批处理,错误处理等)。 ModelMapper 是一个有用的Java库,我发现用于将事件模型转换为域/持久性模型。

some Spring/Avro auto-magical way of doing this?

Short answer - Yes, but only if you use the Kafka Connect JDBC Sink instead with a Schema Registry. This utilizes Connect API's internal Schema/Struct models to create database DDL statements. It'll work for more than just Avro, too.

If your data in the topic doesn't map exactly to the format that you want in the database, you can still use Spring-Kafka / Spring-Cloud Streams to consume & map the data to an output topic that the Connect Sink will eventually write. Or, you can use built-in / custom transforms in the Connect API.


Otherwise, if you insist on using Spring and your own code, you need to (re)build that ORM layer yourself (as well as any database client handling, batching, error-processing, etc.). ModelMapper is a useful Java library, I've found for converting your event models into domain/persistence models.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文