1. 概要
本文主要介紹了Hadoop Streaming的一些高級(jí)編程技巧,包括,怎樣在mapredue作業(yè)中定制輸出輸出格式?怎樣向mapreduce作業(yè)中傳遞參數(shù)?怎么在mapreduce作業(yè)中加載詞典?怎樣利用Hadoop Streamng處理二進(jìn)制格式的數(shù)據(jù)等。
關(guān)于Hadoop Streaming的基本編程方法,可參考:Hadoop Streaming編程。
2. 在mapreduce作業(yè)中定制輸入輸出格式
Hadoop 0.21.0之前的版本中的Hadoop Streaming工具只支持文本格式的數(shù)據(jù),而從Hadoop 0.21.0開(kāi)始,也支持二進(jìn)制格式的數(shù)據(jù)。這里介紹文本文件的輸入輸出格式定制,關(guān)于二進(jìn)制數(shù)據(jù)的格式,可參考第5節(jié)。
Hadoop Streaming提交作業(yè)的格式為:
1
2
3 |
Usage: $HADOOP_HOME/bin/hadoop jar \
$HADOOP_HOME/hadoop-streaming.jar [options]
|
其中,-D選項(xiàng)中的一些配置屬性可定義輸入輸出格式,具體如下(注意,對(duì)于文本而言,每一行中存在一個(gè)key/value對(duì),這里只能定制key和value之間的分割符,而行與行之間的分隔符不可定制,只能是\n):
(1)stream.map.input.field.separator/stream.map.output.field.separator: map task輸入/輸出數(shù)據(jù)的分隔符,默認(rèn)均為\t。
(2)stream.num.map.output.key.fields:指定map task輸出記錄中key所占的域數(shù)目,如
每一行形式為,Key1\tkey2\tkey3\tvalue,采用默認(rèn)的分隔符,且stream.num.map.output.key.fields設(shè)為2,則Key1\tkey2表示key,key3\tvalue表示value。
(3)stream.reduce.input.field.separator/stream.reduce.output.field.separator:reduce task輸入/輸出數(shù)據(jù)的分隔符,默認(rèn)均為\t。
(4)stream.num.reduce.output.key.fields:指定reduce task輸出記錄中key所占的域數(shù)目
3. 向mapreduce作業(yè)傳遞參數(shù)
提交作業(yè)時(shí),使用-cmdenv選項(xiàng)以環(huán)境變量的形式將你的參數(shù)傳遞給mapper/reducer,如:
1
2
3
4
5
6
7
8
9
10
11 |
$HADOOP_HOME/bin/hadoop jar \
contrib/streaming/hadoop-0.20.2-streaming.jar \
-input input \
-ouput output \
-cmdenv grade=1 \
…….
|
然后編寫(xiě)mapper或reducer時(shí),使用main函數(shù)的第三個(gè)參數(shù)捕獲你傳入的環(huán)境變量,如:
1
2
3
4
5
6
7
8
9
10
11
12
13 |
int main( int argc, char *argv[], char *env[]){
int i, grade;
for (i = 0; env[i] != NULL; i++)
if ( strncmp (env[i], “grade=”, 6) == 0)
grade= atoi (env[i]+6);
……
}
|
4. 在mapreduce作業(yè)中加載詞典
提交作業(yè)時(shí),使用-file選項(xiàng),如:
1
2
3
4
5
6
7
8
9
10
11 |
$HADOOP_HOME/bin/hadoop jar \
contrib/streaming/hadoop-0.20.2-streaming.jar \
-input input \
-ouput output \
-file dict.txt \
…….
|
然后編寫(xiě)mapper或reducer時(shí),像本地文件一樣打開(kāi)并使用dic.txt文件,如:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 |
int main( int argc, char *argv[], char *env[]){
FILE *fp;
char buffer[1024];
fp = fopen ( "dict.txt" , "r" );
if (!fp) return 1;
while ( fgets (buffer, 1024, fp)!=NULL) {
……
}
……
}
|
如果要加載非常大的詞典或配置文件,Hadoop Streaming還提供了另外一個(gè)選項(xiàng)-files,該選項(xiàng)后面跟的是HDFS上的一個(gè)文件(將你的配置文件放到HDFS上,再大也可以!?。。?,你可以在程序中像打開(kāi)本地文件一樣打開(kāi)該文件,此外,你也可以使用#符號(hào)在本地建一個(gè)系統(tǒng)鏈接,如:
1
2
3
4
5
6
7 |
$HADOOP_HOME/bin/hadoop jar \
contrib/streaming/hadoop-0.20.2-streaming.jar \
-file hdfs: //host:fs_port/user/dict.txt#dict_link \
…….
|
在代碼中這樣做:
如:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 |
int main( int argc, char *argv[], char *env[]){
FILE *fp;
char buffer[1024];
fp = fopen ( "dict_link " , "r" ); //or fp = fopen("dict.txt ","r");
if (!fp) return 1;
while ( fgets (buffer, 1024, fp)!=NULL) {
……
}
……
}
|
5. 處理二進(jìn)制格式的數(shù)據(jù)
從Hadoop 0.21.0開(kāi)始,streaming支持二進(jìn)制文件(具體可參考:HADOOP-1722),用戶(hù)提交作業(yè)時(shí),使用-io選項(xiàng)指明二進(jìn)制文件格式。0.21.0版本中增加了兩種二進(jìn)制文件格式,分別為:
(1) rawbytes:key和value均用【4個(gè)字節(jié)的長(zhǎng)度+原始字節(jié)】表示
(2) typedbytes:key和value均用【1字節(jié)類(lèi)型+4字節(jié)長(zhǎng)度+原始字節(jié)】表示
用戶(hù)提交作業(yè)時(shí),如果用-io指定二進(jìn)制格式為typedbytes,則map的輸入輸出,reduce的輸入輸出均為typedbytes,如果想細(xì)粒度的控制這幾個(gè)輸入輸出,可采用以下幾個(gè)選項(xiàng):
1
2
3
4
5
6
7 |
-D stream.map.input=[identifier]
-D stream.map.output=[identifier]
-D stream.reduce.input=[identifier]
-D stream.reduce.output=[identifier]
|
你如果采用的python語(yǔ)言,下面是從 HADOOP-1722 中得到的一個(gè)例子(里面用到了解析typedbytes的python庫(kù),見(jiàn):http://github.com/klbostee/typedbytes ):
mapper腳本如下:
1
2
3
4
5
6
7
8
9
10
11
12
13 |
import sys
import typedbytes
input = typedbytes.PairedInput(sys.stdin)
output = typedbytes.PairedOutput(sys.stdout)
for (key, value) in input :
for word in value.split():
output.write((word, 1 ))
|
reducer腳本:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 |
import sys
import typedbytes
from itertools import groupby
from operator import itemgetter
input = typedbytes.PairedInput(sys.stdin)
output = typedbytes.PairedOutput(sys.stdout)
for (key, group) in groupby( input , itemgetter( 0 )):
values = map (itemgetter( 1 ), group)
output.write((key, sum (values)))
|
6. 自定義counter并增加counter的值
用戶(hù)采用某種語(yǔ)言編寫(xiě)的mapper或者reducer可采用標(biāo)準(zhǔn)錯(cuò)誤輸出(stderr)自定義和改變counter值,格式為:reporter:counter:<group>,<counter>,<amount>,如,在C語(yǔ)言編寫(xiě)的mapper/reducer中:
1 |
fprintf (stderr, “reporter:counter:group,counter1,1”); //將組group中的counter1增加1
|
注:用戶(hù)定義的自定義counter的最終結(jié)果會(huì)在桌面或者web界面上顯示出來(lái)。
如果你想在mapreduce作業(yè)執(zhí)行過(guò)程中,打印一些狀態(tài)信息,同樣可使用標(biāo)準(zhǔn)錯(cuò)誤輸出,格式為:reporter:status:<message>,如,在C語(yǔ)言編寫(xiě)的mapper/reducer中:
1 |
fprintf (stderr, “reporter:status:mapreduce job is started…..”); //在shell桌面上打印“mapreduce job is started…..”
|
7. 在mapreduce使用Linux Pipes
迄今為止(0.21.0版本之前,包括0.21.0),Hadoop Streaming是不支持Linux Pipes,如:-mapper “cut -f1 | sed s/foo/bar/g”會(huì)報(bào)”java.io.IOException: Broken pipe”錯(cuò)誤。
8. 在mapreduce中獲取JobConf的屬性值
在0.21.0版本中,streaming作業(yè)執(zhí)行過(guò)程中,JobConf中以mapreduce開(kāi)頭的屬性(如mapreduce.job.id)會(huì)傳遞給mapper和reducer,關(guān)于這些參數(shù),可參考:http://hadoop./mapreduce/docs/r0.21.0/mapred_tutorial.html#Configured+Parameters
其中,屬性名字中的“.”會(huì)變成“_”,如mapreduce.job.id會(huì)變?yōu)閙apreduce_job_id,用戶(hù)可在mapper/reducer中獲取這些屬性值直接使用(可能是傳遞給環(huán)境變量參數(shù),即main函數(shù)的第三個(gè)參數(shù),本文作業(yè)還未進(jìn)行驗(yàn)證)。
9. 一些Hadoop Streaming的開(kāi)源軟件包
(1) 針對(duì)Hadoop Streaming常用操作的C++封裝包(如自定義和更新counter,輸出狀態(tài)信息等):https://github.com/dgleich/hadoopcxx
(2) C++實(shí)現(xiàn)的typedbytes代碼庫(kù):https://github.com/dgleich/libtypedbytes
(3) python實(shí)現(xiàn)的typedbytes代碼庫(kù): http://github.com/klbostee/typedbytes
(4) Java實(shí)現(xiàn)的typedbytes代碼庫(kù)(Hadoop 0.21.0代碼中自帶)
10. 總結(jié)
Hadoop Streaming使得程序員采用各種語(yǔ)言編寫(xiě)mapreduce程序變得可能,它具備程序員所需的大部分功能接口,同時(shí)由于這種方法編寫(xiě)mapreduce作業(yè)簡(jiǎn)單快速,越來(lái)越多的程序員開(kāi)始嘗試使用Hadoop Steraming。
11. 參考資料
http://hadoop./mapreduce/docs/r0.21.0/streaming.html
https://issues./jira/browse/HADOOP-1722