The Perl Advent Calendar needs more articles for 2022. Submit your idea today!

NAME

Net::RabbitMQ::Client - RabbitMQ client (XS for librabbitmq)

SYNOPSIS

Simple API:

        # Simple API example: publish one message, then consume from the queue.
        use utf8;
        use strict;
        use warnings;

        use Net::RabbitMQ::Client;

        produce();
        consume();
        
        # Publish one JSON message through the Simple API.
        # Asks sm_new to declare the "test_ex" direct exchange and the
        # "test_queue" queue (exchange_declare / queue_declare => 1).
        # Dies with the error description if connecting or publishing fails.
        sub produce {
                my $simple = Net::RabbitMQ::Client->sm_new(
                        host => "best.host.for.rabbitmq.net",
                        login => "login", password => "password",
                        exchange => "test_ex", exchange_type => "direct", exchange_declare => 1,
                        queue => "test_queue", queue_declare => 1
                );
                # On failure sm_new returns a non-reference value, used here as the
                # error code. No object exists yet, so sm_get_error_desc is called
                # on the class.
                # NOTE(review): assumes sm_get_error_desc accepts a class invocant - confirm.
                die Net::RabbitMQ::Client->sm_get_error_desc($simple) unless ref $simple;

                # sm_publish returns 0 on success, otherwise an error code.
                my $sm_status = $simple->sm_publish('{"say": "hello"}', content_type => "application/json");
                die $simple->sm_get_error_desc($sm_status) if $sm_status;

                $simple->sm_destroy();
        }
        
        # Consume messages from "test_queue" through the Simple API, printing
        # each one. Dies with the error description if connecting or consuming
        # fails.
        sub consume {
                my $simple = Net::RabbitMQ::Client->sm_new(
                        host => "best.host.for.rabbitmq.net",
                        login => "login", password => "password",
                        queue => "test_queue"
                );
                # On failure sm_new returns a non-reference value, used here as the
                # error code. No object exists yet, so sm_get_error_desc is called
                # on the class.
                # NOTE(review): assumes sm_get_error_desc accepts a class invocant - confirm.
                die Net::RabbitMQ::Client->sm_get_error_desc($simple) unless ref $simple;

                # The callback is invoked with the Simple object and the message body.
                my $sm_status = $simple->sm_get_messages(sub {
                        my ($self, $message) = @_;

                        print $message, "\n";

                        1; # it is important to return 1 (send ack) or 0
                });
                die $simple->sm_get_error_desc($sm_status) if $sm_status;

                $simple->sm_destroy();
        }

Base API:

        # Base API example: publish one message, then consume from the queue.
        use utf8;
        use strict;
        use warnings;

        use Net::RabbitMQ::Client;

        produce();
        consume();
        
        # Publish one persistent text/plain message through the Base API:
        # connect, log in, open a channel, bind the queue, publish, then close
        # the channel and connection. Dies on any setup failure; a failed
        # publish is only reported on STDOUT.
        sub produce {
                my $rmq = Net::RabbitMQ::Client->create();

                my $exchange = "test";
                my $routingkey = "";
                my $messagebody = "lalala";

                my $channel = 1;

                my $conn = $rmq->new_connection();
                my $socket = $rmq->tcp_socket_new($conn);

                # Any true status from socket_open is treated as failure.
                my $status = $rmq->socket_open($socket, "best.host.for.rabbitmq.net", 5672);
                die "Can't create socket" if $status;

                $status = $rmq->login($conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "login", "password");
                die "Can't login on server" if $status != AMQP_RESPONSE_NORMAL;

                $status = $rmq->channel_open($conn, $channel);
                die "Can't open channel" if $status != AMQP_RESPONSE_NORMAL;

                # Capture the bind result so the check below tests queue_bind
                # itself rather than the earlier channel_open status.
                $status = $rmq->queue_bind($conn, $channel, "test_q", $exchange, $routingkey, 0);
                die "Can't bind queue" if $status != AMQP_RESPONSE_NORMAL;

                # Flag content type and delivery mode, the two properties set below.
                my $props = $rmq->type_create_basic_properties();
                $rmq->set_prop__flags($props, AMQP_BASIC_CONTENT_TYPE_FLAG|AMQP_BASIC_DELIVERY_MODE_FLAG);
                $rmq->set_prop_content_type($props, "text/plain");
                $rmq->set_prop_delivery_mode($props, AMQP_DELIVERY_PERSISTENT);

                $status = $rmq->basic_publish($conn, $channel, $exchange, $routingkey, 0, 0, $props, $messagebody);

                if($status != AMQP_STATUS_OK) {
                        print "Can't send message\n";
                }

                $rmq->type_destroy_basic_properties($props);

                $rmq->channel_close($conn, $channel, AMQP_REPLY_SUCCESS);
                $rmq->connection_close($conn, AMQP_REPLY_SUCCESS);
                $rmq->destroy_connection($conn);
        }
        
        # Consume messages from "test_q" through the Base API and print each
        # body until consume_message reports a non-normal status, then close
        # the channel and connection. Dies on any setup failure.
        sub consume {
                my $rmq = Net::RabbitMQ::Client->create();

                my $channel = 1;

                my $conn = $rmq->new_connection();
                my $socket = $rmq->tcp_socket_new($conn);

                # Any true status from socket_open is treated as failure.
                my $status = $rmq->socket_open($socket, "best.host.for.rabbitmq.net", 5672);
                die "Can't create socket" if $status;

                $status = $rmq->login($conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "login", "password");
                die "Can't login on server" if $status != AMQP_RESPONSE_NORMAL;

                $status = $rmq->channel_open($conn, $channel);
                die "Can't open channel" if $status != AMQP_RESPONSE_NORMAL;

                # Arguments after the queue name: consumer_tag, no_local, no_ack, exclusive.
                $status = $rmq->basic_consume($conn, $channel, "test_q", undef, 0, 1, 0);
                die "Can't start consuming" if $status != AMQP_RESPONSE_NORMAL;

                my $envelope = $rmq->type_create_envelope();

                while (1)
                {
                        $rmq->maybe_release_buffers($conn);

                        $status = $rmq->consume_message($conn, $envelope, 0, 0);
                        last if $status != AMQP_RESPONSE_NORMAL;

                        print "New message: \n", $rmq->envelope_get_message_body($envelope), "\n";

                        # Called once per received message; the envelope object
                        # itself is reused and destroyed after the loop.
                        $rmq->destroy_envelope($envelope);
                }

                $rmq->type_destroy_envelope($envelope);

                $rmq->channel_close($conn, $channel, AMQP_REPLY_SUCCESS);
                $rmq->connection_close($conn, AMQP_REPLY_SUCCESS);
                $rmq->destroy_connection($conn);
        }

DESCRIPTION

This is a binding for the RabbitMQ-C library.

Before installing this module, please build and install the RabbitMQ-C library.

See https://github.com/alanxz/rabbitmq-c

https://github.com/lexborisov/perl-net-rabbitmq-client

METHODS

Simple API

sm_new

        my $simple = Net::RabbitMQ::Client->sm_new(
                host => '',              # default: 
                port => 5672,            # default: 5672
                channel => 1,            # default: 1
                login => '',             # default: 
                password => '',          # default: 
                exchange => undef,       # default: undef
                exchange_type => undef,  # optional, if exchange_declare == 1; types: topic, fanout, direct, headers; default: undef
                exchange_declare => 0,   # declare exchange or not; 1 or 0; default: 0
                queue => undef,          # default: undef
                routingkey => '',        # default: 
                queue_declare => 0,      # declare queue or not; 1 or 0; default: 0
        );

Return: a Simple object if successful, otherwise an error code (not a reference)

sm_publish

        my $sm_status = $simple->sm_publish($text,
                # default args
                content_type  => "text/plain",
                delivery_mode => AMQP_DELIVERY_PERSISTENT,
                _flags        => AMQP_BASIC_CONTENT_TYPE_FLAG|AMQP_BASIC_DELIVERY_MODE_FLAG
        );

Return: 0 if successful, otherwise an error code

sm_get_messages

Loop to get messages

        # The callback must be a code reference; it is invoked with the
        # Simple object and the message body.
        my $callback = sub {
                my ($simple, $message) = @_;

                1; # it is important to return 1 (send ack) or 0
        };

        my $sm_status = $simple->sm_get_messages($callback);

Return: 0 if successful, otherwise an error code

sm_get_message

Get one message

        my $sm_status = 0;
        my $message = $simple->sm_get_message($sm_status);

Return: message if successful

sm_get_rabbitmq

        my $rmq = $simple->sm_get_rabbitmq();
        

Return: RabbitMQ Base object

sm_get_connection

        my $conn = $simple->sm_get_connection();
        

Return: Connection object (from Base API new_connection)

sm_get_socket

        my $socket = $simple->sm_get_socket();
        

Return: Socket object (from Base API tcp_socket_new)

sm_get_config

        my $config = $simple->sm_get_config();
        

Return: Config when creating a Simple object

sm_get_error_desc

        my $description = $simple->sm_get_error_desc($sm_error_code);
        

Return: Error description by Simple error code

sm_destroy

Destroy a Simple object

        $simple->sm_destroy();
        

Base API

Connection and Authorization

create

 my $rmq = Net::RabbitMQ::Client->create();

Return: rmq

new_connection

 my $amqp_connection_state_t = $rmq->new_connection();

Return: amqp_connection_state_t

tcp_socket_new

 my $amqp_socket_t = $rmq->tcp_socket_new($conn);

Return: amqp_socket_t

socket_open

 my $status = $rmq->socket_open($socket, $host, $port);

Return: status

socket_open_noblock

 my $status = $rmq->socket_open_noblock($socket, $host, $port, $struct_timeout);

Return: status

login

 my $status = $rmq->login($conn, $vhost, $channel_max, $frame_max, $heartbeat, $sasl_method, $login, $password);

Return: status

channel_open

 my $status = $rmq->channel_open($conn, $channel);

Return: status

socket_get_sockfd

 my $res = $rmq->socket_get_sockfd($socket);

Return: variable

get_socket

 my $amqp_socket_t  = $rmq->get_socket($conn);

Return: amqp_socket_t

channel_close

 my $status = $rmq->channel_close($conn, $channel, $code);

Return: status

connection_close

 my $status = $rmq->connection_close($conn, $code);

Return: status

destroy_connection

 my $status = $rmq->destroy_connection($conn);

Return: status

SSL

ssl_socket_new

 my $amqp_socket_t = $rmq->ssl_socket_new($conn);

Return: amqp_socket_t

ssl_socket_set_key

 my $status = $rmq->ssl_socket_set_key($socket, $cert, $key);

Return: status

set_initialize_ssl_library

 $rmq->set_initialize_ssl_library($do_initialize);

ssl_socket_set_cacert

 my $status = $rmq->ssl_socket_set_cacert($socket, $cacert);

Return: status

ssl_socket_set_key_buffer

 my $status = $rmq->ssl_socket_set_key_buffer($socket, $cert, $key, $n);

Return: status

ssl_socket_set_verify

 $rmq->ssl_socket_set_verify($socket, $verify);

Basic Publish/Consume

basic_publish

 my $status = $rmq->basic_publish($conn, $channel, $exchange, $routing_key, $mandatory, $immediate, $properties, $body);

Return: status

basic_consume

 my $status = $rmq->basic_consume($conn, $channel, $queue, $consumer_tag, $no_local, $no_ack, $exclusive);

Return: status

basic_get

 my $status = $rmq->basic_get($conn, $channel, $queue, $no_ack);

Return: status

basic_ack

 my $status = $rmq->basic_ack($conn, $channel, $delivery_tag, $multiple);

Return: status

basic_nack

 my $status = $rmq->basic_nack($conn, $channel, $delivery_tag, $multiple, $requeue);

Return: status

basic_reject

 my $status = $rmq->basic_reject($conn, $channel, $delivery_tag, $requeue);

Return: status

Consume

consume_message

 my $status = $rmq->consume_message($conn, $envelope, $struct_timeout, $flags);

Return: status

Queue

queue_declare

 my $status = $rmq->queue_declare($conn, $channel, $queue, $passive, $durable, $exclusive, $auto_delete);

Return: status

queue_bind

 my $status = $rmq->queue_bind($conn, $channel, $queue, $exchange, $routing_key);

Return: status

queue_unbind

 my $status = $rmq->queue_unbind($conn, $channel, $queue, $exchange, $routing_key);

Return: status

Exchange

exchange_declare

 my $status = $rmq->exchange_declare($conn, $channel, $exchange, $type, $passive, $durable, $auto_delete, $internal);

Return: status

Envelope

envelope_get_redelivered

 my $res = $rmq->envelope_get_redelivered($envelope);

Return: variable

envelope_get_channel

 my $res = $rmq->envelope_get_channel($envelope);

Return: variable

envelope_get_exchange

 my $res = $rmq->envelope_get_exchange($envelope);

Return: variable

envelope_get_routing_key

 my $res = $rmq->envelope_get_routing_key($envelope);

Return: variable

destroy_envelope

 $rmq->destroy_envelope($envelope);

envelope_get_consumer_tag

 my $res = $rmq->envelope_get_consumer_tag($envelope);

Return: variable

envelope_get_delivery_tag

 my $res = $rmq->envelope_get_delivery_tag($envelope);

Return: variable

envelope_get_message_body

 my $res = $rmq->envelope_get_message_body($envelope);

Return: variable

Types

type_create_envelope

 my $amqp_envelope_t = $rmq->type_create_envelope();

Return: amqp_envelope_t

type_destroy_envelope

 $rmq->type_destroy_envelope($envelope);

type_create_timeout

 my $struct_timeval = $rmq->type_create_timeout($timeout_sec);

Return: struct_timeval

type_destroy_timeout

 $rmq->type_destroy_timeout($timeout);

type_create_basic_properties

 my $amqp_basic_properties_t = $rmq->type_create_basic_properties();

Return: amqp_basic_properties_t

type_destroy_basic_properties

 my $status = $rmq->type_destroy_basic_properties($props);

Return: status

For a Basic Properties

set_prop_app_id

 $rmq->set_prop_app_id($props, $value);

set_prop_content_type

 $rmq->set_prop_content_type($props, $value);

set_prop_reply_to

 $rmq->set_prop_reply_to($props, $value);

set_prop_priority

 $rmq->set_prop_priority($props, $priority);

set_prop__flags

 $rmq->set_prop__flags($props, $flags);

set_prop_user_id

 $rmq->set_prop_user_id($props, $value);

set_prop_delivery_mode

 $rmq->set_prop_delivery_mode($props, $delivery_mode);

set_prop_message_id

 $rmq->set_prop_message_id($props, $value);

set_prop_timestamp

 $rmq->set_prop_timestamp($props, $timestamp);

set_prop_cluster_id

 $rmq->set_prop_cluster_id($props, $value);

set_prop_correlation_id

 $rmq->set_prop_correlation_id($props, $value);

set_prop_expiration

 $rmq->set_prop_expiration($props, $value);

set_prop_type

 $rmq->set_prop_type($props, $value);

set_prop_content_encoding

 $rmq->set_prop_content_encoding($props, $value);

Other

data_in_buffer

 my $amqp_boolean_t = $rmq->data_in_buffer($conn);

Return: amqp_boolean_t

maybe_release_buffers

 $rmq->maybe_release_buffers($conn);

error_string

 my $res = $rmq->error_string($error);

Return: variable

DESTROY

 undef $rmq;

Frees memory and destroys the object.

AUTHOR

Alexander Borisov <lex.borisov@gmail.com>

COPYRIGHT AND LICENSE

This software is copyright (c) 2015 by Alexander Borisov.

This is free software; you can redistribute it and/or modify it under the same terms as the Perl 5 programming language system itself.

See the librabbitmq license and copyright notice at https://github.com/alanxz/rabbitmq-c