悲伤:池不使用同一类对象python运行
在上下文管理器中产生池并运行。 我创建了一个呼叫一个函数的类的对象,该函数创建池以执行某些操作 当通过该对象调用该函数时,池迭代正常工作,但是使用相同的对象并调用相同的函数崩溃,给出了错误的“池不运行”
实现:
class ETLJob(SomeClass):
def __init__(self):
super().__init__()
self.conf = None
def fetch_data(self):
"""Override the Data Processing Function
Returns:
None: Void
"""
return None
def transform_data(self):
"""Override the Data Processing Function
Returns:
None: Void
"""
return None
def ingest_single(self):
"""Override the single Data Point Ingestion Function
Ingest a single datapoint into the DB using API calls
Returns:
None: Void
"""
return None
@staticmethod
def bulk_ingest(func: Callable[[dict], None], conf: list) -> None:
"""Bulk ingest SPIRE harvested data.
Args:
func (Callable[[dict], None]): Single data point ingest function
conf (list): The client and plant configuration list
Returns:
None: Since this directly ingest data into DB
Usage:
```py
self.bulk_ingest(self.ingest_single, self.conf)
```
"""
with ProcessPool(nodes=pathos.helpers.cpu_count() * 2) as pool:
pool.map(func, conf)
pool.close()
pool.join()
if __name__ == "__main__":
sp = ETLJob()
sp.bulk_ingest(sp.ingest_single,sp.conf[:10])
sp.bulk_ingest(sp.ingest_single,sp.conf[20:30])
错误跟踪:
2022-05-20 10:51:29 | [INFO]:root - ERROR ingesting data for Client: 20 Plant: 20 @ Source: spire_actual
Message: Not able to connect to database either client id is wrong or not found.
2022-05-20 10:51:29 | [INFO]:root - ERROR ingesting data for Client: 20 Plant: 18 @ Source: spire_actual
Message: Not able to connect to database either client id is wrong or not found.
2022-05-20 10:51:29 | [INFO]:root - ERROR ingesting data for Client: 20 Plant: 17 @ Source: spire_actual
Message: Not able to connect to database either client id is wrong or not found.
2022-05-20 10:51:29 | [INFO]:root - ERROR ingesting data for Client: 20 Plant: 19 @ Source: spire_actual
Message: Not able to connect to database either client id is wrong or not found.
2022-05-20 10:51:30 | [INFO]:root - ERROR ingesting data for Client: 20 Plant: 21 @ Source: spire_actual
Message: Not able to connect to database either client id is wrong or not found.
Traceback (most recent call last):
File "/home/kayvan/projects/weather-repo/weather_combiner/data_pipelines/spire.py", line 596, in <module>
s.bulk_ingest(s.ingest_single, s.conf[30:35])
File "/home/kayvan/projects/weather-repo/weather_combiner/data_pipelines/spire.py", line 189, in bulk_ingest
pool.map(func, conf)
File "/home/kayvan/projects/weather/lib/python3.9/site-packages/pathos/multiprocessing.py", line 139, in map
return _pool.map(star(f), zip(*args)) # chunksize
File "/home/kayvan/projects/weather/lib/python3.9/site-packages/multiprocess/pool.py", line 364, in map
return self._map_async(func, iterable, mapstar, chunksize).get()
File "/home/kayvan/projects/weather/lib/python3.9/site-packages/multiprocess/pool.py", line 473, in _map_async
self._check_running()
File "/home/kayvan/projects/weather/lib/python3.9/site-packages/multiprocess/pool.py", line 350, in _check_running
raise ValueError("Pool not running")
ValueError: Pool not running
A pool is spawned and run within a context manager.
I created an object of a class that calls a function which creates a pool to do some operations
The pool iteration works smoothly when the function is called via that object but using the same object and calling the same function crashes giving an error "pool not running"
Implementation:
class ETLJob(SomeClass):
def __init__(self):
super().__init__()
self.conf = None
def fetch_data(self):
"""Override the Data Processing Function
Returns:
None: Void
"""
return None
def transform_data(self):
"""Override the Data Processing Function
Returns:
None: Void
"""
return None
def ingest_single(self):
"""Override the single Data Point Ingestion Function
Ingest a single datapoint into the DB using API calls
Returns:
None: Void
"""
return None
@staticmethod
def bulk_ingest(func: Callable[[dict], None], conf: list) -> None:
"""Bulk ingest SPIRE harvested data.
Args:
func (Callable[[dict], None]): Single data point ingest function
conf (list): The client and plant configuration list
Returns:
None: Since this directly ingest data into DB
Usage:
```py
self.bulk_ingest(self.ingest_single, self.conf)
```
"""
with ProcessPool(nodes=pathos.helpers.cpu_count() * 2) as pool:
pool.map(func, conf)
pool.close()
pool.join()
if __name__ == "__main__":
sp = ETLJob()
sp.bulk_ingest(sp.ingest_single,sp.conf[:10])
sp.bulk_ingest(sp.ingest_single,sp.conf[20:30])
Error Trace:
2022-05-20 10:51:29 | [INFO]:root - ERROR ingesting data for Client: 20 Plant: 20 @ Source: spire_actual
Message: Not able to connect to database either client id is wrong or not found.
2022-05-20 10:51:29 | [INFO]:root - ERROR ingesting data for Client: 20 Plant: 18 @ Source: spire_actual
Message: Not able to connect to database either client id is wrong or not found.
2022-05-20 10:51:29 | [INFO]:root - ERROR ingesting data for Client: 20 Plant: 17 @ Source: spire_actual
Message: Not able to connect to database either client id is wrong or not found.
2022-05-20 10:51:29 | [INFO]:root - ERROR ingesting data for Client: 20 Plant: 19 @ Source: spire_actual
Message: Not able to connect to database either client id is wrong or not found.
2022-05-20 10:51:30 | [INFO]:root - ERROR ingesting data for Client: 20 Plant: 21 @ Source: spire_actual
Message: Not able to connect to database either client id is wrong or not found.
Traceback (most recent call last):
File "/home/kayvan/projects/weather-repo/weather_combiner/data_pipelines/spire.py", line 596, in <module>
s.bulk_ingest(s.ingest_single, s.conf[30:35])
File "/home/kayvan/projects/weather-repo/weather_combiner/data_pipelines/spire.py", line 189, in bulk_ingest
pool.map(func, conf)
File "/home/kayvan/projects/weather/lib/python3.9/site-packages/pathos/multiprocessing.py", line 139, in map
return _pool.map(star(f), zip(*args)) # chunksize
File "/home/kayvan/projects/weather/lib/python3.9/site-packages/multiprocess/pool.py", line 364, in map
return self._map_async(func, iterable, mapstar, chunksize).get()
File "/home/kayvan/projects/weather/lib/python3.9/site-packages/multiprocess/pool.py", line 473, in _map_async
self._check_running()
File "/home/kayvan/projects/weather/lib/python3.9/site-packages/multiprocess/pool.py", line 350, in _check_running
raise ValueError("Pool not running")
ValueError: Pool not running
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论