PySpark UDF monitoring with Prometheus

Posted 2025-01-24 12:52:11


I am trying to monitor some logic in a UDF using counters.

i.e.

from prometheus_client import Counter
from pyspark.sql.functions import udf

counter = Counter(...).labels("value")

@udf
def do_smthng(col):
    if col:
        counter.labels("not_null").inc()
    else:
        counter.labels("null").inc()
    return col

This is not the real case, but you should get the idea.
I have followed this article:
https://kb.databricks.com/metrics/spark-metrics.html

I have so far tried:

  • Using a global Prometheus counter (failed because the counter's internal Lock is not picklable)
  • Creating a custom source using py4j:

# noinspection PyPep8Naming
class CustomMetrics:
    def __init__(self, sourceName, metricRegistry):
        self.metricRegistry = metricRegistry
        self.sourceName = sourceName

    class Java:
        implements = ["org.apache.spark.metrics.source.Source"]

py_4j_gateway = spark_session.sparkContext._gateway
metric_registry = py_4j_gateway.jvm.com.codahale.metrics.MetricRegistry()
SparkEnv = py_4j_gateway.jvm.org.apache.spark.SparkEnv
custom_metrics_provider = CustomMetrics("spark.ingest.custom", metric_registry)

This failed with the same error.
I also can't get SparkEnv.get.metricsSystem, so I can't register the custom metrics source in any case.
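
For reference, the call I was attempting looks roughly like this (a sketch against Spark internals using the SparkEnv handle from the snippet above; this only touches the driver JVM, not the executors where the UDF runs):

# Sketch only: SparkEnv and MetricsSystem are internal Spark APIs, not a public Python interface
spark_env = SparkEnv.get()                    # driver-side SparkEnv via the py4j gateway
metrics_system = spark_env.metricsSystem()    # org.apache.spark.metrics.MetricsSystem
# Registering the py4j-backed source would then be:
# metrics_system.registerSource(custom_metrics_provider)
# but passing a Python object as a JVM Source also relies on the py4j callback
# server, which a plain PySpark session does not start by default.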

Is there no way for me to access the internal metric registry from Python?
I am starting to wonder how people monitor Spark pipelines with custom metrics.

Spark 3.1.2
Python 3.8 x86
MacBook Pro M1 Pro


Comments (1)

浪荡不羁 2025-01-31 12:52:11


Why don't you use an accumulator? It's made to be accessible and is perfect for counting things. It's a holdover from MapReduce that was used for collecting metrics before Spark was invented.

EDIT: this does not get displayed in the Spark UI. It's clear the issue is known, but it has been closed. I'll leave the rest of the answer here for people who don't use Python, but this does not show up in the Spark UI (it can still be used for other things).
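
A minimal sketch of counting inside a UDF with accumulators (the names null_count, not_null_count and the sample DataFrame are illustrative, not from the question):

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

spark = SparkSession.builder.getOrCreate()

# Accumulators pickle cleanly into the UDF closure and are merged back on the driver.
null_count = spark.sparkContext.accumulator(0)
not_null_count = spark.sparkContext.accumulator(0)

@udf(StringType())
def do_smthng(col):
    if col:
        not_null_count.add(1)
    else:
        null_count.add(1)
    return col

df = spark.createDataFrame([("a",), (None,)], ["value"])
df.select(do_smthng("value")).collect()  # values are only populated after an action runs
print("null:", null_count.value, "not_null:", not_null_count.value)

Note that task retries can double-count, so treat these as approximate operational metrics rather than exact totals.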

Your accumulator can then be exposed as a sink via a PrometheusServlet:

namespace=AccumulatorSource
note: user-configurable sources to attach accumulators to the metric system
(DoubleAccumulatorSource, LongAccumulatorSource)
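
For completeness, a sketch of the conf/metrics.properties lines that enable the PrometheusServlet sink (paths follow the examples in the Spark monitoring docs):

*.sink.prometheusServlet.class=org.apache.spark.metrics.sink.PrometheusServlet
*.sink.prometheusServlet.path=/metrics/prometheus
master.sink.prometheusServlet.path=/metrics/master/prometheus
applications.sink.prometheusServlet.path=/metrics/applications/prometheus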
