Convert Avro to Parquet in Azure



I have hundreds of Avro files created daily in an Azure Blob Storage account. These Avro files contain JSON objects with varying schemas. Each file ranges between 100 KB and 1 MB in size. I need to transform them into Parquet format and store them in the same Azure Blob Storage account.

My constraints: I need to use Azure-native services. I do not want to use Azure Functions, as even a single file may take a substantial amount of time to transform, which Azure Functions cannot handle. Hence, I am not looking for Azure Functions as the core of the solution.

What would be the optimal approach, given the requirements/constraints described?

Sample input Avro:

{
    "SequenceNumber": 123,
    "Offset": "21212",
    "EnqueuedTimeUtc": "2/18/2022 3:54:39 PM",
    "SystemProperties": {
        "x-opt-kafka-key": "211223131",
        "x-opt-enqueued-time": 12321312321
    },
    "Properties": {
        "element1": "value1",
        "element2": "value2"
    },
    "Body": "{
        \"version\": \"1.0\",
        \"type\": \"some-type\",
        \"data\": {
            \"id\": \"some-id\",
            \"color\": \"some-color\",
            \"address\": {
                \"line1\": \"some-line\",
                \"line-2\": \"some-line\"
            }
        }
    }"
}

P.S. I have done this in AWS with Glue. However, ADF does not handle complex Avro files well.

Much help/advice is appreciated.


Answer from 能怎样 (2025-02-07 03:55:16):


I do a very similar use case to convert avro into flat relational tables using Azure Synapse Analytics. The key components are as follows:


  • Azure Data Lake Gen 2 - avro files are stored in the data lake
  • Synapse Pipelines - pipelines pick up the avro files, using Get Metadata activities to get information about the files and For Each activities running in parallel to process each file
  • Synapse Notebooks - parameterised Spark notebooks which open the raw avro file and flatten it into a relational form using only a few lines of Scala. This could easily be adapted to save the files out as parquet.

Azure Data Factory (ADF) and Databricks would offer a similar architecture.

For costs, you would have to try it for your scenario, but the Azure Pricing Calculator can give you an idea for starters. For Synapse Notebooks, it will also depend on the size of the Spark pools you use. These can be Small, Medium, Large, XLarge, and XXLarge, with costs rising accordingly. I would suggest trying both with a smaller number of files, doing a comparison, and seeing which one you find easier to work with, faster, cheaper, etc.

For my example, we have an avro file with two columns, header and body. We're only interested in the body column, which contains JSON, and in saving it to a database. Our JSON isn't particularly complicated, but we do deal with varying numbers of columns. A simplified example:

Parameters cell

val pSourceFilePath = "abfss://[email protected]/raw/somepath/" 

Load cell

This deceptively simple spark.read.load statement reads our particular file format, but you may need to change to spark.read.format("avro") - it really depends on your files:

// Read the source file and assign it to a dataframe
val df = spark.read.
    load(s"${pSourceFilePath}")

Expand cell

This is again deceptively simple code, using the asterisk syntax (*) to expand all the JSON into relational columns - this works for our format, but I can't be sure it will work for yours:

// Expand the json column to many columns
val df2 = df.select($"body.*", input_file_name.as("filename"))

You could then write the dataframe, which is now in a more familiar columnar format, to parquet.
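
As a rough sketch of that last step under the asker's layout, where Body is a JSON string (possibly stored as bytes) rather than a struct, something like the following could parse and flatten it before writing Parquet. The pTargetFilePath parameter, the bodySchema definition, and the column-name casing are illustrative assumptions, not part of the original notebook:

import org.apache.spark.sql.functions.{from_json, input_file_name}
import org.apache.spark.sql.types._
import spark.implicits._

// Hypothetical output location, mirroring the source path parameter
val pTargetFilePath = "abfss://container@account.dfs.core.windows.net/curated/somepath/"

// Assumed schema for the JSON carried in the Body field - adjust to your payload
val bodySchema = new StructType()
  .add("version", StringType)
  .add("type", StringType)
  .add("data", new StructType()
    .add("id", StringType)
    .add("color", StringType)
    .add("address", new StructType()
      .add("line1", StringType)
      .add("line-2", StringType)))

// Parse the Body column (the cast covers the case where it is stored as bytes),
// flatten the parsed struct into columns, and keep the source file name
val dfOut = df
  .withColumn("parsedBody", from_json($"Body".cast("string"), bodySchema))
  .select($"parsedBody.*", input_file_name().as("filename"))

// Write the flattened dataframe back to the data lake as parquet
dfOut.write.mode("overwrite").parquet(pTargetFilePath)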
