To give as much context as is needed: I'm trying to pull some data stored on a remote Postgres server (Heroku) into a pandas DataFrame, using psycopg2 to connect.
I'm interested in two specific tables, users and events, and the connection works fine, because when pulling down the user data
import pandas.io.sql as sql
# [...]
users = sql.read_sql("SELECT * FROM users", conn)
after waiting a few seconds, the DataFrame is returned as expected.
<class 'pandas.core.frame.DataFrame'>
Int64Index: 67458 entries, 0 to 67457
Data columns (total 35 columns): [...]
Yet when trying to pull the bigger, heavier events data straight from ipython, after a long time, it just crashes:
In [11]: events = sql.read_sql("SELECT * FROM events", conn)
vagrant@data-science-toolbox:~$
and when trying from an IPython notebook I get the "Dead kernel" error:
The kernel has died, would you like to restart it? If you do not restart the kernel, you will be able to save the notebook, but running code will not work until the notebook is reopened.
Update #1:
To get a better idea of the size of the events table I'm trying to pull in, here are the number of records and the number of attributes for each:
In [11]: sql.read_sql("SELECT count(*) FROM events", conn)
Out[11]:
count
0 2711453
In [12]: len(sql.read_sql("SELECT * FROM events LIMIT 1", conn).columns)
Out[12]: 18
Update #2:
Memory is definitely a bottleneck for the current implementation of read_sql: when pulling down the events and trying to run another instance of IPython, the result is
vagrant@data-science-toolbox:~$ sudo ipython
-bash: fork: Cannot allocate memory
Update #3:
I first tried a read_sql_chunked implementation that would just return the list of partial DataFrames:
def read_sql_chunked(query, conn, nrows, chunksize=1000):
    start = 0
    dfs = []
    while start < nrows:
        df = pd.read_sql("%s LIMIT %s OFFSET %s" % (query, chunksize, start), conn)
        start += chunksize
        dfs.append(df)
        print("Events added: %s to %s of %s" % (start-chunksize, start, nrows))
    # print("concatenating dfs")
    return dfs
event_dfs = read_sql_chunked("SELECT * FROM events", conn, events_count, 100000)
and that works well, but when trying to concatenate the DataFrames, the kernel dies again.
And this is after giving the VM 2GB of RAM.
Based on Andy's explanation of the difference in implementation and performance between read_sql and read_csv, the next thing I tried was to append the records to a CSV file and then read them all into a DataFrame:
event_dfs[0].to_csv(path+'new_events.csv', encoding='utf-8')
for df in event_dfs[1:]:
    df.to_csv(path+'new_events.csv', mode='a', header=False, encoding='utf-8')
Again, the writing to CSV completes successfully – a 657MB file – but reading from the CSV never completes.
How can one approximate how much RAM would be sufficient to read say a 657MB CSV file, since 2GB seem not to be enough?
Feels like I'm missing some fundamental understanding of either DataFrames or psycopg2, but I'm stuck, I can't even pinpoint the bottleneck or where to optimize.
What's the proper strategy to pull larger amounts of data from a remote (postgres) server?
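On the RAM question, one rough approach is to load a small sample of rows, measure its in-memory size, and scale that up to the full row count. The sketch below assumes the sample is representative of the whole table; the helper name `estimate_frame_bytes` and the sample columns are hypothetical, standing in for something like "SELECT * FROM events LIMIT 1000".

```python
import pandas as pd

def estimate_frame_bytes(sample, total_rows):
    """Extrapolate the in-memory size of a full table from a sample of its rows."""
    # deep=True also counts the Python string objects behind object columns,
    # which is typically where most of the memory goes.
    per_row = sample.memory_usage(deep=True, index=False).sum() / len(sample)
    return int(per_row * total_rows)

# Hypothetical sample; in practice this would come from a LIMIT query.
sample = pd.DataFrame({
    "user_id": range(1000),
    "event_type": ["page_view"] * 1000,
    "created_at": pd.date_range("2014-01-01", periods=1000, freq="min"),
})

estimate = estimate_frame_bytes(sample, total_rows=2711453)
print("approx. %.0f MB for the finished DataFrame" % (estimate / 1e6))
```

This only estimates the finished DataFrame. Peak usage while loading or concatenating can be noticeably higher, because intermediate copies exist alongside the result.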
I suspect there are a couple of (related) things at play here causing slowness:
read_sql is written in Python, so it's a little slow (especially compared to read_csv, which is written in Cython and carefully implemented for speed!), and it relies on SQLAlchemy rather than some (potentially much faster) C DB-API. The impetus to move to SQLAlchemy was to make that move easier in the future (as well as cross-SQL-platform support).
I think the immediate solution is a chunk-based approach (and there is a feature request to have this work natively in pandas read_sql and read_sql_table).
EDIT: As of pandas v0.16.2 this chunk-based approach is natively implemented in read_sql (via its chunksize argument).
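As a minimal sketch of the native approach: passing chunksize to read_sql returns an iterator of DataFrames instead of one large frame. The example uses an in-memory SQLite table as a stand-in for the remote Postgres events table, so the table layout and the `kind` column are assumptions made only for illustration.

```python
import sqlite3
import pandas as pd

# In-memory SQLite table standing in for the remote Postgres `events` table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (id INTEGER PRIMARY KEY, kind TEXT)")
conn.executemany(
    "INSERT INTO events (kind) VALUES (?)",
    [("click",), ("view",)] * 500,
)

# With chunksize set, read_sql yields DataFrames of at most `chunksize` rows.
chunks = pd.read_sql("SELECT * FROM events ORDER BY id", conn, chunksize=250)

sizes = []
for chunk in chunks:
    # Each chunk can be processed or written out here, then discarded.
    sizes.append(len(chunk))

print(sizes)
```

Note that with psycopg2's default (client-side) cursor the driver may still fetch the whole result set into client memory before pandas starts chunking, so this alone might not lower peak memory against Postgres.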
Since you're using Postgres you have access to the LIMIT and OFFSET clauses, which makes chunking quite easy. (Am I right in thinking these aren't available in all SQL dialects?)
First, get the number of rows (or an estimate) in your table:
nrows = con.execute('SELECT count(*) FROM users').fetchone()[0] # also works with an sqlalchemy engine
Use this to iterate through the table (for debugging you could add some print statements to confirm that it was working/not crashed!) and then combine the result:
def read_sql_chunked(query, con, nrows, chunksize=1000):
    start = 0  # OFFSET is zero-based, so begin at the first row
    dfs = []  # Note: could probably make this neater with a generator/for loop
    while start < nrows:
        df = pd.read_sql("%s LIMIT %s OFFSET %s" % (query, chunksize, start), con)
        dfs.append(df)
        start += chunksize  # advance to the next chunk; without this the loop never ends
    return pd.concat(dfs, ignore_index=True)
Note: this assumes that the database fits in memory! If it doesn't you'll need to work on each chunk (mapreduce style)... or invest in more memory!
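Working on each chunk could look roughly like the sketch below, which keeps only a running aggregate rather than the rows themselves. It uses the standard DB-API `fetchmany` call against an in-memory SQLite table as a stand-in for Postgres; the function name `count_by_kind` and the `kind` column are hypothetical.

```python
import sqlite3
from collections import Counter

def count_by_kind(conn, chunksize=1000):
    """Count rows per kind, holding at most `chunksize` rows in memory at a time."""
    totals = Counter()
    cur = conn.cursor()
    cur.execute("SELECT kind FROM events")
    while True:
        rows = cur.fetchmany(chunksize)  # standard DB-API call
        if not rows:
            break
        # Reduce this chunk and merge it into the running totals.
        totals.update(kind for (kind,) in rows)
    cur.close()
    return totals

# In-memory SQLite table standing in for the remote `events` table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (id INTEGER PRIMARY KEY, kind TEXT)")
conn.executemany(
    "INSERT INTO events (kind) VALUES (?)",
    [("click",)] * 600 + [("view",)] * 400,
)

totals = count_by_kind(conn, chunksize=100)
print(dict(totals))
```

With psycopg2, a named cursor (conn.cursor(name="some_name")) keeps the result set on the server, so fetchmany really does transfer only one batch at a time.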