How to insert data into a database from an AWS Glue Python Shell job?

Posted on 2025-02-11 00:17:57

I have large sets of data in S3. In my Python Glue job I extract data from those files into a pandas data frame, apply the necessary transformations to the data frame, and then load it into a Microsoft SQL database using the pymssql library. The final data frame contains an average of 100-200K rows and 180 columns of data. Currently I am using pymssql to connect to the database. The problem is that executemany of the cursor class takes too long to load the data: approximately 20 minutes for 100k rows. I checked the logs and it is always the loading that is slow (screenshot attached). How can I load the rows faster? I am attaching my code here:

import datetime

import numpy as np
import pandas as pd

# s3 (a boto3 S3 client), S3_BUCKET_NAME, each_file, all_data (a list) and
# db_cursor (a pymssql cursor) are created earlier in the job.
file = s3.get_object(Bucket=S3_BUCKET_NAME, Key=each_file)
for chunk in pd.read_csv(file['Body'], sep=",", header=None, low_memory=False, chunksize=100000):
    all_data.append(chunk)

data_frame = pd.concat(all_data, axis=0)
all_data.clear()

# Strip whitespace from string columns and normalise empty strings to NaN.
cols = data_frame.select_dtypes(object).columns
data_frame[cols] = data_frame[cols].apply(lambda x: x.str.strip())
data_frame.replace(to_replace='', value=np.nan, inplace=True)
data_frame.fillna(value=np.nan, inplace=True)
data_frame.insert(0, 'New-column', 1111)

# pymssql expects None rather than NaN for SQL NULL.
sql_data_array = data_frame.replace({np.nan: None}).to_numpy()
sql_data_tuple = tuple(map(tuple, sql_data_array))

try:
    # The column list and parameter placeholders are elided in the original post.
    sql = "insert into [db].[schema].[table](column_names)values(%d,%s,%s,%s,%s,%s...)"
    db_cursor.executemany(sql, sql_data_tuple)
    print("loading completed on {}".format(datetime.datetime.now()))
except Exception as e:
    print(e)

Comments (2)

女中豪杰 2025-02-18 00:17:57

If anyone is looking for another solution, here is my approach.

To force bulk inserts into SQL Server hosted on RDS I did the following:

  1. Uploaded custom JAR files into the Glue job (JDBC 8 + the Azure Spark connector).
  2. Set Glue's --user-jars-first job parameter.

When this value is set to true, Glue prioritizes the customer's extra JAR files in the classpath. This option is only available in AWS Glue version 2.0 or later.
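
The answer does not include code, so as a rough sketch only, a Glue 2.0+ Spark job using those JARs might look like the following. The JAR names, connector format, RDS endpoint, table and credentials are illustrative assumptions, not the author's actual configuration.

# Glue job parameters (example values, set on the job definition):
#   --extra-jars       s3://my-bucket/jars/mssql-jdbc-8.4.1.jre8.jar,s3://my-bucket/jars/spark-mssql-connector.jar
#   --user-jars-first  true

from awsglue.context import GlueContext
from pyspark.context import SparkContext

glue_context = GlueContext(SparkContext.getOrCreate())
spark = glue_context.spark_session

# transformed_df is the Spark DataFrame to load; a placeholder CSV source is shown here.
transformed_df = spark.read.csv("s3://my-bucket/transformed/", header=True)

# The bulk-capable connector class comes from the uploaded JARs; with --user-jars-first
# set to true it takes precedence over anything else on the classpath.
(transformed_df.write
    .format("com.microsoft.sqlserver.jdbc.spark")
    .mode("append")
    .option("url", "jdbc:sqlserver://my-rds-endpoint:1433;databaseName=mydb")
    .option("dbtable", "dbo.my_table")
    .option("user", "db_user")
    .option("password", "db_password")
    .save())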

猫弦 2025-02-18 00:17:57

I ended up doing this, and it gave me much better results (1 million rows in 11 minutes):
(Use a Glue 2.0 Python job instead of a Python Shell job.)

  1. Extracted the data from S3.
  2. Transformed it using pandas.
  3. Uploaded the transformed file as a CSV to S3.
  4. Created a dynamic frame from a catalog table that was created by a crawler crawling the transformed CSV file. Alternatively, you can create the dynamic frame directly with from_options (see the sketch after the code below).
  5. Wrote the dynamic frame to the catalog table that was created by a crawler crawling the destination MSSQL table.

Here is the code I've used:

from io import StringIO

import boto3
import numpy as np
import pandas as pd
from awsglue.transforms import ApplyMapping, ResolveChoice, SelectFields

# glueContext, job, s3 (a boto3 client), AWS_REGION, S3_BUCKET_NAME, each_file,
# all_data and mappings come from the usual Glue job setup earlier in the script.

# Steps 1-2: extract from S3 and transform with pandas.
csv_buffer = StringIO()
s3_resource = boto3.resource("s3", region_name=AWS_REGION)
file = s3.get_object(Bucket=S3_BUCKET_NAME, Key=each_file)
for chunk in pd.read_csv(file['Body'], sep=",", header=None, low_memory=False, chunksize=100000):
    all_data.append(chunk)

data_frame = pd.concat(all_data, axis=0)
all_data.clear()
cols = data_frame.select_dtypes(object).columns
data_frame[cols] = data_frame[cols].apply(lambda x: x.str.strip())
data_frame.replace(to_replace='', value=np.nan, inplace=True)
data_frame.fillna(value=np.nan, inplace=True)
data_frame.insert(0, 'New-column', 1234)

# Step 3: upload the transformed data as a CSV to S3.
data_frame.to_csv(csv_buffer)
result = s3_resource.Object(S3_BUCKET_NAME, 'path in s3').put(Body=csv_buffer.getvalue())

# Step 4: create a dynamic frame from the catalog table built by crawling the transformed CSV.
datasource0 = glueContext.create_dynamic_frame.from_catalog(database="source db name", table_name="source table name",
                                                            transformation_ctx="datasource0")

applymapping1 = ApplyMapping.apply(frame=datasource0, mappings=[mappings], transformation_ctx="applymapping1")

selectfields2 = SelectFields.apply(frame=applymapping1, paths=[column names of destination catalog table],
                                   transformation_ctx="selectfields2")

resolvechoice3 = ResolveChoice.apply(frame=selectfields2, choice="MATCH_CATALOG", database="destination dbname",
                                     table_name="destination table name", transformation_ctx="resolvechoice3")

resolvechoice4 = ResolveChoice.apply(frame=resolvechoice3, choice="make_cols", transformation_ctx="resolvechoice4")

# Step 5: write the dynamic frame through the catalog table built by crawling the destination MSSQL table.
datasink5 = glueContext.write_dynamic_frame.from_catalog(frame=resolvechoice4, database="destination db name",
                                                         table_name="destination table name",
                                                         transformation_ctx="datasink5")

job.commit()
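
For the alternative mentioned in step 4, the dynamic frame could also be created directly from the transformed CSV in S3 with from_options instead of a crawler-backed catalog table. A minimal sketch, assuming a placeholder S3 path and a comma-separated file with a header row:

# Alternative to the catalog read in step 4: read the transformed CSV straight from S3.
# The S3 path, header and separator below are placeholder assumptions.
datasource0 = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={"paths": ["s3://my-bucket/path-to-transformed-csv/"]},
    format="csv",
    format_options={"withHeader": True, "separator": ","},
    transformation_ctx="datasource0",
)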