
Caching in Storm bolts

I am trying to cache some data in a Storm bolt, but I am not sure whether this is the right way to do it. In the class below, employee IDs and employee names are cached in a hash map. To do this, the prepare method makes a database call to the Employee table, selects all employees, and populates the hash map (is this the right place to initialize the map?).

After adding some logging, it turns out that the running topology makes multiple database connections and initializes the map multiple times. Of course I want to avoid this; the whole point of caching the result is to not go to the database every time. Please help.

public class TestBolt extends BaseRichBolt {
    private static final long serialVersionUID = 2946379346389650348L;
    private OutputCollector collector;
    private Map<String, String> employeeIdToNameMap;
    private static final Logger LOG = Logger.getLogger(TestBolt.class);

    @Override
    public void execute(Tuple tuple) {    
        String employeeId = tuple.getStringByField("employeeId");
        String employeeName = employeeIdToNameMap.get(employeeId);

        collector.emit(tuple, new Values(employeeId, employeeName));
        collector.ack(tuple);
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        try {
            employeeIdToNameMap = createEmployeIdToNameMap();
        } catch (SQLException e) {
            // Fail fast: execute() would otherwise hit a null map.
            LOG.error("Could not load the employee cache", e);
            throw new RuntimeException("Could not load the employee cache", e);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields(/*some fields*/));

    }

    private Map<String, String> createEmployeIdToNameMap() throws SQLException {
        final Map<String, String> employeeIdToNameMap = new HashMap<>();
        final DatabaseManager dbm = new PostgresManager();
        final String query = "select id, name from employee;";
        // try-with-resources closes the result set and the connection even if the query fails
        try (Connection conn = dbm.createDefaultConnection();
             ResultSet result = dbm.executeSelectQuery(conn, query)) {
            while (result.next()) {
                String employeeId = result.getString("id");
                String name = result.getString("name");
                employeeIdToNameMap.put(employeeId, name);
            }
        }
        return employeeIdToNameMap;
    }
}

SOLUTION: I made the map a static synchronized map, and it is working fine for me.

private static Map<String, String> employeeIdToNameMap = Collections
            .synchronizedMap(new HashMap<String, String>());
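The declaration above shows only the field. A minimal sketch of how such a shared map might be filled exactly once per worker JVM follows. `SharedEmployeeCache`, `loadOnce`, `nameFor`, and the `Supplier` standing in for the database query are hypothetical names used for illustration; they are not part of the original post or of Storm.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical helper, not part of Storm: one map shared by every bolt instance in the JVM.
final class SharedEmployeeCache {
    private static final Map<String, String> EMPLOYEE_ID_TO_NAME =
            Collections.synchronizedMap(new HashMap<String, String>());
    private static boolean loaded = false; // guarded by the class lock

    private SharedEmployeeCache() { }

    // Runs the loader at most once per JVM, however many bolt tasks call this method.
    static synchronized void loadOnce(Supplier<Map<String, String>> loader) {
        if (!loaded) {
            EMPLOYEE_ID_TO_NAME.putAll(loader.get());
            loaded = true;
        }
    }

    // Thread-safe read; returns null for an unknown id.
    static String nameFor(String employeeId) {
        return EMPLOYEE_ID_TO_NAME.get(employeeId);
    }
}
```

Under these assumptions, each bolt's prepare would call `SharedEmployeeCache.loadOnce(...)` with a loader that runs the query, and execute would call `nameFor`. The synchronized wrapper only makes map access thread-safe; it is the `loaded` flag that prevents repeated queries.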
big asked Nov 08 '22 20:11



1 Answer

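Since several tasks of the bolt run in the same worker JVM, each with its own bolt instance, you can make employeeIdToNameMap static and volatile so that they all share one map. A static field is shared only within one worker process, so the query still runs once per worker. Initialize the map in prepare under a class-level lock, so that only the first task loads it: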

try {
    synchronized (TestBolt.class) {
        if (null == employeeIdToNameMap) {
            employeeIdToNameMap = createEmployeIdToNameMap();
        }
    }
} catch (SQLException e) {
    ...
}
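To see the effect of the guarded block without a Storm cluster, here is a rough simulation in plain Java. It is a sketch under stated assumptions: `CachingTask` stands in for the bolt, `loadEmployees` is a hypothetical replacement for the database query, and `runConcurrently` merely starts several threads that call prepare at the same time. None of these names come from Storm or from the original code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Stand-in for a bolt: only the caching logic is kept, no Storm classes are used.
final class CachingTask {
    static final AtomicInteger LOADS = new AtomicInteger(); // counts simulated queries
    private static volatile Map<String, String> employeeIdToNameMap;

    // Mirrors the guarded block in prepare(): the first caller loads, the rest reuse.
    void prepare() {
        synchronized (CachingTask.class) {
            if (employeeIdToNameMap == null) {
                employeeIdToNameMap = loadEmployees();
            }
        }
    }

    String lookup(String employeeId) {
        return employeeIdToNameMap.get(employeeId);
    }

    // Hypothetical replacement for the SELECT against the employee table.
    private static Map<String, String> loadEmployees() {
        LOADS.incrementAndGet();
        Map<String, String> map = new HashMap<>();
        map.put("42", "Grace");
        return map;
    }

    // Starts `tasks` threads that all call prepare() together; returns the load count.
    static int runConcurrently(int tasks) {
        CountDownLatch start = new CountDownLatch(1);
        Thread[] threads = new Thread[tasks];
        for (int i = 0; i < tasks; i++) {
            threads[i] = new Thread(() -> {
                try {
                    start.await();
                    new CachingTask().prepare();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        start.countDown();
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for tasks", e);
        }
        return LOADS.get();
    }
}
```

Because the map is never modified after it is published through the volatile field, a plain HashMap is enough for the concurrent reads in this sketch. If the cache had to be refreshed while the topology runs, a different design would be needed.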
hobgoblin answered Nov 14 '22 22:11
