
spark-redshift takes a lot of time to write to redshift

I am setting up Spark Streaming with Kinesis and Redshift. I read data from Kinesis every 10 seconds, process it, and write it to Redshift using the spark-redshift library.

The problem is that it takes a very long time to write only 300 rows.

This is what it shows me in the console

[Stage 56:====================================================> (193 + 1) / 200]

Looking at my logs, the df.write.format call is what produces this stage.

I have Spark set up on an Amazon EC2 machine with 4 GB of RAM and 2 cores, running in --master local[*] mode.

Here is how I create the stream:

CHECKPOINT_INTERVAL = 60
STORAGE_LEVEL = StorageLevel.MEMORY_ONLY   # "memory"
kinesisStream = KinesisUtils.createStream(
    ssc, APPLICATION_NAME, STREAM_NAME, ENDPOINT, REGION_NAME, INITIAL_POS,
    CHECKPOINT_INTERVAL, awsAccessKeyId=AWSACCESSID, awsSecretKey=AWSSECRETKEY,
    storageLevel=STORAGE_LEVEL)

kinesisStream.foreachRDD(WriteToTable)

def WriteToTable(df, table):
    if table in REDSHIFT_PAGEVIEW_TBL:
        # Aggregate page views for this batch
        df = df.groupBy([COL_STARTTIME, COL_ENDTIME, COL_CUSTOMERID, COL_PROJECTID,
                         COL_FONTTYPE, COL_DOMAINNAME, COL_USERAGENT]).count()
        df = df.withColumnRenamed('count', COL_PAGEVIEWCOUNT)

        # Write back to the Redshift table
        url = ("jdbc:redshift://" + REDSHIFT_HOSTNAME + ":" + REDSHIFT_PORT + "/" +
               REDSHIFT_DATABASE + "?user=" + REDSHIFT_USERNAME +
               "&password=" + REDSHIFT_PASSWORD)

        s3Dir = 's3n://' + AWSACCESSID + ':' + AWSSECRETKEY + '@' + BUCKET + '/' + FOLDER

        print 'Start writing to redshift'
        df.write.format("com.databricks.spark.redshift") \
            .option("url", url) \
            .option("dbtable", REDSHIFT_PAGEVIEW_TBL) \
            .option('tempdir', s3Dir) \
            .mode('append') \
            .save()
        print 'Finished writing to redshift'

Please let me know why it is taking this much time.

Asked Mar 02 '16 by Nipun




1 Answer

I have had similar experiences when writing to Redshift, both through Spark and directly. spark-redshift always writes the data to S3 and then uses the Redshift COPY command to load it into the target table. This is the best practice and the most efficient way to write large numbers of records, but it also imposes a fixed overhead on every write, which is particularly noticeable when the number of records per write is relatively small.
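As a rough illustration, you can time a single micro-batch write (just a sketch reusing df, url, s3Dir and REDSHIFT_PAGEVIEW_TBL from your code); for a few hundred rows, almost all of the elapsed time is the S3 upload plus the COPY round trip rather than the data volume:

import time

# Time one micro-batch write; the fixed S3 + COPY overhead dominates for small batches
start = time.time()
df.write.format("com.databricks.spark.redshift") \
    .option("url", url) \
    .option("dbtable", REDSHIFT_PAGEVIEW_TBL) \
    .option('tempdir', s3Dir) \
    .mode('append') \
    .save()
print 'Redshift write took %.1f seconds' % (time.time() - start)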

Looking at the output above, your job is running with 200 partitions, which is the default value of the spark.sql.shuffle.partitions setting. You can find more details in the Spark documentation.

The groupBy operation is probably generating those 200 partitions, which means you are doing 200 separate copy operations to S3, each with the substantial latency of getting a connection and completing the write.
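You can confirm this just before the write (a quick sketch using the grouped df from the code above); each partition becomes a separate file uploaded to the S3 tempdir:

# How many partitions will be written out for this micro-batch?
num_parts = df.rdd.getNumPartitions()
print 'grouped DataFrame has %d partitions' % num_parts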

As we discussed in the comments and in chat, you can coalesce the result of the groupBy into fewer partitions by making the following change to the line above:

df = df.coalesce(4).withColumnRenamed('count', COL_PAGEVIEWCOUNT)

This will reduce the number of partitions from 200 to 4 and cut the overhead of the copies to S3 by a couple of orders of magnitude. You can experiment with the number of partitions to optimize performance. You could also lower the spark.sql.shuffle.partitions setting itself, given the size of the data you are dealing with and the number of available cores.
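For example (a sketch; it assumes your SQLContext is called sqlContext, and 4 is just an illustrative value to tune):

# Lower the shuffle partition count for this job so aggregations such as the
# groupBy above produce 4 partitions instead of the default 200
sqlContext.setConf("spark.sql.shuffle.partitions", "4")
# The same setting can be passed at submit time with:
#   --conf spark.sql.shuffle.partitions=4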

Answered Oct 12 '22 by DemetriKots