pyspark用户定义的函数中的访问对象从外部范围中,避免使用腌制物:无法序列化对象

发布于 2025-01-26 04:02:13 字数 2556 浏览 3 评论 0原文

如何避免在pyspark用户定义的功能中初始化类?这是一个例子。

创建一个Spark会话和代表四个纬度和纵向的数据框架。

import pandas as pd
from pyspark import SparkConf
from pyspark.sql import SparkSession

conf = SparkConf()
conf.set('spark.sql.execution.arrow.pyspark.enabled', 'true')
spark = SparkSession.builder.config(conf=conf).getOrCreate()

sdf = spark.createDataFrame(pd.DataFrame({
    'lat': [37, 42, 35, -22],
    'lng': [-113, -107, 127, 34]}))

这是Spark DataFrame

+---+----+
|lat| lng|
+---+----+
| 37|-113|
| 42|-107|
| 35| 127|
|-22|  34|
+---+----+

通过TimeZoneFinder < / code>软件包,在每个纬度 /经度上使用一个时区字符串丰富数据框。 代码在没有错误的情况下运行

from typing import Iterator
from timezonefinder import TimezoneFinder

def func(iterator: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    for dx in iterator:
        tzf = TimezoneFinder()
        dx['timezone'] = [tzf.timezone_at(lng=a, lat=b) for a, b in zip(dx['lng'], dx['lat'])]
        yield dx
pdf = sdf.mapInPandas(func, schema='lat double, lng double, timezone string').toPandas()

上述代码在没有错误的情况下运行,并在下面创建PANDAS DATAFRAME。问题是timezonefinder类是在用户定义的函数中初始化的,

In [4]: pdf
Out[4]:
    lat    lng         timezone
0  37.0 -113.0  America/Phoenix
1  42.0 -107.0   America/Denver
2  35.0  127.0       Asia/Seoul
3 -22.0   34.0    Africa/Maputo

该函数创建瓶颈是如何使该代码更像以下面的方式运行,其中timezonefinder class类在用户定义的函数中初始化一次。如今,下面的代码生成此错误picklingError:无法序列化对象:typeError:cockle'_io.bufferedReader'对象

def func(iterator: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    for dx in iterator:
        dx['timezone'] = [tzf.timezone_at(lng=a, lat=b) for a, b in zip(dx['lng'], dx['lat'])]
        yield dx
tzf = TimezoneFinder()
pdf = sdf.mapInPandas(func, schema='lat double, lng double, timezone string').toPandas()

update-还尝试使用functions.partials.partials.partials.partial和code> and code> and Code>和外部功能,但仍会收到相同的错误。也就是说,这种方法无效:

def outer(iterator, tzf):
    def func(iterator: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
        for dx in iterator:
            dx['timezone'] = [tzf.timezone_at(lng=a, lat=b) for a, b in zip(dx['lng'], dx['lat'])]
            yield dx
    return func(iterator)
tzf = TimezoneFinder()
outer = partial(outer, tzf=tzf)
pdf = sdf.mapInPandas(outer, schema='lat double, lng double, timezone string').toPandas()

How do I avoid initializing a class within a pyspark user-defined function? Here is an example.

Creating a spark session and DataFrame representing four latitudes and longitudes.

import pandas as pd
from pyspark import SparkConf
from pyspark.sql import SparkSession

conf = SparkConf()
conf.set('spark.sql.execution.arrow.pyspark.enabled', 'true')
spark = SparkSession.builder.config(conf=conf).getOrCreate()

sdf = spark.createDataFrame(pd.DataFrame({
    'lat': [37, 42, 35, -22],
    'lng': [-113, -107, 127, 34]}))

Here is the Spark DataFrame

+---+----+
|lat| lng|
+---+----+
| 37|-113|
| 42|-107|
| 35| 127|
|-22|  34|
+---+----+

Enriching the DataFrame with a timezone string at each latitude / longitude via the timezonefinder package. Code below runs without errors

from typing import Iterator
from timezonefinder import TimezoneFinder

def func(iterator: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    for dx in iterator:
        tzf = TimezoneFinder()
        dx['timezone'] = [tzf.timezone_at(lng=a, lat=b) for a, b in zip(dx['lng'], dx['lat'])]
        yield dx
pdf = sdf.mapInPandas(func, schema='lat double, lng double, timezone string').toPandas()

The above code runs without errors and creates the pandas DataFrame below. The issue is the TimezoneFinder class is initialized within the user-defined function which creates a bottleneck

In [4]: pdf
Out[4]:
    lat    lng         timezone
0  37.0 -113.0  America/Phoenix
1  42.0 -107.0   America/Denver
2  35.0  127.0       Asia/Seoul
3 -22.0   34.0    Africa/Maputo

The question is how to get this code to run more like below, where the TimezoneFinder class is initialized once and outside of the user-defined function. As is, the code below generates this error PicklingError: Could not serialize object: TypeError: cannot pickle '_io.BufferedReader' object

def func(iterator: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    for dx in iterator:
        dx['timezone'] = [tzf.timezone_at(lng=a, lat=b) for a, b in zip(dx['lng'], dx['lat'])]
        yield dx
tzf = TimezoneFinder()
pdf = sdf.mapInPandas(func, schema='lat double, lng double, timezone string').toPandas()

UPDATE - Also tried to use functools.partial and an outer function but still received same error. That is, this approach does not work:

def outer(iterator, tzf):
    def func(iterator: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
        for dx in iterator:
            dx['timezone'] = [tzf.timezone_at(lng=a, lat=b) for a, b in zip(dx['lng'], dx['lat'])]
            yield dx
    return func(iterator)
tzf = TimezoneFinder()
outer = partial(outer, tzf=tzf)
pdf = sdf.mapInPandas(outer, schema='lat double, lng double, timezone string').toPandas()

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

海风掠过北极光 2025-02-02 04:02:13

您将需要每个工人的对象的缓存实例。
您可以做到这一点

instance = [None]

def func(iterator: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    if instance[0] is None:
        instance[0] = TimezoneFinder()
    tzf = instance[0]
    for dx in iterator:
        dx['timezone'] = [tzf.timezone_at(lng=a, lat=b) for a, b in zip(dx['lng'], dx['lat'])]
        yield dx

,请注意,为此,将在模块中定义您的函数,以使实例缓存在某个地方进行实时。否则,您必须将其悬挂在一些内置模块中,例如os.instance = []

You will need a cached instance of the object on every worker.
You could do that as follows

instance = [None]

def func(iterator: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    if instance[0] is None:
        instance[0] = TimezoneFinder()
    tzf = instance[0]
    for dx in iterator:
        dx['timezone'] = [tzf.timezone_at(lng=a, lat=b) for a, b in zip(dx['lng'], dx['lat'])]
        yield dx

Note that for this to work, your function would be defined within a module, to give the instance cache somewhere to live. Else you would have to hang it off some builtin module, e.g., os.instance = [].

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文