Flume 1.7.0 Installation and Examples
Installing Flume
System requirements:
JDK 1.7 or later must be installed.
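A quick way to check the JDK requirement before installing is to look at the version string printed by java -version. The following is only a sketch: the helper name java_major is made up for this illustration, and it assumes the usual version formats such as 1.7.0_80 or 11.0.2.

```shell
# Hypothetical helper: extract the major Java version from a version string.
java_major() {
  v=$1
  case "$v" in
    1.*) v=${v#1.} ;;        # legacy scheme (1.7, 1.8): drop the leading "1."
  esac
  echo "${v%%[._-]*}"        # keep everything before the first separator
}

# Only query the JVM if a java binary is on the PATH.
if command -v java >/dev/null 2>&1; then
  # java -version writes to stderr; the version is the first quoted string.
  ver=$(java -version 2>&1 | awk -F'"' '/version/ {print $2; exit}')
  major=$(java_major "$ver")
  if [ -n "$major" ] && [ "$major" -ge 7 ]; then
    echo "JDK $ver is new enough for Flume 1.7.0"
  else
    echo "JDK '$ver' does not look like 1.7 or later"
  fi
fi
```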
1. Download the binary package from http://flume.apache.org/download.html. Version 1.7.0 is available at: http://www.apache.org/dyn/closer.lua/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz.
2. Extract the archive
$ cp ~/Downloads/apache-flume-1.7.0-bin.tar.gz ~
$ cd
$ tar -zxvf apache-flume-1.7.0-bin.tar.gz
$ cd apache-flume-1.7.0-bin
3. Create the flume-env.sh file
$ cp conf/flume-env.sh.template conf/flume-env.sh
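Simple example: sending a specified file
Two machines are involved: one acts as the client and the other as the agent. The client sends the specified file to the agent.
1. Create the configuration file
Create the flume.conf configuration file from the template that ships with Flume.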
$ cp conf/flume-conf.properties.template conf/flume.conf
Edit flume.conf:
$ vi conf/flume.conf
Append the following configuration to the end of the file:
# Define a memory channel called ch1 on agent1
agent1.channels.ch1.type = memory
# Define an Avro source called avro-source1 on agent1 and tell it
# to bind to 0.0.0.0:41414. Connect it to channel ch1.
agent1.sources.avro-source1.channels = ch1
agent1.sources.avro-source1.type = avro
agent1.sources.avro-source1.bind = 0.0.0.0
agent1.sources.avro-source1.port = 41414
# Define a logger sink that simply logs all events it receives
# and connect it to the other end of the same channel.
agent1.sinks.log-sink1.channel = ch1
agent1.sinks.log-sink1.type = logger
# Finally, now that we've defined all of our components, tell
# agent1 which ones we want to activate.
agent1.channels = ch1
agent1.sources = avro-source1
agent1.sinks = log-sink1
Save the file and exit.
2. Start the Flume server
On the machine acting as the agent, run:
$ bin/flume-ng agent --conf ./conf/ -f conf/flume.conf -Dflume.root.logger=DEBUG,console -n agent1
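3. Open a new terminal window
On the client machine, run the following command.
(In this setup a single computer plays both roles, so running the command in a new terminal is enough.)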
$ bin/flume-ng avro-client --conf conf -H localhost -p 41414 -F /etc/passwd -Dflume.root.logger=DEBUG,console
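To make the result easier to verify, a small file with a known number of lines can be sent instead of /etc/passwd. The sketch below assumes, as far as I understand the avro-client, that each line of the file becomes one event; the sample file and its contents are made up for this illustration.

```shell
# Create a hypothetical three-line sample file.
sample=$(mktemp)
printf 'first line\nsecond line\nthird line\n' > "$sample"

# The number of lines is the number of events we expect the agent to log.
expected=$(wc -l < "$sample" | tr -d ' ')
echo "expecting $expected events"

# Send the file only when run from the Flume installation directory.
if [ -x bin/flume-ng ]; then
  bin/flume-ng avro-client --conf conf -H localhost -p 41414 -F "$sample"
fi
```

If the assumption holds, the agent window should show the same number of Event lines from the logger sink.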
4. Result
The client prints messages like the following:
2012-03-16 16:39:17,124 (main) [DEBUG - org.apache.flume.client.avro.AvroCLIClient.run(AvroCLIClient.java:175)] Finished
2012-03-16 16:39:17,127 (main) [DEBUG - org.apache.flume.client.avro.AvroCLIClient.run(AvroCLIClient.java:178)] Closing reader
2012-03-16 16:39:17,127 (main) [DEBUG - org.apache.flume.client.avro.AvroCLIClient.run(AvroCLIClient.java:183)] Closing transceiver
2012-03-16 16:39:17,129 (main) [DEBUG - org.apache.flume.client.avro.AvroCLIClient.main(AvroCLIClient.java:73)] Exiting
In the window where the Flume server was started, you see messages like the following:
2012-03-16 16:39:16,738 (New I/O server boss #1 ([id: 0x49e808ca, /0:0:0:0:0:0:0:0:41414])) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:123)] [id: 0x0b92a848, /127.0.0.1:39577 => /127.0.0.1:41414] OPEN
2012-03-16 16:39:16,742 (New I/O server worker #1-1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:123)] [id: 0x0b92a848, /127.0.0.1:39577 => /127.0.0.1:41414] BOUND: /127.0.0.1:41414
2012-03-16 16:39:16,742 (New I/O server worker #1-1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:123)] [id: 0x0b92a848, /127.0.0.1:39577 => /127.0.0.1:41414] CONNECTED: /127.0.0.1:39577
2012-03-16 16:39:17,129 (New I/O server worker #1-1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:123)] [id: 0x0b92a848, /127.0.0.1:39577 :> /127.0.0.1:41414] DISCONNECTED
2012-03-16 16:39:17,129 (New I/O server worker #1-1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:123)] [id: 0x0b92a848, /127.0.0.1:39577 :> /127.0.0.1:41414] UNBOUND
2012-03-16 16:39:17,129 (New I/O server worker #1-1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:123)] [id: 0x0b92a848, /127.0.0.1:39577 :> /127.0.0.1:41414] CLOSED
2012-03-16 16:39:17,302 (Thread-1) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:68)] Event: { headers:{} body:[B@5c1ae90c }
2012-03-16 16:39:17,302 (Thread-1) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:68)] Event: { headers:{} body:[B@6aba4211 }
2012-03-16 16:39:17,302 (Thread-1) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:68)] Event: { headers:{} body:[B@6a47a0d4 }
2012-03-16 16:39:17,302 (Thread-1) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:68)] Event: { headers:{} body:[B@48ff4cf }
...
Simple example: uploading files in a directory to HDFS
Scenario: upload the files in a directory on the local machine to HDFS.
1. Configure conf/flume.conf
# Define a memory channel called ch1 on agent1
agent1.channels.ch1.type = memory
# Define a spooling directory source called spooldir-source1 on agent1 and tell it
# to watch /home/hadoop/flume-1.7.0/tmpData. Connect it to channel ch1.
agent1.sources.spooldir-source1.channels = ch1
agent1.sources.spooldir-source1.type = spooldir
agent1.sources.spooldir-source1.spoolDir=/home/hadoop/flume-1.7.0/tmpData
# Define an HDFS sink that writes all events it receives to HDFS
# and connect it to the other end of the same channel.
agent1.sinks.hdfs-sink1.channel = ch1
agent1.sinks.hdfs-sink1.type = hdfs
agent1.sinks.hdfs-sink1.hdfs.path = hdfs://master:9000/test
agent1.sinks.hdfs-sink1.hdfs.filePrefix = events-
agent1.sinks.hdfs-sink1.hdfs.useLocalTimeStamp = true
agent1.sinks.hdfs-sink1.hdfs.round = true
agent1.sinks.hdfs-sink1.hdfs.roundValue = 10
# Finally, now that we've defined all of our components, tell
# agent1 which ones we want to activate.
agent1.channels = ch1
agent1.sources = spooldir-source1
agent1.sinks = hdfs-sink1
/home/hadoop/flume-1.7.0/tmpData is my upload directory: every file placed in it is uploaded to hdfs://master:9000/test on HDFS.
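The Flume user guide asks that files in a spooling directory are not modified after they are placed there and that file names are not reused. One way to respect this is to finish writing a file elsewhere and then move it in. The sketch below illustrates the idea; the helper name and the staging directory are hypothetical, the staging directory should be on the same filesystem as the spooling directory so that mv is a plain rename, and .COMPLETED is assumed to be the suffix Flume appends to processed files (its documented default).

```shell
# Hypothetical helper: copy a finished file into a staging directory first,
# then move it into the spooling directory in one step.
# Usage: deliver_to_spool <file> <staging-dir> <spool-dir>
deliver_to_spool() {
  src=$1; staging=$2; spool=$3
  name=$(basename "$src")
  # Refuse to reuse a name that is pending or already processed.
  if [ -e "$spool/$name" ] || [ -e "$spool/$name.COMPLETED" ]; then
    echo "refusing to reuse file name: $name" >&2
    return 1
  fi
  cp "$src" "$staging/$name" && mv "$staging/$name" "$spool/$name"
}
```

For example: deliver_to_spool /var/log/app/app.log /home/hadoop/flume-1.7.0/staging /home/hadoop/flume-1.7.0/tmpData (the first two paths are placeholders).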
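Notes:
- This configuration produces many small files, because by default a file holds 10 events. This is controlled by rollCount, whose default is 10. A second parameter, rollSize, limits the file size: once a file grows beyond this value, a new file is started.
- With this configuration every file name starts with events-. To keep the original file name, use the following configuration (basenameHeader belongs to the source and filePrefix to the sink; with both set, files uploaded to HDFS are named "original-file-name.timestamp"):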
agent1.sources.spooldir-source1.basenameHeader = true
agent1.sinks.hdfs-sink1.hdfs.filePrefix = %{basename}
agent1.sinks.hdfs-sink1.hdfs.rollSize = 0
agent1.sinks.hdfs-sink1.hdfs.rollCount = 0
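Besides rollCount and rollSize, the HDFS sink also has a time-based setting, hdfs.rollInterval, which the user guide lists with a default of 30 seconds. With rollSize and rollCount both set to 0 as above, that interval is the only remaining roll trigger. As an alternative, the sketch below rolls on size only and closes idle files after a timeout. The helper name and all values are illustrative assumptions, not recommendations; use these lines in place of the rollSize and rollCount lines above.

```shell
# Hypothetical helper: append size-based roll settings for hdfs-sink1
# to a Flume configuration file.
append_roll_settings() {
  cat >> "$1" <<'EOF'
# Roll on size only (values are illustrative assumptions).
agent1.sinks.hdfs-sink1.hdfs.rollCount = 0
agent1.sinks.hdfs-sink1.hdfs.rollInterval = 0
agent1.sinks.hdfs-sink1.hdfs.rollSize = 134217728
# Close a file after 60 seconds without new events.
agent1.sinks.hdfs-sink1.hdfs.idleTimeout = 60
EOF
}
```

Calling append_roll_settings conf/flume.conf adds the four properties to the end of the file.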
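2. Start the agent
Start the agent with the following command:
$ bin/flume-ng agent --conf ./conf/ -f ./conf/flume.conf --name agent1 -Dflume.root.logger=DEBUG,console
3. Check the result
The Hadoop web UI shows the status of the uploaded files.
The UI address is: http://master:50070/explorer.html#/test
Here master is the host name of the server running the Hadoop NameNode.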
In this scenario, uploading files to HDFS requires several Hadoop jars on Flume's classpath:
${HADOOP_HOME}/share/hadoop/common/hadoop-common-2.4.0.jar
${HADOOP_HOME}/share/hadoop/common/lib/commons-configuration-1.6.jar
${HADOOP_HOME}/share/hadoop/common/lib/hadoop-auth-2.4.0.jar
${HADOOP_HOME}/share/hadoop/hdfs/hadoop-hdfs-2.4.0.jar
With Hadoop 2.7 from HDP, the following jars were also needed:
commons-io-2.4.jar
htrace-core-3.1.0-incubating.jar
All of these jars can be found in Hadoop's lib directories.
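The four Hadoop 2.4.0 jars can be copied in one pass instead of one at a time as each exception appears. The following is a minimal sketch: the helper name is hypothetical, and the jar versions are the ones quoted in this article, so they have to be adjusted to the Hadoop version actually installed.

```shell
# Hypothetical helper: copy the Hadoop jars listed above into Flume's lib directory.
# Usage: copy_hadoop_jars <hadoop-home> <flume-home>
# Returns the number of jars that could not be found (0 means success).
copy_hadoop_jars() {
  hadoop_home=$1; flume_home=$2; missing=0
  for jar in \
    share/hadoop/common/hadoop-common-2.4.0.jar \
    share/hadoop/common/lib/commons-configuration-1.6.jar \
    share/hadoop/common/lib/hadoop-auth-2.4.0.jar \
    share/hadoop/hdfs/hadoop-hdfs-2.4.0.jar
  do
    if [ -f "$hadoop_home/$jar" ]; then
      cp "$hadoop_home/$jar" "$flume_home/lib/"
    else
      echo "not found: $hadoop_home/$jar" >&2
      missing=$((missing + 1))
    fi
  done
  return "$missing"
}
```

A typical call would be copy_hadoop_jars "$HADOOP_HOME" "$FLUME_HOME", followed by a restart of the agent.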
Exceptions
Failed to start agent because dependencies were not found in classpath: java.lang.NoClassDefFoundError: org/apache/hadoop/io/SequenceFile$CompressionType
2016-11-03 14:49:35,278 (conf-file-poller-0) [ERROR - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:146)] Failed to start agent because dependencies were not found in classpath. Error follows.
java.lang.NoClassDefFoundError: org/apache/hadoop/io/SequenceFile$CompressionType
Cause: a dependency is missing, namely the following jar:
${HADOOP_HOME}/share/hadoop/common/hadoop-common-2.4.0.jar
Solution: copy this jar into the lib directory under the Flume installation path.
java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
2016-11-03 16:32:06,741 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:447)] process failed
java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:256)
at org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:465)
at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:368)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
at java.lang.Thread.run(Thread.java:745)
Solution: add the following line to conf/flume.conf, replacing agent1 and sink1 with the names of your own agent and sink.
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true
java.lang.NoClassDefFoundError: org.apache.commons.configuration.Configuration
2016-11-03 16:32:55,594 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:447)] process failed
java.lang.NoClassDefFoundError: org/apache/commons/configuration/Configuration
at org.apache.hadoop.metrics2.lib.DefaultMetricsSystem.<init>(DefaultMetricsSystem.java:38)
at org.apache.hadoop.metrics2.lib.DefaultMetricsSystem.<clinit>(DefaultMetricsSystem.java:36)
at org.apache.hadoop.security.UserGroupInformation$UgiMetrics.create(UserGroupInformation.java:106)
at org.apache.hadoop.security.UserGroupInformation.<clinit>(UserGroupInformation.java:208)
at org.apache.hadoop.fs.FileSystem$Cache$Key.<init>(FileSystem.java:2554)
at org.apache.hadoop.fs.FileSystem$Cache$Key.<init>(FileSystem.java:2546)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2412)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:368)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:240)
at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:232)
at org.apache.flume.sink.hdfs.BucketWriter$9$1.run(BucketWriter.java:668)
at org.apache.flume.auth.SimpleAuthenticator.execute(SimpleAuthenticator.java:50)
at org.apache.flume.sink.hdfs.BucketWriter$9.call(BucketWriter.java:665)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: org.apache.commons.configuration.Configuration
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 18 more
Solution: the missing class is in the com....jar package under ${HADOOP_HOME}/share/hadoop/common/lib/. Copy that jar into Flume's lib directory:
cp ${HADOOP_HOME}/share/hadoop/common/lib/commons-configuration-1.6.jar ${FLUME_HOME}/lib/
java.lang.NoClassDefFoundError: org/apache/hadoop/util/PlatformName
2016-11-03 16:41:54,629 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:447)] process failed
java.lang.NoClassDefFoundError: org/apache/hadoop/util/PlatformName
Solution:
The hadoop-auth-2.4.0.jar dependency is missing. Copy it into Flume's lib directory as well:
cp ${HADOOP_HOME}/share/hadoop/common/lib/hadoop-auth-2.4.0.jar ${FLUME_HOME}/lib/
HDFS IO error java.io.IOException: No FileSystem for scheme: hdfs
2016-11-03 16:49:26,638 (SinkRunner-PollingRunner-DefaultSinkProcessor) [WARN - org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:443)] HDFS IO error
java.io.IOException: No FileSystem for scheme: hdfs
Missing dependency: hadoop-hdfs-2.4.0.jar
cp ${HADOOP_HOME}/share/hadoop/hdfs/hadoop-hdfs-2.4.0.jar ${FLUME_HOME}/lib/
java.lang.NoClassDefFoundError: org/apache/htrace/SamplerBuilder
2016-12-26 09:49:07,854 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:447)] process failed
java.lang.NoClassDefFoundError: org/apache/htrace/SamplerBuilder
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:645)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:629)
at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:159)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2761)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:99)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2795)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2777)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:386)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:240)
at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:232)
at org.apache.flume.sink.hdfs.BucketWriter$9$1.run(BucketWriter.java:668)
at org.apache.flume.auth.SimpleAuthenticator.execute(SimpleAuthenticator.java:50)
at org.apache.flume.sink.hdfs.BucketWriter$9.call(BucketWriter.java:665)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: org.apache.htrace.SamplerBuilder
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 18 more
Exception in thread "SinkRunner-PollingRunner-DefaultSinkProcessor" java.lang.NoClassDefFoundError: org/apache/htrace/SamplerBuilder
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:645)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:629)
at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:159)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2761)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:99)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2795)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2777)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:386)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:240)
at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:232)
at org.apache.flume.sink.hdfs.BucketWriter$9$1.run(BucketWriter.java:668)
at org.apache.flume.auth.SimpleAuthenticator.execute(SimpleAuthenticator.java:50)
at org.apache.flume.sink.hdfs.BucketWriter$9.call(BucketWriter.java:665)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: org.apache.htrace.SamplerBuilder
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 18 more
Solution: copy htrace-core-3.1.0-incubating.jar into ${FLUME_HOME}/lib/. This jar can also be found in the lib directories of the Hadoop installation.
java.lang.NoClassDefFoundError: org/apache/commons/io/Charsets
2016-12-26 10:15:36,190 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:447)] process failed
java.lang.NoClassDefFoundError: org/apache/commons/io/Charsets
at org.apache.hadoop.ipc.Server.<clinit>(Server.java:221)
at org.apache.hadoop.ipc.ProtobufRpcEngine.<clinit>(ProtobufRpcEngine.java:71)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:2147)
at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2112)
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2206)
at org.apache.hadoop.ipc.RPC.getProtocolEngine(RPC.java:205)
at org.apache.hadoop.ipc.RPC.getProtocolProxy(RPC.java:579)
at org.apache.hadoop.hdfs.NameNodeProxies.createNNProxyWithClientProtocol(NameNodeProxies.java:419)
at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:315)
at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:688)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:629)
at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:159)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2761)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:99)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2795)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2777)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:386)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:240)
at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:232)
at org.apache.flume.sink.hdfs.BucketWriter$9$1.run(BucketWriter.java:668)
at org.apache.flume.auth.SimpleAuthenticator.execute(SimpleAuthenticator.java:50)
at org.apache.flume.sink.hdfs.BucketWriter$9.call(BucketWriter.java:665)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: org.apache.commons.io.Charsets
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 30 more
Exception in thread "SinkRunner-PollingRunner-DefaultSinkProcessor" java.lang.NoClassDefFoundError: org/apache/commons/io/Charsets
at org.apache.hadoop.ipc.Server.<clinit>(Server.java:221)
at org.apache.hadoop.ipc.ProtobufRpcEngine.<clinit>(ProtobufRpcEngine.java:71)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:2147)
at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2112)
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2206)
at org.apache.hadoop.ipc.RPC.getProtocolEngine(RPC.java:205)
at org.apache.hadoop.ipc.RPC.getProtocolProxy(RPC.java:579)
at org.apache.hadoop.hdfs.NameNodeProxies.createNNProxyWithClientProtocol(NameNodeProxies.java:419)
at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:315)
at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:688)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:629)
at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:159)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2761)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:99)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2795)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2777)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:386)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:240)
at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:232)
at org.apache.flume.sink.hdfs.BucketWriter$9$1.run(BucketWriter.java:668)
at org.apache.flume.auth.SimpleAuthenticator.execute(SimpleAuthenticator.java:50)
at org.apache.flume.sink.hdfs.BucketWriter$9.call(BucketWriter.java:665)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: org.apache.commons.io.Charsets
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 30 more
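Solution: replace the commons-io jar in Flume's lib directory with the newer commons-io-2.4.jar. This jar can also be found in the lib directories of the Hadoop installation.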
Failed to connect to server: node1/172.16.41.54:9000: try once and fail
2016-12-26 10:30:11,538 (hdfs-hdfs-sink1-call-runner-3) [WARN - org.apache.hadoop.ipc.Client$Connection.handleConnectionFailure(Client.java:886)] Failed to connect to server: node1/172.16.41.54:9000: try once and fail.
java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206)
at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:531)
at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:495)
at org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:650)
at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:745)
at org.apache.hadoop.ipc.Client$Connection.access$3200(Client.java:397)
at org.apache.hadoop.ipc.Client.getConnection(Client.java:1618)
at org.apache.hadoop.ipc.Client.call(Client.java:1449)
at org.apache.hadoop.ipc.Client.call(Client.java:1396)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:233)
at com.sun.proxy.$Proxy11.create(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:311)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:278)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:194)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:176)
at com.sun.proxy.$Proxy12.create(Unknown Source)
at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1719)
at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1699)
at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1634)
at org.apache.hadoop.hdfs.DistributedFileSystem$8.doCall(DistributedFileSystem.java:479)
at org.apache.hadoop.hdfs.DistributedFileSystem$8.doCall(DistributedFileSystem.java:475)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:475)
at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:416)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:926)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:907)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:803)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:792)
at org.apache.flume.sink.hdfs.HDFSSequenceFile.open(HDFSSequenceFile.java:95)
at org.apache.flume.sink.hdfs.HDFSSequenceFile.open(HDFSSequenceFile.java:78)
at org.apache.flume.sink.hdfs.HDFSSequenceFile.open(HDFSSequenceFile.java:69)
at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:242)
at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:232)
at org.apache.flume.sink.hdfs.BucketWriter$9$1.run(BucketWriter.java:668)
at org.apache.flume.auth.SimpleAuthenticator.execute(SimpleAuthenticator.java:50)
at org.apache.flume.sink.hdfs.BucketWriter$9.call(BucketWriter.java:665)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
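Solution: the cause is simple: nothing accepted the connection at node1/172.16.41.54:9000. For example, the NameNode may not be running, or hdfs.path may point to the wrong host or port.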
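A connection-refused error can be narrowed down by testing the address that the sink is configured with. The sketch below is one possible check: both helper names are hypothetical, it assumes hdfs.path has the form hdfs://host:port/..., and it relies on nc (netcat) being installed. The 3-second timeout is an arbitrary choice.

```shell
# Hypothetical helper: print the host:port of the first uncommented
# hdfs.path entry in a Flume configuration file.
namenode_from_conf() {
  sed -n 's|^[^#]*hdfs\.path *= *hdfs://\([^/]*\).*|\1|p' "$1" | head -n 1
}

# Hypothetical helper: report whether anything accepts connections there.
check_namenode() {
  addr=$(namenode_from_conf "$1")
  host=${addr%%:*}; port=${addr##*:}
  if nc -z -w 3 "$host" "$port"; then
    echo "$addr accepts connections"
  else
    echo "$addr refused or unreachable; check the NameNode and hdfs.path"
  fi
}
```

Running check_namenode conf/flume.conf on the agent machine shows whether the configured address is reachable from there.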
MalformedInputException
2016-12-27 15:53:45,467 (pool-4-thread-1) [ERROR - org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:280)] FATAL: Spool Directory source spooldir-source1: { spoolDir: /home/yang/Data/flumeSpoolDir }: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.
java.nio.charset.MalformedInputException: Input length = 1
Solution: set the deserializer of the spooling directory source in conf/flume.conf:
agent1.sources.spooldir-source1.deserializer = AVRO
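MalformedInputException is thrown by Java's charset decoder when it meets bytes that are not valid in the expected character set, and the user guide lists UTF-8 as the default inputCharset of the spooling directory source. Before changing the configuration it can help to find out which files in the directory are not valid UTF-8. The sketch below does that with iconv; the helper names are hypothetical and iconv is assumed to be installed.

```shell
# Hypothetical helper: succeed only if the file decodes cleanly as UTF-8.
is_utf8() {
  iconv -f UTF-8 -t UTF-8 "$1" >/dev/null 2>&1
}

# Hypothetical helper: list regular files in a directory that are not valid UTF-8.
list_non_utf8() {
  for f in "$1"/*; do
    [ -f "$f" ] || continue
    is_utf8 "$f" || echo "$f"
  done
}
```

For example, list_non_utf8 /home/yang/Data/flumeSpoolDir prints the offending files. For text files in another encoding, the user guide also documents the source properties inputCharset and decodeErrorPolicy, which may be a better fit than switching the deserializer.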
