
UDF cause warning: CachedKafkaConsumer is not running in UninterruptibleThread (KAFKA-1894)

In the standard structured_kafka_wordcount.py example, when I split lines into words with a UDF like the one below,

my_split = udf(lambda x: x.split(' '), ArrayType(StringType()))

words = lines.select(explode(my_split(lines.value)).alias('word'))

the following warning keeps showing up:

WARN CachedKafkaConsumer: CachedKafkaConsumer is not running in UninterruptibleThread. It may hang when CachedKafkaConsumer's methods are interrupted because of KAFKA-1894

On the other hand, when I split the lines into words with pyspark.sql.functions.split, everything works fine and the warning does not appear.

words = lines.select(explode(split(lines.value, ' ')).alias('word'))

Why does this happen, and how can I fix the warning?

This is the code I am actually trying to run:

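import re

from pyspark.sql.functions import explode, udf
from pyspark.sql.types import ArrayType, StringType

# \\d+ (not \\d) so that counts of 10 or more also match
pattern = "(.+) message repeated (\\d+) times: \\[ (.+)\\]"
prog = re.compile(pattern)

def _unfold(x):
    ret = []
    result = prog.match(x)
    if result:
        log = " ".join((result.group(1), result.group(3)))
        times = result.group(2)
        # emit the de-duplicated log line once per repetition
        for _ in range(int(times)):
            ret.append(log)
    return ret

_udf = udf(_unfold, ArrayType(StringType()))
lines = lines.withColumn('value', explode(_udf(lines['value'])))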
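Because `_unfold` is plain Python, its per-row behavior can be checked without Spark or Kafka. The sketch below mirrors the `_unfold` logic in a standalone function (the name `unfold` and the sample inputs are made up for illustration). Note that, as written, a non-matching line produces an empty list, and `explode` emits no rows for an empty array, so such lines are dropped from the stream.

```python
import re

# Same pattern as the question, with \d+ so multi-digit counts also match
PATTERN = re.compile(r"(.+) message repeated (\d+) times: \[ (.+)\]")


def unfold(x):
    """Expand 'X message repeated N times: [ Y]' into N copies of 'X Y'."""
    ret = []
    result = PATTERN.match(x)
    if result:
        log = " ".join((result.group(1), result.group(3)))
        for _ in range(int(result.group(2))):
            ret.append(log)
    return ret


print(unfold("asd message repeated 3 times: [ 12]"))  # ['asd 12', 'asd 12', 'asd 12']
print(unfold("some other message"))                   # []
```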
petertc asked Jan 17 '18 07:01



1 Answer

Other than avoiding Python UDFs*, there is nothing you can do about this in your code. As the warning message says, UninterruptibleThread is a workaround for a Kafka bug (KAFKA-1894) and is designed to prevent an infinite loop when a KafkaConsumer is interrupted.

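It is not used by PythonUDFRunner (and it probably wouldn't make sense to introduce a special case there). With a Python UDF in the plan, the upstream rows, including those read from Kafka, are pulled by a separate writer thread that feeds the Python worker, and that thread is not an UninterruptibleThread.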
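A toy model in plain Python (not Spark's actual code) shows why the consuming thread changes: a generator's body runs on whichever thread calls `next()` on it. If a task hands its input iterator to a separate writer thread, which as far as I can tell is how the Python UDF runner feeds rows to the Python worker, the upstream source is read on that writer thread rather than on the task's own thread. `source_rows` is a hypothetical stand-in for the Kafka-backed iterator, and all other names are illustrative.

```python
import threading


def source_rows(seen_threads):
    """Hypothetical stand-in for an upstream Kafka-backed iterator.

    A generator body runs on whichever thread calls next() on it,
    so we record that thread's name for every row produced.
    """
    for i in range(3):
        seen_threads.append(threading.current_thread().name)
        yield i


def run_task_with_writer_thread():
    """Toy model: the task thread hands its input iterator to a writer thread."""
    seen_threads = []
    rows = source_rows(seen_threads)
    consumed = []

    def writer():
        # Plays the role of the thread that feeds rows to a Python worker
        for row in rows:
            consumed.append(row)

    t = threading.Thread(target=writer, name="writer-thread")
    t.start()
    t.join()
    return consumed, seen_threads


consumed, seen = run_task_with_writer_thread()
print(consumed, set(seen))  # [0, 1, 2] {'writer-thread'}
```

Every row was produced on `writer-thread`, so any check made inside the source about "am I on the task thread?" would fail there, which is the situation the warning reports.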

Personally, I wouldn't worry about it unless you run into related issues. Your Python code never interacts with KafkaConsumer directly. If you do experience problems, they should be fixed upstream; in that case I recommend opening a JIRA ticket.

* Your _unfold function can be rewritten with SQL functions, although it is a bit of a hack. First, add the message count as an integer:

from pyspark.sql import SparkSession
from pyspark.sql.functions import concat_ws, col, expr, coalesce, lit, regexp_extract, when

spark = SparkSession.builder.getOrCreate()

p = "(.+) message repeated (\\d+) times: \\[ (.+)\\]"

lines = spark.createDataFrame(
    ["asd message repeated 3 times: [ 12]", "some other message"], "string"
)

# Non-matching lines extract "", which casts to null, so coalesce falls back to 1
lines_with_count = lines.withColumn(
   "message_count", coalesce(regexp_extract("value", p, 2).cast("int"), lit(1)))

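The count column relies on three small semantics: `regexp_extract` returns an empty string when the pattern does not match, casting `""` to int yields null, and `coalesce` then falls back to 1. The plain-Python approximation below models that chain so it can be checked in isolation; the helper names are hypothetical, `re.search` stands in for Spark's regex matching, and it assumes non-ANSI cast behavior (`spark.sql.ansi.enabled=false`), since with ANSI mode the cast of `""` may raise an error instead of returning null.

```python
import re

# Same pattern as the answer (with \d+ so multi-digit counts match)
P = r"(.+) message repeated (\d+) times: \[ (.+)\]"


def regexp_extract(value, pattern, idx):
    """Approximates Spark's regexp_extract: returns '' when nothing matches."""
    m = re.search(pattern, value)
    return (m.group(idx) or "") if m else ""


def cast_int(s):
    """Approximates a non-ANSI .cast("int"): unparsable strings become None (null)."""
    try:
        return int(s)
    except ValueError:
        return None


def message_count(value):
    """Approximates coalesce(regexp_extract(value, p, 2).cast("int"), lit(1))."""
    n = cast_int(regexp_extract(value, P, 2))
    return n if n is not None else 1


print(message_count("asd message repeated 3 times: [ 12]"))  # 3
print(message_count("some other message"))                   # 1
```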
Use it to explode each line into message_count rows:

exploded = lines_with_count.withColumn(
      "i", expr("explode(split(repeat('1', message_count - 1),''))")
).drop("message_count", "i")

and finally extract the message:

exploded.withColumn("value", when(
    col("value").rlike(p),
    concat_ws(" ", regexp_extract("value", p, 1), regexp_extract("value", p, 3))
).otherwise(col("value"))).show(4, False)

# +------------------+
# |value             |
# +------------------+
# |asd 12            |
# |asd 12            |
# |asd 12            |
# |some other message|
# +------------------+
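As a sanity check on the logic (not on Spark itself), the count, explode, and extract steps can be modeled on a plain Python list, and the result matches the table above. This is a sketch under the assumption that the explode step yields exactly `message_count` rows per line; the helper names are hypothetical and `re.search` approximates Spark's `rlike`/`regexp_extract`.

```python
import re

P = r"(.+) message repeated (\d+) times: \[ (.+)\]"


def count(value):
    # Count step: the repetition count for matching lines, 1 otherwise
    m = re.search(P, value)
    return int(m.group(2)) if m else 1


def explode_rows(values):
    # Explode step: each row repeated count(value) times
    return [v for v in values for _ in range(count(v))]


def extract(value):
    # Extraction step: 'X Y' for matching lines, unchanged value otherwise
    m = re.search(P, value)
    return " ".join((m.group(1), m.group(3))) if m else value


lines = ["asd message repeated 3 times: [ 12]", "some other message"]
result = [extract(v) for v in explode_rows(lines)]
for v in result:
    print(v)
```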
Alper t. Turker answered Nov 10 '22 09:11