使用SPARK BQ连接器从DataProc上的Pyspark作业中将标签添加到大查询表中

发布于 2025-02-02 05:15:45 字数 848 浏览 2 评论 0原文

我正在尝试在Google DataProc群集上使用PY-Spark来运行Spark作业并将结果写入大查询表。

Spark BigQuery连接器文档 - https://githbithub.com/googleclecloclocdataproc/spark-bigquroc/spark-bigquery-conector

要求在桌子的​​创建期间,大查询表中应存在某些标签。

SPARK BQ连接器没有提供任何添加标签以添加标签以进行写操作的规定,

df.write.format("bigquery") \
    .mode("overwrite") \
    .option("temporaryGcsBucket", "tempdataprocbqpath") \
    .option("createDisposition", "CREATE_IF_NEEDED") \
    .save("abc.tg_dataset_1.test_table_with_labels")

以上命令在背景中创建BigQuery Load作业,该作业将表加载数据。 进一步检查后,大查询负载作业语法本身并不支持与大查询 - 查询作业相反的标签。

是否有任何计划支持以下

  1. 的大查询负载工作支持中
  2. 标签在Spark BQ连接器的写操作中为标签

的支持。由于在加载/写操作过程中没有任何规定要添加标签

I am trying to use py-spark on google dataproc cluster to run a spark job and writing results to a Big Query table.

Spark Bigquery Connector Documentation - https://github.com/GoogleCloudDataproc/spark-bigquery-connector

The requirement is during the creation of the table, there are certain labels that should be present on the big query table.

The spark bq connector does not provide any provision to add labels for write operation

df.write.format("bigquery") \
    .mode("overwrite") \
    .option("temporaryGcsBucket", "tempdataprocbqpath") \
    .option("createDisposition", "CREATE_IF_NEEDED") \
    .save("abc.tg_dataset_1.test_table_with_labels")

The above command creates bigquery load job in background that loads the table with the data.
Having checked further, the big query load job syntax itself does not support addition of labels in contrast to big query - query job.

Is there any plan to support the below

  1. Support for labels in big query load job
  2. Support for labels in write operation of spark bq connector.

Since there is no provision to add labels during the load/write operation, the current workaround used is to have the table created with schema/labels before the pyspark job

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

温柔戏命师 2025-02-09 05:15:45

问题在10个月前发布。我不确定当时是否可以使用或最近添加它,但是相同的GitHub文档提供了添加标签的选项:

-BigQueryTableLabel: -
可以用来将标签添加到表中时将标签添加到表中。可以设置多个标签。
(可选)

因此您可以在代码中添加类似的内容: -

spark.conf.set(“ bigquerytablelabel.name”,“ sample_table_name”)

Question was posted 10 months ago. I am not sure if it was available at that time or added recently but the same github documentation has provided the option to add labels :-

bigQueryTableLabel :-
Can be used to add labels to the table while writing to a table. Multiple labels can be set.
(Optional)

so you can add something like this in your code:-

spark.conf.set("bigQueryTableLabel.name", "sample_table_name")

尴尬癌患者 2025-02-09 05:15:45

这个问题已经发布了很长时间。但是,请通过以下信息回答。它将帮助开发人员获得有关每个Spark Job的插槽用途。

我已经写了下面的代码,以捕获相对于每个Spark Job的BigQuery插槽使用情况。
您可以在下面使用下面的补丁,其中您可以通过Spark Job执行查询。

 def getDataprocJobId(spark: SparkSession): String = {
    spark.sparkContext.getConf.get(KEY_SPARK_YARN_TAGS, UNKNOWN).split(',')
      .find(_.startsWith(DATAPROC_JOB_PREFIX))
      .map(_.substring(DATAPROC_JOB_PREFIX.length))
      .getOrElse(spark.sparkContext.appName)
  }

val bld = QueryJobConfiguration.newBuilder(sql)
val dataprocjobid: String = getDataprocJobId(spark)
  var dataproc_job_id: String = {
    if (dataprocjobid.startsWith("exp")) {
      val pattern= "exp\\-[a-zA-Z0-9-](.*)(?=-EXPORT)".r
      pattern.findFirstIn(dataprocjobid).toString
    }
    else {
      dataprocjobid
    }
  }

import scala.collection.JavaConverters._
bld.setLabels(Map("dataproc_job_id" -> dataproc_job_id.toLowerCase).asJava)

**Its working fine and I can see the labels inside information_schema table.**

select
 query
,TIMESTAMP_DIFF(start_time, end_time, MINUTE) as diff
,total_bytes_processed
,total_slot_ms
,cache_hit
,lbl.value as dataproc_job_id
from `region-eu.INFORMATION_SCHEMA.JOBS_BY_PROJECT` ,unnest(labels) lbl
where date(creation_time) >='2024-04-19'
and lbl.key='dataproc_job_id' limit 10;

但是,它有一定的局限性。

1)Label keys and values can be no longer than 63 characters
2)can only contain lowercase letters, numeric characters, underscores and dashes. International characters are allowed. 
3)Label keys must start with a letter and each label in the list must have a different key.

对于我们的系统,很少有Spark Jobs的工作ID超过63的限制,因此我们使用Regex从中获取有意义的信息。

This question was posted a long time back. However answering with below piece of information. It will help developers to get the slot usuage with respect to each spark job.

I have written a below piece of code to capture the bigquery slot usage with respect to each spark job.
You can use below patch wherein you are executing your queries via spark job.

 def getDataprocJobId(spark: SparkSession): String = {
    spark.sparkContext.getConf.get(KEY_SPARK_YARN_TAGS, UNKNOWN).split(',')
      .find(_.startsWith(DATAPROC_JOB_PREFIX))
      .map(_.substring(DATAPROC_JOB_PREFIX.length))
      .getOrElse(spark.sparkContext.appName)
  }

val bld = QueryJobConfiguration.newBuilder(sql)
val dataprocjobid: String = getDataprocJobId(spark)
  var dataproc_job_id: String = {
    if (dataprocjobid.startsWith("exp")) {
      val pattern= "exp\\-[a-zA-Z0-9-](.*)(?=-EXPORT)".r
      pattern.findFirstIn(dataprocjobid).toString
    }
    else {
      dataprocjobid
    }
  }

import scala.collection.JavaConverters._
bld.setLabels(Map("dataproc_job_id" -> dataproc_job_id.toLowerCase).asJava)

**Its working fine and I can see the labels inside information_schema table.**

select
 query
,TIMESTAMP_DIFF(start_time, end_time, MINUTE) as diff
,total_bytes_processed
,total_slot_ms
,cache_hit
,lbl.value as dataproc_job_id
from `region-eu.INFORMATION_SCHEMA.JOBS_BY_PROJECT` ,unnest(labels) lbl
where date(creation_time) >='2024-04-19'
and lbl.key='dataproc_job_id' limit 10;

However, there are certain limitations to it.

1)Label keys and values can be no longer than 63 characters
2)can only contain lowercase letters, numeric characters, underscores and dashes. International characters are allowed. 
3)Label keys must start with a letter and each label in the list must have a different key.

for our system there were few spark jobs wherein job id was exceeding the limit of 63, So we have used regex to get the meaningful information out of it.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文