NAME

TaskPipe::Manual::Overview - An overview of the TaskPipe framework

WHAT IS TASKPIPE?

TaskPipe is a framework which may be used to build web scrapers and crawlers. A longer-term goal of the project is to become the data-gathering component of tool(s) performing scientific studies of societal trends through analysis of the web.

WHY TASKPIPE?

TaskPipe resulted from the observation that systems which extract data from the web often share certain characteristics, particularly in terms of workflow. At the same time, web data-extraction systems are typically built from static components geared only towards solving a specific problem. When a new scraper or crawler is required, coding starts again from scratch.

TaskPipe is an attempt to approach the general problem of web-data extraction in a more modular way, where existing tasks may be quickly adapted and reused. Common operations, and in particular operations related to workflow, are abstracted. The aim is to allow engineers to rapidly develop tailored web-extraction systems by providing only the information which is unique to the process in question.

WHAT IS TASKPIPE SUITABLE FOR?

In fact, the core of TaskPipe is simply a task management system (hence the name), and it could be used in applications other than web data gathering. However, it is best adapted to certain kinds of process:

  • Multi-threaded processes where the threads do not rejoin, i.e. the parent kicks off children which perform their task(s) and then terminate without ever returning to the caller.

  • Workflows where the execution time of individual tasks is relatively long (on the order of milliseconds up to seconds, or longer). TaskPipe caches intensively and is built with Moose and DBIx::Class, both of which involve a performance penalty.

For these reasons TaskPipe is probably best-suited to web operations where the download/store operations are independent and the request/response cycle typically dwarfs other processing times.

Features

The core of TaskPipe has been expanded with web data gathering in mind, so that it offers the following features:

  • Advanced caching to minimise the need to repeat requests, and allow the process to resume quickly in the case of premature termination

  • Thread management designed to ensure a consistent number of threads remain active regardless of the workflow. E.g. if a 10 thread limit is specified and 4 threads are required on a given task, then the remaining 6 threads automatically cascade to the next or adjacent tasks.

  • Choice of non-rendering or rendering useragent: Currently these are Perl's native LWP (non-rendering) or PhantomJS (rendering). Switching between useragents requires only a one-line change in the project config (see the sketch after this list).

  • Choice of proxy net system: Currently these are TOR and Open Proxy. The TOR UserAgent Manager automatically launches TOR instances on separate ports for each designated thread. The Open Proxy system runs as a daemon, gathering IPs from Open Proxy lists and testing them. Switching between proxy systems, or between proxying and not proxying, requires only a one-line change in the project config.

  • A command line tool (TaskPipe Tool) allows quick deployment of files and database tables, simplifying the creation of fresh projects.
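
To give a flavour of the "one line change" mentioned above, a config switch might look something like the sketch below. This is illustrative only: plans are YAML and the config is assumed to be similar, but the key names and values here are invented rather than taken from the real config schema (check the config files TaskPipe generates for your project):

    # hypothetical project config keys -- not the real schema:

    user_agent: LWP       # or: PhantomJS for a rendering useragent
    proxy_net: TOR        # or: OpenProxy, or no proxying at all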

Remember TaskPipe is a framework, and as such is designed to be extensible. Custom tasks, IP lists and even TaskPipe Tool commands can be added.

TaskPipe is also designed with a certain degree of platform independence in mind. That is, modules that purport to offer platform independence have been selected for all areas of functionality. For example, any database which supports SQLSTATE should work (MySQL, PostgreSQL and SQLite are 3 major database systems which should support SQLSTATE). However, see below: it's early days, and none of this has really been tested.
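
As a point of reference, TaskPipe's storage is built on DBIx::Class, which rides on DBI, so targeting a different database should in principle come down to the connection DSN. The snippet below is generic DBI, not TaskPipe-specific configuration:

    # Generic DBI DSNs for the three databases mentioned above --
    # in principle the same schema code can target any of them:
    my @dsns = (
        'dbi:mysql:database=taskpipe',
        'dbi:Pg:dbname=taskpipe',
        'dbi:SQLite:dbname=taskpipe.db',
    );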

HOW READY IS TASKPIPE?

Quick answer: it's not. So you probably shouldn't get too excited. Right now you are looking at a bleeding-edge alpha version which has only the barest skeleton of a test framework in place. You'll want commands that aren't there, and the ones that are will probably grumble. Some specific issues:

  • Most notably, the caching system needs commands: right now it is tricky to leverage much of the caching system's potential. The caching system does several things automatically, and those things are believed to work reasonably well. For example, it avoids repeating executed tasks ('xtasks' in TaskPipe parlance) and it does not cache xtasks which error (which is the correct behaviour).

    However, it is intended that targeted removal of cached items be possible, so that when a plan is changed, cached items that are no longer needed can easily be removed while those that are still relevant are kept. Some careful thought is going into this. In the meantime you have 2 options: clear the project cache tables completely, or try to edit the cache database tables manually (requires a genius-level IQ).

    On the bright side, this is being worked on as a priority.

  • TaskPipe Tool currently offers only a minimal set of commands. There are definitely a number of fairly obvious commands which would make project management easier. Again, these are being worked on.

  • More and better of pretty much everything is needed: more job manager code, better logging options and messages, more IP lists, better proxy management, more individual TaskPipe components (tasks, sample plans, iterators etc.)...

  • As mentioned, exhaustive testing has definitely not taken place. A test framework is on the drawing board, and will no doubt uncover some hidden gems. Testing on different platforms also needs to happen. As usual, use at your own risk.

  • There's a lot in TaskPipe and this early documentation is probably going to leave you scratching your head. Sorry about that. A tutorial is planned but won't happen overnight.

SHOW ME THE BASICS?

In TaskPipe you specify a plan, which is a YAML file:

    # plan.yml:
    
    ---
    -   _name: Scrape_Companies
        url: https://en.wikipedia.org/wiki/List_of_S%26P_500_companies
        headers:
            Referer: https://www.google.com

    -   _name: Scrape_Quote
        url: $this
        headers:
            Referer: https://en.wikipedia.org/wiki/List_of_S%26P_500_companies

    -   _name: Record
        table: Company
        values:
            quote: $this
            ticker: $this[1]
            url: $this[1]
            name: $this[1]
            sector: $this[1]
            industry: $this[1]
            address: $this[1]
            date_added: $this[1]
            cik: $this[1]

    

In this example our plan is to scrape the list of S&P 500 companies from Wikipedia, gather a stock quote for each company, and then insert the completed company information (including the quote) into the database. Our plan has 3 tasks:

  1. Scrape the list from Wikipedia

  2. Scrape the quote from the URL which was grabbed from each Wikipedia list item

  3. Record the full record in the database

Task Definition

Here's our plan again:

    # plan.yml:
    
    ---
    -   _name: Scrape_Companies
        url: https://en.wikipedia.org/wiki/List_of_S%26P_500_companies
        headers:
            Referer: https://www.google.com

    -   _name: Scrape_Quote
        url: $this
        headers:
            Referer: https://en.wikipedia.org/wiki/List_of_S%26P_500_companies

    -   _name: Record
        table: Company
        values:
            quote: $this
            ticker: $this[1]
            url: $this[1]
            name: $this[1]
            sector: $this[1]
            industry: $this[1]
            address: $this[1]
            date_added: $this[1]
            cik: $this[1]
            

How do we define Scrape_Companies? Scrape_Companies is the name of a TaskPipe task. This means a module needs to be created with the name TaskPipe::Task_Scrape_Companies (i.e. the module name is in the format TaskPipe::Task_(task name)). Here's our module:

    package TaskPipe::Task_Scrape_Companies;

    use Moose;
    use Web::Scraper;
    extends 'TaskPipe::Task_Scrape';

    # ws: a Web::Scraper which extracts one hashref per table row
    # from the first wikitable on the page
    has ws => (is => 'ro', isa => 'Web::Scraper', default => sub {
        scraper {
            process_first 'table.wikitable', 'table' => scraper {
                process 'tr + tr', 'tr[]' => scraper {
                    process_first 'td:nth-child(1) a', 'ticker' => 'TEXT';
                    process_first 'td:nth-child(1) a', 'url' => '@href';
                    process_first 'td:nth-child(2) a', 'name' => 'TEXT';
                    process_first 'td:nth-child(4)', 'sector' => 'TEXT';
                    process_first 'td:nth-child(5)', 'industry' => 'TEXT';
                    process_first 'td:nth-child(6)', 'address' => 'TEXT';
                    process_first 'td:nth-child(7)', 'date_added' => 'TEXT';
                    process_first 'td:nth-child(8)', 'cik' => 'TEXT';
                };
                result 'tr';
            };
            result 'table';
        };
    });

    1;

This just has a single ws attribute, which is a Web::Scraper - as such, this is pretty much the simplest form a scraping task can take. The ws definition itself looks a bit more complex, but I will leave the explanation of how Web::Scraper works to the Web::Scraper documentation.
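
For comparison, Scrape_Quote would follow the same pattern: a module named TaskPipe::Task_Scrape_Quote with its own ws attribute. The following is only a sketch -- the CSS selector is invented for illustration, since the real one would have to be written against the actual markup of the NYSE quote pages:

    package TaskPipe::Task_Scrape_Quote;

    use Moose;
    use Web::Scraper;
    extends 'TaskPipe::Task_Scrape';

    has ws => (is => 'ro', isa => 'Web::Scraper', default => sub {
        scraper {
            # placeholder selector -- the real one would come from
            # inspecting the quote page markup
            process_first 'span.quote-price', 'quote' => 'TEXT';
        };
    });

    1;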

Task Output

A dump of the output from the Scrape_Companies task might look something like:

    [   {
            ticker => 'MMM',
            url => 'https://www.nyse.com/quote/XNYS:MMM',
            name => '3M Company',
            sector => 'Industrials',
            industry => 'Industrial Conglomerates',
            address => 'St. Paul, Minnesota',
            date_added => '',
            cik => '0000066740'

        },

        {
            ticker => 'ABT',
            url => 'https://www.nyse.com/quote/XNYS:ABT',
            name => 'Abbott Laboratories',
            sector => 'Health Care',
            industry => 'Health Care Equipment',
            address => 'North Chicago, Illinois',
            date_added => '1964-03-31',
            cik => '0000001800'
        }

        # ...

    ]

i.e. our output from the task is an arrayref of result hashrefs.

Task Input/Output Format

In TaskPipe each task accepts a hashref of input variables, performs an operation (the task itself) and produces a list of results. Thus it is, in general, a one-to-many operation. A typical example is a task which scrapes a list of data - such as the Wikipedia list of S&P 500 companies. We have 1 set of inputs (the Wikipedia URL and the Referer header), and we produce a list of companies.

In some cases our task expects a single set of inputs and delivers a single set of outputs. This is true when we scrape a detail page. So in the second task, where we are scraping the stock quote, we have one input (the URL of the quote for a given company) and one output (the quote for the company). However, we still expect to produce a list (i.e. an arrayref) as the output - it is just a list containing one element in this case.
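
To make the contract concrete, a custom (non-scraping) task conceptually looks like the sketch below. Be aware that the method name action, the param accessor and the TaskPipe::Task base class are assumptions made for illustration -- check the TaskPipe::Task documentation for the real names to use:

    package TaskPipe::Task_My_Example;

    use Moose;
    extends 'TaskPipe::Task';    # assumed base class

    # 'action' and 'param' are hypothetical names, for illustration only
    sub action {
        my ($self) = @_;

        my $input = $self->param;    # a single hashref of inputs in...

        # ...and an arrayref of result hashrefs out. A one-to-one task
        # (e.g. a detail page scrape) returns an arrayref containing a
        # single element.
        return [
            { id => 1, source => $input->{url} },
            { id => 2, source => $input->{url} },
        ];
    }

    1;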

tasks vs xtasks, branches vs xbranches, trees vs xtrees...

In our example we had a single branch, i.e. a completely linear order of operations. However, you'll note that the first task is executed only once (against the single Wikipedia page which lists the S&P 500 companies), but the second task Scrape_Quote is executed many times (once for each S&P 500 company). If we draw the structure of executed tasks (which we will denote as xtasks), then this looks more like a tree (an xtree!) than a single branch:

                   (Scrape_Companies)
          Wikipedia S&P 500 companies list page
                            |
                            |
            ---------------------------------
            |               |               |
      (Scrape_Quote)  (Scrape_Quote)  (Scrape_Quote)  ...
      Company: MMM    Company: ABT    Company: ABBV

So in "task space" we are looking at a single branch. But in "xtask space" we are looking at an "xtree". An appreciation of tasks vs. xtasks and branches vs. xbranches is important for understanding and getting the most from caching. However, I will skip the details of this for now.

Remember, TaskPipe is handling threads. So if your config file specifies a maximum of e.g. 10 threads, TaskPipe will go off and grab up to 10 company quotes at the same time. In general you shouldn't need to worry about thread allocation: TaskPipe's ThreadManager module should ensure the threads you allocate are kept busy.
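
The thread ceiling itself would just be a setting in the project config, along the lines of the following (key name invented for illustration):

    max_threads: 10    # hypothetical key name, not the real schema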

TaskPipe Tool

TaskPipe comes with a command line interface script, taskpipe, which can be used to deploy files and database tables, and to manage projects. Some example commands:

  • prepare taskpipe for use on your system

        taskpipe setup

  • create file stubs for a new project

        taskpipe deploy files --project=myprojectname

  • deploy basic database tables for your new project

        taskpipe deploy tables --project=myprojectname

  • run your project's main plan

        taskpipe run plan --project=myprojectname

NOTE The above is a sample selection of commands, not an exhaustive list, nor does it represent all the steps needed for setup. Run

    taskpipe help

for a list of available commands. See TaskPipe::Manual::Installation for information on getting TaskPipe up and running.