
AWS EMR Spark save to S3 is very slow

I have a Spark job running on EMR that takes an unusually long time. The Spark tasks themselves run fast, but saving the result to S3 takes more than 20 minutes, during which the log fills with entries like these:

16/02/05 01:44:44 INFO latency: StatusCode=[404], Exception=[com.amazonaws.services.s3.model.AmazonS3Exception: Not Found (Service: Amazon S3; Status Code: 404; Error Code: 404 Not Found; Request ID: 561CA7CD8C009E79), S3 Extended Request ID: B3dMnYkxE/QSZsD1VREBf5FR+uH8m5k2Tb8zZ+Y0+VFgQFSwRJjPEWV7wX61+9ZiJhY5nf35Rx8=], ServiceName=[Amazon S3], AWSErrorCode=[404 Not Found], AWSRequestID=[561CA7CD8C009E79], ServiceEndpoint=[https://my-bucket-name.s3.amazonaws.com], Exception=1, HttpClientPoolLeasedCount=0, RequestCount=1, HttpClientPoolPendingCount=0, HttpClientPoolAvailableCount=1, ClientExecuteTime=[12.766], HttpRequestTime=[12.494], HttpClientReceiveResponseTime=[11.067], RequestSigningTime=[0.103], CredentialsRequestTime=[0.001], HttpClientSendRequestTime=[0.071],
16/02/05 01:44:44 INFO latency: StatusCode=[200], ServiceName=[Amazon S3], AWSRequestID=[F84316D0C1958276], ServiceEndpoint=[https://my-bucket-name.s3.amazonaws.com], HttpClientPoolLeasedCount=0, RequestCount=1, HttpClientPoolPendingCount=0, HttpClientPoolAvailableCount=1, ClientExecuteTime=[16.001], HttpRequestTime=[13.1], HttpClientReceiveResponseTime=[11.69], RequestSigningTime=[0.085], CredentialsRequestTime=[0.001], ResponseProcessingTime=[2.673], HttpClientSendRequestTime=[0.071],
16/02/05 01:44:44 INFO S3NativeFileSystem: rename s3://my-bucket-name/stati/data/output/bidder4/_temporary/0/task_201602050130_0014_m_000001/organization_id=100932/impression_date=2016-01-01/part-r-00001-0e84d8cb-4b43-4cc3-b95e-65b1b9c12f25.gz.parquet s3://my-bucket-name/stati/data/output/bidder4/organization_id=100932/impression_date=2016-01-01/part-r-00001-0e84d8cb-4b43-4cc3-b95e-65b1b9c12f25.gz.parquet
16/02/05 01:44:44 INFO latency: StatusCode=[404], Exception=[com.amazonaws.services.s3.model.AmazonS3Exception: Not Found (Service: Amazon S3; Status Code: 404; Error Code: 404 Not Found; Request ID: 014934F9C27E2969), S3 Extended Request ID: B313czEVYZR21sBpxHODqS4gCRudU249Jd5+Z+D0a4FGlHW6eQx0/GRNtTkrS2y4ucKND8DYWyg=], ServiceName=[Amazon S3], AWSErrorCode=[404 Not Found], AWSRequestID=[014934F9C27E2969], ServiceEndpoint=[https://my-bucket-name.s3.amazonaws.com], Exception=1, HttpClientPoolLeasedCount=0, RequestCount=1, HttpClientPoolPendingCount=0, HttpClientPoolAvailableCount=1, ClientExecuteTime=[11.854], HttpRequestTime=[11.598], HttpClientReceiveResponseTime=[10.168], RequestSigningTime=[0.098], CredentialsRequestTime=[0.001], HttpClientSendRequestTime=[0.078],
16/02/05 01:44:44 INFO latency: StatusCode=[404], Exception=[com.amazonaws.services.s3.model.AmazonS3Exception: Not Found (Service: Amazon S3; Status Code: 404; Error Code: 404 Not Found; Request ID: 97FD09BE9E109D68), S3 Extended Request ID: oGOPBseyZF9/7OCtzwyOK+lCfALplBW+IOAFXIybKSHDtvMUyZeFFOGi7+qba6fo0ReV1sl9fl4=], ServiceName=[Amazon S3], AWSErrorCode=[404 Not Found], AWSRequestID=[97FD09BE9E109D68], ServiceEndpoint=[https://my-bucket-name.s3.amazonaws.com], Exception=1, HttpClientPoolLeasedCount=0, RequestCount=1, HttpClientPoolPendingCount=0, HttpClientPoolAvailableCount=1, ClientExecuteTime=[13.141], HttpRequestTime=[12.864], HttpClientReceiveResponseTime=[11.462], RequestSigningTime=[0.098], CredentialsRequestTime=[0.001], HttpClientSendRequestTime=[0.057],
16/02/05 01:51:13 INFO latency: StatusCode=[200], ServiceName=[Amazon S3], AWSRequestID=[7936D2099DD2EB95], ServiceEndpoint=[https://my-bucket-name.s3.amazonaws.com], HttpClientPoolLeasedCount=0, RequestCount=1, HttpClientPoolPendingCount=0, HttpClientPoolAvailableCount=1, ClientExecuteTime=[8.471], HttpRequestTime=[8.209], HttpClientReceiveResponseTime=[6.947], RequestSigningTime=[0.09], CredentialsRequestTime=[0.001], ResponseProcessingTime=[0.08], HttpClientSendRequestTime=[0.042],
16/02/05 01:51:13 INFO S3NativeFileSystem: listStatus s3://my-bucket-name/stati/data/output/bidder4/_temporary/0/task_201602050130_0014_m_000004/organization_id=101041 with recursive false
16/02/05 01:51:13 INFO latency: StatusCode=[404], Exception=[com.amazonaws.services.s3.model.AmazonS3Exception: Not Found (Service: Amazon S3; Status Code: 404; Error Code: 404 Not Found; Request ID: 4D2BAED335E4DD56), S3 Extended Request ID: NgmiU8R7X94wUHnYXhTB4aW0AIpQ9F1RHbmAwsFsh/x8D1/B7EFjAWgo8Z/Eluj18PKlVM7w2zQ=], ServiceName=[Amazon S3], AWSErrorCode=[404 Not Found], AWSRequestID=[4D2BAED335E4DD56], ServiceEndpoint=[https://my-bucket-name.s3.amazonaws.com], Exception=1, HttpClientPoolLeasedCount=0, RequestCount=1, HttpClientPoolPendingCount=0, HttpClientPoolAvailableCount=1, ClientExecuteTime=[22.181], HttpRequestTime=[22.004], HttpClientReceiveResponseTime=[20.697], RequestSigningTime=[0.053], CredentialsRequestTime=[0.0], HttpClientSendRequestTime=[0.052],
16/02/05 01:51:13 INFO latency: StatusCode=[200], ServiceName=[Amazon S3], AWSRequestID=[C554088E2B24A1F0], ServiceEndpoint=[https://my-bucket-name.s3.amazonaws.com], HttpClientPoolLeasedCount=0, RequestCount=1, HttpClientPoolPendingCount=0, HttpClientPoolAvailableCount=1, ClientExecuteTime=[35.69], HttpRequestTime=[34.067], HttpClientReceiveResponseTime=[32.718], RequestSigningTime=[0.07], CredentialsRequestTime=[0.0], ResponseProcessingTime=[1.447], HttpClientSendRequestTime=[0.043],
16/02/05 01:51:14 INFO latency: StatusCode=[404], Exception=[com.amazonaws.services.s3.model.AmazonS3Exception: Not Found (Service: Amazon S3; Status Code: 404; Error Code: 404 Not Found; Request ID: 3ADAE326D46195E2), S3 Extended Request ID: peAwu6EY5NGjDMSHQQmhvYzqmvhjogefngu2BNaSh4a5O4QgUbYUM+TBlIZ2763PgiZOt2BtAqc=], ServiceName=[Amazon S3], AWSErrorCode=[404 Not Found], AWSRequestID=[3ADAE326D46195E2], ServiceEndpoint=[https://my-bucket-name.s3.amazonaws.com], Exception=1, HttpClientPoolLeasedCount=0, RequestCount=1, HttpClientPoolPendingCount=0, HttpClientPoolAvailableCount=1, ClientExecuteTime=[24.088], HttpRequestTime=[23.851], HttpClientReceiveResponseTime=[22.466], RequestSigningTime=[0.088], CredentialsRequestTime=[0.0], HttpClientSendRequestTime=[0.064],
16/02/05 01:51:14 INFO latency: StatusCode=[404], Exception=[com.amazonaws.services.s3.model.AmazonS3Exception: Not Found (Service: Amazon S3; Status Code: 404; Error Code: 404 Not Found; Request ID: 069544819617C5F4), S3 Extended Request ID: gomSLQka0EMLiv+uo5zSjrXDhjxQBmvJMqyBJMiqozuEjppIup20RT/DqJZqrQPggDE0Dpzcr5Q=], ServiceName=[Amazon S3], AWSErrorCode=[404 Not Found], AWSRequestID=[069544819617C5F4], ServiceEndpoint=[https://my-bucket-name.s3.amazonaws.com], Exception=1, HttpClientPoolLeasedCount=0, RequestCount=1, HttpClientPoolPendingCount=0, HttpClientPoolAvailableCount=1, ClientExecuteTime=[51.626], HttpRequestTime=[51.351], HttpClientReceiveResponseTime=[49.956], RequestSigningTime=[0.081], CredentialsRequestTime=[0.0], HttpClientSendRequestTime=[0.05],
16/02/05 01:51:14 INFO latency: StatusCode=[200], ServiceName=[Amazon S3], AWSRequestID=[E59C345260724310], ServiceEndpoint=[https://my-bucket-name.s3.amazonaws.com], HttpClientPoolLeasedCount=0, RequestCount=1, HttpClientPoolPendingCount=0, HttpClientPoolAvailableCount=1, ClientExecuteTime=[14.771], HttpRequestTime=[13.035], HttpClientReceiveResponseTime=[11.65], RequestSigningTime=[0.092], CredentialsRequestTime=[0.0], ResponseProcessingTime=[1.533], HttpClientSendRequestTime=[0.072],
16/02/05 01:51:14 INFO S3NativeFileSystem: listStatus s3://my-bucket-name/stati/data/output/bidder4/_temporary/0/task_201602050130_0014_m_000004/organization_id=101041/impression_date=2016-01-01 with recursive false
16/02/05 01:51:14 INFO latency: StatusCode=[404], Exception=[com.amazonaws.services.s3.model.AmazonS3Exception: Not Found (Service: Amazon S3; Status Code: 404; Error Code: 404 Not Found; Request ID: E1F7FDB93AB37E2F), S3 Extended Request ID: Tglj240gJvYWm2bvi0MSk4aaH4c5KWk/8l6UJIw/WS/wxRKPEed3mFUaX7PZWgVl8ESEF8TtCz8=], ServiceName=[Amazon S3], AWSErrorCode=[404 Not Found], AWSRequestID=[E1F7FDB93AB37E2F], ServiceEndpoint=[https://my-bucket-name.s3.amazonaws.com], Exception=1, HttpClientPoolLeasedCount=0, RequestCount=1, HttpClientPoolPendingCount=0, HttpClientPoolAvailableCount=1, ClientExecuteTime=[9.351], HttpRequestTime=[9.166], HttpClientReceiveResponseTime=[7.869], RequestSigningTime=[0.071], CredentialsRequestTime=[0.0], HttpClientSendRequestTime=[0.04],
16/02/05 01:51:14 INFO latency: StatusCode=[200], ServiceName=[Amazon S3], AWSRequestID=[2228F32BADB3EAC6], ServiceEndpoint=[https://my-bucket-name.s3.amazonaws.com], HttpClientPoolLeasedCount=0, RequestCount=1, HttpClientPoolPendingCount=0, HttpClientPoolAvailableCount=1, ClientExecuteTime=[21.51], HttpRequestTime=[19.992], HttpClientReceiveResponseTime=[18.687], RequestSigningTime=[0.047], CredentialsRequestTime=[0.0], ResponseProcessingTime=[1.387], HttpClientSendRequestTime=[0.057],
16/02/05 01:51:14 INFO latency: StatusCode=[404], Exception=[com.amazonaws.services.s3.model.AmazonS3Exception: Not Found (Service: Amazon S3; Status Code: 404; Error Code: 404 Not Found; Request ID: DAAFA5B4B81AAB0C), S3 Extended Request ID: 5BGDszg4CRVs0kN8S1hwdvpFknwqQZyGs+QOk0m6+U7K8HJ3eUpDEeYXMv6zt+Dx1cqknGDV+/U=], ServiceName=[Amazon S3], AWSErrorCode=[404 Not Found], AWSRequestID=[DAAFA5B4

I thought this was similar to another SO question, so I set the following in my config.json file as suggested there:

{
  "Classification": "mapred-site",
  "Properties": {
    "mapred.output.direct.EmrFileSystem": "true",
    "mapred.output.direct.NativeS3FileSystem": "true"
  }
},

I'm still seeing the same behavior. This is on EMR 4.3.0.

Cheeko asked Feb 07 '23 19:02


2 Answers

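The issue is that the results are effectively being uploaded to S3 twice: the output is first written under _temporary and then renamed into its final location (the rename lines in your log), and a rename on S3 is a copy followed by a delete. Take a look here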

Set the Hadoop property spark.sql.parquet.output.committer.class to org.apache.spark.sql.parquet.DirectParquetOutputCommitter.

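Note the info regarding the impact on speculative execution: the direct committer writes straight to the final location, so it is not safe with speculative tasks, and the Spark 1.x documentation says this option is automatically ignored if spark.speculation is turned on.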
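One rough way to check how much of the save is spent in this commit phase is to tally the S3 responses and file-system operations in the driver log. The following is a minimal sketch, assuming log lines shaped like the ones in the question; the function name summarize and the abbreviated sample lines are made up for illustration and are not part of Spark or EMR.

```python
import re
from collections import Counter

# Patterns for the two kinds of lines seen in the question's log.
STATUS_RE = re.compile(r"StatusCode=\[(\d+)\]")
EXEC_TIME_RE = re.compile(r"ClientExecuteTime=\[([\d.]+)\]")
FS_OP_RE = re.compile(r"S3NativeFileSystem: (\w+) ")


def summarize(lines):
    """Count S3 responses by status code, count S3NativeFileSystem
    operations by name, and sum the reported ClientExecuteTime values."""
    statuses = Counter()
    fs_ops = Counter()
    total_client_time = 0.0
    for line in lines:
        status = STATUS_RE.search(line)
        if status:
            statuses[status.group(1)] += 1
            exec_time = EXEC_TIME_RE.search(line)
            if exec_time:
                total_client_time += float(exec_time.group(1))
        fs_op = FS_OP_RE.search(line)
        if fs_op:
            fs_ops[fs_op.group(1)] += 1
    return statuses, fs_ops, total_client_time


# Abbreviated sample lines (most fields dropped for brevity; paths are placeholders).
sample = [
    "16/02/05 01:44:44 INFO latency: StatusCode=[404], ServiceName=[Amazon S3], ClientExecuteTime=[12.766],",
    "16/02/05 01:44:44 INFO latency: StatusCode=[200], ServiceName=[Amazon S3], ClientExecuteTime=[16.001],",
    "16/02/05 01:44:44 INFO S3NativeFileSystem: rename s3://my-bucket-name/a s3://my-bucket-name/b",
    "16/02/05 01:51:13 INFO S3NativeFileSystem: listStatus s3://my-bucket-name/a with recursive false",
]

statuses, fs_ops, total_client_time = summarize(sample)
print(statuses, fs_ops, total_client_time)
```

To run it on a real log, pass an open file object instead of the sample list. If rename and listStatus calls, together with the 404 lookups around them, dominate the period after the tasks finish, that would be consistent with the time going into the commit step rather than the Spark tasks.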

ChristopherB answered Feb 23 '23 03:02


PySpark:

sc._jsc.hadoopConfiguration().set("spark.sql.parquet.output.committer.class", "org.apache.spark.sql.parquet.DirectParquetOutputCommitter")

Scala:

sc.hadoopConfiguration.set("spark.sql.parquet.output.committer.class", "org.apache.spark.sql.parquet.DirectParquetOutputCommitter")

In both cases sc is the SparkContext.
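One caveat when copying a property name from a web page: it can carry invisible characters (zero-width spaces and the like), in which case the key is set without error but never matches the real property. Below is a minimal sketch of a guard against that. The helper names clean_conf_key and set_hadoop_conf are hypothetical, not part of PySpark; the only Spark call used is the sc._jsc.hadoopConfiguration().set(...) call already shown above.

```python
import unicodedata


def clean_conf_key(key):
    """Drop invisible format characters (Unicode category Cf), such as
    zero-width spaces, and trim surrounding whitespace."""
    return "".join(ch for ch in key if unicodedata.category(ch) != "Cf").strip()


def set_hadoop_conf(sc, key, value):
    """Hypothetical helper: sanitise the key, then set it on the Hadoop
    configuration of the given SparkContext. Returns the key actually used."""
    cleaned = clean_conf_key(key)
    sc._jsc.hadoopConfiguration().set(cleaned, value)
    return cleaned
```

Usage would be set_hadoop_conf(sc, "spark.sql.parquet.output.committer.class", "org.apache.spark.sql.parquet.DirectParquetOutputCommitter"), called before the DataFrame is written.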

Kamil Sindi answered Feb 23 '23 04:02