NAME

Stream::Aggregate - generate aggregate information from a stream of data

SYNOPSIS

 use Stream::Aggregate;

 my $af = generate_aggregation_func($agg_config, $extra_parameters, $user_extra_parameters);

 while ($log = ???) {
	@stats = $af->($log);
 }
 @stats = $af->(undef);

DESCRIPTION

Stream::Aggregate is a general-purpose aggregation module that will aggregate from a stream of perl objects. While it was written specifically for log processing, it can be used for other things too. The input records for the aggregator must be sorted in the order of the contexts you will aggregate over. If you want to count things by URL, then you must sort your input by URL.

Aggregation has two key elements: how you group things, and what you aggregate. This module understands two different ways to group things: nested and cross-product.

Nested groupings come from processing sorted input: if you have three fields you are considering your context, the order in which the data is sorted must match the order in which these fields make up your context.

Cross-product groupings come from processing unsorted input. Each combination of values of the fields that make up your context is another context. This can lead to memory exhaustion, so you must specify the maximum number of values for each of the fields.

Nested groupings

Nested groups are most easily illustrated with a simple example: aggregating by year, month, and day. The input data must be sorted by year, month, and day. A single time sort will do that, but other combinations aren't so easy. The current context is defined by the triplet (year, month, day). That triplet must be returned by the context code. It is stored in the @current_context array. When a context is finished, it must be converted into a hash by context2columns.

Doing it this way, you can, for example, get the average depth per day, per month, and per year in one pass through your data.
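The idea can be tried outside the module with a minimal sketch in plain Perl. It is not the module's implementation: the record fields (year, month, day, depth) and the helper close_levels are made up for illustration. Records are counted only at the deepest open level, and a level's totals are merged into the next more-general level when it closes.

```perl
use strict;
use warnings;

# Input must already be sorted by (year, month, day).
my @records = (
    { year => 2023, month => 1, day => 1, depth => 2 },
    { year => 2023, month => 1, day => 1, depth => 4 },
    { year => 2023, month => 1, day => 2, depth => 6 },
    { year => 2023, month => 2, day => 1, depth => 8 },
);

my @open;      # open accumulators: year, year+month, year+month+day
my @results;   # finished aggregation records

# Close every open level deeper than $keep, merging each into its parent.
sub close_levels {
    my ($keep) = @_;
    while (@open > $keep) {
        my $acc = pop @open;
        push @results, {
            context => join('-', @{ $acc->{context} }),
            mean    => $acc->{sum} / $acc->{count},
        };
        if (@open) {
            $open[-1]{sum}   += $acc->{sum};
            $open[-1]{count} += $acc->{count};
        }
    }
}

for my $rec (@records, undef) {          # the trailing undef flushes everything
    my @ctx = defined($rec) ? @$rec{qw(year month day)} : ();

    # Count how many leading context elements are unchanged.
    my $same = 0;
    $same++
        while $same < @open
           && $same < @ctx
           && $open[$same]{context}[$same] == $ctx[$same];
    close_levels($same);
    last unless defined($rec);

    # Open the levels that are missing, then count at the deepest one.
    for my $i (scalar(@open) .. $#ctx) {
        push @open, { context => [ @ctx[0 .. $i] ], sum => 0, count => 0 };
    }
    $open[-1]{sum} += $rec->{depth};
    $open[-1]{count}++;
}

printf "%-10s %.1f\n", $_->{context}, $_->{mean} for @results;
```

One pass yields a mean for each day, each month, and the year, emitted as each context closes.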

Cross-product grouping

Cross-product grouping does not depend on the sort order of the input and can have many contexts active at the same time.

For example, if you're aggregating sales figures for shoes and want statistics for the combinations of size, width, and color, there isn't a sort or nesting order that will answer your questions.

Use crossproduct to limit yourself to a certain number of values for each variable (say 10 sizes, 3 widths, and 5 colors).
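The effect of such limits can be illustrated with a small stand-alone sketch. It is not the module's code: the limits, the sales records, and the first-come-first-served policy for handing out slots are assumptions. Values that arrive after a field's quota is used up are folded into the catch-all value *.

```perl
use strict;
use warnings;

# Hypothetical limits: track at most this many distinct values per field.
my %limit = (size => 2, color => 1);
my %seen;      # field => { value => 1 } for values that own a slot
my %count;     # "color/size" combination => number of sales

my @sales = (
    { size => 9,  color => 'black' },
    { size => 10, color => 'brown' },
    { size => 11, color => 'black' },
    { size => 9,  color => 'black' },
);

for my $sale (@sales) {
    my @key;
    for my $field (sort keys %limit) {       # color, then size
        my $value = $sale->{$field};
        if (!$seen{$field}{$value}) {
            if (scalar(keys %{ $seen{$field} }) < $limit{$field}) {
                $seen{$field}{$value} = 1;   # still room for a new value
            }
            else {
                $value = '*';                # over quota: use the catch-all
            }
        }
        push @key, $value;
    }
    $count{ join '/', @key }++;
}

print "$_: $count{$_}\n" for sort keys %count;
```

The number of live contexts is bounded by the product of the limits (plus the catch-all for each field), which is what keeps memory use under control.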

API

The configuration for Stream::Aggregate is compiled into a perl function which is then called once for each input object. Each time it is called, it may produce one or more aggregate objects. When there is no more input data, call the function with undef.
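The calling convention can be sketched with a hand-written stand-in. make_counter below is hypothetical and is not part of Stream::Aggregate; it only mimics the protocol described above: call once per input object, collect zero or more result records from each call, then call with undef to flush.

```perl
use strict;
use warnings;

# Hypothetical stand-in that counts records per URL (input sorted by URL).
sub make_counter {
    my ($key, $n);
    return sub {
        my ($log) = @_;
        my @out;
        # A changed key, or the final undef, closes the current group.
        if (defined $key && (!defined $log || $log->{url} ne $key)) {
            push @out, { url => $key, count => $n };
            ($key, $n) = (undef, 0);
        }
        if (defined $log) {
            $key = $log->{url};
            $n++;
        }
        return @out;
    };
}

my $af = make_counter();
my @stats;
push @stats, $af->($_) for ({ url => '/a' }, { url => '/a' }, { url => '/b' });
push @stats, $af->(undef);       # no more input: flush the last group

print "$_->{url}: $_->{count}\n" for @stats;
```

Most calls return nothing; a result record appears only when a context closes.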

The generate-the-function routine, generate_aggregation_func, takes three parameters. The first is the configuration object (defined below) that is expected (but not required) to come from a YAML file. The second and third provide extra information. Currently they are only used to get a description of what this aggregation is trying to do using the name field. For example:

generate_aggregation_func($agg_config, $extra, $user_extra);

my $code = qq{#line 1 "FAKE-all-code-for-$extra->{name}"\n};

The configuration object for Stream::Aggregate is expected to be read from a YAML file but it does not have to come in that way.
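As a sketch of the non-YAML route, the configuration can be written directly as a Perl hash. The keys are the ones documented in this file; the record fields (url, bytes), the column names, and the value of name are assumptions made for illustration.

```perl
use strict;
use warnings;

# Code fields are strings of Perl source (single-quoted so that $log and
# friends are not interpolated here), not code references.
my $agg_config = {
    context => 'return ($log->{url})',
    sum     => { total_bytes => '$log->{bytes}' },
    output  => { hits => '$ps->{item_count}' },
};

# Hypothetical description, picked up through the name field.
my $extra = { name => 'bytes-per-url' };

print join(', ', sort keys %$agg_config), "\n";
```

A hash like this would be passed as the first argument to generate_aggregation_func, with $extra as the second.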

For some of the code fields (below), marked as Closure/Config, you can provide a closure instead of code. To do that, have a BEGIN block set $coderef to the closure. If set, code outside the BEGIN block will only be compiled (never run). When evaluating the BEGIN block, $agg_config will be set to the value of key_config (assuming the field was key).

The behavior of generate_aggregation_func in array context may change in the future to provide additional return values.

As the aggregator runs over the input, it needs to know the boundaries of the contexts so that it knows when to generate an aggregation result record.

For nested groupings, to aggregate over URLs, you need to sort your input by URL and you need to define a context that returns the URL:

context: |
  return ($log->{url})

If you want to aggregate over both the URL and the web host, the context must return an array of host and URL:

context: |
  $log->{url} =~ m{(?:https?|ftp)://([^/:]+)};
  my $host = $1;
  return ($host, $log->{url})

When the context has multiple levels like that, there will be a resultant aggregation record for each level.
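To experiment with the host and URL context outside the aggregator, the same logic can be wrapped in an ordinary function. The function name and the sample URL are made up for this sketch.

```perl
use strict;
use warnings;

# Stand-alone version of the two-level context: returns (host, url).
sub host_url_context {
    my ($log) = @_;
    # A list-context match returns the capture, or nothing if it fails,
    # so $host is undef for URLs that do not match.
    my ($host) = $log->{url} =~ m{(?:https?|ftp)://([^/:]+)};
    return ($host, $log->{url});
}

my @ctx = host_url_context({ url => 'http://www.example.com:8080/a/b.html' });
print join(' | ', @ctx), "\n";
```

Capturing in list context avoids reading a stale $1 left over from an earlier successful match.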

context

Code, Optional. Given a $log entry, return an array that describes the aggregation context. For example, for a URL, this array might be: domain name; host name (if different from domain name); each component of the path of the URL except the final filename. As Aggregate runs, it will generate an aggregation record for each element of the array.

This code will be invoked on every input record.

context2columns

Code, Optional. Given a context, in @current_context, return additional key/value pairs for the resulting aggregation record. This is how the context gets described in the aggregation results records.

This code will be invoked to generate resultant values just before a context is closed.

If this code sets the variable $suppress_result, then this aggregation result will be discarded.
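A sketch of a context and context2columns pair, written as a Perl hash. The year, month, and day fields on $log are assumptions about the input; at the less specific levels the missing elements simply come out undefined in this sketch.

```perl
use strict;
use warnings;

my $agg_config = {
    # Three-level nested context.
    context         => 'return ($log->{year}, $log->{month}, $log->{day})',
    # Name the elements of @current_context so they appear as columns.
    context2columns => q{
        my %col;
        @col{qw(year month day)} = @current_context;
        return %col;
    },
};

print $agg_config->{context2columns};
```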

stringify_context

Code, Optional.

If the new context array returned by the context code (soon to become @current_context) is not an array of strings but rather an array of references, it will be turned into strings using YAML.

If this isn't what you want, use stringify_context to do something different. Unlike most of the other functions, stringify_context operates on @_.

This will be invoked for every input record.
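A possible replacement for the YAML stringification is sketched below as an ordinary function that works on @_. That the code should return one string per context element is an assumption, as is the function name.

```perl
use strict;
use warnings;

# Plain strings pass through, array references are joined with commas,
# hash references become sorted key=value pairs.
sub stringify_sketch {
    my @strings;
    for my $e (@_) {
        if (!ref($e)) {
            push @strings, $e;
        }
        elsif (ref($e) eq 'ARRAY') {
            push @strings, join(',', @$e);
        }
        elsif (ref($e) eq 'HASH') {
            push @strings, join(',', map { $_ . '=' . $e->{$_} } sort keys %$e);
        }
        else {
            push @strings, "$e";     # fall back to Perl's default stringification
        }
    }
    return @strings;
}

my @s = stringify_sketch('a', [1, 2], { x => 1, y => 2 });
print join(' | ', @s), "\n";
```

Because this runs for every input record, a cheap join is likely to be noticeably faster than a full YAML dump.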

crossproduct

Hash, Name->Number, Optional.

For crossproduct groupings, this defines the dimensions. The keys are the variables. The values are the maximum number of values for each variable to track.

The keys must be ephemeral0, ephemeral, or ephemeral2 column names.
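For the shoe example, a configuration along these lines might be used. This is a sketch as a Perl hash; the record fields (size, width, color, price) and the revenue column are assumptions.

```perl
use strict;
use warnings;

my $agg_config = {
    # Each crossproduct dimension is first defined as an ephemeral column.
    ephemeral => {
        size  => '$log->{size}',
        width => '$log->{width}',
        color => '$log->{color}',
    },
    # Track at most 10 sizes, 3 widths, and 5 colors.
    crossproduct => { size => 10, width => 3, color => 5 },
    sum          => { revenue => '$log->{price}' },
};

print join(', ', sort keys %{ $agg_config->{crossproduct} }), "\n";
```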

simplify

Hash, Name->Code, Optional.

When a cross-product key is exceeding its quota of values, the default replacement value is *. This hash allows you to override the code that chooses the new value.

finalize_result

Code, Optional, Closure/Config.

This code will be called after the resultant values for a context have been calculated. It is a last chance to modify them or to suppress the results. The values can be found as a reference to a hash: $row. To suppress the results, set $suppress_result.

new_context

Code, Optional, Closure/Config.

This code will be called each time there is a new context. At the time it is called, $ps is a reference to the new context, but @current_context will not yet have been updated.

merge

Code, Optional, Closure/Config.

When using multiple levels of contexts, data is counted for the top-most context layer only. When that top-most layer finishes, the counts are merged into the next more-general layer.

During the merge, both $ps and $oldps are available for the code to reference.

CONTROL FLOW CONFIGURATION

filter

Code, Optional. Before any of the columns are calculated or any of the values saved, run this filter code. If it returns a true value, proceed as normal. If it returns a false value, the item is not considered for any of the statistics values. The filter code can remember things in $ps->{heap} that might affect how other things are counted.

In some situations, you may want to throw away most data and count things in the filter. When doing that, it may be that all of the columns come from output.

This may be redesigned; avoid using it for the time being.

filter_early

Boolean, Optional, default false. Check the filter early before figuring out contexts? If so, and the result is filtered, don't check to see if the context changed.

passthrough

Code, Optional. Add results to the output of the aggregation. A value of $log adds the input data to the output.

PARAMETERS CONFIGURATION

max_stats_to_keep

Number, Optional, default: 4000.

When aggregating large amounts of data, limit memory use by throwing away some of the data. When data is thrown away, keep this number of samples for statistics functions that need bulk data like standard_deviation.

reduce

Code, Optional, Closure/Config.

When max_stats_to_keep is exceeded, data will be thrown away. This function will be called when that has happened.

preprocess

Code, Optional. Code to preprocess the input $log objects.

item_name

String, Optional, default: $log. In the rest of the code, call the input data something other than $log. This anticipates using this module for something other than log data.

debug

Boolean, Optional. Print out some debugging information, including the code that is generated for building the columns.

AGGREGATE OUTPUT COLUMNS CONFIGURATION

Each of these (except ephemeral & keep) defines additional columns of output that will be included in each aggregation record. These are all optional and all defined as key/value pairs where the keys are column names and the values are perl code. You can refer to previous columns using the variable $column_column_name where column_name is the name of one of the other columns. When referring to other columns, the order in which columns are processed matters: ephemeral and keep are processed first and second respectively. Identical code fragments will be evaluated only once. Within a group, columns are evaluated alphabetically.

Some of the columns will have their code evaluated per-item and some are evaluated per-aggregation.

The input data is in $log unless overridden by item_name.
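A sketch of how column groups can build on one another, written as a Perl hash. The bytes field, the column names, and the one-megabyte threshold are assumptions. Because ephemeral columns are processed first, the later groups can refer to the value as $column_size.

```perl
use strict;
use warnings;

my $agg_config = {
    # Computed once per item and not included in the output.
    ephemeral => { size => '$log->{bytes}' },
    # Per-item columns that reuse the ephemeral value.
    counter   => { large_count => '$column_size > 1_000_000' },
    sum       => { total_bytes => '$column_size' },
    mean      => { mean_bytes  => '$column_size' },
    max       => { max_bytes   => '$column_size' },
};

print join(', ', sort keys %$agg_config), "\n";
```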

Per item callbacks

ephemeral

These columns will not be included in the aggregation data. Refer to them as $column_column_name.

ephemeral0

Same as ephemeral, will be evaluated before ephemeral.

ephemeral2

Same as ephemeral, will be evaluated after ephemeral.

counter

Keep a counter. Add one if the code returns true.

percentage

Keep a counter. Report, as an output column, the percentage of items for which the code returned true, as opposed to items for which it returned 0. A return value of undef does not count at all.
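The treatment of undef can be illustrated with a stand-alone sketch. The return values are made up, and scaling the result to the range 0 to 100 is an assumption rather than something this document states.

```perl
use strict;
use warnings;

# Return values from a hypothetical percentage column, one per item.
my @returns = (1, 0, undef, 1, 1, 0, undef);

my ($hits, $total) = (0, 0);
for my $r (@returns) {
    next unless defined $r;      # undef does not count at all
    $total++;
    $hits++ if $r;
}

my $percentage = $total ? 100 * $hits / $total : undef;
printf "%d of %d: %.1f%%\n", $hits, $total, $percentage;
```

Returning undef for items where the question does not apply keeps them out of both the numerator and the denominator.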

sum

Keep an accumulator. Add the return values.

mean

Keep an accumulator. Add the return values. Divide by the number of items before inserting into the results. Items whose value is undef do not count towards the number of items.

standard_deviation

Remember the return values. Compute the standard deviation of the accumulated return values and insert that into the results. Items whose value is undef are removed before calculating the standard_deviation.

median

Remember the return values. Compute the median of the accumulated return values and insert that into the results. Items whose value is undef are removed before calculating the median.

dominant

Remember the return values. Compute the mode (most frequent) of the accumulated return values and insert that into the results. Items whose value is undef are removed before calculating the mode.
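For reference, the median and the mode of a list with undef values removed can be computed as in the sketch below. This is not the module's code; averaging the two middle values for an even count and breaking mode ties toward the smaller value are assumptions.

```perl
use strict;
use warnings;

my @returns = (3, undef, 1, 3, 2, undef, 3, 2);
my @values  = grep { defined } @returns;         # undef is dropped first

# Median: the middle value, or the mean of the two middle values.
my @sorted = sort { $a <=> $b } @values;
my $mid    = int(@sorted / 2);
my $median = @sorted % 2
    ? $sorted[$mid]
    : ($sorted[$mid - 1] + $sorted[$mid]) / 2;

# Mode: the most frequent value.
my %freq;
$freq{$_}++ for @values;
my ($mode) = sort { $freq{$b} <=> $freq{$a} || $a <=> $b } keys %freq;

print "median=$median mode=$mode\n";
```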

min

Keep a minimum value. Replace it with the return value if the return value is less than the current value. Items whose value is undef are removed before calculating the min.

max

Keep a maximum value. Replace it with the return value if the return value is greater than the current value. Items whose value is undef are removed before calculating the max.

minstr

Keep a minimum string value. Replace it with the return value if the return value is less than the current value. Items whose value is undef are removed before calculating the minstr.

maxstr

Keep a maximum string value. Replace it with the return value if the return value is greater than the current value. Items whose value is undef are removed before calculating the maxstr.
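The difference between the numeric and the string variants is easy to see with the core List::Util functions of the same names. That the module's columns compare values the same way is an assumption; the sketch only shows why the choice matters.

```perl
use strict;
use warnings;
use List::Util qw(min max minstr maxstr);

my @returns = ('10', '9', undef, '100');
my @values  = grep { defined } @returns;     # undef is removed first

my $min    = min(@values);       # numeric comparison
my $max    = max(@values);
my $minstr = minstr(@values);    # string comparison
my $maxstr = maxstr(@values);

print "min=$min max=$max minstr=$minstr maxstr=$maxstr\n";
```

Numerically the smallest value is 9, but as a string "10" sorts before "9", so min and minstr disagree on the same data.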

keep

Remember the return values. The return values are available at aggregation time as @{$ps->{keep}{column_name}}. Items whose value is undef are kept but they're ignored by Stream::Aggregate::Stats functions.

Per aggregation result record callbacks

For code that is per-aggregation, the saved aggregation state can be found in $ps. One item that is probably needed is $ps->{item_count}.

output

Extra columns to include in the output. This is where to save $ps->{item_count}.

stat

Use arbitrary perl code to compute statistics on remembered return values kept with keep. Write your own function or use any of the functions in Stream::Aggregate::Stats (the global variable is pre-loaded). No, there isn't any difference between this and output.
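A sketch of pairing keep with a hand-written stat column, as a Perl hash. The latency field and the column names are assumptions, and the range calculation is written out by hand rather than using a Stream::Aggregate::Stats function.

```perl
use strict;
use warnings;

my $agg_config = {
    # Remember every latency value for the current context.
    keep => { latency => '$log->{latency}' },
    stat => {
        # Range (max minus min) of the kept values, ignoring undef.
        latency_range => q{
            my @v = sort { $a <=> $b } grep { defined } @{ $ps->{keep}{latency} };
            return undef unless @v;
            return $v[-1] - $v[0];
        },
    },
    # Number of items in the context.
    output => { items => '$ps->{item_count}' },
};

print $agg_config->{stat}{latency_range};
```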

VARIABLES AVAILABLE FOR CODE SNIPPETS TO USE

The following variables are available for the code that generates per-item and per-aggregation statistics:

$log

The current item (unless overridden by item_name)

$ps->{keep}{column_name}

An array of return values kept by keep.

$ps->{numeric}{column_name}

If Stream::Aggregate::Stats functions are called, they will grab the numeric values from $ps->{keep}{column_name} and store them in $ps->{numeric}{column_name}

$ps->{random}

For each kept item in $ps->{keep}{column_name}, there is a corresponding item in $ps->{random} that is a random number. These random numbers are used to determine which values to keep and which values to toss if there are too many values to keep them all.
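One way random tags can drive the choice of what to keep is sketched below. It is not taken from the module: keeping the entries with the smallest tags is an assumed policy, and $max_to_keep merely stands in for max_stats_to_keep.

```perl
use strict;
use warnings;

my $max_to_keep = 3;
my (@keep, @random);             # parallel arrays: a value and its random tag

sub remember {
    my ($value) = @_;
    push @keep,   $value;
    push @random, rand();
    return if @keep <= $max_to_keep;

    # Over the limit: keep the entries with the smallest random tags.
    my @order = (sort { $random[$a] <=> $random[$b] } 0 .. $#keep)[0 .. $max_to_keep - 1];
    @order  = sort { $a <=> $b } @order;     # preserve arrival order
    @keep   = @keep[@order];
    @random = @random[@order];
}

remember($_) for 1 .. 100;
print "kept: @keep\n";
```

Because each tag is drawn independently, every value has the same chance of surviving, so the kept values form a uniform sample of everything seen.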

$ps->{$column_type}{column_name}

For each type of column (output, counter, percentage, sum, min, standard_deviation, median, stat) the values that will be part of the final aggregation record.

$ps->{$temporary_type}{column_name}

Some columns need temporary storage for their values: percentage_counter (the counter used by percentage); percentage_total (the number of total items); mean_sum (the sum used to compute the mean); mean_count (the number of items for the mean).

$ps->{heap}

A hash that can be used by the configured perl code for whatever it wants.

$ps->{item_counter}

The count of items.

$agg_config

The configuration object for Stream::Aggregate

$itemref

A reference to $log. It's always $itemref even if $log is something else.

@current_context

The current context as returned by context.

@context_strings

The stringified version of @current_context as returned by stringify_context or YAML.

@contexts

The array of context objects. $ps is always $contexts[-1].

@items_seen

An array that counts the number of rows of output from this aggregation. When the context is multi-level, the counter is multi-level. For example, if the context is domain, host, and URL; then $items_seen[0] is the number of domains (so far), and $items_seen[1] is the number of hosts for this domain (so far), and $items_seen[2] is the number of URLs for this host (so far).

Passthrough rows do not count.

$row

When gathering results, the variable that holds them is a reference to a hash: $row.

$suppress_result

After gathering results, the $suppress_result variable is examined. If it's set, the results (in $row) are discarded.

To skip results that aren't crossproduct results, in finalize_result, set $suppress_result if $cross_count isn't true.
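As a Perl hash, that might look like the following sketch. The dimension names and record fields are assumptions.

```perl
use strict;
use warnings;

my $agg_config = {
    ephemeral => {
        size  => '$log->{size}',
        color => '$log->{color}',
    },
    crossproduct => { size => 10, color => 5 },
    # Drop every result row that is not a crossproduct result.
    finalize_result => '$suppress_result = 1 unless $cross_count;',
};

print $agg_config->{finalize_result}, "\n";
```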

$cross_count

The number of currently active crossproduct accumulator contexts.

$extra, $user_extra

The additional parameters (beyond $agg_config) that were passed to generate_aggregation_func().

%persist

This hash is not used by Stream::Aggregate. It's available for any supplied code to use however it wants.

$last_item

A reference to the previous $log object. This is valid during finalize_result and context2columns.

There are more. Read the code.

HELPER FUNCTIONS

The following helper functions are available: everything in Stream::Aggregate::Stats and:

nonblank($value)

Returns $value if $value is defined and not the empty string. Returns undef otherwise.
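The described behavior corresponds to something like the sketch below. The name nonblank_sketch is made up to avoid clashing with the real helper, and the body is a reading of the description above, not the module's source.

```perl
use strict;
use warnings;

sub nonblank_sketch {
    my ($value) = @_;
    return undef unless defined $value;
    return undef if $value eq '';
    return $value;
}

for my $v ('abc', 0, '', undef) {
    my $r = nonblank_sketch($v);
    print defined $r ? "'$r'" : 'undef', "\n";
}
```

Note that 0 passes through unchanged: it is defined and is not the empty string, which matters for columns such as percentage where 0 and undef mean different things.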

LICENSE

This package may be used and redistributed under the terms of either the Artistic 2.0 or LGPL 2.1 license.