Trying to use a John Snow Labs pretrained pipeline on a Spark DataFrame, but unable to read the Delta file in the same session

Posted on 2025-01-18 16:36:55


I am using the code below to read a Spark DataFrame from HDFS:

from delta import *
from pyspark.sql import SparkSession



builder = SparkSession.builder.appName("MyApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()


# change the file path here

delta_df = spark.read.format("delta").load('hdfs://localhost:9000/final_project/data/2022-03-30/')


delta_df.show(10, truncate=False)

and the code below to run the pretrained pipeline:

from sparknlp.pretrained import PipelineModel
from pyspark.sql import SparkSession
import sparknlp

# one way to create the Spark session
spark = SparkSession.builder \
    .appName("Spark NLP") \
    .master("local[4]") \
    .config("spark.driver.memory", "16G") \
    .config("spark.driver.maxResultSize", "0") \
    .config("spark.kryoserializer.buffer.max", "2000M") \
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp-spark32_2.12:3.4.2") \
    .getOrCreate()


# alternative: uncomment the line below to start the session via sparknlp instead
# spark = sparknlp.start(spark32=True)


# unzip the downloaded pipeline archive and change the path here
pipeline = PipelineModel.load("/home/sidd0613/final_project/classifierdl_bertwiki_finance_sentiment_pipeline_en_3.3.0_2.4_1636617651675")


print("-------")

# create a Spark DataFrame from a single sentence
df = spark.createDataFrame([["As interest rates have increased, housing rents have also increased."]]).toDF('text')

# passing dataframe to the pipeline to derive sentiment
result = pipeline.transform(df)

# print the result
print(result)

print("DONE!!!")

I want to merge these two pieces of code, but the two Spark sessions do not merge and neither one handles both tasks together. Please help!

I tried merging the .config() options of both Spark sessions, and it did not work. I also tried creating two separate Spark sessions, but that did not work either.

A normal Spark session is enough to read files in other formats, but to read a Delta file I have to use configure_spark_with_delta_pip(builder).

Is there any way to bypass this, or to make the combined code run?


1 Comment

小巷里的女流氓 2025-01-25 16:36:55


configure_spark_with_delta_pip is just a shortcut for setting the correct parameters on the SparkSession. If you look into its source code, you'll see that all it does is configure spark.jars.packages. But because you also set spark.jars.packages separately for Spark NLP, you're overwriting Delta's value.
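
For reference, here is a minimal sketch of what that helper roughly does. This is a simplified assumption for illustration, not the actual delta-spark implementation (which also handles things like extra Maven settings):

# Simplified, assumed sketch of configure_spark_with_delta_pip -- the real
# implementation lives in the delta-spark package.
def configure_spark_with_delta_pip_sketch(builder, scala_version="2.12", delta_version="1.1.0"):
    # Essentially it just points spark.jars.packages at the delta-core Maven
    # coordinate, which is why setting spark.jars.packages yourself afterwards
    # (e.g. for Spark NLP) silently overwrites it.
    return builder.config(
        "spark.jars.packages",
        f"io.delta:delta-core_{scala_version}:{delta_version}",
    )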

Update 14.04.2022: this wasn't released at the time the answer was written, but it is available in version 1.2.0 of the delta-spark package.

To handle such situations, configure_spark_with_delta_pip has an additional parameter, extra_packages, for specifying additional packages to be configured. So in your case the code should look like this:

builder = SparkSession.builder.appName("MyApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", 
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.driver.memory","16G")\
    .config("spark.driver.maxResultSize", "0") \
    .config("spark.kryoserializer.buffer.max", "2000M")

my_packages = ["com.johnsnowlabs.nlp:spark-nlp-spark32_2.12:3.4.2"]

spark = configure_spark_with_delta_pip(builder, extra_packages=my_packages) \
    .getOrCreate()
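
With that single session, both tasks from the question should work together. A rough sketch, reusing the paths and the sample sentence from the question (assuming the pipeline archive is already unzipped at that path):

from sparknlp.pretrained import PipelineModel

# read the Delta table from HDFS in the same session
delta_df = spark.read.format("delta").load("hdfs://localhost:9000/final_project/data/2022-03-30/")
delta_df.show(10, truncate=False)

# load the pretrained pipeline and run it on a DataFrame with a 'text' column
pipeline = PipelineModel.load("/home/sidd0613/final_project/classifierdl_bertwiki_finance_sentiment_pipeline_en_3.3.0_2.4_1636617651675")
df = spark.createDataFrame([["As interest rates have increased, housing rents have also increased."]]).toDF("text")
result = pipeline.transform(df)
result.show()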

Until the version with that extra parameter is released, you need to avoid that function and simply configure all packages yourself, like this:

scala_version = "2.12"
delta_version = "1.1.0"
all_packages = ["com.johnsnowlabs.nlp:spark-nlp-spark32_2.12:3.4.2",
                f"io.delta:delta-core_{scala_version}:{delta_version}"]

spark = SparkSession.builder.appName("MyApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", 
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.driver.memory","16G")\
    .config("spark.driver.maxResultSize", "0") \
    .config("spark.kryoserializer.buffer.max", "2000M") \
    .config("spark.jars.packages", ",".join(all_packages)) \
    .getOrCreate()
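
As an optional sanity check, you can print the merged configuration and confirm that both coordinates are there (this only inspects the config value; it does not verify that the jars were actually resolved):

# should list both the spark-nlp and the delta-core coordinates
print(spark.sparkContext.getConf().get("spark.jars.packages"))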