SQLAlchemy: group by day over multiple tables

In my Flask application, I have something similar to a bank account: One User has one Account, credit entries are modeled as Incomings, deductions are modeled as Outgoings.

The problem:

Get an "account statement" for one user, i.e. credit entries / deductions per day, e.g.

Thu 29 Aug 2019
  Some deduction: -23.00
  Some credit: 123.00
Fri 30 Aug 2019
  Big credit: 4223.00
  Another deduction: -42.00

My data model:

This is what (a simplified version of) my models.py looks like:

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy     import Column, Float, ForeignKey, Integer, Text, TIMESTAMP
from sqlalchemy.orm import relationship

Base = declarative_base()

class Account(Base):
    __tablename__ = 'account'
    id        = Column(Integer, primary_key=True)
    balance   = Column(Float,   nullable=False)
    userID    = Column(Integer, ForeignKey('user.id'))
    incomings = relationship("Incoming", back_populates="account")
    outgoings = relationship("Outgoing", back_populates="account")
    user      = relationship("User",     back_populates="account")

class Incoming(Base):
    __tablename__ = 'incoming'
    id          = Column(Integer,   primary_key=True)
    accountID   = Column(Integer,   ForeignKey('account.id'))
    amount      = Column(Float,     nullable=False)
    description = Column(Text,      nullable=False)
    timestamp   = Column(TIMESTAMP, nullable=False)
    account     = relationship("Account", back_populates="incomings")

class Outgoing(Base):
    __tablename__ = 'outgoing'
    id          = Column(Integer,   primary_key=True)
    accountID   = Column(Integer,   ForeignKey('account.id'))
    amount      = Column(Float,     nullable=False)
    description = Column(Text,      nullable=False)
    timestamp   = Column(TIMESTAMP, nullable=False)
    account     = relationship("Account", back_populates="outgoings")

class User(Base):
    __tablename__ = 'user'
    id      = Column(Integer,   primary_key=True)
    name    = Column(Text,      nullable=False)
    account = relationship("Account", back_populates="user")

My general intended approach:

  1. get all Incomings for user, grouped by day
  2. get all Outgoings for user, grouped by day
  3. somehow merge the two lists, grouped by day

My background:

It's been a while since I've worked with the underlying database, PostgreSQL (though I did manage to set up a trigger function to auto-update the balance), but as far as SQLAlchemy (the ORM in use) is concerned, I seem to have merely scratched the surface.

Step 1: get all Incomings for user, grouped by day

Following the first SO hit, I tried

from sqlalchemy import func

# existing sample account ID
accountID  = 42
# not relevant to the point at hand, known to work
db_session = get_a_scoped_session_from_elsewhere()

db_incomings = db_session.query(Incoming)                         \
                         .filter(Incoming.accountID == accountID) \
                         .group_by(func.day(Incoming.timestamp))  \
                         .all()

but this fails with

ProgrammingError: (psycopg2.errors.UndefinedFunction) ...
 ... function day(timestamp without time zone) does not exist

which seems to indicate that PostgreSQL doesn't have a day function.

According to this SO answer,

# imports and variables as above
db_incomings = db_session.query(Incoming)                                      \
                         .filter(Incoming.accountID == accountID)              \
                         .group_by(func.date_trunc('day', Incoming.timestamp)) \
                         .all()

works for PostgreSQL, but for me fails with

ProgrammingError: (psycopg2.errors.GroupingError) ...
 ... column "incoming.id" must appear in the GROUP BY clause ...
 ... or be used in an aggregate function

When I just blindly try to do what the error message tells me and add incoming.id to the GROUP BY clause as in

db_incomings = db_session.query(Incoming)                                      \
                         .filter(Incoming.accountID == accountID)              \
                         .group_by(Incoming.id,
                                   func.date_trunc('day', Incoming.timestamp)) \
                         .all()

the code runs, but does not return the desired result; instead, I get a list of objects like

{'timestamp': datetime.datetime(2019, 8, 29, 10, 4, 27, 459000), 'id': 1, 'accountID': 42, ...}
{'timestamp': datetime.datetime(2019, 8, 29, 10, 8, 21, 493000), 'id': 2, 'accountID': 42, ...}
{'timestamp': datetime.datetime(2019, 8, 29, 10, 8, 42, 660000), 'id': 3, 'accountID': 42, ...}

which isn't really surprising, considering I'm grouping by Incoming.id.

Trying to understand the underlying issue (see e.g. here or here), it seems I cannot reference a column in the SELECT statement (i.e. the SQLAlchemy .query) unless it also appears in the GROUP BY clause (i.e. the SQLAlchemy .group_by). Looking at the error message, the reverse also seems to be the case.

I've been racking my brain for a couple of hours now, found lots of alternatives to func.date_trunc and have 800 browser tabs open, but still have no idea how to approach this.

My question: How do I need to structure / build up the SQLAlchemy query?

asked Aug 31 '19 by ssc


2 Answers

SQL works with and returns tabular data (or relations, if you prefer to think of it that way, though not all SQL tables are relations). What this implies is that a nested table such as the one depicted in the question is not that common a feature. There are ways to produce something of the kind in Postgresql, for example using arrays of JSON or composites, but it is entirely possible to just fetch tabular data and perform the nesting in the application. Python has itertools.groupby(), which fits the bill quite well, given sorted data.

The error column "incoming.id" must appear in the GROUP BY clause... is saying that non-aggregates in the select list, HAVING clause, etc. must appear in the GROUP BY clause or be used in an aggregate, lest they have possibly indeterminate values. In other words, the value would have to be picked from just some row in the group, because GROUP BY condenses the grouped rows into a single row, and it would be anyone's guess which row they were picked from. The implementation might allow this, as SQLite does and MySQL used to do, but the SQL standard forbids it. The exception to the rule is when there is a functional dependency: the GROUP BY expressions determine the non-aggregates. Think of a join between tables A and B grouped by A's primary key. No matter which row in a group the system would pick the values for A's columns from, they would be the same, since the grouping was done based on the primary key.
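For illustration, a grouped query becomes valid as soon as the select list contains only the grouping expression and aggregates. A minimal sketch, reusing the accountID and db_session from the question, that sums the amounts per day:

from sqlalchemy import func

day = func.date_trunc('day', Incoming.timestamp)

# Valid: everything selected is either the GROUP BY key or an aggregate.
daily_sums = db_session.query(day.label('day'),
                              func.sum(Incoming.amount).label('total')) \
                       .filter(Incoming.accountID == accountID) \
                       .group_by(day) \
                       .order_by(day) \
                       .all()

This collapses each day to a single row, losing the individual entries, which is exactly why a plain GROUP BY does not fit the original requirement.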

To address the three-point general intended approach, one way would be to select a union of incoming and outgoing, ordered by their timestamps. Since there is no inheritance hierarchy set up (and there might not even be one; I'm not familiar with accounting), reverting to using Core and plain result tuples makes things easier in this case:

from sqlalchemy import literal, select

incoming = select([literal('incoming').label('type'), Incoming.__table__]).\
    where(Incoming.accountID == accountID)

outgoing = select([literal('outgoing').label('type'), Outgoing.__table__]).\
    where(Outgoing.accountID == accountID)

all_entries = incoming.union(outgoing)
all_entries = all_entries.order_by(all_entries.c.timestamp)
all_entries = db_session.execute(all_entries)

Then, in order to form the nested structure, itertools.groupby() is used:

from itertools import groupby

# groupby() requires its input to be sorted on the grouping key,
# which the ORDER BY on timestamp above guarantees.
date_groups = groupby(all_entries, lambda ent: ent.timestamp.date())
date_groups = [(k, [dict(ent) for ent in g]) for k, g in date_groups]

The end result is a list of 2-tuples of date and list of dictionaries of entries in ascending order. Not quite the ORM solution, but gets the job done. An example:

In [55]: session.add_all([Incoming(accountID=1, amount=1, description='incoming',
    ...:                           timestamp=datetime.utcnow() - timedelta(days=i))
    ...:                  for i in range(3)])
    ...:                  

In [56]: session.add_all([Outgoing(accountID=1, amount=2, description='outgoing',
    ...:                           timestamp=datetime.utcnow() - timedelta(days=i))
    ...:                  for i in range(3)])
    ...:                  

In [57]: session.commit()

In [58]: incoming = select([literal('incoming').label('type'), Incoming.__table__]).\
    ...:     where(Incoming.accountID == 1)
    ...: 
    ...: outgoing = select([literal('outgoing').label('type'), Outgoing.__table__]).\
    ...:     where(Outgoing.accountID == 1)
    ...: 
    ...: all_entries = incoming.union(outgoing)
    ...: all_entries = all_entries.order_by(all_entries.c.timestamp)
    ...: all_entries = db_session.execute(all_entries)

In [59]: date_groups = groupby(all_entries, lambda ent: ent.timestamp.date())
    ...: [(k, [dict(ent) for ent in g]) for k, g in date_groups]
Out[59]: 
[(datetime.date(2019, 9, 1),
  [{'accountID': 1,
    'amount': 1.0,
    'description': 'incoming',
    'id': 5,
    'timestamp': datetime.datetime(2019, 9, 1, 20, 33, 6, 101521),
    'type': 'incoming'},
   {'accountID': 1,
    'amount': 2.0,
    'description': 'outgoing',
    'id': 4,
    'timestamp': datetime.datetime(2019, 9, 1, 20, 33, 29, 420446),
    'type': 'outgoing'}]),
 (datetime.date(2019, 9, 2),
  [{'accountID': 1,
    'amount': 1.0,
    'description': 'incoming',
    'id': 4,
    'timestamp': datetime.datetime(2019, 9, 2, 20, 33, 6, 101495),
    'type': 'incoming'},
   {'accountID': 1,
    'amount': 2.0,
    'description': 'outgoing',
    'id': 3,
    'timestamp': datetime.datetime(2019, 9, 2, 20, 33, 29, 420419),
    'type': 'outgoing'}]),
 (datetime.date(2019, 9, 3),
  [{'accountID': 1,
    'amount': 1.0,
    'description': 'incoming',
    'id': 3,
    'timestamp': datetime.datetime(2019, 9, 3, 20, 33, 6, 101428),
    'type': 'incoming'},
   {'accountID': 1,
    'amount': 2.0,
    'description': 'outgoing',
    'id': 2,
    'timestamp': datetime.datetime(2019, 9, 3, 20, 33, 29, 420352),
    'type': 'outgoing'}])]

As mentioned, Postgresql can produce pretty much the same result using an array of JSON:

from sqlalchemy import func, literal, literal_column, select
from sqlalchemy.dialects.postgresql import aggregate_order_by

incoming = select([literal('incoming').label('type'), Incoming.__table__]).\
    where(Incoming.accountID == accountID)

outgoing = select([literal('outgoing').label('type'), Outgoing.__table__]).\
    where(Outgoing.accountID == accountID)

all_entries = incoming.union(outgoing).alias('all_entries')

day = func.date_trunc('day', all_entries.c.timestamp)

stmt = select([day,
               func.array_agg(aggregate_order_by(
                   func.row_to_json(literal_column('all_entries.*')),
                   all_entries.c.timestamp))]).\
    group_by(day).\
    order_by(day)

db_session.execute(stmt).fetchall()
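Each row of the result is then a 2-tuple of the truncated day and a list of JSON objects, which psycopg2 deserializes to Python dicts. A hypothetical sketch of consuming it:

for day, entries in db_session.execute(stmt).fetchall():
    print(day.date())
    for entry in entries:
        print(entry['description'], entry['amount'])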

If in fact Incoming and Outgoing can be thought of as children of a common base, for example Entry, using unions can be somewhat automated with concrete table inheritance:

from sqlalchemy.ext.declarative import AbstractConcreteBase

class Entry(AbstractConcreteBase, Base):
    pass

class Incoming(Entry):
    __tablename__ = 'incoming'
    id          = Column(Integer,   primary_key=True)
    accountID   = Column(Integer,   ForeignKey('account.id'))
    amount      = Column(Float,     nullable=False)
    description = Column(Text,      nullable=False)
    timestamp   = Column(TIMESTAMP, nullable=False)
    account     = relationship("Account", back_populates="incomings")

    __mapper_args__ = {
        'polymorphic_identity': 'incoming',
        'concrete': True
    }

class Outgoing(Entry):
    __tablename__ = 'outgoing'
    id          = Column(Integer,   primary_key=True)
    accountID   = Column(Integer,   ForeignKey('account.id'))
    amount      = Column(Float,     nullable=False)
    description = Column(Text,      nullable=False)
    timestamp   = Column(TIMESTAMP, nullable=False)
    account     = relationship("Account", back_populates="outgoings")

    __mapper_args__ = {
        'polymorphic_identity': 'outgoing',
        'concrete': True
    }

Unfortunately using AbstractConcreteBase requires a manual call to configure_mappers() when all necessary classes have been defined; in this case the earliest possibility is after defining User, because Account depends on it through relationships:

from sqlalchemy.orm import configure_mappers
configure_mappers()

Then, in order to fetch all Incoming and Outgoing in a single polymorphic ORM query, use Entry:

session.query(Entry).\
    filter(Entry.accountID == accountID).\
    order_by(Entry.timestamp).\
    all()

and proceed to use itertools.groupby() as above on the resulting list of Incoming and Outgoing.
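For example, a minimal sketch (assuming the session and accountID from earlier):

from itertools import groupby

entries = session.query(Entry).\
    filter(Entry.accountID == accountID).\
    order_by(Entry.timestamp).\
    all()

# Nest the ORM instances by calendar day, as with the Core result above.
date_groups = [(day, list(group))
               for day, group in groupby(entries, lambda e: e.timestamp.date())]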


P.s. Be careful with binary floating point and money. We once had a fun time figuring out why a purchase of 40.80 ended up as 40.79.
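One way to avoid that class of bug is an exact decimal column type instead of Float; a minimal sketch (the Payment model here is hypothetical, for illustration only):

from sqlalchemy import Column, Integer, Numeric
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

class Payment(Base):
    __tablename__ = 'payment'
    id     = Column(Integer, primary_key=True)
    # NUMERIC(12, 2) is exact; values round-trip as decimal.Decimal,
    # so 40.80 stays 40.80.
    amount = Column(Numeric(12, 2), nullable=False)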

answered by Ilja Everilä


Actually, there is no need to do a GROUP BY at the SQL level, as we are not doing any aggregation.

As I understand it, the requirement is to print the details of incoming and outgoing transactions of an account in chronological order, with a date header on each date boundary.

As there is no aggregation (like sum, count, etc.) being done, GROUP BY at the SQL level does not come into the picture.

So we will simply load all incoming and outgoing records for the account, put them in a common list, sort the list, and then group the transactions by date and print them:

incoming = session.query(Incoming).filter(Incoming.accountID == 1).all()
outgoing = session.query(Outgoing).filter(Outgoing.accountID == 1).all()

txns = [*incoming, *outgoing]

txns = sorted(txns, key=lambda t: t.timestamp)

from itertools import groupby

for d, dtxns in groupby(txns, key=lambda t: t.timestamp.date()):
    print(d)
    for txn in dtxns:
        print(txn)

Alternatively, get the incoming and outgoing records merged and sorted by date at the DB level, using SQL like:

select *, 'incoming' as direction from incoming
union all
select *, 'outgoing' as direction from outgoing
order by timestamp
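A minimal sketch of running that raw SQL from Python through SQLAlchemy's text() construct (which sidesteps the ORM entirely):

from sqlalchemy import text

stmt = text("""
    select *, 'incoming' as direction from incoming
    union all
    select *, 'outgoing' as direction from outgoing
    order by timestamp
""")
rows = session.execute(stmt).fetchall()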

I do not have a good grip on SQLAlchemy, so I will not be able to help with how to do this using the ORM.

One advantage of getting the sorting done at the DB level is that you can then iterate over the records, finding the date boundaries yourself, to avoid a large number of records being loaded into memory, if that happens to be a concern.

d = None
for txn in session.query(......):
    if d != txn.timestamp.date():
        d = txn.timestamp.date()
        print(d)
    print(txn)
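If memory does become a concern, the ORM can also stream rows in batches via Query.yield_per(); a sketch, using a single-table query for simplicity:

for txn in session.query(Incoming).\
        filter(Incoming.accountID == 1).\
        order_by(Incoming.timestamp).\
        yield_per(100):
    print(txn)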

Hope this helps.

Edit:

It turns out that, because of the ORM mapping, you can access the transactions of an account from the Account object:

ac = session.query(Account).filter(Account.id == 1).one()
txns = sorted((*ac.incomings, *ac.outgoings), key=lambda t: t.timestamp)
answered by ckedar