package MogileFS::Server;
use strict;
use warnings;
use vars qw($VERSION);
$VERSION = "2.73";

=head1 NAME

MogileFS::Server - MogileFS (distributed filesystem) server

=head1 SYNOPSIS

 $s = MogileFS::Server->server;
 $s->run;

=cut

use IO::Socket;
use Symbol;
use POSIX;
use File::Copy ();
use Carp;
use File::Basename ();
use File::Path ();
use Sys::Syslog ();
use Time::HiRes ();
use Net::Netmask;
use List::Util;
use Socket qw(SO_KEEPALIVE IPPROTO_TCP TCP_NODELAY);

use MogileFS::Util qw(daemonize);
use MogileFS::Config;

use MogileFS::ProcManager;
use MogileFS::Connection::Client;
use MogileFS::Connection::Worker;

use MogileFS::Worker::Query;
use MogileFS::Worker::Delete;
use MogileFS::Worker::Replicate;
use MogileFS::Worker::Reaper;
use MogileFS::Worker::Monitor;
use MogileFS::Worker::Fsck;
use MogileFS::Worker::JobMaster;

use MogileFS::Factory::Domain;
use MogileFS::Factory::Class;
use MogileFS::Factory::Host;
use MogileFS::Factory::Device;
use MogileFS::Domain;
use MogileFS::Class;
use MogileFS::Host;
use MogileFS::Device;

use MogileFS::HTTPFile;
use MogileFS::FID;
use MogileFS::DevFID;

use MogileFS::Store;

use MogileFS::ReplicationPolicy::MultipleHosts;

my $server; # server singleton
sub server {
    my ($pkg) = @_;
    return $server ||= bless {}, $pkg;
}

# --------------------------------------------------------------------------
# instance methods:
# --------------------------------------------------------------------------

sub run {
    my $self = shift;

    MogileFS::Config->load_config;

    # don't run as root
    die "mogilefsd cannot be run as root\n"
        if $< == 0 && MogileFS->config('user') ne "root";

    MogileFS::Config->check_database;
    daemonize() if MogileFS->config("daemonize");

    MogileFS::ProcManager->set_min_workers('monitor'     => 1);

    # open up our log
    Sys::Syslog::openlog('mogilefsd', 'pid', 'daemon');
    Mgd::log('info', 'beginning run');

    unless (MogileFS::ProcManager->write_pidfile) {
        Mgd::log('info', "Couldn't write pidfile, ending run");
        Sys::Syslog::closelog();
        exit 1;
    }

    # Install signal handlers.
    $SIG{TERM}  = sub {
        my @children = MogileFS::ProcManager->child_pids;
        print STDERR scalar @children, " children to kill.\n" if $DEBUG;
        my $count = kill( 'TERM' => @children );
        print STDERR "Sent SIGTERM to $count children.\n" if $DEBUG;
        MogileFS::ProcManager->remove_pidfile;
        Mgd::log('info', 'ending run due to SIGTERM');
        Sys::Syslog::closelog();

        exit 0;
    };

    $SIG{INT}  = sub {
        my @children = MogileFS::ProcManager->child_pids;
        print STDERR scalar @children, " children to kill.\n" if $DEBUG;
        my $count = kill( 'INT' => @children );
        print STDERR "Sent SIGINT to $count children.\n" if $DEBUG;
        MogileFS::ProcManager->remove_pidfile;
        Mgd::log('info', 'ending run due to SIGINT');
        exit 0;
    };
    $SIG{PIPE} = 'IGNORE';  # catch them by hand

    # setup server sockets to listen for client connections
    my @servers;
    foreach my $listen (@{ MogileFS->config('listen') }) {
        my $server = IO::Socket::INET->new(LocalAddr => $listen,
                                           Type      => SOCK_STREAM,
                                           Proto     => 'tcp',
                                           Blocking  => 0,
                                           Reuse     => 1,
                                           Listen    => 1024 )
            or die "Error creating socket: $@\n";
        $server->sockopt(SO_KEEPALIVE, 1);
        $server->setsockopt(IPPROTO_TCP, TCP_NODELAY, 1);

        # save sub to accept a client
        push @servers, $server;
        Danga::Socket->AddOtherFds( fileno($server) => sub {
                while (my $csock = $server->accept) {
                    MogileFS::Connection::Client->new($csock);
                }
            } );
    }

    MogileFS::ProcManager->push_pre_fork_cleanup(sub {
        # so children don't hold server connection open
        close($_) foreach @servers;
    });

    # setup the post event loop callback to spawn jobs, and the timeout
    Danga::Socket->DebugLevel(3);
    Danga::Socket->SetLoopTimeout( 250 ); # 250 milliseconds
    Danga::Socket->SetPostLoopCallback(MogileFS::ProcManager->PostEventLoopChecker);

    # and now, actually start listening for events
    eval {
        print( "Starting event loop for frontend job on pid $$.\n" ) if $DEBUG;
        Danga::Socket->EventLoop();
    };

    if ($@) {
        Mgd::log('err', "crash log: $@");
        exit 1;
    }
    Mgd::log('info', 'ending run');
    Sys::Syslog::closelog();
    exit(0);
}

# --------------------------------------------------------------------------

package MogileFS;
# just so MogileFS->config($key) will work:
use MogileFS::Config qw(config);

my %hooks;

sub register_worker_command {
    # just pass this through to the Worker class
    return MogileFS::Worker::Query::register_command(@_);
}

sub register_global_hook {
    $hooks{$_[0]} = $_[1];
    return 1;
}

sub unregister_global_hook {
    delete $hooks{$_[0]};
    return 1;
}

sub run_global_hook {
    my $hookname = shift;
    my $ref = $hooks{$hookname};
    return $ref->(@_) if defined $ref;
    return undef;
}

# --------------------------------------------------------------------------

package Mgd;  # conveniently short name
use strict;
use warnings;
use MogileFS::Config;
use MogileFS::Util qw(error fatal debug); # for others calling Mgd::foo()
use Socket qw(SOL_SOCKET SO_RCVBUF AF_UNIX SOCK_STREAM PF_UNSPEC);
BEGIN {
    # detect the receive buffer size for Unix domain stream sockets,
    # we assume the size is identical across all Unix domain sockets.
    socketpair(my $s1, my $s2, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
        or die( "socketpair failed: $!" );

    my $r = getsockopt($s1, SOL_SOCKET, SO_RCVBUF);
    defined $r or die "getsockopt: $!";
    $r = unpack('i', $r) if defined $r;
    $r = (defined $r && $r > 0) ? $r : 8192;
    close $s1;
    close $s2;
    eval 'use constant UNIX_RCVBUF_SIZE => $r';
}

sub server {
    return MogileFS::Server->server;
}

# database checking/connecting
sub validate_dbh { 
    my $sto = Mgd::get_store();
    my $had_dbh = $sto->have_dbh;
    $sto->recheck_dbh();
    my $dbh;
    eval { $dbh = $sto->dbh };
    # Doesn't matter what the failure was; workers should retry later.
    error("Error validating master DB: $@") if $@ && $had_dbh;
    return $dbh;
}

# the eventual replacement for callers asking for a dbh directly:
# they'll ask for the current store, which is a database abstraction
# layer.
my ($store, $store_pid);
sub get_store {
    return $store if $store && $store_pid == $$;
    $store_pid = $$;
    return $store = MogileFS::Store->new;
}

sub close_store {
    if ($store) {
        $store->dbh->disconnect();
        $store = undef;
        return 1;
    }
    return 0;
}

# only for t/ scripts to explicitly set a store, without loading in a config
sub set_store {
    my ($s) = @_;
    $store = $s;
    $store_pid = $$;
}

sub domain_factory {
    return MogileFS::Factory::Domain->get_factory;
}

sub class_factory {
    return MogileFS::Factory::Class->get_factory;
}

sub host_factory {
    return MogileFS::Factory::Host->get_factory;
}

sub device_factory {
    return MogileFS::Factory::Device->get_factory;
}

# log stuff to syslog or the screen
sub log {
    # simple logging functionality
    if (! $MogileFS::Config::daemonize) {
        $| = 1;
        # syslog acts like printf so we have to use printf and append a \n
        shift; # ignore the first parameter (info, warn, critical, etc)
        my $mask = shift; # format string
        $mask .= "\n" unless $mask =~ /\n$/;
        my $message = @_ ? sprintf($mask, @_) : $mask;
        print '[', scalar localtime(), '] ', $message;
    } else {
        # just pass the parameters to syslog
        Sys::Syslog::syslog(@_);
    }
}

1;
__END__
#Just for MakeMaker's kinda lame regexp for ABSTRACT_FROM
=dummypod
mogilefs::server - MogileFS (distributed filesystem) server.