$ bin/flume-ng agent -n foo -f conf/flume-conf.properties.template
$ bin/flume-ng avro-client -H localhost -p 41414 -F /usr/logs/log.10
#List the sources, sinks and channels for the agent
<agent>.sources = <Source>
<agent>.sinks = <Sink>
<agent>.channels = <Channel1> <Channel2>

#set channel for source
<agent>.sources.<Source>.channels = <Channel1> <Channel2> ...

#set channel for sink
<agent>.sinks.<Sink>.channel = <Channel1>
weblog-agent.sources = avro-AppSrv-source
weblog-agent.sinks = hdfs-Cluster1-sink
weblog-agent.channels = mem-channel-1

#set channel for source
weblog-agent.sources.avro-AppSrv-source.channels = mem-channel-1

#set channel for sink
weblog-agent.sinks.hdfs-Cluster1-sink.channel = mem-channel-1
#Properties for sources
<agent>.sources.<Source>.<someProperty> = <someValue>
...

#Properties for channels
<agent>.channels.<Channel>.<someProperty> = <someValue>
...

#Properties for sinks
<agent>.sinks.<Sink>.<someProperty> = <someValue>
weblog-agent.sources = avro-AppSrv-source
weblog-agent.sinks = hdfs-Cluster1-sink
weblog-agent.channels = mem-channel-1

#set channel for sources, sinks ...

#properties of avro-AppSrv-source
weblog-agent.sources.avro-AppSrv-source.type = avro
weblog-agent.sources.avro-AppSrv-source.bind = localhost
weblog-agent.sources.avro-AppSrv-source.port = 10000

#properties of mem-channel-1
weblog-agent.channels.mem-channel-1.type = memory
weblog-agent.channels.mem-channel-1.capacity = 1000
weblog-agent.channels.mem-channel-1.transactionCapacity = 100

#properties of hdfs-Cluster1-sink
weblog-agent.sinks.hdfs-Cluster1-sink.type = hdfs
weblog-agent.sinks.hdfs-Cluster1-sink.hdfs.path = hdfs://namenode/flume/webdata/
...
#List the sources, sinks and channels for the agent
<agent>.sources = <Source1> <Source2>
<agent>.sinks = <Sink1> <Sink2>
<agent>.channels = <Channel1> <Channel2>
#List the sources, sinks and channels in the agent
weblog-agent.sources = avro-AppSrv-source1 exec-tail-source2
weblog-agent.sinks = hdfs-Cluster1-sink1 avro-forward-sink2
weblog-agent.channels = mem-channel-1 jdbc-channel-2

## Flow-1 configuration
weblog-agent.sources.avro-AppSrv-source1.channels = mem-channel-1
weblog-agent.sinks.hdfs-Cluster1-sink1.channel = mem-channel-1

## Flow-2 configuration
weblog-agent.sources.exec-tail-source2.channels = jdbc-channel-2
weblog-agent.sinks.avro-forward-sink2.channel = jdbc-channel-2
## weblog agent config
#List sources, sinks and channels in the agent
weblog-agent.sources = avro-AppSrv-source
weblog-agent.sinks = avro-forward-sink
weblog-agent.channels = jdbc-channel

#define the flow
weblog-agent.sources.avro-AppSrv-source.channels = jdbc-channel
weblog-agent.sinks.avro-forward-sink.channel = jdbc-channel

#avro sink properties
weblog-agent.sinks.avro-forward-sink.type = avro
weblog-agent.sinks.avro-forward-sink.hostname = 10.1.1.100
weblog-agent.sinks.avro-forward-sink.port = 10000

#configure other pieces
...
## hdfs-agent config
#List sources, sinks and channels in the agent
hdfs-agent.sources = avro-collection-source
hdfs-agent.sinks = hdfs-sink
hdfs-agent.channels = mem-channel

#define the flow
hdfs-agent.sources.avro-collection-source.channels = mem-channel
hdfs-agent.sinks.hdfs-sink.channel = mem-channel

#avro source properties
hdfs-agent.sources.avro-collection-source.type = avro
hdfs-agent.sources.avro-collection-source.bind = 10.1.1.100
hdfs-agent.sources.avro-collection-source.port = 10000

#configure other pieces
...
#List the sources, sinks and channels for the agent
<agent>.sources = <Source1>
<agent>.sinks = <Sink1> <Sink2>
<agent>.channels = <Channel1> <Channel2>

#set list of channels for source (separated by space)
<agent>.sources.<Source1>.channels = <Channel1> <Channel2>

#set channel for sinks
<agent>.sinks.<Sink1>.channel = <Channel1>
<agent>.sinks.<Sink2>.channel = <Channel2>

<agent>.sources.<Source1>.selector.type = replicating
# Mapping for multiplexing selector
<agent>.sources.<Source1>.selector.type = multiplexing
<agent>.sources.<Source1>.selector.header = <someHeader>
<agent>.sources.<Source1>.selector.mapping.<Value1> = <Channel1>
<agent>.sources.<Source1>.selector.mapping.<Value2> = <Channel1> <Channel2>
<agent>.sources.<Source1>.selector.mapping.<Value3> = <Channel2>
...
<agent>.sources.<Source1>.selector.default = <Channel2>
#List the sources, sinks and channels in the agent
weblog-agent.sources = avro-AppSrv-source1
weblog-agent.sinks = hdfs-Cluster1-sink1 avro-forward-sink2
weblog-agent.channels = mem-channel-1 jdbc-channel-2

#set channels for source
weblog-agent.sources.avro-AppSrv-source1.channels = mem-channel-1 jdbc-channel-2

#set channel for sinks
weblog-agent.sinks.hdfs-Cluster1-sink1.channel = mem-channel-1
weblog-agent.sinks.avro-forward-sink2.channel = jdbc-channel-2

#multiplexing selector configuration
weblog-agent.sources.avro-AppSrv-source1.selector.type = multiplexing
weblog-agent.sources.avro-AppSrv-source1.selector.header = State
weblog-agent.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1
weblog-agent.sources.avro-AppSrv-source1.selector.mapping.AZ = jdbc-channel-2
weblog-agent.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 jdbc-channel-2
weblog-agent.sources.avro-AppSrv-source1.selector.default = mem-channel-1
Property Name | Default | Description |
type | - | The component type name, needs to be avro |
bind | - | hostname or IP address to listen on |
port | - | Port # to bind to |
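For reference, a minimal sketch of an Avro source definition; the agent name, component names, bind address, and port below are illustrative, not prescribed:

#hypothetical agent/component names; bind address and port are examples only
agent-1.sources = avro-source-1
agent-1.channels = mem-channel-1
agent-1.sources.avro-source-1.type = avro
agent-1.sources.avro-source-1.bind = 0.0.0.0
agent-1.sources.avro-source-1.port = 41414
agent-1.sources.avro-source-1.channels = mem-channel-1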
Property Name | Default | Description |
type | - | The component type name, needs to be exec |
command | - | The command to execute |
restartThrottle | 10000 | Amount of time (in millis) to wait before attempting a restart |
restart | false | Whether the executed cmd should be restarted if it dies |
logStdErr | false | Whether the command’s stderr should be logged |
Note: ExecSource has no way to guarantee that the client learns of a failure to put an event into the channel. In such cases, the data will be lost.
exec-agent.sources = tail
exec-agent.channels = memoryChannel-1
exec-agent.sinks = logger

exec-agent.sources.tail.type = exec
exec-agent.sources.tail.command = tail -f /var/log/secure
Property Name | Default | Description |
type | - | The component type name, needs to be netcat |
bind | - | Host name or IP address to bind to |
port | - | Port # to bind to |
max-line-length | 512 | Max line length per event body (in bytes) |
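A minimal netcat source sketch; the agent/component names and port are illustrative, and max-line-length is shown at its default only for clarity:

#hypothetical names; port chosen as an example
agent-1.sources = netcat-source-1
agent-1.channels = mem-channel-1
agent-1.sources.netcat-source-1.type = netcat
agent-1.sources.netcat-source-1.bind = 0.0.0.0
agent-1.sources.netcat-source-1.port = 6666
agent-1.sources.netcat-source-1.max-line-length = 512
agent-1.sources.netcat-source-1.channels = mem-channel-1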
Property Name | Default | Description |
type | - | The component type name, needs to be seq |
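Since the sequence generator source needs only its type, a sketch with illustrative names:

#hypothetical agent/component names
agent-1.sources = seq-source-1
agent-1.channels = mem-channel-1
agent-1.sources.seq-source-1.type = seq
agent-1.sources.seq-source-1.channels = mem-channel-1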
Property Name | Default | Description |
type | - | The component type name, needs to be syslogtcp |
host | - | Host name or IP address to bind to |
port | - | Port # to bind to |
syslog-agent.sources = syslog
syslog-agent.channels = memoryChannel-1
syslog-agent.sinks = logger

syslog-agent.sources.syslog.type = syslogtcp
syslog-agent.sources.syslog.port = 5140
syslog-agent.sources.syslog.host = localhost
Property Name | Default | Description |
type | - | The component type name, needs to be syslogudp |
host | - | Host name or IP address to bind to |
port | - | Port # to bind to |
syslog-agent.sources = syslog
syslog-agent.channels = memoryChannel-1
syslog-agent.sinks = logger

syslog-agent.sources.syslog.type = syslogudp
syslog-agent.sources.syslog.port = 5140
syslog-agent.sources.syslog.host = localhost
Property Name | Default | Description |
type | - | The component type name, needs to be org.apache.flume.source.avroLegacy.AvroLegacySource |
host | - | The hostname or IP address to bind to |
port | - | The port # to listen on |
Property Name | Default | Description |
type | - | The component type name, needs to be org.apache.flume.source.thriftLegacy.ThriftLegacySource
host | - | The hostname or IP address to bind to |
port | - | The port # to listen on |
Note: The reliability semantics of Flume 1.x differ from those of Flume 0.9.x. The E2E (end-to-end) and DFO modes of a 0.9.x agent are not supported by the legacy sources. The only supported 0.9.x mode is Best Effort.
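A sketch of an Avro legacy source definition; the agent/component names and port are illustrative:

#hypothetical names and port
agent-1.sources = legacy-source-1
agent-1.channels = mem-channel-1
agent-1.sources.legacy-source-1.type = org.apache.flume.source.avroLegacy.AvroLegacySource
agent-1.sources.legacy-source-1.host = 0.0.0.0
agent-1.sources.legacy-source-1.port = 41415
agent-1.sources.legacy-source-1.channels = mem-channel-1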
Alias | Description
%{host} | host name stored in event header
%t | Unix time in milliseconds |
%a | locale’s short weekday name (Mon, Tue, …) |
%A | locale’s full weekday name (Monday, Tuesday, …) |
%b | locale’s short month name (Jan, Feb,…) |
%B | locale’s long month name (January, February,…) |
%c | locale’s date and time (Thu Mar 3 23:05:25 2005) |
%d | day of month (01) |
%D | date; same as %m/%d/%y |
%H | hour (00..23) |
%I | hour (01..12) |
%j | day of year (001..366) |
%k | hour ( 0..23) |
%m | month (01..12) |
%M | minute (00..59) |
%P | locale’s equivalent of am or pm |
%s | seconds since 1970-01-01 00:00:00 UTC |
%S | second (00..59) |
%y | last two digits of year (00..99) |
%Y | year (2010) |
%z | +hhmm numeric timezone (for example, -0400) |
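These escape sequences are typically used in the HDFS sink's hdfs.path (documented below) to partition output by time. A sketch, assuming events carry a timestamp header from which the escapes can be resolved; the agent/sink names and path are illustrative:

#writes events under .../<year>-<month>-<day>/<hour>; names and path are examples
agent-1.sinks.hdfs-sink-1.hdfs.path = hdfs://namenode/flume/webdata/%Y-%m-%d/%H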
Property Name | Default | Description
type | - | The component type name, needs to be hdfs |
hdfs.path | - | HDFS directory path (eg hdfs://namenode/flume/webdata/) |
hdfs.filePrefix | FlumeData | Name prefixed to files created by Flume in hdfs directory |
hdfs.rollInterval | 30 | Number of seconds to wait before rolling current file |
hdfs.rollSize | 1024 | File size to trigger roll (in bytes) |
hdfs.rollCount | 10 | Number of events written to the file before it is rolled
hdfs.batchSize | 1 | Number of events written to the file before it is flushed to HDFS
hdfs.txnEventMax | 100 | |
hdfs.codeC | - | Compression codec, one of the following: gzip, bzip2, lzo, snappy
hdfs.fileType | SequenceFile | File format - currently SequenceFile or DataStream |
hdfs.maxOpenFiles | 5000 | |
hdfs.writeFormat | - | “Text” or “Writable” |
hdfs.appendTimeout | 1000 | |
hdfs.callTimeout | 5000 | |
hdfs.threadsPoolSize | 10 | |
hdfs.kerberosPrincipal | “” | Kerberos user principal for accessing secure HDFS |
hdfs.kerberosKeytab | “” | Kerberos keytab for accessing secure HDFS |
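A sketch of an HDFS sink that rolls on file size rather than time; all names and values are illustrative, and the assumption that a value of 0 disables a roll trigger is not stated in the table above:

#hypothetical names; 0 assumed to disable time- and count-based rolling
agent-1.sinks = hdfs-sink-1
agent-1.sinks.hdfs-sink-1.type = hdfs
agent-1.sinks.hdfs-sink-1.hdfs.path = hdfs://namenode/flume/webdata/
agent-1.sinks.hdfs-sink-1.hdfs.filePrefix = events
agent-1.sinks.hdfs-sink-1.hdfs.rollInterval = 0
agent-1.sinks.hdfs-sink-1.hdfs.rollSize = 67108864
agent-1.sinks.hdfs-sink-1.hdfs.rollCount = 0
agent-1.sinks.hdfs-sink-1.channel = mem-channel-1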
Property Name | Default | Description
type | - | The component type name, needs to be logger
Property Name | Default | Description |
type | - | The component type name, needs to be avro |
hostname | - | The hostname or IP address to bind to |
port | - | The port # to listen on |
batch-size | 100 | Number of events to batch together per send
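A sketch of an Avro sink forwarding events to a downstream agent; the hostname, port, and component names are illustrative:

#hypothetical names; hostname/port of the downstream agent are examples
agent-1.sinks = avro-sink-1
agent-1.sinks.avro-sink-1.type = avro
agent-1.sinks.avro-sink-1.hostname = 10.1.1.100
agent-1.sinks.avro-sink-1.port = 10000
agent-1.sinks.avro-sink-1.batch-size = 100
agent-1.sinks.avro-sink-1.channel = mem-channel-1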
Property Name | Default | Description |
type | - | The component type name, needs to be irc |
hostname | - | The hostname or IP address to connect to |
port | 6667 | The port number of the remote host to connect to
nick | - | Nick name |
user | - | User name |
password | - | User password |
chan | - | The IRC channel to join
name | - |
splitlines | - | (boolean) |
splitchars | \n | line separator (if you were to enter the default value into the config file, then you would need to escape the backslash, like this: \\n)
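A sketch of an IRC sink; the server, nick, user, and channel values are made up:

#all values illustrative
agent-1.sinks = irc-sink-1
agent-1.sinks.irc-sink-1.type = irc
agent-1.sinks.irc-sink-1.hostname = irc.example.org
agent-1.sinks.irc-sink-1.nick = flume-bot
agent-1.sinks.irc-sink-1.user = flume
agent-1.sinks.irc-sink-1.chan = #flume-events
agent-1.sinks.irc-sink-1.channel = mem-channel-1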
Property Name | Default | Description |
type | - | The component type name, needs to be file_roll |
sink.directory | - | Directory where output files are written
sink.rollInterval | 30 | Interval in seconds before rolling to a new file
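A sketch of a file roll sink writing to a local directory; the path and names are illustrative:

#hypothetical names; output directory is an example
agent-1.sinks = file-sink-1
agent-1.sinks.file-sink-1.type = file_roll
agent-1.sinks.file-sink-1.sink.directory = /var/log/flume
agent-1.sinks.file-sink-1.sink.rollInterval = 30
agent-1.sinks.file-sink-1.channel = mem-channel-1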
Property Name | Default | Description |
type | - | The component type name, needs to be null |
Custom Sink
A custom sink is your own implementation of the Sink interface. The custom sink's class and its dependencies must be included in the agent's classpath. The type of the custom sink is its FQCN.
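For example, assuming a hypothetical implementation class com.example.flume.MySink is on the agent's classpath:

#com.example.flume.MySink is a hypothetical custom Sink implementation
agent-1.sinks = custom-sink-1
agent-1.sinks.custom-sink-1.type = com.example.flume.MySink
agent-1.sinks.custom-sink-1.channel = mem-channel-1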
Flume Channels
Channels are the repositories where events are staged on an agent. Sources add events to a channel, and sinks remove events from it.

Memory Channel
The events are stored in an in-memory queue with a configurable maximum size. It is well suited for flows that need high throughput and can tolerate losing the staged data if the agent fails.
Property Name | Default | Description |
type | - | The component type name, needs to be memory |
capacity | 100 | The max number of events stored in the channel |
transactionCapacity | 100 | The max number of events stored in the channel per transaction |
keep-alive | 3 | Timeout in seconds for adding or removing an event |
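A sketch of a memory channel sized above the defaults; the names and values are illustrative:

#hypothetical names; capacity raised above the default of 100
agent-1.channels = mem-channel-1
agent-1.channels.mem-channel-1.type = memory
agent-1.channels.mem-channel-1.capacity = 1000
agent-1.channels.mem-channel-1.transactionCapacity = 100
agent-1.channels.mem-channel-1.keep-alive = 3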
JDBC Channel
Property Name | Default | Description |
type | - | The component type name, needs to be jdbc |
db.type | DERBY | Database vendor, needs to be DERBY. |
driver.class | org.apache.derby.jdbc.EmbeddedDriver | Class for vendors JDBC driver |
driver.url | (constructed from other properties) | JDBC connection URL |
db.username | sa | User id for db connection |
db.password | - | Password for db connection
connection.properties.file | - | JDBC Connection property file path |
create.schema | true | If true, then creates db schema if not there |
create.index | true | Create indexes to speed up lookups |
create.foreignkey | true | |
transaction.isolation | READ_COMMITTED | Isolation level for db session READ_UNCOMMITTED, READ_COMMITTED, SERIALIZABLE, REPEATABLE_READ |
maximum.connections | 10 | Max connections allowed to db |
maximum.capacity | 0 (unlimited) | Max number of events in the channel |
sysprop.* | - | DB Vendor specific properties
sysprop.user.home | - | Home path to store embedded Derby database
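A sketch of a JDBC channel relying mostly on the defaults above; the channel name, capacity, and Derby home path are illustrative:

#hypothetical names; capacity and Derby home path are examples
agent-1.channels = jdbc-channel-1
agent-1.channels.jdbc-channel-1.type = jdbc
agent-1.channels.jdbc-channel-1.maximum.capacity = 100000
agent-1.channels.jdbc-channel-1.sysprop.user.home = /var/lib/flume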
Property Name | Default | Description |
type | - | The component type name, needs to be org.apache.flume.channel.recoverable.memory.RecoverableMemoryChannel |
wal.dataDir | ${user.home}/.flume/recoverable-memory-channel |
wal.rollSize | (0x04000000) | Max size (in bytes) of a single file before we roll |
wal.minRetentionPeriod | 300000 | Min amount of time (in millis) to keep a log |
wal.workerInterval | 60000 | How often (in millis) the background worker checks for old logs |
wal.maxLogsSize | (0x20000000) | Total amount (in bytes) of logs to keep, excluding the current log
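A sketch of a recoverable memory channel with an explicit WAL directory; the channel name and path are illustrative:

#hypothetical names; WAL directory is an example
agent-1.channels = rec-mem-channel-1
agent-1.channels.rec-mem-channel-1.type = org.apache.flume.channel.recoverable.memory.RecoverableMemoryChannel
agent-1.channels.rec-mem-channel-1.wal.dataDir = /var/lib/flume/wal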
Property Name | Default | Description |
type | - | The component type name, needs to be org.apache.flume.channel.file.FileChannel |
Property Name | Default | Description |
type | - | The component type name, needs to be org.apache.flume.channel.PseudoTxnMemoryChannel |
capacity | 50 | The max number of events stored in the channel |
keep-alive | 3 | Timeout in seconds for adding or removing an event |