I am trying to read big files in Spark Scala and perform a join on them. When I test with small files it works very well, but with bigger files I sometimes get the error below.
I managed to pull out one of the files for which I was getting the error. The file size is 1 GB, and the error is thrown at the last step, while creating the DataFrame, where I split each line to get the columns.
The error is thrown right after these lines:
val rdd = sc.textFile(mainFileURL)
val header = rdd.filter(_.contains("uniqueFundamentalSet")).map(line => line.split("\\|\\^\\|")).first()
val schema = StructType(header.map(cols => StructField(cols.replace(".", "_"), StringType)).toSeq)
println(schema)
val data = sqlContext.createDataFrame(rdd.filter(!_.contains("uniqueFundamentalSet")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schema)
This is the culprit line:
val data = sqlContext.createDataFrame(rdd.filter(!_.contains("uniqueFundamentalSet")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schema)
Please suggest how I can handle this.
When I do rdd.count I get a value, but when I do data.count() I get the error:
Caused by: java.lang.RuntimeException: Error while encoding: java.lang.ArrayIndexOutOfBoundsException: 37
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, uniqueFundamentalSet), StringType), true) AS uniqueFundamentalSet#0
Here is my sample data set:
uniqueFundamentalSet|^|PeriodId|^|SourceId|^|StatementTypeCode|^|StatementCurrencyId|^|FinancialStatementLineItem.lineItemId|^|FinancialAsReportedLineItemName|^|FinancialAsReportedLineItemName.languageId|^|FinancialStatementLineItemValue|^|AdjustedForCorporateActionValue|^|ReportedCurrencyId|^|IsAsReportedCurrencySetManually|^|Unit|^|IsTotal|^|StatementSectionCode|^|DimentionalLineItemId|^|IsDerived|^|EstimateMethodCode|^|EstimateMethodNote|^|EstimateMethodNote.languageId|^|FinancialLineItemSource|^|IsCombinedItem|^|IsExcludedFromStandardization|^|DocByteOffset|^|DocByteLength|^|BookMark|^|ItemDisplayedNegativeFlag|^|ItemScalingFactor|^|ItemDisplayedValue|^|ReportedValue|^|EditedDescription|^|EditedDescription.languageId|^|ReportedDescription|^|ReportedDescription.languageId|^|AsReportedInstanceSequence|^|PhysicalMeasureId|^|FinancialStatementLineItemSequence|^|SystemDerivedTypeCode|^|AsReportedExchangeRate|^|AsReportedExchangeRateSourceCurrencyId|^|ThirdPartySourceCode|^|FinancialStatementLineItemValueUpperRange|^|FinancialStatementLineItemLocalLanguageLabel|^|FinancialStatementLineItemLocalLanguageLabel.languageId|^|IsFinal|^|FinancialStatementLineItem.lineItemInstanceKey|^|StatementSectionIsCredit|^|CapitalChangeAdjustmentDate|^|ParentLineItemId|^|EstimateMethodId|^|StatementSectionId|^|SystemDerivedTypeCodeId|^|UnitEnumerationId|^|FiscalYear|^|IsAnnual|^|PeriodPermId|^|PeriodPermId.objectTypeId|^|PeriodPermId.objectType|^|AuditID|^|AsReportedItemId|^|ExpressionInstanceId|^|ExpressionText|^|FFAction|!|
192730239205|^|235|^|1|^|FTN|^|500186|^|221|^|Average Age of Employees|^|505074|^|30.00000|^||^||^|False|^|1.00000|^|False|^|EMP|^||^|False|^|ARV|^||^|505074|^||^|False|^|False|^||^||^||^||^|0|^||^||^||^|505074|^||^|505074|^||^||^|122880|^|NA|^||^||^|TK |^||^||^|505126|^|True|^|1235002211206722736|^|True|^||^||^|3019656|^|3013652|^|3019679|^|1010066|^|1976|^|True|^||^|1000220295|^||^||^||^||^||^|I|!|
Filter out the rows which don't match
One of the easiest ways is to filter out all the rows that don't match the length of the schema before applying the schema to form a DataFrame:
val requiredNumberOfFields = schema.fieldNames.length //added to take the number of columns required
val data = sqlContext
  .createDataFrame(
    rdd
      .filter(!_.contains("uniqueFundamentalSet"))
      .map(line => line.split("\\|\\^\\|"))
      .filter(_.length == requiredNumberOfFields) //added to keep only the rows that have the same number of fields as the schema
      .map(x => Row.fromSeq(x.toSeq))
    , schema)
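If you are worried about silently losing data with this approach, a quick sanity check (just a sketch, using the same rdd and requiredNumberOfFields defined above) can tell you how many rows are dropped because of a wrong field count:
val parsedRows = rdd
  .filter(!_.contains("uniqueFundamentalSet"))
  .map(_.split("\\|\\^\\|"))
val totalRows = parsedRows.count() // all data rows
val validRows = parsedRows.filter(_.length == requiredNumberOfFields).count() // rows that match the schema
println(s"Dropping ${totalRows - validRows} of $totalRows rows with a wrong field count")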
Add dummy strings or drop the extra fields
You can write a function to check the length. If a row has fewer fields than the schema, you can pad it with dummy strings; if it has more, you can drop the extra fields.
val requiredNumberOfFields = schema.fieldNames.length
// pad the row with "dummy" values if it is too short, or truncate it if it is too long
def appendDummyData(row: Array[String], len: Int) = row.length == len match {
  case true => row
  case false => if (len > row.length) {
    val add = (for (loop <- 1 to len - row.length) yield "dummy").toArray
    row ++ add
  } else row.take(len)
}
val data = sqlContext
  .createDataFrame(
    rdd
      .filter(!_.contains("uniqueFundamentalSet"))
      .map(line => line.split("\\|\\^\\|"))
      .map(x => Row.fromSeq(appendDummyData(x, requiredNumberOfFields).toSeq)) //calling the helper to fix the length of each row
    , schema)
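To see what the helper does, here is a small check with made-up rows (the values below are hypothetical, not taken from the data set): a short row gets padded with "dummy" and a long row gets truncated.
val shortRow = Array("192730239205", "235", "1")
val longRow = Array("192730239205", "235", "1", "FTN", "500186", "221", "extra")
println(appendDummyData(shortRow, 5).mkString("|")) // 192730239205|235|1|dummy|dummy
println(appendDummyData(longRow, 5).mkString("|"))  // 192730239205|235|1|FTN|500186
One more thing worth checking: line.split("\\|\\^\\|") drops trailing empty strings, so rows that simply end with a run of empty fields will look shorter than the schema. If that is the case in your data, splitting with a negative limit, line.split("\\|\\^\\|", -1), keeps those trailing empty fields instead of dropping them.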
I hope the answer is helpful.