Storm實驗 -- 單詞計數
1. 使用mvn命令創建項目
2. 實現數據源,用重復的靜態語句來模擬數據源
mvn archetype:generate -DgroupId=storm.test -DartifactId=Storm01 -DpackageName=com.zhch.v1
然后編輯配置文件pom.xml,添加storm依賴
<dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>0.9.4</version> </dependency>
最后通過下述命令來編譯項目,編譯正確完成后導入到IDE中
mvn install
當然,也可以在IDE中安裝maven插件,從而直接在IDE中創建maven項目
2. 實現數據源,用重復的靜態語句來模擬數據源
package storm.test.v1;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import java.util.Map;
public class SentenceSpout extends BaseRichSpout {
private String[] sentences = {
"storm integrates with the queueing",
"and database technologies you already use",
"a storm topology consumes streams of data",
"and processes those streams in arbitrarily complex ways",
"repartitioning the streams between each stage of the computation however needed"
};
private int index = 0;
private SpoutOutputCollector collector;
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("sentence"));
}
@Override
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
this.collector = spoutOutputCollector;
}
@Override
public void nextTuple() {
this.collector.emit(new Values(sentences[index]));
index++;
if (index >= sentences.length) {
index = 0;
}
try {
Thread.sleep(1);
} catch (InterruptedException e) {
}
}
} 3. 實現語句分割bolt
package storm.test.v1;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.Map;
public class SplitSentenceBolt extends BaseRichBolt {
private OutputCollector collector;
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
this.collector = outputCollector;
}
@Override
public void execute(Tuple tuple) {
String sentence = tuple.getStringByField("sentence");
String[] words = sentence.split(" ");
for (String word : words) {
this.collector.emit(new Values(word));
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("word"));
}
} 4. 實現單詞計數bolt
package storm.test.v1;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.HashMap;
import java.util.Map;
public class WordCountBolt extends BaseRichBolt {
private OutputCollector collector;
private HashMap<String, Long> counts = null;
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
this.collector = outputCollector;
this.counts = new HashMap<String, Long>();
}
@Override
public void execute(Tuple tuple) {
String word = tuple.getStringByField("word");
Long count = this.counts.get(word);
if (count == null) {
count = 0L;
}
count++;
this.counts.put(word, count);
this.collector.emit(new Values(word, count));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("word", "count"));
}
} 5. 實現上報bolt
package storm.test.v1;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class ReportBolt extends BaseRichBolt {
private HashMap<String, Long> counts = null;
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
counts = new HashMap<String, Long>();
}
@Override
public void execute(Tuple tuple) {
String word = tuple.getStringByField("word");
Long count = tuple.getLongByField("count");
this.counts.put(word, count);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
}
@Override
public void cleanup() { //本地模式下,終止topology時可以保證cleanup()被執行
System.out.println("--- FINAL COUNTS ---");
List<String> keys = new ArrayList<String>();
keys.addAll(this.counts.keySet());
Collections.sort(keys);
for (String key : keys) {
System.out.println(key + " : " + this.counts.get(key));
}
System.out.println("----------");
}
} 6. 實現單詞計數topology
package storm.test.v1;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
public class WordCountTopology {
private static final String SENTENCE_SPOUT_ID = "sentence-spout";
private static final String SPLIT_BOLT_ID = "split-bolt";
private static final String COUNT_BOLT_ID = "conut-bolt";
private static final String REPORT_BOLT_ID = "report-bolt";
private static final String TOPOLOGY_NAME = "word-count-topology";
public static void main(String[] args) {
SentenceSpout spout = new SentenceSpout();
SplitSentenceBolt spiltBolt = new SplitSentenceBolt();
WordCountBolt countBolt = new WordCountBolt();
ReportBolt reportBolt = new ReportBolt();
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(SENTENCE_SPOUT_ID, spout); //注冊數據源
builder.setBolt(SPLIT_BOLT_ID, spiltBolt) //注冊bolt
.shuffleGrouping(SENTENCE_SPOUT_ID); //該bolt訂閱spout隨機均勻發射來的數據流
builder.setBolt(COUNT_BOLT_ID, countBolt)
.fieldsGrouping(SPLIT_BOLT_ID, new Fields("word")); //該bolt訂閱spiltBolt發射來的數據流,并且保證"word"字段值相同的tuple會被路由到同一個countBolt
builder.setBolt(REPORT_BOLT_ID, reportBolt)
.globalGrouping(COUNT_BOLT_ID); //該bolt訂閱countBolt發射來的數據流,并且所有的tuple都會被路由到唯一的一個reportBolt中
Config config = new Config();
//本地模式啟動
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
try {
Thread.sleep(5 * 1000);
} catch (InterruptedException e) {
}
cluster.killTopology(TOPOLOGY_NAME);
cluster.shutdown();
}
} 7. 運行結果:
--- FINAL COUNTS --- a : 302 already : 302 and : 604 arbitrarily : 302 between : 302 complex : 302 computation : 302 consumes : 302 data : 302 database : 302 each : 302 however : 302 in : 302 integrates : 302 needed : 302 of : 604 processes : 302 queueing : 302 repartitioning : 302 stage : 302 storm : 604 streams : 906 technologies : 302 the : 906 those : 302 topology : 302 use : 302 ways : 302 with : 302 you : 302 ----------
本文由用戶 fpcm 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!