Andrew Grangaard
and 1 contributors


Hadoop::Streaming - Contains Mapper, Combiner and Reducer roles to simplify writing Hadoop Streaming jobs


version 0.102520


    package My::Hadoop::Example;
    use Moose::Role;

    sub map {
        my ($self, $line) = @_;
        # derive $key and $value from $line here
        $self->emit( $key => $value);
    }

    sub reduce {
        my ( $self, $key, $value_iterator) = @_;
        while( $value_iterator->has_next() ) { ... }
        $self->emit( $key, $composite_value );
    }

    sub combine {
        my ( $self, $key, $value_iterator) = @_;
        while( $value_iterator->has_next() ) { ... }
        $self->emit( $key, $composite_value );
    }

    package My::Hadoop::Example::Mapper;
    use Moose;
    with qw/Hadoop::Streaming::Mapper My::Hadoop::Example/;

    package My::Hadoop::Example::Combiner;
    use Moose;
    with qw/Hadoop::Streaming::Combiner My::Hadoop::Example/;

    package My::Hadoop::Example::Reducer;
    use Moose;
    with qw/Hadoop::Streaming::Reducer My::Hadoop::Example/;


mapper executable:

  use My::Hadoop::Example;
  My::Hadoop::Example::Mapper->run();

combiner executable:

  use My::Hadoop::Example;
  My::Hadoop::Example::Combiner->run();

reducer executable:

  use My::Hadoop::Example;
  My::Hadoop::Example::Reducer->run();


Hadoop::Streaming::* provides a simple Perl interface to the Streaming interface of Hadoop.

Hadoop is a system for "reliable, scalable, distributed computing." Hadoop was developed at Yahoo! and is now maintained by the Apache Software Foundation.

Hadoop provides a distributed map/reduce framework. Mappers take lines of unstructured file data and produce key/value pairs. These key/value pairs are merged, sorted by key, and provided to Reducers. Reducers take key/value pairs and produce higher-order data. This works for data where the output key/value pairs can be determined from a single line of input in isolation. The Reducer is provided sho

Hadoop's Streaming Interface

The Streaming interface provides a simple API for writing Hadoop jobs in any language. Jobs are provided input on STDIN and output is expected on STDOUT. Within each line, the key and value are separated by a TAB character.

Streaming map jobs are provided an input of lines instead of key-value pairs. See Hadoop::Streaming::Mapper INTERFACE DETAILS for an explanation.

Reduce jobs are provided a stream of key\tvalue lines. A key with multiple values appears on one input line per value, each line holding a single key\tvalue pair. The stream is guaranteed to be sorted by key. The reduce job must track the key/value pairs and manually detect a key change.
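The manual key-change detection described above can be sketched in plain Perl. This is only an illustration of what a raw streaming reducer has to do without the module; reduce_sorted_lines() is a hypothetical helper, the values are assumed to be numeric counts, and a real job would read its lines from STDIN rather than from a list.

```perl
use strict;
use warnings;

# Sum the values for each key in a key-sorted list of "key\tvalue" lines.
# reduce_sorted_lines() is a hypothetical helper, not part of Hadoop::Streaming.
sub reduce_sorted_lines {
    my @lines = @_;
    my @output;
    my ( $current_key, $sum );

    for my $line (@lines) {
        chomp( my $copy = $line );
        my ( $key, $value ) = split /\t/, $copy, 2;

        # A new key means the previous key's group is complete.
        if ( defined $current_key && $key ne $current_key ) {
            push @output, "$current_key\t$sum";
            $sum = 0;
        }
        $current_key = $key;
        $sum += $value;
    }

    # Flush the final group, which no later key change will close.
    push @output, "$current_key\t$sum" if defined $current_key;
    return @output;
}

my @sorted = ( "apple\t1\n", "apple\t1\n", "pear\t1\n" );
print "$_\n" for reduce_sorted_lines(@sorted);
```

The final flush after the loop is the step that is easiest to forget, which is part of the motivation for the iterator interface described below.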

Hadoop::Streaming::Mapper interface

Hadoop::Streaming::Mapper consumes and chomps lines from STDIN and calls map($line) once per line. This is initiated by the run() method.

example mapper input:

  line1
  line2
  line3

Hadoop::Streaming::Mapper transforms this into 3 calls to map():

  map(line1)
  map(line2)
  map(line3)
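As a rough picture of that dispatch, the loop below chomps each line and hands it to a callback, one call per line. It is a sketch under assumptions, not the module's implementation: run_mapper() and the word-count callback are hypothetical, and an in-memory filehandle stands in for STDIN.

```perl
use strict;
use warnings;

# Hypothetical stand-in for the run() loop: chomp each line from a
# filehandle and pass it to a map callback, one call per line.
sub run_mapper {
    my ( $fh, $map ) = @_;
    while ( my $line = <$fh> ) {
        chomp $line;
        $map->($line);
    }
    return;
}

# Word-count style callback: record "word\t1" for each word on the line.
my @emitted;
my $map = sub {
    my ($line) = @_;
    push @emitted, "$_\t1" for split ' ', $line;
};

# An in-memory filehandle stands in for STDIN.
my $input = "line1 has words\nline2\n";
open my $fh, '<', \$input or die "cannot open in-memory input: $!";
run_mapper( $fh, $map );
close $fh;

print "$_\n" for @emitted;
```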

Hadoop::Streaming::Reducer interface

Hadoop::Streaming::Reducer abstracts this stream into an interface of (key, value-iterator). reduce() is called once per key, instead of once per line. The reduce job pulls values from the iterator and outputs key/value pairs to STDOUT. emit() is provided as a convenience for outputting key/value pairs.

example reducer input:

  key1  value1
  key2  valuea
  key2  valuec
  key2  valueb
  key3  valuefoo
  key3  valuebar

Hadoop::Streaming::Reducer transforms this input into three calls to reduce():

  reduce( key1, iterator_over(qw(value1)) );
  reduce( key2, iterator_over(qw(valuea valuec valueb)) );
  reduce( key3, iterator_over(qw(valuefoo valuebar)) );
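The grouping step can be sketched independently of the module. Everything named here is hypothetical: My::Sketch::Iterator is a toy class, each_key_group() is an illustrative helper, and next() is an assumed method name (only has_next() appears in this document). The input lines are assumed to be already chomped and sorted by key.

```perl
use strict;
use warnings;

# Hypothetical iterator class for illustration only.
package My::Sketch::Iterator;
sub new      { my ( $class, @values ) = @_; return bless [@values], $class }
sub has_next { my ($self) = @_; return scalar @{$self} }
sub next     { my ($self) = @_; return shift @{$self} }

package main;

# Group key-sorted "key\tvalue" lines and call
# $reduce->( $key, $iterator ) once per distinct key.
sub each_key_group {
    my ( $lines, $reduce ) = @_;
    my ( $current_key, @values );
    for my $line ( @{$lines} ) {
        my ( $key, $value ) = split /\t/, $line, 2;
        if ( defined $current_key && $key ne $current_key ) {
            $reduce->( $current_key, My::Sketch::Iterator->new(@values) );
            @values = ();
        }
        $current_key = $key;
        push @values, $value;
    }
    # Hand over the last group, which no key change closes.
    $reduce->( $current_key, My::Sketch::Iterator->new(@values) )
        if defined $current_key;
    return;
}

my @input = (
    "key1\tvalue1", "key2\tvaluea",   "key2\tvaluec",
    "key2\tvalueb", "key3\tvaluefoo", "key3\tvaluebar",
);
each_key_group( \@input, sub {
    my ( $key, $iterator ) = @_;
    my @seen;
    push @seen, $iterator->next() while $iterator->has_next();
    print "$key: @seen\n";
} );
```

With the example input above, the callback runs three times, once each for key1, key2 and key3.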

Hadoop::Streaming::Combiner interface

The Hadoop::Streaming::Combiner interface is analogous to the Hadoop::Streaming::Reducer interface. combine() is called instead of reduce() for each key. The above example would produce three calls to combine():

  combine( key1, iterator_over(qw(value1)) );
  combine( key2, iterator_over(qw(valuea valuec valueb)) );
  combine( key3, iterator_over(qw(valuefoo valuebar)) );
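To see why a combiner can be worth adding, the sketch below applies the same per-key sum once as a combiner and once as a reducer, and compares record counts. It is an illustration only: sum_by_key() is a hypothetical helper that uses a hash instead of the streaming iterator, and the word list stands in for the output of a single mapper.

```perl
use strict;
use warnings;

# Sum the counts for each key in a list of "key\tcount" records.
# sum_by_key() is a hypothetical helper used as both combiner and reducer.
sub sum_by_key {
    my @records = @_;
    my %total;
    for my $record (@records) {
        my ( $key, $count ) = split /\t/, $record, 2;
        $total{$key} += $count;
    }
    return map { "$_\t$total{$_}" } sort keys %total;
}

# Mapper output for one input split: one record per word occurrence.
my @mapped   = map { "$_\t1" } qw(b a b c a b);
my @combined = sum_by_key( sort @mapped );

printf "records without combiner: %d\n", scalar @mapped;
printf "records with combiner:    %d\n", scalar @combined;

# The final totals are the same with or without the combiner.
my @direct       = sum_by_key(@mapped);
my @via_combiner = sum_by_key(@combined);
print "totals match\n" if "@direct" eq "@via_combiner";
```

This only works because summing is associative and commutative; a combiner that changes the final result is not safe to use.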


Map/Reduce at wikipedia


Hadoop Streaming interface:



run locally without hadoop

To test the examples from the SYNOPSIS section locally, we must supply the sort step that hadoop normally provides between the map and reduce phases. For small test_input_file examples this can be done in one large pipe:

  my_mapper < test_input_file | sort | my_combiner | my_reducer

For larger files, write intermediate output files. The intermediate files can be inspected to verify each stage and to demonstrate the efficiency of the (optional) combiner.

  my_mapper < test_input_file > output.map        && \
  sort output.map > output.mapsort                && \
  my_combiner < output.mapsort > output.combine   && \
  my_reducer < output.combine > output.reduce

hadoop commandline

Run this in hadoop from the shell:

  hadoop                                     \
      jar $streaming_jar_name                \
      -D mapred.job.name="my hadoop example" \
      -input    my_input_file                \
      -output   my_output_hdfs_path          \
      -mapper   my_mapper                    \
      -combiner my_combiner                  \
      -reducer  my_reducer

$streaming_jar_name is the full path to the streaming jar provided by the installed hadoop. For my 0.20 install the path is:


The -D line is optional. If included, -D lines must come directly after the jar name and before other options.

For this hadoop job to work, the mapper, combiner and reducer must be full paths that are valid on each box in the hadoop cluster. There are a few ways to make this work.

hadoop job -file option

Additional files may be bundled into the hadoop jar via the '-file' option to hadoop jar. These files will be included in the jar that is distributed to each host. The files will be visible in the current directory of the process. Subdirectories will not be created.

example:

  hadoop                                     \
      jar $streaming_jar_name                \
      -D mapred.job.name="my hadoop example" \
      -input    my_input_file                \
      -output   my_output_hdfs_path          \
      -mapper   my_mapper                    \
      -combiner my_combiner                  \
      -reducer  my_reducer                   \
      -file     /path/to/my_mapper           \
      -file     /path/to/my_combiner         \
      -file     /path/to/my_reducer

using perl modules

All perl modules must be installed on each hadoop cluster machine. This proves to be a challenge for large installations. I have a local::lib controlled perl directory that I push out to a fixed location on all of my hadoop boxes (/apps/perl5) that is kept up-to-date and included in my system image. Previously I was producing stand-alone perl files with PAR::Packer (pp), which worked quite well except for the size of the jar with the -file option. The standalone files can be put into hdfs and then included with the jar via the -cacheFile option. An option I have not investigated is using -cacheJar to push a jar of my necessary perl files along with the job.


* Install all modules into a local::lib controlled directory and push this directory to all of the hadoop cluster boxes (rsync, app installer, NFS mount). Explicitly include this directory with a use lib or use local::lib line in your mapper/reducer/combiner files.

  use strict; use warnings;
  use lib qw(/apps/perl5);
  use My::Example::Job;

* The mapper/reducer/combiner files can be included with the job via -file options to hadoop jar or they can be referenced directly if they are in the shared environment.

full path of shared file:

  hadoop                                     \
      jar $streaming_jar_name                \
      -input    my_input_file                \
      -output   my_output_hdfs_path          \
      -mapper   /apps/perl5/bin/my_mapper    \
      -combiner /apps/perl5/bin/my_combiner  \
      -reducer  /apps/perl5/bin/my_reducer

local path of included -file file:

  hadoop                                     \
      jar $streaming_jar_name                \
      -input    my_input_file                \
      -output   my_output_hdfs_path          \
      -file     /apps/perl5/bin/my_mapper    \
      -file     /apps/perl5/bin/my_combiner  \
      -file     /apps/perl5/bin/my_reducer   \
      -mapper   ./my_mapper                  \
      -combiner ./my_combiner                \
      -reducer  ./my_reducer

PAR::Packer / pp

Use pp (installed via PAR::Packer) to produce a perl file that needs only a perl interpreter to execute. I use the -x option to run the my_mapper script on blank input, as this forces all of the necessary modules to be loaded and thus tracked in my PAR file.

  mkdir packed
  pp my_mapper -B -P -Ilib -o packed/my_mapper -x my_mapper < /dev/null
  hadoop                                     \
      jar $streaming_jar_name                \
      -input    my_input_file                \
      -output   my_output_hdfs_path          \
      -file     packed/my_mapper             \
      -mapper   ./my_mapper

To simplify this process and reduce errors, I use make to produce the packed binaries. Indented lines after "name :" lines are indented with a literal tab, as per Makefile requirements.

    #Makefile for PAR packed apps
    PERLTOPACK     =                              \                      \                     \                  \                \
    PERL_LIBRARIES =                              \
        lib/                          \
        lib/RegionDMA/                   \
    PACKEDTARGETS  = $(patsubst %,packed/%,$(PERLTOPACK))
    PACK           = packed $(PACKEDTARGETS)
    PACK : $(PACK)
        echo "pack: $(PACKEDTARGETS)"
    packed :
        mkdir packed
    # perl files to compile via pp
    $(patsubst %,packed/%,$(PERLTOPACK)) : packed/% : bin/% $(PERL_LIBRARIES)
        time pp $< -B -P -I lib/ -o $@ -x $< < /dev/null


  • andrew grangaard

  • Naoya Ito


This software is copyright (c) 2010 by Naoya Ito.

This is free software; you can redistribute it and/or modify it under the same terms as the Perl 5 programming language system itself.