package App::ElasticSearch::Utilities::Aggregations;
# ABSTRACT: Code to simplify creating and working with Elasticsearch aggregations

use strict;
use warnings;

use Storable qw(dclone);
use Sub::Exporter -setup => {
    exports => [ qw(
        expand_aggregate_string
        es_flatten_aggregations es_flatten_aggs
        is_single_stat
    )],
    groups => {
        default => [qw(
            expand_aggregate_string
            es_flatten_aggregations es_flatten_aggs
            is_single_stat
        )],
    },
};

my %Aggregations;


$Aggregations{terms} = {
    params    => sub { $_[0] && $_[0] =~ /^\d+$/ ? { size => $_[0] } : {} },
    type      => 'bucket',
    composite => 1,
};


$Aggregations{significant_terms} = {
    params => sub { $_[0] =~ /^\d+$/ ? { size => $_[0] } : {} },
    type   => 'bucket',
};


$Aggregations{rare_terms} = {
    params => sub { $_[0] =~ /^\d+$/ ? { max_doc_count => $_[0] } : {} },
    type   => 'bucket',
};


$Aggregations{histogram} = {
    params => sub {
        return unless $_[0] > 0;
        return { interval => $_[0] };
    },
    type      => 'bucket',
    composite => 1,
};


$Aggregations{date_histogram} = {
    params    => sub { { calendar_interval => $_[0] || '1h' } },
    type      => 'bucket',
    composite => 1,
};


$Aggregations{geohash_grid} = {
    params    => sub { $_[0] =~ /^\d+$/ ? { precision => $_[0] } : {} },
    type      => 'bucket',
    composite => 1,
};


$Aggregations{missing} = { type => 'bucket' };


$Aggregations{avg} = { single_stat => 1, type => 'metric' };
$Aggregations{max} = { single_stat => 1, type => 'metric' };
$Aggregations{min} = { single_stat => 1, type => 'metric' };
$Aggregations{sum} = { single_stat => 1, type => 'metric' };


$Aggregations{cardinality} = { single_stat => 1, type => 'metric' };


$Aggregations{stats} = { type => 'metric' };


$Aggregations{extended_stats} = { type => 'metric' };


$Aggregations{percentiles} = {
    params => sub {
        my @pcts = $_[0] ? split /,/, $_[0] : qw(25 50 75 90);
        return { percents => \@pcts };
    },
};


$Aggregations{geo_centroid} = { type => 'metric' };




sub is_single_stat {
    my ($agg) = @_;
    return unless $agg;
    return unless exists $Aggregations{$agg};
    return unless exists $Aggregations{$agg}->{single_stat};
    return $Aggregations{$agg}->{single_stat};
}


sub expand_aggregate_string {
    my ($token) = @_;

    my %aggs = ();
    foreach my $def ( split /\+/, $token ) {
        my $alias = $def =~ s/^(\w+)=// ? $1 : undef;
        my @parts = split /:/, $def, 3;
        if( @parts == 1 ) {
            $alias ||= $def;
            $aggs{$alias} = { terms => { field => $def, size => 20 } };
            next;
        }
        my ($agg, $field);
        if( exists $Aggregations{$parts[0]} ) {
            $agg     = shift @parts;
            $field   = shift @parts;
        }
        else {
            $agg = 'terms';
            $field = shift @parts;
        }
        my $params  = {};
        my $paramStr = shift @parts;

        if( $paramStr && $paramStr =~ /\w+=/ ) {
            # split on commas using a positive lookahead for a "word="
            foreach my $token (split /,(?=\w+=)/, $paramStr) {
                my ($k,$v) = split /=/, $token, 2;
                next unless $k and $v;
                $params->{$k} = $v =~ /,/ ? [ split /,/, $v ] : $v;
            }
        }
        elsif( exists $Aggregations{$agg}->{params} ) {
            # Process parameters
            $params = $Aggregations{$agg}->{params}->($paramStr);
        }
        $alias ||= join ".", $agg eq 'terms' ? ($field) : ($agg, $field);
        $aggs{$alias} = { $agg => { field => $field, %{ $params } } };
    }
    return \%aggs;
}


sub es_flatten_aggregations {
    my ($result,$field,$parent) = @_;

    $parent ||= [];
    my @rows = ();

    my @remove = qw(
        doc_count_error_upper_bound
        sum_other_doc_count
    );

    my $row = dclone($parent);
    my $extract = sub {
        my ($key, $hash) = @_;

        if( $hash->{value} ) {
            push @{ $row }, $key, $hash->{value};
        }
        elsif( $hash->{values} ) {
            foreach my $k ( sort keys %{ $hash->{values} } ) {
                push @{ $row }, "$key.$k", $hash->{values}{$k}
                    if $hash->{values}{$k};
            }
        }
        else {
            foreach my $k (sort keys %{ $hash }) {
                last if $k eq 'buckets';
                push @{ $row }, "$key.$k", $hash->{$k}
                    if defined $hash->{values}{$k};
            }
        }
    };

    if( $field ) {
        delete $result->{$_} for @remove;
        if( $result->{key} and exists $result->{doc_count} ) {
            push @{ $row }, $field, delete $result->{key};
            push @{ $row }, "$field.hits", delete $result->{doc_count} || 0;
        }
        my %buckets = ();
        foreach my $k ( sort keys %{ $result } ) {
            if( ref $result->{$k} eq 'HASH' ) {
                $extract->($k, $result->{$k});

                if( my $buckets = delete $result->{$k}{buckets} ) {
                    $buckets{$k} = $buckets;
                }
            }
        }
        if( keys %buckets ) {
            foreach my $k ( sort keys %buckets ) {
                if( @{ $buckets{$k} } ) {
                    foreach my $bucket ( @{ $buckets{$k} } ) {
                        push @rows, @{ es_flatten_aggregations($bucket, $k, $row) };
                    }
                }
                else {
                    push @rows, $row;
                }
            }
        }
        else {
            push @rows, $row;
        }
    }
    else {
        foreach my $k ( sort keys %{ $result } ) {
            delete $result->{$k}{$_} for @remove;
            $extract->($k, $result->{$k});
            my $buckets = delete $result->{$k}{buckets};
            if( $buckets and @{ $buckets } ) {
                foreach my $bucket ( @{ $buckets } ) {
                    push @rows, @{ es_flatten_aggregations($bucket,$k,$row) };
                }
            }
            else {
                push @rows, $row;
            }
        }
    }

    return \@rows;
}

# Setup Aliases
*es_flatten_aggs = \&es_flatten_aggregations;


1;

__END__

=pod

=head1 NAME

App::ElasticSearch::Utilities::Aggregations - Code to simplify creating and working with Elasticsearch aggregations

=head1 VERSION

version 8.2

=head1 FUNCTIONS

=head2 is_single_stat()

Returns true if an aggregation returns a single value.

=head2 expand_aggregate_string( token )

Takes a simplified aggregation grammar and expands it the full aggregation hash.

Simple Terms:

    field_name

To

    {
        field_name => {
            terms => {
                field => 'field_name',
                size  => 20,
            }
        }
    }

Alias expansion:

    alias=field_name

To

    {
        alias => {
            terms => {
                field => 'field_name',
                size  => 20,
            }
        }
    }

Parameters:

    alias=field_name:10

To

    {
        alias => {
            terms => {
                field => 'field_name',
                size  => 10,
            }
        }
    }

Parameters, k/v:

    alias=field_name:size=13

To

    {
        alias => {
            terms => {
                field => 'field_name',
                size  => 13,
            }
        }
    }

=head2 es_flatten_aggregations()

Takes the B<aggregations> section of the query result and parses it into a flat
structure so each row contains all the sub aggregation information.

It returns an array reference, containing arrray references.  The individual
rows of the array are ordered in a depth first fashion.  The array does include
a key for every value, so the array can be cast to a hash safely.

=head1 Aggregations

List of supported aggregations.  Other aggregation may work, but these have defined behavior.

=head2 Bucket Aggregations

These aggregations will support sub aggregations.

=over 2

=item B<terms>

The default aggregation if none is specified.

    field_name
    terms:field_name

Results in

    {
        "field_name": {
            "terms": {
                "field": "field_name"
            }
        }
    }

Supports a positional parameter: size

    field_name:20
    terms:field_name:20

Results in

    {
        "field_name": {
            "terms": {
                "field": "field_name",
                "size": 20
            }
        }
    }

=item B<significant_terms>

Same as C<terms>.

    significant_terms:field_name:10

Results in:

    {
        "rare_terms.field_name": {
            "terms": {
                "field": "field_name",
                "size": 10
            }
        }
    }

=item B<rare_terms>

Same as C<terms> but the positional parameter is the C<max_doc_count>.

    rare_terms:field_name:10

Results in:

    {
        "rare_terms.field_name": {
            "terms": {
                "field": "field_name",
                "max_doc_count": 10
            }
        }
    }

=item B<histogram>

Creates a histogram for numeric fields.  Positional parameter is the interval.

    histogram:field_name:10

Results in:

    {
        "histogram.field_name": {
            "histogram": {
                "field": "field_name",
                "interval": 10
            }
        }
    }

=item B<date_histogram>

Creates a histogram for date fields.  Positional parameter is the calendar_interval.

    date_histogram:field_name:1h

Results in:

    {
        "histogram.field_name": {
            "histogram": {
                "field": "field_name",
                "calendar_interval": "1h"
            }
        }
    }

=item B<geohash_grid>

Creates a geohash grid bucket aggregation.  Positional parameter is the precision.

    geohash_grid:field_name:6

Results in:

    {
        "geohash_grid.field_name": {
            "geohash_grid": {
                "field": "field_name",
                "precision": 6
            }
        }
    }

=item B<missing>

Creates a bucket for documents missing the field.  No positional parameters.

    missing:field_name

Results in:

    {
        "missing.field_name": {
            "missing": {
                "field": "field_name"
            }
        }
    }

=back

=head2 Metric Aggregations

Aggregations that generate metrics from enclosing buckets.

=over 2

=item B<avg>, B<max>, B<min>, B<sum>

Single stat metric aggregations to generate the various single statistics over the enclosing bucket.

    sum:field_name

Results in

    {
        "sum.field_names": {
            "sum": {
                "field": "field_name"
            }
        }
    }

=item B<cardinality>

Computes the unique count of terms in a field.

    cardinality:field_name

Results in

    {
        "cardinality.field_names": {
            "cardinality": {
                "field": "field_name"
            }
        }
    }

=item B<stats>

Runs the stats aggregation that returns min, max, avg, sum, and count.

    stats:field_name

Results in

    {
        "stats.field_names": {
            "stats": {
                "field": "field_name"
            }
        }
    }

=item B<extended_stats>

Runs the stats aggregation that returns the same data as the C<sum> aggregation
plus variance, sum of squares, and standard deviation.

    extended_stats:field_name

Results in

    {
        "extended_stats.field_names": {
            "extended_stats": {
                "field": "field_name"
            }
        }
    }

=item B<percentiles>

Computes percentiles for the enclosing bucket. The positional parameter is
interpretted at the percents computed.  If ommitted, the percentiles computed
will be: 25, 50, 75, 90.

    percentiles:field_name:75,95,99

Results in

    {
        "percentiles.field_names": {
            "percentiles": {
                "field": "field_name",
                "percents": [ 75, 95, 99 ]
            }
        }
    }

=item B<geo_centroid>

Computes center of a group of geo points. No positional parameters supported.

    geo_centroid:field_name

Results in

    {
        "geo_centroid.field_names": {
            "geo_centroid": {
                "field": "field_name"
            }
        }
    }

=back

=for Pod::Coverage es_flatten_aggs

=head1 AUTHOR

Brad Lhotsky <brad@divisionbyzero.net>

=head1 COPYRIGHT AND LICENSE

This software is Copyright (c) 2021 by Brad Lhotsky.

This is free software, licensed under:

  The (three-clause) BSD License

=cut