The application I'm working on is a Java-based ETL process that loads data into multiple tables. The DBMS is Infobright (a MySQL-based DBMS geared for data warehousing).
The data loading should be done atomically; however, for performance reasons, I want to load data into multiple tables at the same time (using a LOAD DATA INFILE command). This means I need to open multiple connections.
Is there any solution that allows me to do the loads atomically and in parallel? (I'm guessing the answer might depend on the engine of the tables I load into; most of them are Brighthouse, which supports transactions, but not XA or savepoints.)
To further clarify, I want to avoid a situation where let's say:
In this situation, I can't roll back the first 4 loads, because they are already committed.
As promised, I've hacked up a complete example. I've used MySQL and created three tables like the following:
CREATE TABLE `test{1,2,3}` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`data` varchar(255) NOT NULL UNIQUE,
PRIMARY KEY (`id`)
);
test2 contains a single row initially.
INSERT INTO `test2` (`data`) VALUES ('a');
(I've posted the full code to http://pastebin.com.)
The following example does several things:
1. Sets threads to 3, which determines how many jobs are going to be run in parallel.
2. Creates threads number of connections.
3. Generates sample data for every table (by default, a for every table).
4. Creates threads number of jobs to be run and loads them with data.
5. Runs the jobs in threads number of threads and waits for their completion (successful or not).
(Note that I've used Java 7's automatic resource management feature in SQLTask.call().)
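import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// getConnections, commitConnections, rollbackConnections and closeConnections
// are not shown in this excerpt. getConnections is assumed to return
// connections with auto-commit disabled.
public static void main(String[] args) throws SQLException, InterruptedException {
    int threads = 3;
    List<Connection> connections = getConnections(threads);
    Map<String, String> tableData = getTableData(threads);
    List<SQLTask> tasks = getTasks(threads, connections);
    setData(tableData, tasks);
    try {
        runTasks(tasks);
        // Every job succeeded: commit each connection in turn.
        commitConnections(connections);
    } catch (ExecutionException ex) {
        // At least one job failed: nothing has been committed yet, so undo everything.
        rollbackConnections(connections);
    } finally {
        closeConnections(connections);
    }
}

// Sample data: the value "a" for each of the tables test1..testN.
private static Map<String, String> getTableData(int threads) {
    Map<String, String> tableData = new HashMap<>();
    for (int i = 1; i <= threads; i++)
        tableData.put("test" + i, "a");
    return tableData;
}

// One job: inserts a single value into a single table on its own connection.
private static final class SQLTask implements Callable<Void> {
    private final Connection connection;
    private String data;
    private String table;

    public SQLTask(Connection connection) {
        this.connection = connection;
    }

    public void setTable(String table) {
        this.table = table;
    }

    public void setData(String data) {
        this.data = data;
    }

    @Override
    public Void call() throws SQLException {
        // The statement is closed automatically; the connection stays open
        // so that main can commit or roll it back later.
        try (Statement statement = connection.createStatement()) {
            // String formatting is fine for this demo; prefer a
            // PreparedStatement for the value when the data is not trusted.
            statement.executeUpdate(String.format(
                "INSERT INTO `%s` (data) VALUES ('%s');", table, data));
        }
        return null;
    }
}

// Creates one job per connection.
private static List<SQLTask> getTasks(int threads, List<Connection> connections) {
    List<SQLTask> tasks = new ArrayList<>();
    for (int i = 0; i < threads; i++)
        tasks.add(new SQLTask(connections.get(i)));
    return tasks;
}

// Assigns one (table, data) pair to each job.
private static void setData(Map<String, String> tableData, List<SQLTask> tasks) {
    Iterator<Entry<String, String>> i = tableData.entrySet().iterator();
    Iterator<SQLTask> j = tasks.iterator();
    // Stop when either side runs out instead of failing in j.next().
    while (i.hasNext() && j.hasNext()) {
        Entry<String, String> entry = i.next();
        SQLTask task = j.next();
        task.setTable(entry.getKey());
        task.setData(entry.getValue());
    }
}

private static void runTasks(List<SQLTask> tasks)
        throws ExecutionException, InterruptedException {
    ExecutorService executorService = Executors.newFixedThreadPool(tasks.size());
    // invokeAll blocks until every job has finished, successfully or not.
    List<Future<Void>> futures = executorService.invokeAll(tasks);
    executorService.shutdown();
    // get() rethrows a job's failure wrapped in an ExecutionException.
    for (Future<Void> future : futures)
        future.get();
}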
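The four connection helpers that main calls are not part of the listing. A minimal sketch of what they might look like follows. The class name ConnectionHelpers, the JDBC URL and the credentials are placeholders made up for illustration, not taken from the original code. The one detail that matters is that auto-commit is switched off; otherwise each INSERT would be committed as soon as it completes and there would be nothing left to roll back.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

final class ConnectionHelpers {
    // Placeholder settings (assumptions, not from the original code).
    static final String URL = "jdbc:mysql://localhost:3306/test";
    static final String USER = "user";
    static final String PASSWORD = "password";

    // Opens `count` connections with auto-commit disabled.
    static List<Connection> getConnections(int count) throws SQLException {
        List<Connection> connections = new ArrayList<>();
        try {
            for (int i = 0; i < count; i++) {
                Connection connection = DriverManager.getConnection(URL, USER, PASSWORD);
                connections.add(connection);
                connection.setAutoCommit(false);
            }
        } catch (SQLException ex) {
            closeConnections(connections); // do not leak the ones already opened
            throw ex;
        }
        return connections;
    }

    // Commits one connection after another and stops at the first failure.
    static void commitConnections(List<Connection> connections) throws SQLException {
        for (Connection connection : connections)
            connection.commit();
    }

    // Rolls back every connection, even if some of the rollbacks fail.
    static void rollbackConnections(List<Connection> connections) {
        for (Connection connection : connections) {
            try {
                connection.rollback();
            } catch (SQLException ignored) {
                // best effort: keep going with the remaining connections
            }
        }
    }

    // Closes every connection, even if some of the closes fail.
    static void closeConnections(List<Connection> connections) {
        for (Connection connection : connections) {
            try {
                connection.close();
            } catch (SQLException ignored) {
                // best effort: keep going with the remaining connections
            }
        }
    }
}
```

Note that commitConnections is itself not atomic: if the second commit fails, the first one has already gone through.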
Given the default data returned by getTableData(...)
test1 -> `a`
test2 -> `a`
test3 -> `a`
and the fact that test2 already contains a (and the data column is unique), the second job will fail and throw an exception, thus every connection will be rolled back.
If you return b instead of a for every table, the connections will be committed safely.
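The commit-or-rollback decision can be tried out without a database. The following is only a sketch: the class AllOrNothingDemo and the method decide are hypothetical names, and the jobs are plain Callables standing in for the SQL jobs.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class AllOrNothingDemo {
    // Runs all jobs in parallel and answers "commit" only if none of them failed.
    static String decide(List<Callable<Void>> jobs) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(jobs.size());
        try {
            // invokeAll blocks until every job has finished, successfully or not.
            List<Future<Void>> futures = executor.invokeAll(jobs);
            for (Future<Void> future : futures)
                future.get(); // rethrows a job's failure as ExecutionException
            return "commit";
        } catch (ExecutionException ex) {
            return "rollback";
        } finally {
            executor.shutdown();
        }
    }
}
```

With three jobs of which one throws (for example a SQLException for the duplicate value), decide returns "rollback"; with three jobs that all return normally, it returns "commit".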
This can be done similarly with LOAD DATA.
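As a rough sketch of that variant, a job could issue a LOAD DATA INFILE statement instead of an INSERT. The class name LoadDataTask, the helper buildStatement and the comma field separator are assumptions for illustration. It also assumes that the storage engine treats the load as part of the open transaction, which should be checked for the engine in use.

```java
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.concurrent.Callable;

final class LoadDataTask implements Callable<Void> {
    private final Connection connection;
    private final String table;
    private final String file;

    LoadDataTask(Connection connection, String table, String file) {
        this.connection = connection;
        this.table = table;
        this.file = file;
    }

    // Builds the statement text. The comma separator is an assumption
    // about the file format, not something the original code specifies.
    static String buildStatement(String table, String file) {
        // Escape backslashes and single quotes in the path, and backticks in the table name.
        String quotedFile = file.replace("\\", "\\\\").replace("'", "\\'");
        String quotedTable = table.replace("`", "``");
        return String.format(
            "LOAD DATA INFILE '%s' INTO TABLE `%s` FIELDS TERMINATED BY ','",
            quotedFile, quotedTable);
    }

    @Override
    public Void call() throws SQLException {
        // Only the statement is closed here; committing or rolling back
        // the connection is left to the caller, as with the INSERT jobs.
        try (Statement statement = connection.createStatement()) {
            statement.execute(buildStatement(table, file));
        }
        return null;
    }
}
```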
After the OP's response to my answer, I realized that what she/he wants to do isn't possible in a simple and clear manner.
Basically, the problem is that once a commit has succeeded, the committed data can't be rolled back, because the commit is atomic. Since multiple commits are needed in this case, rolling everything back isn't possible unless one tracks all the data (in all of the transactions) and, if something goes wrong, deletes everything that was successfully committed.
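To make the tracking idea concrete, here is a minimal sketch of committing step by step and undoing the already committed steps when a later commit fails. The names CompensatingCommit, Step, commit and undo are hypothetical. What undo does (for example, deleting the rows a load has added) is left to the caller. This is a workaround, not a real transaction: other sessions can see the committed data before it is undone, and an undo can fail as well.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

final class CompensatingCommit {
    // One unit of work: commit() makes it permanent, undo() removes it again.
    interface Step {
        void commit() throws Exception;
        void undo() throws Exception;
    }

    // Commits the steps in order. If one commit fails, the steps that were
    // already committed are undone in reverse order. Returns true on success.
    static boolean commitAll(List<Step> steps) {
        Deque<Step> committed = new ArrayDeque<>();
        try {
            for (Step step : steps) {
                step.commit();
                committed.push(step); // remember it in case a later commit fails
            }
            return true;
        } catch (Exception ex) {
            while (!committed.isEmpty()) {
                try {
                    committed.pop().undo();
                } catch (Exception ignored) {
                    // an undo can fail too; a real system would have to log this
                }
            }
            return false;
        }
    }
}
```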
There is a nice answer relating to the issue of commits and rollbacks.