I have users writing AVRO files and I want to use Flume to move all those files into HDFS, so I can later use Hive or Pig to query/analyse the data.
On the client I installed Flume and configured a spooldir source and an AVRO sink like this:
a1.sources = src1
a1.sinks = sink1
a1.channels = c1
a1.channels.c1.type = memory
a1.sources.src1.type = spooldir
a1.sources.src1.channels = c1
a1.sources.src1.spoolDir = {directory}
a1.sources.src1.fileHeader = true
a1.sources.src1.deserializer = avro
a1.sinks.sink1.type = avro
a1.sinks.sink1.channel = c1
a1.sinks.sink1.hostname = {IP}
a1.sinks.sink1.port = 41414
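(For reference, an agent with this configuration is started with the standard flume-ng command; client.conf here just stands in for whatever the properties file is called:)
flume-ng agent --conf conf --conf-file client.conf --name a1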
On the hadoop cluster I have this AVRO source and HDFS sink:
a1.sources = avro1
a1.sinks = sink1
a1.channels = c1
a1.channels.c1.type = memory
a1.sources.avro1.type = avro
a1.sources.avro1.channels = c1
a1.sources.avro1.bind = 0.0.0.0
a1.sources.avro1.port = 41414
a1.sinks.sink1.type = hdfs
a1.sinks.sink1.channel = c1
a1.sinks.sink1.hdfs.path = {hdfs dir}
a1.sinks.sink1.hdfs.fileSuffix = .avro
a1.sinks.sink1.hdfs.rollSize = 67108864
a1.sinks.sink1.hdfs.fileType = DataStream
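(A side note on file sizes: hdfs.rollSize is not the only roll trigger; hdfs.rollCount and hdfs.rollInterval also apply and have small defaults, so actually getting 64 MB files would also need something like the two lines that follow. This is separate from the validity problem, just something to be aware of.)
a1.sinks.sink1.hdfs.rollCount = 0
a1.sinks.sink1.hdfs.rollInterval = 0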
The problem is that the files on HDFS are not valid AVRO files! I am using the Hue UI to check whether a file is a valid AVRO file or not. If I upload an AVRO file that I generated on my PC to the cluster, I can see its contents fine, but the files written by Flume are not valid AVRO files.
I tried the avro-client that ships with Flume, but that didn't work because it sends one Flume event per line, which breaks the AVRO files; that part is fixed by the spooldir source with deserializer = avro.
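(For reference, the avro-client attempt was along the lines of the following; the file path is just a placeholder:)
flume-ng avro-client --host {IP} --port 41414 --filename {some avro file}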
So I think the problem is in the HDFS sink when it writes the files.
With hdfs.fileType = DataStream it writes the values of the AVRO fields rather than the whole AVRO file, losing all the schema information. If I use hdfs.fileType = SequenceFile, the files are not valid either, for some reason.
Any ideas?
Thanks
You have to add this to your HDFS sink configuration (the default value of this property is TEXT):
a1.sinks.sink1.serializer = avro_event
This should write valid AVRO files, but with the default schema, not the schema of your original records.
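For reference (and from memory, so treat it as approximate rather than authoritative), the default schema is a generic event wrapper along these lines, with your original record ending up as opaque bytes in the body field:
{
  "type": "record",
  "name": "Event",
  "fields": [
    { "name": "headers", "type": { "type": "map", "values": "string" } },
    { "name": "body", "type": "bytes" }
  ]
}
That is why Hive or Pig would not see your real fields with this serializer alone.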
However, since you were using AVRO files as your input, you probably want to write AVRO files with the same schema. For that you can use the AvroEventSerializer from Cloudera's CDK. Assuming you have built the code and placed the jar in Flume's lib directory, you can define the serializer in the properties file:
a1.sinks.sink1.serializer = org.apache.flume.serialization.AvroEventSerializer$Builder
The serializer assumes that the AVRO schema is present in the header of every event, either as a URL or as a LITERAL. To use the latter approach (which is less efficient, but might be easier to try out), you must tell the source on the client side to add the schema literal to every event, by adding this property:
a1.sources.src1.deserializer.schemaType = LITERAL
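Putting the two pieces together, the relevant sections end up looking like this (everything is taken from the configurations above, with only the serializer and schemaType lines added):
On the client:
a1.sources.src1.type = spooldir
a1.sources.src1.channels = c1
a1.sources.src1.spoolDir = {directory}
a1.sources.src1.fileHeader = true
a1.sources.src1.deserializer = avro
a1.sources.src1.deserializer.schemaType = LITERAL
On the cluster:
a1.sinks.sink1.type = hdfs
a1.sinks.sink1.channel = c1
a1.sinks.sink1.hdfs.path = {hdfs dir}
a1.sinks.sink1.hdfs.fileSuffix = .avro
a1.sinks.sink1.hdfs.rollSize = 67108864
a1.sinks.sink1.hdfs.fileType = DataStream
a1.sinks.sink1.serializer = org.apache.flume.serialization.AvroEventSerializer$Builder
If you later switch to the URL approach, only the way the schema gets into the event headers changes; the sink-side serializer line stays the same.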