Teradata driver not available in the Spark context

Published 2025-01-16 05:25:17


I am currently setting up a package where Spark jobs can be tested and debugged locally before they are deployed to Databricks. For this I created a Docker image with all the necessary PySpark libraries, and within the Docker image it is possible to work with PySpark dataframes. It is also possible to get data from databases whose driver can be installed via Maven (I have tested it with the Kusto database).

Now I would like to access data in a Teradata database with PySpark. For Teradata there is no Maven package available, only a .jar file downloaded from the Teradata website. In PySpark you can add such jars with:


    from pyspark import SparkConf
    from pyspark.sql import SparkSession

    spark_conf = SparkConf()
    spark_conf.setAll([
                ("spark.jars", "/usr/app/terajdbc4.jar")
                ])
    
    # here a warning is emitted:
    # 22/03/22 12:56:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    # Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    # Setting default log level to "WARN".
    # To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    
    spark = SparkSession.builder.config(conf=spark_conf).getOrCreate()
    
    # here the error occurs
    spark_df = spark.read.format("jdbc") \
                .option("driver", "com.teradata.jdbc.TeraDriver") \
                .option("url", f"jdbc:teradata://{self.dbhostip}/LOGMECH=LDAP") \
                .option("dbtable", query) \
                .option("user", self.username) \
                .option("password", self.password) \
                .option("fetchsize", 10000) \
                .option("numPartitions", partitions) \
                .load()

However, the `spark.read` call runs into an error:

    Traceback (most recent call last):
      File "/databricks/python3/lib/python3.8/site-packages/pyspark/sql/utils.py", line 111, in deco
        return f(*a, **kw)
      File "/databricks/python3/lib/python3.8/site-packages/py4j/protocol.py", line 326, in get_return_value
        raise Py4JJavaError(
    py4j.protocol.Py4JJavaError: An error occurred while calling o87.load.
    : java.lang.NullPointerException
        at java.util.Hashtable.put(Hashtable.java:460)
        at java.util.Properties.setProperty(Properties.java:166)
        at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$asProperties$1(JDBCOptions.scala:51)
        at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:234)
        at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:468)
        at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:51)
        at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:38)
        at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:32)
        at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:355)
        at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:325)
        at org.apache.spark.sql.DataFrameReader.$anonfun$load$3(DataFrameReader.scala:307)
        at scala.Option.getOrElse(Option.scala:189)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:307)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:225)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
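One detail worth noting in this trace: `java.util.Hashtable.put` throws a `NullPointerException` when the value is `null`, and py4j maps a Python `None` to Java `null`. So a read option whose Python value happens to be `None` (for example `partitions` or a credential that was never set) would produce exactly this failure, independent of the driver jar. A minimal pre-flight check, using option names from the snippet above (a sketch, not part of the original code):

```python
def validate_jdbc_options(options: dict) -> dict:
    """Raise early if any JDBC option value is None, since Spark's
    JDBCOptions copies the values into a java.util.Properties,
    which rejects null values with a NullPointerException."""
    missing = [key for key, value in options.items() if value is None]
    if missing:
        raise ValueError(f"JDBC options must not be None: {missing}")
    return options
```

Calling this on the dict of options before chaining the `.option(...)` calls turns an opaque JVM NPE into a clear Python-side error.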

The same jar is used in Databricks, where it is installed via the Databricks library-installation functionality. There the connection works perfectly fine.
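One difference worth checking locally: Databricks' library installation also places the jar on the driver's classpath, while `spark.jars` on its own does not always make a JDBC driver visible to the driver JVM in local mode. A possible variant of the configuration above (a sketch; the paths are the ones from the question, and whether this resolves the issue is an assumption):

```python
spark_conf = SparkConf()
spark_conf.setAll([
    ("spark.jars", "/usr/app/terajdbc4.jar"),
    # additionally expose the jar to the driver and executor JVMs directly
    ("spark.driver.extraClassPath", "/usr/app/terajdbc4.jar"),
    ("spark.executor.extraClassPath", "/usr/app/terajdbc4.jar"),
])
```

These properties must be set before the `SparkSession` is created, since classpath settings cannot be changed on a running JVM.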

From the Docker container I can also connect to Teradata with plain Python code via a Python package.

From Docker, the connection to Kusto with PySpark is also working fine:


    # Here no warning is emitted (compare to the .jar init above)
    spark = SparkSession.builder.config(
            "spark.jars.packages", "com.microsoft.azure.kusto:kusto-spark_3.0_2.12:2.7.3"
            ).getOrCreate()
    
    kusto_df = spark.read. \
            format("com.microsoft.kusto.spark.datasource"). \
            option("kustoCluster", self.cluster). \
            option("kustoDatabase", self.database). \
            option("kustoQuery", query). \
            option("kustoAadAppId", self.client_id). \
            option("kustoAadAppSecret", self.client_secret). \
            option("kustoAadAuthorityID", self.authority_id). \
            load().repartition(partitions)

Now the big question is: why does the Teradata driver work on Databricks but not locally?
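Before re-running, it can also help to verify that the jar on disk actually contains the driver class Spark is asked to load; a jar is just a zip archive, so the standard `zipfile` module is enough (class and path names taken from the question; this is a diagnostic sketch, not a fix):

```python
import zipfile

def jar_contains_class(jar_path: str, class_name: str) -> bool:
    """Return True if the jar archive contains the given fully
    qualified class, e.g. com.teradata.jdbc.TeraDriver."""
    entry = class_name.replace(".", "/") + ".class"
    with zipfile.ZipFile(jar_path) as jar:
        return entry in jar.namelist()

# usage with the path from the question:
# jar_contains_class("/usr/app/terajdbc4.jar", "com.teradata.jdbc.TeraDriver")
```

If this returns False for the local copy, the jar was truncated or is the wrong artifact, which would explain a difference from the Databricks-installed library.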
