I'm running a Scala Spark job using Zeppelin. When I run it, I get the error below:
latestForEachKey: org.apache.spark.sql.DataFrame = [PartitionStatement_1: string, PartitionYear_1: string ... 64 more fields]
<console>:393: error: Could not write class $$$$2e6199f161363585e7ae9b28bcf8535e$$$$iw because it exceeds JVM code size limits. Method <init>'s code too large!
class $iw extends Serializable {
Sometimes I don't get the error and the job works. What can I do to fix this problem?
Here is the code that I run:
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
import org.apache.spark.{ SparkConf, SparkContext }
import java.sql.{Date, Timestamp}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.udf
import java.io.File
import org.apache.hadoop.fs._
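// Wildcard import: the code below also uses col, when, date_format, format_number,
// regexp_replace, rank and concat_ws, in addition to input_file_name and regexp_extract.
import org.apache.spark.sql.functions._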
// Each UDF is registered under its own name; reusing a name would overwrite the earlier registration.
val getPartition = spark.udf.register("getPartition", (filePath: String) => filePath.split("\\.")(3))
val getYearAndStatementTypeCodePartition = spark.udf.register("getYearAndStatementTypeCodePartition", (filePath: String) => filePath.split("\\.")(4))
val get_partition_Year = spark.udf.register("get_partition_Year", (df1resultFinal: String) => df1resultFinal.split("-")(0))
val get_partition_Statement = spark.udf.register("get_partition_Statement", (df1resultFinal: String) => df1resultFinal.split("-")(1))
val rdd = sc.textFile("s3://trfsmallfffile/FinancialStatementLineItem/MAIN")
val header = rdd.filter(_.contains("uniqueFundamentalSet")).map(line => line.split("\\|\\^\\|")).first()
val schema = StructType(header.map(cols => StructField(cols.replace(".", "_"), StringType)).toSeq)
val data = sqlContext.createDataFrame(rdd.filter(!_.contains("uniqueFundamentalSet")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schema)
val schemaHeader = StructType(header.map(cols => StructField(cols.replace(".", "."), StringType)).toSeq)
val dataHeader = sqlContext.createDataFrame(rdd.filter(!_.contains("uniqueFundamentalSet")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schemaHeader)
val df1resultFinal=data.withColumn("DataPartition", getPartition(input_file_name))
val df1resultFinalWithYear=df1resultFinal.withColumn("PartitionYear", get_partition_Year(getYearAndStatementTypeCodePartition(input_file_name)))
val df1resultFinalWithAllPartition=df1resultFinalWithYear.withColumn("PartitionStatement", get_partition_Statement(getYearAndStatementTypeCodePartition(input_file_name)))
val df1resultFinalwithTimestamp=df1resultFinalWithAllPartition
.withColumn("CapitalChangeAdjustmentDate",date_format(col("CapitalChangeAdjustmentDate"), "yyyy-MM-dd'T'HH:mm:ss'Z'"))
.withColumn("FinancialStatementLineItemValue", regexp_replace(format_number($"FinancialStatementLineItemValue".cast(DoubleType), 5), ",", ""))
.withColumn("AdjustedForCorporateActionValue", regexp_replace(format_number($"AdjustedForCorporateActionValue".cast(DoubleType), 5), ",", ""))
.withColumn("IsAsReportedCurrencySetManually", regexp_replace(format_number($"IsAsReportedCurrencySetManually".cast(DoubleType), 5), ",", ""))
.withColumn("ItemDisplayedValue", regexp_replace(format_number($"ItemDisplayedValue".cast(DoubleType), 5), ",", ""))
.withColumn("ReportedValue", regexp_replace(format_number($"ReportedValue".cast(DoubleType), 5), ",", ""))
.withColumn("AsReportedExchangeRate", regexp_replace(format_number($"AsReportedExchangeRate".cast(DoubleType), 5), ",", ""))
.withColumn("FinancialStatementLineItemValueUpperRange", regexp_replace(format_number($"FinancialStatementLineItemValueUpperRange".cast(DoubleType), 5), ",", ""))
//Loading Incremental
val rdd1 = sc.textFile("s3://trfsmallfffile/FinancialStatementLineItem/INCR")
val header1 = rdd1.filter(_.contains("uniqueFundamentalSet")).map(line => line.split("\\|\\^\\|")).first()
val schema1 = StructType(header1.map(cols => StructField(cols.replace(".", "_"), StringType)).toSeq)
val data1 = sqlContext.createDataFrame(rdd1.filter(!_.contains("uniqueFundamentalSet")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schema1)
val df2resultTimestamp=data1
.withColumn("CapitalChangeAdjustmentDate_1",date_format(col("CapitalChangeAdjustmentDate_1"), "yyyy-MM-dd'T'HH:mm:ss'Z'"))
.withColumn("FinancialStatementLineItemValue_1", regexp_replace(format_number($"FinancialStatementLineItemValue_1".cast(DoubleType), 5), ",", ""))
.withColumn("AdjustedForCorporateActionValue_1", regexp_replace(format_number($"AdjustedForCorporateActionValue_1".cast(DoubleType), 5), ",", ""))
.withColumn("IsAsReportedCurrencySetManually_1", regexp_replace(format_number($"IsAsReportedCurrencySetManually_1".cast(DoubleType), 5), ",", ""))
.withColumn("ItemDisplayedValue_1", regexp_replace(format_number($"ItemDisplayedValue_1".cast(DoubleType), 5), ",", ""))
.withColumn("ReportedValue_1", regexp_replace(format_number($"ReportedValue_1".cast(DoubleType), 5), ",", ""))
.withColumn("AsReportedExchangeRate_1", regexp_replace(format_number($"AsReportedExchangeRate_1".cast(DoubleType), 5), ",", ""))
.withColumn("FinancialStatementLineItemValueUpperRange_1", regexp_replace(format_number($"FinancialStatementLineItemValueUpperRange_1".cast(DoubleType), 5), ",", ""))
import org.apache.spark.sql.expressions._
val windowSpec = Window.partitionBy("FinancialStatementLineItem_lineItemId", "PeriodId","SourceId","StatementTypeCode","StatementCurrencyId","uniqueFundamentalSet").orderBy($"TimeStamp".cast(LongType).desc)
val latestForEachKey = df2resultTimestamp.withColumn("rank", rank().over(windowSpec)).filter($"rank" === 1).drop("rank", "TimeStamp")
val dfMainOutput = df1resultFinalwithTimestamp.join(latestForEachKey, Seq("FinancialStatementLineItem_lineItemId", "PeriodId","SourceId","StatementTypeCode","StatementCurrencyId","uniqueFundamentalSet"), "outer")
.select($"uniqueFundamentalSet",$"PeriodId",$"SourceId",$"StatementTypeCode",$"StatementCurrencyId",$"FinancialStatementLineItem_lineItemId",
when($"DataPartition_1".isNotNull, $"DataPartition_1").otherwise($"DataPartition").as("DataPartition"),
when($"PartitionYear_1".isNotNull, $"PartitionYear_1").otherwise($"PartitionYear").as("PartitionYear"),
when($"PartitionStatement_1".isNotNull, $"PartitionStatement_1").otherwise($"PartitionStatement").as("PartitionStatement"),
when($"FinancialAsReportedLineItemName_1".isNotNull, $"FinancialAsReportedLineItemName_1").otherwise($"FinancialAsReportedLineItemName").as("FinancialAsReportedLineItemName"),
when($"FinancialAsReportedLineItemName_languageId_1".isNotNull, $"FinancialAsReportedLineItemName_languageId_1").otherwise($"FinancialAsReportedLineItemName_languageId").as("FinancialAsReportedLineItemName_languageId"),
when($"FinancialStatementLineItemValue_1".isNotNull, $"FinancialStatementLineItemValue_1").otherwise($"FinancialStatementLineItemValue").as("FinancialStatementLineItemValue"),
when($"AdjustedForCorporateActionValue_1".isNotNull, $"AdjustedForCorporateActionValue_1").otherwise($"AdjustedForCorporateActionValue").as("AdjustedForCorporateActionValue"),
when($"ReportedCurrencyId_1".isNotNull, $"ReportedCurrencyId_1").otherwise($"ReportedCurrencyId").as("ReportedCurrencyId"),
when($"IsAsReportedCurrencySetManually_1".isNotNull, $"IsAsReportedCurrencySetManually_1").otherwise($"IsAsReportedCurrencySetManually").as("IsAsReportedCurrencySetManually"),
when($"Unit_1".isNotNull, $"Unit_1").otherwise($"Unit").as("Unit"),
when($"IsTotal_1".isNotNull, $"IsTotal_1").otherwise($"IsTotal").as("IsTotal"),
when($"StatementSectionCode_1".isNotNull, $"StatementSectionCode_1").otherwise($"StatementSectionCode").as("StatementSectionCode"),
when($"DimentionalLineItemId_1".isNotNull, $"DimentionalLineItemId_1").otherwise($"DimentionalLineItemId").as("DimentionalLineItemId"),
when($"IsDerived_1".isNotNull, $"IsDerived_1").otherwise($"IsDerived").as("IsDerived"),
when($"EstimateMethodCode_1".isNotNull, $"EstimateMethodCode_1").otherwise($"EstimateMethodCode").as("EstimateMethodCode"),
when($"EstimateMethodNote_1".isNotNull, $"EstimateMethodNote_1").otherwise($"EstimateMethodNote").as("EstimateMethodNote"),
when($"EstimateMethodNote_languageId_1".isNotNull, $"EstimateMethodNote_languageId_1").otherwise($"EstimateMethodNote_languageId").as("EstimateMethodNote_languageId"),
when($"FinancialLineItemSource_1".isNotNull, $"FinancialLineItemSource_1").otherwise($"FinancialLineItemSource").as("FinancialLineItemSource"),
when($"IsCombinedItem_1".isNotNull, $"IsCombinedItem_1").otherwise($"IsCombinedItem").as("IsCombinedItem"),
when($"IsExcludedFromStandardization_1".isNotNull, $"IsExcludedFromStandardization_1").otherwise($"IsExcludedFromStandardization").as("IsExcludedFromStandardization"),
when($"DocByteOffset_1".isNotNull, $"DocByteOffset_1").otherwise($"DocByteOffset").as("DocByteOffset"),
when($"DocByteLength_1".isNotNull, $"DocByteLength_1").otherwise($"DocByteLength").as("DocByteLength"),
when($"BookMark_1".isNotNull, $"BookMark_1").otherwise($"BookMark").as("BookMark"),
when($"ItemDisplayedNegativeFlag_1".isNotNull, $"ItemDisplayedNegativeFlag_1").otherwise($"ItemDisplayedNegativeFlag").as("ItemDisplayedNegativeFlag"),
when($"ItemScalingFactor_1".isNotNull, $"ItemScalingFactor_1").otherwise($"ItemScalingFactor").as("ItemScalingFactor"),
when($"ItemDisplayedValue_1".isNotNull, $"ItemDisplayedValue_1").otherwise($"ItemDisplayedValue").as("ItemDisplayedValue"),
when($"ReportedValue_1".isNotNull, $"ReportedValue_1").otherwise($"ReportedValue").as("ReportedValue"),
when($"EditedDescription_1".isNotNull, $"EditedDescription_1").otherwise($"EditedDescription").as("EditedDescription"),
when($"EditedDescription_languageId_1".isNotNull, $"EditedDescription_languageId_1").otherwise($"EditedDescription_languageId").as("EditedDescription_languageId"),
when($"ReportedDescription_1".isNotNull, $"ReportedDescription_1").otherwise($"ReportedDescription").as("ReportedDescription"),
when($"ReportedDescription_languageId_1".isNotNull, $"ReportedDescription_languageId_1").otherwise($"ReportedDescription_languageId").as("ReportedDescription_languageId"),
when($"AsReportedInstanceSequence_1".isNotNull, $"AsReportedInstanceSequence_1").otherwise($"AsReportedInstanceSequence").as("AsReportedInstanceSequence"),
when($"PhysicalMeasureId_1".isNotNull, $"PhysicalMeasureId_1").otherwise($"PhysicalMeasureId").as("PhysicalMeasureId"),
when($"FinancialStatementLineItemSequence_1".isNotNull, $"FinancialStatementLineItemSequence_1").otherwise($"FinancialStatementLineItemSequence").as("FinancialStatementLineItemSequence"),
when($"SystemDerivedTypeCode_1".isNotNull, $"SystemDerivedTypeCode_1").otherwise($"SystemDerivedTypeCode").as("SystemDerivedTypeCode"),
when($"AsReportedExchangeRate_1".isNotNull, $"AsReportedExchangeRate_1").otherwise($"AsReportedExchangeRate").as("AsReportedExchangeRate"),
when($"AsReportedExchangeRateSourceCurrencyId_1".isNotNull, $"AsReportedExchangeRateSourceCurrencyId_1").otherwise($"AsReportedExchangeRateSourceCurrencyId").as("AsReportedExchangeRateSourceCurrencyId"),
when($"ThirdPartySourceCode_1".isNotNull, $"ThirdPartySourceCode_1").otherwise($"ThirdPartySourceCode").as("ThirdPartySourceCode"),
when($"FinancialStatementLineItemValueUpperRange_1".isNotNull, $"FinancialStatementLineItemValueUpperRange_1").otherwise($"FinancialStatementLineItemValueUpperRange").as("FinancialStatementLineItemValueUpperRange"),
when($"FinancialStatementLineItemLocalLanguageLabel_1".isNotNull, $"FinancialStatementLineItemLocalLanguageLabel_1").otherwise($"FinancialStatementLineItemLocalLanguageLabel").as("FinancialStatementLineItemLocalLanguageLabel"),
when($"FinancialStatementLineItemLocalLanguageLabel_languageId_1".isNotNull, $"FinancialStatementLineItemLocalLanguageLabel_languageId_1").otherwise($"FinancialStatementLineItemLocalLanguageLabel_languageId").as("FinancialStatementLineItemLocalLanguageLabel_languageId"),
when($"IsFinal_1".isNotNull, $"IsFinal_1").otherwise($"IsFinal").as("IsFinal"),
when($"FinancialStatementLineItem_lineItemInstanceKey_1".isNotNull, $"FinancialStatementLineItem_lineItemInstanceKey_1").otherwise($"FinancialStatementLineItem_lineItemInstanceKey").as("FinancialStatementLineItem_lineItemInstanceKey"),
when($"StatementSectionIsCredit_1".isNotNull, $"StatementSectionIsCredit_1").otherwise($"StatementSectionIsCredit").as("StatementSectionIsCredit"),
when($"CapitalChangeAdjustmentDate_1".isNotNull, $"CapitalChangeAdjustmentDate_1").otherwise($"CapitalChangeAdjustmentDate").as("CapitalChangeAdjustmentDate"),
when($"ParentLineItemId_1".isNotNull, $"ParentLineItemId_1").otherwise($"ParentLineItemId").as("ParentLineItemId"),
when($"EstimateMethodId_1".isNotNull, $"EstimateMethodId_1").otherwise($"EstimateMethodId").as("EstimateMethodId"),
when($"StatementSectionId_1".isNotNull, $"StatementSectionId_1").otherwise($"StatementSectionId").as("StatementSectionId"),
when($"SystemDerivedTypeCodeId_1".isNotNull, $"SystemDerivedTypeCodeId_1").otherwise($"SystemDerivedTypeCodeId").as("SystemDerivedTypeCodeId"),
when($"UnitEnumerationId_1".isNotNull, $"UnitEnumerationId_1").otherwise($"UnitEnumerationId").as("UnitEnumerationId"),
when($"FiscalYear_1".isNotNull, $"FiscalYear_1").otherwise($"FiscalYear").as("FiscalYear"),
when($"IsAnnual_1".isNotNull, $"IsAnnual_1").otherwise($"IsAnnual").as("IsAnnual"),
when($"PeriodPermId_1".isNotNull, $"PeriodPermId_1").otherwise($"PeriodPermId").as("PeriodPermId"),
when($"PeriodPermId_objectTypeId_1".isNotNull, $"PeriodPermId_objectTypeId_1").otherwise($"PeriodPermId_objectTypeId").as("PeriodPermId_objectTypeId"),
when($"PeriodPermId_objectType_1".isNotNull, $"PeriodPermId_objectType_1").otherwise($"PeriodPermId_objectType").as("PeriodPermId_objectType"),
when($"AuditID_1".isNotNull, $"AuditID_1").otherwise($"AuditID").as("AuditID"),
when($"AsReportedItemId_1".isNotNull, $"AsReportedItemId_1").otherwise($"AsReportedItemId").as("AsReportedItemId"),
when($"ExpressionInstanceId_1".isNotNull, $"ExpressionInstanceId_1").otherwise($"ExpressionInstanceId").as("ExpressionInstanceId"),
when($"ExpressionText_1".isNotNull, $"ExpressionText_1").otherwise($"ExpressionText").as("ExpressionText"),
when($"FFAction_1".isNotNull, $"FFAction_1").otherwise($"FFAction|!|").as("FFAction|!|"))
.filter(!$"FFAction|!|".contains("D|!|"))
val dfMainOutputFinal = dfMainOutput.na.fill("").select($"DataPartition", $"PartitionYear", $"PartitionStatement",concat_ws("|^|", dfMainOutput.schema.fieldNames.filter(_ != "DataPartition").filter(_ != "PartitionYear").filter(_ != "PartitionStatement").map(c => col(c)): _*).as("concatenated"))
val headerColumn = dataHeader.columns.toSeq
val header = headerColumn.mkString("", "|^|", "|!|").dropRight(3)
val dfMainOutputFinalWithoutNull = dfMainOutputFinal.withColumn("concatenated", regexp_replace(col("concatenated"), "|^|null", "")).withColumnRenamed("concatenated", header)
dfMainOutputFinalWithoutNull.repartition(1).write.partitionBy("DataPartition","PartitionYear","PartitionStatement")
.format("csv")
.option("nullValue", "")
.option("delimiter", "\t")
.option("quote", "\u0000")
.option("header", "true")
.option("codec", "gzip")
.save("s3://trfsmallfffile/FinancialStatementLineItem/output")
val FFRowCount =dfMainOutputFinalWithoutNull.groupBy("DataPartition","PartitionYear","PartitionStatement").count
FFRowCount.coalesce(1).write.format("com.databricks.spark.xml")
.option("rootTag", "FFFileType")
.option("rowTag", "FFPhysicalFile")
.save("s3://trffullfiles/FinancialStatementLineItem/Descr")
The script is very large, but I don't know how to make it smaller. Maybe by using Scala classes or something?
The maximum size of a method in Java (and by extension Scala) is 64KB of bytecode; see e.g. the question here. That means you have too much code in one place without splitting it up into multiple methods.
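The error names the `<init>` method of a generated `$iw` class, which suggests that the notebook code ends up in the constructor of a wrapper class. In Scala, every statement and `val` initializer written directly in a class body is compiled into that class's constructor, while each `def` becomes a separate method with its own size budget. A minimal sketch of the difference, using made-up class names and no Spark at all:

```scala
// Hypothetical example classes, for illustration only.

// Every val initializer here is compiled into the constructor (<init>),
// so all of this code counts toward a single method's size limit.
class AllInConstructor {
  val doubled: Seq[Int] = Seq(1, 2, 3).map(_ * 2)
  val total: Int = doubled.sum
}

// Each def is compiled into its own method, so the code is spread
// over several methods instead of piling up in the constructor.
class SplitIntoMethods {
  def doubled(xs: Seq[Int]): Seq[Int] = xs.map(_ * 2)
  def total(xs: Seq[Int]): Int = doubled(xs).sum
}
```

Both classes compute the same result; only where the bytecode lands differs.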
In your case, I would recommend the following:
Refactor parts of the code to be more concise. Especially when computing dfMainOutput there are a lot of when statements; it should be possible to do this in a more efficient and better-looking way.
Since the size constraint is per method, you can simply move some of the code into its own methods. This has the added benefit of making the code easier to follow and read. For example, you can have a method called loadData() which reads the input data and returns a dataframe, and another method for merging df1resultFinalwithTimestamp with latestForEachKey. You can create methods for each section/part of the code.
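Both suggestions could be combined roughly as follows. This is only a sketch under assumptions: the object name `NotebookSteps` and the helpers `overridableColumns`, `preferIncremental` and `mergeLatest` are made up for illustration, and it assumes that the incremental columns follow the `<name>_1` naming used in the question. It relies on `coalesce(a_1, a)` returning the first non-null argument, which gives the same result as `when(a_1.isNotNull, a_1).otherwise(a)`. Columns that break the naming pattern (such as `FFAction_1` versus `FFAction|!|`) would have to be passed in explicitly.

```scala
import org.apache.spark.sql.{Column, DataFrame, Row, SparkSession}
import org.apache.spark.sql.functions.{coalesce, col}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hypothetical helper object; each def is compiled into its own method.
object NotebookSteps {
  // Join keys, copied from the join in the question.
  val keyColumns: Seq[String] = Seq(
    "uniqueFundamentalSet", "PeriodId", "SourceId", "StatementTypeCode",
    "StatementCurrencyId", "FinancialStatementLineItem_lineItemId")

  // Reads one delimited text dataset, taking the schema from its header line.
  def loadData(spark: SparkSession, path: String): DataFrame = {
    val rdd = spark.sparkContext.textFile(path)
    val header = rdd.filter(_.contains("uniqueFundamentalSet"))
      .map(_.split("\\|\\^\\|")).first()
    val schema = StructType(header.map(c => StructField(c.replace(".", "_"), StringType)))
    val rows = rdd.filter(!_.contains("uniqueFundamentalSet"))
      .map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq))
    spark.createDataFrame(rows, schema)
  }

  // Non-key columns of the main data that have a "<name>_1" counterpart.
  def overridableColumns(mainCols: Seq[String], incrCols: Seq[String]): Seq[String] =
    mainCols.filter(c => !keyColumns.contains(c) && incrCols.contains(c + "_1"))

  // Prefer the incremental value, fall back to the main value.
  def preferIncremental(name: String): Column =
    coalesce(col(name + "_1"), col(name)).as(name)

  // Outer join on the keys, then one generated expression per column pair.
  // `extra` is for columns that do not follow the "_1" naming pattern.
  def mergeLatest(main: DataFrame, latest: DataFrame, extra: Seq[Column] = Nil): DataFrame = {
    val merged = overridableColumns(main.columns.toSeq, latest.columns.toSeq)
      .map(preferIncremental)
    main.join(latest, keyColumns, "outer")
      .select(keyColumns.map(col) ++ merged ++ extra: _*)
  }
}

// Possible usage in the notebook (names as in the question):
// val dfMainOutput = NotebookSteps.mergeLatest(
//   df1resultFinalwithTimestamp, latestForEachKey,
//   Seq(coalesce(col("FFAction_1"), col("FFAction|!|")).as("FFAction|!|")))
```

The generated column list replaces the long hand-written select, so the notebook paragraph itself contains only a few calls.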