Failing integration test for Apache Spark Streaming

I've been trying to track down an issue with some unit/integration tests I've been writing for an Apache Spark project.

When using Spark 1.1.1 the test passed. When I tried to upgrade to 1.4.0 (I also tried 1.4.1), the test started failing.

I've managed to reduce the code needed to reproduce the issue down to the small integration test below.

Interestingly, if I comment out the @RunWith annotation then the test passes. Obviously I don't need the @RunWith annotation for this cut-down test, but the real tests make use of mocks fairly extensively, so I'd rather not have to drop PowerMock.

package com.example;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.modules.junit4.PowerMockRunner;

@RunWith(PowerMockRunner.class)
public class SampleTest {

    @Before
    public void setup() throws Exception {
        SparkConf conf = new SparkConf(false).setMaster("local[2]").setAppName("My app");
        JavaStreamingContext jsc = new JavaStreamingContext(conf, new Duration(1000));
    }

    @Test
    public void exampleTest() {
    }
}

Below is the exception I'm seeing:

java.io.IOException: failure to login
    at org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:796)
    at org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:748)
    at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:621)
    at org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2162)
    at org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2162)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.util.Utils$.getCurrentUserName(Utils.scala:2162)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:301)
    at org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:842)
    at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:80)
    at org.apache.spark.streaming.api.java.JavaStreamingContext.<init>(JavaStreamingContext.scala:133)
    at com.example.SampleTest.setup(SampleTest.java:19)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.junit.internal.runners.MethodRoadie.runBefores(MethodRoadie.java:133)
    at org.junit.internal.runners.MethodRoadie.runBeforesThenTestThenAfters(MethodRoadie.java:96)
    at org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl$PowerMockJUnit44MethodRunner.executeTest(PowerMockJUnit44RunnerDelegateImpl.java:294)
    at org.powermock.modules.junit4.internal.impl.PowerMockJUnit47RunnerDelegateImpl$PowerMockJUnit47MethodRunner.executeTestInSuper(PowerMockJUnit47RunnerDelegateImpl.java:127)
    at org.powermock.modules.junit4.internal.impl.PowerMockJUnit47RunnerDelegateImpl$PowerMockJUnit47MethodRunner.executeTest(PowerMockJUnit47RunnerDelegateImpl.java:82)
    at org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl$PowerMockJUnit44MethodRunner.runBeforesThenTestThenAfters(PowerMockJUnit44RunnerDelegateImpl.java:282)
    at org.junit.internal.runners.MethodRoadie.runTest(MethodRoadie.java:87)
    at org.junit.internal.runners.MethodRoadie.run(MethodRoadie.java:50)
    at org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl.invokeTestMethod(PowerMockJUnit44RunnerDelegateImpl.java:207)
    at org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl.runMethods(PowerMockJUnit44RunnerDelegateImpl.java:146)
    at org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl$1.run(PowerMockJUnit44RunnerDelegateImpl.java:120)
    at org.junit.internal.runners.ClassRoadie.runUnprotected(ClassRoadie.java:34)
    at org.junit.internal.runners.ClassRoadie.runProtected(ClassRoadie.java:44)
    at org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl.run(PowerMockJUnit44RunnerDelegateImpl.java:122)
    at org.powermock.modules.junit4.common.internal.impl.JUnit4TestSuiteChunkerImpl.run(JUnit4TestSuiteChunkerImpl.java:106)
    at org.powermock.modules.junit4.common.internal.impl.AbstractCommonPowerMockRunner.run(AbstractCommonPowerMockRunner.java:53)
    at org.powermock.modules.junit4.PowerMockRunner.run(PowerMockRunner.java:59)
    at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
    at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:467)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:683)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:390)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:197)
Caused by: javax.security.auth.login.LoginException: Can't find user name
    at org.apache.hadoop.security.UserGroupInformation$HadoopLoginModule.commit(UserGroupInformation.java:197)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at javax.security.auth.login.LoginContext.invoke(LoginContext.java:784)
    at javax.security.auth.login.LoginContext.access$000(LoginContext.java:203)
    at javax.security.auth.login.LoginContext$5.run(LoginContext.java:721)
    at javax.security.auth.login.LoginContext$5.run(LoginContext.java:719)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.login.LoginContext.invokeCreatorPriv(LoginContext.java:718)
    at javax.security.auth.login.LoginContext.login(LoginContext.java:591)
    at org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:771)
    ... 38 more

The versions of the various dependencies are shown below:

  • hadoop-client 2.6
  • Apache Spark 1.4.0 / 1.4.1
  • JUnit 4.12
  • EasyMock 3.3.1
  • PowerMock 1.6.2

I've tried this with various versions of Spark. The above test passes with the following versions:

  • 1.1.1
  • 1.2.2

It starts failing from Spark 1.3.0 onwards.

Any ideas what I need to change to get this to work?

asked Jul 21 '15 by Glen

1 Answer

Looking at the source code for SparkContext, the line causing your exception is the one that tries to get the current user name. In version 1.2 there was a fallback default, SparkContext.SPARK_UNKNOWN_USER, so it did not require a currently logged-in user:

// Set SPARK_USER for user who is running SparkContext.
val sparkUser = Option {
  Option(System.getenv("SPARK_USER")).getOrElse(System.getProperty("user.name"))
}.getOrElse {
  SparkContext.SPARK_UNKNOWN_USER
}

The replacement, introduced in version 1.3, no longer has a default user, which is why you don't get this error with earlier versions:

// Set SPARK_USER for user who is running SparkContext.
val sparkUser = Utils.getCurrentUserName()

This calls the following code in Utils:

/**
 * Returns the current user name. This is the currently logged in user, unless that's been
 * overridden by the `SPARK_USER` environment variable.
 */
def getCurrentUserName(): String = {
  Option(System.getenv("SPARK_USER"))
    .getOrElse(UserGroupInformation.getCurrentUser().getShortUserName())
}

If you set the SPARK_USER environment variable, you should be able to prevent the fallback to UserGroupInformation, which is what leads to your exception.
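The cleanest way is to export SPARK_USER in the shell or in your build tool's test-runner configuration. If you need to set it from the test itself, there is a widely known (but fragile) reflection hack that pushes a value into the JVM's cached environment map. This is only a sketch (the helper class name TestEnv is mine): it relies on JDK internals, namely the private m field of the unmodifiable wrapper returned by System.getenv(), and works on typical Java 7/8 JVMs on Unix but may break elsewhere.

import java.lang.reflect.Field;
import java.util.Map;

public final class TestEnv {

    private TestEnv() {}

    /**
     * Test-only hack: injects a variable into the map backing
     * System.getenv() via reflection. Relies on JDK internals
     * (the "m" field of the unmodifiable wrapper), so treat it
     * as a sketch rather than production code.
     */
    @SuppressWarnings("unchecked")
    public static void set(String key, String value) throws Exception {
        Map<String, String> env = System.getenv();
        Field backingMap = env.getClass().getDeclaredField("m");
        backingMap.setAccessible(true);
        ((Map<String, String>) backingMap.get(env)).put(key, value);
    }
}

Call TestEnv.set("SPARK_USER", "testuser") at the top of your @Before method, before the JavaStreamingContext is created, since Spark reads the variable during SparkContext construction.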

UserGroupInformation is a Hadoop security class, and it looks like the use of PowerMock's classloader is preventing it from working properly.
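If that's the case, another workaround worth trying (an educated guess on my part, not verified against your exact setup) is PowerMock's @PowerMockIgnore annotation, which keeps the listed packages on the system classloader so Hadoop's JAAS login sees the real security classes instead of PowerMock's instrumented versions:

package com.example;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.modules.junit4.PowerMockRunner;

@RunWith(PowerMockRunner.class)
// Keep the JAAS and Hadoop security packages on the system classloader so
// the Hadoop login module can resolve the OS user as usual.
@PowerMockIgnore({"javax.security.*", "org.apache.hadoop.security.*"})
public class SampleTest {

    @Before
    public void setup() throws Exception {
        SparkConf conf = new SparkConf(false).setMaster("local[2]").setAppName("My app");
        JavaStreamingContext jsc = new JavaStreamingContext(conf, new Duration(1000));
    }

    @Test
    public void exampleTest() {
    }
}

The package patterns above are the usual suspects for this kind of login failure; you may need to adjust them depending on which classes your mocks actually touch.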

answered by mattinbits