知乎 Live 全文搜索之完成爬蟲

2789284943 7年前發布 | 32K 次閱讀 知乎live 網絡爬蟲

看這篇文章前推薦閱讀相關的如下文章:

  1. 使用API登錄知乎并獲得token

  2. 知乎Live全文搜索之模型設計和爬蟲實現

  3. 知乎Live全文搜索之模型接口

抓取話題信息

給新增的Topic提供數據。在parse_live_link中,解析到Live數據中包含了topic的id, 基于這個id拼鏈接,然后在fetch方法中添加對topic頁面的處理,新增parse_topic_link方法:

TOPIC_API_URL = 'https://api.zhihu.com/topics/{}'
class Crawler:
   def __init__(self, max_redirect=10, max_tries=4,
                max_tasks=10, *, loop=None):
       ...
       self.seen_topics = set()
   async def parse_topic_link(self, response):
       rs = await response.json()
       if response.status == 200:
           rs['avatar_url'] = await self.convert_local_image(
               rs['avatar_url'].replace('_s', '_r'))
           Topic.add_or_update(**flatten_live_dict(rs, TOPIC_KEYS))
   async def parse_live_link(self, response):
       ...
       topics = live_dict.pop('topics')
       for topic in topics:
           topic_id = topic['id']
           if topic_id not in self.seen_topics:
               self.seen_topics.add(topic_id)
               self.add_url(TOPIC_API_URL.format(topic_id),
                            self.max_redirect)
       ...
   async def fetch(self, url, max_redirect):
       try:
           if 'api.zhihu.com' in url:
               parse_func = (self.parse_topic_link if 'topics' in url
                             else self.parse_live_link)
               next_url = await parse_func(response)
           else:
               next_url = await self.parse_zhuanlan_link(response)
        ...

思考下,這是不是一種套路(模式):

  1. 初始化一個已完成的url的集合

  2. 啟動一定量的worker,每個worker都在等待從隊列獲取要抓取的url

  3. 一次性添加足量要抓取的鏈接到隊列中,讓每個worker都忙起來(執行前要確認之前沒有抓取過)

  4. worker在parse處理中又會解析到新的要抓取的鏈接,放入隊列

  5. worker消費任務,過程中也可能生產任務給自己或者其他worker來消費

  6. 全部任務都完成了,通過 self.q.join() 結束

  7. 停止這些worker,任務完成

修改live灌suggest數據的方式

在上上篇我把相關字段的文本用analyze接口拆分成不同的token成為一個列表賦值給live_suggest,其實完全不需要這樣,因為 Completion(analyzer=ik_analyzer) 就是做這個的。gen_suggests用最簡單的input+weight就可以:

def gen_suggests(topics, tags, outline, username, subject):
   suggests = [{'input': item, 'weight': weight}
               for item, weight in ((topics, 10), (subject, 5), (outline, 3),
                                    (tags, 3), (username, 2)) if item]
   return suggests

下載主講人頭像

小程序開發工具中不能直接使用知乎的圖片資源,所以我只能下載下來并生成一個本地的圖片地址:

import os
IMAGE_FOLDER = 'static/images/zhihu'
if not os.path.exists(IMAGE_FOLDER):
   os.mkdir(IMAGE_FOLDER)
class Crawler:
   ...
   async def convert_local_image(self, pic):
       pic_name = pic.split('/')[-1]
       path = os.path.join(IMAGE_FOLDER, pic_name)
       if not os.path.exists(path):
           async with self.session.get(pic) as resp:
               content = await resp.read()
               with open(path, 'wb') as f:
                   f.write(content)
       return path
   async def parse_live_link(self, response):
       ...
       for live in rs['data']:
           ...
           speaker = live.pop('speaker')
           speaker_id = speaker['member']['id']
           speaker['member']['avatar_url'] = await self.convert_local_image(  # noqa
                   speaker['member']['avatar_url'])
           ...

這樣User類中的avatar_url最后會變成 static/images/zhihu/v2-4db301967fffa08dfa727ff467170e_s.jpg 這樣的地址了。未來我們將讓sanic來提供靜態資源服務。當然,也可以只存文件名,在接口返回前再去拼圖片地址。

抓取專欄信息

知乎Live申請通過之后,主講人可以寫一篇專欄介紹Live,文章中帶上Live的鏈接來導流,知乎Live官方也會收錄這個Live的專欄文章。為了讓微信小程序的效果更好,我想要抓專欄頭圖,并且保存專欄鏈接,希望在小城中能給跳轉進去(以證明不可行)。下面我將遍歷 知乎Live官方專欄 收錄的專欄,解析每個專欄的標題,去ES里面匹配是不是有對應的subject匹配,如果匹配還會驗證用戶的hash值確保正確,如果沒找到還會從Live正文中搜索live的鏈接的方式來匹配。

看起來很繞,但是沒辦法啦,因為專欄和live沒有明確的匹配關系,目測是知乎2個部門寫的不同的系統。

最后要提的是專欄的抓取和live的api不同,它不提供paging鍵,也就是返回內容中并不告訴你下一頁的地址,所以需要我們人工的修改鏈接,這需要一個轉化的函數:

from urllib.parse import urlparse, parse_qsl, urlunparse, urlencode
def get_next_url(url):
   url_parts = list(urlparse(url))
   query = dict(parse_qsl(url_parts[4]))
   query['offset'] = int(query['offset']) + int(query['limit'])
   url_parts[4] = urlencode(query)
   return urlunparse(url_parts)

這個方法在我實際工作中很常用:

In : get_next_url('http://dongwm.com?offset=10&limit=20')
Out: 'http://dongwm.com?offset=30&limit=20'
In : get_next_url('http://dongwm.com?offset=20&limit=30')
Out: 'http://dongwm.com?offset=50&limit=30'
ZHUANLAN_API_URL = 'https://zhuanlan.zhihu.com/api/columns/zhihulive/posts?limit=20&offset={offset}'
LIVE_REGEX = re.compile(r'<a href="https://(www.)?zhihu.com/lives/(\d+)(.*)?"')  # noqa
class Crawler:
   def __init__(self, max_redirect=10, max_tries=4,
                max_tasks=10, *, loop=None):
       ...
       self.seen_zhuanlan = set()
   ...
   async def parse_zhuanlan_link(self, response):
       posts = await response.json()
       if response.status == 200 and posts:
           for post in posts:
               cover = post['titleImage']
               if not cover:
                   continue
               s = Live.search()
               title = post['title']
               for sep in ('-', '—'):
                   if sep in title:
                       title = title.split(sep)[-1].strip()
               speaker_id = post['author']['hash']
               zid = post['url'].split('/')[-1]
               s = s.query(Q('match_phrase', subject=title))
               lives = await s.execute()
               for live in lives:
                   if live.speaker and live.speaker.speaker_id == speaker_id:
                       await self.update_live(zid, cover, live)
                       break
               else:
                   match = LIVE_REGEX.search(post['content'])
                   if match:
                       live_id = match.group(2)
                       try:
                           live = await Live.get(live_id)
                       except NotFoundError:
                           pass
                       else:
                           await self.update_live(zid, cover, live)
           return get_next_url(response.url)
   async def update_live(self, zid, cover, live):
       if live.id in self.seen_zhuanlan:
           return
       self.seen_zhuanlan.add(live.id)
       zhuanlan_url = ZHUANLAN_URL.format(zid)
       cover = await self.convert_local_image(cover)
       await live.update(cover=cover, zhuanlan_url=zhuanlan_url)
   def add_zhuanlan_urls(self):
       for offset in range(self.max_tasks):
           self.add_url(ZHUANLAN_API_URL.format(offset=offset * 20))
   async def crawl(self):
       self.__workers = [asyncio.Task(self.work(), loop=self.loop)
                         for _ in range(self.max_tasks)]
       self.t0 = time.time()
       await self.q.join()
       self.add_zhuanlan_urls()
       await self.q.join()
       self.t1 = time.time()
       for w in self.__workers:
           w.cancel()

其中crawl方法中用2次join用來確保 先抓取全部live信息之后再去抓專欄信息 ,因為得先確保live內容足夠完整才能搜索匹配,其次由于parse_live_link和parse_zhuanlan_link都涉及到Live的更新,在并發中容易造成同時更新某些live而觸發版本沖突的ConflictError。

我使用 s = s.query(Q('match_phrase', subject=title)) 進行標題匹配,首先我們先聊聊在ES中match和term的區別, 簡單的說:

term用于精確查詢,match用于全文檢索

我們要把標題和Live的subject字段去匹配,但是由于subject設置了analyzer,所以無法使用term。除非新加一個字段,修改成類似cover的那種 Text(index='not_analyzed') 。但是這樣新增字段實在有點浪費,用math會把要匹配的標題分詞之后挨個去匹配,匹配其中一個或多個的文檔就會被搜索出來, 顯然不滿足「精確」,所以我想到了「短語匹配」(Phrase Matching)。

短語匹配和match查詢類似,match_phrase查詢首先解析查詢字符串產生一個詞條列表。但只保留含有了所有搜索詞條的文檔,并且還要求這些詞條的順序也一致。就是相當于雖然分詞了但是詞的順序是有要求的,效果類似于精確匹配。

更新用戶舉辦的Live數量

之前我們給User添加了incr_live_count這個方法,調用一次live_count就+1,由于這個爬蟲每次都是重新過一遍,所以需要僅在創建live的時候才更新:

async def parse_live_link(self, response):
   ...
   result = await Live.add(**live_dict)                                                                                  
   if result.meta['version'] == 1:                                                                                        
       user.incr_live_count()

ES每次每次都會返回添加/更新的結果,其中的version字段正好被我們利用。

優化抓取

終于到最后一個小節了。再次道歉,之前分享的爬蟲其中有一句檢查要抓取的url是不是在self.seen_uls里面的判斷,如果已經抓取過就assert拋了異常,這其實造成最后就剩下一個協程在執行任務了。

現在我重構了這部分的內容,大家看代碼體會吧:

class Crawler:
   def __init__(self, max_redirect=10, max_tries=4,
                max_tasks=10, *, loop=None):
       self.__stopped = {}.fromkeys(['ended', 'ongoing', 'posts'], False)
   async def fetch(self, url, max_redirect):
       ...
       if next_url is not None:
           self.add_url(next_url, max_redirect)
       else:
           # 如果live或者知乎官方專欄接口不再返回下一頁,這個類型的任務就停止
           for type in self.__stopped:
               if type in url:
                   self.__stopped[type] = True
   async def work(self):
       try:
           while 1:
               url, max_redirect = await self.q.get()
               if url in self.seen_urls:
                   type = url.split('/')[-1].split('?')[0]
                   # 如果一個接口返回的next_url已經抓去過,自動添加next_url的下一頁
                   # 除非這個類型的任務停止狀態為True
                   if not type.isdigit() and not self.__stopped[type]:
                       self.add_url(get_next_url(url), max_redirect)
               await self.fetch(url, max_redirect)
               self.q.task_done()
               asyncio.sleep(1)
       except asyncio.CancelledError:
           pass

這樣就既不會重復抓取,也能保證worker都能正常工作。

截止發稿,抓取到的Live 1967個, 話題 656 個 完整抓取一次大概調用約950次API(1967 / 10 + 1967 / 20 + 656), 在我MacBook上耗時 70 - 90 s。

 

 

來自:http://mp.weixin.qq.com/s/BWQQSLFYGFMoUgLteTu7tQ

 

 本文由用戶 2789284943 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!