logstash 同步一对多对多的关系数据如何处理啊
`input {
jdbc {
jdbc_driver_library => "F:/ELK/mysql-connector-java-5.1.46.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/newmedtiondb?useUnicode=true&useSSL=false"
jdbc_user => "root"
jdbc_password => "rS3m$6Bp"
jdbc_default_timezone => "Asia/Shanghai"
jdbc_pool_timeout => 5
jdbc_validate_connection => true
jdbc_validation_timeout => 3600
connection_retry_attempts => 3
connection_retry_attempts_wait_time => 3
id => "jdbc-meeting"
schedule => "* * * * *"
statement => "select * from xxx,xxx,xxx
"
lowercase_column_names => false
# clean_run => true
record_last_run => true
last_run_metadata_path => "syncpoint_table/meeting"
use_column_value => true
tracking_column => "startTime"
tracking_column_type => "timestamp"
}
}
filter {
aggregate {
task_id => "%{id}"
code => "
map['id'] = event.get('id')
map['meetingName'] = event.get('meetingName')
map['meetingShortName'] = event.get('meetingShortName')
map['meetingSchedule'] = event.get('meetingSchedule')
map['meetingAddress'] = event.get('meetingAddress')
map['keyDay'] = event.get('keyDay')
map['priority'] = event.get('priority')
map['startTime'] = event.get('startTime')
map['endTime'] = event.get('endTime')
map['publishStatus'] = event.get('publishStatus')
map['recommend'] = event.get('recommend')
map['latest'] = event.get('latest')
map['applyProxy'] = event.get('applyProxy')
map['hostOrganizer'] = event.get('hostOrganizer')
map['holdOrganizer'] = event.get('holdOrganizer')
map['remarks'] = event.get('remarks')
map['description'] = event.get('description')
map['creator'] = event.get('creator')
map['createDate'] = event.get('createDate')
map['updator'] = event.get('updator')
map['updateDate'] = event.get('updateDate')
map['stick'] = event.get('stick')
map['vote'] = event.get('vote')
map['canRegist'] = event.get('canRegist')
map['provideAccommodation'] = event.get('provideAccommodation')
map['keywords'] = event.get('keywords')
map['meetingSpeciality'] = event.get('meetingSpeciality')
map['qrcode'] = event.get('qrcode')
map['isIntervention'] = event.get('isIntervention')
map['siteType'] = event.get('siteType')
map['code'] = event.get('code')
map['hotIndex'] = event.get('hotIndex')
map['hits'] = event.get('hits')
map['province'] = event.get('province')
map['city'] = event.get('city')
map['views'] = event.get('views')
map['meetingDetail'] = {
'id' => event.get('meetingDetail.id'),
'mainPic' => event.get('meetingDetail.mainPic'),
'titlePic' => event.get('meetingDetail.titlePic'),
'twoDimensionCode' => event.get('meetingDetail.twoDimensionCode'),
'trafficInfo' => event.get('meetingDetail.trafficInfo'),
'diningInfo' => event.get('meetingDetail.diningInfo'),
'creditPoint' => event.get('meetingDetail.creditPoint'),
'noticeInfo' => event.get('meetingDetail.noticeInfo'),
'noticeAttach' => event.get('meetingDetail.noticeAttach'),
'venuePic' => event.get('meetingDetail.venuePic'),
'inviteAttach' => event.get('meetingDetail.inviteAttach'),
'pptAuthorize' => event.get('meetingDetail.pptAuthorize'),
'vedioAuthorize' => event.get('meetingDetail.vedioAuthorize'),
'remarks' => event.get('meetingDetail.remarks')
}
map['meetingAuthorIds'] ||= []
map['meetingAgendaIds'] ||= []
map['meetingFields.meetingAgenda'] ||= []
map['meetingFieldsIds'] ||= []
map['meetingFields'] ||= []
map['fieldAgentAuthorIds'] ||= []
if (event.get('meetingFields.id') != nil)
if !(map['fieldAgentAuthorIds'].include? event.get('meetingFields.id'))
map['meetingFieldsIds'] << event.get('meetingFields.id')
map['meetingFields'] << {
'id' => event.get('meetingFields.id'),
'meetingInfoId' => event.get('meetingFields.meetingInfoId'),
'meetingSubject' => event.get('meetingFields.meetingSubject'),
'holder' => event.get('meetingFields.holder'),
'startTime' => event.get('meetingFields.startTime'),
'endTime' => event.get('meetingFields.endTime'),
'time' => event.get('time'),
'videoCode' => event.get('meetingFields.videoCode'),
'pushUrl' => event.get('meetingFields.pushUrl'),
'address' => event.get('meetingFields.address'),
'weight' => event.get('meetingFields.weight'),
'gdLng' => event.get('meetingFields.gdLng'),
'gdLat' => event.get('meetingFields.gdLat'),
'bdLng' => event.get('meetingFields.bdLng'),
'bdLat' => event.get('meetingFields.bdLat'),
'remarks' => event.get('meetingFields.remarks'),
'creator' => event.get('meetingFields.creator'),
'createDate' => event.get('meetingFields.createDate'),
'updator' => event.get('meetingFields.updator'),
'updateDate' => event.get('meetingFields.updateDate'),
'comments' => event.get('meetingFields.comments'),
'views' => event.get('meetingFields.views'),
'chatroomId' => event.get('meetingFields.chatroomId'),
'isLogin' => event.get('meetingFields.isLogin'),
'isRealnameAuthentication' => event.get('meetingFields.isRealnameAuthentication'),
'messageCount' => event.get('meetingFields.messageCount'),
'likes' => event.get('meetingFields.likes'),
'iscomment' => event.get('meetingFields.iscomment'),
'condition' => event.get('meetingFields.condition'),
'recordStatus' => event.get('meetingFields.recordStatus'),
'manualStatus' => event.get('meetingFields.manualStatus'),
'auditMethod' => event.get('meetingFields.auditMethod'),
'meetingAgenda' => [] << {
'id' => event.get('meetingAgenda.id'),
'meetingFieldsId' => event.get('meetingAgenda.meetingFieldsId'),
'parentId' => event.get('meetingAgenda.parentId'),
'type' => event.get('meetingAgenda.type'),
'authorIds' => event.get('meetingAgenda.authorIds'),
'holder' => event.get('meetingAgenda.holder'),
'theme' => event.get('meetingAgenda.theme'),
'startTime' => event.get('meetingAgenda.startTime'),
'endTime' => event.get('meetingAgenda.endTime'),
'vid' => event.get('meetingAgenda.vid'),
'links' => event.get('meetingAgenda.links'),
'seq' => event.get('meetingAgenda.seq'),
'creator' => event.get('meetingAgenda.creator'),
'createDate' => event.get('meetingAgenda.createDate'),
'updator' => event.get('meetingAgenda.updator'),
'updateDate' => event.get('meetingAgenda.updateDate'),
'comments' => event.get('meetingAgenda.comments'),
'views' => event.get('meetingAgenda.views'),
'agendaIslogin' => event.get('meetingAgenda.agendaIslogin'),
'agendaIsrealnameAuthentication' => event.get('meetingAgenda.agendaIsrealnameAuthentication'),
'condition' => event.get('meetingAgenda.condition'),
'authors' => [] << {
'id' => event.get('author.id'),
'userId' => event.get('author.userId'),
'authorname' => event.get('author.authorname'),
'gender' => event.get('author.gender'),
'title' => event.get('author.title'),
'master' => event.get('author.master'),
'company' => event.get('author.company'),
'description' => event.get('author.description'),
'autorUrl' => event.get('author.autorUrl'),
'headImage' => event.get('author.headImage'),
'priority' => event.get('author.priority'),
'creator' => event.get('author.creator'),
'createDate' => event.get('author.createDate'),
'updator' => event.get('author.updator'),
'updateDate' => event.get('author.updateDate')
}
}
}
end
end
map['meetingAgendaIds'] ||= []
map['meetingAgenda'] ||= []
if (event.get('meetingAgenda.id') != nil)
if !(map['meetingAgendaIds'].include? event.get('meetingAgenda.id'))
map['meetingAgendaIds'] << event.get('meetingAgenda.id')
map['meetingAgenda'] << {
'id' => event.get('meetingAgenda.id'),
'meetingFieldsId' => event.get('meetingAgenda.meetingFieldsId'),
'parentId' => event.get('meetingAgenda.parentId'),
'type' => event.get('meetingAgenda.type'),
'authorIds' => event.get('meetingAgenda.authorIds'),
'holder' => event.get('meetingAgenda.holder'),
'theme' => event.get('meetingAgenda.theme'),
'startTime' => event.get('meetingAgenda.startTime'),
'endTime' => event.get('meetingAgenda.endTime'),
'vid' => event.get('meetingAgenda.vid'),
'links' => event.get('meetingAgenda.links'),
'seq' => event.get('meetingAgenda.seq'),
'creator' => event.get('meetingAgenda.creator'),
'createDate' => event.get('meetingAgenda.createDate'),
'updator' => event.get('meetingAgenda.updator'),
'updateDate' => event.get('meetingAgenda.updateDate'),
'comments' => event.get('meetingAgenda.comments'),
'views' => event.get('meetingAgenda.views'),
'agendaIslogin' => event.get('meetingAgenda.agendaIslogin'),
'agendaIsrealnameAuthentication' => event.get('meetingAgenda.agendaIsrealnameAuthentication'),
'condition' => event.get('meetingAgenda.condition')
}
end
end
event.cancel()
"
push_previous_map_as_event => true
timeout => 5
}
mutate {
# remove_field =>["@version", "meetingFieldsIds", "meetingAgendaIds", "authorIds"]
remove_field =>["@version"]
}
}
output {
elasticsearch {
id => "elasticsearch-meeting"
hosts => ["127.0.0.1:9200"]
index => "testmeeting"
document_type => "testmeeting"
document_id => "%{id}"
}
}`
这是我写的配置文件,但是无论怎么做,级联关系(二级数组)中的数据始终只有一条,我知道原因,是因为上面做了重复判断,但是不做重复判断的话又会出现重复情况,请问大神们这个地方怎么处理啊
数据表结构:
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论