AWS EMR: Pyspark: Rdd: mappartitions: Could not find valid SPARK_HOME while searching: Spark closures

I have a PySpark job that runs without any issues locally, but when it runs on the AWS EMR cluster it gets stuck once it reaches the code below. The job only processes 100 records. "some_func1" posts data to a website and returns the response at the end. Any idea what's going wrong, or how I can debug this? FYI: "some_func1" is defined outside of the class; I suspect the issue is related to [closures][1], but I'm not sure how to fix it.

response = attributes.mapPartitions(lambda iter: [some_func1(map(lambda x: x, xs), dict_name, id) for xs in partition_all(50, iter)]).collect()

Full code below

import argparse
import json

import requests
from pyspark.sql import SparkSession
from toolz import partition_all  # partition_all is assumed to come from toolz/cytoolz


def ctgs(entries):
    col1 = entries[0]
    col2 = entries[1]
    col3 = entries[2]

    rec = {
        'up_col1': col1,
        'up_col2': col2,
        'up_col3': col3
    }

    return rec


def some_func1(rec, dict_name, id):
    rec_list = list(rec)
    seid = id
    headers = {"some": "header"}  # placeholder
    attrburl = "www.someurl.com"  # placeholder

    response = requests.post(attrburl, data=json.dumps(rec_list), headers=headers)

    return response


class Processor:
    def __init__(self, sc, arguments):
        self.sc = sc
        self.env = arguments.env
        self.dte = arguments.dte
        self.sendme = arguments.sendme

    def send_them(self, ext_data, dict_name, id):
        attributes = ext_data.rdd.map(lambda x: ctgs((x['col1'], x['col2'], x['col3'])))

        response = attributes.mapPartitions(lambda iter: [some_func1(map(lambda x: x, xs), dict_name, id) for xs in partition_all(50, iter)]).collect()

    def extr_data(self, dict_name, id):
        ext_data = self.sc.sql('''select col1, col2, col3 from table_name''')
        self.send_them(ext_data, dict_name, id)

    def process(self):
        dict_name = {'dict_id': '34343-3434-3433-343'}
        id = 'dfdfd-erere-dfd'
        self.extr_data(dict_name, id)



def argument_parsing(args):
    parser = argparse.ArgumentParser()
    parser.add_argument("--env", required=True)
    parser.add_argument("--dte", required=True)
    parser.add_argument("--sendme", required=False)
    args = parser.parse_args(args)
    return args


def main(args):

    arguments = argument_parsing(args)

    sc = SparkSession \
        .builder \
        .appName("job_name") \
        .enableHiveSupport() \
        .getOrCreate()

    sc.sparkContext.setLogLevel("ERROR")

    processor = Processor(sc, arguments)
    processor.process()
asked Oct 16 '21 by user7343922
1 Answer

You are correct, this is an issue with closures/executors.

Code inside mapPartitions runs on the executors when you are on a cluster. Running in 'local' mode obscures these kinds of bugs/errors because it scopes all the functions to the driver, which is running on your machine, so there is no scope issue in 'local'.
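A quick way to see this for yourself is to compare the driver's hostname with the hostnames where the mapPartitions closure actually executes. This is a minimal standalone sketch, not part of your job:

import socket
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("where-does-it-run").getOrCreate()

# The driver's hostname: your machine in 'local' mode, the primary node on EMR.
print("driver host:", socket.gethostname())

# Each partition reports the hostname of the executor it ran on.
hosts = (spark.sparkContext
         .parallelize(range(8), 4)
         .mapPartitions(lambda it: [socket.gethostname()])
         .collect())
print("executor hosts:", set(hosts))

In 'local' mode both print the same host; on EMR the partition hostnames are the worker nodes, which is why code that only works from your machine can hang there.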

There are two types of problems when dealing with closures/executors: your scoped variables not being serializable, and the environment the executors are running in.

The environment check should be easy: can you reach the URL from one of your executors if you just SSH in and try to connect? (My bet is you are waiting on a DNS lookup for that URL.) In fact, I'd suggest you start by checking the security group for the EMR cluster and seeing what the nodes are allowed to access.
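If SSHing to the workers is awkward, you can run the same reachability check from inside a Spark job, so the request is made by the executors themselves. This is just a sketch: the URL is the placeholder from your code, the timeout is arbitrary, and sc is the SparkSession, following the naming in your main():

import requests

def probe(_):
    # Runs on an executor; a short timeout turns a silent hang into a visible error.
    try:
        r = requests.get("https://www.someurl.com", timeout=10)
        yield ("ok", r.status_code)
    except Exception as e:
        yield ("error", repr(e))

print(sc.sparkContext.parallelize(range(4), 4).mapPartitions(probe).collect())

If this times out or errors on the cluster but works locally, the problem is the EMR security group / outbound network access, not your Spark code.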

The scope is a little more challenging. If requests (or a session built from it) were initialized in the global scope but not serializable, that could cause an issue. (You can't serialize an in-flight connection to a database/website.) You could just initialize it inside mapPartitions, as sketched below, and that would solve it. The thing is, this would usually fail immediately and doesn't really fit the issue you are describing; unless it's causing the Python interpreter to die and falsely report that it's waiting, I don't think this is the issue.
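For the scope side, the safe pattern is to create anything non-serializable (sessions, connections, clients) inside the function you pass to mapPartitions, so each executor builds its own. A sketch based on your some_func1, with a placeholder header/URL and a timeout added:

import json
import requests
from toolz import partition_all

def post_partition(it, dict_name, id):
    # Created on the executor, never shipped from the driver.
    session = requests.Session()
    headers = {"Content-Type": "application/json"}
    attrburl = "https://www.someurl.com"  # placeholder
    for xs in partition_all(50, it):
        resp = session.post(attrburl, data=json.dumps(list(xs)), headers=headers, timeout=30)
        yield resp.status_code

responses = attributes.mapPartitions(lambda it: post_partition(it, dict_name, id)).collect()

Collecting only the status code (or response text) also matters: a requests.Response holds a live connection and is not something you want to ship back to the driver with collect().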

answered Oct 09 '22 by Matt Andruff