
How to get ID of a map task in Spark?

Is there a way to get the ID of a map task in Spark? For example, if each map task calls a user-defined function, can I get the ID of that map task from within that function?

MetallicPriest asked Jul 25 '15 22:07




1 Answer

I am not sure what you mean by the ID of a map task, but you can access task information using TaskContext:

import org.apache.spark.TaskContext

// Four empty partitions, so four tasks run and each reports its own context.
sc.parallelize(Seq[Int](), 4).mapPartitions(_ => {
    // Context of the task that is currently executing this function
    val ctx = TaskContext.get
    val stageId = ctx.stageId
    val partId = ctx.partitionId
    val hostname = java.net.InetAddress.getLocalHost().getHostName()
    Iterator(s"Stage: $stageId, Partition: $partId, Host: $hostname")
}).collect.foreach(println)

Similar functionality was added to PySpark in Spark 2.2.0 (SPARK-18576):

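from pyspark import TaskContext
import socket

def task_info(*_):
    # TaskContext.get() returns the context of the currently running task
    ctx = TaskContext.get()
    return ["Stage: {0}, Partition: {1}, Host: {2}".format(
        ctx.stageId(), ctx.partitionId(), socket.gethostname())]

# Four empty partitions, so four tasks run and each reports its own context.
for x in sc.parallelize([], 4).mapPartitions(task_info).collect():
    print(x)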
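
If by "ID of a map task" you mean something unique per task rather than per partition, TaskContext also exposes taskAttemptId() and attemptNumber(). Below is a minimal sketch of reading them from inside the function that each task runs. The names describe_task and tag_records are hypothetical helpers introduced only for illustration, and sc is assumed to be an existing SparkContext, as in the snippets above.

```python
def describe_task(ctx):
    """Build a readable label from a TaskContext-like object.

    Only calls accessor methods that pyspark.TaskContext provides:
    stageId(), partitionId(), attemptNumber() and taskAttemptId().
    """
    return "stage={0} partition={1} attempt={2} task_attempt_id={3}".format(
        ctx.stageId(), ctx.partitionId(),
        ctx.attemptNumber(), ctx.taskAttemptId())


def tag_records(iterator):
    """Pair every record of a partition with the label of the task processing it."""
    # Imported here so the helpers above can be loaded without PySpark installed.
    from pyspark import TaskContext

    # Valid only on executors, inside a running task.
    label = describe_task(TaskContext.get())
    for record in iterator:
        yield (label, record)


# Usage, assuming a running SparkContext named `sc`:
#   sc.parallelize(range(8), 4).mapPartitions(tag_records).collect()
```

Keeping the formatting in describe_task separate from the Spark call makes it possible to check the label logic with any stand-in object, without starting a cluster.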
zero323 answered Sep 26 '22 03:09