Apache Flume 是一个分布式,高可用的数据收集系统。它能够从不一样的数据源收集数据,通过聚合后发送到存储系统中,一般用于日志数据的收集。Flume 分为 NG 和 OG (1.0 以前) 两个版本,NG 在 OG 的基础上进行了彻底的重构,是目前使用最为普遍的版本。下面的介绍均以 NG 为基础。html
下图为 Flume 的基本架构图:git
外部数据源以特定格式向 Flume 发送 events
(事件),当 source
接收到 events
时,它将其存储到一个或多个 channel
,channe
会一直保存 events
直到它被 sink
所消费。sink
的主要功能从 channel
中读取 events
,并将其存入外部存储系统或转发到下一个 source
,成功后再从 channel
中移除 events
。github
1. Eventweb
Evnet
是 Flume NG 数据传输的基本单元。相似于 JMS 和消息系统中的消息。一个 Evnet
由标题和正文组成:前者是键/值映射,后者是任意字节数组。shell
2. Sourceapache
数据收集组件,从外部数据源收集数据,并存储到 Channel 中。数组
3. Channelbash
Channel
是源和接收器之间的管道,用于临时存储数据。能够是内存或持久化的文件系统:服务器
Memory Channel
: 使用内存,优势是速度快,但数据可能会丢失 (如忽然宕机);File Channel
: 使用持久化的文件系统,优势是能保证数据不丢失,可是速度慢。4. Sink架构
Sink
的主要功能从 Channel
中读取 Evnet
,并将其存入外部存储系统或将其转发到下一个 Source
,成功后再从 Channel
中移除 Event
。
5. Agent
是一个独立的 (JVM) 进程,包含 Source
、 Channel
、 Sink
等组件。
Flume 中的每个组件都提供了丰富的类型,适用于不一样场景:
Source 类型 :内置了几十种类型,如 Avro Source
,Thrift Source
,Kafka Source
,JMS Source
;
Sink 类型 :HDFS Sink
,Hive Sink
,HBaseSinks
,Avro Sink
等;
Channel 类型 :Memory Channel
,JDBC Channel
,Kafka Channel
,File Channel
等。
对于 Flume 的使用,除非有特别的需求,不然经过组合内置的各类类型的 Source,Sink 和 Channel 就能知足大多数的需求。在 Flume 官网 上对全部类型组件的配置参数均以表格的方式作了详尽的介绍,并附有配置样例;同时不一样版本的参数可能略有所不一样,因此使用时建议选取官网对应版本的 User Guide 做为主要参考资料。
Flume 支持多种架构模式,分别介绍以下
Flume 支持跨越多个 Agent 的数据传递,这要求前一个 Agent 的 Sink 和下一个 Agent 的 Source 都必须是 Avro
类型,Sink 指向 Source 所在主机名 (或 IP 地址) 和端口(详细配置见下文案例三)。
日志收集中经常存在大量的客户端(好比分布式 web 服务),Flume 支持使用多个 Agent 分别收集日志,而后经过一个或者多个 Agent 聚合后再存储到文件系统中。
Flume 支持从一个 Source 向多个 Channel,也就是向多个 Sink 传递事件,这个操做称之为 Fan Out
(扇出)。默认状况下 Fan Out
是向全部的 Channel 复制 Event
,即全部 Channel 收到的数据都是相同的。同时 Flume 也支持在 Source
上自定义一个复用选择器 (multiplexing selector) 来实现自定义的路由规则。
Flume 配置一般须要如下两个步骤:
<Agent>.sources = <Source> <Agent>.sinks = <Sink> <Agent>.channels = <Channel1> <Channel2> # set channel for source <Agent>.sources.<Source>.channels = <Channel1> <Channel2> ... # set channel for sink <Agent>.sinks.<Sink>.channel = <Channel1>
<Agent>.sources.<Source>.<someProperty> = <someValue> # properties for channels <Agent>.channel.<Channel>.<someProperty> = <someValue> # properties for sinks <Agent>.sources.<Sink>.<someProperty> = <someValue>
为方便你们后期查阅,本仓库中全部软件的安装均单独成篇,Flume 的安装见:
介绍几个 Flume 的使用案例:
需求: 监听文件内容变更,将新增长的内容输出到控制台。
实现: 主要使用 Exec Source
配合 tail
命令实现。
新建配置文件 exec-memory-logger.properties
,其内容以下:
#指定agent的sources,sinks,channels a1.sources = s1 a1.sinks = k1 a1.channels = c1 #配置sources属性 a1.sources.s1.type = exec a1.sources.s1.command = tail -F /tmp/log.txt a1.sources.s1.shell = /bin/bash -c #将sources与channels进行绑定 a1.sources.s1.channels = c1 #配置sink a1.sinks.k1.type = logger #将sinks与channels进行绑定 a1.sinks.k1.channel = c1 #配置channel类型 a1.channels.c1.type = memory
flume-ng agent \ --conf conf \ --conf-file /usr/app/apache-flume-1.6.0-cdh5.15.2-bin/examples/exec-memory-logger.properties \ --name a1 \ -Dflume.root.logger=INFO,console
向文件中追加数据:
控制台的显示:
需求: 监听指定目录,将目录下新增长的文件存储到 HDFS。
实现:使用 Spooling Directory Source
和 HDFS Sink
。
#指定agent的sources,sinks,channels a1.sources = s1 a1.sinks = k1 a1.channels = c1 #配置sources属性 a1.sources.s1.type =spooldir a1.sources.s1.spoolDir =/tmp/logs a1.sources.s1.basenameHeader = true a1.sources.s1.basenameHeaderKey = fileName #将sources与channels进行绑定 a1.sources.s1.channels =c1 #配置sink a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H/ a1.sinks.k1.hdfs.filePrefix = %{fileName} #生成的文件类型,默认是Sequencefile,可用DataStream,则为普通文本 a1.sinks.k1.hdfs.fileType = DataStream a1.sinks.k1.hdfs.useLocalTimeStamp = true #将sinks与channels进行绑定 a1.sinks.k1.channel = c1 #配置channel类型 a1.channels.c1.type = memory
flume-ng agent \ --conf conf \ --conf-file /usr/app/apache-flume-1.6.0-cdh5.15.2-bin/examples/spooling-memory-hdfs.properties \ --name a1 -Dflume.root.logger=INFO,console
拷贝任意文件到监听目录下,能够从日志看到文件上传到 HDFS 的路径:
# cp log.txt logs/
查看上传到 HDFS 上的文件内容与本地是否一致:
# hdfs dfs -cat /flume/events/19-04-09/13/log.txt.1554788567801
需求: 将本服务器收集到的数据发送到另一台服务器。
实现:使用 avro sources
和 avro Sink
实现。
新建配置 netcat-memory-avro.properties
,监听文件内容变化,而后将新的文件内容经过 avro sink
发送到 hadoop001 这台服务器的 8888 端口:
#指定agent的sources,sinks,channels a1.sources = s1 a1.sinks = k1 a1.channels = c1 #配置sources属性 a1.sources.s1.type = exec a1.sources.s1.command = tail -F /tmp/log.txt a1.sources.s1.shell = /bin/bash -c a1.sources.s1.channels = c1 #配置sink a1.sinks.k1.type = avro a1.sinks.k1.hostname = hadoop001 a1.sinks.k1.port = 8888 a1.sinks.k1.batch-size = 1 a1.sinks.k1.channel = c1 #配置channel类型 a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100
使用 avro source
监听 hadoop001 服务器的 8888 端口,将获取到内容输出到控制台:
#指定agent的sources,sinks,channels a2.sources = s2 a2.sinks = k2 a2.channels = c2 #配置sources属性 a2.sources.s2.type = avro a2.sources.s2.bind = hadoop001 a2.sources.s2.port = 8888 #将sources与channels进行绑定 a2.sources.s2.channels = c2 #配置sink a2.sinks.k2.type = logger #将sinks与channels进行绑定 a2.sinks.k2.channel = c2 #配置channel类型 a2.channels.c2.type = memory a2.channels.c2.capacity = 1000 a2.channels.c2.transactionCapacity = 100
启动日志汇集 Flume:
flume-ng agent \ --conf conf \ --conf-file /usr/app/apache-flume-1.6.0-cdh5.15.2-bin/examples/avro-memory-logger.properties \ --name a2 -Dflume.root.logger=INFO,console
在启动日志收集 Flume:
flume-ng agent \ --conf conf \ --conf-file /usr/app/apache-flume-1.6.0-cdh5.15.2-bin/examples/netcat-memory-avro.properties \ --name a1 -Dflume.root.logger=INFO,console
这里建议按以上顺序启动,缘由是 avro.source
会先与端口进行绑定,这样 avro sink
链接时才不会报没法链接的异常。可是即便不按顺序启动也是不要紧的,sink
会一直重试,直至创建好链接。
向文件 tmp/log.txt
中追加内容:
能够看到已经从 8888 端口监听到内容,并成功输出到控制台:
更多大数据系列文章能够参见 GitHub 开源项目: 大数据入门指南