ETL::Pipeline - Extract-Transform-Load pattern for data file conversions
use ETL::Pipeline;

# The object oriented interface...
ETL::Pipeline->new( {
  work_in   => {search => 'C:\Data', find => qr/Ficticious/},
  input     => ['Excel', find => qr/\.xlsx?$/          ],
  mapping   => {Name => 'A', Address => 'B', ID => 'C' },
  constants => {Type => 1, Information => 'Demographic'},
  output    => ['SQL', table => 'NewData'              ],
} )->process;

# Or using method calls...
my $pipeline = ETL::Pipeline->new;
$pipeline->work_in  ( search => 'C:\Data', find => qr/Ficticious/ );
$pipeline->input    ( 'Excel', find => qr/\.xlsx?$/i              );
$pipeline->mapping  ( Name => 'A', Address => 'B', ID => 'C'      );
$pipeline->constants( Type => 1, Information => 'Demographic'     );
$pipeline->output   ( 'SQL', table => 'NewData'                   );
$pipeline->process;
ETL stands for Extract-Transform-Load. ETL isn't just for Data Warehousing. ETL works on almost any type of data conversion. You read the source, translate the data for your target, and store the result.
By dividing a conversion into 3 steps, we isolate the input from the output...
Centralizes data formatting and validation.
Makes new input formats a breeze.
Makes new outputs just as easy.
ETL::Pipeline takes your data files from extract to load. It reads an input source, translates the data, and writes it to an output destination. For example, I use these pipelines for reading an Excel spreadsheet (input) and saving the information in an SQL database (output).
use ETL::Pipeline;

ETL::Pipeline->new( {
  work_in   => {search => 'C:\Data', find => qr/Ficticious/},
  input     => ['Excel', find => qr/\.xlsx?$/],
  mapping   => {Name => 'A', Complaint => 'B', ID => 'C'},
  constants => {Client => 1, Type => 'Complaint'},
  output    => ['SQL', table => 'NewData'],
} )->process;
Or like this, calling the methods instead of through the constructor...
use ETL::Pipeline;

my $etl = ETL::Pipeline->new;
$etl->work_in  ( search => 'C:\Data', find => qr/Ficticious/ );
$etl->input    ( 'Excel', find => qr/\.xlsx?$/ );
$etl->mapping  ( Name => 'A', Complaint => 'B', ID => 'C' );
$etl->constants( Client => 1, Type => 'Complaint' );
$etl->output   ( 'SQL', table => 'NewData' );
$etl->process;
The term pipeline describes a complete ETL process - extract, transform, and load. Or more accurately - input, mapping, output. Raw data enters one end of the pipe (input) and useful information comes out the other (output). An ETL::Pipeline object represents a complete pipeline.
Create a new ETL pipeline. The constructor accepts these values...
This optional attribute copies "work_in", "data_in", and "session" from another pipeline. chain accepts an ETL::Pipeline object, and the constructor copies those three attributes from it. It helps scripts process multiple files from the same place.
See the section "Multiple input sources" for an example.
Assigns constant values to output fields. Since mapping accepts input field names, constants assigns literal strings or numbers to fields. The constructor calls the "constants" method. Assign a hash reference to this attribute.
constants => {Type => 1, Information => 'Demographic'},
Setup the ETL::Pipeline::Input object for retrieving the raw data. The constructor calls the "input" method. Assign an array reference to this attribute. The array is passed directly to "input" as parameters.
input => ['Excel', find => qr/\.xlsx?$/],
Setup the ETL::Pipeline::Output object for storing the processed data. The constructor calls the "output" method. Assign an array reference to this attribute. The array is passed directly to "output" as parameters.
output => ['SQL', table => 'NewData'],
Move data from the input to the output. This attribute maps the input to the output. The constructor calls the "mapping" method. Assign a hash reference to the attribute.
mapping => {Name => 'A', Address => 'B', ID => 'C'},
Sets the working directory. All files - input, output, or temporary - reside in this directory. The constructor accepts the same value as the parameters to the "work_in" method. As a matter of fact, the constructor just calls the "work_in" method.
When creating the pipeline, ETL::Pipeline sets up arguments in this order...
Later parts (e.g. output) can depend on earlier parts (e.g. input). For example, the input will use data_in in its constructor.
This method creates a new pipeline using the same "work_in" and "data_in" directories. It accepts the same arguments as "new". Use chain when linking multiple pipelines together. See the section "Multiple input sources" for more details.
input sets and returns the ETL::Pipeline::Input object. The pipeline uses this object for reading the input records.
With no parameters, input returns the current ETL::Pipeline::Input object.
You tie in a new input source by calling input with parameters...
$pipeline->input( 'Excel', find => qr/\.xlsx/i );
The first parameter is a class name. input looks for a Perl module matching this name in the ETL::Pipeline::Input namespace. In this example, the actual class name becomes ETL::Pipeline::Input::Excel.
The rest of the parameters are passed directly to the new method of that class.
Technical Note: Want to use a custom class from Local instead of ETL::Pipeline::Input? Put a + (plus sign) in front of the class name. For example, this command uses the input class Local::CustomExtract.
$pipeline->input( '+Local::CustomExtract' );
The get method returns the value of a single field from the input. It maps directly to the get method from ETL::Pipeline::Input. See "get" in ETL::Pipeline::Input for more information.
$pipeline->get( 'A' );
# -or-
$pipeline->mapping( Name => sub { lc $_->get( 'A' ) } );
When you use a code reference, ETL::Pipeline passes itself in $_. get provides a convenient shortcut. Instead of writing $_->input->get, you can write $_->get.
The record_number method returns the current record number. It maps directly to the record_number method from ETL::Pipeline::Input. See "record_number" in ETL::Pipeline::Input for more information.
$pipeline->record_number;
# -or-
$pipeline->mapping( Row => sub { $_->record_number } );
mapping ties the input fields with the output fields. If you call mapping with no parameters, it returns the hash reference. Call mapping with a hash or hash reference and it replaces the entire mapping with the new one.
Hash keys are output field names. The "output" class defines acceptable field names. The hash values can be...
Strings and regular expressions are passed to "get" in ETL::Pipeline::Input. They must refer to an input field.
A code reference is executed in a scalar context. Its return value goes into the output field. The subroutine receives this ETL::Pipeline object as its first parameter and in the $_ variable.
# Get the current mapping...
my $transformation = $pipeline->mapping;

# Set the output field "Name" to the input column "A"...
$pipeline->mapping( Name => 'A' );

# Set "Name" from "Full Name" or "FullName"...
$pipeline->mapping( Name => qr/Full\s*Name/i );

# Use the lower case of input column "A"...
$pipeline->mapping( Name => sub { lc $_->get( 'A' ) } );
Want to save a literal value? Use "constants" instead.
add_mapping adds new fields to the current mapping. "mapping" replaces the entire mapping. add_mapping modifies it, leaving all of your old transformations in place.
add_mapping accepts key/value pairs as parameters.
$pipeline->add_mapping( Address => 'B' );
constants sets output fields to literal values. "mapping" accepts input field names as strings. Instead of obtuse Perl tricks for marking literals, constants explicitly handles them.
If you call constants with no parameters, it returns the hash reference. Call constants with a hash or hash reference and it replaces the entire hash with the new one.
Hash keys are output field names. The "output" class defines acceptable field names. The hash values are literals.
# Get the current constants...
my $constants = $pipeline->constants;

# Set the output field "Name" to the string "John Doe"...
$pipeline->constants( Name => 'John Doe' );
add_constant adds new fields to the current hash of literal values. "constants" replaces the entire hash. add_constant and add_constants modify the hash, leaving all of your old literals in place.
add_constant accepts key/value pairs as parameters.
$pipeline->add_constant( Address => 'B' );
output sets and returns the ETL::Pipeline::Output object. The pipeline uses this object for creating output records.
With no parameters, output returns the current ETL::Pipeline::Output object.
You tie in a new output destination by calling output with parameters...
$pipeline->output( 'SQL', table => 'NewData' );
The first parameter is a class name. output looks for a Perl module matching this name in the ETL::Pipeline::Output namespace. In this example, the actual class name becomes ETL::Pipeline::Output::SQL.
Technical Note: Want to use a custom class from Local instead of ETL::Pipeline::Output? Put a + (plus sign) in front of the class name. For example, this command uses the output class Local::CustomLoad.
$pipeline->output( '+Local::CustomLoad' );
set assigns a value to an output field. The ETL::Pipeline::Output class defines the valid field names.
set accepts two parameters...
set places value into the output field.
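For example, assigning a field by hand might look like this sketch. The field name Name and its value are hypothetical; your ETL::Pipeline::Output class defines the valid field names.

```perl
# Assign a literal value to the output field "Name".
# "Name" is a hypothetical field - the output class defines valid names.
$pipeline->set( Name => 'John Doe' );
```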
write_record outputs the current record. It is normally called by "process". The pipeline makes it available in case you need to do something special. write_record takes no parameters.
process kicks off the entire data conversion process. It takes no parameters. All of the setup is done by the other methods.
process returns the ETL::Pipeline object so you can do things like this...
ETL::Pipeline->new( {...} )->process->chain( ... )->process;
The working directory sets the default place for finding files. All searches start here and only descend subdirectories. Temporary or output files go into this directory as well.
work_in has two forms...
work_in( 'C:\Data' );
work_in( search => 'C:\Data', matching => 'Ficticious' );
The first form specifies the exact directory path. In our example, the working directory is C:\Data.
The second form searches the file system for a matching directory. Take this example...
$etl->work_in( search => 'C:\Data', matching => 'Ficticious' );
It scans the C:\Data directory for a subdirectory named Ficticious, like this: C:\Data\Ficticious. The search is not recursive. It only looks at directories directly inside the search folder.
Search inside this directory for a matching subdirectory. The search is not recursive.
Look for a subdirectory that matches this name. Wildcards and regular expressions are supported. Searches are case insensitive.
work_in automatically resets "data_in".
The working directory ("work_in") usually contains the raw data files. In some cases, though, the actual data sits in a subdirectory underneath "work_in". data_in tells the pipeline where to find the input file.
data_in accepts a search pattern - name, glob, or regular expression. It searches "work_in" for the first matching directory. The search is case insensitive.
If you pass an empty string to data_in, the pipeline resets data_in to the "work_in" directory. This is useful when chaining pipelines. If one changes the data directory, the next in line can change back.
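Putting "work_in" and data_in together might look like this sketch. The subdirectory name Raw is a hypothetical example.

```perl
# Find the working directory first...
$etl->work_in( search => 'C:\Data', matching => 'Ficticious' );

# ...then point data_in at the subdirectory holding the raw files.
# "Raw" is a hypothetical subdirectory name.
$etl->data_in( qr/Raw/i );

# An empty string resets data_in back to the work_in directory.
$etl->data_in( '' );
```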
ETL::Pipeline supports sessions. A session allows input and output objects to share information along a chain. For example, imagine 3 Excel files being loaded into an Access database. All 3 files go into the same Access database. The first pipeline creates the database and saves its path in the session. That pipeline chains with a second pipeline. The second pipeline retrieves the Access filename from the session.
The session method provides access to session level variables. As you write your own ETL::Pipeline::Output classes, they can use session variables for sharing information.
The first parameter is the variable name. If you pass only the variable name, session returns the value.
my $database   = $etl->session( 'access_file' );
my $identifier = $etl->session( 'session_identifier' );
A second parameter is the value.
$etl->session( access_file => 'C:\ExcelData.accdb' );
You can set multiple variables in one call.
$etl->session( access_file => 'C:\ExcelData.accdb', name => 'Abe' );
When retrieving an array or hash reference, session automatically dereferences it if called in a list context. In a scalar context, session returns the reference.
# Returns the list of names as a list.
foreach my $name ($etl->session( 'name_list' )) { ... }

# Returns a list reference instead of a list.
my $reference = $etl->session( 'name_list' );
session_has checks for a specific session variable. It returns true if the variable exists and false if it doesn't.
session_has only checks existence. It does not tell you if the value is defined.
if ($etl->session_has( 'access_file' )) { ... }
This method returns true or false. True means that the pipeline is ready to go. False, of course, means that there's a problem. In a list context, is_valid returns the status plus an error message. On success, the error message is undef.
This method displays the current upload progress. It is called automatically by "process".
progress takes one parameter - a status...
The ETL process is just beginning. progress displays the input file name, if "input" supports the ETL::Pipeline::Input::File role. Otherwise, progress displays the ETL::Pipeline::Input class name.
The ETL process is complete.
progress displays a count every 50 records, so you know that it's working.
This method runs arbitrary Perl code. ETL::Pipeline itself, input sources, and output destinations call this method.
The first parameter is the code reference. Any additional parameters are passed directly to the code reference.
The code reference receives the ETL::Pipeline object as its first parameter, plus any additional parameters. execute_code_ref also puts the ETL::Pipeline object into $_.
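A minimal sketch of calling execute_code_ref directly. The column name 'A' is a hypothetical example, assuming an input source where that field exists.

```perl
# The code reference gets the pipeline as its first parameter and in $_.
# Extra arguments ('A' here, a hypothetical column) follow the code ref.
my $value = $etl->execute_code_ref(
    sub {
        my ($pipeline, $column) = @_;
        return $_->get( $column );    # $_ is the same pipeline object
    },
    'A'
);
```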
"work_in" searches inside this directory if you do not specify a search parameter. It defaults to the current directory. Override this in the subclass with the correct default for your environment.
This private method creates the ETL::Pipeline::Input and ETL::Pipeline::Output objects. It allows me to centralize the error handling. The program dies if there's an error. It means that something is wrong with the corresponding class. And I don't want to hide those errors. You can only fix errors if you know about them.
Override or modify this method if you want to perform extra checks.
The first parameter is a string with either Input or Output. _object_of_class appends this value onto ETL::Pipeline. For example, 'Input' becomes ETL::Pipeline::Input.
The rest of the parameters are passed directly into the constructor for the class _object_of_class instantiates.
It is not uncommon to receive your data spread across more than one file. How do you guarantee that each pipeline pulls files from the same working directory ("work_in")? You "chain" the pipelines together.
The "chain" method works like this...
ETL::Pipeline->new( {
  work_in   => {search => 'C:\Data', find => qr/Ficticious/},
  input     => ['Excel', find => 'main.xlsx'           ],
  mapping   => {Name => 'A', Address => 'B', ID => 'C' },
  constants => {Type => 1, Information => 'Demographic'},
  output    => ['SQL', table => 'NewData'              ],
} )->process->chain( {
  input     => ['Excel', find => 'notes.xlsx'        ],
  mapping   => {User => 'A', Text => 'B', Date => 'C'},
  constants => {Type => 2, Information => 'Note'     },
  output    => ['SQL', table => 'NewData'            ],
} )->process;
When the first pipeline finishes, it creates a new object with the same "work_in". The code then calls "process" on the new object. You can also use the chain constructor argument...
my $pipeline1 = ETL::Pipeline->new( {
  work_in   => {search => 'C:\Data', find => qr/Ficticious/},
  input     => ['Excel', find => 'main.xlsx'           ],
  mapping   => {Name => 'A', Address => 'B', ID => 'C' },
  constants => {Type => 1, Information => 'Demographic'},
  output    => ['SQL', table => 'NewData'              ],
} )->process;

my $pipeline2 = ETL::Pipeline->new( {
  chain     => $pipeline1,
  input     => ['Excel', find => 'notes.xlsx'        ],
  mapping   => {User => 'A', Text => 'B', Date => 'C'},
  constants => {Type => 2, Information => 'Note'     },
  output    => ['SQL', table => 'NewData'            ],
} )->process;
In both of these styles, the second pipeline copies "work_in" from the first pipeline. There is no difference between the "chain" method or chain constructor argument. Pick the one that best suits your programming style.
ETL::Pipeline provides some basic, generic input sources. Invariably, you will come across data that doesn't fit one of these. No problem. ETL::Pipeline lets you create your own input sources.
An input source is a Moose class that implements the ETL::Pipeline::Input role. The role requires that you define certain methods. ETL::Pipeline makes use of those methods. Name your class ETL::Pipeline::Input::* and the "input" method can find it automatically.
See ETL::Pipeline::Input for more details.
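A custom input source might be sketched like this. This is only an outline, not a working implementation: the exact methods required by the ETL::Pipeline::Input role depend on your version of the distribution, so check the role's documentation. The class name CustomFormat and the method names shown are illustrative assumptions.

```perl
package ETL::Pipeline::Input::CustomFormat;

use Moose;
with 'ETL::Pipeline::Input';

# Illustrative method names - implement whatever the
# ETL::Pipeline::Input role actually requires in your version.
sub configure   { my ($self) = @_; ... }  # open the input file
sub next_record { my ($self) = @_; ... }  # advance to the next record
sub get         { my ($self, $field) = @_; ... }  # return one field value
sub finish      { my ($self) = @_; ... }  # close the input file

no Moose;
__PACKAGE__->meta->make_immutable;
```

Because the class lives in the ETL::Pipeline::Input namespace, "input" can load it by its short name: $pipeline->input( 'CustomFormat', ... );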
ETL::Pipeline does not have any default output destinations. Output destinations are customized. You have something you want done with the data. And that something intimately ties into your specific business. You will have to write at least one output destination to do anything useful.
An output destination is a Moose class that implements the ETL::Pipeline::Output role. The role defines required methods and a simple hash for storing the new record in memory. ETL::Pipeline makes use of the methods. Name your class ETL::Pipeline::Output::* and the "output" method can find it automatically.
See ETL::Pipeline::Output for more details.
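A custom output destination might be sketched the same way. Again, this is only an outline: the exact methods required by the ETL::Pipeline::Output role depend on your version, so check the role's documentation. The class name MyDatabase and the method names shown are illustrative assumptions.

```perl
package ETL::Pipeline::Output::MyDatabase;

use Moose;
with 'ETL::Pipeline::Output';

# Illustrative method names - implement whatever the
# ETL::Pipeline::Output role actually requires in your version.
sub configure    { my ($self) = @_; ... }  # connect to the database
sub write_record { my ($self) = @_; ... }  # save the current record
sub finish       { my ($self) = @_; ... }  # disconnect

no Moose;
__PACKAGE__->meta->make_immutable;
```

Because the class lives in the ETL::Pipeline::Output namespace, "output" can load it by its short name: $pipeline->output( 'MyDatabase', ... );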
Wouldn't it make sense to have an input source for Excel and an output destination for Excel?
Input sources are generic. It takes the same code to read from one Excel file as another. Output destinations, on the other hand, are customized for your business - with data validation and business logic.
ETL::Pipeline assumes that you have multiple input sources. Different feeds use different formats. But output destinations will be much fewer. You're writing data into a centralized place.
For these reasons, it makes sense to keep the input sources and output destinations separate. You can easily add more inputs without affecting the outputs.
ETL::Pipeline::Input, ETL::Pipeline::Output, ETL::Pipeline::Mapping
ETL::Pipeline::Input::Excel, ETL::Pipeline::Input::DelimitedText
https://github.com/rbwohlfarth/ETL-Pipeline
Robert Wohlfarth <robert.j.wohlfarth@vanderbilt.edu>
Copyright (c) 2016 Robert Wohlfarth
This module is free software; you can redistribute it and/or modify it under the same terms as Perl 5.10.0. For more details, see the full text of the licenses in the directory LICENSES.
This program is distributed in the hope that it will be useful, but without any warranty; without even the implied warranty of merchantability or fitness for a particular purpose.
To install ETL::Pipeline, copy and paste the appropriate command into your terminal.
cpanm
cpanm ETL::Pipeline
CPAN shell
perl -MCPAN -e shell
install ETL::Pipeline
For more information on module installation, please visit the detailed CPAN module installation guide.