PySpark DataFrame writes corrupted data to a Hive table
I am trying to read a CSV file and load the data into a Hive external table, but the Spark DataFrame is adding bad data to the table. Please help me find a solution.
CREATE EXTERNAL TABLE IF NOT EXISTS customer_360.3P_BRIDG_SITE_test (
SID STRING,
NAME STRING,
STREET_ADDRESS_1 STRING,
STREET_ADDRESS_2 STRING,
CITY STRING,
STATE STRING,
POSTAL_CODE STRING,
COUNTRY STRING,
IS_ACTIVE BOOLEAN,
CREATED_AT TIMESTAMP,
UPDATED_AT TIMESTAMP,
STORE_ID STRING,
SITE_ID BIGINT,
SITE_TIMEZONE STRING,
IS_AGENT_ACTIVE BOOLEAN
)
COMMENT 'BRIDG SITE VIEW DATA'
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES('separatorChar' = '|','quoteChar'='"')
STORED AS TEXTFILE
TBLPROPERTIES ('external'='true',
'skip.header.line.count'='1',
'external.table.purge'='true');
I have placed the CSV file in the HDFS location where the external table was created.
Input CSV file, with fields enclosed in double quotes:
"SID"|"NAME"|"STREET_ADDRESS_1"|"STREET_ADDRESS_2"|"CITY"|"STATE"|"POSTAL_CODE"|"COUNTRY"|"IS_ACTIVE"|"CREATED_AT"|"UPDATED_AT"|"STORE_ID"|"SITE_ID"|"SITE_TIMEZONE"|"IS_AGENT_ACTIVE"
"lowes3"|"3"|"2888 BRICE ROAD"|""|"COLUMBUS"|"OH"|"43232"|"US"|"true"|"2021-12-14 07:48:32.094586"|"2021-12-14 07:48:32.094586"|"3"|-2697538329879565818|"America/New_York"|"true"
"lowes4"|"4"|"2700 RAINIER AVENUE SOUTH"|""|"SEATTLE"|"WA"|"98144"|"US"|"true"|"2021-12-14 07:48:32.094586"|"2021-12-14 07:48:32.094586"|"4"|-1298267539761959388|"America/Los_Angeles"|"true"
import pyspark.sql.functions as f
import logging
import datetime
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, BooleanType, TimestampType, IntegerType
from pyspark_llap import HiveWarehouseSession

logging.basicConfig(level=logging.INFO)

spark = (SparkSession
         .builder
         .appName('load site table')
         .config("spark.sql.catalogImplementation", "hive")
         .enableHiveSupport()
         .getOrCreate())
hive = HiveWarehouseSession.session(spark).build()
spark.sparkContext.setLogLevel("ERROR")

if __name__ == "__main__":
    try:
        logging.info('running application')
        df = (spark.read.format("csv")
              .option("header", True)
              .option("delimiter", '|')
              .option("inferSchema", True)
              .option("quote", '"')
              .load("/xxx/customer_360.db/SITE_test/x.csv"))
        df.printSchema()
        # df = df.withColumn("INPUT_CREATED_DATE", f.to_timestamp(f.col('INPUT_CREATED_DATE'), "yyyy-MM-dd HH:mm:ss"))
        df = df.withColumn('CREATED_AT', f.to_timestamp(f.col('CREATED_AT'), "yyyy-MM-dd HH:mm:ss"))
        df = df.withColumn('UPDATED_AT', f.to_timestamp(f.col('UPDATED_AT'), "yyyy-MM-dd HH:mm:ss"))
        df = df.withColumn('SITE_ID', f.col('SITE_ID').cast("integer"))
        df.printSchema()
        final_df = df.select(
            f.col('SID'),
            f.col('NAME'),
            f.col('STREET_ADDRESS_1'),
            f.col('STREET_ADDRESS_2'),
            f.col('CITY'),
            f.col('STATE'),
            f.col('POSTAL_CODE'),
            f.col('COUNTRY'),
            f.col('IS_ACTIVE'),
            f.col('CREATED_AT'),
            f.col('UPDATED_AT'),
            f.col('STORE_ID'),
            f.col('SITE_ID'),
            f.col('SITE_TIMEZONE'),
            f.col('IS_AGENT_ACTIVE'))
        final_df.write.format(HiveWarehouseSession().HIVE_WAREHOUSE_CONNECTOR) \
            .mode("append").option("table", 'customer_360.SITE_test').save()
    except Exception as ex:
        logging.info("Error occurred while loading site data")
        logging.info(str(ex))
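One detail worth flagging in the script above (an observation about the cast, not necessarily the cause of the garbage in the query output): the table declares SITE_ID as BIGINT, but the job casts it to "integer" (32-bit). The sample SITE_ID values need 64 bits, so that cast cannot preserve them; depending on Spark version and ANSI settings, such values either wrap or come back NULL. A plain-Python range check:

```python
# 32-bit signed integer range, as used by Spark's "integer" type.
INT32_MIN, INT32_MAX = -2**31, 2**31 - 1

# The SITE_ID values from the sample rows are far outside that range.
for v in (-2697538329879565818, -1298267539761959388):
    print(INT32_MIN <= v <= INT32_MAX)  # False for both
```

Casting to "long" (or leaving the inferred long type alone) would match the BIGINT column.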
The Spark job loaded the data into the table successfully, and below is the log:
INFO:root:running application
root
|-- SID: string (nullable = true)
|-- NAME: integer (nullable = true)
|-- STREET_ADDRESS_1: string (nullable = true)
|-- STREET_ADDRESS_2: string (nullable = true)
|-- CITY: string (nullable = true)
|-- STATE: string (nullable = true)
|-- POSTAL_CODE: integer (nullable = true)
|-- COUNTRY: string (nullable = true)
|-- IS_ACTIVE: boolean (nullable = true)
|-- CREATED_AT: timestamp (nullable = true)
|-- UPDATED_AT: timestamp (nullable = true)
|-- STORE_ID: integer (nullable = true)
|-- SITE_ID: long (nullable = true)
|-- SITE_TIMEZONE: string (nullable = true)
|-- IS_AGENT_ACTIVE: boolean (nullable = true)
root
|-- SID: string (nullable = true)
|-- NAME: integer (nullable = true)
|-- STREET_ADDRESS_1: string (nullable = true)
|-- STREET_ADDRESS_2: string (nullable = true)
|-- CITY: string (nullable = true)
|-- STATE: string (nullable = true)
|-- POSTAL_CODE: integer (nullable = true)
|-- COUNTRY: string (nullable = true)
|-- IS_ACTIVE: boolean (nullable = true)
|-- CREATED_AT: timestamp (nullable = true)
|-- UPDATED_AT: timestamp (nullable = true)
|-- STORE_ID: integer (nullable = true)
|-- SITE_ID: integer (nullable = true)
|-- SITE_TIMEZONE: string (nullable = true)
|-- IS_AGENT_ACTIVE: boolean (nullable = true)
Upon querying the table, select * from site.test returns the output below:
? | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL |
| P?!E?7?%7?-Do?A??gr???`
-???Y????????????????g????????_??????>?????6?? | ??n???????y/:???]y?Z??ai????^Vn???????;???;< ???e??<???g
?=???S]?}?o?????V?z?h????s?o=??????@>???pkM??z?B????4
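One plausible explanation for output like this (an assumption, not something confirmed by the logs above): the Hive Warehouse Connector typically writes files in the warehouse's native format such as ORC, while this external table's OpenCSVSerde expects plain delimited text, so ORC bytes rendered as text would look like exactly this kind of garbage. ORC files begin with the magic bytes "ORC", so inspecting the first bytes of the files in the table directory (e.g. with hdfs dfs -cat piped through head -c 3) would settle it. A tiny sketch of the check:

```python
# ORC files start with the 3-byte magic "ORC". If the files the job wrote
# start with it, the text-based SerDe is reading ORC data as CSV.
def looks_like_orc(first_bytes: bytes) -> bool:
    return first_bytes[:3] == b"ORC"

print(looks_like_orc(b"ORC\x01\x0b"))    # True: an ORC file
print(looks_like_orc(b'"lowes3"|"3"'))   # False: delimited text
```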