In the usual structured_kafka_wordcount.py code, when I split lines into words with a UDF like below,
my_split = udf(lambda x: x.split(' '), ArrayType(StringType()))
words = lines.select(
    explode(
        my_split(lines.value)
    )
)
the following warning keeps showing up:
WARN CachedKafkaConsumer: CachedKafkaConsumer is not running in UninterruptibleThread. It may hang when CachedKafkaConsumer's methods are interrupted because of KAFKA-1894
On the other hand, when I split the lines into words with pyspark.sql.functions.split, everything works well:
words = lines.select(
    explode(
        split(lines.value, ' ')
    )
)
Why does this happen, and how can I fix the warning?
This is the code I am trying to execute in practice:
import re

from pyspark.sql.functions import explode, udf
from pyspark.sql.types import ArrayType, StringType

pattern = "(.+) message repeated (\\d) times: \\[ (.+)\\]"
prog = re.compile(pattern)

def _unfold(x):
    ret = []
    result = prog.match(x)
    if result:
        log = " ".join((result.group(1), result.group(3)))
        times = result.group(2)
        for _ in range(int(times)):
            ret.append(log)
    else:
        ret.append(x)
    return ret

_udf = udf(lambda x: _unfold(x), ArrayType(StringType()))
lines = lines.withColumn('value', explode(_udf(lines['value'])))
We first create a Spark session. SparkSession provides a single point of entry for interacting with the underlying Spark functionality and allows programming Spark with the DataFrame and Dataset APIs. To read from Kafka for streaming queries, we can use spark.readStream.
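For reference, a minimal sketch of such a streaming read, assuming a local broker and a topic named logs (both placeholders):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("structured_kafka_wordcount").getOrCreate()

# Read the Kafka topic as a streaming DataFrame and keep only the value column as a string.
lines = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "logs")
    .load()
    .selectExpr("CAST(value AS STRING)"))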
Since we already know that the data read from Kafka carries a value attribute, we must provide a same-named column when writing data back to a Kafka topic. If the result consists of multiple columns, condense them into JSON, cast it to a string, and write it to the value column; each column's data should be cast to string.
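As a sketch, assuming a multi-column result DataFrame and placeholder broker and topic names, that could look like:

from pyspark.sql.functions import struct, to_json

# Condense all columns of the (hypothetical) result DataFrame into a single JSON string
# named "value" and stream it back to Kafka.
query = (result
    .select(to_json(struct("*")).cast("string").alias("value"))
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "words")
    .option("checkpointLocation", "/tmp/checkpoint")
    .start())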
Writing batch queries is similar to streaming queries, except that we use the read method instead of readStream and write instead of writeStream.
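A batch counterpart, under the same placeholder broker and topic names, would be:

# Read the topic once as a static DataFrame ...
df = (spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "logs")
    .load())

# ... and write it back to another topic as strings.
(df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    .write
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "logs_copy")
    .save())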
Other than avoiding Python UDFs*, there is nothing you can do about this problem in your code. As you can read in the warning message, UninterruptibleThread is a workaround for a Kafka bug (KAFKA-1894) and is designed to prevent an infinite loop when interrupting the KafkaConsumer. It is not used with PythonUDFRunner (it probably wouldn't make sense to introduce a special case there).
Personally I wouldn't worry about it unless you experience some related issues. Your Python code will never interact directly with the KafkaConsumer. If you do experience any issues, they should be fixed upstream; in that case I recommend creating a JIRA ticket.
* Your _unfold function can be rewritten with SQL functions, though it is something of a hack. First, add the message count as an integer:
from pyspark.sql.functions import concat_ws, col, expr, coalesce, lit, regexp_extract, when

p = "(.+) message repeated (\\d) times: \\[ (.+)\\]"

lines = spark.createDataFrame(
    ["asd message repeated 3 times: [ 12]", "some other message"], "string"
)

lines_with_count = lines.withColumn(
    "message_count",
    coalesce(regexp_extract("value", p, 2).cast("int"), lit(1))
)
Use it to explode:
exploded = lines_with_count.withColumn(
    "i",
    expr("explode(split(repeat('1', message_count - 1), ''))")
).drop("message_count", "i")
and extract:
exploded.withColumn(
    "value",
    when(
        col("value").rlike(p),
        concat_ws(" ", regexp_extract("value", p, 1), regexp_extract("value", p, 3))
    ).otherwise(col("value"))
).show(4, False)
# +------------------+
# |value |
# +------------------+
# |asd 12 |
# |asd 12 |
# |asd 12 |
# |some other message|
# +------------------+