JLiteSpider:輕量級的分布式 Java 爬蟲框架

JLiteSpider

A lite distributed Java spider framework.

這是一個輕量級的分布式java爬蟲框架

特點

這是一個強大,但又輕量級的分布式爬蟲框架。jlitespider天生具有分布式的特點,各個worker之間需要通過一個或者多個消息隊列來連接。消息隊列我的選擇是 rabbitmq 。worker和消息之間可以是一對一,一對多,多對一或多對多的關系,這些都可以自由而又簡單地配置。消息隊列中存儲的消息分為四種:url,頁面源碼,解析后的結果以及自定義的消息。同樣的,worker的工作也分為四部分:下載頁面,解析頁面,數據持久化和自定義的操作。

用戶只需要在配置文件中,規定好worker和消息隊列之間的關系。接著在代碼中,定義好worker的四部分工作。即可完成爬蟲的編寫。

總體的使用流程如下:

  • 啟動rabbitmq。
  • 在配置文件中定義worker和消息隊列之間的關系。
  • 在代碼中編寫worker的工作。
  • 最后,啟動爬蟲。

安裝

使用maven:

<dependency>
  <groupId>com.github.luohaha</groupId>
  <artifactId>jlitespider</artifactId>
  <version>0.4.1</version>
</dependency>

直接下載jar包:

Worker和消息隊列之間關系

worker和消息隊列之間的關系可以是一對一,多對一,一對多,多對多,都是可以配置的。在配置文件中,寫上要監聽的消息隊列和要發送的消息隊列。例如:

{
    "workerid" : 2,
    "mq" : [{
        "name" : "one",
        "host" : "localhost",
        "port" : 5672,
        "qos" : 3  ,
        "queue" : "url"
    },
    {
        "name" : "two",
        "host" : "localhost",
        "port" : 5672,
        "qos" : 3  ,
        "queue" : "hello"
    }],
    "sendto" : ["two"],
    "recvfrom" : ["one", "two"]
}

workerid : worker的id號

mq : 各個消息隊列所在的位置,和配置信息。 name 字段為這個消息隊列的唯一標識符,供消息隊列的獲取使用。 host 為消息隊列所在的主機ip, port 為消息隊列的監聽端口號(rabbitmq中默認為5672)。 qos 為消息隊列每次將消息發給worker時的消息個數。 queue 為消息隊列的名字。 host + port + queue 可以理解為是消息隊列的唯一地址。

sendto : 要發送到的消息隊列,填入的信息為 mq 中的 name 字段中的標識符。

recvfrom : 要監聽的消息隊列,消息隊列會把消息分發到這個worker中。填入的信息同樣為 mq 中的 name 字段中的標識符。

消息的設計

在消息隊列中,消息一共有四種類型。分別是url,page,result和自定義類型。在worker的程序中,可以通過messagequeue的四種方法(sendUrl, sendPage, sendResult, send)來插入消息。worker的downloader會處理url消息,processor會處理page消息,saver會處理result消息,freeman會處理所有的自定義的消息。我們所要做的工作,就是實現好worker中的這四個函數。

Worker接口的設計

JLiteSpider將整個的爬蟲抓取流程抽象成四個部分,由四個接口來定義。分別是downloader,processor,saver和freeman。它們分別處理上述提到的四種消息。

你所需要做的是,實現這個接口,并將想要抓取的url鏈表返回。具體的實現細節,可以由你高度定制。

1. Downloader:

這部分實現的是頁面下載的任務,將想要抓取的url鏈表,轉化(下載后存儲)為相應的頁面數據鏈表。

接口設計如下:

public interface Downloader {
    /**

 * 下載url所指定的頁面。
 * @param url 
 * 收到的由消息隊列傳過來的消息
 * @param mQueue 
 * 提供把消息發送到各個消息隊列的方法
 * @throws IOException
 */
public void download(Object url, Map<String, MessageQueue> mQueue) throws IOException;

}</code></pre>

你同樣可以實現這個接口,具體的實現可由你自由定制,只要實現 download 函數。 url 是消息隊列推送過來的消息,里面不一定是一條 url ,具體是什么內容,是由你當初傳入消息隊列時決定的。 mQueue 提供了消息發送到各個消息隊列的方法,通過 mQueue.get("...") 選取消息隊列,然后執行messagequeue的四種方法(sendUrl, sendPage, sendResult, send)來插入消息。

2. Processor:

Processor 是解析器的接口,這里會從網頁的原始文件中提取出有用的信息。

接口設計:

public interface Processor{
    /**

 * 處理下載下來的頁面源代碼
 * @param page
 * 消息隊列推送過來的頁面源代碼數據消息
 * @param mQueue
 * 提供把消息發送到各個消息隊列的方法
 * @throws IOException
 */
public void process(Object page, Map<String, MessageQueue> mQueue) throws IOException;

}</code></pre>

實現這個接口,完成對頁面源碼的解析處理。 page 是由消息隊列推送過來的消息,具體格式同樣是由你在傳入時決定好的。 mQueue 使用同上。

3. Saver:

Saver 實現的是對解析得到結果的處理,可以將你解析后得到的數據存入數據庫,文件等等。或者將url重新存入消息隊列,實現迭代抓取。

接口的設計:

public interface Saver {
    /**

 * 處理最終解析得到的結果
 * @param result 
 * 消息隊列推送過來的結果消息
 * @param mQueue 
 * 提供把消息發送到各個消息隊列的方法
 * @throws IOException
 */
public void save(Object result, Map<String, MessageQueue> mQueue) throws IOException;

}</code></pre>

通過實現這個接口,可以完成對結果的處理。你同樣可以實現這個接口,具體的實現可由你自由定制,只要實現 download 函數。 result 是消息隊列推送過來的結果消息,具體的格式是由你當初傳入消息隊列時決定的。 mQueue 的使用同上。

4. Freeman:

通過上述的三個流程,可以實現爬蟲抓取的一個正常流程。但是 jlitespider 同樣提供了自定義的功能,你可以完善,加強,改進甚至顛覆上述的抓取流程。 freeman 就是一個處理自定義消息格式的接口,實現它就可以定義自己的格式,以至于定義自己的流程。

接口的設計:

public interface Freeman {
    /**

 * 自定義的處理函數
 * @param key
 * key為自定義的消息標記
 * @param msg
 * 消息隊列推送的消息
 * @param mQueue
 * 提供把消息發送到各個消息隊列的方法
 * @throws IOException
 */
public void doSomeThing(String key, Object msg, Map<String, MessageQueue> mQueue) throws IOException;

}</code></pre>

通過實現 doSomeThing 函數,你就可以處理來自消息隊列的自定義消息。 key 為消息的標記, msg 為消息的內容。同樣,通過 mQueue 的 send 方法,可以實現向消息隊列發送自定義消息的操作。(需要注意,自定義的消息標記不能為: url , page , result 。否則會被認為是 jlitespider 的保留消息,也就是由上述的三個接口函數來處理。)

總結說明

jlitespider 的設計可能會讓您有些疑惑,不過等您熟悉這一整套的設計之后,您就會發現 jlitespider 是多么的靈活和易于使用。

使用方法

JLiteSpider使用:

//worker的啟動
Spider.create() //創建實例
      .setDownloader(...) //設置實現了Downloader接口的下載器
      .setProcessor(...) //設置實現了Processor接口的解析器
      .setSaver(...) //設置實現了Saver接口的數據持久化方法
      .setFreeman(...) //設置自定義消息的處理函數
      .setSettingFile(...) //設置配置文件
      .begin(); //開始爬蟲

//消息隊列中初始消息添加器的使用 MessageQueueAdder.create("localhost", 5672, "url") .addUrl(...) //向消息隊列添加url類型的消息 .addPage(...) //向消息隊列添加page類型的消息 .addResult(...) //向消息隊列添加result類型的消息 .add(..., ...) //向消息隊列添加自定義類型的消息 .close() //關閉連接,一定要記得在最后調用!</code></pre>

以豆瓣電影的頁面為例子,假設我們要抓取豆瓣電影的愛情分類中的所有電影名稱,并存入txt文件中:

  • 首先,需要設計消息隊列和worker之間的關系。我的設計是有兩個worker和兩個消息隊列,其中一個worker在main消息隊列上,負責下載,解析并把最終結果傳入data消息隊列。第二個worker從data消息隊列中取數據,并存入txt文件中。兩個worker的配置文件如下:

第一個worker:

{
    "workerid" : 1,
    "mq" : [{
        "name" : "main",
        "host" : "localhost",
        "port" : 5672,
        "qos" : 3  ,
        "queue" : "main"
    }, {
        "name" : "data",
        "host" : "localhost",
        "port" : 5672,
        "qos" : 3  ,
        "queue" : "data"
    }],
    "sendto" : ["main", "data"],
    "recvfrom" : ["main"]
}

第二個worker:

{
    "workerid" : 2,
    "mq" : [{
        "name" : "main",
        "host" : "localhost",
        "port" : 5672,
        "qos" : 3  ,
        "queue" : "main"
    }, {
        "name" : "data",
        "host" : "localhost",
        "port" : 5672,
        "qos" : 3  ,
        "queue" : "data"
    }],
    "sendto" : [],
    "recvfrom" : ["data"]
}
  • 接著,編寫第一個worker的代碼,如下:

//下載頁面數據,并存入main隊列。
public class DoubanDownloader implements Downloader {
    private Logger logger = Logger.getLogger("DoubanDownloader");
    @Override
    public void download(Object url, Map<String, MessageQueue> mQueue) throws IOException {
        // TODO Auto-generated method stub
        String result = "";
        try {
            result = Network.create()
                            .setUserAgent("...")
                            .setCookie("...")
                            .downloader(url.toString());
        } catch (IOException e) {
            logger.info("本次下載失敗!重新下載!");
            //因為下載失敗,所以將url重新放入main隊列中
            mQueue.get("main").sendUrl(url);
        }
        //下載成功,將頁面數據放入main消息隊列
        mQueue.get("main").sendPage(result);
    }

}</code></pre>

//解析頁面數據,將結果放入main消息隊列。同時,后面頁面的url信息同樣需要放入隊列,以便迭代抓取。
public class DoubanProcessor implements Processor {
//url去重復
    private Set<String> urlset = new HashSet<>();
    @Override
    public void process(Object page, Map<String, MessageQueue> mQueue) throws IOException {
        // TODO Auto-generated method stub
        String path = "http://[@id=content]/div/div[1]/div[2]/table/tbody/tr/td[1]/a/@title";
        List<String> result = Xsoup.compile(path).evaluate(Jsoup.parse(page.toString())).list();
        //將結果放入main消息隊列
        mQueue.get("main").sendResult(result);
        path = "http://[@id=content]/div/div[1]/div[3]/a/@href";
        List<String> url = Xsoup.compile(path).evaluate(Jsoup.parse(page.toString())).list();
        for (String each : url) {
            if (!urlset.contains(each)) {
            //如果url之前并未抓取過,則加入main隊列,作為接下來要抓取的url
                mQueue.get("main").sendUrl(each);
                urlset.add(each);
            }
        }
    }

}</code></pre>

//把最終的數據放入data消息隊列
public class DoubanSaver implements Saver {

@Override
public void save(Object result, Map<String, MessageQueue> mQueue) throws IOException {
    // TODO Auto-generated method stub
    List<String> rList = (List<String>) result;
    for (String each : rList) {
    //把數據發往data消息隊列
        mQueue.get("data").send("cc", each);
    }
}

}</code></pre>

//啟動worker的主程序
public class DoubanSpider {
    public static void main(String[] args) {
        try {
            Spider.create().setDownloader(new DoubanDownloader())
                           .setProcessor(new DoubanProcessor())
                           .setSaver(new DoubanSaver())
                           .setSettingFile("./conf/setting.json")
                           .begin();
        } catch (ShutdownSignalException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (ConsumerCancelledException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (TimeoutException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (SpiderSettingFileException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}
  • 接下來,還要寫第二個worker的代碼。

//接收data消息隊列中的數據,寫入txt
public class SaveToFile implements Freeman {
    @Override
    public void doSomeThing(String key, Object msg, Map<String, MessageQueue> mQueue) throws IOException {
        // TODO Auto-generated method stub
        File file = new File("./output/name.txt");
        FileWriter fileWriter = new FileWriter(file, true);
        fileWriter.write(msg.toString() + "\n");
        fileWriter.flush();
        fileWriter.close();
    }
}
//第二個worker的啟動主程序
public class SaveToFileSpider {
    public static void main(String[] args) {
        try {
            Spider.create().setFreeman(new SaveToFile())
                           .setSettingFile("./conf/setting2.json")
                           .begin();
        } catch (ShutdownSignalException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (ConsumerCancelledException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (TimeoutException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (SpiderSettingFileException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}
  • 還要編寫一個main消息隊列的初始化程序,把第一個入口url放入main消息隊列中。

//把入口url放入main消息隊列
public class AddUrls {
    public static void main(String[] args) {
        try {
            MessageQueueAdder.create("localhost", 5672, "main")
                             .addUrl("https://movie.douban.com/tag/%E7%88%B1%E6%83%85?start=0&type=T")
                             .close();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (TimeoutException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}
  • 最后,依次啟動程序。啟動的順序是:rabbitmq -> worker1/2 -> 初始化消息程序。關于rabbitmq的使用,它的官方網站上有詳細的安裝和使用文檔,可用于快速搭建rabbitmq的server。

輔助工具

當前版本的 jlitespider 能提供的輔助工具并不多,您在使用 jlitespider 的過程中,可以將您實現的輔助工具合并到 jlitespider 中來,一起來完善 jlitespider 的功能。輔助工具在包 com.github.luohaha.jlitespider.extension 中。

  • Network

簡單的網絡下載器,輸入url,返回頁面源代碼。使用如下:

String result = Network.create()
                .setCookie("...")
                .setProxy("...")
                .setTimeout(...)
                .setUserAgent("...")
                .downloader(url);
  • 解析工具

項目中依賴了兩個很常用的解析工具:xsoup 和 jsoup 。

 

 

 本文由用戶 StephaineRF 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!