乔布里(Joblib)给我带来模棱两可的结果

发布于 2025-01-19 13:17:15 字数 7496 浏览 5 评论 0原文

我正在尝试从teradata获取数据

select ... from table1_1
union all
select .. from table_2
union all
select ... from table_3

- 注意:一个或多个选择..可能会失败,这不应导致整个联盟失败。

from .base import Base
from joblib import Parallel, delayed
import re
import pandas as pd

class TeradataWithSpark(Base):
    def __init__(self, spark, host=None, port=None, database=None, username=None, password=None):
        super().__init__(spark, host, port, database, username, password)
        self._reader = self._spark.read.format("jdbc") \
                .option("url", f'jdbc:teradata://{self._host}/Database={self._database},LOGMECH=LDAP') \
                .option("user", self._username) \
                .option("password", self._password) \
                .option("driver", "com.teradata.jdbc.TeraDriver")

    def run_query(self, query, return_pandasDF=True):
        spark_df = self._reader.option('dbtable', f"({query}) as tbl").load()
        if return_pandasDF:
            return spark_df.toPandas()
        else:
            return spark_df

    def run_queries_and_union_all(self, queries, return_pandasDF=True):
        def run(query):
            try:
                return self._reader.option('dbtable', f"({query}) as tbl").load().toPandas()
            except Exception as e:
                return None
        
        dfs = Parallel(n_jobs=10, prefer='threads')(delayed(run)(q) for q in queries)
        concat_df = pd.concat(dfs).reset_index(drop=True)
        if return_pandasDF:
            return concat_df
        else:
            return self._spark.createDataFrame(concat_df)

    def split_query_and_run_individually(self, query, separator='union all', return_pandasDF=True):
        queries = re.split(separator, query, flags=re.IGNORECASE)
        return self.run_queries_and_union_all(queries, return_pandasDF)

您可以看到> split_query_and_run_individaly方法基于union all将查询分配,然后在并行线程中运行所有子查询n_jobs = 10

但是我面临的问题是,当我增加线程的数量时,数据被损坏了,

n_jobs = 1
                               src_tbl  total_count   data_date
0     dsl_dim_mdm_....................           61  2022-03-17
1     dsl_agg_call....................      3992202  2022-03-27
2      dsl_call_ac....................       924719  2022-03-27
3      dsl_dim_acc....................         4762  2022-03-31
4                 ....................         6821  2022-03-31
5     dsl_dim_geo_....................      8610038  2022-04-05
6              dsl....................        67116  2022-03-31
7           dsl_rl....................      2087669  2022-04-06
8             dsl_....................          154  2022-04-01
9             dsl_....................        85630  2022-03-27
10  dsl_selling_da....................           53  2021-03-03
11  dsl_speaker_ev....................        17765  2022-03-31
12       dsl_speak....................        26269  2022-08-24
13   dsl_speaker_e....................         4202  2022-04-05
14              ds....................          268  2022-03-31
15      dsl_rltn_r....................       255794  2022-03-18
16     dsl_rltn_nr....................        12088  2022-03-18
17        dsl_rapp....................        81182  2022-01-01
18   dsl_dim_physi....................       109299  2022-03-31
19             dsl....................         4265  2022-02-01
20         dsl_fac....................       117978  2022-04-03
21      dsl_coachi....................          242  2022-03-31
22   dsl_speaker_e....................        16653  2022-03-31
23     dsl_dim_cal....................        17817  2099-12-31
24    dsl_rltn_nrt....................         3304  2022-02-01
Time took: 3.4742537260055544 minutes
-----------
n_jobs=10
                              src_tbl  total_count   data_date
0             dsl_sel................        85630  2022-03-27
1             dsl_sel................        85630  2022-03-27
2             dsl_sel................        85630  2022-03-27
3             dsl_sel................        85630  2022-03-27
4             dsl_sel................        85630  2022-03-27
5             dsl_sel................        85630  2022-03-27
6             dsl_sel................        85630  2022-03-27
7             dsl_sel................        85630  2022-03-27
8             dsl_sel................        85630  2022-03-27
9             dsl_sel................        85630  2022-03-27
10  dsl_speaker_event................        17765  2022-03-31
11   dsl_speaker_even................         4202  2022-04-05
12   dsl_speaker_even................         4202  2022-04-05
13              dsl_s................          268  2022-03-31
14        dsl_rapper_................        81182  2022-01-01
15        dsl_rapper_................        81182  2022-01-01
16     dsl_rltn_nrtl_................        12088  2022-03-18
17        dsl_rapper_................        81182  2022-01-01
18   dsl_dim_physicia................       109299  2022-03-31
19             dsl_cu................         4265  2022-02-01
20         dsl_fact_f................       117978  2022-04-03
21      dsl_coaching_................          242  2022-03-31
22   dsl_speaker_even................        16653  2022-03-31
23     dsl_dim_call_c................        17817  2099-12-31
24    dsl_rltn_nrtl_r................         3304  2022-02-01
Time took: 1.8048373858133953 minutes
-----------
n_jobs=-1
                            src_tbl  total_count   data_date
0   dsl_dim_acc....................         4762  2022-03-31
1   dsl_dim_acc....................         4762  2022-03-31
2   dsl_dim_acc....................         4762  2022-03-31
3   dsl_dim_acc....................         4762  2022-03-31
4   dsl_dim_acc....................         4762  2022-03-31
5   dsl_dim_acc....................         4762  2022-03-31
6   dsl_dim_acc....................         4762  2022-03-31
7   dsl_dim_acc....................         4762  2022-03-31
8   dsl_dim_acc....................         4762  2022-03-31
9   dsl_dim_acc....................         4762  2022-03-31
10  dsl_dim_acc....................         4762  2022-03-31
11  dsl_dim_acc....................         4762  2022-03-31
12  dsl_dim_acc....................         4762  2022-03-31
13  dsl_dim_acc....................         4762  2022-03-31
14  dsl_dim_acc....................         4762  2022-03-31
15  dsl_dim_acc....................         4762  2022-03-31
16  dsl_dim_acc....................         4762  2022-03-31
17  dsl_dim_acc....................         4762  2022-03-31
18  dsl_dim_acc....................         4762  2022-03-31
19  dsl_dim_acc....................         4762  2022-03-31
20  dsl_dim_acc....................         4762  2022-03-31
21  dsl_dim_acc....................         4762  2022-03-31
22  dsl_dim_acc....................         4762  2022-03-31
23  dsl_dim_acc....................         4762  2022-03-31
24  dsl_dim_acc....................         4762  2022-03-31
25  dsl_dim_acc....................         4762  2022-03-31
-----------

结果变得模棱两可。正在发生的事情是,每个查询的结果彼此重叠。

我还使用 self。我尝试了threadpoolexecutor,但结果相似。有人知道如何解决这个问题吗? 版本

Python 3.6.8
Spark 2.4.0-cdh6.3.4

I am trying to fetch data from teradata--

select ... from table1_1
union all
select .. from table_2
union all
select ... from table_3

NOTE: One or more select .. may fail which should not cause whole union to fail.

from .base import Base
from joblib import Parallel, delayed
import re
import pandas as pd

class TeradataWithSpark(Base):
    def __init__(self, spark, host=None, port=None, database=None, username=None, password=None):
        super().__init__(spark, host, port, database, username, password)
        self._reader = self._spark.read.format("jdbc") \
                .option("url", f'jdbc:teradata://{self._host}/Database={self._database},LOGMECH=LDAP') \
                .option("user", self._username) \
                .option("password", self._password) \
                .option("driver", "com.teradata.jdbc.TeraDriver")

    def run_query(self, query, return_pandasDF=True):
        spark_df = self._reader.option('dbtable', f"({query}) as tbl").load()
        if return_pandasDF:
            return spark_df.toPandas()
        else:
            return spark_df

    def run_queries_and_union_all(self, queries, return_pandasDF=True):
        def run(query):
            try:
                return self._reader.option('dbtable', f"({query}) as tbl").load().toPandas()
            except Exception as e:
                return None
        
        dfs = Parallel(n_jobs=10, prefer='threads')(delayed(run)(q) for q in queries)
        concat_df = pd.concat(dfs).reset_index(drop=True)
        if return_pandasDF:
            return concat_df
        else:
            return self._spark.createDataFrame(concat_df)

    def split_query_and_run_individually(self, query, separator='union all', return_pandasDF=True):
        queries = re.split(separator, query, flags=re.IGNORECASE)
        return self.run_queries_and_union_all(queries, return_pandasDF)

As you can see split_query_and_run_individually method splits the query based on union all then runs all of the sub queries in parallel threads n_jobs=10.

But the problem I am facing is that the data is corrupted like this

n_jobs = 1
                               src_tbl  total_count   data_date
0     dsl_dim_mdm_....................           61  2022-03-17
1     dsl_agg_call....................      3992202  2022-03-27
2      dsl_call_ac....................       924719  2022-03-27
3      dsl_dim_acc....................         4762  2022-03-31
4                 ....................         6821  2022-03-31
5     dsl_dim_geo_....................      8610038  2022-04-05
6              dsl....................        67116  2022-03-31
7           dsl_rl....................      2087669  2022-04-06
8             dsl_....................          154  2022-04-01
9             dsl_....................        85630  2022-03-27
10  dsl_selling_da....................           53  2021-03-03
11  dsl_speaker_ev....................        17765  2022-03-31
12       dsl_speak....................        26269  2022-08-24
13   dsl_speaker_e....................         4202  2022-04-05
14              ds....................          268  2022-03-31
15      dsl_rltn_r....................       255794  2022-03-18
16     dsl_rltn_nr....................        12088  2022-03-18
17        dsl_rapp....................        81182  2022-01-01
18   dsl_dim_physi....................       109299  2022-03-31
19             dsl....................         4265  2022-02-01
20         dsl_fac....................       117978  2022-04-03
21      dsl_coachi....................          242  2022-03-31
22   dsl_speaker_e....................        16653  2022-03-31
23     dsl_dim_cal....................        17817  2099-12-31
24    dsl_rltn_nrt....................         3304  2022-02-01
Time took: 3.4742537260055544 minutes
-----------
n_jobs=10
                              src_tbl  total_count   data_date
0             dsl_sel................        85630  2022-03-27
1             dsl_sel................        85630  2022-03-27
2             dsl_sel................        85630  2022-03-27
3             dsl_sel................        85630  2022-03-27
4             dsl_sel................        85630  2022-03-27
5             dsl_sel................        85630  2022-03-27
6             dsl_sel................        85630  2022-03-27
7             dsl_sel................        85630  2022-03-27
8             dsl_sel................        85630  2022-03-27
9             dsl_sel................        85630  2022-03-27
10  dsl_speaker_event................        17765  2022-03-31
11   dsl_speaker_even................         4202  2022-04-05
12   dsl_speaker_even................         4202  2022-04-05
13              dsl_s................          268  2022-03-31
14        dsl_rapper_................        81182  2022-01-01
15        dsl_rapper_................        81182  2022-01-01
16     dsl_rltn_nrtl_................        12088  2022-03-18
17        dsl_rapper_................        81182  2022-01-01
18   dsl_dim_physicia................       109299  2022-03-31
19             dsl_cu................         4265  2022-02-01
20         dsl_fact_f................       117978  2022-04-03
21      dsl_coaching_................          242  2022-03-31
22   dsl_speaker_even................        16653  2022-03-31
23     dsl_dim_call_c................        17817  2099-12-31
24    dsl_rltn_nrtl_r................         3304  2022-02-01
Time took: 1.8048373858133953 minutes
-----------
n_jobs=-1
                            src_tbl  total_count   data_date
0   dsl_dim_acc....................         4762  2022-03-31
1   dsl_dim_acc....................         4762  2022-03-31
2   dsl_dim_acc....................         4762  2022-03-31
3   dsl_dim_acc....................         4762  2022-03-31
4   dsl_dim_acc....................         4762  2022-03-31
5   dsl_dim_acc....................         4762  2022-03-31
6   dsl_dim_acc....................         4762  2022-03-31
7   dsl_dim_acc....................         4762  2022-03-31
8   dsl_dim_acc....................         4762  2022-03-31
9   dsl_dim_acc....................         4762  2022-03-31
10  dsl_dim_acc....................         4762  2022-03-31
11  dsl_dim_acc....................         4762  2022-03-31
12  dsl_dim_acc....................         4762  2022-03-31
13  dsl_dim_acc....................         4762  2022-03-31
14  dsl_dim_acc....................         4762  2022-03-31
15  dsl_dim_acc....................         4762  2022-03-31
16  dsl_dim_acc....................         4762  2022-03-31
17  dsl_dim_acc....................         4762  2022-03-31
18  dsl_dim_acc....................         4762  2022-03-31
19  dsl_dim_acc....................         4762  2022-03-31
20  dsl_dim_acc....................         4762  2022-03-31
21  dsl_dim_acc....................         4762  2022-03-31
22  dsl_dim_acc....................         4762  2022-03-31
23  dsl_dim_acc....................         4762  2022-03-31
24  dsl_dim_acc....................         4762  2022-03-31
25  dsl_dim_acc....................         4762  2022-03-31
-----------

As you can see as I increase the number of threads the result becomes ambiguous. What is happening is that the results from each query are overlapping with each other.

I have also implemented the same class with teradatasql library which works just fine with n_jobs=-1. I think self._reader.option('dbtable', f"({query}) as tbl").load() is getting messed up in threads. I tried with ThreadpoolExecutor but similar result. Does anyone know how to solve this issue?
Versions

Python 3.6.8
Spark 2.4.0-cdh6.3.4

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

街角卖回忆 2025-01-26 13:17:15

多亏了@pltc,这是一种解决方案。尽管与teradatasql具有多线程的库相比,它非常慢,尽管

from .base import Base
import re
import pandas as pd
from pyspark.sql import DataFrame
from functools import reduce

class TeradataWithSpark(Base):
    def __init__(self, spark, host=None, port=None, database=None, username=None, password=None):
        super().__init__(spark, host, port, database, username, password)
        self._reader = self._spark.read.format("jdbc") \
                .option("url", f'jdbc:teradata://{self._host}/Database={self._database},LOGMECH=LDAP') \
                .option("user", self._username) \
                .option("password", self._password) \
                .option("driver", "com.teradata.jdbc.TeraDriver")

    def run_query(self, query, return_pandasDF=True):
        # spark_df = self._reader.option('dbtable', f"({query}) as tbl").load()
        # if return_pandasDF:
        #     return spark_df.toPandas()
        # else:
        #     return spark_df
        return self.split_query_and_run_individually(query, r'union all', return_pandasDF)

    def run_queries_and_union_all(self, queries, return_pandasDF=True):     
        dataframes = []
        for each_query in queries:
            try:
                spark_df = self._reader.option('dbtable', f"({each_query}) as tbl").load()
                dataframes.append(spark_df)
            except Exception as e:
                # simply ignoring the query
                print(f'Error while reading the query {each_query}')
            
        concat_sparkDf = reduce(DataFrame.unionAll, dataframes)
        if return_pandasDF:
            return concat_sparkDf.toPandas()
        else:
            return concat_sparkDf

    def split_query_and_run_individually(self, query, separator=r'union all', return_pandasDF=True):
        queries = re.split(separator, query, flags=re.IGNORECASE)
        return self.run_queries_and_union_all(queries, return_pandasDF)

Thanks to @pltc, here is one solution. Although it is very slow compared to teradatasql library with multithreading although FAIR schedulers on

from .base import Base
import re
import pandas as pd
from pyspark.sql import DataFrame
from functools import reduce

class TeradataWithSpark(Base):
    def __init__(self, spark, host=None, port=None, database=None, username=None, password=None):
        super().__init__(spark, host, port, database, username, password)
        self._reader = self._spark.read.format("jdbc") \
                .option("url", f'jdbc:teradata://{self._host}/Database={self._database},LOGMECH=LDAP') \
                .option("user", self._username) \
                .option("password", self._password) \
                .option("driver", "com.teradata.jdbc.TeraDriver")

    def run_query(self, query, return_pandasDF=True):
        # spark_df = self._reader.option('dbtable', f"({query}) as tbl").load()
        # if return_pandasDF:
        #     return spark_df.toPandas()
        # else:
        #     return spark_df
        return self.split_query_and_run_individually(query, r'union all', return_pandasDF)

    def run_queries_and_union_all(self, queries, return_pandasDF=True):     
        dataframes = []
        for each_query in queries:
            try:
                spark_df = self._reader.option('dbtable', f"({each_query}) as tbl").load()
                dataframes.append(spark_df)
            except Exception as e:
                # simply ignoring the query
                print(f'Error while reading the query {each_query}')
            
        concat_sparkDf = reduce(DataFrame.unionAll, dataframes)
        if return_pandasDF:
            return concat_sparkDf.toPandas()
        else:
            return concat_sparkDf

    def split_query_and_run_individually(self, query, separator=r'union all', return_pandasDF=True):
        queries = re.split(separator, query, flags=re.IGNORECASE)
        return self.run_queries_and_union_all(queries, return_pandasDF)
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文