Need ideas for a pipeline column name, column count, datatype change detector - Glue
I have the following pipeline setup in AWS (trying not to give away too much company info here). This may be an oversimplification, but here goes.
We have an ETL process that contains data sources that are crawled daily and then staged in Athena. We then have Glue jobs set up to pick up specific tables from the staged data in Athena and write them to a MySQL database hosted elsewhere. (This is all set up and working.)
The company requires us to make sure that the column names, datatypes, nullability etc. are maintained in MySQL. We do NOT drop and recreate tables on MySQL, as other departments may require additional transformations to be performed on the data - there are also views tied to these tables, together with referential integrity constraints.
What I then need to achieve is tracking column renames, column datatype changes, columns added, columns removed, new tables added, tables removed etc. on the Athena tables, as soon as the data is staged.
I have tried setting up a changelog table and then querying the information schema:
SELECT * FROM information_schema.columns
WHERE table_schema = 'table'
  AND table_name = 'table_name'
and then pulling the resulting columns into the changelog table, comparing the previous day's column count against today's. If there is a change, the column names returned from the above query are compared against what existed the previous day (this is stored in the changelog as follows):
table_name varchar(255),
count_columns int,
table_id int,
created_at timestamp,
updated_at timestamp
then table_id references another table:
table_id int (foreign key to changelog),
column_name varchar(45),
column_datatype varchar(255),
column_nullable char(5),
created_at timestamp,
updated_at timestamp
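For what it's worth, the daily comparison itself is compact. Here is a minimal sketch in Python using awswrangler, where the database name staging_db and table name table_name are placeholders, and yesterday's snapshot (which in my setup would be read back from the changelog tables above) is hard-coded for illustration:

import awswrangler as wr

# Today's column metadata for one staged table, from Athena's information_schema.
sql = """
SELECT column_name, data_type, is_nullable
FROM information_schema.columns
WHERE table_schema = 'staging_db'
  AND table_name = 'table_name'
"""
today = wr.athena.read_sql_query(sql, database="staging_db")
current = dict(zip(today["column_name"], today["data_type"]))

# Yesterday's snapshot would come from the changelog tables
# (column_name -> column_datatype); hard-coded here for illustration.
yesterday = {"id": "integer", "name": "varchar(255)"}

added = set(current) - set(yesterday)
removed = set(yesterday) - set(current)
retyped = {c for c in set(current) & set(yesterday) if current[c] != yesterday[c]}
print("added:", added, "removed:", removed, "datatype changed:", retyped)

Note that at this level a pure rename is indistinguishable from one column removed plus one column added; detecting renames would need an extra heuristic (e.g. matching datatype and ordinal position).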
I know there's no such thing as referential integrity in Athena, but I am creating my own. It's one of the reasons I feel this solution is messy and takes too long (loading each table and comparing, etc). Is there nothing built into Athena or AWS Glue that can handle this? Could we perhaps have a Lambda query the data in Athena, compare source/destination, and then correct it in MySQL?
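For what it's worth, the "correct it in MySQL" step could be as simple as translating the diff above into ALTER TABLE statements. This is only a rough sketch: the pymysql connection handling and the mapping from Athena/Glue types (string, bigint, ...) to MySQL types are assumed to exist, and blindly dropping columns could break the downstream views mentioned earlier.

import pymysql  # shipped with the Lambda package or as a layer

def apply_diff(conn, table, added, removed, retyped, mysql_types):
    # mysql_types maps column_name -> a MySQL datatype string; translating
    # Athena/Glue types into MySQL types is assumed to have happened already.
    with conn.cursor() as cur:
        for col in added:
            cur.execute(f"ALTER TABLE {table} ADD COLUMN {col} {mysql_types[col]}")
        for col in retyped:
            cur.execute(f"ALTER TABLE {table} MODIFY COLUMN {col} {mysql_types[col]}")
        for col in removed:
            cur.execute(f"ALTER TABLE {table} DROP COLUMN {col}")
    conn.commit()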
Comments (1)
So here's how I solved it.
I created the following tables:
A dbtables table that contains a snapshot of the current tables.
Step 1 - an AWS Glue job that converts the JSON source data into Parquet, writing it to a staging bucket.
Step 2 - a crawler that crawls the data in staging.
Step 3 - every morning, a Lambda runs that does the following:
Event trigger - a write to the S3 staging bucket
Lambda (using AWS Wrangler as a layer)
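For illustration, a minimal sketch of what such a Lambda might look like, here using boto3's Glue catalog API directly; the database name staging_db and the previous_snapshot() helper (standing in for a read of the dbtables snapshot from MySQL) are assumptions:

import boto3

glue = boto3.client("glue")

def previous_snapshot():
    # Stub: in practice this would read the dbtables snapshot back from
    # MySQL (e.g. with awswrangler) and return {table: {column: type}}.
    return {}

def lambda_handler(event, context):
    # Fired by the write to the S3 staging bucket. Collect the current
    # schema of every table the crawler registered in the Glue catalog.
    database = "staging_db"  # assumed catalog database name
    snapshot = {}
    for page in glue.get_paginator("get_tables").paginate(DatabaseName=database):
        for table in page["TableList"]:
            cols = table.get("StorageDescriptor", {}).get("Columns", [])
            snapshot[table["Name"]] = {c["Name"]: c["Type"] for c in cols}

    previous = previous_snapshot()
    for name in set(snapshot) - set(previous):
        print(f"new table: {name}")
    for name in set(previous) - set(snapshot):
        print(f"table removed: {name}")
    for name in set(snapshot) & set(previous):
        if snapshot[name] != previous[name]:
            print(f"schema changed: {name}")
    # Finally, today's snapshot would be written back to dbtables
    # so tomorrow's run has something to diff against.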