# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License. # The configuration file needs to define the sources, # the channels and the sinks.# Sources, channels and sinks are defined per agent, # in this case called 'agent'#new method ,TAILDIR
logcollect.sources = taildir-sourcelogcollect.sinks = userOperLogSink userBehaviorSinklogcollect.channels = userOperLogChannel userBehaviorChannel# Describe/configure the source
logcollect.sources.taildir-source.type = TAILDIRlogcollect.sources.taildir-source.channels =userOperLogChannel userBehaviorChannellogcollect.sources.taildir-source.channels.skipToEnd = Truelogcollect.sources.taildir-source.positionFile = /home/logcollect/flume/taildir_position.json# throught Space-separated list file dir which will been taillogcollect.sources.taildir-source.filegroups = f1 logcollect.sources.taildir-source.kafka.consumer.timeout.ms = 60000logcollect.sources.taildir-source.filegroups.f1 = /home/logcollect/server/log/.*.log
logcollect.sources.taildir-source.headers.f1.headerKey1 = value1#logcollect.sources.taildir-source.filegroups.f2 = /usr/local/tomcat/logs/gi/gi.log#logcollect.sources.taildir-source.headers.f2.headerKey1 =value2#logcollect.sources.taildir-source.headers.f2.headerKey2 =value2-2logcollect.sources.taildir-source.fileHeader = true # interceptor# 拦截器logcollect.sources.taildir-source.interceptors = interceptor# 拦截器类型(正则) regex_extractorlogcollect.sources.taildir-source.interceptors.interceptor.type = regex_extractor# 接收日志中包含KCAELBDDCYOXSLGR或DXADXHZWEXTGCFQB的数据logcollect.sources.taildir-source.interceptors.interceptor.regex = .*(userOperLog|UserBehaviorLog).*logcollect.sources.taildir-source.interceptors.interceptor.serializers = s1logcollect.sources.taildir-source.interceptors.interceptor.serializers.s1.name = key# selector
# 选择器logcollect.sources.taildir-source.selector.type = multiplexinglogcollect.sources.taildir-source.selector.header = key# 日志含有KCAELBDDCYOXSLGR的写入到 risk-channel 通道logcollect.sources.taildir-source.selector.mapping.userOperLog = userOperLogChannel# 日志含有DXADXHZWEXTGCFQB的写入到 user-channel 通道logcollect.sources.taildir-source.selector.mapping.UserBehaviorLog= userBehaviorChannel
# Describe/configure the sink
# sink 类型logcollect.sinks.userOperLogSink.type = org.apache.flume.sink.kafka.KafkaSink# kafka topiclogcollect.sinks.userOperLogSink.topic = user_oper_loglogcollect.sinks.userOperLogSink.brokerList = 10.168.79.166:9092,10.168.30.114:9092,10.168.92.222:9092logcollect.sinks.userOperLogSink.batchSize = 200logcollect.sinks.userOperLogSink.requiredAcks = 1# sink 对应的 channellogcollect.sinks.userOperLogSink.channel = userOperLogChannel# user-sinklogcollect.sinks.userBehaviorSink.type = org.apache.flume.sink.kafka.KafkaSinklogcollect.sinks.userBehaviorSink.topic = user_behavior_loglogcollect.sinks.userBehaviorSink.brokerList = 10.168.79.166:9092,10.168.30.114:9092,10.168.92.222:9092logcollect.sinks.userBehaviorSink.batchSize = 200logcollect.sinks.userBehaviorSink.requiredAcks = 1# sink 对应的 channellogcollect.sinks.userBehaviorSink.channel = userBehaviorChannel # Use a channel which buffers events in memory# Describe/configure the channel# risk-channel# channel 类型为 memory 内存logcollect.channels.userOperLogChannel.type = memory# 存储在通道中的最大事件数logcollect.channels.userOperLogChannel.capacity=10000# 字节缓冲区百分比和信道中所有事件的估计总大小logcollect.channels.userOperLogChannel.byteCapacityBufferPercentage=2000# user-channellogcollect.channels.userBehaviorChannel.type = memorylogcollect.channels.userBehaviorChannel.capacity=10000logcollect.channels.userBehaviorChannel.byteCapacityBufferPercentage=2000