*Hi all,
I have an easy question for you all. I have an RDD created from Kafka streaming using the createStream method. Now I want to add a timestamp as a value to this RDD before converting it into a DataFrame. I tried adding the value to the DataFrame with withColumn(), but it returns the error shown below:*
val topicMaps = Map("topic" -> 1)
val now = java.util.Calendar.getInstance().getTime()
val messages = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaConf, topicMaps, StorageLevel.MEMORY_ONLY_SER)
messages.foreachRDD(rdd =>
{
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
val dataframe = sqlContext.read.json(rdd.map(_._2))
val d =dataframe.withColumn("timeStamp_column",dataframe.col("now"))
org.apache.spark.sql.AnalysisException: Cannot resolve column name "now" among (action, device_os_ver, device_type, event_name, item_name, lat, lon, memberid, productUpccd, tenantid); at org.apache.spark.sql.DataFrame$$anonfun$resolve$1.apply(DataFrame.scala:15
I have learned that DataFrames cannot be altered because they are immutable, but RDDs are immutable as well. So what is the best way to do this? How can I add a value to the RDD (that is, add a timestamp to an RDD dynamically)?
The to_datetime() function converts the values to datetime objects; it is used to create a new column, Date-Time, in the data frame and save the new values.
PySpark to_timestamp(): convert String to Timestamp type. Use the to_timestamp() function to convert a String to a Timestamp (TimestampType) in PySpark. If no format is specified, the input string is expected in the default yyyy-MM-dd HH:mm:ss form.
A PySpark timestamp (TimestampType) holds a value in the format yyyy-MM-dd HH:mm:ss.SSSS, and a date (DateType) uses the format yyyy-MM-dd. Use the to_date() function to truncate the time from a timestamp, or to convert a timestamp to a date, on a DataFrame column.
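The same two functions are also available from Scala in org.apache.spark.sql.functions, so the conversion described above can be sketched roughly as follows. This assumes Spark 2.2 or later (where to_timestamp exists) and a local SparkSession; the sample strings and the column names `raw`, `ts` and `day` are made up for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, to_date, to_timestamp}

object StringToTimestampSketch {
  def main(args: Array[String]): Unit = {
    // Assumption: a local SparkSession, used only to keep the sketch self-contained.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("string-to-timestamp")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical input: timestamps stored as plain strings.
    val df = Seq("2019-06-24 12:01:19", "2019-11-16 16:44:55").toDF("raw")

    val converted = df
      // Parse the string into a TimestampType column using an explicit pattern.
      .withColumn("ts", to_timestamp(col("raw"), "yyyy-MM-dd HH:mm:ss"))
      // Drop the time part, leaving a DateType column.
      .withColumn("day", to_date(col("ts")))

    converted.printSchema()
    converted.show(false)

    spark.stop()
  }
}
```

Passing the pattern explicitly avoids relying on the default parsing behaviour.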
Try the current_timestamp function.
import org.apache.spark.sql.functions.current_timestamp
df.withColumn("time_stamp", current_timestamp())
To add a new column with a constant value such as a timestamp, you can use the lit function:
import org.apache.spark.sql.functions._
val newDF = oldDF.withColumn("timeStamp_column", lit(System.currentTimeMillis))
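To tie the answers back to the question: dataframe.col("now") looks for a column named "now" inside the DataFrame, which is why the AnalysisException lists the existing column names. A value computed on the driver has to be wrapped in lit() instead. Below is a minimal sketch of both options, assuming Spark 2.x or later with a local SparkSession rather than the SQLContext and Kafka stream from the question; the sample rows are made up and only stand in for the parsed JSON.

```scala
import java.sql.Timestamp

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{current_timestamp, lit}

object AddTimestampColumnSketch {
  def main(args: Array[String]): Unit = {
    // Assumption: a local SparkSession, used only to keep the sketch self-contained.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("add-timestamp-column")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical rows standing in for the DataFrame read from the Kafka JSON.
    val dataframe = Seq(("click", "phone"), ("view", "tablet"))
      .toDF("action", "device_type")

    // Option 1: a driver-side value wrapped in lit().
    // java.sql.Timestamp is used so the new column gets TimestampType.
    val now = new Timestamp(System.currentTimeMillis())
    val withLiteral = dataframe.withColumn("timeStamp_column", lit(now))

    // Option 2: let Spark supply the timestamp when the query is evaluated.
    val withCurrent = dataframe.withColumn("timeStamp_column", current_timestamp())

    withLiteral.printSchema()
    withCurrent.show(false)

    spark.stop()
  }
}
```

Note that lit(System.currentTimeMillis) produces a long column holding epoch milliseconds, not a timestamp column. As far as I know, lit() also does not accept the plain java.util.Date returned by Calendar.getInstance().getTime(), so it would need converting to java.sql.Timestamp first, for example with new Timestamp(date.getTime).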