I am getting the following error:
Traceback (most recent call last):
main()
for item in session.query(Item).yield_per(10):
fetch = cursor.fetchmany(self._yield_per)
self.cursor, self.context)
l = self.process_rows(self._fetchmany_impl(size))
row = self._fetchone_impl()
self.__buffer_rows()
self.__rowbuffer = collections.deque(self.cursor.fetchmany(size))
sqlalchemy.exc.ProgrammingError: (ProgrammingError) named cursor isn't valid anymore None None
I am suspect that calling session.commit() is interfering with .yield_per
sessionmaker_ = sessionmaker(autocommit=False, autoflush=False, bind=engine)
session = scoped_session(sessionmaker_)
def foo(item):
# DO something to the item
session.add(item)
session.commit()
def main():
for item in session.query(Item).yield_per(5):
foo(item)
Any idea?
If you haven't fetched all rows from a DBAPI cursor, then it's usually a bad idea to call commit() on that cursor's connection. In this case psycopg2 (which I'm guessing that's the DBAPI you're on) is not able to maintain the state of a named cursor (which is what it uses when you want server-buffered rows) over the transaction.
One thing you should definitely change here is how frequently you're committing. Ideally you wouldn't commit anything until your entire operation is complete. The Session will automatically flush data as it needs to (well, if you turned on autoflush, which I'd recommend), or you can call flush() to force it, but this is independent of actually committing the transaction. All those calls to commit() will make the operation much less efficient than it has to be, and of course it's getting in the way of the cursor for the other result set. If you just put one commit() at the end of your loop, then you'd solve both issues at once.
If you still need to commit before the entire operation is finished, or even if not, I'd favor working in chunks rather than using yield_per(), which is quite brittle. The recipe at http://www.sqlalchemy.org/trac/wiki/UsageRecipes/WindowedRangeQuery shows one way to do this. DBAPI's are not well suited to dealing with extremely large result sets overall, even though psycopg2 gives us a little more leeway.
Problem above can be solved via one more session
sessionmaker_ = sessionmaker(autocommit=False, autoflush=False, bind=engine)
session = scoped_session(sessionmaker_)
cool_session = scoped_session(sessionmaker_)
def foo(item):
# DO something to the item
session.add(item)
session.commit()
def main():
for item in cool_session.query(Item).yield_per(5):
item = session.merge(item, load=False)
foo(item)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With