
How to use Apache Storm tuples

I just began with Apache Storm. I read the tutorial and had a look at the examples. My problem is that all the examples work with very simple tuples (often one field with a string). The tuples are created inline (using new Values(...)). In my case I have tuples with many fields (5..100). So my question is: how do I implement such a tuple with a name and type (all primitive) for each field?

Are there any examples? (I think directly implementing "Tuple" isn't a good idea.)

thanks

dermoritz asked Aug 17 '15 15:08




1 Answer

An alternative to creating the tuple with all of the fields as a value is to just create a bean and pass that inside the tuple.

Given the following class:

import java.io.Serializable;

public class DataBean implements Serializable {
    private static final long serialVersionUID = 1L;

    // add more properties as necessary
    int id;
    String word;

    public DataBean(int id, String word) {
        setId(id);
        setWord(word);
    }
    public int getId() {
        return id;
    }
    public void setId(int id) {
        this.id = id;
    }
    public String getWord() {
        return word;
    }
    public void setWord(String word) {
        this.word = word;
    }
}

Create and emit the DataBean in one bolt:

collector.emit(new Values(bean));

Get the DataBean in the destination bolt:

@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
    try {
        DataBean bean = (DataBean)tuple.getValue(0);
        // do your bolt processing with the bean
    } catch (Exception e) {
        LOG.error("WordCountBolt error", e);
        collector.reportError(e);
    }       
}

Don't forget to make your bean serializable and to register it when you set up your topology:

Config stormConfig = new Config();
stormConfig.registerSerialization(DataBean.class);
// more stuff
StormSubmitter.submitTopology("MyTopologyName", stormConfig, builder.createTopology());
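
Before submitting, it can help to confirm that the bean really survives a serialization round trip. The sketch below uses plain JDK serialization, which is not necessarily the path Storm takes at runtime (Storm serializes tuple values with Kryo by default), so treat it only as a sanity check on the Serializable contract. SerializationCheck and roundTrip are hypothetical names, not part of Storm, and the compact DataBean simply mirrors the class above so the sketch compiles on its own.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Compact copy of the bean from above so this sketch is self-contained.
class DataBean implements Serializable {
    private static final long serialVersionUID = 1L;
    private final int id;
    private final String word;

    DataBean(int id, String word) {
        this.id = id;
        this.word = word;
    }
    int getId() { return id; }
    String getWord() { return word; }
}

// Hypothetical helper (an assumption for illustration, not a Storm API).
class SerializationCheck {
    // Writes the object to an in-memory byte array and reads it back.
    // Checked exceptions are wrapped so callers get a single failure type.
    static <T extends Serializable> T roundTrip(T original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Bean is not serializable as written", e);
        }
    }
}
```

Calling SerializationCheck.roundTrip(new DataBean(7, "storm")) in a unit test should return a copy with the same id and word; a field of a non-serializable type would instead surface as an IllegalStateException.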

Disclaimer: Beans work fine with shuffle grouping. If you need a fieldsGrouping, you should still emit the grouping key as a primitive field of its own. For example, in the Word Count scenario you need to group by word, so you might emit:

collector.emit(new Values(word, bean));
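
One way to keep the grouping key and the bean aligned is to define the field names and the value order in a single place. The sketch below is plain Java with no Storm dependency; WordTupleSchema, FIELD_NAMES, and toValues are hypothetical names chosen for illustration. In a real bolt, the name list would presumably be passed to declarer.declare(new Fields(...)) and the value list to new Values(...), with the topology grouping on the "word" field.

```java
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Compact copy of the bean from above so this sketch is self-contained.
class DataBean implements Serializable {
    private static final long serialVersionUID = 1L;
    private final int id;
    private final String word;

    DataBean(int id, String word) {
        this.id = id;
        this.word = word;
    }
    int getId() { return id; }
    String getWord() { return word; }
}

// Hypothetical schema holder (an assumption for illustration, not a Storm class).
class WordTupleSchema {
    // Field names in emit order: the grouping key first, then the full bean.
    static final List<String> FIELD_NAMES =
        Collections.unmodifiableList(Arrays.asList("word", "bean"));

    // Builds the values in the same order as FIELD_NAMES.
    static List<Object> toValues(DataBean bean) {
        return Arrays.<Object>asList(bean.getWord(), bean);
    }
}
```

Because the names and the values come from the same class, adding a field means touching one place, which matters more as the tuple grows toward the 5..100 fields mentioned in the question.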
Kit Menke answered Oct 05 '22 09:10
