
Writing to Oracle Database using Apache Spark 1.4.0

I am trying to write some data to our Oracle database using the Spark 1.4.0 DataFrame.write.jdbc() function.

The symmetric read.jdbc() function for reading data from the Oracle database into DataFrame objects works well. However, writing a DataFrame back fails with the following exception (I also tried writing back exactly the same object that I got from the database, with overwrite set to true):

Exception in thread "main" java.sql.SQLSyntaxErrorException: ORA-00902: Ungültiger Datentyp (English: invalid datatype)

    at oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:450)
    at oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:399)
    at oracle.jdbc.driver.T4C8Oall.processError(T4C8Oall.java:1017)
    at oracle.jdbc.driver.T4CTTIfun.receive(T4CTTIfun.java:655)
    at oracle.jdbc.driver.T4CTTIfun.doRPC(T4CTTIfun.java:249)
    at oracle.jdbc.driver.T4C8Oall.doOALL(T4C8Oall.java:566)
    at oracle.jdbc.driver.T4CPreparedStatement.doOall8(T4CPreparedStatement.java:215)
    at oracle.jdbc.driver.T4CPreparedStatement.doOall8(T4CPreparedStatement.java:58)
    at oracle.jdbc.driver.T4CPreparedStatement.executeForRows(T4CPreparedStatement.java:943)
    at oracle.jdbc.driver.OracleStatement.doExecuteWithTimeout(OracleStatement.java:1075)
    at oracle.jdbc.driver.OraclePreparedStatement.executeInternal(OraclePreparedStatement.java:3820)
    at oracle.jdbc.driver.OraclePreparedStatement.executeUpdate(OraclePreparedStatement.java:3897)
    at oracle.jdbc.driver.OraclePreparedStatementWrapper.executeUpdate(OraclePreparedStatementWrapper.java:1361)
    at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:252)
    at main3$.main(main3.scala:72)
    at main3.main(main3.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)

The table has two basic string columns. When the columns are Integer instead, the write succeeds.

Digging deeper, I found that Spark maps StringType to "TEXT", which Oracle does not recognize (it should be "VARCHAR" instead). The following code is from jdbc.scala, which can be found on GitHub:

def schemaString(df: DataFrame, url: String): String = {
  val sb = new StringBuilder()
  val dialect = JdbcDialects.get(url)
  df.schema.fields foreach { field => {
    val name = field.name
    val typ: String =
      dialect.getJDBCType(field.dataType).map(_.databaseTypeDefinition).getOrElse(
      field.dataType match {
        case IntegerType => "INTEGER"
        case LongType => "BIGINT"
        case DoubleType => "DOUBLE PRECISION"
        case FloatType => "REAL"
        case ShortType => "INTEGER"
        case ByteType => "BYTE"
        case BooleanType => "BIT(1)"
        case StringType => "TEXT"
        case BinaryType => "BLOB"
        case TimestampType => "TIMESTAMP"
        case DateType => "DATE"
        case DecimalType.Unlimited => "DECIMAL(40,20)"
        case _ => throw new IllegalArgumentException(s"Don't know how to save $field to JDBC")
      })
    val nullable = if (field.nullable) "" else "NOT NULL"
    sb.append(s", $name $typ $nullable")
  }}
  if (sb.length < 2) "" else sb.substring(2)
}
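
To illustrate why string columns fail while integer columns succeed, the fallback can be modelled without Spark. The following is only a sketch: ColType, Col, SchemaSketch and its methods are hypothetical names invented for this illustration, not Spark API, and only two of the type cases are reproduced. It assumes, as the question describes, that the resulting type names end up in a CREATE TABLE statement sent to the database.

```scala
// Hypothetical, Spark-free model of the fallback mapping quoted above.
sealed trait ColType
case object IntCol extends ColType
case object StrCol extends ColType

final case class Col(name: String, tpe: ColType, nullable: Boolean)

object SchemaSketch {
  // Mirrors two cases of the generic fallback in schemaString.
  def fallbackType(t: ColType): String = t match {
    case IntCol => "INTEGER"
    case StrCol => "TEXT"
  }

  // Same mapping, but with an Oracle-friendly override for strings.
  def oracleType(t: ColType): String = t match {
    case StrCol => "VARCHAR2(255)"
    case other  => fallbackType(other)
  }

  // Builds the DDL from column definitions and a type-mapping function.
  def createTable(table: String, cols: Seq[Col], typeOf: ColType => String): String = {
    val defs = cols.map { c =>
      val nullability = if (c.nullable) "" else " NOT NULL"
      s"${c.name} ${typeOf(c.tpe)}$nullability"
    }
    s"CREATE TABLE $table (${defs.mkString(", ")})"
  }
}

object SchemaSketchDemo extends App {
  val cols = Seq(Col("ONE", StrCol, nullable = true), Col("TWO", StrCol, nullable = true))
  // prints: CREATE TABLE STUDENTS (ONE TEXT, TWO TEXT)
  println(SchemaSketch.createTable("STUDENTS", cols, SchemaSketch.fallbackType))
  // prints: CREATE TABLE STUDENTS (ONE VARCHAR2(255), TWO VARCHAR2(255))
  println(SchemaSketch.createTable("STUDENTS", cols, SchemaSketch.oracleType))
}
```

Oracle accepts INTEGER as a column type but not TEXT, which would be consistent with integer columns writing fine and string columns raising ORA-00902.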

So my question is: am I making a mistake somewhere, or does Spark SQL not support Oracle? Do I need to install a plug-in to use Spark SQL with Oracle?

My simple main is:

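import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val conf = new SparkConf().setAppName("Parser").setMaster("local[*]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// url and connectionprop (a java.util.Properties) are defined elsewhere.
// Reading from Oracle works.
val reader = sqlContext.read
val frame = reader.jdbc(url, "STUDENTS", connectionprop)

frame.printSchema()
frame.show()

// Build a one-row DataFrame with two nullable string columns.
val row = Row("3", "4")

val struct =
  StructType(
    StructField("ONE", StringType, true) ::
      StructField("TWO", StringType, true) :: Nil)

val arr = Array(row)
val rddRow = sc.parallelize(arr)
val dframe = sqlContext.createDataFrame(rddRow, struct)
dframe.printSchema()
dframe.show()

// This is the call that throws ORA-00902.
dframe.write.jdbc(url, "STUDENTS", connectionprop)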
asked Jul 08 '15 08:07 by Tolun Tosun


1 Answer

The short answer: it is not possible to write back to Oracle using the existing DataFrame.write.jdbc() implementation in 1.4.0. But if you don't mind upgrading to Spark 1.5, there is a slightly hackish way to do it. As described in the linked article, there are two problems:

The easy one: the query Spark uses to check whether the table exists is not compatible with Oracle, which has no LIMIT clause:

SELECT 1 FROM $table LIMIT 1

This can easily be avoided by calling the table-saving utility method directly:

org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils.saveTable(df, url, table, props)

The hard one (as you correctly guessed): there is no Oracle-specific data type dialect available out of the box. The solution below is adapted from the same article:

import org.apache.spark.sql.jdbc.{JdbcDialects, JdbcType, JdbcDialect}
import org.apache.spark.sql.types._

// Custom dialect that maps Spark SQL types to column types Oracle accepts.
val OracleDialect = new JdbcDialect {
  // Claim any JDBC URL that refers to Oracle.
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:oracle") || url.contains("oracle")

  // Returning None falls back to Spark's generic mapping for that type.
  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    case StringType => Some(JdbcType("VARCHAR2(255)", java.sql.Types.VARCHAR))
    case BooleanType => Some(JdbcType("NUMBER(1)", java.sql.Types.NUMERIC))
    case IntegerType => Some(JdbcType("NUMBER(10)", java.sql.Types.NUMERIC))
    case LongType => Some(JdbcType("NUMBER(19)", java.sql.Types.NUMERIC))
    case DoubleType => Some(JdbcType("NUMBER(19,4)", java.sql.Types.NUMERIC))
    case FloatType => Some(JdbcType("NUMBER(19,4)", java.sql.Types.NUMERIC))
    case ShortType => Some(JdbcType("NUMBER(5)", java.sql.Types.NUMERIC))
    case ByteType => Some(JdbcType("NUMBER(3)", java.sql.Types.NUMERIC))
    case BinaryType => Some(JdbcType("BLOB", java.sql.Types.BLOB))
    case TimestampType => Some(JdbcType("DATE", java.sql.Types.DATE))
    case DateType => Some(JdbcType("DATE", java.sql.Types.DATE))
//  case DecimalType.Fixed(precision, scale) => Some(JdbcType("NUMBER(" + precision + "," + scale + ")", java.sql.Types.NUMERIC))
    case DecimalType.Unlimited => Some(JdbcType("NUMBER(38,4)", java.sql.Types.NUMERIC))
    case _ => None
  }
}

// Register the dialect before writing so Spark picks it up for Oracle URLs.
JdbcDialects.registerDialect(OracleDialect)

Finally, a working example should look similar to this:

  val url: String = "jdbc:oracle:thin:@your_domain:1521/dbname"
  val driver: String = "oracle.jdbc.OracleDriver"
  val props = new java.util.Properties()
  props.setProperty("user", "username")
  props.setProperty("password", "userpassword")
  org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils.saveTable(dataFrame, url, "table_name", props)
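
Because this route skips Spark's own table-existence check, it may be worth probing for the target table by hand before saving. Below is a minimal sketch using plain JDBC, assuming an already open java.sql.Connection; TableProbe, existsQuery and tableExists are hypothetical helper names for illustration, not part of Spark.

```scala
import java.sql.{Connection, SQLException}

object TableProbe {
  // WHERE 1=0 returns no rows and avoids the LIMIT clause that Oracle rejects.
  // The table name is interpolated as-is, so it must come from trusted input.
  def existsQuery(table: String): String = s"SELECT 1 FROM $table WHERE 1=0"

  // Treats any SQLException as "table not found". Real code may prefer to
  // inspect the vendor error code (for example ORA-00942) instead.
  def tableExists(conn: Connection, table: String): Boolean = {
    val stmt = conn.createStatement()
    try {
      stmt.executeQuery(existsQuery(table)).close()
      true
    } catch {
      case _: SQLException => false
    } finally {
      stmt.close()
    }
  }
}
```

A connection for the probe could be obtained with java.sql.DriverManager.getConnection(url, props), reusing the url and props values from the example above.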
answered Oct 16 '22 07:10 by MxR