Perform preprocessing operations from pandas on a Spark dataframe

Posted 2025-01-30 02:59:36


I have a rather large CSV, so I am using AWS EMR to read the data into a Spark dataframe to perform some operations. I have a pandas function that does some simple preprocessing:

import numpy as np

def clean_census_data(df):
    """
    This function cleans the dataframe and drops columns in which at least
    70% of the values are NaN.
    """
    # Replace the string 'None' with np.nan
    df = df.replace('None', np.nan)
    # Replace the sentinel value with np.nan
    df = df.replace(-666666666.0, np.nan)

    # Keep only columns in which fewer than 70% of the values are NaN
    df = df.loc[:, df.isnull().mean() < .7]

    return df

I want to apply this function to a Spark dataframe, but the APIs are not the same. I am not familiar with Spark, and it is not obvious to me how to perform these rather simple pandas operations in Spark. I know I can convert a Spark dataframe into pandas, but that does not seem very efficient.
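
For reference, a rough, untested sketch of the conversion-free route through the pandas API on Spark; this assumes Spark 3.2+ (where pyspark.pandas is available), uses sdf as a stand-in for the Spark DataFrame read from the CSV, and omits the string-'None' replacement, which would work the same way:

import numpy as np

# sdf is a stand-in for the Spark DataFrame read from the CSV
psdf = sdf.pandas_api()                        # distributed, pandas-like frame (Spark 3.2+)
psdf = psdf.replace(-666666666.0, np.nan)      # sentinel -> missing

# Keep only columns with fewer than 70% missing values
null_frac = psdf.isnull().mean().to_pandas()   # tiny result: one number per column
cleaned = psdf[[c for c in psdf.columns if null_frac[c] < 0.7]].to_spark()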

2 Answers

冰魂雪魄 2025-02-06 02:59:36


First answer, so please be kind. This function should work with pyspark dataframes instead of pandas dataframes, and should give you similar results:

from pyspark.sql.functions import col, count, isnan, when

def clean_census_data(df):
    """
    This function cleans the dataframe and drops columns in which the share of
    NaN, null, or zero values is too high (more than 70% of the non-null count).
    """
    # Replace the string 'None' with null
    df = df.replace('None', None)

    # Replace the sentinel value with null
    df = df.replace(-666666666.0, None)

    # For each column: (rows that are NaN, null, or 0) / (non-null rows) > 0.7
    selection_dict = df.select([
        (count(when(isnan(c) | col(c).isNull() | (col(c).cast('int') == 0), c))
         / count(c) > .7).alias(c)
        for c in df.columns
    ]).first().asDict()
    columns_to_remove = [name for name, is_selected in selection_dict.items() if is_selected]
    df = df.drop(*columns_to_remove)

    return df

Attention: the resulting dataframe contains None (Spark null) instead of np.nan.
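
A minimal usage sketch with made-up data (the column names, the example values, and the SparkSession setup are assumptions added for illustration, not part of the question):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

raw = spark.createDataFrame(
    [(1.0, -666666666.0),
     (2.0, -666666666.0),
     (3.0, -666666666.0),
     (4.0, -666666666.0),
     (5.0, 7.0)],
    ['keep_me', 'mostly_junk'])

clean_census_data(raw).show()
# 'mostly_junk' is dropped: after the sentinel is replaced with null,
# 4 of its 5 values are null, which is above the 70% threshold.
# +-------+
# |keep_me|
# +-------+
# |    1.0|
# |    2.0|
# |    3.0|
# |    4.0|
# |    5.0|
# +-------+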

扭转时空 2025-02-06 02:59:36


Native Spark functions can perform this kind of aggregation for every column.
The following dataframe contains, for each column, the fraction of values that are null, NaN, or zero.

df2 = df1.select(
    [(F.count(F.when(F.isnan(c) | F.isnull(c) | (F.col(c) == 0), c))
     / F.count(F.lit(1))).alias(c) 
     for c in df1.columns]
)

A full example:

from pyspark.sql import functions as F
df1 = spark.createDataFrame(
    [(1000, 0, None),
     (None, 2, None),
     (None, 3, 2222),
     (None, 4, 2233),
     (None, 5, 2244)],
    ['c1', 'c2', 'c3'])

df2 = df1.select(
    [(F.count(F.when(F.isnan(c) | F.isnull(c) | (F.col(c) == 0), c))
     / F.count(F.lit(1))).alias(c) 
     for c in df1.columns]
)
df2.show()
# +---+---+---+
# | c1| c2| c3|
# +---+---+---+
# |0.8|0.2|0.4|
# +---+---+---+

What remains is just selecting the columns from df1:

df = df1.select([c for c in df1.columns if df2.head()[c] < .7])
df.show()
# +---+----+
# | c2|  c3|
# +---+----+
# |  0|null|
# |  2|null|
# |  3|2222|
# |  4|2233|
# |  5|2244|
# +---+----+

The fraction is calculated based on this condition; change it according to your needs:
F.isnan(c) | F.isnull(c) | (F.col(c) == 0)
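
For example, a variant of the condition that also treats the sentinel value from the question as missing (a sketch; -666666666.0 is taken from the original pandas function) would be:

F.isnan(c) | F.isnull(c) | (F.col(c) == 0) | (F.col(c) == -666666666.0)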

This would replace None with np.nan:
df.fillna(np.nan)

This would replace specified value with np.nan:
df.replace(-666666666, np.nan)
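
Putting the two parts together, a sketch of a Spark-native equivalent of the original pandas function could look like the following; the function name, the threshold parameter, and the sentinel default are illustrative assumptions, and the condition is the same one used above:

from pyspark.sql import functions as F

def clean_census_data_spark(df, threshold=0.7, sentinel=-666666666.0):
    """
    Sketch: replace the sentinel value with null, then keep only the columns
    whose share of null, NaN, or zero values is below the threshold.
    """
    # Replace the sentinel value with null (affects numeric columns)
    df = df.replace(sentinel, None)

    # Share of null, NaN, or zero values per column
    shares = df.select([
        (F.count(F.when(F.isnan(c) | F.isnull(c) | (F.col(c) == 0), c))
         / F.count(F.lit(1))).alias(c)
        for c in df.columns
    ]).first().asDict()

    return df.select([c for c in df.columns if shares[c] < threshold])

Applied to df1 from the example above, this keeps c2 and c3 and drops c1 (80% of its values are null).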
