
Zeppelin: How to create DataFrame from within custom interpreter?

I am developing a custom interpreter for a domain specific language. Based on the example given in the Apache Zeppelin documentation (https://zeppelin.incubator.apache.org/docs/latest/development/writingzeppelininterpreter.html), the interpreter works pretty well. Now I want to store some results in a new DataFrame.

I found code to create DataFrames (http://spark.apache.org/docs/latest/sql-programming-guide.html), but I can't use it in my interpreter because I basically don't find a way to access a valid runtime SparkContext (often called "sc") from within my custom interpreter.

I tried the static SparkContext.getOrCreate(), but that led to a ClassNotFoundException. Then I added the whole zeppelin-spark-dependencies...jar to my interpreter folder, which solved the class-loading issue, but now I am getting a SparkException ("master url must be set...").

Any idea how I could get access to my Notebook's SparkContext from within the custom interpreter? Thanks a lot!

UPDATE

Thanks to Kangrok Lee's comment below, my code now looks as follows. It runs and seems to create a DataFrame (at least it doesn't throw any exception any more). But I cannot consume the created DataFrame in a subsequent SQL paragraph (the first paragraph uses my "%opl" interpreter, as given below, which should create the "result" DataFrame):

%opl
1 2 3
> 1
> 2
> 3

%sql
select * from result
> Table not found: result; line 1 pos 14

So probably there is still something wrong with my way of dealing with the SparkContext. Any ideas? Thanks a lot!

package opl;

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.spark.SparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OplInterpreter2 extends Interpreter {

static {
    Interpreter.register("opl","opl",OplInterpreter2.class.getName(),
        new InterpreterPropertyBuilder()
        .add("spark.master", "local[4]", "spark.master")
        .add("spark.app.name", "Opl Interpreter", "spark.app.name")
        .add("spark.serializer", "org.apache.spark.serializer.KryoSerializer", "spark.serializer")
        .build());
}

private Logger logger = LoggerFactory.getLogger(OplInterpreter2.class);

private void log(Object o) {
    if (logger != null)
        logger.warn("OplInterpreter2 "+o);
}

public OplInterpreter2(Properties properties) {
    super(properties);
    log("CONSTRUCTOR");
}

@Override
public void open() {
    log("open()");
}

@Override
public void cancel(InterpreterContext arg0) {
    log("cancel()");
}

@Override
public void close() {
    log("close()");
}

@Override
public List<String> completion(String arg0, int arg1) {
    log("completion()");
    return new ArrayList<String>();
}

@Override
public FormType getFormType() {
    log("getFormType()");
    return FormType.SIMPLE;
}

@Override
public int getProgress(InterpreterContext arg0) {
    log("getProgress()");
    return 100;
}

@Override
public InterpreterResult interpret(String string, InterpreterContext context) {
    log("interpret() "+string);
    PrintStream oldSys = System.out;
    try {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        PrintStream ps = new PrintStream(baos);
        System.setOut(ps);
        execute(string);
        System.out.flush();
        System.setOut(oldSys);
        return new InterpreterResult(
                InterpreterResult.Code.SUCCESS,
                InterpreterResult.Type.TEXT,
                baos.toString());
    } catch (Exception ex) {
        System.out.flush();
        System.setOut(oldSys);
        return new InterpreterResult(
                InterpreterResult.Code.ERROR,
                InterpreterResult.Type.TEXT,
                ex.toString());
    }
}

private void execute(String code) throws Exception {
    SparkContext sc = SparkContext.getOrCreate();
    SQLContext sqlc = SQLContext.getOrCreate(sc);
    StructType structType = new StructType().add("value",DataTypes.IntegerType);
    ArrayList<Row> list = new ArrayList<Row>();
    for (String s : code.trim().split("\\s+")) {
        int value = Integer.parseInt(s);
        System.out.println(value);
        list.add(RowFactory.create(value));
    }
    DataFrame df = sqlc.createDataFrame(list,structType);
    df.registerTempTable("result");
}
}
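The stdout redirection used in interpret() above can be exercised in isolation. The sketch below (the class name CaptureDemo is hypothetical, not part of the original code) shows the same pattern: swap System.out for a buffer-backed stream, run the task, and restore the original stream in a finally block so a failing task cannot leave stdout hijacked.

```java
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;

public class CaptureDemo {

    // Redirect System.out while running a task, restore it afterwards,
    // and return whatever the task printed -- mirroring interpret() above.
    public static String capture(Runnable task) {
        PrintStream oldOut = System.out;
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try {
            System.setOut(new PrintStream(baos));
            task.run();
            System.out.flush(); // push buffered bytes into baos before restoring
        } finally {
            System.setOut(oldOut); // always restore, even if the task throws
        }
        return baos.toString();
    }

    public static void main(String[] args) {
        String out = capture(() -> System.out.println("1 2 3"));
        System.out.println("captured: " + out.trim());
    }
}
```

Note that the original interpret() restores System.out in both the success and the catch path; using try/finally as above covers both in one place.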
asked Feb 12 '26 by hansschl

1 Answer

Finally I found a solution, although I don't think it is a very nice one. In the code below, I am using a function getSparkInterpreter() that I found in org.apache.zeppelin.spark.PySparkInterpreter.java.

This requires that I put my packaged code (jar) into the Spark interpreter folder rather than into a folder of its own, which I believe would be the preferred way (according to https://zeppelin.incubator.apache.org/docs/latest/development/writingzeppelininterpreter.html). Also, my interpreter does not show up in Zeppelin's interpreter configuration page as an interpreter of its own. But it can be used in a Zeppelin paragraph nevertheless.

And: in the code I can create a DataFrame that is also consumable outside my paragraph, which is what I wanted to achieve.

package opl;

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
import org.apache.zeppelin.interpreter.WrappedInterpreter;
import org.apache.zeppelin.spark.SparkInterpreter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OplInterpreter2 extends Interpreter {

    static {
        Interpreter.register(
                "opl", 
                "spark",//"opl", 
                OplInterpreter2.class.getName(),
                new InterpreterPropertyBuilder()
                    .add("sth", "defaultSth", "some thing")
                    .build());
    }

    private Logger logger = LoggerFactory.getLogger(OplInterpreter2.class);

    private void log(Object o) {
        if (logger != null)
            logger.warn("OplInterpreter2 "+o);
    }

    public OplInterpreter2(Properties properties) {
        super(properties);
        log("CONSTRUCTOR");
    }

    @Override
    public void open() {
        log("open()");
    }

    @Override
    public void cancel(InterpreterContext arg0) {
        log("cancel()");
    }

    @Override
    public void close() {
        log("close()");
    }

    @Override
    public List<String> completion(String arg0, int arg1) {
        log("completion()");
        return new ArrayList<String>();
    }

    @Override
    public FormType getFormType() {
        log("getFormType()");
        return FormType.SIMPLE;
    }

    @Override
    public int getProgress(InterpreterContext arg0) {
        log("getProgress()");
        return 100;
    }

    @Override
    public InterpreterResult interpret(String string, InterpreterContext context) {
        log("interpret() "+string);
        PrintStream oldSys = System.out;
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            PrintStream ps = new PrintStream(baos);
            System.setOut(ps);
            execute(string);
            System.out.flush();
            System.setOut(oldSys);
            return new InterpreterResult(
                    InterpreterResult.Code.SUCCESS,
                    InterpreterResult.Type.TEXT,
                    baos.toString());
        } catch (Exception ex) {
            System.out.flush();
            System.setOut(oldSys);
            return new InterpreterResult(
                    InterpreterResult.Code.ERROR,
                    InterpreterResult.Type.TEXT,
                    ex.toString());
        }
    }

    private void execute(String code) throws Exception {
        SparkInterpreter sintp = getSparkInterpreter();
        SQLContext sqlc = sintp.getSQLContext();
        StructType structType = new StructType().add("value",DataTypes.IntegerType);
        ArrayList<Row> list = new ArrayList<Row>();
        for (String s : code.trim().split("\\s+")) {
            int value = Integer.parseInt(s);
            System.out.println(value);
            list.add(RowFactory.create(value));
        }
        DataFrame df = sqlc.createDataFrame(list,structType);
        df.registerTempTable("result");
    }

    private SparkInterpreter getSparkInterpreter() {
        LazyOpenInterpreter lazy = null;
        SparkInterpreter spark = null;
        Interpreter p = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName());
        while (p instanceof WrappedInterpreter) {
            if (p instanceof LazyOpenInterpreter) {
                lazy = (LazyOpenInterpreter) p;
            }
            p = ((WrappedInterpreter) p).getInnerInterpreter();
        }
        spark = (SparkInterpreter) p;
        if (lazy != null) {
            lazy.open();
        }
        return spark;
    }
}
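The unwrapping loop in getSparkInterpreter() can be sketched in isolation. The types below (Interp, Wrapped, Lazy, Core) are hypothetical stand-ins for Zeppelin's Interpreter, WrappedInterpreter, LazyOpenInterpreter and SparkInterpreter; the real method additionally remembers the LazyOpenInterpreter it passes through and calls open() on it before returning.

```java
// Hypothetical stand-ins for Zeppelin's wrapper hierarchy:
interface Interp {}
interface Wrapped extends Interp { Interp inner(); }

class Lazy implements Wrapped {
    private final Interp inner;
    Lazy(Interp inner) { this.inner = inner; }
    public Interp inner() { return inner; }
}

class Core implements Interp {}

public class UnwrapDemo {

    // Walk through the wrapper layers until the concrete interpreter
    // is reached, as getSparkInterpreter() does with WrappedInterpreter.
    static Core unwrap(Interp p) {
        while (p instanceof Wrapped) {
            p = ((Wrapped) p).inner();
        }
        return (Core) p;
    }

    public static void main(String[] args) {
        Core core = new Core();
        Core found = unwrap(new Lazy(new Lazy(core)));
        System.out.println(found == core); // the same instance comes back
    }
}
```

The loop terminates because each wrapper layer is peeled off exactly once; the cast at the end fails fast if the innermost interpreter is not of the expected type.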
answered Feb 17 '26 by hansschl


