I'm facing a very strange behaviour on my Oracle SQL Server (exactly: Oracle Database 11g Enterprise Edition Release 11.2.0.4.0 - 64bit Production) when using the Oracle Advanced Queueing methods.
The error is that I enqueue X messages, but the dequeue_array returns X+1 messages , with the first message beeing duplicated (as seen by the MessageId).
I was able to write some simple PoC to reproduce the error. This Code is pretty straight-forward, the enqueue / dequeue stuff is standard Oracle AQ. The code does the following steps for two times (test runs):
When running the POC on a fresh connection, the first run succeeds without an error but every following run fails. After that, when using the same connection, everytime you execute the script it will fail in both test runs.
I can neither explain this behaviour, nor find an error in my code. Please take a look at it, it should be directly executable in your favorite sql client (tested with PL/SQL Developer).
If you need any further information or have problems getting the PoC to work, just ask, I'll check this thread regularly. I've tried to make the PoC as readable as possible, including a verbose output about what is happening.
declare
C_QueueName constant varchar2(32767) := 'TEST_QUEUE';
C_QueueTable constant varchar2(32767) := 'TEST_Q_TABLE';
C_MsgCount constant pls_integer := 1;
C_TestRuns constant pls_integer := 2;
C_DequeueArraySize constant pls_integer := 10;
/*
* Create the queue and the queue table used for theses tests
*/
procedure CreateQueueIfMissing is
L_Present pls_integer;
begin
dbms_output.put_line('START CreateQueueIfMissing');
execute immediate 'select count(*) from USER_OBJECTS where OBJECT_NAME = ''' || C_QueueName || ''' and OBJECT_TYPE = ''QUEUE''' into L_Present;
if L_Present = 1 then
dbms_output.put_line('Skipping queue creation, already present.');
dbms_output.put_line('END CreateQueueIfMissing');
return;
end if;
dbms_output.put_line(' Creating queue table ' || C_QueueTable);
DBMS_AQADM.CREATE_QUEUE_TABLE(queue_table => C_QueueTable
,storage_clause => 'LOGGING NOCACHE NOPARALLEL MONITORING'
,sort_list => 'priority,enq_time'
,multiple_consumers => false
,queue_payload_type => 'SYS.AQ$_JMS_BYTES_MESSAGE'
,comment => 'Queue for messages');
dbms_output.put_line(' Creating queue ' || C_QueueName);
DBMS_AQADM.CREATE_QUEUE(queue_name => C_QueueName
,queue_table => C_QueueTable
,max_retries => 8640
,retry_delay => 30
,comment => 'Queue for messages');
dbms_output.put_line(' Starting queue ' || C_QueueName);
DBMS_AQADM.START_QUEUE(queue_name => C_QueueName);
dbms_output.put_line('END CreateQueueIfMissing');
end CreateQueueIfMissing;
-- ================================================================================================
/*
* This procedure is the root of all evil.
* The error only occurs when using the purge_queue_tables procedure.
* When using a normal "delete from <queue_table>" then everything is just fine.
*/
procedure CleanQueueTable is
L_PurgeOptions dbms_aqadm.aq$_purge_options_t;
L_Count pls_integer;
begin
dbms_output.put_line('START CleanQueueTable');
execute immediate 'select count(*) from ' || C_QueueTable into L_Count;
dbms_output.put_line(' Messages in queue table BEFORE purge: ' || L_Count);
dbms_aqadm.purge_queue_table(queue_table => C_QueueTable
,purge_condition => null
,purge_options => L_PurgeOptions);
execute immediate 'select count(*) from ' || C_QueueTable into L_Count;
dbms_output.put_line(' Messages in queue table AFTER purge: ' || L_Count);
dbms_output.put_line('END CleanQueueTable');
end CleanQueueTable;
-- ================================================================================================
/*
* Enqueue the configured count of messages on the queue
*/
procedure EnqueueMessages is
L_BodyId pls_integer;
L_Msg sys.aq$_jms_bytes_message;
L_MsgId raw(16);
L_Count pls_integer;
L_EnqueueOptions DBMS_AQ.ENQUEUE_OPTIONS_T;
L_MessageProperties DBMS_AQ.MESSAGE_PROPERTIES_T;
begin
dbms_output.put_line('START EnqueueMessages');
execute immediate 'select count(*) from ' || C_QueueTable into L_Count;
dbms_output.put_line(' Messages in queue table BEFORE enqueue: ' || L_Count);
for i in 1 .. C_MsgCount
loop
dbms_output.put_line(' Construct #' || i);
L_Msg := sys.aq$_jms_bytes_message.construct;
-- set the JMS header
L_Msg.set_type('JmsBytesMessage');
L_Msg.set_userid(1);
L_Msg.set_appid('test');
L_Msg.set_groupid('cs');
L_Msg.set_groupseq(1);
-- set JMS message content
L_BodyId := L_Msg.clear_body(-1);
L_Msg.write_bytes(L_BodyId, to_blob(utl_raw.cast_to_raw('<test>Lorem Ipsum</test>')));
L_Msg.flush(L_BodyId);
L_Msg.clean(L_BodyId);
dbms_output.put_line(' Enqueue #' || i);
DBMS_AQ.ENQUEUE (queue_name => C_QueueName
,enqueue_options => L_EnqueueOptions
,message_properties => L_MessageProperties
,payload => L_Msg
,msgid => L_MsgId);
end loop;
execute immediate 'select count(*) from ' || C_QueueTable into L_Count;
dbms_output.put_line(' Messages in queue table AFTER enqueue: ' || L_Count);
dbms_output.put_line('END EnqueueMessages');
end EnqueueMessages;
-- ================================================================================================
/*
* Dequeues messages using dequeue_array from the configured queue.
*/
procedure DequeueMessages is
L_DequeueOptions dbms_aq.dequeue_options_t;
L_MsgPropArr dbms_aq.message_properties_array_t := dbms_aq.message_properties_array_t();
L_PayloadArr sys.aq$_jms_bytes_messages;
L_MsgIdArr dbms_aq.msgid_array_t;
L_MsgCnt pls_integer := 0;
L_Count pls_integer;
begin
dbms_output.put_line('START DequeueMessages');
execute immediate 'select count(*) from ' || C_QueueTable into L_Count;
dbms_output.put_line(' Messages in queue table BEFORE dequeue: ' || L_Count);
L_MsgCnt := dbms_aq.dequeue_array(queue_name => C_QueueName
,dequeue_options => L_DequeueOptions
,array_size => C_DequeueArraySize
,message_properties_array => L_MsgPropArr
,payload_array => L_PayloadArr
,msgid_array => L_MsgIdArr);
execute immediate 'select count(*) from ' || C_QueueTable into L_Count;
dbms_output.put_line(' Messages in queue table AFTER dequeue: ' || L_Count);
dbms_output.put_line(' Expected: ' || C_MsgCount || ', Received: ' || L_MsgCnt);
if C_MsgCount != L_MsgCnt then
dbms_output.put_line(' *****************************************');
dbms_output.put_line(' TOO MANY ITEMS DEQUEUED?!?');
dbms_output.put_line(' *****************************************');
for i in 1 .. L_MsgCnt
loop
dbms_output.put_line(' #' || i || ' MsdId=' || L_MsgIdArr(i));
end loop;
end if;
dbms_output.put_line('END DequeueMessages');
end DequeueMessages;
-- ================================================================================================
/*
* This is the testcase
*/
procedure RunTestCase is
begin
CreateQueueIfMissing;
for i in 1 .. C_TestRuns
loop
dbms_output.put_line(null);
dbms_output.put_line('=========== START test run #' || i || '===========');
CleanQueueTable;
EnqueueMessages;
DequeueMessages;
end loop;
end;
-- ================================================================================================
begin
RunTestCase;
end;
START CreateQueueIfMissing
Skipping queue creation, already present.
END CreateQueueIfMissing
=========== START test run #1===========
START CleanQueueTable
Messages in queue table BEFORE purge: 0
Messages in queue table AFTER purge: 0
END CleanQueueTable
START EnqueueMessages
Messages in queue table BEFORE enqueue: 0
Construct #1
Enqueue #1
Messages in queue table AFTER enqueue: 1
END EnqueueMessages
START DequeueMessages
Messages in queue table BEFORE dequeue: 1
Messages in queue table AFTER dequeue: 0
Expected: 1, Received: 1
END DequeueMessages
=========== START test run #2===========
START CleanQueueTable
Messages in queue table BEFORE purge: 0
Messages in queue table AFTER purge: 0
END CleanQueueTable
START EnqueueMessages
Messages in queue table BEFORE enqueue: 0
Construct #1
Enqueue #1
Messages in queue table AFTER enqueue: 1
END EnqueueMessages
START DequeueMessages
Messages in queue table BEFORE dequeue: 1
Messages in queue table AFTER dequeue: 0
Expected: 1, Received: 2
*****************************************
TOO MANY ITEMS DEQUEUED?!?
*****************************************
#1 MsdId=2949A0FF2EE456A7E0540010E0467A30
#2 MsdId=2949A0FF2EE456A7E0540010E0467A30
END DequeueMessages
This looks like bug 20659700. There's a bit more info in document 2002148.1.
You (or your DBA) should raise a service request to confirm that, and see if a patch is available for your platform.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With