Python爬蟲開發(三-續):快速線程池爬蟲

CarolynEUEZ 8年前發布 | 20K 次閱讀 線程池 Python Python開發

0×00 簡介

0×01 功能定義

0×02 總體流程

0×03 線程池任務迭代

0×04 具體實現

0×05 測試使用

0×06 結語

0×00 簡介

本文算是填前面的一個坑,有朋友和我將我前面寫了這么多,真正沒看到什么特別突出的實戰,給了應對各種情況的方案。多線程那里講的也是坑。忽然想想,說的也對,為讀者考慮我確實應該把多線程這里的坑補完。

然后決定再以一篇文章的形式講一下這個輕型線程池爬蟲,同時也為大家提供一個思路。代碼都是經過調試的,并且留了相對友好的用戶接口。可以很容易得添加各種各樣增強型的功能。

0×01 功能定義

1.  可選擇的單頁面爬蟲與多頁面線程池爬蟲

2.  可定制對HTML的處理

3.  可定制獲取HTML的方式(應對動態頁面)

4.  當設置為非單頁面爬蟲時,自動啟動對當前域名下所有的頁面進行深度優先爬取

5.  自定義線程數

0×02 總體流程

0×03 線程池任務迭代實現

雖然在上面圖中寫出了線程池的影子,但是我們還是需要單獨拿出來寫一下線程池的到底是怎么樣工作的,以方便讀者更好地理解源代碼的內容。

0×04 具體實現

到這里相信讀者知道用線程池來完成我們需要完成的爬蟲了吧。關于具體內容的實現,是接下來我們要講的。

1.    依賴:

我們需要用到這五個模塊,我相信大家都很熟悉,那么就不多介紹了,如果有朋友不熟悉的話可以翻到前面的文章重新復習一下。

threading
Queue
urlparse
requests
bs4

2.    類的聲明:

ScraperWorkerBase這個類是完全可以復寫的,只要和原有的接口保持一致,可以滿足用戶的各種各樣的需求,例如,定義頁面掃描函數需要復寫parse方法(當然這些我是在后面會有實例給大家展示)

那么我們還需要介紹一下其他的接口:

Execute方法中控制主邏輯,用最簡潔的語言和代碼表現邏輯,如果有需要自定義自己的邏輯控制方法,那么務必保持第一個返回值仍然是inpage_url

__get_html_data控制獲取html數據的方法,你可以自己定制headers,cookies,post_data

__get_soup這個方法是以bs4模塊來解析html文檔

__get_all_url與__get_url_inpage不建議大家修改,如果修改了的話可能會影響主爬蟲控制器的運行

然后我在這里做一張ScraperWorkerBase的流程圖大家可以參考一下

class ScraperWorkerBase(object):     """     No needs to learn how is work,     rewrite parse_page using self.soup(Beautiful), and return result,     you can get the result by using              (inpage_urls, your_own_result) urlscraper.execute()          But this class is default for scraper to use,     To enhance its function , you can completement this class     like:          class MyWorker(ScraperWorkerBase):              def parse_page(self):             all_tags = self.soup.find_all('img')             for i in all_tags:                 print i          """     def __init__(self, url = ''):                  self.target_url = url         self.netloc = urlparse.urlparse(self.target_url)[1]                           self.response = None         self.soup = None                  self.url_in_site = []         self.url_out_site = []              """override this method to get html data via any way you want or need"""         def __get_html_data(self):         try:             self.response = requests.get(self.target_url, timeout = 5)         except:             return ""                  print "[_] Got response"         return self.response.text     def __get_soup(self):         text = self.__get_html_data()         if text == '':             return []                  return bs4.BeautifulSoup(text)     def __get_all_url(self):         url_lists = []                  self.soup = self.__get_soup()         if isinstance(self.soup, type(None)):             return []                  all_tags = self.soup.findAll("a")         for a in all_tags:             try:                 #print a['href']                 url_lists.append(a["href"])             except:                 pass                      return url_lists          def get_urls_inpage(self):         ret_list = self.__get_all_url()                  if ret_list == []:             return ([],[])         else:             for url in ret_list:                 o = urlparse.urlparse(url)                 #                 #print url                 if self.netloc in o[1]:                     self.url_in_site.append(o.geturl())                 else:                     self.url_out_site.append(o.geturl())                              inurlset = set(self.url_in_site)                      outurlset = set(self.url_out_site)                      return inurlset, outurlset     def execute(self):         inpage_url = self.get_urls_inpage()         undefined_result = self.parse_page()         return inpage_url, undefined_result               """You can override this method to define your own needs"""     def parse_page(self):         pass                  

這個類定義了處理HTML頁面的基本方法,如果需要僅僅是獲取頁面所有的超鏈接的話,那么最基礎的Worker類已經替大家實現了,但是如果需要對某類網站特定元素進行處理,那么完全可以只復寫parse_page

例如:

如果要繞開網站的限制進行爬取數據,就需要復寫:

但是如果需要對特定url進行限制,最好不要去復寫__get_all_url方法,而應該去復寫get_urls_inpage方法

關于Scraper的類說明:

這個類顯然沒有前面的那么好理解,但是如果使用過HTMLParser或者是SGMLParser的讀者,肯定是記得那個feed方法的。這與我們要介紹的這個類有一些相似的地方。

在這個類中,我們建立Scraper對象的時候,需要傳入的參數直接決定了我們的線程池爬蟲的類型:究竟要不要啟動多線程,啟動多少個線程,使用哪個處理函數來除了web頁面?這些都是我們要考慮的問題。所以接下來我們對這些部分進行一些說明

這些設定很好理解,在__init__中輸入是否是單頁面爬蟲模式,設定線程數,設定爬蟲解析的具體類。然后對應初始化線程池:初始化的時候要生成多個_worker方法,循環工作,然后在_worker方法的工作時完成對傳入的實際進行解析的ScraperWorkerBase類進行調用,然后收集結果填入任務隊列。

通過feed的方法來添加目標url,可以輸入list,也可以直接輸入str對象。

當不想讓Scraper再工作的時候,調用kill_workers就可以停止所有的worker線程。

但是僅僅是明白這個只是可能僅僅會使用而已,既然是開發我們肯定是要清楚地講這個Scraper是怎么樣被組織起來的,他是怎么樣工作的。

首先第一個概念就是任務隊列:我們feed進的數據實際就是把任務添加到任務隊列中,然后任務分配的時候,每個爬蟲都要get到屬于自己的任務,然后各司其職的去做,互不干擾。

第二個類似的概念就是結果隊列:結果隊列毫無疑問就是用于存儲結果的,在外部獲取這個Scraper的結果隊列以后,需要去獲取結果隊列中的元素,由于隊列的性質,當結果被抽走的時候,被獲取的結果就會被刪除。

在大家明確了這兩個概念以后,這個Scraper的工作原理接回很容易被理解了:

當然這個圖我在做的時候是有點小偷懶的,本來應該做兩種類型的Scraper,因為實際在使用的過程中,Scraper在一開始要被指定為單頁還是多頁,但是為了避免大量的重復所以在作圖的時候我就在最后做了一個邏輯判斷來表明類型,來幫助大家理解這個解析過程。我相信一個visio流程圖比長篇大論的文字解釋要直觀的多對吧?

那么接下來我們看一下爬蟲的實體怎么寫:

class Scraper(object):
    def __init__(self, single_page = True,  workers_num = 8, worker_class = ScraperWorkerBase):
        self.count = 0
        self.workers_num = workers_num

        """get worker_class"""
        self.worker_class = worker_class

        """check if the workers should die"""
        self.all_dead = False

        """store the visited pages"""
        self.visited = set()

        """by ScraperWorkerBase 's extension result queue"""
        self.result_urls_queue = Queue.Queue()
        self.result_elements_queue = Queue.Queue()

        """
        if single_page == True,
        the task_queue should store the tasks (unhandled)
        """
        self.task_queue = Queue.Queue()

        self.single_page = single_page
        if self.single_page == False:
            self.__init_workers()
        else:
            self.__init_single_worker()

    def __check_single_page(self):
        if self.single_page == True:
            raise StandardError('[!] Single page won\'t allow you use many workers')

    """init worker(s)"""
    def __init_single_worker(self):
        ret = threading.Thread(target=self._single_worker)
        ret.start()
    def __init_workers(self):
        self.__check_single_page()

        for _ in range(self.workers_num):
            ret = threading.Thread(target=self._worker)
            ret.start()

    """return results"""    def get_result_urls_queue(self):

        return self.result_urls_queue
    def get_result_elements_queue(self):
        return self.result_elements_queue

    """woker function"""
    def _single_worker(self):
        if self.all_dead != False:
            self.all_dead = False
        scraper = None
        while not self.all_dead:
            try:

                url = self.task_queue.get(block=True)
                print 'Workding', url
                try:
                    if url[:url.index('#')] in self.visited:
                        continue
                except:
                    pass

                if url in self.visited:
                    continue
                else:
                    pass
                self.count = self.count+ 1
                print 'Having process', self.count , 'Pages'
                scraper = self.worker_class(url)
                self.visited.add(url)
                urlset, result_entity = scraper.execute()
                for i in urlset[0]:
                    #self.task_queue.put(i)
                    self.result_urls_queue.put(i)

                if result_entity != None:
                    pass
                else:
                    self.result_elements_queue.put(result_entity)

            except:
                pass            
            finally:
                pass        
    def _worker(self):
        if self.all_dead != False:
            self.all_dead = False
        scraper = None
        while not self.all_dead:
            try:

                url = self.task_queue.get(block=True)
                print 'Workding', url
                try:
                    if url[:url.index('#')] in self.visited:
                        continue
                except:
                    pass

                if url in self.visited:
                    continue
                else:
                    pass
                self.count = self.count + 1
                print 'Having process', self.count , 'Pages'
                scraper = self.worker_class(url)
                self.visited.add(url)
                urlset, result_entity = scraper.execute()
                for i in urlset[0]:
                    if i in self.visited:
                        continue
                    else:
                        pass
                    self.task_queue.put(i)
                    self.result_urls_queue.put(i)

                if result_entity != None:
                    pass
                else:
                    self.result_elements_queue.put(result_entity)

            except:
                pass            
            finally:
                pass

    """scraper interface"""
    def kill_workers(self):
        if self.all_dead == False:
            self.all_dead = True
        else:
            pass
    def feed(self, target_urls = []):
        if isinstance(target_urls, list):
            for target_url in target_urls:
                self.task_queue.put(target_url)
        elif isinstance(target_urls, str):
            self.task_queue.put(target_urls)
        else:
            pass


        #return url result
        return (self.get_result_urls_queue(), self.get_result_elements_queue() )

這些設定很好理解,在__init__中輸入是否是單頁面爬蟲模式,設定線程數,設定爬蟲解析的具體類。然后對應初始化線程池:初始化的時候要生成多個_worker方法,循環工作,然后在_worker方法的工作時完成對傳入的實際進行解析的ScraperWorkerBase類進行調用,然后收集結果填入任務隊列。

通過feed的方法來添加目標url,可以輸入list,也可以直接輸入str對象。

當不想讓Scraper再工作的時候,調用kill_workers就可以停止所有的worker線程。

0×04 使用實例

下面是幾個相對完整的使用實例:

單頁面爬蟲使用實例

#encoding:utf-8
from scraper import *
import Queue
import time
import sys
import bs4
test_obj = Scraper(single_page=True, workers_num=15)
test_obj.feed(['http://freebuf.com'])
time.sleep(5)
z = test_obj.get_result_urls_queue()
while True:
    try :
        print z.get(timeout=4)
    except:
        pass

線程池爬蟲實例:

尋找一個網站下所有的url

#encoding:utf-8
from scraper import *
import Queue
import time
import sys
import bs4
test_obj = Scraper(single_page=False, workers_num=15)
test_obj.feed(['http://freebuf.com'])
time.sleep(5)
z = test_obj.get_result_urls_queue()
while True:
    try :
        print z.get(timeout=4)
    except:
        pass

我們發現和上面的單頁面爬蟲只是一個參數的區別。實際的效果還是不錯的。

下面是自定義爬取方案的應用

這樣的示例代碼基本把這個爬蟲的目的和接口完整的展示出來了,用戶可以在MyWorker中定義自己的處理函數。

0×05 測試使用

在實際的使用中,這個小型爬蟲的效果還是相當不錯的,靈活,簡單,可擴展性高。有興趣的朋友可以給它配置更多的功能型組件,比如數據庫,爬取特定關鍵元素,針對某一個頁面的數據處理。比如在實際的使用中,這個模塊作為我自己正在編寫的一個xss_fuzz工具的一個部分而存在。

下面給出一些測試數據供大家參考(在普通網絡狀況):

這個結果是在本機上測試的結果,在不同的電腦商測試結果均不同,8線程是比較小的線程數目,有興趣的朋友可以采用16線程或者是更多的線程測試,效果可能更加明顯,如果為了防止頁面卡死,可以在worker中設置超時時間,一旦有那個頁面一時間很難打開也能很快轉換到新的頁面,同樣也能提高效率。

0×06 結語

關于爬蟲的開發,我相信到現在,大家都已經沒有什么問題了,如果要問網站爬行時候什么的頁面權重怎么處理,簡單無非是在爬蟲過程中計算某個頁面被多少頁面所指(當然這個算法沒有這么簡單),并不是什么很高深的技術,如果有興趣的小伙伴仍然可以去深入學習,大家都知道搜索引擎的核心也是爬蟲技術。

項目Github主頁: https://github.com/VillanCh/simple_scraper

來自: http://www.freebuf.com/articles/system/100668.html

 本文由用戶 CarolynEUEZ 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!