
Managing BaroCollector, a Real-Time Log Collector for Detecting and Blocking Fraudulent Financial Transactions


1. Installing BaroCollector

 

To install BaroCollector, copy the flume-ng-jdbc-source-2.0.jar file generated by compiling the source into the $FLUME_HOME/lib directory, as follows.

 

[root] /home/flume-ng-sources/target > cp flume-ng-jdbc-source-2.0.jar $FLUME_HOME/lib/.
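
After the copy, it is worth confirming that the jar actually exists under the Flume library directory. A minimal check, assuming $FLUME_HOME is already set in the current shell:

[root] /home/flume-ng-sources/target > ls -l $FLUME_HOME/lib/flume-ng-jdbc-source-2.0.jar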

 

 

2. Setting Environment Variables

 

To start BaroCollector, the following environment variables must be defined in the flume-env.sh configuration file.

 

Variable          Description
FLUME_HOME        Directory where Apache Flume is installed
FLUME_CLASSPATH   Directory containing the Apache Flume libraries
JAVA_HOME         Directory where the JDK is installed
CLASSPATH         Classes referenced when compiling (javac) or running (java) Java programs
LANG              Locale for the language and character set to use
PATH              Must include $FLUME_HOME/bin and $JAVA_HOME/bin

 

[root] /home/apache-flume-1.6.0-bin/agent > vi flume-env.sh
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
 
# If this file is placed at FLUME_CONF_DIR/flume-env.sh, it will be sourced
# during Flume startup.
 
# Give Flume more memory and pre-allocate, enable remote monitoring via JMX
#JAVA_OPTS="-Xms100m -Xmx200m -Dcom.sun.management.jmxremote"
JAVA_OPTS="-XX:MaxDirectMemorySize=128m"
 
# Note that the Flume conf directory is always included in the classpath.
FLUME_HOME=/home/apache-flume-1.6.0-bin
FLUME_CLASSPATH=$FLUME_HOME/lib
 
# Java variables can be set here
JAVA_HOME=/usr/lib/jvm/jre-1.7.0-openjdk.x86_64
CLASSPATH=$CLASSPATH:$FLUME_CLASSPATH:$JAVA_HOME/lib:
 
# Environment variables can be set here.
LANG=ko_KR.euckr
#LANG=ko_KR.utf8
PATH=$PATH:$FLUME_HOME/bin:$JAVA_HOME/bin:/etc/alternatives
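
Since flume-env.sh is only sourced when Flume starts, the values can be sanity-checked by sourcing the file manually and echoing the variables. A minimal sketch, assuming the file is in the current directory:

[root] /home/apache-flume-1.6.0-bin/agent > . ./flume-env.sh && echo "FLUME_HOME=$FLUME_HOME" && $JAVA_HOME/bin/java -version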

 

 

3. Configuring the Log4j Properties

 

log4j is a Java-based logging utility used to record log messages from a running program; it is mainly used as a debugging tool.

 

Recent versions of log4j provide six log levels, ordered from highest to lowest: FATAL, ERROR, WARN, INFO, DEBUG, and TRACE. In the configuration file, a level can be assigned per target (per package, in Java), and only messages at that level or above are recorded.

 

[root] /home/apache-flume-1.6.0-bin/agent > vi log4j.properties
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#  http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#
 
# Define some default values that can be overridden by system properties.
#
# For testing, it may also be convenient to specify
# -Dflume.root.logger=DEBUG,console when launching flume.
 
#flume.root.logger=DEBUG,console
flume.root.logger=INFO,LOGFILE
flume.log.dir=./logs
flume.log.file=flume.log
 
log4j.logger.org.apache.flume.lifecycle = INFO
log4j.logger.org.jboss = WARN
log4j.logger.org.mortbay = INFO
log4j.logger.org.apache.avro.ipc.NettyTransceiver = WARN
log4j.logger.org.apache.hadoop = INFO
 
# Define the root logger to the system property "flume.root.logger".
log4j.rootLogger=${flume.root.logger}
 
 
# Stock log4j rolling file appender
# Default log rotation configuration
log4j.appender.LOGFILE=org.apache.log4j.RollingFileAppender
log4j.appender.LOGFILE.MaxFileSize=100MB
log4j.appender.LOGFILE.MaxBackupIndex=10
log4j.appender.LOGFILE.File=${flume.log.dir}/${flume.log.file}
log4j.appender.LOGFILE.layout=org.apache.log4j.PatternLayout
log4j.appender.LOGFILE.layout.ConversionPattern=%d{dd MMM yyyy HH:mm:ss,SSS} %-5p [%t] (%C.%M:%L) %x - %m%n
 
 
# Warning: If you enable the following appender it will fill up your disk if you don't have a cleanup job!
# This uses the updated rolling file appender from log4j-extras that supports a reliable time-based rolling policy.
# See http://logging.apache.org/log4j/companions/extras/apidocs/org/apache/log4j/rolling/TimeBasedRollingPolicy.html
# Add "DAILY" to flume.root.logger above if you want to use this
log4j.appender.DAILY=org.apache.log4j.rolling.RollingFileAppender
log4j.appender.DAILY.rollingPolicy=org.apache.log4j.rolling.TimeBasedRollingPolicy
log4j.appender.DAILY.rollingPolicy.ActiveFileName=${flume.log.dir}/${flume.log.file}
log4j.appender.DAILY.rollingPolicy.FileNamePattern=${flume.log.dir}/${flume.log.file}.%d{yyyy-MM-dd}
log4j.appender.DAILY.layout=org.apache.log4j.PatternLayout
log4j.appender.DAILY.layout.ConversionPattern=%d{dd MMM yyyy HH:mm:ss,SSS} %-5p [%t] (%C.%M:%L) %x - %m%n
 
 
# console
# Add "console" to flume.root.logger above if you want to use this
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d (%t) [%p - %l] %m%n
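
As the comments in the file note, DEBUG output can be sent to the terminal instead of the log file while troubleshooting, by overriding flume.root.logger at launch time. For example (using the agent name defined in section 4):

[root] /home/apache-flume-1.6.0-bin/agent > flume-ng agent -n agent_500 -c $FLUME_HOME/agent/ -f flume.conf -Dflume.root.logger=DEBUG,console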

 

 

4. Configuring the BaroCollector Properties

 

To use the BaroCollector JDBCSource, define the following properties in the flume.conf configuration file.

 

[root] /home/apache-flume-1.6.0-bin/agent > vi flume.conf
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#  http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
 
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'agent'
 
agent_500.sources = sql-source1
agent_500.channels = mem-channel1
agent_500.sinks = sql-sink1
 
# Describe/configure the source
agent_500.sources.sql-source1.type = org.apache.flume.source.JDBCSource
agent_500.sources.sql-source1.restartThrottle = 1000
agent_500.sources.sql-source1.restart = true
#agent_500.sources.sql-source1.restart = false
agent_500.sources.sql-source1.batchSize = 100
 
# URL to connect to database
agent_500.sources.sql-source1.source.jdbc.driver = oracle.jdbc.OracleDriver
agent_500.sources.sql-source1.source.connection.url = jdbc:oracle:thin:@160.61.8.105:1521:KOREA
 
# Database connection properties
#agent_500.sources.sql-source1.source.charset = euc-kr
agent_500.sources.sql-source1.source.charset = utf-8
agent_500.sources.sql-source1.source.user = FDSMON
agent_500.sources.sql-source1.source.password = fdsmon2015
 
# Columns to import to kafka (default * import entire row)
agent_500.sources.sql-source1.source.sql.statement = SELECT TRD_DT,TLGR_SN,TRD_DT || TLGR_SN AS UUID,TO_CHAR(WRK_DTM, 'YYYYMMDD') || TO_CHAR(SYSTIMESTAMP,'HH24MISSFF6') AS LOG_DTTM,'005' AS TASK_TYPE,NVL(WRK_EMP_NO, ' ') AS USER_ID,NVL(RCVG_FNS_CD, ' ') AS RCVG_FNS_CD,NVL(OUR_ANCO_TP_CD, ' ') AS OUR_ANCO_TP_CD,NVL(RCVG_BNK_BRCH_CD, ' ') AS RCVG_BNK_BRCH_CD,NVL(RCVG_ACNT_NO, ' ') AS RCVG_ACNT_NO,NVL(TRD_AMT, 0) AS TRD_AMT,NVL(RCVG_ACNT_NM, ' ') AS RCVG_ACNT_NM,NVL(OTAM_ACNT_NM, ' ') AS OTAM_ACNT_NM,NVL(OTAM_ACNT_NO, ' ') AS OTAM_ACNT_NO,NVL(BKNG_TRD_STAT_CD, ' ') AS BKNG_TRD_STAT_CD,NVL(CHNL_TP_CD, ' ') AS CHNL_TP_CD,NVL(MDM_CLSS_CD, ' ') AS MDM_CLSS_CD,NVL(ELEC_FN_MDM_TP_CD, ' ') AS ELEC_FN_MDM_TP_CD,NVL(ATM_MACH_NO, ' ') AS ATM_MACH_NO,NVL(WRK_TRM_IP_ADDR, ' ') AS IP_ADDR FROM TYS.BKTR25P WHERE TRD_DT >= ? AND TLGR_SN > ? AND BKNG_TRD_STAT_CD  = '02' AND ROWNUM < 100 ORDER BY TRD_DT ASC,TLGR_SN ASC
agent_500.sources.sql-source1.source.data.format = delimit
agent_500.sources.sql-source1.source.data.delimit = ,
agent_500.sources.sql-source1.source.data.charset =
#agent_500.sources.sql-source1.source.data.charset = euc-kr,ms949
 
# Increment column properties
agent_500.sources.sql-source1.source.incremental.column.name = TRD_DT,TLGR_SN
 
# Increment value from which to start taking data from the tables (0 will import the entire table)
agent_500.sources.sql-source1.source.incremental.value =
 
# Query delay: the query will be sent every configured number of milliseconds
agent_500.sources.sql-source1.source.run.query.delay = 1000
 
# The status file is used to save the last row read
agent_500.sources.sql-source1.source.status.file.path = ./status
agent_500.sources.sql-source1.source.status.file.name = sql-source1.status
 
# Static Interceptor
#agent_500.sources.sql-source1.interceptors = i1
#agent_500.sources.sql-source1.interceptors.i1.type = static
#agent_500.sources.sql-source1.interceptors.i1.key = state
#agent_500.sources.sql-source1.interceptors.i1.value = 005
 
# The channel can be defined as follows.
agent_500.sources.sql-source1.channels = mem-channel1
 
# Multiplexing Channel Selector
#agent_500.sources.sql-source1.selector.type = multiplexing
#agent_500.sources.sql-source1.selector.header = state
#agent_500.sources.sql-source1.selector.mapping.500 = mem-channel1
 
# Each sink's type must be defined
agent_500.sinks.sql-sink1.type = org.apache.flume.sink.SQLSink
 
# URL to connect to database
agent_500.sinks.sql-sink1.sink.jdbc.driver = sunje.sundb.jdbc.SundbDriver
agent_500.sinks.sql-sink1.sink.connection.url = jdbc:sundb://160.61.194.54:22581/test
 
# Database connection properties
#agent_500.sinks.sql-sink1.sink.charset = euc-kr
agent_500.sinks.sql-sink1.sink.charset = utf-8
agent_500.sinks.sql-sink1.sink.user = BAROFDS
agent_500.sinks.sql-sink1.sink.password = BAROFDS
 
# Data format properties(json, keyvalue, xml, delimit, string)
agent_500.sinks.sql-sink1.sink.data.format = delimit
 
# Data delimit properties(delimit)
agent_500.sinks.sql-sink1.sink.data.delimit = ,
 
# Data table properties
agent_500.sinks.sql-sink1.sink.data.table = NF_COLLECT_LOG_500
 
# SQL properties(xml, delimit, string)
agent_500.sinks.sql-sink1.sink.sql.statement = INSERT INTO NF_COLLECT_LOG_500 (UUID,LOG_DTTM,USER_ID,RCVG_FNS_CD,OUR_ANCO_TP_CD,RCVG_BNK_BRCH_CD,RCVG_ACNT_NO,TRD_AMT,RCVG_ACNT_NM,OTAM_ACNT_NM,OTAM_ACNT_NO,BKNG_TRD_STAT_CD,CHNL_TP_CD,MDM_CLASS_CD,ELEC_FN_MDM_TP_CD,ATM_MACH_NO,IP_ADDR) VALUES (?,?,?,?,?,?,?,TO_NUMBER(?),?,?,?,?,?,?,?,?,?)
agent_500.sinks.sql-sink1.sink.sql.dataseq = 2,3,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19
agent_500.sinks.sql-sink1.sink.rule.dataseq = 2,3,4
 
# Data charset properties
agent_500.sinks.sql-sink1.sink.data.charset =
 
# UnitRule properties(Y or N)
agent_500.sinks.sql-sink1.sink.unit.rule = Y
 
# UnitRule SQL statement properties
agent_500.sinks.sql-sink1.sink.unit.rule.statement = SELECT UNITRULE_CODE, UNITRULE_UNION_SQL FROM GF_UNIT_RULE WHERE TASK_TYPE = ? AND USE_YN = 'Y' ORDER BY UNITRULE_CODE ASC
 
# UnitRule code update properties
agent_500.sinks.sql-sink1.sink.unit.rule.code.update = UPDATE NF_COLLECT_LOG_{task_type} SET UNITRULE_CODE = DECODE(UNITRULE_CODE, NULL, ?, UNITRULE_CODE || ',' || ?) WHERE UUID = ? AND LOG_DTTM = ? AND (UNITRULE_CODE IS NULL OR UNITRULE_CODE NOT LIKE '%' || ? || '%')
 
# Rule set properties(Y or N)
agent_500.sinks.sql-sink1.sink.rule.set = Y
 
# Rule set detector SQL Statement properties
agent_500.sinks.sql-sink1.sink.rule.set.detect.stmt1 = SELECT A.USER_ID AS USER_ID, NVL(B.FINNAL_HANDLING, ' ') AS FINNAL_HANDLING FROM NF_COLLECT_LOG_{task_type} A, GF_FDS B WHERE A.UUID = ? AND A.LOG_DTTM = ? AND A.USER_ID > ' ' AND B.USER_ID(+) = A.USER_ID AND B.TRANS_DTTM(+) >= TO_CHAR(SYSDATE, 'YYYYMMDD') || '000000000000' AND B.TRANS_DTTM(+) <= TO_CHAR(SYSDATE, 'YYYYMMDD') || '240000000000' AND B.FINNAL_HANDLING(+) <> '001'
agent_500.sinks.sql-sink1.sink.rule.set.detect.stmt2 = SELECT UNITRULE_CODE FROM NF_COLLECT_LOG_100 WHERE USER_ID = ? AND LOG_DTTM >= TO_CHAR(SYSDATE, 'YYYYMMDD') || '000000000000' AND LOG_DTTM <= TO_CHAR(SYSDATE, 'YYYYMMDD') || '240000000000' AND UNITRULE_CODE > ' ' UNION ALL SELECT UNITRULE_CODE FROM NF_COLLECT_LOG_200 WHERE USER_ID = ? AND LOG_DTTM >= TO_CHAR(SYSDATE, 'YYYYMMDD') || '000000000000' AND LOG_DTTM <= TO_CHAR(SYSDATE, 'YYYYMMDD') || '240000000000' AND UNITRULE_CODE > ' ' UNION ALL SELECT UNITRULE_CODE FROM NF_COLLECT_LOG_300 WHERE USER_ID = ? AND LOG_DTTM >= TO_CHAR(SYSDATE, 'YYYYMMDD') || '000000000000' AND LOG_DTTM <= TO_CHAR(SYSDATE, 'YYYYMMDD') || '240000000000' AND UNITRULE_CODE > ' ' UNION ALL SELECT UNITRULE_CODE FROM NF_COLLECT_LOG_400 WHERE USER_ID = ? AND LOG_DTTM >= TO_CHAR(SYSDATE, 'YYYYMMDD') || '000000000000' AND LOG_DTTM <= TO_CHAR(SYSDATE, 'YYYYMMDD') || '240000000000' AND UNITRULE_CODE > ' ' UNION ALL SELECT UNITRULE_CODE FROM NF_COLLECT_LOG_500 WHERE USER_ID = ? AND LOG_DTTM >= TO_CHAR(SYSDATE, 'YYYYMMDD') || '000000000000' AND LOG_DTTM <= TO_CHAR(SYSDATE, 'YYYYMMDD') || '240000000000' AND UNITRULE_CODE > ' ' UNION ALL SELECT UNITRULE_CODE FROM NF_COLLECT_LOG_600 WHERE USER_ID = ? AND LOG_DTTM >= TO_CHAR(SYSDATE, 'YYYYMMDD') || '000000000000' AND LOG_DTTM <= TO_CHAR(SYSDATE, 'YYYYMMDD') || '240000000000' AND UNITRULE_CODE > ' '
agent_500.sinks.sql-sink1.sink.rule.set.detect.stmt3 = SELECT RULESET_CODE, UNITRULE_CODE FROM GF_RULE_SET WHERE USE_YN = 'Y' ORDER BY RULESET_CODE ASC
agent_500.sinks.sql-sink1.sink.rule.set.fds.stmt = INSERT INTO GF_FDS (TRANS_DTTM,USER_ID,RULESET_CODE,ACCIDENT_YN,MEASURE_YN,RCVG_ACNT_NM_YN,PAST_INFO_YN,FINNAL_HANDLING) SELECT TO_CHAR(SYSTIMESTAMP, 'YYYYMMDDHH24MISSFF6'),?,?,'Y','N','N','N','001' FROM DUAL WHERE NOT EXISTS (SELECT 'Y' FROM GF_FDS Z WHERE Z.USER_ID = ? AND Z.TRANS_DTTM >= TO_CHAR(SYSDATE, 'YYYYMMDD') || '000000000000' AND Z.TRANS_DTTM <= TO_CHAR(SYSDATE, 'YYYYMMDD') || '240000000000' AND Z.FINNAL_HANDLING = '001')
 
# Accidents registered in the banking system(Y or N)
agent_500.sinks.sql-sink1.sink.fraud.detect = N
 
# SMS, BankSystem properties
agent_500.sinks.sql-sink1.sink.sms.sql.statemet =
agent_500.sinks.sql-sink1.sink.cust.jdbc.driver = oracle.jdbc.OracleDriver
agent_500.sinks.sql-sink1.sink.cust.connection.url = jdbc:oracle:thin:@160.61.8.105:1521:KOREA
agent_500.sinks.sql-sink1.sink.cust.charset = utf-8
agent_500.sinks.sql-sink1.sink.cust.user = FDSMON
agent_500.sinks.sql-sink1.sink.cust.password = fdsmon2015
agent_500.sinks.sql-sink1.sink.cust.sql.statemet =
 
# Specify the channel the sink should use
agent_500.sinks.sql-sink1.channel = mem-channel1
 
# Each channel's type is defined.
#agent_500.channels.mem-channel1.type = memory
agent_500.channels.mem-channel1.type = file
agent_500.channels.mem-channel1.checkpointDir = ./checkpoint_500
agent_500.channels.mem-channel1.dataDirs = ./checkdata_500
 
# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
agent_500.channels.mem-channel1.capacity = 1080000
agent_500.channels.mem-channel1.transactionCapacity = 10000
agent_500.channels.mem-channel1.keep-alive = 3
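
Because the source saves the last row it has read in the status file (source.status.file.path and source.status.file.name above), re-collecting from the beginning generally means stopping the agent, removing the status file, and setting source.incremental.value back to the desired starting point before restarting. A sketch under that assumption, using the paths configured above:

[root] /home/apache-flume-1.6.0-bin/agent > rm ./status/sql-source1.status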

 

 

5. Starting BaroCollector

 

The startup.sh shell script that starts BaroCollector is as follows.

 

[root] /home/apache-flume-1.6.0-bin/agent > vi startup.sh
#!/bin/sh
 
#export FLUME_HOME=/home/apache-flume-1.6.0-bin;
#export JAVA_HOME=/usr/lib/jvm/jre-1.7.0-openjdk.x86_64;
 
#export CLASSPATH=$CLASSPATH:$FLUME_HOME/lib:$JAVA_HOME/lib
#export PATH=$PATH:$FLUME_HOME/bin:$JAVA_HOME/bin
 
flume-ng agent -n agent_500 -c $FLUME_HOME/agent/ -f flume.conf

 

To start BaroCollector, run the startup.sh shell script as a background process, as follows.

 

[root] /home/apache-flume-1.6.0-bin/agent > sh startup.sh &
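
If the collector should keep running after the terminal session closes, starting the script with nohup is a common alternative (our suggestion; the guide itself uses a plain background job):

[root] /home/apache-flume-1.6.0-bin/agent > nohup sh startup.sh > startup.out 2>&1 &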

 

To check whether BaroCollector is running, execute the following command.

 

[root] /home/apache-flume-1.6.0-bin/agent > ps -ef|grep flume | grep agent_500 | grep -v grep

 

This shows whether the BaroCollector process exists, as in the following output.

 

[root] /home/apache-flume-1.6.0-bin/agent > ps -ef|grep flume | grep agent_500 | grep -v grep
root      4666  4662 41 11:05 pts/1    00:00:01 /usr/lib/jvm/jre-1.7.0-openjdk.x86_64/bin/java -XX:MaxDirectMemorySize=128m -cp /home/apache-flume-1.6.0-bin/agent/spooldir:/home/apache-flume-1.6.0-bin/lib/*:/home/apache-flume-1.6.0-bin/lib:/lib/* -Djava.library.path= org.apache.flume.node.Application -n agent_500 -f flume.conf

 

 

6. Stopping BaroCollector

 

The shutdown.sh shell script that stops BaroCollector is as follows.

 

[root] /home/apache-flume-1.6.0-bin/agent > vi shutdown.sh
#!/bin/sh
 
#export FLUME_HOME=/home/apache-flume-1.6.0-bin;
#export JAVA_HOME=/usr/lib/jvm/jre-1.7.0-openjdk.x86_64;
 
#export CLASSPATH=$CLASSPATH:$FLUME_HOME/lib:$JAVA_HOME/lib
#export PATH=$PATH:$FLUME_HOME/bin:$JAVA_HOME/bin
 
# Find the agent_500 Flume process and force-kill it by PID
ps -ef|grep flume | grep agent_500 | grep -v grep |awk '{print "kill -9 "$2}'|sh -v

 

To stop BaroCollector, run the shutdown.sh shell script as follows.

 

[root] /home/apache-flume-1.6.0-bin/agent > sh shutdown.sh
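
Note that shutdown.sh uses kill -9, which terminates the JVM immediately. When a cleaner stop is preferred, sending a plain SIGTERM first gives Flume's shutdown hook a chance to stop its sources, channels, and sinks in order; a variant of the same pipeline (our suggestion, not part of the original script):

[root] /home/apache-flume-1.6.0-bin/agent > ps -ef | grep flume | grep agent_500 | grep -v grep | awk '{print "kill "$2}' | sh -v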

 

 

7. BaroCollector Logs

 

The BaroCollector log holds the messages generated while BaroCollector runs (INFO, WARN, ERROR) together with the messages written during collection; it is an important log for checking BaroCollector's state and for identifying the cause when a failure occurs.

 

[root] /home/apache-flume-1.6.0-bin/agent/logs > ls -al
total 20
drwxr-xr-x 2 root root  4096 12  4 11:05 .
drwxr-xr-x 6 root root  4096 12  4 11:04 ..
-rw-r--r-- 1 root root 12188 12  4 11:05 flume.log
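
While the agent is running, the log can also be followed in real time rather than opened in an editor:

[root] /home/apache-flume-1.6.0-bin/agent/logs > tail -f flume.log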

 

[root] /home/apache-flume-1.6.0-bin/agent/logs > vi flume.log
04 12 2015 11:05:19,976 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start:61)  - Configuration provider starting
04 12 2015 11:05:19,981 INFO  [conf-file-poller-0] (org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run:133)  - Reloading configuration file:flume.conf
04 12 2015 11:05:20,007 INFO  [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty:1017)  - Processing:avroSink1
04 12 2015 11:05:20,009 INFO  [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty:1017)  - Processing:avroSink1
04 12 2015 11:05:20,009 INFO  [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty:1017)  - Processing:avroSink1
04 12 2015 11:05:20,009 INFO  [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty:931)  - Added sinks: avroSink1 Agent: spooldir
04 12 2015 11:05:20,010 INFO  [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty:1017)  - Processing:avroSink1
04 12 2015 11:05:20,010 INFO  [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty:1017)  - Processing:avroSink1
04 12 2015 11:05:20,010 INFO  [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty:1017)  - Processing:avroSink1
04 12 2015 11:05:20,032 INFO  [conf-file-poller-0] (org.apache.flume.conf.FlumeConfiguration.validateConfiguration:141)  - Post-validation flume configuration contains configuration for agents: [spooldir]
04 12 2015 11:05:20,033 INFO  [conf-file-poller-0] (org.apache.flume.node.AbstractConfigurationProvider.loadChannels:145)  - Creating channels
04 12 2015 11:05:20,058 INFO  [conf-file-poller-0] (org.apache.flume.channel.DefaultChannelFactory.create:42)  - Creating instance of channel mem-channel1 type file
04 12 2015 11:05:20,102 INFO  [conf-file-poller-0] (org.apache.flume.node.AbstractConfigurationProvider.loadChannels:200)  - Created channel mem-channel1
04 12 2015 11:05:20,103 INFO  [conf-file-poller-0] (org.apache.flume.source.DefaultSourceFactory.create:41)  - Creating instance of source agent1, type spooldir
04 12 2015 11:05:20,132 INFO  [conf-file-poller-0] (org.apache.flume.interceptor.StaticInterceptor$Builder.build:133)  - Creating StaticInterceptor: preserveExisting=true,key=task_type,value=700
04 12 2015 11:05:20,133 INFO  [conf-file-poller-0] (org.apache.flume.sink.DefaultSinkFactory.create:42)  - Creating instance of sink: avroSink1, type: avro
04 12 2015 11:05:20,138 INFO  [conf-file-poller-0] (org.apache.flume.sink.AbstractRpcSink.configure:183)  - Connection reset is set to 0. Will not reset connection to next hop
04 12 2015 11:05:20,139 INFO  [conf-file-poller-0] (org.apache.flume.node.AbstractConfigurationProvider.getConfiguration:114)  - Channel mem-channel1 connected to [agent1, avroSink1]
04 12 2015 11:05:20,144 INFO  [conf-file-poller-0] (org.apache.flume.node.Application.startAllComponents:138)  - Starting new configuration:{ sourceRunners:{agent1=EventDrivenSourceRunner: { source:Spool Directory source agent1: { spoolDir: ./file } }} sinkRunners:{avroSink1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@7209d9af counterGroup:{ name:null counters:{} } }} channels:{mem-channel1=FileChannel mem-channel1 { dataDirs: [./checkdata1] }} }
04 12 2015 11:05:20,145 INFO  [conf-file-poller-0] (org.apache.flume.node.Application.startAllComponents:145)  - Starting Channel mem-channel1
04 12 2015 11:05:20,146 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.channel.file.FileChannel.start:273)  - Starting FileChannel mem-channel1 { dataDirs: [./checkdata1] }...
04 12 2015 11:05:20,186 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.channel.file.Log.<init>:344)  - Encryption is not enabled
04 12 2015 11:05:20,187 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.channel.file.Log.replay:392)  - Replay started
04 12 2015 11:05:20,209 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.channel.file.Log.replay:404)  - Found NextFileID 15, from [./checkdata1/log-14, ./checkdata1/log-15, ./checkdata1/log-12, ./checkdata1/log-13]
04 12 2015 11:05:20,226 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.channel.file.EventQueueBackingStoreFileV3.<init>:55)  - Starting up with ./checkpoint1/checkpoint and ./checkpoint1/checkpoint.meta
04 12 2015 11:05:20,226 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.channel.file.EventQueueBackingStoreFileV3.<init>:59)  - Reading checkpoint metadata from ./checkpoint1/checkpoint.meta
04 12 2015 11:05:20,455 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.channel.file.FlumeEventQueue.<init>:116)  - QueueSet population inserting 0 took 0
04 12 2015 11:05:20,496 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.channel.file.Log.replay:443)  - Last Checkpoint Thu Nov 26 11:19:29 KST 2015, queue depth = 0
04 12 2015 11:05:20,502 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.channel.file.Log.doReplay:528)  - Replaying logs with v2 replay logic
04 12 2015 11:05:20,533 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.channel.file.ReplayHandler.replayLog:249)  - Starting replay of [./checkdata1/log-12, ./checkdata1/log-13, ./checkdata1/log-14, ./checkdata1/log-15]
04 12 2015 11:05:20,546 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.channel.file.ReplayHandler.replayLog:262)  - Replaying ./checkdata1/log-12
04 12 2015 11:05:20,586 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.tools.DirectMemoryUtils.getDefaultDirectMemorySize:114)  - Unable to get maxDirectMemory from VM: NoSuchMethodException: sun.misc.VM.maxDirectMemory(null)
04 12 2015 11:05:20,593 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.tools.DirectMemoryUtils.allocate:48)  - Direct Memory Allocation:  Allocation = 1048576, Allocated = 0, MaxDirectMemorySize = 134217728, Remaining = 134217728
04 12 2015 11:05:20,620 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.channel.file.LogFile$SequentialReader.skipToLastCheckpointPosition:632)  - fast-forward to checkpoint position: 6610
04 12 2015 11:05:20,628 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.channel.file.LogFile$SequentialReader.next:657)  - Encountered EOF at 6610 in ./checkdata1/log-12
04 12 2015 11:05:20,628 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.channel.file.ReplayHandler.replayLog:262)  - Replaying ./checkdata1/log-13
04 12 2015 11:05:20,629 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.channel.file.LogFile$SequentialReader.skipToLastCheckpointPosition:632)  - fast-forward to checkpoint position: 10491
04 12 2015 11:05:20,638 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.channel.file.LogFile$SequentialReader.next:657)  - Encountered EOF at 10491 in ./checkdata1/log-13
04 12 2015 11:05:20,638 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.channel.file.ReplayHandler.replayLog:262)  - Replaying ./checkdata1/log-14
04 12 2015 11:05:20,646 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.channel.file.LogFile$SequentialReader.skipToLastCheckpointPosition:634)  - Checkpoint for file(/home/apache-flume-1.6.0-bin/agent/spooldir/./checkdata1/log-14) is: 1448504287314, which is beyond the requested checkpoint time: 1448504339289 and position 0
04 12 2015 11:05:20,660 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.channel.file.ReplayHandler.replayLog:262)  - Replaying ./checkdata1/log-15
04 12 2015 11:05:20,664 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.channel.file.LogFile$SequentialReader.skipToLastCheckpointPosition:632)  - fast-forward to checkpoint position: 3417
04 12 2015 11:05:20,672 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.channel.file.LogFile$SequentialReader.next:657)  - Encountered EOF at 3417 in ./checkdata1/log-15
04 12 2015 11:05:20,672 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.channel.file.LogFile$SequentialReader.next:657)  - Encountered EOF at 155 in ./checkdata1/log-14
04 12 2015 11:05:20,673 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.channel.file.ReplayHandler.replayLog:346)  - read: 5, put: 0, take: 0, rollback: 0, commit: 0, skip: 5, eventCount:0
04 12 2015 11:05:20,673 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.channel.file.FlumeEventQueue.replayComplete:409)  - Search Count = 0, Search Time = 0, Copy Count = 0, Copy Time = 0
04 12 2015 11:05:20,702 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.channel.file.Log.replay:491)  - Rolling ./checkdata1
04 12 2015 11:05:20,702 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.channel.file.Log.roll:961)  - Roll start ./checkdata1
04 12 2015 11:05:20,705 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.channel.file.LogFile$Writer.<init>:214)  - Opened ./checkdata1/log-16
04 12 2015 11:05:20,746 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.channel.file.Log.roll:977)  - Roll end
04 12 2015 11:05:20,747 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.channel.file.EventQueueBackingStoreFile.beginCheckpoint:230)  - Start checkpoint for ./checkpoint1/checkpoint, elements to sync = 0
04 12 2015 11:05:20,772 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.channel.file.EventQueueBackingStoreFile.checkpoint:255)  - Updating checkpoint metadata: logWriteOrderID: 1449194720258, queueSize: 0, queueHead: 731
04 12 2015 11:05:20,836 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.channel.file.Log.writeCheckpoint:1034)  - Updated checkpoint for file: ./checkdata1/log-16 position: 0 logWriteOrderID: 1449194720258
04 12 2015 11:05:20,836 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.channel.file.FileChannel.start:301)  - Queue Size after replay: 0 [channel=mem-channel1]
04 12 2015 11:05:20,886 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.instrumentation.MonitoredCounterGroup.register:120)  - Monitored counter group for type: CHANNEL, name: mem-channel1: Successfully registered new MBean.
04 12 2015 11:05:20,886 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.instrumentation.MonitoredCounterGroup.start:96)  - Component type: CHANNEL, name: mem-channel1 started
04 12 2015 11:05:20,886 INFO  [conf-file-poller-0] (org.apache.flume.node.Application.startAllComponents:173)  - Starting Sink avroSink1
04 12 2015 11:05:20,886 INFO  [lifecycleSupervisor-1-3] (org.apache.flume.sink.AbstractRpcSink.start:289)  - Starting RpcSink avroSink1 { host: 1.234.83.169, port: 61616 }...
04 12 2015 11:05:20,886 INFO  [conf-file-poller-0] (org.apache.flume.node.Application.startAllComponents:184)  - Starting Source agent1
04 12 2015 11:05:20,887 INFO  [lifecycleSupervisor-1-4] (org.apache.flume.source.SpoolDirectorySource.start:78)  - SpoolDirectorySource source starting with directory: ./file
04 12 2015 11:05:20,887 INFO  [lifecycleSupervisor-1-3] (org.apache.flume.instrumentation.MonitoredCounterGroup.register:120)  - Monitored counter group for type: SINK, name: avroSink1: Successfully registered new MBean.
04 12 2015 11:05:20,887 INFO  [lifecycleSupervisor-1-3] (org.apache.flume.instrumentation.MonitoredCounterGroup.start:96)  - Component type: SINK, name: avroSink1 started
04 12 2015 11:05:20,887 INFO  [lifecycleSupervisor-1-3] (org.apache.flume.sink.AbstractRpcSink.createConnection:206)  - Rpc sink avroSink1: Building RpcClient with hostname: 1.234.83.169, port: 61616
04 12 2015 11:05:20,888 INFO  [lifecycleSupervisor-1-3] (org.apache.flume.sink.AvroSink.initializeRpcClient:126)  - Attempting to create Avro Rpc client.
04 12 2015 11:05:21,214 INFO  [lifecycleSupervisor-1-4] (org.apache.flume.instrumentation.MonitoredCounterGroup.register:120)  - Monitored counter group for type: SOURCE, name: agent1: Successfully registered new MBean.
04 12 2015 11:05:21,214 INFO  [lifecycleSupervisor-1-4] (org.apache.flume.instrumentation.MonitoredCounterGroup.start:96)  - Component type: SOURCE, name: agent1 started
04 12 2015 11:05:21,441 INFO  [lifecycleSupervisor-1-3] (org.apache.flume.sink.AbstractRpcSink.start:303)  - Rpc sink avroSink1 started.