Play! Akka Flume實現的完整數據收集
-
前言
現如今,大數據如火如荼。針對用戶行為,用戶喜好等后續大數據分析也是十分火熱。這個小項目實現了后臺數據收集的一系列完整流程。
-
項目總體流程以及用到的技術
Play ! 作為web服務器,使用RESTful 規范編寫接口(客戶端事先埋點,然后調用接口上傳數據)
Play !接口接受到的記錄(json形式)經過處理后,先保存到 concurrentQueue中
Play! 啟動后,start一個Akka schedulable actor.他每隔一段時間,讓子actor去poll queue中的數據
調用flume的封裝的rpc,將數據發送到指定的端口。
Flume source端接收數據,按照配置重定向數據,sink到console.
3. 后臺實現
3.1 編寫接口
采用RESTful編寫接口,首先在play! 的conf中routes定義接口:
#run log POST /events/runlogs controllers.RunLogs.create()
然后編寫controller
public static Result create(){ JsonNode js = request().body().asJson(); RunLog.create(js); //return ok anyway return ok(); }
然后是model
public static void create(JsonNode js) { if (js.has(LOG)) { String logBody = js.findPath(LOG).asText(); //add one log into queue QueueManager.INSTANCE.addRunLog(logBody); } }
可以看到,這些代碼遵循MVC規范,首先讓play!知道接口的定義,前端發送過來請求的時候,知道調用哪個controller中的哪個方法,并返回數據。而controller將數據從請求體中剝離出來,并發送給真正處理數據的model.
3.2 Queue
看到model中,接收到數據后,添加到queue中保存。
定義為:ConcurrentLinkedQueue<String>
3.3 Akka 定時調度
Akka負責定時從queue取出數據,然后通過rpc發送給flume。
akka的初始化,啟動是伴隨著play! 的啟動而進行的,每個play!只有一個akka system。所以首先要編寫一個Global extends GlobalSetting(GlobalSettings is instantiated by the framework when an application starts,to let you perform specific tasks at start-up or shut-down),然后override onStart方法,在此方法中初始化akka的調度器。
代碼如下:
ActorRef dispatcher = Akka.system().actorOf(new Props(Dispatcher.class)); Akka.system().scheduler().schedule( Duration.create(200,TimeUnit.MICROSECONDS), Duration.create(2,TimeUnit.SECONDS), dispatcher, "run", Akka.system().dispatcher(), null );
可以看到,每個兩秒鐘,scheduler就會調用dispatcher,讓他工作。
dispatcher在這里相當于一個master,他接到工作后,會通知自己的slave去工作(發送數據給rpc)。
代碼如下:
ActorRef workRouter = getContext().actorOf( new Props(WorkerActor.class).withRouter(new RoundRobinRouter(40)) , "transferRouter"); @Override public void onReceive(Object message) throws Exception { ConcurrentLinkedQueue<String> runLogs = QueueManager.INSTANCE.getRunLogs(); dispatch(runLogs); } private void dispatch(Queue<?> queue) { if (!queue.isEmpty()) { Object obj = queue.poll(); List<String> data = new ArrayList<>(); while (obj != null) { data.add(obj.toString()); obj = queue.poll(); } workRouter.tell(new DispatchMsg("runlogs", data), getSelf()); } }
首先定義了一個router,他負責按照輪訓算法,找到到底要讓哪個slave去工作。
當dispatcher收到消息后,就讓router通知WorkerActor去工作,并把從queue取出的數據給他,讓他將這些數據通過rpc發送給遠端的flume。
這樣設計的目的在于:
接口接受消息,暫時保存在queue中,快速回復客戶端,不堵塞。
利用akka并發能力,從queue中取出消息,找到一個worker去進行耗時較長的rpc工作。
workeractor
@Override public void onReceive(Object message) throws Exception { if (message instanceof DispatchMsg) { DispatchMsg msg = (DispatchMsg) message; String business = msg.business; List<String> datas = (List<String>) msg.data; sendMsg.ToFlume.sendDataToFlume(datas); } }
3.4 rpc發送
flume集成了rpc
public void sendDataToFlume(List<String> datas) { List<Event> es = new LinkedList<Event>(); for (String data : datas){ // Create a Flume Event object that encapsulates the sample data Event event = EventBuilder.withBody(data, Charset.forName("UTF-8")); es.add(event); } // Send the event try { client.appendBatch(es); System.out.println("data sent"); } catch (EventDeliveryException e) { // clean up and recreate the client client.close(); client = null; client = RpcClientFactory.getDefaultInstance(hostname, port); } }
這里接受的數據為list,批處理。
首先新建client,這里注意hostname跟port,是flume服務器端source的ip跟端口。
然后批量發送數據。
4. Flume
flume的配置內容如下:exemple.conf
a1.channels = c1 a1.sources = r1 a1.sinks = k1 a1.channels.c1.type = memory a1.sources.r1.channels = c1 a1.sources.r1.type = avro a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 41414 a1.sinks.k1.channel = c1 a1.sinks.k1.type = logger
此處flume端為簡單的單點配置,source接收41414的rpc消息,然后保存到channel中,sink到console中(數據收集一般sink到HDFS中,并且可以多點收集)。
啟動命令如下:
bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console
好了,啟動flume后,啟動play!然后利用客戶端發送消息,就可以在flume端看到消息打印到console了。
項目的所有代碼在 https://github.com/YulinGUO/collectEvents
如果有問題,請留言。
來自:http://my.oschina.net/yulinguo/blog/372191