Joblib is giving me ambiguous results
I am trying to fetch data from Teradata with a query like this:
select ... from table1_1
union all
select .. from table_2
union all
select ... from table_3
NOTE: One or more of the selects may fail, which should not cause the whole union to fail.
from .base import Base
from joblib import Parallel, delayed
import re
import pandas as pd


class TeradataWithSpark(Base):
    def __init__(self, spark, host=None, port=None, database=None, username=None, password=None):
        super().__init__(spark, host, port, database, username, password)
        self._reader = self._spark.read.format("jdbc") \
            .option("url", f'jdbc:teradata://{self._host}/Database={self._database},LOGMECH=LDAP') \
            .option("user", self._username) \
            .option("password", self._password) \
            .option("driver", "com.teradata.jdbc.TeraDriver")

    def run_query(self, query, return_pandasDF=True):
        spark_df = self._reader.option('dbtable', f"({query}) as tbl").load()
        if return_pandasDF:
            return spark_df.toPandas()
        else:
            return spark_df

    def run_queries_and_union_all(self, queries, return_pandasDF=True):
        def run(query):
            try:
                return self._reader.option('dbtable', f"({query}) as tbl").load().toPandas()
            except Exception as e:
                return None

        dfs = Parallel(n_jobs=10, prefer='threads')(delayed(run)(q) for q in queries)
        concat_df = pd.concat(dfs).reset_index(drop=True)
        if return_pandasDF:
            return concat_df
        else:
            return self._spark.createDataFrame(concat_df)

    def split_query_and_run_individually(self, query, separator='union all', return_pandasDF=True):
        queries = re.split(separator, query, flags=re.IGNORECASE)
        return self.run_queries_and_union_all(queries, return_pandasDF)
As you can see, the split_query_and_run_individually method splits the query on union all and then runs all of the sub-queries in parallel threads (n_jobs=10).
But the problem I am facing is that the data gets corrupted as I increase the number of threads, like this:
n_jobs = 1
src_tbl total_count data_date
0 dsl_dim_mdm_.................... 61 2022-03-17
1 dsl_agg_call.................... 3992202 2022-03-27
2 dsl_call_ac.................... 924719 2022-03-27
3 dsl_dim_acc.................... 4762 2022-03-31
4 .................... 6821 2022-03-31
5 dsl_dim_geo_.................... 8610038 2022-04-05
6 dsl.................... 67116 2022-03-31
7 dsl_rl.................... 2087669 2022-04-06
8 dsl_.................... 154 2022-04-01
9 dsl_.................... 85630 2022-03-27
10 dsl_selling_da.................... 53 2021-03-03
11 dsl_speaker_ev.................... 17765 2022-03-31
12 dsl_speak.................... 26269 2022-08-24
13 dsl_speaker_e.................... 4202 2022-04-05
14 ds.................... 268 2022-03-31
15 dsl_rltn_r.................... 255794 2022-03-18
16 dsl_rltn_nr.................... 12088 2022-03-18
17 dsl_rapp.................... 81182 2022-01-01
18 dsl_dim_physi.................... 109299 2022-03-31
19 dsl.................... 4265 2022-02-01
20 dsl_fac.................... 117978 2022-04-03
21 dsl_coachi.................... 242 2022-03-31
22 dsl_speaker_e.................... 16653 2022-03-31
23 dsl_dim_cal.................... 17817 2099-12-31
24 dsl_rltn_nrt.................... 3304 2022-02-01
Time took: 3.4742537260055544 minutes
-----------
n_jobs=10
src_tbl total_count data_date
0 dsl_sel................ 85630 2022-03-27
1 dsl_sel................ 85630 2022-03-27
2 dsl_sel................ 85630 2022-03-27
3 dsl_sel................ 85630 2022-03-27
4 dsl_sel................ 85630 2022-03-27
5 dsl_sel................ 85630 2022-03-27
6 dsl_sel................ 85630 2022-03-27
7 dsl_sel................ 85630 2022-03-27
8 dsl_sel................ 85630 2022-03-27
9 dsl_sel................ 85630 2022-03-27
10 dsl_speaker_event................ 17765 2022-03-31
11 dsl_speaker_even................ 4202 2022-04-05
12 dsl_speaker_even................ 4202 2022-04-05
13 dsl_s................ 268 2022-03-31
14 dsl_rapper_................ 81182 2022-01-01
15 dsl_rapper_................ 81182 2022-01-01
16 dsl_rltn_nrtl_................ 12088 2022-03-18
17 dsl_rapper_................ 81182 2022-01-01
18 dsl_dim_physicia................ 109299 2022-03-31
19 dsl_cu................ 4265 2022-02-01
20 dsl_fact_f................ 117978 2022-04-03
21 dsl_coaching_................ 242 2022-03-31
22 dsl_speaker_even................ 16653 2022-03-31
23 dsl_dim_call_c................ 17817 2099-12-31
24 dsl_rltn_nrtl_r................ 3304 2022-02-01
Time took: 1.8048373858133953 minutes
-----------
n_jobs=-1
src_tbl total_count data_date
0 dsl_dim_acc.................... 4762 2022-03-31
1 dsl_dim_acc.................... 4762 2022-03-31
2 dsl_dim_acc.................... 4762 2022-03-31
3 dsl_dim_acc.................... 4762 2022-03-31
4 dsl_dim_acc.................... 4762 2022-03-31
5 dsl_dim_acc.................... 4762 2022-03-31
6 dsl_dim_acc.................... 4762 2022-03-31
7 dsl_dim_acc.................... 4762 2022-03-31
8 dsl_dim_acc.................... 4762 2022-03-31
9 dsl_dim_acc.................... 4762 2022-03-31
10 dsl_dim_acc.................... 4762 2022-03-31
11 dsl_dim_acc.................... 4762 2022-03-31
12 dsl_dim_acc.................... 4762 2022-03-31
13 dsl_dim_acc.................... 4762 2022-03-31
14 dsl_dim_acc.................... 4762 2022-03-31
15 dsl_dim_acc.................... 4762 2022-03-31
16 dsl_dim_acc.................... 4762 2022-03-31
17 dsl_dim_acc.................... 4762 2022-03-31
18 dsl_dim_acc.................... 4762 2022-03-31
19 dsl_dim_acc.................... 4762 2022-03-31
20 dsl_dim_acc.................... 4762 2022-03-31
21 dsl_dim_acc.................... 4762 2022-03-31
22 dsl_dim_acc.................... 4762 2022-03-31
23 dsl_dim_acc.................... 4762 2022-03-31
24 dsl_dim_acc.................... 4762 2022-03-31
25 dsl_dim_acc.................... 4762 2022-03-31
-----------
As you can see, as I increase the number of threads the result becomes ambiguous. What is happening is that the results from each query are overlapping with each other.
I have also implemented the same class with the teradatasql library, which works just fine with n_jobs=-1. I think self._reader.option('dbtable', f"({query}) as tbl").load() is getting messed up in threads. I tried ThreadPoolExecutor as well, but got a similar result. Does anyone know how to solve this issue?
Versions
Python 3.6.8
Spark 2.4.0-cdh6.3.4
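To illustrate the suspicion above without needing a Spark cluster, here is a minimal sketch. It assumes that the reader's option() mutates the reader in place and returns the same object, so every thread that reuses one reader overwrites the others' dbtable setting. FakeReader, run_shared and run_isolated are hypothetical names made up for this illustration, and the barrier is only there to force the worst-case interleaving deterministically.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class FakeReader:
    """Hypothetical stand-in for a builder-style reader.

    Assumption: option() mutates the object in place and returns self.
    """

    def __init__(self):
        self._options = {}

    def option(self, key, value):
        self._options[key] = value  # in-place mutation, visible to every holder
        return self

    def load(self):
        # Pretend to run the query: report which dbtable is configured right now.
        return self._options["dbtable"]


def run_shared(queries):
    """All threads reuse one reader, like self._reader in the question."""
    reader = FakeReader()
    barrier = threading.Barrier(len(queries))

    def run(query):
        configured = reader.option("dbtable", query)
        barrier.wait()  # every thread has set its option before any thread loads
        return configured.load()

    with ThreadPoolExecutor(max_workers=len(queries)) as pool:
        return list(pool.map(run, queries))


def run_isolated(queries):
    """Each call builds its own reader, so no state is shared between threads."""

    def run(query):
        return FakeReader().option("dbtable", query).load()

    with ThreadPoolExecutor(max_workers=len(queries)) as pool:
        return list(pool.map(run, queries))


queries = ["q1", "q2", "q3", "q4"]
shared = run_shared(queries)      # every entry is the same, last-written query
isolated = run_isolated(queries)  # each entry matches its own query
print(shared)
print(isolated)
```

If that assumption holds for the real reader, the analogous change in the class above would be to build a new self._spark.read.format("jdbc") chain inside run() for every query instead of reusing self._reader. This is untested against Spark 2.4 and Teradata.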
Comments (1)
Thanks to @pltc, here is one solution. It is very slow compared to the teradatasql library with multithreading, although FAIR schedulers on