NAME

Schedule::LongSteps - Manage long term processes over arbitrarily large spans of time.

ABSTRACT

This attempts to solve the problem of defining and running a set of potentially conditional steps across an arbitrarily long timespan.

An example of such a process would be: "After an order has been started, if more than one hour has passed, send an email reminder every 2 days until the order is finished. Give up after a month". You get the idea.

Such a process is usually a pain to implement. This module attempts to provide a framework that makes writing and testing such a process as easy as writing and testing a good old class.

CONCEPTS

Process

A Process represents a set of logically linked steps that need to run over a long span of time (hours, months, even years). It persists in a Storage.

At the logical level, the persistent Process has the following attributes (see Schedule::LongSteps::Storage::DBIxClass for a comprehensive list):

- what. Which step should it run next.

- run_at. A DateTime at which this next step should be run. This allows running a step far in the future.

- status. Whether the step is running or paused, or whether the process is terminated.

- state. The persistent state of your application. This should be a pure Perl hash (JSONable).
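
As a rough illustration of these attributes, and not the module's actual storage schema, a persisted process can be pictured as a plain hash. The field values, the 'pending' status, the is_due helper and the use of epoch seconds instead of a DateTime object are all assumptions made for this sketch:

```perl
use strict;
use warnings;

# Hypothetical in-memory picture of one persisted process.
my %stored_process = (
    what   => 'do_stuff1',         # name of the step to run next
    run_at => time() - 60,         # epoch seconds here; the module itself uses a DateTime
    status => 'pending',           # hypothetical status value
    state  => { order_id => 42 },  # pure Perl, JSON-able data
);

# Hypothetical helper: a process is due when it is not terminated
# and its run_at time has been reached.
sub is_due {
    my ($process, $now) = @_;
    return 0 if $process->{status} eq 'terminated';
    return $process->{run_at} <= $now ? 1 : 0;
}

print is_due( \%stored_process, time() ) ? "due\n" : "not due\n";
```

Because run_at is just a point in time, a step scheduled a year ahead costs nothing until a regular run finds it due.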

Users (you) implement their business process as a subclass of Schedule::LongSteps::Process. Such subclasses can have contextual properties as Moose properties that will have to be supplied by the Schedule::LongSteps management methods.

Steps

A step is simply a subroutine in a process class that runs some business code. It always returns either a new step to be run or a final step marker.
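
The idea that every step hands back either the next step or a final marker can be sketched in plain Perl. This is a conceptual model only: the My::Sketch::Process class, the hash shapes and the driver loop are hypothetical and do not use the module's own API.

```perl
use strict;
use warnings;

package My::Sketch::Process;

sub new {
    my ($class, %args) = @_;
    return bless { state => $args{state} || {} }, $class;
}

# A step does some work, then names the step that should follow it,
# or returns a final marker when there is nothing left to do.
sub send_reminder {
    my ($self) = @_;
    my $sent = ( $self->{state}{reminders} || 0 ) + 1;
    return { final => 1, state => { reminders => $sent } } if $sent >= 3;
    return { what => 'send_reminder', state => { reminders => $sent } };
}

package main;

# Hypothetical driver: look the step up by name, call it, store what it returns.
my $stored = { what => 'send_reminder', state => {} };
while ( defined $stored->{what} ) {
    my $process = My::Sketch::Process->new( state => $stored->{state} );
    my $method  = $process->can( $stored->{what} )
        or die "Unknown step $stored->{what}";
    my $next = $process->$method();
    $stored = {
        what  => ( $next->{final} ? undef : $next->{what} ),
        state => $next->{state},
    };
}

print "Reminders sent: $stored->{state}{reminders}\n";
```

In this sketch the loop runs the steps back to back; in a real process each returned step would carry a run_at and wait in the storage until it is due.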

Storage

A storage provides the backend to persist processes. Build a Schedule::LongSteps with a storage instance.

See section PERSISTENCE for a list of available storage classes.

Manager: Schedule::LongSteps

A Schedule::LongSteps provides an entry point to all things related to Schedule::LongSteps process management. You should keep one instance of this in your application (well, one instance per process) as this is what you are going to use to launch and manage processes.

QUICK START AND SYNOPSIS

First write a class to represent your long running set of steps:

  package My::Application::MyLongProcess;

  use Moose;
  use DateTime;
  extends qw/Schedule::LongSteps::Process/;

  # Some contextual things.
  has 'thing' => ( is => 'ro', required => 1); # Some mandatory context provided by your application at each regular run.

  # The first step should be executed after the process is installed on the target.
  sub build_first_step{
    my ($self) = @_;
    return $self->new_step({ what => 'do_stuff1', run_at => DateTime->now() });
  }

  sub do_stuff1{
     my ($self) = @_;

      # The starting state
      my $state = $self->state();

      my $thing = $self->thing();

      # ... Do some stuff and return the next step to execute ...

      return $self->new_step({ what => 'do_stuff2', run_at => DateTime->... , state => { some => 'jsonable', hash => 'ref' } });
  }

  sub do_stuff2{
      my ($self, $step) = @_;

      $self->wait_for_steps('do_stuff1', 'do_stuff2' );

      # ... Do some stuff and terminate the process or go back to do_stuff1 ...

       if( ... ){
           return Schedule::LongSteps::Step->new({ what => 'do_stuff1', run_at => DateTime->... , state => { some jsonable structure } });
       }
       return $self->final_step({ state => { the => 'final', state => 1 } });
  }

  __PACKAGE__->meta->make_immutable();

Then in your main application do this once per 'target':

   my $dbic_storage = Schedule::LongSteps::Storage::DBIxClass->new(...);
   # Keep only ONE Instance of this in your application.
   my $longsteps = Schedule::LongSteps->new({ storage => $dbic_storage });
   ...

   $longsteps->instantiate_process('My::Application::MyLongProcess', { thing => 'whatever' }, { the => 'init', state => 1 });

Then regularly (in a cron, or a recurring callback):

   my $dbic_storage = Schedule::LongSteps::Storage::DBIxClass->new(...);
   # Keep only ONE instance of this in your application.
   my $longsteps = Schedule::LongSteps->new({ storage => $dbic_storage });
   ...

   $longsteps->run_due_processes({ thing => 'whatever' });

EXAMPLE

Look at https://github.com/jeteve/Schedule-LongSteps/blob/master/t/fullblown.t for a full blown working example.

PERSISTENCE

The persistence of processes is managed by a subclass of Schedule::LongSteps::Storage that you should instantiate and give to the constructor of Schedule::LongSteps.

Example:

   my $dbic_storage = Schedule::LongSteps::Storage::DBIxClass->new(...);
   my $longsteps = Schedule::LongSteps->new({ storage => $dbic_storage });
   ...

Out of the box, the following storage classes are available:

Schedule::LongSteps::Storage::Memory

Persist processes in memory. Not very useful, except for testing. This is the storage of choice to unit test your processes.

Schedule::LongSteps::Storage::AutoDBIx

Persist processes in a relational DB (a $dbh from DBI). This is the easiest thing to use if you want to persist processes in a database, without having to worry about creating a DBIx::Class model yourself.

Schedule::LongSteps::Storage::DBIxClass

Persist processes in an existing DBIx::Class schema. Nice if you want to have only one instance of Schema in your application and if you don't mind writing your own resultset.
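
Since the Memory storage is the one recommended for unit tests, a test script could look roughly like the following. This is a sketch under assumptions: the My::Test::OneStepProcess class is hypothetical, and the Memory storage is assumed to be constructible without arguments. The other calls are the ones shown elsewhere on this page.

```perl
use strict;
use warnings;
use Test::More;

use Schedule::LongSteps;
use Schedule::LongSteps::Storage::Memory;

{
    package My::Test::OneStepProcess;    # hypothetical process class
    use Moose;
    extends qw/Schedule::LongSteps::Process/;
    use DateTime;

    # The first step is due immediately.
    sub build_first_step {
        my ($self) = @_;
        return $self->new_step({ what => 'do_work', run_at => DateTime->now() });
    }

    # The only step: terminate straight away with a final state.
    sub do_work {
        my ($self) = @_;
        return $self->final_step({ state => { done => 1 } });
    }

    __PACKAGE__->meta->make_immutable();
}

# Assumption: the Memory storage needs no constructor arguments.
my $longsteps = Schedule::LongSteps->new({
    storage => Schedule::LongSteps::Storage::Memory->new(),
});

my $process = $longsteps->instantiate_process( 'My::Test::OneStepProcess', {}, { done => 0 } );
ok( $process->id(), 'The new process has an ID' );

my $run_count = $longsteps->run_due_processes();
ok( $run_count, 'The first step was due and has been run' );

done_testing();
```

Nothing is written to a database, so such a test can run anywhere the module itself is installed.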

COOKBOOK

WRITING A NEW PROCESS

See 'QUICK START AND SYNOPSIS'

INSTANTIATING A NEW PROCESS

See 'QUICK START AND SYNOPSIS'

RUNNING PROCESS STEPS

See 'QUICK START AND SYNOPSIS'

BEING NOTIFIED OF ANY PROCESS ERROR

Use the property 'on_error' on the Schedule::LongSteps manager:

  my $longsteps = Schedule::LongSteps->new({ storage => ..,
                                             on_error => sub{
                                               my ( $stored_process ) = @_;
                                               # .. do stuff with: ..
                                               # $stored_process->error(), $stored_process->process_class(),
                                               # $stored_process->state(), etc...
                                             }
                                            });

Note that an error in your error handler itself will result in the output of a pure Perl warning and the emission of a 'critical' level Log::Any log event.
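
The "a failing handler only produces a warning" behaviour can be pictured with a small eval guard. This is a plain Perl sketch of the idea, not the module's internals: call_on_error is a hypothetical helper, and the Log::Any event is left out.

```perl
use strict;
use warnings;

# Hypothetical helper: call the handler and downgrade any exception
# thrown inside it to a plain Perl warning.
sub call_on_error {
    my ($on_error, $stored_process) = @_;
    my $ok = eval { $on_error->($stored_process); 1 };
    warn "Error handler failed: $@" unless $ok;
    return $ok ? 1 : 0;
}

# A buggy handler does not take the caller down with it.
my $result = call_on_error( sub { die "handler bug\n" }, { error => 'step failed' } );
print "Handler succeeded: $result\n";
```

Keeping handlers small and defensive is still worthwhile, since a handler that dies never gets to report the original process error.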

INJECTING PARAMETERS IN YOUR PROCESSES

Of course each instance of your process will most probably need to act on different pieces of application data. The one and only way to give 'parameters' to your processes is to specify an initial state when you instantiate a process:

  $longsteps->instantiate_process('MyProcess', { app => $app } , { work => 'on' , this => 'user_id' });

INJECTING CONTEXT IN YOUR PROCESSES

Let's say you hold an instance of your application object:

  my $app = ...;

And you want to use it in your processes:

  package MyProcess;
  ...
  has 'app' => (is => 'ro', isa => 'My::App', required => 1);

You can inject your $app instance in your processes at instantiation time:

  $longsteps->instantiate_process('MyProcess', { app => $app });

And also when running the due steps:

  $longsteps->run_due_processes({ app => $app });

The injected context should be stable over time. Do NOT use this to inject parameters. (See INJECTING PARAMETERS IN YOUR PROCESSES).

PROCESS WRITING

This package should be expressive enough for you to implement business processes as complex as those given as an example on this page: https://en.wikipedia.org/wiki/XPDL

Proper support for XPDL is not implemented yet, but here is a list of recipes to implement the most common process patterns:

MOVING TO A FINAL STATE

In your 'do_last_stuff' step implementation, simply do:

   sub do_last_stuff{
      my ($self) = @_;
      # Return final_step with the final state.
      return $self->final_step({ state => { the => 'final' , state => 1 } });
   }

DO SOMETHING ELSE IN X AMOUNT OF TIME

   sub do_stuff{
        ...
        # Do the things that have to be done NOW
        ...
        # And in two days, do this
        return $self->new_step({ what => 'do_stuff_later', run_at => DateTime->now()->add( days => 2 ) ,  state => { some => 'new one' }});
   }

DO SOMETHING CONDITIONALLY

   sub do_choose{
      my ($self) = @_;
      if( ... ){
         return $self->new_step({ what => 'do_choice1', run_at => DateTime->now() });
      }
      return $self->new_step({ what => 'do_choice2', run_at => DateTime->now() });
   }

   sub do_choice1{...}
   sub do_choice2{...}

FORKING AND WAITING FOR PROCESSES

  sub do_fork{
     ...
     my $p1 = $self->longsteps->instantiate_process('AnotherProcessClass', \%build_args , \%initial_state );
     my $p2 = $self->longsteps->instantiate_process('YetAnotherProcessClass', \%build_args2 , \%initial_state2 );
     ...
     return $self->new_step({ what => 'do_join', run_at => DateTime->now() , state => { processes => [ $p1->id(), $p2->id() ] } });
  }

  sub do_join{
     my ($self) = @_;
     return $self->wait_processes( $self->state()->{processes}, sub{
          my ( @terminated_processes ) = @_;
          my $state1 = $terminated_processes[0]->state();
          my $state2 = $terminated_processes[1]->state();
          ...
          # And as usual:
          return $self->...
     });
  }
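
The join pattern above boils down to "run the callback only once every forked process has terminated, otherwise try again later". The following plain Perl sketch models that idea; the %processes table, the join_processes helper and the returned hash shapes are hypothetical and say nothing about how wait_processes is implemented.

```perl
use strict;
use warnings;

# Hypothetical stand-in for stored processes, keyed by process ID.
my %processes = (
    1 => { status => 'terminated', state => { total => 10 } },
    2 => { status => 'running',    state => {} },
);

# Hypothetical join helper: call $on_done only when every listed process
# has terminated, otherwise ask to be run again later.
sub join_processes {
    my ($ids, $on_done) = @_;
    my @found = map { $processes{$_} } @$ids;
    return { what => 'do_join', retry => 1 }
        if grep { $_->{status} ne 'terminated' } @found;
    return $on_done->(@found);
}

# The callback receives the terminated processes in the order of the IDs.
my $sum_totals = sub {
    my (@terminated) = @_;
    my $total = 0;
    $total += $_->{state}{total} for @terminated;
    return { final => 1, state => { total => $total } };
};

my $first = join_processes( [ 1, 2 ], $sum_totals );    # process 2 is still running
$processes{2} = { status => 'terminated', state => { total => 5 } };
my $second = join_processes( [ 1, 2 ], $sum_totals );   # both have terminated now

print "First attempt retries: ", ( $first->{retry} ? 'yes' : 'no' ), "\n";
print "Final total: $second->{state}{total}\n";
```

Storing only the process IDs in the state, as do_fork does, keeps the state JSONable while still letting the join step find the forked processes again.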

ATTRIBUTES

storage

An instance of a subclass of Schedule::LongSteps::Storage. See SYNOPSIS.

on_error

A callback called like $on_error->( $stored_process ). See COOKBOOK for an example.

METHODS

uuid

Returns a Data::UUID from the storage.

run_due_processes

Runs all the due process steps according to now(). Each process is built with the given context.

Usage:

 # No context given:
 $this->run_due_processes();

 # With 'thing' as context:
 $this->run_due_processes({ thing => ... });

Returns the number of processes run.

instantiate_process

Instantiates a stored process from the given process class. Returns a new process that will have an ID.

Usage:

  $this->instantiate_process( 'MyProcessClass', { process_attribute1 => .. } , { initial => 'state' });

find_process

Shortcut to $self->storage->find_process( $pid );

SEE ALSO

BPM::Engine: a business process engine based on XPDL, in alpha version since 2012 (at the time of writing).

Copyright and Acknowledgement

This code is released under the Perl5 Terms by Jerome Eteve (JETEVE), with the support of Broadbean Technologies Ltd.

See perlartistic