NAME
MQSeries::PubSub::Command -- base OO class implementing and interface to the MQSeries Publish/Subscribe broker commands.
SYNOPSIS
#
# Examples of Broker object instantiation
#
# Plain and simple (usually sufficient)
#
my $broker = MQSeries::PubSub::Broker->new
(
QueueManager => 'FOO.QMGR',
) || die;
#
# Using your own ReplyQ.
#
my $replyq = MQSeries::Queue->new
(
QueueManager => $broker,
Queue => 'SOME.APP.MODEL.QUEUE'
DynamicQName => 'SOME.APP.REPLYQ.*',
Mode => 'input',
CloseOptions => MQCO_DELETE_PURGE,
) || die;
my $broker = MQSeries::PubSub::Broker->new
(
QueueManager => 'FOO.QMGR',
ReplyQ => $replyq,
) || die;
#
# Using a fixed, per-defined app-specific local queue
#
my $broker = MQSeries::PubSub::Broker->new
(
QueueManager => 'FOO.QMGR',
ReplyQ => 'SOME.APP.PUBSUB.REPLY',
) || die;
#
# Overriding the default DynamicQName
#
my $broker = MQSeries::PubSub::Broker->new
(
QueueManager => 'FOO.QMGR',
DyamicQName => 'SOME.APP.PUBSUB.REPLY.*',
) || die;
#
# Examples of Stream object instantiation
#
my $stream = MQSeries::PubSub::Stream->new
(
QueueManager => $broker,
Queue => 'SOME.APP.PUBSUB.STREAM',
) || die;
#
# Command examples
#
# RegisterPublisher, single topic
#
$broker->RegisterPublisher
(
Options =>
{
Topic => 'Some/Topic/Of/Interest',
StreamName => 'SOME.APP.PUBSUB.STREAM',
},
) || die;
#
# RegisterSubscriber, multiple topics
#
$broker->RegisterSubscriber
(
Options =>
{
Topic => [qw(
Some/Topic/Of/Interest
Other/Interesing/Topic
Something/Boring
)],
StreamName => 'SOME.APP.PUBSUB.STREAM',
},
) || die;
#
# RegisterSubscriber, with subscription expiration
#
$broker->RegisterSubscriber
(
MsgDesc =>
{
Expiry => 600,
},
Options =>
{
Topic => 'Some/Topic/Of/Interest',
StreamName => 'SOME.APP.PUBSUB.STREAM',
},
) || die;
#
# RegisterSubscriber, with your own replyQ
#
my $replyq = MQSeries::Queue->new
(
QueueManager => $broker,
Queue => 'SOME.APP.MODEL.QUEUE'
DynamicQName => 'SOME.APP.REPLYQ.*',
Mode => 'input',
CloseOptions => MQCO_DELETE_PURGE,
) || die;
$broker->RegisterSubscriber
(
Options =>
{
Topic => 'Some/Topic/Of/Interest',
StreamName => 'SOME.APP.PUBSUB.STREAM',
QMgrName => $replyq->ObjDesc('ObjectQMgrName'),
QName => $replyq->ObjDesc('ObjectName'),
},
) || die;
#
# Publish some data
#
$stream->Publish
(
Options =>
{
Topic => 'Some/Topic/Of/Interest',
},
Data => $mydata,
) || die;
#
# Publish some data as a datagram, explicitly. Note that the data
# is a string, so we set the Format field of the MQRFH header
# appropriately.
#
$stream->Publish
(
MsgDesc =>
{
MsgType => MQMT_DATAGRAM,
},
Header =>
{
Format => MQFMT_STRING,
},
Options =>
{
Topic => 'Some/Topic/Of/Interest',
},
Data => $mydata,
) || die;
DESCRIPTION
The MQSeries::PubSub::Command class is the base class for both of MQSeries::PubSub::Broker and MQSeries::PubSub::Stream. It is not used directly, and attempts to instantiate MQSeries::PubSub::Command objects will fail.
The underlying similarity between both objects is that they implement subsets of the Publish/Subscribe commands. There are 7 of these commands, and 5 of them are specific to the PubSub Broker (QueueManager), and 2 to any individual PubSub Stream (Queue).
PubSub Command Module
-------------- ------
RegisterPublisher MQSeries::PubSub::Broker
RegisterSubscriber MQSeries::PubSub::Broker
DeregisterPublisher MQSeries::PubSub::Broker
DeregisterSubscriber MQSeries::PubSub::Broker
RequestUpdate MQSeries::PubSub::Broker
Publish MQSeries::PubSub::Stream
DeletePublication MQSeries::PubSub::Stream
This base class (MQSeries::PubSub::Command) implements the command interface, with the fundamental difference being the MQSeries Queue to which the command messages are put. The Broker-specific commands all put messages to the SYSTEM.BROKER.CONTROL.QUEUE, and the Stream-specific commands put messages to the chosen PubSub stream. In both case, the broker should be listening on these queues, and it expects the same format (MQRFH) for the messages, and generates replies accordingly.
In addition, this API provides for several "Extended Commands", which are convenience functions allowing the developer to query the Publish/Subscribe administrative messages in the "MQ/*" topic namespace for each Stream. All of these commands are methods of the MQSeries::PubSub::Broker object.
PubSub Extended Command
-----------------------
InquireParent
InquireChildren
InquireStreamNames
InquireTopics
InquireIdentities
InquireRetainedMessages
METHODS
MQSeries::PubSub::Broker objects are subclassed from MQSeries::QueueManager, and therefore all of the latter methods are available.
MQSeries::PubSub::Stream objects are subclassed from MQSeries::Queue, and therefore all of the latter methods are available. However, it should be noted that an MQSeries::PubSub::Stream object can only be opened for 'output', since input is the PubSub brokers responsibility (that is, the actual broker process, not the perl5 Broker object). More importantly, messages must be put using the Publish method, rather then Put directly, since the format must be a proper MQRFH message. This is handled automatically, if the Publish method is used. Gluttons for punishment are welcome to do this on their own.
The return value from all of the command methods depends on whether or not datagrams or requests are being sent. In all cases, the command methods return a false value in any failure scenario. When succesful, the result is always true, and if datagrams are sent, the return value is the MQSeries::PubSub::Message object successfully put to either the SYSTEM.BROKER.CONTROL.QUEUE or the MQSeries::PubSub::Stream queue.
When sending datagrams, one may still wish to know the MsgId of the successfully put messages, for later correlation with exception reports for example. The API provided by these classes does nothing special to handle these reports; it is the responsibility of the application to poll any ReplyQ for these, and correlate the exception reports with the original messages. By returning the MQSeries::PubSub::Message object, this is at least possible.
new
In addition to the documented arguments of the parent class constructors, the MQSeries::PubSub::Broker and MQSeries::PubSub::Stream constructor supports some additional arguments.
Key Value
=== =====
ReplyQ String, or MQSeries::Queue object
DynamicQName String
ModelQName String
Wait Numeric
DatagramOnly Boolean
- ReplyQ
-
If specified, this can either be a plain string, in which case it is interpreted as queue name to be opened for input, or it can be an MQSeries::Queue object (or subclass thereof), in which case it is assumed to have been instantiated for input.
If not given (and if DatagramOnly is omitted), the constructor will open a permanent dynamic queue, using the SYSTEM.DEFAULT.MODEL.PERMDYN.QUEUE model queue, and PUBSUB.BROKER.REPLYQ.* as a DynamicQName template.
NOTE: This queue is not a default object created by the MQSeries product, so it will need to be created by the MQSeries administrators at your site, if you wish to leverage this feature.
The dynamic queue is opened by specifying MQCO_DELETE_PURGE as a CloseOptions, so it should be destroyed when the broker object goes out of scope, or the perl process exits.
This queue will be used as the reply queue for request messages sent to the broker, or published to a stream queue.
- DynamicQName
-
When using the default permanent dynamic queue, the template name (DynamicQName) used to open the queue can be customized with this argument. This is relevant if you want a dynamic queue name that matches your own object naming conventions, and not the authors.
- ModelQName
-
When using the default permanent dynamic queue, the model queue used to open the queue can be customized with this argument. As noted above, the default object (SYSTEM.DEFAULT.MODEL.PERMDYN.QUEUE) is not present unless you create it, so this will allow you to use an existing permanent dynamic model queue, or use an application specific model queue, if one exists.
NOTE: With the MQSeries PubSub support pac, temporary dynamic queues are not supported for replies to PubSub command messages, since the replies are persistent, thus the model must be a permanent dynamic model queue. Note that with MSQI v2.0, which will replace the PubSub support pac broker, this restriction is supposedly lifted.
- Wait
-
This parameter specified the Wait argument passed to the Get() method when retrieving responses from the broker. This is obviously not relevant if the DatagramOnly option is set, or if the individual MQSeries::PubSub::Message objects are instantiated as datagrams, and not requests.
This can be a numeric value, in milliseconds, or a symbolic value of a number ending on an 's' for seconds or an 'm' for minutes.
The default value is 5000 milliseconds.
- DatagramOnly
-
If this is true, then the broker will be configured to always send datagrams, and never requests, when sending messages to the broker. By specifying that the user intends to send nothing other than datagrams, the automatic creation of a dynamic reply queue is disabled, since it will not be needed.
This simply optmization would really only be appropriate for a high performance publisher sending non-persistent messages via the PubSub infrastructure.
The use of only datagrams implies a significantly reduced level of error checking, since the only operation that can be checked is the MQPUT() of the messages to either the broker control queue, or an individual stream queue. If the broker encounters a problem when it retreives such a message, it may be silently dropped, or perhaps sent to the system dead letter queue, depending on the configuration of the broker.
In any event, this feature should be used with caution.
Response
This method returns the MQSeries::PubSub::Message object for the response retrieved from the broker, which will be saved in the object, unless the command sent a datagram. See the MQSeries::PubSub::Message for more information.
ReplyQ
This method returns the MQSeries::Queue object instantiated for the ReplyQ used by the Broker of Stream object. This would be useful if the subscriber queue is allowed to default to the automatically created and opened object created at instantiation time. If the application doesnt create its own subscriber queue, for example. When RegisterSubscriber is called, the QMgrName and QName Options can be omitted, and they will default to those used in the MsgDesc field of the resulting command message, which will be set to those of the automatically instantiated ReplyQ object by default.
Publications will then be sent to the default ReplyQ, and applications can use this method to retrieve the object against which the Get method call must be retrieve the subscribed data.
For example:
$broker = MQSeries::PubSub::Broker->new( QueueManager => 'FOO' ) || die;
$broker->RegisterSubscriber
(
Options =>
{
Topic => 'Something/Interesting',
StreamName => 'SOME.APP.STREAM',
},
) || die;
while ( 1 ) {
my $message = MQSeries::PubSub::Message->new() || die;
my $result = $broker->ReplyQ()->Get
(
Message => $message,
Wait => 1000,
);
next if $result > 1; # -1 means MQRC_NO_MSG_AVAILABLE -- see MQSeries::Queue::Get docs
# Do something useful with $message->Data(), perhaps
}
COMMAND SYNTAX
All 7 of the Publish/Subscribe commands have a common calling syntax. These are all implemented as method calls of either MQSeries::PubSub::Broker or MQSeries::PubSub::Stream objects, as discussed above.
Common Arguments
The arguments to the command methods are a hash, containing the following Key/Value pairs:
Key Value
=== =====
MsgDesc MQMD HASH ref (see MQSeries::Message->new() docs)
Header MQRFH HASH ref (see MQSeries::Message::RulesFormat->new() docs)
Options PubSub command NameValue pairs (MQRFH.NameValueString, see below)
Data Scalar value, passed to MQSeries::PubSub::Message->new()
Sync Boolean, passed to MQSeries::Queue->Put()
- MsgDesc
-
This HASH is passed through directly to the MQSeries::Message::RulesFormat->new() constructor, however there are a few keys that are particularly important.
"Format" should not be given, as this will be set to MQFMT_RF_HEADER by default (which is actually the required format for PubSub commands), when the underlying MQSeries::PubSub::Message object is instantiated. This is done for you.
"Expiry" will be both the expiration value for the message itself, as well as the expiration of the publication or subscription registration. This value will typically be used to specify the lifetime of the PubSub subscriber or publisher aopplication.
"MsgType" can be set to either MQMT_DATAGRAM or MQMT_REQUEST, with the default depending on how the MQSeries::PubSub::* object was instantianted. If the DatagramOnly option was specified, then it is an error to attempt to send a MQMT_REQUEST. MsgType defaults to MQMT_REQUEST, unless DatagramOnly was specified, in which case it defaults to MQMT_DATAGRAM.
"ReplyToQ" and "ReplyToQMgr", if not given, are assumed to be the those of the "ReplyQ" opened when the MQSeries::PubSub::* object was instantiated, but they can be specified, and thus override these defaults. Normally, the developer specifies the ReplyQ at instantiation time, and not for each and every PubSub command, so thee options are not normally used.
- Header
-
This HASH is passed through directly to the MQSeries::Message::RulesFormat->new() constructor. This HASH represents all of the fields of the MQRFH data structure, except the NameValue string (see the "Options" section below).
In general, all of the fields of this data structure have reasonable defaults, and the only fields that usually need to be set, if at all, are as follows.
"Encoding" defaults to MQENV_NATIVE, but this can be set if your application uses the pre-defined numeric data formats supported by MQSeries.
"CodedCharSetId" defaults to MQCCSI_Q_MGR, but can be set to the appropriate value when string data is used, and a specific character set is used.
"Format" defaults to MQFMT_NONE, which is wrong if you are using string data in your publications. In order for MQGMO_CONVERT to work correctly, this field must be set to MQFMT_STRING for string data.
- Options
-
This is a HASH or Name/Value pairs, which represents the MQRFH NameValue string. The keys vary from one command to another, and are documented in detail in the "MQSeries PubSub Users Guide" document from IBM. They are summarized here for completeness.
Most of the values should be simple strings, corresponding to the value of that Options key. However, if the Key is repeated (for example, specifying multiple topics for a subscription), then value should be an ARRAY reference of strings. This will generate an appropriately repeated entry in the NameValue string section of the resulting MQRFH message.
Most importantly, in the documentation, the key strings are all prefixed with "MQPS". This must be omitted, as the MQSeries::PubSub::Message objects will prepend it appropriately, and strip it when retreiving these messages.
See below for a summary of each individual PubSub commands options.
- Sync
-
This option is passed directly to the MQSeries::Queue->Put() method, and it can only really be used when sending datagrams. Obviously, if you are sending requests, then you can not use syncpoint since the put of the request, and subsequent get of the response all happens within a single subroutine call.
This option is really only relevant if MsgDesc->{MsgType} == MQMT_DATAGRAM, and then the method call (eg. the MQSeries::PubSub::Broker->Commit() call) is the responsibility of the calling application.
- Data
-
This is application data portion of the subsequently instantiated MQSeries::PubSub::Message object. See the MQSeries::Message::RulesFormat documentation for more information.
COMMANDS
This section provides a summary of the Options keys, and appropriate values, for each of the 7 PubSub commands. Note that the full story here is really found in the IBM product documentation, most notably the "MQSeries PubSub User's Guide".
NOTE: that the docs list the "Command" key as required, but you will notice it is omitted here. Since the method names all map directly to the actual command name, this API adds that key for you.
RegisterPublisher (Broker)
Required keys:
Topic
Optional keys:
RegOpts, StreamName, QMgrName, QName
RegisterSubscriber (Broker)
Required parameters:
Topic
Optional parameters:
RegOpts, StreamName, QMgrName, QName
DeregisterPublisher (Broker)
Optional parameters:
RegOpts, StreamName, Topic, QMgrName, QName
DeregisterSubscriber (Broker)
Optional parameters:
RegOpts, StreamName, Topic, QMgrName, QName
RequestUpdate (Broker)
Required parameters:
Topic
Optional parameters:
RegOpts, StreamName, QMgrName, QName
Publish (Stream)
Required parameters:
Topic
Optional parameters:
RegOpts, PubOpts, StreamName,
QMgrName, QName, PubTime, SeqNum,
StringData, IntData
Data can be published in any arbitrary string format, using the Data argument to the Publish command (see above).
NOTE: The version of the IBM docs I have has an important typo, namely the PublishTimestamp section for this command claims the Name: is MQPSQName, and the string constant is MQPS_Q_NAME, which is obviously wrong.
DeletePublication (Stream)
Required parameters:
Topic
Optional parameters:
DelOpts, StreamName
EXTENDED COMMANDS
WARNING: These commands are entirely specific to the MQSeries Publish/Subscribe Perl API, and there is no analogous functionality in any of the IBM products or APIs. These commands are modeled after the Command Server API (MQSeries::Command) methods, in terms of the names chosen.
InquireParent
This method returns a single string, which is the name of the parent broker for the given queue manager. The arguments are a hash of key/value pairs:
Key Value
=== =====
QMgrName string
- QMgrName
-
This is the name of the queue manager whose parent is being queried. This argument is optional, and the QMgrName defaults to the same queue manager as that used to instantiate the MQSeries::PubSub::Broker object.
InquireChildren
This method returns a list of strings, each of which is a broker queue manager whose parent is the given queue manager. The arguments are a hash of key/value pairs:
Key Value
=== =====
QMgrName string
- QMgrName
-
This is the name of the queue manager whose children are being queried. This argument is optional, and the QMgrName defaults to the same queue manager as that used to instantiate the MQSeries::PubSub::Broker object.
InquireStreamNames
This method returns a list of strings, each of which is a supported StreamName, available on the broker for the given queue manager. The arguments are a hash of key/value pairs:
Key Value
=== =====
QMgrName string
- QMgrName
-
This is the name of the queue manager whose stream names are being queried. This argument is optional, and the QMgrName defaults to the same queue manager as that used to instantiate the MQSeries::PubSub::Broker object.
InquireTopics
This method returns a list of strings, each of which is a currently registered Topic, for either Publishers or Subscribers, for the given QMgrName and StreamName. The arguments are a hash of key/value pairs:
Key Value
=== =====
Type string ( "Publishers" | "Subscribers" )
StreamName string
QMgrName string
- Type
-
This indicates whether registered Publishers or Subscribers Topics are to be queried. It is optional, and defaults to "Publishers", in the assumption that the available Topic list is most interesting.
- StreamName
-
This indicates the stream for which Topics are to be queried. It is optional, and defaults to "SYSTEM.BROKER.DEFAULT.STREAM".
- QMgrName
-
This is the name of the queue manager whose topics are being queried. This argument is optional, and the QMgrName defaults to the same queue manager as that used to instantiate the MQSeries::PubSub::Broker object.
InquireIdentities
This method returns a list of hash references, each of which describes the publishers or subscribers for a single stream. The arguments are a hash of key/value pairs:
Key Value
=== =====
Type string ( "Publishers" | "Subscribers" )
QMgrName string
Topic string
StreamName string
Anonymous Boolean
- Type
-
This is a string which indicates whether the query if for "Publishers" or "Subscribers". This argument is optional, and at defaults to "Subscribers".
- QMgrName
-
This is the name of the queue manager whose publishers are being queried. This argument is optional, and the QMgrName defaults to the same queue manager as that used to instantiate the MQSeries::PubSub::Broker object.
- Topic
-
This is the Topic for which publishers are being queried. This argument is optional, and it defaults to '*', or all possible topics.
- StreamName
-
This is the stream name for which publishers are being queried. This argument is optional and it defaults to "SYSTEM.BROKER.DEFAULT.STREAM".
- Anonymous
-
This is a flag which indicated whether or not anonymous publishers are to be included in the returned data.
The structure of the return data for the InquireIdentities method is as follows. The return value is an ARRAY of HASH references. Each HASH has the following keys:
Key Value
=== =====
Topic string
StreamName string
BrokerCount integer
ApplCount integer
AnonymousCount integer
Publishers ARRAY reference
Subscribers ARRAY reference
There will be one of these HASH references for each Topic which matches the value specified in the method argument list.
- Topic
-
The Topic for which this entry contains publisher or subscriber information.
- StreamName
-
The StreamName for which this entry contains publisher or subscriber information.
- BrokerCount
-
Count of publisher or subscriber registrations from brokers. For publishers, this count is normally zero, as brokers do not register as publishers. The role of a broker in acting as a publisher itself for metatopics on stream queues is not counted, nor is its role as a publisher for administrative topics on the SYSTEM.BROKER.ADMIN.STREAM stream.
- ApplCount
-
Count of publisher or subscriber registrations from applications. Note that this includes anonymous registrations if the "Anonymous" argument was given.
- AnonymousCount
-
Count of anonymous publisher or subscriber registrations from applications.
- Publishers or Subscribers
-
Only one of these keys will be present, obviously depending on the value of the "Type" specified in the method argument list. The value will be an ARRAY reference of HASH references. There will be one entry in the ARRAY for each individual publisher or subscriber.
The Publishers and/or Subscribers HASH references will have the following keys:
Key
===
QMgrName
QName
UserIdentifier
RegistrationOptions
Time
CorrelId
NOTE: These are the same as the Parameter identifiers documented in the IBM docs with the string "Registration" prepended.
- QMgrName
-
Publisher's or subscriber's queue manager name.
- QName
-
Publisher's or subscriber's queue name.
- UserIdentifier
-
Publisher's or subscriber's user ID.
- RegistrationOptions
-
This is a HASH reference, and the existence of any of the following keys indicated that the corresponding RegistrationOptions value is present.
For publishers, the following keys may be present:
Key Option === ====== Anon MQREGO_ANONYMOUS Local MQREGO_LOCAL DirectReq MQREGO_DIRECT_REQUEST CorrelAsId MQREGO_CORREL_ID_AS_IDENTITY
For subscribers, the following keys may be present:
Key Option === ====== Anon MQREGO_ANONYMOUS Local MQREGO_LOCAL NewPubsOnly MQREGO_NEW_PUBLICATIONS_ONLY PubOnReqOnly MQREGO_PUBLISH_ON_REQUEST_ONLY CorrelAsId MQREGO_CORREL_ID_AS_IDENTITY InclStreamName MQREGO_INCLUDE_STREAM_NAME InformIfRet MQREGO_INFORM_IF_RETAINED
- Time
-
Time of registration.
- CorrelId
-
Publisher's or subscriber's correlation identifier.
This is a 48-byte character string of hexadecimal characters representing the contents of the 24-byte binary correlation identifier. Each character in the string is in the range 0 through 9 or A through F.
This parameter is present only if the publisher's or subscriber's identity includes a correlation identifier.
InquireRetainedMessages
This method is really the building block for all of the above "Extended Commands", however it is also useful as a standalone method for querying arbitrary Publish/Subscribe administrative messages. Note that for some of the specific information available, the higher level functions described above should be used.
This code works by calling RegisterSubscriber with for the specified Topic and StreamName, retreives all of the retained publications by calling RequestUpdate, and finally calls DeregisterSubscriber to cancel the subscription.
This method will return an array of MQSeries::PubSub::AdminMessage objects, one for each retained message in the specifies Topic and StreamName. The arguments are a hash of key/value pairs:
Key Value
=== =====
Topic string
StreamName string
- Topic
-
This is the Topic for which retained messages are to be queried.
- StreamName
-
This is the StreamName for which retained messages are to be queried. This is optional, and the default value is "SYSTEM.BROKER.ADMIN.STREAM".
SEE ALSO
MQSeries::PubSub::Broker(3), MQSeries::PubSub::Stream(3), MQSeries::PubSub::Message(3), MQSeries::QueueManager(3), MQSeries::Queue(3), "MQSeries PubSub User's Guide"