How do I use a bitwise AND / OR function to update a PySpark DataFrame column value under two column conditions?

Posted on 2025-01-21 15:02:30

I need to update a column in a PySpark DataFrame under two conditions. The column (Flag) holds several flags added together, each a power of two (2^n). The update should apply only when the Age column is >= 65 and Flag does not already contain the new flag value, which I check with a bitwise AND: (Flag & newFlag) == 0.

I have demonstrated my attempt with a sample DataFrame and Python script (please see below), but it raises an error.
The error message is: AnalysisException: cannot resolve '(Flag AND 2)' due to data type mismatch: '(Flag AND 2)' requires boolean type, not int;

from pyspark.sql.types import StructType, StructField, IntegerType
from pyspark.sql.functions import col, lit, when

# `spark` is the SparkSession that the notebook provides.
# Create a DataFrame with two columns (Age, Flag) and three rows
data = [
    (61, 0),
    (65, 1),
    (66, 10),  # flags 2 and 8 were set earlier and add up to 10; each flag is 2^n
]
schema = StructType([
    StructField("Age", IntegerType(), True),
    StructField("Flag", IntegerType(), True),
])

df = spark.createDataFrame(data=data, schema=schema)
# df.printSchema()
df.show(truncate=False)

N_FLAG_AGE65 = 2
# The condition below is what raises the AnalysisException
new_column = when(
    (col("Age") >= 65) & ((col("Flag") & lit(N_FLAG_AGE65) == 0)),
    col("Flag") + N_FLAG_AGE65,
).otherwise(col("Flag"))
df = df.withColumn("Flag", new_column)
df.show(truncate=False)
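
For readers unfamiliar with the encoding, the flag scheme described in the question can be sketched in plain Python, without Spark. The helper names set_flags and has_flag are hypothetical and only serve the illustration; they are not part of the original post.

```python
from typing import List


def set_flags(value: int) -> List[int]:
    """List the individual 2^n flags that are set in value."""
    return [1 << n for n in range(value.bit_length()) if value & (1 << n)]


def has_flag(value: int, flag: int) -> bool:
    """True when the given 2^n flag is already contained in value."""
    return (value & flag) != 0


# The third sample row stores Flag = 10, i.e. flags 2 and 8 combined.
flags_in_10 = set_flags(10)
print(flags_in_10)
print(has_flag(10, 2))  # flag 2 is already set in 10
print(has_flag(1, 2))   # flag 2 is not set in 1
```

In plain Python the expression `value & flag == 0` already parses as `(value & flag) == 0`, because `&` binds tighter than `==`, so operator precedence is not what triggers the error above.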


Comments (1)

如果没结果 2025-01-28 15:02:30

After the input DataFrame is constructed, the first df.show(truncate=False) call should display:

+---+----+
|Age|Flag|
+---+----+
|61 |0   |
|65 |1   |
|66 |10  |
+---+----+

My update rule checks both columns (Age and Flag): if Age >= 65 and the bitwise AND of Flag with N_FLAG_AGE65 is 0 (the flag is not yet set), Flag is updated as Flag = Flag + N_FLAG_AGE65. Thus, the expected result should be:

+---+----+
|Age|Flag|
+---+----+
|61 |0   |
|65 |3   |
|66 |10  |
+---+----+
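
The rule above can be checked row by row in plain Python before moving it into Spark. This is only a sketch of the logic; the function name update_flag is hypothetical, while the constant and the sample rows come from the post.

```python
N_FLAG_AGE65 = 2  # same constant as in the post


def update_flag(age: int, flag: int, new_flag: int = N_FLAG_AGE65) -> int:
    """Return flag with new_flag set when age >= 65 and it is not set yet."""
    if age >= 65 and (flag & new_flag) == 0:
        # The bit is known to be clear here, so + and | give the same result.
        return flag + new_flag
    return flag


rows = [(61, 0), (65, 1), (66, 10)]
updated = [(age, update_flag(age, flag)) for age, flag in rows]
print(updated)
```

Only the second row changes (1 becomes 3): the first row fails the age condition, and the third row already contains flag 2.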

I think the original "new_column" conditional expression won't work with df = df.withColumn("Flag", new_column).

After a syntax change, the following code works: I add the constant lit(N_FLAG_AGE65) as a new column (Flag65_exp) and use expr("case when Age >= 65 and (Flag & Flag65_exp) = 0 then Flag + Flag65_exp else Flag end") in df.withColumn("Flag", expr("...")).

%python
from pyspark.sql.types import StructType, StructField, IntegerType
from pyspark.sql.functions import expr, lit

# Create a DataFrame with two columns (Age, Flag) and three rows
data = [
    (61, 0),
    (65, 1),
    (66, 10),  # flags 2 and 8 were set earlier and add up to 10; each flag is 2^n
]
schema = StructType([
    StructField("Age", IntegerType(), True),
    StructField("Flag", IntegerType(), True),
])

df = spark.createDataFrame(data=data, schema=schema)
# df.printSchema()
df.show(truncate=False)

N_FLAG_AGE65 = 2
# Expose the constant as a helper column so the SQL string can reference it;
# lit() and Python variables are not visible inside an expr() string.
df = df.withColumn("Flag65_exp", lit(N_FLAG_AGE65))
df = df.withColumn(
    "Flag",
    expr("case when Age >= 65 and (Flag & Flag65_exp) = 0 then Flag + Flag65_exp else Flag end"),
)
df.show(truncate=False)
#source df
+---+----+
|Age|Flag|
+---+----+
|61 |0   |
|65 |1   |
|66 |10  |
+---+----+

#updated df 
+---+----+----------+
|Age|Flag|Flag65_exp|
+---+----+----------+
|61 |0   |2         |
|65 |3   |2         |
|66 |10  |2         |
+---+----+----------+