Python開發的 dht網絡爬蟲

jopen 10年前發布 | 90K 次閱讀 Python 網絡爬蟲

使用 libtorrent 的python綁定庫實現一個dht網絡爬蟲,抓取dht網絡中的磁力鏈接。

 

dht 網絡簡介

p2p網絡

在P2P網絡中,通過種子文件下載資源時,要知道資源在P2P網絡中哪些計算機中,這些傳輸資源的計算機稱作peer。在傳統的P2P網絡中,使用tracker服務器跟蹤資源的peer。要下載資源,首先需要取得這些peer。

 

dht網絡

tracker服務器面臨一些版權和法律問題。于是出現了DHT,它把tracker上的資源peer信息分散到了整個網絡中。dht網絡是由分布 式節點構成,節點(node)是實現了DHT協議的p2p客戶端。P2P客戶端程序既是peer也是node。DHT網絡有多種算法,常用的有 Kademlia。

 

dht網絡下載

P2P客戶端使用種子文件下載資源時,如果沒有tracker服務器,它就向DHT網絡查詢資源的peer列表, 然后從peer下載資源。

 

Magnet是磁力鏈接

資源的標識在DHT網絡中稱為infohash,是一個通過sha1算法得到的20字節長的字符串。infohash是使用種子文件的文件描述信息 計算得到。磁力鏈接是把infohash編碼成16進制字符串得到。P2P客戶端使用磁力鏈接,下載資源的種子文件,然后根據種子文件下載資源。

 

Kademlia 算法

Kademlia是DHT網絡的一種實現, 具體的算法參見:DHT協議

 

KRPC 協議

KRPC 是節點之間的交互協議,使用UDP來傳送。

包括4種請求:ping,find_node,get_peer,announce_peer。其中get_peer和announce_peer是節點間查詢資源的主要消息。

 

dht 爬蟲原理

主要的思路就是偽裝為p2p客戶端,加入dht網絡,收集dht網絡中的get_peer和announce_peer消息,這些消息是其他node發送給偽裝的p2p客戶端的udp消息。

 

本文dht爬蟲的實現

爬蟲運行環境

  1. linux 系統

    </li>

  2. python 2.7

    </li>

  3. libtorrent 庫的python綁定

    </li>

  4. twisted 網絡庫

    </li>

  5. 防火墻開啟固定的udp和tcp端口

    </li> </ol>

     

    libtorrent 庫的介紹

    libtorrent庫是p2p下載的客戶端庫,有豐富的接口,可以用來開發下載p2p網絡上的資源。它有python的綁定庫,本爬蟲就是使用它的python庫開發的。

    在libtorrent中有幾個概念需要解釋下。 session 相當于p2p客戶端,session開啟一個tcp和一個udp端口,用來與其他p2p客戶端交換數據。可以在一個進程內定義多個session,也就是多個p2p客戶端,來加快收集速度。

    alert是libtorrent中用來收集各種消息的隊列,每個session都有一個自己的alert消息隊列。KRPC協議的get_peer和announce_peer消息也是從這個隊列中獲取,就是用這兩個消息收集磁力鏈接的。

     

    主要實現代碼

    爬蟲實現的主要代碼比較簡單

    # 事件通知處理函數
        def _handle_alerts(self, session, alerts):
            while len(alerts):
                alert = alerts.pop()

            # 獲取dht_announce_alert和dht_get_peer_alert消息
            # 從這兩消息收集磁力鏈接
            if isinstance(alert, lt.add_torrent_alert):
                alert.handle.set_upload_limit(self._torrent_upload_limit)
                alert.handle.set_download_limit(self._torrent_download_limit)
            elif isinstance(alert, lt.dht_announce_alert):
                info_hash = alert.info_hash.to_string().encode('hex')
                if info_hash in self._meta_list:
                    self._meta_list[info_hash] += 1
                else:
                    self._meta_list[info_hash] = 1
                    self._current_meta_count += 1
            elif isinstance(alert, lt.dht_get_peers_alert):
                info_hash = alert.info_hash.to_string().encode('hex')
                if info_hash in self._meta_list:
                    self._meta_list[info_hash] += 1
                else:
                    self._infohash_queue_from_getpeers.append(info_hash)
                    self._meta_list[info_hash] = 1
                    self._current_meta_count += 1
    
    def start_work(self):
        '''主工作循環,檢查消息,顯示狀態'''
        # 清理屏幕
        begin_time = time.time()
        show_interval = self._delay_interval
        while True:
            for session in self._sessions:
                session.post_torrent_updates()
                # 從隊列中獲取信息
                self._handle_alerts(session, session.pop_alerts())
            time.sleep(self._sleep_time)
            if show_interval > 0:
                show_interval -= 1
                continue
            show_interval = self._delay_interval
    
            # 統計信息顯示
            show_content = ['torrents:']
            interval = time.time() - begin_time
            show_content.append('  pid: %s' % os.getpid())
            show_content.append('  time: %s' %
                                time.strftime('%Y-%m-%d %H:%M:%S'))
            show_content.append('  run time: %s' % self._get_runtime(interval))
            show_content.append('  start port: %d' % self._start_port)
            show_content.append('  collect session num: %d' %
                                len(self._sessions))
            show_content.append('  info hash nums from get peers: %d' %
                                len(self._infohash_queue_from_getpeers))
            show_content.append('  torrent collection rate: %f /minute' %
                                (self._current_meta_count * 60 / interval))
            show_content.append('  current torrent count: %d' %
                                self._current_meta_count)
            show_content.append('  total torrent count: %d' %
                                len(self._meta_list))
            show_content.append('\n')
    
            # 存儲運行狀態到文件
            try:
                with open(self._stat_file, 'wb') as f:
                    f.write('\n'.join(show_content))
                with open(self._result_file, 'wb') as f:
                    json.dump(self._meta_list, f)
            except Exception as err:
                pass
    
            # 測試是否到達退出時間
            if interval >= self._exit_time:
                # stop
                break
    
            # 每天結束備份結果文件
            self._backup_result()
    
        # 銷毀p2p客戶端
        for session in self._sessions:
            torrents = session.get_torrents()
            for torrent in torrents:
                session.remove_torrent(torrent)</pre> <p> </p>
    

    運行效率

    在我的一臺512M內存,單cpu機器上。爬蟲剛開始運行稍慢,運行幾分鐘后收集速度穩定在 180個每分鐘,1小時采集10000左右。

    運行狀態

    run times: 12

    torrents: pid: 11480 time: 2014-08-18 22:45:01 run time: day: 0, hour: 0, minute: 12, second: 25 start port: 32900 collect session num: 20 info hash nums from get peers: 2222 torrent collection rate: 179.098480 /minute current torrent count: 2224 total torrent count: 58037</pre>

     

    爬蟲完整代碼

    完整的代碼參見:https://github.com/blueskyz/DHTCrawler

    還包括一個基于twisted的監控進程,用來查看爬蟲狀態,在爬蟲進程退出后重新啟動。

     

    原文鏈接:python開發的 dht網絡爬蟲

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!