Avro簡介
一、引言
1、 簡介
Avro是Hadoop中的一個子項目,也是Apache中一個獨立的項目,Avro是一個基于二進制數據傳輸高性能的中間件。在Hadoop的其他項目中例如HBase(Ref)和Hive(Ref)的Client端與服務端的數據傳輸也采用了這個工具。Avro是一個數據序列化的系統。Avro 可以將數據結構或對象轉化成便于存儲或傳輸的格式。Avro設計之初就用來支持數據密集型應用,適合于遠程或本地大規模數據的存儲和交換。
2、 特點
? 豐富的數據結構類型;
? 快速可壓縮的二進制數據形式,對數據二進制序列化后可以節約數據存儲空間和網絡傳輸帶寬;
? 存儲持久數據的文件容器;
? 可以實現遠程過程調用RPC;
? 簡單的動態語言結合功能。
avro支持跨編程語言實現(C, C++, C#,Java, Python, Ruby, PHP),類似于Thrift,但是avro的顯著特征是:avro依賴于模式,動態加載相關數據的模式,Avro數據的讀寫操作很頻繁,而這些操作使用的都是模式,這樣就減少寫入每個數據文件的開銷,使得序列化快速而又輕巧。這種數據及其模式的自我描述方便了動態腳本語言的使用。當Avro數據存儲到文件中時,它的模式也隨之存儲,這樣任何程序都可以對文件進行處理。如果讀取數據時使用的模式與寫入數據時使用的模式不同,也很容易解決,因為讀取和寫入的模式都是已知的。
New schema |
Writer |
Reader |
Action |
Added field |
Old |
New |
The reader uses the default value of the new field, since it is not written by the writer. |
|
New |
Old |
The reader does not know about the new field written by the writer, so it is ignored (projection). |
Removed field |
Old |
New |
The reader ignores the removed field (projection). |
|
New |
Old |
The removed field is not written by the writer. If the old schema had a default defined for the field, the reader uses this; otherwise, it gets an error. In this case, it is best to update the reader’s schema, either at the same time as or before the writer’s. |
Avro和動態語言結合后,讀/寫數據文件和使用RPC協議都不需要生成代碼,而代碼生成作為一種可選的優化只需要在靜態類型語言中實現。
Avro依賴于模式(Schema)。通過模式定義各種數據結構,只有確定了模式才能對數據進行解釋,所以在數據的序列化和反序列化之前,必須先確定模式的結構。正是模式的引入,使得數據具有了自描述的功能,同時能夠實現動態加載,另外與其他的數據序列化系統如Thrift相比,數據之間不存在其他的任何標識,有利于提高數據處理的效率。
二、技術要領
1、 類型
數據類型標準化的意義:一方面使不同系統對相同的數據能夠正確解析,另一方面,數據類型的標準定義有利于數據序列化/反序列化。
簡單的數據類型:Avro定義了幾種簡單數據類型,下表是其簡單說明:
類型 |
說明 |
null |
no value |
boolean |
a binary value |
int |
32-bit signed integer |
long |
64-bit signed integer |
float |
single precision (32-bit) IEEE 754 floating-point number |
double |
double precision (64-bit) IEEE 754 floating-point number |
bytes |
sequence of 8-bit unsigned bytes |
string |
unicode character sequence |
簡單數據類型由類型名稱定義,不包含屬性信息,例如字符串定義如下:
{"type": "string"}
復雜數據類型:Avro定義了六種復雜數據類型,每一種復雜數據類型都具有獨特的屬性,下表就每一種復雜數據類型進行說明。
類型 |
屬性 |
說明 |
Records |
type name |
record |
name |
a JSON string providing the name of the record (required). |
|
namespace |
a JSON string that qualifies the name(optional). |
|
doc |
a JSON string providing documentation to the user of this schema (optional). |
|
aliases |
a JSON array of strings, providing alternate names for this record (optional). |
|
fields |
a JSON array, listing fields (required). |
|
name |
a JSON string. |
|
type |
a schema/a string of defined record. |
|
default |
a default value for field when lack. |
|
order |
ordering of this field. |
|
Enums |
type name |
enum |
|
name |
a JSON string providing the name of the enum (required). |
namespace |
a JSON string that qualifies the name. |
|
doc |
a JSON string providing documentation to the user of this schema (optional). |
|
aliases |
a JSON array of strings, providing alternate names for this enum (optional) |
|
symbols |
a JSON array, listing symbols, as JSON strings (required). All symbols in an enum must be unique. |
|
Arrays |
type name |
array |
|
items |
the schema of the array’s items. |
Maps |
type name |
map |
|
values |
the schema of the map’s values. |
Fixed |
type name |
fixed |
|
name |
a string naming this fixed (required). |
|
namespace |
a string that qualifies the name. |
|
aliases |
a JSON array of strings, providing alternate names for this enum (optional). |
|
size |
an integer, specifying the number of bytes per value (required). |
Unions |
|
a JSON arrays |
每一種復雜數據類型都含有各自的一些屬性,其中部分屬性是必需的,部分是可選的。
這里需要說明Record類型中field屬性的默認值,當Record Schema實例數據中某個field屬性沒有提供實例數據時,則由默認值提供,具體值見下表。Union的field默認值由Union定義中的第一個Schema決定。
avro type |
json type |
example |
null |
null |
null |
boolean |
boolean |
true |
int,long |
integer |
1 |
float,double |
number |
1.1 |
bytes |
string |
"\u00FF" |
string |
string |
"foo" |
record |
object |
{"a": 1} |
enum |
string |
"FOO" |
array |
array |
[1] |
map |
object |
{"a": 1} |
fixed |
string |
"\u00ff" |
2、 序列化/反序列化
Avro指定兩種數據序列化編碼方式:binary encoding 和Json encoding。使用二進制編碼會高效序列化,并且序列化后得到的結果會比較小;而JSON一般用于調試系統或是基于WEB的應用。
binary encoding規則如下:
1、 簡單數據類型
Type |
Encoding |
Example |
null |
Zero bytes |
Null |
boolean |
A single byte |
{true:1, false:0} |
int/long |
variable-length zig-zag coding |
|
float |
4 bytes |
Java's floatToIntBits |
double |
8 bytes |
Java's doubleToLongBits |
bytes |
a long followed by that many bytes of data |
|
string |
a long followed by that many bytes of UTF-8 encoded character data |
“foo”:{3,f,o,o} 06 66 6f 6f |
2、 復雜數據類型
Type |
encoding |
Records |
encoded just the concatenation of the encodings of its fields |
Enums |
a int representing the zero-based position of the symbol in the schema |
Arrays |
encoded as series of blocks. A block with count 0 indicates the end of the array. block:{long,items} |
Maps |
encoded as series of blocks. A block with count 0 indicates the end of the map. block:{long,key/value pairs}. |
Unions |
encoded by first writing a long value indicating the zero-based position within the union of the schema of its value. The value is then encoded per the indicated schema within the union. |
fixed |
encoded using number of bytes declared in the schema |
實例:
? records
{
"type":"record",
"name":"test",
"fields" : [
{"name": "a","type": "long"},
{"name": "b","type": "string"}
]
}
假設:a=27b=”foo” (encoding:36(27), 06(3), 66("f"), 6f("o"))
binary encoding:3606 66 6f 6f
? enums
{"type": "enum","name": "Foo", "symbols": ["A","B", "C", "D"] }
“D”(encoding: 06(3))
binary encoding: 06
? arrays
{"type": "array","items": "long"}
設:{3, 27 } (encoding:04(2), 06(3), 36(27) )
binary encoding:0406 36 00
? maps
設:{("a":1), ("b":2) } (encoding:61(“a”), 62(“b”), 02(1), 04(2))
binary encoding:0261 02 02 62 04
? unions
["string","null"]
設:(1)null; (2) “a”
binary encoding:
(1) 02;說明:02代表null在union定義中的位置1;
(2) 00 02 61;說明:00為string在union定義的位置,02 61為”a”的編碼。
圖1表示的是Avro本地序列化和反序列化的實例,它將用戶定義的模式和具體的數據編碼成二進制序列存儲在對象容器文件中,例如用戶定義了包含學號、姓名、院系和電話的學生模式,而Avro對其進行編碼后存儲在student.db文件中,其中存儲數據的模式放在文件頭的元數據中,這樣讀取的模式即使與寫入的模式不同,也可以迅速地讀出數據。假如另一個程序需要獲取學生的姓名和電話,只需要定義包含姓名和電話的學生模式,然后用此模式去讀取容器文件中的數據即可。
圖表 1
3、 模式Schema
Schema通過JSON對象表示。Schema定義了簡單數據類型和復雜數據類型,其中復雜數據類型包含不同屬性。通過各種數據類型用戶可以自定義豐富的數據結構。
Schema由下列JSON對象之一定義:
1. JSON字符串:命名
2. JSON對象:{“type”: “typeName” …attributes…}
3. JSON數組:Avro中Union的定義
舉例:
{"namespace": "example.avro",
"type":"record",
"name":"User",
"fields": [
{"name":"name", "type": "string"},
{"name":"favorite_number", "type": ["int", "null"]},
{"name":"favorite_color", "type": ["string","null"]}
]
}
4、 排序
Avro為數據定義了一個標準的排列順序。比較在很多時候是經常被使用到的對象之間的操作,標準定義可以進行方便有效的比較和排序。同時標準的定義可以方便對Avro的二進制編碼數據直接進行排序而不需要反序列化。
只有當數據項包含相同的Schema的時候,數據之間的比較才有意義。數據的比較按照Schema深度優先,從左至右的順序遞歸的進行。找到第一個不匹配即可終止比較。
兩個擁有相同的模式的項的比較按照以下規則進行:
null:總是相等。
int,long,float:按照數值大小比較。
boolean:false在true之前。
string:按照字典序進行比較。
bytes,fixed:按照byte的字典序進行比較。
array:按照元素的字典序進行比較。
enum:按照符號在枚舉中的位置比較。
record:按照域的字典序排序,如果指定了以下屬性:
“ascending”,域值的順序不變。
“descending”,域值的順序顛倒。
“ignore”,排序的時候忽略域值。
map:不可進行比較。
5、 對象容器文件
Avro定義了一個簡單的對象容器文件格式。一個文件對應一個模式,所有存儲在文件中的對象都是根據模式寫入的。對象按照塊進行存儲,塊可以采用壓縮的方式存儲。為了在進行mapreduce處理的時候有效的切分文件,在塊之間采用了同步記號。一個文件可以包含任意用戶定義的元數據。
一個文件由兩部分組成:文件頭和一個或者多個文件數據塊。
文件頭:
? 四個字節,ASCII‘O’,‘b’,‘j’,1。
? 文件元數據,用于描述Schema。
? 16字節的文件同步記號。
? 其中,文件元數據的格式為:
i. 值為-1的長整型,表明這是一個元數據塊。
ii. 標識塊長度的長整型。
iii. 標識塊中key/value對數目的長整型。
iv. 每一個key/value對的string key和bytesvalue。
v. 標識塊中字節總數的4字節長的整數。
文件數據塊:
數據是以塊結構進行組織的,一個文件可以包含一個或者多個文件數據塊。
? 表示文件中塊中對象數目的長整型。
? 表示塊中數據序列化后的字節數長度的長整型。
? 序列化的對象。
? 16字節的文件同步記號。
當數據塊的長度為0時即為文件數據塊的最后一個數據,此后的所有數據被自動忽略。
下圖示對象容器文件的結構分解及說明:
一個存儲文件由兩部分組成:頭信息(Header)和數據塊(Data Block)。而頭信息又由三部分構成:四個字節的前綴,文件Meta-data信息和隨機生成的16字節同步標記符。Avro目前支持的Meta-data有兩種:schema和codec。
codec表示對后面的文件數據塊(File Data Block)采用何種壓縮方式。Avro的實現都需要支持下面兩種壓縮方式:null(不壓縮)和deflate(使用Deflate算法壓縮數據塊)。除了文檔中認定的兩種Meta-data,用戶還可以自定義適用于自己的Meta-data。這里用long型來表示有多少個Meta-data數據對,也是讓用戶在實際應用中可以定義足夠的Meta-data信息。對于每對Meta-data信息,都有一個string型的key(需要以“avro.” 為前綴)和二進制編碼后的value。對于文件中頭信息之后的每個數據塊,有這樣的結構:一個long值記錄當前塊有多少個對象,一個long值用于記錄當前塊經過壓縮后的字節數,真正的序列化對象和16字節長度的同步標記符。由于對象可以組織成不同的塊,使用時就可以不經過反序列化而對某個數據塊進行操作。還可以由數據塊數,對象數和同步標記符來定位損壞的塊以確保數據完整性。
三、RPC實現
當在RPC中使用Avro時,服務器和客戶端可以在握手連接時交換模式。服務器和客戶端有彼此全部的模式,因此相同命名字段、缺失字段和多余字段等信息之間通信中需要處理的一致性問題就可以容易解決。如圖2所示,協議中定義了用于傳輸的消息,消息使用框架后放入緩沖區中進行傳輸,由于傳輸的初始就交換了各自的協議定義,因此即使傳輸雙方使用的協議不同所傳輸的數據也能夠正確解析。
圖表 2
Avro作為RPC框架來使用。客戶端希望同服務器端交互時,就需要交換雙方通信的協議,它類似于模式,需要雙方來定義,在Avro中被稱為消息(Message)。通信雙方都必須保持這種協議,以便于解析從對方發送過來的數據,這也就是傳說中的握手階段。
消息從客戶端發送到服務器端需要經過傳輸層(Transport Layer),它發送消息并接收服務器端的響應。到達傳輸層的數據就是二進制數據。通常以HTTP作為傳輸模型,數據以POST方式發送到對方去。在 Avro中,它的消息被封裝成為一組緩沖區(Buffer),類似于下圖的模型:
如上圖,每個緩沖區以四個字節開頭,中間是多個字節的緩沖數據,最后以一個空緩沖區結尾。這種機制的好處在于,發送端在發送數據時可以很方便地組裝不同數據源的數據,接收方也可以將數據存入不同的存儲區。還有,當往緩沖區中寫數據時,大對象可以獨占一個緩沖區,而不是與其它小對象混合存放,便于接收方方便地讀取大對象。
對象容器文件是Avro的數據存儲的具體實現,數據交換則由RPC服務提供,與對象容器文件類似,數據交換也完全依賴Schema,所以與Hadoop目前的RPC不同,Avro在數據交換之前需要通過握手過程先交換Schema。
1、 握手過程
握手的過程是確保Server和Client獲得對方的Schema定義,從而使Server能夠正確反序列化請求信息,Client能夠正確反序列化響應信息。一般的,Server/Client會緩存最近使用到的一些協議格式,所以,大多數情況下,握手過程不需要交換整個Schema文本。
所有的RPC請求和響應處理都建立在已經完成握手的基礎上。對于無狀態的連接,所有的請求響應之前都附有一次握手過程;對于有狀態的連接,一次握手完成,整個連接的生命期內都有效。
具體過程:
Client發起HandshakeRequest,其中含有Client本身SchemaHash值和對應Server端的Schema Hash值(clientHash!=null,clientProtocol=null, serverHash!=null)。如果本地緩存有serverHash值則直接填充,如果沒有則通過猜測填充。
Server用如下之一HandshakeResponse響應Client請求:
(match=BOTH, serverProtocol=null,serverHash=null):當Client發送正確的serverHash值且Server緩存相應的clientHash。握手過程完成,之后的數據交換都遵守本次握手結果。
(match=CLIENT, serverProtocol!=null,serverHash!=null):當Server緩存有Client的Schema,但是Client請求中ServerHash值不正確。此時Server發送Server端的Schema數據和相應的Hash值,此次握手完成,之后的數據交換都遵守本次握手結果。
(match=NONE):當Client發送的ServerHash不正確且Server端沒有Client Schema的緩存。這種情況下Client需要重新提交請求信息 (clientHash!=null,clientProtocol!=null, serverHash!=null),Server響應 (match=BOTH, serverProtocol=null,serverHash=null),此次握手過程完成,之后的數據交換都遵守本次握手結果。
握手過程使用的Schema結構如下示。
{
"type":"record",
"name":"HandshakeRequest","namespace":"org.apache.avro.ipc",
"fields":[
{"name":"clientHash", "type": {"type": "fixed","name": "MD5", "size": 16}},
{"name":"clientProtocol", "type": ["null","string"]},
{"name":"serverHash", "type": "MD5"},
{"name":"meta", "type": ["null", {"type":"map", "values": "bytes"}]}
]
}
{
"type":"record",
"name":"HandshakeResponse", "namespace":"org.apache.avro.ipc",
"fields":[
{"name":"match","type": {"type": "enum","name": "HandshakeMatch",
"symbols":["BOTH", "CLIENT", "NONE"]}},
{"name":"serverProtocol", "type": ["null","string"]},
{"name":"serverHash","type": ["null", {"type":"fixed", "name": "MD5", "size": 16}]},
{"name":"meta","type": ["null", {"type":"map", "values": "bytes"}]}
]
}
2、 消息幀格式
消息從客戶端發送到服務器端需要經過傳輸層,它發送請求并接收服務器端的響應。到達傳輸層的數據就是二進制數據。通常以HTTP作為傳輸模型,數據以POST方式發送到對方去。在 Avro中消息首先分幀后被封裝成為一組緩沖區(Buffer)。
數據幀的格式如下:
? 一系列Buffer:
1、4字節的Buffer長度
2、Buffer字節數據
? 長度為0的Buffer結束數據幀
3、 Call格式
一個調用由請求消息、結果響應消息或者錯誤消息組成。請求和響應包含可擴展的元數據,兩種消息都按照之前提出的方法分幀。
調用的請求格式為:
? 請求元數據,一個類型值的映射。
? 消息名,一個Avro字符串。
? 消息參數。參數根據消息的請求定義序列化。
調用的響應格式為:
? 響應的元數據,一個類型值的映射。
? 一字節的錯誤標志位。
? 如果錯誤標志為false,響應消息,根據響應的模式序列化。
如果錯誤標志位true,錯誤消息,根據消息的錯誤聯合模式序列化。
四、實例
1、 本地序列化/反序列化
user.avsc
{"namespace":"example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type":"string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type":["string", "null"]}
]
}
Main.java
public class Main {
public static void main(String[] args)throws Exception {
User user1 = new User();
user1.setName("Alyssa");
user1.setFavoriteNumber(256);
// Leave favorite color null
// Alternate constructor
User user2 = new User("Ben", 7,"red");
// Construct via builder
User user3 = User.newBuilder()
.setName("Charlie")
.setFavoriteColor("blue")
.setFavoriteNumber(null)
.build();
// Serialize user1 and user2to disk
File file = new File("users.avro");
DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class);
DataFileWriter<User> dataFileWriter = newDataFileWriter<User>(userDatumWriter);
dataFileWriter.create(user1.getSchema(),new File("users.avro"));
dataFileWriter.append(user1);
dataFileWriter.append(user2);
dataFileWriter.append(user3);
dataFileWriter.close();
// Deserialize Usersfrom disk
DatumReader<User> userDatumReader = newSpecificDatumReader<User>(User.class);
DataFileReader<User> dataFileReader = newDataFileReader<User>(file, userDatumReader);
User user = null;
while (dataFileReader.hasNext()) {
// Reuse user object bypassing it to next(). This saves us from
// allocating and garbagecollecting many objects for files with
// many items.
user = dataFileReader.next(user);
System.out.println(user);
}
}
}
2、 RPC
mail.avsc
{"namespace":"example.proto",
"protocol": "Mail",
"types": [
{"name": "Message", "type":"record",
"fields": [
{"name": "to", "type": "string"},
{"name": "from", "type": "string"},
{"name": "body", "type":"string"}
]
}
],
"messages": {
"send": {
"request": [{"name": "message","type": "Message"}],
"response": "string"
}
}
}
Main.java
public class Main {
public static class MailImpl implements Mail {
// in this simple example just return details of the message
public Utf8 send(Message message) {
System.out.println("Sending message");
return new Utf8("Sending message to " + message.getTo().toString()
+ " from " +message.getFrom().toString()
+ " with body " +message.getBody().toString());
}
}
private static Server server;
private static void startServer() throws IOException {
server = new NettyServer(new SpecificResponder(Mail.class,new MailImpl()),newInetSocketAddress(65111));
// the server implements the Mail protocol (MailImpl)
}
public static void main(String[] args)throws IOException {
System.out.println("Starting server");
// usually this would be anotherapp, but for simplicity
startServer();
System.out.println("Server started");
NettyTransceiver client = new NettyTransceiver(new InetSocketAddress(65111));
// client code - attach to the server and send a message
Mail proxy = (Mail) SpecificRequestor.getClient(Mail.class, client);
System.out.println("Client built, got proxy");
// fill in the Message record and send it
Message message = new Message();
message.setTo(new Utf8("127.0.0.1"));
message.setFrom(new Utf8("127.0.0.1"));
message.setBody(new Utf8("this is my message"));
System.out.println("Calling proxy.send with message: " + message.toString());
System.out.println("Result: " +proxy.send(message));
// cleanup
client.close();
server.close();
}
}