
Redshift add column when importing with COPY

In Amazon Redshift I have a table where I need to load data from multiple CSV files:

create table my_table (
  id integer,
  name varchar(50) NULL,
  email varchar(50) NULL,
  processed_file varchar(256) NULL
);

The first three columns hold data from the files. The last column, processed_file, indicates which file each record was imported from.

I have the files in Amazon S3 and I want to import them with the COPY command. Something like:

COPY {table_name} FROM 's3://file-key' 
WITH CREDENTIALS 'aws_access_key_id=xxxx;aws_secret_access_key=xxxxx' 
DATEFORMAT 'auto' TIMEFORMAT 'auto' MAXERROR 0 ACCEPTINVCHARS '*' DELIMITER '\t' GZIP;

Is there a way to populate the fourth column, processed_file, automatically with the COPY command, so that it contains the name of the source file?

I can do an UPDATE statement after the COPY, but I am dealing with huge amounts of data, so ideally I would like to avoid that if possible.
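For reference, this is roughly the per-file workaround I would like to avoid (the bucket and file names here are just illustrative):

COPY my_table (id, name, email) FROM 's3://my-bucket/data/file_001.csv.gz'
WITH CREDENTIALS 'aws_access_key_id=xxxx;aws_secret_access_key=xxxxx'
DELIMITER '\t' GZIP;

-- rows from this COPY are the only ones where processed_file is still NULL
UPDATE my_table
SET processed_file = 'file_001.csv.gz'
WHERE processed_file IS NULL;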

asked Mar 18 '23 by Martin Taleski

2 Answers

This is not possible.

You would need to either pre-process the files (to include a name column) or update the data after loading (but then it would be difficult to do a bulk-load from several files simultaneously, which is the most efficient way to load data into Redshift).

See: Redshift COPY command documentation
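If you take the pre-processing route, each file carries its own name as a fourth column before it is uploaded, so a single bulk COPY over the whole prefix still works. A sketch of what that load would look like (bucket and prefix names are illustrative):

COPY my_table (id, name, email, processed_file)
FROM 's3://my-bucket/data/'
WITH CREDENTIALS 'aws_access_key_id=xxxx;aws_secret_access_key=xxxxx'
DELIMITER '\t' GZIP;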

answered Mar 30 '23 by John Rotenstein

You can try this custom logic for adding the new column. In this example the source file name is added as a new column while loading into Redshift with COPY:

import boto3
import re

s3 = boto3.client('s3')

# Semicolon-separated DROP, CREATE and COPY statements for the table;
# replace with your actual SQL.
sql = "DROP SQL; CREATE SQL; COPY SQL"

def Filter(datalist, keyword):
    # Return the statements that match the keyword (regular expression search)
    return [val for val in datalist if re.search(keyword, val)]

def add_new_col(table_name):
    # Pick the DROP, CREATE and COPY statements for this table out of the script
    drop_sql = ''.join(Filter(sql.split(';'), keyword=table_name + ' '))
    create_sql = ''.join(Filter(sql.split(';'), keyword=table_name + r'\('))
    copy_sql = ''.join(Filter(sql.split(';'), keyword=table_name.upper() + '/'))

    # Parse the bucket, key prefix and target table name out of the COPY statement
    BUCKET = copy_sql.split(' ')[3].split('/')[2]
    folder = '/'.join(copy_sql.split(' ')[3].split('/')[3:-1]) + '/'
    maintable = copy_sql.split(' ')[1]

    print("BUCKET {}, key_folder {}, maintable {}".format(BUCKET, folder, maintable))

    # Build the same statements for a staging table
    temp_table_drop_sql = drop_sql.replace(table_name, 'temp_table')
    temp_table_create_sql = create_sql.replace(table_name, 'temp_table')
    temp_table_copy_sql = copy_sql.replace(table_name.upper(), 'temp_table')
    temp_table_name_withSchema = temp_table_copy_sql.split(' ')[1]

    print("temp_table_name_withSchema {}".format(temp_table_name_withSchema))

    ## replace print with your query execute logic
    print(temp_table_drop_sql)
    print(temp_table_create_sql)
    #####

    # List every file under the prefix; each one is loaded separately
    response = s3.list_objects_v2(Bucket=BUCKET, Prefix=folder)

    new_column_name = 'filename'

    for i in response["Contents"]:
        ## replace print with your query execute logic
        # COPY a single file into the staging table
        temp_sql = copy_sql.replace(folder, i["Key"])
        temp_sql = temp_sql.replace(table_name.upper(), 'temp_table')
        print(temp_sql)
        ## i["Key"] is the file name
        # Add the filename column with this file's name as its default,
        # so the rows just loaded get stamped with their source file
        print("alter table {} ADD COLUMN {} varchar(256) NOT NULL DEFAULT '{}';".format(
            temp_table_name_withSchema, new_column_name, i["Key"].split('/')[-1]))
        print("insert into {} (select * from {})".format(maintable, temp_table_name_withSchema))
        print("truncate {}".format(temp_table_name_withSchema))
        # Drop the filename column again, otherwise the ADD COLUMN above
        # would fail on the next file
        print("alter table {} DROP COLUMN {};".format(temp_table_name_withSchema, new_column_name))
        #####

    ## replace print with your query execute logic
    print(drop_sql)
    ########

add_new_col('my_table')  # pass the name of the table being loaded
answered Mar 30 '23 by Mani