我的GCP DataProc可行的用例是否可行?

发布于 2025-02-07 06:01:08 字数 2690 浏览 2 评论 0原文

不确定是否有一个地方/人要求一对一的建议设置和调整。但是,这里的数字和任何地方一样好,可以找到一些帮助。

我们的团队主要使用BigQuery对位置驱动数据进行数据分析。我们将数据携带回2019年,因此我们携带了很多数据。我们添加了一些聚类(总是进行日期分区),以帮助保持成本降低,但它到达了不可行的地步。目前,我们的数据和每日原始数据范围为3-8 TB(几步之后,减少了很多)。

首先,我们想将200 TB的数据移至GCS,并将其分割为更详细的水平。此数据的模式是:

UID-字符串 Timestamp_of_observation-时间戳, 拉特 - 浮动, lon-浮动, 数据源 - 字符串, CBG(CENSUS_BLOCK_GROUP的缩写) - 字符串

我们想使用Hive分区将数据保存到GCS,以便我们的存储桶文件夹结构看起来像 年>月> Day> CBG

知道我们正在处理约200tb和3年的数据,并且仅CBG就有大约200,000种可能性吗? 我们还有其他一些选项使用人口普查区域(84,414个子文件夹)或县(35,000),对我们来说越粒度越好。

我的第一次尝试要么只有一个OOM,要么我的阶段永远运行。我最初的pyspark代码看起来如下:

from pyspark import SparkFiles
from pyspark.sql.functions import year, month, dayofmonth, rand
from pyspark.sql.functions import col, spark_partition_id, asc, desc
 
# use appropriate version for jar depending on the scala version
spark = SparkSession.builder\
  .appName('BigNumeric')\
  .getOrCreate()
spark.conf.set("spark.sql.shuffle.partitions", 365*100)

df = spark.read \
  .format("bigquery") \
  .load("data-location-338617.Raw_Location_Data.data_cbg_enrich_proto")

df1 = df.withColumn("year", year(col("visit_timestamp"))) \
    .withColumn("month", month(col("visit_timestamp"))) \
    .withColumn("day", dayofmonth(col("visit_timestamp"))) \
    .withColumn("cbg", col("boundary_partition")) \
    .withColumn('salt', rand())


df1.repartition(365*100,'salt','year','month','day') \
    .drop('salt') \
    .write.mode("overwrite") \
    .format("parquet") \
    .partitionBy("year", "month", "day", "cbg") \
    .save("gs://initial_test/cbg_data/")

此代码是给我的,但是一名工程师。他告诉我要添加盐以偏斜,以增加分区。

任何建议都会有所帮助。这里的目标是进行一批巨大的批次将我们的数据迁移到GCS,然后每天开始将我们的原始数据转换为GC而不是BigQuery。

我想设想要编写的文件编号是3 12 30*200000(216000000),这似乎很多。有没有更好的方法来组织此方法,我们最初的目的是使这些数据在下游更便宜。目前,日期分区是最小化成本的最佳方法,我们在CBG列上进行了聚类,但似乎并没有降低成本很大。我的想法是,使用GCS Hive结构,它实际上将使CBG(或其他空间分组)成为真实的分区,现在只是一个集群。

最后,我“我对集群配置没有太大的作用,我已经玩过数量的工人节点和机器,但没有真正获得任何帮助的任何帮助,并感谢您的查看!

这是群集设置CLI代码

gcloud dataproc clusters create cluster-f35f --autoscaling-policy location_data --enable-component-gateway --bucket cbg-test-patino --region us-central1 --zone us-central1-f --master-machine-type n1-standard-8 --master-boot-disk-type pd-ssd --master-boot-disk-size 500 --num-workers 30 --worker-machine-type n2-standard-16 --worker-boot-disk-type pd-ssd --worker-boot-disk-size 1000 --image-version 2.0-debian10 --optional-components JUPYTER --project data-***********     --initialization-actions gs://goog-dataproc-initialization-actions-us-central1/connectors/connectors.sh     --metadata bigquery-connector-version=1.2.0     --metadata spark-bigquery-connector-version=0.21.0

Not sure if there is a place/people to ask for one on one advice for Dataproc setup and tuning. But figure here is as good as place as any to find some help.

Our team has been primarily using BigQuery to do our data analysis on location driven data. We're carrying data back to 2019, so we're carry a lot of data. We've added some clustering (always had date partitioning) to help keep cost down, but its getting to the point where it just not feasible. At the moment we have upwards to 200 TB of data and daily raw data ranges from 3-8 TB (gets reduce quite a bit after a few steps).

First we'd like to move our 200 TB of data to GCS and segment it to more granular level. The schema for this data is:

uid -- STRING
timestamp_of_observation -- TIMESTAMP,
lat -- FLOAT,
lon -- FLOAT,
datasource -- STRING,
cbg (short for census_block_group) -- STRING

We would like to save the data to GCS using hive partitioning so that our bucket folder structure looks like
year > month > day > cbg

Knowing we are processing about 200TB and 3 years of data and cbgs alone have about 200,000 possibilities is this feasible?
We have a few other options using either census block tracts (84,414 subfolders) or counties (35,000), the more granularity for us the better.

My first attempts I either get just a OOM or I get stages just running forever. My initial pyspark code looks like the following:

from pyspark import SparkFiles
from pyspark.sql.functions import year, month, dayofmonth, rand
from pyspark.sql.functions import col, spark_partition_id, asc, desc
 
# use appropriate version for jar depending on the scala version
spark = SparkSession.builder\
  .appName('BigNumeric')\
  .getOrCreate()
spark.conf.set("spark.sql.shuffle.partitions", 365*100)

df = spark.read \
  .format("bigquery") \
  .load("data-location-338617.Raw_Location_Data.data_cbg_enrich_proto")

df1 = df.withColumn("year", year(col("visit_timestamp"))) \
    .withColumn("month", month(col("visit_timestamp"))) \
    .withColumn("day", dayofmonth(col("visit_timestamp"))) \
    .withColumn("cbg", col("boundary_partition")) \
    .withColumn('salt', rand())


df1.repartition(365*100,'salt','year','month','day') \
    .drop('salt') \
    .write.mode("overwrite") \
    .format("parquet") \
    .partitionBy("year", "month", "day", "cbg") \
    .save("gs://initial_test/cbg_data/")

This code was given to me but a fellow engineer. He told me to add salt for skewness, to increase my partitions.

Any and all advice would be helpful. The goal here to do one huge batch to migrate our data to GCS and then daily begin to save our raw data transformed to GCS as oppose to Bigquery.

I would envision that the file numbers to be written are 31230*200000 (216000000) which seems like a lot. Is there a better way to organize this, our original purpose was to make this data MUCH cheaper downstream to query. Right now the date partition has been the best way to minimize cost, we have clustering on CBG column but it doesn't seem to drive cost down very much. My thought is that with the GCS hive structure, it would essentially make CBG (or other spatial grouping) as a true partition and now just a cluster.

Lastly I"m not doing much to the cluster configuration, I've played around with number of worker nodes and machines but haven't truly gotten anything to work again any help is appreciated and thank you for looking!

This is the cluster setup CLI code

gcloud dataproc clusters create cluster-f35f --autoscaling-policy location_data --enable-component-gateway --bucket cbg-test-patino --region us-central1 --zone us-central1-f --master-machine-type n1-standard-8 --master-boot-disk-type pd-ssd --master-boot-disk-size 500 --num-workers 30 --worker-machine-type n2-standard-16 --worker-boot-disk-type pd-ssd --worker-boot-disk-size 1000 --image-version 2.0-debian10 --optional-components JUPYTER --project data-***********     --initialization-actions gs://goog-dataproc-initialization-actions-us-central1/connectors/connectors.sh     --metadata bigquery-connector-version=1.2.0     --metadata spark-bigquery-connector-version=0.21.0

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文