How to stream from a generator to a polars dataframe and subsequent lazy plan?

I have a very long generator function that I want to process as a column using Polars. Due to its size, I want to run it in lazy streaming mode using the generator as a source, but I have been unable to work out how to do it (if it is possible).

Creating a normal dataframe and then converting to lazy obviously doesn't work since the generator is exhausted before the lazy plan is run with collect(). The same happens with the LazyFrame initialiser, which is just a shortcut for the above.

Are there any other options that don't involve writing and then scanning a CSV?

Example code:

import polars as pl

def Generator():
    yield 1
    yield 2
    yield 3

generator = Generator()
df = pl.DataFrame({"a": generator}).lazy()

print(df)
# naive plan...

print([i for i in generator])
# []

generator2 = Generator()
df = pl.LazyFrame({"a": generator2})

print(df)
# naive plan...

print([i for i in generator2])
# []

1 Answer

Creating a normal dataframe and then converting to lazy obviously doesn't work since the generator is exhausted before the lazy plan is run with collect().

This is a mistaken assumption. If you do:

df = pl.DataFrame({"a": generator}).lazy()

then, yes, the generator is exhausted, but all of its data is now in df, so its exhaustion doesn't matter.

If you take that back a step and imagine you're just doing df = pl.DataFrame({"a": generator}), then clearly you've got the data in df. When you tack on .lazy(), all you're doing is setting a flag so that subsequent operations aren't computed until .collect() (or similar) is invoked.
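A minimal sketch to see this in action (the doubling expression is just illustrative):

import polars as pl

def gen():
    yield from (1, 2, 3)

lf = pl.DataFrame({"a": gen()}).lazy()  # generator is consumed here; its data lives in the frame
out = lf.with_columns((pl.col("a") * 2).alias("doubled")).collect()  # the plan runs on the stored data
print(out)  # 3 rows, even though the generator is long exhausted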

If you've got the memory to do that then it is the best way to consume the generator.

If you don't have the memory to consume the generator, then you'll have to dump it into a file, but that file doesn't have to be a CSV. You can use pyarrow to write it to a Parquet or IPC file, which you can subsequently scan with polars.

That would look like this:

import pyarrow as pa
import polars as pl

generator = Generator()
schema = pa.schema([('a', pa.int32())])

# Write each value as its own single-row record batch so only one row
# is ever held in memory at a time.
with pa.ipc.new_file('somefile.ipc', schema) as writer:
    while True:
        try:
            writer.write(
                pa.record_batch([pa.array([next(generator)], pa.int32())], names=['a'])
            )
        except StopIteration:
            break

df = pl.scan_ipc('somefile.ipc')
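Since df is now a LazyFrame backed by the file, the rest of your plan stays lazy. A small usage sketch (the filter is only illustrative, and streaming=True assumes a polars version that exposes the streaming engine through collect):

out = (
    pl.scan_ipc('somefile.ipc')
    .filter(pl.col('a') > 1)   # illustrative lazy operation
    .collect(streaming=True)   # execute the plan with the streaming engine
)
print(out)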

If you want to write a Parquet file instead, you'd use:

import pyarrow.parquet as pq
import pyarrow as pa
import polars as pl

generator = Generator()
schema = pa.schema([('a', pa.int32())])

# Same one-row-per-batch pattern as above, but with a Parquet writer.
with pq.ParquetWriter('somefile.parquet', schema) as writer:
    while True:
        try:
            writer.write(
                pa.record_batch([pa.array([next(generator)], pa.int32())], names=['a'])
            )
        except StopIteration:
            break

dfpq = pl.scan_parquet('somefile.parquet')
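One row per record batch keeps memory usage at a minimum, but for a very long generator the per-batch overhead adds up. If you can spare a small buffer, a chunked variant writes far fewer, larger batches. A sketch assuming an arbitrary CHUNK size (the name, value, and output filename are illustrative):

from itertools import islice

import pyarrow as pa
import pyarrow.parquet as pq
import polars as pl

CHUNK = 10_000  # arbitrary buffer size: trades a little memory for write throughput

generator = Generator()
schema = pa.schema([('a', pa.int32())])

with pq.ParquetWriter('chunked.parquet', schema) as writer:
    while True:
        chunk = list(islice(generator, CHUNK))  # pull up to CHUNK values from the generator
        if not chunk:  # generator exhausted
            break
        writer.write(pa.record_batch([pa.array(chunk, pa.int32())], names=['a']))

dfchunked = pl.scan_parquet('chunked.parquet')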