輕松使用Hadoop RPC

openkk 12年前發布 | 118K 次閱讀 Hadoop 分布式/云計算/大數據

Hadoop RPC是Hadoop的一個重要部分,提供分布式環境下的對象調用功能,源碼在org.apache.hadoop.ipc中。而HBase也幾乎完全copy了這部分的源碼,只是在配置項上面有所改動。

關于Hadoop RPC的機制分析和源碼解讀,網上已經有許多資料,一搜一大把,這里就不在描述了。本文通過一個小例子,介紹如何調用Hadoop RPC。

1. 應用場景

Hadoop RPC在整個Hadoop中應用非常廣泛,Client、DataNode、NameNode之間的通訊全靠它了。

舉個例子,我們平時操作HDFS的時候,使用的是FileSystem類,它的內部有個DFSClient對象,這個對象負責與NameNode打交道。在運行時,DFSClient在本地創建一個NameNode的代理,然后就操作這個代理,這個代理就會通過網絡,遠程調用到NameNode的方法,也能返回值。

在我的應用場景中,需要一個元數據服務器,各節點經常需要去查詢元數據,可以使用這套RPC機制。

2. Protocol

被遠程訪問的類,也就是Server端,必須實現VersionedProtocol接口,這個接口只有一個方法getProtocolVersion,用來判斷Server和Client端調用的是不是一個版本的,一般Server的代碼修改一次,版本號就得改一次。

在例子中,我們定義一個接口MyProtocol,繼承VersionedProtocol,里面定義Server端需要實現的方法。

這里MyProtocol接口只有一個方法println,輸入一個Text,打印出來,并返回一個Text。

MyProtocol.java代碼如下:

import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.VersionedProtocol;

public interface MyProtocol extends VersionedProtocol { public Text println(Text t); }</pre>

3. Server

Server端實現上述的Protocol接口,里面需要啟動一個RPC.Server,它是一個Thread。

構造方法是RPC.getServer(Object instance, String bindAddress, int port, Configuration conf)

  • instance:表示提供遠程訪問的對象,一般Server都會傳入this作為參數;
  • bindAddress:Server綁定的ip地址;
  • port:Server綁定的端口;
  • conf:Configuration對象,不用解釋了吧。

MyServer實現了MyProtocol接口中定義的println方法,將參數打印到控制臺,并返回finish。

MyServer.java代碼如下:

import java.io.IOException;
 import java.net.UnknownHostException;

import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC.Server;

public class MyServer implements MyProtocol{ private Server server;

 public MyServer(){
     try {
         server = RPC.getServer(this, "localhost", 8888, new Configuration());
         server.start();
         server.join();
     } catch (UnknownHostException e) {
         e.printStackTrace();
     } catch (IOException e) {
         e.printStackTrace();
     } catch (InterruptedException e) {
         e.printStackTrace();
     }
 }

 @Override
 public Text println(Text t){
     System.out.println(t);
     return new Text("finish");
 }

 @Override
 public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
     return 1;
 }

 public static void main(String[] args) {
     new MyServer();
 }

}</pre>

4. Client

Client端需要創建一個Server的遠程代理,并可以通過操作這個代理,來調用到Server端的方法。

創建代理可以調用RPC.waitForProxy(Class protocol, long clientVersion, InetSocketAddress addr, Configuration conf)

  • protocol:一個Protocol的class,它必須是繼承VersionedProtocol的接口;
  • clientVersion:客戶端的版本號,如果與服務端不一致,則會拋錯;
  • addr:一個InetSocketAddress對象,包含了ip和port;
  • conf:不解釋。

這個方法會返回一個VersionedProtocol類型的代理對象,將它強制轉型成自己定義的Protocol,接下來就可以操作創建好的代理了。在例子中,我們通過代理來讓Server端打印字符串到控制臺,并接受返回的消息。

MyClient.java代碼如下:

import java.io.IOException;
 import java.net.InetSocketAddress;

import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.RPC;

public class MyClient {

 private MyProtocol proxy;

 public MyClient(){
     InetSocketAddress addr = new InetSocketAddress("localhost",8888);
     try {
         proxy = (MyProtocol) RPC.waitForProxy(MyProtocol.class, 1, addr , new Configuration());
     } catch (IOException e) {
         e.printStackTrace();
     }
 }

 public void println(String s){
     System.out.println(proxy.println(new Text(s)));
 }

 public void close(){
     RPC.stopProxy(proxy);
 }

 public static void main(String[] args) {
     MyClient c = new MyClient();
     c.println("123");
     c.close();
 }

}</pre>

5. 運行

運行MyServer,控制臺顯示:

2011-12-30 18:49:56 -[INFO] Initializing RPC Metrics with hostName=MyServer, port=8888
2011-12-30 18:49:56 -[INFO] IPC Server listener on 8888: starting
2011-12-30 18:49:56 -[INFO] IPC Server Responder: starting
2011-12-30 18:49:56 -[INFO] IPC Server handler 0 on 8888: starting

運行MyClient,控制臺顯示:

finish

MyServer端會追加顯示:

123

轉自: http://www.cnblogs.com/hiddenfox/archive/2011/12/30/2305786.html

 本文由用戶 openkk 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!