Trying to use a John Snow Labs pretrained pipeline on a Spark DataFrame, but unable to read Delta files in the same session
I am using the code below to read a Spark DataFrame from HDFS:
from delta import *
from pyspark.sql import SparkSession

builder = SparkSession.builder.appName("MyApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

# change the file path here
delta_df = spark.read.format("delta").load('hdfs://localhost:9000/final_project/data/2022-03-30/')
delta_df.show(10, truncate=False)
and the code below to use the pretrained pipeline:
from sparknlp.pretrained import PipelineModel
from pyspark.sql import SparkSession
import sparknlp

# Spark session, one way
spark = SparkSession.builder \
    .appName("Spark NLP") \
    .master("local[4]") \
    .config("spark.driver.memory", "16G") \
    .config("spark.driver.maxResultSize", "0") \
    .config("spark.kryoserializer.buffer.max", "2000M") \
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp-spark32_2.12:3.4.2") \
    .getOrCreate()

# alternate way, uncomment below to use
# spark = sparknlp.start(spark32=True)

# unzip the file and change the path here
pipeline = PipelineModel.load("/home/sidd0613/final_project/classifierdl_bertwiki_finance_sentiment_pipeline_en_3.3.0_2.4_1636617651675")

print("-------")

# creating a Spark DataFrame from the sentence
df = spark.createDataFrame([["As interest rates have increased, housing rents have also increased."]]).toDF('text')

# passing the DataFrame to the pipeline to derive sentiment
result = pipeline.transform(df)

# printing the result
print(result)
print("DONE!!!")
I wish to merge these two pieces of code, but the two Spark sessions do not merge and cannot handle both tasks together. Please help!
I tried merging the .config() options of both Spark sessions into one builder, roughly as sketched below, and it did not work.
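Roughly, the merged builder looked like this (a reconstruction pieced together from the two snippets above; the exact options I combined may have differed slightly):

from delta import configure_spark_with_delta_pip
from pyspark.sql import SparkSession

builder = SparkSession.builder.appName("MyApp") \
    .master("local[4]") \
    .config("spark.driver.memory", "16G") \
    .config("spark.driver.maxResultSize", "0") \
    .config("spark.kryoserializer.buffer.max", "2000M") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

# adding the Spark NLP package on top of configure_spark_with_delta_pip(builder)
spark = configure_spark_with_delta_pip(builder) \
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp-spark32_2.12:3.4.2") \
    .getOrCreate()

# this did not work: the Delta table still could not be read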
I also tried creating two separate Spark sessions, and that did not work either.
A normal Spark session is enough to read files in other formats, but to read a Delta file I have to use configure_spark_with_delta_pip(builder).
Is there any way to bypass this, or to make the code run?
1 Answer
configure_spark_with_delta_pip is just a shortcut to set up the correct parameters of the SparkSession. If you look into its source code, you'll see that everything it does is configure spark.jars.packages.
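Roughly, it boils down to something like this (a paraphrase of the delta-spark 1.x helper, not the verbatim source; the version string is looked up from the installed delta-spark package):

# paraphrased sketch of configure_spark_with_delta_pip (delta-spark 1.x)
def configure_spark_with_delta_pip(spark_session_builder):
    import importlib_metadata
    # find the installed delta-spark version and point spark.jars.packages
    # at the matching delta-core Maven artifact
    delta_version = importlib_metadata.version("delta_spark")
    maven_artifact = f"io.delta:delta-core_2.12:{delta_version}"
    return spark_session_builder.config("spark.jars.packages", maven_artifact)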
But because you're setting spark.jars.packages separately for Spark NLP, you're overwriting Delta's value.
Update 14.04.2022: this wasn't released at the time of the answer, but it is available in version 1.2.0.
To handle such situations, configure_spark_with_delta_pip has an additional parameter, extra_packages, to specify additional packages to be configured. So in your case the code should look as follows:
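A minimal sketch of that combined builder (the driver and serializer settings are carried over from the question; check that the Spark NLP coordinate matches your Spark and Scala versions):

from delta import configure_spark_with_delta_pip
from pyspark.sql import SparkSession

builder = SparkSession.builder.appName("MyApp") \
    .master("local[4]") \
    .config("spark.driver.memory", "16G") \
    .config("spark.driver.maxResultSize", "0") \
    .config("spark.kryoserializer.buffer.max", "2000M") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

# requires delta-spark >= 1.2.0: extra_packages appends the Spark NLP jar
# to spark.jars.packages instead of overwriting the Delta entry
spark = configure_spark_with_delta_pip(
    builder,
    extra_packages=["com.johnsnowlabs.nlp:spark-nlp-spark32_2.12:3.4.2"]
).getOrCreate()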
Before the release with that extra parameter, you need to avoid using that function and simply configure all parameters yourself, like this:
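A sketch of that manual configuration, assuming Spark 3.2 with delta-core 1.1.0 and Spark NLP 3.4.2 (match the delta-core version to the delta-spark release you have installed):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("MyApp") \
    .master("local[4]") \
    .config("spark.driver.memory", "16G") \
    .config("spark.driver.maxResultSize", "0") \
    .config("spark.kryoserializer.buffer.max", "2000M") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:1.1.0,com.johnsnowlabs.nlp:spark-nlp-spark32_2.12:3.4.2") \
    .getOrCreate()

With a single session configured this way, the same spark object can both load the Delta table from HDFS and run the pretrained pipeline's transform() on it.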