I am trying to cache some data in a Storm bolt, but I am not sure whether this is the right way to do it. In the class below, employee IDs and employee names are cached in a hash map. To do this, the prepare method queries the Employee table for all employees and populates the hash map (is this the right place to initialize the map?).
After adding some logging, it turns out that the running topology makes multiple database connections and initializes the map multiple times. Of course I want to avoid this; the whole point of caching the result is to avoid going to the database every time. Please help.
public class TestBolt extends BaseRichBolt {
    private static final long serialVersionUID = 2946379346389650348L;
    private OutputCollector collector;
    private Map<String, String> employeeIdToNameMap;
    private static final Logger LOG = Logger.getLogger(TestBolt.class);

    @Override
    public void execute(Tuple tuple) {
        String employeeId = tuple.getStringByField("employeeId");
        String employeeName = employeeIdToNameMap.get(employeeId);
        collector.emit(tuple, new Values(employeeId, employeeName));
        collector.ack(tuple);
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        // TODO Auto-generated method stub
        this.collector = collector;
        try {
            employeeIdToNameMap = createEmployeIdToNameMap();
        } catch (SQLException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields(/*some fields*/));
    }

    private Map<String, String> createEmployeIdToNameMap() throws SQLException {
        final Map<String, String> employeeIdToNameMap = new HashMap<>();
        final DatabaseManager dbm = new PostgresManager();
        final String query = "select id, name from employee;";
        final Connection conn = dbm.createDefaultConnection();
        final ResultSet result = dbm.executeSelectQuery(conn, query);
        while (result.next()) {
            String employeId = result.getString("id");
            String name = result.getString("name");
            employeeIdToNameMap.put(employeId, name);
        }
        conn.close();
        return employeeIdToNameMap;
    }
}
SOLUTION: I created a synchronized map, and it is working fine for me:
private static Map<String, String> employeeIdToNameMap = Collections
.synchronizedMap(new HashMap<String, String>());
Since you have multiple bolt tasks, you can mark employeeIdToNameMap as static and volatile, so that all tasks running in the same worker JVM share one map. Initialize the map in prepare like this:
try {
    synchronized (TestBolt.class) {
        if (null == employeeIdToNameMap) {
            employeeIdToNameMap = createEmployeIdToNameMap();
        }
    }
} catch (SQLException e) {
    ...
}
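The load-once idea above can be tried outside Storm with a small stand-alone sketch. The class name `EmployeeCache` and the method `getOrLoad` are hypothetical names chosen for illustration, not part of Storm or of the code in the question; the loader stands in for the database query. Note that a static field is shared only within one JVM, so a topology with several worker processes would still load the map once per worker.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical helper: holds at most one employee map per JVM. */
final class EmployeeCache {
    // volatile so that a map published by one thread is visible to the others
    private static volatile Map<String, String> cache;

    private EmployeeCache() {}

    /** Returns the cached map, invoking the loader only if nothing is cached yet. */
    static Map<String, String> getOrLoad(Supplier<Map<String, String>> loader) {
        Map<String, String> local = cache;
        if (local == null) {
            synchronized (EmployeeCache.class) {
                // re-check under the lock: another thread may have loaded it meanwhile
                local = cache;
                if (local == null) {
                    // copy and wrap so callers cannot modify the shared map
                    local = Collections.unmodifiableMap(new HashMap<>(loader.get()));
                    cache = local;
                }
            }
        }
        return local;
    }
}
```

In a bolt, prepare could then call something like `EmployeeCache.getOrLoad(...)` with a loader that runs the query. Because createEmployeIdToNameMap() throws SQLException and a Supplier cannot throw checked exceptions, the loader would need to catch it and rethrow an unchecked exception; that wrapping is left out of the sketch.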