Flume Installation and Deployment


I. Installation and Deployment

(1) Upload apache-flume-1.10.1-bin.tar.gz to the /opt/software directory on the Linux host.
(2) Extract apache-flume-1.10.1-bin.tar.gz into the /opt/module/ directory:

[xxx@hadoop102 software]$ tar -zxf /opt/software/apache-flume-1.10.1-bin.tar.gz -C /opt/module/

(3) Rename apache-flume-1.10.1-bin to flume:

[xxx@hadoop102 module]$ mv /opt/module/apache-flume-1.10.1-bin /opt/module/flume

(4) Edit the log4j2.xml configuration file in the conf directory to configure the log file path:

[xxx@hadoop102 conf]$ vim log4j2.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <!--
     Licensed to the Apache Software Foundation (ASF) under one or more
     contributor license agreements.  See the NOTICE file distributed with
     this work for additional information regarding copyright ownership.
     The ASF licenses this file to You under the Apache License, Version 2.0
     (the "License"); you may not use this file except in compliance with
     the License.  You may obtain a copy of the License at
    
         http://www.apache.org/licenses/LICENSE-2.0
    
     Unless required by applicable law or agreed to in writing, software
     distributed under the License is distributed on an "AS IS" BASIS,
     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     See the License for the specific language governing permissions and
     limitations under the License.
    -->
    <Configuration status="ERROR">
      <Properties>
        <Property name="LOG_DIR">/opt/module/flume/log</Property>
      </Properties>
      <Appenders>
        <Console name="Console" target="SYSTEM_ERR">
          <PatternLayout pattern="%d (%t) [%p - %l] %m%n" />
        </Console>
        <RollingFile name="LogFile" fileName="${LOG_DIR}/flume.log" filePattern="${LOG_DIR}/archive/flume.log.%d{yyyyMMdd}-%i">
          <PatternLayout pattern="%d{dd MMM yyyy HH:mm:ss,SSS} %-5p [%t] (%C.%M:%L) %equals{%x}{[]}{} - %m%n" />
          <Policies>
            <!-- Roll every night at midnight or when the file reaches 100MB -->
            <SizeBasedTriggeringPolicy size="100 MB"/>
            <CronTriggeringPolicy schedule="0 0 0 * * ?"/>
          </Policies>
          <DefaultRolloverStrategy min="1" max="20">
            <Delete basePath="${LOG_DIR}/archive">
              <!-- Nested conditions: the inner condition is only evaluated on files for which the outer conditions are true. -->
              <IfFileName glob="flume.log.*">
                <!-- Only allow 1 GB of files to accumulate -->
                <IfAccumulatedFileSize exceeds="1 GB"/>
              </IfFileName>
            </Delete>
          </DefaultRolloverStrategy>
        </RollingFile>
      </Appenders>
    
      <Loggers>
        <Logger name="org.apache.flume.lifecycle" level="info"/>
        <Logger name="org.jboss" level="WARN"/>
        <Logger name="org.apache.avro.ipc.netty.NettyTransceiver" level="WARN"/>
        <Logger name="org.apache.hadoop" level="INFO"/>
        <Logger name="org.apache.hadoop.hive" level="ERROR"/>
        <!-- Also send logs to the console, which makes them easier to watch while learning -->
        <Root level="INFO">
          <AppenderRef ref="LogFile" />
          <AppenderRef ref="Console" />
        </Root>
      </Loggers>
    </Configuration>
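
After saving the file, a quick way to confirm that the installation itself works is to print the Flume version (run from the Flume home directory):

[xxx@hadoop102 flume]$ bin/flume-ng version

This should report Flume 1.10.1.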

II. Flume Transport Configuration Files

1. Log Collection and Transport

(1) file_to_kafka.conf

    # Name the components on this agent
    a1.sources = r1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = TAILDIR
    a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json
    a1.sources.r1.filegroups = f1
    a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*
    
    
    # Describe the channel (a Kafka channel: events go straight into a Kafka topic, so no separate sink is needed)
    a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
    a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
    a1.channels.c1.kafka.topic = topic_log
    
    a1.channels.c1.parseAsFlumeEvent = false
    
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    
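
With this file saved (the job/ directory below is only an illustrative location; use wherever you keep your agent configs), the collection agent can be started like this:

[xxx@hadoop102 flume]$ bin/flume-ng agent -n a1 -c conf/ -f job/file_to_kafka.conf   # job/ path is illustrative

Here -n must match the agent name used in the file (a1), -c points at Flume's conf directory (so the log4j2.xml above is picked up), and -f points at the agent configuration file.
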
(2) kafka_to_hdfs_log.conf

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
    a1.sources.r1.batchSize = 5000
    a1.sources.r1.batchDurationMillis = 2000
    a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
    a1.sources.r1.kafka.topics = topic_log
    a1.sources.r1.kafka.consumer.group.id = flume1
    #a1.sources.r1.kafka.consumer.auto.offset.reset=earliest
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = com.atguigu.gmall.interceptor.TimeStampInterceptor$MyBuilder
    
    # Describe the sink
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
    a1.sinks.k1.hdfs.filePrefix = log
    a1.sinks.k1.hdfs.round = false
    
    a1.sinks.k1.hdfs.rollInterval = 10
    a1.sinks.k1.hdfs.rollSize = 134217728
    a1.sinks.k1.hdfs.rollCount = 0
    
    # Control the output file type (compressed stream with gzip codec)
    a1.sinks.k1.hdfs.fileType = CompressedStream
    a1.sinks.k1.hdfs.codeC = gzip
    # Use a file channel, which buffers events on disk
    a1.channels.c1.type = file
    a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
    a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1
    a1.channels.c1.maxFileSize = 2146435071
    a1.channels.c1.capacity = 1000000
    a1.channels.c1.keep-alive = 6
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

Note that the custom interceptor referenced above (TimeStampInterceptor) must be packaged as a JAR and uploaded to flume/lib before this agent is started.
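
For example, assuming the interceptor project builds a JAR named gmall-interceptor-1.0-SNAPSHOT.jar sitting in /opt/software (both the name and location are placeholders for your own build output):

[xxx@hadoop102 software]$ cp /opt/software/gmall-interceptor-1.0-SNAPSHOT.jar /opt/module/flume/lib/   # JAR name is a placeholder

Once the JAR is in place, this consumer agent can be started the same way as the collection agent, with -f pointing at kafka_to_hdfs_log.conf.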

2. Business Data Transport

kafka_to_hdfs_db.conf

    # Name the components on this agent
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1
    #Source
    a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
    a1.sources.r1.batchSize = 5000
    a1.sources.r1.batchDurationMillis = 2000
    a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092
    a1.sources.r1.kafka.topics = topic_db
    a1.sources.r1.kafka.consumer.group.id = flume1
    a1.sources.r1.kafka.consumer.auto.offset.reset=earliest
    a1.sources.r1.setTopicHeader = true
    a1.sources.r1.topicHeader = topic
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = com.atguigu.gmall.interceptor.TableNameAndTimeStampInterceptor$MyBuilder
    #channel
    a1.channels.c1.type = file
    a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior2
    a1.channels.c1.dataDirs = /opt/module/flume/data/behavior2/
    a1.channels.c1.maxFileSize = 2146435071
    a1.channels.c1.capacity = 1000000
    a1.channels.c1.keep-alive = 6
    
    ## sink
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = /origin_data/gmall/db/%{tableName}_inc/%Y-%m-%d
    a1.sinks.k1.hdfs.filePrefix = db
    a1.sinks.k1.hdfs.round = false
    a1.sinks.k1.hdfs.rollInterval = 10
    a1.sinks.k1.hdfs.rollSize = 134217728
    a1.sinks.k1.hdfs.rollCount = 0
    
    a1.sinks.k1.hdfs.fileType = CompressedStream
    a1.sinks.k1.hdfs.codeC = gzip
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    
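The same pattern applies here: the TableNameAndTimeStampInterceptor referenced in this file must also be packaged as a JAR and placed under flume/lib, after which the agent can be started with a command along these lines (the job/ path is again only an illustration):

[xxx@hadoop102 flume]$ bin/flume-ng agent -n a1 -c conf/ -f job/kafka_to_hdfs_db.conf   # job/ path is illustrative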
