初步了解Hadoop平臺

jopen 10年前發布 | 32K 次閱讀 Hadoop 分布式/云計算/大數據


什么是Hadoop?

--------------------------------------------


hadoop一個用 于在普通硬件構成 的大集群上運行應用程序的框架。Hadoop框架透明地為應用程序提供可靠性與數據移動保障。Hadoop實現了一個被稱為 mapReduce的 計算模型,在這個計算模型中應用程序被分為很多的小塊,每一塊都能在集群中的任意節點上執行或重新執行。另外,它還提供了一個分布式文件系統(HDFS) 來在計算節 點上存儲數據,為集群提供了非常高的聚合帶寬。在本框架中無論是Map/Reduce還是分布式文件系統都被設計為能夠自動地處理節點上的錯誤。

Hadoop的組成:

  • Hadoop Common – contains libraries and utilities needed by other Hadoop modules
  • Hadoop Distributed File System (HDFS) – a distributed file-system that stores data on commodity machines, providing very high aggregate bandwidth across the cluster.
  • Hadoop YARN – a resource-management platform responsible for managing compute resources in clusters and using them for scheduling of users' applications.
  • Hadoop MapReduce – a programming model for large scale data processing.

Hadoop 框架中最核心的設計就是:MapReduce和HDFS。MapReduce的思想是由Google的一篇論文所提及而被廣為流傳的,簡單的一句話解釋 MapReduce就是“任務的分解與結果的匯總”。HDFS是Hadoop分布式文件系統(Hadoop Distributed File System)的縮寫,為分布式計算存儲提供了底層支持。


MapReduce原理:

---------------------------------------------

MapReduce 從它名字上來看就大致可以看出個緣由,兩個動詞Map和Reduce,“Map(展開)”就是將一個任務分解成為多個任務,“Reduce”就是將分解后 多任務處理的結果匯總起來,得出最后的分析結果。這不是什么新思想,其實在前面提到的多線程,多任務的設計就可以找到這種思想的影子。不論是現實社會,還 是在程序設計中,一項工作往往可以被拆分成為多個任務,任務之間的關系可以分為兩種:一種是不相關的任務,可以并行執行;另一種是任務之間有相互的依賴, 先后順序不能夠顛倒,這類任務是無法并行處理的。回到大學時期,教授上課時讓大家去分析關鍵路徑,無非就是找最省時的任務分解執行方式。在分布式系統中, 機器集群就可以看作硬件資源池,將并行的任務拆分,然后交由每一個空閑機器資源去處理,能夠極大地提高計算效率,同時這種資源無關性,對于計算集群的擴展 無疑提供了最好的設計保證。任務分解處理以后,那就需要將處理以后的結果再匯總起來,這就是Reduce要做的工作。結構圖如下:

edd15f7e76dd69430c2c76c70da0f9d8 (354×360)

網上有個簡單的比喻來解釋MapReduce原理:

我們要數圖書館中的所有書。你數1號書架,我數2號書架。這就是“Map”。我們人越多,數書就更快。

現在我們到一起,把所有人的統計數加在一起。這就是“Reduce”。


 圖:MapReduce結構示意圖 

上圖就是MapReduce大致的結構圖,在Map前還可能會對輸入的數據有Split(分割)的過程,保證任務并行效率,在Map之后還會有 Shuffle(混合)的過程,對于提高Reduce的效率以及減小數據傳輸的壓力有很大的幫助。后面會具體提及這些部分的細節。           

1.Map-Reduce的邏輯過程

假設我們需要處理一批有關天氣的數據,其格式如下:

  • 按照ASCII碼存儲,每行一條記錄
  • 每一行字符從0開始計數,第15個到第18個字符為年
  • 第25個到第29個字符為溫度,其中第25位是符號+/-

0067011990999991950051507+0000+

0043011990999991950051512+0022+

0043011990999991950051518-0011+

0043012650999991949032412+0111+

0043012650999991949032418+0078+

0067011990999991937051507+0001+

0043011990999991937051512-0002+

0043011990999991945051518+0001+

0043012650999991945032412+0002+

0043012650999991945032418+0078+

現在需要統計出每年的最高溫度。

Map-Reduce主要包括兩個步驟:Map和Reduce

每一步都有key-value對作為輸入和輸出:

  • map階段的key-value對的格式是由輸入的格式所決定的,如果是默認的TextInputFormat,則每行作為一個記錄進程處理,其中key為此行的開頭相對于文件的起始位置,value就是此行的字符文本
  • map階段的輸出的key-value對的格式必須同reduce階段的輸入key-value對的格式相對應

對于上面的例子,在map過程,輸入的key-value對如下:

(0, 0067011990999991950051507+0000+)

(33, 0043011990999991950051512+0022+)

(66, 0043011990999991950051518-0011+)

(99, 0043012650999991949032412+0111+)

(132, 0043012650999991949032418+0078+)

(165, 0067011990999991937051507+0001+)

(198, 0043011990999991937051512-0002+)

(231, 0043011990999991945051518+0001+)

(264, 0043012650999991945032412+0002+)

(297, 0043012650999991945032418+0078+)

在map過程中,通過對每一行字符串的解析,得到年-溫度的key-value對作為輸出:

(1950, 0)

(1950, 22)

(1950, -11)

(1949, 111)

(1949, 78)

(1937, 1)

(1937, -2)

(1945, 1)

(1945, 2)

(1945, 78)

在reduce過程,將map過程中的輸出,按照相同的key將value放到同一個列表中作為reduce的輸入

(1950, [0, 22, –11])

(1949, [111, 78])

(1937, [1, -2])

(1945, [1, 2, 78])

在reduce過程中,在列表中選擇出最大的溫度,將年-最大溫度的key-value作為輸出:

(1950, 22)

(1949, 111)

(1937, 1)

(1945, 78)

其邏輯過程可用如下圖表示:

2、編寫Map-Reduce程序

編寫Map-Reduce程序,一般需要實現兩個函數:mapper中的map函數和reducer中的reduce函數。

一般遵循以下格式:

  • map: (K1, V1)  ->  list(K2, V2)

public interface Mapper extends JobConfigurable, Closeable {

  void map(K1 key, V1 value, OutputCollector output, Reporter reporter)

  throws IOException;

}

  • reduce: (K2, list(V))  ->  list(K3, V3) 

public interface Reducer extends JobConfigurable, Closeable {

  void reduce(K2 key, Iterator values,

              OutputCollector output, Reporter reporter)

    throws IOException;

}

 

對于上面的例子,則實現的mapper如下:

public class MaxTemperatureMapper extends MapReduceBase implements Mapper {

    @Override

    public void map(LongWritable key, Text value, OutputCollector output, Reporter reporter) throws IOException {

        String line = value.toString();

        String year = line.substring(15, 19);

        int airTemperature;

        if (line.charAt(25) == '+') {

            airTemperature = Integer.parseInt(line.substring(26, 30));

        } else {

            airTemperature = Integer.parseInt(line.substring(25, 30));

        }

        output.collect(new Text(year), new IntWritable(airTemperature));

    }

}

實現的reducer如下:

public class MaxTemperatureReducer extends MapReduceBase implements Reducer {

    public void reduce(Text key, Iterator values, OutputCollector output, Reporter reporter) throws IOException {

        int maxValue = Integer.MIN_VALUE;

        while (values.hasNext()) {

            maxValue = Math.max(maxValue, values.next().get());

        }

        output.collect(key, new IntWritable(maxValue));

    }

}

 

欲運行上面實現的Mapper和Reduce,則需要生成一個Map-Reduce得任務(Job),其基本包括以下三部分:

  • 輸入的數據,也即需要處理的數據
  • Map-Reduce程序,也即上面實現的Mapper和Reducer
  • 此任務的配置項JobConf

欲配置JobConf,需要大致了解Hadoop運行job的基本原理:

  • Hadoop將Job分成task進行處理,共兩種task:map task和reduce task
  • Hadoop有兩類的節點控制job的運行:JobTracker和TaskTracker
    • JobTracker協調整個job的運行,將task分配到不同的TaskTracker上
    • TaskTracker負責運行task,并將結果返回給JobTracker
  • Hadoop將輸入數據分成固定大小的塊,我們稱之input split
  • Hadoop為每一個input split創建一個task,在此task中依次處理此split中的一個個記錄(record)
  • Hadoop會盡量讓輸入數據塊所在的DataNode和task所執行的DataNode(每個DataNode上都有一個TaskTracker)為同一個,可以提高運行效率,所以input split的大小也一般是HDFS的block的大小。
  • Reduce task的輸入一般為Map Task的輸出,Reduce Task的輸出為整個job的輸出,保存在HDFS上。
  • 在reduce中,相同key的所有的記錄一定會到同一個TaskTracker上面運行,然而不同的key可以在不同的TaskTracker上面運行,我們稱之為partition
    • partition的規則為:(K2, V2) –> Integer, 也即根據K2,生成一個partition的id,具有相同id的K2則進入同一個partition,被同一個TaskTracker上被同一個Reducer進行處理。

public interface Partitioner extends JobConfigurable {

  int getPartition(K2 key, V2 value, int numPartitions);

}

下圖大概描述了Map-Reduce的Job運行的基本原理:

image

 

下面我們討論JobConf,其有很多的項可以進行配置:

  • setInputFormat:設置map的輸入格式,默認為TextInputFormat,key為LongWritable, value為Text
  • setNumMapTasks:設置map任務的個數,此設置通常不起作用,map任務的個數取決于輸入的數據所能分成的input split的個數
  • setMapperClass:設置Mapper,默認為IdentityMapper
  • setMapRunnerClass:設置MapRunner, map task是由MapRunner運行的,默認為MapRunnable,其功能為讀取input split的一個個record,依次調用Mapper的map函數
  • setMapOutputKeyClass和setMapOutputValueClass:設置Mapper的輸出的key-value對的格式
  • setOutputKeyClass和setOutputValueClass:設置Reducer的輸出的key-value對的格式
  • setPartitionerClass 和setNumReduceTasks:設置Partitioner,默認為HashPartitioner,其根據key的hash值來決定進入哪個 partition,每個partition被一個reduce task處理,所以partition的個數等于reduce task的個數
  • setReducerClass:設置Reducer,默認為IdentityReducer
  • setOutputFormat:設置任務的輸出格式,默認為TextOutputFormat
  • FileInputFormat.addInputPath:設置輸入文件的路徑,可以使一個文件,一個路徑,一個通配符。可以被調用多次添加多個路徑
  • FileOutputFormat.setOutputPath:設置輸出文件的路徑,在job運行前此路徑不應該存在

當然不用所有的都設置,由上面的例子,可以編寫Map-Reduce程序如下:

public class MaxTemperature {

    public static void main(String[] args) throws IOException {

        if (args.length != 2) {

            System.err.println("Usage: MaxTemperature ");

            System.exit(-1);

        }

        JobConf conf = new JobConf(MaxTemperature.class);

        conf.setJobName("Max temperature");

        FileInputFormat.addInputPath(conf, new Path(args[0]));

        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        conf.setMapperClass(MaxTemperatureMapper.class);

        conf.setReducerClass(MaxTemperatureReducer.class);

        conf.setOutputKeyClass(Text.class);

        conf.setOutputValueClass(IntWritable.class);

        JobClient.runJob(conf);

    }

}

3、Map-Reduce數據流(data flow)

Map-Reduce的處理過程主要涉及以下四個部分:

  • 客戶端Client:用于提交Map-reduce任務job
  • JobTracker:協調整個job的運行,其為一個Java進程,其main class為JobTracker
  • TaskTracker:運行此job的task,處理input split,其為一個Java進程,其main class為TaskTracker
  • HDFS:hadoop分布式文件系統,用于在各個進程間共享Job相關的文件

image

3.1、任務提交

JobClient.runJob()創建一個新的JobClient實例,調用其submitJob()函數。

  • 向JobTracker請求一個新的job ID
  • 檢測此job的output配置
  • 計算此job的input splits
  • 將Job運行所需的資源拷貝到JobTracker的文件系統中的文件夾中,包括job jar文件,job.xml配置文件,input splits
  • 通知JobTracker此Job已經可以運行了

提交任務后,runJob每隔一秒鐘輪詢一次job的進度,將進度返回到命令行,直到任務運行完畢。

 

3.2、任務初始化

 

當JobTracker收到submitJob調用的時候,將此任務放到一個隊列中,job調度器將從隊列中獲取任務并初始化任務。

初始化首先創建一個對象來封裝job運行的tasks, status以及progress。

在創建task之前,job調度器首先從共享文件系統中獲得JobClient計算出的input splits。

其為每個input split創建一個map task。

每個task被分配一個ID。

 

3.3、任務分配

 

TaskTracker周期性的向JobTracker發送heartbeat。

在heartbeat中,TaskTracker告知JobTracker其已經準備運行一個新的task,JobTracker將分配給其一個task。

在JobTracker為TaskTracker選擇一個task之前,JobTracker必須首先按照優先級選擇一個Job,在最高優先級的Job中選擇一個task。

TaskTracker有固定數量的位置來運行map task或者reduce task。

默認的調度器對待map task優先于reduce task

當選擇reduce task的時候,JobTracker并不在多個task之間進行選擇,而是直接取下一個,因為reduce task沒有數據本地化的概念。

 

3.4、任務執行

 

TaskTracker被分配了一個task,下面便要運行此task。

首先,TaskTracker將此job的jar從共享文件系統中拷貝到TaskTracker的文件系統中。

TaskTracker從distributed cache中將job運行所需要的文件拷貝到本地磁盤。

其次,其為每個task創建一個本地的工作目錄,將jar解壓縮到文件目錄中。

其三,其創建一個TaskRunner來運行task。

TaskRunner創建一個新的JVM來運行task。

被創建的child JVM和TaskTracker通信來報告運行進度。

 

3.4.1、Map的過程

MapRunnable從input split中讀取一個個的record,然后依次調用Mapper的map函數,將結果輸出。

map的輸出并不是直接寫入硬盤,而是將其寫入緩存memory buffer。

當buffer中數據的到達一定的大小,一個背景線程將數據開始寫入硬盤。

在寫入硬盤之前,內存中的數據通過partitioner分成多個partition。

在同一個partition中,背景線程會將數據按照key在內存中排序。

每次從內存向硬盤flush數據,都生成一個新的spill文件。

當此task結束之前,所有的spill文件被合并為一個整的被partition的而且排好序的文件。

reducer可以通過http協議請求map的輸出文件,tracker.http.threads可以設置http服務線程數。

3.4.2、Reduce的過程

當map task結束后,其通知TaskTracker,TaskTracker通知JobTracker。

對于一個job,JobTracker知道TaskTracer和map輸出的對應關系。

reducer中一個線程周期性的向JobTracker請求map輸出的位置,直到其取得了所有的map輸出。

reduce task需要其對應的partition的所有的map輸出。

reduce task中的copy過程即當每個map task結束的時候就開始拷貝輸出,因為不同的map task完成時間不同。

reduce task中有多個copy線程,可以并行拷貝map輸出。

當很多map輸出拷貝到reduce task后,一個背景線程將其合并為一個大的排好序的文件。

當所有的map輸出都拷貝到reduce task后,進入sort過程,將所有的map輸出合并為大的排好序的文件。

最后進入reduce過程,調用reducer的reduce函數,處理排好序的輸出的每個key,最后的結果寫入HDFS。

 

image

 

3.5、任務結束

 

當JobTracker獲得最后一個task的運行成功的報告后,將job得狀態改為成功。

當JobClient從JobTracker輪詢的時候,發現此job已經成功結束,則向用戶打印消息,從runJob函數中返回。

HdFS的基本概念

--------------------------------------------------


1.1、數據塊(block)

  • HDFS(Hadoop Distributed File System)默認的最基本的存儲單位是64M的數據塊。
  • 和普通文件系統相同的是,HDFS中的文件是被分成64M一塊的數據塊存儲的。
  • 不同于普通文件系統的是,HDFS中,如果一個文件小于一個數據塊的大小,并不占用整個數據塊存儲空間。

1.2、元數據節點(Namenode)和數據節點(datanode)

  • 元數據節點用來管理文件系統的命名空間
    • 其將所有的文件和文件夾的元數據保存在一個文件系統樹中。
    • 這些信息也會在硬盤上保存成以下文件:命名空間鏡像(namespace image)及修改日志(edit log)
    • 其還保存了一個文件包括哪些數據塊,分布在哪些數據節點上。然而這些信息并不存儲在硬盤上,而是在系統啟動的時候從數據節點收集而成的。
  • 數據節點是文件系統中真正存儲數據的地方。
    • 客戶端(client)或者元數據信息(namenode)可以向數據節點請求寫入或者讀出數據塊。
    • 其周期性的向元數據節點回報其存儲的數據塊信息。
  • 從元數據節點(secondary namenode)
    • 從元數據節點并不是元數據節點出現問題時候的備用節點,它和元數據節點負責不同的事情。
    • 其主要功能就是周期性將元數據節點的命名空間鏡像文件和修改日志合并,以防日志文件過大。這點在下面會相信敘述。
    • 合并過后的命名空間鏡像文件也在從元數據節點保存了一份,以防元數據節點失敗的時候,可以恢復。

1.2.1、元數據節點文件夾結構

image

  • VERSION文件是java properties文件,保存了HDFS的版本號。
    • layoutVersion是一個負整數,保存了HDFS的持續化在硬盤上的數據結構的格式版本號。
    • namespaceID是文件系統的唯一標識符,是在文件系統初次格式化時生成的。
    • cTime此處為0
    • storageType表示此文件夾中保存的是元數據節點的數據結構。

namespaceID=1232737062

cTime=0

storageType=NAME_NODE

layoutVersion=-18

1.2.2、文件系統命名空間映像文件及修改日志

  • 當文件系統客戶端(client)進行寫操作時,首先把它記錄在修改日志中(edit log)
  • 元數據節點在內存中保存了文件系統的元數據信息。在記錄了修改日志后,元數據節點則修改內存中的數據結構。
  • 每次的寫操作成功之前,修改日志都會同步(sync)到文件系統。
  • fsimage文件,也即命名空間映像文件,是內存中的元數據在硬盤上的checkpoint,它是一種序列化的格式,并不能夠在硬盤上直接修改。
  • 同數據的機制相似,當元數據節點失敗時,則最新checkpoint的元數據信息從fsimage加載到內存中,然后逐一重新執行修改日志中的操作。
  • 從元數據節點就是用來幫助元數據節點將內存中的元數據信息checkpoint到硬盤上的
  • checkpoint的過程如下:
    • 從元數據節點通知元數據節點生成新的日志文件,以后的日志都寫到新的日志文件中。
    • 從元數據節點用http get從元數據節點獲得fsimage文件及舊的日志文件。
    • 從元數據節點將fsimage文件加載到內存中,并執行日志文件中的操作,然后生成新的fsimage文件。
    • 從元數據節點獎新的fsimage文件用http post傳回元數據節點
    • 元數據節點可以將舊的fsimage文件及舊的日志文件,換為新的fsimage文件和新的日志文件(第一步生成的),然后更新fstime文件,寫入此次checkpoint的時間。
    • 這樣元數據節點中的fsimage文件保存了最新的checkpoint的元數據信息,日志文件也重新開始,不會變的很大了。

hadoop01

1.2.3、從元數據節點的目錄結構

image

1.2.4、數據節點的目錄結構

image

  • 數據節點的VERSION文件格式如下:

namespaceID=1232737062

storageID=DS-1640411682-127.0.1.1-50010-1254997319480

cTime=0

storageType=DATA_NODE

layoutVersion=-18

  • blk_保存的是HDFS的數據塊,其中保存了具體的二進制數據。
  • blk_.meta保存的是數據塊的屬性信息:版本信息,類型信息,和checksum
  • 當一個目錄中的數據塊到達一定數量的時候,則創建子文件夾來保存數據塊及數據塊屬性信息。

二、數據流(data flow)

2.1、讀文件的過程

  • 客戶端(client)用FileSystem的open()函數打開文件
  • DistributedFileSystem用RPC調用元數據節點,得到文件的數據塊信息。
  • 對于每一個數據塊,元數據節點返回保存數據塊的數據節點的地址。
  • DistributedFileSystem返回FSDataInputStream給客戶端,用來讀取數據。
  • 客戶端調用stream的read()函數開始讀取數據。
  • DFSInputStream連接保存此文件第一個數據塊的最近的數據節點。
  • Data從數據節點讀到客戶端(client)
  • 當此數據塊讀取完畢時,DFSInputStream關閉和此數據節點的連接,然后連接此文件下一個數據塊的最近的數據節點。
  • 當客戶端讀取完畢數據的時候,調用FSDataInputStream的close函數。
  • 在讀取數據的過程中,如果客戶端在與數據節點通信出現錯誤,則嘗試連接包含此數據塊的下一個數據節點。
  • 失敗的數據節點將被記錄,以后不再連接。

image

2.2、寫文件的過程

  • 客戶端調用create()來創建文件
  • DistributedFileSystem用RPC調用元數據節點,在文件系統的命名空間中創建一個新的文件。
  • 元數據節點首先確定文件原來不存在,并且客戶端有創建文件的權限,然后創建新文件。
  • DistributedFileSystem返回DFSOutputStream,客戶端用于寫數據。
  • 客戶端開始寫入數據,DFSOutputStream將數據分成塊,寫入data queue。
  • Data queue由Data Streamer讀取,并通知元數據節點分配數據節點,用來存儲數據塊(每塊默認復制3塊)。分配的數據節點放在一個pipeline里。
  • Data Streamer將數據塊寫入pipeline中的第一個數據節點。第一個數據節點將數據塊發送給第二個數據節點。第二個數據節點將數據發送給第三個數據節點。
  • DFSOutputStream為發出去的數據塊保存了ack queue,等待pipeline中的數據節點告知數據已經寫入成功。
  • 如果數據節點在寫入的過程中失敗:
    • 關閉pipeline,將ack queue中的數據塊放入data queue的開始。
    • 當前的數據塊在已經寫入的數據節點中被元數據節點賦予新的標示,則錯誤節點重啟后能夠察覺其數據塊是過時的,會被刪除。
    • 失敗的數據節點從pipeline中移除,另外的數據塊則寫入pipeline中的另外兩個數據節點。
    • 元數據節點則被通知此數據塊是復制塊數不足,將來會再創建第三份備份。
  • 當客戶端結束寫入數據,則調用stream的close函數。此操作將所有的數據塊寫入pipeline中的數據節點,并等待ack queue返回成功。最后通知元數據節點寫入完畢。

image

                                                                                                                                                                                    

如果你想深入理解hadoop平臺的話我覺得研究其源碼是少不了的,Hadoop源碼網址。再結合這本書Hadoop源代碼分析(完整版)去看。

有時間的話我也會仔細去研究,不過現在還是先補基礎,基礎好了看這些東西要來的些。這里就簡單概述一下Hadoop平臺的東西吧。

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!