国产成人精品18p,天天干成人网,无码专区狠狠躁天天躁,美女脱精光隐私扒开免费观看

Java基礎之MapReduce框架總結與擴展知識點(diǎn)

發(fā)布時(shí)間:2021-07-05 18:40 來(lái)源:腳本之家 閱讀:0 作者:GaryLea 欄目: 開(kāi)發(fā)技術(shù) 歡迎投稿:712375056

目錄

一、MapTask工作機制

MapTask就是Map階段的job,它的數量由切片決定

二、MapTask工作流程:

1.Read階段:讀取文件,此時(shí)進(jìn)行對文件數據進(jìn)行切片(InputFormat進(jìn)行切片),通過(guò)切片,從而確定MapTask的數量,切片中包含數據和key(偏移量)

2.Map階段:這個(gè)階段是針對數據進(jìn)行map方法的計算操作,通過(guò)該方法,可以對切片中的key和value進(jìn)行處理

3.Collect收集階段:在用戶(hù)編寫(xiě)map()函數中,當數據處理完成后,一般會(huì )調用OutputCollector.collect()輸出結果。在該函數內部,它會(huì )將生成的key/value分區(調用Partitioner),并寫(xiě)入一個(gè)環(huán)形內存緩沖區中。

4.Spill階段:即“溢寫(xiě)”,當環(huán)形緩沖區滿(mǎn)后,MapReduce會(huì )將數據寫(xiě)到本地磁盤(pán)上,生成一個(gè)臨時(shí)文件。需要注意的是,將數據寫(xiě)入本地磁盤(pán)之前,先要對數據進(jìn)行一次本地排序,并在必要時(shí)對數據進(jìn)行合并、壓縮等操作。

5.Combine階段:當所有數據處理完成后,MapTask對所有臨時(shí)文件進(jìn)行一次合并,以確保最終只會(huì )生成一個(gè)數據文件,這個(gè)階段默認是沒(méi)有的,一般需要我們自定義

6.當所有數據處理完后,MapTask會(huì )將所有臨時(shí)文件合并成一個(gè)大文件,并保存到文件output/file.out中,同時(shí)生成相應的索引文件output/file.out.index。

7.在進(jìn)行文件合并過(guò)程中,MapTask以分區為單位進(jìn)行合并。對于某個(gè)分區,它將采用多輪遞歸合并的方式。每輪合并io.sort.factor(默認10)個(gè)文件,并將產(chǎn)生的文件重新加入待合并列表中,對文件排序后,重復以上過(guò)程,直到最終得到一個(gè)大文件。

8.讓每個(gè)MapTask最終只生成一個(gè)數據文件,可避免同時(shí)打開(kāi)大量文件和同時(shí)讀取大量小文件產(chǎn)生的隨機讀取帶來(lái)的開(kāi)銷(xiāo)

第四步溢寫(xiě)階段詳情:

  • 步驟1:利用快速排序算法對緩存區內的數據進(jìn)行排序,排序方式是,先按照分區編號Partition進(jìn)行排序,然后按照key進(jìn)行排序。這樣,經(jīng)過(guò)排序后,數據以分區為單位聚集在一起,且同一分區內所有數據按照key有序。
  • 步驟2:按照分區編號由小到大依次將每個(gè)分區中的數據寫(xiě)入任務(wù)工作目錄下的臨時(shí)文件output/spillN.out(N表示當前溢寫(xiě)次數)中。如果用戶(hù)設置了Combiner,則寫(xiě)入文件之前,對每個(gè)分區中的數據進(jìn)行一次聚集操作。
  • 步驟3:將分區數據的元信息寫(xiě)到內存索引數據結構SpillRecord中,其中每個(gè)分區的元信息包括在臨時(shí)文件中的偏移量、壓縮前數據大小和壓縮后數據大小。如果當前內存索引大小超過(guò)1MB,則將內存索引寫(xiě)到文件output/spillN.out.index中。

三、ReduceTask工作機制

ReduceTask就是Reduce階段的job,它的數量由Map階段的分區進(jìn)行決定

四、ReduceTask工作流程:

1.Copy階段:ReduceTask從各個(gè)MapTask上遠程拷貝一片數據,并針對某一片數據,如果其大小超過(guò)一定閾值,則寫(xiě)到磁盤(pán)上,否則直接放到內存中。

2.Merge階段:在遠程拷貝數據的同時(shí),ReduceTask啟動(dòng)了兩個(gè)后臺線(xiàn)程對內存和磁盤(pán)上的文件進(jìn)行合并,以防止內存使用過(guò)多或磁盤(pán)上文件過(guò)多。

3.Sort階段:按照MapReduce語(yǔ)義,用戶(hù)編寫(xiě)reduce()函數輸入數據是按key進(jìn)行聚集的一組數據。為了將key相同的數據聚在一起,Hadoop采用了基于排序的策略。由于各個(gè)MapTask已經(jīng)實(shí)現對自己的處理結果進(jìn)行了局部排序,因此,ReduceTask只需對所有數據進(jìn)行一次歸并排序即可。

4.Reduce階段:reduce()函數將計算結果寫(xiě)到HDFS上

五、數據清洗(ETL)

我們在大數據開(kāi)篇概述中說(shuō)過(guò),數據是低價(jià)值的,所以我們要從海量數據中獲取到我們想要的數據,首先就需要對數據進(jìn)行清洗,這個(gè)過(guò)程也稱(chēng)之為ETL

還記得上一章中的Join案例么,我們對pname字段的填充,也算數據清洗的一種,下面我通過(guò)一個(gè)簡(jiǎn)單的案例來(lái)演示一下數據清洗

數據清洗案例

需求:過(guò)濾一下log日志中字段個(gè)數小于11的日志(隨便舉個(gè)栗子而已)

測試數據:就拿我們這兩天學(xué)習中HadoopNodeName產(chǎn)生的日志來(lái)當測試數據吧,我將log日志信息放到我的windows中,數據位置如下

/opt/module/hadoop-3.1.3/logs/hadoop-xxx-nodemanager-hadoop102.log

編寫(xiě)思路:

直接通過(guò)切片,然后判斷長(cháng)度即可,因為是舉個(gè)栗子,沒(méi)有那么復雜

真正的數據清洗會(huì )使用框架來(lái)做,這個(gè)我后面會(huì )為大家帶來(lái)相關(guān)的知識

  • ETLDriver
package com.company.etl;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class ETLDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());

        job.setJarByClass(ETLDriver.class);

        job.setMapperClass(ETLMapper.class);

        job.setNumReduceTasks(0);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);


        FileInputFormat.setInputPaths(job,new Path("D:\\io\\input8"));
        FileOutputFormat.setOutputPath(job,new Path("D:\\io\\output88"));

        job.waitForCompletion(true);
    }
}

  • ETLMapper
package com.company.etl;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class ETLMapper extends Mapper<LongWritable, Text,Text, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        //清洗(過(guò)濾)
        String line = value.toString();
        String[] info = line.split(" ");
        //判斷
        if (info.length > 11){
            context.write(value,NullWritable.get());
        }
    }
}

六、計數器應用

  • 顧名思義,計數器的作用就是用于計數的,在Hadoop中,它內部也有一個(gè)計數器,用于監控統計我們處理數據的數量
  • 我們通常在MapReduce中通過(guò)上下文 context進(jìn)行應用,例如在Mapper中,我通過(guò)step方法進(jìn)行初始化計數器,然后在我們map方法中進(jìn)行計數

七、計數器案例

在上面數據清洗的基礎上進(jìn)行計數器的使用,Driver沒(méi)什么變化,只有Mapper

我們在Mapper的setup方法中,創(chuàng )建計數器的對象,然后在map方法中調用它即可

ETLMapper

package com.company.etl;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class ETLMapper extends Mapper<LongWritable, Text,Text, NullWritable> {

    private Counter sucess;
    private Counter fail;
    /*
        創(chuàng  )建計數器對象
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        /*
             getCounter(String groupName, String counterName);
             第一個(gè)參數 :組名 隨便寫(xiě)
             第二個(gè)參數 :計數器名 隨便寫(xiě)
         */
        sucess = context.getCounter("ETL", "success");
        fail = context.getCounter("ETL", "fail");

    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        //清洗(過(guò)濾)
        String line = value.toString();
        String[] info = line.split(" ");
        //判斷
        if (info.length > 11){
            context.write(value,NullWritable.get());
            //統計
            sucess.increment(1);
        }else{
            fail.increment(1);
        }

    }
}

八、MapReduce總結

好了,到這里,我們MapReduce就全部學(xué)習完畢了,接下來(lái),我再把整個(gè)內容串一下,還是MapReduce的那個(gè)圖

MapReduce的主要工作就是對數據進(jìn)行運算、分析,它的工作流程如下:

1.我們會(huì )將HDFS中的數據通過(guò)InputFormat進(jìn)行進(jìn)行讀取、切片,從而計算出MapTask的數量

2.每一個(gè)MapTask中都會(huì )有Mapper類(lèi),里面的map方法就是任務(wù)的具體實(shí)現,我們通過(guò)它,可以完成數據的key,value封裝,然后通過(guò)分區進(jìn)入shuffle中來(lái)完成每個(gè)MapTask中的數據分區排序

3.通過(guò)分區來(lái)決定ReduceTask的數量,每一個(gè)ReduceTask都有一個(gè)Reducer類(lèi),里面的reduce方法是ReduceTask的具體實(shí)現,它主要是完成最后的數據合并工作

4.當Reduce任務(wù)過(guò)重,我們可以通過(guò)Combiner合并,在Mapper階段來(lái)進(jìn)行局部的數據合并,減輕Reduce的任務(wù)量,當然,前提是Combiner所做的局部合并工作不會(huì )影響最終的結果

5.當Reducer的任務(wù)完成,會(huì )將最終的key,value寫(xiě)出,交給OutputFormat,用于數據的寫(xiě)出,通過(guò)OutputFormat來(lái)完成HDFS的寫(xiě)入操作

每一個(gè)MapTask和ReduceTask內部都是循環(huán)進(jìn)行讀取,并且它有三個(gè)方法:setup() map()/reduce() cleanup()
setup()方法是在MapTask/ReduceTask剛剛啟動(dòng)時(shí)進(jìn)行調用,cleanup()是在任務(wù)完成后調用

到此這篇關(guān)于Java基礎之MapReduce框架總結與擴展知識點(diǎn)的文章就介紹到這了,更多相關(guān)Java MapReduce框架內容請搜索腳本之家以前的文章或繼續瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng )、來(lái)自本網(wǎng)站內容采集于網(wǎng)絡(luò )互聯(lián)網(wǎng)轉載等其它媒體和分享為主,內容觀(guān)點(diǎn)不代表本網(wǎng)站立場(chǎng),如侵犯了原作者的版權,請告知一經(jīng)查實(shí),將立刻刪除涉嫌侵權內容,聯(lián)系我們QQ:712375056,同時(shí)歡迎投稿傳遞力量。

国产尤物AV尤物在线看| 18禁裸体动漫美女无遮挡网站| 亚洲日韩乱码中文无码蜜桃臀网站| 久久天堂AV女色优精品| 欧美精品一国产成人综合久久| 亚洲精品乱码久久久久久|