GroupingComparator應(yīng)用案例
更新時(shí)間:2015年12月29日14時(shí)58分 來源:傳智播客云計(jì)算學(xué)科 瀏覽次數(shù):
MapReduce中的GroupingComparator應(yīng)用案例
在日常的數(shù)據(jù)統(tǒng)計(jì)分析中,常常會(huì)有類似如下的求分組最大值統(tǒng)計(jì)需求,用到的數(shù)據(jù)示例如下:
itemid amount date …
10001 136.6 2015-1-12 …
10001 165.5 2015-1-12 …
10002 122.5 2015-1-12 …
10002 166.88 2015-1-12 …
10003 189.65 2015-1-12 …
10003 198.62 2015-1-13 …
10001 278.6 2015-1-13 …
10001 143.6 2015-1-13 …
需求是求出整個(gè)數(shù)據(jù)集中每一種商品銷售額最大的單筆訂單,結(jié)果如下:
10001 278.60
10002 166.88
10003 198.62
... ...
如果用傳統(tǒng)sql來求解,這是極其簡(jiǎn)單的:
select itemid,max(amount) from t_order group by itemid;
而用mapreduce程序,該如何實(shí)現(xiàn)呢?最簡(jiǎn)單的辦法是:
1、在mapper中將日志的每一行解析成鍵值對(duì): “key: itemid ,value:amount”
2、經(jīng)過shuffle之后,相同itemid的數(shù)據(jù)會(huì)發(fā)送給同一個(gè)reducer
3、然后,我們就可以在reducer中遍歷某個(gè)item的一組values,
4、這一組values對(duì)于amount來說是無序的,進(jìn)而需要在reducer中緩存這一組values,然后排序從而取到這一組values中的最大值。
這個(gè)辦法固然可行,但是效率不是很高,因?yàn)樵趓educer中針對(duì)一組values取最大amount,需要在內(nèi)存中進(jìn)行緩存并排序,在數(shù)據(jù)量大的情況下,會(huì)耗費(fèi)相當(dāng)多的內(nèi)存空間和cpu運(yùn)算資源,甚至可能會(huì)內(nèi)存溢出。
現(xiàn)在,就讓我們來思考另一種實(shí)現(xiàn)方式,如果能讓數(shù)據(jù)到達(dá)reducer時(shí)的次序是針對(duì)amount的倒序,則我們可以直接取改組values的第一個(gè)值即可,如何實(shí)現(xiàn)呢?
1、首先,我們構(gòu)造一個(gè)bean<itemid,amount> implements WritableComparable作為mapper輸出的key來傳遞數(shù)據(jù),在其compareTo()方法中定義邏輯:按照itemid升序及amount降序,這樣一來,mapper輸出的數(shù)據(jù)就會(huì)按照amount降序排列,示例如下:
<10001,278.60>
<10001,165.50>
<10001,136.60>
<10002,166.88>
<10002,122.5>
.......
2、但是,這樣一來,又帶來一個(gè)棘手的問題——相同item的bean在shuffle時(shí)不一定發(fā)往同一個(gè)reducer!因?yàn)槊恳粋€(gè)bean(就算是相同itemid)都是一個(gè)不同的對(duì)象,而默認(rèn)HashPartitioner分區(qū)的邏輯是用bean的hashcode計(jì)算分區(qū)號(hào)。從而,需要自定義一個(gè)ItemPartitioner,實(shí)現(xiàn)將相同itemid的bean發(fā)往同一個(gè)reducer,代碼如下所示:
class ItemPartitioner extends Partitioner{
int getPartition(bean,numreducertasks){
return bean.getItemid.hashCode() % numreducertasks;
}
}
這樣一來,可以保證相同item的數(shù)據(jù)會(huì)到達(dá)同一個(gè)reducer,并且是按照amount降序排序,如下所示:
<10001,278.60>
<10001,165.50>
<10001,136.60>
.......
3、接下來,就是如何取到這一組values中的最大值。
在默認(rèn)情況下,reducer會(huì)將拿到的數(shù)據(jù)按照相同key進(jìn)行聚合,然后對(duì)聚合起來的每一組數(shù)據(jù)調(diào)用一次reduce方法,此處麻煩的問題是,這里的每一個(gè)key都是一個(gè)對(duì)象,從而,就算是相同itemid的數(shù)據(jù),也不會(huì)聚合到一組,而是會(huì)逐一地調(diào)用reduce()方法進(jìn)行處理,這樣一來,我們也就沒辦法取到最大值了;
4、要解決這個(gè)問題,就得借助GroupingComparator了,其工作機(jī)制是這樣:
當(dāng)mapper輸出的相同partition的kv數(shù)據(jù)到達(dá)一個(gè)Reducer后,會(huì)有一個(gè)聚合的過程,即將“相同”key的kv聚合到一起(其實(shí)質(zhì)是利用GroupingComparator來對(duì)key進(jìn)行比較),然后將這一組聚合好的kv中最前面的一個(gè)kv的key傳給reduce方法的入?yún)ey,將一個(gè)用來遍歷這一組kv數(shù)據(jù)的values的迭代器iterator傳給reduce方法的入?yún)terator。
5、從而,我們可以自定義一個(gè)GroupingComparator來定義哪些kv可以聚合成一組,代碼示例如下:
public class GroupingComparator extends WritableComparator{
protected GroupingComparator() {
super(Bean.class,true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
Bean kv1 = (Bean) a;
Bean kv2 = (Bean) b;
int cmp = kv1.getItemid().compareTo(kv2.getItemid());
return cmp;
}
}
6、這樣一來,雖然不同的bean是不同的對(duì)象,但是在進(jìn)行聚合的時(shí)候,根據(jù)GroupingComparator ,只要是itemid相同的bean都會(huì)算成一組聚合kv,然后這一組聚合kv的最前面一個(gè)kv(也就是amount值最大的那一個(gè))會(huì)傳入reduce方法的入?yún)ey,從而,在我們的reduce方法中,只要直接輸出這個(gè)key就ok了:
@Override
protected void reduce(Bean bean,Iterable<NullWritable> arg1,Context context)throws IOException, InterruptedException {
context.write(bean, NullWritable.get());
}
當(dāng)然,要想讓這個(gè)GroupingComparator 生效,還需要在job中進(jìn)行注冊(cè):
job.setGroupingComparatorClass(GroupingComparator.class);
綜上所述,該案例需要自定義這幾個(gè)元素:
自定義的復(fù)合key
自定義的partitioner
自定義的GroupingComparator