#include "KinoSearch/Util/ToolSet.h"

#define KINO_WANT_BBSORTEXRUN_VTABLE
#include "KinoSearch/Util/BBSortExRun.r"

#include "KinoSearch/Store/InStream.r"
#include "KinoSearch/Store/OutStream.r"
#include "KinoSearch/Util/BBSortEx.r"

BBSortExRun*
BBSortExRun_new(Obj **elems, u32_t num_elems) 
{
    CREATE(self, BBSortExRun, BBSORTEXRUN);
    
    /* init */
    SortExRun_init_base((SortExRun*)self, BBSortEx_compare_bbs);
    self->mem_thresh   = 0;
    self->instream     = NULL;
    self->start        = 0;
    self->end          = 0;

    /* take posession of cache elems */
    self->cache = MALLOCATE(num_elems, Obj*);
    memcpy(self->cache, elems, num_elems * sizeof(Obj*));
    self->cache_max = num_elems;
    self->cache_cap = num_elems;

    return self;
}

void
BBSortExRun_flush(BBSortExRun *self, OutStream *outstream)
{
    Obj       **cache   = self->cache;
    Obj **const limit   = cache + self->cache_max;

    /* sanity check */
    if (self->end != 0)
        CONFESS("Can't flush twice");

    /* mark start of run */
    self->start   = OutStream_STell(outstream);
    
    /* write items to file */
    for ( ; cache < limit; cache++) {
        ByteBuf *const bb = (ByteBuf*)*cache;
        OutStream_Write_VInt(outstream, bb->len);
        OutStream_Write_Bytes(outstream, bb->ptr, bb->len);
    }

    /* release elements */
    BBSortExRun_Clear_Cache(self);
    free(self->cache);
    self->cache     = NULL;
    self->cache_cap = 0;

    /* mark end of run */
    self->end = OutStream_STell(outstream);
}

void
BBSortExRun_flip(BBSortExRun *self, InStream *instream, u32_t mem_thresh)
{
    if (self->instream != NULL)
        CONFESS("Can't call BBSortexRun_Flip more than once");
    if (mem_thresh == 0)
        CONFESS("mem_thresh cannot be 0");
    self->instream   = (InStream*)InStream_Clone(instream);
    InStream_SSeek(self->instream, self->start);
    self->mem_thresh = mem_thresh;
}

u32_t
BBSortExRun_refill(BBSortExRun *self)
{
    InStream   *instream        = self->instream;
    const u32_t mem_thresh      = self->mem_thresh;
    u64_t       end             = self->end;
    u32_t       bytes_read      = 0;
    u32_t       num_elems       = 0; /* number of items recovered */
    u32_t       len;
    ByteBuf    *bb;

    if (self->instream == NULL)
        return 0;

    /* make sure cache is empty */
    if (self->cache_max - self->cache_tick > 0) {
        CONFESS("Refill called but cache contains %u items",
            self->cache_max - self->cache_tick);
    }
    BBSortExRun_Clear_Cache(self);

    while (1) {
        /* bail if we've read everything in this run */
        if (InStream_STell(instream) >= end) {
            /* make sure we haven't read too much */
            if (InStream_STell(instream) > end) {
                unsigned long pos = (unsigned long)InStream_STell(instream);
                CONFESS("read past end of run: %lu %lu", pos, 
                    (unsigned long)end);
            }
            break;
        }

        /* bail if we've hit the ceiling for this run's cache */
        if (bytes_read >= mem_thresh)
            break;

        /* retrieve and decode len; allocate ByteBuf and recover string */
        len = InStream_Read_VInt(instream);
        bb  = BB_new(len);
        InStream_Read_Bytes(instream, bb->ptr, len);
        bb->len = len;
        *BBEND(bb) = '\0';

        /* add to the run's cache */
        if (num_elems == self->cache_cap) {
            BBSortExRun_Grow_Cache(self, num_elems);
        }
        self->cache[ num_elems ] = (Obj*)bb;

        /* track how much we've read so far */
        num_elems++;
        bytes_read += len + 1;
    }

    /* reset the cache array position and length; remember file pos */
    self->cache_max   = num_elems;
    self->cache_tick  = 0;

    return num_elems;
}

void
BBSortExRun_destroy(BBSortExRun *self) 
{
    REFCOUNT_DEC(self->instream);
    BBSortExRun_Clear_Cache(self);
    REFCOUNT_DEC(self->context);
    free(self->cache);
    free(self);
}

/* Copyright 2006-2007 Marvin Humphrey
 *
 * This program is free software; you can redistribute it and/or modify
 * under the same terms as Perl itself.
 */