JLiteSpider:輕量級的分布式 Java 爬蟲框架
JLiteSpider
A lite distributed Java spider framework.
這是一個輕量級的分布式java爬蟲框架
特點
這是一個強大,但又輕量級的分布式爬蟲框架。jlitespider天生具有分布式的特點,各個worker之間需要通過一個或者多個消息隊列來連接。消息隊列我的選擇是 rabbitmq 。worker和消息之間可以是一對一,一對多,多對一或多對多的關系,這些都可以自由而又簡單地配置。消息隊列中存儲的消息分為四種:url,頁面源碼,解析后的結果以及自定義的消息。同樣的,worker的工作也分為四部分:下載頁面,解析頁面,數據持久化和自定義的操作。
用戶只需要在配置文件中,規定好worker和消息隊列之間的關系。接著在代碼中,定義好worker的四部分工作。即可完成爬蟲的編寫。
總體的使用流程如下:
- 啟動rabbitmq。
- 在配置文件中定義worker和消息隊列之間的關系。
- 在代碼中編寫worker的工作。
- 最后,啟動爬蟲。
安裝
使用maven:
<dependency>
<groupId>com.github.luohaha</groupId>
<artifactId>jlitespider</artifactId>
<version>0.4.1</version>
</dependency>
直接下載jar包:
Worker和消息隊列之間關系
worker和消息隊列之間的關系可以是一對一,多對一,一對多,多對多,都是可以配置的。在配置文件中,寫上要監聽的消息隊列和要發送的消息隊列。例如:
{
"workerid" : 2,
"mq" : [{
"name" : "one",
"host" : "localhost",
"port" : 5672,
"qos" : 3 ,
"queue" : "url"
},
{
"name" : "two",
"host" : "localhost",
"port" : 5672,
"qos" : 3 ,
"queue" : "hello"
}],
"sendto" : ["two"],
"recvfrom" : ["one", "two"]
}
workerid : worker的id號
mq : 各個消息隊列所在的位置,和配置信息。 name 字段為這個消息隊列的唯一標識符,供消息隊列的獲取使用。 host 為消息隊列所在的主機ip, port 為消息隊列的監聽端口號(rabbitmq中默認為5672)。 qos 為消息隊列每次將消息發給worker時的消息個數。 queue 為消息隊列的名字。 host + port + queue 可以理解為是消息隊列的唯一地址。
sendto : 要發送到的消息隊列,填入的信息為 mq 中的 name 字段中的標識符。
recvfrom : 要監聽的消息隊列,消息隊列會把消息分發到這個worker中。填入的信息同樣為 mq 中的 name 字段中的標識符。
消息的設計
在消息隊列中,消息一共有四種類型。分別是url,page,result和自定義類型。在worker的程序中,可以通過messagequeue的四種方法(sendUrl, sendPage, sendResult, send)來插入消息。worker的downloader會處理url消息,processor會處理page消息,saver會處理result消息,freeman會處理所有的自定義的消息。我們所要做的工作,就是實現好worker中的這四個函數。
Worker接口的設計
JLiteSpider將整個的爬蟲抓取流程抽象成四個部分,由四個接口來定義。分別是downloader,processor,saver和freeman。它們分別處理上述提到的四種消息。
你所需要做的是,實現這個接口,并將想要抓取的url鏈表返回。具體的實現細節,可以由你高度定制。
1. Downloader:
這部分實現的是頁面下載的任務,將想要抓取的url鏈表,轉化(下載后存儲)為相應的頁面數據鏈表。
接口設計如下:
public interface Downloader {
/**
* 下載url所指定的頁面。
* @param url
* 收到的由消息隊列傳過來的消息
* @param mQueue
* 提供把消息發送到各個消息隊列的方法
* @throws IOException
*/
public void download(Object url, Map<String, MessageQueue> mQueue) throws IOException;
}</code></pre>
你同樣可以實現這個接口,具體的實現可由你自由定制,只要實現 download 函數。 url 是消息隊列推送過來的消息,里面不一定是一條 url ,具體是什么內容,是由你當初傳入消息隊列時決定的。 mQueue 提供了消息發送到各個消息隊列的方法,通過 mQueue.get("...") 選取消息隊列,然后執行messagequeue的四種方法(sendUrl, sendPage, sendResult, send)來插入消息。
2. Processor:
Processor 是解析器的接口,這里會從網頁的原始文件中提取出有用的信息。
接口設計:
public interface Processor{
/**
* 處理下載下來的頁面源代碼
* @param page
* 消息隊列推送過來的頁面源代碼數據消息
* @param mQueue
* 提供把消息發送到各個消息隊列的方法
* @throws IOException
*/
public void process(Object page, Map<String, MessageQueue> mQueue) throws IOException;
}</code></pre>
實現這個接口,完成對頁面源碼的解析處理。 page 是由消息隊列推送過來的消息,具體格式同樣是由你在傳入時決定好的。 mQueue 使用同上。
3. Saver:
Saver 實現的是對解析得到結果的處理,可以將你解析后得到的數據存入數據庫,文件等等。或者將url重新存入消息隊列,實現迭代抓取。
接口的設計:
public interface Saver {
/**
* 處理最終解析得到的結果
* @param result
* 消息隊列推送過來的結果消息
* @param mQueue
* 提供把消息發送到各個消息隊列的方法
* @throws IOException
*/
public void save(Object result, Map<String, MessageQueue> mQueue) throws IOException;
}</code></pre>
通過實現這個接口,可以完成對結果的處理。你同樣可以實現這個接口,具體的實現可由你自由定制,只要實現 download 函數。 result 是消息隊列推送過來的結果消息,具體的格式是由你當初傳入消息隊列時決定的。 mQueue 的使用同上。
4. Freeman:
通過上述的三個流程,可以實現爬蟲抓取的一個正常流程。但是 jlitespider 同樣提供了自定義的功能,你可以完善,加強,改進甚至顛覆上述的抓取流程。 freeman 就是一個處理自定義消息格式的接口,實現它就可以定義自己的格式,以至于定義自己的流程。
接口的設計:
public interface Freeman {
/**
* 自定義的處理函數
* @param key
* key為自定義的消息標記
* @param msg
* 消息隊列推送的消息
* @param mQueue
* 提供把消息發送到各個消息隊列的方法
* @throws IOException
*/
public void doSomeThing(String key, Object msg, Map<String, MessageQueue> mQueue) throws IOException;
}</code></pre>
通過實現 doSomeThing 函數,你就可以處理來自消息隊列的自定義消息。 key 為消息的標記, msg 為消息的內容。同樣,通過 mQueue 的 send 方法,可以實現向消息隊列發送自定義消息的操作。(需要注意,自定義的消息標記不能為: url , page , result 。否則會被認為是 jlitespider 的保留消息,也就是由上述的三個接口函數來處理。)
總結說明
jlitespider 的設計可能會讓您有些疑惑,不過等您熟悉這一整套的設計之后,您就會發現 jlitespider 是多么的靈活和易于使用。
使用方法
JLiteSpider使用:
//worker的啟動
Spider.create() //創建實例
.setDownloader(...) //設置實現了Downloader接口的下載器
.setProcessor(...) //設置實現了Processor接口的解析器
.setSaver(...) //設置實現了Saver接口的數據持久化方法
.setFreeman(...) //設置自定義消息的處理函數
.setSettingFile(...) //設置配置文件
.begin(); //開始爬蟲
//消息隊列中初始消息添加器的使用
MessageQueueAdder.create("localhost", 5672, "url")
.addUrl(...) //向消息隊列添加url類型的消息
.addPage(...) //向消息隊列添加page類型的消息
.addResult(...) //向消息隊列添加result類型的消息
.add(..., ...) //向消息隊列添加自定義類型的消息
.close() //關閉連接,一定要記得在最后調用!</code></pre>
以豆瓣電影的頁面為例子,假設我們要抓取豆瓣電影的愛情分類中的所有電影名稱,并存入txt文件中:
-
首先,需要設計消息隊列和worker之間的關系。我的設計是有兩個worker和兩個消息隊列,其中一個worker在main消息隊列上,負責下載,解析并把最終結果傳入data消息隊列。第二個worker從data消息隊列中取數據,并存入txt文件中。兩個worker的配置文件如下:
第一個worker:
{
"workerid" : 1,
"mq" : [{
"name" : "main",
"host" : "localhost",
"port" : 5672,
"qos" : 3 ,
"queue" : "main"
}, {
"name" : "data",
"host" : "localhost",
"port" : 5672,
"qos" : 3 ,
"queue" : "data"
}],
"sendto" : ["main", "data"],
"recvfrom" : ["main"]
}
第二個worker:
{
"workerid" : 2,
"mq" : [{
"name" : "main",
"host" : "localhost",
"port" : 5672,
"qos" : 3 ,
"queue" : "main"
}, {
"name" : "data",
"host" : "localhost",
"port" : 5672,
"qos" : 3 ,
"queue" : "data"
}],
"sendto" : [],
"recvfrom" : ["data"]
}
-
接著,編寫第一個worker的代碼,如下:
//下載頁面數據,并存入main隊列。
public class DoubanDownloader implements Downloader {
private Logger logger = Logger.getLogger("DoubanDownloader");
@Override
public void download(Object url, Map<String, MessageQueue> mQueue) throws IOException {
// TODO Auto-generated method stub
String result = "";
try {
result = Network.create()
.setUserAgent("...")
.setCookie("...")
.downloader(url.toString());
} catch (IOException e) {
logger.info("本次下載失敗!重新下載!");
//因為下載失敗,所以將url重新放入main隊列中
mQueue.get("main").sendUrl(url);
}
//下載成功,將頁面數據放入main消息隊列
mQueue.get("main").sendPage(result);
}
}</code></pre>
//解析頁面數據,將結果放入main消息隊列。同時,后面頁面的url信息同樣需要放入隊列,以便迭代抓取。
public class DoubanProcessor implements Processor {
//url去重復
private Set<String> urlset = new HashSet<>();
@Override
public void process(Object page, Map<String, MessageQueue> mQueue) throws IOException {
// TODO Auto-generated method stub
String path = "http://[@id=content]/div/div[1]/div[2]/table/tbody/tr/td[1]/a/@title";
List<String> result = Xsoup.compile(path).evaluate(Jsoup.parse(page.toString())).list();
//將結果放入main消息隊列
mQueue.get("main").sendResult(result);
path = "http://[@id=content]/div/div[1]/div[3]/a/@href";
List<String> url = Xsoup.compile(path).evaluate(Jsoup.parse(page.toString())).list();
for (String each : url) {
if (!urlset.contains(each)) {
//如果url之前并未抓取過,則加入main隊列,作為接下來要抓取的url
mQueue.get("main").sendUrl(each);
urlset.add(each);
}
}
}
}</code></pre>
//把最終的數據放入data消息隊列
public class DoubanSaver implements Saver {
@Override
public void save(Object result, Map<String, MessageQueue> mQueue) throws IOException {
// TODO Auto-generated method stub
List<String> rList = (List<String>) result;
for (String each : rList) {
//把數據發往data消息隊列
mQueue.get("data").send("cc", each);
}
}
}</code></pre>
//啟動worker的主程序
public class DoubanSpider {
public static void main(String[] args) {
try {
Spider.create().setDownloader(new DoubanDownloader())
.setProcessor(new DoubanProcessor())
.setSaver(new DoubanSaver())
.setSettingFile("./conf/setting.json")
.begin();
} catch (ShutdownSignalException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (ConsumerCancelledException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (TimeoutException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (SpiderSettingFileException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
-
接下來,還要寫第二個worker的代碼。
//接收data消息隊列中的數據,寫入txt
public class SaveToFile implements Freeman {
@Override
public void doSomeThing(String key, Object msg, Map<String, MessageQueue> mQueue) throws IOException {
// TODO Auto-generated method stub
File file = new File("./output/name.txt");
FileWriter fileWriter = new FileWriter(file, true);
fileWriter.write(msg.toString() + "\n");
fileWriter.flush();
fileWriter.close();
}
}
//第二個worker的啟動主程序
public class SaveToFileSpider {
public static void main(String[] args) {
try {
Spider.create().setFreeman(new SaveToFile())
.setSettingFile("./conf/setting2.json")
.begin();
} catch (ShutdownSignalException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (ConsumerCancelledException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (TimeoutException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (SpiderSettingFileException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
-
還要編寫一個main消息隊列的初始化程序,把第一個入口url放入main消息隊列中。
//把入口url放入main消息隊列
public class AddUrls {
public static void main(String[] args) {
try {
MessageQueueAdder.create("localhost", 5672, "main")
.addUrl("https://movie.douban.com/tag/%E7%88%B1%E6%83%85?start=0&type=T")
.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (TimeoutException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
-
最后,依次啟動程序。啟動的順序是:rabbitmq -> worker1/2 -> 初始化消息程序。關于rabbitmq的使用,它的官方網站上有詳細的安裝和使用文檔,可用于快速搭建rabbitmq的server。
輔助工具
當前版本的 jlitespider 能提供的輔助工具并不多,您在使用 jlitespider 的過程中,可以將您實現的輔助工具合并到 jlitespider 中來,一起來完善 jlitespider 的功能。輔助工具在包 com.github.luohaha.jlitespider.extension 中。
-
Network
簡單的網絡下載器,輸入url,返回頁面源代碼。使用如下:
String result = Network.create()
.setCookie("...")
.setProxy("...")
.setTimeout(...)
.setUserAgent("...")
.downloader(url);
-
解析工具
項目中依賴了兩個很常用的解析工具:xsoup 和 jsoup 。