本文共 23875 字,大约阅读时间需要 79 分钟。
strom学习(二)——storm源码解析与wordcount案例解析
https://blog.csdn.net/livan1234/article/details/81750091https://blog.csdn.net/it_freshman/article/details/83538152
https://github.com/apache/storm
storm-starter包含使用Storm的各种示例。如果这是您第一次使用Storm,请首先检查以下拓扑:
ExclamationTopology:用所有Java语言编写的基本拓扑
WordCountTopology:通过在Python中实现一个螺栓来利用多语言的基本拓扑 ReachTopology:Storm之上的复杂DRPC的示例 LambdaTopology:使用Java8 lambda表达式编写喷口/螺栓的示例 在熟悉了这些拓扑之后,请查看src / jvm / org / apache / storm / starter /中的其他拓扑, 例如RollingTopWords, 以获取更多高级实现。参考:https://blog.csdn.net/wuxintdrh/article/details/60866625 https://blog.csdn.net/yanhongbin1993/article/details/81951970
/** * File name:ExclamationTopolgy.java * Creation date :2020年11月2日 * Copyright (c) 2019 Oceanchen * All rights reserved. */package Topology;import java.util.Map;import backtype.storm.Config;import backtype.storm.LocalCluster;import backtype.storm.StormSubmitter;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.testing.TestWordSpout;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.TopologyBuilder;import backtype.storm.topology.base.BaseRichBolt;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import backtype.storm.tuple.Values;import backtype.storm.utils.Utils;/** * @author Oceanchen * * @see 2020年11月2日 * * @description */ /* * 这个Topology包含一个Spout和两个Bolt,逻辑拓扑为线性结构 * Spout发射单词, 每个bolt在每个单词后面加个”!!!”,调用同一个bolt类 */ public class FirstTopology { /* 实现IRichBolt接口,功能就是加感叹号,在Bolt组件中有两个方法是我们常用的,prepare方法主要做一些准备工作, * 当该组件的任务在群集中的工作程序中初始化时调用。它为Bolt提供执行的环境,并给它提供一个Outputcollector(输出集合)用来发射tuple; * 第二个是execute方法,它用来处理一个单独的输入元组,在这个方法里用户可以定义想要实现的计算功能。*/ public static class ExclamationBolt extends BaseRichBolt { //首先创建一个输出集合用来存放和调用发射消息方法 OutputCollector _collector; /* * 接下来重写函数prepare,当该组件的任务在集群中初始化时调用。它需要输入三个参数,第一个conf是这个Bolt的storm配置文件中的配置,这里prepare方法只是简单地把OutputCollector作为一个类字段保存下来给后面execute方法使用,也就是初始化发射器; * 第二个是TopologyContext类类型context,用于获取有关拓扑中该任务位置的信息,包括任务ID和组件ID,输入和输出信息等; * 第三个是OutputcoCollector输出集合类型的发射器collector,该发射器用于从该Bolt发出元组Tuple。 * */ public void prepare(Map conf, TopologyContext context, OutputCollector collector) { _collector = collector; } /* * 下一步重写函数execute,它只有一个输入参数就是要处理的消息输入元组tuple,此处execute方法从bolt的一个输入接收tuple(一个bolt可能有多个输入源)。 */ public void execute(Tuple tuple) { // tuple为输入的数据,ExclamationBolt类逻辑就是获取tuple的第一个字段,在它后面加三个感叹号作为新的值被集合发射给下一个Bolt _collector.emit(tuple, new Values(tuple.getString(0) + "!!!")); // 这个ack是storm的一种消息传输保证机制,简单说就是监督tuple有没有在各个组件之间正常传输。 _collector.ack(tuple); } /* * 前面我们讲过源源不断的消息tuple会形成数据流,各个组件之间的消息是以流的形式传递的, * 所以最后我们还应该声明此拓扑流的输出模式,declarer用于声明输出流ID,输出字段以及每个输出流是否是直接流等,此处是声明了一个字段“word”用于下一个组件进行消息流的识别 */ public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } } /* 至此,Bolt组件就定义完了,有了发射组件类spout,执行组件类Bolt,接下来我们开始实例化并且创建拓扑提交器了。 */ public static void main(String[] args) throws Exception { // 创建拓扑构建器对象 TopologyBuilder builder = new TopologyBuilder(); // 接下来创建Spout对象,取名为word,并行度设置为10,意思是storm处理该拓扑时给该Spout分10个线程来运行它 /* * 这个例子的Spout组件是调用的storm-core包里的一个TestWordSpout类,它主要是随机发射一个单词 */ builder.setSpout("word", new TestWordSpout(), 10);//创建Bolt,该Bolt的名字是exclaim1,分配三个线程来处理它,它的上游是名为“word”的Spout(即,接收名为word的Spout的数据),spout和这个bolt之间流的分组方式是随机分组。这个例子只用定义一个Bolt组件ExclaimtionBolt,它继承自BaseRichBolt, builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");//创建第二个Bolt,该Bolt的名字是exclaim2,分配两个线程来处理它,它的上游是名为“Exclamation”的Bolt,这两个bolt之间流的分组方式也是随机分组。 builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1"); // 创建输入的配置信息,它使用的是storm的配置文件,并设置为debug模式。这写的是一个简单的例子,如果要运行一些复杂的例子,需要在storm的配置文件中加入自己开发的拓扑需要用到的的配置信息。 Config conf = new Config(); conf.setDebug(true); /* * 接下来就是提交拓扑了,storm的运行有两种模式: 本地模式和分布式模式. * 在本地模式中,storm用一个进程里面的线程来模拟所有的spout和bolt. 本地模式对开发和测试来说比较有用。 * 在分布式模式下, storm由一堆机器组成。当你提交topology给主节点的时候,你同时也把topology的代码提交了。主节点负责分发你的代码并且负责给你的topolgoy分配工作进程。如果一个工作进程挂掉了,它会把任务重新分配到其它节点。 */ if (args != null && args.length > 0) { //如果需要传递参数(这个是storm内部处理参数,是用clojure语言写的,如果是集群模式提交拓扑的clojure代码里会有参数args),说明在集群模式上提交,//然后创建三个进程来执行此拓扑 conf.setNumWorkers(3); StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology()); // args是集群提交的一个参数 } else { // 本地模式通过定义一个LocalCluster对象来定义一个进程内的集群。提交topology给这个虚拟的集群和提交topology给分布式集群是一样的。通过调用submitTopology方法来提交topology,它接受三个参数:要运行的topology的名字,一个配置对象以及要运行的topology本身。topology的名字是用来唯一区别一个topology的,这样你然后可以用这个名字来杀死这个topology的。前面已经说过了,你必须显式的杀掉一个topology,否则它会一直运行。Conf对象可以配置很多东西。 LocalCluster cluster = new LocalCluster(); cluster.submitTopology("test", conf, builder.createTopology()); Utils.sleep(10000); // 利用java中的utils类的线程休眠方法用来睡眠一段时间 cluster.killTopology("test"); // 这个句子是用来杀死拓扑 cluster.shutdown(); // 关闭集群,此时可以调用cleanup方法 } } }
参考: https://blog.csdn.net/iaprogramer/article/details/51669415 https://www.bilibili.com/video/BV1R7411q7rr?p=2 https://www.cnblogs.com/hseagle/p/3505938.html?utm_source=tuicool&utm_medium=referral
/** * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions * and limitations under the License. */package org.apache.storm.starter;import java.util.HashMap;import java.util.Map;import org.apache.storm.starter.spout.RandomSentenceSpout;import org.apache.storm.task.ShellBolt;import org.apache.storm.topology.BasicOutputCollector;import org.apache.storm.topology.ConfigurableTopology;import org.apache.storm.topology.IRichBolt;import org.apache.storm.topology.OutputFieldsDeclarer;import org.apache.storm.topology.TopologyBuilder;import org.apache.storm.topology.base.BaseBasicBolt;import org.apache.storm.tuple.Fields;import org.apache.storm.tuple.Tuple;import org.apache.storm.tuple.Values;/** * 驱动类 * 构造一个任务拓扑图 * 拓扑使用的类都尽量实现序列化接口java.io.serializable * supervisor和niumbus进行需要把jar包进行序列化和反序列化 * This topology demonstrates Storm's stream groupings and multilang * capabilities. */public class WordCountTopology extends ConfigurableTopology { public static void main(String[] args) throws Exception { ConfigurableTopology.start(new WordCountTopology(), args); } protected int run(String[] args) throws Exception { //使用storm api中TopologyBuilder TopologyBuilder builder = new TopologyBuilder();// 指定拓扑的spout组件 id,随机生成单词的soupt builder.setSpout("spout", new RandomSentenceSpout(), 5);//指定id,shuffleGrouping是构造两个组件的边,指定bolt连接的是哪个spout,涉及到数据分组问题// 如果下游处理单元bolt有多个,那么spout发送的tuple如何发到这多个bolt中// shuffleGrouping随机,使下游多个bolt能够平均得到tuple builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");// 就是这个countbolt是从split里接受数据的 builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word")); conf.setDebug(true); String topologyName = "word-count"; conf.setNumWorkers(3); if (args != null && args.length > 0) { topologyName = args[0]; } return submit(topologyName, conf, builder); }//bolt组件,实现IRichBolt接口或者继承basebolt public static class SplitSentence extends ShellBolt implements IRichBolt { //parper()方法类似spout的open() public SplitSentence() { // 调用splitsentence.py进行对单词进行分割,并且利用发射器的emit方法进行发送tuple super("python", "splitsentence.py"); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } @Override public MapgetComponentConfiguration() { return null; } }//指定wordcount这个bolt组件 public static class WordCount extends BaseBasicBolt { Map counts = new HashMap ();//相当于spout里面的nexttuple,循环调用// BasicOutputCollector就是bolt的数据发射器 @Override public void execute(Tuple tuple, BasicOutputCollector collector) { //获取到“word” String word = tuple.getString(0);// 从map的counts里面获取word的次数,看map中是否含有过这个单词的key,没有则为0 Integer count = counts.get(word); if (count == null) { count = 0; }// 如果有的话直接加1 count++; counts.put(word, count);// 继续发送tuple到后面的bolt,发送两个map collector.emit(new Values(word, count)); }//declareOutputFields声明向后发射的key是什么 @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word", "count")); } }}
/** * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions * and limitations under the License. */package org.apache.storm.starter.spout;import java.text.SimpleDateFormat;import java.util.Date;import java.util.Map;import java.util.Random;import org.apache.storm.spout.SpoutOutputCollector;import org.apache.storm.task.TopologyContext;import org.apache.storm.topology.OutputFieldsDeclarer;import org.apache.storm.topology.base.BaseRichSpout;import org.apache.storm.tuple.Fields;import org.apache.storm.tuple.Values;import org.apache.storm.utils.Utils;import org.slf4j.Logger;import org.slf4j.LoggerFactory;//编写spout组件//继承BaseRichSpout,这个抽象类继承BaseRichSpout实现了IRichSpout接口//所以可以实现IRichSpout接口 或者 继承继承BaseRichSpoutpublic class RandomSentenceSpout extends BaseRichSpout { private static final Logger LOG = LoggerFactory.getLogger(RandomSentenceSpout.class); SpoutOutputCollector _collector; Random _rand;//open方法 是spout组件的初始化方法,是spout实例被创建后首先被调用的方法,只初始化调用一次//Mapconf 是storm里的storm.yaml文件的项写到map里// SpoutOutputCollector:tuple数据的发射器 创建类的实例变量,为了给后面的bolt传送数据用这个类 @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { _collector = collector; _rand = new Random(); }//nextTuple 实现如何从数据源上获取数据的逻辑,以及后面的组件bolt发送数据,循环调用 @Override public void nextTuple() { Utils.sleep(100); String[] sentences = new String[]{ sentence("the cow jumped over the moon"), sentence("an apple a day keeps the doctor away"), sentence("four score and seven years ago"), sentence("snow white and the seven dwarfs"), sentence("i am at two with nature") }; final String sentence = sentences[_rand.nextInt(sentences.length)]; LOG.debug("Emitting tuple: {}", sentence); _collector.emit(new Values(sentence)); } protected String sentence(String input) { return input; }//消息可靠保障机制(数据仅处理一次)当某个tuple在拓扑上处理成功后,调用ack方法执行一些消息处理成功后该干的事情//可以将tuple先存入到一个map中,当该拓扑处理成功后,就在map中移出掉 @Override public void ack(Object id) { }//消息处理失败,或者超时以后该干什么,例如重试,重试达到最大的阈值则丢弃 @Override public void fail(Object id) { }//声明向后面组件发射的tuple的key依次是什么 [“word”,“value”] @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); }// 还有 getComponentConfiguration()函数:就是设置该组件spout的一些专用参数:用多少执行者线程跑 // Add unique identifier to each tuple, which is helpful for debugging public static class TimeStamped extends RandomSentenceSpout { private final String prefix; public TimeStamped() { this(""); } public TimeStamped(String prefix) { this.prefix = prefix; } protected String sentence(String input) { return prefix + currentDate() + " " + input; } private String currentDate() { return new SimpleDateFormat("yyyy.MM.dd_HH:mm:ss.SSSSSSSSS").format(new Date()); } }}
# Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing, software# distributed under the License is distributed on an "AS IS" BASIS,# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.# See the License for the specific language governing permissions and# limitations under the License.import stormclass SplitSentenceBolt(storm.BasicBolt): def process(self, tup): words = tup.values[0].split(" ") for word in words: storm.emit([word])SplitSentenceBolt().run()
利用maven,引入storm-core.jar依赖
4.0.0 com.test.maven project01 0.0.1-SNAPSHOT org.apache.storm storm-core 0.9.3 provided commons-lang commons-lang 2.6
/** * File name:testTopolgy.java * Creation date :2020年10月31日 * Copyright (c) 2019 Oceanchen * All rights reserved. */package Topology;import backtype.storm.Config;import backtype.storm.LocalCluster;import backtype.storm.topology.IRichSpout;import backtype.storm.topology.TopologyBuilder;import bolt.SpiltBolt;import bolt.WordCount;import spout.MySpout;//在主类中创建拓扑结构public class testTopolgy { public static void main(String[] args) throws InterruptedException { // 使用topolgyBuilder 类进行创建spout bolt组件 TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("MySpout", new MySpout()); builder.setBolt("SpiltBolt", new SpiltBolt(),2).shuffleGrouping("MySpout"); builder.setBolt("WordCount", new WordCount()) .shuffleGrouping("SpiltBolt"); Config conf = new Config(); conf.setDebug(false); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("WordCountTopolgy", conf, builder.createTopology()); //3、提交任务 -----两种模式 本地模式和集群模式 //这里将拓扑名称写死了,所以在集群上打包运行的时候,不用写拓扑名称了!也可用arg[0]//StormSubmitter.submitTopology("mywordcount", config, topologyBuilder.createTopology());//LocalCluster localCluster = new LocalCluster(); //localCluster.submitTopology("mywordcount",config,topologyBuilder.createTopology()); Thread.sleep(10000); cluster.shutdown(); }}
/** * File name:MySpout.java * Creation date :2020年10月31日 * Copyright (c) 2019 Oceanchen * All rights reserved. */package spout;import java.util.Map;import java.util.Random;import backtype.storm.spout.SpoutOutputCollector;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.IRichSpout;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseBasicBolt;import backtype.storm.topology.base.BaseRichBolt;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import backtype.storm.tuple.Values;public class MySpout implements IRichSpout { private SpoutOutputCollector collector = null; private String[] Strings = { "the cow jumped over the moon", "an apple a day keeps the doctor away", "four score and seven years ago", "snow white and the seven dwarfs" };// 让该发射只执行两次 private int icount= 0;// 接收参数,定义发射器对象,然后接收参数中的发射器 public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; } public void nextTuple() { icount++; if (icount<=2) { String wordString = Strings[new Random().nextInt(Strings.length)];// System.err.println(wordString); collector.emit(new Values(wordString)); } else { } } public void declareOutputFields(OutputFieldsDeclarer declarer) { // 描述传输的tuple的key依次是什么 declarer.declare(new Fields("word")); } public void ack(Object msgId) { // TODO Auto-generated method stub } public void fail(Object msgId) { // TODO Auto-generated method stub } public MapgetComponentConfiguration() { // TODO Auto-generated method stub return null; } public void close() { // TODO Auto-generated method stub } public void activate() { // TODO Auto-generated method stub } public void deactivate() { // TODO Auto-generated method stub }}
/** * File name:SpiltBolt.java * Creation date :2020年10月31日 * Copyright (c) 2019 Oceanchen * All rights reserved. */package bolt;import java.util.Map;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.IRichBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import backtype.storm.tuple.Values;public class SpiltBolt implements IRichBolt { private OutputCollector collector=null; public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector=collector; } public void execute(Tuple input) { String wordString=input.getString(0); String[] words = wordString.split(" "); for(String word : words){ word = word.trim(); Values values = new Values(word);// System.err.println(values); collector.emit(values); } } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } public MapgetComponentConfiguration() { // TODO Auto-generated method stub return null; } public void cleanup() { // TODO Auto-generated method stub }}
/** * File name:WordCount.java * Creation date :2020年10月31日 * Copyright (c) 2019 Oceanchen * All rights reserved. */package bolt;import java.util.HashMap;import java.util.Iterator;import java.util.Map;import java.util.Map.Entry;import java.util.Set;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.IRichBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import backtype.storm.tuple.Values;public class WordCount implements IRichBolt { private OutputCollector collector = null; Mapmap = new HashMap (); private volatile boolean edit = false; public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } public void execute(Tuple input) { String key = input.getString(0); Integer value = map.get(key); if (value == null) { value = 0; } value++; map.put(key, value); Values values = new Values(key, value); System.err.println(values);// collector.emit(values);// collector.ack(input); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word", "count")); } public void cleanup() { //在主线程的cluster执行shutdown之后就会调用该方法 Set keySet = map.keySet(); Iterator it1 = keySet.iterator(); while(it1.hasNext()){ String ID = it1.next(); Integer integer = map.get(ID); System.out.println(ID+"单词的次数为"+integer.intValue()); } } public Map getComponentConfiguration() { // TODO Auto-generated method stub return null; }}
/** * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions * and limitations under the License. */package org.apache.storm.starter;import java.io.Serializable;import java.util.UUID;import org.apache.storm.Config;import org.apache.storm.topology.ConfigurableTopology;import org.apache.storm.topology.TopologyBuilder;import org.apache.storm.tuple.Values;public class LambdaTopology extends ConfigurableTopology { public static void main(String[] args) { ConfigurableTopology.start(new LambdaTopology(), args); } @Override protected int run(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); // example. spout1: generate random strings // bolt1: get the first part of a string // bolt2: output the tuple //示例。 spout1:生成随机字符串 // bolt1:获取字符串的第一部分 // bolt2:输出元组 // NOTE: Variable used in lambda expression should be final or effectively final // (or it will cause compilation error), // and variable type should implement the Serializable interface if it isn't primitive type // (or it will cause not serializable exception). Prefix prefix = new Prefix("Hello lambda:"); String suffix = ":so cool!"; int tag = 999;//spout就是发送 序列化的一个utils类生成的uuid:例如dd813a25-290d-4d43-a5b0-d35a99ee08c9 builder.setSpout("spout1", () -> UUID.randomUUID().toString()); //bolt就是通过 - 进行对uuid进行分割,然后加上静态变量再传递出去 builder.setBolt("bolt1", (tuple, collector) -> { String[] parts = tuple.getStringByField("lambda").split("\\-"); collector.emit(new Values(prefix + parts[0] + suffix, tag)); }, "strValue", "intValue").shuffleGrouping("spout1"); //该bolt只是进行打印传递的tuple builder.setBolt("bolt2", tuple -> System.out.println(tuple)).shuffleGrouping("bolt1"); Config conf = new Config(); conf.setDebug(true); conf.setNumWorkers(2); return submit("lambda-demo", conf, builder); }}class Prefix implements Serializable { private String str; public Prefix(String str) { this.str = str; } @Override public String toString() { return this.str; }}
此拓扑对前N个字进行连续计算,拓扑在基数方面已经看到。 前N个计算是在完全可扩展的方式,并且可以使用类似的方法进行计算热门话题或Twitter上的图片趋势。