I have a PySpark job that runs without any issues locally, but when it runs on the AWS cluster it gets stuck when it reaches the code below. The job only processes 100 records. `some_func1` posts data to a website and returns a response at the end. Any idea what's going wrong, or how I can debug this? FYI: `some_func1` is defined outside of the class; I suspect the issue is related to ["closures"][1], but I'm not sure how to fix it.
response = attributes.mapPartitions(lambda iter: [some_func1(list(xs), dict_name, id) for xs in partition_all(50, iter)]).collect()
Full code below
import argparse
import json

import requests
from pyspark.sql import SparkSession
from toolz import partition_all

def ctgs(col1, col2, col3):
    rec = {
        'up_col1': col1,
        'up_col2': col2,
        'up_col3': col3
    }
    return rec
def some_func1(rec, dict_name, id):
    rec_list = list(rec)
    seid = id
    headers = "some header"
    attrburl = "www.someurl.com"
    response = requests.post(attrburl, data=json.dumps(rec_list), headers=headers)
    return response
class Processor:
    def __init__(self, sc, arguments):
        self.sc = sc
        self.env = arguments.env
        self.dte = arguments.dte
        self.sendme = arguments.sendme

    def send_them(self, ext_data, dict_name, id):
        attributes = ext_data.rdd.map(lambda x: ctgs(x['col1'], x['col2'], x['col3']))
        response = attributes.mapPartitions(lambda iter: [some_func1(list(xs), dict_name, id) for xs in partition_all(50, iter)]).collect()

    def extr_data(self, dict_name, id):
        ext_data = self.sc.sql('''select col1, col2, col3 from table_name''')
        self.send_them(ext_data, dict_name, id)

    def process(self):
        dict_name = {'dict_id': '34343-3434-3433-343'}
        id = 'dfdfd-erere-dfd'
        self.extr_data(dict_name, id)
def argument_parsing(args):
    parser = argparse.ArgumentParser()
    parser.add_argument("--env", required=True)
    parser.add_argument("--dte", required=True)
    parser.add_argument("--sendme", required=False)
    args = parser.parse_args(args)
    return args
def main(args):
    arguments = argument_parsing(args)
    sc = SparkSession \
        .builder \
        .appName("job_name") \
        .enableHiveSupport() \
        .getOrCreate()
    sc.sparkContext.setLogLevel("ERROR")
    processor = Processor(sc, arguments)
    processor.process()
You are correct, this is an issue with closures/executors.
Code inside mapPartitions runs on the executors when on a cluster. Running in 'local' mode obscures these kinds of bugs/errors because it scopes all functions to the driver, which runs on your machine, so there is no scope issue in 'local' mode.
There are two types of problems when dealing with closures/executors: your scoped variables not being serializable, and the environment the executor is running in.
The environment check should be easy. Can you connect to the URL from one of your executors if you just ssh in and try to connect? (My bet is you are waiting on a DNS lookup for the URL.) In fact, I'd suggest you start by checking the security group for the EMR cluster to see what the nodes are allowed to access.
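Another way to test this from inside the job itself (rather than ssh) is a throwaway probe shipped to the executors with mapPartitions, so the connectivity check runs where the POST actually happens. This is a sketch; the hostname and port are placeholders for your real endpoint.

```python
import socket

def probe_host(host, port=443, timeout=5):
    """Return 'ok' if a TCP connection succeeds, else the error (DNS failures included)."""
    try:
        socket.create_connection((host, port), timeout=timeout).close()
        return "ok"
    except OSError as e:  # socket.gaierror (DNS lookup failure) is a subclass of OSError
        return "failed: %r" % e

# On the cluster (sc is the SparkSession from the question) — one result per task:
# results = sc.sparkContext.parallelize(range(8), 8) \
#             .mapPartitions(lambda _: [probe_host("www.someurl.com")]) \
#             .collect()
# print(results)
```

If every partition reports a DNS or timeout error, it is the cluster's network/security-group configuration, not your code.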
The scope issue is a little more challenging. If requests is initialized in the global scope but is not serializable, that could cause a problem. (You can't serialize an in-flight connection to a database/website.) You could just initialize it inside mapPartitions, and that would solve the issue. The thing is, this would usually fail immediately, which doesn't really fit the behavior you are describing. Unless it is causing the Python interpreter to die and falsely report that it's waiting, I do not think this is the issue.