Oracle8 Application Developer's Guide Release 8.0 A58241-01
This chapter has four sections:
This introductory section:
Consider the following application scenarios.
Although not every application developer will have worked with each of these types of scenario, the basic elements of these problem domains will be familiar. Each of these scenarios describes a situation in which messages come from and are disbursed to multiple clients (nodes) in a distributed computing environment. Messages are not only passed back and forth between client and server but also are intricately interleaved between processes on the server.
If we focus on these scenarios in terms of messages, the applications can be viewed as consisting of multi-step processes in which each step is triggered by one or more messages, and gives rise to one or more messages. Another way of saying this is that messages are events that trigger other message-events.
Business Process Management, or Workflow, which is based on this notion of the interrelation of messages and events, is becoming recognized as a fundamental technology. Queuing is one of the key technologies for this class of application because it implements deferred execution of messages. This decoupling of "requests for service" from "supply of services" increases efficiency, and provides the infrastructure for complex scheduling.
Handling the intricacy of message-passing is not the only problem. Unfortunately, networks, computing hardware, and software applications will all fail from time to time, as is the case in the power utility scenario. Nevertheless, the ACID properties of the information must be preserved. Chaos would quickly follow if buy orders and the order in which they were issued were "lost", or if the changing status of students could not be matched to class availability, or if power could not be routed to where the combination of incoming reports and historical patterns of changes in demand indicated it would be most required. In other words, messaging must be persistent. By integrating transaction processing with queuing technology, persistent messaging in the form of queuing is made possible. The importance of queuing has been proven by TP-monitors that typically include such a facility.
The persistence of messages that is required goes beyond the ability to recover information in the event of system failure. Applications may have to deal with multiple unprocessed messages arriving simultaneously from external clients or from programs internal to the application. The communication links between databases may not be available all the time or may be reserved for some other purpose. If the system falls short in its capacity to deal with these messages immediately, the application must be able to store the messages until they can be processed. By the same token, external clients or internal programs may not be ready to receive messages that have been processed.
Even more important, applications must be able to deal with priorities: messages arriving later may be of higher priority than messages arriving earlier; messages arriving earlier may have to wait for messages arriving later before actions are executed; the same message may have to be accessed by different processes; and so on. All these cases become more pressing in situations in which messages are communicated between remote locations.
Moreover, priorities are not fixed. One crucial dimension of handling the dynamic aspect of message persistence has to do with windows of opportunity that grow and shrink. It may be that messages in a specific queue become more important than messages in other queues, and so need to be processed with less delay or interference from messages in other queues. Similarly, it may be more pressing to send messages to some destinations than to others. In the case of the share brokerage application, the window for completing the sale shrinks to nothing (i.e. an offer to sell expires) from the time the offer to sell message is received. In the case of the student registration application, different priorities apply during different temporal phases, and data must be re-evaluated with the transition from one phase to another. And in the case of the power utility, the entire decision-making process depends on whether conditions are stable (the persistence of a large window) or dynamic (the rapid appearance and disappearance of windows).
What is true for all the scenarios is that the time at which messages are received or dispatched is a crucial part of the message. This means that the control component of the message - in this case, time markers - is as important as the payload data. Put another way: the message retains importance as a business asset after it has been executed.
Persistent messaging thus implies accurate documentation of messages for analysis of historical patterns and future trends. For instance:
What are the key requirements of a persistent messaging system given the above issues?
Generally, attempts to provide communication between programs can be classified into one of two types: Synchronous and Disconnected/Deferred Communication.
This model of communication, also called on-line or connected, is based on the request/reply paradigm. In this model a program sends a request to another program and waits (blocks) until the reply arrives. This model of communication, in which the sender and receiver of the message are tightly coupled, is suitable for programs that need to get a reply before they can proceed with any task. Traditional client/server architectures are based on this model.
The major drawback of the synchronous model of communication is that the programs must be available and running for the application to work. In the event of network or machine failure, or even if the needed program is busy, the entire application grinds to a halt.
In this model programs in the role of producers place requests in a queue and then proceed with their work. Programs in the role of consumers retrieve requests from the queue and act on them. This model is well suited for applications that can continue with their work after placing a request in the queue because they are not blocked waiting for a reply. It is also suited to applications that can continue with their work until there is a message to retrieve.
For deferred execution to work correctly even in the presence of network, machine and application failures, the requests must be stored persistently, and processed exactly once. This can be achieved by combining persistent queuing with transaction protection. Oracle8 provides a queuing technology that does not depend on the use of TP-monitors or any other evolving Message-Oriented Middleware (MOM) infrastructure.
Oracle AQ (Oracle Advanced Queuing) provides message queuing as an integrated part of the Oracle server. Oracle AQ provides this functionality by integrating the queuing system with the database, thereby creating a message-enabled database. By providing an integrated solution, Oracle AQ frees application developers to devote their efforts to their specific business logic rather than having to construct a messaging infrastructure.
ENQUEUE/DEQUEUE time and the identification of the transaction that executed each request. This allows users to keep a history of relevant messages. The history can be used for tracking, data warehouse and data mining operations.
A DEQUEUE request can either browse or remove a message. If a message is browsed, it remains available for further processing; if a message is removed, it is no longer available for DEQUEUE requests. Depending on the queue properties, a removed message may be retained in the queue table.
A DEQUEUE could be issued against an empty queue. To avoid polling for the arrival of a new message, a user can specify if and for how long the request is allowed to wait for the arrival of a message.
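The waiting behavior described above can be sketched in PL/SQL as follows. This is an illustrative sketch only: it assumes the msg_queue queue and the aq.message_type payload type created in the examples later in this chapter, the 10-second limit is an arbitrary choice, and the exception name no_messages is hypothetical (it is simply mapped to ORA-25228, the error raised when a dequeue times out).

```sql
/* Wait up to 10 seconds for a message instead of polling: */
DECLARE
  dequeue_options    dbms_aq.dequeue_options_t;
  message_properties dbms_aq.message_properties_t;
  message_handle     RAW(16);
  message            aq.message_type;
  no_messages        EXCEPTION;  /* hypothetical name for the timeout error */
  PRAGMA EXCEPTION_INIT(no_messages, -25228);
BEGIN
  /* wait is given in seconds; dbms_aq.NO_WAIT and dbms_aq.FOREVER
     are the two extremes, and FOREVER is the default: */
  dequeue_options.wait := 10;
  dbms_aq.dequeue(queue_name         => 'msg_queue',
                  dequeue_options    => dequeue_options,
                  message_properties => message_properties,
                  payload            => message,
                  msgid              => message_handle);
  dbms_output.put_line('Message: ' || message.subject ||
                       ' ... ' || message.text);
  COMMIT;
EXCEPTION
  WHEN no_messages THEN
    dbms_output.put_line('No message arrived within 10 seconds.');
END;
/
```

Handling the timeout explicitly lets the caller decide whether to retry, do other work, or give up, rather than blocking indefinitely.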
ENQUEUE/DEQUEUE requests are normally part of a transaction that contains the requests, thereby providing the desired transactional behavior. Users can, however, specify that a specific request is a transaction by itself, making the result of that request immediately visible to other transactions. This means that messages can be made visible to the external world either as soon as the ENQUEUE or DEQUEUE statement is issued, or only after the transaction is committed.
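As a rough illustration of the two visibility choices, the following sketch enqueues one message as a transaction by itself. It assumes the msg_queue queue and aq.message_type type from the examples later in this chapter; the message text is made up for the illustration.

```sql
/* Enqueue a message that is visible without waiting for a COMMIT: */
DECLARE
  enqueue_options    dbms_aq.enqueue_options_t;
  message_properties dbms_aq.message_properties_t;
  message_handle     RAW(16);
  message            aq.message_type;
BEGIN
  message := aq.message_type('IMMEDIATE MESSAGE',
                             'Visible as soon as it is enqueued.');
  /* The default is dbms_aq.ON_COMMIT, which makes the enqueue part of
     the current transaction. IMMEDIATE makes this request a
     transaction by itself: */
  enqueue_options.visibility := dbms_aq.IMMEDIATE;
  dbms_aq.enqueue(queue_name         => 'msg_queue',
                  enqueue_options    => enqueue_options,
                  message_properties => message_properties,
                  payload            => message,
                  msgid              => message_handle);
  /* A later ROLLBACK of the surrounding transaction does not
     remove this message from the queue. */
END;
/
```

The same visibility field exists in dbms_aq.dequeue_options_t, so a dequeue can be made a transaction by itself in the same way.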
For example, you can connect to database X and enqueue the message in a queue, say "DROPBOX", located in database X. You can configure AQ so that all messages enqueued in queue "DROPBOX" will be automatically propagated to another queue in a database Y, regardless of whether database Y is local or remote. AQ will automatically check if the type of the remote queue in database Y is structurally equivalent to the type of the local queue in database X, and propagate the message.
Recipients of propagated messages can be either applications or queues. If the recipient is a queue, the actual recipients will be determined by the subscription list associated with the recipient queue. If the queues are remote, messages will be propagated using the specified database link. Only AQ to AQ message propagation is supported.
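A configuration along the lines of the DROPBOX example might look like the sketch below. Everything named here is an assumption made for illustration: a multiple-consumer queue aq.dropbox is taken to exist in both databases with the same payload type, and db_y is a hypothetical database link from database X to database Y. The call to dbms_aqadm.schedule_propagation uses positional arguments (source queue, then destination database link) with defaults for the scheduling parameters; the exact parameter names and availability should be checked against the DBMS_AQADM reference for the release in use.

```sql
/* Run in database X. Make the remote queue a subscriber of the local
   queue, then schedule propagation over the database link: */
DECLARE
  subscriber sys.aq$_agent;
BEGIN
  /* The address has the form [schema.]queue@dblink; protocol 0
     signifies database link addressing. db_y is hypothetical: */
  subscriber := sys.aq$_agent(NULL, 'aq.dropbox@db_y', 0);
  dbms_aqadm.add_subscriber(queue_name => 'aq.dropbox',
                            subscriber => subscriber);

  /* Source queue, then destination database link: */
  dbms_aqadm.schedule_propagation('aq.dropbox', 'db_y');
END;
/
```

Producers in database X continue to enqueue into aq.dropbox as usual; they do not need to know that a copy of each message is delivered to database Y.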
A message is the smallest unit of information inserted into and retrieved from a queue. A message consists of control information (metadata) and payload (data). The control information represents message properties used by AQ to manage messages. The payload data is the information stored in the queue and is transparent to Oracle AQ. A message can reside in only one queue. A message is created by the enqueue call and consumed by the dequeue call.
A queue is a repository for messages. There are two types of queues: user queues, also known as normal queues, and exception queues. The user queue is for normal message processing. Messages are transferred to an exception queue if they cannot be retrieved and processed for some reason. Queues can be created, altered, started, stopped, and dropped by using the Oracle AQ administrative interfaces.
Queues are stored in queue tables. Each queue table is a database table and contains one or more queues. Each queue table contains a default exception queue.
The following figure shows the relationship between messages, queues, and queue tables. The columns represent message queues, with rows representing individual messages.
An agent is a queue user. There are two types of agents: producers who place messages in a queue (enqueuing) and consumers who retrieve messages (dequeuing). Any number of producers and consumers may be accessing the queue at a given time. Agents insert messages into a queue and retrieve messages from the queue by using the Oracle AQ operational interfaces.
An agent is identified by its name, address and protocol. The address field is a character field of up to 1024 bytes that is interpreted in the context of the protocol. For instance, the default value for the protocol is 0, signifying database link addressing. In this case, the address for this protocol is of the form
queue_name@dblink
where queue_name is of the form [schema.]queue and dblink may either be a fully qualified database link name or the database link name without the domain name. The only supported protocol value is 0 at this time.
The queue monitor is a background process that monitors the messages in the queue. It provides the mechanism for message expiration, retry and delay.
Figure 11-1 (above) portrays a queue table that contains two queues, and one exception queue:
At its most basic, one producer may enqueue different messages into one queue. Each message will be dequeued and processed once by one of the consumers. A message will stay in the queue until a consumer dequeues it or the message expires. A producer may stipulate a delay before the message is available to be consumed, and a time after which the message expires. Likewise, a consumer may wait when trying to dequeue a message if no message is available. Note that an agent program, or application, can act as both a producer and a consumer.
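The delay and expiration that a producer may stipulate are set through the message properties. The following is a minimal sketch, assuming the msg_queue queue and aq.message_type type from the examples later in this chapter; the 60-second delay and one-hour expiration are arbitrary values chosen for illustration.

```sql
/* Enqueue a message with an execution window: */
DECLARE
  enqueue_options    dbms_aq.enqueue_options_t;
  message_properties dbms_aq.message_properties_t;
  message_handle     RAW(16);
  message            aq.message_type;
BEGIN
  message := aq.message_type('DELAYED MESSAGE',
                             'Available after 60 seconds, for one hour.');
  /* Both values are in seconds. The message cannot be dequeued until
     the delay has passed; the expiration is counted from the moment
     the message becomes available: */
  message_properties.delay      := 60;
  message_properties.expiration := 3600;
  dbms_aq.enqueue(queue_name         => 'msg_queue',
                  enqueue_options    => enqueue_options,
                  message_properties => message_properties,
                  payload            => message,
                  msgid              => message_handle);
  COMMIT;
END;
/
```

Because the queue monitor provides the mechanism for delay and expiration, these settings only take effect when that background process is running.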
At a slightly higher level of complexity, many producers may enqueue messages into a queue, all of which are processed by one consumer.
In this next stage, many producers may enqueue messages, each message being processed by a different consumer depending on type and correlation identifier. The figure below shows this scenario.
Figure 11-2 (below) portrays a queue table that contains one queue into which messages are being enqueued and from which messages are being dequeued.
The figure indicates that there are 6 producers of messages, although only four are shown. This assumes that two other producers (P4 and P5) have the right to enqueue messages even though there are no messages enqueued by them at the moment portrayed by the figure. The figure shows:
According to the figure, there are 3 consumers of messages, representing the total population of consumers. The figure shows:
The previous figure portrayed the enqueuing of multiple messages by a set of producers, and the dequeuing of messages by a set of consumers. What may not be readily evident in that sketch is the notion of time, and the advantages offered by Oracle AQ.
Client-Server applications normally execute in a synchronous manner, with all the disadvantages of that tight coupling described above. Figure 11-3 demonstrates the asynchronous alternative using AQ. In this example Application B (a server) provides service to Application A (a client) using a request/response queue.
In this way the client does not have to wait to establish a connection with the server, and the server dequeues the message at its own pace. When the server is finished processing the message, there is no need for the client to be waiting to receive the result. In this way a process of double-deferral frees both client and server.
A message can only be enqueued into one queue at a time. If a producer had to insert the same message into several queues in order to reach different consumers, this would require management of a very large number of queues. Oracle AQ provides two mechanisms to allow for multiple consumers to dequeue the same message: queue subscribers and message recipients. The queue must reside in a queue table that is created with the multiple consumer option to allow for subscriber and recipient lists. Each message remains in the queue until it is consumed by all its intended consumers.
Using this approach, multiple consumer-subscribers are associated with a queue. This will cause all messages enqueued in the queue to be made available to be consumed by each of the queue subscribers. The subscribers to the queue can be changed dynamically without any change to the messages or message producers. Subscribers to the queue are added and removed by using the Oracle AQ administrative package. The diagram below shows multiple producers enqueuing messages into a queue, each of which is consumed by multiple consumer-subscribers.
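Adding subscribers and dequeuing on behalf of one of them can be sketched as follows. The sketch assumes the multiple-consumer queue msg_queue_multiple and the aq.message_type type created in the examples later in this chapter; the subscriber names RED and GREEN are hypothetical.

```sql
/* Add two subscribers to a multiple-consumer queue: */
DECLARE
  subscriber sys.aq$_agent;
BEGIN
  subscriber := sys.aq$_agent('RED', NULL, NULL);
  dbms_aqadm.add_subscriber(queue_name => 'msg_queue_multiple',
                            subscriber => subscriber);
  subscriber := sys.aq$_agent('GREEN', NULL, NULL);
  dbms_aqadm.add_subscriber(queue_name => 'msg_queue_multiple',
                            subscriber => subscriber);
END;
/

/* Dequeue as subscriber RED. GREEN still receives its own copy: */
DECLARE
  dequeue_options    dbms_aq.dequeue_options_t;
  message_properties dbms_aq.message_properties_t;
  message_handle     RAW(16);
  message            aq.message_type;
BEGIN
  dequeue_options.consumer_name := 'RED';
  dbms_aq.dequeue(queue_name         => 'msg_queue_multiple',
                  dequeue_options    => dequeue_options,
                  message_properties => message_properties,
                  payload            => message,
                  msgid              => message_handle);
  dbms_output.put_line('Message: ' || message.subject ||
                       ' ... ' || message.text);
  COMMIT;
END;
/
```

The producers are unaffected by these calls: they enqueue into msg_queue_multiple exactly as before, and the subscriber list determines who receives each message.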
A message producer can submit a list of recipients at the time a message is enqueued. This allows for a unique set of recipients for each message in the queue. The recipient list associated with the message overrides the subscriber list associated with the queue, if there is one. The recipients need not be in the subscriber list. However, recipients may be selected from among the subscribers.
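A per-message recipient list might be supplied as in the sketch below. It assumes the multiple-consumer queue msg_queue_multiple and the aq.message_type type from the examples later in this chapter; the recipient names RED and BLUE are hypothetical, and neither needs to be a subscriber of the queue.

```sql
/* Enqueue a message for an explicit list of recipients: */
DECLARE
  enqueue_options    dbms_aq.enqueue_options_t;
  message_properties dbms_aq.message_properties_t;
  recipients         dbms_aq.aq$_recipient_list_t;
  message_handle     RAW(16);
  message            aq.message_type;
BEGIN
  message := aq.message_type('MESSAGE 2',
                             'For recipients RED and BLUE only.');
  recipients(1) := sys.aq$_agent('RED', NULL, NULL);
  recipients(2) := sys.aq$_agent('BLUE', NULL, NULL);
  /* This list overrides the queue's subscriber list
     for this message only: */
  message_properties.recipient_list := recipients;
  dbms_aq.enqueue(queue_name         => 'msg_queue_multiple',
                  enqueue_options    => enqueue_options,
                  message_properties => message_properties,
                  payload            => message,
                  msgid              => message_handle);
  COMMIT;
END;
/
```

Each recipient then dequeues the message by setting consumer_name in its dequeue options to its own name.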
Figure 11-4 describes the case in which three consumers are all listed as subscribers of a queue. This is the same as saying that they all subscribe to all the messages that might ever be enqueued into that queue. The drawing illustrates a number of important points:
Figure 11-5 illustrates the same technology from a dynamic perspective. This example concerns a scenario in which more than one application needs the result produced by an application. Every message enqueued by Application A is dequeued by Application B and Application C. To make this possible, the multiple consumer queue is specially configured with Application B and Application C as queue subscribers. Consequently, they are implicit recipients of every message placed in the queue.
Figure 11-6 shows how a message can be specified for one or more recipients. In this case, Message 5 is specified to be dequeued by Recipient-1 and Recipient-2. As described by the drawing, neither of the recipients is one of the 3 subscribers to the queue.
We earlier referred to subscribers as "implicit recipients" in that they are able to dequeue all the messages placed into a specific queue. This is like subscribing to a magazine and thereby implicitly gaining access to all its articles. The category of consumers that we have referred to as recipients may also be viewed as "explicit recipients" in that they are designated targets of particular messages.
Figure 11-7 shows how Oracle AQ can adjust dynamically to accommodate both kinds of consumers. In this scenario Application B and Application C are implicit recipients (subscribers). But messages can also be explicitly directed toward specific consumers (recipients) who may or may not be subscribers to the queue. The list of such recipients is specified in the enqueue call for that message and overrides the list of subscribers for that queue. In the figure, Application D is specified as the sole recipient of a message enqueued by Application A.
Figure 11-8 illustrates the use of AQ for implementing workflows, also known as chained application transactions. It shows a workflow consisting of 4 steps performed by Applications A, B, C and D. The queues are used to buffer the flow of information between different processing stages of the business process. By specifying delay interval and expiration time for a message, a window of execution can be provided for each of the applications.
From a workflow perspective, the passing of messages is a business asset above and beyond the value of the payload data. Hence, AQ supports the optional retention of messages for analysis of historical patterns and prediction of future trends. For instance, two of the three application scenarios at the head of the chapter are founded in an implementation of workflow analysis.
In AQ, message recipients can be either consumers or other queues. If the message recipient is a queue, the actual recipients are determined by the subscribers to the queue (which may in turn be other queues). Thus it is possible to fan-out messages to a large number of recipients without requiring them all to dequeue messages from a single queue.
For example: A queue, Source, may have as its subscribers queues dispatch1@dest1 and dispatch2@dest2. Queue dispatch1@dest1 may in turn have as its subscribers the queues outerreach1@dest3 and outerreach2@dest4, while queue dispatch2@dest2 has as subscribers the queues outerreach3@dest21 and outerreach4@dest4. In this way, messages enqueued in Source will be propagated to all the subscribers of four different queues.
Another use of queues as a message recipient is the ability to combine messages from different queues into a single queue. This process is sometimes described as "compositing".
For example, if queue composite@endpoint is a subscriber to both queues funnel1@source1 and funnel2@source2 then the subscribers to queue composite@endpoint can get all messages enqueued in those queues as well as messages enqueued directly into itself.
Figure 11-9 illustrates applications on different databases communicating via AQ. Each application has an inbox and an outbox for handling incoming and outgoing messages. An application enqueues a message into its outbox irrespective of whether the message has to be sent to an application that is local (on the same node) or remote (on a different node).
Likewise, an application is not concerned as to whether a message originates locally or remotely. In all cases, an application dequeues messages from its inbox.
Oracle AQ facilitates all this interchange, treating messages on the same basis.
Oracle AQ by Example guides users by means of a step-by-step approach.
RAW type messages
RAW type messages using Pro*C/C++
RAW type messages using OCI
/* Create user and grant privileges: */
CONNECT sys/change_on_install as sysdba;
CREATE user aq identified by AQ;
GRANT AQ_ADMINISTRATOR_ROLE TO aq;
GRANT CONNECT TO aq;
GRANT RESOURCE TO aq;
EXECUTE dbms_aqadm.grant_type_access('aq');
CONNECT aq/AQ;
SET ECHO ON;
SET SERVEROUTPUT ON;
/* Create a message type: */
CREATE type aq.message_type as object (
  subject VARCHAR2(30),
  text    VARCHAR2(80));
/
/* Create an object type queue table and queue: */
EXECUTE dbms_aqadm.create_queue_table (queue_table => 'aq.msg', queue_payload_type => 'aq.message_type');
EXECUTE dbms_aqadm.create_queue (queue_name => 'msg_queue', queue_table => 'aq.msg');
EXECUTE dbms_aqadm.start_queue (queue_name => 'msg_queue');
/* Create a RAW type queue table and queue: */
EXECUTE dbms_aqadm.create_queue_table (queue_table => 'aq.raw_msg', queue_payload_type => 'RAW');
EXECUTE dbms_aqadm.create_queue (queue_name => 'raw_msg_queue', queue_table => 'aq.raw_msg');
EXECUTE dbms_aqadm.start_queue (queue_name => 'raw_msg_queue');
/* Create a queue table sorted by priority and enqueue time, and a queue in it: */
EXECUTE dbms_aqadm.create_queue_table (queue_table => 'aq.priority_msg', sort_list => 'PRIORITY,ENQ_TIME', queue_payload_type => 'aq.message_type');
EXECUTE dbms_aqadm.create_queue (queue_name => 'priority_msg_queue', queue_table => 'aq.priority_msg');
EXECUTE dbms_aqadm.start_queue (queue_name => 'priority_msg_queue');
/* Create a multiple-consumer queue table and queue: */
EXECUTE dbms_aqadm.create_queue_table (queue_table => 'aq.msg_multiple', multiple_consumers => TRUE, queue_payload_type => 'aq.message_type');
EXECUTE dbms_aqadm.create_queue (queue_name => 'msg_queue_multiple', queue_table => 'aq.msg_multiple');
EXECUTE dbms_aqadm.start_queue (queue_name => 'msg_queue_multiple');
/* Create a second queue in the same multiple-consumer queue table: */
EXECUTE dbms_aqadm.create_queue (queue_name => 'another_msg_queue', queue_table => 'aq.msg_multiple');
EXECUTE dbms_aqadm.start_queue (queue_name => 'another_msg_queue');
To enqueue a single message without any other parameters specify the queue name and the payload.
/* Enqueue to msg_queue: */
DECLARE
  enqueue_options    dbms_aq.enqueue_options_t;
  message_properties dbms_aq.message_properties_t;
  message_handle     RAW(16);
  message            aq.message_type;
BEGIN
  message := message_type('NORMAL MESSAGE',
                          'enqueued to msg_queue first.');
  dbms_aq.enqueue(queue_name         => 'msg_queue',
                  enqueue_options    => enqueue_options,
                  message_properties => message_properties,
                  payload            => message,
                  msgid              => message_handle);
  COMMIT;
END;
/
/* Dequeue from msg_queue: */
DECLARE
  dequeue_options    dbms_aq.dequeue_options_t;
  message_properties dbms_aq.message_properties_t;
  message_handle     RAW(16);
  message            aq.message_type;
BEGIN
  dbms_aq.dequeue(queue_name         => 'msg_queue',
                  dequeue_options    => dequeue_options,
                  message_properties => message_properties,
                  payload            => message,
                  msgid              => message_handle);
  dbms_output.put_line ('Message: ' || message.subject ||
                        ' ... ' || message.text );
  COMMIT;
END;
/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sqlca.h>
#include <sql2oci.h>
/* The header file generated by processing object type 'aq.message_type': */
#include "pceg.h"

void sql_error(msg)
char *msg;
{
  EXEC SQL WHENEVER SQLERROR CONTINUE;
  printf("%s\n", msg);
  printf("\n%.70s \n", sqlca.sqlerrm.sqlerrmc);
  EXEC SQL ROLLBACK WORK RELEASE;
  exit(1);
}

main()
{
  message_type *message = (message_type*)0; /* payload */
  char user[60]="aq/AQ";                    /* user logon password */
  char subject[30];                         /* components of the */
  char txt[80];                             /* payload type */

  /* ENQUEUE and DEQUEUE to an OBJECT QUEUE */

  /* Connect to database: */
  EXEC SQL CONNECT :user;

  /* On an oracle error print the error number: */
  EXEC SQL WHENEVER SQLERROR DO sql_error("Oracle Error :");

  /* Allocate memory for the host variable from the object cache: */
  EXEC SQL ALLOCATE :message;

  /* ENQUEUE */
  strcpy(subject, "NORMAL ENQUEUE");
  strcpy(txt, "The Enqueue was done through PLSQL embedded in PROC");

  /* Initialize the components of message: */
  EXEC SQL OBJECT SET SUBJECT, TEXT OF :message TO :subject, :txt;

  /* Embedded PLSQL call to the AQ enqueue procedure: */
  EXEC SQL EXECUTE
    DECLARE
      message_properties dbms_aq.message_properties_t;
      enqueue_options    dbms_aq.enqueue_options_t;
      msgid              RAW(16);
    BEGIN
      /* Bind the host variable 'message' to the payload: */
      dbms_aq.enqueue(queue_name         => 'msg_queue',
                      message_properties => message_properties,
                      enqueue_options    => enqueue_options,
                      payload            => :message,
                      msgid              => msgid);
    END;
  END-EXEC;

  /* Commit work: */
  EXEC SQL COMMIT;

  printf("Enqueued Message \n");
  printf("Subject :%s\n",subject);
  printf("Text :%s\n",txt);

  /* Dequeue */

  /* Embedded PLSQL call to the AQ dequeue procedure: */
  EXEC SQL EXECUTE
    DECLARE
      message_properties dbms_aq.message_properties_t;
      dequeue_options    dbms_aq.dequeue_options_t;
      msgid              RAW(16);
    BEGIN
      /* Return the payload into the host variable 'message': */
      dbms_aq.dequeue(queue_name         => 'msg_queue',
                      message_properties => message_properties,
                      dequeue_options    => dequeue_options,
                      payload            => :message,
                      msgid              => msgid);
    END;
  END-EXEC;

  /* Commit work: */
  EXEC SQL COMMIT;

  /* Extract the components of message: */
  EXEC SQL OBJECT GET SUBJECT,TEXT FROM :message INTO :subject,:txt;

  printf("Dequeued Message \n");
  printf("Subject :%s\n",subject);
  printf("Text :%s\n",txt);
}
#include <stdio.h>
#include <string.h>
#ifndef SL_ORACLE
#include <sl.h>
#endif
#ifndef OCI_ORACLE
#include <oci.h>
#endif

struct message
{
  OCIString *subject;
  OCIString *data;
};
typedef struct message message;

struct null_message
{
  OCIInd null_adt;
  OCIInd null_subject;
  OCIInd null_data;
};
typedef struct null_message null_message;

int main()
{
  OCIEnv *envhp;
  OCIServer *srvhp;
  OCIError *errhp;
  OCISvcCtx *svchp;
  dvoid *tmp;
  OCIType *mesg_tdo = (OCIType *) 0;
  message msg;
  null_message nmsg;
  message *mesg = &msg;
  null_message *nmesg = &nmsg;
  message *deqmesg = (message *)0;
  null_message *ndeqmesg = (null_message *)0;

  OCIInitialize((ub4) OCI_OBJECT, (dvoid *)0, (dvoid * (*)()) 0,
                (dvoid * (*)()) 0, (void (*)()) 0 );
  OCIHandleAlloc( (dvoid *) NULL, (dvoid **) &envhp,
                  (ub4) OCI_HTYPE_ENV, 52, (dvoid **) &tmp);
  OCIEnvInit( &envhp, (ub4) OCI_DEFAULT, 21, (dvoid **) &tmp );
  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &errhp,
                  (ub4) OCI_HTYPE_ERROR, 52, (dvoid **) &tmp);
  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &srvhp,
                  (ub4) OCI_HTYPE_SERVER, 52, (dvoid **) &tmp);
  OCIServerAttach( srvhp, errhp, (text *) 0, (sb4) 0, (ub4) OCI_DEFAULT);
  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &svchp,
                  (ub4) OCI_HTYPE_SVCCTX, 52, (dvoid **) &tmp);
  OCIAttrSet( (dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX, (dvoid *)srvhp,
              (ub4) 0, (ub4) OCI_ATTR_SERVER, (OCIError *) errhp);
  OCILogon(envhp, errhp, &svchp, "AQ", strlen("AQ"),
           "AQ", strlen("AQ"), 0, 0);

  /* obtain TDO of message_type */
  OCITypeByName(envhp, errhp, svchp, (CONST text *)"AQ", strlen("AQ"),
                (CONST text *)"MESSAGE_TYPE", strlen("MESSAGE_TYPE"),
                (text *)0, 0, OCI_DURATION_SESSION, OCI_TYPEGET_ALL,
                &mesg_tdo);

  /* prepare the message payload */
  mesg->subject = (OCIString *)0;
  mesg->data = (OCIString *)0;
  OCIStringAssignText(envhp, errhp, (CONST text *)"NORMAL MESSAGE",
                      strlen("NORMAL MESSAGE"), &mesg->subject);
  OCIStringAssignText(envhp, errhp, (CONST text *)"OCI ENQUEUE",
                      strlen("OCI ENQUEUE"), &mesg->data);
  nmesg->null_adt = nmesg->null_subject = nmesg->null_data =
    OCI_IND_NOTNULL;

  /* enqueue into the msg_queue */
  OCIAQEnq(svchp, errhp, (CONST text *)"msg_queue", 0, 0, mesg_tdo,
           (dvoid **)&mesg, (dvoid **)&nmesg, 0, 0);
  OCITransCommit(svchp, errhp, (ub4) 0);

  /* dequeue from the msg_queue */
  OCIAQDeq(svchp, errhp, (CONST text *)"msg_queue", 0, 0, mesg_tdo,
           (dvoid **)&deqmesg, (dvoid **)&ndeqmesg, 0, 0);
  printf("Subject: %s\n", OCIStringPtr(envhp, deqmesg->subject));
  printf("Text: %s\n", OCIStringPtr(envhp, deqmesg->data));
  OCITransCommit(svchp, errhp, (ub4) 0);
}
/* Enqueue a message containing a RAW: */
DECLARE
  enqueue_options    dbms_aq.enqueue_options_t;
  message_properties dbms_aq.message_properties_t;
  message_handle     RAW(16);
  message            RAW(4096);
BEGIN
  message := hextoraw(rpad('FF',4095,'FF'));
  dbms_aq.enqueue(queue_name         => 'raw_msg_queue',
                  enqueue_options    => enqueue_options,
                  message_properties => message_properties,
                  payload            => message,
                  msgid              => message_handle);
  COMMIT;
END;
/
/* Dequeue from raw_msg_queue: */
DECLARE
  dequeue_options    dbms_aq.dequeue_options_t;
  message_properties dbms_aq.message_properties_t;
  message_handle     RAW(16);
  message            RAW(4096);
BEGIN
  dbms_aq.dequeue(queue_name         => 'raw_msg_queue',
                  dequeue_options    => dequeue_options,
                  message_properties => message_properties,
                  payload            => message,
                  msgid              => message_handle);
  COMMIT;
END;
/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sqlca.h>
#include <sql2oci.h>

void sql_error(msg)
char *msg;
{
  EXEC SQL WHENEVER SQLERROR CONTINUE;
  printf("%s\n", msg);
  printf("\n%.70s \n", sqlca.sqlerrm.sqlerrmc);
  EXEC SQL ROLLBACK WORK RELEASE;
  exit(1);
}

main()
{
  OCIEnv *oeh;                   /* OCI Env handle */
  OCIError *err;                 /* OCI Err handle */
  OCIRaw *message= (OCIRaw*)0;   /* payload */
  ub1 message_txt[100];          /* data for payload */
  char user[60]="aq/AQ";         /* user logon password */
  int status;                    /* returns status of the OCI call */

  /* Enqueue and dequeue to a RAW queue */

  /* Connect to database: */
  EXEC SQL CONNECT :user;

  /* On an oracle error print the error number: */
  EXEC SQL WHENEVER SQLERROR DO sql_error("Oracle Error :");

  /* Get the OCI Env handle: */
  if (SQLEnvGet(SQL_SINGLE_RCTX, &oeh) != OCI_SUCCESS)
  {
    printf(" error in SQLEnvGet \n");
    exit(1);
  }

  /* Get the OCI Error handle: */
  if ((status = OCIHandleAlloc((dvoid *)oeh, (dvoid **)&err,
                               (ub4)OCI_HTYPE_ERROR, (ub4)0,
                               (dvoid **)0)) != OCI_SUCCESS)
  {
    printf(" error in OCIHandleAlloc %d \n", status);
    exit(1);
  }

  /* Enqueue */

  /* The bytes to be put into the raw payload (the buffer is zeroed
     first because all 100 bytes are assigned below): */
  memset(message_txt, 0, sizeof(message_txt));
  strcpy((char *)message_txt, "Enqueue to a Raw payload queue ");

  /* Assign bytes to the OCIRaw pointer:
     Memory needs to be allocated explicitly to OCIRaw*: */
  if ((status = OCIRawAssignBytes(oeh, err, message_txt, 100,
                                  &message)) != OCI_SUCCESS)
  {
    printf(" error in OCIRawAssignBytes %d \n", status);
    exit(1);
  }

  /* Embedded PLSQL call to the AQ enqueue procedure: */
  EXEC SQL EXECUTE
    DECLARE
      message_properties dbms_aq.message_properties_t;
      enqueue_options    dbms_aq.enqueue_options_t;
      msgid              RAW(16);
    BEGIN
      /* Bind the host variable message to the raw payload: */
      dbms_aq.enqueue(queue_name         => 'raw_msg_queue',
                      message_properties => message_properties,
                      enqueue_options    => enqueue_options,
                      payload            => :message,
                      msgid              => msgid);
    END;
  END-EXEC;

  /* Commit work: */
  EXEC SQL COMMIT;

  /* Dequeue */

  /* Embedded PLSQL call to the AQ dequeue procedure: */
  EXEC SQL EXECUTE
    DECLARE
      message_properties dbms_aq.message_properties_t;
      dequeue_options    dbms_aq.dequeue_options_t;
      msgid              RAW(16);
    BEGIN
      /* Return the raw payload into the host variable 'message': */
      dbms_aq.dequeue(queue_name         => 'raw_msg_queue',
                      message_properties => message_properties,
                      dequeue_options    => dequeue_options,
                      payload            => :message,
                      msgid              => msgid);
    END;
  END-EXEC;

  /* Commit work: */
  EXEC SQL COMMIT;
}
#include <stdio.h>
#include <string.h>
#ifndef SL_ORACLE
#include <sl.h>
#endif
#ifndef OCI_ORACLE
#include <oci.h>
#endif

int main()
{
  OCIEnv *envhp;
  OCIServer *srvhp;
  OCIError *errhp;
  OCISvcCtx *svchp;
  dvoid *tmp;
  OCIType *mesg_tdo = (OCIType *) 0;
  char msg_text[100];
  OCIRaw *mesg = (OCIRaw *)0;
  OCIRaw *deqmesg = (OCIRaw *)0;
  OCIInd ind = 0;
  dvoid *indptr = (dvoid *)&ind;
  int i;

  OCIInitialize((ub4) OCI_OBJECT, (dvoid *)0, (dvoid * (*)()) 0,
                (dvoid * (*)()) 0, (void (*)()) 0 );
  OCIHandleAlloc( (dvoid *) NULL, (dvoid **) &envhp,
                  (ub4) OCI_HTYPE_ENV, 52, (dvoid **) &tmp);
  OCIEnvInit( &envhp, (ub4) OCI_DEFAULT, 21, (dvoid **) &tmp );
  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &errhp,
                  (ub4) OCI_HTYPE_ERROR, 52, (dvoid **) &tmp);
  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &srvhp,
                  (ub4) OCI_HTYPE_SERVER, 52, (dvoid **) &tmp);
  OCIServerAttach( srvhp, errhp, (text *) 0, (sb4) 0, (ub4) OCI_DEFAULT);
  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &svchp,
                  (ub4) OCI_HTYPE_SVCCTX, 52, (dvoid **) &tmp);
  OCIAttrSet( (dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX, (dvoid *)srvhp,
              (ub4) 0, (ub4) OCI_ATTR_SERVER, (OCIError *) errhp);
  OCILogon(envhp, errhp, &svchp, "AQ", strlen("AQ"),
           "AQ", strlen("AQ"), 0, 0);

  /* obtain the TDO of the RAW data type */
  OCITypeByName(envhp, errhp, svchp, (CONST text *)"SYS", strlen("SYS"),
                (CONST text *)"RAW", strlen("RAW"),
                (text *)0, 0, OCI_DURATION_SESSION, OCI_TYPEGET_ALL,
                &mesg_tdo);

  /* prepare the message payload */
  strcpy(msg_text, "Enqueue to a RAW queue");
  OCIRawAssignBytes(envhp, errhp, (ub1 *)msg_text, strlen(msg_text),
                    &mesg);

  /* enqueue the message into raw_msg_queue */
  OCIAQEnq(svchp, errhp, (CONST text *)"raw_msg_queue", 0, 0, mesg_tdo,
           (dvoid **)&mesg, (dvoid **)&indptr, 0, 0);
  OCITransCommit(svchp, errhp, (ub4) 0);

  /* dequeue the same message into C variable deqmesg */
  OCIAQDeq(svchp, errhp, (CONST text *)"raw_msg_queue", 0, 0, mesg_tdo,
           (dvoid **)&deqmesg, (dvoid **)&indptr, 0, 0);
  for (i = 0; i < OCIRawSize(envhp, deqmesg); i++)
    printf("%c", *(OCIRawPtr(envhp, deqmesg) + i));
  OCITransCommit(svchp, errhp, (ub4) 0);
}
When two messages are enqueued with the same priority, the message that was enqueued earlier will be dequeued first. However, if two messages have different priorities, the message with the lower value (higher priority) will be dequeued first.
/* Enqueue two messages with priority 30 and 5: */
DECLARE
  enqueue_options    dbms_aq.enqueue_options_t;
  message_properties dbms_aq.message_properties_t;
  message_handle     RAW(16);
  message            aq.message_type;
BEGIN
  message := message_type('PRIORITY MESSAGE', 'Enqueued at priority 30.');
  message_properties.priority := 30;
  dbms_aq.enqueue(queue_name         => 'priority_msg_queue',
                  enqueue_options    => enqueue_options,
                  message_properties => message_properties,
                  payload            => message,
                  msgid              => message_handle);

  message := message_type('PRIORITY MESSAGE', 'Enqueued at priority 5.');
  message_properties.priority := 5;
  dbms_aq.enqueue(queue_name         => 'priority_msg_queue',
                  enqueue_options    => enqueue_options,
                  message_properties => message_properties,
                  payload            => message,
                  msgid              => message_handle);
END;
/

/* Dequeue from priority queue: */
DECLARE
  dequeue_options    dbms_aq.dequeue_options_t;
  message_properties dbms_aq.message_properties_t;
  message_handle     RAW(16);
  message            aq.message_type;
BEGIN
  dbms_aq.dequeue(queue_name         => 'priority_msg_queue',
                  dequeue_options    => dequeue_options,
                  message_properties => message_properties,
                  payload            => message,
                  msgid              => message_handle);
  dbms_output.put_line ('Message: ' || message.subject ||
                        ' ... ' || message.text );
  COMMIT;

  dbms_aq.dequeue(queue_name         => 'priority_msg_queue',
                  dequeue_options    => dequeue_options,
                  message_properties => message_properties,
                  payload            => message,
                  msgid              => message_handle);
  dbms_output.put_line ('Message: ' || message.subject ||
                        ' ... ' || message.text );
  COMMIT;
END;
/

On return, the second message, with priority set to 5, will be retrieved before the message with priority set to 30, since priority takes precedence over enqueue time.
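The ordering rule can also be modelled outside the database. The following C sketch is illustrative only: `demo_msg`, `demo_msg_cmp` and `demo_dequeue_order` are hypothetical names that are not part of AQ or OCI, and the sketch assumes a queue that is sorted by priority first and by enqueue time second, as in the priority queue example.

```c
#include <stdlib.h>

/* Hypothetical in-memory stand-in for a queued message; not an AQ structure. */
typedef struct {
    int         priority;     /* lower value = higher priority */
    long        enqueue_seq;  /* grows with enqueue time */
    const char *text;
} demo_msg;

/* qsort comparator: compare by priority first, then by enqueue order. */
static int demo_msg_cmp(const void *a, const void *b)
{
    const demo_msg *x = (const demo_msg *)a;
    const demo_msg *y = (const demo_msg *)b;

    if (x->priority != y->priority)
        return (x->priority < y->priority) ? -1 : 1;
    if (x->enqueue_seq != y->enqueue_seq)
        return (x->enqueue_seq < y->enqueue_seq) ? -1 : 1;
    return 0;
}

/* Rearrange msgs into the order in which they would be dequeued. */
static void demo_dequeue_order(demo_msg *msgs, size_t n)
{
    qsort(msgs, n, sizeof msgs[0], demo_msg_cmp);
}
```

Sorting an array that holds a priority 30 message followed by a priority 5 message places the priority 5 message first, which mirrors the dequeue order described for `priority_msg_queue`.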
An application can preview messages in browse mode or locked mode without deleting the message. The message of interest can then be removed from the queue.
/* Enqueue 6 messages to msg_queue - GREEN, GREEN, YELLOW, VIOLET, BLUE, RED */
DECLARE
  enqueue_options    dbms_aq.enqueue_options_t;
  message_properties dbms_aq.message_properties_t;
  message_handle     RAW(16);
  message            aq.message_type;
BEGIN
  message := message_type('GREEN', 'GREEN enqueued to msg_queue first.');
  dbms_aq.enqueue(queue_name         => 'msg_queue',
                  enqueue_options    => enqueue_options,
                  message_properties => message_properties,
                  payload            => message,
                  msgid              => message_handle);

  message := message_type('GREEN', 'GREEN also enqueued to msg_queue second.');
  dbms_aq.enqueue(queue_name         => 'msg_queue',
                  enqueue_options    => enqueue_options,
                  message_properties => message_properties,
                  payload            => message,
                  msgid              => message_handle);

  message := message_type('YELLOW', 'YELLOW enqueued to msg_queue third.');
  dbms_aq.enqueue(queue_name         => 'msg_queue',
                  enqueue_options    => enqueue_options,
                  message_properties => message_properties,
                  payload            => message,
                  msgid              => message_handle);
  dbms_output.put_line ('Message handle: ' || message_handle);

  message := message_type('VIOLET', 'VIOLET enqueued to msg_queue fourth.');
  dbms_aq.enqueue(queue_name         => 'msg_queue',
                  enqueue_options    => enqueue_options,
                  message_properties => message_properties,
                  payload            => message,
                  msgid              => message_handle);

  message := message_type('BLUE', 'BLUE enqueued to msg_queue fifth.');
  dbms_aq.enqueue(queue_name         => 'msg_queue',
                  enqueue_options    => enqueue_options,
                  message_properties => message_properties,
                  payload            => message,
                  msgid              => message_handle);

  message := message_type('RED', 'RED enqueued to msg_queue sixth.');
  dbms_aq.enqueue(queue_name         => 'msg_queue',
                  enqueue_options    => enqueue_options,
                  message_properties => message_properties,
                  payload            => message,
                  msgid              => message_handle);
  COMMIT;
END;
/

/* Dequeue in BROWSE mode until RED is found, and remove RED from queue: */
DECLARE
  dequeue_options    dbms_aq.dequeue_options_t;
  message_properties dbms_aq.message_properties_t;
  message_handle     RAW(16);
  message            aq.message_type;
BEGIN
  dequeue_options.dequeue_mode := dbms_aq.BROWSE;
  LOOP
    dbms_aq.dequeue(queue_name         => 'msg_queue',
                    dequeue_options    => dequeue_options,
                    message_properties => message_properties,
                    payload            => message,
                    msgid              => message_handle);
    dbms_output.put_line ('Message: ' || message.subject ||
                          ' ... ' || message.text );
    EXIT WHEN message.subject = 'RED';
  END LOOP;

  dequeue_options.dequeue_mode := dbms_aq.REMOVE;
  dequeue_options.msgid := message_handle;
  dbms_aq.dequeue(queue_name         => 'msg_queue',
                  dequeue_options    => dequeue_options,
                  message_properties => message_properties,
                  payload            => message,
                  msgid              => message_handle);
  dbms_output.put_line ('Message: ' || message.subject ||
                        ' ... ' || message.text );
  COMMIT;
END;
/

/* Dequeue in LOCKED mode until BLUE is found, and remove BLUE from queue: */
DECLARE
  dequeue_options    dbms_aq.dequeue_options_t;
  message_properties dbms_aq.message_properties_t;
  message_handle     RAW(16);
  message            aq.message_type;
BEGIN
  dequeue_options.dequeue_mode := dbms_aq.LOCKED;
  LOOP
    dbms_aq.dequeue(queue_name         => 'msg_queue',
                    dequeue_options    => dequeue_options,
                    message_properties => message_properties,
                    payload            => message,
                    msgid              => message_handle);
    dbms_output.put_line ('Message: ' || message.subject ||
                          ' ... ' || message.text );
    EXIT WHEN message.subject = 'BLUE';
  END LOOP;

  dequeue_options.dequeue_mode := dbms_aq.REMOVE;
  dequeue_options.msgid := message_handle;
  dbms_aq.dequeue(queue_name         => 'msg_queue',
                  dequeue_options    => dequeue_options,
                  message_properties => message_properties,
                  payload            => message,
                  msgid              => message_handle);
  dbms_output.put_line ('Message: ' || message.subject ||
                        ' ... ' || message.text );
  COMMIT;
END;
/
An enqueue can specify the time before which a message cannot be retrieved by a dequeue call. To do this, the producer (i.e., the agent enqueuing the message) can use the parameter "delay" when enqueuing the message. The producer can also specify the time when a message expires, at which time the message is moved to an exception queue.
/* Enqueue message for delayed availability: */
DECLARE
  enqueue_options    dbms_aq.enqueue_options_t;
  message_properties dbms_aq.message_properties_t;
  message_handle     RAW(16);
  message            aq.message_type;
BEGIN
  message := message_type('DELAYED', 'This message is delayed one week.');
  message_properties.delay := 7*24*60*60;
  message_properties.expiration := 2*7*24*60*60;
  dbms_aq.enqueue(queue_name         => 'msg_queue',
                  enqueue_options    => enqueue_options,
                  message_properties => message_properties,
                  payload            => message,
                  msgid              => message_handle);
  COMMIT;
END;
/
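The delay and expiration values in the example are given in seconds: 7*24*60*60 is 604800 seconds (one week) and 2*7*24*60*60 is 1209600 seconds (two weeks). The C sketch below models how such a message could be classified over time. It is not AQ code: the names `demo_state` and `demo_message_state` are hypothetical, and the sketch assumes that the expiration period starts counting once the delay has elapsed.

```c
/* Hypothetical state names for illustration; not the DBMS_AQ constants. */
typedef enum { DEMO_WAITING, DEMO_READY, DEMO_EXPIRED } demo_state;

#define DEMO_SECONDS_PER_DAY (24L * 60L * 60L)

/*
 * Classify a message 'elapsed' seconds after it was enqueued.
 * Assumption: expiration is counted from the end of the delay period.
 */
static demo_state demo_message_state(long delay, long expiration, long elapsed)
{
    if (elapsed < delay)
        return DEMO_WAITING;      /* not yet visible to dequeue calls */
    if (elapsed >= delay + expiration)
        return DEMO_EXPIRED;      /* candidate for the exception queue */
    return DEMO_READY;            /* available for dequeue */
}
```

Under this assumption, a message enqueued with a one-week delay and a two-week expiration is waiting during the first week, ready for the following two weeks, and expired from day 21 onward.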
#include <stdio.h>
#include <string.h>
#include <sqlca.h>
#include <sql2oci.h>
/* The header file generated by processing object type 'aq.message_type': */
#include "pceg.h"
void sql_error(msg)
char *msg;
{
EXEC SQL WHENEVER SQLERROR CONTINUE;
printf("%s\n", msg);
printf("\n% .800s \n", sqlca.sqlerrm.sqlerrmc);
EXEC SQL ROLLBACK WORK RELEASE;
exit(1);
}
main()
{
OCIEnv *oeh; /* OCI Env Handle */
OCIError *err; /* OCI Error Handle */
message_type *message = (message_type*)0; /* queue payload */
OCIRaw *msgid = (OCIRaw*)0; /* message id */
ub1 msgmem[16]=""; /* memory for msgid */
char user[60]="aq/AQ"; /* user login password */
char subject[30]; /* components of */
char txt[80]; /* message_type */
char correlation1[30]; /* message correlation */
char correlation2[30];
int status; /* code returned by the OCI calls */
/* Dequeue by correlation and msgid */
/* Connect to the database: */
EXEC SQL CONNECT :user;
EXEC SQL WHENEVER SQLERROR DO sql_error("Oracle Error :");
/* Allocate space in the object cache for the host variable: */
EXEC SQL ALLOCATE :message;
/* Get the OCI Env handle: */
if (SQLEnvGet(SQL_SINGLE_RCTX, &oeh) != OCI_SUCCESS)
{
printf(" error in SQLEnvGet \n");
exit(1);
}
/* Get the OCI Error handle: */
if (status = OCIHandleAlloc((dvoid *)oeh, (dvoid **)&err,
(ub4)OCI_HTYPE_ERROR, (ub4)0, (dvoid **)0))
{
printf(" error in OCIHandleAlloc %d \n", status);
exit(1);
}
/* Assign memory for msgid: memory needs to be allocated explicitly to OCIRaw*: */
if (status=OCIRawAssignBytes(oeh, err, msgmem, 16, &msgid))
{
printf(" error in OCIRawAssignBytes %d \n", status);
exit(1);
}
/* First enqueue */
strcpy(correlation1, "1st message");
strcpy(subject, "NORMAL ENQUEUE1");
strcpy(txt, "The Enqueue was done through PLSQL embedded in PROC");
/* Initialize the components of message: */
EXEC SQL OBJECT SET SUBJECT, TEXT OF :message TO :subject, :txt;
/* Embedded PLSQL call to the AQ enqueue procedure: */
EXEC SQL EXECUTE
DECLARE
message_properties dbms_aq.message_properties_t;
enqueue_options dbms_aq.enqueue_options_t;
BEGIN
/* Bind the host variable 'correlation1' to message correlation: */
message_properties.correlation := :correlation1;
/* Bind the host variable 'message' to payload and return message id into host variable 'msgid': */
dbms_aq.enqueue(queue_name => 'msg_queue',
message_properties => message_properties,
enqueue_options => enqueue_options,
payload => :message,
msgid => :msgid);
END;
END-EXEC;
/* Commit work: */
EXEC SQL COMMIT;
printf("Enqueued Message \n");
printf("Subject :%s\n",subject);
printf("Text :%s\n",txt);
/* Second enqueue */
strcpy(correlation2, "2nd message");
strcpy(subject, "NORMAL ENQUEUE2");
strcpy(txt, "The Enqueue was done through PLSQL embedded in PROC");
/* Initialize the components of message: */
EXEC SQL OBJECT SET SUBJECT, TEXT OF :message TO :subject,:txt;
/* Embedded PLSQL call to the AQ enqueue procedure: */
EXEC SQL EXECUTE
DECLARE
message_properties dbms_aq.message_properties_t;
enqueue_options dbms_aq.enqueue_options_t;
msgid RAW(16);
BEGIN
/* Bind the host variable 'correlation2' to message correlation: */
message_properties.correlation := :correlation2;
/* Bind the host variable 'message' to payload: */
dbms_aq.enqueue(queue_name => 'msg_queue',
message_properties => message_properties,
enqueue_options => enqueue_options,
payload => :message,
msgid => msgid);
END;
END-EXEC;
/* Commit work: */
EXEC SQL COMMIT;
printf("Enqueued Message \n");
printf("Subject :%s\n",subject);
printf("Text :%s\n",txt);
/* First dequeue - by correlation */
EXEC SQL EXECUTE
DECLARE
message_properties dbms_aq.message_properties_t;
dequeue_options dbms_aq.dequeue_options_t;
msgid RAW(16);
BEGIN
/* Dequeue by correlation in host variable 'correlation2': */
dequeue_options.correlation := :correlation2;
/* Return the payload into host variable 'message': */
dbms_aq.dequeue(queue_name => 'msg_queue',
message_properties => message_properties,
dequeue_options => dequeue_options,
payload => :message,
msgid => msgid);
END;
END-EXEC;
/* Commit work: */
EXEC SQL COMMIT;
/* Extract the values of the components of message: */
EXEC SQL OBJECT GET SUBJECT, TEXT FROM :message INTO :subject,:txt;
printf("Dequeued Message \n");
printf("Subject :%s\n",subject);
printf("Text :%s\n",txt);
/* Second dequeue - by msgid */
EXEC SQL EXECUTE
DECLARE
message_properties dbms_aq.message_properties_t;
dequeue_options dbms_aq.dequeue_options_t;
msgid RAW(16);
BEGIN
/* Dequeue by msgid in host variable 'msgid': */
dequeue_options.msgid := :msgid;
/* Return the payload into host variable 'message': */
dbms_aq.dequeue(queue_name => 'msg_queue',
message_properties => message_properties,
dequeue_options => dequeue_options,
payload => :message,
msgid => msgid);
END;
END-EXEC;
/* Commit work: */
EXEC SQL COMMIT;
}
#ifndef SL_ORACLE
#include <sl.h>
#endif
#ifndef OCI_ORACLE
#include <oci.h>
#endif

struct message
{
  OCIString *subject;
  OCIString *data;
};
typedef struct message message;

struct null_message
{
  OCIInd null_adt;
  OCIInd null_subject;
  OCIInd null_data;
};
typedef struct null_message null_message;

int main()
{
  OCIEnv *envhp;
  OCIServer *srvhp;
  OCIError *errhp;
  OCISvcCtx *svchp;
  dvoid *tmp;
  OCIType *mesg_tdo = (OCIType *) 0;
  message msg;
  null_message nmsg;
  message *mesg = &msg;
  null_message *nmesg = &nmsg;
  message *deqmesg = (message *)0;
  null_message *ndeqmesg = (null_message *)0;

  OCIInitialize((ub4) OCI_OBJECT, (dvoid *)0, (dvoid * (*)()) 0,
                (dvoid * (*)()) 0, (void (*)()) 0 );
  OCIHandleAlloc( (dvoid *) NULL, (dvoid **) &envhp, (ub4) OCI_HTYPE_ENV,
                  52, (dvoid **) &tmp);
  OCIEnvInit( &envhp, (ub4) OCI_DEFAULT, 21, (dvoid **) &tmp );
  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &errhp, (ub4) OCI_HTYPE_ERROR,
                  52, (dvoid **) &tmp);
  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &srvhp, (ub4) OCI_HTYPE_SERVER,
                  52, (dvoid **) &tmp);
  OCIServerAttach( srvhp, errhp, (text *) 0, (sb4) 0, (ub4) OCI_DEFAULT);
  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &svchp, (ub4) OCI_HTYPE_SVCCTX,
                  52, (dvoid **) &tmp);
  OCIAttrSet( (dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX, (dvoid *)srvhp, (ub4) 0,
              (ub4) OCI_ATTR_SERVER, (OCIError *) errhp);
  OCILogon(envhp, errhp, &svchp, "AQ", strlen("AQ"), "AQ", strlen("AQ"), 0, 0);

  /* obtain TDO of message_type */
  OCITypeByName(envhp, errhp, svchp, (CONST text *)"AQ", strlen("AQ"),
                (CONST text *)"MESSAGE_TYPE", strlen("MESSAGE_TYPE"),
                (text *)0, 0, OCI_DURATION_SESSION, OCI_TYPEGET_ALL, &mesg_tdo);

  /* prepare the message payload */
  mesg->subject = (OCIString *)0;
  mesg->data = (OCIString *)0;
  OCIStringAssignText(envhp, errhp, (CONST text *)"NORMAL MESSAGE",
                      strlen("NORMAL MESSAGE"), &mesg->subject);
  OCIStringAssignText(envhp, errhp, (CONST text *)"OCI ENQUEUE",
                      strlen("OCI ENQUEUE"), &mesg->data);
  nmesg->null_adt = nmesg->null_subject = nmesg->null_data = OCI_IND_NOTNULL;

  /* enqueue into the msg_queue */
  OCIAQEnq(svchp, errhp, (CONST text *)"msg_queue", 0, 0, mesg_tdo,
           (dvoid **)&mesg, (dvoid **)&nmesg, 0, 0);
  OCITransCommit(svchp, errhp, (ub4) 0);

  /* dequeue from the msg_queue */
  OCIAQDeq(svchp, errhp, (CONST text *)"msg_queue", 0, 0, mesg_tdo,
           (dvoid **)&deqmesg, (dvoid **)&ndeqmesg, 0, 0);
  printf("Subject: %s\n", OCIStringPtr(envhp, deqmesg->subject));
  printf("Text: %s\n", OCIStringPtr(envhp, deqmesg->data));
  OCITransCommit(svchp, errhp, (ub4) 0);
}
/* Create subscriber list: */
DECLARE
  subscriber sys.aq$_agent;

/* Add subscribers RED and GREEN to the subscriber list: */
BEGIN
  subscriber := sys.aq$_agent('RED', NULL, NULL);
  dbms_aqadm.add_subscriber(queue_name => 'msg_queue_multiple',
                            subscriber => subscriber);
  subscriber := sys.aq$_agent('GREEN', NULL, NULL);
  dbms_aqadm.add_subscriber(queue_name => 'msg_queue_multiple',
                            subscriber => subscriber);
END;
/

DECLARE
  enqueue_options    dbms_aq.enqueue_options_t;
  message_properties dbms_aq.message_properties_t;
  recipients         dbms_aq.aq$_recipient_list_t;
  message_handle     RAW(16);
  message            aq.message_type;

/* Enqueue MESSAGE 1 for subscribers to the queue i.e. for RED and GREEN: */
BEGIN
  message := message_type('MESSAGE 1',
                          'This message is queued for queue subscribers.');
  dbms_aq.enqueue(queue_name         => 'msg_queue_multiple',
                  enqueue_options    => enqueue_options,
                  message_properties => message_properties,
                  payload            => message,
                  msgid              => message_handle);

  /* Enqueue MESSAGE 2 for specified recipients i.e. for RED and BLUE. */
  message := message_type('MESSAGE 2',
                          'This message is queued for two recipients.');
  recipients(1) := sys.aq$_agent('RED', NULL, NULL);
  recipients(2) := sys.aq$_agent('BLUE', NULL, NULL);
  message_properties.recipient_list := recipients;
  dbms_aq.enqueue(queue_name         => 'msg_queue_multiple',
                  enqueue_options    => enqueue_options,
                  message_properties => message_properties,
                  payload            => message,
                  msgid              => message_handle);
  COMMIT;
END;
/
Note that RED is both a subscriber to the queue and a specified recipient of MESSAGE 2. By contrast, GREEN is only a subscriber, and so receives only those messages in the queue (in this case, MESSAGE 1) for which no recipients have been specified. BLUE, while not a subscriber to the queue, is nevertheless specified to receive MESSAGE 2.
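The rule in this note can be written as a small predicate. The C sketch below is illustrative only and does not call AQ: `demo_in_list` and `demo_receives` are hypothetical helpers that model the behavior described here, in which an explicit recipient list overrides the queue's subscriber list.

```c
#include <string.h>

/* Return 1 if 'name' appears in the first n entries of 'list', else 0. */
static int demo_in_list(const char *name, const char *const *list, size_t n)
{
    size_t i;

    for (i = 0; i < n; i++)
        if (strcmp(name, list[i]) == 0)
            return 1;
    return 0;
}

/*
 * Decide whether 'consumer' is entitled to a message.
 * If the message carries a recipient list, only those recipients qualify;
 * otherwise the queue's subscribers qualify.
 */
static int demo_receives(const char *consumer,
                         const char *const *recipients, size_t n_recipients,
                         const char *const *subscribers, size_t n_subscribers)
{
    if (n_recipients > 0)
        return demo_in_list(consumer, recipients, n_recipients);
    return demo_in_list(consumer, subscribers, n_subscribers);
}
```

With subscribers RED and GREEN and a recipient list of RED and BLUE on MESSAGE 2, the predicate gives RED both messages, GREEN only MESSAGE 1, and BLUE only MESSAGE 2.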
/* Dequeue messages from msg_queue_multiple: */
DECLARE
  dequeue_options    dbms_aq.dequeue_options_t;
  message_properties dbms_aq.message_properties_t;
  message_handle     RAW(16);
  message            aq.message_type;
  no_messages        exception;
  pragma exception_init (no_messages, -25228);
BEGIN
  dequeue_options.wait := dbms_aq.NO_WAIT;

  BEGIN
    /* Consumer BLUE will get MESSAGE 2: */
    dequeue_options.consumer_name := 'BLUE';
    LOOP
      dbms_aq.dequeue(queue_name         => 'msg_queue_multiple',
                      dequeue_options    => dequeue_options,
                      message_properties => message_properties,
                      payload            => message,
                      msgid              => message_handle);
      dbms_output.put_line ('Message: ' || message.subject ||
                            ' ... ' || message.text );
    END LOOP;
  EXCEPTION
    WHEN no_messages THEN
      dbms_output.put_line ('No more messages for BLUE');
      COMMIT;
  END;

  BEGIN
    /* Consumer RED will get MESSAGE 1 and MESSAGE 2: */
    dequeue_options.consumer_name := 'RED';
    LOOP
      dbms_aq.dequeue(queue_name         => 'msg_queue_multiple',
                      dequeue_options    => dequeue_options,
                      message_properties => message_properties,
                      payload            => message,
                      msgid              => message_handle);
      dbms_output.put_line ('Message: ' || message.subject ||
                            ' ... ' || message.text );
    END LOOP;
  EXCEPTION
    WHEN no_messages THEN
      dbms_output.put_line ('No more messages for RED');
      COMMIT;
  END;

  BEGIN
    /* Consumer GREEN will get MESSAGE 1: */
    dequeue_options.consumer_name := 'GREEN';
    LOOP
      dbms_aq.dequeue(queue_name         => 'msg_queue_multiple',
                      dequeue_options    => dequeue_options,
                      message_properties => message_properties,
                      payload            => message,
                      msgid              => message_handle);
      dbms_output.put_line ('Message: ' || message.subject ||
                            ' ... ' || message.text );
    END LOOP;
  EXCEPTION
    WHEN no_messages THEN
      dbms_output.put_line ('No more messages for GREEN');
      COMMIT;
  END;
END;
/
#ifndef SL_ORACLE
#include <sl.h>
#endif
#ifndef OCI_ORACLE
#include <oci.h>
#endif

struct message
{
  OCIString *subject;
  OCIString *data;
};
typedef struct message message;

struct null_message
{
  OCIInd null_adt;
  OCIInd null_subject;
  OCIInd null_data;
};
typedef struct null_message null_message;

int main()
{
  OCIEnv *envhp;
  OCIServer *srvhp;
  OCIError *errhp;
  OCISvcCtx *svchp;
  dvoid *tmp;
  OCIType *mesg_tdo = (OCIType *) 0;
  message msg;
  null_message nmsg;
  message *mesg = &msg;
  null_message *nmesg = &nmsg;
  message *deqmesg = (message *)0;
  null_message *ndeqmesg = (null_message *)0;
  OCIAQMsgProperties *msgprop = (OCIAQMsgProperties *)0;
  OCIAQAgent *agents[2];
  OCIAQDeqOptions *deqopt = (OCIAQDeqOptions *)0;
  ub4 wait = OCI_DEQ_NO_WAIT;
  ub4 navigation = OCI_DEQ_FIRST_MSG;

  OCIInitialize((ub4) OCI_OBJECT, (dvoid *)0, (dvoid * (*)()) 0,
                (dvoid * (*)()) 0, (void (*)()) 0 );
  OCIHandleAlloc( (dvoid *) NULL, (dvoid **) &envhp, (ub4) OCI_HTYPE_ENV,
                  52, (dvoid **) &tmp);
  OCIEnvInit( &envhp, (ub4) OCI_DEFAULT, 21, (dvoid **) &tmp );
  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &errhp, (ub4) OCI_HTYPE_ERROR,
                  52, (dvoid **) &tmp);
  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &srvhp, (ub4) OCI_HTYPE_SERVER,
                  52, (dvoid **) &tmp);
  OCIServerAttach( srvhp, errhp, (text *) 0, (sb4) 0, (ub4) OCI_DEFAULT);
  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &svchp, (ub4) OCI_HTYPE_SVCCTX,
                  52, (dvoid **) &tmp);
  OCIAttrSet( (dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX, (dvoid *)srvhp, (ub4) 0,
              (ub4) OCI_ATTR_SERVER, (OCIError *) errhp);
  OCILogon(envhp, errhp, &svchp, "AQ", strlen("AQ"), "AQ", strlen("AQ"), 0, 0);

  /* obtain TDO of message_type */
  OCITypeByName(envhp, errhp, svchp, (CONST text *)"AQ", strlen("AQ"),
                (CONST text *)"MESSAGE_TYPE", strlen("MESSAGE_TYPE"),
                (text *)0, 0, OCI_DURATION_SESSION, OCI_TYPEGET_ALL, &mesg_tdo);

  /* prepare the message payload */
  mesg->subject = (OCIString *)0;
  mesg->data = (OCIString *)0;
  OCIStringAssignText(envhp, errhp, (CONST text *)"MESSAGE 1",
                      strlen("MESSAGE 1"), &mesg->subject);
  OCIStringAssignText(envhp, errhp, (CONST text *)"mesg for queue subscribers",
                      strlen("mesg for queue subscribers"), &mesg->data);
  nmesg->null_adt = nmesg->null_subject = nmesg->null_data = OCI_IND_NOTNULL;

  /* enqueue MESSAGE 1 for subscribers to the queue i.e. for RED and GREEN */
  OCIAQEnq(svchp, errhp, (CONST text *)"msg_queue_multiple", 0, 0, mesg_tdo,
           (dvoid **)&mesg, (dvoid **)&nmesg, 0, 0);

  /* enqueue MESSAGE 2 for specified recipients i.e. for RED and BLUE */
  /* prepare message payload */
  OCIStringAssignText(envhp, errhp, (CONST text *)"MESSAGE 2",
                      strlen("MESSAGE 2"), &mesg->subject);
  OCIStringAssignText(envhp, errhp, (CONST text *)"mesg for two recipients",
                      strlen("mesg for two recipients"), &mesg->data);

  /* allocate AQ message properties and agent descriptors */
  OCIDescriptorAlloc(envhp, (dvoid **)&msgprop, OCI_DTYPE_AQMSG_PROPERTIES,
                     0, (dvoid **)0);
  OCIDescriptorAlloc(envhp, (dvoid **)&agents[0], OCI_DTYPE_AQAGENT,
                     0, (dvoid **)0);
  OCIDescriptorAlloc(envhp, (dvoid **)&agents[1], OCI_DTYPE_AQAGENT,
                     0, (dvoid **)0);

  /* prepare the recipient list, RED and BLUE */
  OCIAttrSet(agents[0], OCI_DTYPE_AQAGENT, "RED", strlen("RED"),
             OCI_ATTR_AGENT_NAME, errhp);
  OCIAttrSet(agents[1], OCI_DTYPE_AQAGENT, "BLUE", strlen("BLUE"),
             OCI_ATTR_AGENT_NAME, errhp);
  OCIAttrSet(msgprop, OCI_DTYPE_AQMSG_PROPERTIES, (dvoid *)agents, 2,
             OCI_ATTR_RECIPIENT_LIST, errhp);
  OCIAQEnq(svchp, errhp, (CONST text *)"msg_queue_multiple", 0, msgprop,
           mesg_tdo, (dvoid **)&mesg, (dvoid **)&nmesg, 0, 0);
  OCITransCommit(svchp, errhp, (ub4) 0);

  /* now dequeue the messages using different consumer names */
  /* allocate dequeue options descriptor to set the dequeue options */
  OCIDescriptorAlloc(envhp, (dvoid **)&deqopt, OCI_DTYPE_AQDEQ_OPTIONS,
                     0, (dvoid **)0);

  /* set wait parameter to NO_WAIT so that the dequeue returns immediately */
  OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *)&wait, 0,
             OCI_ATTR_WAIT, errhp);

  /* set navigation to FIRST_MESSAGE so that the dequeue resets the position */
  /* after a new consumer_name is set in the dequeue options */
  OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *)&navigation, 0,
             OCI_ATTR_NAVIGATION, errhp);

  /* dequeue from the msg_queue_multiple as consumer BLUE */
  OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *)"BLUE", strlen("BLUE"),
             OCI_ATTR_CONSUMER_NAME, errhp);
  while (OCIAQDeq(svchp, errhp, (CONST text *)"msg_queue_multiple", deqopt, 0,
                  mesg_tdo, (dvoid **)&deqmesg, (dvoid **)&ndeqmesg, 0, 0)
         == OCI_SUCCESS)
  {
    printf("Subject: %s\n", OCIStringPtr(envhp, deqmesg->subject));
    printf("Text: %s\n", OCIStringPtr(envhp, deqmesg->data));
  }
  OCITransCommit(svchp, errhp, (ub4) 0);

  /* dequeue from the msg_queue_multiple as consumer RED */
  OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *)"RED", strlen("RED"),
             OCI_ATTR_CONSUMER_NAME, errhp);
  while (OCIAQDeq(svchp, errhp, (CONST text *)"msg_queue_multiple", deqopt, 0,
                  mesg_tdo, (dvoid **)&deqmesg, (dvoid **)&ndeqmesg, 0, 0)
         == OCI_SUCCESS)
  {
    printf("Subject: %s\n", OCIStringPtr(envhp, deqmesg->subject));
    printf("Text: %s\n", OCIStringPtr(envhp, deqmesg->data));
  }
  OCITransCommit(svchp, errhp, (ub4) 0);

  /* dequeue from the msg_queue_multiple as consumer GREEN */
  OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *)"GREEN", strlen("GREEN"),
             OCI_ATTR_CONSUMER_NAME, errhp);
  while (OCIAQDeq(svchp, errhp, (CONST text *)"msg_queue_multiple", deqopt, 0,
                  mesg_tdo, (dvoid **)&deqmesg, (dvoid **)&ndeqmesg, 0, 0)
         == OCI_SUCCESS)
  {
    printf("Subject: %s\n", OCIStringPtr(envhp, deqmesg->subject));
    printf("Text: %s\n", OCIStringPtr(envhp, deqmesg->data));
  }
  OCITransCommit(svchp, errhp, (ub4) 0);
}
/* Create subscriber list: */
DECLARE
  subscriber sys.aq$_agent;

/* Add subscribers RED and GREEN with different addresses to the
   subscriber list: */
BEGIN
  /* Add subscriber RED that will dequeue messages from the
     another_msg_queue queue in the same database */
  subscriber := sys.aq$_agent('RED', 'another_msg_queue', NULL);
  dbms_aqadm.add_subscriber(queue_name => 'msg_queue_multiple',
                            subscriber => subscriber);

  /* Schedule propagation from msg_queue_multiple to other queues in the
     same database: */
  dbms_aqadm.schedule_propagation(queue_name => 'msg_queue_multiple');

  /* Add subscriber GREEN that will dequeue messages from the msg_queue
     queue in another database reached by the database link
     another_db.world */
  subscriber := sys.aq$_agent('GREEN', 'msg_queue@another_db.world', NULL);
  dbms_aqadm.add_subscriber(queue_name => 'msg_queue_multiple',
                            subscriber => subscriber);

  /* Schedule propagation from msg_queue_multiple to other queues in the
     database reached by the database link another_db.world: */
  dbms_aqadm.schedule_propagation(queue_name  => 'msg_queue_multiple',
                                  destination => 'another_db.world');
END;
/

DECLARE
  enqueue_options    dbms_aq.enqueue_options_t;
  message_properties dbms_aq.message_properties_t;
  recipients         dbms_aq.aq$_recipient_list_t;
  message_handle     RAW(16);
  message            aq.message_type;

/* Enqueue MESSAGE 1 for subscribers to the queue i.e. for RED at address
   another_msg_queue and GREEN at address msg_queue@another_db.world: */
BEGIN
  message := message_type('MESSAGE 1',
                          'This message is queued for queue subscribers.');
  dbms_aq.enqueue(queue_name         => 'msg_queue_multiple',
                  enqueue_options    => enqueue_options,
                  message_properties => message_properties,
                  payload            => message,
                  msgid              => message_handle);

  /* Enqueue MESSAGE 2 for specified recipients i.e. for RED at address
     another_msg_queue and BLUE. */
  message := message_type('MESSAGE 2',
                          'This message is queued for two recipients.');
  recipients(1) := sys.aq$_agent('RED', 'another_msg_queue', NULL);
  recipients(2) := sys.aq$_agent('BLUE', NULL, NULL);
  message_properties.recipient_list := recipients;
  dbms_aq.enqueue(queue_name         => 'msg_queue_multiple',
                  enqueue_options    => enqueue_options,
                  message_properties => message_properties,
                  payload            => message,
                  msgid              => message_handle);
  COMMIT;
END;
/
/* Unschedule propagation from msg_queue_multiple to the destination
   another_db.world */
EXECUTE dbms_aqadm.unschedule_propagation(queue_name => 'msg_queue_multiple', destination => 'another_db.world');
CONNECT aq/aq

EXECUTE dbms_aqadm.create_queue_table ( queue_table => 'aq.msggroup', queue_payload_type => 'aq.message_type', message_grouping => dbms_aqadm.TRANSACTIONAL);
EXECUTE dbms_aqadm.create_queue( queue_name => 'msggroup_queue', queue_table => 'aq.msggroup');
EXECUTE dbms_aqadm.start_queue(queue_name => 'msggroup_queue');

/* Enqueue three messages in each transaction */
DECLARE
  enqueue_options    dbms_aq.enqueue_options_t;
  message_properties dbms_aq.message_properties_t;
  message_handle     RAW(16);
  message            aq.message_type;
BEGIN
  /* loop through three times, committing after every iteration */
  FOR txnno in 1..3 LOOP
    /* loop through three times, enqueuing each iteration */
    FOR mesgno in 1..3 LOOP
      message := message_type('GROUP#' || txnno,
                              'Message#' || mesgno || ' in group' || txnno);
      dbms_aq.enqueue(queue_name         => 'msggroup_queue',
                      enqueue_options    => enqueue_options,
                      message_properties => message_properties,
                      payload            => message,
                      msgid              => message_handle);
    END LOOP;
    /* commit the transaction */
    COMMIT;
  END LOOP;
END;
/

/* Now dequeue the messages as groups */
DECLARE
  dequeue_options    dbms_aq.dequeue_options_t;
  message_properties dbms_aq.message_properties_t;
  message_handle     RAW(16);
  message            aq.message_type;
  no_messages        exception;
  end_of_group       exception;
  pragma exception_init (no_messages, -25228);
  pragma exception_init (end_of_group, -25235);
BEGIN
  dequeue_options.wait := DBMS_AQ.NO_WAIT;
  dequeue_options.navigation := DBMS_AQ.FIRST_MESSAGE;
  LOOP
    BEGIN
      dbms_aq.dequeue(queue_name         => 'msggroup_queue',
                      dequeue_options    => dequeue_options,
                      message_properties => message_properties,
                      payload            => message,
                      msgid              => message_handle);
      dbms_output.put_line ('Message: ' || message.subject ||
                            ' ... ' || message.text );
      dequeue_options.navigation := DBMS_AQ.NEXT_MESSAGE;
    EXCEPTION
      WHEN end_of_group THEN
        dbms_output.put_line ('Finished processing a group of messages');
        COMMIT;
        dequeue_options.navigation := DBMS_AQ.NEXT_TRANSACTION;
    END;
  END LOOP;
EXCEPTION
  WHEN no_messages THEN
    dbms_output.put_line ('No more messages');
END;
/
/* Cleans up all objects related to the object type: */
CONNECT aq/AQ;
EXECUTE dbms_aqadm.stop_queue ( queue_name => 'msg_queue');
EXECUTE dbms_aqadm.drop_queue ( queue_name => 'msg_queue');
EXECUTE dbms_aqadm.drop_queue_table ( queue_table => 'aq.msg');

/* Cleans up all objects related to the RAW type: */
EXECUTE dbms_aqadm.stop_queue ( queue_name => 'raw_msg_queue');
EXECUTE dbms_aqadm.drop_queue ( queue_name => 'raw_msg_queue');
EXECUTE dbms_aqadm.drop_queue_table ( queue_table => 'aq.raw_msg');

/* Cleans up all objects related to the priority queue: */
EXECUTE dbms_aqadm.stop_queue ( queue_name => 'priority_msg_queue');
EXECUTE dbms_aqadm.drop_queue ( queue_name => 'priority_msg_queue');
EXECUTE dbms_aqadm.drop_queue_table ( queue_table => 'aq.priority_msg');

/* Cleans up all objects related to the multiple-consumer queue: */
EXECUTE dbms_aqadm.stop_queue ( queue_name => 'msg_queue_multiple');
EXECUTE dbms_aqadm.drop_queue ( queue_name => 'msg_queue_multiple');
EXECUTE dbms_aqadm.drop_queue_table ( queue_table => 'aq.msg_multiple');

drop type aq.message_type;

CONNECT sys/change_on_install;
drop user aq;
This section contains a detailed description of the technical specifications:
- Init ora Parameter
- Data Structures
- Agent
- Message Properties
- Queue Options
- Operational Interface
- Administrative Interface
- Administration Topics
- Data Objects
A parameter called AQ_TM_PROCESSES should be specified in the init.ora PARAMETER file if you want to perform time monitoring on queue messages. This parameter is used for messages that have delay and expiration properties specified. It can be set in a range from 0 to 10; setting it to any other number will result in an error. If this parameter is set to 1, one queue monitor process will be created as a background process to monitor the messages. If the parameter is not specified, or is set to 0, the queue monitor process is not created. The administrative interfaces to start and stop the queue monitor are only valid if the queue monitor process is started as part of instance startup by specifying this parameter.
Propagation is handled by job queue (SNP) processes. The number of job queue processes started in an instance is controlled by the init.ora parameter JOB_QUEUE_PROCESSES. The default value of this parameter is 0. In order for message propagation to take place, this parameter must be set to at least 1. The DBA can set it to higher values if there are many queues from which the messages have to be propagated, or if there are many destinations to which the messages have to be propagated, or if there are other jobs in the job queue.
The COMPATIBLE init.ora parameter must be set to 8.0.4 in order to use the AQ propagation feature. Specifically, the COMPATIBLE parameter will be checked under the following three conditions:
- The agent (sys.aq$_agent) address field is specified in the DBMS_AQADM.ADD_SUBSCRIBER command.
- The agent (sys.aq$_agent) address field is specified in the recipient_list of dbms_aq.message_properties_t.
- The DBMS_AQADM.SCHEDULE_PROPAGATION command is used.
Users can downgrade to 8.0.3 after using the 8.0.4 features by using ALTER DATABASE RESET COMPATIBILITY. Users will not be allowed to restart the database in 8.0.3 compatible mode under the following conditions:
- Propagation schedules are outstanding, in which case you may use the DBMS_AQADM.UNSCHEDULE_PROPAGATION command to remove the schedules.
- Queues have remote subscribers (sys.aq$_agent), in which case you may remove remote subscribers by means of the DBMS_AQADM.REMOVE_SUBSCRIBER command.
The following data structures are used in the operational and administrative interfaces.
Naming of database objects. This naming convention applies to queues, queue tables and object types.
object_name := VARCHAR2
object_name := [<schema_name>.]<name>
Names for objects are specified by an optional schema name and a name. If the schema name is not specified then the current schema is assumed. The schema name and the name can each be up to 30 bytes long. However, queue names and queue table names can be a maximum of 24 bytes.
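As an illustration of these length limits, the C sketch below checks a name of the form `[schema.]name` before it is passed to an AQ call. It is a minimal sketch under stated assumptions: `demo_name_ok` and the two `DEMO_` constants are hypothetical and not part of any Oracle interface, the check covers byte lengths only, and it treats the first period as the schema separator.

```c
#include <string.h>

#define DEMO_MAX_IDENT_BYTES 30   /* schema name or object name */
#define DEMO_MAX_QUEUE_BYTES 24   /* queue and queue table names */

/*
 * Check a name of the form [schema.]name against the byte limits above.
 * is_queue is nonzero for queue and queue table names.
 * Returns 1 if the lengths are acceptable, 0 otherwise.
 */
static int demo_name_ok(const char *full_name, int is_queue)
{
    const char *dot = strchr(full_name, '.');
    const char *name = dot ? dot + 1 : full_name;
    size_t schema_len = dot ? (size_t)(dot - full_name) : 0;
    size_t name_len = strlen(name);
    size_t limit = is_queue ? DEMO_MAX_QUEUE_BYTES : DEMO_MAX_IDENT_BYTES;

    /* A schema prefix, when present, must be 1..30 bytes long. */
    if (dot && (schema_len == 0 || schema_len > DEMO_MAX_IDENT_BYTES))
        return 0;
    return name_len > 0 && name_len <= limit;
}
```

For example, a 25-byte name passes the check as an object type name but fails it as a queue name, because queue and queue table names are limited to 24 bytes.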
type_name := VARCHAR2
type_name := <object_type> | "RAW"
Usage:
TYPE sys.aq$_agent IS OBJECT (
  name     VARCHAR2(30),
  address  VARCHAR2(1024),
  protocol NUMBER)
The message properties describe the information that AQ uses to manage individual messages. They are set at enqueue time, and their values are returned at dequeue time.
TYPE message_properties_t IS RECORD (
  priority        BINARY_INTEGER default 1,
  delay           BINARY_INTEGER default NO_DELAY,
  expiration      BINARY_INTEGER default NEVER,
  correlation     VARCHAR2(128) default NULL,
  attempts        BINARY_INTEGER,
  recipient_list  aq$_recipient_list_t,
  exception_queue VARCHAR2(51) default NULL,
  enqueue_time    DATE,
  state           BINARY_INTEGER)

TYPE aq$_recipient_list_t IS TABLE OF sys.aq$_agent
  INDEX BY BINARY_INTEGER
:
TYPE enqueue_options_t IS RECORD (visibility BINARY_INTEGER default ON_COMMIT, relative_msgid RAW(16) default NULL, sequence_deviation BINARY_INTEGER default NULL)
TYPE dequeue_options_t IS RECORD (consumer_name VARCHAR2(30) default NULL, dequeue_mode BINARY_INTEGER default REMOVE, navigation BINARY_INTEGER default NEXT_MESSAGE, visibility BINARY_INTEGER default ON_COMMIT, wait BINARY_INTEGER default FOREVER, msgid RAW(16) default NULL, correlation VARCHAR2(128) default NULL)
Usage
The following interface calls are available to enqueue and dequeue messages from queues.
Adds a message to the specified queue. In the simplest case, if the user wants to enqueue a message, without any other parameters, only the queue name and the payload have to be specified.
DBMS_AQ.ENQUEUE (queue_name IN VARCHAR2, enqueue_options IN enqueue_options_t, message_properties IN message_properties_t, payload IN "<type_name>", msgid OUT RAW)
The sequence_deviation parameter in enqueue_options can be used to change the order of processing between two messages. The identity of the other message, if any, is specified by the enqueue_options parameter relative_msgid. The relationship is identified by the sequence_deviation parameter.
Specifying sequence_deviation for a message introduces some restrictions for the delay and priority values that can be specified for this message. The delay of this message has to be less than or equal to the delay of the message before which this message is to be enqueued. The priority of this message has to be greater than or equal to the priority of the message before which this message is to be enqueued.
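The two cases described above can be sketched as follows. The block first enqueues a message with default options and properties, then enqueues a second message ahead of the first. It is an illustration under stated assumptions, not a definitive implementation: the queue name raw_msg_queue, its RAW payload type, the payload values, and all variable names are hypothetical. The constant dbms_aq.BEFORE is used for sequence_deviation. Whether the sort order of a given queue permits a sequence deviation should be checked against the parameter tables for your release.

```sql
DECLARE
  enq_opts   dbms_aq.enqueue_options_t;     -- defaults: ON_COMMIT, no deviation
  msg_props  dbms_aq.message_properties_t;  -- defaults: priority 1, no delay
  message    RAW(16);
  first_id   RAW(16);
  second_id  RAW(16);
BEGIN
  -- Simplest case: only the queue name and the payload carry information.
  message := HEXTORAW('01');
  dbms_aq.enqueue(queue_name         => 'raw_msg_queue',  -- hypothetical queue
                  enqueue_options    => enq_opts,
                  message_properties => msg_props,
                  payload            => message,
                  msgid              => first_id);

  -- Second message: ask AQ to place it before the message identified by
  -- relative_msgid. Delay and priority are left at their defaults, so they
  -- are equal to those of the first message, as the restrictions require.
  enq_opts.sequence_deviation := dbms_aq.BEFORE;
  enq_opts.relative_msgid     := first_id;
  message := HEXTORAW('02');
  dbms_aq.enqueue(queue_name         => 'raw_msg_queue',
                  enqueue_options    => enq_opts,
                  message_properties => msg_props,
                  payload            => message,
                  msgid              => second_id);

  COMMIT;  -- with ON_COMMIT visibility the messages become visible here
END;
/
```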
Dequeues a message from the specified queue.
DBMS_AQ.DEQUEUE (queue_name IN VARCHAR2, dequeue_options IN dequeue_options_t, message_properties OUT message_properties_t, payload OUT "<type_name>", msgid OUT raw)
The search criteria for messages to be dequeued are determined by the consumer_name, msgid and correlation parameters in the dequeue_options. Msgid uniquely identifies the message to be dequeued. Correlation identifiers are application-defined identifiers that are not interpreted by AQ.
Only messages in the READY state are dequeued unless a msgid is specified.
The dequeue order is determined by the values specified at the time the queue table is created unless overridden by the msgid and correlation id in dequeue_options.
The database consistent read mechanism is applicable for queue operations. For example, a BROWSE call may not see a message that is enqueued after the beginning of the browsing transaction.
The default NAVIGATION parameter during dequeue is NEXT_MESSAGE. This means that subsequent dequeues will retrieve the messages from the queue based on the snapshot obtained in the first dequeue. In particular, a message that is enqueued after the first dequeue command will be processed only after processing all the remaining messages in the queue. This is usually sufficient when all the messages have already been enqueued into the queue, or when the queue does not have a priority-based ordering. However, applications must use the FIRST_MESSAGE navigation option when the first message in the queue needs to be processed by every dequeue command. This usually becomes necessary when a higher priority message arrives in the queue while messages already enqueued are being processed.
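The FIRST_MESSAGE behavior described above might be used as in the following sketch, which drains a queue while re-reading the head of the queue on every call. The queue name raw_msg_queue, its RAW payload type, and the variable names are hypothetical. The block also assumes that an empty queue with NO_WAIT raises ORA-25228, which should be confirmed against the error message documentation for your release. Run it with SET SERVEROUTPUT ON to see the output.

```sql
DECLARE
  deq_opts     dbms_aq.dequeue_options_t;
  msg_props    dbms_aq.message_properties_t;
  message      RAW(2000);
  msg_id       RAW(16);
  no_messages  EXCEPTION;
  PRAGMA EXCEPTION_INIT(no_messages, -25228);  -- assumed: timeout or end-of-fetch
BEGIN
  deq_opts.wait       := dbms_aq.NO_WAIT;        -- do not block on an empty queue
  deq_opts.navigation := dbms_aq.FIRST_MESSAGE;  -- re-evaluate the head each time

  LOOP
    dbms_aq.dequeue(queue_name         => 'raw_msg_queue',  -- hypothetical queue
                    dequeue_options    => deq_opts,
                    message_properties => msg_props,
                    payload            => message,
                    msgid              => msg_id);
    dbms_output.put_line('priority ' || msg_props.priority ||
                         ', msgid '  || RAWTOHEX(msg_id));
    COMMIT;  -- default dequeue mode is REMOVE; make the removal permanent
  END LOOP;
EXCEPTION
  WHEN no_messages THEN
    dbms_output.put_line('No more messages');
END;
/
```

Because every call starts from the head of the queue, a higher priority message that arrives while the loop is running is picked up by the next dequeue rather than after the remaining messages.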
Messages enqueued in the same transaction into a queue that has been enabled for message grouping will form a group. If only one message is enqueued in the transaction, this will effectively form a group of one message. There is no upper limit to the number of messages that can be grouped in a single transaction.
In queues that have not been enabled for message grouping, a dequeue in LOCKED or REMOVE mode locks only a single message. By contrast, a dequeue operation that seeks to dequeue a message that is part of a group will lock the entire group. This is useful when all the messages in a group need to be processed as an atomic unit.
When all the messages in a group have been dequeued, the dequeue returns an error indicating that all messages in the group have been processed. The application can then use the NEXT_TRANSACTION navigation option to start dequeuing messages from the next available group. In the event that no groups are available, the dequeue will time out after the specified WAIT period.
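The group-by-group processing described above can be sketched as a loop that switches navigation modes. The queue name grouped_msg_queue is hypothetical; it is assumed to have a RAW payload and to live in a queue table created with message_grouping set to dbms_aqadm.TRANSACTIONAL. The error numbers bound below (ORA-25235 for the end of a group, ORA-25228 for no further messages) are assumptions to verify against the error message documentation for your release.

```sql
DECLARE
  deq_opts      dbms_aq.dequeue_options_t;
  msg_props     dbms_aq.message_properties_t;
  message       RAW(2000);
  msg_id        RAW(16);
  no_messages   EXCEPTION;
  end_of_group  EXCEPTION;
  PRAGMA EXCEPTION_INIT(no_messages,  -25228);  -- assumed: nothing left to fetch
  PRAGMA EXCEPTION_INIT(end_of_group, -25235);  -- assumed: group fully fetched
BEGIN
  deq_opts.wait       := dbms_aq.NO_WAIT;
  deq_opts.navigation := dbms_aq.FIRST_MESSAGE;

  LOOP
    BEGIN
      dbms_aq.dequeue(queue_name         => 'grouped_msg_queue',  -- hypothetical
                      dequeue_options    => deq_opts,
                      message_properties => msg_props,
                      payload            => message,
                      msgid              => msg_id);
      dbms_output.put_line('msgid ' || RAWTOHEX(msg_id));
      -- Stay inside the current group for the following calls.
      deq_opts.navigation := dbms_aq.NEXT_MESSAGE;
    EXCEPTION
      WHEN end_of_group THEN
        COMMIT;  -- the whole group was processed as one unit
        dbms_output.put_line('-- end of group --');
        deq_opts.navigation := dbms_aq.NEXT_TRANSACTION;  -- move to next group
    END;
  END LOOP;
EXCEPTION
  WHEN no_messages THEN
    dbms_output.put_line('No more groups');
END;
/
```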
When using enumerated constants such as BROWSE, LOCKED, REMOVE, the PL/SQL constants need to be specified with the scope of the package that defines them. All types associated with the operational interfaces have to be prepended with dbms_aq. For example:
dbms_aq.BROWSE
Configuration information can be managed through procedures in the DBMS_AQADM package. Because incorrect usage of the administration interface can have substantial performance impact on the database system, the administration interface should be treated as privileged commands, and only the designated queue administrator or privileged users should be granted access to the administration package. Initially, only SYS has the execution privilege for the procedures in DBMS_AQADM and DBMS_AQ.
Access to AQ operations is granted to users through roles. These roles provide execution privileges on the AQ procedures. Fine-grained access control at the database object level is not currently supported. This implies that a user with the AQ_USER_ROLE can enqueue and dequeue to any queue in the system.
AQ_ADMINISTRATOR_ROLE grants execute privileges to procedures in the DBMS_AQADM and DBMS_AQ packages. These include all the administrative and operational interfaces. The user 'SYS' must grant the AQ_ADMINISTRATOR_ROLE to the AQ administrator.
AQ_USER_ROLE grants execute privileges to procedures in the DBMS_AQ package. These include all the operational interfaces. The AQ administrator must grant the AQ_USER_ROLE to AQ users.
The procedure grant_type_access must first be executed by the user 'SYS' to grant access for AQ object types to the AQ administrator. The AQ administrator can then execute this procedure to grant access for AQ object types to other AQ users. The procedure needs to be executed if the user wishes to perform any administrative operation involving a multiple consumer queue. These include CREATE_QUEUE_TABLE, CREATE_QUEUE, ADD_SUBSCRIBER and REMOVE_SUBSCRIBER.
PROCEDURE grant_type_access (user_name IN VARCHAR2);
If you wish to call DBMS_AQ from a PL/SQL function or procedure, you will need to have been explicitly granted the EXECUTE privilege. You cannot inherit this right from either the AQ_USER_ROLE or the AQ_ADMINISTRATOR_ROLE.
GRANT EXECUTE ON DBMS_AQ TO <user>;
CONNECT sys/change_on_install
GRANT AQ_ADMINISTRATOR_ROLE to scott with admin option;
execute dbms_aqadm.grant_type_access('scott');
CONNECT scott/tiger
execute dbms_aqadm.grant_type_access('jones');
Create a queue table for messages of a pre-defined type. The sort keys for dequeue ordering, if any, need to be defined at table creation time. The following objects are created at this time:
aq$_<queue_table_name>_e.
aq$<queue_table_name>.
aq$_<queue_table_name>_t.
aq$_<queue_table_name>_i.
DBMS_AQADM.CREATE_QUEUE_TABLE (queue_table IN VARCHAR2, queue_payload_type IN VARCHAR2, storage_clause IN VARCHAR2 default NULL, sort_list IN VARCHAR2 default NULL, multiple_consumers IN BOOLEAN default FALSE, message_grouping IN BINARY_INTEGER default NONE, comment IN VARCHAR2 default NULL, auto_commit IN BOOLEAN default TRUE)
Create a queue in the specified queue table. All queue names must be unique within a schema. Once a queue is created with CREATE_QUEUE, it can be enabled by calling START_QUEUE. By default, the queue is created with both enqueue and dequeue disabled.
DBMS_AQADM.CREATE_QUEUE (queue_name IN VARCHAR2, queue_table IN VARCHAR2, queue_type IN BINARY_INTEGER default NORMAL_QUEUE, max_retries IN NUMBER default 0, retry_delay IN NUMBER default 0, retention_time IN NUMBER default 0, dependency_tracking IN BOOLEAN default FALSE, comment IN VARCHAR2 default NULL, auto_commit IN BOOLEAN default TRUE)
Drop an existing queue table. All the queues in a queue table have to be stopped and dropped before the queue table can be dropped.
DBMS_AQADM.DROP_QUEUE_TABLE (queue_table IN VARCHAR2, force IN BOOLEAN default FALSE, auto_commit IN BOOLEAN default TRUE)
Drops an existing queue. DROP_QUEUE is not allowed unless STOP_QUEUE has been called to disable the queue for both enqueuing and dequeuing. All the queue data is deleted as part of the drop operation.
DBMS_AQADM.DROP_QUEUE (queue_name IN VARCHAR2, auto_commit IN BOOLEAN default TRUE)
Alter existing properties of a queue. Only max_retries, retry_delay, and retention_time can be altered.
DBMS_AQADM.ALTER_QUEUE (queue_name IN VARCHAR2, max_retries IN NUMBER default NULL, retry_delay IN NUMBER default NULL, retention_time IN NUMBER default NULL, auto_commit IN BOOLEAN default TRUE)
Table 11-13 DBMS_AQADM.ALTER_QUEUE
Enables the specified queue for enqueuing and/or dequeuing. After creating a queue the administrator must use START_QUEUE to enable the queue. The default is to enable it for both ENQUEUE and DEQUEUE. Only dequeue operations are allowed on an exception queue. This operation takes effect when the call completes and does not have any transactional characteristics.
DBMS_AQADM.START_QUEUE (queue_name IN VARCHAR2, enqueue IN BOOLEAN default TRUE, dequeue IN BOOLEAN default TRUE)
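Taken together, CREATE_QUEUE_TABLE, CREATE_QUEUE and START_QUEUE bring a queue into service. The following is a minimal sketch of that sequence for a single consumer queue with a RAW payload. The names raw_msg_tab and raw_msg_queue are hypothetical, and the sort keys PRIORITY and ENQ_TIME in sort_list are an assumption to confirm against the CREATE_QUEUE_TABLE parameter table for your release.

```sql
BEGIN
  -- Queue table for RAW payloads. Dequeue ordering is fixed at creation time.
  dbms_aqadm.create_queue_table(
    queue_table        => 'raw_msg_tab',         -- hypothetical name
    queue_payload_type => 'RAW',
    sort_list          => 'PRIORITY,ENQ_TIME');  -- assumed sort keys

  -- The queue is created with enqueue and dequeue disabled.
  dbms_aqadm.create_queue(
    queue_name  => 'raw_msg_queue',              -- hypothetical name
    queue_table => 'raw_msg_tab');

  -- Enable the queue for both enqueue and dequeue (the defaults).
  dbms_aqadm.start_queue(queue_name => 'raw_msg_queue');
END;
/
```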
Disables enqueuing and/or dequeuing on the specified queue. By default, it disables both ENQUEUEs and DEQUEUEs. A queue cannot be stopped if there are outstanding transactions against the queue. This operation takes effect when the call completes and does not have any transactional characteristics.
DBMS_AQADM.STOP_QUEUE (queue_name IN VARCHAR2, enqueue IN BOOLEAN default TRUE, dequeue IN BOOLEAN default TRUE, wait IN BOOLEAN default TRUE)
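The ordering rules stated for DROP_QUEUE and DROP_QUEUE_TABLE (stop first, then drop the queue, then drop the queue table) can be sketched as below. The names raw_msg_queue and raw_msg_tab are hypothetical. All optional parameters are left at the defaults shown in the signatures.

```sql
BEGIN
  -- Disable both enqueue and dequeue. With the default wait => TRUE the
  -- call is assumed to wait for outstanding transactions to complete.
  dbms_aqadm.stop_queue(queue_name => 'raw_msg_queue');

  -- Dropping the queue deletes all of its queue data.
  dbms_aqadm.drop_queue(queue_name => 'raw_msg_queue');

  -- The queue table can be dropped once all of its queues are gone.
  dbms_aqadm.drop_queue_table(queue_table => 'raw_msg_tab');
END;
/
```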
Add a default subscriber to a queue. A program can enqueue messages to a specific list of recipients or to the default list of subscribers. This operation will only succeed on queues that allow multiple consumers. This operation takes effect immediately and the containing transaction is committed. Enqueue requests that are executed after the completion of this call will reflect the new behavior. The user must have been granted type access by executing the grant_type_access procedure.
DBMS_AQADM.ADD_SUBSCRIBER(queue_name IN VARCHAR2, subscriber IN sys.aq$_agent)
Parameter | Description |
---|---|
queue_name | Specifies the name of the queue. |
subscriber | See definition in section titled `Agent'. |
Remove a default subscriber from a queue. This operation takes effect immediately and the containing transaction is committed. All references to the subscriber in existing messages are removed as part of the operation. The user must have been granted type access by executing the grant_type_access procedure.
DBMS_AQADM.REMOVE_SUBSCRIBER( queue_name IN VARCHAR2, subscriber IN sys.aq$_agent)
Parameter | Description |
---|---|
queue_name | Specifies the name of the queue. |
subscriber | See definition in section titled `Agent'. |
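As an illustration of ADD_SUBSCRIBER and REMOVE_SUBSCRIBER, the following sketch registers a local default subscriber and later removes it. The queue name multi_msg_queue and the agent name GREEN are hypothetical. The queue is assumed to belong to a queue table created with multiple_consumers set to TRUE, and the calling user is assumed to have been granted type access. The address and protocol fields are left NULL, which is assumed to denote a local subscriber.

```sql
DECLARE
  subscriber  sys.aq$_agent;
BEGIN
  -- name, address, protocol: a NULL address is assumed to mean "local".
  subscriber := sys.aq$_agent('GREEN', NULL, NULL);

  dbms_aqadm.add_subscriber(
    queue_name => 'multi_msg_queue',  -- hypothetical multiple consumer queue
    subscriber => subscriber);
END;
/

DECLARE
  subscriber  sys.aq$_agent;
BEGIN
  subscriber := sys.aq$_agent('GREEN', NULL, NULL);

  -- References to GREEN in existing messages are removed as well.
  dbms_aqadm.remove_subscriber(
    queue_name => 'multi_msg_queue',
    subscriber => subscriber);
END;
/
```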
Schedule propagation of messages from a queue to a destination identified by a specific dblink. Messages may also be propagated to other queues in the same database by specifying a NULL destination. If a message has multiple recipients at the same destination, in either the same or different queues, the message will be propagated to all of them at the same time.
DBMS_AQADM.SCHEDULE_PROPAGATION(src_queue_name IN VARCHAR2, destination IN VARCHAR2 default NULL, start_time IN DATE default SYSDATE, duration IN NUMBER default NULL, next_time IN VARCHAR2 default NULL, latency IN NUMBER default 60)
Unschedule a previously scheduled propagation of messages from a queue to a destination identified by a specific dblink.
DBMS_AQADM.UNSCHEDULE_PROPAGATION(src_queue_name IN VARCHAR2, destination IN VARCHAR2 default NULL)
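A minimal sketch of scheduling and later unscheduling propagation follows. The source queue aqadmn.q1def and the database link boston are hypothetical names. The remaining parameters are left at the defaults shown in the signature, so no assumption is made here about the units or format of duration, next_time or latency.

```sql
-- Schedule propagation from the source queue to the destination dblink.
BEGIN
  dbms_aqadm.schedule_propagation(
    src_queue_name => 'aqadmn.q1def',  -- hypothetical source queue
    destination    => 'boston');       -- hypothetical database link
END;
/

-- Remove the schedule for the same source queue and destination.
BEGIN
  dbms_aqadm.unschedule_propagation(
    src_queue_name => 'aqadmn.q1def',
    destination    => 'boston');
END;
/
```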
Verify that the source and destination queues have identical types. The result of the verification is stored in the sys.aq$_message_types table, overwriting all previous output of this command.
DBMS_AQADM.VERIFY_QUEUE_TYPES(src_queue_name IN VARCHAR2, dest_queue_name IN VARCHAR2, destination IN VARCHAR2 default NULL, rc OUT BINARY_INTEGER)
When using enumerated constants such as BROWSE, LOCKED, REMOVE, the symbol needs to be specified with the scope of the package that defines it. All types associated with the administrative interfaces have to be prepended with dbms_aqadm. For example:
dbms_aqadm.NORMAL_QUEUE
Parameter | Options |
---|---|
|
|
|
|
|
|
This is a view of the queue table in which message data is stored. This view is automatically created with each queue table and is called aq$<queue_table_name>.
This view should be used for querying the queue data. The dequeue history data (time, user identification and transaction identification) is only valid for single consumer queues. For dequeue history of messages in a multiple consumer queue please refer to a following section.
The administrator can use any SQL statement or SQL tool to analyze and review the content of a queue or queue table. SQL provides full access to the message metadata and/or payload. Use ENQ_TXN_ID and DEQ_TXN_ID to correlate transactions. If the ENQ_TXN_ID of message m2 is the same as the DEQ_TXN_ID of m1, m2 was created in the transaction that consumed m1. (You may use CONNECT BY in your SQL statements to identify related messages.) Remove retained messages that are not automatically removed by AQ. Do not update or modify messages since this may destroy the consistency of the queue metadata. Before you use SQL to correct any error in AQ, please contact the Oracle service representative.
This view describes the names and types of all queue tables created in the database.
This view is the same as DBA_QUEUE_TABLES with the exception that it only shows queue tables in the user's schema. It does not contain a column for OWNER.
Users can specify operational characteristics for individual queues. The DBA_QUEUES view contains the relevant information for every queue in a database.
This view is the same as DBA_QUEUES with the exception that it only shows queues in the user's schema. It does not contain a column for OWNER.
Returns the list of subscribers for a queue.
DBMS_AQADM.QUEUE_SUBSCRIBERS(queue_name IN VARCHAR2) RETURN aq$_subscriber_list_t
The function returns a PL/SQL table of aq$_agent. This can be used to get the list of all subscribers for a queue.
Example:
DECLARE
  subs  dbms_aqadm.aq$_subscriber_list_t;
  nsubs BINARY_INTEGER;
  i     BINARY_INTEGER;
BEGIN
  subs := dbms_aqadm.queue_subscribers('Q1DEF');
  nsubs := subs.COUNT;
  FOR i IN 0..nsubs-1 LOOP
    dbms_output.put_line(subs(i).name);
  END LOOP;
END;
/
The queue table view provides the dequeue history for single consumer queue messages. To query the list of recipients or the dequeue history of a message in a multiple-consumer queue you need to execute a SQL query on the queue table for the message of interest.
For example, to view the dequeue history of the message with msgid `105E7A2EBFF11348E03400400B40F149' in queue table sys.queue_tab the following query must be executed. The query will return one row per consumer of the message.
SELECT consumer, transaction_id, deq_time, deq_user
FROM THE(SELECT CAST(history AS sys.aq$_dequeue_history_t)
         FROM sys.queue_tab
         WHERE msgid='105E7A2EBFF11348E03400400B40F149');
The error messages for AQ are reported in two ranges:
Queues are stored in database tables. The performance characteristics of queue operations are very similar to the underlying database operations.
To understand the performance characteristics of queues it is important to understand the tables and index layout for AQ objects.
Creating a queue table creates a database table with approximately 25 columns. These columns store the AQ metadata and the user-defined payload. The payload can be of an object type or RAW. The AQ metadata contains object types and scalar types. A view and two indexes are created on the queue table. The view allows users to query the message data. The indexes are used to accelerate access to message data. Please refer to the create queue table command for a detailed description of the objects created.
The code path of an enqueue operation is comparable to an insert into a multi-column table with two indexes. The code path of a dequeue operation is comparable to a select and delete operation on a similar table. These operations are performed using PL/SQL functions.
Oracle Parallel Server (OPS) can be used to ensure highly available access to queue data. Queues are implemented using database tables. The tail and the head of a queue can be extreme hot spots. Since OPS does not scale well in the presence of hot spots, it is recommended that normal access to a queue be limited to one instance only. In case of an instance failure, messages managed by the failed instance can be processed immediately by one of the surviving instances.
Queue operation scalability is similar to the underlying database operation scalability. If a dequeue operation with wait option is issued in a Multi-Threaded Server (MTS) environment the shared server process will be dedicated to the dequeue operation for the duration of the call including the wait time. The presence of many such processes could cause severe performance and availability problems and could result in deadlocking the shared server processes. For this reason it is recommended that dequeue requests with wait option be only issued via dedicated server processes. This restriction is not enforced.
In setting the number of JOB_QUEUE_PROCESSES, the DBA should be aware that this need is determined by the number of queues from which the messages have to be propagated and the number of destinations (rather than queues) to which messages have to be propagated.
The standard database reliability and recoverability characteristics apply to queue data.
Enterprise Manager supports GUIs for some of the administrative functions listed in the administrative interfaces section.
These include:
Queues are implemented on tables. The import/export of queues constitutes the import/export of the underlying queue tables and related dictionary tables. Import and export of queues can only be done at queue table granularity.
When a queue table is exported, both the table definition information and the queue data are exported. When a queue table is imported, export action procedures will maintain the queue dictionary. Because the queue table data is also exported, the user is responsible for maintaining application-level data integrity when queue table data are being transported.
Importing queue data into a queue table with existing data is not recommended. During a table mode import, if the queue table already exists at the import site the old queue table definition, and the old queue definition will be dropped and recreated. Hence, queue table and queue definitions prior to the import will be lost.
For every queue table that supports multiple recipients, there is an index-organized table (IOT) that contains important queue metadata. This metadata is essential to the operations of the queue, so the user must export and import this IOT as well as the queue table for the queues in this table to work after import. When the schema containing the queue table is exported, the IOT is also automatically exported. The behavior is similar at import time. Because the metadata table contains rowids of some rows in the queue table, import will issue a note about the rowids being obsolete when importing the metadata table. This message can be ignored, as the queuing system will automatically correct the obsolete rowids as a part of the import process. However, if another problem is encountered while doing the import (such as running out of rollback segment space), the problem should be corrected and the import should be rerun.
This section describes some troubleshooting tips to diagnose problems with message propagation.
AQ updates the message history when a message has been successfully propagated to a destination. The message history is stored as a collection in the queue table. An administrator can execute a SQL query to determine if a message has been propagated. For example, to check if a message with msgid `105E7A2EBFF11348E03400400B40F149' in queue table aqadmn.queue_tab has been propagated to destination `boston', the following query can be executed:
SELECT consumer, transaction_id, deq_time, deq_user, propagated_msgid
FROM THE(SELECT CAST(history AS sys.aq$_dequeue_history_t)
         FROM aqadmn.queue_tab
         WHERE msgid='105E7A2EBFF11348E03400400B40F149')
WHERE consumer LIKE '%BOSTON%';
A non-NULL transaction_id indicates that the message was successfully propagated. Further, the deq_time indicates the time of propagation, the deq_user indicates the userid used for propagation, and the propagated_msgid indicates the msgid of the message that was enqueued at the destination. If the message with the msgid cannot be found in the queue table, an administrator can check the exception queue (if the exception queue is in a different queue table) for the message history.
The administrator can query the DBA_QUEUE_SCHEDULES view to check if propagation has been scheduled for a particular combination of source queue and destination. If propagation has been scheduled, the jobno of the job used to propagate messages can be determined from the sys.aq$_schedules table. The jobno can then be used to query the DBA_JOBS view to determine the last time that the propagation was scheduled for the combination of source queue and destination. The DBA_JOBS view also indicates the next time the propagation will be scheduled, and if the job has been marked as broken. If the job has been marked as broken, check for errors in trace file(s) generated by the job_queue processes in the $ORACLE_HOME/log directory.
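The checks described above might be run as the following queries. This is a sketch under stated assumptions: it relies only on the jobno column that the text attributes to sys.aq$_schedules, and on the JOB, LAST_DATE, NEXT_DATE, BROKEN and FAILURES columns of DBA_JOBS. No column names are assumed for DBA_QUEUE_SCHEDULES, so all of its columns are selected.

```sql
-- Which propagation schedules exist?
SELECT *
FROM   dba_queue_schedules;

-- State of the jobs that carry out propagation: when each last ran, when
-- it will run next, and whether it has been marked as broken.
SELECT j.job, j.last_date, j.next_date, j.broken, j.failures
FROM   dba_jobs j
WHERE  j.job IN (SELECT s.jobno
                 FROM   sys.aq$_schedules s);
```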
There are a number of points at which the propagation may break down:
Check that the user who scheduled the propagation (using dbms_aqadm.schedule_propagation) has access to the database link for the destination.
Check that the queue named in the address field of the aq$_agent type (in the subscriber list for the source queue or in the recipient list of the enqueuer) both (a) exists at the specified destination, and (b) has been enabled for enqueuing.
All these and other errors that the propagator encounters are logged into trace file(s) generated by the job_queue processes in the $ORACLE_HOME/log directory.
AQ will not propagate messages from one queue to another if the payload types of the two queues are not equivalent. An administrator can verify if the source and destination's payload types match by executing the DBMS_AQADM.VERIFY_QUEUE_TYPES procedure. The results of the type checking will be stored in the sys.aq$_message_types table. This table can be accessed using the OID of the source queue and the address of the destination queue (i.e. [schema.]queue_name[@destination]).
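A type check of this kind could be issued as in the sketch below. The queue names and the database link boston are hypothetical. The parameter list (src_queue_name, dest_queue_name, destination, and an OUT return code rc) is assumed from the administrative interface. The sketch prints rc without interpreting it, since the meaning of its values should be taken from the procedure's parameter description for your release.

```sql
DECLARE
  rc  BINARY_INTEGER;
BEGIN
  dbms_aqadm.verify_queue_types(
    src_queue_name  => 'aqadmn.q1def',  -- hypothetical source queue
    dest_queue_name => 'aqadmn.q1def',  -- hypothetical queue at the destination
    destination     => 'boston',        -- hypothetical database link
    rc              => rc);

  -- Run with SET SERVEROUTPUT ON to see the return code.
  dbms_output.put_line('verify_queue_types rc = ' || rc);
END;
/
```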
As you can see, the GV$ view and V$ view are exactly the same:
Column Name | Type |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
Column Name | Type |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
The difference between these two views is that the GV$ view gives information about the number of messages in different states for the whole database while the V$ view gives information regarding specific instances. The way this works is that each instance keeps its own AQ statistics information in its own SGA, and does not have knowledge of the statistics gathered by other instances. Then, when a GV$AQ view is queried by an instance, all other instances funnel their AQ statistics information to the instance issuing the query.
The following demos may be found in the related directories:
$ORACLE_HOME/demo/aqdemo00.sql Main driver of demo
$ORACLE_HOME/demo/aqdemo01.sql Create queue tables and queues using AQ administration interface
$ORACLE_HOME/demo/aqdemo02.sql Load the demo package
$ORACLE_HOME/demo/aqdemo03.sql Submit the event handler as a job to Job Queue
$ORACLE_HOME/demo/aqdemo04.sql Enqueue messages
The operational interface in Oracle AQ 8.0.4 is backward compatible with the 8.0.3 Oracle AQ interface.
AQ$_AGENT Data Type
In the latest release, the address field is now enabled for the aq$_agent datatype. Consequently, it is now possible for this field to be specified wherever an interface takes an Agent as an argument, such as in the recipient list of the message properties, and the DBMS_AQADM.ADD_SUBSCRIBER administrative interface.
The address field in the aq$_agent datatype has been extended to 1024 bytes. To use the extended address field, you will have to complete the following steps:
Run CATNOQUEUE.SQL to drop the existing dictionary and queue tables:
SVRMGRL> @CATNOQUEUE.SQL
Run CATQUEUE.SQL to redefine the new types and dictionary tables:
SVRMGRL> @CATQUEUE.SQL
The upgrade script (CAT8004.SQL) creates the additional dictionary tables: SYS.AQ$_MESSAGE_TYPES, SYSTEM.AQ$_SCHEDULES, and SYS.AQ$_QUEUE_STATISTICS.