Storm消费kafka消息
- 安装Strom依赖库
下载并解压Storm发行版本
这个版本带有kafka相关操作的完整依赖包。
- 修改storm.yaml配置文件
Storm发行版本的解压目录下有一个conf/storm.yaml文件,用于配置Storm。默认配置可以在github查看。conf/storm.yaml中的配置选项将覆盖defaults.yaml中的默认配置。以下配置选项是必须在conf/storm.yaml中进行配置的:
1.storm.zookeeper.servers
Storm集群使用的Zookeeper集群地址,其格式如下:
storm.zookeeper.servers:
"111.222.333.444"
"555.666.777.888"
如果Zookeeper集群使用的不是默认端口,那么还需要配置storm.zookeeper.port
2.storm.local.dir
Nimbus和Supervisor进程用于存储少量状态,如jars、confs等的本地磁盘目录,需要提前创建该目录并给以足够的访问权限。然后在storm.yaml中配置该目录,如:
storm.local.dir: "/data/storm"
3.nimbus.host
Storm集群的Nimbus机器地址。各个Supervisor工作节点需要知道哪个机器是Nimbus,以便下载Topologies的jars、confs等文件,如:
nimbus.host: "111.222.333.444"
4.supervisor.slots.ports
对于每个Supervisor工作节点,需要配置该工作节点可以运行的worker数量和每个worker使用的端口。默认情况下,每个节点上可运行4个workers,分别在6700、6701、6702、6703端口,如:
supervisor.slots.ports:
6700
6701
6702
6703
- 启动Storm各个后台进程
最后一步,启动Storm的所有后台进程。和Zookeeper一样,Storm也是快速失败(fail-fast)的系统。这样Storm才能在任意时刻被停止,并且当进程重启后被正确地恢复执行。这也是为什么Storm不在进程没保存状态的原因。即使Nimbus或Supervisors被重启,运行中的Topologies不会受到影响。
以下是启动Storm各个后台进程的方式:
1.Nimbus
在Storm主控节点上执行
bin/storm nimbus > /dev/null 2>&1 &"
2.Supervisor
在Storm各个工作节点上执行
bin/storm supervisor > /dev/null 2>&1 &"
3.UI
在Storm主控节点上执行
bin/storm ui > /dev/null 2>&1 &"
启动后,可通过http:\\/\\/{nimbus host}:8080观察集群的worker资源使用情况、Topologies的运行状态等信息。
注解:
1.Storm后台进程被启动后,将在Storm、安装部署目录下的logs/子目录生成各个进程的日志文件;
2.Storm UI必须和Storm Nimbus部署在同一台机器上。否则,Storm UI无法正常工作。
至此,Storm集群已经部署、配置完毕。
Storm UI展示界面:
- Java代码
1.topology.java
- topology.java
import java.util.HashMap; import java.util.Map; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.spout.SchemeAsMultiScheme; import backtype.storm.topology.TopologyBuilder; import storm.kafka.BrokerHosts; import storm.kafka.KafkaSpout; import storm.kafka.SpoutConfig; import storm.kafka.ZkHosts; import storm.kafka.bolt.KafkaBolt; public class topology { public static void main(String [] args) throws Exception{ //配置 zookeeper 主机:端口号 BrokerHosts brokerHosts = new ZkHosts("110.64.76.130:2181,110.64.76.131:2181, 110.64.76.132:2181"); //接收消息队列的主题 String topic="test"; //zookeeper 设置文件中的配置,如果 zookeeper 配置文件中设置为主机 名:端口号 ,该项为空 String zkRoot=""; //任意 String spoutId="test_consumer_group"; SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, topic, zkRoot, spoutId); //设置如何处理 kafka 消息队列输入流 spoutConfig.scheme=new SchemeAsMultiScheme(new MessageScheme()); //从 offset 最小值开始读取数据,否则从启动后开始读 spoutConfig.forceFromStart=true; Config conf=new Config(); //不输出调试信息 conf.setDebug(false); //设置一个spout task中处于pending状态的最大的tuples数量 conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1); Map<String, String> map=new HashMap<String,String>(); //配置 Kafka broker 地址 map.put("metadata.broker.list", "master:9092,slave1:9092,slave2:9092"); // serializer.class为消息的序列化类 map.put("serializer.class", "kafka.serializer.StringEncoder"); conf.put("kafka.broker.properties", map); // 配置 KafkaBolt 生成的 topic conf.put("topic", "receiver"); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", new KafkaSpout(spoutConfig),1); builder.setBolt("bolt1", new QueryBolt(),1).setNumTasks(1).shuffleGrouping("spout"); builder.setBolt("bolt2", new KafkaBolt<String, String>(),1).setNumTasks(1).shuffleGrouping("bolt1"); String name= topology.class.getSimpleName(); if (args != null && args.length > 0) { // Nimbus host name passed from command line conf.put(Config.NIMBUS_HOST, args[0]); conf.setNumWorkers(3); StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology()); } else { conf.setMaxTaskParallelism(3); LocalCluster cluster = new LocalCluster(); cluster.submitTopology(name, conf, builder.createTopology()); Thread.sleep(60000); cluster.shutdown(); } } }
2.MessageScheme.java
- MessageScheme.java
import java.io.UnsupportedEncodingException; import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import backtype.storm.spout.Scheme; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; public class MessageScheme implements Scheme { private static final Logger LOGGER = LoggerFactory.getLogger(MessageScheme.class); public List<Object> deserialize(byte[] ser) { try { //从 kafka 中读取的值直接序列化为 UTF-8 的 str String mString=new String(ser, "UTF-8"); return new Values(mString); } catch (UnsupportedEncodingException e) { // TODO Auto-generated catch block LOGGER.error("Cannot parse the provided message"); } return null; } public Fields getOutputFields() { // TODO Auto-generated method stub return new Fields("msg"); } }
3.QueryBolt.java
- QueryBolt.java
import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.io.PrintStream; import java.util.ArrayList; import java.util.List; import java.util.Map; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; public class QueryBolt implements IRichBolt { List<String> list; OutputCollector collector; public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { list=new ArrayList<String>(); this.collector=collector; } public void execute(Tuple input) { // TODO Auto-generated method stub String str=(String) input.getValue(0); //将 str 加入到 list list.add(str); //发送 ack collector.ack(input); //发送该 str collector.emit(new Values(str)); } public void cleanup() {//topology被killed时调用 //将 list 的值写入到文件 try { FileOutputStream outputStream = new FileOutputStream("/data/" + this + ".txt"); PrintStream p = new PrintStream(outputStream); p.println("begin!"); p.println(list.size()); for(String tmp:list){ p.println(tmp); } p.println("end!"); try { p.close(); outputStream.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } catch (FileNotFoundException e) { // TODO Auto-generated catch block e.printStackTrace(); } } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("message")); } public Map<String, Object> getComponentConfiguration() { // TODO Auto-generated method stub return null; } }
- 编译
从上面下载的storm包中找到下面3个包,导入到工程中:
编译后导出jar包。
- 拷贝集群运行依赖包到默认路径
cp /usr/local/kafka/libs/* /storm-kafka/storm-kafka-0.9.2-incubating.jar apache-storm-0.9.2-incubating/lib/
- 启动Storm Topology
bin/storm jar /data/sjwc.jar topology
其中,sjwc.jar是包含Topology实现代码的jar包,topology的main方法是Topology的入口。
运行结果可以在/data/下看到:
[root@uhadoop-XXXXXX-master1 data]# more QueryBolt@78ae427c.txt begin! 94 jjjlk;a lkkj ivl ;okv 2 7 8 9 6 3
94是单词个数,下面是topic中所有内容。
- 停止Storm Topology
storm kill {topname}
注解:{topname}为Topology提交到Storm集群时指定的Topology任务名称。