 

Transforming one column into multiple ones in a Spark Dataframe

I have a large DataFrame (roughly 1.2 GB) with this structure:

+---------+--------------+------------------------------------------------------------------------------------------------------+
| country |  date_data   |                                                 text                                                 |
+---------+--------------+------------------------------------------------------------------------------------------------------+
| "EEUU"  | "2016-10-03" | "T_D: QQWE\nT_NAME: name_1\nT_IN: ind_1\nT_C: c1ws12\nT_ADD: Sec_1_P\n ...........\nT_R: 45ee"       |
| "EEUU"  | "2016-10-03" | "T_D: QQAA\nT_NAME: name_2\nT_IN: ind_2\nT_C: c1ws12\nT_ADD: Sec_1_P\n ...........\nT_R: 46ee"       |
| .       | .            | .                                                                                                    |
| .       | .            | .                                                                                                    |
| "EEUU"  | "2016-10-03" | "T_D: QQWE\nT_NAME: name_300000\nT_IN: ind_65\nT_C: c1ws12\nT_ADD: Sec_1_P\n ...........\nT_R: 47aa" |
+---------+--------------+------------------------------------------------------------------------------------------------------+

The DataFrame has about 300,000 rows, and the "text" field is a string of roughly 5,000 characters.

I would like to split the "text" field into these new fields:

+---------+------------+------+-------------+--------+--------+---------+--------+------+
| country | date_data  | t_d  |   t_name    |  t_in  |  t_c   |  t_add  | ...... | t_r  |
+---------+------------+------+-------------+--------+--------+---------+--------+------+
| EEUU    | 2016-10-03 | QQWE | name_1      | ind_1  | c1ws12 | Sec_1_P | ...... | 45ee |
| EEUU    | 2016-10-03 | QQAA | name_2      | ind_2  | c1ws12 | Sec_1_P | ...... | 45ee |
| .       | .          | .    | .           | .      | .      | .       | .      |      |
| .       | .          | .    | .           | .      | .      | .       | .      |      |
| .       | .          | .    | .           | .      | .      | .       | .      |      |
| EEUU    | 2016-10-03 | QQWE | name_300000 | ind_65 | c1ws12 | Sec_1_P | ...... | 47aa |
+---------+------------+------+-------------+--------+--------+---------+--------+------+

Currently, I'm using regular expressions to solve this problem. First, I write the regular expressions and create a function to extract the individual fields from "text" (90 regular expressions in total):

val D_text = "((?<=T_D: ).*?(?=\\\\n))".r
val NAME_text = "((?<=nT_NAME: ).*?(?=\\\\n))".r
val IN_text = "((?<=T_IN: ).*?(?=\\\\n))".r
val C_text = "((?<=T_C: ).*?(?=\\\\n))".r
val ADD_text = "((?<=T_ADD: ).*?(?=\\\\n))".r
        .
        .
        .
        .
val R_text = "((?<=T_R: ).*?(?=\\\\n))".r   

// UDF function: returns the first match of the given pattern, or "NULL" if there is none
def getFirst(pattern2: scala.util.matching.Regex) = udf(
  (url: String) => pattern2.findFirstIn(url) match {
    case Some(matched) => matched
    case None => "NULL"
  }
)

Then I create a new DataFrame (tbl_separate_fields) by applying the function with each regular expression to extract every new field from "text".

val tbl_separate_fields = hiveDF.select(
  hiveDF("country"),
  hiveDF("date_data"),
  getFirst(D_text)(hiveDF("text")).alias("t_d"),
  getFirst(NAME_text)(hiveDF("text")).alias("t_name"),
  getFirst(IN_text)(hiveDF("text")).alias("t_in"),
  getFirst(C_text)(hiveDF("text")).alias("t_c"),
  getFirst(ADD_text)(hiveDF("text")).alias("t_add"),
        .
        .
        .
        .
  getFirst(R_text)(hiveDF("text")).alias("t_r")
)

Finally, I insert this DataFrame into a Hive table:

tbl_separate_fields.registerTempTable("tbl_separate_fields") 
hiveContext.sql("INSERT INTO TABLE TABLE_INSERT PARTITION (date_data)  SELECT * FROM tbl_separate_fields")

This solution takes about one hour for the entire DataFrame, so I would like to optimize it and reduce the execution time. Is there a better approach?

We are using Hadoop 2.7.1 and Apache Spark 1.5.1. The Spark configuration is:

val conf = new SparkConf().set("spark.storage.memoryFraction", "0.1")
val sc = new SparkContext(conf)
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

Thanks in advance.

EDIT: sample data in which the parts of "text" can appear in a different order and some fields may be missing:

+---------+--------------+------------------------------------------------------------------------------------------------------+
| country |  date_data   |                                                 text                                                 |
+---------+--------------+------------------------------------------------------------------------------------------------------+
| "EEUU"  | "2016-10-03" | "T_D: QQWE\nT_NAME: name_1\nT_IN: ind_1\nT_C: c1ws12\nT_ADD: Sec_1_P\n ...........\nT_R: 45ee"       |
| "EEUU"  | "2016-10-03" | "T_NAME: name_2\nT_D: QQAA\nT_IN: ind_2\nT_C: c1ws12 ...........\nT_R: 46ee"                         |
| .       | .            | .                                                                                                    |
| .       | .            | .                                                                                                    |
| "EEUU"  | "2016-10-03" | "T_NAME: name_300000\nT_ADD: Sec_1_P\nT_IN: ind_65\nT_C: c1ws12\n ...........\nT_R: 47aa"            |
+---------+--------------+------------------------------------------------------------------------------------------------------+
asked Oct 30 '22 by E.Aarón


1 Answer

Using regular expressions in this case is slow and also fragile: each of the 90 regex UDFs scans the ~5,000-character string for every row, and the lookbehind patterns depend on the exact escaping and ordering of the fields.

If you know that all records have the same structure, i.e. that all "text" values have the same number and order of "parts", the following code would work (for any number of columns), mainly taking advantage of the split function in org.apache.spark.sql.functions:

import org.apache.spark.sql.functions._
import scala.collection.mutable

// first - split "text" column values into Arrays
val textAsArray: DataFrame = inputDF
  .withColumn("as_array", split(col("text"), "\n"))
  .drop("text")
  .cache()

// get a sample (first row) to get column names, can be skipped if you want to hard-code them:
val sampleText = textAsArray.first().getAs[mutable.WrappedArray[String]]("as_array").toArray
val columnNames: Array[(String, Int)] = sampleText.map(_.split(": ")(0)).zipWithIndex

// add Column per columnName with the right value and drop the no-longer-needed as_array column
val withValueColumns: DataFrame = columnNames.foldLeft(textAsArray) {
  case (df, (colName, index)) => df.withColumn(colName, split(col("as_array").getItem(index), ": ").getItem(1))
}.drop("as_array")

withValueColumns.show()
// for the sample data I created, 
// with just 4 "parts" in "text" column, this prints:
// +-------+----------+----+------+-----+------+
// |country| date_data| T_D|T_NAME| T_IN|   T_C|
// +-------+----------+----+------+-----+------+
// |   EEUU|2016-10-03|QQWE|name_1|ind_1|c1ws12|
// |   EEUU|2016-10-03|QQAA|name_2|ind_2|c1ws12|
// +-------+----------+----+------+-----+------+
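Note that the derived columns keep the upper-case names found in the text (T_D, T_NAME, ...). If you want the lower-case names shown in the question (t_d, t_name, ...), one way is to rename them afterwards; a small sketch, where lowerCased is just an illustrative name:

// rename every column to its lower-case form; country and date_data are unaffected
val lowerCased: DataFrame = withValueColumns.columns
  .foldLeft(withValueColumns) { (df, c) => df.withColumnRenamed(c, c.toLowerCase) }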

Alternatively, if the assumption above does not hold, you can use a UDF that converts the text column into a Map, and then perform a similar foldLeft operation over a hard-coded list of the desired columns:

import sqlContext.implicits._

// sample data: not the same order, not all records have all columns:
val inputDF: DataFrame = sc.parallelize(Seq(
  ("EEUU", "2016-10-03", "T_D: QQWE\nT_NAME: name_1\nT_IN: ind_1\nT_C: c1ws12"),
  ("EEUU", "2016-10-03", "T_D: QQAA\nT_IN: ind_2\nT_NAME: name_2")
)).toDF("country", "date_data", "text")

// hard-coded list of expected column names:
val columnNames: Seq[String] = Seq("T_D", "T_NAME", "T_IN", "T_C")

// UDF to convert text into key-value map
val asMap = udf[Map[String, String], String] { s =>
  s.split("\n").map(_.split(": ")).map { case Array(k, v) => k -> v }.toMap
}


val textAsMap = inputDF.withColumn("textAsMap", asMap(col("text"))).drop("text")

// for each column name - lookup the value in the map
val withValueColumns: DataFrame = columnNames.foldLeft(textAsMap) {
  case (df, colName) => df.withColumn(colName, col("textAsMap").getItem(colName))
}.drop("textAsMap")

withValueColumns.show()
// prints:
// +-------+----------+----+------+-----+------+
// |country| date_data| T_D|T_NAME| T_IN|   T_C|
// +-------+----------+----+------+-----+------+
// |   EEUU|2016-10-03|QQWE|name_1|ind_1|c1ws12|
// |   EEUU|2016-10-03|QQAA|name_2|ind_2|  null|
// +-------+----------+----+------+-----+------+
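One caveat with the asMap UDF above: it assumes every line contains exactly one ": " separator, so a malformed line (or a value that itself contains ": ") would cause a MatchError or lose part of the value. A slightly more defensive variant, as a sketch (asMapSafe is an illustrative name, not from the original code):

// split on the first ": " only (limit = 2) and silently skip lines without a separator
val asMapSafe = udf[Map[String, String], String] { s =>
  s.split("\n")
    .map(_.split(": ", 2))
    .collect { case Array(k, v) => k -> v }
    .toMap
}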
answered Nov 10 '22 by Tzach Zohar