 

Large (6 million rows) pandas df causes memory error with `to_sql` when chunksize=100, but can easily save a file of 100,000 rows with no chunksize

Tags:

python

sql

pandas

I created a large dataframe in Pandas, about 6 million rows of text data. I wanted to save this as a SQL database file, but when I try to save it, I get an out-of-memory error. I even reduced the chunk size to 100 and it still crashes.

However, if I just have a smaller version of that dataframe with 100,000 rows and save it to a database with no chunksize specified, I have no issues saving that dataframe.

This is my code:

from sqlalchemy import create_engine
engine = create_engine("sqlite:///databasefile.db")
dataframe.to_sql("CS_table", engine, chunksize=100)

My understanding was that since it's only processing 100 rows at a time, the RAM usage should reflect that of a save of 100 rows. Is there something else happening behind the scenes? Perhaps multi-threading?

Before I run this code, I am using 4.8 GB RAM, out of the 12.8 GB RAM available in Google Colab. Running the above code eats up all the RAM until the environment crashes.

I would like to be able to save my pandas dataframe to a SQL file without my environment crashing. The environment I am in is Google Colab. The pandas dataframe is 2 columns, ~6 million rows. Each cell contains about this much text:

"The dominant sequence transduction models are based on complex recurrent or convolutional neural networks in an encoder-decoder configuration. The best performing models also connect the encoder and decoder through an attention mechanism. We propose a new simple network architecture, the Transformer, based solely on attention mechanisms, dispensing with recurrence and convolutions entirely. Experiments on two machine translation tasks show these models to be superior in quality while being more parallelizable and requiring significantly less time to train. Our model achieves 28.4 BLEU on the WMT 2014 English-to-German translation task, improving over the existing best results, including ensembles by over 2 BLEU. On the WMT 2014 English-to-French translation task, our model establishes a new single-model state-of-the-art BLEU score of 41.8 after training for 3.5 days on eight GPUs, a small fraction of the training costs of the best models from the literature. We show that the Transformer generalizes well to other tasks by applying it successfully to English constituency parsing both with large and limited training data."

Edit:

I did keyboard interrupts at various stages. Here is the result of a keyboard interrupt after the first jump in RAM:

---------------------------------------------------------------------------
KeyboardInterrupt                         Traceback (most recent call last)
<ipython-input-22-51b6e444f80d> in <module>()
----> 1 dfAllT.to_sql("CS_table23", engine, chunksize = 100)

12 frames
/usr/local/lib/python3.6/dist-packages/pandas/core/generic.py in to_sql(self, name, con, schema, if_exists, index, index_label, chunksize, dtype, method)
   2529         sql.to_sql(self, name, con, schema=schema, if_exists=if_exists,
   2530                    index=index, index_label=index_label, chunksize=chunksize,
-> 2531                    dtype=dtype, method=method)
   2532 
   2533     def to_pickle(self, path, compression='infer',

/usr/local/lib/python3.6/dist-packages/pandas/io/sql.py in to_sql(frame, name, con, schema, if_exists, index, index_label, chunksize, dtype, method)
    458     pandas_sql.to_sql(frame, name, if_exists=if_exists, index=index,
    459                       index_label=index_label, schema=schema,
--> 460                       chunksize=chunksize, dtype=dtype, method=method)
    461 
    462 

/usr/local/lib/python3.6/dist-packages/pandas/io/sql.py in to_sql(self, frame, name, if_exists, index, index_label, schema, chunksize, dtype, method)
   1172                          schema=schema, dtype=dtype)
   1173         table.create()
-> 1174         table.insert(chunksize, method=method)
   1175         if (not name.isdigit() and not name.islower()):
   1176             # check for potentially case sensitivity issues (GH7815)

/usr/local/lib/python3.6/dist-packages/pandas/io/sql.py in insert(self, chunksize, method)
    684 
    685                 chunk_iter = zip(*[arr[start_i:end_i] for arr in data_list])
--> 686                 exec_insert(conn, keys, chunk_iter)
    687 
    688     def _query_iterator(self, result, chunksize, columns, coerce_float=True,

/usr/local/lib/python3.6/dist-packages/pandas/io/sql.py in _execute_insert(self, conn, keys, data_iter)
    597         """
    598         data = [dict(zip(keys, row)) for row in data_iter]
--> 599         conn.execute(self.table.insert(), data)
    600 
    601     def _execute_insert_multi(self, conn, keys, data_iter):

/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py in execute(self, object_, *multiparams, **params)
    986             raise exc.ObjectNotExecutableError(object_)
    987         else:
--> 988             return meth(self, multiparams, params)
    989 
    990     def _execute_function(self, func, multiparams, params):

/usr/local/lib/python3.6/dist-packages/sqlalchemy/sql/elements.py in _execute_on_connection(self, connection, multiparams, params)
    285     def _execute_on_connection(self, connection, multiparams, params):
    286         if self.supports_execution:
--> 287             return connection._execute_clauseelement(self, multiparams, params)
    288         else:
    289             raise exc.ObjectNotExecutableError(self)

/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py in _execute_clauseelement(self, elem, multiparams, params)
   1105             distilled_params,
   1106             compiled_sql,
-> 1107             distilled_params,
   1108         )
   1109         if self._has_events or self.engine._has_events:

/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py in _execute_context(self, dialect, constructor, statement, parameters, *args)
   1246         except BaseException as e:
   1247             self._handle_dbapi_exception(
-> 1248                 e, statement, parameters, cursor, context
   1249             )
   1250 

/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py in _handle_dbapi_exception(self, e, statement, parameters, cursor, context)
   1466                 util.raise_from_cause(sqlalchemy_exception, exc_info)
   1467             else:
-> 1468                 util.reraise(*exc_info)
   1469 
   1470         finally:

/usr/local/lib/python3.6/dist-packages/sqlalchemy/util/compat.py in reraise(tp, value, tb, cause)
    127         if value.__traceback__ is not tb:
    128             raise value.with_traceback(tb)
--> 129         raise value
    130 
    131     def u(s):

/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py in _execute_context(self, dialect, constructor, statement, parameters, *args)
   1222                 if not evt_handled:
   1223                     self.dialect.do_executemany(
-> 1224                         cursor, statement, parameters, context
   1225                     )
   1226             elif not parameters and context.no_parameters:

/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/default.py in do_executemany(self, cursor, statement, parameters, context)
    545 
    546     def do_executemany(self, cursor, statement, parameters, context=None):
--> 547         cursor.executemany(statement, parameters)
    548 
    549     def do_execute(self, cursor, statement, parameters, context=None):

KeyboardInterrupt: 

Here is the result if I do a keyboard interrupt just before it crashes

ERROR:root:Internal Python error in the inspect module.
Below is the traceback from this internal error.

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/IPython/core/interactiveshell.py", line 2882, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-24-68b60fe221fe>", line 1, in <module>
    dfAllT.to_sql("CS_table22", engine, chunksize = 100)
  File "/usr/local/lib/python3.6/dist-packages/pandas/core/generic.py", line 2531, in to_sql
    dtype=dtype, method=method)
  File "/usr/local/lib/python3.6/dist-packages/pandas/io/sql.py", line 460, in to_sql
    chunksize=chunksize, dtype=dtype, method=method)
  File "/usr/local/lib/python3.6/dist-packages/pandas/io/sql.py", line 1174, in to_sql
    table.insert(chunksize, method=method)
  File "/usr/local/lib/python3.6/dist-packages/pandas/io/sql.py", line 686, in insert
    exec_insert(conn, keys, chunk_iter)
  File "/usr/local/lib/python3.6/dist-packages/pandas/io/sql.py", line 599, in _execute_insert
    conn.execute(self.table.insert(), data)
  File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py", line 988, in execute
    return meth(self, multiparams, params)
  File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/sql/elements.py", line 287, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py", line 1107, in _execute_clauseelement
    distilled_params,
  File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py", line 1248, in _execute_context
    e, statement, parameters, cursor, context
  File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py", line 1468, in _handle_dbapi_exception
    util.reraise(*exc_info)
  File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/util/compat.py", line 129, in reraise
    raise value
  File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/base.py", line 1224, in _execute_context
    cursor, statement, parameters, context
  File "/usr/local/lib/python3.6/dist-packages/sqlalchemy/engine/default.py", line 547, in do_executemany
    cursor.executemany(statement, parameters)
KeyboardInterrupt

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/IPython/core/interactiveshell.py", line 1823, in showtraceback
    stb = value._render_traceback_()
AttributeError: 'KeyboardInterrupt' object has no attribute '_render_traceback_'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/IPython/core/ultratb.py", line 1132, in get_records
    return _fixed_getinnerframes(etb, number_of_lines_of_context, tb_offset)
  File "/usr/local/lib/python3.6/dist-packages/IPython/core/ultratb.py", line 313, in wrapped
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.6/dist-packages/IPython/core/ultratb.py", line 358, in _fixed_getinnerframes
    records = fix_frame_records_filenames(inspect.getinnerframes(etb, context))
  File "/usr/lib/python3.6/inspect.py", line 1488, in getinnerframes
    frameinfo = (tb.tb_frame,) + getframeinfo(tb, context)
  File "/usr/lib/python3.6/inspect.py", line 1446, in getframeinfo
    filename = getsourcefile(frame) or getfile(frame)
  File "/usr/lib/python3.6/inspect.py", line 696, in getsourcefile
    if getattr(getmodule(object, filename), '__loader__', None) is not None:
  File "/usr/lib/python3.6/inspect.py", line 739, in getmodule
    f = getabsfile(module)
  File "/usr/lib/python3.6/inspect.py", line 708, in getabsfile
    _filename = getsourcefile(object) or getfile(object)
  File "/usr/lib/python3.6/inspect.py", line 693, in getsourcefile
    if os.path.exists(filename):
  File "/usr/lib/python3.6/genericpath.py", line 19, in exists
    os.stat(path)
KeyboardInterrupt

I did another run right before it crashed, and this gave a different result:

ERROR:root:Internal Python error in the inspect module.
Below is the traceback from this internal error.

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/IPython/core/interactiveshell.py", line 2882, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-28-f18004debe33>", line 1, in <module>
    dfAllT.to_sql("CS_table25", engine, chunksize = 100)
  File "/usr/local/lib/python3.6/dist-packages/pandas/core/generic.py", line 2531, in to_sql
    dtype=dtype, method=method)
  File "/usr/local/lib/python3.6/dist-packages/pandas/io/sql.py", line 460, in to_sql
    chunksize=chunksize, dtype=dtype, method=method)
  File "/usr/local/lib/python3.6/dist-packages/pandas/io/sql.py", line 1174, in to_sql
    table.insert(chunksize, method=method)
  File "/usr/local/lib/python3.6/dist-packages/pandas/io/sql.py", line 686, in insert
    exec_insert(conn, keys, chunk_iter)
  File "/usr/local/lib/python3.6/dist-packages/pandas/io/sql.py", line 598, in _execute_insert
    data = [dict(zip(keys, row)) for row in data_iter]
  File "/usr/local/lib/python3.6/dist-packages/pandas/io/sql.py", line 598, in <listcomp>
    data = [dict(zip(keys, row)) for row in data_iter]
KeyboardInterrupt

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/IPython/core/interactiveshell.py", line 1823, in showtraceback
    stb = value._render_traceback_()
AttributeError: 'KeyboardInterrupt' object has no attribute '_render_traceback_'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/IPython/core/ultratb.py", line 1132, in get_records
    return _fixed_getinnerframes(etb, number_of_lines_of_context, tb_offset)
  File "/usr/local/lib/python3.6/dist-packages/IPython/core/ultratb.py", line 313, in wrapped
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.6/dist-packages/IPython/core/ultratb.py", line 358, in _fixed_getinnerframes
    records = fix_frame_records_filenames(inspect.getinnerframes(etb, context))
  File "/usr/lib/python3.6/inspect.py", line 1488, in getinnerframes
    frameinfo = (tb.tb_frame,) + getframeinfo(tb, context)
  File "/usr/lib/python3.6/inspect.py", line 1446, in getframeinfo
    filename = getsourcefile(frame) or getfile(frame)
  File "/usr/lib/python3.6/inspect.py", line 696, in getsourcefile
    if getattr(getmodule(object, filename), '__loader__', None) is not None:
  File "/usr/lib/python3.6/inspect.py", line 742, in getmodule
    os.path.realpath(f)] = module.__name__
  File "/usr/lib/python3.6/posixpath.py", line 388, in realpath
    path, ok = _joinrealpath(filename[:0], filename, {})
  File "/usr/lib/python3.6/posixpath.py", line 421, in _joinrealpath
    newpath = join(path, name)
KeyboardInterrupt
---------------------------------------------------------------------------

Other things I have tried:

Using dropna to drop all None/NaN values

dfAllT = dfAllT.applymap(str) to make sure all my values are strings

dfAllT.reset_index(drop=True, inplace=True) to make sure the index is not out of alignment.

Edit:

As mentioned in the comments, I have now tried using to_sql in a loop.

for i in range(586147):
    print(i)
    dfAllT.iloc[i*10000:(i+1)*10000].to_sql('CS_table', engine, if_exists= 'append')

This operation eventually eats up my RAM and causes a crash about halfway through. I wonder if this suggests that sqlite is holding everything in memory, and if there is a workaround.

Edit:

I tried a few more things: shorter chunks, and disposing of the engine after each step and creating a new one. It still eventually ate up all the RAM and crashed.

import gc
import sqlalchemy

for i in range(586147):
    print(i)
    engine = sqlalchemy.create_engine("sqlite:///CSTitlesSummariesData.db")
    dfAllT.iloc[i*10:(i+1)*10].to_sql('CS_table', engine, index=False, if_exists='append')
    engine.dispose()
    gc.collect()

My thoughts:

So it looks like the entire database is somehow being kept in active memory.

The pandas dataframe this is made from was about 5 GB (or at least that's how much RAM is being used before I try to convert it to sqlite). My system crashes at around 12.72 GB. I would imagine the sqlite database takes up less RAM than the pandas dataframe.
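
A quick way to sanity-check that estimate (deep=True counts the string contents, not just the object pointers) is:

print(dfAllT.memory_usage(deep=True).sum() / 1e9, "GB")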

asked May 29 '19 by SantoshGupta7




3 Answers

I've used df.to_sql for a year, and now that I'm running bigger workloads it stopped working. I realized that chunksize overloads your memory: pandas loads everything into memory first and only then sends it out in chunks. I had to control it directly using SQL. (Here is where I found the solution -> https://github.com/pandas-dev/pandas/issues/12265 ; I really encourage you to read it until the end.)

If you need to read data from your database without overloading your memory, check this piece of code:

import math
import pandas as pd

def get_data_by_chunks(table: str, chunksize: int) -> iter:
    # MysqlClient.get_engine() is the answerer's own helper that returns a SQLAlchemy engine
    with MysqlClient.get_engine().begin() as conn:
        query_count = f"SELECT COUNT(*) FROM {table} WHERE my_filters"
        row_count = conn.execute(query_count).fetchone()[0]

        for i in range(math.ceil(row_count / chunksize)):
            query = f"""
               SELECT * FROM {table}
               WHERE my_filters
               LIMIT {i * chunksize}, {chunksize};
             """
            yield pd.read_sql(query, conn)

for df in get_data_by_chunks("my_table", chunksize=10_000):
    print(df.shape)
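
The function above covers the read side. A rough sketch of the same idea on the write side, for the asker's SQLite case, could look like the following; the table name, chunk size, and the requirement that the target table already exists are assumptions, not part of the answer above:

import sqlite3

def write_df_by_chunks(df, db_file, table, chunksize=10_000):
    # sketch only: assumes `table` already exists with columns matching df's columns
    con = sqlite3.connect(db_file)
    placeholders = ", ".join("?" * len(df.columns))
    insert_sql = f"INSERT INTO {table} VALUES ({placeholders})"
    for start in range(0, len(df), chunksize):
        rows = df.iloc[start:start + chunksize].itertuples(index=False, name=None)
        with con:  # one transaction per chunk, so only that chunk is in flight
            con.executemany(insert_sql, rows)
    con.close()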
answered Sep 19 '22 by juanjosegdoj


From stepping through the code, I think it's this line, which creates a bunch of DataFrames:

chunk_iter = zip(*[arr[start_i:end_i] for arr in data_list])

This looks like it's probably a bug. Specifically, it happens prior to database insertion, in preparation.

One trick you can do is hit CTRL-C whilst the memory is rapidly increasing, and see which line it's paused on (my bet is this one).

User Edit:

The problem was worked around by using an explicit loop (rather than relying on chunksize), i.e.

for i in range(100): df.iloc[i * 100000:(i+1) * 100000].to_sql(...)

This still resulted in memory errors, but it allowed the user to continue from where the loop left off before the crash.

A more robust solution may be to "perhaps try a raw connection, rather than using SQLEngine". But the user didn't have a chance to try this.
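
As a rough illustration of that raw-connection suggestion (using the table and dataframe names from the question, with an assumed slice size): to_sql also accepts a plain sqlite3 connection, which bypasses the SQLAlchemy engine.

import sqlite3

con = sqlite3.connect("databasefile.db")
# write in explicit slices over a raw DBAPI connection, one append per slice
for start in range(0, len(dfAllT), 100_000):
    dfAllT.iloc[start:start + 100_000].to_sql("CS_table", con, if_exists="append", index=False)
con.close()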

answered Sep 20 '22 by Andy Hayden


I know this isn't an answer to the question. I'm writing this for people who are in a hurry and for whom to_sql runs out of memory quickly. I read the source and came up with my own iterative pandas-to-SQLite function, which uses existing APIs and doesn't copy the dataframe. It currently uses SQLiteDatabase, which comes with a warning because this code is incompatible with sqlalchemy.

This was tested on a 3-column, 94-million-row (2.2 GB) indexed dataframe. On my rather old machine it takes less than 5 minutes and less than 5 GB of RAM to insert all the data. I also added tqdm for good measure.

import pandas as pd
from pandas.io.sql import SQLiteDatabase, SQLiteTable
import sqlite3
from tqdm import tqdm

def df_to_sqlite(df: pd.DataFrame, db_file_name: str, table_name: str, chunk_size = 1_000_000):
    # see https://stackoverflow.com/a/70488765/227755
    con = sqlite3.connect(db_file_name)
    db = SQLiteDatabase(con=con)
    table = SQLiteTable(table_name, db, df, index=True, if_exists="fail", dtype=None)
    table.create()  # can be optimized further by postponing index creation, but that means we use private/protected APIs.
    insert = table.insert_statement(num_rows=1)  # single insert statement
    it = df.itertuples(index=True, name=None)  # just regular tuples
    pb = tqdm(it, total=len(df))  # not needed but nice to have
    with con:
        while True:
            con.execute("begin")
            try:
                for c in range(0, chunk_size):
                    row = next(it, None)
                    if row is None:
                        pb.update(c)
                        return
                    con.execute(insert, row)
                pb.update(chunk_size)
            finally:
                con.execute("commit")
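
A hypothetical call for the dataframe from the question (names assumed) would look like:

# writes dfAllT into CS_table inside databasefile.db, one transaction per 1M rows;
# note if_exists="fail" above means the table must not already exist
df_to_sqlite(dfAllT, "databasefile.db", "CS_table", chunk_size=1_000_000)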
answered Sep 18 '22 by nurettin