Erlang并行快速排序实战

本节我将用Erlang多进程方式实现快速排序,快速排序采用的是分治的思想,即将原问题分解为若干个规模更小但结构与原问题相似的子问题。递归地解这些子问题,然后将这些子问题的解组合为原问题的解。通过对比快速排序的串行算法,我们将此串行算法改进为并行算法。

首先,来看看快速排序的串行算法:

1.从数列中挑出一个元素,称为 "基准"(pivot);

2.重新排序数列,所有元素比基准值小的摆放在基准前面,所有元素比基准值大的摆在基准的后面(相同的数可以到任一边)。在这个分割结束之后,该基准就处于数列的中间位置。这个称为分割(partition)操作;

3.递归地(recursive)把小于基准值元素的子数列和大于基准值元素的子数列排序。

此思想网上到处都有,现在来看看快速排序的并行算法:

其中的一个简单思想是,对每次划分过后得到的两个序列分别使用两个处理器完成递归排序。例如对一个长为n的序列,首先划分得到两个长为n/2的序列,将其交给两个处理器分别处理;而后进一步划分得到四个长为n/4的序列,再分别交给四个处理器处理:如此递归下去最终得到排序好的序列。

  当然,这里是理想的划分情况,如果划分步骤不能达到平均分配的目的,那么排序效率会相对较差。

  跟上节讲得并行枚举排序进程调用的区别在于,枚举排序一次性将数据传给所有节点进程,而快速排序进程调用在分割数据的过程中呈现的是树状形式。

  并行快排的伪代码如下:

 
 
 
 
  1.  //快速排序的并行算法  
  2.    
  3.  输入:无序数组data[1,n],使用的处理器个数为2^m  
  4.  输出:有序数组data[1,n]  
  5.    
  6.  Begin  
  7.       para_quicksort(data,1,n,m,0)  
  8.  End  
  9.    
  10. procedure para_quicksort(data,i,j,m,id)  
  11. Begin  
  12.     if (j-i) <=k or m=0 then   
  13.           Pid call quicksort(data,i,j)  
  14.     else 
  15.           Pid:r = patition(data,i,j)  
  16.           Pid send data[r+1,m-1] to Pid+2 ^(m-1)  
  17.           para_quicksort(data,i,r-1,m-1,id)  
  18.           par_quicksort(data,r+1,j,m-1,id+2^(m-1) )  
  19.           Pid+2 ^ (m-1) send data[r+1,m-1] back to Pid  
  20.     end if 
  21. End 

现在,就来看看Erlang代码实现并行快排:

  设置启动接口,启动多个进程之后,在总控端调用Dict = store(['node1@pc1305','node2@pc1305']...)存储各节点进程的字典表,然后调用start([3,4,512,1,2,5,……],2,Dict)进行快排,其中第二参数表示需要最多2^2=4个进程来完成任务,总控端也作为一个排序的节点。

 
 
 
 
  1.  %% 启动排序时,需存储各节点信息:使用Dict = store(['node1@pc1305','node2@pc1305']...)方法  
  2.  %%然后,便可启动start(Data,M,Dict) 进行排序,M为启动的进程个数指标,Dict为上述存储的节点  
  3.    
  4.  -module(para_qsort).  
  5.  -export([para_qsort/3,start/3,store/1,lookup/2]).  
  6.    
  7.  store(L) -> store(L,[]).  
  8.    
  9.  store([],Ret) -> Ret;  
  10. store(L,Ret) ->  
  11.     Value = lists:nth(1,L),  
  12.     Key = length(Ret)+1,  
  13.     io:format("Key=~p Value=~p~n",[Key,Value]),  
  14.     New = [{Key,Value} | Ret],  
  15.     store(lists:delete(Value,L) , New).  
  16.       
  17. lookup(Key,Dict) ->  
  18.     {K,V} = lists:nth(1,Dict),  
  19.     if 
  20.         K =:= Key ->  
  21.             V;  
  22.         K =/= Key ->  
  23.             Filter = lists:delete({K,V},Dict),  
  24.             lookup(Key,Filter)  
  25.     end.  
  26.  
  27. start(Data,M,Dict) ->   
  28.     register(monitor, spawn(fun() -> wait([]) end)),  
  29.     put(monitor, node()),  
  30.     NewDict = [{monitor, get(monitor)} | Dict],  
  31.     para_qsort(Data,M,NewDict). 

  wait/1函数主要对各节点排序好的结果进行归并整理,然后输出最终结果,代码如下

 
 
 
 
  1.  %%服务器节点监听  
  2.  wait(Ret) ->  
  3.      Len1 = parser(Ret)+length(Ret)-1,  
  4.      Len2 = len(Ret),  
  5.      if 
  6.          (Len1 =:= Len2) and  (Len2 =/=0) ->  
  7.              SortRet= merge(Ret,[],1),  
  8.              io:format("Para qsort result:~n",[]),  
  9.              io:format("~p~n",[SortRet]);  
  10.         (Len1 =/= Len2) or (Len2 =:= 0) ->  
  11.             receive  
  12.                 {Node,Pid,L} ->  
  13.                     {Data,I,J} = L,  
  14.                     Temp = [{Node,Data,I,J}|Ret],  
  15.                     wait(Temp)  
  16.             end;  
  17.               
  18.         die ->  
  19.             quit  
  20.     end.  
  21.  
  22. len([]) -> 0;  
  23. len([H|T])->  
  24.     {Node,Data,I,J} = H,  
  25.     length(Data).  
  26.  
  27. parser([]) -> 0;  
  28. parser([H|T]) ->  
  29.     {Node,Data,I,J}=H,  
  30.     J-I+1+parser(T).  
  31.  
  32. merge([],Ret,Len) -> Ret;  
  33. merge(T,Ret,Start) ->  
  34.     H = lists:nth(1,T),  
  35.     {Node,Data,I,J} = H,  
  36.     if 
  37.         I =:= Start ->  
  38.             ListBetween = lists:sublist(Data,I, J-I+1),  
  39.             case (I=:=1) of  
  40.                 true ->  
  41.                     Temp = lists:append(Ret,ListBetween);  
  42.                 false ->  
  43.                     Pivo = lists:append(Ret,[lists:nth(I-1,Data)]),  
  44.                     Temp = lists:append(Pivo,ListBetween)  
  45.             end,  
  46.             io:format("~n-----<< ~p >> processing data[~p,~p]~n-------------------------Result=~p~n",[Node,I,J,Temp]),  
  47.             Len = Start+J-I+2,  
  48.             NewData = lists:delete(H,T),  
  49.             merge(NewData,Temp,Len);  
  50.         I =/= Start ->  
  51.             NewData = lists:delete(H,T),  
  52.             Temp = lists:append(NewData,[H]),  
  53.             merge(Temp,Ret,Start)  
  54.     end. 

para_qsort完成对排序任务的分发,代码如下:

 
 
 
 
  1. para_qsort(Data,M,Dict) ->  
  2.     %%进程个数最多为2^M次方  
  3.     I = 1,   
  4.     J = length(Data),  
  5.     ID = 0,  
  6.     para_qsort(Data,I,J,M,ID,Dict).  
  7.  
  8. para_qsort(Data,I,J, M, ID,Dict) ->  
  9.     %% 阀值,列表排序个数小于K时直接调用qsort直接快排  
  10.    K = 4,   
  11.    Value1 = (J-I) < K,  
  12.    Value2 = (M =:= 0),  
  13.    Value = Value1 or Value2,  
  14.    if 
  15.        Value =:= true  ->  
  16.            %io:format("~nCurrent node: (~p) ~nData= ~p  I=~p  J=~p~n",[node(),Data,I,J]),  
  17.            ListBefore = lists:sublist(Data, I-1),  
  18.            ListBetween = lists:sublist(Data,I, J-I+1),  
  19.            ListAfter = lists:sublist(Data,J+1,length(Data)-J),  
  20.            Ret = lists:sort(ListBetween),  
  21.            Temp = lists:append(ListBefore,Ret),  
  22.            NewData =  lists:append(Temp, ListAfter),  
  23.            {monitor, lookup(monitor,Dict) } ! {node(),self(),{NewData,I,J} },  
  24.            io:format("-------------------------------------------------------------");  
  25.        Value =:= false  ->  
  26.            {NewData,R} = partition(Data,I,J),  
  27.            send(NewData, R+1, J, M-1,ID + math:pow(2 ,(M-1)), Dict ),  
  28.            para_qsort(NewData,I,R-1,M-1,ID, Dict)  
  29.    end. 

  其中patition用于将data按基准值划分为两部分,代码如下:

 
 
 
 
  1. partition(Data, K, L) ->  
  2.     Pivo= lists:nth(L, Data),  
  3.     ListBefore = lists:sublist(Data, K-1),  
  4.     ListBetween = lists:sublist(Data,K, L-K),  
  5.     ListAfter = lists:sublist(Data,L+1,length(Data)-L),  
  6.     Left = [X|| X <- ListBetween,X =
  7.     Right = [X || X <- ListBetween,X > Pivo],  
  8.     Ret = lists:append(Left,[Pivo|Right]),  
  9.     Temp = lists:append(ListBefore,Ret),  
  10.    NewData =  lists:append(Temp, ListAfter),  
  11.    Len = length(ListBefore)+length(Left)+1,  
  12.    {NewData, Len}. 

任务分割完后,会将此任务发送给下个节点进程进行处理,send代码如下:

 
 
 
 
  1. send( Data, I, J, M,No ,Dict) ->  
  2.     ID = trunc(No),  
  3.     Node = lookup(ID,Dict),  
  4.     Pid = spawn(Node, fun() -> loop() end),  
  5.     Pid ! {node(),self(), {Data,I,J,M,ID,Dict}}.  
  6.  
  7. %%客户节点N准备接受排序数据  
  8. loop() ->  
  9.     receive  
  10.        {Node,Pid,die} ->  
  11.            disconnect_node(Node),  
  12.            io:format("Node (~p) disconnected~n",[Node]),  
  13.            quit;  
  14.        {Node,Pid,L} ->      
  15.            {Data,I,J,M,ID,Dict} = L,  
  16.            %io:format("4----:Current node:~p server node:~p~n",[node(),lookup(monitor,Dict)]),  
  17.            para_qsort(Data,I,J,M,ID,Dict)  
  18.            %loop()  
  19.    end. 

  思想比较简单,我只是没对它进行进一步优化,最终运行结果如下:

  大家可以看到,此data的划分就是不均匀的,node1、node3只处理了一个数据,node2处理了3个数据,其余的都是server处理的,因此排序的效率问题很差。此程序只是展示如何使用Erlang实现并行快排,并不是学习如何优化问题。

  大致就说这么多了,以后我将实现PSRS并行算法的Erlang实现,这个算法Erlang实现起来比较麻烦,大家可以先去看看。

原文:http://www.cnblogs.com/itfreer/archive/2012/05/14/Erlang_in_practise_quick-sort.html

文章标题:Erlang并行快速排序实战
网页地址:http://www.csdahua.cn/qtweb/news26/475926.html

网站建设、网络推广公司-快上网,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等

广告

声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 快上网