輕松使用Hadoop RPC
Hadoop RPC是Hadoop的一個重要部分,提供分布式環境下的對象調用功能,源碼在org.apache.hadoop.ipc中。而HBase也幾乎完全copy了這部分的源碼,只是在配置項上面有所改動。
關于Hadoop RPC的機制分析和源碼解讀,網上已經有許多資料,一搜一大把,這里就不在描述了。本文通過一個小例子,介紹如何調用Hadoop RPC。
1. 應用場景
Hadoop RPC在整個Hadoop中應用非常廣泛,Client、DataNode、NameNode之間的通訊全靠它了。
舉個例子,我們平時操作HDFS的時候,使用的是FileSystem類,它的內部有個DFSClient對象,這個對象負責與NameNode打交道。在運行時,DFSClient在本地創建一個NameNode的代理,然后就操作這個代理,這個代理就會通過網絡,遠程調用到NameNode的方法,也能返回值。
在我的應用場景中,需要一個元數據服務器,各節點經常需要去查詢元數據,可以使用這套RPC機制。
2. Protocol
被遠程訪問的類,也就是Server端,必須實現VersionedProtocol接口,這個接口只有一個方法getProtocolVersion,用來判斷Server和Client端調用的是不是一個版本的,一般Server的代碼修改一次,版本號就得改一次。
在例子中,我們定義一個接口MyProtocol,繼承VersionedProtocol,里面定義Server端需要實現的方法。
這里MyProtocol接口只有一個方法println,輸入一個Text,打印出來,并返回一個Text。
MyProtocol.java代碼如下:
import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.VersionedProtocol;public interface MyProtocol extends VersionedProtocol { public Text println(Text t); }</pre>
3. Server
Server端實現上述的Protocol接口,里面需要啟動一個RPC.Server,它是一個Thread。
構造方法是RPC.getServer(Object instance, String bindAddress, int port, Configuration conf)
- instance:表示提供遠程訪問的對象,一般Server都會傳入this作為參數;
- bindAddress:Server綁定的ip地址;
- port:Server綁定的端口;
- conf:Configuration對象,不用解釋了吧。
MyServer實現了MyProtocol接口中定義的println方法,將參數打印到控制臺,并返回finish。
MyServer.java代碼如下:
import java.io.IOException; import java.net.UnknownHostException;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC.Server;
public class MyServer implements MyProtocol{ private Server server;
public MyServer(){ try { server = RPC.getServer(this, "localhost", 8888, new Configuration()); server.start(); server.join(); } catch (UnknownHostException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } @Override public Text println(Text t){ System.out.println(t); return new Text("finish"); } @Override public long getProtocolVersion(String protocol, long clientVersion) throws IOException { return 1; } public static void main(String[] args) { new MyServer(); }
}</pre>
4. Client
Client端需要創建一個Server的遠程代理,并可以通過操作這個代理,來調用到Server端的方法。
創建代理可以調用RPC.waitForProxy(Class protocol, long clientVersion, InetSocketAddress addr, Configuration conf)
- protocol:一個Protocol的class,它必須是繼承VersionedProtocol的接口;
- clientVersion:客戶端的版本號,如果與服務端不一致,則會拋錯;
- addr:一個InetSocketAddress對象,包含了ip和port;
- conf:不解釋。
這個方法會返回一個VersionedProtocol類型的代理對象,將它強制轉型成自己定義的Protocol,接下來就可以操作創建好的代理了。在例子中,我們通過代理來讓Server端打印字符串到控制臺,并接受返回的消息。
MyClient.java代碼如下:
import java.io.IOException; import java.net.InetSocketAddress;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.RPC;
public class MyClient {
private MyProtocol proxy; public MyClient(){ InetSocketAddress addr = new InetSocketAddress("localhost",8888); try { proxy = (MyProtocol) RPC.waitForProxy(MyProtocol.class, 1, addr , new Configuration()); } catch (IOException e) { e.printStackTrace(); } } public void println(String s){ System.out.println(proxy.println(new Text(s))); } public void close(){ RPC.stopProxy(proxy); } public static void main(String[] args) { MyClient c = new MyClient(); c.println("123"); c.close(); }
}</pre>
5. 運行
運行MyServer,控制臺顯示:
2011-12-30 18:49:56 -[INFO] Initializing RPC Metrics with hostName=MyServer, port=8888
2011-12-30 18:49:56 -[INFO] IPC Server listener on 8888: starting
2011-12-30 18:49:56 -[INFO] IPC Server Responder: starting
2011-12-30 18:49:56 -[INFO] IPC Server handler 0 on 8888: starting運行MyClient,控制臺顯示:
finish
MyServer端會追加顯示:
轉自: http://www.cnblogs.com/hiddenfox/archive/2011/12/30/2305786.html123