I have data coming from a source that I interface with a pandas DataFrame, and a data model interfaced by the SQLAlchemy ORM. For the sake of the MCVE I have normalized the data model into two tables:
channel: holds metadata about records (small volumetry, ~1k rows);
record: holds records pointing to channel (higher volumetry, ~90k rows/day).
The aim of channel is to avoid repetition. What I want is to set up a pythonic insertion of data into the record table using SQLAlchemy, with the constraint that the data source does not know about channelid.
Here is a sample of data from the source (the only data I have access to):
import pandas as pd

recs = [
    {'serial': '1618741320', 'source': 1, 'channel': 4, 'timestamp': pd.Timestamp('2019-01-01 08:35:00'), 'value': 12},
    {'serial': '1350397285', 'source': 2, 'channel': 3, 'timestamp': pd.Timestamp('2019-01-01 09:20:00'), 'value': 37},
    {'serial': '814387724', 'source': 2, 'channel': 1, 'timestamp': pd.Timestamp('2019-01-01 12:30:00'), 'value': 581},
    {'serial': '545914014', 'source': 3, 'channel': 0, 'timestamp': pd.Timestamp('2019-01-01 01:45:00'), 'value': 0},
    {'serial': '814387724', 'source': 0, 'channel': 5, 'timestamp': pd.Timestamp('2019-01-01 14:20:00'), 'value': 699}
]
data = pd.DataFrame(recs)
Here is a sample of the meta stored in channel, which has been learned from the setup:
recs = [
    {'channelid': 28, 'serial': '545914014', 'source': 3, 'channel': 0},
    {'channelid': 73, 'serial': '1350397285', 'source': 2, 'channel': 3},
    {'channelid': 239, 'serial': '1618741320', 'source': 1, 'channel': 4},
    {'channelid': 245, 'serial': '814387724', 'source': 0, 'channel': 5},
    {'channelid': 259, 'serial': '814387724', 'source': 2, 'channel': 1}
]
meta = pd.DataFrame(recs)
First, let's start with an MCVE!
We define the data model:
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy import Column, Integer, Float, String, DateTime
from sqlalchemy import UniqueConstraint, ForeignKey
from sqlalchemy.orm import relationship

Base = declarative_base()
Engine = create_engine("postgresql://postgres:postgres@localhost:5432/postgres")

class Channel(Base):
    __tablename__ = 'channel'
    __table_args__ = (UniqueConstraint('serial', 'source', 'channel'),)
    id = Column(Integer, primary_key=True)
    serial = Column(String, nullable=False)
    source = Column(Integer, nullable=False)
    channel = Column(Integer, nullable=False)

class Record(Base):
    __tablename__ = 'record'
    __table_args__ = (UniqueConstraint('channelid', 'timestamp'),)
    id = Column(Integer, primary_key=True)
    channelid = Column(Integer, ForeignKey('channel.id'), nullable=False)
    timestamp = Column(DateTime, nullable=False)
    value = Column(Float, nullable=False)
    channel = relationship("Channel")

Base.metadata.drop_all(Engine)
Base.metadata.create_all(Engine)
And we feed the channel table to reflect the meta we already have:
with Engine.connect() as dbcon:
    dbcon.execute(Channel.__table__.insert(), meta.to_dict(orient='records'))
Now we would like to easily insert data into the record table, but unfortunately we lack the channelid, which our data source does not know about. Obviously this call fails:
with Engine.connect() as dbcon:
    with dbcon.begin() as dbtrans:
        dbcon.execute(Record.__table__.insert(), data.to_dict(orient='records'))
        dbtrans.commit()
Because of:
IntegrityError: (psycopg2.errors.NotNullViolation) null value in column "channelid" violates not-null constraint
DETAIL: Failing row contains (6, null, 2019-01-01 08:35:00, 12).
[SQL: 'INSERT INTO record (timestamp, value) VALUES (%(timestamp)s, %(value)s)'] [parameters: ({'timestamp': Timestamp('2019-01-01 08:35:00'), 'value': 12}, {'timestamp': Timestamp('2019-01-01 09:20:00'), 'value': 37}, {'timestamp': Timestamp('2019-01-01 12:30:00'), 'value': 581}, {'timestamp': Timestamp('2019-01-01 01:45:00'), 'value': 0}, {'timestamp': Timestamp('2019-01-01 14:20:00'), 'value': 699})]
We could handle it with pandas:
meta = pd.read_sql("SELECT id AS channelid, serial, source, channel FROM channel;", Engine.connect())
full = data.merge(meta, on=['serial', 'source', 'channel'])
And the previous call will work because the association with channelid has been made:
channel serial source timestamp value channelid
0 4 1618741320 1 2019-01-01 08:35:00 12 239
1 3 1350397285 2 2019-01-01 09:20:00 37 73
2 1 814387724 2 2019-01-01 12:30:00 581 259
3 0 545914014 3 2019-01-01 01:45:00 0 28
4 5 814387724 0 2019-01-01 14:20:00 699 245
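For completeness, here is a minimal sketch of how the insert could then be issued (an assumption on my side: only the columns that actually exist on record are kept, so the generated statement matches the table):
# Keep only the record columns before inserting the merged frame.
cols = ['channelid', 'timestamp', 'value']
with Engine.connect() as dbcon:
    dbcon.execute(Record.__table__.insert(), full[cols].to_dict(orient='records'))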
But this is not the way I think it should be solved, mainly because it makes me perform the binding with pandas instead of SQLAlchemy.
I have also tried the following, but it is totally inefficient for a dataset of 90k records:
from sqlalchemy.orm import sessionmaker

Session = sessionmaker(bind=Engine)
session = Session()

with session.begin_nested() as trans:
    for rec in data.to_dict(orient='records'):
        c = session.query(Channel).filter_by(**{k: rec.pop(k) for k in ['serial', 'source', 'channel']}).first()
        r = Record(channelid=c.id, **rec)
        session.add(r)
It takes almost 100 times longer than the previous method using a DataFrame.
I have focused my energy on building a comprehensive MCVE because I am more fluent in pandas than SQLAlchemy and I could not find a solution to my problem in the SQLAlchemy documentation.
My question is: "How can I resolve the channelid to make my insert successful, in a fashion that is performant and relies on SQLAlchemy instead of pandas?"
Feel free to comment to improve this post. What I am looking for is a rational way to do it; it may imply updating the data model, I have that flexibility.
After reading more about SQLAlchemy and testing the proposal from @Ramasubramanian S, the best I could achieve is:
ukeys = ['serial', 'source', 'channel']

with session.begin_nested() as trans:
    g = data.groupby(ukeys)
    for key in g.groups:
        recs = []
        for rec in data.loc[g.groups[key], :].to_dict(orient='records'):
            m = {k: rec.pop(k) for k in ukeys}
            c = session.query(Channel).filter_by(**m).first()
            #r = Record(channel=c, **rec)
            r = Record(channelid=c.id, **rec)  # Bulk Insert needs explicit id not a relationship
            recs.append(r)
        #session.add_all(recs)
        session.bulk_save_objects(recs)  # Not working w/ relationship
Using the relationship Record(channel=c, **rec), the method session.bulk_save_objects raises:
IntegrityError: (psycopg2.IntegrityError) ERROR: null value in column "channelid" violates not-null constraint
DETAIL: Failing row contains (1, null, 2019-01-01 08:35:00, 12)
[SQL: INSERT INTO record (timestamp, value) VALUES (%(timestamp)s, %(value)s)]
[parameters: ({'timestamp': Timestamp('2019-01-01 08:35:00'), 'value': 12}, {'timestamp': Timestamp('2019-01-01 09:20:00'), 'value': 37}, {'timestamp': Timestamp('2019-01-01 12:30:00'), 'value': 581}, {'timestamp': Timestamp('2019-01-01 01:45:00'), 'value': 0}, {'timestamp': Timestamp('2019-01-01 14:20:00'), 'value': 699})]
(Background on this error at: http://sqlalche.me/e/gkpj)
The channelid is then set to NULL; it seems bulk_save_objects cannot use the relationship capability, so we need to pass channelid explicitly to make it work.
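To illustrate the difference, here is a small sketch (untested, reusing the c and rec variables from the loop above): the regular unit-of-work flush resolves the foreign key through the relationship, while the bulk API skips relationship cascades, which is why channelid has to be set by hand.
# Regular ORM path: the relationship fills in record.channelid at flush time.
session.add(Record(channel=c, **rec))
session.flush()

# Bulk path: relationships are ignored, so the foreign key must be passed explicitly.
session.bulk_save_objects([Record(channelid=c.id, **rec)])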
One way to improve the performance of inserting multiple records is to create the objects and insert into the database in bulk using either bulk_save_objects or bulk_insert_mappings.
This link shows the performance comparison of various methods of inserting multiple records.
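For illustration, here is a rough sketch of both calls (untested, reusing the Session, the Record model, and channelid values taken from the question's meta; you would use one or the other, not both, because of the unique constraint on (channelid, timestamp)):
# ORM instances in bulk; relationships are not handled, so channelid is given explicitly.
session.bulk_save_objects([
    Record(channelid=239, timestamp=pd.Timestamp('2019-01-01 08:35:00'), value=12),
    Record(channelid=73, timestamp=pd.Timestamp('2019-01-01 09:20:00'), value=37),
])

# Plain dictionaries; cheaper still, since no ORM instances are created at all.
session.bulk_insert_mappings(Record, [
    {'channelid': 239, 'timestamp': pd.Timestamp('2019-01-01 08:35:00'), 'value': 12},
    {'channelid': 73, 'timestamp': pd.Timestamp('2019-01-01 09:20:00'), 'value': 37},
])
session.commit()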
You can find a similar answer here
Cheers
I think the key to the solution is in your statement:
channel holding metadata about records (small volumetry, ~1k rows);
Since you are saying this is not too high, I would just cache it in memory:
channels = session.query(Channel).all()
channel_map = {}
for c in channels:
    # Use the natural key (serial, source, channel) as a tuple; joining mixed
    # str/int values into a string would fail.
    channel_map[(c.serial, c.source, c.channel)] = c.id
And now you can do the bulk insert or any other way you prefer:
with session.begin_nested() as trans:
    recs = []
    for rec in data.to_dict(orient='records'):
        # Pop the natural-key fields so only Record columns remain in rec.
        channel_id = channel_map[(rec.pop('serial'), rec.pop('source'), rec.pop('channel'))]
        r = Record(channelid=channel_id, **rec)  # Bulk Insert needs explicit id not a relationship
        recs.append(r)
    session.add_all(recs)
Note: the above is untested code, but the intention is to show a possible approach.
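For the 90k rows/day volume, the same channel_map could also feed plain dictionaries into bulk_insert_mappings, so that no ORM instances are built at all. Again a sketch only, untested, reusing channel_map and data from above:
# Build plain dicts keyed by Record columns, resolving channelid from the cached map.
mappings = [
    {
        'channelid': channel_map[(rec['serial'], rec['source'], rec['channel'])],
        'timestamp': rec['timestamp'],
        'value': rec['value'],
    }
    for rec in data.to_dict(orient='records')
]
session.bulk_insert_mappings(Record, mappings)
session.commit()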