Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How can I write my own aggregate functions with sqlalchemy?

How can I write my own aggregate functions with SQLAlchemy? As an easy example I would like to use numpy to calculate the variance. With sqlite it would look like this:

import sqlite3 as sqlite
import numpy as np

class self_written_SQLvar(object):
  def __init__(self):
    import numpy as np
    self.values = []
  def step(self, value):
    self.values.append(value)
  def finalize(self):
    return np.array(self.values).var()

cxn = sqlite.connect(':memory:')
cur = cxn.cursor()
cxn.create_aggregate("self_written_SQLvar", 1, self_written_SQLvar)
# Now - how to use it:
cur.execute("CREATE TABLE 'mytable' ('numbers' INTEGER)")
cur.execute("INSERT INTO 'mytable' VALUES (1)") 
cur.execute("INSERT INTO 'mytable' VALUES (2)") 
cur.execute("INSERT INTO 'mytable' VALUES (3)") 
cur.execute("INSERT INTO 'mytable' VALUES (4)")
a = cur.execute("SELECT avg(numbers), self_written_SQLvar(numbers) FROM mytable")
print a.fetchall()
>>> [(2.5, 1.25)]
like image 789
Philipp der Rautenberg Avatar asked Jun 15 '09 15:06

Philipp der Rautenberg


1 Answers

The creation of new aggregate functions is backend-dependant, and must be done directly with the API of the underlining connection. SQLAlchemy offers no facility for creating those.

However after created you can just use them in SQLAlchemy normally.

Example:

import sqlalchemy
from sqlalchemy import Column, Table, create_engine, MetaData, Integer
from sqlalchemy import func, select
from sqlalchemy.pool import StaticPool
from random import randrange
import numpy
import sqlite3

class NumpyVarAggregate(object):
  def __init__(self):
    self.values = []
  def step(self, value):
    self.values.append(value)
  def finalize(self):
    return numpy.array(self.values).var()

def sqlite_memory_engine_creator():
    con = sqlite3.connect(':memory:')
    con.create_aggregate("np_var", 1, NumpyVarAggregate)
    return con

e = create_engine('sqlite://', echo=True, poolclass=StaticPool,
                  creator=sqlite_memory_engine_creator)
m = MetaData(bind=e)
t = Table('mytable', m, 
            Column('id', Integer, primary_key=True),
            Column('number', Integer)
          )
m.create_all()

Now for the testing:

# insert 30 random-valued rows
t.insert().execute([{'number': randrange(100)} for x in xrange(30)])

for row in select([func.avg(t.c.number), func.np_var(t.c.number)]).execute():
    print 'RESULT ROW: ', row

That prints (with SQLAlchemy statement echo turned on):

2009-06-15 14:55:34,171 INFO sqlalchemy.engine.base.Engine.0x...d20c PRAGMA 
table_info("mytable")
2009-06-15 14:55:34,174 INFO sqlalchemy.engine.base.Engine.0x...d20c ()
2009-06-15 14:55:34,175 INFO sqlalchemy.engine.base.Engine.0x...d20c 
CREATE TABLE mytable (
    id INTEGER NOT NULL, 
    number INTEGER, 
    PRIMARY KEY (id)
)
2009-06-15 14:55:34,175 INFO sqlalchemy.engine.base.Engine.0x...d20c ()
2009-06-15 14:55:34,176 INFO sqlalchemy.engine.base.Engine.0x...d20c COMMIT
2009-06-15 14:55:34,177 INFO sqlalchemy.engine.base.Engine.0x...d20c INSERT
INTO mytable (number) VALUES (?)
2009-06-15 14:55:34,177 INFO sqlalchemy.engine.base.Engine.0x...d20c [[98], 
[94], [7], [1], [79], [77], [51], [28], [85], [26], [34], [68], [15], [43], 
[52], [97], [64], [82], [11], [71], [27], [75], [60], [85], [42], [40], 
[76], [12], [81], [69]]
2009-06-15 14:55:34,178 INFO sqlalchemy.engine.base.Engine.0x...d20c COMMIT
2009-06-15 14:55:34,180 INFO sqlalchemy.engine.base.Engine.0x...d20c SELECT
avg(mytable.number) AS avg_1, np_var(mytable.number) AS np_var_1 FROM mytable
2009-06-15 14:55:34,180 INFO sqlalchemy.engine.base.Engine.0x...d20c []
RESULT ROW: (55.0, 831.0)

Note that I didn't use SQLAlchemy's ORM (just the sql expression part of SQLAlchemy was used) but you could use ORM just as well.

like image 74
nosklo Avatar answered Oct 13 '22 06:10

nosklo