Pyleus 介紹:使用純 Python 的構建 Storm 拓撲的開源框架

jopen 10年前發布 | 37K 次閱讀 Python Python開發

大聲的宣誓,我們喜歡python,現在使用python的做web開發的人有相當大的比例,在大數據的行業中,python也是相當熱門。

Pylenus 是一個新的開源框架,這個框架的目標是完成一些和其他框架一樣的

Storm (事件處理器)

例如Hadoop:讓開發者全部使用python完成快速的開發迭代,使用大量的時間去解決商業之間的聯系問題,使用少量的時間去構建基礎的平臺。

首先一個簡短的介紹在Storm來自這個網站,Apache Storm 是一個免費得開源的分布式的即時的開源計算系統,Storm可以很簡單的可靠的無限的處理數據流在大量的數據,還可以進行數據實時的批處理就像hadoop一樣。

一個Pyleus拓撲包括,最少情況下,一個YAML文件描述的拓撲結構,聲明每個組件以及它們之間的順序。pyleus的命令行工具包含一個內建的Storm JAR,這個JAR可以提交任何Storm簇。

當涉及到海量數據處理演示時,“字數統計”是最典型的。由于Storm是用來操作“無限量數據流”,我們不能計算每個單次的總數,因為輸入流可以無限地繼續下去。相反,我們的拓撲結構將記錄每個次的遞增數,來記錄每一次我們看到它的值。

所以該如何使用Pyleus建立一個單詞計數的Storm拓撲呢?你所需要的是一個pyleus_topology.yaml和一些Python組件。

作為簡單的演示,你需要知道Storm的三個核心概念:

  • tuple是Storm拓撲中的數據單元,流入和流出Storm組件。

  • Spouts是將tuple導入拓撲的組件. 通常,spout消耗來自外部源的數據,如KafkaKinesis,然后將記錄標記元組。

  • Bolts訂閱一個或多個其他spoutsbolts的輸出流,做一些處理,然后標記為自己的元組。

這種拓撲有三個組成部分:一個spout發出隨機行的“lorem ipsum”文本,一個bolt將行拆分為單詞,bolt完成計數并記錄的相同的單詞出現的次數。

pyleus_topology.yaml
word_count/
    __init__.py
    line_spout.py
    split_words.py
    count_words.py

下面是pyleus_topology.yaml的內容:

name: word_count

topology:

    - spout:
        name: line-spout
        module: word_count.line_spout

    - bolt:
        name: split-words
        module: word_count.split_words
        groupings:
            - shuffle_grouping: line-spout

    - bolt:
        name: count-words
        module: word_count.count_words
        groupings:
            - fields_grouping:
                component: split-words
                fields:
                    - word

spout的配置是自解釋的,但bolts必須注明它所訂閱的元組流。拆詞的bolt綁定到shuffle_grouping——這意味著從line-spout發出的tuples應為split-words所有實例情況下的均勻和隨機分布,比如可以為一,五,或五十。

count-words,還是使用fields_grouping在 ‘word(詞)’ 域。這迫使所有元組發出split-words與‘word(詞)’一致的count-words實例。 這允許代碼word_count.count_words去做一個假設,它能“see(看)”到所有在同樣的處理中的一樣的單詞。 

word_count/line_spout.py:

import random

from pyleus.storm import Spout

LINES = """
Lorem ipsum dolor sit amet, consectetur
adipiscing elit. Curabitur pharetra ante eget
nunc blandit vestibulum. Curabitur tempus mi
...
vitae cursus leo, a congue justo.
""".strip().split('\n')


class LineSpout(Spout):

    OUTPUT_FIELDS = ["line"]

    def next_tuple(self):
        line = random.choice(LINES)
        tup = (line,)
        self.emit(tup)


if __name__ == '__main__':
    LineSpout().run()

word_count/count_words.py:

from collections import defaultdict

from pyleus.storm import SimpleBolt


class CountWordsBolt(SimpleBolt):

    def initialize(self):
        self.words = defaultdict(int)

    def process_tuple(self, tup):
        word, = tup.values

        self.words[word] += 1

        msg = "'{0}' has been seen {1} times\n".format(word, self.words[word])
        with open("/tmp/word_counts.txt", 'a') as f:
            f.write(msg)


if __name__ == '__main__':
    CountWordsBolt().run()

word_count/split_words.py 留給讀者作為練習。 (或者, 你可以在GitHub上查看所有的例子 full example)

現在,在pyleus目錄下運行將會產生一個文件word_count.jar。你可以使用pyleus submit來提交,或者你也可以使用pyleus local來在本地運行測試。

對于Pyleus topology(拓撲)來說代碼是非常簡單的,但是有一個非常有意思的特征值得我們關注的是它自己內部對virtualenv(https://virtualenv.readthedocs.org/)的集成。簡單的包含一個requirements.txt文件和你自己的pyleus_topology.yaml,pyleus build將會產生一個virtualenv使得你的代碼在這個JAR包中使用以及嵌入。你甚至可以重用Pyleus相關的組建包,并且在pyleus_topology.yaml里面直接關聯他們接可以了!

在Yelp的團隊已經開發出一種Pyleus端口為了從一個內部資源源消費數據,并且為其建立了一個Python包。現在,公司內其他人可以添加一行代碼到他們的 requirements.txt 和在他們的  pyleus_topology.yaml 中使用端口就無需編寫一行代碼了。

Pyleus現在是beta版軟件,但feedbackpull requests都很愉快地接受了。

開始使用Pyleus,用 pip install pyleus 安裝它,然后查看源碼在GitHub的上更多示例。

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!