I've been trying to track down an issue with some unit/integration tests I've been writing for an Apache Spark project.
When using Spark 1.1.1 my test passed. When I tried to upgrade to 1.4.0 (also tried 1.4.1) the test starts failing.
I've managed to reduce the code needed to reproduce the issue down to the small integration test below.
Interestingly, if I comment out the @RunWith annotation on the test then the test passes correctly. Obviously I don't need the @RunWith annotation for this cut down test, but the real tests make use of mocks fairly extensively, so I'd rather not have to drop using PowerMock.
package com.example;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.modules.junit4.PowerMockRunner;
@RunWith(PowerMockRunner.class)
public class SampleTest {
@Before
public void setup() throws Exception {
SparkConf conf = new SparkConf(false).setMaster("local[2]").setAppName("My app");
JavaStreamingContext jsc = new JavaStreamingContext(conf, new Duration(1000));
}
@Test
public void exampleTest() {
}
}
Below is the exception I'm seeing
java.io.IOException: failure to login
at org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:796)
at org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:748)
at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:621)
at org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2162)
at org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2162)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.util.Utils$.getCurrentUserName(Utils.scala:2162)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:301)
at org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:842)
at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:80)
at org.apache.spark.streaming.api.java.JavaStreamingContext.<init>(JavaStreamingContext.scala:133)
at com.example.SampleTest.setup(SampleTest.java:19)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.junit.internal.runners.MethodRoadie.runBefores(MethodRoadie.java:133)
at org.junit.internal.runners.MethodRoadie.runBeforesThenTestThenAfters(MethodRoadie.java:96)
at org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl$PowerMockJUnit44MethodRunner.executeTest(PowerMockJUnit44RunnerDelegateImpl.java:294)
at org.powermock.modules.junit4.internal.impl.PowerMockJUnit47RunnerDelegateImpl$PowerMockJUnit47MethodRunner.executeTestInSuper(PowerMockJUnit47RunnerDelegateImpl.java:127)
at org.powermock.modules.junit4.internal.impl.PowerMockJUnit47RunnerDelegateImpl$PowerMockJUnit47MethodRunner.executeTest(PowerMockJUnit47RunnerDelegateImpl.java:82)
at org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl$PowerMockJUnit44MethodRunner.runBeforesThenTestThenAfters(PowerMockJUnit44RunnerDelegateImpl.java:282)
at org.junit.internal.runners.MethodRoadie.runTest(MethodRoadie.java:87)
at org.junit.internal.runners.MethodRoadie.run(MethodRoadie.java:50)
at org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl.invokeTestMethod(PowerMockJUnit44RunnerDelegateImpl.java:207)
at org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl.runMethods(PowerMockJUnit44RunnerDelegateImpl.java:146)
at org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl$1.run(PowerMockJUnit44RunnerDelegateImpl.java:120)
at org.junit.internal.runners.ClassRoadie.runUnprotected(ClassRoadie.java:34)
at org.junit.internal.runners.ClassRoadie.runProtected(ClassRoadie.java:44)
at org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl.run(PowerMockJUnit44RunnerDelegateImpl.java:122)
at org.powermock.modules.junit4.common.internal.impl.JUnit4TestSuiteChunkerImpl.run(JUnit4TestSuiteChunkerImpl.java:106)
at org.powermock.modules.junit4.common.internal.impl.AbstractCommonPowerMockRunner.run(AbstractCommonPowerMockRunner.java:53)
at org.powermock.modules.junit4.PowerMockRunner.run(PowerMockRunner.java:59)
at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:467)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:683)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:390)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:197)
Caused by: javax.security.auth.login.LoginException: Can't find user name
at org.apache.hadoop.security.UserGroupInformation$HadoopLoginModule.commit(UserGroupInformation.java:197)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at javax.security.auth.login.LoginContext.invoke(LoginContext.java:784)
at javax.security.auth.login.LoginContext.access$000(LoginContext.java:203)
at javax.security.auth.login.LoginContext$5.run(LoginContext.java:721)
at javax.security.auth.login.LoginContext$5.run(LoginContext.java:719)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.login.LoginContext.invokeCreatorPriv(LoginContext.java:718)
at javax.security.auth.login.LoginContext.login(LoginContext.java:591)
at org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:771)
... 38 more
The versions of the various dependencies are shown below
I've tried this with various versions of Spark. The above test passes with the following versions of Spark
It starts failing from Spark 1.3.0 onwards.
Any ideas what I need to change to get this to work?
Spark Streaming is an extension of the core Spark API that allows data engineers and data scientists to process real-time data from various sources including (but not limited to) Kafka, Flume, and Amazon Kinesis. This processed data can be pushed out to file systems, databases, and live dashboards.
So don't skip unit tests just because you have an integration test and don't skip integration tests because you have unit tests. One trick you can do to save some code is to make your tests work both as unit and integration by allowing them to mock or not mock dependencies depending on an input or setting.
Usually, integration testing is done after unit testing to ensure all the units work in harmony with each other. It is also done when support libraries are used along with the code.
Overview. Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams.
Looking at the source code for SparkContext, the line causing your exception while trying to get the current user name. In version 1.2 there was a fallback default SparkContext.SPARK_UNKNOWN_USER
and it did not required currectly logged in user:
// Set SPARK_USER for user who is running SparkContext.
val sparkUser = Option {
Option(System.getenv("SPARK_USER")).getOrElse(System.getProperty("user.name"))
}.getOrElse {
SparkContext.SPARK_UNKNOWN_USER
}
This code introduced in version 1.3 does not have default user anymore hence why you don't get this error with earlier versions:
// Set SPARK_USER for user who is running SparkContext.
val sparkUser = Utils.getCurrentUserName()
This calls the following code in Utils
:
/**
* Returns the current user name. This is the currently logged in user, unless that's been
* overridden by the `SPARK_USER` environment variable.
*/
def getCurrentUserName(): String = {
Option(System.getenv("SPARK_USER"))
.getOrElse(UserGroupInformation.getCurrentUser().getShortUserName())
}
If you set the environment variable SPARK_USER
, you should be able to prevent the branching to UserGroupInformation which leads to your exception.
UserGroupInformation
is a Hadoop Security class, and it looks like the use of PowerMock
is preventing it from working properly.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With