
How to retrieve the new rows of a table every minute

Tags: mysql

I have a table to which rows are only appended (never updated or deleted), using transactions (I'll explain why this is important), and I need to fetch the new, previously unfetched rows of this table every minute with a cron job.

How am I going to do this? In any programming language (I use Perl but that's irrelevant.)

I list the ways I thought of how to solve this problem, and ask you to show me the correct one (there HAS to be one...)

The first way that popped into my head was to save (in a file) the largest auto-incrementing id of the rows fetched, so that in the next minute I can fetch with: WHERE id > $last_id. But that can miss rows. Because new rows are inserted in transactions, it's possible that the transaction that saves the row with id = 5 commits before the transaction that saves the row with id = 4. It's therefore possible that the cron script retrieves row 5 but not row 4, and when row 4 gets committed a split second later, it will never get fetched (because 4 is not greater than 5, which is the $last_id).
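
To make the race concrete, here is a minimal in-memory sketch in Python. It does not touch a real database: the `committed` dict stands in for the rows visible to the poller, and `poll`, `fetched` and `last_id` are hypothetical names used only for this illustration.

```python
# Simulates two inserting transactions whose commit order differs from
# their id order, and a poller that remembers only the largest id seen.

committed = {}   # id -> payload: rows that are committed and visible
last_id = 0      # high-water mark the cron job would save to a file
fetched = []     # ids the poller has actually processed


def poll():
    """Fetch rows with id > last_id, as WHERE id > $last_id would."""
    global last_id
    new_ids = sorted(i for i in committed if i > last_id)
    fetched.extend(new_ids)
    if new_ids:
        last_id = new_ids[-1]


# Two transactions hold ids 4 and 5; the one holding id 5 commits first.
committed[5] = "row five"
poll()                      # sees only id 5, so last_id becomes 5

committed[4] = "row four"   # the slower transaction commits now
poll()                      # 4 > 5 is false, so row 4 is skipped

print(fetched)                                 # [5]
print(sorted(set(committed) - set(fetched)))   # [4]
```

Row 4 is committed and visible, yet no later poll will ever select it.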

Then I thought I could make the cron job fetch all rows that have a date field in the last TWO minutes, check which of these rows were already retrieved in the previous run of the cron job (to do this I would need to save somewhere which row ids were retrieved), compare, and process only the new ones. Unfortunately this is complicated, and it also doesn't solve the problem that occurs if a certain inserting transaction takes TWO AND A HALF minutes to commit for some weird database reason, which makes the date too old for the next iteration of the cron job to fetch.

Then I thought of installing a message queue (MQ) like RabbitMQ or any other. The same process that does the inserting transaction would notify RabbitMQ of the new row, and RabbitMQ would then notify an always-running process that processes new rows. So instead of getting a batch of rows inserted in the last minute, that process would get the new rows one by one as they are written. This sounds good, but has too many points of failure: RabbitMQ might be down for a second (during a restart, for example), and in that case the insert transaction will have committed without the receiving process ever having received the new row. So the new row will be missed. Not good.

I just thought of one more solution: the receiving processes (there are 30 of them, doing the exact same job on exactly the same data, so the same rows get processed 30 times, once by each receiving process) could write in another table that they have processed row X when they process it. Then, when the time comes, they can ask for all rows in the main table that don't exist in the "have_processed" table with an OUTER JOIN query. But I believe (correct me if I'm wrong) that such a query will consume a lot of CPU and disk I/O on the DB server, since it will have to compare the entire list of ids of the two tables to find new entries (and the table is huge and getting bigger each minute). It would have been fast if there were only one receiving process: then I would have been able to add an indexed field named "have_read" in the main table, which would make looking for new rows extremely fast and easy on the DB server.
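
For reference, the shape of that OUTER JOIN query can be sketched as below. This is only an illustration under assumptions: it uses Python's built-in sqlite3 module as a stand-in for MySQL, and the names `main_table`, `consumer_id`, `row_id` and `unprocessed_ids` are hypothetical. It shows what the query returns, not how it performs on a large table.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE main_table (id INTEGER PRIMARY KEY, payload TEXT);
    CREATE TABLE have_processed (
        consumer_id INTEGER NOT NULL,
        row_id      INTEGER NOT NULL,
        PRIMARY KEY (consumer_id, row_id)
    );
    INSERT INTO main_table (id, payload) VALUES (1, 'a'), (2, 'b'), (3, 'c');
    -- consumer 7 has handled rows 1 and 3; consumer 8 has handled row 2
    INSERT INTO have_processed (consumer_id, row_id)
        VALUES (7, 1), (7, 3), (8, 2);
""")


def unprocessed_ids(conn, consumer_id):
    """Return ids in main_table with no have_processed entry for this consumer."""
    rows = conn.execute(
        """
        SELECT m.id
        FROM main_table AS m
        LEFT OUTER JOIN have_processed AS hp
            ON hp.row_id = m.id AND hp.consumer_id = ?
        WHERE hp.row_id IS NULL
        ORDER BY m.id
        """,
        (consumer_id,),
    ).fetchall()
    return [r[0] for r in rows]


print(unprocessed_ids(conn, 7))   # [2]
print(unprocessed_ids(conn, 8))   # [1, 3]
```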

What is the right way to do it? What do you suggest? The question is simple, but a solution seems hard (for me) to find.

Thank you.

alexk asked Oct 21 '22 01:10



1 Answer

I believe the 'best' way to do this would be to use one process that checks for new rows and delegates them to the thirty consumer processes. Then your problem becomes simpler to manage from a database perspective and a delegating process is not that difficult to write.
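
A minimal sketch of such a delegating process, under stated assumptions: it uses Python's built-in sqlite3 module as a stand-in for MySQL, it relies on the indexed "have_read" flag mentioned in the question (workable here because only one process reads the table), and `dispatch_new_rows`, `payload` and the in-memory consumers are hypothetical names for illustration. Real consumers would be separate processes reached through whatever channel suits your setup.

```python
import sqlite3


def dispatch_new_rows(conn, consumers):
    """Hand every unread row to every consumer, then mark it as read."""
    rows = conn.execute(
        "SELECT id, payload FROM primaryTable WHERE have_read = 0 ORDER BY id"
    ).fetchall()
    for row_id, payload in rows:
        for consume in consumers:
            consume(row_id, payload)
        # Mark the row only after all consumers have seen it: a crash before
        # this point leads to a repeat delivery rather than a missed row.
        conn.execute(
            "UPDATE primaryTable SET have_read = 1 WHERE id = ?", (row_id,)
        )
        conn.commit()
    return len(rows)


conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE primaryTable (
        id        INTEGER PRIMARY KEY,
        payload   TEXT,
        have_read INTEGER NOT NULL DEFAULT 0
    );
    CREATE INDEX idx_have_read ON primaryTable (have_read);
""")

# Three stand-in consumers instead of thirty; each just records the ids.
received = {n: [] for n in (1, 2, 3)}
consumers = [
    lambda row_id, payload, n=n: received[n].append(row_id) for n in received
]

# Row 5 becomes visible before row 4, as in the question's scenario.
conn.execute("INSERT INTO primaryTable (id, payload) VALUES (5, 'five')")
conn.commit()
first = dispatch_new_rows(conn, consumers)

conn.execute("INSERT INTO primaryTable (id, payload) VALUES (4, 'four')")
conn.commit()
second = dispatch_new_rows(conn, consumers)

print(received)   # {1: [5, 4], 2: [5, 4], 3: [5, 4]}
```

Because rows are selected by flag rather than by id, the late-committing row 4 is still picked up on the next run.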

If you are stuck with communicating with the thirty consumer processes through the database, the best option I could come up with is to create a trigger on the table, which copies each row to a secondary table. Copy each row to the secondary table thirty times (once for each consumer process). Add a column to this secondary table indicating the 'target' consumer process (for example a number from 1 to 30). Each consumer process checks for new rows with its unique number and then deletes those. If you are worried that some rows are deleted before they are processed (because the consumer crashes in the middle of processing), you can fetch, process and delete them one by one.

Since the secondary table is kept small by continuously deleting processed rows, INSERTs, SELECTs and DELETEs would be very fast. All operations on this secondary table would also be indexed by the primary key (if you place the consumer ID as first field of the primary key).

In MySQL statements, this would look like this:

CREATE TABLE `consumer`(
    `id` INTEGER NOT NULL,
    PRIMARY KEY (`id`)
);
INSERT INTO `consumer`(`id`) VALUES
(1),
(2),
(3)
-- all the way to 30
;

CREATE TABLE `secondaryTable` LIKE `primaryTable`;
ALTER TABLE `secondaryTable` ADD COLUMN `targetConsumerId` INTEGER NOT NULL FIRST;
-- alter the secondary table further to allow several rows with the same primary key (by adding targetConsumerId to the primary key)

DELIMITER //
CREATE TRIGGER `mark_to_process` AFTER INSERT ON `primaryTable`
FOR EACH ROW
BEGIN
    -- selecting from the consumer table inserts one copy of the new row per consumer, so adding or removing consumers is just a matter of adding or deleting rows in the consumer table
    -- the column names below (primaryTableId, primaryTableField1, ...) are placeholders; match them to your actual schema
    INSERT INTO `secondaryTable`(`targetConsumerId`, `primaryTableId`, `primaryTableField1`, `primaryTableField2`) SELECT `consumer`.`id`, NEW.`id`, NEW.`field1`, NEW.`field2` FROM `consumer`;
END//
DELIMITER ;

-- loop over the following statements in each consumer until the SELECT doesn't return any more rows
START TRANSACTION;
SELECT * FROM secondaryTable WHERE targetConsumerId = MY_UNIQUE_CONSUMER_ID LIMIT 1;
-- here, do the processing (so before the COMMIT so that crashes won't let you miss rows)
DELETE FROM secondaryTable WHERE targetConsumerId = MY_UNIQUE_CONSUMER_ID AND primaryTableId = PRIMARY_TABLE_ID_OF_ROW_JUST_SELECTED;
COMMIT;
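
For anyone who wants to try the idea without a MySQL server, here is a small runnable sketch of the same fan-out. It is written under assumptions: it uses Python's built-in sqlite3 module, whose trigger syntax differs slightly from MySQL's (no DELIMITER statement), it uses three consumers instead of thirty, and `field1`, `drain` and `process` are hypothetical names.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE consumer (id INTEGER PRIMARY KEY);
    INSERT INTO consumer (id) VALUES (1), (2), (3);

    CREATE TABLE primaryTable (id INTEGER PRIMARY KEY, field1 TEXT);
    CREATE TABLE secondaryTable (
        targetConsumerId INTEGER NOT NULL,
        primaryTableId   INTEGER NOT NULL,
        field1           TEXT,
        PRIMARY KEY (targetConsumerId, primaryTableId)
    );

    -- One copy of every new row per registered consumer.
    CREATE TRIGGER mark_to_process AFTER INSERT ON primaryTable
    FOR EACH ROW
    BEGIN
        INSERT INTO secondaryTable (targetConsumerId, primaryTableId, field1)
        SELECT consumer.id, NEW.id, NEW.field1 FROM consumer;
    END;
""")


def drain(conn, consumer_id, process):
    """Fetch, process and delete this consumer's rows one at a time."""
    handled = 0
    while True:
        row = conn.execute(
            "SELECT primaryTableId, field1 FROM secondaryTable "
            "WHERE targetConsumerId = ? ORDER BY primaryTableId LIMIT 1",
            (consumer_id,),
        ).fetchone()
        if row is None:
            return handled
        process(*row)  # processing happens before the delete is committed
        conn.execute(
            "DELETE FROM secondaryTable "
            "WHERE targetConsumerId = ? AND primaryTableId = ?",
            (consumer_id, row[0]),
        )
        conn.commit()
        handled += 1


conn.execute("INSERT INTO primaryTable (field1) VALUES ('a')")
conn.execute("INSERT INTO primaryTable (field1) VALUES ('b')")
conn.commit()

seen = []
handled = drain(conn, 2, lambda row_id, field1: seen.append((row_id, field1)))
remaining = conn.execute("SELECT COUNT(*) FROM secondaryTable").fetchone()[0]

print(handled, seen, remaining)   # 2 [(1, 'a'), (2, 'b')] 4
```

Consumer 2 has emptied its own queue, while the four copies destined for consumers 1 and 3 are untouched.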
Tomas Creemers answered Oct 23 '22 20:10
