
Error: Could not write class iw because it exceeds JVM code size limits. Method code too large

I'm running a Scala Spark job in Zeppelin. When I run it, I get the error below:

latestForEachKey: org.apache.spark.sql.DataFrame = [PartitionStatement_1: string, PartitionYear_1: string ... 64 more fields]
<console>:393: error: Could not write class $$$$2e6199f161363585e7ae9b28bcf8535e$$$$iw because it exceeds JVM code size limits. Method <init>'s code too large!
class $iw extends Serializable {

Sometimes the error does not occur and the job works. What can I do to fix this problem?

Here is the code that I run:

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

import org.apache.spark.{ SparkConf, SparkContext }
import java.sql.{Date, Timestamp}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.udf
import java.io.File
import org.apache.hadoop.fs._
import org.apache.spark.sql.functions.input_file_name
// col, when, rank, date_format, format_number, regexp_replace and concat_ws are used below
import org.apache.spark.sql.functions._

// Each UDF is registered under its own name so that a later registration does not overwrite an earlier one
val getPartition = spark.udf.register("getPartition", (filePath: String) => filePath.split("\\.")(3))
val getYearAndStatementTypeCodePartition = spark.udf.register("getYearAndStatementTypeCodePartition", (filePath: String) => filePath.split("\\.")(4))
val get_partition_Year = spark.udf.register("get_partition_Year", (df1resultFinal: String) => df1resultFinal.split("-")(0))
val get_partition_Statement = spark.udf.register("get_partition_Statement", (df1resultFinal: String) => df1resultFinal.split("-")(1))

val rdd = sc.textFile("s3://trfsmallfffile/FinancialStatementLineItem/MAIN")
val header = rdd.filter(_.contains("uniqueFundamentalSet")).map(line => line.split("\\|\\^\\|")).first()
val schema = StructType(header.map(cols => StructField(cols.replace(".", "_"), StringType)).toSeq)
val data = sqlContext.createDataFrame(rdd.filter(!_.contains("uniqueFundamentalSet")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schema)

val schemaHeader = StructType(header.map(cols => StructField(cols.replace(".", "."), StringType)).toSeq)
val dataHeader = sqlContext.createDataFrame(rdd.filter(!_.contains("uniqueFundamentalSet")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schemaHeader)

val df1resultFinal=data.withColumn("DataPartition", getPartition(input_file_name))
val df1resultFinalWithYear=df1resultFinal.withColumn("PartitionYear", get_partition_Year(getYearAndStatementTypeCodePartition(input_file_name)))
val df1resultFinalWithAllPartition=df1resultFinalWithYear.withColumn("PartitionStatement", get_partition_Statement(getYearAndStatementTypeCodePartition(input_file_name)))

val df1resultFinalwithTimestamp=df1resultFinalWithAllPartition
.withColumn("CapitalChangeAdjustmentDate",date_format(col("CapitalChangeAdjustmentDate"), "yyyy-MM-dd'T'HH:mm:ss'Z'"))
.withColumn("FinancialStatementLineItemValue", regexp_replace(format_number($"FinancialStatementLineItemValue".cast(DoubleType), 5), ",", ""))
.withColumn("AdjustedForCorporateActionValue", regexp_replace(format_number($"AdjustedForCorporateActionValue".cast(DoubleType), 5), ",", ""))
.withColumn("IsAsReportedCurrencySetManually", regexp_replace(format_number($"IsAsReportedCurrencySetManually".cast(DoubleType), 5), ",", ""))
.withColumn("ItemDisplayedValue", regexp_replace(format_number($"ItemDisplayedValue".cast(DoubleType), 5), ",", ""))
.withColumn("ReportedValue", regexp_replace(format_number($"ReportedValue".cast(DoubleType), 5), ",", ""))
.withColumn("AsReportedExchangeRate", regexp_replace(format_number($"AsReportedExchangeRate".cast(DoubleType), 5), ",", ""))
.withColumn("FinancialStatementLineItemValueUpperRange", regexp_replace(format_number($"FinancialStatementLineItemValueUpperRange".cast(DoubleType), 5), ",", ""))

//Loading Incremental 

val rdd1 = sc.textFile("s3://trfsmallfffile/FinancialStatementLineItem/INCR")
val header1 = rdd1.filter(_.contains("uniqueFundamentalSet")).map(line => line.split("\\|\\^\\|")).first()
val schema1 = StructType(header1.map(cols => StructField(cols.replace(".", "_"), StringType)).toSeq)
val data1 = sqlContext.createDataFrame(rdd1.filter(!_.contains("uniqueFundamentalSet")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schema1)

val df2resultTimestamp=data1
.withColumn("CapitalChangeAdjustmentDate_1",date_format(col("CapitalChangeAdjustmentDate_1"), "yyyy-MM-dd'T'HH:mm:ss'Z'"))
.withColumn("FinancialStatementLineItemValue_1", regexp_replace(format_number($"FinancialStatementLineItemValue_1".cast(DoubleType), 5), ",", ""))
.withColumn("AdjustedForCorporateActionValue_1", regexp_replace(format_number($"AdjustedForCorporateActionValue_1".cast(DoubleType), 5), ",", ""))
.withColumn("IsAsReportedCurrencySetManually_1", regexp_replace(format_number($"IsAsReportedCurrencySetManually_1".cast(DoubleType), 5), ",", ""))
.withColumn("ItemDisplayedValue_1", regexp_replace(format_number($"ItemDisplayedValue_1".cast(DoubleType), 5), ",", ""))
.withColumn("ReportedValue_1", regexp_replace(format_number($"ReportedValue_1".cast(DoubleType), 5), ",", ""))
.withColumn("AsReportedExchangeRate_1", regexp_replace(format_number($"AsReportedExchangeRate_1".cast(DoubleType), 5), ",", ""))
.withColumn("FinancialStatementLineItemValueUpperRange_1", regexp_replace(format_number($"FinancialStatementLineItemValueUpperRange_1".cast(DoubleType), 5), ",", ""))


import org.apache.spark.sql.expressions._
val windowSpec = Window.partitionBy("FinancialStatementLineItem_lineItemId", "PeriodId","SourceId","StatementTypeCode","StatementCurrencyId","uniqueFundamentalSet").orderBy($"TimeStamp".cast(LongType).desc) 
val latestForEachKey = df2resultTimestamp.withColumn("rank", rank().over(windowSpec)).filter($"rank" === 1).drop("rank", "TimeStamp")


val dfMainOutput = df1resultFinalwithTimestamp.join(latestForEachKey, Seq("FinancialStatementLineItem_lineItemId", "PeriodId","SourceId","StatementTypeCode","StatementCurrencyId","uniqueFundamentalSet"), "outer")
      .select($"uniqueFundamentalSet",$"PeriodId",$"SourceId",$"StatementTypeCode",$"StatementCurrencyId",$"FinancialStatementLineItem_lineItemId",
        when($"DataPartition_1".isNotNull, $"DataPartition_1").otherwise($"DataPartition").as("DataPartition"),
        when($"PartitionYear_1".isNotNull, $"PartitionYear_1").otherwise($"PartitionYear").as("PartitionYear"),
        when($"PartitionStatement_1".isNotNull, $"PartitionStatement_1").otherwise($"PartitionStatement").as("PartitionStatement"),
        when($"FinancialAsReportedLineItemName_1".isNotNull, $"FinancialAsReportedLineItemName_1").otherwise($"FinancialAsReportedLineItemName").as("FinancialAsReportedLineItemName"),
        when($"FinancialAsReportedLineItemName_languageId_1".isNotNull, $"FinancialAsReportedLineItemName_languageId_1").otherwise($"FinancialAsReportedLineItemName_languageId").as("FinancialAsReportedLineItemName_languageId"),
        when($"FinancialStatementLineItemValue_1".isNotNull, $"FinancialStatementLineItemValue_1").otherwise($"FinancialStatementLineItemValue").as("FinancialStatementLineItemValue"),
        when($"AdjustedForCorporateActionValue_1".isNotNull, $"AdjustedForCorporateActionValue_1").otherwise($"AdjustedForCorporateActionValue").as("AdjustedForCorporateActionValue"),
        when($"ReportedCurrencyId_1".isNotNull, $"ReportedCurrencyId_1").otherwise($"ReportedCurrencyId").as("ReportedCurrencyId"),
        when($"IsAsReportedCurrencySetManually_1".isNotNull, $"IsAsReportedCurrencySetManually_1").otherwise($"IsAsReportedCurrencySetManually").as("IsAsReportedCurrencySetManually"),
        when($"Unit_1".isNotNull, $"Unit_1").otherwise($"Unit").as("Unit"),
        when($"IsTotal_1".isNotNull, $"IsTotal_1").otherwise($"IsTotal").as("IsTotal"),
        when($"StatementSectionCode_1".isNotNull, $"StatementSectionCode_1").otherwise($"StatementSectionCode").as("StatementSectionCode"),
        when($"DimentionalLineItemId_1".isNotNull, $"DimentionalLineItemId_1").otherwise($"DimentionalLineItemId").as("DimentionalLineItemId"),
        when($"IsDerived_1".isNotNull, $"IsDerived_1").otherwise($"IsDerived").as("IsDerived"),
        when($"EstimateMethodCode_1".isNotNull, $"EstimateMethodCode_1").otherwise($"EstimateMethodCode").as("EstimateMethodCode"),
        when($"EstimateMethodNote_1".isNotNull, $"EstimateMethodNote_1").otherwise($"EstimateMethodNote").as("EstimateMethodNote"),
        when($"EstimateMethodNote_languageId_1".isNotNull, $"EstimateMethodNote_languageId_1").otherwise($"EstimateMethodNote_languageId").as("EstimateMethodNote_languageId"),
        when($"FinancialLineItemSource_1".isNotNull, $"FinancialLineItemSource_1").otherwise($"FinancialLineItemSource").as("FinancialLineItemSource"),
        when($"IsCombinedItem_1".isNotNull, $"IsCombinedItem_1").otherwise($"IsCombinedItem").as("IsCombinedItem"),
        when($"IsExcludedFromStandardization_1".isNotNull, $"IsExcludedFromStandardization_1").otherwise($"IsExcludedFromStandardization").as("IsExcludedFromStandardization"),
        when($"DocByteOffset_1".isNotNull, $"DocByteOffset_1").otherwise($"DocByteOffset").as("DocByteOffset"),
        when($"DocByteLength_1".isNotNull, $"DocByteLength_1").otherwise($"DocByteLength").as("DocByteLength"),
        when($"BookMark_1".isNotNull, $"BookMark_1").otherwise($"BookMark").as("BookMark"),
        when($"ItemDisplayedNegativeFlag_1".isNotNull, $"ItemDisplayedNegativeFlag_1").otherwise($"ItemDisplayedNegativeFlag").as("ItemDisplayedNegativeFlag"),
        when($"ItemScalingFactor_1".isNotNull, $"ItemScalingFactor_1").otherwise($"ItemScalingFactor").as("ItemScalingFactor"),
        when($"ItemDisplayedValue_1".isNotNull, $"ItemDisplayedValue_1").otherwise($"ItemDisplayedValue").as("ItemDisplayedValue"),
        when($"ReportedValue_1".isNotNull, $"ReportedValue_1").otherwise($"ReportedValue").as("ReportedValue"),
        when($"EditedDescription_1".isNotNull, $"EditedDescription_1").otherwise($"EditedDescription").as("EditedDescription"),
        when($"EditedDescription_languageId_1".isNotNull, $"EditedDescription_languageId_1").otherwise($"EditedDescription_languageId").as("EditedDescription_languageId"),
        when($"ReportedDescription_1".isNotNull, $"ReportedDescription_1").otherwise($"ReportedDescription").as("ReportedDescription"),
        when($"ReportedDescription_languageId_1".isNotNull, $"ReportedDescription_languageId_1").otherwise($"ReportedDescription_languageId").as("ReportedDescription_languageId"),
        when($"AsReportedInstanceSequence_1".isNotNull, $"AsReportedInstanceSequence_1").otherwise($"AsReportedInstanceSequence").as("AsReportedInstanceSequence"),
        when($"PhysicalMeasureId_1".isNotNull, $"PhysicalMeasureId_1").otherwise($"PhysicalMeasureId").as("PhysicalMeasureId"),
        when($"FinancialStatementLineItemSequence_1".isNotNull, $"FinancialStatementLineItemSequence_1").otherwise($"FinancialStatementLineItemSequence").as("FinancialStatementLineItemSequence"),
        when($"SystemDerivedTypeCode_1".isNotNull, $"SystemDerivedTypeCode_1").otherwise($"SystemDerivedTypeCode").as("SystemDerivedTypeCode"),
        when($"AsReportedExchangeRate_1".isNotNull, $"AsReportedExchangeRate_1").otherwise($"AsReportedExchangeRate").as("AsReportedExchangeRate"),
        when($"AsReportedExchangeRateSourceCurrencyId_1".isNotNull, $"AsReportedExchangeRateSourceCurrencyId_1").otherwise($"AsReportedExchangeRateSourceCurrencyId").as("AsReportedExchangeRateSourceCurrencyId"),
        when($"ThirdPartySourceCode_1".isNotNull, $"ThirdPartySourceCode_1").otherwise($"ThirdPartySourceCode").as("ThirdPartySourceCode"),
        when($"FinancialStatementLineItemValueUpperRange_1".isNotNull, $"FinancialStatementLineItemValueUpperRange_1").otherwise($"FinancialStatementLineItemValueUpperRange").as("FinancialStatementLineItemValueUpperRange"),
        when($"FinancialStatementLineItemLocalLanguageLabel_1".isNotNull, $"FinancialStatementLineItemLocalLanguageLabel_1").otherwise($"FinancialStatementLineItemLocalLanguageLabel").as("FinancialStatementLineItemLocalLanguageLabel"),
        when($"FinancialStatementLineItemLocalLanguageLabel_languageId_1".isNotNull, $"FinancialStatementLineItemLocalLanguageLabel_languageId_1").otherwise($"FinancialStatementLineItemLocalLanguageLabel_languageId").as("FinancialStatementLineItemLocalLanguageLabel_languageId"),
        when($"IsFinal_1".isNotNull, $"IsFinal_1").otherwise($"IsFinal").as("IsFinal"),
        when($"FinancialStatementLineItem_lineItemInstanceKey_1".isNotNull, $"FinancialStatementLineItem_lineItemInstanceKey_1").otherwise($"FinancialStatementLineItem_lineItemInstanceKey").as("FinancialStatementLineItem_lineItemInstanceKey"),
        when($"StatementSectionIsCredit_1".isNotNull, $"StatementSectionIsCredit_1").otherwise($"StatementSectionIsCredit").as("StatementSectionIsCredit"),
        when($"CapitalChangeAdjustmentDate_1".isNotNull, $"CapitalChangeAdjustmentDate_1").otherwise($"CapitalChangeAdjustmentDate").as("CapitalChangeAdjustmentDate"),
        when($"ParentLineItemId_1".isNotNull, $"ParentLineItemId_1").otherwise($"ParentLineItemId").as("ParentLineItemId"),
        when($"EstimateMethodId_1".isNotNull, $"EstimateMethodId_1").otherwise($"EstimateMethodId").as("EstimateMethodId"),
        when($"StatementSectionId_1".isNotNull, $"StatementSectionId_1").otherwise($"StatementSectionId").as("StatementSectionId"),
        when($"SystemDerivedTypeCodeId_1".isNotNull, $"SystemDerivedTypeCodeId_1").otherwise($"SystemDerivedTypeCodeId").as("SystemDerivedTypeCodeId"),
        when($"UnitEnumerationId_1".isNotNull, $"UnitEnumerationId_1").otherwise($"UnitEnumerationId").as("UnitEnumerationId"),
        when($"FiscalYear_1".isNotNull, $"FiscalYear_1").otherwise($"FiscalYear").as("FiscalYear"),
        when($"IsAnnual_1".isNotNull, $"IsAnnual_1").otherwise($"IsAnnual").as("IsAnnual"),
        when($"PeriodPermId_1".isNotNull, $"PeriodPermId_1").otherwise($"PeriodPermId").as("PeriodPermId"),
        when($"PeriodPermId_objectTypeId_1".isNotNull, $"PeriodPermId_objectTypeId_1").otherwise($"PeriodPermId_objectTypeId").as("PeriodPermId_objectTypeId"),
        when($"PeriodPermId_objectType_1".isNotNull, $"PeriodPermId_objectType_1").otherwise($"PeriodPermId_objectType").as("PeriodPermId_objectType"),
        when($"AuditID_1".isNotNull, $"AuditID_1").otherwise($"AuditID").as("AuditID"),
        when($"AsReportedItemId_1".isNotNull, $"AsReportedItemId_1").otherwise($"AsReportedItemId").as("AsReportedItemId"),
        when($"ExpressionInstanceId_1".isNotNull, $"ExpressionInstanceId_1").otherwise($"ExpressionInstanceId").as("ExpressionInstanceId"),
        when($"ExpressionText_1".isNotNull, $"ExpressionText_1").otherwise($"ExpressionText").as("ExpressionText"),
         when($"FFAction_1".isNotNull, $"FFAction_1").otherwise($"FFAction|!|").as("FFAction|!|"))
        .filter(!$"FFAction|!|".contains("D|!|"))

val dfMainOutputFinal = dfMainOutput.na.fill("").select($"DataPartition", $"PartitionYear", $"PartitionStatement",concat_ws("|^|", dfMainOutput.schema.fieldNames.filter(_ != "DataPartition").filter(_ != "PartitionYear").filter(_ != "PartitionStatement").map(c => col(c)): _*).as("concatenated"))

val headerColumn = dataHeader.columns.toSeq

val header = headerColumn.mkString("", "|^|", "|!|").dropRight(3)

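// The pattern is a regex, so the literal "|" and "^" characters must be escaped; unescaped, "|^|null" matches the empty string and replaces nothing
val dfMainOutputFinalWithoutNull = dfMainOutputFinal.withColumn("concatenated", regexp_replace(col("concatenated"), "\\|\\^\\|null", "")).withColumnRenamed("concatenated", header)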



dfMainOutputFinalWithoutNull.repartition(1).write.partitionBy("DataPartition","PartitionYear","PartitionStatement")
  .format("csv")
  .option("nullValue", "")
  .option("delimiter", "\t")
  .option("quote", "\u0000")
  .option("header", "true")
  .option("codec", "gzip")
  .save("s3://trfsmallfffile/FinancialStatementLineItem/output")


val FFRowCount = dfMainOutputFinalWithoutNull.groupBy("DataPartition","PartitionYear","PartitionStatement").count

FFRowCount.coalesce(1).write.format("com.databricks.spark.xml")
  .option("rootTag", "FFFileType")
  .option("rowTag", "FFPhysicalFile")
  .save("s3://trffullfiles/FinancialStatementLineItem/Descr")

The script is very large, but I don't know how to make it smaller. Maybe by using Scala classes or something?

Atharv Thakur asked Jan 09 '18 06:01



1 Answer

The maximum size of a method in Java (and by extension Scala) is 64KB of bytecode, see e.g. the question here. That means that you have too much code in one place; it needs to be split up into multiple methods.
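The error message names <init>, the constructor of the generated wrapper class $iw, which suggests that the top-level statements of the paragraph were all compiled into that one constructor. A rough plain-Scala sketch of the effect follows; ParagraphWrapper and its members are hypothetical stand-ins, not the names the interpreter actually generates.

```scala
// Hypothetical stand-in for the wrapper class that the interpreter generates.
class ParagraphWrapper extends Serializable {
  // Top-level statements become part of the constructor (<init>),
  // so together they count against a single 64KB bytecode budget.
  val doubled = (1 to 3).map(_ * 2)
  val total = doubled.sum

  // The body of a def is compiled into its own method, with its own budget.
  def describe(): String = s"total=$total"
}
```

Under that reading, every long expression moved from the top level into a def shrinks the constructor by roughly the size of that expression.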

In your case, I would recommend the following:

  1. Refactor parts of the code to be more concise. In particular, the computation of dfMainOutput contains a lot of when statements; it should be possible to express this in a more compact and readable way.

  2. Since the size constraint is per method, you can simply move some of the code into separate methods. This has the added benefit of making the code easier to follow and read. For example, you can have a method called loadData() that reads the data and returns a DataFrame, and another method for merging df1resultFinalwithTimestamp with latestForEachKey. You can create a method for each section of the code.
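As a minimal sketch of both suggestions, the repeated withColumn and when(...).otherwise(...) lines could be generated from lists of column names inside small methods. The helper names formatDecimals, preferIncremental and mergeWithIncremental, and the renamed parameter, are hypothetical and not part of the question's code. The sketch assumes that each non-key column X in the main DataFrame has a counterpart X_1 in the incremental one, and it uses coalesce, which returns its first non-null argument, in place of when(x.isNotNull, x).otherwise(y).

```scala
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{coalesce, col, format_number, regexp_replace}
import org.apache.spark.sql.types.DoubleType

// Hypothetical helper: cast each listed column to double, format it with
// 5 decimals and strip the thousands separators, as the question does by hand.
def formatDecimals(df: DataFrame, names: Seq[String]): DataFrame =
  names.foldLeft(df) { (acc, name) =>
    acc.withColumn(name, regexp_replace(format_number(col(name).cast(DoubleType), 5), ",", ""))
  }

// Hypothetical helper: take the incremental value when it is non-null,
// otherwise fall back to the main value; the result keeps the main name.
def preferIncremental(mainName: String, incrName: String): Column =
  coalesce(col(incrName), col(mainName)).as(mainName)

// Hypothetical helper: outer-join main with incr on the key columns and merge
// every column X of main that has a counterpart X_1 in incr (an assumption).
// Columns whose names differ in another way are passed explicitly in `renamed`
// as (mainName, incrName) pairs.
def mergeWithIncremental(
    main: DataFrame,
    incr: DataFrame,
    keys: Seq[String],
    renamed: Seq[(String, String)] = Seq.empty): DataFrame = {
  val incrCols = incr.columns.toSet
  val paired = main.columns.toSeq
    .filter(n => !keys.contains(n) && incrCols.contains(n + "_1"))
    .map(n => (n, n + "_1"))
  val selected: Seq[Column] =
    keys.map(col(_)) ++ (paired ++ renamed).map { case (m, i) => preferIncremental(m, i) }
  main.join(incr, keys, "outer").select(selected: _*)
}

// Possible use with the question's DataFrames (not run here):
//   val keys = Seq("FinancialStatementLineItem_lineItemId", "PeriodId", "SourceId",
//                  "StatementTypeCode", "StatementCurrencyId", "uniqueFundamentalSet")
//   val dfMainOutput = mergeWithIncremental(df1resultFinalwithTimestamp, latestForEachKey,
//       keys, Seq("FFAction|!|" -> "FFAction_1"))
//     .filter(!col("FFAction|!|").contains("D|!|"))
```

Note that the merged columns come out in the order of the main DataFrame's columns, which may differ from the hand-written select in the question; since the output line is built by concatenating columns in schema order, the order should be checked against the expected header before switching.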

Shaido answered Sep 28 '22 14:09
