Bulk Insert A Pandas DataFrame Using SQLAlchemy

I have some rather large pandas DataFrames and I'd like to use the new bulk SQL mappings to upload them to Microsoft SQL Server via SQLAlchemy. The pandas.to_sql method, while nice, is slow.

I'm having trouble writing the code...

I'd like to be able to pass this function a pandas DataFrame which I'm calling table, a schema name I'm calling schema, and a table name I'm calling name. Ideally, the function will: 1) delete the table if it already exists, 2) create a new table, 3) create a mapper, and 4) bulk insert using the mapper and the pandas data. I'm stuck on part 3.

Here's my (admittedly rough) code. I'm struggling with how to get the mapper function to work with my primary keys. I don't really need primary keys, but the mapper function requires one.

Thanks for the insights.

from sqlalchemy import create_engine, Table, Column, MetaData
from sqlalchemy.orm import mapper, create_session
from sqlalchemy.ext.declarative import declarative_base
from pandas.io.sql import SQLTable, SQLDatabase

def bulk_upload(table, schema, name):
    e = create_engine('mssql+pyodbc://MYDB')
    s = create_session(bind=e)
    m = MetaData(bind=e, reflect=True, schema=schema)
    Base = declarative_base(bind=e, metadata=m)
    # 1) delete the table if it already exists
    t = Table(name, m)
    m.remove(t)
    t.drop(checkfirst=True)
    # 2) create a new table from the DataFrame's schema
    sqld = SQLDatabase(e, schema=schema, meta=m)
    sqlt = SQLTable(name, sqld, table).table
    sqlt.metadata = m
    m.create_all(bind=e, tables=[sqlt])
    # 3) create a mapper (this is the part I'm stuck on)
    class MyClass(Base):
        pass
    mapper(MyClass, sqlt)
    # 4) bulk insert using the mapper and the pandas data
    s.bulk_insert_mappings(MyClass, table.to_dict(orient='records'))
    return
2 Answers

I ran into a similar issue, with pd.to_sql taking hours to upload data. The code below bulk inserted the same data in a few seconds.

from sqlalchemy import create_engine
import psycopg2 as pg
# cStringIO is Python 2 only; on Python 3 use io.StringIO instead
import cStringIO

address = 'postgresql://<username>:<pswd>@<host>:<port>/<database>'
engine = create_engine(address)
connection = engine.raw_connection()
cursor = connection.cursor()

#df is the dataframe containing an index and the columns "Event" and "Day"
#create Index column to use as primary key
df.reset_index(inplace=True)
df.rename(columns={'index':'Index'}, inplace =True)

#create the table but first drop if it already exists
command = '''DROP TABLE IF EXISTS localytics_app2;
CREATE TABLE localytics_app2
(
"Index" serial primary key,
"Event" text,
"Day" timestamp without time zone
);'''
cursor.execute(command)
connection.commit()

#stream the data using 'to_csv' and StringIO(); then use sql's 'copy_from' function
output = cStringIO.StringIO()
#write the dataframe without the index
df.to_csv(output, sep='\t', header=False, index=False)
#jump to start of stream
output.seek(0)
cur = connection.cursor()
#copy the buffer into the table; null values become ''
cur.copy_from(output, 'localytics_app2', null="")
connection.commit()
cur.close()
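
As an aside, cStringIO exists only on Python 2. A minimal sketch of the same streaming step on Python 3, assuming the same df, connection, and localytics_app2 table, would be:

import io

output = io.StringIO()
df.to_csv(output, sep='\t', header=False, index=False)
output.seek(0)
cur = connection.cursor()
#copy the buffer into the table; null values become ''
cur.copy_from(output, 'localytics_app2', null="")
connection.commit()
cur.close()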


This may already have been answered, but I found the solution by collating different answers on this site and aligning them with SQLAlchemy's docs.

  1. The table needs to already exist in db1, with an auto_increment index set up on it (a sketch of matching DDL follows this list).
  2. The class Current needs to align with the dataframe imported from the CSV and with the table in db1.
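
For illustration, here is a hypothetical DDL for that pre-existing table, run through SQLAlchemy 1.x's engine.execute using the engine created in the code below (the column types are assumptions matching the Current class):

# hypothetical DDL; column types are assumptions matching the Current class
engine.execute("""
    CREATE TABLE IF NOT EXISTS tableName (
        id INT AUTO_INCREMENT PRIMARY KEY,
        Date VARCHAR(500),
        Type VARCHAR(500),
        Value DECIMAL(20, 6)
    )
""")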

Hope this helps whoever comes here and wants to mix pandas and SQLAlchemy in a quick way.

from urllib import quote_plus as urlquote
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Numeric
from sqlalchemy.orm import sessionmaker
import pandas as pd


# Set up of the engine to connect to the database
# the urlquote is used for passing the password which might contain special characters such as "/"
engine = create_engine('mysql://root:%s@localhost/db1' % urlquote('weirdPassword*withsp€cialcharacters'), echo=False)
Base = declarative_base()

# Declaration of the class in order to write into the database. This standard declarative structure aligns with SQLAlchemy's docs.
class Current(Base):
    __tablename__ = 'tableName'

    id = Column(Integer, primary_key=True)
    Date = Column(String(500))
    Type = Column(String(500))
    Value = Column(Numeric())

    def __repr__(self):
        return "(id='%s', Date='%s', Type='%s', Value='%s')" % (self.id, self.Date, self.Type, self.Value)

# Set up of the table in db and the file to import
fileToRead = 'file.csv'
tableToWriteTo = 'tableName'

# Use pandas to create a lovely dataframe
df_to_be_written = pd.read_csv(fileToRead)
# orient='records' is the key here: it produces the list-of-dicts format the docs describe for bulk inserts.
listToWrite = df_to_be_written.to_dict(orient='records')
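# For illustration, to_dict(orient='records') yields one dict per row,
# e.g. (hypothetical values):
#   [{'Date': '2015-01-01', 'Type': 'A', 'Value': 1.0},
#    {'Date': '2015-01-02', 'Type': 'B', 'Value': 2.5}]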

metadata = sqlalchemy.schema.MetaData(bind=engine,reflect=True)
table = sqlalchemy.Table(tableToWriteTo, metadata, autoload=True)

# Open the session
Session = sessionmaker(bind=engine)
session = Session()

# Insert the dataframe into the database in one bulk; executing through the
# session ensures the commit below applies to this insert
session.execute(table.insert(), listToWrite)

# Commit the changes
session.commit()

# Close the session
session.close()
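
Note that MetaData(bind=engine, reflect=True) and autoload=True are SQLAlchemy 1.x idioms. On SQLAlchemy 2.0, a sketch of the equivalent reflection step would be:

metadata = sqlalchemy.MetaData()
table = sqlalchemy.Table(tableToWriteTo, metadata, autoload_with=engine)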