ArrayIndexOutOfBoundsException while encoding in Spark Scala

I am trying to read big files in Spark Scala and perform a join on them. When I test with small files it works very well, but with bigger files I sometimes get the error below.

I managed to pull out one of the files for which I was getting the error. The file is 1 GB in size, and the error is thrown at the last stage, where I split each line to extract the columns.

The error occurs right after these lines:

val rdd = sc.textFile(mainFileURL)
val header = rdd.filter(_.contains("uniqueFundamentalSet")).map(line => line.split("\\|\\^\\|")).first()
val schema = StructType(header.map(cols => StructField(cols.replace(".", "_"), StringType)).toSeq)
println(schema)
val data = sqlContext.createDataFrame(rdd.filter(!_.contains("uniqueFundamentalSet")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schema)

This is the culprit line:

 val data = sqlContext.createDataFrame(rdd.filter(!_.contains("uniqueFundamentalSet")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schema)

Please suggest how I can handle this.

When I do rdd.count I get a value, but when I do data.count() I get the error:

Caused by: java.lang.RuntimeException: Error while encoding: java.lang.ArrayIndexOutOfBoundsException: 37
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, uniqueFundamentalSet), StringType), true) AS uniqueFundamentalSet#0
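For what it's worth, one common way a row can end up with fewer fields than the schema here is that Java's String.split drops trailing empty strings by default, so a line ending in empty columns yields a shorter array than the header. A minimal sketch (the line value below is made up for illustration):

val line = "a|^|b|^||^|"                     // ends with two empty columns
println(line.split("\\|\\^\\|").length)      // 2 -- trailing empty strings dropped
println(line.split("\\|\\^\\|", -1).length)  // 4 -- limit -1 keeps them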

Here is my sample data set

uniqueFundamentalSet|^|PeriodId|^|SourceId|^|StatementTypeCode|^|StatementCurrencyId|^|FinancialStatementLineItem.lineItemId|^|FinancialAsReportedLineItemName|^|FinancialAsReportedLineItemName.languageId|^|FinancialStatementLineItemValue|^|AdjustedForCorporateActionValue|^|ReportedCurrencyId|^|IsAsReportedCurrencySetManually|^|Unit|^|IsTotal|^|StatementSectionCode|^|DimentionalLineItemId|^|IsDerived|^|EstimateMethodCode|^|EstimateMethodNote|^|EstimateMethodNote.languageId|^|FinancialLineItemSource|^|IsCombinedItem|^|IsExcludedFromStandardization|^|DocByteOffset|^|DocByteLength|^|BookMark|^|ItemDisplayedNegativeFlag|^|ItemScalingFactor|^|ItemDisplayedValue|^|ReportedValue|^|EditedDescription|^|EditedDescription.languageId|^|ReportedDescription|^|ReportedDescription.languageId|^|AsReportedInstanceSequence|^|PhysicalMeasureId|^|FinancialStatementLineItemSequence|^|SystemDerivedTypeCode|^|AsReportedExchangeRate|^|AsReportedExchangeRateSourceCurrencyId|^|ThirdPartySourceCode|^|FinancialStatementLineItemValueUpperRange|^|FinancialStatementLineItemLocalLanguageLabel|^|FinancialStatementLineItemLocalLanguageLabel.languageId|^|IsFinal|^|FinancialStatementLineItem.lineItemInstanceKey|^|StatementSectionIsCredit|^|CapitalChangeAdjustmentDate|^|ParentLineItemId|^|EstimateMethodId|^|StatementSectionId|^|SystemDerivedTypeCodeId|^|UnitEnumerationId|^|FiscalYear|^|IsAnnual|^|PeriodPermId|^|PeriodPermId.objectTypeId|^|PeriodPermId.objectType|^|AuditID|^|AsReportedItemId|^|ExpressionInstanceId|^|ExpressionText|^|FFAction|!|
192730239205|^|235|^|1|^|FTN|^|500186|^|221|^|Average Age of Employees|^|505074|^|30.00000|^||^||^|False|^|1.00000|^|False|^|EMP|^||^|False|^|ARV|^||^|505074|^||^|False|^|False|^||^||^||^||^|0|^||^||^||^|505074|^||^|505074|^||^||^|122880|^|NA|^||^||^|TK |^||^||^|505126|^|True|^|1235002211206722736|^|True|^||^||^|3019656|^|3013652|^|3019679|^|1010066|^|1976|^|True|^||^|1000220295|^||^||^||^||^||^|I|!|
Asked by Atharv Thakur

1 Answer

Filter out the rows that don't match

One of the easiest ways is to filter out all the rows that don't match the length of the schema before applying the schema to form a DataFrame, as follows:

val requiredNumberOfFields = schema.fieldNames.length   // number of columns the schema requires
val data = sqlContext.createDataFrame(
  rdd
    .filter(!_.contains("uniqueFundamentalSet"))
    .map(line => line.split("\\|\\^\\|"))
    .filter(_.length == requiredNumberOfFields)   // keep only rows whose field count matches the schema
    .map(x => Row.fromSeq(x.toSeq)),
  schema)
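This silently drops every malformed row, so it may be worth checking how much data is being discarded. A quick sketch, assuming the same rdd and schema as above (splitRdd and droppedCount are just illustrative names):

val splitRdd = rdd
  .filter(!_.contains("uniqueFundamentalSet"))
  .map(_.split("\\|\\^\\|"))
val droppedCount = splitRdd.filter(_.length != requiredNumberOfFields).count()   // rows that would be discarded
println(s"Dropping $droppedCount rows that do not match the schema length")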

Pad with dummy strings or drop the extra fields

You can write a function to check the length of each row. If a row has fewer fields than the schema, you can pad it with dummy strings; if it has more, you can drop the extra fields.

val requiredNumberOfFields = schema.fieldNames.length
def appendDummyData(row: Array[String], len: Int): Array[String] =
  if (row.length == len) row                                                // already the right size
  else if (row.length < len) row ++ Array.fill(len - row.length)("dummy")   // pad short rows with dummy strings
  else row.take(len)                                                        // drop the extra fields
val data = sqlContext.createDataFrame(
  rdd
    .filter(!_.contains("uniqueFundamentalSet"))
    .map(line => line.split("\\|\\^\\|"))
    .map(x => Row.fromSeq(appendDummyData(x, requiredNumberOfFields).toSeq)),   // fix each row's length before applying the schema
  schema)
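To make the behaviour concrete, here is roughly what the helper returns (illustrative values, assuming a schema of 4 fields):

appendDummyData(Array("a", "b"), 4)                  // Array(a, b, dummy, dummy) -- short row padded
appendDummyData(Array("a", "b", "c", "d", "e"), 4)   // Array(a, b, c, d) -- extra field dropped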

I hope the answer is helpful.

Answered by Ramesh Maharjan