
How can I add a timestamp as an extra column to my DataFrame?

*Hi all,

I have an easy question for you all. I have an RDD created from Kafka streaming using the createStream method. I want to add a timestamp as a value to this RDD before converting it to a DataFrame. I tried adding the value to the DataFrame with withColumn(), but the code below returns the error shown after it.*

val topicMaps = Map("topic" -> 1)
val now = java.util.Calendar.getInstance().getTime()

val messages = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaConf, topicMaps, StorageLevel.MEMORY_ONLY_SER)

messages.foreachRDD { rdd =>
  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  import sqlContext.implicits._

  val dataframe = sqlContext.read.json(rdd.map(_._2))

  // This is the failing line: "now" is a local Scala value,
  // not a column of the DataFrame, so col("now") cannot resolve it.
  val d = dataframe.withColumn("timeStamp_column", dataframe.col("now"))
}

org.apache.spark.sql.AnalysisException: Cannot resolve column name "now" among (action, device_os_ver, device_type, event_name, item_name, lat, lon, memberid, productUpccd, tenantid);
at org.apache.spark.sql.DataFrame$$anonfun$resolve$1.apply(DataFrame.scala:15

I understand that DataFrames cannot be altered because they are immutable, but RDDs are immutable as well. So what is the best way to do this? How can I add a value to the RDD (that is, add a timestamp to an RDD dynamically)?

jack AKA karthik asked Jan 09 '17 08:01



2 Answers

Try the current_timestamp function.

import org.apache.spark.sql.functions.current_timestamp    
df.withColumn("time_stamp", current_timestamp())
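
For context, the error in the question comes from dataframe.col("now"), which looks up a column named "now" in the DataFrame schema instead of reading the Scala value now. The sketch below shows the current_timestamp approach end to end. It assumes Spark 2.x or later (SparkSession with a local master); the object name, the helper withIngestTime, and the sample rows are hypothetical stand-ins for the JSON-derived DataFrame in the question.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.current_timestamp

object CurrentTimestampSketch {
  // Hypothetical helper: appends a TimestampType column to any DataFrame.
  // current_timestamp() is a column expression, so no column named "now"
  // has to exist in the schema.
  def withIngestTime(df: DataFrame): DataFrame =
    df.withColumn("timeStamp_column", current_timestamp())

  def main(args: Array[String]): Unit = {
    // Assumption: a local SparkSession, purely for illustration.
    val spark = SparkSession.builder()
      .appName("current-timestamp-sketch")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Made-up rows using two of the column names from the question.
    val df = Seq(("click", "android"), ("view", "ios"))
      .toDF("event_name", "device_type")

    val stamped = withIngestTime(df)
    stamped.printSchema()
    stamped.show(false)

    spark.stop()
  }
}
```

Inside foreachRDD the same helper could be applied to the DataFrame built from each batch, so the value is computed when that batch's query runs and not once at application start.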
venkat answered Oct 24 '22 00:10


To add a new column with a constant value such as a timestamp, you can use the lit function:

import org.apache.spark.sql.functions._
val newDF = oldDF.withColumn("timeStamp_column", lit(System.currentTimeMillis))
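
Note that lit(System.currentTimeMillis) produces a LongType column holding epoch milliseconds, and the value is fixed on the driver at the moment the expression is built. If a TimestampType column is wanted, one option is to wrap the value in java.sql.Timestamp, a type that lit accepts. The following is a minimal sketch under the same assumptions as the question (a column named timeStamp_column); the object name, the helper withBatchTime, and the sample data are hypothetical, and it assumes Spark 2.x or later.

```scala
import java.sql.Timestamp
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.lit

object LitTimestampSketch {
  // Hypothetical helper: stamps every row with the same driver-side instant,
  // stored as TimestampType instead of a raw Long.
  def withBatchTime(df: DataFrame, epochMillis: Long): DataFrame =
    df.withColumn("timeStamp_column", lit(new Timestamp(epochMillis)))

  def main(args: Array[String]): Unit = {
    // Assumption: a local SparkSession, purely for illustration.
    val spark = SparkSession.builder()
      .appName("lit-timestamp-sketch")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    val oldDF = Seq("click", "view").toDF("event_name")

    // The clock is read here, at call time. In a streaming job this call
    // would sit inside foreachRDD so each batch gets its own value.
    val newDF = withBatchTime(oldDF, System.currentTimeMillis)
    newDF.printSchema()
    newDF.show(false)

    spark.stop()
  }
}
```

This also explains a pitfall in the question's code: now is computed once, outside foreachRDD, so even after conversion it would carry the application start time for every batch.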
Javier Montón answered Oct 24 '22 01:10