package MediaCloud::JobManager::Broker::RabbitMQ;

#
# RabbitMQ job broker (using Celery protocol)
#
# Usage:
#
# MediaCloud::JobManager::Broker::RabbitMQ->new();
#

use strict;
use warnings;
use Modern::Perl "2012";

use Moose;
with 'MediaCloud::JobManager::Broker';

use Net::AMQP::RabbitMQ;
use UUID::Tiny ':std';
use Tie::Cache;
use JSON;
use Data::Dumper;
use Readonly;

use Log::Log4perl qw(:easy);
Log::Log4perl->easy_init(
    {
        level  => $DEBUG,
        utf8   => 1,
        layout => "%d{ISO8601} [%P]: %m%n"
    }
);

# flush sockets after every write
$| = 1;

use MediaCloud::JobManager;
use MediaCloud::JobManager::Job;

# RabbitMQ default timeout
Readonly my $RABBITMQ_DEFAULT_TIMEOUT => 60;

# Default amount of retries to try connecting to RabbitMQ to
Readonly my $RABBITMQ_DEFAULT_RETRIES => 60;

# RabbitMQ delivery modes
Readonly my $RABBITMQ_DELIVERY_MODE_NONPERSISTENT => 1;
Readonly my $RABBITMQ_DELIVERY_MODE_PERSISTENT    => 2;

# RabbitMQ queue durability
Readonly my $RABBITMQ_QUEUE_TRANSIENT => 0;
Readonly my $RABBITMQ_QUEUE_DURABLE   => 1;

# RabbitMQ priorities
Readonly my %RABBITMQ_PRIORITIES => (
    $MediaCloud::JobManager::Job::MJM_JOB_PRIORITY_LOW    => 0,
    $MediaCloud::JobManager::Job::MJM_JOB_PRIORITY_NORMAL => 1,
    $MediaCloud::JobManager::Job::MJM_JOB_PRIORITY_HIGH   => 2,
);

# JSON (de)serializer
my $json = JSON->new->allow_nonref->canonical->utf8;

# RabbitMQ connection credentials
has '_hostname' => ( is => 'rw', isa => 'Str' );
has '_port'     => ( is => 'rw', isa => 'Int' );
has '_username' => ( is => 'rw', isa => 'Str' );
has '_password' => ( is => 'rw', isa => 'Str' );
has '_vhost'    => ( is => 'rw', isa => 'Str' );
has '_timeout'  => ( is => 'rw', isa => 'Int' );
has '_retries'  => ( is => 'rw', isa => 'Int' );

# RabbitMQ connection pool for every connection ID (PID + credentials)
my %_rabbitmq_connection_for_connection_id;

# "reply_to" queues for connection ID + function name
#
# We emulate Celery's RPC via RabbitMQ behavior in which results are being
# stuffed in per-client result queues and can be retrieved only by the same
# client that requested the job using run_remotely() or add_to_queue():
#
# http://docs.celeryproject.org/en/latest/userguide/tasks.html#rpc-result-backend-rabbitmq-qpid
my %_reply_to_queues_for_connection_id_function_name;

# Memory-limited results cache for connection ID + function name
#
# When fetching messages from "reply_to" queue for a specific name,
# run_remotely() can't requeue messages that don't belong to a specific job ID
# so it has to put it somewhere. This hash of hashes serves as a backlog for
# unused job results.
#
# It's not ideal that some job results might get invalidated but Celery does
# that too (purges results deemed too old).
my %_results_caches_for_connection_id_function_name;

# Limits of results cache above
Readonly my $RABBITMQ_RESULTS_CACHE_MAXCOUNT => 1024 * 100;
Readonly my $RABBITMQ_RESULTS_CACHE_MAXBYTES => 1024 * 1024 * 10;

# Constructor
sub BUILD
{
    my $self = shift;
    my $args = shift;

    $self->_hostname( $args->{ hostname } // 'localhost' );
    $self->_port( $args->{ port }         // 5672 );
    $self->_username( $args->{ username } // 'guest' );
    $self->_password( $args->{ password } // 'guest' );
    my $default_vhost = '/';
    $self->_vhost( $args->{ vhost }     // $default_vhost );
    $self->_timeout( $args->{ timeout } // $RABBITMQ_DEFAULT_TIMEOUT );
    $self->_retries( $args->{ retries } // $RABBITMQ_DEFAULT_RETRIES );

    # Connect to the current connection ID (PID + credentials)
    my $mq = $self->_mq();
}

# Used to uniquely identify RabbitMQ connections (by connection credentials and
# PID) so that we know when to reconnect
sub _connection_identifier($)
{
    my $self = shift;

    # Reconnect when running on a fork too
    my $pid = $$;

    return sprintf(
        'PID=%d; hostname=%s; port=%d; username: %s; password=%s; vhost=%s, timeout=%d, retries=%d',
        $pid,             $self->_hostname, $self->_port,    $self->_username,
        $self->_password, $self->_vhost,    $self->_timeout, $self->_retries
    );
}

# Returns RabbitMQ connection handler for the current connection ID
sub _mq($)
{
    my $self = shift;

    my $conn_id = $self->_connection_identifier();

    unless ( $_rabbitmq_connection_for_connection_id{ $conn_id } )
    {

        # Connect to RabbitMQ, open channel
        DEBUG( "Connecting to RabbitMQ (PID: $$, hostname: " .
              $self->_hostname . ", port: " . $self->_port . ", username: " . $self->_username . ")..." );

        # RabbitMQ might not yet be up at the time of connecting, so try for up to a minute
        my $mq;
        my $connected = 0;
        my $last_error_message;
        for ( my $retry = 0 ; $retry < $self->_retries ; ++$retry )
        {
            eval {
                if ( $retry > 0 )
                {
                    DEBUG( "Retrying #$retry..." );
                }

                $mq = Net::AMQP::RabbitMQ->new();
                $mq->connect(
                    $self->_hostname,
                    {
                        user     => $self->_username,
                        password => $self->_password,
                        port     => $self->_port,
                        vhost    => $self->_vhost,
                        timeout  => $self->_timeout,
                    }
                );
            };
            if ( $@ )
            {
                $last_error_message = $@;
                WARN( "Unable to connect to RabbitMQ, will retry: $last_error_message" );
                sleep( 1 );
            }
            else
            {
                $connected = 1;
                last;
            }
        }
        unless ( $connected )
        {
            LOGDIE( "Unable to connect to RabbitMQ, giving up: $last_error_message" );
        }

        my $channel_number = _channel_number();
        unless ( $channel_number )
        {
            LOGDIE( "Channel number is unset." );
        }

        eval {
            $mq->channel_open( $channel_number );

            # Fetch one message at a time
            $mq->basic_qos( $channel_number, { prefetch_count => 1 } );
        };
        if ( $@ )
        {
            LOGDIE( "Unable to open channel $channel_number: $@" );
        }

        $_rabbitmq_connection_for_connection_id{ $conn_id }           = $mq;
        $_reply_to_queues_for_connection_id_function_name{ $conn_id } = ();
        $_results_caches_for_connection_id_function_name{ $conn_id }  = ();
    }

    return $_rabbitmq_connection_for_connection_id{ $conn_id };
}

# Returns "reply_to" queue name for current connection and provided function name
sub _reply_to_queue($$)
{
    my ( $self, $function_name ) = @_;

    my $conn_id = $self->_connection_identifier();

    unless ( defined $_reply_to_queues_for_connection_id_function_name{ $conn_id } )
    {
        $_reply_to_queues_for_connection_id_function_name{ $conn_id } = ();
    }

    unless ( $_reply_to_queues_for_connection_id_function_name{ $conn_id }{ $function_name } )
    {
        my $reply_to_queue = _random_uuid();
        $_reply_to_queues_for_connection_id_function_name{ $conn_id }{ $function_name } = $reply_to_queue;
    }

    return $_reply_to_queues_for_connection_id_function_name{ $conn_id }{ $function_name };
}

# Returns reference to results cache for current connection and provided function name
sub _results_cache_hashref($$)
{
    my ( $self, $function_name ) = @_;

    my $conn_id = $self->_connection_identifier();

    unless ( defined $_results_caches_for_connection_id_function_name{ $conn_id } )
    {
        $_results_caches_for_connection_id_function_name{ $conn_id } = ();
    }

    unless ( defined $_results_caches_for_connection_id_function_name{ $conn_id }{ $function_name } )
    {
        $_results_caches_for_connection_id_function_name{ $conn_id }{ $function_name } = {};

        tie %{ $_results_caches_for_connection_id_function_name{ $conn_id }{ $function_name } }, 'Tie::Cache',
          {
            MaxCount => $RABBITMQ_RESULTS_CACHE_MAXCOUNT,
            MaxBytes => $RABBITMQ_RESULTS_CACHE_MAXBYTES
          };
    }

    return $_results_caches_for_connection_id_function_name{ $conn_id }{ $function_name };
}

# Channel number we should be talking to
sub _channel_number()
{
    # Each PID + credentials pair has its own connection so we can just use constant channel
    return 1;
}

sub _declare_queue($$$$;$)
{
    my ( $self, $queue_name, $durable, $declare_and_bind_exchange, $lazy_queue ) = @_;

    unless ( defined $queue_name )
    {
        LOGCONFESS( 'Queue name is undefined' );
    }

    my $mq = $self->_mq();

    my $channel_number = _channel_number();
    my $options        = {
        durable     => $durable,
        auto_delete => 0,
    };
    my $arguments = {
        'x-max-priority' => _priority_count(),
        'x-queue-mode'   => ( $lazy_queue ? 'lazy' : 'default' ),
    };

    eval { $mq->queue_declare( $channel_number, $queue_name, $options, $arguments ); };
    if ( $@ )
    {
        LOGDIE( "Unable to declare queue '$queue_name': $@" );
    }

    if ( $declare_and_bind_exchange )
    {
        my $exchange_name = $queue_name;
        my $routing_key   = $queue_name;

        eval {
            $mq->exchange_declare(
                $channel_number,
                $exchange_name,
                {
                    durable     => $durable,
                    auto_delete => 0,
                }
            );
            $mq->queue_bind( $channel_number, $queue_name, $exchange_name, $routing_key );
        };
        if ( $@ )
        {
            LOGDIE( "Unable to bind queue '$queue_name' to exchange '$exchange_name': $@" );
        }
    }
}

sub _declare_task_queue($$;$)
{
    my ( $self, $queue_name, $lazy_queue ) = @_;

    unless ( defined $queue_name )
    {
        LOGCONFESS( 'Queue name is undefined' );
    }

    my $durable                   = $RABBITMQ_QUEUE_DURABLE;
    my $declare_and_bind_exchange = 1;

    return $self->_declare_queue( $queue_name, $durable, $declare_and_bind_exchange, $lazy_queue );
}

sub _declare_results_queue($$;$)
{
    my ( $self, $queue_name, $lazy_queue ) = @_;

    unless ( defined $queue_name )
    {
        LOGCONFESS( 'Queue name is undefined' );
    }

    my $durable                   = $RABBITMQ_QUEUE_TRANSIENT;
    my $declare_and_bind_exchange = 0;

    return $self->_declare_queue( $queue_name, $durable, $declare_and_bind_exchange, $lazy_queue );
}

sub _publish_json_message($$$;$$)
{
    my ( $self, $routing_key, $payload, $extra_options, $extra_props ) = @_;

    my $mq = $self->_mq();

    unless ( $routing_key )
    {
        LOGCONFESS( 'Routing key is undefined.' );
    }
    unless ( $payload )
    {
        LOGCONFESS( 'Payload is undefined.' );
    }

    my $payload_json;
    eval { $payload_json = $json->encode( $payload ); };
    if ( $@ )
    {
        LOGDIE( "Unable to encode JSON message: $@" );
    }

    my $channel_number = _channel_number();

    my $options = {};
    if ( $extra_options )
    {
        $options = { %{ $options }, %{ $extra_options } };
    }
    my $props = {
        content_type     => 'application/json',
        content_encoding => 'utf-8',
    };
    if ( $extra_props )
    {
        $props = { %{ $props }, %{ $extra_props } };
    }

    eval { $mq->publish( $channel_number, $routing_key, $payload_json, $options, $props ); };
    if ( $@ )
    {
        LOGDIE( "Unable to publish message to routing key '$routing_key': $@" );
    }
}

sub _random_uuid()
{
    # Celery uses v4 (random) UUIDs
    return create_uuid_as_string( UUID_RANDOM );
}

sub _priority_to_int($)
{
    my $priority = shift;

    unless ( exists $RABBITMQ_PRIORITIES{ $priority } )
    {
        LOGDIE( "Unknown job priority: $priority" );
    }

    return $RABBITMQ_PRIORITIES{ $priority };
}

sub _priority_count()
{
    return scalar( keys( %RABBITMQ_PRIORITIES ) );
}

sub _process_worker_message($$$)
{
    my ( $self, $function_name, $message ) = @_;

    my $mq = $self->_mq();

    my $correlation_id = $message->{ props }->{ correlation_id };
    unless ( $correlation_id )
    {
        LOGDIE( '"correlation_id" is empty.' );
    }

    # "reply_to" might be empty if sending back job results is disabled via
    # !publish_results()
    my $reply_to = $message->{ props }->{ reply_to };

    my $priority = $message->{ props }->{ priority } // 0;

    my $delivery_tag = $message->{ delivery_tag };
    unless ( $delivery_tag )
    {
        LOGDIE( "'delivery_tag' is empty." );
    }

    my $payload_json = $message->{ body };
    unless ( $payload_json )
    {
        LOGDIE( 'Message payload is empty.' );
    }

    my $payload;
    eval { $payload = $json->decode( $payload_json ); };
    if ( $@ or ( !$payload ) or ( ref( $payload ) ne ref( {} ) ) )
    {
        LOGDIE( 'Unable to decode payload JSON: ' . $@ );
    }

    if ( $payload->{ task } ne $function_name )
    {
        LOGDIE( "Task name is not '$function_name'; maybe you're using same queue for multiple types of jobs?" );
    }

    my $celery_job_id = $payload->{ id };
    my $args          = $payload->{ kwargs };

    # Do the job
    my $job_result;
    eval { $job_result = $function_name->run_locally( $args, $celery_job_id ); };
    my $error_message = $@;

    # If the job has failed, run_locally() has already printed the error
    # message multiple times at this point so we don't repeat outselves

    if ( $reply_to )
    {    # undef if !publish_results()

        # Construct response based on whether the job succeeded or failed
        my $response;
        if ( $error_message )
        {
            ERROR( "Job '$celery_job_id' died: $@" );
            $response = {
                'status'    => 'FAILURE',
                'traceback' => "Job died: $error_message",
                'result'    => {
                    'exc_message' => 'Task has failed',
                    'exc_type'    => 'Exception',
                },
                'task_id'  => $celery_job_id,
                'children' => []
            };
        }
        else
        {
            $response = {
                'status'    => 'SUCCESS',
                'traceback' => undef,
                'result'    => $job_result,
                'task_id'   => $celery_job_id,
                'children'  => []
            };
        }

        # Send message back with the job result
        eval {
            $self->_declare_results_queue( $reply_to, $function_name->lazy_queue() );
            $self->_publish_json_message(
                $reply_to,
                $response,
                {
                    # Options
                },
                {
                    # Properties
                    delivery_mode  => $RABBITMQ_DELIVERY_MODE_NONPERSISTENT,
                    priority       => $priority,
                    correlation_id => $celery_job_id,
                }
            );
        };
        if ( $@ )
        {
            LOGDIE( "Unable to publish job $celery_job_id result: $@" );
        }
    }

    # ACK the message (mark the job as completed)
    eval { $mq->ack( _channel_number(), $delivery_tag ); };
    if ( $@ )
    {
        LOGDIE( "Unable to mark job $celery_job_id as completed: $@" );
    }
}

sub start_worker($$)
{
    my ( $self, $function_name ) = @_;

    my $mq = $self->_mq();

    $self->_declare_task_queue( $function_name, $function_name->lazy_queue() );

    my $consume_options = {

        # Don't assume that the job is finished when it reaches the worker
        no_ack => 0,
    };
    my $consumer_tag = $mq->consume( _channel_number(), $function_name, $consume_options );

    INFO( "Consumer tag: $consumer_tag" );
    INFO( "Worker is ready and accepting jobs" );
    my $recv_timeout = 0;    # block until message is received
    while ( my $message = $mq->recv( 0 ) )
    {
        $self->_process_worker_message( $function_name, $message );
    }
}

sub run_job_sync($$$$)
{
    my ( $self, $function_name, $args, $priority ) = @_;

    my $mq = $self->_mq();

    # Post the job
    my $publish_results = 1;    # always publish results when running synchronously
    my $celery_job_id = $self->_run_job_on_rabbitmq( $function_name, $args, $priority, $publish_results );

    # Declare result queue (ignore function's publish_results())
    my $reply_to_queue = $self->_reply_to_queue( $function_name );
    eval { $self->_declare_results_queue( $reply_to_queue, $function_name->lazy_queue() ); };
    if ( $@ )
    {
        LOGDIE( "Unable to declare results queue '$reply_to_queue': $@" );
    }

    my $results_cache = $self->_results_cache_hashref( $function_name );

    my $message;
    if ( exists $results_cache->{ $celery_job_id } )
    {
        # Result for this job ID was fetched previously -- return from cache
        DEBUG( "Results message for job ID '$celery_job_id' found in cache" );
        $message = $results_cache->{ $celery_job_id };
        delete $results_cache->{ $celery_job_id };

    }
    else
    {
        # Result not yet fetched -- process the result queue

        my $channel_number  = _channel_number();
        my $consume_options = {};
        my $consumer_tag    = $mq->consume( $channel_number, $reply_to_queue, $consume_options );

        my $recv_timeout = 0;    # block until message is received

        while ( my $queue_message = $mq->recv( 0 ) )
        {
            my $correlation_id = $queue_message->{ props }->{ correlation_id };
            unless ( $correlation_id )
            {
                LOGDIE( '"correlation_id" is empty.' );
            }

            if ( $correlation_id eq $celery_job_id )
            {
                DEBUG( "Found results message with job ID '$celery_job_id'." );
                $message = $queue_message;
                last;

            }
            else
            {
                # Message belongs to some other job -- add to cache and continue
                DEBUG( "Results message '$correlation_id' does not belong to job ID '$celery_job_id'." );
                $results_cache->{ $correlation_id } = $queue_message;
            }
        }
    }

    unless ( $message )
    {
        LOGDIE( "At this point, message should have been fetched either from broker or from cache" );
    }

    my $correlation_id = $message->{ props }->{ correlation_id };
    unless ( $correlation_id )
    {
        LOGDIE( '"correlation_id" is empty.' );
    }
    if ( $correlation_id ne $celery_job_id )
    {
        # Message belongs to some other job -- requeue and skip
        DEBUG( "'correlation_id' ('$correlation_id') is not equal to job ID ('$celery_job_id')." );
        next;
    }

    my $payload_json = $message->{ body };
    unless ( $payload_json )
    {
        LOGDIE( 'Message payload is empty.' );
    }

    my $payload;
    eval { $payload = $json->decode( $payload_json ); };
    if ( $@ or ( !$payload ) or ( ref( $payload ) ne ref( {} ) ) )
    {
        LOGDIE( 'Unable to decode payload JSON: ' . $@ );
    }

    if ( $payload->{ task_id } ne $celery_job_id )
    {
        LOGDIE( "'task_id' ('$payload->{ task_id }') is not equal to job ID ('$celery_job_id')." );
    }

    # Return job result
    if ( $payload->{ status } eq 'SUCCESS' )
    {
        # Job completed successfully
        return $payload->{ result };

    }
    elsif ( $payload->{ status } eq 'FAILURE' )
    {
        # Job failed -- pass the failure to the caller
        LOGDIE( "Job '$celery_job_id' failed: " . $payload->{ traceback } );

    }
    else
    {
        # Unknown value
        LOGDIE( "Unknown 'status' value: " . $payload->{ status } );
    }
}

sub run_job_async($$$$)
{
    my ( $self, $function_name, $args, $priority ) = @_;

    return $self->_run_job_on_rabbitmq( $function_name, $args, $priority, $function_name->publish_results() );
}

sub _run_job_on_rabbitmq($$$$$)
{
    my ( $self, $function_name, $args, $priority, $publish_results ) = @_;

    unless ( defined( $args ) )
    {
        $args = {};
    }
    unless ( ref( $args ) eq ref( {} ) )
    {
        LOGDIE( "'args' is not a hashref." );
    }

    my $celery_job_id = create_uuid_as_string( UUID_RANDOM );

    # Encode payload
    my $payload = {
        'expires'   => undef,
        'utc'       => JSON::true,
        'args'      => [],
        'chord'     => undef,
        'callbacks' => undef,
        'errbacks'  => undef,
        'taskset'   => undef,
        'id'        => $celery_job_id,
        'retries'   => $function_name->retries(),
        'task'      => $function_name,
        'timelimit' => [ undef, undef, ],
        'eta'       => undef,
        'kwargs'    => $args,
    };

    # Declare task queue
    $self->_declare_task_queue( $function_name, $function_name->lazy_queue() );

    my $reply_to_queue;
    if ( $publish_results )
    {
        # Declare result queue before posting a job (just like Celery does)
        $reply_to_queue = $self->_reply_to_queue( $function_name );
        $self->_declare_results_queue( $reply_to_queue, $function_name->lazy_queue() );
    }
    else
    {
        $reply_to_queue = '';    # undef doesn't work with Net::AMQP::RabbitMQ
    }

    # Post a job
    eval {
        my $rabbitmq_priority = _priority_to_int( $priority );
        $self->_publish_json_message(
            $function_name,
            $payload,
            {
                # Options
                exchange => $function_name
            },
            {
                # Properties
                delivery_mode  => $RABBITMQ_DELIVERY_MODE_PERSISTENT,
                priority       => $rabbitmq_priority,
                correlation_id => $celery_job_id,
                reply_to       => $reply_to_queue,
            }
        );
    };
    if ( $@ )
    {
        LOGDIE( "Unable to add job '$celery_job_id' to queue: $@" );
    }

    return $celery_job_id;
}

sub job_id_from_handle($$)
{
    my ( $self, $job_handle ) = @_;

    return $job_handle;
}

sub set_job_progress($$$$)
{
    my ( $self, $job, $numerator, $denominator ) = @_;

    LOGDIE( "FIXME not implemented." );
}

sub job_status($$$)
{
    my ( $self, $function_name, $job_id ) = @_;

    LOGDIE( "FIXME not implemented." );
}

sub show_jobs($)
{
    my $self = shift;

    LOGDIE( "FIXME not implemented." );
}

sub cancel_job($)
{
    my ( $self, $job_id ) = @_;

    LOGDIE( "FIXME not implemented." );
}

sub server_status($$)
{
    my $self = shift;

    LOGDIE( "FIXME not implemented." );
}

sub workers($)
{
    my $self = shift;

    LOGDIE( "FIXME not implemented." );
}

no Moose;    # gets rid of scaffolding

1;