I am developing a custom interpreter for a domain-specific language. Based on the example given in the Apache Zeppelin documentation (https://zeppelin.incubator.apache.org/docs/latest/development/writingzeppelininterpreter.html), the interpreter works pretty well. Now I want to store some results in a new DataFrame.
I found code to create DataFrames (http://spark.apache.org/docs/latest/sql-programming-guide.html), but I can't use it in my interpreter because I can't find a way to access a valid runtime SparkContext (often called "sc") from within my custom interpreter.
I tried the static SparkContext.getOrCreate(), but this led to a ClassNotFoundException. Then I added the whole zeppelin-spark-dependencies...jar to my interpreter folder, which solved the class loading issue, but now I am getting a SparkException ("master url must be set...").
Any idea how I could get access to my notebook's SparkContext from within the custom interpreter? Thanks a lot!
UPDATE
Thanks to Kangrok Lee's comment below, my code now looks as shown further down. It runs and seems to create a DataFrame (at least it no longer throws an exception). However, I cannot consume the created DataFrame in a subsequent SQL paragraph. The first paragraph uses my "%opl" interpreter, listed below, which should create the "result" DataFrame:
%opl
1 2 3
> 1
> 2
> 3
%sql
select * from result
> Table not found: result; line 1 pos 14
So there is probably still something wrong with the way I deal with the SparkContext. Any ideas? Thanks a lot!
package opl;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class OplInterpreter2 extends Interpreter {

    static {
        // Register interpreter "opl" in its own interpreter group "opl".
        Interpreter.register("opl", "opl", OplInterpreter2.class.getName(),
            new InterpreterPropertyBuilder()
                .add("spark.master", "local[4]", "spark.master")
                .add("spark.app.name", "Opl Interpreter", "spark.app.name")
                .add("spark.serializer", "org.apache.spark.serializer.KryoSerializer", "spark.serializer")
                .build());
    }

    private Logger logger = LoggerFactory.getLogger(OplInterpreter2.class);

    private void log(Object o) {
        if (logger != null)
            logger.warn("OplInterpreter2 " + o);
    }

    public OplInterpreter2(Properties properties) {
        super(properties);
        log("CONSTRUCTOR");
    }

    @Override
    public void open() {
        log("open()");
    }

    @Override
    public void cancel(InterpreterContext arg0) {
        log("cancel()");
    }

    @Override
    public void close() {
        log("close()");
    }

    @Override
    public List<String> completion(String arg0, int arg1) {
        log("completion()");
        return new ArrayList<String>();
    }

    @Override
    public FormType getFormType() {
        log("getFormType()");
        return FormType.SIMPLE;
    }

    @Override
    public int getProgress(InterpreterContext arg0) {
        log("getProgress()");
        return 100;
    }

    @Override
    public InterpreterResult interpret(String string, InterpreterContext context) {
        log("interpret() " + string);
        PrintStream oldSys = System.out;
        try {
            // Redirect System.out so that everything execute() prints
            // becomes the text output of the paragraph.
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            PrintStream ps = new PrintStream(baos);
            System.setOut(ps);
            execute(string);
            System.out.flush();
            System.setOut(oldSys);
            return new InterpreterResult(
                InterpreterResult.Code.SUCCESS,
                InterpreterResult.Type.TEXT,
                baos.toString());
        } catch (Exception ex) {
            System.out.flush();
            System.setOut(oldSys);
            return new InterpreterResult(
                InterpreterResult.Code.ERROR,
                InterpreterResult.Type.TEXT,
                ex.toString());
        }
    }

    private void execute(String code) throws Exception {
        SparkContext sc = SparkContext.getOrCreate();
        SQLContext sqlc = SQLContext.getOrCreate(sc);
        // Single-column schema: one integer per whitespace-separated token.
        StructType structType = new StructType().add("value", DataTypes.IntegerType);
        ArrayList<Row> list = new ArrayList<Row>();
        for (String s : code.trim().split("\\s+")) {
            int value = Integer.parseInt(s);
            System.out.println(value);
            list.add(RowFactory.create(value));
        }
        DataFrame df = sqlc.createDataFrame(list, structType);
        df.registerTempTable("result");
    }
}
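As a side note on the interpret() method above: it restores System.out in both the try and the catch branch, which still leaves the stream redirected if something other than an Exception (for example a NoClassDefFoundError) is thrown. A minimal sketch of the same capture pattern with try/finally, in plain Java without Zeppelin or Spark, might look like this. The class name CaptureDemo and the helper captureStdout are hypothetical names used only for illustration, and because System.setOut is process-wide the sketch assumes that only one paragraph runs at a time.

```java
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;

class CaptureDemo {

    /** Runs the task with System.out redirected and returns what it printed. */
    static String captureStdout(Runnable task) {
        PrintStream original = System.out;
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        PrintStream capture = new PrintStream(buffer, true);
        System.setOut(capture);
        try {
            task.run();
        } finally {
            // Restore the original stream on every exit path,
            // including RuntimeExceptions and Errors.
            capture.flush();
            System.setOut(original);
        }
        return buffer.toString();
    }

    public static void main(String[] args) {
        // Same parsing loop as execute(), minus the Spark calls.
        String output = captureStdout(() -> {
            for (String s : "1 2 3".trim().split("\\s+")) {
                System.out.println(Integer.parseInt(s));
            }
        });
        System.out.print(output);
    }
}
```

In the interpreter, the returned string would be passed to the InterpreterResult, and a catch block around the call would build the error result.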
I finally found a solution, although I don't think it is a very nice one. In the code below, I use a function getSparkInterpreter() that I found in org.apache.zeppelin.spark.PySparkInterpreter.java.
This requires putting my packaged code (jar) into the Spark interpreter folder instead of a folder of its own, which I believe would be the preferred way (according to https://zeppelin.incubator.apache.org/docs/latest/development/writingzeppelininterpreter.html). Also, my interpreter does not show up on Zeppelin's interpreter configuration page as an interpreter of its own. It can nevertheless be used in a Zeppelin paragraph.
With this code I can create a DataFrame that is also consumable outside my paragraph, which is what I wanted to achieve.
package opl;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
import org.apache.zeppelin.interpreter.WrappedInterpreter;
import org.apache.zeppelin.spark.SparkInterpreter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class OplInterpreter2 extends Interpreter {

    static {
        Interpreter.register(
            "opl",
            "spark", // was "opl": register in the Spark interpreter group instead
            OplInterpreter2.class.getName(),
            new InterpreterPropertyBuilder()
                .add("sth", "defaultSth", "some thing")
                .build());
    }

    private Logger logger = LoggerFactory.getLogger(OplInterpreter2.class);

    private void log(Object o) {
        if (logger != null)
            logger.warn("OplInterpreter2 " + o);
    }

    public OplInterpreter2(Properties properties) {
        super(properties);
        log("CONSTRUCTOR");
    }

    @Override
    public void open() {
        log("open()");
    }

    @Override
    public void cancel(InterpreterContext arg0) {
        log("cancel()");
    }

    @Override
    public void close() {
        log("close()");
    }

    @Override
    public List<String> completion(String arg0, int arg1) {
        log("completion()");
        return new ArrayList<String>();
    }

    @Override
    public FormType getFormType() {
        log("getFormType()");
        return FormType.SIMPLE;
    }

    @Override
    public int getProgress(InterpreterContext arg0) {
        log("getProgress()");
        return 100;
    }

    @Override
    public InterpreterResult interpret(String string, InterpreterContext context) {
        log("interpret() " + string);
        PrintStream oldSys = System.out;
        try {
            // Redirect System.out so that everything execute() prints
            // becomes the text output of the paragraph.
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            PrintStream ps = new PrintStream(baos);
            System.setOut(ps);
            execute(string);
            System.out.flush();
            System.setOut(oldSys);
            return new InterpreterResult(
                InterpreterResult.Code.SUCCESS,
                InterpreterResult.Type.TEXT,
                baos.toString());
        } catch (Exception ex) {
            System.out.flush();
            System.setOut(oldSys);
            return new InterpreterResult(
                InterpreterResult.Code.ERROR,
                InterpreterResult.Type.TEXT,
                ex.toString());
        }
    }

    private void execute(String code) throws Exception {
        // Use the SQLContext of the notebook's Spark interpreter
        // instead of creating a new one.
        SparkInterpreter sintp = getSparkInterpreter();
        SQLContext sqlc = sintp.getSQLContext();
        // Single-column schema: one integer per whitespace-separated token.
        StructType structType = new StructType().add("value", DataTypes.IntegerType);
        ArrayList<Row> list = new ArrayList<Row>();
        for (String s : code.trim().split("\\s+")) {
            int value = Integer.parseInt(s);
            System.out.println(value);
            list.add(RowFactory.create(value));
        }
        DataFrame df = sqlc.createDataFrame(list, structType);
        df.registerTempTable("result");
    }

    // Taken from org.apache.zeppelin.spark.PySparkInterpreter.
    private SparkInterpreter getSparkInterpreter() {
        LazyOpenInterpreter lazy = null;
        SparkInterpreter spark = null;
        Interpreter p = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName());
        // Peel off wrapper interpreters until the actual SparkInterpreter is
        // reached, remembering the LazyOpenInterpreter if there is one.
        while (p instanceof WrappedInterpreter) {
            if (p instanceof LazyOpenInterpreter) {
                lazy = (LazyOpenInterpreter) p;
            }
            p = ((WrappedInterpreter) p).getInnerInterpreter();
        }
        spark = (SparkInterpreter) p;
        // Make sure the lazily opened interpreter has actually been opened.
        if (lazy != null) {
            lazy.open();
        }
        return spark;
    }
}
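The unwrapping loop in getSparkInterpreter() may be easier to follow in isolation. The following is a minimal sketch in plain Java that mimics the pattern with stand-in types. Interp, Wrapper, LazyWrapper, PlainWrapper and Target are hypothetical names invented for this illustration; they are not Zeppelin classes and only loosely correspond to Interpreter, WrappedInterpreter, LazyOpenInterpreter and SparkInterpreter.

```java
class UnwrapDemo {

    // Hypothetical stand-ins, NOT the real Zeppelin types.
    interface Interp { }

    interface Wrapper {
        Interp getInner();
    }

    /** Plays the role of the innermost interpreter we want to reach. */
    static final class Target implements Interp { }

    /** Plays the role of a wrapper that must be opened before use. */
    static final class LazyWrapper implements Interp, Wrapper {
        private final Interp inner;
        boolean opened = false;

        LazyWrapper(Interp inner) { this.inner = inner; }

        public Interp getInner() { return inner; }

        void open() { opened = true; }
    }

    /** Plays the role of any other wrapper layer. */
    static final class PlainWrapper implements Interp, Wrapper {
        private final Interp inner;

        PlainWrapper(Interp inner) { this.inner = inner; }

        public Interp getInner() { return inner; }
    }

    /** Walks down the wrapper chain and opens the lazy layer, if any. */
    static Target unwrap(Interp p) {
        LazyWrapper lazy = null;
        while (p instanceof Wrapper) {
            if (p instanceof LazyWrapper) {
                lazy = (LazyWrapper) p;
            }
            p = ((Wrapper) p).getInner();
        }
        if (lazy != null) {
            lazy.open();
        }
        // Throws ClassCastException if the chain does not end in a Target.
        return (Target) p;
    }

    public static void main(String[] args) {
        Target target = new Target();
        LazyWrapper lazy = new LazyWrapper(target);
        Interp chain = new PlainWrapper(lazy);

        Target found = unwrap(chain);
        System.out.println("same instance: " + (found == target));
        System.out.println("lazy opened:   " + lazy.opened);
    }
}
```

The point of the pattern is that the returned object is the same instance the rest of the chain uses, so state registered through it (in the real code, the temp table on the shared SQLContext) is visible to the other interpreters in the group.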