Insert Spark DataFrame into a partitioned table

Posted 2025-01-22 16:02:40

I have seen methods for inserting into a Hive table, such as insertInto(table_name, overwrite=True), but I couldn't work out how to handle the scenario below.

For the first run, a dataframe like this needs to be saved in a table, partitioned by 'date_key'. There could be one or more partitions, e.g. 202201 and 202203:

+---+----------+
| id|  date_key|
+---+----------+
|  1|202201    |
|  2|202203    |
|  3|202201    |
+---+----------+

For subsequent runs, the data also comes in like this, and I'd like to append the new data to their corresponding partitions using date_key:

+---+----------+
| id|  date_key|
+---+----------+
|  4|202204    |
|  5|202203    |
|  6|202204    |
+---+----------+

Could you please help to shed some light on how to handle:

  1. if during each run there will only be new data from one partition
  2. if during each run there will be new data from multiple partitions, like the sample inputs above?

Many thanks for your help. Let me know if I can explain the problem better.

Edited:
I could not use df.write.partitionBy("date_key").insertInto(table_name), as there was an error saying insertInto cannot be used together with partitionBy.
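
For reference, a minimal sketch of that failing combination and the usual workaround, assuming a SparkSession named spark and an already-existing table my_table partitioned by date_key (both names are placeholders):

df = spark.createDataFrame([
    (1, "202201"),
    (2, "202203"),
], "id int, date_key string")

# Combining partitionBy with insertInto raises an AnalysisException,
# because the target table's partition columns are already part of its definition:
# df.write.partitionBy("date_key").insertInto("my_table")

# Dropping partitionBy and relying on the table's own partition spec works:
df.write.mode("append").insertInto("my_table")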

Comments (2)

别靠近我心 2025-01-29 16:02:40

In my example here, the first run creates the new partitioned table data; c2 is the partition column.

# First run: create the table 'data', partitioned by c2
df1 = spark.createDataFrame([
    (1, 'a'),
    (2, 'b'),
], 'c1 int, c2 string')
df1.show()
df1.write.partitionBy('c2').mode('overwrite').saveAsTable('data')

/
  c2=a
    part-00000-7810a4aa-a5a1-4c4f-a09a-ef86a66041c9.c000.snappy.parquet
  c2=b
    part-00000-7810a4aa-a5a1-4c4f-a09a-ef86a66041c9.c000.snappy.parquet

For the second run, you don't need anything fancy: just append with insertInto. Spark knows that c2 is the partition column and will handle it properly; you don't have to tell it via partitionBy.

# Subsequent run: append; existing partitions get new files, new partitions are created
df2 = spark.createDataFrame([
    (1, 'a'),
    (3, 'c'),
], 'c1 int, c2 string')
df2.show()
df2.write.mode('append').insertInto('data')

/
  c2=a
    part-00000-7810a4aa-a5a1-4c4f-a09a-ef86a66041c9.c000.snappy.parquet
    part-00000-dcd9029e-8c65-4397-bca5-ab2691ece7ff.c000.snappy.parquet
  c2=b
    part-00000-7810a4aa-a5a1-4c4f-a09a-ef86a66041c9.c000.snappy.parquet
  c2=c
    part-00000-dcd9029e-8c65-4397-bca5-ab2691ece7ff.c000.snappy.parquet
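
One caveat worth adding to this approach (not part of the original answer): insertInto matches columns by position, not by name, so the DataFrame's column order has to match the table's, with the partition column last. A small sketch for the asker's date_key case, assuming the same spark session and a hypothetical existing table named events:

new_df = spark.createDataFrame([
    (4, '202204'),
    (5, '202203'),
], 'id int, date_key string')

# insertInto is position-based: select columns in the table's own order
# so the partition column (date_key) ends up last.
table_cols = spark.table('events').columns   # e.g. ['id', 'date_key']
new_df.select(*table_cols).write.mode('append').insertInto('events')
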
九局 2025-01-29 16:02:40

If the table is an external table, you can use the following code to write the data out to the external partitioned table:

df.write.partitionBy("date_key").mode("append").option("path","/path/to/external/table/on/hdfs").saveAsTable("table_name_here")

If it is a Hive managed table, then you can simply use the saveAsTable API as follows:

df.write.partitionBy("date_key").mode("append").saveAsTable("tableName")
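
To confirm that the appended rows landed in the expected partitions, a quick check can help (an illustrative addition; the table name is whatever was passed to saveAsTable, and a spark session is assumed as in the snippets above):

# List the partitions registered in the metastore after the append.
spark.sql("SHOW PARTITIONS tableName").show(truncate=False)

# Or count rows per partition value.
spark.table("tableName").groupBy("date_key").count().show()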