NAME

DataFlow::Node - A generic processing node in a data flow

VERSION

version 0.91.05

SYNOPSIS

use DataFlow::Node;

my $uc = DataFlow::Node->new(
    process_item => sub {
        shift; return uc(shift);
    }
);

my @res = $uc->process( qw/god save the queen/ );
# @res == qw/GOD SAVE THE QUEEN/

# or, in two steps:
$uc->input( qw/dont panic/ );
my @cool = $uc->output;
# @cool == qw/DONT PANIC/

Or

my $ucd = DataFlow::Node->new(
    process_into => 1,
    process_item => sub {
        shift; return uc(shift);
    }
);

$ucd->input( [qw/aaa bbb ccc/] );
my $item = $ucd->output;
# $item == [ 'AAA', 'BBB', 'CCC' ]

$ucd->input(
    {   a => 'aaa',
        b => 'bbb' } );

$item = $ucd->output;
# $item == { a => 'AAA', b => 'BBB' }

DESCRIPTION

This is a Moose-based class that provides the idea of a step in a data-flow. It attempts to be as generic and unassuming as possible, in order to provide flexibility for implementors to make their own nodes as they see fit.

An object of the type DataFlow::Node does three things: it accepts some data as input, processes that data, and provides the transformed data as output.

The methods input and output provide the obvious functionality, while attempting to preserve the input data structure. The convenience method process() will pump its parameters into $self->input() and immediately return the result of $self->output().
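The relationship between input, output and process can be sketched in plain Perl. The TinyNode package below is a hypothetical stand-in written only for illustration; it is not the DataFlow::Node implementation and it ignores references, errors and scalar context:

```perl
use strict;
use warnings;

# TinyNode is a hypothetical, simplified stand-in for a node.
package TinyNode;

sub new {
    my ( $class, %args ) = @_;
    return bless {
        process_item => $args{process_item},
        queue        => [],    # pending, unprocessed items
    }, $class;
}

# Append items to the pending queue.
sub input {
    my ( $self, @items ) = @_;
    push @{ $self->{queue} }, @items;
    return;
}

# Process and return every pending item (list context assumed).
sub output {
    my ($self) = @_;
    my @pending = splice @{ $self->{queue} };    # empties the queue
    my @out;
    push @out, $self->{process_item}->( $self, $_ ) for @pending;
    return @out;
}

# Convenience wrapper: input followed immediately by output.
sub process {
    my ( $self, @items ) = @_;
    $self->input(@items);
    return $self->output;
}

package main;

my $uc = TinyNode->new( process_item => sub { shift; return uc(shift) } );
my @res = $uc->process(qw/god save the queen/);
print "@res\n";
```

In this sketch process() is nothing more than the two-step form wrapped in one call, which is the behaviour the paragraph above describes.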

A node is only useful if it performs some sort of transformation or processing on the input data. Thus, objects of the type DataFlow::Node must provide the code reference named process_item. This code will be called once per input item, receiving the node object and one single item at a time (see Calling Convention below).

Unless told otherwise (see the process_into option below), DataFlow::Node treats each of the following as an individual item: a scalar, a blessed object, or a reference of any kind. It iterates over anything passed as an array or a hash (a hash is treated like an array, as described under INPUT below).

However, it might be convenient in many cases to have things work in a smarter way. If the input is an array reference, one might expect that every element in the referenced array should be processed. Or, that every value in a hash reference should be processed. For cases like that, DataFlow::Node provides a simple de-referencing mechanism.

INPUT

The input is provided through the method input(), which will gladly accept anything passed as a parameter. Note, however, that it cannot distinguish between arrays and hashes, because Perl flattens both into a plain list of arguments. Both forms below will render the exact same results:

$node->input( qw/all the simple things/ );
$node->input( all => 'the', simple => 'things' );

If you do want to handle arrays and hashes differently, we strongly suggest that you use references:

$node->input( [ qw/all the simple things/ ] );
$node->input( { all => 'the', simple => 'things' } );

And, in process_item:

my $node = DataFlow::Node->new(
    process_item => sub {
        my ($self,$item) = @_;
        if( ref($item) eq 'ARRAY' ) {
            my @a = @{ $item };
            # ... do something with array @a
        }
        elsif( ref($item) eq 'HASH' ) {
            my %hash = %{ $item };
            # ... handle hash differently
        }
        ...
    }
);

PROCESS

The processing of the data is performed by the sub referenced by the process_item attribute. This attribute is required by DataFlow::Node.

Calling Convention

The code referenced by process_item will be called with two arguments: a reference to the DataFlow::Node object, and one single item from the input queue, be it a simple scalar, or any type of reference. The code below shows a typical implementation:

my $node = DataFlow::Node->new(
    process_item => sub {
        my ($self,$item) = @_;
        # do something with $item
        return $processed_item;
    }
);

Inheritance

When inheriting from DataFlow::Node, some classes may provide default code for process_item. For instance:

package UCNode;

use Moose;
extends 'DataFlow::Node';

has '+process_item' => (
    default => sub {
        return sub {
            shift; return uc(shift);
        }
    },
);

Notice that the enclosing sub is mandatory in this case. The reason is that the outer sub is responsible for providing a default value to process_item and is run only once by Moose, while the inner sub is the actual value of the code reference process_item, and will be invoked every time a data item needs to be processed.
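The difference between the two subs can be seen without Moose. The snippet below is a simplified sketch: $default and $builder_calls are hypothetical names, and calling $default->() by hand only imitates what Moose is described as doing when it builds the attribute:

```perl
use strict;
use warnings;

my $builder_calls = 0;

# Outer sub: plays the role of the "default" builder.
my $default = sub {
    $builder_calls++;
    # Inner sub: the value that ends up stored in process_item.
    return sub { shift; return uc(shift) };
};

# Imitates the single call made when the attribute is initialised.
my $process_item = $default->();

# The inner sub runs once per item; the outer sub is not called again.
my @out = map { $process_item->( undef, $_ ) } qw/a b c/;

print "builder calls: $builder_calls, output: @out\n";
```

If the outer sub were omitted, the default itself would be treated as the builder, and process_item would receive the upper-cased string it returns rather than a code reference.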

Dereferencing

If you set the attribute process_into as true, then the node will treat references differently. It will process the referenced objects, rather than the actual reference. It will work as follows:

use Data::Dumper;

my $scalar = 'some text';
$ucd->input( \$scalar );
my $res = $ucd->output;
print ${ $res };     # 'SOME TEXT'

my $aref = [ qw/this is a test/ ];
$ucd->input( $aref );
$res = $ucd->output;
print Dumper($res);  # $VAR1 = [ 'THIS', 'IS', 'A', 'TEST' ]

my $href = { apple => 'red', orange => 'orange', pineapple => 'yellow' };
$ucd->input( $href );
$res = $ucd->output;
print Dumper($res);  # $VAR1 = {
                     #     apple     => 'RED',
                     #     orange    => 'ORANGE',
                     #     pineapple => 'YELLOW',
                     # }

my $cref = sub { return 'a dozen dirty pirates' };
$ucd->input( $cref );
$res = $ucd->output;
print $res;          # 'A DOZEN DIRTY PIRATES'

Notice that, except for code references, the node preserves the original structure of the referenced data.
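The dereferencing rules above can be approximated in a few lines of plain Perl. The function apply_into below is a hypothetical helper written for illustration only; it is not part of DataFlow::Node and makes no claim about how the module implements process_into internally:

```perl
use strict;
use warnings;

# Hypothetical helper: apply $code to whatever $item refers to,
# keeping the shape of scalar, array and hash references.
sub apply_into {
    my ( $code, $item ) = @_;
    my $type = ref($item);

    if ( $type eq 'SCALAR' ) {
        my $value = $code->( ${$item} );
        return \$value;                     # new scalar reference
    }
    elsif ( $type eq 'ARRAY' ) {
        return [ map { $code->($_) } @{$item} ];
    }
    elsif ( $type eq 'HASH' ) {
        my %out;
        $out{$_} = $code->( $item->{$_} ) for keys %{$item};
        return \%out;
    }
    elsif ( $type eq 'CODE' ) {
        return $code->( $item->() );        # result is a plain value
    }

    # Plain scalars, objects and other references: one single item.
    return $code->($item);
}

my $uc = sub { return uc( $_[0] ) };

my $sref_out = apply_into( $uc, \'some text' );
my $aref_out = apply_into( $uc, [qw/this is a test/] );
my $href_out = apply_into( $uc, { apple => 'red', orange => 'orange' } );
my $code_out = apply_into( $uc, sub { return 'a dozen dirty pirates' } );

print ${$sref_out}, "\n";
print "@{$aref_out}\n";
print $code_out, "\n";
```

Note that this sketch passes only the item to the callback, whereas a real process_item also receives the node object as its first argument.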

OUTPUT

The output is provided by the method output. If called in scalar context it will return one processed item from the node. If called in list context it will return all the elements in the queue.
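Context-sensitive return values of this kind are usually built on Perl's wantarray. The following is a minimal sketch under that assumption; @queue and fetch are hypothetical names standing in for the node's output queue and its output method:

```perl
use strict;
use warnings;

# Stand-in for a node's output queue.
my @queue = qw/DONT PANIC NOW/;

# Hypothetical fetch(): one item in scalar context,
# every remaining item in list context.
sub fetch {
    return splice @queue if wantarray;    # list context: drain the queue
    return shift @queue;                  # scalar context: first item only
}

my $first = fetch();    # scalar context
my @rest  = fetch();    # list context

print "first: $first, rest: @rest\n";
```

The practical consequence for callers is that `my $x = $node->output` and `my ($x) = $node->output` are not equivalent: the second form is a list-context call.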

ATTRIBUTES

deref

A boolean attribute that signals whether the output of the node will be de-referenced or if Node will preserve the original reference.

process_into

A boolean attribute that signals whether references should be dereferenced or not. If process_into is true, then process_item will be applied to the values referenced by any scalar, array or hash reference, and to the result of running any code reference.

process_item

A code reference that is the actual workhorse for this class. It is a mandatory attribute, and must follow the calling conventions described above.

METHODS

input

Provide input data for the node.

has_input

Returns true if there is data in the input queue, false otherwise.

process_input

Processes the items in the input queue and places the results in the output queue.

output

Fetch data from the node.

flush

Flushes this node's queues.

has_output

Returns true if there is data in the output queue, false otherwise.

has_queued_data

Returns true if there is data in either the input or the output queue of this node, false otherwise.

process

Convenience method to provide input and immediately get the output.

get_error

Fetch error messages (if any) from the node.

DEPENDENCIES

Scalar::Util

Queue::Base

INCOMPATIBILITIES

None reported.

BUGS AND LIMITATIONS

No bugs have been reported.

Please report any bugs or feature requests to bug-dataflow@rt.cpan.org, or through the web interface at http://rt.cpan.org.

AUTHOR

Alexei Znamensky <russoz@cpan.org>

COPYRIGHT AND LICENSE

This software is copyright (c) 2011 by Alexei Znamensky.

This is free software; you can redistribute it and/or modify it under the same terms as the Perl 5 programming language system itself.