I'm trying to do a Redshift COPY in SQLAlchemy.
The following SQL correctly copies objects from my S3 bucket into my Redshift table when I execute it in psql:
COPY posts FROM 's3://mybucket/the/key/prefix'
WITH CREDENTIALS 'aws_access_key_id=myaccesskey;aws_secret_access_key=mysecretaccesskey'
JSON AS 'auto';
I have several files named
s3://mybucket/the/key/prefix.001.json
s3://mybucket/the/key/prefix.002.json
etc.
I can verify that the new rows were added to the table with select count(*) from posts.
However, when I execute the exact same SQL expression in SQLAlchemy, execute completes without error, but no rows get added to my table.
session = get_redshift_session()
session.bind.execute("COPY posts FROM 's3://mybucket/the/key/prefix' WITH CREDENTIALS 'aws_access_key_id=myaccesskey;aws_secret_access_key=mysecretaccesskey' JSON AS 'auto';")
session.commit()
It doesn't matter whether I do the above or
from sqlalchemy.sql import text
session = get_redshift_session()
session.execute(text("COPY posts FROM 's3://mybucket/the/key/prefix' WITH CREDENTIALS 'aws_access_key_id=myaccesskey;aws_secret_access_key=mysecretaccesskey' JSON AS 'auto';"))
session.commit()
The new Amazon Redshift SQLAlchemy dialect uses the Amazon Redshift Python driver (redshift_connector) and lets you securely connect to your Amazon Redshift database. It natively supports IAM authentication and single sign-on (SSO).
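A minimal sketch of connecting through that dialect (assuming the sqlalchemy-redshift and redshift_connector packages are installed; the cluster endpoint, database, and credentials below are placeholders):

from sqlalchemy import create_engine, text

# Placeholder endpoint and credentials; substitute your own cluster values.
engine = create_engine(
    "redshift+redshift_connector://awsuser:secret@mycluster.abc123.us-east-1.redshift.amazonaws.com:5439/dev"
)
with engine.connect() as conn:
    # text() is required for raw SQL on SQLAlchemy 1.4+/2.0.
    count = conn.execute(text("select count(*) from posts")).scalar()
    print(count)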
The COPY command is an extension of SQL supported by Redshift, so it needs to be issued from an SQL client connected to the cluster. You mention that the command already works when you run it in psql, so the statement itself is fine; the same statement can be run from within any connection to the cluster.
The COPY command appends the new input data to any existing rows in the table. The maximum size of a single input row from any source is 4 MB. To use the COPY command, you must have INSERT privilege for the Amazon Redshift table.
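For illustration, granting that privilege from SQLAlchemy could look like the sketch below (load_user is a hypothetical role name, and the admin connection URL is elided):

from sqlalchemy import create_engine, text

engine = create_engine('...')  # admin connection; URL elided
# engine.begin() opens a transaction and commits it on successful exit.
with engine.begin() as conn:
    conn.execute(text("GRANT INSERT ON posts TO load_user"))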
Adding a commit to the end of the COPY worked for me:
<your copy sql>;commit;
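Applied to the question's statement, that workaround would look roughly like this (assuming SQLAlchemy 1.x, where Session.execute accepts a raw string, and a DBAPI such as psycopg2 that accepts multiple statements in one call):

session = get_redshift_session()
# Appending "commit;" makes the driver commit in the same round trip.
session.execute(
    "COPY posts FROM 's3://mybucket/the/key/prefix' "
    "WITH CREDENTIALS 'aws_access_key_id=myaccesskey;aws_secret_access_key=mysecretaccesskey' "
    "JSON AS 'auto'; commit;"
)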
I basically had the same problem, though in my case it was more:
from sqlalchemy import create_engine
from sqlalchemy.sql import text

engine = create_engine('...')
engine.execute(text("COPY posts FROM 's3://mybucket/the/key/prefix' WITH CREDENTIALS 'aws_access_key_id=myaccesskey;aws_secret_access_key=mysecretaccesskey' JSON AS 'auto';"))
By stepping through with pdb, the problem was obviously the lack of a .commit() being invoked. I don't know why session.commit() is not working in your case (maybe the session "lost track" of the sent commands?), so it might not actually fix your problem.
Anyhow, as explained in the SQLAlchemy docs:
Given this requirement, SQLAlchemy implements its own “autocommit” feature which works completely consistently across all backends. This is achieved by detecting statements which represent data-changing operations, i.e. INSERT, UPDATE, DELETE [...] If the statement is a text-only statement and the flag is not set, a regular expression is used to detect INSERT, UPDATE, DELETE, as well as a variety of other commands for a particular backend.
So there are two solutions: either
1. issue the statement on a connection configured with connection.execution_options(autocommit=True), or
2. set the flag on the statement itself:
text("COPY posts FROM 's3://mybucket/the/key/prefix' WITH CREDENTIALS 'aws_access_key_id=myaccesskey;aws_secret_access_key=mysecretaccesskey' JSON AS 'auto';").execution_options(autocommit=True)