使用Flume+UKafka
1. Flume简介
Flume是一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。
- Flume event
Flume event是Flume自定义的数据传输格式。
- Agent
启动的一个Flume进程。
- Source
Flume Agent的Source用于接收外部数据源发送过来的数据(例如上例中的Web Server)。需要注意的是,外部数据源发送的数据必须满足Agent Source定义的数据源格式。比如,对于Avro Source,那么这个Source仅接收Avro格式的数据。外部数据源可以通过Avro Client的方式给Agent的Source发送数据;Avro Source也可以接收其它Agent的Avro Sink发送的Avro Event数据。
- Channel
Agent Channel是Agent Source接收到数据的一个缓冲。数据在被消费前(写入到Sink),对读取到的数据进行缓冲。一个Source可以写到多个Channel。
- Sink
从Challe的缓冲中取出数据,存储到最终流向的地方。
2. 下载安装
以flume-1.6.0版本为例:
- 下载并解压文件
wget http://apache.fayea.com/flume/1.6.0/apache-flume-1.6.0-bin.tar.gz tar xf apache-flume-1.6.0-bin.tar.gz
- 修改配置文件
在apache-flume-1.6.0-bin/conf/flume-env.sh中添加JAVA_OPTS
export JAVA_OPTS="-Xms100m -Xmx1024m -Dcom.sun.management.jmxremote"
3. 抓取本地日志到UKafka
- 修改配置文件
在cong/flume-conf.properties.sink.kafka中添加以下配置:
# 自定义一个source channel和sink agent.sources = seqGenSrc agent.channels = memoryChannel agent.sinks = kafkaSink # source的来源通过unix命令方式获取 # 参考 https://flume.apache.org/FlumeUserGuide.html#exec-source agent.sources.seqGenSrc.type = exec agent.sources.seqGenSrc.command = tail -f /tmp/access.log # 为source绑定一个channel agent.sources.seqGenSrc.channels = memoryChannel # sink source的内容到kafka # 详细内容请参考 https://flume.apache.org/FlumeUserGuide.html#kafka-sink agent.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink agent.sinks.kafkaSink.topic = flume_kafka_sink # kafka brokers地址 agent.sinks.kafkaSink.brokerList = ip1:9092,ip2:9092,ip3:9092 agent.sinks.kafkaSink.batchSize = 20 agent.sinks.kafkaSink.partition.key=region agent.sinks.kafkaSink.partitioner.class=org.apache.flume.plugins.SinglePartition # 为sink绑定一个 channel. agent.sinks.kafkaSink.channel = memoryChannel # channel的配置,将Source接收到数据的一个缓冲到内存中 # 详细说明请参考 https://flume.apache.org/FlumeUserGuide.html#memory-channel agent.channels.memoryChannel.type = memory agent.channels.memoryChannel.capacity = 10000 agent.channels.memoryChannel.transactionCapacity = 1500
注解:需要将ip换成自己kafka集群ip
- 启动命令
./bin/flume-ng agent -n agent -c conf -f conf/flume-conf.properties.sink.kafka
- 查看结果
在kafka集群上执行:
kafka-console-consumer.sh --zookeeper ip:2181 --topic flume_kafka_sink
注解:需要将ip换成自己kafka集群节点ip
可以看到上传的日志内容。