Multiprocess or multithread? - Parallelizing millions of iterations of a simple computation and storing the results in a single data structure



I have a dictionary D of {string: list} entries, and I compute a function f( D[s1], D[s2] ) --> float for a pair of strings (s1, s2) in D.

Additionally, I have created a custom matrix class LabeledNumericMatrix that allows me to perform assignments such as m[ ID1, ID2 ] = 1.0.

I need to calculate f(x,y) and store the result in m[x,y] for all 2-tuples in the set of strings S, including when s1=s2.
This is easy to code as a loop, but execution of this code takes quite some time as the size of the set S grows to large values such as 10,000 or more.

None of the results I store in my labeled matrix m depend on each other.
Therefore, it seems straightforward to parallelize this computation by using Python's multithreading or multiprocessing services.
However, since cPython doesn't truly allow me to simultaneously execute the calculation of f(x,y) and the storage of m[x,y] through threading, it seems that multiprocessing is my only choice.
However, I don't think multiprocessing is designed to pass around 1GB data structures between processes, such as my labelled matrix structure containing 10000x10000 elements.

Can anyone provide advice of (a) if I should avoid trying to parallelize my algorithm, and (b) if I can do the parallelization, how to do such, preferably in cPython?
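For reference, the straightforward serial loop described above would look roughly like the following minimal sketch (f, D, S and the LabeledNumericMatrix instance m are the names used in the question; the matrix class itself is not shown):

    # Minimal sketch of the serial version described above.
    # f, D, S and m (a LabeledNumericMatrix) are the names from the question.
    def fill_matrix_serial(f, D, S, m):
        for s1 in S:
            for s2 in S:
                m[s1, s2] = f(D[s1], D[s2])   # includes the s1 == s2 case
        return m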

5 Answers

灵芸 2025-01-14 14:21:28


First option - a Server Process

Create a server process. It is part of the multiprocessing package and allows concurrent access to data structures: every process manipulates the shared data structure through a proxy, while a single manager process holds the actual object.

From the documentation:

Server process

A manager object returned by Manager() controls a server process which
holds Python objects and allows other processes to manipulate them
using proxies.

A manager returned by Manager() will support types list, dict,
Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event,
Queue, Value and Array.
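A minimal sketch of this approach, under the assumption that the scores are first collected in a Manager-managed dict keyed by (s1, s2) and copied into the labelled matrix afterwards. f and D stand for the question's scoring function and data; manager_worker and compute_with_manager are illustrative names, and f must be a module-level (picklable) function:

    import multiprocessing

    def manager_worker(f, D, pairs, shared_results):
        # Each worker computes its share of pairs; the Manager's server
        # process holds the dict and handles the writes from all workers.
        for s1, s2 in pairs:
            shared_results[(s1, s2)] = f(D[s1], D[s2])

    def compute_with_manager(f, D, pairs, n_procs=4):
        # pairs is assumed to be a list of (s1, s2) tuples.
        manager = multiprocessing.Manager()
        shared_results = manager.dict()
        chunks = [pairs[i::n_procs] for i in range(n_procs)]
        procs = [multiprocessing.Process(target=manager_worker,
                                         args=(f, D, chunk, shared_results))
                 for chunk in chunks]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        # Plain {(s1, s2): score} dict; copy into the labelled matrix afterwards.
        return dict(shared_results)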

Second option - Pool of workers

Create a Pool of workers, an input Queue and a result Queue.

  • The main process, acting as a producer, will feed the input queue with pairs (s1, s2).
  • Each worker process will read a pair from the input Queue, and write the result into the output Queue.
  • The main thread will read the results from the result Queue, and write them into the result dictionary.
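A minimal sketch of this producer/worker pattern, again assuming f and D from the question and a plain dict as the result store (queue_worker and compute_with_queues are illustrative names):

    import multiprocessing

    def queue_worker(f, D, task_queue, result_queue):
        # Pull (s1, s2) pairs until a None sentinel arrives, push scores back.
        while True:
            pair = task_queue.get()
            if pair is None:
                break
            s1, s2 = pair
            result_queue.put((pair, f(D[s1], D[s2])))

    def compute_with_queues(f, D, pairs, n_workers=4):
        task_queue = multiprocessing.Queue()
        result_queue = multiprocessing.Queue()
        workers = [multiprocessing.Process(target=queue_worker,
                                           args=(f, D, task_queue, result_queue))
                   for _ in range(n_workers)]
        for w in workers:
            w.start()
        for pair in pairs:            # main process acts as the producer
            task_queue.put(pair)
        for _ in workers:             # one sentinel per worker
            task_queue.put(None)
        results = {}
        for _ in range(len(pairs)):   # drain all results before joining the workers
            pair, score = result_queue.get()
            results[pair] = score
        for w in workers:
            w.join()
        return results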

Third option - divide to independent problems

Your data is independent: f( D[si],D[sj] ) is an isolated problem, independent of any f( D[sk],D[sl] ). Furthermore, the computation time of each pair should be fairly equal, or at least of the same order of magnitude.

Divide the task into n inputs sets, where n is the number of computation units (cores, or even computers) you have. Give each input set to a different process, and join the output.
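One way to sketch this, assuming the pair list fits in memory and using multiprocessing.Pool to run one chunk per computation unit (score_chunk and compute_in_chunks are illustrative names; f and D are again the question's function and data):

    import itertools
    import multiprocessing

    def score_chunk(args):
        # Compute one chunk of pairs and return the partial result as a list,
        # so the chunks share no state while they run.
        f, D, chunk = args
        return [((s1, s2), f(D[s1], D[s2])) for s1, s2 in chunk]

    def compute_in_chunks(f, D, ids, n_chunks=None):
        n_chunks = n_chunks or multiprocessing.cpu_count()
        # n*(n+1)/2 pairs, including s1 == s2; for very large n this list
        # itself gets big and a lazier chunking scheme would be needed.
        pairs = list(itertools.combinations_with_replacement(ids, 2))
        chunks = [pairs[i::n_chunks] for i in range(n_chunks)]
        pool = multiprocessing.Pool(processes=n_chunks)
        try:
            partials = pool.map(score_chunk, [(f, D, c) for c in chunks])
        finally:
            pool.close()
            pool.join()
        # Join the outputs into one {(s1, s2): score} dict.
        return dict(item for partial in partials for item in partial)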

蓝天 2025-01-14 14:21:28


You definitely won't get any performance increase with threading - it is an inappropriate tool for CPU-bound tasks.

So the only possible choice is multiprocessing, but since you have a big data structure, I'd suggest something like mmap (pretty low level, but built in) or Redis (a nice, high-level API, but it needs to be installed and configured).
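As a concrete, hedged example of the mmap route: a numpy.memmap file (my assumption; the answer only names raw mmap) lets every process write its own rows of the 10000x10000 result matrix without passing the matrix between processes at all. fill_rows and compute_with_memmap are illustrative names; f, D and the list of IDs come from the question:

    import multiprocessing
    import numpy as np

    def fill_rows(path, shape, row_indices, f, D, ids):
        # Re-open the same memory-mapped file in each worker and fill a
        # disjoint set of rows, so no locking between processes is needed.
        m = np.memmap(path, dtype=np.float64, mode="r+", shape=shape)
        for i in row_indices:
            for j, s2 in enumerate(ids):
                m[i, j] = f(D[ids[i]], D[s2])
        m.flush()

    def compute_with_memmap(f, D, ids, path="scores.dat", n_procs=4):
        n = len(ids)
        # Create the backing file once in the parent process.
        np.memmap(path, dtype=np.float64, mode="w+", shape=(n, n)).flush()
        procs = [multiprocessing.Process(
                     target=fill_rows,
                     args=(path, (n, n), list(range(k, n, n_procs)), f, D, ids))
                 for k in range(n_procs)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        return np.memmap(path, dtype=np.float64, mode="r", shape=(n, n))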

寄居人 2025-01-14 14:21:28


Have you profiled your code? Is it just calculating f that is too expensive, or storing the results in the data structure (or maybe both)?

If f is dominant, then you should make sure you can't make algorithmic improvements before you start worrying about parallelization. You might be able to get a big speed up by turning some or all of the function into a C extension, perhaps using cython. If you do go with multiprocessing, then I don't see why you need to pass the entire data structure between processes?

If storing results in the matrix is too expensive, you might speed up your code by using a more efficient data structure (like array.array or numpy.ndarray). Unless you have been very careful designing and implementing your custom matrix class, it will almost certainly be slower than those.
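To illustrate that second point, a labelled wrapper around a plain numpy.ndarray can be quite small. This is a hypothetical stand-in for the question's LabeledNumericMatrix, not its actual implementation:

    import numpy as np

    class ArrayBackedLabeledMatrix(object):
        """Labelled m[ID1, ID2] = 1.0 style assignment on top of a numpy array."""

        def __init__(self, row_ids, col_ids, dtype=np.float64):
            self._row_index = dict((rid, i) for i, rid in enumerate(row_ids))
            self._col_index = dict((cid, j) for j, cid in enumerate(col_ids))
            self.values = np.zeros((len(row_ids), len(col_ids)), dtype=dtype)

        def __setitem__(self, key, value):
            row_id, col_id = key
            self.values[self._row_index[row_id], self._col_index[col_id]] = value

        def __getitem__(self, key):
            row_id, col_id = key
            return self.values[self._row_index[row_id], self._col_index[col_id]]

Usage would then look like m = ArrayBackedLabeledMatrix(ids, ids); m["ID1", "ID2"] = 1.0, with the raw array available as m.values for any vectorized post-processing.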

记忆消瘦 2025-01-14 14:21:28


Thank you everyone for your responses.

I have created a solution (not "the" solution) to the proposed problem, and since others may find it useful, I am posting the code here. My solution is a hybrid of options 1 and 3 suggested by Adam Matan. The code contains line numbers from my vi session, which will help in the discussion below.

 12 # System libraries needed by this module.
 13 import numpy, multiprocessing, time
 14 
 15 # Third-party libraries needed by this module.
 16 import labeledMatrix
 17 
 18 # ----- Begin code for this module. -----
 19 from commonFunctions import debugMessage
 20 
 21 def createSimilarityMatrix( fvFileHandle, fvFileParser, fvSimScorer, colIDs, rowIDs=None,
 22                             exceptionType=ValueError, useNumType=numpy.float, verbose=False,
 23                             maxProcesses=None, processCheckTime=1.0 ):
 24  """Create a labeled similarity matrix from vectorial data in [fvFileHandle] that can be
 25  parsed by [fvFileParser].
 26  [fvSimScorer] should be a function that can return a floating point value for a pair of vectors.
 27 
 28  If the matrix [rowIDs] are not specified, they will be the same as the [colIDs].
 29 
 30  [exceptionType] will be raised when a row or column ID cannot be found in the vectorial data.
 31  [maxProcesses] specifies the number of CPUs to use for calculation; default value is all available CPUs.
 32  [processCheckTime] is the interval for checking activity of CPUs (if completed calculation or not).
 33 
 34  Return: a LabeledNumericMatrix with corresponding row and column IDs."""
 35 
 36  # Setup row/col ID information.
 37  useColIDs = list( colIDs )
 38  useRowIDs = rowIDs or useColIDs
 39  featureData = fvFileParser( fvFileHandle, retainIDs=(useColIDs+useRowIDs) )
 40  verbose and debugMessage( "Retrieved %i feature vectors from FV file." % len(featureData) )
 41  featureIDs = featureData.keys()
 42  absentIDs = [ ID for ID in set(useColIDs + useRowIDs) if ID not in featureIDs ]
 43  if absentIDs: 
 44   raise exceptionType, "IDs %s not found in feature vector file." % absentIDs
 45  # Otherwise, proceed to creation of matrix.
 46  resultMatrix = labeledMatrix.LabeledNumericMatrix( useRowIDs, useColIDs, numType=useNumType )
 47  calculateSymmetric = True if set( useRowIDs ) == set( useColIDs ) else False
 48  
 49  # Setup data structures needed for parallelization.
 50  numSubprocesses = multiprocessing.cpu_count() if maxProcesses==None else int(maxProcesses)
 51  assert numSubprocesses >= 1, "Specification of %i CPUs to calculate similarity matrix." % numSubprocesses
 52  dataManager = multiprocessing.Manager()
 53  sharedFeatureData = dataManager.dict( featureData )
 54  resultQueue = multiprocessing.Queue() 
 55  # Assign jobs evenly through number of processors available.
 56  jobList = [ list() for i in range(numSubprocesses) ]
 57  calculationNumber = 0 # Will hold total number of results stored.
 58  if calculateSymmetric: # Perform calculations with n(n+1)/2 pairs, instead of n^2 pairs.
 59   remainingIDs = list( useRowIDs )
 60   while remainingIDs:
 61    firstID = remainingIDs[0]
 62    for secondID in remainingIDs:
 63     jobList[ calculationNumber % numSubprocesses ].append( (firstID, secondID) )
 64     calculationNumber += 1
 65    remainingIDs.remove( firstID )
 66  else: # Straight processing one at a time.
 67   for rowID in useRowIDs:
 68    for colID in useColIDs:
 69     jobList[ calculationNumber % numSubprocesses ].append( (rowID, colID) )
 70     calculationNumber += 1
 71     
 72  verbose and debugMessage( "Completed setup of job distribution: %s." % [len(js) for js in jobList] )
 73  # Define a function to perform calculation and store results
 74  def runJobs( scoreFunc, pairs, featureData, resultQueue ):
 75   for pair in pairs:
 76    score = scoreFunc( featureData[pair[0]], featureData[pair[1]] )
 77    resultQueue.put( ( pair, score ) )
 78   verbose and debugMessage( "%s: completed all calculations." % multiprocessing.current_process().name )
 79   
 80   
 81  # Create processes to perform parallelized computing.
 82  processes = list()
 83  for num in range(numSubprocesses):
 84   processes.append( multiprocessing.Process( target=runJobs,
 85                                              args=( fvSimScorer, jobList[num], sharedFeatureData, resultQueue ) ) )
 86  # Launch processes and wait for them to all complete.
 87  import Queue # For Queue.Empty exception.
 88  for p in processes:
 89   p.start()
 90  assignmentsCompleted = 0
 91  while assignmentsCompleted < calculationNumber:
 92   numActive = [ p.is_alive() for p in processes ].count( True )
 93   verbose and debugMessage( "%i/%i complete; Active processes: %i" % \
 94               ( assignmentsCompleted, calculationNumber, numActive ) )
 95   while True: # Empty queue immediately to avoid underlying pipe/socket implementation from hanging.
 96    try: 
 97     pair, score = resultQueue.get( block=False )
 98     resultMatrix[ pair[0], pair[1] ] = score
 99     assignmentsCompleted += 1
100     if calculateSymmetric:
101      resultMatrix[ pair[1], pair[0] ] = score
102    except Queue.Empty:
103     break 
104   if numActive == 0: finished = True
105   else:
106    time.sleep( processCheckTime )
107  # Result queue emptied and no active processes remaining - completed calculations.
108  return resultMatrix
109 ## end of createSimilarityMatrix()

Lines 36-47 are simply preliminary stuff related to the problem definition that was a part of the original question.
The setup for the multiprocessing to get around cPython's GIL is in lines 49-56, with lines 57-70 used to evenly create the subdivided tasks. Code in lines 57-70 is used instead of itertools.product, because when the list of row/column IDs reaches 40,000 or so, the product ends up taking an enormous amount of memory.
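For comparison only (this is not part of the posted code), the same round-robin job assignment over the n(n+1)/2 symmetric pairs can be written with itertools.combinations_with_replacement, which yields the pairs lazily instead of materializing a full product:

    import itertools

    def build_job_list(ids, num_subprocesses):
        # Round-robin the symmetric pairs (including s1 == s2) across the
        # per-process job lists, mirroring the jobList built in lines 58-65.
        job_list = [[] for _ in range(num_subprocesses)]
        for k, pair in enumerate(itertools.combinations_with_replacement(ids, 2)):
            job_list[k % num_subprocesses].append(pair)
        return job_list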

The actual computation to be performed is in lines 74-78, and here the shared dictionary of ID->vector entries and shared result queue are utilized.

Lines 81-85 set up the actual Process objects, though they haven't actually been started yet.

In my first attempt (not shown here), the "try ... resultQueue.get() and assign except ..." code was actually located outside of the outer control loop (while not all calculations finished). When I ran that version of the code on a unit test of a 9x9 matrix, there were no problems.
However, moving up to 200x200 or larger, I found this code to hang, despite not changing anything in the code between executions.

According to this discussion (http://bugs.python.org/issue8426) and the official documentation for multiprocessing, the use of multiprocessing.Queue can hang if the underlying implementation doesn't have a very large pipe/socket size. Therefore, the code given here as my solution periodically empties the queue while checking on completion of the processes (see lines 91-106), so that the child processes can continue to put new results into it without the pipe becoming overfull.

When I tested the code on larger matrices of 1000x1000, I noticed that the computation code finished well ahead of the Queue and matrix assignment code. Using cProfile, I found that one bottleneck was the default polling interval processCheckTime=1.0 (line 23), and lowering this value improved the speed of results (see bottom of post for timing examples). This might be useful information for other people new to multiprocessing in Python.

Overall, this is probably not the best implementation possible, but it does provide a starting point for further optimization. As is often said, optimization via parallelization requires proper analysis and thought.

Timing examples, all with 8 CPUs.

200x200 (20100 calculations/assignments)

t=1.0 : execution time 18s

t=0.01: execution time 3s

500x500 (125250 calculations/assignments)

t=1.0 : execution time 86s

t=0.01: execution time 23s

In case anyone wants to copy-and-paste the code, here is a unit test I used for part of the development. Obviously, the labelled matrix class code is not here, and the fingerprint reader/scorer code is not included (though it is pretty simple to roll your own). Of course, I'm happy to share that code as well if it would help someone.

112 def unitTest():
113  import cStringIO, os
114  from fingerprintReader import MismatchKernelReader
115  from fingerprintScorers import FeatureVectorLinearKernel
116  exampleData = cStringIO.StringIO() # 9 examples from GPCR (3,1)-mismatch descriptors, first 10 columns.
117  exampleData.write( ",AAA,AAC,AAD,AAE,AAF,AAG,AAH,AAI,AAK"  + os.linesep )
118  exampleData.write( "TS1R2_HUMAN,5,2,3,6,8,6,6,7,4" + os.linesep )
119  exampleData.write( "SSR1_HUMAN,11,6,5,7,4,7,4,7,9" + os.linesep )
120  exampleData.write( "OXYR_HUMAN,27,13,14,14,15,14,11,16,14" + os.linesep )
121  exampleData.write( "ADA1A_HUMAN,7,3,5,4,5,7,3,8,4" + os.linesep )
122  exampleData.write( "TA2R_HUMAN,16,6,7,8,9,10,6,6,6" + os.linesep )
123  exampleData.write( "OXER1_HUMAN,10,6,5,7,11,9,5,10,6" + os.linesep )
124  exampleData.write( "NPY1R_HUMAN,3,3,0,2,3,1,0,6,2" + os.linesep )
125  exampleData.write( "NPSR1_HUMAN,0,1,1,0,3,0,0,6,2" + os.linesep )
126  exampleData.write( "HRH3_HUMAN,16,9,9,13,14,14,9,11,9" + os.linesep )
127  exampleData.write( "HCAR2_HUMAN,3,1,3,2,5,1,1,6,2" )
128  columnIDs = ( "TS1R2_HUMAN", "SSR1_HUMAN", "OXYR_HUMAN", "ADA1A_HUMAN", "TA2R_HUMAN", "OXER1_HUMAN",
129                "NPY1R_HUMAN", "NPSR1_HUMAN", "HRH3_HUMAN", "HCAR2_HUMAN", )
130  m = createSimilarityMatrix( exampleData, MismatchKernelReader, FeatureVectorLinearKernel, columnIDs,
131                              verbose=True, )
132  m.SetOutputPrecision( 6 )
133  print m
134 
135 ## end of unitTest()
伊面 2025-01-14 14:21:28


In reference to my last comment attached to the code posted on March 21, I found multiprocessing.Pool + SQLite (pysqlite2) unusable for my particular task, as two problems occurred:

(1) Using the default connection, every worker process other than the first executed its insert query only once.
(2) When I changed the connection keywords to check_same_thread=False, the full pool of workers was used, but then only some queries succeeded and some failed. When each worker also executed time.sleep(0.01), the number of query failures was reduced, but not eliminated entirely.
(3) Less importantly, I could hear my hard disk reading/writing frantically, even for a small job list of 10 insert queries.

I next resorted to MySQL-Python, and things worked out much better. True, one must setup the MySQL server daemon, a user, and a database for that user, but those steps are relatively simple.

Here is sample code that worked for me. Obviously it could be optimized, but it conveys the basic idea for those who are looking for how to get started with multiprocessing.

  from multiprocessing import Pool, current_process
  import MySQLdb
  from numpy import random


  if __name__ == "__main__":

    numValues   = 50000
    tableName   = "tempTable"
    useHostName = ""
    useUserName = ""  # Insert your values here.
    usePassword = ""
    useDBName   = ""

    # Setup database and table for results.
    dbConnection = MySQLdb.connect( host=useHostName, user=useUserName, passwd=usePassword, db=useDBName )
    topCursor = dbConnection.cursor()
    # Assuming table does not exist, will be eliminated at the end of the script.
    topCursor.execute( 'CREATE TABLE %s (oneText TEXT, oneValue REAL)' % tableName )
    topCursor.close()
    dbConnection.close()

    # Define simple function for storing results.
    def work( storeValue ):
      #print "%s storing value %f" % ( current_process().name, storeValue )
      try:
        dbConnection = MySQLdb.connect( host=useHostName, user=useUserName, passwd=usePassword, db=useDBName )
        cursor = dbConnection.cursor()
        cursor.execute( "SET AUTOCOMMIT=1" )
        try:
          query = "INSERT INTO %s VALUES ('%s',%f)" % ( tableName, current_process().name, storeValue )
          #print query
          cursor.execute( query )
        except:
          print "Query failed."

        cursor.close()
        dbConnection.close()
      except:
        print "Connection/cursor problem."


    # Create set of values to assign.
    values = random.random( numValues )

    # Create pool of workers.
    pool = Pool( processes=6 )
    # Execute assignments.
    for value in values: pool.apply_async( func=work, args=(value,) )
    pool.close()
    pool.join()

    # Cleanup temporary table.
    dbConnection = MySQLdb.connect( host=useHostName, user=useUserName, passwd=usePassword, db=useDBName )
    topCursor = dbConnection.cursor()
    topCursor.execute( 'DROP TABLE %s' % tableName )
    topCursor.close()
    dbConnection.close()