Custom source in Flume

2015-04-07T06:21:56

I have created a custom source for Flume and copied the jar file to the following location:

 mkdir -p /usr/lib/flume-ng/plugins.d/MyFlumeSource/lib/MyFlumeSource.jar

 chown -R flume:flume /var/lib/flume-ng/

I also set the following in /etc/flume-ng/conf/flume-env.sh:

FLUME_CLASSPATH="/usr/lib/flume-ng/plugins.d/MyFlumeSource/lib/MyFlumeSource.jar"

I updated the Flume configuration file as follows:

# Name the components on this agent
tail1.sources = seq-source
tail1.channels = mem-channel
tail1.sinks = hdfs-sink

# Describe/configure Source
tail1.sources.seq-source.type = org.custom.flume.source.MySource

# Describe the sink
tail1.sinks.hdfs-sink.type = hdfs
tail1.sinks.hdfs-sink.hdfs.path = /user/flume
tail1.sinks.hdfs-sink.hdfs.filePrefix = log
tail1.sinks.hdfs-sink.hdfs.rollInterval = 0
tail1.sinks.hdfs-sink.hdfs.rollCount = 10000
tail1.sinks.hdfs-sink.hdfs.fileType = DataStream

# Use a channel which buffers events in memory
tail1.channels.mem-channel.type = memory
tail1.channels.mem-channel.capacity = 1000
tail1.channels.mem-channel.transactionCapacity = 100

# Bind the source and sink to the channel
tail1.sources.seq-source.channels = mem-channel
tail1.sinks.hdfs-sink.channel = mem-channel

I tried to run the Flume agent with each of the following commands:

flume-ng agent  --conf /var/lib/flume-ng/plugins.d/MyFlumeSource/lib/MyFlumeSource.jar --conf-file /etc/flume-ng/conf/flume-conf.properties --name tail1

flume-ng agent --conf-file /etc/flume-ng/conf/flume-conf.properties --name tail1

In both cases I get the following error:

 ERROR node.PollingPropertiesFileConfigurationProvider: Failed to load configuration data. Exception follows.
org.apache.flume.FlumeException: Unable to create source: seq-source, type: org.custom.flume.source.MySource, class: org.custom.flume.source.MySource
        at org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:48)
        at org.apache.flume.node.AbstractConfigurationProvider.loadSources(AbstractConfigurationProvider.java:322)
        at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:97)
        at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.InstantiationException
        at sun.reflect.InstantiationExceptionConstructorAccessorImpl.newInstance(InstantiationExceptionConstructorAccessorImpl.java:48)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
        at java.lang.Class.newInstance(Class.java:379)
        at org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:44)
        ... 10 more

If anyone knows what causes this error, please help.
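
A note on the trace: the `Caused by: java.lang.InstantiationException` is thrown while `DefaultSourceFactory` calls `Class.newInstance()` on the configured class, which means the class was found on the classpath but could not be instantiated. As far as I know, the JDK routes through `InstantiationExceptionConstructorAccessorImpl` when the target class is abstract, so one possible cause (an assumption, since the code of `MySource` is not shown) is that `MySource` is declared `abstract`. The sketch below reproduces that behaviour with plain reflection; `AbstractLikeSource`, `ConcreteSource` and `tryCreate` are hypothetical names used only for illustration and are not part of Flume.

```java
import java.lang.reflect.Modifier;

public class InstantiationDemo {

    // Hypothetical stand-in for a source class that was declared abstract.
    public abstract static class AbstractLikeSource {
        public AbstractLikeSource() {}
    }

    // Hypothetical stand-in for a concrete source with a public no-arg constructor.
    public static class ConcreteSource extends AbstractLikeSource {
        public ConcreteSource() {}
    }

    // Tries to create an instance reflectively, roughly as a factory would,
    // and reports what happened instead of propagating the exception.
    public static String tryCreate(Class<?> cls) {
        try {
            cls.getDeclaredConstructor().newInstance();
            return "ok";
        } catch (InstantiationException e) {
            // Raised when the class is abstract and therefore cannot be instantiated.
            return "InstantiationException";
        } catch (ReflectiveOperationException e) {
            // Missing constructor, inaccessible constructor, or constructor failure.
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        for (Class<?> cls : new Class<?>[] {AbstractLikeSource.class, ConcreteSource.class}) {
            boolean isAbstract = Modifier.isAbstract(cls.getModifiers());
            System.out.println(cls.getSimpleName()
                    + " abstract=" + isAbstract
                    + " -> " + tryCreate(cls));
        }
    }
}
```

If the same check against the real `MySource` class reports that it is abstract, making it a concrete public class with a public no-argument constructor would be the first thing to try.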

Copyright License:
Author: Maddy. Reproduced under the CC BY-SA 4.0 license with a link to the original source and disclaimer.
Link to: https://stackoverflow.com/questions/29480641/custom-source-in-flume
