Do I need a mapper class when using Avro schema objects for Kafka messages in a Spring Boot application and then storing the messages in a database?
I have a Spring Boot application that listens to a Kafka topic which receives messages conforming to an Avro schema. The Spring Boot application is responsible for taking these messages off Kafka and storing them in a Postgres database. The fields on the Kafka message map almost 1-to-1 to the database columns.
Right now we have a mapper class that takes the deserialized Avro object and maps it onto a POJO, which is then used to write to the DB. As currently written this all feels very manual, and I guess I'm just wondering whether it's really necessary, or whether there is some Spring/Avro auto-magic way of doing it. Here is the code from our mapper class, to give you an idea of what I'm referring to:
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.concurrent.TimeUnit;

public class AppointmentMapper {

    static final String NO_DESCRIPTION = "";
    static final String NO_FACILITY = "";

    // Maps the JPA entity to the API-facing DTO.
    public static AppointmentDTO mapToApi(Appointment appointment) {
        return AppointmentDTO.builder()
                .status(appointment.getStatus())
                .consumerId(appointment.getConsumerId())
                .providerId(appointment.getProviderId())
                .startDate(appointment.getStartDate())
                .endDate(appointment.getEndDate())
                .duration(appointment.getDuration())
                .businessId(appointment.getBusinessId())
                .appointmentId(appointment.getAppointmentId())
                .facilityId(appointment.getFacilityId())
                .description(appointment.getDescription())
                .officeId(appointment.getOfficeId())
                .pmsId(appointment.getPmsId())
                .build();
    }

    // Maps the deserialized Avro record onto the JPA entity.
    public static Appointment mapToDb(com.kafka.avro.model.appointment.Appointment avroAppointment) {
        Appointment appointment = new Appointment();
        appointment.setAppointmentId(avroAppointment.getBase().getGuid());
        appointment.setConsumerId(avroAppointment.getConsumerGuid().orElse(null));
        if (avroAppointment.getStatus().isPresent()) {
            AppointmentStatus status = AppointmentStatus.valueOf(avroAppointment.getStatus().get().name());
            appointment.setStatus(status);
        } else {
            appointment.setStatus(AppointmentStatus.UNSPECIFIED);
        }
        appointment.setPmsId(avroAppointment.getBase().getPmsId());
        appointment.setDescription(avroAppointment.getDescription().orElse(NO_DESCRIPTION));
        appointment.setOfficeId(avroAppointment.getOfficeId().orElse(null));
        appointment.setProviderId(avroAppointment.getProvider().orElse(null));
        appointment.setBusinessId(avroAppointment.getBase().getBusinessGuid());
        // mapToProcedure is defined elsewhere in this class (omitted here).
        avroAppointment.getAppointmentCodes().ifPresent(appointmentCodes ->
                appointmentCodes.forEach(appointmentCode ->
                        appointment.getAppointmentProcedures().add(mapToProcedure(appointmentCode))));
        avroAppointment.getDate().ifPresent(instant -> {
            ZonedDateTime startDate = ZonedDateTime.ofInstant(instant, ZoneId.of("UTC"));
            ZonedDateTime endDate = null;
            int duration = avroAppointment.getDuration().orElse(0);
            appointment.setDuration(duration);
            appointment.setStartDate(startDate);
            if (duration > 0) {
                endDate = startDate.plus(duration, TimeUnit.HOURS.toChronoUnit());
            }
            appointment.setEndDate(endDate);
        });
        appointment.setFacilityId(avroAppointment.getFacilityId().orElse(NO_FACILITY));
        return appointment;
    }

    // Maps the API DTO onto the JPA entity.
    public static Appointment mapToDb(AppointmentDTO appointmentDTO) {
        Appointment appointment = new Appointment();
        appointment.setAppointmentId(appointmentDTO.getAppointmentId());
        appointment.setEndDate(appointmentDTO.getEndDate());
        appointment.setStartDate(appointmentDTO.getStartDate());
        appointment.setPmsId(appointmentDTO.getPmsId());
        appointment.setFacilityId(appointmentDTO.getFacilityId());
        appointment.setBusinessId(appointmentDTO.getBusinessId());
        appointment.setConsumerId(appointmentDTO.getConsumerId());
        appointment.setDuration(appointmentDTO.getDuration());
        appointment.setOfficeId(appointmentDTO.getOfficeId());
        appointment.setStatus(appointmentDTO.getStatus());
        appointment.setProviderId(appointmentDTO.getProviderId());
        return appointment;
    }
}
Comments (1)
Short answer - yes, but only if you instead use the Kafka Connect JDBC Sink with a Schema Registry. This utilizes the Connect API's internal Schema/Struct models to create the database DDL statements, and it works for more than just Avro, too.
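For a sense of what that looks like, here's a minimal sink configuration sketch (as you'd POST it to the Connect REST API). It assumes the Confluent JdbcSinkConnector with Avro values in a Schema Registry; the topic name, connection details, registry URL, and the appointment_id primary-key field are all placeholders for your own environment:

{
  "name": "appointments-jdbc-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "topics": "appointments",
    "connection.url": "jdbc:postgresql://localhost:5432/appointmentsdb",
    "connection.user": "postgres",
    "connection.password": "postgres",
    "insert.mode": "upsert",
    "pk.mode": "record_value",
    "pk.fields": "appointment_id",
    "auto.create": "true",
    "auto.evolve": "true",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081"
  }
}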
If the data in your topic doesn't map exactly to the format you want in the database, you can still use Spring Kafka / Spring Cloud Stream to consume and map the data onto an output topic that the Connect sink will eventually write from (a sketch follows), or you can use built-in / custom transforms (Single Message Transforms) in the Connect API.
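A minimal sketch of that consume-and-remap step, assuming spring-kafka with Avro (de)serializers already configured. The topic names are placeholders, and FlatAppointment stands in for a hypothetical second Avro-generated class whose fields line up 1-to-1 with the table columns, so the sink still receives a schema-backed record:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class AppointmentRemapper {

    private final KafkaTemplate<String, FlatAppointment> template;

    public AppointmentRemapper(KafkaTemplate<String, FlatAppointment> template) {
        this.template = template;
    }

    // Consume the raw event, flatten it to the table's shape, and republish
    // it onto the topic the JDBC sink is configured to read from.
    @KafkaListener(topics = "appointments.raw")
    public void remap(com.kafka.avro.model.appointment.Appointment avro) {
        // FlatAppointment is hypothetical; Avro-generated classes expose this
        // newBuilder()/setX() pattern.
        FlatAppointment flat = FlatAppointment.newBuilder()
                .setAppointmentId(avro.getBase().getGuid())
                .setBusinessId(avro.getBase().getBusinessGuid())
                .setDescription(avro.getDescription().orElse(""))
                .build();
        template.send("appointments", flat.getAppointmentId(), flat);
    }
}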
Otherwise, if you insist on using Spring and your own code, you need to (re)build that ORM layer yourself (as well as any database client handling, batching, error processing, etc.). ModelMapper is a useful Java library I've found for converting event models into domain/persistence models.
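If you do stay in Spring, here's a ModelMapper sketch of how it could replace much of the field-by-field code in the question. Same-named properties are matched by convention; nested fields like base.guid need explicit mappings, and the Optional-returning getters and the status enum would still need custom converters, so treat this as illustrative rather than a drop-in:

import org.modelmapper.ModelMapper;
import org.modelmapper.TypeMap;

public class AppointmentMapping {

    public static ModelMapper build() {
        ModelMapper modelMapper = new ModelMapper();

        // Properties whose names and types already line up (officeId,
        // duration, ...) are mapped by convention; only the nested or
        // renamed fields need to be spelled out.
        TypeMap<com.kafka.avro.model.appointment.Appointment, Appointment> typeMap =
                modelMapper.typeMap(com.kafka.avro.model.appointment.Appointment.class, Appointment.class);
        typeMap.addMapping(src -> src.getBase().getGuid(), Appointment::setAppointmentId);
        typeMap.addMapping(src -> src.getBase().getBusinessGuid(), Appointment::setBusinessId);
        typeMap.addMapping(src -> src.getBase().getPmsId(), Appointment::setPmsId);

        return modelMapper;
    }
}

Mapping then collapses to build().map(avroAppointment, Appointment.class). Either way you still own the mapping layer; the real decision is whether it lives in your Spring code or in Connect.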