PySpark DataFrame writes corrupted data to a Hive table

Posted on 2025-01-22 19:33:15

I am trying to read a CSV file and load the data into a Hive external table, but the Spark DataFrame is writing bad data to the table. Please help me find a solution.

CREATE EXTERNAL TABLE IF NOT EXISTS customer_360.3P_BRIDG_SITE_test (
    SID STRING,
    NAME STRING,
    STREET_ADDRESS_1 STRING,
    STREET_ADDRESS_2 STRING,
    CITY STRING,
    STATE STRING,
    POSTAL_CODE STRING,
    COUNTRY STRING,
    IS_ACTIVE BOOLEAN,
    CREATED_AT TIMESTAMP,
    UPDATED_AT TIMESTAMP,
    STORE_ID STRING,
    SITE_ID BIGINT,
    SITE_TIMEZONE STRING,
    IS_AGENT_ACTIVE BOOLEAN
    )
    COMMENT 'BRIDG SITE VIEW DATA'
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
    WITH SERDEPROPERTIES('separatorChar' = '|','quoteChar'='"')
    STORED AS TEXTFILE
    TBLPROPERTIES ('external'='true',
                    'skip.header.line.count'='1',
                    'external.table.purge'='true');

I have placed the CSV file in the HDFS location where the external table is created.

Input CSV file, with fields enclosed in double quotes:
"SID"|"NAME"|"STREET_ADDRESS_1"|"STREET_ADDRESS_2"|"CITY"|"STATE"|"POSTAL_CODE"|"COUNTRY"|"IS_ACTIVE"|"CREATED_AT"|"UPDATED_AT"|"STORE_ID"|"SITE_ID"|"SITE_TIMEZONE"|"IS_AGENT_ACTIVE"
"lowes3"|"3"|"2888 BRICE ROAD"|""|"COLUMBUS"|"OH"|"43232"|"US"|"true"|"2021-12-14 07:48:32.094586"|"2021-12-14 07:48:32.094586"|"3"|-2697538329879565818|"America/New_York"|"true"
"lowes4"|"4"|"2700 RAINIER AVENUE SOUTH"|""|"SEATTLE"|"WA"|"98144"|"US"|"true"|"2021-12-14 07:48:32.094586"|"2021-12-14 07:48:32.094586"|"4"|-1298267539761959388|"America/Los_Angeles"|"true"

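As a quick sanity check outside Spark, the header and first data row above can be parsed with Python's standard `csv` module, using the same separator and quote character as the table's SERDEPROPERTIES. This is only a sketch: `SAMPLE` is copied from the rows above, the variable names are made up for illustration, and the result says nothing about how Spark or Hive parse the file.

```python
import csv
import io

# Header and first data row, copied from the sample file above.
SAMPLE = (
    '"SID"|"NAME"|"STREET_ADDRESS_1"|"STREET_ADDRESS_2"|"CITY"|"STATE"|'
    '"POSTAL_CODE"|"COUNTRY"|"IS_ACTIVE"|"CREATED_AT"|"UPDATED_AT"|'
    '"STORE_ID"|"SITE_ID"|"SITE_TIMEZONE"|"IS_AGENT_ACTIVE"\n'
    '"lowes3"|"3"|"2888 BRICE ROAD"|""|"COLUMBUS"|"OH"|"43232"|"US"|"true"|'
    '"2021-12-14 07:48:32.094586"|"2021-12-14 07:48:32.094586"|"3"|'
    '-2697538329879565818|"America/New_York"|"true"\n'
)

# Same delimiter and quote character as the table definition.
reader = csv.reader(io.StringIO(SAMPLE), delimiter="|", quotechar='"')
header, row = list(reader)

# Pair each column name with its value for easier inspection.
record = dict(zip(header, row))

print(len(header), len(row))
for name, value in record.items():
    print(f"{name}: {value!r}")
```

If the header and the row both split into the same number of fields, the delimiter and quoting in the file are at least self-consistent, which narrows the problem down to the read, transform, or write steps.
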
import logging

import pyspark.sql.functions as f
from pyspark.sql import SparkSession
from pyspark_llap import HiveWarehouseSession


logging.basicConfig(level=logging.INFO)
spark = (
    SparkSession.builder
    .appName("load site table")
    .config("spark.sql.catalogImplementation", "hive")
    .enableHiveSupport()
    .getOrCreate()
)
hive = HiveWarehouseSession.session(spark).build()
spark.sparkContext.setLogLevel("ERROR")


if __name__ == "__main__":
    try:
        logging.info("running application")
        # Read the pipe-delimited, double-quoted CSV and let Spark infer the column types.
        df = (
            spark.read.format("csv")
            .option("header", "True")
            .option("delimiter", "|")
            .option("inferSchema", "True")
            .option("quote", '"')
            .load("/xxx/customer_360.db/SITE_test/x.csv")
        )
        df.printSchema()
        # Convert the timestamp columns and cast SITE_ID to integer.
        df = df.withColumn("CREATED_AT", f.to_timestamp(f.col("CREATED_AT"), "yyyy-MM-dd HH:mm:ss"))
        df = df.withColumn("UPDATED_AT", f.to_timestamp(f.col("UPDATED_AT"), "yyyy-MM-dd HH:mm:ss"))
        df = df.withColumn("SITE_ID", f.col("SITE_ID").cast("integer"))
        df.printSchema()
        # Select the columns in the same order as the table definition.
        final_df = df.select(
            "SID",
            "NAME",
            "STREET_ADDRESS_1",
            "STREET_ADDRESS_2",
            "CITY",
            "STATE",
            "POSTAL_CODE",
            "COUNTRY",
            "IS_ACTIVE",
            "CREATED_AT",
            "UPDATED_AT",
            "STORE_ID",
            "SITE_ID",
            "SITE_TIMEZONE",
            "IS_AGENT_ACTIVE",
        )
        # Append to the Hive table through the Hive Warehouse Connector.
        (
            final_df.write.format(HiveWarehouseSession().HIVE_WAREHOUSE_CONNECTOR)
            .mode("append")
            .option("table", "customer_360.SITE_test")
            .save()
        )
    except Exception as ex:
        logging.error("Error occurred while loading site data")
        logging.error(str(ex))

The Spark job loaded the data into the table successfully. The log is below:

INFO:root:running application
root
 |-- SID: string (nullable = true)
 |-- NAME: integer (nullable = true)
 |-- STREET_ADDRESS_1: string (nullable = true)
 |-- STREET_ADDRESS_2: string (nullable = true)
 |-- CITY: string (nullable = true)
 |-- STATE: string (nullable = true)
 |-- POSTAL_CODE: integer (nullable = true)
 |-- COUNTRY: string (nullable = true)
 |-- IS_ACTIVE: boolean (nullable = true)
 |-- CREATED_AT: timestamp (nullable = true)
 |-- UPDATED_AT: timestamp (nullable = true)
 |-- STORE_ID: integer (nullable = true)
 |-- SITE_ID: long (nullable = true)
 |-- SITE_TIMEZONE: string (nullable = true)
 |-- IS_AGENT_ACTIVE: boolean (nullable = true)

root
 |-- SID: string (nullable = true)
 |-- NAME: integer (nullable = true)
 |-- STREET_ADDRESS_1: string (nullable = true)
 |-- STREET_ADDRESS_2: string (nullable = true)
 |-- CITY: string (nullable = true)
 |-- STATE: string (nullable = true)
 |-- POSTAL_CODE: integer (nullable = true)
 |-- COUNTRY: string (nullable = true)
 |-- IS_ACTIVE: boolean (nullable = true)
 |-- CREATED_AT: timestamp (nullable = true)
 |-- UPDATED_AT: timestamp (nullable = true)
 |-- STORE_ID: integer (nullable = true)
 |-- SITE_ID: integer (nullable = true)
 |-- SITE_TIMEZONE: string (nullable = true)
 |-- IS_AGENT_ACTIVE: boolean (nullable = true)
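
One thing the log makes visible is that the second schema does not line up with the table definition. The plain-Python sketch below (not Spark code) compares the column types from the CREATE TABLE statement with the types in the second `printSchema()` output, and checks whether the two sample SITE_ID values fit in a signed 32-bit integer. The dictionaries are transcribed by hand from this post, with Hive BIGINT written as `long`, the name `printSchema()` uses. Whether these mismatches have anything to do with the garbled query result is not established here.

```python
# Column types from the CREATE TABLE statement, using printSchema() type names
# (Hive STRING -> string, BOOLEAN -> boolean, TIMESTAMP -> timestamp, BIGINT -> long).
ddl_types = {
    "SID": "string",
    "NAME": "string",
    "STREET_ADDRESS_1": "string",
    "STREET_ADDRESS_2": "string",
    "CITY": "string",
    "STATE": "string",
    "POSTAL_CODE": "string",
    "COUNTRY": "string",
    "IS_ACTIVE": "boolean",
    "CREATED_AT": "timestamp",
    "UPDATED_AT": "timestamp",
    "STORE_ID": "string",
    "SITE_ID": "long",
    "SITE_TIMEZONE": "string",
    "IS_AGENT_ACTIVE": "boolean",
}

# Column types from the second printSchema() output in the log.
logged_types = {
    "SID": "string",
    "NAME": "integer",
    "STREET_ADDRESS_1": "string",
    "STREET_ADDRESS_2": "string",
    "CITY": "string",
    "STATE": "string",
    "POSTAL_CODE": "integer",
    "COUNTRY": "string",
    "IS_ACTIVE": "boolean",
    "CREATED_AT": "timestamp",
    "UPDATED_AT": "timestamp",
    "STORE_ID": "integer",
    "SITE_ID": "integer",
    "SITE_TIMEZONE": "string",
    "IS_AGENT_ACTIVE": "boolean",
}

# Columns whose DataFrame type differs from the table's declared type.
mismatches = {
    col: (ddl_types[col], logged_types[col])
    for col in ddl_types
    if ddl_types[col] != logged_types[col]
}
for col, (declared, actual) in mismatches.items():
    print(f"{col}: table declares {declared}, DataFrame has {actual}")

# The two SITE_ID values from the sample CSV.
site_ids = [-2697538329879565818, -1298267539761959388]
INT32_MIN, INT32_MAX = -2**31, 2**31 - 1
INT64_MIN, INT64_MAX = -2**63, 2**63 - 1
fits_int32 = [INT32_MIN <= v <= INT32_MAX for v in site_ids]
fits_int64 = [INT64_MIN <= v <= INT64_MAX for v in site_ids]
print("fits in 32 bits:", fits_int32)
print("fits in 64 bits:", fits_int64)
```

The comparison flags NAME, POSTAL_CODE, and STORE_ID (inferred as integer, declared as STRING) and SITE_ID (cast to integer, declared as BIGINT). The sample SITE_ID values need 64 bits, so they cannot be represented after a cast to a 32-bit integer.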

Querying the table with select * from site.test returns the output below:

?                                                 | NULL                                               | NULL                                               | NULL                                               | NULL                                | NULL                      | NULL                            | NULL                        | NULL                          | NULL                           | NULL                           | NULL                         | NULL                        | NULL                              | NULL                                |
| P?!E?7?%7?-Do?A??gr???`
                         -???Y????????????????g????????_??????>?????6?? | ??n???????y/:???]y?Z??ai????^Vn???????;???;< ???e??<???g
                                                                                                                                  ?=???S]?}?o?????V?z?h????s?o=??????@>???pkM??z?B????4
                                                                                                                                                                                   
