How to benchmark PySpark queries?

Posted 2025-01-22 07:27:04


I have a simple PySpark script and I would like to benchmark each section.

# section 1: prepare data
df = spark.read.option(...).csv(...)
df.registerTempTable("MyData")

# section 2: Dataframe API
avg_earnings = df.agg({"earnings": "avg"}).show()

# section 3: SQL
avg_earnings = spark.sql("""SELECT AVG(earnings)
                            FROM MyData""").show()

To generate reliable measurements one would need to run each section multiple times. My solution, using the Python time module, looks like this.

import time
for _ in range(iterations):
    t1 = time.time()
    df = spark.read.option(...).csv(...)
    df.registerTempTable("MyData")

    t2 = time.time()
    avg_earnings = df.agg({"earnings": "avg"}).show()

    t3 = time.time()
    avg_earnings = spark.sql("""SELECT AVG(earnings)
                            FROM MyData""").show()
    t4 = time.time()
   
    write_to_csv(t1, t2, t3, t4)

My question is: how would one benchmark each section? Would you use the time module as well? And how would one disable caching for PySpark?


Edit:
Plotting the first 5 iterations of the benchmark shows that PySpark is doing some form of caching.

[Figure: benchmark of section 1]

How can I disable this behaviour?


Comments (1)

深蓝 · 2025-01-29 07:27:04


First, you can't benchmark using show(); it only computes and returns the top 20 rows.

Second, in general, the PySpark DataFrame API and Spark SQL share the same Catalyst optimizer behind the scenes, so overall what you are doing (using .agg vs avg()) is pretty much the same and doesn't make much of a difference.
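If you want to confirm this on your own data, comparing the query plans is a quick check. A minimal sketch, assuming the same spark session, df, and MyData temp view as in the question; both calls should print essentially the same optimized and physical plan:

# compare the plans Catalyst produces for the two formulations
df.agg({"earnings": "avg"}).explain()
spark.sql("SELECT AVG(earnings) FROM MyData").explain()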

Third, benchmarking usually only matters if your data is really big, or if your operation takes much longer than expected. Other than that, if the runtime difference is only a couple of minutes, it doesn't really matter.

Anyway, to answer your question:

  1. Yes, there is nothing wrong with using time.time() to measure.
  2. You should use count() instead of show(); count() will go ahead and compute over your entire dataset (see the sketch after this list).
  3. You don't have to worry about caching if you don't call it. Spark won't cache unless you ask it to. In fact, you shouldn't cache at all when benchmarking.
  4. You should also use static allocation instead of dynamic allocation. Or, if you're using Databricks or EMR, use a fixed number of workers and don't auto-scale.
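Putting points 1-3 together, a minimal timing harness could look like the sketch below. This is not the poster's code: benchmark is a hypothetical helper, it assumes the same spark, df, and MyData view as in the question, and spark.catalog.clearCache() is purely defensive since nothing here is cached explicitly. collect() on the one-row aggregate plays the same role as the count() suggested above: it forces the whole computation instead of just a preview.

import time

def benchmark(label, make_action, iterations=10):
    # Run an action-producing callable several times and print one timing per run.
    for i in range(iterations):
        spark.catalog.clearCache()   # no-op unless something was cached explicitly
        start = time.perf_counter()
        make_action()                # must trigger an action, not just build a plan
        print(f"{label},{i},{time.perf_counter() - start:.3f}")

# section 2: DataFrame API; collect() materializes the single aggregated row
benchmark("agg_avg", lambda: df.agg({"earnings": "avg"}).collect())

# section 3: SQL over the temp view
benchmark("sql_avg", lambda: spark.sql("SELECT AVG(earnings) FROM MyData").collect())

For point 4, setting spark.dynamicAllocation.enabled to false when building the session is one way to keep the executor count constant between runs.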