Расширенное распространение очереди Oracle не работает для меня

#oracle #queue #propagation

#Oracle #очередь #распространение

Вопрос:

Я хотел бы настроить распространение в Oracle AQ (11).

Я хотел бы перейти из очереди «Q» в таблице очередей «QT» в очередь «QD» в таблице очередей «QTD».

Это моя настройка:

 DECLARE 
 subscriber sys.aq$_agent; 
BEGIN
 DBMS_AQADM.CREATE_QUEUE_TABLE(queue_table=>'QT',multiple_consumers=>TRUE,queue_payload_type=>'RAW');
 DBMS_AQADM.CREATE_QUEUE_TABLE(queue_table=>'QTD',queue_payload_type=>'RAW');
 DBMS_AQADM.CREATE_QUEUE(queue_name => 'Q', queue_table => 'QT'); 
 DBMS_AQADM.CREATE_QUEUE(queue_name => 'QD', queue_table => 'QTD'); 
 DBMS_AQADM.START_QUEUE(queue_name => 'Q');
 DBMS_AQADM.START_QUEUE(queue_name => 'QD');
 subscriber := sys.aq$_agent('SUB', 'QD', NULL);
 DBMS_AQADM.ADD_SUBSCRIBER(queue_name => 'Q',subscriber => subscriber, queue_to_queue => TRUE);
 DBMS_AQADM.SCHEDULE_PROPAGATION(queue_name => 'Q');
END;
/
  

Я отправляю сообщение от клиента Java aqapi. Сообщение отправляется без ошибок, я вижу его в очереди «Q»:

 select * from QT;

"Q_NAME"    "MSGID" "CORRID"    "PRIORITY"  "STATE" "DELAY" "EXPIRATION"    "TIME_MANAGER_INFO" "LOCAL_ORDER_NO"    "CHAIN_NO"  "CSCN"  "DSCN"  "ENQ_TIME"  "ENQ_UID"   "ENQ_TID"   "DEQ_TIME"  "DEQ_UID"   "DEQ_TID"   "RETRY_COUNT"   "EXCEPTION_QSCHEMA" "EXCEPTION_QUEUE"   "STEP_NO"   "RECIPIENT_KEY" "DEQUEUE_MSGID" "SENDER_NAME"   "SENDER_ADDRESS"    "SENDER_PROTOCOL"   "USER_DATA" "USER_PROP"
"Q" FC914BFDC7489ECEE040010A393F3DD1    ""  1   0               0   0   0   0   24-JUN-14 07.56.27.258348000 AM "RISKOPALL" "9.5.283837"        ""  ""  0   ""  ""  0   0       ""  ""  0   (BLOB)  
  

Но я не вижу его в очереди назначения «QD»:

 select * from QTD;
  

Показывает пустой результат.

У вас есть какие-либо идеи, что с этим не так?

Я уже пробовал ENABLE_PROPAGATION_SCHEDULE, но оно уже включено после SCHEDULE_PROPAGATION . Это приводит к ошибке.

Я проверил эту страницу: http://docs.oracle.com/cd/B28359_01/server.111/b28420/aq_trbl.htm но я не могу найти представление DBA_QUEUE_SCHEDULES. У меня есть права администратора. Где мне его искать? Как мне устранить неполадки в распространении?

Любая помощь действительно ценится!

Ответ №1:

Решаемая проблема!

Если целевая очередь (в которую передаются сообщения) является однопользовательской очередью, то для имени подписчика должно быть установлено значение NULL! Это была моя проблема. Это задокументировано в документе 11g:

http://docs.oracle.com/cd/B28359_01/server.111/b28420/aq_admin.htm#i1008642 :

«Имя агента должно быть НУЛЕВЫМ, если очередь назначения представляет собой единственную очередь потребителя».

Проблемная строка в моей настройке:

 subscriber := sys.aq$_agent('SUB', 'QD', NULL);
  

Это должно было быть:

 subscriber := sys.aq$_agent(NULL, 'QD', NULL);
  

В поиске проблемы было действительно полезно приведенное ниже руководство по устранению неполадок:

https://blogs.oracle.com/db/entry/oracle_support_master_note_for_troubleshooting_advanced_queuing_and_oracle_streams_propagation_issue

В разделе 4.3 рекомендуется проверять журналы предупреждений. Я проверил их и действительно в файле трассировки, который я нашел

 kwqpdest: exception 24039
kwqpdest: Error 24039 propagating to "TEST"."QD"
  

После этого было не слишком сложно выяснить, почему выбрасывается 24039.

Итак, в конце концов, вот рабочая настройка на моем сервере 11g. Он распространяет сообщения от источника до трех целей. Источником является очередь с несколькими потребителями (она должна быть), целями являются очереди с одним потребителем:

 BEGIN
 DBMS_AQADM.DROP_QUEUE_TABLE(queue_table=>'QT', force => TRUE);
 DBMS_AQADM.DROP_QUEUE_TABLE(queue_table=>'QTD', force => TRUE);
END;
/
DECLARE
 subscriber sys.aq$_agent;
BEGIN
 DBMS_AQADM.CREATE_QUEUE_TABLE(queue_table=>'QT',multiple_consumers=>TRUE,
  queue_payload_type=>'SYS.AQ$_JMS_TEXT_MESSAGE');
 DBMS_AQADM.CREATE_QUEUE_TABLE(queue_table=>'QTD',multiple_consumers=>FALSE,
  queue_payload_type=>'SYS.AQ$_JMS_TEXT_MESSAGE');
 DBMS_AQADM.CREATE_QUEUE(queue_name => 'Q', queue_table => 'QT');
 DBMS_AQADM.CREATE_QUEUE(queue_name => 'QD1', queue_table => 'QTD');
 DBMS_AQADM.CREATE_QUEUE(queue_name => 'QD2', queue_table => 'QTD');
 DBMS_AQADM.CREATE_QUEUE(queue_name => 'QD3', queue_table => 'QTD');
 DBMS_AQADM.START_QUEUE(queue_name => 'Q');
 DBMS_AQADM.START_QUEUE(queue_name => 'QD1');
 DBMS_AQADM.START_QUEUE(queue_name => 'QD2');
 DBMS_AQADM.START_QUEUE(queue_name => 'QD3');
 subscriber := sys.aq$_agent(NULL, 'QD1', NULL);
 DBMS_AQADM.ADD_SUBSCRIBER(queue_name => 'Q',subscriber => subscriber);
 subscriber := sys.aq$_agent(NULL, 'QD2', NULL);
 DBMS_AQADM.ADD_SUBSCRIBER(queue_name => 'Q',subscriber => subscriber);
 subscriber := sys.aq$_agent(NULL, 'QD3', NULL);
 DBMS_AQADM.ADD_SUBSCRIBER(queue_name => 'Q',subscriber => subscriber);
 DBMS_AQADM.SCHEDULE_PROPAGATION(queue_name => 'Q', latency => 0);
END;