How to benchmark PySpark queries?
I have got a simple PySpark script and I would like to benchmark each section.
# section 1: prepare data
df = spark.read.option(...).csv(...)
df.registerTempTable("MyData")
# section 2: Dataframe API
avg_earnings = df.agg({"earnings": "avg"}).show()
# section 3: SQL
avg_earnings = spark.sql("""SELECT AVG(earnings)
FROM MyData""").show()
To generate reliable measurements one would need to run each section multiple times. My solution using the Python time module looks like this.
import time
for _ in range(iterations):
    t1 = time.time()
    df = spark.read.option(...).csv(...)
    df.registerTempTable("MyData")
    t2 = time.time()
    avg_earnings = df.agg({"earnings": "avg"}).show()
    t3 = time.time()
    avg_earnings = spark.sql("""SELECT AVG(earnings)
                                FROM MyData""").show()
    t4 = time.time()
    write_to_csv(t1, t2, t3, t4)
My question is: how would one benchmark each section? Would you use the time module as well? How would one disable caching for PySpark?
Edit:
Plotting the first 5 iterations of the benchmark shows that PySpark is doing some form of caching.
How can I disable this behaviour?
First, you can't benchmark using show(); it only calculates and returns the top 20 rows.
Second, in general, the PySpark DataFrame API and Spark SQL share the same Catalyst optimizer behind the scenes, so overall what you are doing (using .agg vs avg()) is pretty much the same and doesn't make much of a difference.
Third, usually, benchmarking is only meaningful if your data is really big, or your operation takes much longer than expected. Other than that, if the runtime difference is only a couple of minutes, it doesn't really matter.
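As a quick way to see the second point for yourself, you can compare the physical plans of the two versions. This is a minimal sketch, assuming the df and the "MyData" temp table from section 1 of the question already exist; explain() prints the plan without running the job, and the two plans should come out essentially identical.
df.agg({"earnings": "avg"}).explain()                      # plan produced via the DataFrame API
spark.sql("SELECT AVG(earnings) FROM MyData").explain()    # plan produced via Spark SQL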
Anyway, to answer your questions:
- There is nothing wrong with using time.time() to measure.
- Use count() instead of show(). count would go forward and compute your entire dataset.
- Spark won't cache if you don't call it; Spark won't cache unless you ask for it. In fact, you shouldn't cache at all when benchmarking.
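Putting those points together, here is a hedged sketch of how the timing loop from the question could look with count() forcing a full computation and spark.catalog.clearCache() dropping any cached data between iterations. The option(...)/csv(...) arguments, iterations, and write_to_csv are placeholders carried over from the question, not real values.
import time
for _ in range(iterations):
    spark.catalog.clearCache()                   # drop anything cached by a previous iteration
    t1 = time.perf_counter()
    df = spark.read.option(...).csv(...)         # section 1: prepare data (placeholders from the question)
    df.registerTempTable("MyData")
    t2 = time.perf_counter()
    df.agg({"earnings": "avg"}).count()          # section 2: DataFrame API, forced with count()
    t3 = time.perf_counter()
    spark.sql("SELECT AVG(earnings) FROM MyData").count()   # section 3: SQL, forced with count()
    t4 = time.perf_counter()
    write_to_csv(t1, t2, t3, t4)                 # user-defined helper from the question
time.perf_counter() is just a slightly more precise monotonic alternative to time.time(); either works at this scale.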