Convert many files in a directory to BLOBs in DB2
I'm rather new to this. I need to take all the files (.csv and .xlsx), e.g. Supermarket.xlsx, sales.csv and marketing.xlsx, that are uploaded into a directory and insert them as BLOB data into DB2. The table is SB_DATA_BLOB_TEST, with the fields "data_column", "ingestion_date_time", "ingestion_file_name" and "row_id".
I have only managed to insert one file, specifying the timestamp, filename and row_id by hand. How can I apply the same function to the list of files uploaded to that directory, filling in the timestamp, filename and row_id for each one without entering the row_id manually?
The code:
import os
import pandas as pd
from subprocess import Popen, PIPE, run
import jaydebeapi
from project_lib import Project

constants = {
    'INPUT_DIR': '/project_data/data_asset/'
}
file_names = {
    'Supermart': 'Supermart.xlsx'
}
schema_name = 'ABC.'
table_prefix = 'SB_'
timestamp = pd.Timestamp.now("Asia/Singapore").strftime("%Y%m%d %H%M%S")
file = constants['INPUT_DIR'] + file_names['Supermart']  ## data
filename = constants['INPUT_DIR'] + file_names['Supermart']  ## ingestion_file_name

def convertToBinaryData(filename):
    # Read the file's raw bytes
    with open(filename, 'rb') as file:
        binaryData = file.read()
    return binaryData

def insertBLOB(data, ingestion_datetime, ingestion_filename, row_id):
    print("Inserting BLOB into ABC.SB_DATA_BLOB_TEST")
    # Initialise both handles so the finally block is safe if connecting fails
    curs = None
    abc_sb_connection = None
    try:
        project = Project.access()
        abc_sb_credentials = project.get_connection(name="abc_sb")
        # Do not print the credentials: they include the password
        abc_sb_connection = jaydebeapi.connect(
            'com.ibm.db2.jcc.DB2Driver',
            '{}://{}:{}/{}:user={};password={};'.format(
                'jdbc:db2',
                abc_sb_credentials['host'],
                abc_sb_credentials['port'],
                abc_sb_credentials['database'],
                abc_sb_credentials['username'],
                abc_sb_credentials['password']))
        curs = abc_sb_connection.cursor()
        sql_insert_blob_query = """ INSERT INTO ABC.SB_DATA_BLOB_TEST
        (data_column, ingestion_date_time, ingestion_file_name, row_id) VALUES (?,?,?,?)"""
        # data is the path of the file to load
        file = convertToBinaryData(data)
        # Convert data into tuple format
        insert_blob_tuple = (jaydebeapi.Binary(file), ingestion_datetime, ingestion_filename, row_id)
        result = curs.execute(sql_insert_blob_query, insert_blob_tuple)
        abc_sb_connection.commit()
        print("File is inserted successfully as a BLOB into SB_DATA_BLOB_TEST", result)
    except Exception as error:
        print(f"Failed inserting BLOB data into DB2 table SB_DATA_BLOB_TEST: {error}")
    finally:
        # Close only what was actually opened
        if curs is not None:
            curs.close()
        if abc_sb_connection is not None:
            abc_sb_connection.close()
            print("DB2 connection is closed")

insertBLOB(file, timestamp, filename, '2')
List all those files in your directory.
Suppose the directory is given by constants['INPUT_DIR'] and the filenames should be filtered by the extensions .xlsx and .csv. Then use Python's glob module and its glob function.
Suppose your directory contains:
Supermart.xlsx
Supermart.csv
then the list will contain only those names:
['Supermart.xlsx', 'Supermart.csv']
Note that glob.glob returns each match with the directory part of the pattern included, so use os.path.basename if you only want the bare file name.
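To make that concrete, here is a minimal sketch of the listing step plus a running row_id. The helper names list_input_files and build_rows are made up for illustration, and so is the choice to number rows from 1 and pass row_id as a string (the question passes '2'), so adjust both to match your table.

```python
import glob
import os


def list_input_files(input_dir, extensions=(".xlsx", ".csv")):
    """Return the sorted paths of files in input_dir that have one of the extensions."""
    paths = []
    for ext in extensions:
        # glob.glob returns the matches with the directory prefix included
        paths.extend(glob.glob(os.path.join(input_dir, "*" + ext)))
    return sorted(paths)


def build_rows(paths, timestamp, start_row_id=1):
    """Yield (path, timestamp, file name, row_id) for each path.

    row_id is a simple running counter (an assumption, not a DB2 feature),
    converted to a string because the question passes it as one.
    """
    for row_id, path in enumerate(paths, start=start_row_id):
        yield path, timestamp, os.path.basename(path), str(row_id)
```

With the question's own insertBLOB and timestamp in scope, the loop would then look something like this:

```python
for path, ts, name, row_id in build_rows(list_input_files(constants['INPUT_DIR']), timestamp):
    insertBLOB(path, ts, name, row_id)
```

This opens a new connection per file because insertBLOB does; for many files you would probably want to open the connection once and reuse it.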
See also: