博客
关于我
Apache Storm中自带的拓扑例子解析
阅读量:296 次
发布时间:2019-03-03

本文共 23875 字,大约阅读时间需要 79 分钟。

Apache Storm中自带的拓扑例子解析

Apache Storm

strom学习(二)——storm源码解析与wordcount案例解析

https://blog.csdn.net/livan1234/article/details/81750091

https://blog.csdn.net/it_freshman/article/details/83538152

https://github.com/apache/storm

examples

storm-starter

storm-starter包含使用Storm的各种示例。如果这是您第一次使用Storm,请首先检查以下拓扑:

ExclamationTopology:用所有Java语言编写的基本拓扑

WordCountTopology:通过在Python中实现一个螺栓来利用多语言的基本拓扑
ReachTopology:Storm之上的复杂DRPC的示例
LambdaTopology:使用Java8 lambda表达式编写喷口/螺栓的示例
在熟悉了这些拓扑之后,请查看src / jvm / org / apache / storm / starter /中的其他拓扑, 例如RollingTopWords, 以获取更多高级实现。

ExclamationTopology

参考:https://blog.csdn.net/wuxintdrh/article/details/60866625

https://blog.csdn.net/yanhongbin1993/article/details/81951970

/** * File name:ExclamationTopolgy.java * Creation date :2020年11月2日 * Copyright (c) 2019 Oceanchen * All rights reserved. */package Topology;import java.util.Map;import backtype.storm.Config;import backtype.storm.LocalCluster;import backtype.storm.StormSubmitter;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.testing.TestWordSpout;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.TopologyBuilder;import backtype.storm.topology.base.BaseRichBolt;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import backtype.storm.tuple.Values;import backtype.storm.utils.Utils;/** * @author Oceanchen * * @see 2020年11月2日 *  * @description */	/*	 * 这个Topology包含一个Spout和两个Bolt,逻辑拓扑为线性结构	 * Spout发射单词, 每个bolt在每个单词后面加个”!!!”,调用同一个bolt类	 */	public class FirstTopology {   	     /* 实现IRichBolt接口,功能就是加感叹号,在Bolt组件中有两个方法是我们常用的,prepare方法主要做一些准备工作,		 * 当该组件的任务在群集中的工作程序中初始化时调用。它为Bolt提供执行的环境,并给它提供一个Outputcollector(输出集合)用来发射tuple;		 * 第二个是execute方法,它用来处理一个单独的输入元组,在这个方法里用户可以定义想要实现的计算功能。*/		public static class ExclamationBolt extends BaseRichBolt {   			//首先创建一个输出集合用来存放和调用发射消息方法			OutputCollector _collector;			/*			 * 接下来重写函数prepare,当该组件的任务在集群中初始化时调用。它需要输入三个参数,第一个conf是这个Bolt的storm配置文件中的配置,这里prepare方法只是简单地把OutputCollector作为一个类字段保存下来给后面execute方法使用,也就是初始化发射器;			 * 第二个是TopologyContext类类型context,用于获取有关拓扑中该任务位置的信息,包括任务ID和组件ID,输入和输出信息等;			 * 第三个是OutputcoCollector输出集合类型的发射器collector,该发射器用于从该Bolt发出元组Tuple。			 * 			 */			public void prepare(Map conf, TopologyContext context, OutputCollector collector) {   				_collector = collector;			}			/*			 * 下一步重写函数execute,它只有一个输入参数就是要处理的消息输入元组tuple,此处execute方法从bolt的一个输入接收tuple(一个bolt可能有多个输入源)。			 */			public void execute(Tuple tuple) {   				// tuple为输入的数据,ExclamationBolt类逻辑就是获取tuple的第一个字段,在它后面加三个感叹号作为新的值被集合发射给下一个Bolt				_collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));				// 这个ack是storm的一种消息传输保证机制,简单说就是监督tuple有没有在各个组件之间正常传输。				_collector.ack(tuple);			}			/*			 * 前面我们讲过源源不断的消息tuple会形成数据流,各个组件之间的消息是以流的形式传递的,			 * 所以最后我们还应该声明此拓扑流的输出模式,declarer用于声明输出流ID,输出字段以及每个输出流是否是直接流等,此处是声明了一个字段“word”用于下一个组件进行消息流的识别			 */			public void declareOutputFields(OutputFieldsDeclarer declarer) {   				declarer.declare(new Fields("word"));			}		}		/* 至此,Bolt组件就定义完了,有了发射组件类spout,执行组件类Bolt,接下来我们开始实例化并且创建拓扑提交器了。 */		public static void main(String[] args) throws Exception {   			// 创建拓扑构建器对象			TopologyBuilder builder = new TopologyBuilder();			// 接下来创建Spout对象,取名为word,并行度设置为10,意思是storm处理该拓扑时给该Spout分10个线程来运行它				/*		 * 这个例子的Spout组件是调用的storm-core包里的一个TestWordSpout类,它主要是随机发射一个单词		 */			builder.setSpout("word", new TestWordSpout(), 10);//创建Bolt,该Bolt的名字是exclaim1,分配三个线程来处理它,它的上游是名为“word”的Spout(即,接收名为word的Spout的数据),spout和这个bolt之间流的分组方式是随机分组。这个例子只用定义一个Bolt组件ExclaimtionBolt,它继承自BaseRichBolt,			builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");//创建第二个Bolt,该Bolt的名字是exclaim2,分配两个线程来处理它,它的上游是名为“Exclamation”的Bolt,这两个bolt之间流的分组方式也是随机分组。			builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");			// 创建输入的配置信息,它使用的是storm的配置文件,并设置为debug模式。这写的是一个简单的例子,如果要运行一些复杂的例子,需要在storm的配置文件中加入自己开发的拓扑需要用到的的配置信息。			Config conf = new Config();			conf.setDebug(true);			/*			 * 接下来就是提交拓扑了,storm的运行有两种模式: 本地模式和分布式模式.			 * 在本地模式中,storm用一个进程里面的线程来模拟所有的spout和bolt. 本地模式对开发和测试来说比较有用。			 * 在分布式模式下, storm由一堆机器组成。当你提交topology给主节点的时候,你同时也把topology的代码提交了。主节点负责分发你的代码并且负责给你的topolgoy分配工作进程。如果一个工作进程挂掉了,它会把任务重新分配到其它节点。			 */			if (args != null && args.length > 0) {   //如果需要传递参数(这个是storm内部处理参数,是用clojure语言写的,如果是集群模式提交拓扑的clojure代码里会有参数args),说明在集群模式上提交,//然后创建三个进程来执行此拓扑				conf.setNumWorkers(3);				StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology()); // args是集群提交的一个参数			} else {   				// 本地模式通过定义一个LocalCluster对象来定义一个进程内的集群。提交topology给这个虚拟的集群和提交topology给分布式集群是一样的。通过调用submitTopology方法来提交topology,它接受三个参数:要运行的topology的名字,一个配置对象以及要运行的topology本身。topology的名字是用来唯一区别一个topology的,这样你然后可以用这个名字来杀死这个topology的。前面已经说过了,你必须显式的杀掉一个topology,否则它会一直运行。Conf对象可以配置很多东西。				LocalCluster cluster = new LocalCluster();				cluster.submitTopology("test", conf, builder.createTopology());				Utils.sleep(10000); // 利用java中的utils类的线程休眠方法用来睡眠一段时间				cluster.killTopology("test"); // 这个句子是用来杀死拓扑				cluster.shutdown(); // 关闭集群,此时可以调用cleanup方法			}		}	}

WordCountTopologyNode部分使用js文件实现同样功能

WordCountTopology

参考:

https://blog.csdn.net/iaprogramer/article/details/51669415
https://www.bilibili.com/video/BV1R7411q7rr?p=2
https://www.cnblogs.com/hseagle/p/3505938.html?utm_source=tuicool&utm_medium=referral

storm-starter中的WordCountTopology例子代码
WordCountTopology.java
/** * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions * and limitations under the License. */package org.apache.storm.starter;import java.util.HashMap;import java.util.Map;import org.apache.storm.starter.spout.RandomSentenceSpout;import org.apache.storm.task.ShellBolt;import org.apache.storm.topology.BasicOutputCollector;import org.apache.storm.topology.ConfigurableTopology;import org.apache.storm.topology.IRichBolt;import org.apache.storm.topology.OutputFieldsDeclarer;import org.apache.storm.topology.TopologyBuilder;import org.apache.storm.topology.base.BaseBasicBolt;import org.apache.storm.tuple.Fields;import org.apache.storm.tuple.Tuple;import org.apache.storm.tuple.Values;/** * 驱动类 * 构造一个任务拓扑图 * 拓扑使用的类都尽量实现序列化接口java.io.serializable * supervisor和niumbus进行需要把jar包进行序列化和反序列化 * This topology demonstrates Storm's stream groupings and multilang * capabilities. */public class WordCountTopology extends ConfigurableTopology {       public static void main(String[] args) throws Exception {           ConfigurableTopology.start(new WordCountTopology(), args);    }    protected int run(String[] args) throws Exception {           //使用storm api中TopologyBuilder        TopologyBuilder builder = new TopologyBuilder();//        指定拓扑的spout组件 id,随机生成单词的soupt        builder.setSpout("spout", new RandomSentenceSpout(), 5);//指定id,shuffleGrouping是构造两个组件的边,指定bolt连接的是哪个spout,涉及到数据分组问题//        如果下游处理单元bolt有多个,那么spout发送的tuple如何发到这多个bolt中//        shuffleGrouping随机,使下游多个bolt能够平均得到tuple        builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");//        就是这个countbolt是从split里接受数据的        builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));        conf.setDebug(true);        String topologyName = "word-count";        conf.setNumWorkers(3);        if (args != null && args.length > 0) {               topologyName = args[0];        }        return submit(topologyName, conf, builder);    }//bolt组件,实现IRichBolt接口或者继承basebolt    public static class SplitSentence extends ShellBolt implements IRichBolt {   //parper()方法类似spout的open()        public SplitSentence() {   //            调用splitsentence.py进行对单词进行分割,并且利用发射器的emit方法进行发送tuple            super("python", "splitsentence.py");        }        @Override        public void declareOutputFields(OutputFieldsDeclarer declarer) {               declarer.declare(new Fields("word"));        }        @Override        public Map
getComponentConfiguration() { return null; } }//指定wordcount这个bolt组件 public static class WordCount extends BaseBasicBolt { Map
counts = new HashMap
();//相当于spout里面的nexttuple,循环调用// BasicOutputCollector就是bolt的数据发射器 @Override public void execute(Tuple tuple, BasicOutputCollector collector) { //获取到“word” String word = tuple.getString(0);// 从map的counts里面获取word的次数,看map中是否含有过这个单词的key,没有则为0 Integer count = counts.get(word); if (count == null) { count = 0; }// 如果有的话直接加1 count++; counts.put(word, count);// 继续发送tuple到后面的bolt,发送两个map collector.emit(new Values(word, count)); }//declareOutputFields声明向后发射的key是什么 @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word", "count")); } }}
RandomSentenceSpout.java
/** * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions * and limitations under the License. */package org.apache.storm.starter.spout;import java.text.SimpleDateFormat;import java.util.Date;import java.util.Map;import java.util.Random;import org.apache.storm.spout.SpoutOutputCollector;import org.apache.storm.task.TopologyContext;import org.apache.storm.topology.OutputFieldsDeclarer;import org.apache.storm.topology.base.BaseRichSpout;import org.apache.storm.tuple.Fields;import org.apache.storm.tuple.Values;import org.apache.storm.utils.Utils;import org.slf4j.Logger;import org.slf4j.LoggerFactory;//编写spout组件//继承BaseRichSpout,这个抽象类继承BaseRichSpout实现了IRichSpout接口//所以可以实现IRichSpout接口  或者   继承继承BaseRichSpoutpublic class RandomSentenceSpout extends BaseRichSpout {       private static final Logger LOG = LoggerFactory.getLogger(RandomSentenceSpout.class);    SpoutOutputCollector _collector;    Random _rand;//open方法 是spout组件的初始化方法,是spout实例被创建后首先被调用的方法,只初始化调用一次//Map
conf 是storm里的storm.yaml文件的项写到map里// SpoutOutputCollector:tuple数据的发射器 创建类的实例变量,为了给后面的bolt传送数据用这个类 @Override public void open(Map
conf, TopologyContext context, SpoutOutputCollector collector) { _collector = collector; _rand = new Random(); }//nextTuple 实现如何从数据源上获取数据的逻辑,以及后面的组件bolt发送数据,循环调用 @Override public void nextTuple() { Utils.sleep(100); String[] sentences = new String[]{ sentence("the cow jumped over the moon"), sentence("an apple a day keeps the doctor away"), sentence("four score and seven years ago"), sentence("snow white and the seven dwarfs"), sentence("i am at two with nature") }; final String sentence = sentences[_rand.nextInt(sentences.length)]; LOG.debug("Emitting tuple: {}", sentence); _collector.emit(new Values(sentence)); } protected String sentence(String input) { return input; }//消息可靠保障机制(数据仅处理一次)当某个tuple在拓扑上处理成功后,调用ack方法执行一些消息处理成功后该干的事情//可以将tuple先存入到一个map中,当该拓扑处理成功后,就在map中移出掉 @Override public void ack(Object id) { }//消息处理失败,或者超时以后该干什么,例如重试,重试达到最大的阈值则丢弃 @Override public void fail(Object id) { }//声明向后面组件发射的tuple的key依次是什么 [“word”,“value”] @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); }// 还有 getComponentConfiguration()函数:就是设置该组件spout的一些专用参数:用多少执行者线程跑 // Add unique identifier to each tuple, which is helpful for debugging public static class TimeStamped extends RandomSentenceSpout { private final String prefix; public TimeStamped() { this(""); } public TimeStamped(String prefix) { this.prefix = prefix; } protected String sentence(String input) { return prefix + currentDate() + " " + input; } private String currentDate() { return new SimpleDateFormat("yyyy.MM.dd_HH:mm:ss.SSSSSSSSS").format(new Date()); } }}
SplitSentence.py
# Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements.  See the NOTICE file# distributed with this work for additional information# regarding copyright ownership.  The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License.  You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing, software# distributed under the License is distributed on an "AS IS" BASIS,# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.# See the License for the specific language governing permissions and# limitations under the License.import stormclass SplitSentenceBolt(storm.BasicBolt):    def process(self, tup):        words = tup.values[0].split(" ")        for word in words:          storm.emit([word])SplitSentenceBolt().run()
自己实现WordCountTopology

在这里插入图片描述

上图参考:https://blog.csdn.net/livan1234/article/details/81750091?utm_medium=distribute.wap_relevant.none-task-blog-baidulandingword-7

利用maven,引入storm-core.jar依赖

pom.xml
4.0.0
com.test.maven
project01
0.0.1-SNAPSHOT
org.apache.storm
storm-core
0.9.3
provided
commons-lang
commons-lang
2.6
testTopolgy.java
/** * File name:testTopolgy.java * Creation date :2020年10月31日 * Copyright (c) 2019 Oceanchen * All rights reserved. */package Topology;import backtype.storm.Config;import backtype.storm.LocalCluster;import backtype.storm.topology.IRichSpout;import backtype.storm.topology.TopologyBuilder;import bolt.SpiltBolt;import bolt.WordCount;import spout.MySpout;//在主类中创建拓扑结构public class testTopolgy {   	public static void main(String[] args) throws InterruptedException {   //      使用topolgyBuilder 类进行创建spout bolt组件		TopologyBuilder builder = new TopologyBuilder();		builder.setSpout("MySpout", new MySpout());		builder.setBolt("SpiltBolt", new SpiltBolt(),2).shuffleGrouping("MySpout");		builder.setBolt("WordCount", new WordCount())						.shuffleGrouping("SpiltBolt");		Config conf = new Config();		conf.setDebug(false);		LocalCluster cluster = new LocalCluster();		cluster.submitTopology("WordCountTopolgy", conf, builder.createTopology()); //3、提交任务  -----两种模式 本地模式和集群模式    //这里将拓扑名称写死了,所以在集群上打包运行的时候,不用写拓扑名称了!也可用arg[0]//StormSubmitter.submitTopology("mywordcount", config, topologyBuilder.createTopology());//LocalCluster localCluster = new LocalCluster();     //localCluster.submitTopology("mywordcount",config,topologyBuilder.createTopology());		Thread.sleep(10000);		cluster.shutdown();	}}
MySpout.java
/** * File name:MySpout.java * Creation date :2020年10月31日 * Copyright (c) 2019 Oceanchen * All rights reserved. */package spout;import java.util.Map;import java.util.Random;import backtype.storm.spout.SpoutOutputCollector;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.IRichSpout;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseBasicBolt;import backtype.storm.topology.base.BaseRichBolt;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import backtype.storm.tuple.Values;public class MySpout implements IRichSpout {   	private SpoutOutputCollector collector = null;	private String[] Strings = {    "the cow jumped over the moon", "an apple a day keeps the doctor away",			"four score and seven years ago", "snow white and the seven dwarfs" };//	让该发射只执行两次	private int icount= 0;//	接收参数,定义发射器对象,然后接收参数中的发射器	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {   		this.collector = collector;	}	public void nextTuple() {   		icount++;		if (icount<=2) {   			String wordString = Strings[new Random().nextInt(Strings.length)];//			System.err.println(wordString);			collector.emit(new Values(wordString));		} else {   					}	}	public void declareOutputFields(OutputFieldsDeclarer declarer) {   //		描述传输的tuple的key依次是什么		declarer.declare(new Fields("word"));	}	public void ack(Object msgId) {   		// TODO Auto-generated method stub	}	public void fail(Object msgId) {   		// TODO Auto-generated method stub	}	public Map
getComponentConfiguration() { // TODO Auto-generated method stub return null; } public void close() { // TODO Auto-generated method stub } public void activate() { // TODO Auto-generated method stub } public void deactivate() { // TODO Auto-generated method stub }}
SpiltBolt.java
/** * File name:SpiltBolt.java * Creation date :2020年10月31日 * Copyright (c) 2019 Oceanchen * All rights reserved. */package bolt;import java.util.Map;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.IRichBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import backtype.storm.tuple.Values;public class SpiltBolt implements IRichBolt {   	private OutputCollector collector=null;	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {   		this.collector=collector;	}	public void execute(Tuple input) {   		String wordString=input.getString(0);		String[] words = wordString.split(" ");        for(String word : words){               word = word.trim();                        Values values = new Values(word);//            System.err.println(values);            collector.emit(values);        }        	}	public void declareOutputFields(OutputFieldsDeclarer declarer) {   		declarer.declare(new Fields("word"));	}	public Map
getComponentConfiguration() { // TODO Auto-generated method stub return null; } public void cleanup() { // TODO Auto-generated method stub }}
WordCount.java
/** * File name:WordCount.java * Creation date :2020年10月31日 * Copyright (c) 2019 Oceanchen * All rights reserved. */package bolt;import java.util.HashMap;import java.util.Iterator;import java.util.Map;import java.util.Map.Entry;import java.util.Set;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.IRichBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import backtype.storm.tuple.Values;public class WordCount implements IRichBolt {   	private OutputCollector collector = null;	Map
map = new HashMap
(); private volatile boolean edit = false; public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } public void execute(Tuple input) { String key = input.getString(0); Integer value = map.get(key); if (value == null) { value = 0; } value++; map.put(key, value); Values values = new Values(key, value); System.err.println(values);// collector.emit(values);// collector.ack(input); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word", "count")); } public void cleanup() { //在主线程的cluster执行shutdown之后就会调用该方法 Set
keySet = map.keySet(); Iterator
it1 = keySet.iterator(); while(it1.hasNext()){ String ID = it1.next(); Integer integer = map.get(ID); System.out.println(ID+"单词的次数为"+integer.intValue()); } } public Map
getComponentConfiguration() { // TODO Auto-generated method stub return null; }}

LambdaTopology

/** * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions * and limitations under the License. */package org.apache.storm.starter;import java.io.Serializable;import java.util.UUID;import org.apache.storm.Config;import org.apache.storm.topology.ConfigurableTopology;import org.apache.storm.topology.TopologyBuilder;import org.apache.storm.tuple.Values;public class LambdaTopology extends ConfigurableTopology {       public static void main(String[] args) {           ConfigurableTopology.start(new LambdaTopology(), args);    }    @Override    protected int run(String[] args) throws Exception {           TopologyBuilder builder = new TopologyBuilder();        // example. spout1: generate random strings        // bolt1: get the first part of a string        // bolt2: output the tuple         //示例。  spout1:生成随机字符串         // bolt1:获取字符串的第一部分         // bolt2:输出元组        // NOTE: Variable used in lambda expression should be final or effectively final        // (or it will cause compilation error),        // and variable type should implement the Serializable interface if it isn't primitive type        // (or it will cause not serializable exception).        Prefix prefix = new Prefix("Hello lambda:");        String suffix = ":so cool!";        int tag = 999;//spout就是发送 序列化的一个utils类生成的uuid:例如dd813a25-290d-4d43-a5b0-d35a99ee08c9        builder.setSpout("spout1", () -> UUID.randomUUID().toString());        //bolt就是通过 - 进行对uuid进行分割,然后加上静态变量再传递出去        builder.setBolt("bolt1", (tuple, collector) -> {               String[] parts = tuple.getStringByField("lambda").split("\\-");            collector.emit(new Values(prefix + parts[0] + suffix, tag));        }, "strValue", "intValue").shuffleGrouping("spout1");        //该bolt只是进行打印传递的tuple        builder.setBolt("bolt2", tuple -> System.out.println(tuple)).shuffleGrouping("bolt1");        Config conf = new Config();        conf.setDebug(true);        conf.setNumWorkers(2);        return submit("lambda-demo", conf, builder);    }}class Prefix implements Serializable {       private String str;    public Prefix(String str) {           this.str = str;    }    @Override    public String toString() {           return this.str;    }}

RollingTopWords

此拓扑对前N个字进行连续计算,拓扑在基数方面已经看到。 前N个计算是在完全可扩展的方式,并且可以使用类似的方法进行计算热门话题或Twitter上的图片趋势。

你可能感兴趣的文章
MySQL 常见的 9 种优化方法
查看>>
MySQL 常见的开放性问题
查看>>
Mysql 常见错误
查看>>
mysql 常见问题
查看>>
MYSQL 幻读(Phantom Problem)不可重复读
查看>>
mysql 往字段后面加字符串
查看>>
mysql 快照读 幻读_innodb当前读 与 快照读 and rr级别是否真正避免了幻读
查看>>
MySQL 快速创建千万级测试数据
查看>>
mysql 快速自增假数据, 新增假数据,mysql自增假数据
查看>>
MySql 手动执行主从备份
查看>>
Mysql 批量修改四种方式效率对比(一)
查看>>
Mysql 报错 Field 'id' doesn't have a default value
查看>>
MySQL 报错:Duplicate entry 'xxx' for key 'UNIQ_XXXX'
查看>>
Mysql 拼接多个字段作为查询条件查询方法
查看>>
mysql 排序id_mysql如何按特定id排序
查看>>
Mysql 提示:Communication link failure
查看>>
mysql 插入是否成功_PDO mysql:如何知道插入是否成功
查看>>
Mysql 数据库InnoDB存储引擎中主要组件的刷新清理条件:脏页、RedoLog重做日志、Insert Buffer或ChangeBuffer、Undo Log
查看>>
mysql 数据库中 count(*),count(1),count(列名)区别和效率问题
查看>>
mysql 数据库备份及ibdata1的瘦身
查看>>