++ed by:
KEEDI ROMANF
Author image Atsushi Kobayashi
and 1 contributors

NAME

Qudo::Manual::JA - Qudo's document lang:ja

DESCRIPTION

QudoはバックエンドにRDBMSなどを利用したJob Queueing Management Systemです。

PerlにはTheSchwartzというJob Queueing Management Systemがありますが、 利用者が拡張しやすくなるように考えてつくりました。

Job Queueingとは

「Job Queueingとは何か」を簡単に言えば

HTTPサーバで扱うには少し重く厳密なリアルタイム性が求められない処理を

HTTPのサイクルから切り離した部分で処理を行わせたりすることができる仕組みです。

もちろんバッチ処理の変わりに使う事もできます。

例えば、あるユーザ登録型サイトがあるとして、

ユーザがサイトに登録した場合、登録完了メールをユーザに送る事が有ると思います。

実際の登録処理はHTTPのリクエスト内で行いますが、

メールの配信はそこまで厳密なリアルタイム性が求められていないため、

HTTPのサイクル内では実際のメール送信を行わず、

「この人にサイト登録完了メールを送信しておいてね」

という仕事をお願い(Job Queueing)しておき、

メール送信を専門に担当する処理にまかせたりします。

Qudoではこの

「この人にサイト登録完了メールを送信しておいてね」

をお願いするクライアントと

「この人にサイト登録完了メールを送信しておいてね」

を実際に処理するワーカーの仕組みを提供します。

また、Cronなどで起動されるバッチ処理でも使う事ができます。

CronなどでPerlのスクリプトを起動させると、処理に必要なモジュールのロードに時間がかかりますが、

常駐しているワーカーに処理の依頼だけを出し、実際の処理は必要なモジュールが既にロードされている

ワーカーが処理することで、バッチの起動コストを押さえる事も可能です。

まず始めに

Qudoを使うにはMySQLもしくはSQLiteにQudo用のデータベースを設定する必要があります。

MySQL用のスキーマはdoc/schema-mysql.sqlに SQLite用のスキーマはdoc/schema-sqlite.sqlにありますのでこれを基にデータベースを作成してください。

現在のところQudoではMySQLもしくはSQLiteにしか対応していません。

client

QudoでJob Queueingを行うクライアントは以下のように書きます

    # in your script:
    use Qudo;
    my $client = Qudo->new(
        driver_class => 'Skinny', # DBIx::Skinny
        database => +{
            dsn      => 'dbi:SQLite:/tmp/qudo.db',
            username => '',
            password => '',
        },
    );

    # enqueue job
    $client->enqueue("Your::Worker::Mail", { arg => $user->email, uniqkey => $user->login_id});

まずQudoのオブジェクトを作成します。

JobをQueueingするデータベースを指定してオブジェクトを作成します。

作成したオブジェクトを元にJobをQueueingします。

enqueueメソッドの

第一引数はJobを処理させるWorkerの名前

第二引数はWorkerが使う引数の情報

この例の場合、Jobを処理するクラスとしてYour::Worker::Mailを指定し、

処理させる引数として$user->email(メールアドレス)を指定し、

Jobのユニークキーとして$user->login_id(ログインID)を指定しています。

第二引数のhashrefに

run_afterに秒数を指定することで、現在から何秒後に処理を開始させるかを指定させることも出来ます。

クライアントが行う処理はこれだけです。

あとはこれから作成するYour::Worker::Mailが処理してくれます。

worker

queueingされたJobを処理していくワーカーを作成します

    package Your::Worker::Mail;
    use base 'Qudo::Worker';
    sub work {
        my ($class, $job) = @_;
        print $job->arg; #print $user->email;
        
        # send mail process...
        
        $job->completed; # finished job!
    }
    1;

Qudo::Wokerを継承したクラスを作成し、

workメソッドをオーバーライドしてください。

workメソッドの第二引数にはJobの情報が丸々わたってきます。

$job->argメソッドでQueueingする際に指定した、Workerに使ってほしいと指定された引数情報が格納されています。

次に、 workerを起動するスクリプトを以下のように用意します

    # ex) qudo-worker.pl :
    use Qudo;
    my $worker = Qudo->new(
        driver_class => 'Skinny',
        database => +{
            dsn      => 'dbi:SQLite:/tmp/qudo.db',
            username => '',
            password => '',
        },
        manager_abilities => [qw/Your::Worker::Mail/], # set worker
    );
    $worker->work(); # boot manager
    # work work work!

clientと同じようにワーカーが使うデータベースを指定してQudoのオブジェクトを作成します。

この時にmanager_abilitiesの引数をつかって、

このworkerが管理するWorkerクラスを指定します。

$worker->work()メソッドよ呼び出すことで、

JobがQueueingされる毎にWorkerクラスを呼び出して処理させています。

基本的にはこれだけです。

簡単でしょ:)

workメソッド内では無限ループが発生し、JobがQueueingたびにJobに対応するWorkerに処理が移譲されます。

Hook

Qudoでは各所にHookポイントがあるので、Hookポイントをつかって各種処理を行う事ができます。

Job を enqueueする時に引数をシリアライズしたりなどです。

たとえば、enqueueする引数の情報をJSONでシリアライズしたい場合は

    my $client = Qudo->new(...);
    $client->global_register_hooks('Qudo::Hook::Serialize::JSON');
    $client->enqueue('Test::Worker', { arg => {name => 'nekokak'}, uniqkey => 'uniq'});

このようにHookモジュールをregister_pluginsに渡す事で設定でき、 設定した引数がJSONでシリアライズされて保存されます。

上記の場合Hookは全てのWorkerクラスに対して適用されるグローバルな設定となります。

特定のWorkerだけ特定のHookをかけたい場合は、WorkerクラスにHookの設定を別途行います。

    package Worker::Test;
    use base 'Qudo::Worker';
    __PACKAGE__->register_hooks('Qudo::Hook::Serialize::JSON');
    sub work {
        my ($self, $job) = @_;
        $job->completed();
    }

worker毎に異なるHookを設定したい場合は、global_register_hooksで設定せず、 worker毎にregister_hooksメソッドで設定してください。

Hookは利用者が好きに書く事ができるので、Qudoが持っていない機能も簡単に書く事ができます。

あなたが再利用できるHookを書いたのであれば私に教えてくださいね:)

Plugin

Qudoでは好きにPluginを書く事ができます。

たとえばworkerで使うdebug print用のPluginを用意したとします。

    package Your::Plugin::DebugPrint;
    use strict;
    use warnings;
    use base 'Qudo::Plugin';
    
    sub plugin_name { 'debug' }
    
    sub load {
        my $class = shift;
        $class->register(
            sub {
                my $val = shift;
                print STDOUT $val;
            }
        );
    }

用意したPluginをloadするには

    $worker->register_plugins(qw/Your::Plugin::DebugPrint/);

このようにregister_pluginsメソッドにロードさせたいモジュールのパッケージ名を指定します。

プラグインの呼び出し方は

    package Worker::Test;
    use base 'Qudo::Worker';
    sub work {
        my ($self, $job) = @_;
        $job->manager->plugin->{debug}->('debug message in Worker::Test::work');
        $job->completed();
    }

このようにします。

Your::Plugin::DebugPrintで指定したplugin_nameの値がハッシュのキーになっているので、 そのキー経由でプラグインを呼び出す事ができます。

ちなみに現在のところPluginはHookの様にグローバルなPluginとworkerローカルなPluginの設定の様に切り替える事はできません。

これまた再利用できるPluginを書いたのであれば教えてください:)

Driver

QudoではJobをstoreする仕組みを差し替える事ができます。

デフォルトではDBIx::Skinnyを利用しRDBMSを利用したDriverが使われます。

Skinny以外にはDBIをサポートしています。

DriverもHookやPlugin同様、好きに書く事ができますので、DBICが使いたい人、CDBIを使いたい人、Data::Modelを使いたい人など色々有ると思いますので

好きに書いてください:)

書いたら教えてもらえるととてもうれしいです:)

好きに書いたDriverは

    Qudo->new(
        driver_class => 'Your::Driver',
        database => +{
            dsn      => 'dbi:SQLite:/tmp/qudo.db',
            username => '',
            password => '',
        },
    );

driver_classに設定することで使う事ができます。

exception_log

jobの処理がcompleteしなかった場合、exception_logテーブルにエラー情報が書き込まれます。 TheSchwartzの場合でもjobの処理が失敗した場合、errorテーブルにエラー情報が書き込まれますが、 errorテーブルに一定数以上のレコードが溜まらないように、適宜deleteされます。 Qudoの場合はexception_logに書き込まれた情報はQudo自体が消す事は有りません。 これは、知らないエラーが発生したまま情報が消えてしまう事を嫌った為です。

Qudoではexception_logの情報を取り出すためのメソッドを用意しています。

    $client->exception_list;

exception_listメソッドをつかえば、発生した例外情報を参照する事ができます。

発生した例外を確認したのち、もう一度リトライさせたい場合があるとおもいます。 その場合は以下のようにすると良いでしょう。

    my $exceptions = $client->exception_list
    $client->enqueue_from_failed_job($exceptions->[0]);

こうするだけで、例外が発生した時のJobの情報を元にJobが再登録されます。

再登録した例外情報にはretriedが1になり同じ例外情報からは再度Jobが登録できなくなります。

job_count

QudoではJobが溜まりすぎていないかを簡単に調べる事ができます。

    $client->job_count([qw/Your::Worker::Mail/]);

job_countメソッドに調べたいワーカー名を指定すると、処理されていないJobの件数を取得する事ができます。

job_list

job_listメソッドを使うと現在enqueueされているJobの情報を参照する事ができます。

    $client->job_list([qw/Your::Worker::Mail/]);