国产成人精品18p,天天干成人网,无码专区狠狠躁天天躁,美女脱精光隐私扒开免费观看

淺談實(shí)時(shí)計算框架Flink集群搭建與運行機制

發(fā)布時(shí)間:2021-08-15 18:37 來(lái)源: 閱讀:0 作者:知了一笑 欄目: 服務(wù)器 歡迎投稿:712375056

目錄

    一、Flink概述

    1.1、基礎簡(jiǎn)介

    主要特性包括:批流一體化、精密的狀態(tài)管理、事件時(shí)間支持以及精確一次的狀態(tài)一致性保障等。Flink不僅可以運行在包括YARN、Mesos、Kubernetes在內的多種資源管理框架上,還支持在裸機集群上獨立部署。在啟用高可用選項的情況下,它不存在單點(diǎn)失效問(wèn)題。

    這里要說(shuō)明兩個(gè)概念:

    • 邊界:無(wú)邊界和有邊界數據流,可以理解為數據的聚合策略或者條件;
    • 狀態(tài):即執行順序上是否存在依賴(lài)關(guān)系,即下次執行是否依賴(lài)上次結果;

    1.2、應用場(chǎng)景

    Data Driven

    事件驅動(dòng)型應用無(wú)須查詢(xún)遠程數據庫,本地數據訪(fǎng)問(wèn)使得它具有更高的吞吐和更低的延遲,以反欺詐案例來(lái)看,DataDriven把處理的規則模型寫(xiě)到DatastreamAPI中,然后將整個(gè)邏輯抽象到Flink引擎,當事件或者數據流入就會(huì )觸發(fā)相應的規則模型,一旦觸發(fā)規則中的條件后,DataDriven會(huì )快速處理并對業(yè)務(wù)應用進(jìn)行通知。

    Data Analytics

    和批量分析相比,由于流式分析省掉了周期性的數據導入和查詢(xún)過(guò)程,因此從事件中獲取指標的延遲更低。不僅如此,批量查詢(xún)必須處理那些由定期導入和輸入有界性導致的人工數據邊界,而流式查詢(xún)則無(wú)須考慮該問(wèn)題,Flink為持續流式分析和批量分析都提供了良好的支持,實(shí)時(shí)處理分析數據,應用較多的場(chǎng)景如實(shí)時(shí)大屏、實(shí)時(shí)報表。

    Data Pipeline

    與周期性的ETL作業(yè)任務(wù)相比,持續數據管道可以明顯降低將數據移動(dòng)到目的端的延遲,例如基于上游的StreamETL進(jìn)行實(shí)時(shí)清洗或擴展數據,可以在下游構建實(shí)時(shí)數倉,確保數據查詢(xún)的時(shí)效性,形成高時(shí)效的數據查詢(xún)鏈路,這種場(chǎng)景在媒體流的推薦或者搜索引擎中十分常見(jiàn)。

    二、環(huán)境部署

    2.1、安裝包管理

    [root@hop01 opt]# tar -zxvf flink-1.7.0-bin-hadoop27-scala_2.11.tgz

    [root@hop02 opt]# mv flink-1.7.0 flink1.7

    2.2、集群配置

    管理節點(diǎn)

    [root@hop01 opt]# cd /opt/flink1.7/conf

    [root@hop01 conf]# vim flink-conf.yaml

    jobmanager.rpc.address: hop01

    分布節點(diǎn)

    [root@hop01 conf]# vim slaves

    hop02

    hop03

    兩個(gè)配置同步到所有集群節點(diǎn)下面。

    2.3、啟動(dòng)與停止

    /opt/flink1.7/bin/start-cluster.sh

    /opt/flink1.7/bin/stop-cluster.sh

    啟動(dòng)日志:

    [root@hop01 conf]# /opt/flink1.7/bin/start-cluster.sh

    Starting cluster.

    Starting standalonesession daemon on host hop01.

    Starting taskexecutor daemon on host hop02.

    Starting taskexecutor daemon on host hop03.

    2.4、Web界面

    訪(fǎng)問(wèn):http://hop01:8081/

    三、開(kāi)發(fā)入門(mén)案例

    3.1、數據腳本

    分發(fā)一個(gè)數據腳本到各個(gè)節點(diǎn):

    /var/flink/test/word.txt

    3.2、引入基礎依賴(lài)

    這里基于Java寫(xiě)的基礎案例。

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.7.0</version>
        </dependency>
    </dependencies>

    3.3、讀取文件數據

    這里直接讀取文件中的數據,經(jīng)過(guò)程序流程分析出每個(gè)單詞出現的次數。

    public class WordCount {
        public static void main(String[] args) throws Exception {
            // 讀取文件數據
            readFile () ;
        }
    
        public static void readFile () throws Exception {
            // 1、執行環(huán)境創(chuàng  )建
            ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
    
            // 2、讀取數據文件
            String filePath = "/var/flink/test/word.txt" ;
            DataSet<String> inputFile = environment.readTextFile(filePath);
    
            // 3、分組并求和
            DataSet<Tuple2<String, Integer>> wordDataSet = inputFile.flatMap(new WordFlatMapFunction(
            )).groupBy(0).sum(1);
    
            // 4、打印處理結果
            wordDataSet.print();
        }
    
        // 數據讀取個(gè)切割方式
        static class WordFlatMapFunction implements FlatMapFunction<String, Tuple2<String, Integer>> {
            @Override
            public void flatMap(String input, Collector<Tuple2<String, Integer>> collector){
                String[] wordArr = input.split(",");
                for (String word : wordArr) {
                    collector.collect(new Tuple2<>(word, 1));
                }
            }
        }
    }

    3.4、讀取端口數據

    在hop01服務(wù)上創(chuàng )建一個(gè)端口,并模擬一些數據發(fā)送到該端口:

    [root@hop01 ~]# nc -lk 5566

    c++,java

    通過(guò)Flink程序讀取并分析該端口的數據內容:

    public class WordCount {
        public static void main(String[] args) throws Exception {
            // 讀取端口數據
            readPort ();
        }
    
        public static void readPort () throws Exception {
            // 1、執行環(huán)境創(chuàng  )建
            StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    
            // 2、讀取Socket數據端口
            DataStreamSource<String> inputStream = environment.socketTextStream("hop01", 5566);
    
            // 3、數據讀取個(gè)切割方式
            SingleOutputStreamOperator<Tuple2<String, Integer>> resultDataStream = inputStream.flatMap(
                    new FlatMapFunction<String, Tuple2<String, Integer>>()
            {
                @Override
                public void flatMap(String input, Collector<Tuple2<String, Integer>> collector) {
                    String[] wordArr = input.split(",");
                    for (String word : wordArr) {
                        collector.collect(new Tuple2<>(word, 1));
                    }
                }
            }).keyBy(0).sum(1);
    
            // 4、打印分析結果
            resultDataStream.print();
    
            // 5、環(huán)境啟動(dòng)
            environment.execute();
        }
    }

    四、運行機制

    4.1、FlinkClient

    客戶(hù)端用來(lái)準備和發(fā)送數據流到JobManager節點(diǎn),之后根據具體需求,客戶(hù)端可以直接斷開(kāi)連接,或者維持連接狀態(tài)等待任務(wù)處理結果。

    4.2、JobManager

    在Flink集群中,會(huì )啟動(dòng)一個(gè)JobManger節點(diǎn)和至少一個(gè)TaskManager節點(diǎn),JobManager收到客戶(hù)端提交的任務(wù)后,JobManager會(huì )把任務(wù)協(xié)調下發(fā)到具體的TaskManager節點(diǎn)去執行,TaskManager節點(diǎn)將心跳和處理信息發(fā)送給JobManager。

    4.3、TaskManager

    任務(wù)槽(slot)是TaskManager中最小的資源調度單位,在啟動(dòng)的時(shí)候就設置好了槽位數,每個(gè)槽位能啟動(dòng)一個(gè)Task,接收JobManager節點(diǎn)部署的任務(wù),并進(jìn)行具體的分析處理。

    五、源代碼地址

    GitHub·地址

    GitEE·地址

    以上就是淺談實(shí)時(shí)計算框架Flink集群搭建與運行機制的詳細內容,更多關(guān)于實(shí)時(shí)計算框架 Flink集群搭建與運行機制的資料請關(guān)注腳本之家其它相關(guān)文章!

    免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng )、來(lái)自本網(wǎng)站內容采集于網(wǎng)絡(luò )互聯(lián)網(wǎng)轉載等其它媒體和分享為主,內容觀(guān)點(diǎn)不代表本網(wǎng)站立場(chǎng),如侵犯了原作者的版權,請告知一經(jīng)查實(shí),將立刻刪除涉嫌侵權內容,聯(lián)系我們QQ:712375056,同時(shí)歡迎投稿傳遞力量。

    欧美性XXXXX极品| 国产精品无码久久久久| 99久久精品免费观看国产| 又爽又黄又无遮挡网站| 国产美女裸体丝袜喷水视频| 成年午夜无码AV片在线观看|