- 資訊首頁(yè) > 開(kāi)發(fā)技術(shù) >
- Java基礎之MapReduce框架總結與擴展知識點(diǎn)
MapTask就是Map階段的job,它的數量由切片決定
1.Read階段:讀取文件,此時(shí)進(jìn)行對文件數據進(jìn)行切片(InputFormat進(jìn)行切片),通過(guò)切片,從而確定MapTask的數量,切片中包含數據和key(偏移量)
2.Map階段:這個(gè)階段是針對數據進(jìn)行map方法的計算操作,通過(guò)該方法,可以對切片中的key和value進(jìn)行處理
3.Collect收集階段:在用戶(hù)編寫(xiě)map()函數中,當數據處理完成后,一般會(huì )調用OutputCollector.collect()輸出結果。在該函數內部,它會(huì )將生成的key/value分區(調用Partitioner),并寫(xiě)入一個(gè)環(huán)形內存緩沖區中。
4.Spill階段:即“溢寫(xiě)”,當環(huán)形緩沖區滿(mǎn)后,MapReduce會(huì )將數據寫(xiě)到本地磁盤(pán)上,生成一個(gè)臨時(shí)文件。需要注意的是,將數據寫(xiě)入本地磁盤(pán)之前,先要對數據進(jìn)行一次本地排序,并在必要時(shí)對數據進(jìn)行合并、壓縮等操作。
5.Combine階段:當所有數據處理完成后,MapTask對所有臨時(shí)文件進(jìn)行一次合并,以確保最終只會(huì )生成一個(gè)數據文件,這個(gè)階段默認是沒(méi)有的,一般需要我們自定義
6.當所有數據處理完后,MapTask會(huì )將所有臨時(shí)文件合并成一個(gè)大文件,并保存到文件output/file.out中,同時(shí)生成相應的索引文件output/file.out.index。
7.在進(jìn)行文件合并過(guò)程中,MapTask以分區為單位進(jìn)行合并。對于某個(gè)分區,它將采用多輪遞歸合并的方式。每輪合并io.sort.factor(默認10)個(gè)文件,并將產(chǎn)生的文件重新加入待合并列表中,對文件排序后,重復以上過(guò)程,直到最終得到一個(gè)大文件。
8.讓每個(gè)MapTask最終只生成一個(gè)數據文件,可避免同時(shí)打開(kāi)大量文件和同時(shí)讀取大量小文件產(chǎn)生的隨機讀取帶來(lái)的開(kāi)銷(xiāo)
第四步溢寫(xiě)階段詳情:
ReduceTask就是Reduce階段的job,它的數量由Map階段的分區進(jìn)行決定
1.Copy階段:ReduceTask從各個(gè)MapTask上遠程拷貝一片數據,并針對某一片數據,如果其大小超過(guò)一定閾值,則寫(xiě)到磁盤(pán)上,否則直接放到內存中。
2.Merge階段:在遠程拷貝數據的同時(shí),ReduceTask啟動(dòng)了兩個(gè)后臺線(xiàn)程對內存和磁盤(pán)上的文件進(jìn)行合并,以防止內存使用過(guò)多或磁盤(pán)上文件過(guò)多。
3.Sort階段:按照MapReduce語(yǔ)義,用戶(hù)編寫(xiě)reduce()函數輸入數據是按key進(jìn)行聚集的一組數據。為了將key相同的數據聚在一起,Hadoop采用了基于排序的策略。由于各個(gè)MapTask已經(jīng)實(shí)現對自己的處理結果進(jìn)行了局部排序,因此,ReduceTask只需對所有數據進(jìn)行一次歸并排序即可。
4.Reduce階段:reduce()函數將計算結果寫(xiě)到HDFS上
我們在大數據開(kāi)篇概述中說(shuō)過(guò),數據是低價(jià)值的,所以我們要從海量數據中獲取到我們想要的數據,首先就需要對數據進(jìn)行清洗,這個(gè)過(guò)程也稱(chēng)之為ETL
還記得上一章中的Join案例么,我們對pname字段的填充,也算數據清洗的一種,下面我通過(guò)一個(gè)簡(jiǎn)單的案例來(lái)演示一下數據清洗
數據清洗案例
需求:過(guò)濾一下log日志中字段個(gè)數小于11的日志(隨便舉個(gè)栗子而已)
測試數據:就拿我們這兩天學(xué)習中HadoopNodeName產(chǎn)生的日志來(lái)當測試數據吧,我將log日志信息放到我的windows中,數據位置如下
/opt/module/hadoop-3.1.3/logs/hadoop-xxx-nodemanager-hadoop102.log
編寫(xiě)思路:
直接通過(guò)切片,然后判斷長(cháng)度即可,因為是舉個(gè)栗子,沒(méi)有那么復雜
真正的數據清洗會(huì )使用框架來(lái)做,這個(gè)我后面會(huì )為大家帶來(lái)相關(guān)的知識
package com.company.etl; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class ETLDriver { public static void main(String[] args) throws Exception { Job job = Job.getInstance(new Configuration()); job.setJarByClass(ETLDriver.class); job.setMapperClass(ETLMapper.class); job.setNumReduceTasks(0); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); FileInputFormat.setInputPaths(job,new Path("D:\\io\\input8")); FileOutputFormat.setOutputPath(job,new Path("D:\\io\\output88")); job.waitForCompletion(true); } }
package com.company.etl; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class ETLMapper extends Mapper<LongWritable, Text,Text, NullWritable> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //清洗(過(guò)濾) String line = value.toString(); String[] info = line.split(" "); //判斷 if (info.length > 11){ context.write(value,NullWritable.get()); } } }
在上面數據清洗的基礎上進(jìn)行計數器的使用,Driver沒(méi)什么變化,只有Mapper
我們在Mapper的setup方法中,創(chuàng )建計數器的對象,然后在map方法中調用它即可
ETLMapper
package com.company.etl; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class ETLMapper extends Mapper<LongWritable, Text,Text, NullWritable> { private Counter sucess; private Counter fail; /* 創(chuàng )建計數器對象 */ @Override protected void setup(Context context) throws IOException, InterruptedException { /* getCounter(String groupName, String counterName); 第一個(gè)參數 :組名 隨便寫(xiě) 第二個(gè)參數 :計數器名 隨便寫(xiě) */ sucess = context.getCounter("ETL", "success"); fail = context.getCounter("ETL", "fail"); } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //清洗(過(guò)濾) String line = value.toString(); String[] info = line.split(" "); //判斷 if (info.length > 11){ context.write(value,NullWritable.get()); //統計 sucess.increment(1); }else{ fail.increment(1); } } }
好了,到這里,我們MapReduce就全部學(xué)習完畢了,接下來(lái),我再把整個(gè)內容串一下,還是MapReduce的那個(gè)圖
MapReduce的主要工作就是對數據進(jìn)行運算、分析,它的工作流程如下:
1.我們會(huì )將HDFS中的數據通過(guò)InputFormat進(jìn)行進(jìn)行讀取、切片,從而計算出MapTask的數量
2.每一個(gè)MapTask中都會(huì )有Mapper類(lèi),里面的map方法就是任務(wù)的具體實(shí)現,我們通過(guò)它,可以完成數據的key,value封裝,然后通過(guò)分區進(jìn)入shuffle中來(lái)完成每個(gè)MapTask中的數據分區排序
3.通過(guò)分區來(lái)決定ReduceTask的數量,每一個(gè)ReduceTask都有一個(gè)Reducer類(lèi),里面的reduce方法是ReduceTask的具體實(shí)現,它主要是完成最后的數據合并工作
4.當Reduce任務(wù)過(guò)重,我們可以通過(guò)Combiner合并,在Mapper階段來(lái)進(jìn)行局部的數據合并,減輕Reduce的任務(wù)量,當然,前提是Combiner所做的局部合并工作不會(huì )影響最終的結果
5.當Reducer的任務(wù)完成,會(huì )將最終的key,value寫(xiě)出,交給OutputFormat,用于數據的寫(xiě)出,通過(guò)OutputFormat來(lái)完成HDFS的寫(xiě)入操作
每一個(gè)MapTask和ReduceTask內部都是循環(huán)進(jìn)行讀取,并且它有三個(gè)方法:setup() map()/reduce() cleanup()
setup()
方法是在MapTask/ReduceTask剛剛啟動(dòng)時(shí)進(jìn)行調用,cleanup()
是在任務(wù)完成后調用
到此這篇關(guān)于Java基礎之MapReduce框架總結與擴展知識點(diǎn)的文章就介紹到這了,更多相關(guān)Java MapReduce框架內容請搜索腳本之家以前的文章或繼續瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng )、來(lái)自本網(wǎng)站內容采集于網(wǎng)絡(luò )互聯(lián)網(wǎng)轉載等其它媒體和分享為主,內容觀(guān)點(diǎn)不代表本網(wǎng)站立場(chǎng),如侵犯了原作者的版權,請告知一經(jīng)查實(shí),將立刻刪除涉嫌侵權內容,聯(lián)系我們QQ:712375056,同時(shí)歡迎投稿傳遞力量。
Copyright ? 2009-2022 56dr.com. All Rights Reserved. 特網(wǎng)科技 特網(wǎng)云 版權所有 特網(wǎng)科技 粵ICP備16109289號
域名注冊服務(wù)機構:阿里云計算有限公司(萬(wàn)網(wǎng)) 域名服務(wù)機構:煙臺帝思普網(wǎng)絡(luò )科技有限公司(DNSPod) CDN服務(wù):阿里云計算有限公司 百度云 中國互聯(lián)網(wǎng)舉報中心 增值電信業(yè)務(wù)經(jīng)營(yíng)許可證B2
建議您使用Chrome、Firefox、Edge、IE10及以上版本和360等主流瀏覽器瀏覽本網(wǎng)站