I'm currently playing with Apache Arrow's Java API (though I call it from Scala in the code samples) to get some familiarity with the tool.
As an exercise, I chose to load a CSV file into Arrow vectors and then save those to an Arrow file. The first part seemed easy enough, and I tried it like this:
val csvLines: Stream[Array[String]] = <open stream from CSV parser>
// There are other types of allocator, but things work with this one...
val allocator = new RootAllocator(Int.MaxValue)
// Initialize the vectors
val vectors = initVectors(csvLines.head, allocator)
// Put their mutators into an array for easy access
val mutators = vectors.map(_.getMutator)
// Work on the data, zipping it with its index
Stream.from(0)
.zip(csvLines.tail) // Work on the tail (head contains the headers)
.foreach(rowTup => // rowTup = (index, csvRow as an Array[String])
Range(0, rowTup._2.size) // Iterate on each column...
.foreach(columnNumber =>
writeToMutator(
mutators(columnNumber), // get that column's mutator
idx=rowTup._1, // pass the current row number
data=rowTup._2(columnNumber) // pass the entry of the current column
)
)
)
With initVectors()
and writeToMutator()
defined as:
def initVectors(
columns: Array[String],
alloc: RootAllocator): Array[NullableVarCharVector] = {
// Initialize a vector for each column
val vectors = columns.map(colName =>
new NullableVarCharVector(colName, alloc))
// 4096 bytes, for 1024 values initially. This is arbitrary
// (note: 2^12 would be XOR in Scala, not 4096)
vectors.foreach(_.allocateNew(4096, 1024))
vectors
}
def writeToMutator(
mutator: NullableVarCharVector#Mutator,
idx: Int,
data: String): Unit = {
// The CSV may contain null values
if (data != null) {
val bytes = data.getBytes()
mutator.setSafe(idx, bytes, 0, bytes.length)
} else {
mutator.setNull(idx)
}
}
(I currently don't care about using the correct types, and store everything as strings, or VarChar
in Arrow's terms.)
So at this point I have a collection of NullableVarCharVector
and can read from and write to them. All good so far. For the next step, though, I was left wondering how to actually wrap them together and serialize them to an Arrow file. I stumbled on the AbstractFieldWriter
abstract class, but how to use its implementations is unclear.
So, the question mainly is: how do I wrap these vectors together and serialize them to an Arrow file?
edited to add: The metadata description page provides a good general overview on that topic.
The API's test classes seem to contain a few things that could help; I'll post an answer with a sample once I've tried it out.
Looking at TestArrowFile.java and BaseFileTest.java I found:
So, filling up vectors now looks like:
// Open stream of rows
val csvLines: Stream[Array[String]] = <open stream from CSV parser>
// Define a parent to hold the vectors
val parent = MapVector.empty("parent", allocator)
// Create a new writer. VarCharWriterImpl would probably do as well?
val writer = new ComplexWriterImpl("root", parent)
// Initialise a writer for each column, using the header as the name
val rootWriter = writer.rootAsMap()
val writers = csvLines.head.map(colName =>
rootWriter.varChar(colName))
Stream.from(0)
.zip(csvLines.tail) // Zip the rows with their index
.foreach( rowTup => { // Iterate on each (index, row) tuple
val (idx, row) = rowTup
Range(0, row.size) // Iterate on each field of the row
.foreach(column =>
Option(row(column)) // row(column) may be null,
.foreach(str => // use the option as a null check
write(writers(column), idx, allocator, str)
)
)
}
)
toFile(parent.getChild("root"), "csv.arrow") // Save everything to a file
with write
defined as:
def write(writer: VarCharWriter, idx: Int,
allocator: BufferAllocator, data: String): Unit = {
// Set the position to the correct index
writer.setPosition(idx)
val bytes = data.getBytes()
// Apparently the allocator is required again to build a new buffer
val varchar = allocator.buffer(bytes.length)
varchar.setBytes(0, bytes)
writer.writeVarChar(0, bytes.length, varchar)
// writeVarChar copies the bytes, so the scratch buffer can be released
varchar.release()
}
def toFile(parent: FieldVector, fName: String): Unit = {
// Extract a schema from the parent: that's the part I struggled with in the original question
val rootSchema = new VectorSchemaRoot(parent)
val stream = new FileOutputStream(fName)
val fileWriter = new ArrowFileWriter(
rootSchema,
null, // We don't use dictionary encoding.
stream.getChannel)
// Write everything to file...
fileWriter.start()
fileWriter.writeBatch()
fileWriter.end()
stream.close()
}
With the above I'm able to save a CSV to an Arrow file. I checked that everything went well by reading the file back and converting it to a CSV again: the content is unchanged.
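For reference, the read-back check can be sketched roughly like this. This is a sketch, not my exact code: it assumes the same 0.x Arrow version as the snippets above (where vectors still expose accessors), and that "csv.arrow" is the file written earlier.

```scala
import java.io.FileInputStream
import org.apache.arrow.memory.RootAllocator
import org.apache.arrow.vector.file.ArrowFileReader
import scala.collection.JavaConverters._

// Open the Arrow file and read it back batch by batch
val allocator = new RootAllocator(Int.MaxValue)
val stream = new FileInputStream("csv.arrow")
val reader = new ArrowFileReader(stream.getChannel, allocator)
// Holds the schema plus the vectors of the currently loaded batch
val root = reader.getVectorSchemaRoot
reader.getRecordBlocks.asScala.foreach { block =>
  reader.loadRecordBatch(block) // fill the vectors with this batch's data
  root.getFieldVectors.asScala.foreach { vector =>
    Range(0, root.getRowCount).foreach(i =>
      // getObject returns a Text for VarChar cells, null for null cells
      println(vector.getAccessor.getObject(i)))
  }
}
reader.close()
stream.close()
```

From there, turning each row back into a CSV line is just string concatenation over the field vectors.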
Note that the ComplexWriterImpl
allows writing columns of different types, which will come in handy to avoid storing number columns as strings.
(I'm playing with the reading side of things for now; that will probably deserve its own SO question.)
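As a rough sketch of that (hypothetical column names "name" and "age", same assumed Arrow version as above), mixing a VarChar and a BigInt column under one root writer could look like:

```scala
// One string column and one 64-bit integer column under the same root
val parent = MapVector.empty("parent", allocator)
val writer = new ComplexWriterImpl("root", parent)
val rootWriter = writer.rootAsMap()
val nameWriter = rootWriter.varChar("name") // string column
val ageWriter = rootWriter.bigInt("age")    // integer column

// Write row 0: each column's writer is positioned independently
nameWriter.setPosition(0)
val bytes = "Alice".getBytes()
val buf = allocator.buffer(bytes.length)
buf.setBytes(0, bytes)
nameWriter.writeVarChar(0, bytes.length, buf)
buf.release() // writeVarChar copies the bytes, so the buffer can go

ageWriter.setPosition(0)
ageWriter.writeBigInt(42L) // fixed-width types need no scratch buffer
```

The toFile() helper from above should work unchanged, since it only sees the parent vector.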