如何使用位或位和功能在两个列条件下在两个列条件下更新Pyspark DataFrame列值?
我需要在 flag ,包含许多标志,每个标志都是
2^n
int 编号,添加) > pyspark dataframe在两个条件下,即列(age)
value> = 65 和列 flag> flag
不包含新的标志值通过位或位和函数检查的标志值:( flag
& newflag
)== 0
我已经使用示例dataframe和python脚本演示了我的工作在下面查看),但遇到了一个错误消息。
错误消息是:AnalySisexception:无法解析'( flag
和2)'由于数据类型不匹配:'( flag
和2)'需要布尔式类型,而不是int;
from pyspark.sql.types import StructType,StructField, StringType, IntegerType`
from pyspark.sql.functions import *
# create a data frame with two columns: Age and Flag and three rows
data = [
(61,0),
(65,1),
(66,10) #previous inserted Flag 2 and 8, add up to 10, Flag is 2^n
]
schema = StructType([ \
StructField("Age",IntegerType(), True), \
StructField("Flag",IntegerType(), True) \
])
df = spark.createDataFrame(data=data,schema=schema)
#df.printSchema()
df.show(truncate=False)
N_FLAG_AGE65=2
new_column = when(
(col("Age") >= 65) & ((col("Flag") & lit(N_FLAG_AGE65) == 0)),
col("Flag")+N_FLAG_AGE65
).otherwise(col("Flag"))
df = df.withColumn("Flag", new_column)
df.show(truncate=False)
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
构建输入源DF后,
df.show(truncate = false)的第一行显示行应该是
我的更新算法,是检查两个列(年龄和标志),如果
age> = 65
和标志位函数不包含n_flag_age65,我们通过flag = flag = flag+n_flag_age65
更新标志字段。因此,我认为预期的结果应该是“ new_column”有条件表达式的原始语法无法与
df = df.withcolumn(“ flag”,new_column)
我进行了语法更改,现在,对于以下代码,添加新常数
lit(n_flag_age65)
natercolumn(flag65_exp)
and usedexpr(“ age> = 65 and flag&amp) ; lit(n_flag_age65)= 0
然后flag+lit(n_flag_age65)else flag end')
indf.withColumn )
after input source df is constructed, the first display line of
df.show(truncate=False)
should beMy updating algorithm is to check both columns (Age and Flag), if
age >=65
and Flag bit function does not contain N_FLAG_AGE65, we update Flag field byFlag = Flag+N_FLAG_AGE65
. Thus, the expected result should beI think that the original syntax of "new_column" conditional expression won't work with
df = df.withColumn("Flag", new_column)
I did syntax change, it works now for the following code by adding a new constant
lit(N_FLAG_AGE65)
calledcolumn(Flag65_exp)
and usedexpr("case when Age>=65 and Flag & lit(N_FLAG_AGE65)=0
thenFlag+lit(N_FLAG_AGE65) Else Flag End")
indf.withColumn("Flag",expr("..."))