How can I write my own aggregate functions with SQLAlchemy? As an easy example I would like to use numpy to calculate the variance. With sqlite it would look like this:
import sqlite3 as sqlite
import numpy as np
class self_written_SQLvar(object):
def __init__(self):
import numpy as np
self.values = []
def step(self, value):
self.values.append(value)
def finalize(self):
return np.array(self.values).var()
cxn = sqlite.connect(':memory:')
cur = cxn.cursor()
cxn.create_aggregate("self_written_SQLvar", 1, self_written_SQLvar)
# Now - how to use it:
cur.execute("CREATE TABLE 'mytable' ('numbers' INTEGER)")
cur.execute("INSERT INTO 'mytable' VALUES (1)")
cur.execute("INSERT INTO 'mytable' VALUES (2)")
cur.execute("INSERT INTO 'mytable' VALUES (3)")
cur.execute("INSERT INTO 'mytable' VALUES (4)")
a = cur.execute("SELECT avg(numbers), self_written_SQLvar(numbers) FROM mytable")
print a.fetchall()
>>> [(2.5, 1.25)]
The creation of new aggregate functions is backend-dependant, and must be done directly with the API of the underlining connection. SQLAlchemy offers no facility for creating those.
However after created you can just use them in SQLAlchemy normally.
Example:
import sqlalchemy
from sqlalchemy import Column, Table, create_engine, MetaData, Integer
from sqlalchemy import func, select
from sqlalchemy.pool import StaticPool
from random import randrange
import numpy
import sqlite3
class NumpyVarAggregate(object):
def __init__(self):
self.values = []
def step(self, value):
self.values.append(value)
def finalize(self):
return numpy.array(self.values).var()
def sqlite_memory_engine_creator():
con = sqlite3.connect(':memory:')
con.create_aggregate("np_var", 1, NumpyVarAggregate)
return con
e = create_engine('sqlite://', echo=True, poolclass=StaticPool,
creator=sqlite_memory_engine_creator)
m = MetaData(bind=e)
t = Table('mytable', m,
Column('id', Integer, primary_key=True),
Column('number', Integer)
)
m.create_all()
Now for the testing:
# insert 30 random-valued rows
t.insert().execute([{'number': randrange(100)} for x in xrange(30)])
for row in select([func.avg(t.c.number), func.np_var(t.c.number)]).execute():
print 'RESULT ROW: ', row
That prints (with SQLAlchemy statement echo turned on):
2009-06-15 14:55:34,171 INFO sqlalchemy.engine.base.Engine.0x...d20c PRAGMA
table_info("mytable")
2009-06-15 14:55:34,174 INFO sqlalchemy.engine.base.Engine.0x...d20c ()
2009-06-15 14:55:34,175 INFO sqlalchemy.engine.base.Engine.0x...d20c
CREATE TABLE mytable (
id INTEGER NOT NULL,
number INTEGER,
PRIMARY KEY (id)
)
2009-06-15 14:55:34,175 INFO sqlalchemy.engine.base.Engine.0x...d20c ()
2009-06-15 14:55:34,176 INFO sqlalchemy.engine.base.Engine.0x...d20c COMMIT
2009-06-15 14:55:34,177 INFO sqlalchemy.engine.base.Engine.0x...d20c INSERT
INTO mytable (number) VALUES (?)
2009-06-15 14:55:34,177 INFO sqlalchemy.engine.base.Engine.0x...d20c [[98],
[94], [7], [1], [79], [77], [51], [28], [85], [26], [34], [68], [15], [43],
[52], [97], [64], [82], [11], [71], [27], [75], [60], [85], [42], [40],
[76], [12], [81], [69]]
2009-06-15 14:55:34,178 INFO sqlalchemy.engine.base.Engine.0x...d20c COMMIT
2009-06-15 14:55:34,180 INFO sqlalchemy.engine.base.Engine.0x...d20c SELECT
avg(mytable.number) AS avg_1, np_var(mytable.number) AS np_var_1 FROM mytable
2009-06-15 14:55:34,180 INFO sqlalchemy.engine.base.Engine.0x...d20c []
RESULT ROW: (55.0, 831.0)
Note that I didn't use SQLAlchemy's ORM (just the sql expression part of SQLAlchemy was used) but you could use ORM just as well.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With