同步UKafka数据到UHadoop

(以下文件目录来历可参考上一小节)

- 准备jar包

由于UHadoop集群使用hadoop-2.6.0-cdh5.4.9版本。所以,需要将hadoop-hdfs-2.6.0-cdh5.4.9.jar拷贝到上一步中下载的apache/flume/1.6.0-bin/lib目录下。

jar包可以从UHadoop集群的/home/hadoop/lib/lib/hadoop-hdfs-2.6.0-cdh5.4.9.jar拷贝,或者点此下载

注解:如果是自建集群,请拷贝相应版本jar包到相同目录。

- 配置文件

在cong/flume-conf.properties中添加以下配置:

agent.sources = seqGenSrc
agent.channels = memoryChannel
agent.sinks = hdfsSink

# source的来源通过kafka获取
# 请参考https://flume.apache.org/FlumeUserGuide.html#kafka-source
agent.sources.seqGenSrc.type = org.apache.flume.source.kafka.KafkaSource

#kafka zookeeper 地址
agent.sources.seqGenSrc.zookeeperConnect = ip1:2181,ip2:2181,ip3:2181
agent.sources.seqGenSrc.topic = flume_kafka_sink
agent.sources.seqGenSrc.groupId = flume
agent.sources.seqGenSrc.interceptors = i1
agent.sources.seqGenSrc.interceptors.i1.type = timestamp
agent.sources.seqGenSrc.kafka.consumer.timeout.ms = 100

# 为soure绑定channel
agent.sources.seqGenSrc.channels = memoryChannel

# sink到hdfs
agent.sinks.hdfsSink.type = hdfs

# sink到hdfs的地址
agent.sinks.hdfsSink.hdfs.path = hdfs://uhadoop-YYYYYY-master1:8020/kafka/%{topi c}/%y-%m-%d
agent.sinks.hdfsSink.hdfs.rollInterval = 0
agent.sinks.hdfsSink.hdfs.rollSize = 134217728
agent.sinks.hdfsSink.hdfs.rollCount = 0
agent.sinks.hdfsSink.hdfs.rollInterval = 0
agent.sinks.hdfsSink.hdfs.minBlockReplicas = 1 agent.sinks.hdfsSink.hdfs.writeFormat = Text
agent.sinks.hdfsSink.hdfs.fileType = DataStream
agent.sinks.hdfsSink.hdfs.batchSize = 1000
agent.sinks.hdfsSink.hdfs.threadsPoolSize = 100

# 指定从哪个channel sink数据
agent.sinks.hdfsSink.channel = memoryChannel

# channel的配置,将Source接收到数据的一个缓冲到内存中。
# 详细说明请参考https://flume.apache.org/FlumeUserGuide.html#memory-channel
agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 10000
agent.channels.memoryChannel.transactionCapacity = 1500

1.需要将ip1、ip2、ip3换成自己kafka集群节点ip,YYYYYY修改为具体的uhadoop集群id;
2.需要为UKafka集群的节点添加UHadoop集群的host

- 启动命令

./bin/flume-ng agent -n agent -c conf -f conf/flume-conf.properties

- 执行结果

可以在hdfs上看到上传的文件

[hadoop@uhadoop-YYYYYY-master1 root]$ hdfs dfs -ls -R /kafka
drwxrwxrwt  -   root    supergroup  0   2016-03-12 18:48    /kafka/flume_kafka_sink
drwxrwxrwt  -   root    supergroup  0   2016-03-12 18:48    /kafka/flume_kafka_sink/16-06-12
-rw-r--r--  3   root    supergroup  6   2016-03-12 18:48    /kafka/flume_kafka_sink/16-06-12/FlumeData.1457779695244.tmp