In Amazon Redshift I have a table where I need to load data from multiple CSV files:
create table my_table (
  id integer,
  name varchar(50) NULL,
  email varchar(50) NULL,
  processed_file varchar(256) NULL
);
The first three columns hold data from the files. The last column, processed_file,
indicates which file each record was imported from.
I have the files in Amazon S3 and I want to import them with the COPY
command. Something like:
COPY {table_name} FROM 's3://file-key'
WITH CREDENTIALS 'aws_access_key_id=xxxx;aws_secret_access_key=xxxxx'
DATEFORMAT 'auto' TIMEFORMAT 'auto' MAXERROR 0 ACCEPTINVCHARS '*' DELIMITER '\t' GZIP;
Is there a way to populate the fourth column, processed_file, automatically with the COPY command, so that it records the name of the source file?
I can do an UPDATE statement after the COPY, but I am dealing with huge amounts of data, so ideally I would like to avoid that if possible.
This is not possible.
You would need to either pre-process the files (to include a name column) or update the data after loading (but then it would be difficult to do a bulk-load from several files simultaneously, which is the most efficient way to load data into Redshift).
See: Redshift COPY command documentation
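If you take the update-after-loading route, one workable pattern is to COPY one file at a time and then stamp the rows that were just loaded, since they are the only ones whose processed_file is still NULL. A rough sketch, assuming psycopg2 for the Redshift connection; the bucket, prefix, endpoint, and credentials below are placeholders:
import boto3
import psycopg2

# Placeholders -- replace with your own values
BUCKET = 'my-bucket'
PREFIX = 'incoming/'
CREDS = 'aws_access_key_id=xxxx;aws_secret_access_key=xxxxx'

s3 = boto3.client('s3')
conn = psycopg2.connect(host='my-cluster.example.us-east-1.redshift.amazonaws.com',
                        port=5439, dbname='mydb', user='myuser', password='mypassword')
conn.autocommit = True
cur = conn.cursor()

for obj in s3.list_objects_v2(Bucket=BUCKET, Prefix=PREFIX).get('Contents', []):
    key = obj['Key']
    if key.endswith('/'):
        continue  # skip "directory" placeholder keys
    # Load a single file; only the first three columns come from the file
    cur.execute(
        "COPY my_table (id, name, email) FROM 's3://{}/{}' "
        "WITH CREDENTIALS '{}' "
        "DATEFORMAT 'auto' TIMEFORMAT 'auto' MAXERROR 0 "
        "ACCEPTINVCHARS '*' DELIMITER '\\t' GZIP;".format(BUCKET, key, CREDS))
    # The freshly loaded rows are the only ones without a processed_file yet
    cur.execute("UPDATE my_table SET processed_file = %s WHERE processed_file IS NULL;",
                (key.split('/')[-1],))

cur.close()
conn.close()
Note that, as mentioned above, loading file by file gives up the parallelism of a multi-file COPY, so this trades load speed for the extra column.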
Here you can try this custom logic for adding the new column; in this example the file name is added as a new column as part of the Redshift COPY:
import boto3
import re

s3 = boto3.client('s3')
sql = "DROP SQL ; CREATE SQL ; COPY SQL"  ## pass your actual DROP / CREATE / COPY statements here, separated by ';'

def Filter(datalist, keyword):
    # Return the statements in the list that match the regular expression
    return [val for val in datalist if re.search(keyword, val)]

def add_new_col(table_name):
    # Pick the DROP, CREATE and COPY statements for this table out of the sql string
    drop_sql = ''.join(Filter(sql.split(';'), keyword=table_name + ' '))
    create_sql = ''.join(Filter(sql.split(';'), keyword=table_name + r'\('))
    copy_sql = ''.join(Filter(sql.split(';'), keyword=table_name.upper() + '/'))

    # Parse the bucket, the key prefix and the target table out of the COPY statement
    BUCKET = copy_sql.split(' ')[3].split('/')[2]
    folder = '/'.join(copy_sql.split(' ')[3].split('/')[3:-1]) + '/'
    maintable = copy_sql.split(' ')[1]
    print("BUCKET {}, key_folder {}, maintable {}".format(BUCKET, folder, maintable))

    # Build the same DROP / CREATE / COPY statements against a staging table
    temp_table_drop_sql = drop_sql.replace(table_name, 'temp_table')
    temp_table_create_sql = create_sql.replace(table_name, 'temp_table')
    temp_table_copy_sql = copy_sql.replace(table_name.upper(), 'temp_table')
    temp_table_name_withSchema = temp_table_copy_sql.split(' ')[1]
    print("temp_table_name_withSchema {}".format(temp_table_name_withSchema))

    ## replace with query execute logic
    print(temp_table_drop_sql)
    print(temp_table_create_sql)

    #####
    response = s3.list_objects_v2(Bucket=BUCKET, Prefix=folder)
    new_column_name = 'filename'
    for i in response["Contents"]:
        ## replace with query execute logic
        # COPY a single file into the staging table
        temp_sql = copy_sql.replace(folder, i["Key"])
        temp_sql = temp_sql.replace(table_name.upper(), 'temp_table')
        print(temp_sql)
        ## i["Key"] is the file name
        # Add the filename column with this file's name as the default, so every
        # row loaded from the file gets stamped with it, then move the rows over
        print("alter table {} ADD COLUMN {} varchar(256) NOT NULL DEFAULT '{}';".format(
            temp_table_name_withSchema, new_column_name, i["Key"].split('/')[-1]))
        print("insert into {} (select * from {});".format(maintable, temp_table_name_withSchema))
        print("truncate {};".format(temp_table_name_withSchema))
        # Drop the filename column again so the ALTER succeeds for the next file
        print("alter table {} DROP COLUMN {};".format(temp_table_name_withSchema, new_column_name))
    #####

    ## replace with query execute logic
    print(temp_table_drop_sql)  # clean up the staging table when done

add_new_col('my_table')  ## pass your actual table name
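The prints marked "## replace with query execute logic" are where the generated statements would actually be run against the cluster. A minimal way to wire that up, again assuming psycopg2 and placeholder connection details:
import psycopg2

conn = psycopg2.connect(host='my-cluster.example.us-east-1.redshift.amazonaws.com',
                        port=5439, dbname='mydb', user='myuser', password='mypassword')
conn.autocommit = True
cur = conn.cursor()

def run(statement):
    # Swap the print(...) calls above for run(...) to execute the generated SQL
    print(statement)
    cur.execute(statement)
Bear in mind that the keyword-based parsing appears to assume the DROP, CREATE and COPY statements are formatted exactly as the filters expect (for example, the COPY statement referencing the S3 prefix that contains the upper-cased table name), and that this approach also loads one file per COPY, with the same loss of parallelism mentioned in the answer above.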