How to insert data into a SQL database faster from an AWS Glue Python Shell job?
I have large sets of data in S3. In my Python Glue job, I extract data from those files into a pandas data frame, apply the necessary transformations, and then load it into a Microsoft SQL Server database using the pymssql library. The final data frame contains roughly 100-200K rows and 180 columns. Currently I am using pymssql to connect to the database. The problem is that executemany of the cursor class takes too long to load the data: approximately 20 minutes for 100K rows. I checked the logs and the loading step is always the slow part (screenshot attached). How can I load the data faster? I am attaching my code here:
import datetime

import numpy as np
import pandas as pd

# Read the CSV from S3 in chunks and concatenate into a single data frame.
# (s3, S3_BUCKET_NAME, each_file, all_data and db_cursor are set up earlier in the job.)
file = s3.get_object(Bucket=S3_BUCKET_NAME, Key=each_file)
for chunk in pd.read_csv(file['Body'], sep=",", header=None, low_memory=False, chunksize=100000):
    all_data.append(chunk)
data_frame = pd.concat(all_data, axis=0)
all_data.clear()

# Strip whitespace from string columns and turn empty strings into NaN.
cols = data_frame.select_dtypes(object).columns
data_frame[cols] = data_frame[cols].apply(lambda x: x.str.strip())
data_frame.replace(to_replace='', value=np.nan, inplace=True)
data_frame.fillna(value=np.nan, inplace=True)
data_frame.insert(0, 'New-column', 1111)

# Replace NaN with None so pymssql writes NULLs, then build a tuple of row tuples.
sql_data_array = data_frame.replace({np.nan: None}).to_numpy()
sql_data_tuple = tuple(map(tuple, sql_data_array))

try:
    sql = "insert into [db].[schema].[table](column_names)values(%d,%s,%s,%s,%s,%s...)"
    db_cursor.executemany(sql, sql_data_tuple)
    print("loading completed on {}".format(datetime.datetime.now()))
except Exception as e:
    print(e)
Comments (2)
If anyone is looking for another solution, here is my approach.
To force bulk inserts into SQL Server hosted on RDS, I did the following (see the sketch after the list):
- --user-jars-first
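The rest of this answer was not captured on this page, so what follows is only a hedged sketch of how the piece it does mention might be combined with a bulk-capable JDBC connector. The JAR path, the choice of spark-mssql-connector, the connection URL, credentials, and table name are all assumptions for illustration, not details from the original answer.

# Hypothetical Glue job parameters (set on the job definition, not in the script);
# the JAR path and connector choice are assumptions:
#   --extra-jars       s3://my-bucket/jars/spark-mssql-connector.jar
#   --user-jars-first  true

from awsglue.context import GlueContext
from pyspark.context import SparkContext

glue_context = GlueContext(SparkContext.getOrCreate())
spark = glue_context.spark_session

# spark_df is assumed to be a Spark DataFrame holding the already-transformed rows.
(spark_df.write
    .format("com.microsoft.sqlserver.jdbc.spark")   # bulk-insert capable connector
    .mode("append")
    .option("url", "jdbc:sqlserver://<rds-endpoint>:1433;databaseName=db")
    .option("dbtable", "schema.table")
    .option("user", "<user>")
    .option("password", "<password>")
    .option("batchsize", 100000)
    .save())

With --user-jars-first set to true, Glue puts the user-supplied JARs ahead of its bundled ones on the classpath, which is what lets a custom SQL Server connector or driver take precedence.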
I ended up doing this, and it gave me much better results (1 million rows in 11 minutes):
(Use a Glue 2.0 Python job instead of a Python Shell job.)
Here is the code I've used:
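The code itself did not survive in this copy of the answer. The block below is only a minimal sketch of what a Glue 2.0 Spark job loading CSV files from S3 into SQL Server over JDBC might look like; the bucket path, endpoint, credentials, table name, and batch size are placeholders, not values from the original answer.

# A minimal sketch, assuming a Glue 2.0 Spark job with the Microsoft JDBC driver
# available to the job; paths, endpoint, credentials and table are placeholders.
from awsglue.context import GlueContext
from pyspark.context import SparkContext
from pyspark.sql import functions as F

glue_context = GlueContext(SparkContext.getOrCreate())
spark = glue_context.spark_session

# Read all CSV parts for the feed straight from S3 as one Spark DataFrame.
df = spark.read.csv("s3://my-bucket/path/to/files/", sep=",", header=False)

# Mirror the transformation from the question: add a constant column.
df = df.withColumn("New-column", F.lit(1111))

# Write over JDBC; batchsize controls how many rows are sent per round trip.
(df.write
    .format("jdbc")
    .mode("append")
    .option("url", "jdbc:sqlserver://<rds-endpoint>:1433;databaseName=db")
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
    .option("dbtable", "schema.table")
    .option("user", "<user>")
    .option("password", "<password>")
    .option("batchsize", 100000)
    .save())

Writing from Spark in parallel over JDBC, rather than one executemany call from a single Python process, is what makes the Glue 2.0 Spark job so much faster here.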