code

2017年8月31日 星期四

RITx Project Management Life Cycle 1 - Project life cycle

Project定義

"A project is a temporary endeavor undertaken to create a unique product, service, or result."

所以一個project是:
1. 有期效性
2. 產出結果 (deliverable)

所以project management就是在預定期限內,得到project結果的方法與工具組合。


Project Objective

1. scope: 達到要的程度
2. time: 在預定時間內
3. cost:在預算內

這三個要素構成一個objective,也就是目標,由於是限制性的因素,也被稱為TThe Triple Constraint。

三者必須互相平衡,就像三角椅一樣:


project management skill就是要確保The Triple Constraint處於平衡狀態。


Project Life Cycle

簡單來說是以下四個phases,每個phase都會有check point 產出,一步步抵達objectives。



Initiation

  1. 找出問題或機會
  2. 指派主持人和參與者

Planning

  1. 執行細節定義
  2. 時程定義
  3. 預算規劃
  4. 風險評估
  5. 品保規劃

Execution

  1. 執行
  2. 定期檢視檢討是否符合plan

Closing

  1. 合約善後
  2. 事後檢討
  3. 釋放資源


Project Manager的特質 

主要還是The triple contraint的管理,人格特質上基本就是要有領導魅力,以及善於溝通和在壓力狀態下冷靜。不過這特點其實每個職業工作者都應該要有,換句話說,每個人都該有project manager的特質,也都應該是潛在的project manager。

另外對risk management也必須很強大,我朋友某A是enginee),雖然參與的project成功deliver,品質與架構都非常優秀,但是這歸因於A爆肝完成,也就是the triple constraint管理徹底失衡。A發現個人責任感被充分壓榨,而其他管理者都涼涼的準時上下班,談笑風生,最後A離職了。

這是一個失敗的project management,事實上此案例中不論位階,所有管理者並沒有project management的知識與經驗,有的都是個人過往的工作慣性與土法煉鋼的方式,這證明了management是一個很需要專心學習與實踐的skill,難度不亞於學習AI或是Machine Learning (笑~)。

這個project最大的失敗在於管理者根本不知道所謂的The triple constraint,也沒有risk assessment和management。客戶不斷壓榨vendor時,vendor端的PM要想如何維護住triple constraint的平衡,必須好好思考怎麼引導客戶甚至高階經理人避免危及triple constraint平衡的可能。

要維持triple constraint的平衡,就該有tradeoff,但是本案例PM與上位管理者沒有此概念,長期把壓力施加在工程師A身上,最終導致enigneer的不滿而離職 。

同理心是PM另一個非常需要的特質,有此特質則team building與溝通能力,甚至leadership都能水到渠成。

朋友A觀察到的lesson就是: 要有同理心,而manager真的無法引導客戶或是上位經理人的壓力時,最好能用情感方式支持專案成員,例如陪伴加班是很實際的鼓舞方式,(一起吐苦水倒是不用,免得引爆辦公室政治),而研發出身的專案管理者,最好不要放棄R&D skill,能夠實際承接小部分開發工作的話,不但能夠同理超時工作身心俱疲的成員感受,也能舒緩成員工作負荷,並且增進團隊感情。


Functional Manager

一般的部門經理,跟project manager的腳色很不同,因為不會面臨跨部門資源整合的問題。摘要如下:










Applied Machine Learning in Python 2 - Lab: KNN

我們用癌症統計資料來玩玩看KNN

讀入資料

import numpy as np
import pandas as pd
from sklearn.datasets import load_breast_cancer

cancer = load_breast_cancer()

不過這是一個dict,還不是一個pandas dataframe。雖然Scikit-learn不需要一定使用dataframe,但這對清理資料有幫助,嘗試將之轉換成dataframe。

def to_dataframe():
    df = pd.DataFrame(data=cancer['data'], columns=cancer['feature_names'])
    df['target'] = cancer['target']
    return df


class distribution

惡性與良性的分類各有多少呢?

def class_distribution():
    df = to_dataframe()
    result = pd.Series({'malignant':len(df[df['target']==0]),
                        'benign':len(df[df['target']==1])})
    return result


區分label與data

def split_data_label():
    df = to_dataframe()
    X = df[df.columns[:-1]]
    y = df['target']

    return X,y


製作75% : 25% training set vs test set

def training_set():
    X, y = split_data_label()

    from sklearn.model_selection import train_test_split
    X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=0)

    return X_train, X_test, y_train, y_test


train 1-NN classifier

def _1NN_classifier():
    X_train, X_test, y_train, y_test = training_set()

    knn = KNeighborsClassifier(n_neighbors=1)
    knn.fit(X_train, y_train)

    return  knn


使用classifier來預測

要製作input,這邊使用一個假的data,就是每個feature在dataframe的mean組成的feature vector。

def predict():
    cancerdf = to_dataframe()
    means = cancerdf.mean()[:-1].values.reshape(1, -1)

    knn = _1NN_classifier()
    result = knn.predict(means)

    return  result


predict test set

當然我們要來evaluate estimator的好壞的話,還是要來測試test set:
def predict_test_set():
    X_train, X_test, y_train, y_test = training_set()
    knn = _1NN_classifier()

    result = knn.predict(X_test)

    return  result


Accuracy

最後可以檢驗此次training結果的accuracy:
def answer_eight():
    X_train, X_test, y_train, y_test = training_set()
    knn = _1NN_classifier()

    result = knn.score(X_test, y_test)

    return  result


這次training結果對training set以及test set中的兩種classes的accuracy如下:





Applied Machine Learning in Python 1 - KNN

Libraries

scikit-learn : 本課程的主角。
SciPy: 數學與統計
NumPy: data structures
pandas:data analysis/manipulation
matplotlib:2d plotting library


從Dataframe開始

假設我們用pandas讀進以下的 59x7 dataframe:



可以看到已經有分類的fruit_label的column,所以這將會是一個supervised learning data。
怎麼build 一個classifier?


檢視data 是否需要前處理

這不必講了,就是一個必要過程。


檢視與挑選features

如果把以下幾個feature pair-wise劃出histogram以及scatterplot:


2d feature space 老實講我是看不出有形成clusters。
不過如果採用3d feature space好像稍微比較有明顯度:


不過反正這只是一個範例。


獲得Training set and test set

需要從原本dataframe分出training set以及 test set。

(1) 首先決定我們要那些features? 把這些columns抓出來變成我們的source dataframe:
# For this example, we use the mass, width, and height features of each fruit instance
X = fruits[['mass', 'width', 'height']]
y = fruits['fruit_label']

注意我們用大寫變數來表示一個2D data (2darray / dataframe)


 (2) 再來我們把label column特別拿出來變成一個series (1d array):
y = fruits['fruit_label']



(3) 再來就讓sklearn來幫我們製作出random 3:1比例的training set / test set:





from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=0)

這樣就有了training features + training labels, 以及test features + test labels。








train classifier (KNN k-nearest neighbors)

KNN是一種"memory based" supervised classifier,因為他的藉由記憶traning set中的label來分類未知的sample。 詳細知識已經寫在Columbia AI課程18。 下圖是1-NN的decision boundary結果(我們上面的fruit dataset):
所以我們需要知道

 1. distance function定義 (e.g. Euclidean)

 2. K值

 3. 有沒有weight給任何特定neighbors? 

4. K > 1的時候,如何決定新sample的class (e.g. 投票?)

 以下sklearn library幫我們製作一個KNN classifier:

from sklearn.neighbors import KNeighborsClassifier

knn = KNeighborsClassifier(n_neighbors = 5)
knn.fit(X_train, y_train)


KNeighborsClassifier 是一個sklearn estimator,所以都含有同樣的interface fit()
fit function會改變estimator internal state,當training完畢。

可以看到knn是以下設定的estimator instance:




評估estimator的好壞

我們的test set就派上用場了,利用estimator的score() funciton,我們ˊ可以評估test set的"accuracy",accuracy的定義可以看這篇筆記我怎麼找不到。


accuracy = #(true label) / #(predicted true)

如果不滿意accuracy或是specificity,就看要怎麼調整training過程。


使用estimator

classifier製作完成了,可以用以下的function來predict new sample:

fruit_prediction = knn.predict([[20, 4.3, 5.5]])

lookup_fruit_name = dict(zip(fruits.fruit_label.unique(), fruits.fruit_name.unique()))
print('prediction = {}'.format(lookup_fruit_name[fruit_prediction[0]]))



K值的影響

對decision boundary來說,這是一個tradeoff (bias - variance tradeoff):



對accuracy的影響 (只針對此次training與test,並非通例):


2017年8月29日 星期二

Distributed Java 3 - Message Passing

Distributed Parallelism

在一個data center中的電腦群是一個cluster,把其中所有的節點邏輯上抽象化成單一computation unit(有多核可以做parallelism),這個computation unit的觀點稱為global view。

所以一個program他的global view是看不到cluster內的節點的,例如一個program中access一個array,programmer不會知道哪些部分的資料是存在哪一個節點的,他邏輯上會認可這個array是跟單一一台電腦上的array沒什麼兩樣。

這樣的abstraction稱為 Single Program Multiple Data。

Message Passing Interface

問題是這些節點實際上在底層要怎麼互相溝通?常用的high level interface稱為MPI,可能長得如下:


rank是每個節點的id, 從0,1, ...
上面這個main program應該是在每個節點去init? 課堂中的summary有講:

"It is common for each node to execute one MPI process, but it is also possible to execute more than MPI process per multicore node so as to improve the utilization of processor cores within the node."

不過至少可以確定每個local view of XL在每個node上可以init出不同的連續整數是沒問題。

Point to point communication in MPI

假設有兩個node (rank0 rank1):


用不管什麼方式(可能是同一台電腦上的bus, 或是不同電腦上的網路線)連結再一起,其中rank0想要傳送字串s給 rank1,對programmer來說,只寫一個program再不同的rank上執行,但是由於每個rank執行的程式碼不同,所以可以用conditional:



Message ordering & Deadlock

message passing因為有dependency,就有race condition或是deadlock的可能性。


例如上面四ranks (nodes),R0收到A以及R3收到B這兩個事件的順序是不能被保證的。
能夠被保證的順序只有當同一對 sender/receiver,傳送同樣type/tag
為什麼要同樣type/tag? 不知道 XDDD

為什麼會有deadlock? 因為send / receive是blocking operation! 以下圖為例:


如果R0 先執行第一行,但是R1還沒收到就執行R1的第一行,由於send是blocking,R0只能一直等R1收,而R1也只能一直等R0收,就deadlock。

一個解決方法就是其中一個rank調換operation order。
另外一個方法就是如果MPI framework有支援sendreceive() function,則runtime會自己avoid deadlock。

Non-blocking Communications

上面提到send/receive是blocking operation,這不是很好的架構,因為容易浪費時間在等待上:


MPI framework可能提供asynchronous send/receive,可以叫做ISEND / IRECEIVE:


當然還是要提供blocking機智,否則某些operation壹定要等待某些message到來才能執行,叫做WAIT(REQUEST D)。

前面講的deadlock例子也可以用non-blocking MPI來解決。


Collective Communications

這就是一個rank要怎麼broadcast/multicast messages給其他multiple ranks。




2017年8月23日 星期三

Distributed Java 2 - Client-server programming

Sockets

分散的電腦網路節點群,每台電腦上面可能有一或多個正在執行的JVM instance (processes),想要彼此溝通可以使用low level construct: socket。


所以一個jvm endpoint建立一個client socket,另一個則扮演server endpoint,也建立一個server socket。

Java server socket:

1. 建立instance and listen
2. 一旦有人連過來,則先取得client socket reference(利用accept()),然後可以用此reference來read write client socket的資訊:


client side socket



Serialization / Deserialization

socket溝通是low level communication,是byte stream,但是programs是以high level abstraction在設計,是以objects為個體,所以如何把objects轉成bytes 透過socket傳出去,這個動作稱為 serialization。在接收方重建這些bytes為原本的object,稱為deserialization。

object可能有fields指向其他objects,構成一個graph。所以XML是一個為了描述這樣的object relationship graph的方法。但這個很大overhead。

Java program可以使用Serializable interface,並且可以使用@transient annotation來標注是否要serialize某些fields,這對optimization有幫助。

Remote Method Invocation (RMI)

如果本地端JVM物件y要呼叫遠端JVM上的x.foo(),怎麼辦?
其實Java有一個RMI framework,搞定了一切,他背後架構如下:


本地端RMI client有一個proxy object (stub),可以跟底層socket/network interface溝通,而遠端的JVM process要implement RMI server,同樣也有一個proxy object與底層溝通,稱為skeleton,其實就這樣。

還有x和y因為要透過網路溝通,必須要serializable。


Multi-cast Sockets

unicast : 點對點socket communication
broadcast : 一對多 in-LAN  socket communication
multicast : 一對多 over internet socket communication

multicast有以下的operations:


Publish-Subscribe 

producers publish topic messages,而consumers subscribe topics。



infrastructures可以distributed to 大數量的nodes,稱為brokers,負責流量控制。

一個implementation: Apache Kafka。


2017年8月17日 星期四

Distributed Java 1 - Map-Reduce

Map-Reduce

distributed programming主要是應用在big data,以key-value pair的結構來處理。要用k-v結構是因為,這樣才能在不同的節點把同樣的key的value combine起來,所以稱為reduce (借用functional programming中的reduce operation)。

一個例子,有四個kv組合如下。mapping function把value根據乘法因子分成不同的kv pairs:


如果最後reduce是一個sum function,則在每個節點對同一個key的value就進行summation:



注意map-reduce 是functional programming paradigm(有保證之前學過的functional parallelism),所以所有key/value都是immutable,這也提供極好的scalability和fault tolerance。

Hadoop

hadoop是一個distributed map-reduce implementation,假設運作在網路連結的不同節點群:

hadoop scheduler自動把map tasks和schedule tasks分配給節點,programmer不用操心這些事情,只要implement好map function和reduce function即可。

有其他更high-level的framework,例如Hive/Pig ,可以讓programmer不用知道map-reduce paradigm,但是會產生map-reduce program的semantics。

由於data 可能分散在數千個storage節點中:
1. mapping先把分散的data建立kv pair
2. framework shuffle 同樣的key 的kv pair到不同的processing nodes
3. reduce
4. flush back to storage

只有1. 3. 是programmer需要設計的,其他都是framework做掉了。


Spark Framework (more modern Hadoop XD)

Spark創作原因是每個節點的記憶體(caching)要能充分利用,這在Hadoop是比較弱的(hadoop會被計算結果都存入storage)。
Spark的data structure是RDD (resilient distributed dataset),kv pair可以是其中一種選項。

Hadoop的map-reduce operation被generalized:
intermediate transforms: 可以是map / filter / join / ...
terminal actions: 可以是reduce / collect / ...

舉例來說以下五個步驟是可能的Spark處理word count的過程:

1. 創造一個spark context sc
2. 使用此sc從storage讀取data INPUT
3. INPUT 被map成可以處理的單位,可能透過words = INPUT.flatmap { }
4. 把words這個collection去做成kv pair 或是tuple :pairs = words.mapToPair{ }
5. reduce階段,pairs.reduceByKey{ }


TF-IDF Weight

簡單來說就是要比對documents之間的相似性。

Term Frequency:
利用把documents簡化成某些terms的集合,看這些terms出現在candidates中的頻率,可以做成一個frequency table: (每個entry意思是term在此doc的出現次數)



Inverse Document -Frequency: 
定義DF:是某個term出現在幾個candidate documents中。
則IDF : inverse document frequency,為 N/DF,N是所有candidate documents的數目,這是用來加重少出現但是可能要緊的字。

舉來來說 “the”可能出現在全部N個documents中,所以DF = N,但是其IDF = N/N =1
而"qqq"可能出現在2個documents中,DF = 2,IDF = N/2,所以給予了較高的權重!

研究出來的適合weight:


不過根本課程無關 XDDD,但是跟map-reduce有關。

首先documents可能幾十億個分散在不同storage nodes中,要計算某個term frequency TFij:


有上面input也可以直接算出DF:



Page Rank

簡單來說page B的rank,是所有指向他的page As的contribution:

iteration {

1. contribution(B) = RANK(A) / A網頁發出的links數目
分母的意義是B對A的重要性的權重。

2. RANK(B) = 0.15 + 0.85*contribution(B) //這是經驗公式

}

Spark怎麼實現page rank?
1.



2.


rank的計算就比較straightforward。

2017年8月14日 星期一

Parallel Java 5 - Phaser

Barrier in Java

加入barrier會也是需要overhead,例如以下parallel code:



如果要確保所有Hello印出在所有Bye之前,我們插入barrier的位置可以在lookup(i)前或是後,但是這樣一定會讓整個parallel program的span = 200 (記得span就是parallel program中work最大的path)。

node ---> lookup ----> next

如果能夠把lookup和barrier也parallelize 就能降低span到100,類似以下?

node ------------> lookup
         --(join)---> barrier

這事實上是可以的,因為lookup的執行不需要等待barrier的執行,反過來說也是。
Java Phaser class提供這種所謂 split-phase barrier的機制:



Point to Point Synchronization

要壓榨barrier parallelism,必須考慮parallel program的結構,分析哪些statement是dependent on另外statement,不用一致性的等待擋住,以下為例,如果不考慮個別dependency的話,span = 6,因為barrier一刀橫在中間,使得phase 1 work (3) + phase 2 work (3) = 6。


這三個tasks的兩個phases的statements dependency列出來之後發現,可以fine-grained來設定barrier,Java也提供這樣的機制:



Pipeline

想像以下pipeline:


image processing分成好p個步驟,Denoise -> Registration -> Segmentation,這幾個步驟都必須等待前一步驟完成,所以是sequential。

如果有n張image,則同一個時間點可以parallel進行不同image的各個階段,所以n張image經過p個stage pipeline總共的parallel work = n*p (work的定義是所有cost的總和

computation graph:
D1 --join--> R1 --join--> S1
|fork
D2 --join--> R2 --join--> S2


span (CPL) = n 次Denoise  + 最後一張image 的剩下步驟只能sequentially complete: p - 1 work,所以總共是 n + p - 1

parallelism = work/span = n*p / (n+p-1) ,這邊可以假設 n >> p。所以 parallelism ~= p
也就是大量input的pipeline能獲得平行處理的加速,約等同於pipeline stages的數目。

如果用barrier可以很正確執行上面的semantics:





2017年8月11日 星期五

Parallel Java 4 - Loops

Forall Construct (Java沒有 @@

forall是for loop的parallel版本,可以看到semantics如下:


範例:
forall (i : [0:n-1]) a[i] = b[i] + c[i]

如果只對一個collection做事的話,也可以用stream。

forall (n) 會spawn n tasks,所以使用forall來設計parallel program的話,要盡量減少tasks數目,不能隨便使用。

Matrix Multiplication Example

用數學定義來做matrix multiplication的話:


會是一個triple loop:
對C中每個row I來說
對row I中每個column J說
對C[I][J]要loop相對應的A[I][all K] 以及 B[all K][J]

可以把計算每個C[i][J]的task parallelize,所以,所以計算C[0][0] 和 C[0][1]是independent的,那有辦法把裡面的K-loop parallelize嗎?


K-loop是對某特定C[I][J]做累加,如果不做任何synchronization的話,會產生data race。不過這不是concurrent programming,所以當然不會做synchronization,要不然就失去parallelism。

所以K-loop需要sequential execution。


Phase Barrier Construct in Forall

barrier是一個forall中使用的construct,可以把forall中的code分成好幾個phase,所有parallel execution都會被擋在barrier前,不能往下執行,直到所有的parallel execution都完成phase 1的code。

癌舉例來說,以下的兩種寫法都能得到一樣的Jacobi algorithm答案:


第一個方法顯而易見產生更多的tasks。
第二種方法把forall變成outer loop,但是由於有swap array pointer的需要,我們必須確保在swap pointer之前,所有的tasks都完成computation。此時需要barrier的協助,注意barrier是放入在inner sequential loop中

雖然這邊會instantiate 和第一個方法#tasks 一樣多的barriers,但是barriers的overhead遠比task coordination少得多,所以這也就是能夠比第一種方法加速的原因。

避免產生太多tasks!

由上面那一段可以知道,由於實體的cpu core數目相對於data數目總是太少(例如16 cores vs 1 billion elements),如果把每個element 都parallel去處理,反而會拖慢速度,因為overhead >> 加速性。

所以類似之前做法,要把1 billion elements拆成幾個 n parts,那些n parts就可以在inner sequential loop處理掉,而outer loop就可以用forall parallelize,這樣tasks的數目就減少到n個。

n 的數目要實驗,因為會依據環境而定。

至於怎麼拆法,那就不一定,看選擇。


2017年8月10日 星期四

Parallel Java 3 - Memoization and Streams

Memoization

這是一種functional programming optimization的方法。

在sequential FP中,這其實就是建立lookup table,類似dynamic programming的方法:


parallelization版本中,lookup table應該要存放的是future objects,這樣才不會馬上去compute tasks,但是一旦compute之後,就不會再做第二次:




Java Streams

簡單來說,就是把java collection轉換成functional programming的collection,所以一堆filter/map/reduce之類的function都可以用,對parallelization來說,跟scala依樣,只要加上個par就可以把stream轉換成parallel stream,所以所有的filter/map/reduce之類的operation都會run in parallel。



Determinism

首先定義
1. functional determinism: 如果一個parallel prorgam,同一個input永遠會有同一個output
2. structural determinism: 如果一個parallel program,同一個input永遠會有同一個computation graph
3. determinism: 1. 2.同時成立

parallel program最常見的問題就是data race,如果使用本課程所教的parallel programming constructs的話,保證是deterministic!

就這樣  @@









Parallel Java 2 - Future

Functional Programming

functional programming最大特點就是沒有state,也就是每次f(x) = y 是固定的,不可能有其他mapping (否則就違反 function 在數學上的定義)。

所以一但釐清function calls之間的dependency,則可以找到能夠parallelize的部分。例如以下三個function calls:


C = G(A), D = H(A),所以只要知道A的值,C和D的運算式可以平行的。


Future Model

Future是一個functional parallelism pattern,定義某個async object的functional operations,當建立物件時,不會立即執行task,而是當caller 呼叫 get() ,才會去執行task (可以看成function才真正去把input map成output,所以稱為Future。

例如上面說的 A = F(B) 這個mapping,如果用Future來表現 (以下是pseudocode):

FA = Future { F(B) },這邊宣告建立一個Future instance (也稱為promise),其執行的task為 F(B)。
則當要真的去取得FA的task結果,我們可以 FA.get()。

所以 宣告 FC = Future{ G(FA.get()) },FD = Future { H(FA.get()) }

注意因為Future要再get()被呼叫時才會真正去執行task 並且block and wait,所以它在宣告後可以馬上return,繼續執行下一個statement。如果把第一段三個數學式子化成computation graph:


可以看到一但G(A) H(A)要真正執行task的時候,他們要等待FA這個FUTURE執行完,也F(B)的value必須要得到,所以在graph中是以JOIN來block。


Implement Future model in Java

因為Future task是有return value,所以要extend RecursiveTask。
依樣要override compute(),其他沒什麼不一樣。

真正要create future object還是依樣要call fork(),主要的thread還是要join,這相當於implement Future中的get():


注意這邊的join()是有return value的,因為RecursiveTask有implement Java的Future interface,所以這個join()跟之前的RecursiveAction的join是不一樣的signature。