I want to store Numpy arrays in a PostgreSQL database in binary (bytea) form. I can get this to work fine in test #1 (see below), but I don't want to have to be manipulating the data arrays before inserts and after selects every time - I want to use psycopg2's adapters and converters.
Here's what I have at the moment:
import numpy as np
import psycopg2, psycopg2.extras
def my_adapter(spectrum):
return psycopg2.Binary(spectrum)
def my_converter(my_buffer, cursor):
return np.frombuffer(my_buffer)
class MyBinaryTest():
# Connection info
user = 'postgres'
password = 'XXXXXXXXXX'
host = 'localhost'
database = 'test_binary'
def __init__(self):
pass
def set_up(self):
# Set up
connection = psycopg2.connect(host=self.host, user=self.user, password=self.password)
connection.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
cursor = connection.cursor()
try: # Clear out any old test database
cursor.execute('drop database %s' % (self.database, ))
except:
pass
cursor.execute('create database %s' % (self.database, ))
cursor.close()
connection.close()
# Direct connectly to the database and set up our table
self.connection = psycopg2.connect(host=self.host, user=self.user, password=self.password, database=self.database)
self.cursor = self.connection.cursor(cursor_factory=psycopg2.extras.DictCursor)
self.cursor.execute('''CREATE TABLE spectrum (
"sid" integer not null primary key,
"data" bytea not null
);
CREATE SEQUENCE spectrum_id;
ALTER TABLE spectrum
ALTER COLUMN sid
SET DEFAULT NEXTVAL('spectrum_id');
''')
self.connection.commit()
def perform_test_one(self):
# Lets do a test
shape = (2, 100)
data = np.random.random(shape)
# Binary up the data
send_data = psycopg2.Binary(data)
self.cursor.execute('insert into spectrum (data) values (%s) returning sid;', [send_data])
self.connection.commit()
# Retrieve the data we just inserted
query = self.cursor.execute('select * from spectrum')
result = self.cursor.fetchall()
print "Type of data retrieved:", type(result[0]['data'])
# Convert it back to a numpy array of the same shape
retrieved_data = np.frombuffer(result[0]['data']).reshape(*shape)
# Ensure there was no problem
assert np.all(retrieved_data == data)
print "Everything went swimmingly in test one!"
return True
def perform_test_two(self):
if not self.use_adapters: return False
# Lets do a test
shape = (2, 100)
data = np.random.random(shape)
# No changes made to the data, as the adapter should take care of it (and it does)
self.cursor.execute('insert into spectrum (data) values (%s) returning sid;', [data])
self.connection.commit()
# Retrieve the data we just inserted
query = self.cursor.execute('select * from spectrum')
result = self.cursor.fetchall()
# No need to change the type of data, as the converter should take care of it
# (But, we never make it here)
retrieved_data = result[0]['data']
# Ensure there was no problem
assert np.all(retrieved_data == data.flatten())
print "Everything went swimmingly in test two!"
return True
def setup_adapters_and_converters(self):
# Set up test adapters
psycopg2.extensions.register_adapter(np.ndarray, my_adapter)
# Register our converter
self.cursor.execute("select null::bytea;")
my_oid = self.cursor.description[0][1]
obj = psycopg2.extensions.new_type((my_oid, ), "numpy_array", my_converter)
psycopg2.extensions.register_type(obj, self.connection)
self.connection.commit()
self.use_adapters = True
def tear_down(self):
# Tear down
self.cursor.close()
self.connection.close()
connection = psycopg2.connect(host=self.host, user=self.user, password=self.password)
connection.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
cursor = connection.cursor()
cursor.execute('drop database %s' % (self.database, ))
cursor.close()
connection.close()
test = MyBinaryTest()
test.set_up()
test.perform_test_one()
test.setup_adapters_and_converters()
test.perform_test_two()
test.tear_down()
Now, test #1 works fine. When I take the code I have used in test 1 and setup a psycopg2 adapter and converter, it does not work (test 2). This is because the data being fed to the converter is not actually a buffer anymore; it's PosgreSQL's string representation of bytea. The output is as follows:
In [1]: run -i test_binary.py
Type of data retrieved: type 'buffer'>
Everything went swimmingly in test one!
ERROR: An unexpected error occurred while tokenizing input
The following traceback may be corrupted or invalid
The error message is: ('EOF in multi-line statement', (273, 0))
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
/Users/andycasey/thesis/scope/scope/test_binary.py in <module>()
155 test.perform_test_one()
156 test.setup_adapters_and_converters()
--> 157 test.perform_test_two()
158 test.tear_down()
159
/Users/andycasey/thesis/scope/scope/test_binary.py in perform_test_two(self)
101 # Retrieve the data we just inserted
102 query = self.cursor.execute('select * from spectrum')
--> 103 result = self.cursor.fetchall()
104
105 # No need to change the type of data, as the converter should take care of it
/Library/Python/2.6/site-packages/psycopg2/extras.pyc in fetchall(self)
81 def fetchall(self):
82 if self._prefetch:
---> 83 res = _cursor.fetchall(self)
84 if self._query_executed:
85 self._build_index()
/Users/andycasey/thesis/scope/scope/test_binary.py in my_converter(my_buffer, cursor)
7
8 def my_converter(my_buffer, cursor):
----> 9 return np.frombuffer(my_buffer)
10
11
ValueError: buffer size must be a multiple of element size
WARNING: Failure executing file: <test_binary.py>
In [2]: %debug
> /Users/andycasey/thesis/scope/scope/test_binary.py(9)my_converter()
8 def my_converter(my_buffer, cursor):
----> 9 return np.frombuffer(my_buffer)
10
ipdb> my_buffer
'\\x40e67378b9b8ae3f78b15ebecf20ef3f4092f00289dc803f20a843f40b9ddd3f64b6ec99bf62e83f8cea6eb60758d43f2ba47d8e6d5be73f4e88f267bbb2d83ffacc8aad2220d43fc6006b9c7eb7d33ff440cccc638de33f70e0b4b906a1e13fe0eca2af2f87c83f98d31f41e081ee3f1e6f5b8a52fdea3f80fcbd0ec3a0a93f95316c9e462eed3f83fe6d8d2463ea3fb44849fa8404d33f701be5924049df3f6ef3ca0c50f6d63f0c7b7d800cfdda3fc000e89b890c983fb32cf3e4ba1dea3f87f17f7efc06e33f2e194b361190ed3f60e955f0456d933ff24dd5aabc7eeb3f7802405af74ddc3f9ce9c3852db8e03fa0c936267c19d33f3406c35637f9ec3f288d23502e70ee3f08fe67e7ed8ec53f00f5cde29763dc3f26bcb4d362c4e23fa9e01fac6cd8e33fbec912f5ff7ae13f7fbd61e2e585ed3fa0070671e970e83f68ef1f6e0b90da3fce9ce834bfa6d43fa02b825d144e903f42912641e5aedd3f645a299de883db3fd8b5126bb8f6c23f3c5d4ae40ecccd3f5ae503835d00e13fcc784bdb7ea9c43f880ebfb30719be3f1dffcb042f58e23f44cc727ab3dfc53f1bbe477eb861e43f3c4f55f6aea5e53fdc80f6fa91d6e33f12b580ef03acd03f1cb78f8dccaac13f9ebdbd206453d43f32ffc626fe4ddc3f625ff4e2b317d33f44822e2f0d52ca3f38fea7c36ba6cb3ff0290b4707cedc3fd456190f786bcd3f7ed46219b47eda3f66fbdef755c3df3f40ccd47f88978c3f382897872cf5b73f5d24a66af5d7e13f2dd179d56ea3ee3fc4bb5b0962bcd63f20024c1c55ddb63f68a02e5f73fbd13f21eeb68b333de63f1a19dfe1b713e53f7556fedbb698e53f44eb6e9228accf3fe61a509c1d4ae43fe0fb0624828fa83f1822e55e76cdd23f801708ab685dd93f06076be2e92bed3f5ac2ff90247fed3fd42902b6b974d13f9df97b70385ce83fdabc4af1e81fe83f250611249338e73fc0251f9c9739e93f5821b6024621d63f7a7e1fc15605e73fab085fa8bb67e83fb4eb1d087ef5dd3fd1b450d406cbe13f0078ed1c422d3e3f44ed12d19085e83f117d628438daea3f15c776903519e23f747f248fa2e0c83ffcd052e9c4edc93f177a255a0a91e93fbe3b9b894d8edf3fea9fb6dd8be4e23fdc879e88e094e83f18bd28327ae3c03fc1bfd06d0379ec3fe8d7ee7e066ee03f750c4e0f4802e33fca3e4d0e34d3da3fe0578becde30c43f6044d9ad900ed23f08a2562899a3d13f5a83cf6694f3e33f001c61debd5f513fa009953fde2c9a3f29d53b02ca65e53fda066b4421a8ea3f58f074484a08cc3fe239b4b7eb57e03f1f904fe586bde43f9ce6edd599d1d13f43878f622d7ee23fd3ebab4e7904e93f7c3437ad0e16d23fac5e5e9e08a9c83f2b7b2d56db34e73f74f8cd68effeed3f4c279a9d4210c53ffafad9b31886d33f4c3eb4acc9b0dc3f6ed2f82f486edc3fc349273cbe1fec3fe2f70e89b061d83facaa25cb8fdbcd3fb0659c127fb7e83f00a224076b6da43f9ab1eb331dfade3fc86e03757e3bec3f3d00c8545ccce93f90fac6a4cc21b93f08f57560a68bc63fd8cccbabcd13b03fc679c7f9ece6df3f4a8c78aa1a1aed3ffecac18174dbe43fdfe102cffb48e93f0078f7fa27cc463fb40acdaea46ee63f54f754df4daadf3f2a9e063d0ab3da3f82a21b50d3c6d33f1182e48aafb5ed3fb67f3de3b109d63f494258c18422e13f8a5542fc1491e63f43247cbeabece13feb9355572f68eb3f3cf415eee8f1d53f887df6aab75bb43f0042cd907780523ff5e724cad881e03fdb9de04e99ffe43fd6594feb9b75ec3f6d4e6fcf7690e13fabe634f015dee13f584563d26021c93f6f1916ee57c8e13fd8906bad6fa7cd3ff8fad5b03b02eb3f1b3b87c15f16e53f4014ec100f79c73f1aee1302d960d83f45be6b695ed9e13ffc86d1d311dbdb3f089e89e6389fb93f24d742e400cbd63fa048c53d8fbf9c3f6eb1db094d81ed3f8bbf0cba79fde63f70e8f3d63c43c33ff1c5e6fed947e43f64f3a21f062ee03f0d12c4282794e03fa0a3be998572ba3f16510b776d7aeb3fb8c7ca308d2acd3f6f37eb1eb330ef3f1ba1bdb6577fe73f78d805294a05b43f0ed0bea2f180db3f5a4cce890b57ea3f2472556ba6f1e43f1a79fcc20701e53fe2ae8a1ea5f7d73fe0bd1efc12caec3ff94b1e02a75bed3f78e098184e3fea3f46ff0b2344dedb3f1cdc0f7b72efdb3f6ceb0b772b37e43f47e49b2a7088ea3f'
Does anyone know how I can either (a) de-serialize the string representation coming back to me in my_converter so I return a Numpy array each time, or (b) force PostgreSQL/psycopg2 to send the buffer representation to the converter (which I can use) instead of the string representation?
Thanks!
I'm on OS X 10.6.8 with Python 2.6.1 (r261:67515), PostgreSQL 9.0.3 and psycopg2 2.4 (dt dec pq3 ext)
The format you see in the debugger is easy to parse: it is PostgreSQL hex binary format (http://www.postgresql.org/docs/9.1/static/datatype-binary.html). psycopg can parse that format and return a buffer containing the data; you can use that buffer to obtain an array. Instead of writing a typecaster from scratch, write one invoking the original func and postprocess its result. Sorry but I can't remember its name now and I'm writing from a mobile: you may get further help from the mailing list.
Edit: complete solution.
The default bytea typecaster (which is the object that can parse the postgres binary representation and return a buffer object out of it) is psycopg2.BINARY. We can use it to create a typecaster converting to array instead:
In [1]: import psycopg2
In [2]: import numpy as np
In [3]: a = np.eye(3)
In [4]: a
Out[4]:
array([[ 1., 0., 0.],
[ 0., 1., 0.],
[ 0., 0., 1.]])
In [5]: cnn = psycopg2.connect('')
# The adapter: converts from python to postgres
# note: this only works on numpy version whose arrays
# support the buffer protocol,
# e.g. it works on 1.5.1 but not on 1.0.4 on my tests.
In [12]: def adapt_array(a):
....: return psycopg2.Binary(a)
....:
In [13]: psycopg2.extensions.register_adapter(np.ndarray, adapt_array)
# The typecaster: from postgres to python
In [21]: def typecast_array(data, cur):
....: if data is None: return None
....: buf = psycopg2.BINARY(data, cur)
....: return np.frombuffer(buf)
....:
In [24]: ARRAY = psycopg2.extensions.new_type(psycopg2.BINARY.values,
'ARRAY', typecast_array)
In [25]: psycopg2.extensions.register_type(ARRAY)
# Now it works "as expected"
In [26]: cur = cnn.cursor()
In [27]: cur.execute("select %s", (a,))
In [28]: cur.fetchone()[0]
Out[28]: array([ 1., 0., 0., 0., 1., 0., 0., 0., 1.])
As you know, np.frombuffer(a) loses the array shape, so you will have to figure out a way to preserve it.
pFor the case of numpy arrays one can avoid the buffer strategy with all its disadvantages like the loss of shape and data type. Following a stackoverflow question about storing a numpy array in sqlite3 one can easily adapt the approach for postgres.
import os
import psycopg2 as psql
import numpy as np
# converts from python to postgres
def _adapt_array(text):
out = io.BytesIO()
np.save(out, text)
out.seek(0)
return psql.Binary(out.read())
# converts from postgres to python
def _typecast_array(value, cur):
if value is None:
return None
data = psql.BINARY(value, cur)
bdata = io.BytesIO(data)
bdata.seek(0)
return np.load(bdata)
con = psql.connect('')
psql.extensions.register_adapter(np.ndarray, _adapt_array)
t_array = psql.extensions.new_type(psql.BINARY.values, "numpy", _typecast_array)
psql.extensions.register_type(t_array)
cur = con.cursor()
Now one can create and fill a table (with a
defined as in the previous post)
cur.execute("create table test (column BYTEA)")
cur.execute("insert into test values(%s)", (a,))
And restore the numpy object
cur.execute("select * from test")
cur.fetchone()[0]
Result:
array([[ 1., 0., 0.],
[ 0., 1., 0.],
[ 0., 0., 1.]])
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With