左连接两个流数据帧然后在 PySpark 中聚合它们后出现空结果

发布于 2025-01-19 04:34:50 字数 5117 浏览 4 评论 0原文

数据说明

我有两个带有日志的CSV文件 - 一个包含跨度日志,另一个包含错误日志(对于熟悉的人来说,弹性APM日志,弹性APM日志)。相关的跨度字段是timestampIDname and code>和持续时间,而相关的错误字段为error_timestamperror_idspan_idspan_id错误日志中的字段是一个外键,引用了SPAN日志中的ID字段,从而定义了跨度和错误之间的一对一关系。

注意:我知道,如果我已经有CSV文件,我不应该使用流API,但是我的任务是研究Spark结构化流。连接到实际(生产)流源不是研究阶段的选项,因此我创建了一个Python脚本,该脚本通过编写跨度日志来模拟流源到端口9999,并以相互时间顺序的顺序为端口9998。

任务

我要执行以下

  1. :加入跨度和错误流数据范围使用左外的联接
  2. 组由结果范围:
    一个。一个30秒的非重叠时间窗口,通过span.timestamp列窗口
    b。跨度名称
  3. 汇总组,计算:
    一个。 跨度的平均值
    b。组中不同的跨度数(span.id
    c。组中的不同错误数(error.error_id

我尝试了

模拟流源的服务器当然可以正常工作 - 我尝试处理两个流而无需加入,而一切都很好。

我已经阅读了,而且我认为我主要了解溪流外部连接的原则,水印的目的以及聚合规则。到目前为止,这是我的代码:

from pyspark.sql import functions as F

# ...
# initialization of spans_df and errors_df dataframes
# ...

wm_spans = spans_df.withWatermark("timestamp", "60 seconds")
wm_errors = errors_df.withWatermark("timestamp", "60 seconds")

aggregation = wm_spans.join(
        wm_errors,
        F.expr(
            """
            id = span_id AND
            error_timestamp >= timestamp AND
            error_timestamp <= timestamp + interval 30 seconds
            """
        ), 
        "leftOuter"
    ).groupBy(
        F.window("timestamp", "30 seconds", "30 seconds"), 
        "name"
    ).agg(
        F.avg(F.col('duration')).alias('davg'),
        F.approx_count_distinct(F.col('id')).alias('nSps'),
        F.approx_count_distinct(F.col('error_id')).alias('nErr'),
    )

query = aggregation.writeStream\
    .outputMode("append")\
    .format("console")\
    .start()

预期输出

基于有关窗户聚合中水印的章节有关外部与水印的章节,不同的跨度名称)对。每个这样的行的错误数可能为0(如果在给定时间窗口中未链接到任何跨度的错误)。

问题

第一次运行时,Spark给了我以下错误:

ERROR UnsupportedOperationChecker: Detected pattern of possible 'correctness' issue due to global 
watermark. The query contains stateful operation which can emit rows older than the current 
watermark plus allowed late record delay, which are "late rows" in downstream stateful operations 
and these rows can be discarded. Please refer the programming guide doc for more details. 
If you understand the possible risk of correctness issue and still need to run the query, 
you can disable this check by setting the config
`spark.sql.streaming.statefulOperator.checkCorrectness.enabled` to false.

因此,我将指定的配置设置为false,这将此错误消息转换为警告消息。但是,结果表是空的。我知道这很可能是由于上述警告中提到的正确性问题,但是我不知道如何解决/避免问题,同时仍达到目标。

我尝试了许多不同的

  1. 水印期组合(目前在两个数据范围内60秒)
  2. 加入条件期(当前30秒:error_timestamp&lt; = timestamp&= timestamp + Interval 30秒
  3. 窗口周期(当前30秒长,长度为30秒, 30秒的间隔)

注意: 并非每个 batch都会给出一个空的结果。起始时间戳为2022-01-31T22:00:00。在22:00:00到22:05:00之间有数千个跨度日志,但只有9个错误日志,其中9个错误在22:04:22和22:04:43之间。因此,它们属于两个30s时窗:22:04:00-22:04:30和22:04:30-22:05:00。处理数据集的这一部分的批次实际上导致了一个看似成功的聚合:

-------------------------------------------------
Batch: 26
-------------------------------------------------
+-----------------------+------------------------+-------...
|start                  |name                    |dmin   ...
+-----------------------+------------------------+-------...
|2022-01-30 22:04:00    |<one of the span names> |3251.0 ...
+-----------------------+------------------------+-------...

我发现这一结果令人惊讶 - 使用左外连接应确保保留所有行。 null列如果在右表中找不到匹配。但是,只有在右表中有匹配项时,我才能得到非空的结果

任何帮助将不胜感激!如果我忘了陈述一些相关信息,请告诉我,我会尽快添加 - 我不习惯于编写问题,因此请记住这一点。

相关问题

Data description

I have two CSV files with logs - one contains Span logs, and the other contains Error logs (Elastic APM logs, for those who are acquainted). The relevant span fields are timestamp, id, name and duration, while the relevant error fields are error_timestamp, error_id and span_id. The span_id field in error logs is a foreign key referencing the id field in span logs, thus defining a one-to-many relationship between spans and errors.

NOTE: I know I shouldn't use the Streaming API if I already have CSV files, but my task is to research Spark Structured Streaming. Connecting to the actual (production) streaming source is not an option for the research phase, so I created a Python script which simulates a streaming source by writing span logs to port 9999 and error logs to port 9998, in a mutually chronological order.

The task

I want to do the following:

  1. Join span and error Streaming DataFrames using a left outer join
  2. Group the resulting DataFrame by:
    a. A non-overlapping time-window of 30 seconds, windowing over the span.timestamp column
    b. The span name
  3. Aggregate the groups, calculating:
    a. average value of span.duration
    b. number of distinct spans in group (span.id)
    c. number of distinct errors in group (error.error_id)

What I tried

The server simulating a streaming source certainly works correctly - I tried processing both streams without joining them, and everything worked great.

I've read the official documentation for Spark Structured Streaming, and I think I have mostly understood the principles behind stream-stream outer joins, the purpose of watermarking, as well as the aggregation rules. This is my code so far:

from pyspark.sql import functions as F

# ...
# initialization of spans_df and errors_df dataframes
# ...

wm_spans = spans_df.withWatermark("timestamp", "60 seconds")
wm_errors = errors_df.withWatermark("timestamp", "60 seconds")

aggregation = wm_spans.join(
        wm_errors,
        F.expr(
            """
            id = span_id AND
            error_timestamp >= timestamp AND
            error_timestamp <= timestamp + interval 30 seconds
            """
        ), 
        "leftOuter"
    ).groupBy(
        F.window("timestamp", "30 seconds", "30 seconds"), 
        "name"
    ).agg(
        F.avg(F.col('duration')).alias('davg'),
        F.approx_count_distinct(F.col('id')).alias('nSps'),
        F.approx_count_distinct(F.col('error_id')).alias('nErr'),
    )

query = aggregation.writeStream\
    .outputMode("append")\
    .format("console")\
    .start()

Expected output

Based on the chapter about watermarks in windowed aggregations and the chapter about outer joins with watermarking in the official Spark Structured Streaming documentation, I would expect this code to produce a row for each (30s time window, distinct span name) pair. The number of errors for each such row could be 0 (if no error was linked to any of the spans in the given time window).

The problem

When I ran this for the first time, Spark gave me the following error:

ERROR UnsupportedOperationChecker: Detected pattern of possible 'correctness' issue due to global 
watermark. The query contains stateful operation which can emit rows older than the current 
watermark plus allowed late record delay, which are "late rows" in downstream stateful operations 
and these rows can be discarded. Please refer the programming guide doc for more details. 
If you understand the possible risk of correctness issue and still need to run the query, 
you can disable this check by setting the config
`spark.sql.streaming.statefulOperator.checkCorrectness.enabled` to false.

So I set the specified config to false, which turned this ERROR message into a WARN message. However, the resulting tables are empty. I am aware this is most likely due to the correctness issue mentioned in the warning above, but I don't have any idea how I could fix/avoid the issue, while still accomplishing my goal.

I tried many different combinations of

  1. Watermarking period (currently 60 seconds for both dataframes)
  2. Join condition period (currently 30 seconds: error_timestamp <= timestamp + interval 30 seconds)
  3. Windowing period (currently 30 seconds length, with a 30 seconds interval)

NOTE: Not every batch gives an empty result. The starting timestamp is 2022-01-31T22:00:00. There are thousands of span logs between 22:00:00 and 22:05:00, but only 9 error logs, with all 9 of them between 22:04:22 and 22:04:43. Thus, they fall into two 30s time windows: 22:04:00-22:04:30 and 22:04:30-22:05:00. The batches that are processing this part of the dataset actually result in a seemingly successful aggregation:

-------------------------------------------------
Batch: 26
-------------------------------------------------
+-----------------------+------------------------+-------...
|start                  |name                    |dmin   ...
+-----------------------+------------------------+-------...
|2022-01-30 22:04:00    |<one of the span names> |3251.0 ...
+-----------------------+------------------------+-------...

I find this result surprising - using left outer join should ensure that all rows from the left table are kept, possibly being extended by NULL columns if no match is found in the right table. However, I get non-empty results only when there is a match in the right table.

Any help will be appreciated! If I forgot to state some relevant information, please do tell and I'll add it ASAP - I am not exactly accustomed to writing SO questions, so please keep that in mind.

Related questions

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文