
Indexing in datafusion

Context: I am using datafusion to build a data validator for CSV file input.

Requirement: I want to add the row number where an error occurred to the output report. In pandas, I can add a row index that serves this purpose. Is there a way to achieve a similar result in datafusion?

praveent asked Jan 21 '26 17:01


2 Answers

There doesn't appear to be an easy way to do this within datafusion after opening the CSV file. Instead, you could open the CSV file directly with arrow, build a new RecordBatch that includes an index column, and then feed it to datafusion using a MemTable. Here's an example, assuming we are only processing one batch ...

use datafusion::prelude::*;
use datafusion::datasource::MemTable;
use arrow::util::pretty::print_batches;
use arrow::record_batch::RecordBatch;
use arrow::array::{UInt32Array, Int64Array};
use arrow::datatypes::{Schema, Field, DataType};
use arrow::csv;

use std::fs::File;
use std::sync::Arc;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {


    let schema = Schema::new(vec![
        Field::new("a", DataType::Int64, false),
        Field::new("b", DataType::Int64, false),
    ]);
    
    let file = File::open("tests/example.csv")?;
    
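    // has_header = true, batch_size = 1024; delimiter, bounds and projection are left as None
    let mut csv = csv::Reader::new(file, Arc::new(schema), true, None, 1024, None, None);
    // read only the first batch (unwrap panics if the file yields no batch at all)
    let batch = csv.next().unwrap()?;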

    // build a 0-based index column with one entry per row in the batch
    let length = batch.num_rows() as u32;
    let idx_array = UInt32Array::from((0..length).collect::<Vec<u32>>());
    // copy the existing columns out of the batch
    let a_array = Int64Array::from(batch.column(0).as_any().downcast_ref::<Int64Array>().unwrap().values().to_vec());
    let b_array = Int64Array::from(batch.column(1).as_any().downcast_ref::<Int64Array>().unwrap().values().to_vec());
    let new_schema = Schema::new(vec![
        Field::new("idx", DataType::UInt32, true),
        Field::new("a", DataType::Int64, false),
        Field::new("b", DataType::Int64, false),
    ]);

    let new_batch = RecordBatch::try_new(Arc::new(new_schema),
        vec![Arc::new(idx_array), Arc::new(a_array), Arc::new(b_array)])?;
    let mem_table = MemTable::try_new(new_batch.schema(), vec![vec![new_batch]])?;
    
    // note: newer DataFusion releases call this type SessionContext
    let mut ctx = ExecutionContext::new();

    // create the dataframe
    let df = ctx.read_table(Arc::new(mem_table))?;

    let results = df.collect().await?;

    print_batches(&results).unwrap();

    // do whatever you need to do
    
    Ok(())
}

My example.csv looks like this ...

a,b
1,2
1,3
4,2
2,6
3,7

And the output should be ...

+-----+---+---+
| idx | a | b |
+-----+---+---+
| 0   | 1 | 2 |
| 1   | 1 | 3 |
| 2   | 4 | 2 |
| 3   | 2 | 6 |
| 4   | 3 | 7 |
+-----+---+---+

Though if you're really just looking for a crate with functionality like pandas in Python, I'd urge you to check out polars.
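The example above reads a single batch. If the file is larger than the batch size, the index has to continue from one batch to the next instead of restarting at 0. Here is a minimal std-only sketch of that offset bookkeeping. The function name `global_indices` and the batch sizes are made up for illustration; in real code each length would come from `batch.num_rows()`, and each inner `Vec<u32>` would become the `UInt32Array` for that batch.

```rust
/// Assigns a global, zero-based row index to every row across several batches.
/// Each entry of `batch_lengths` stands in for one batch's row count.
fn global_indices(batch_lengths: &[usize]) -> Vec<Vec<u32>> {
    let mut offset: u32 = 0;
    let mut out = Vec::with_capacity(batch_lengths.len());
    for &len in batch_lengths {
        let len = len as u32;
        // This batch's indices start where the previous batch stopped.
        out.push((offset..offset + len).collect());
        offset += len;
    }
    out
}

fn main() {
    // Hypothetical case: the 5 data rows arrive as batches of 2, 2 and 1 rows.
    let idx = global_indices(&[2, 2, 1]);
    println!("{:?}", idx); // prints [[0, 1], [2, 3], [4]]
}
```

Keeping the running offset outside the per-batch work means each batch only needs its own length, so batches can still be processed one at a time.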

Ian Graham answered Jan 23 '26 05:01


Given Ian Graham's advice to check out polars, I thought I'd give an example of how this could be achieved in polars as well:

use polars::prelude::*;
use std::io::Cursor;

fn main() -> Result<()> {

    // use an in memory repr for the csv
    let csv = Cursor::new(
        "a,b
1,2
1,3
4,2
2,6
3,7
",
    );

    // parse the csv into a DataFrame
    let mut df = CsvReader::new(csv).finish()?;

    // create the index column based on the DataFrame's height
    // note that we use the `NoNull` wrapper to create from `T` instead of `Option<T>`
    let mut idx: NoNull<UInt32Chunked> = (0..df.height() as u32).collect();
    idx.rename("idx");

    // add the index column to the DataFrame
    df.insert_at_idx(0, idx.into_inner().into_series())?;


    // print output
    dbg!(df);

    Ok(())
}

Outputs:

+-----+-----+-----+
| idx | a   | b   |
| --- | --- | --- |
| u32 | i64 | i64 |
+=====+=====+=====+
| 0   | 1   | 2   |
+-----+-----+-----+
| 1   | 1   | 3   |
+-----+-----+-----+
| 2   | 4   | 2   |
+-----+-----+-----+
| 3   | 2   | 6   |
+-----+-----+-----+
| 4   | 3   | 7   |
+-----+-----+-----+
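Whichever library adds the `idx` column, the validation report still has to turn that index into something a user can find in the file. Below is a rough std-only sketch of that last step. The `Row` struct, the rule "b must be greater than a", and the helper names are all hypothetical, and the line-number conversion assumes one header line and exactly one line per record (no quoted newlines).

```rust
/// One parsed CSV row plus the zero-based index added while loading.
struct Row {
    idx: u32,
    a: i64,
    b: i64,
}

/// Converts a zero-based data-row index into a 1-based line number in the file,
/// assuming a single header line and one line per record.
fn file_line(idx: u32) -> u32 {
    idx + 2
}

/// Hypothetical validation rule: b must be greater than a.
/// Returns one message per failing row.
fn validate(rows: &[Row]) -> Vec<String> {
    rows.iter()
        .filter(|r| r.b <= r.a)
        .map(|r| {
            format!(
                "line {}: expected b > a, got a={} b={}",
                file_line(r.idx),
                r.a,
                r.b
            )
        })
        .collect()
}

fn main() {
    // Same data as example.csv, indexed in file order.
    let data = [(1, 2), (1, 3), (4, 2), (2, 6), (3, 7)];
    let rows: Vec<Row> = data
        .iter()
        .enumerate()
        .map(|(i, &(a, b))| Row { idx: i as u32, a, b })
        .collect();

    for msg in validate(&rows) {
        println!("{}", msg); // prints: line 4: expected b > a, got a=4 b=2
    }
}
```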

ritchie46 answered Jan 23 '26 06:01


