Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Using a psycopg2 converter to retrieve bytea data from PostgreSQL

I want to store Numpy arrays in a PostgreSQL database in binary (bytea) form. I can get this to work fine in test #1 (see below), but I don't want to have to be manipulating the data arrays before inserts and after selects every time - I want to use psycopg2's adapters and converters.

Here's what I have at the moment:

import numpy as np
import psycopg2, psycopg2.extras


def my_adapter(spectrum):
    return psycopg2.Binary(spectrum)

def my_converter(my_buffer, cursor):
    return np.frombuffer(my_buffer)


class MyBinaryTest():

    # Connection info
    user = 'postgres'
    password = 'XXXXXXXXXX'
    host = 'localhost'
    database = 'test_binary'


    def __init__(self):
        pass

    def set_up(self):

        # Set up
        connection = psycopg2.connect(host=self.host, user=self.user, password=self.password)

        connection.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)

        cursor = connection.cursor()
        try: # Clear out any old test database
            cursor.execute('drop database %s' % (self.database, ))
        except:
            pass

        cursor.execute('create database %s' % (self.database, ))
        cursor.close()
        connection.close()

        # Direct connectly to the database and set up our table            
        self.connection = psycopg2.connect(host=self.host, user=self.user, password=self.password, database=self.database)
        self.cursor = self.connection.cursor(cursor_factory=psycopg2.extras.DictCursor)

        self.cursor.execute('''CREATE TABLE spectrum (
            "sid" integer not null primary key,
            "data" bytea not null
            );

            CREATE SEQUENCE spectrum_id;
            ALTER TABLE spectrum
                ALTER COLUMN sid
                    SET DEFAULT NEXTVAL('spectrum_id');
            ''')
        self.connection.commit()



    def perform_test_one(self):

        # Lets do a test

        shape = (2, 100)
        data = np.random.random(shape)

        # Binary up the data
        send_data = psycopg2.Binary(data)

        self.cursor.execute('insert into spectrum (data) values (%s) returning sid;', [send_data])
        self.connection.commit()

        # Retrieve the data we just inserted
        query = self.cursor.execute('select * from spectrum')
        result = self.cursor.fetchall()

        print "Type of data retrieved:", type(result[0]['data'])

        # Convert it back to a numpy array of the same shape
        retrieved_data = np.frombuffer(result[0]['data']).reshape(*shape)

        # Ensure there was no problem
        assert np.all(retrieved_data == data)
        print "Everything went swimmingly in test one!"

        return True

    def perform_test_two(self):

        if not self.use_adapters: return False

        # Lets do a test

        shape = (2, 100)
        data = np.random.random(shape)

        # No changes made to the data, as the adapter should take care of it (and it does)

        self.cursor.execute('insert into spectrum (data) values (%s) returning sid;', [data])
        self.connection.commit()

        # Retrieve the data we just inserted
        query = self.cursor.execute('select * from spectrum')
        result = self.cursor.fetchall()

        # No need to change the type of data, as the converter should take care of it
        # (But, we never make it here)

        retrieved_data = result[0]['data']

        # Ensure there was no problem
        assert np.all(retrieved_data == data.flatten())
        print "Everything went swimmingly in test two!"

        return True


    def setup_adapters_and_converters(self):

        # Set up test adapters
        psycopg2.extensions.register_adapter(np.ndarray, my_adapter)

        # Register our converter
        self.cursor.execute("select null::bytea;")
        my_oid = self.cursor.description[0][1]

        obj = psycopg2.extensions.new_type((my_oid, ), "numpy_array", my_converter)
        psycopg2.extensions.register_type(obj, self.connection)

        self.connection.commit()

        self.use_adapters = True


    def tear_down(self):

        # Tear down

        self.cursor.close()
        self.connection.close()

        connection = psycopg2.connect(host=self.host, user=self.user, password=self.password)

        connection.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)

        cursor = connection.cursor()
        cursor.execute('drop database %s' % (self.database, ))
        cursor.close()
        connection.close()


test = MyBinaryTest()
test.set_up()
test.perform_test_one()
test.setup_adapters_and_converters()
test.perform_test_two()
test.tear_down()

Now, test #1 works fine. When I take the code I have used in test 1 and setup a psycopg2 adapter and converter, it does not work (test 2). This is because the data being fed to the converter is not actually a buffer anymore; it's PosgreSQL's string representation of bytea. The output is as follows:

In [1]: run -i test_binary.py
Type of data retrieved: type 'buffer'>
Everything went swimmingly in test one!
ERROR: An unexpected error occurred while tokenizing input
The following traceback may be corrupted or invalid
The error message is: ('EOF in multi-line statement', (273, 0))

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)

/Users/andycasey/thesis/scope/scope/test_binary.py in <module>()
    155 test.perform_test_one()
    156 test.setup_adapters_and_converters()
--> 157 test.perform_test_two()
    158 test.tear_down()
    159 

/Users/andycasey/thesis/scope/scope/test_binary.py in perform_test_two(self)
    101         # Retrieve the data we just inserted

    102         query = self.cursor.execute('select * from spectrum')
--> 103         result = self.cursor.fetchall()
    104 
    105         # No need to change the type of data, as the converter should take care of it


/Library/Python/2.6/site-packages/psycopg2/extras.pyc in fetchall(self)
     81     def fetchall(self):
     82         if self._prefetch:
---> 83             res = _cursor.fetchall(self)
     84         if self._query_executed:
     85             self._build_index()

/Users/andycasey/thesis/scope/scope/test_binary.py in my_converter(my_buffer, cursor)
      7 
      8 def my_converter(my_buffer, cursor):
----> 9     return np.frombuffer(my_buffer)
     10 
     11 

ValueError: buffer size must be a multiple of element size
WARNING: Failure executing file: <test_binary.py>

In [2]: %debug
> /Users/andycasey/thesis/scope/scope/test_binary.py(9)my_converter()
      8 def my_converter(my_buffer, cursor):
----> 9     return np.frombuffer(my_buffer)
     10 

ipdb> my_buffer
'\\x40e67378b9b8ae3f78b15ebecf20ef3f4092f00289dc803f20a843f40b9ddd3f64b6ec99bf62e83f8cea6eb60758d43f2ba47d8e6d5be73f4e88f267bbb2d83ffacc8aad2220d43fc6006b9c7eb7d33ff440cccc638de33f70e0b4b906a1e13fe0eca2af2f87c83f98d31f41e081ee3f1e6f5b8a52fdea3f80fcbd0ec3a0a93f95316c9e462eed3f83fe6d8d2463ea3fb44849fa8404d33f701be5924049df3f6ef3ca0c50f6d63f0c7b7d800cfdda3fc000e89b890c983fb32cf3e4ba1dea3f87f17f7efc06e33f2e194b361190ed3f60e955f0456d933ff24dd5aabc7eeb3f7802405af74ddc3f9ce9c3852db8e03fa0c936267c19d33f3406c35637f9ec3f288d23502e70ee3f08fe67e7ed8ec53f00f5cde29763dc3f26bcb4d362c4e23fa9e01fac6cd8e33fbec912f5ff7ae13f7fbd61e2e585ed3fa0070671e970e83f68ef1f6e0b90da3fce9ce834bfa6d43fa02b825d144e903f42912641e5aedd3f645a299de883db3fd8b5126bb8f6c23f3c5d4ae40ecccd3f5ae503835d00e13fcc784bdb7ea9c43f880ebfb30719be3f1dffcb042f58e23f44cc727ab3dfc53f1bbe477eb861e43f3c4f55f6aea5e53fdc80f6fa91d6e33f12b580ef03acd03f1cb78f8dccaac13f9ebdbd206453d43f32ffc626fe4ddc3f625ff4e2b317d33f44822e2f0d52ca3f38fea7c36ba6cb3ff0290b4707cedc3fd456190f786bcd3f7ed46219b47eda3f66fbdef755c3df3f40ccd47f88978c3f382897872cf5b73f5d24a66af5d7e13f2dd179d56ea3ee3fc4bb5b0962bcd63f20024c1c55ddb63f68a02e5f73fbd13f21eeb68b333de63f1a19dfe1b713e53f7556fedbb698e53f44eb6e9228accf3fe61a509c1d4ae43fe0fb0624828fa83f1822e55e76cdd23f801708ab685dd93f06076be2e92bed3f5ac2ff90247fed3fd42902b6b974d13f9df97b70385ce83fdabc4af1e81fe83f250611249338e73fc0251f9c9739e93f5821b6024621d63f7a7e1fc15605e73fab085fa8bb67e83fb4eb1d087ef5dd3fd1b450d406cbe13f0078ed1c422d3e3f44ed12d19085e83f117d628438daea3f15c776903519e23f747f248fa2e0c83ffcd052e9c4edc93f177a255a0a91e93fbe3b9b894d8edf3fea9fb6dd8be4e23fdc879e88e094e83f18bd28327ae3c03fc1bfd06d0379ec3fe8d7ee7e066ee03f750c4e0f4802e33fca3e4d0e34d3da3fe0578becde30c43f6044d9ad900ed23f08a2562899a3d13f5a83cf6694f3e33f001c61debd5f513fa009953fde2c9a3f29d53b02ca65e53fda066b4421a8ea3f58f074484a08cc3fe239b4b7eb57e03f1f904fe586bde43f9ce6edd599d1d13f43878f622d7ee23fd3ebab4e7904e93f7c3437ad0e16d23fac5e5e9e08a9c83f2b7b2d56db34e73f74f8cd68effeed3f4c279a9d4210c53ffafad9b31886d33f4c3eb4acc9b0dc3f6ed2f82f486edc3fc349273cbe1fec3fe2f70e89b061d83facaa25cb8fdbcd3fb0659c127fb7e83f00a224076b6da43f9ab1eb331dfade3fc86e03757e3bec3f3d00c8545ccce93f90fac6a4cc21b93f08f57560a68bc63fd8cccbabcd13b03fc679c7f9ece6df3f4a8c78aa1a1aed3ffecac18174dbe43fdfe102cffb48e93f0078f7fa27cc463fb40acdaea46ee63f54f754df4daadf3f2a9e063d0ab3da3f82a21b50d3c6d33f1182e48aafb5ed3fb67f3de3b109d63f494258c18422e13f8a5542fc1491e63f43247cbeabece13feb9355572f68eb3f3cf415eee8f1d53f887df6aab75bb43f0042cd907780523ff5e724cad881e03fdb9de04e99ffe43fd6594feb9b75ec3f6d4e6fcf7690e13fabe634f015dee13f584563d26021c93f6f1916ee57c8e13fd8906bad6fa7cd3ff8fad5b03b02eb3f1b3b87c15f16e53f4014ec100f79c73f1aee1302d960d83f45be6b695ed9e13ffc86d1d311dbdb3f089e89e6389fb93f24d742e400cbd63fa048c53d8fbf9c3f6eb1db094d81ed3f8bbf0cba79fde63f70e8f3d63c43c33ff1c5e6fed947e43f64f3a21f062ee03f0d12c4282794e03fa0a3be998572ba3f16510b776d7aeb3fb8c7ca308d2acd3f6f37eb1eb330ef3f1ba1bdb6577fe73f78d805294a05b43f0ed0bea2f180db3f5a4cce890b57ea3f2472556ba6f1e43f1a79fcc20701e53fe2ae8a1ea5f7d73fe0bd1efc12caec3ff94b1e02a75bed3f78e098184e3fea3f46ff0b2344dedb3f1cdc0f7b72efdb3f6ceb0b772b37e43f47e49b2a7088ea3f'

Does anyone know how I can either (a) de-serialize the string representation coming back to me in my_converter so I return a Numpy array each time, or (b) force PostgreSQL/psycopg2 to send the buffer representation to the converter (which I can use) instead of the string representation?

Thanks!

I'm on OS X 10.6.8 with Python 2.6.1 (r261:67515), PostgreSQL 9.0.3 and psycopg2 2.4 (dt dec pq3 ext)

like image 570
Andy Casey Avatar asked May 10 '12 07:05

Andy Casey


2 Answers

The format you see in the debugger is easy to parse: it is PostgreSQL hex binary format (http://www.postgresql.org/docs/9.1/static/datatype-binary.html). psycopg can parse that format and return a buffer containing the data; you can use that buffer to obtain an array. Instead of writing a typecaster from scratch, write one invoking the original func and postprocess its result. Sorry but I can't remember its name now and I'm writing from a mobile: you may get further help from the mailing list.


Edit: complete solution.

The default bytea typecaster (which is the object that can parse the postgres binary representation and return a buffer object out of it) is psycopg2.BINARY. We can use it to create a typecaster converting to array instead:

In [1]: import psycopg2

In [2]: import numpy as np

In [3]: a = np.eye(3)

In [4]: a
Out[4]:
array([[ 1.,  0.,  0.],
      [ 0.,  1.,  0.],
      [ 0.,  0.,  1.]])

In [5]: cnn = psycopg2.connect('')


# The adapter: converts from python to postgres
# note: this only works on numpy version whose arrays 
# support the buffer protocol,
# e.g. it works on 1.5.1 but not on 1.0.4 on my tests.

In [12]: def adapt_array(a):
  ....:     return psycopg2.Binary(a)
  ....:

In [13]: psycopg2.extensions.register_adapter(np.ndarray, adapt_array)


# The typecaster: from postgres to python

In [21]: def typecast_array(data, cur):
  ....:     if data is None: return None
  ....:     buf = psycopg2.BINARY(data, cur)
  ....:     return np.frombuffer(buf)
  ....:

In [24]: ARRAY = psycopg2.extensions.new_type(psycopg2.BINARY.values,
'ARRAY', typecast_array)

In [25]: psycopg2.extensions.register_type(ARRAY)


# Now it works "as expected"

In [26]: cur = cnn.cursor()

In [27]: cur.execute("select %s", (a,))

In [28]: cur.fetchone()[0]
Out[28]: array([ 1.,  0.,  0.,  0.,  1.,  0.,  0.,  0.,  1.])

As you know, np.frombuffer(a) loses the array shape, so you will have to figure out a way to preserve it.

like image 177
piro Avatar answered Nov 15 '22 15:11

piro


pFor the case of numpy arrays one can avoid the buffer strategy with all its disadvantages like the loss of shape and data type. Following a stackoverflow question about storing a numpy array in sqlite3 one can easily adapt the approach for postgres.

import os
import psycopg2 as psql
import numpy as np

# converts from python to postgres
def _adapt_array(text):
    out = io.BytesIO()
    np.save(out, text)
    out.seek(0)
    return psql.Binary(out.read())

# converts from postgres to python
def _typecast_array(value, cur):
    if value is None:
        return None

    data = psql.BINARY(value, cur)
    bdata = io.BytesIO(data)
    bdata.seek(0)
    return np.load(bdata)

con = psql.connect('')

psql.extensions.register_adapter(np.ndarray, _adapt_array)
t_array = psql.extensions.new_type(psql.BINARY.values, "numpy", _typecast_array)
psql.extensions.register_type(t_array)

cur = con.cursor()

Now one can create and fill a table (with a defined as in the previous post)

cur.execute("create table test (column BYTEA)")
cur.execute("insert into test values(%s)", (a,))

And restore the numpy object

cur.execute("select * from test")
cur.fetchone()[0]

Result:

array([[ 1.,  0.,  0.],
       [ 0.,  1.,  0.],
       [ 0.,  0.,  1.]])
like image 24
Daniel Avatar answered Nov 15 '22 14:11

Daniel