NAME

ETL::Pipeline - Extract-Transform-Load pattern for data file conversions

SYNOPSIS

  use ETL::Pipeline;

  # The object oriented interface...
  ETL::Pipeline->new( {
    work_in   => {search => 'C:\Data', find => qr/Ficticious/},
    input     => ['Excel', find => qr/\.xlsx?$/              ],
    mapping   => {Name => 'A', Address => 'B', ID => 'C'     },
    constants => {Type => 1, Information => 'Demographic'    },
    output    => ['SQL', table => 'NewData'                  ],
  } )->process;

  # Or using method calls...
  my $pipeline = ETL::Pipeline->new;
  $pipeline->work_in  ( search => 'C:\Data', find => qr/Ficticious/ );
  $pipeline->input    ( 'Excel', find => qr/\.xlsx?$/i              );
  $pipeline->mapping  ( Name => 'A', Address => 'B', ID => 'C'      );
  $pipeline->constants( Type => 1, Information => 'Demographic'     );
  $pipeline->output   ( 'SQL', table => 'NewData'                   );
  $pipeline->process;

DESCRIPTION

ETL stands for Extract-Transform-Load. ETL isn't just for Data Warehousing. ETL works on almost any type of data conversion. You read the source, translate the data for your target, and store the result.

By dividing a conversion into 3 steps, we isolate the input from the output...

  • Centralizes data formatting and validation.

  • Makes new input formats a breeze.

  • Makes new outputs just as easy.

ETL::Pipeline takes your data files from extract to load. It reads an input source, translates the data, and writes it to an output destination. For example, I use these pipelines to read an Excel spreadsheet (input) and save the information in an SQL database (output).

  use ETL::Pipeline;
  ETL::Pipeline->new( {
    work_in   => {search => 'C:\Data', find => qr/Ficticious/},
    input     => ['Excel', find => qr/\.xlsx?$/],
    mapping   => {Name => 'A', Complaint => 'B', ID => 'C'},
    constants => {Client => 1, Type => 'Complaint'},
    output    => ['SQL', table => 'NewData']
  } )->process;

Or like this, calling the methods instead of through the constructor...

  use ETL::Pipeline;
  my $etl = ETL::Pipeline->new;
  $etl->work_in  ( search => 'C:\Data', find => qr/Ficticious/ );
  $etl->input    ( 'Excel', find => qr/\.xlsx?$/               );
  $etl->mapping  ( Name => 'A', Complaint => 'B', ID => 'C'    );
  $etl->constants( Client => 1, Type => 'Complaint'            );
  $etl->output   ( 'SQL', table => 'NewData'                   );
  $etl->process;

What is a pipeline?

The term pipeline describes a complete ETL process - extract, transform, and load. Or more accurately - input, mapping, output. Raw data enters one end of the pipe (input) and useful information comes out the other (output). An ETL::Pipeline object represents a complete pipeline.

METHODS & ATTRIBUTES

new

Create a new ETL pipeline. The constructor accepts these values...

chain

This optional attribute accepts another ETL::Pipeline object. The constructor copies "work_in", "data_in", and "session" from that object. It helps scripts process multiple files from the same place.

See the section "Multiple input sources" for an example.

constants

Assigns constant values to output fields. Because "mapping" treats strings as input field names, constants is the way to assign literal strings or numbers to fields. The constructor calls the "constants" method. Assign a hash reference to this attribute.

  constants => {Type => 1, Information => 'Demographic'},

input

Set up the ETL::Pipeline::Input object for retrieving the raw data. The constructor calls the "input" method. Assign an array reference to this attribute. The array is passed directly to "input" as parameters.

  input => ['Excel', find => qr/\.xlsx?$/],

output

Set up the ETL::Pipeline::Output object for saving the converted data. The constructor calls the "output" method. Assign an array reference to this attribute. The array is passed directly to "output" as parameters.

  output => ['SQL', table => 'NewData'],

mapping

Moves data from the input to the output by mapping input fields to output fields. The constructor calls the "mapping" method. Assign a hash reference to the attribute.

  mapping => {Name => 'A', Address => 'B', ID => 'C'},

work_in

Sets the working directory. All files - input, output, or temporary - reside in this directory. The constructor accepts the same value as the parameters to the "work_in" method. As a matter of fact, the constructor just calls the "work_in" method.

When creating the pipeline, ETL::Pipeline sets up arguments in this order...

1. work_in
2. data_in
3. input
4. constants
5. mapping
6. output

Later parts (e.g. output) can depend on earlier parts (e.g. input). For example, the input will use data_in in its constructor.

chain

This method creates a new pipeline using the same "work_in" and "data_in" directories. It accepts the same arguments as "new". Use chain when linking multiple pipelines together. See the section "Multiple input sources" for more details.

Reading the input

input

input sets and returns the ETL::Pipeline::Input object. The pipeline uses this object for reading the input records.

With no parameters, input returns the current ETL::Pipeline::Input object.

You tie in a new input source by calling input with parameters...

  $pipeline->input( 'Excel', find => qr/\.xlsx/i );

The first parameter is a class name. input looks for a Perl module matching this name in the ETL::Pipeline::Input namespace. In this example, the actual class name becomes ETL::Pipeline::Input::Excel.

The rest of the parameters are passed directly to the new method of that class.

Technical Note: Want to use a custom class from Local instead of ETL::Pipeline::Input? Put a + (plus sign) in front of the class name. For example, this command uses the input class Local::CustomExtract.

  $pipeline->input( '+Local::CustomExtract' );
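The naming rule above can be sketched in a few lines of core Perl. This is only an illustration: resolve_class_name is a hypothetical helper, not part of ETL::Pipeline, and the module's real lookup may differ in details such as loading the class.

```perl
use strict;
use warnings;

# Hypothetical helper, NOT part of ETL::Pipeline. It mimics the naming
# rule: a leading "+" means "use this class name exactly as written".
sub resolve_class_name {
    my ($namespace, $name) = @_;

    # "+Local::CustomExtract" becomes "Local::CustomExtract".
    return substr( $name, 1 ) if $name =~ m/^\+/;

    # "Excel" becomes "<namespace>::Excel".
    return "${namespace}::$name";
}

my $default = resolve_class_name( 'ETL::Pipeline::Input', 'Excel' );
my $custom  = resolve_class_name( 'ETL::Pipeline::Input', '+Local::CustomExtract' );
print "$default\n";    # ETL::Pipeline::Input::Excel
print "$custom\n";     # Local::CustomExtract
```

The same rule would apply to output destinations, with ETL::Pipeline::Output as the default namespace.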

get

The get method returns the value of a single field from the input. It maps directly to the get method from ETL::Pipeline::Input. See "get" in ETL::Pipeline::Input for more information.

  $pipeline->get( 'A' );
  # -or-
  $pipeline->mapping( Name => sub { lc $_->get( 'A' ) } );

When you use a code reference, ETL::Pipeline passes itself in $_. get provides a convenient shortcut. Instead of writing $_->input->get, you can write $_->get.

record_number

The record_number method returns the current record number. It maps directly to the record_number method from ETL::Pipeline::Input. See "record_number" in ETL::Pipeline::Input for more information.

  $pipeline->record_number;
  # -or-
  $pipeline->mapping( Row => sub { $_->record_number } );

Translating the data

mapping

mapping ties the input fields with the output fields. If you call mapping with no parameters, it returns the hash reference. Call mapping with a hash or hash reference and it replaces the entire mapping with the new one.

Hash keys are output field names. The "output" class defines acceptable field names. The hash values can be...

A string
A regular expression reference (with qr/.../)
A code reference

Strings and regular expressions are passed to "get" in ETL::Pipeline::Input. They must refer to an input field.

A code reference is executed in a scalar context. Its return value goes into the output field. The subroutine receives this ETL::Pipeline object as its first parameter and in the $_ variable.

  # Get the current mapping...
  my $transformation = $pipeline->mapping;

  # Set the output field "Name" to the input column "A"...
  $pipeline->mapping( Name => 'A' );

  # Set "Name" from "Full Name" or "FullName"...
  $pipeline->mapping( Name => qr/Full\s*Name/i );

  # Use the lower case of input column "A"...
  $pipeline->mapping( Name => sub { lc $_->get( 'A' ) } );

Want to save a literal value? Use "constants" instead.
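To make the three kinds of mapping values concrete, here is a rough sketch in core Perl. It does not use ETL::Pipeline at all: %record, get_field, and resolve_mapping are hypothetical stand-ins, and the regular expression handling (first matching field name in sorted order) is an assumption made for the example.

```perl
use strict;
use warnings;

# One fake input record. Keys stand in for input field names.
my %record = ( A => 'JOHN DOE', 'Full Name' => 'John Doe' );

# Hypothetical stand-in for "get": a string names a field exactly, a
# regular expression selects the first matching field name (sorted).
sub get_field {
    my ($spec) = @_;
    return $record{$spec} unless ref( $spec ) eq 'Regexp';
    my ($field) = grep { $_ =~ $spec } sort keys %record;
    return defined( $field ) ? $record{$field} : undef;
}

# Hypothetical dispatcher: code references run in scalar context,
# anything else is treated as an input field name.
sub resolve_mapping {
    my ($spec) = @_;
    return scalar $spec->() if ref( $spec ) eq 'CODE';
    return get_field( $spec );
}

my %mapping = (
    Name  => 'A',
    Alias => qr/Full\s*Name/i,
    Lower => sub { lc get_field( 'A' ) },
);
foreach my $field (sort keys %mapping) {
    print "$field = ", resolve_mapping( $mapping{$field} ), "\n";
}
```

In the real module the code reference would call $_->get( 'A' ) on the pipeline object. The sketch calls get_field directly to stay self-contained.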

add_mapping

add_mapping adds new fields to the current mapping. "mapping" replaces the entire mapping. add_mapping modifies it, leaving all of your old transformations in place.

add_mapping accepts key/value pairs as parameters.

  $pipeline->add_mapping( Address => 'B' );
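The difference between replacing and adding comes down to how two hashes are combined. The sketch here uses plain hash references and two hypothetical helpers (replace_mapping and merge_mapping). It illustrates the behaviour described above and is not the module's code.

```perl
use strict;
use warnings;

# Hypothetical stand-ins for the two behaviours.
# "mapping" style: the new pairs replace everything.
sub replace_mapping {
    my ($current, %new) = @_;
    return {%new};
}

# "add_mapping" style: the new pairs are merged into the old ones.
sub merge_mapping {
    my ($current, %new) = @_;
    return {%$current, %new};
}

my $mapping  = { Name => 'A' };
my $replaced = replace_mapping( $mapping, Address => 'B' );
my $merged   = merge_mapping( $mapping, Address => 'B' );

print join( ', ', sort keys %$replaced ), "\n";    # Address
print join( ', ', sort keys %$merged ), "\n";      # Address, Name
```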

constants

constants sets output fields to literal values. "mapping" accepts input field names as strings. Instead of obscure Perl tricks for marking literals, constants explicitly handles them.

If you call constants with no parameters, it returns the hash reference. Call constants with a hash or hash reference and it replaces the entire hash with the new one.

Hash keys are output field names. The "output" class defines acceptable field names. The hash values are literals.

  # Get the current constants...
  my $transformation = $pipeline->constants;

  # Set the output field "Name" to the string "John Doe"...
  $pipeline->constants( Name => 'John Doe' );

add_constant

add_constants

add_constant adds new fields to the current hash of literal values. "constants" replaces the entire hash. add_constant and add_constants modify the hash, leaving all of your old literals in place.

add_constant accepts key/value pairs as parameters.

  $pipeline->add_constant( Address => 'B' );

Saving the output

output

output sets and returns the ETL::Pipeline::Output object. The pipeline uses this object for creating output records.

With no parameters, output returns the current ETL::Pipeline::Output object.

You tie in a new output destination by calling output with parameters...

  $pipeline->output( 'SQL', table => 'NewData' );

The first parameter is a class name. output looks for a Perl module matching this name in the ETL::Pipeline::Output namespace. In this example, the actual class name becomes ETL::Pipeline::Output::SQL.

The rest of the parameters are passed directly to the new method of that class.

Technical Note: Want to use a custom class from Local instead of ETL::Pipeline::Output? Put a + (plus sign) in front of the class name. For example, this command uses the output class Local::CustomLoad.

  $pipeline->output( '+Local::CustomLoad' );

set

set assigns a value to an output field. The ETL::Pipeline::Output class defines the valid field names.

set accepts two parameters...

field
value

set places value into the output field.

write_record

write_record outputs the current record. It is normally called by "process". The pipeline makes it available in case you need to do something special. write_record takes no parameters.

The rest of the pipeline

process

process kicks off the entire data conversion process. It takes no parameters. All of the setup is done by the other methods.

process returns the ETL::Pipeline object so you can do things like this...

  ETL::Pipeline->new( {...} )->process->chain( ... )->process;

work_in

The working directory sets the default place for finding files. All searches start here and only descend subdirectories. Temporary or output files go into this directory as well.

work_in has two forms: work_in( 'C:\Data' ); or work_in( search => 'C:\Data', matching => 'Ficticious' );.

The first form specifies the exact directory path. In our example, the working directory is C:\Data.

The second form searches the file system for a matching directory. Take this example...

  $etl->work_in( search => 'C:\Data', matching => 'Ficticious' );

It scans the C:\Data directory for a subdirectory named Ficticious, like this: C:\Data\Ficticious. The search is not recursive. It only examines entries directly inside the search folder.

search

Search inside this directory for a matching subdirectory. The search is not recursive.

matching

Look for a subdirectory that matches this name. Wildcards and regular expressions are supported. Searches are case insensitive.

work_in automatically resets "data_in".
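A non-recursive, case insensitive directory search can be sketched with core modules. The helper find_subdirectory is hypothetical and much simpler than whatever work_in really does: it only performs a case insensitive substring match, with no wildcard or regular expression support. The temporary directory tree exists only so the example runs anywhere.

```perl
use strict;
use warnings;
use File::Spec;
use File::Temp qw/tempdir/;

# Hypothetical helper, not the module's implementation. It looks only at
# the immediate children of $search and compares names case insensitively.
sub find_subdirectory {
    my ($search, $matching) = @_;

    opendir( my $handle, $search ) or die "Cannot read $search: $!";
    my @names = sort grep { !/^\.\.?$/ } readdir( $handle );
    closedir( $handle );

    foreach my $name (@names) {
        my $path = File::Spec->catdir( $search, $name );
        next unless -d $path;
        return $path if $name =~ m/\Q$matching\E/i;
    }
    return undef;
}

# Build a throwaway directory tree for the demonstration.
my $root = tempdir( CLEANUP => 1 );
mkdir( File::Spec->catdir( $root, 'Ficticious' ) ) or die $!;
mkdir( File::Spec->catdir( $root, 'Other' ) ) or die $!;
mkdir( File::Spec->catdir( $root, 'Other', 'Nested' ) ) or die $!;

my $direct = find_subdirectory( $root, 'ficticious' );
my $nested = find_subdirectory( $root, 'nested' );
print defined( $direct ) ? "found $direct\n" : "not found\n";
print defined( $nested ) ? "found $nested\n" : "not found (search is not recursive)\n";
```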

data_in

The working directory ("work_in") usually contains the raw data files. In some cases, though, the actual data sits in a subdirectory underneath "work_in". data_in tells the pipeline where to find the input file.

data_in accepts a search pattern - name, glob, or regular expression. It searches "work_in" for the first matching directory. The search is case insensitive.

If you pass an empty string to data_in, the pipeline resets data_in to the "work_in" directory. This is useful when chaining pipelines. If one changes the data directory, the next in line can change back.

session

ETL::Pipeline supports sessions. A session allows input and output objects to share information along a chain. For example, imagine 3 Excel files being loaded into an Access database. All 3 files go into the same Access database. The first pipeline creates the database and saves its path in the session. That pipeline chains with a second pipeline. The second pipeline retrieves the Access filename from the session.

The session method provides access to session level variables. As you write your own ETL::Pipeline::Output classes, they can use session variables for sharing information.

The first parameter is the variable name. If you pass only the variable name, session returns the value.

  my $database = $etl->session( 'access_file' );
  my $identifier = $etl->session( 'session_identifier' );

A second parameter is the value.

  $etl->session( access_file => 'C:\ExcelData.accdb' );

You can set multiple variables in one call.

  $etl->session( access_file => 'C:\ExcelData.accdb', name => 'Abe' );

When retrieving an array or hash reference, session automatically dereferences it if called in a list context. In a scalar context, session returns the reference.

  # Returns the list of names as a list.
  foreach my $name ($etl->session( 'name_list' )) { ... }
  
  # Returns a list reference instead of a list.
  my $reference = $etl->session( 'name_list' );
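Context-sensitive return values like this are usually built on Perl's wantarray. The sketch here shows the idea with a plain hash and a hypothetical session_value function. It is not taken from the module's source.

```perl
use strict;
use warnings;

# A plain hash stands in for the session storage.
my %session = (
    name_list   => ['Abe', 'Bea'],
    access_file => 'C:\ExcelData.accdb',
);

# Hypothetical getter: dereferences arrays and hashes in list context,
# returns the stored value unchanged otherwise.
sub session_value {
    my ($name) = @_;
    my $value = $session{$name};
    if (wantarray) {
        return @$value if ref( $value ) eq 'ARRAY';
        return %$value if ref( $value ) eq 'HASH';
    }
    return $value;
}

my @names     = session_value( 'name_list' );    # list context
my $reference = session_value( 'name_list' );    # scalar context
print scalar( @names ), " names\n";    # 2 names
print ref( $reference ), "\n";         # ARRAY
```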

session_has

session_has checks for a specific session variable. It returns true if the variable exists and false if it doesn't.

session_has only checks existence. It does not tell you if the value is defined.

  if ($etl->session_has( 'access_file' )) { ... }
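The distinction between existence and definedness is the same one Perl makes for hash keys. A short illustration with a plain hash, where %variables is a hypothetical stand-in for the session storage:

```perl
use strict;
use warnings;

# The key exists, but its value is undef.
my %variables = ( access_file => undef );

my $exists  = exists $variables{access_file}  ? 1 : 0;
my $defined = defined $variables{access_file} ? 1 : 0;
my $missing = exists $variables{no_such_key}  ? 1 : 0;

print "exists=$exists defined=$defined missing=$missing\n";
# exists=1 defined=0 missing=0
```

A session_has style check corresponds to the first test. Code that needs a usable value should also check that the value is defined.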

Other methods & attributes

is_valid

This method returns true or false. True means that the pipeline is ready to go. False, of course, means that there's a problem. In a list context, is_valid returns the true or false value and an error message. On success, the error message is undef.

progress

This method displays the current upload progress. It is called automatically by "process".

progress takes one parameter - a status...

start

The ETL process is just beginning. progress displays the input file name, if "input" supports the ETL::Pipeline::Input::File role. Otherwise, progress displays the ETL::Pipeline::Input class name.

end

The ETL process is complete.

(blank)

progress displays a count every 50 records, so you know that it's working.
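The "every 50 records" behaviour amounts to a modulus check on the record count. A minimal sketch, assuming a hypothetical progress_message helper and a message format invented for this example:

```perl
use strict;
use warnings;

# Hypothetical helper: returns a message for every 50th record and
# undef for all other counts. The message text is made up.
sub progress_message {
    my ($count) = @_;
    return undef if $count == 0 || $count % 50;
    return "Processed record #$count";
}

# Simulate 120 records. Only records 50 and 100 produce a message.
my @messages = grep { defined } map { progress_message( $_ ) } 1 .. 120;
print "$_\n" foreach @messages;
```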

execute_code_ref

This method runs arbitrary Perl code. ETL::Pipeline itself, input sources, and output destinations call this method.

The first parameter is the code reference. Any additional parameters are passed directly to the code reference.

The code reference receives the ETL::Pipeline object as its first parameter, plus any additional parameters. execute_code_ref also puts the ETL::Pipeline object into $_.
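The calling convention described here can be sketched with local $_. In this illustration, run_code_ref is a hypothetical function and a plain hash reference plays the role of the ETL::Pipeline object.

```perl
use strict;
use warnings;

# Hypothetical stand-in for execute_code_ref. $object plays the role of
# the ETL::Pipeline object.
sub run_code_ref {
    my ($object, $code, @arguments) = @_;

    # Make the object visible as $_ only for the duration of the call.
    local $_ = $object;
    return $code->( $object, @arguments );
}

my $object = { name => 'pipeline' };
my $result = run_code_ref(
    $object,
    sub {
        my ($self, $suffix) = @_;
        return "$_->{name} / $self->{name} / $suffix";
    },
    'extra',
);
print "$result\n";    # pipeline / pipeline / extra
```

Using local restores the previous value of $_ when the call returns, so the caller's own $_ is not disturbed.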

For overriding in a subclass

"work_in" searches inside this directory if you do not specify a search parameter. It defaults to the current directory. Override this in the subclass with the correct default for your environment.

_object_of_class

This private method creates the ETL::Pipeline::Input and ETL::Pipeline::Output objects. It allows me to centralize the error handling. The program dies if there's an error. It means that something is wrong with the corresponding class. And I don't want to hide those errors. You can only fix errors if you know about them.

Override or modify this method if you want to perform extra checks.

The first parameter is a string with either Input or Output. _object_of_class appends this value onto ETL::Pipeline. For example, 'Input' becomes ETL::Pipeline::Input.

The rest of the parameters are passed directly into the constructor for the class _object_of_class instantiates.

ADVANCED TOPICS

Multiple input sources

It is not uncommon to receive your data spread across more than one file. How do you guarantee that each pipeline pulls files from the same working directory ("work_in")? You "chain" the pipelines together.

The "chain" method works like this...

  ETL::Pipeline->new( {
    work_in   => {search => 'C:\Data', find => qr/Ficticious/},
    input     => ['Excel', find => 'main.xlsx'               ],
    mapping   => {Name => 'A', Address => 'B', ID => 'C'     },
    constants => {Type => 1, Information => 'Demographic'    },
    output    => ['SQL', table => 'NewData'                  ],
  } )->process->chain( {
    input     => ['Excel', find => 'notes.xlsx'         ],
    mapping   => {User => 'A', Text => 'B', Date => 'C' },
    constants => {Type => 2, Information => 'Note'      },
    output    => ['SQL', table => 'NewData'             ],
  } )->process;

When the first pipeline finishes, it creates a new object with the same "work_in". The code then calls "process" on the new object. You can also use the chain constructor argument...

  my $pipeline1 = ETL::Pipeline->new( {
    work_in   => {search => 'C:\Data', find => qr/Ficticious/},
    input     => ['Excel', find => 'main.xlsx'               ],
    mapping   => {Name => 'A', Address => 'B', ID => 'C'     },
    constants => {Type => 1, Information => 'Demographic'    },
    output    => ['SQL', table => 'NewData'                  ],
  } )->process;
  my $pipeline2 = ETL::Pipeline->new( {
    input     => ['Excel', find => 'notes.xlsx'         ],
    chain     => $pipeline1,
    mapping   => {User => 'A', Text => 'B', Date => 'C' },
    constants => {Type => 2, Information => 'Note'      },
    output    => ['SQL', table => 'NewData'             ],
  } )->process;

In both of these styles, the second pipeline copies "work_in" from the first pipeline. There is no difference between the "chain" method and the chain constructor argument. Pick the one that best suits your programming style.
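Conceptually, chaining copies a few shared settings from one pipeline to the next. The sketch here models that with plain hashes. The chain_from function and the hash layout are hypothetical, and sharing the session by reference is an assumption rather than something the documentation states.

```perl
use strict;
use warnings;

# Hypothetical sketch: plain hashes stand in for pipeline objects.
sub chain_from {
    my ($previous, %arguments) = @_;
    my %next = %arguments;

    # Copy the shared settings from the previous pipeline. The session
    # hash is shared by reference (an assumption for this example).
    $next{$_} = $previous->{$_} foreach qw/work_in data_in session/;
    return \%next;
}

my $first = {
    work_in => 'C:\Data\Ficticious',
    data_in => 'C:\Data\Ficticious',
    session => { access_file => 'C:\ExcelData.accdb' },
    input   => 'main.xlsx',
};
my $second = chain_from( $first, input => 'notes.xlsx' );

print "$second->{work_in} $second->{input}\n";
print "$second->{session}{access_file}\n";
```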

Writing an input source

ETL::Pipeline provides some basic, generic input sources. Invariably, you will come across data that doesn't fit one of these. No problem. ETL::Pipeline lets you create your own input sources.

An input source is a Moose class that implements the ETL::Pipeline::Input role. The role requires that you define certain methods. ETL::Pipeline makes use of those methods. Name your class ETL::Pipeline::Input::* and the "input" method can find it automatically.

See ETL::Pipeline::Input for more details.

Writing an output destination

ETL::Pipeline does not have any default output destinations. Output destinations are customized. You have something you want done with the data. And that something intimately ties into your specific business. You will have to write at least one output destination to do anything useful.

An output destination is a Moose class that implements the ETL::Pipeline::Output role. The role defines required methods and a simple hash for storing the new record in memory. ETL::Pipeline makes use of the methods. Name your class ETL::Pipeline::Output::* and the "output" method can find it automatically.

See ETL::Pipeline::Output for more details.

Why are the inputs and outputs separate?

Wouldn't it make sense to have an input source for Excel and an output destination for Excel?

Input sources are generic. It takes the same code to read from one Excel file as another. Output destinations, on the other hand, are customized for your business - with data validation and business logic.

ETL::Pipeline assumes that you have multiple input sources. Different feeds use different formats. But there will be far fewer output destinations. You're writing data into a centralized place.

For these reasons, it makes sense to keep the input sources and output destinations separate. You can easily add more inputs without affecting the outputs.

SEE ALSO

ETL::Pipeline::Input, ETL::Pipeline::Output, ETL::Pipeline::Mapping

Input Source Formats

ETL::Pipeline::Input::Excel, ETL::Pipeline::Input::DelimitedText

REPOSITORY

https://github.com/rbwohlfarth/ETL-Pipeline

AUTHOR

Robert Wohlfarth <robert.j.wohlfarth@vanderbilt.edu>

COPYRIGHT AND LICENSE

Copyright (c) 2016 Robert Wohlfarth

This module is free software; you can redistribute it and/or modify it under the same terms as Perl 5.10.0. For more details, see the full text of the licenses in the directory LICENSES.

This program is distributed in the hope that it will be useful, but without any warranty; without even the implied