Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to resolve related foreign key at insert time using SQLAlchemy?

Motivations

I have data coming from a source that I interface with pandas DataFrame. I have a datamodel interfaced by SQLAlchemy ORM. I have normalized the datamodel into two tables for MCVE sake:

  • channel holding metadata about records (small volumetry, ~1k rows);
  • record holding record pointing to channel (higher volumetry, 90k rows/day).

The aim of channel is to avoid repetition. What I want is to setup a pythonic insertion of data into record table using SQLAlchemy with the constraint the data source does not know about channelid.

Data Source

Here a sample of data from the source (the only data I have access):

import pandas as pd
recs = [
    {'serial': '1618741320', 'source': 1, 'channel': 4, 'timestamp': pd.Timestamp('2019-01-01 08:35:00'), 'value': 12},
    {'serial': '1350397285', 'source': 2, 'channel': 3, 'timestamp': pd.Timestamp('2019-01-01 09:20:00'), 'value': 37},
    {'serial': '814387724', 'source': 2, 'channel': 1, 'timestamp': pd.Timestamp('2019-01-01 12:30:00'), 'value': 581},
    {'serial': '545914014', 'source': 3, 'channel': 0, 'timestamp': pd.Timestamp('2019-01-01 01:45:00'), 'value': 0},
    {'serial': '814387724', 'source': 0, 'channel': 5, 'timestamp': pd.Timestamp('2019-01-01 14:20:00'), 'value': 699}
]
data = pd.DataFrame(recs)

Here a sample of meta stored in channel that have been learned from the setup.

recs = [
    {'channelid': 28, 'serial': '545914014', 'source': 3, 'channel': 0},
    {'channelid': 73, 'serial': '1350397285', 'source': 2, 'channel': 3},
    {'channelid': 239, 'serial': '1618741320', 'source': 1, 'channel': 4},
    {'channelid': 245, 'serial': '814387724', 'source': 0, 'channel': 5},
    {'channelid': 259, 'serial': '814387724', 'source': 2, 'channel': 1}
]
meta= pd.DataFrame(recs)

MCVE

First let's start with a MCVE!

We define the datamodel:

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy import Column, Integer, Float, String, DateTime
from sqlalchemy import UniqueConstraint, ForeignKey
from sqlalchemy.orm import relationship

Base = declarative_base()
Engine = create_engine("postgresql://postgres:postgres@localhost:5432/postgres")

class Channel(Base):
    __tablename__ = 'channel'
    __table_args__ = (UniqueConstraint('serial', 'source', 'channel'),)
    id = Column(Integer, primary_key=True)
    serial = Column(String, nullable=False)
    source = Column(Integer, nullable=False)
    channel = Column(Integer, nullable=False)

class Record(Base):
    __tablename__ = 'record'
    __table_args__ = (UniqueConstraint('channelid', 'timestamp'),)
    id = Column(Integer, primary_key=True)
    channelid = Column(Integer, ForeignKey('channel.id'), nullable=False)
    timestamp = Column(DateTime, nullable=False)
    value = Column(Float, nullable=False)
    channel = relationship("Channel")

Base.metadata.drop_all(Engine)
Base.metadata.create_all(Engine)

And we feed the channel table to reflect meta we already have:

with Engine.connect() as dbcon:
    dbcon.execute(Channel.__table__.insert(), meta.to_dict(orient='records'))

Issue to solve

Now we would like to easily insert data into record table, but unfortunately we lack the channelid from our datasource (which does not know about it). Obviously this call fails:

with Engine.connect() as dbcon:
    with dbcon.begin() as dbtrans:
        dbcon.execute(Record.__table__.insert(), data.to_dict(orient='records'))
        dbtrans.commit()

Because of:

IntegrityError: (psycopg2.errors.NotNullViolation) null value in column "channelid" violates not-null constraint
DETAIL:  Failing row contains (6, null, 2019-01-01 08:35:00, 12).
 [SQL: 'INSERT INTO record (timestamp, value) VALUES (%(timestamp)s, %(value)s)'] [parameters: ({'timestamp': Timestamp('2019-01-01 08:35:00'), 'value': 12}, {'timestamp': Timestamp('2019-01-01 09:20:00'), 'value': 37}, {'timestamp': Timestamp('2019-01-01 12:30:00'), 'value': 581}, {'timestamp': Timestamp('2019-01-01 01:45:00'), 'value': 0}, {'timestamp': Timestamp('2019-01-01 14:20:00'), 'value': 699})]

We could handle it with pandas:

meta = pd.read_sql("SELECT id AS channelid, serial, source, channel FROM channel;", Engine.connect())
full = data.merge(meta, on=['serial', 'source', 'channel'])

And the previous call will work because the association with channelid is done:

   channel      serial  source           timestamp  value  channelid
0        4  1618741320       1 2019-01-01 08:35:00     12        239
1        3  1350397285       2 2019-01-01 09:20:00     37         73
2        1   814387724       2 2019-01-01 12:30:00    581        259
3        0   545914014       3 2019-01-01 01:45:00      0         28
4        5   814387724       0 2019-01-01 14:20:00    699        245

But this not the way I think it should be solved, mainly because it makes me perform the binding with pandas instead of SQLAlchemy.

I also have tried this, but it is totally inefficient for a dataset of 90k records:

from sqlalchemy.orm import sessionmaker
Session = sessionmaker(bind=Engine)
session = Session()
with session.begin_nested() as trans:
    for rec in data.to_dict(orient='records'):
        c = session.query(Channel).filter_by(**{k: rec.pop(k) for k in ['serial', 'source', 'channel']}).first()
        r = Record(channelid=c.id, **rec)
        session.add(r)

It takes almost 100 times longer than previous method using DataFrame.

Question

I have focused my energy in building a comprehensive MCVE because I am more fluent in pandas than SQLAlchemy and I could not find a solution to my problem in the SQLAlchemy documentation.

My question is: "How can I resolve the channelid to make my insert successful, in a fashion that is performant and relies on SQLAclhemy instead of pandas?"

Feel free to comment to improve this post. What am I looking for is a rational way to do it. It can imply to update the datamodel, I have this flexibility.

Update

Reading more about SQLAlchemy and testing proposals @Ramasubramanian S, the best I could achieve is:

ukeys = ['serial', 'source', 'channel']
with session.begin_nested() as trans:
    g = data.groupby(ukeys)
    for key in g.groups:
        recs = []
        for rec in data.loc[g.groups[key],:].to_dict(orient='records'):
            m = {k: rec.pop(k) for k in ukeys}
            c = session.query(Channel).filter_by(**m).first()
            #r = Record(channel=c, **rec)  
            r = Record(channelid=c.id, **rec) # Bulk Insert needs explicit id not a relationship
            recs.append(r)
        #session.add_all(recs)
        session.bulk_save_objects(recs) # Not working w/ relationship

Using relationship Record(channel=c, **rec) the method session.bulk_save_objects raises:

IntegrityError: (psycopg2.IntegrityError) ERREUR:  une valeur NULL viole la contrainte NOT NULL de la colonne « channelid »
DETAIL:  La ligne en échec contient (1, null, 2019-01-01 08:35:00, 12)

[SQL: INSERT INTO record (timestamp, value) VALUES (%(timestamp)s, %(value)s)]
[parameters: ({'timestamp': Timestamp('2019-01-01 08:35:00'), 'value': 12}, {'timestamp': Timestamp('2019-01-01 09:20:00'), 'value': 37}, {'timestamp': Timestamp('2019-01-01 12:30:00'), 'value': 581}, {'timestamp': Timestamp('2019-01-01 01:45:00'), 'value': 0}, {'timestamp': Timestamp('2019-01-01 14:20:00'), 'value': 699})]
(Background on this error at: http://sqlalche.me/e/gkpj)

Then channelid is set to NULL, it seems it cannot use relationship capability, thus we need to explicitly pass channelid to make it works.

like image 790
jlandercy Avatar asked Jun 09 '19 11:06

jlandercy


People also ask

How will you implement many to many relationship in SQLAlchemy?

Many to Many relationship between two tables is achieved by adding an association table such that it has two foreign keys - one from each table's primary key.

How do you query a one to many relationship in flask?

The comments class attribute defines a One-to-Many relationship between the Post model and the Comment model. You use the db. relationship() method, passing it the name of the comments model ( Comment in this case). You use the backref parameter to add a back reference that behaves like a column to the Comment model.

What does relationship do in SQLAlchemy?

The relationship function is a part of Relationship API of SQLAlchemy ORM package. It provides a relationship between two mapped classes. This corresponds to a parent-child or associative table relationship.


2 Answers

One way to improve the performance of inserting multiple records is to create the objects and insert into the database in bulk using either bulk_save_objects or bulk_insert_mappings.

This link shows the performance comparison of various methods of inserting multiple records.

You can find a similar answer here

Cheers

like image 117
Ramasubramanian S Avatar answered Sep 21 '22 13:09

Ramasubramanian S


I think the key to the solution is in your statement

channel holding metadata about records (small volumetry, ~1k rows);

Since you are saying this is not too high, i would just cache this in the memory

channels = session.query(Channel).all()

channel_map = {}
for c in channels:
    channel_map['-'.join([c.serial, c.source, c.channel])] = c.id

And now you can do the bulk update or any other way you prefer

with session.begin_nested() as trans:
    recs = []
    for rec in data.to_dict(orient='records'):
        channel_id = channel_map['-'.join([rec['serial'], rec['source'], rec['channel']])]
        r = Record(channelid=channel_id, **rec) # Bulk Insert needs explicit id not a relationship
        recs.append(r)
    session.add_all(recs)

Note: Above is untested code but intention is to show a possible approach

like image 45
Tarun Lalwani Avatar answered Sep 19 '22 13:09

Tarun Lalwani