5.6 并行流水线
并发算法虽然可以充分发挥多核CPU的性能。但不幸的是,并非所有的计算都可以改造成并发的形式。那什么样的算法是无法使用并发进行计算的呢?简单来说,执行过程中有数据相关性的运算都是无法完美并行化的。
假如现在有两个数,B和C。如果我们要计算(B+C)*B/2,那么这个运行过程就是无法并行的。原因是,如果B+C没有执行完成,则永远算不出(B+C)*B,这就是数据相关性。如果线程执行时,所需的数据存在这种依赖关系,那么,就没有办法将它们完美的并行化。如图5.10所示,诠释了这个道理。
图5.10 (B+C)*B/2无法并行化
那遇到这种情况时,有没有什么补救措施呢?答案是肯定的,那就是借鉴日常生产中的流水线思想。
比如,现在要生产一批小玩偶。小玩偶的制作分为四个步骤,第一要组装身体,第二要在身体上安装四肢和头部,第三,给组装完成的玩偶穿上一件漂亮的衣服,第四,就可以包装出货了。为了加快制作玩具的进度,我们不可能叫四个人同时加工一个玩具,因为这四个步骤有着严重的依赖关系。如果没有身体,就没有地方安装四肢,如果没有组装完成,就不能穿衣服,如果没有穿上衣服,就不能包装发货。因此,找四个人来做一个玩偶是毫无意义的。
但是,如果你现在要制作的不是1只玩偶,而是1万只玩偶,那情况就不同了。你可以找四个人,第一个人只负责组装身体,完成后交给第二个人;第二个人只负责安装头部和四肢,交付第三人;第三人只负责穿衣服,并交付第四人;第四人只负责包装发货。这样所有人都可以一起工作,共同完成任务,而整个时间周期也能缩短到原来的1/4左右,这就是流水线的思想。一旦流水线满载,每次只需要一步(假设一个玩偶需要四步)就可以产生一个玩偶,如图5.11所示。
图5.11 使用流水线生产玩偶
类似的思想可以借鉴到程序开发中。即使(B+C)*B/2无法并行,但是如果你需要计算一大堆B和C的值,你依然可以将它流水化。首先将计算过程拆分为三个步骤:
P1:A=B+C
P2:D=A×B
P3:D=D/2
上述步骤中P1、P2和P3均在单独的线程中计算,并且每个线程只负责自己的工作。此时,P3的计算结果就是最终需要的答案。
P1接收B和C的值,并求和,将结果输入给P2。P2求乘积后输入给P3。P3将D除以2得到最终值。一旦这条流水线建立,只需要一个计算步骤就可以得到(B+C)*B/2的结果。
为了实现这个功能,我们需要定义一个在线程间携带结果进行信息交换的载体:
public class Msg { public double i; public double j; public String orgStr=null; }
P1计算的是加法:
01 public class Plus implements Runnable { 02 public static BlockingQueue<Msg> bq=new LinkedBlockingQueue<Msg>(); 03 @Override 04 public void run() { 05 while(true){ 06 try { 07 Msg msg=bq.take(); 08 msg.j=msg.i+msg.j; 09 Multiply.bq.add(msg); 10 } catch (InterruptedException e) { 11 } 12 } 13 } 14 }
上述代码中,P1取得封装了两个操作数的Msg,并进行求和,将结果传递给乘法线程P2(第9行)。当没有数据需要处理时,P1进行等待。
P2计算乘法:
01 public class Multiply implements Runnable { 02 public static BlockingQueue<Msg> bq = new LinkedBlockingQueue<Msg>(); 03 04 @Override 05 public void run() { 06 while (true) { 07 try { 08 Msg msg = bq.take(); 09 msg.i = msg.i * msg.j; 10 Div.bq.add(msg); 11 } catch (InterruptedException e) { 12 } 13 } 14 } 15 }
和P1非常类似,P2计算相乘结果后,将中间结果传递给除法线程P3。
P3计算除法:
01 public class Div implements Runnable { 02 public static BlockingQueue<Msg> bq = new LinkedBlockingQueue<Msg>(); 03 04 @Override 05 public void run() { 06 while (true) { 07 try { 08 Msg msg = bq.take(); 09 msg.i = msg.i / 2; 10 System.out.println(msg.orgStr + "=" + msg.i); 11 } catch (InterruptedException e) { 12 } 13 } 14 } 15 }
P3将结果除以2后输出最终的结果。
最后是提交任务的主线程,这里,我们提交100万个请求,让线程组进行计算:
01 public class PStreamMain { 02 public static void main(String[] args) { 03 new Thread(new Plus()).start(); 04 new Thread(new Multiply()).start(); 05 new Thread(new Div()).start(); 06 07 for (int i = 1; i <= 1000; i++) { 08 for (int j = 1; j <= 1000; j++) { 09 Msg msg = new Msg(); 10 msg.i = i; 11 msg.j = j; 12 msg.orgStr = "((" + i + "+" + j + ")*" + i + ")/2"; 13 Plus.bq.add(msg); 14 } 15 } 16 } 17 }
上述代码第13行,将数据提交给P1加法线程,开启流水线的计算。在多核或者分布式场景中,这种设计思路可以有效地将有依赖关系的操作分配在不同的线程中进行计算,尽可能利用多核优势。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论