/* Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#define C_LUCY_INDEXER
#include "Lucy/Util/ToolSet.h"

#include "Lucy/Index/Indexer.h"
#include "Clownfish/Boolean.h"
#include "Lucy/Analysis/Analyzer.h"
#include "Lucy/Document/Doc.h"
#include "Lucy/Plan/FieldType.h"
#include "Lucy/Plan/FullTextType.h"
#include "Lucy/Plan/Schema.h"
#include "Lucy/Index/DeletionsReader.h"
#include "Lucy/Index/DeletionsWriter.h"
#include "Lucy/Index/FilePurger.h"
#include "Lucy/Index/IndexManager.h"
#include "Lucy/Index/PolyReader.h"
#include "Lucy/Index/Segment.h"
#include "Lucy/Index/SegReader.h"
#include "Lucy/Index/Snapshot.h"
#include "Lucy/Index/SegWriter.h"
#include "Lucy/Plan/Architecture.h"
#include "Lucy/Search/Matcher.h"
#include "Lucy/Search/Query.h"
#include "Lucy/Store/Folder.h"
#include "Lucy/Store/FSFolder.h"
#include "Lucy/Store/Lock.h"
#include "Lucy/Util/Freezer.h"
#include "Lucy/Util/IndexFileNames.h"
#include "Lucy/Util/Json.h"
#include "Lucy/Util/StringHelper.h"

int32_t Indexer_CREATE   = 0x00000001;
int32_t Indexer_TRUNCATE = 0x00000002;

// Release the write lock - if it's there.
static void
S_release_write_lock(Indexer *self);

// Release the merge lock - if it's there.
static void
S_release_merge_lock(Indexer *self);

// Verify a Folder or derive an FSFolder from a String path.  Call
// Folder_Initialize() if "create" is true.
static Folder*
S_init_folder(Obj *index, bool create);

// Find the schema file within a snapshot.
static String*
S_find_schema_file(Snapshot *snapshot);

Indexer*
Indexer_new(Schema *schema, Obj *index, IndexManager *manager, int32_t flags) {
    Indexer *self = (Indexer*)Class_Make_Obj(INDEXER);
    return Indexer_init(self, schema, index, manager, flags);
}

Indexer*
Indexer_init(Indexer *self, Schema *schema, Obj *index,
             IndexManager *manager, int32_t flags) {
    IndexerIVARS *const ivars = Indexer_IVARS(self);
    bool      create   = (flags & Indexer_CREATE)   ? true : false;
    bool      truncate = (flags & Indexer_TRUNCATE) ? true : false;
    Folder   *folder   = S_init_folder(index, create);
    Snapshot *latest_snapshot = Snapshot_new();

    // Init.
    ivars->stock_doc     = Doc_new(NULL, 0);
    ivars->truncate      = false;
    ivars->optimize      = false;
    ivars->prepared      = false;
    ivars->needs_commit  = false;
    ivars->snapfile      = NULL;
    ivars->merge_lock    = NULL;

    // Assign.
    ivars->folder       = folder;
    ivars->manager      = manager
                         ? (IndexManager*)INCREF(manager)
                         : IxManager_new(NULL, NULL);
    IxManager_Set_Folder(ivars->manager, folder);

    // Get a write lock for this folder.
    Lock *write_lock = IxManager_Make_Write_Lock(ivars->manager);
    Lock_Clear_Stale(write_lock);
    if (Lock_Obtain(write_lock)) {
        // Only assign if successful, otherwise DESTROY unlocks -- bad!
        ivars->write_lock = write_lock;
    }
    else {
        DECREF(write_lock);
        DECREF(self);
        RETHROW(INCREF(Err_get_error()));
    }

    // Find the latest snapshot or create a new one.
    String *latest_snapfile = IxFileNames_latest_snapshot(folder);
    if (latest_snapfile) {
        Snapshot_Read_File(latest_snapshot, folder, latest_snapfile);
    }

    // Look for an existing Schema if one wasn't supplied.
    if (schema) {
        ivars->schema = (Schema*)INCREF(schema);
    }
    else {
        if (!latest_snapfile) {
            S_release_write_lock(self);
            THROW(ERR, "No Schema supplied, and can't find one in the index");
        }
        else {
            String *schema_file = S_find_schema_file(latest_snapshot);
            Obj *dump = Json_slurp_json(folder, schema_file);
            if (dump) { // read file successfully
                ivars->schema = (Schema*)CERTIFY(Freezer_load(dump), SCHEMA);
                schema = ivars->schema;
                DECREF(dump);
                schema_file = NULL;
            }
            else {
                THROW(ERR, "Failed to parse %o", schema_file);
            }
        }
    }

    // If we're clobbering, start with an empty Snapshot and an empty
    // PolyReader.  Otherwise, start with the most recent Snapshot and an
    // up-to-date PolyReader.
    if (truncate) {
        ivars->snapshot = Snapshot_new();
        ivars->polyreader = PolyReader_new(schema, folder, NULL, NULL, NULL);
        ivars->truncate = true;
    }
    else {
        // TODO: clone most recent snapshot rather than read it twice.
        ivars->snapshot = (Snapshot*)INCREF(latest_snapshot);
        ivars->polyreader = latest_snapfile
                           ? PolyReader_open((Obj*)folder, NULL, NULL)
                           : PolyReader_new(schema, folder, NULL, NULL, NULL);

        if (latest_snapfile) {
            // Make sure than any existing fields which may have been
            // dynamically added during past indexing sessions get added.
            Schema *old_schema = PolyReader_Get_Schema(ivars->polyreader);
            Schema_Eat(schema, old_schema);
        }
    }

    // Zap detritus from previous sessions.
    // Note: we have to feed FilePurger with the most recent snapshot file
    // now, but with the Indexer's snapshot later.
    FilePurger *file_purger
        = FilePurger_new(folder, latest_snapshot, ivars->manager);
    FilePurger_Purge(file_purger);
    DECREF(file_purger);

    // Create a new segment.
    int64_t new_seg_num
        = IxManager_Highest_Seg_Num(ivars->manager, latest_snapshot) + 1;
    Lock *merge_lock = IxManager_Make_Merge_Lock(ivars->manager);
    if (Lock_Is_Locked(merge_lock)) {
        // If there's a background merge process going on, stay out of its
        // way.
        Hash *merge_data = IxManager_Read_Merge_Data(ivars->manager);
        Obj *cutoff_obj = merge_data
                          ? Hash_Fetch_Utf8(merge_data, "cutoff", 6)
                          : NULL;
        if (!cutoff_obj) {
            DECREF(merge_lock);
            DECREF(merge_data);
            THROW(ERR, "Background merge detected, but can't read merge data");
        }
        else {
            int64_t cutoff = Json_obj_to_i64(cutoff_obj);
            if (cutoff >= new_seg_num) {
                new_seg_num = cutoff + 1;
            }
        }
        DECREF(merge_data);
    }
    ivars->segment = Seg_new(new_seg_num);

    // Add all known fields to Segment.
    Vector *fields = Schema_All_Fields(schema);
    for (size_t i = 0, max = Vec_Get_Size(fields); i < max; i++) {
        Seg_Add_Field(ivars->segment, (String*)Vec_Fetch(fields, i));
    }
    DECREF(fields);

    DECREF(merge_lock);

    // Create new SegWriter and FilePurger.
    ivars->file_purger
        = FilePurger_new(folder, ivars->snapshot, ivars->manager);
    ivars->seg_writer = SegWriter_new(ivars->schema, ivars->snapshot,
                                     ivars->segment, ivars->polyreader);
    SegWriter_Prep_Seg_Dir(ivars->seg_writer);

    // Grab a local ref to the DeletionsWriter.
    ivars->del_writer = (DeletionsWriter*)INCREF(
                           SegWriter_Get_Del_Writer(ivars->seg_writer));

    DECREF(latest_snapfile);
    DECREF(latest_snapshot);

    return self;
}

void
Indexer_Destroy_IMP(Indexer *self) {
    IndexerIVARS *const ivars = Indexer_IVARS(self);
    S_release_merge_lock(self);
    S_release_write_lock(self);
    DECREF(ivars->schema);
    DECREF(ivars->folder);
    DECREF(ivars->segment);
    DECREF(ivars->manager);
    DECREF(ivars->stock_doc);
    DECREF(ivars->polyreader);
    DECREF(ivars->del_writer);
    DECREF(ivars->snapshot);
    DECREF(ivars->seg_writer);
    DECREF(ivars->file_purger);
    DECREF(ivars->write_lock);
    DECREF(ivars->snapfile);
    SUPER_DESTROY(self, INDEXER);
}

static Folder*
S_init_folder(Obj *index, bool create) {
    Folder *folder = NULL;

    // Validate or acquire a Folder.
    if (Obj_is_a(index, FOLDER)) {
        folder = (Folder*)INCREF(index);
    }
    else if (Obj_is_a(index, STRING)) {
        folder = (Folder*)FSFolder_new((String*)index);
    }
    else {
        THROW(ERR, "Invalid type for 'index': %o", Obj_get_class_name(index));
    }

    // Validate or create the index directory.
    if (create) {
        Folder_Initialize(folder);
    }
    else {
        if (!Folder_Check(folder)) {
            THROW(ERR, "Folder '%o' failed check", Folder_Get_Path(folder));
        }
    }

    return folder;
}

void
Indexer_Add_Doc_IMP(Indexer *self, Doc *doc, float boost) {
    IndexerIVARS *const ivars = Indexer_IVARS(self);
    SegWriter_Add_Doc(ivars->seg_writer, doc, boost);
}

void
Indexer_Delete_By_Term_IMP(Indexer *self, String *field, Obj *term) {
    IndexerIVARS *const ivars = Indexer_IVARS(self);
    Schema    *schema = ivars->schema;
    FieldType *type   = Schema_Fetch_Type(schema, field);

    // Raise exception if the field isn't indexed.
    if (!type || !FType_Indexed(type)) {
        THROW(ERR, "%o is not an indexed field", field);
    }

    // Analyze term if appropriate, then zap.
    if (FType_is_a(type, FULLTEXTTYPE)) {
        CERTIFY(term, STRING);
        Analyzer *analyzer = Schema_Fetch_Analyzer(schema, field);
        Vector *terms = Analyzer_Split(analyzer, (String*)term);
        Obj *analyzed_term = Vec_Fetch(terms, 0);
        if (analyzed_term) {
            DelWriter_Delete_By_Term(ivars->del_writer, field,
                                     analyzed_term);
        }
        DECREF(terms);
    }
    else {
        DelWriter_Delete_By_Term(ivars->del_writer, field, term);
    }
}

void
Indexer_Delete_By_Query_IMP(Indexer *self, Query *query) {
    IndexerIVARS *const ivars = Indexer_IVARS(self);
    DelWriter_Delete_By_Query(ivars->del_writer, query);
}

void
Indexer_Delete_By_Doc_ID_IMP(Indexer *self, int32_t doc_id) {
    IndexerIVARS *const ivars = Indexer_IVARS(self);
    DelWriter_Delete_By_Doc_ID(ivars->del_writer, doc_id);
}

void
Indexer_Add_Index_IMP(Indexer *self, Obj *index) {
    IndexerIVARS *const ivars = Indexer_IVARS(self);
    Folder *other_folder = NULL;
    IndexReader *reader  = NULL;

    if (Obj_is_a(index, FOLDER)) {
        other_folder = (Folder*)INCREF(index);
    }
    else if (Obj_is_a(index, STRING)) {
        other_folder = (Folder*)FSFolder_new((String*)index);
    }
    else {
        THROW(ERR, "Invalid type for 'index': %o", Obj_get_class_name(index));
    }

    reader = IxReader_open((Obj*)other_folder, NULL, NULL);
    if (reader == NULL) {
        THROW(ERR, "Index doesn't seem to contain any data");
    }
    else {
        Schema *schema       = ivars->schema;
        Schema *other_schema = IxReader_Get_Schema(reader);
        Vector *other_fields = Schema_All_Fields(other_schema);
        Vector *seg_readers  = IxReader_Seg_Readers(reader);

        // Validate schema compatibility and add fields.
        Schema_Eat(schema, other_schema);

        // Add fields to Segment.
        for (size_t i = 0, max = Vec_Get_Size(other_fields); i < max; i++) {
            String *other_field = (String*)Vec_Fetch(other_fields, i);
            Seg_Add_Field(ivars->segment, other_field);
        }
        DECREF(other_fields);

        // Add all segments.
        for (size_t i = 0, max = Vec_Get_Size(seg_readers); i < max; i++) {
            SegReader *seg_reader = (SegReader*)Vec_Fetch(seg_readers, i);
            DeletionsReader *del_reader
                = (DeletionsReader*)SegReader_Fetch(
                      seg_reader, Class_Get_Name(DELETIONSREADER));
            Matcher *deletions = del_reader
                                 ? DelReader_Iterator(del_reader)
                                 : NULL;
            I32Array *doc_map = DelWriter_Generate_Doc_Map(
                                    ivars->del_writer, deletions,
                                    SegReader_Doc_Max(seg_reader),
                                    (int32_t)Seg_Get_Count(ivars->segment));
            SegWriter_Add_Segment(ivars->seg_writer, seg_reader, doc_map);
            DECREF(deletions);
            DECREF(doc_map);
        }
        DECREF(seg_readers);
    }

    DECREF(reader);
    DECREF(other_folder);
}

void
Indexer_Optimize_IMP(Indexer *self) {
    Indexer_IVARS(self)->optimize = true;
}

static String*
S_find_schema_file(Snapshot *snapshot) {
    Vector *files = Snapshot_List(snapshot);
    String *retval = NULL;
    for (size_t i = 0, max = Vec_Get_Size(files); i < max; i++) {
        String *file = (String*)Vec_Fetch(files, i);
        if (Str_Starts_With_Utf8(file, "schema_", 7)
            && Str_Ends_With_Utf8(file, ".json", 5)
           ) {
            retval = file;
            break;
        }
    }
    DECREF(files);
    return retval;
}

static bool
S_maybe_merge(Indexer *self, Vector *seg_readers) {
    IndexerIVARS *const ivars = Indexer_IVARS(self);
    bool      merge_happened  = false;
    size_t    num_seg_readers = Vec_Get_Size(seg_readers);
    Lock     *merge_lock      = IxManager_Make_Merge_Lock(ivars->manager);
    bool      got_merge_lock  = Lock_Obtain(merge_lock);
    int64_t   cutoff;

    if (got_merge_lock) {
        ivars->merge_lock = merge_lock;
        cutoff = 0;
    }
    else {
        // If something else holds the merge lock, don't interfere.
        Hash *merge_data = IxManager_Read_Merge_Data(ivars->manager);
        if (merge_data) {
            Obj *cutoff_obj = Hash_Fetch_Utf8(merge_data, "cutoff", 6);
            if (cutoff_obj) {
                cutoff = Json_obj_to_i64(cutoff_obj);
            }
            else {
                cutoff = INT64_MAX;
            }
            DECREF(merge_data);
        }
        else {
            cutoff = INT64_MAX;
        }
        DECREF(merge_lock);
    }

    // Get a list of segments to recycle.  Validate and confirm that there are
    // no dupes in the list.
    Vector *to_merge = IxManager_Recycle(ivars->manager, ivars->polyreader,
                                         ivars->del_writer, cutoff, ivars->optimize);

    Hash *seen = Hash_new(Vec_Get_Size(to_merge));
    for (size_t i = 0, max = Vec_Get_Size(to_merge); i < max; i++) {
        SegReader *seg_reader
            = (SegReader*)CERTIFY(Vec_Fetch(to_merge, i), SEGREADER);
        String *seg_name = SegReader_Get_Seg_Name(seg_reader);
        if (Hash_Fetch(seen, seg_name)) {
            DECREF(seen);
            DECREF(to_merge);
            THROW(ERR, "Recycle() tried to merge segment '%o' twice",
                  seg_name);
        }
        Hash_Store(seen, seg_name, (Obj*)CFISH_TRUE);
    }
    DECREF(seen);

    // Consolidate segments if either sparse or optimizing forced.
    for (size_t i = 0, max = Vec_Get_Size(to_merge); i < max; i++) {
        SegReader *seg_reader = (SegReader*)Vec_Fetch(to_merge, i);
        int64_t seg_num = SegReader_Get_Seg_Num(seg_reader);
        Matcher *deletions
            = DelWriter_Seg_Deletions(ivars->del_writer, seg_reader);
        I32Array *doc_map = DelWriter_Generate_Doc_Map(
                                ivars->del_writer, deletions,
                                SegReader_Doc_Max(seg_reader),
                                (int32_t)Seg_Get_Count(ivars->segment));
        if (seg_num <= cutoff) {
            THROW(ERR, "Segment %o violates cutoff (%i64 <= %i64)",
                  SegReader_Get_Seg_Name(seg_reader), seg_num, cutoff);
        }
        SegWriter_Merge_Segment(ivars->seg_writer, seg_reader, doc_map);
        merge_happened = true;
        DECREF(deletions);
        DECREF(doc_map);
    }

    // Write out new deletions.
    if (DelWriter_Updated(ivars->del_writer)) {
        // Only write out if they haven't all been applied.
        if (Vec_Get_Size(to_merge) != num_seg_readers) {
            DelWriter_Finish(ivars->del_writer);
        }
    }

    DECREF(to_merge);
    return merge_happened;
}

void
Indexer_Prepare_Commit_IMP(Indexer *self) {
    IndexerIVARS *const ivars = Indexer_IVARS(self);
    Vector   *seg_readers     = PolyReader_Get_Seg_Readers(ivars->polyreader);
    size_t    num_seg_readers = Vec_Get_Size(seg_readers);
    bool      merge_happened  = false;

    if (!ivars->write_lock || ivars->prepared) {
        THROW(ERR, "Can't call Prepare_Commit() more than once");
    }

    // Merge existing index data.
    if (num_seg_readers) {
        merge_happened = S_maybe_merge(self, seg_readers);
    }

    // Add a new segment and write a new snapshot file if...
    if (Seg_Get_Count(ivars->segment)             // Docs/segs added.
        || merge_happened                        // Some segs merged.
        || !Snapshot_Num_Entries(ivars->snapshot) // Initializing index.
        || DelWriter_Updated(ivars->del_writer)
       ) {
        Folder   *folder   = ivars->folder;
        Schema   *schema   = ivars->schema;
        Snapshot *snapshot = ivars->snapshot;

        // Derive snapshot and schema file names.
        DECREF(ivars->snapfile);
        String *snapfile = IxManager_Make_Snapshot_Filename(ivars->manager);
        ivars->snapfile = Str_Cat_Trusted_Utf8(snapfile, ".temp", 5);
        DECREF(snapfile);
        uint64_t schema_gen = IxFileNames_extract_gen(ivars->snapfile);
        char base36[StrHelp_MAX_BASE36_BYTES];
        StrHelp_to_base36(schema_gen, &base36);
        String *new_schema_name = Str_newf("schema_%s.json", base36);

        // Finish the segment, write schema file.
        SegWriter_Finish(ivars->seg_writer);
        Schema_Write(schema, folder, new_schema_name);
        String *old_schema_name = S_find_schema_file(snapshot);
        if (old_schema_name) {
            Snapshot_Delete_Entry(snapshot, old_schema_name);
        }
        Snapshot_Add_Entry(snapshot, new_schema_name);
        DECREF(new_schema_name);

        // Write temporary snapshot file.
        Folder_Delete(folder, ivars->snapfile);
        Snapshot_Write_File(snapshot, folder, ivars->snapfile);

        ivars->needs_commit = true;
    }

    // Close reader, so that we can delete its files if appropriate.
    PolyReader_Close(ivars->polyreader);

    ivars->prepared = true;
}

void
Indexer_Commit_IMP(Indexer *self) {
    IndexerIVARS *const ivars = Indexer_IVARS(self);

    // Safety check.
    if (!ivars->write_lock) {
        THROW(ERR, "Can't call commit() more than once");
    }

    if (!ivars->prepared) {
        Indexer_Prepare_Commit(self);
    }

    if (ivars->needs_commit) {
        bool success;

        // Rename temp snapshot file.
        String *temp_snapfile = ivars->snapfile;
        size_t ext_len      = sizeof(".temp") - 1;
        size_t snapfile_len = Str_Length(temp_snapfile);
        if (snapfile_len <= ext_len) {
            THROW(ERR, "Invalid snapfile name: %o", temp_snapfile);
        }
        ivars->snapfile = Str_SubString(temp_snapfile, 0,
                                       snapfile_len - ext_len);
        Snapshot_Set_Path(ivars->snapshot, ivars->snapfile);
        success = Folder_Rename(ivars->folder, temp_snapfile, ivars->snapfile);
        DECREF(temp_snapfile);
        if (!success) { RETHROW(INCREF(Err_get_error())); }

        // Purge obsolete files.
        FilePurger_Purge(ivars->file_purger);
    }

    // Release locks, invalidating the Indexer.
    S_release_merge_lock(self);
    S_release_write_lock(self);
}

Schema*
Indexer_Get_Schema_IMP(Indexer *self) {
    return Indexer_IVARS(self)->schema;
}

SegWriter*
Indexer_Get_Seg_Writer_IMP(Indexer *self) {
    return Indexer_IVARS(self)->seg_writer;
}

Doc*
Indexer_Get_Stock_Doc_IMP(Indexer *self) {
    return Indexer_IVARS(self)->stock_doc;
}

static void
S_release_write_lock(Indexer *self) {
    IndexerIVARS *const ivars = Indexer_IVARS(self);
    if (ivars->write_lock) {
        Lock_Release(ivars->write_lock);
        DECREF(ivars->write_lock);
        ivars->write_lock = NULL;
    }
}

static void
S_release_merge_lock(Indexer *self) {
    IndexerIVARS *const ivars = Indexer_IVARS(self);
    if (ivars->merge_lock) {
        Lock_Release(ivars->merge_lock);
        DECREF(ivars->merge_lock);
        ivars->merge_lock = NULL;
    }
}