Pyleus 介紹:使用純 Python 的構建 Storm 拓撲的開源框架
大聲的宣誓,我們喜歡python,現在使用python的做web開發的人有相當大的比例,在大數據的行業中,python也是相當熱門。
Pylenus 是一個新的開源框架,這個框架的目標是完成一些和其他框架一樣的
Storm (事件處理器)
例如Hadoop:讓開發者全部使用python完成快速的開發迭代,使用大量的時間去解決商業之間的聯系問題,使用少量的時間去構建基礎的平臺。
首先一個簡短的介紹在Storm來自這個網站,Apache Storm 是一個免費得開源的分布式的即時的開源計算系統,Storm可以很簡單的可靠的無限的處理數據流在大量的數據,還可以進行數據實時的批處理就像hadoop一樣。
一個Pyleus拓撲包括,最少情況下,一個YAML文件描述的拓撲結構,聲明每個組件以及它們之間的順序。pyleus的命令行工具包含一個內建的Storm JAR,這個JAR可以提交任何Storm簇。
當涉及到海量數據處理演示時,“字數統計”是最典型的。由于Storm是用來操作“無限量數據流”,我們不能計算每個單次的總數,因為輸入流可以無限地繼續下去。相反,我們的拓撲結構將記錄每個次的遞增數,來記錄每一次我們看到它的值。
所以該如何使用Pyleus建立一個單詞計數的Storm拓撲呢?你所需要的是一個pyleus_topology.yaml和一些Python組件。
作為簡單的演示,你需要知道Storm的三個核心概念:
-
tuple是Storm拓撲中的數據單元,流入和流出Storm組件。
-
Spouts是將tuple導入拓撲的組件. 通常,spout消耗來自外部源的數據,如Kafka或Kinesis,然后將記錄標記元組。
-
Bolts訂閱一個或多個其他spouts和bolts的輸出流,做一些處理,然后標記為自己的元組。
這種拓撲有三個組成部分:一個spout發出隨機行的“lorem ipsum”文本,一個bolt將行拆分為單詞,bolt完成計數并記錄的相同的單詞出現的次數。
pyleus_topology.yaml word_count/ __init__.py line_spout.py split_words.py count_words.py
下面是pyleus_topology.yaml的內容:
name: word_count topology: - spout: name: line-spout module: word_count.line_spout - bolt: name: split-words module: word_count.split_words groupings: - shuffle_grouping: line-spout - bolt: name: count-words module: word_count.count_words groupings: - fields_grouping: component: split-words fields: - word
spout的配置是自解釋的,但bolts必須注明它所訂閱的元組流。拆詞的bolt綁定到shuffle_grouping——這意味著從line-spout發出的tuples應為split-words所有實例情況下的均勻和隨機分布,比如可以為一,五,或五十。
count-words,還是使用fields_grouping在 ‘word(詞)’ 域。這迫使所有元組發出split-words與‘word(詞)’一致的count-words實例。 這允許代碼word_count.count_words去做一個假設,它能“see(看)”到所有在同樣的處理中的一樣的單詞。
word_count/line_spout.py:
import random from pyleus.storm import Spout LINES = """ Lorem ipsum dolor sit amet, consectetur adipiscing elit. Curabitur pharetra ante eget nunc blandit vestibulum. Curabitur tempus mi ... vitae cursus leo, a congue justo. """.strip().split('\n') class LineSpout(Spout): OUTPUT_FIELDS = ["line"] def next_tuple(self): line = random.choice(LINES) tup = (line,) self.emit(tup) if __name__ == '__main__': LineSpout().run()
word_count/count_words.py:
from collections import defaultdict from pyleus.storm import SimpleBolt class CountWordsBolt(SimpleBolt): def initialize(self): self.words = defaultdict(int) def process_tuple(self, tup): word, = tup.values self.words[word] += 1 msg = "'{0}' has been seen {1} times\n".format(word, self.words[word]) with open("/tmp/word_counts.txt", 'a') as f: f.write(msg) if __name__ == '__main__': CountWordsBolt().run()
word_count/split_words.py 留給讀者作為練習。 (或者, 你可以在GitHub上查看所有的例子 full example)
現在,在pyleus目錄下運行將會產生一個文件word_count.jar。你可以使用pyleus submit來提交,或者你也可以使用pyleus local來在本地運行測試。
對于Pyleus topology(拓撲)來說代碼是非常簡單的,但是有一個非常有意思的特征值得我們關注的是它自己內部對virtualenv(https://virtualenv.readthedocs.org/)的集成。簡單的包含一個requirements.txt文件和你自己的pyleus_topology.yaml,pyleus build將會產生一個virtualenv使得你的代碼在這個JAR包中使用以及嵌入。你甚至可以重用Pyleus相關的組建包,并且在pyleus_topology.yaml里面直接關聯他們接可以了!
在Yelp的團隊已經開發出一種Pyleus端口為了從一個內部資源源消費數據,并且為其建立了一個Python包。現在,公司內其他人可以添加一行代碼到他們的 requirements.txt 和在他們的 pyleus_topology.yaml 中使用端口就無需編寫一行代碼了。
Pyleus現在是beta版軟件,但feedback和pull requests都很愉快地接受了。
開始使用Pyleus,用 pip install pyleus 安裝它,然后查看源碼在GitHub的上更多示例。