
Merge two datasets which have different column names in Apache Spark

We need to merge two datasets that have different column names; there are no common columns across the datasets.

We have tried a couple of approaches, but neither yields the expected result. Kindly let us know how to combine two datasets using Apache Spark in Java.

Input dataset 1

"405-048011-62815", "CRC Industries",

"630-0746","Dixon value",

"4444-444","3M INdustries",

"555-55","Dixon coupling valve"

Input dataset 2

"222-2222-5555", "Tata",

"7777-88886","WestSide",

"22222-22224","Reliance",

"33333-3333","V industries"

Expected output is

    ----------label1----|------sentence1------|------label2---|------sentence2-----------
    | 405-048011-62815  | CRC Industries      | 222-2222-5555 |                      Tata|
    |        630-0746   |   Dixon value       |   7777-88886  |                  WestSide|
    -------------------------------------------------------------------------------------


    List<Row> data = Arrays.asList(
                    RowFactory.create("405-048011-62815", "CRC Industries"),
                    RowFactory.create("630-0746","Dixon value"),
                    RowFactory.create("4444-444","3M INdustries"),
                    RowFactory.create("555-55","Dixon coupling valve"));

    StructType schema = new StructType(new StructField[] {new StructField("label1", DataTypes.StringType, false,Metadata.empty()),
            new StructField("sentence1", DataTypes.StringType, false,Metadata.empty()) });

    Dataset<Row> sentenceDataFrame = spark.createDataFrame(data, schema);

    List<String> listStrings = new ArrayList<String>();
    listStrings.add("405-048011-62815");
    listStrings.add("630-0746");

    Dataset<Row> matchFound1=sentenceDataFrame.filter(col("label1").isin(listStrings.stream().toArray(String[]::new)));
    matchFound1.show();
    listStrings.clear();
    listStrings.add("222-2222-5555");
    listStrings.add("7777-88886");

    List<Row> data2 = Arrays.asList(
            RowFactory.create("222-2222-5555", "Tata"),
            RowFactory.create("7777-88886","WestSide"),
            RowFactory.create("22222-22224","Reliance"),
            RowFactory.create("33333-3333","V industries"));

    StructType schema2 = new StructType(new StructField[] {new StructField("label2", DataTypes.StringType, false,Metadata.empty()),
    new StructField("sentence2", DataTypes.StringType, false,Metadata.empty()) });

    Dataset<Row> sentenceDataFrame2 = spark.createDataFrame(data2, schema2);

    Dataset<Row> matchFound2=sentenceDataFrame2.filter(col("label2").isin(listStrings.stream().toArray(String[]::new)));
    matchFound2.show();

    //Approach 1
    Dataset<Row> matchFound3=matchFound1.select(matchFound1.col("label1"),matchFound1.col("sentence1"),matchFound2.col("label2"),
            matchFound2.col("sentence2"));
    System.out.println("After concat");
    matchFound3.show();

    //Approach 2
    Dataset<Row> matchFound4=matchFound1.filter(concat((col("label1")),matchFound1.col("sentence1"),matchFound2.col("label2"),
            matchFound2.col("sentence2")));
    System.out.println("After concat 2");
    matchFound4.show();

The errors for each approach are as follows

Approach 1 error

----------
org.apache.spark.sql.AnalysisException: resolved attribute(s) label2#10,sentence2#11 missing from label1#0,sentence1#1 in operator !Project [label1#0, sentence1#1, label2#10, sentence2#11];;
!Project [label1#0, sentence1#1, label2#10, sentence2#11]
+- Filter label1#0 IN (405-048011-62815,630-0746)
   +- LocalRelation [label1#0, sentence1#1]


----------
Approach 2 error
org.apache.spark.sql.AnalysisException: filter expression 'concat(`label1`, `sentence1`, `label2`, `sentence2`)' of type string is not a boolean.;;
!Filter concat(label1#0, sentence1#1, label2#10, sentence2#11)
+- Filter label1#0 IN (405-048011-62815,630-0746)
   +- LocalRelation [label1#0, sentence1#1]
jeevitesh asked May 03 '17 07:05



People also ask

How do I merge two DataFrames with different columns in spark?

Suppose the first DataFrame (dataframe1) has the columns ['ID', 'NAME', 'Address'] and the second DataFrame (dataframe2) has the columns ['ID', 'Age']. We have to add the Age column to the first DataFrame and the NAME and Address columns to the second. This can be done with the lit() function, which is available in PySpark.

How do I merge two columns in spark?

Spark SQL functions provide concat() to concatenate two or more DataFrame columns into a single Column. It can also take columns of different data types and concatenate them into a single column; for example, it supports String, Int, Boolean, and also arrays.

How do I merge Datasets in spark?

Spark provides the union() method in the Dataset class to append one Dataset to another: call Dataset.union() on the first Dataset and pass the second Dataset as the argument. Note: union can only be performed on Datasets with the same number of columns.


2 Answers

Hope this works for you.

DF

// toDF requires spark.implicits._ (imported automatically in spark-shell)
val pre: Array[String] = Array("CRC Industries", "Dixon value", "3M INdustries", "Dixon coupling valve")
val rea: Array[String] = Array("405048011-62815", "630-0746", "4444-444", "555-55")
val df1 = sc.parallelize(rea zip pre).toDF("label1", "sentence1")

val preasons2: Array[String] = Array("Tata", "WestSide", "Reliance", "V industries")
val reasonsI2: Array[String] = Array("222-2222-5555", "7777-88886", "22222-22224", "33333-3333")
val df2 = sc.parallelize(reasonsI2 zip preasons2).toDF("label2", "sentence2")

String Indexer

import org.apache.spark.ml.feature.StringIndexer

val indexer = new StringIndexer()
  .setInputCol("label1")
  .setOutputCol("label1Index")

val indexed = indexer.fit(df1).transform(df1)
indexed.show()

val indexer1 = new StringIndexer()
  .setInputCol("label2")
  .setOutputCol("label2Index")

val indexed1 = indexer1.fit(df2).transform(df2)
indexed1.show()

Join

val rnd_reslt12 = indexed
  .join(indexed1, indexed.col("label1Index") === indexed1.col("label2Index"))
  .drop(indexed.col("label1Index"))
  .drop(indexed1.col("label2Index"))
rnd_reslt12.show()

+---------------+--------------------+-------------+------------+
|         label1|           sentence1|       label2|   sentence2|
+---------------+--------------------+-------------+------------+
|       630-0746|         Dixon value|222-2222-5555|        Tata|
|       4444-444|       3M INdustries|  22222-22224|    Reliance|
|         555-55|Dixon coupling valve|   33333-3333|V industries|
|405048011-62815|      CRC Industries|   7777-88886|    WestSide|
+---------------+--------------------+-------------+------------+
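
Note that the join key here is the index StringIndexer assigns to each label, so which rows end up side by side depends on that indexing rather than on row order: in the output above, 630-0746 sits next to 222-2222-5555, whereas the expected output in the question pairs the first rows of the two datasets. If the goal is to pair rows by position, the general idea is to attach each row's position as a key and join on that. Below is a minimal, Spark-free sketch of that pairing in plain Java. The class `PositionalPairing` and the method `zipByPosition` are hypothetical names made up for illustration and are not part of any Spark API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PositionalPairing {

    // Pairs row i of `left` with row i of `right` and concatenates their columns.
    // Rows without a partner (when the sizes differ) are dropped, which mirrors
    // an inner join on the row position.
    static List<List<String>> zipByPosition(List<List<String>> left,
                                            List<List<String>> right) {
        int n = Math.min(left.size(), right.size());
        List<List<String>> out = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            List<String> row = new ArrayList<>(left.get(i));
            row.addAll(right.get(i));
            out.add(row);
        }
        return out;
    }

    public static void main(String[] args) {
        // The two filtered datasets from the question, as plain lists.
        List<List<String>> matchFound1 = Arrays.asList(
                Arrays.asList("405-048011-62815", "CRC Industries"),
                Arrays.asList("630-0746", "Dixon value"));
        List<List<String>> matchFound2 = Arrays.asList(
                Arrays.asList("222-2222-5555", "Tata"),
                Arrays.asList("7777-88886", "WestSide"));

        // Prints one line per paired row: label1 | sentence1 | label2 | sentence2
        for (List<String> row : zipByPosition(matchFound1, matchFound2)) {
            System.out.println(String.join(" | ", row));
        }
    }
}
```

In Spark itself, a position key of this kind could be built with, for example, `zipWithIndex()` on the underlying RDD or `row_number()` over a window, and then used as the join column. Treat that as a suggestion rather than a tested recipe: row position in a distributed Dataset is only meaningful if the data has a defined ordering.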
learner answered Oct 02 '22 13:10



I have done the same with StringIndexer in Java; this works.

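import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import static org.apache.spark.sql.functions.col;

public class StringIndexer11 {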

    public static void main(String[] args) {
        Dataset<Row> csvDataSet=null;
        try{
            System.setProperty("hadoop.home.dir", "D:\\AI matching\\winutil");
            JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("SparkJdbcDs").setMaster("local[*]"));
            SQLContext sqlContext = new SQLContext(sc);
            SparkSession spark = SparkSession.builder().appName("JavaTokenizerExample").getOrCreate();
            List<Row> data = Arrays.asList(
                    RowFactory.create("405-048011-62815", "CRC Industries"),
                    RowFactory.create("630-0746","Dixon value"),
                    RowFactory.create("4444-444","3M INdustries"),
                    RowFactory.create("555-55","Dixon coupling valve"));

            StructType schema = new StructType(new StructField[] {new StructField("label1", DataTypes.StringType, false,Metadata.empty()),
            new StructField("sentence1", DataTypes.StringType, false,Metadata.empty()) });

            Dataset<Row> sentenceDataFrame = spark.createDataFrame(data, schema);

            List<String> listStrings = new ArrayList<String>();
            listStrings.add("405-048011-62815");
            listStrings.add("630-0746");
             Dataset<Row> matchFound1=sentenceDataFrame.filter(col("label1").isin(listStrings.stream().toArray(String[]::new)));
            matchFound1.show();
            listStrings.clear();
            listStrings.add("222-2222-5555");
            listStrings.add("7777-88886");

            StringIndexer indexer = new StringIndexer()
              .setInputCol("label1")
              .setOutputCol("label1Index");
            Dataset<Row> Dataset1 = indexer.fit(matchFound1).transform(matchFound1);
            //Dataset1.show();


            List<Row> data2 = Arrays.asList(
            RowFactory.create("222-2222-5555", "Tata"),
            RowFactory.create("7777-88886","WestSide"),
            RowFactory.create("22222-22224","Reliance"),
            RowFactory.create("33333-3333","V industries"));

            StructType schema2 = new StructType(new StructField[] {new StructField("label2", DataTypes.StringType, false,Metadata.empty()),
            new StructField("sentence2", DataTypes.StringType, false,Metadata.empty()) });

            Dataset<Row> sentenceDataFrame2 = spark.createDataFrame(data2, schema2);

            Dataset<Row> matchFound2=sentenceDataFrame2.filter(col("label2").isin(listStrings.stream().toArray(String[]::new)));
            matchFound2.show();

            StringIndexer indexer1 = new StringIndexer()
              .setInputCol("label2")
              .setOutputCol("label2Index");
            Dataset<Row> Dataset2 = indexer1.fit(matchFound2).transform(matchFound2);
            //Dataset2.show();
            // Join on the StringIndexer output, then drop the two helper index columns.
            Dataset<Row> Finalresult = Dataset1
                    .join(Dataset2, Dataset1.col("label1Index").equalTo(Dataset2.col("label2Index")))
                    .drop(Dataset1.col("label1Index"))
                    .drop(Dataset2.col("label2Index"));
            Finalresult.show();


        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Sandesh Puttaraj answered Oct 02 '22 12:10
