
Getting data from a very large table

I have a very large table in a MySQL database: the Users table holds 200 million records.

I query it using JDBC:

public List<Pair<Long, String>> getUsersAll() throws SQLException {
    Connection cnn = null;
    CallableStatement cs = null;
    ResultSet rs = null;
    final List<Pair<Long, String>> res = new ArrayList<>();
    try {
        cnn = dataSource.getConnection();
        cs = cnn.prepareCall("select UserPropertyKindId, login from TEST.users;");
        rs = cs.executeQuery();
        while (rs.next()) {
            res.add(new ImmutablePair<>(rs.getLong(1), rs.getString(2)));
        }
        return res;
    } catch (SQLException ex) {
        throw ex;
    } finally {
        DbUtils.closeQuietly(cnn, cs, rs);
    }
}

Next, I process the result:

List<Pair<Long, String>> users = dao.getUsersAll();
if (CollectionUtils.isNotEmpty(users)) {
    for (List<Pair<Long, String>> partition : Lists.partition(users, 2000)) {
        InconsistsUsers.InconsistsUsersCallable callable = new InconsistsUsers.InconsistsUsersCallable(new ArrayList<>(partition));
        processExecutor.submit(callable);
    }
}

But since the table is very large and the entire result is loaded into memory, my application crashes with an error:

com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure

The last packet successfully received from the server was 105,619 milliseconds ago.

How can I receive the data in parts and process it as it arrives, so that the entire result is not loaded into memory at once? Perhaps I could open a cursor, push the data into a non-blocking queue, and process it as the data arrives. How can this be done?

UPDATE:

My DB structure: https://www.db-fiddle.com/f/v377ZHkG1YZcdQsETtPm9L/3

Current algorithm:

  1. Get all users from the Users table: select UserPropertyKindId, login from Users;

  2. The result is split into partitions of 2000 pairs and submitted to a ThreadPoolTaskExecutor:

    List<Pair<Long, String>> users= dao.getUsersAll();
    
    if (CollectionUtils.isNotEmpty(users)) {
        for (List<Pair<Long, String>> partition : Lists.partition(users, 2000)) {
            InconsistsUsers.InconsistsUsersCallable callable = new InconsistsUsers.InconsistsUsersCallable(new ArrayList<>(partition));
            processExecutor.submit(callable);
        }
    }
    
  3. In the callable, two queries are made for each pair (a simplified sketch of the callable follows the list of cases below):

    First query:

    select distinct entityId 
    from UserPropertyValue 
    where userPropertyKindId = ? and value = ? -- value is the login from the Users table
    

    Second query:

    select UserIds 
    from UserPropertyIndex 
    where UserPropertyKindId = ? and Value = ?
    

Two cases are possible:

  1. The result of the first query is empty: log it, send a notification, continue with the next pair.
  2. The result of the second query does not match the result of the first query (the varbinary data is decoded; it stores encoded entityIds): log it, send a notification, go to the next pair.
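
Roughly, the callable does the following for each pair (a simplified sketch; the DAO method names findEntityIds and findIndexedUserIds are illustrative stand-ins for the two queries above):

@Override
public Void call() throws Exception {
    for (Pair<Long, String> pair : partition) {
        Long userPropertyKindId = pair.getLeft();
        String login = pair.getRight();

        // First query: distinct entityId from UserPropertyValue
        List<Long> entityIds = dao.findEntityIds(userPropertyKindId, login);
        if (entityIds.isEmpty()) {
            // Case 1: log, send a notification, continue with the next pair
            continue;
        }

        // Second query: UserIds from UserPropertyIndex (decoded from varbinary)
        List<Long> indexedUserIds = dao.findIndexedUserIds(userPropertyKindId, login);
        if (!entityIds.equals(indexedUserIds)) {
            // Case 2: log, send a notification, go to the next pair
        }
    }
    return null;
}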

I can't change the structure of the database. All the manipulation has to be done on the Java side.

Asked by All_Safe

2 Answers

You should handle this on several levels:

JDBC driver fetch size

JDBC has a Statement.setFetchSize() method, which indicates how many rows are going to be pre-fetched by the JDBC driver prior to you getting them from JDBC. Note that MySQL JDBC drivers don't really implement this correctly, but you can set setFetchSize(Integer.MIN_VALUE) to prevent it from fetching all rows in one go. See also this answer here.
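
For example, with the query from the question, a streaming statement could be set up roughly like this (a sketch, not a drop-in replacement; dataSource is the one from the question):

// MySQL Connector/J only streams rows when the statement is forward-only,
// read-only and the fetch size is Integer.MIN_VALUE.
try (Connection cnn = dataSource.getConnection();
     Statement stmt = cnn.createStatement(ResultSet.TYPE_FORWARD_ONLY,
                                          ResultSet.CONCUR_READ_ONLY)) {
    stmt.setFetchSize(Integer.MIN_VALUE);

    try (ResultSet rs = stmt.executeQuery("select UserPropertyKindId, login from TEST.users")) {
        while (rs.next()) {
            // handle each row as it arrives instead of collecting all of them first
        }
    }
}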

Note that you can also activate cursor-based fetching on your connection using the useCursorFetch property.
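
With that property set, a sketch might look like this instead (the JDBC URL and credentials are placeholders):

// useCursorFetch=true makes Connector/J use a server-side cursor, so a positive
// fetch size now limits how many rows are pulled from the server at a time.
String url = "jdbc:mysql://localhost:3306/TEST?useCursorFetch=true";
try (Connection cnn = DriverManager.getConnection(url, "user", "password");
     PreparedStatement ps = cnn.prepareStatement("select UserPropertyKindId, login from users")) {
    ps.setFetchSize(2000);

    try (ResultSet rs = ps.executeQuery()) {
        while (rs.next()) {
            // process rows incrementally
        }
    }
}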

Your own logic

You should not put the entire list of users in memory. What you're doing right now is collecting all the rows from JDBC and then partitioning your list later on using Lists.partition(users, 2000). This is going in the right direction, but you're not doing it right yet. Instead, do:

try (ResultSet rs = cs.executeQuery()) {
    while (rs.next()) {
        res.add(new ImmutablePair<>(rs.getLong(1), rs.getString(2)));

        // Process a batch of rows:
        if (res.size() >= 2000) {
            process(res);
            res.clear();
        }
    }
}

// Process the remaining rows
process(res);

The important message here is not to load all rows into memory and then process them in batches, but to process them directly while streaming rows from JDBC.
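
Applied to the code from the question, that could look roughly like this (a sketch; processExecutor, dataSource and the callable are the ones from the question, and the streaming fetch size from above is assumed):

// Stream the users and submit each batch of 2000 pairs to the executor as soon
// as it is full, instead of materializing the whole list first.
List<Pair<Long, String>> batch = new ArrayList<>(2000);

try (Connection cnn = dataSource.getConnection();
     Statement stmt = cnn.createStatement(ResultSet.TYPE_FORWARD_ONLY,
                                          ResultSet.CONCUR_READ_ONLY)) {
    stmt.setFetchSize(Integer.MIN_VALUE);

    try (ResultSet rs = stmt.executeQuery("select UserPropertyKindId, login from TEST.users")) {
        while (rs.next()) {
            batch.add(new ImmutablePair<>(rs.getLong(1), rs.getString(2)));
            if (batch.size() >= 2000) {
                processExecutor.submit(new InconsistsUsers.InconsistsUsersCallable(new ArrayList<>(batch)));
                batch.clear();
            }
        }
    }
}

// Submit the remaining pairs
if (!batch.isEmpty()) {
    processExecutor.submit(new InconsistsUsers.InconsistsUsersCallable(batch));
}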

Answered by Lukas Eder


Instead of using Lists.partition(users, 2000) on the Java side, you should limit your MySQL result set to 2000 rows per request.

select UserPropertyKindId, login from TEST.users limit <offset>, 2000;

Update: as mentioned by Raymond Nijland in the comment below, if the offset is too large, the query could be significantly slowed down.

One workaround could be, instead of using an offset, to introduce a WHERE clause such as where id > last_user_id.
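
A keyset-pagination loop might then look roughly like this (a sketch; the id column is purely illustrative, and per the update below it does not actually exist in this schema):

// Fetch the next 2000 rows after the last id seen, so MySQL never has to skip rows.
long lastId = 0;
while (true) {
    List<Pair<Long, String>> page = new ArrayList<>(2000);
    try (Connection cnn = dataSource.getConnection();
         PreparedStatement ps = cnn.prepareStatement(
                 "select id, UserPropertyKindId, login from TEST.users where id > ? order by id limit 2000")) {
        ps.setLong(1, lastId);
        try (ResultSet rs = ps.executeQuery()) {
            while (rs.next()) {
                lastId = rs.getLong(1);
                page.add(new ImmutablePair<>(rs.getLong(2), rs.getString(3)));
            }
        }
    }
    if (page.isEmpty()) {
        break; // no more rows
    }
    processExecutor.submit(new InconsistsUsers.InconsistsUsersCallable(page));
}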

Since @All_safe commented below that an auto-increment id doesn't exist, another workaround for the large LIMIT offset is to fetch only the primary key in a subquery and then join back to the main table. This forces MySQL to skip the early row lookups that are the main problem with large LIMIT offsets.
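
In SQL terms, that deferred join looks roughly like this (a sketch; the join columns are an assumption based on the columns selected in the question, adjust them to the real primary key):

-- Fetch only the key columns in the subquery, then join back to the main table.
select u.UserPropertyKindId, u.login
from TEST.users u
join (
    select UserPropertyKindId, login
    from TEST.users
    order by UserPropertyKindId, login
    limit <offset>, 2000
) page
  on page.UserPropertyKindId = u.UserPropertyKindId
 and page.login = u.login;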

However, your original query only fetches primary key columns, so I don't think early row lookup applies here.

Answered by Jacob