code

顯示具有 Distributed Programming 標籤的文章。 顯示所有文章
顯示具有 Distributed Programming 標籤的文章。 顯示所有文章

2017年9月2日 星期六

Distributed Java 4 - MPI & Multithreading

Processes & Threads

這是之前提過高階ABSTRACTION的底層JVM process,一個process可以創造多個threads,分享process的資源(記憶體)。

對datacenter中的一個node來說,普遍一個node有16核心,一個process產生的一個thread可以使用一個核心的計算能力,所以單一thread並不能使用multicore能力。

如果一個process產生multithreads,當然有syncrhonization的壓力,但是有不少好處,包括responsiveness / performance / resource sharing等。

那為何不一個node就跑一個process + 多個threads就好? (某些node的確只有一個process,例如medical system)

因為一個process就是一個application program,通常不會一個電腦只跑一個application,光是OS就是一個application了。此外一個node上跑多個processes有他的好處:

首先可以多工(多個application),例如JVM需要garbage collection,這是一個獨立的process,而且當garbage collection發生的時候,會blocking。所以不能單獨只有一個process在JVM node上。

此外單一process的multithreading帶來的throughput 提升是有極限的:


超過一個小量 n (workload dependent)個threads之後,甚至效能會下降,因為synchroniation overhead超越了多執行續帶來的效益。而multi process沒有此限制,可以充分利用一個node中的所有cores,這提供了scalability

最後multiprocess提供resilience,也就是容錯。當一個process掛點,另一個監測的process可以知道,然後啟動一個新的process來繼續原本掛點process的任務。

所以processes是 distributed computing的基本單位 (提供高度scalability within datacenter),而threads則是concurrency/parallelism的基本單位 (提供有限scalability within a node)。


Multithreaded Servers

server先建立一個server socket,然後在一個loop中等待request。以之前的socket programming file server assignment來說,就是那樣。 但是一次只能等一個request充分serve之後,才能接受下一個request。

所以要在每一次聽到一個request之後,就new一個thread去handle request,這就是利用到了multithreading:


不過allocate resource for a thread是很大的overhead,所以要利用threadpool或是tasks來reuse thread instance。


MPI and Multithreading

之前提到的MPI架構只有single threaded,所以並沒有用到一個node上所有的cores:


現在如果要充分利用cores,就使用multithread,master thread在 一個node上負責啟動MPI:


所以每個node還是一樣只有一個rank (應該也可以多個rank,反正一個rank就是一個JVM),在一個rank裡面有多個threads可以run in parallel。

有可能有以下幾種threading modes:
1. funnel: 只有一個master thread
2. serialized: 多個threads但一次只有一個thread能進行MPI calls
3. multiple: 多個threads且可以同時有多個threads進行MPI calls,當然如果是non-blocking MPI,兩個thread不能wait for 同一個requests。


Distributed Actors

之前concurrent Java 課程提到過Actor pattern,習題作業也有一題利用Sieve of Eratosthenes 演算法來使用actors計算質數,主程式碼如下:

public final class SieveActor extends Sieve {
    /**
     * {@inheritDoc}
     *
     * TODO Use the SieveActorActor class to calculate the number of primes <=
     * limit in parallel. You might consider how you can model the Sieve of
     * Eratosthenes as a pipeline of actors, each corresponding to a single
     * prime number.
     */

    public static int primesCount = 0;

    public SieveActor() { primesCount = 0; }

    @Override
    public int countPrimes(final int limit) {

        //multiple threads will enter this critical section
        PCDP.finish(() -> {
            final SieveActorActor actor = new SieveActorActor(2);

            for (int i = 3; i <= limit; i+=2) //just send odd numbers, since multiples of 2 are aready filtered
                actor.send(i);

            actor.send(0);
        });

        return primesCount;
    }

    /**
     * An actor class that helps implement the Sieve of Eratosthenes in
     * parallel.
     */
    public static final class SieveActorActor extends Actor {
        /**
         * Process a single message sent to this actor.
         *
         * TODO complete this method.
         *
         * @param msg Received message
         */

        private SieveActorActor nextActor;
        private final int localPrimeNumber;


        public SieveActorActor(int prime) {
            localPrimeNumber = prime;
            primesCount+=1;
        }

        @Override
        public void process(final Object msg) {

            final int candidate = (Integer)msg;

            if (candidate == 0) {
                if (nextActor != null)
                    nextActor.send(0);
                return;
            }

            boolean isMult = ((candidate % localPrimeNumber) == 0);

            if (!isMult) {
                if (nextActor == null)
                    nextActor = new SieveActorActor(candidate); //the first non-multiple, new = starts actor
                else
                    nextActor.send(candidate);
            }

        }
    }
}

上面是單一 node版本,現在要善用distributed programming的話,就要在remote node上面產生新的remote actor,以下的步驟必須達到:

1. 製作一個config file,寫入每個actor被哪一個node host
2. 要有能力create remote actor
3. 要有能力傳message給remote actor
4. 因為是跨node傳message,當然messages要是serialized


Distributed Reactive Programming

在pub-sub model,publisher可以主動push或是subscriber主動pull,reactive programming就是要利用async events來coordinate這樣的溝通,讓micro services可以更好的implement。

JDK 9 提供Flow API來實作reactive streams spec。
略。



2017年8月29日 星期二

Distributed Java 3 - Message Passing

Distributed Parallelism

在一個data center中的電腦群是一個cluster,把其中所有的節點邏輯上抽象化成單一computation unit(有多核可以做parallelism),這個computation unit的觀點稱為global view。

所以一個program他的global view是看不到cluster內的節點的,例如一個program中access一個array,programmer不會知道哪些部分的資料是存在哪一個節點的,他邏輯上會認可這個array是跟單一一台電腦上的array沒什麼兩樣。

這樣的abstraction稱為 Single Program Multiple Data。

Message Passing Interface

問題是這些節點實際上在底層要怎麼互相溝通?常用的high level interface稱為MPI,可能長得如下:


rank是每個節點的id, 從0,1, ...
上面這個main program應該是在每個節點去init? 課堂中的summary有講:

"It is common for each node to execute one MPI process, but it is also possible to execute more than MPI process per multicore node so as to improve the utilization of processor cores within the node."

不過至少可以確定每個local view of XL在每個node上可以init出不同的連續整數是沒問題。

Point to point communication in MPI

假設有兩個node (rank0 rank1):


用不管什麼方式(可能是同一台電腦上的bus, 或是不同電腦上的網路線)連結再一起,其中rank0想要傳送字串s給 rank1,對programmer來說,只寫一個program再不同的rank上執行,但是由於每個rank執行的程式碼不同,所以可以用conditional:



Message ordering & Deadlock

message passing因為有dependency,就有race condition或是deadlock的可能性。


例如上面四ranks (nodes),R0收到A以及R3收到B這兩個事件的順序是不能被保證的。
能夠被保證的順序只有當同一對 sender/receiver,傳送同樣type/tag
為什麼要同樣type/tag? 不知道 XDDD

為什麼會有deadlock? 因為send / receive是blocking operation! 以下圖為例:


如果R0 先執行第一行,但是R1還沒收到就執行R1的第一行,由於send是blocking,R0只能一直等R1收,而R1也只能一直等R0收,就deadlock。

一個解決方法就是其中一個rank調換operation order。
另外一個方法就是如果MPI framework有支援sendreceive() function,則runtime會自己avoid deadlock。

Non-blocking Communications

上面提到send/receive是blocking operation,這不是很好的架構,因為容易浪費時間在等待上:


MPI framework可能提供asynchronous send/receive,可以叫做ISEND / IRECEIVE:


當然還是要提供blocking機智,否則某些operation壹定要等待某些message到來才能執行,叫做WAIT(REQUEST D)。

前面講的deadlock例子也可以用non-blocking MPI來解決。


Collective Communications

這就是一個rank要怎麼broadcast/multicast messages給其他multiple ranks。




2017年8月23日 星期三

Distributed Java 2 - Client-server programming

Sockets

分散的電腦網路節點群,每台電腦上面可能有一或多個正在執行的JVM instance (processes),想要彼此溝通可以使用low level construct: socket。


所以一個jvm endpoint建立一個client socket,另一個則扮演server endpoint,也建立一個server socket。

Java server socket:

1. 建立instance and listen
2. 一旦有人連過來,則先取得client socket reference(利用accept()),然後可以用此reference來read write client socket的資訊:


client side socket



Serialization / Deserialization

socket溝通是low level communication,是byte stream,但是programs是以high level abstraction在設計,是以objects為個體,所以如何把objects轉成bytes 透過socket傳出去,這個動作稱為 serialization。在接收方重建這些bytes為原本的object,稱為deserialization。

object可能有fields指向其他objects,構成一個graph。所以XML是一個為了描述這樣的object relationship graph的方法。但這個很大overhead。

Java program可以使用Serializable interface,並且可以使用@transient annotation來標注是否要serialize某些fields,這對optimization有幫助。

Remote Method Invocation (RMI)

如果本地端JVM物件y要呼叫遠端JVM上的x.foo(),怎麼辦?
其實Java有一個RMI framework,搞定了一切,他背後架構如下:


本地端RMI client有一個proxy object (stub),可以跟底層socket/network interface溝通,而遠端的JVM process要implement RMI server,同樣也有一個proxy object與底層溝通,稱為skeleton,其實就這樣。

還有x和y因為要透過網路溝通,必須要serializable。


Multi-cast Sockets

unicast : 點對點socket communication
broadcast : 一對多 in-LAN  socket communication
multicast : 一對多 over internet socket communication

multicast有以下的operations:


Publish-Subscribe 

producers publish topic messages,而consumers subscribe topics。



infrastructures可以distributed to 大數量的nodes,稱為brokers,負責流量控制。

一個implementation: Apache Kafka。


2017年8月17日 星期四

Distributed Java 1 - Map-Reduce

Map-Reduce

distributed programming主要是應用在big data,以key-value pair的結構來處理。要用k-v結構是因為,這樣才能在不同的節點把同樣的key的value combine起來,所以稱為reduce (借用functional programming中的reduce operation)。

一個例子,有四個kv組合如下。mapping function把value根據乘法因子分成不同的kv pairs:


如果最後reduce是一個sum function,則在每個節點對同一個key的value就進行summation:



注意map-reduce 是functional programming paradigm(有保證之前學過的functional parallelism),所以所有key/value都是immutable,這也提供極好的scalability和fault tolerance。

Hadoop

hadoop是一個distributed map-reduce implementation,假設運作在網路連結的不同節點群:

hadoop scheduler自動把map tasks和schedule tasks分配給節點,programmer不用操心這些事情,只要implement好map function和reduce function即可。

有其他更high-level的framework,例如Hive/Pig ,可以讓programmer不用知道map-reduce paradigm,但是會產生map-reduce program的semantics。

由於data 可能分散在數千個storage節點中:
1. mapping先把分散的data建立kv pair
2. framework shuffle 同樣的key 的kv pair到不同的processing nodes
3. reduce
4. flush back to storage

只有1. 3. 是programmer需要設計的,其他都是framework做掉了。


Spark Framework (more modern Hadoop XD)

Spark創作原因是每個節點的記憶體(caching)要能充分利用,這在Hadoop是比較弱的(hadoop會被計算結果都存入storage)。
Spark的data structure是RDD (resilient distributed dataset),kv pair可以是其中一種選項。

Hadoop的map-reduce operation被generalized:
intermediate transforms: 可以是map / filter / join / ...
terminal actions: 可以是reduce / collect / ...

舉例來說以下五個步驟是可能的Spark處理word count的過程:

1. 創造一個spark context sc
2. 使用此sc從storage讀取data INPUT
3. INPUT 被map成可以處理的單位,可能透過words = INPUT.flatmap { }
4. 把words這個collection去做成kv pair 或是tuple :pairs = words.mapToPair{ }
5. reduce階段,pairs.reduceByKey{ }


TF-IDF Weight

簡單來說就是要比對documents之間的相似性。

Term Frequency:
利用把documents簡化成某些terms的集合,看這些terms出現在candidates中的頻率,可以做成一個frequency table: (每個entry意思是term在此doc的出現次數)



Inverse Document -Frequency: 
定義DF:是某個term出現在幾個candidate documents中。
則IDF : inverse document frequency,為 N/DF,N是所有candidate documents的數目,這是用來加重少出現但是可能要緊的字。

舉來來說 “the”可能出現在全部N個documents中,所以DF = N,但是其IDF = N/N =1
而"qqq"可能出現在2個documents中,DF = 2,IDF = N/2,所以給予了較高的權重!

研究出來的適合weight:


不過根本課程無關 XDDD,但是跟map-reduce有關。

首先documents可能幾十億個分散在不同storage nodes中,要計算某個term frequency TFij:


有上面input也可以直接算出DF:



Page Rank

簡單來說page B的rank,是所有指向他的page As的contribution:

iteration {

1. contribution(B) = RANK(A) / A網頁發出的links數目
分母的意義是B對A的重要性的權重。

2. RANK(B) = 0.15 + 0.85*contribution(B) //這是經驗公式

}

Spark怎麼實現page rank?
1.



2.


rank的計算就比較straightforward。