NAME
Net::RabbitMQ::Client - RabbitMQ client (XS for librabbitmq)
SYNOPSIS
Simple API:
produce();
consume();
sub produce {
my $simple = Net::RabbitMQ::Client->sm_new(
host => "best.host.for.rabbitmq.net" ,
login => "login" , password => "password" ,
exchange => "test_ex" , exchange_type => "direct" , exchange_declare => 1,
queue => "test_queue" , queue_declare => 1
);
die sm_get_error_desc( $simple ) unless ref $simple ;
my $sm_status = $simple ->sm_publish( '{"say": "hello"}' , content_type => "application/json" );
die sm_get_error_desc( $sm_status ) if $sm_status ;
$simple ->sm_destroy();
}
sub consume {
my $simple = Net::RabbitMQ::Client->sm_new(
host => "best.host.for.rabbitmq.net" ,
login => "login" , password => "password" ,
queue => "test_queue"
);
die sm_get_error_desc( $simple ) unless ref $simple ;
my $sm_status = $simple ->sm_get_messages( sub {
my ( $self , $message ) = @_ ;
print $message , "\n" ;
1;
});
die sm_get_error_desc( $sm_status ) if $sm_status ;
$simple ->sm_destroy();
}
|
Base API:
produce();
consume();
sub produce {
my $rmq = Net::RabbitMQ::Client->create();
my $exchange = "test" ;
my $routingkey = "" ;
my $messagebody = "lalala" ;
my $channel = 1;
my $conn = $rmq ->new_connection();
my $socket = $rmq ->tcp_socket_new( $conn );
my $status = $rmq ->socket_open( $socket , "best.host.for.rabbitmq.net" , 5672);
die "Can't create socket" if $status ;
$status = $rmq ->login( $conn , "/" , 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "login" , "password" );
die "Can't login on server" if $status != AMQP_RESPONSE_NORMAL;
$status = $rmq ->channel_open( $conn , $channel );
die "Can't open chanel" if $status != AMQP_RESPONSE_NORMAL;
$rmq ->queue_bind( $conn , 1, "test_q" , $exchange , $routingkey , 0);
die "Can't bind queue" if $status != AMQP_RESPONSE_NORMAL;
my $props = $rmq ->type_create_basic_properties();
$rmq ->set_prop__flags( $props , AMQP_BASIC_CONTENT_TYPE_FLAG|AMQP_BASIC_DELIVERY_MODE_FLAG);
$rmq ->set_prop_content_type( $props , "text/plain" );
$rmq ->set_prop_delivery_mode( $props , AMQP_DELIVERY_PERSISTENT);
$status = $rmq ->basic_publish( $conn , $channel , $exchange , $routingkey , 0, 0, $props , $messagebody );
if ( $status != AMQP_STATUS_OK) {
print "Can't send message\n" ;
}
$rmq ->type_destroy_basic_properties( $props );
$rmq ->channel_close( $conn , 1, AMQP_REPLY_SUCCESS);
$rmq ->connection_close( $conn , AMQP_REPLY_SUCCESS);
$rmq ->destroy_connection( $conn );
}
sub consume {
my $rmq = Net::RabbitMQ::Client->create();
my $exchange = "test" ;
my $routingkey = "" ;
my $messagebody = "lalala" ;
my $channel = 1;
my $conn = $rmq ->new_connection();
my $socket = $rmq ->tcp_socket_new( $conn );
my $status = $rmq ->socket_open( $socket , "best.host.for.rabbitmq.net" , 5672);
die "Can't create socket" if $status ;
$status = $rmq ->login( $conn , "/" , 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "login" , "password" );
die "Can't login on server" if $status != AMQP_RESPONSE_NORMAL;
$status = $rmq ->channel_open( $conn , $channel );
die "Can't open chanel" if $status != AMQP_RESPONSE_NORMAL;
$status = $rmq ->basic_consume( $conn , 1, "test_q" , undef , 0, 1, 0);
die "Consuming" if $status != AMQP_RESPONSE_NORMAL;
my $envelope = $rmq ->type_create_envelope();
while (1)
{
$rmq ->maybe_release_buffers( $conn );
$status = $rmq ->consume_message( $conn , $envelope , 0, 0);
last if $status != AMQP_RESPONSE_NORMAL;
print "New message: \n" , $rmq ->envelope_get_message_body( $envelope ), "\n" ;
$rmq ->destroy_envelope( $envelope );
}
$rmq ->type_destroy_envelope( $envelope );
$rmq ->channel_close( $conn , 1, AMQP_REPLY_SUCCESS);
$rmq ->connection_close( $conn , AMQP_REPLY_SUCCESS);
$rmq ->destroy_connection( $conn );
}
|
DESCRIPTION
This is binding for RabbitMQ-C library.
Please, before install this module make RabbitMQ-C library.
See https://github.com/alanxz/rabbitmq-c
https://github.com/lexborisov/perl-net-rabbitmq-client
METHODS
Simple API
sm_new
my $simple = Net::RabbitMQ::Client->sm_new(
host => '' ,
port => 5672,
channel => 1,
login => '' ,
password => '' ,
exchange => undef ,
exchange_type => undef ,
exchange_declare => 0,
queue => undef ,
routingkey => '' ,
queue_declare => 0,
);
|
Return: a Simple object if successful, otherwise an error occurred
sm_publish
my $sm_status = $simple ->sm_publish( $text ,
content_type => "text/plain" ,
delivery_mode => AMQP_DELIVERY_PERSISTENT,
_flags => AMQP_BASIC_CONTENT_TYPE_FLAG|AMQP_BASIC_DELIVERY_MODE_FLAG
);
|
Return: 0 if successful, otherwise an error occurred
sm_get_messages
Loop to get messages
my $callback = {
my ( $simple , $message ) = @_ ;
1;
}
my $sm_status = $simple ->sm_get_messages( $callback );
|
Return: 0 if successful, otherwise an error occurred
sm_get_message
Get one message
my $sm_status = 0;
my $message = $simple ->sm_get_message( $sm_status );
|
Return: message if successful
sm_get_rabbitmq
my $rmq = $simple ->sm_get_rabbitmq();
|
Return: RabbitMQ Base object
sm_get_connection
my $conn = $simple ->sm_get_connection();
|
Return: Connection object (from Base API new_connection)
sm_get_socket
my $socket = $simple ->sm_get_socket();
|
Return: Socket object (from Base API tcp_socket_new)
sm_get_config
my $config = $simple ->sm_get_config();
|
Return: Config when creating a Simple object
sm_get_config
my $description = $simple ->sm_get_error_desc( $sm_error_code );
|
Return: Error description by Simple error code
sm_destroy
Destroy a Simple object
Base API
Connection and Authorization
create
my $rmq = Net::RabbitMQ::Client->create();
|
Return: rmq
new_connection
my $amqp_connection_state_t = $rmq ->new_connection();
|
Return: amqp_connection_state_t
tcp_socket_new
my $amqp_socket_t = $rmq ->tcp_socket_new( $conn );
|
Return: amqp_socket_t
socket_open
my $status = $rmq ->socket_open( $socket , $host , $port );
|
Return: status
socket_open_noblock
my $status = $rmq ->socket_open_noblock( $socket , $host , $port , $struct_timeout );
|
Return: status
login
my $status = $rmq ->login( $conn , $vhost , $channel_max , $frame_max , $heartbeat , $sasl_method );
|
Return: status
channel_open
my $status = $rmq ->channel_open( $conn , $channel );
|
Return: status
socket_get_sockfd
my $res = $rmq ->socket_get_sockfd( $socket );
|
Return: variable
get_socket
my $amqp_socket_t = $rmq ->get_socket( $conn );
|
Return: amqp_socket_t
channel_close
my $status = $rmq ->channel_close( $conn , $channel , $code );
|
Return: status
connection_close
my $status = $rmq ->connection_close( $conn , $code );
|
Return: status
destroy_connection
my $status = $rmq ->destroy_connection( $conn );
|
Return: status
SSL
ssl_socket_new
my $amqp_socket_t = $rmq ->ssl_socket_new( $conn );
|
Return: amqp_socket_t
ssl_socket_set_key
my $status = $rmq ->ssl_socket_set_key( $socket , $cert , $key );
|
Return: status
set_initialize_ssl_library
$rmq ->set_initialize_ssl_library( $do_initialize );
|
ssl_socket_set_cacert
my $status = $rmq ->ssl_socket_set_cacert( $socket , $cacert );
|
Return: status
ssl_socket_set_key_buffer
my $status = $rmq ->ssl_socket_set_key_buffer( $socket , $cert , $key , $n );
|
Return: status
ssl_socket_set_verify
$rmq ->ssl_socket_set_verify( $socket , $verify );
|
Basic Publish/Consume
basic_publish
my $status = $rmq ->basic_publish( $conn , $channel , $exchange , $routing_key , $mandatory , $immediate , $properties , $body );
|
Return: status
basic_consume
my $status = $rmq ->basic_consume( $conn , $channel , $queue , $consumer_tag , $no_local , $no_ack , $exclusive );
|
Return: status
basic_get
my $status = $rmq ->basic_get( $conn , $channel , $queue , $no_ack );
|
Return: status
basic_ack
my $status = $rmq ->basic_ack( $conn , $channel , $delivery_tag , $multiple );
|
Return: status
basic_nack
my $status = $rmq ->basic_nack( $conn , $channel , $delivery_tag , $multiple , $requeue );
|
Return: status
basic_reject
my $status = $rmq ->basic_reject( $conn , $channel , $delivery_tag , $requeue );
|
Return: status
Consume
consume_message
my $status = $rmq ->consume_message( $conn , $envelope , $struct_timeout , $flags );
|
Return: status
Queue
queue_declare
my $status = $rmq ->queue_declare( $conn , $channel , $queue , $passive , $durable , $exclusive , $auto_delete );
|
Return: status
queue_bind
my $status = $rmq ->queue_bind( $conn , $channel , $queue , $exchange , $routing_key );
|
Return: status
queue_unbind
my $status = $rmq ->queue_unbind( $conn , $channel , $queue , $exchange , $routing_key );
|
Return: status
Exchange
exchange_declare
my $status = $rmq ->exchange_declare( $conn , $channel , $exchange , $type , $passive , $durable , $auto_delete , $internal );
|
Return: status
Envelope
envelope_get_redelivered
my $res = $rmq ->envelope_get_redelivered( $envelope );
|
Return: variable
envelope_get_channel
my $res = $rmq ->envelope_get_channel( $envelope );
|
Return: variable
envelope_get_exchange
my $res = $rmq ->envelope_get_exchange( $envelope );
|
Return: variable
envelope_get_routing_key
my $res = $rmq ->envelope_get_routing_key( $envelope );
|
Return: variable
destroy_envelope
$rmq ->destroy_envelope( $envelope );
|
envelope_get_consumer_tag
my $res = $rmq ->envelope_get_consumer_tag( $envelope );
|
Return: variable
envelope_get_delivery_tag
my $res = $rmq ->envelope_get_delivery_tag( $envelope );
|
Return: variable
envelope_get_message_body
my $res = $rmq ->envelope_get_message_body( $envelope );
|
Return: variable
Types
type_create_envelope
my $amqp_envelope_t = $rmq ->type_create_envelope();
|
Return: amqp_envelope_t
type_destroy_envelope
$rmq ->type_destroy_envelope( $envelope );
|
type_create_timeout
my $struct_timeval = $rmq ->type_create_timeout( $timeout_sec );
|
Return: struct_timeval
type_destroy_timeout
$rmq ->type_destroy_timeout( $timeout );
|
type_create_basic_properties
my $amqp_basic_properties_t = $rmq ->type_create_basic_properties();
|
Return: amqp_basic_properties_t
type_destroy_basic_properties
my $status = $rmq ->type_destroy_basic_properties( $props );
|
Return: status
For a Basic Properties
set_prop_app_id
$rmq ->set_prop_app_id( $props , $value );
|
set_prop_content_type
$rmq ->set_prop_content_type( $props , $value );
|
set_prop_reply_to
$rmq ->set_prop_reply_to( $props , $value );
|
set_prop_priority
$rmq ->set_prop_priority( $props , $priority );
|
set_prop__flags
$rmq ->set_prop__flags( $props , $flags );
|
set_prop_user_id
$rmq ->set_prop_user_id( $props , $value );
|
set_prop_delivery_mode
$rmq ->set_prop_delivery_mode( $props , $delivery_mode );
|
set_prop_message_id
$rmq ->set_prop_message_id( $props , $value );
|
set_prop_timestamp
$rmq ->set_prop_timestamp( $props , $timestamp );
|
set_prop_cluster_id
$rmq ->set_prop_cluster_id( $props , $value );
|
set_prop_correlation_id
$rmq ->set_prop_correlation_id( $props , $value );
|
set_prop_expiration
$rmq ->set_prop_expiration( $props , $value );
|
set_prop_type
$rmq ->set_prop_type( $props , $value );
|
set_prop_content_encoding
$rmq ->set_prop_content_encoding( $props , $value );
|
Other
data_in_buffer
my $amqp_boolean_t = $rmq ->data_in_buffer( $conn );
|
Return: amqp_boolean_t
maybe_release_buffers
$rmq ->maybe_release_buffers( $conn );
|
error_string
my $res = $rmq ->error_string( $error );
|
Return: variable
DESTROY
Free mem and destroy object.
AUTHOR
Alexander Borisov <lex.borisov@gmail.com>
COPYRIGHT AND LICENSE
This software is copyright (c) 2015 by Alexander Borisov.
This is free software; you can redistribute it and/or modify it under the same terms as the Perl 5 programming language system itself.
See librabbitmq license and COPYRIGHT https://github.com/alanxz/rabbitmq-c