Following on from this question, when I try to create a postgresql table from a dask.dataframe with more than one partition I get the following error:
IntegrityError: (psycopg2.IntegrityError) duplicate key value violates unique constraint "pg_type_typname_nsp_index"
DETAIL: Key (typname, typnamespace)=(test1, 2200) already exists.
[SQL: '\nCREATE TABLE test1 (\n\t"A" BIGINT, \n\t"B" BIGINT, \n\t"C" BIGINT, \n\t"D" BIGINT, \n\t"E" BIGINT, \n\t"F" BIGINT, \n\t"G" BIGINT, \n\t"H" BIGINT, \n\t"I" BIGINT, \n\t"J" BIGINT, \n\tidx BIGINT\n)\n\n']
You can recreate the error with the following code:
import numpy as np
import dask.dataframe as dd
import dask
import pandas as pd
import sqlalchemy_utils as sqla_utils
import sqlalchemy as sqla
DATABASE_CONFIG = {
'driver': '',
'host': '',
'user': '',
'password': '',
'port': 5432,
}
DBNAME = 'dask'
url = '{driver}://{user}:{password}@{host}:{port}/'.format(
**DATABASE_CONFIG)
db_url = url.rstrip('/') + '/' + DBNAME
# create db if non-existent
if not sqla_utils.database_exists(db_url):
print('Creating database \'{}\''.format(DBNAME))
sqla_utils.create_database(db_url)
conn = sqla.create_engine(db_url)
# create pandas df with random numbers
df = pd.DataFrame(np.random.randint(0,40,size=(100, 10)), columns=list('ABCDEFGHIJ'))
# add index so that it can be used as primary key later on
df['idx'] = df.index
# create dask df
ddf = dd.from_pandas(df, npartitions=4)
# Write to psql
dto_sql = dask.delayed(pd.DataFrame.to_sql)
out = [dto_sql(d, 'test', db_url, if_exists='append', index=False, index_label='idx')
for d in ddf.to_delayed()]
dask.compute(*out)
The code doesn't produce an error if npartitions is set to 1. So I'm guessing it has to do with postgres not being able to handle parallel requests to write to a same sql table...? How can I fix this?
If you get this message when trying to insert data into a PostgreSQL database: ERROR: duplicate key violates unique constraint That likely means that the primary key sequence in the table you're working with has somehow become out of sync, likely because of a mass import process (or something along those lines).
ERROR: duplicate key violates unique constraint. That likely means that the primary key sequence in the table you're working with has somehow become out of sync, likely because of a mass import process (or something along those lines).
The primary key is already protecting you from inserting duplicate values, as you're experiencing when you get that error. Adding another unique constraint isn't necessary to do that.
Adding another unique constraint isn't necessary to do that. The "duplicate key" error is telling you that the work was not done because it would produce a duplicate key, not that it discovered a duplicate key already commited to the table. For future searchs, use ON CONFLICT DO NOTHING. You can specify column names, indexes or constraint name.
I was reading this. It seems this error rises when you are creating/updating the same table with parallel processing. I understand it depends because of this (as explained on the google group discussion).
So I think it depend from PostgreSQL
itself and not from the connection driver or the module used for the multiprocessing.
Well, Actually, the only way I found to solve this is to create chunks big enough to have back a writing process slower than the calculation itself. With bigger chunks this error doesn't rise.
In PostgreSQL that helps me.
set enable_parallel_hash=off;
After u can turn it on
set enable_parallel_hash=on;
I had the same error with ponyORM on PostgreSQL in Heroku. I solved it by locking the thread until it executes the DB operation. In my case:
lock = threading.Lock()
with lock:
PonyOrmEntity(name='my_name', description='description')
PonyOrmEntity.get(lambda u: u.name == 'another_name')
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With