#define C_KINO_INDEXER
#include "KinoSearch/Util/ToolSet.h"
#include "KinoSearch/Index/Indexer.h"
#include "KinoSearch/Analysis/Analyzer.h"
#include "KinoSearch/Document/Doc.h"
#include "KinoSearch/Plan/FieldType.h"
#include "KinoSearch/Plan/FullTextType.h"
#include "KinoSearch/Plan/Schema.h"
#include "KinoSearch/Index/DeletionsReader.h"
#include "KinoSearch/Index/DeletionsWriter.h"
#include "KinoSearch/Index/FilePurger.h"
#include "KinoSearch/Index/IndexManager.h"
#include "KinoSearch/Index/PolyReader.h"
#include "KinoSearch/Index/Segment.h"
#include "KinoSearch/Index/SegReader.h"
#include "KinoSearch/Index/Snapshot.h"
#include "KinoSearch/Index/SegWriter.h"
#include "KinoSearch/Plan/Architecture.h"
#include "KinoSearch/Search/Matcher.h"
#include "KinoSearch/Search/Query.h"
#include "KinoSearch/Store/Folder.h"
#include "KinoSearch/Store/FSFolder.h"
#include "KinoSearch/Store/Lock.h"
#include "KinoSearch/Util/IndexFileNames.h"
#include "KinoSearch/Util/Json.h"
/* Bit flags accepted by Indexer_new() / Indexer_init(). */
int32_t Indexer_CREATE   = 1 << 0; /* initialize the index folder */
int32_t Indexer_TRUNCATE = 1 << 1; /* start from an empty snapshot */
/* File-local helpers, defined further below in this file. */
static void     S_release_write_lock(Indexer *self);
static void     S_release_merge_lock(Indexer *self);
static Folder*  S_init_folder(Obj *index, bool_t create);
static CharBuf* S_find_schema_file(Snapshot *snapshot);
/* Allocate a new Indexer object and hand it to Indexer_init().
 * Returns whatever Indexer_init() returns. */
Indexer*
Indexer_new(Schema *schema, Obj *index, IndexManager *manager, int32_t flags)
{
    Indexer *indexer = (Indexer*)VTable_Make_Obj(INDEXER);
    indexer = Indexer_init(indexer, schema, index, manager, flags);
    return indexer;
}
/* Initialize an Indexer.
 *
 * Steps:
 *   1. Open (or create) the index folder.
 *   2. Acquire the write lock.
 *   3. Read the latest snapshot, if any, and obtain a Schema (the supplied
 *      one, or one loaded from the index's schema_*.json file).
 *   4. Purge files not referenced by the latest snapshot.
 *   5. Allocate a new segment, numbered past both the highest existing
 *      segment and any background-merge cutoff.
 *   6. Create the SegWriter and DeletionsWriter for that segment.
 *
 * flags: bitmask of
 *   Indexer_CREATE   - initialize the folder instead of checking it;
 *   Indexer_TRUNCATE - start from an empty Snapshot/PolyReader rather than
 *                      the latest one on disk.
 *
 * Every error path reached after the write lock has been obtained
 * releases the lock before THROWing. Otherwise a caught exception would
 * leave the index locked, because `self` is never destroyed on those
 * paths.
 */
Indexer*
Indexer_init(Indexer *self, Schema *schema, Obj *index,
             IndexManager *manager, int32_t flags)
{
    bool_t    create   = (flags & Indexer_CREATE)   ? true : false;
    bool_t    truncate = (flags & Indexer_TRUNCATE) ? true : false;
    Folder   *folder   = S_init_folder(index, create);
    Lock     *write_lock;
    CharBuf  *latest_snapfile;
    Snapshot *latest_snapshot = Snapshot_new();

    /* Default state. */
    self->stock_doc    = Doc_new(NULL, 0);
    self->truncate     = false;
    self->optimize     = false;
    self->prepared     = false;
    self->needs_commit = false;
    self->snapfile     = NULL;
    self->merge_lock   = NULL;

    /* Assign folder and manager; the manager is told about the folder. */
    self->folder  = folder;
    self->manager = manager
                  ? (IndexManager*)INCREF(manager)
                  : IxManager_new(NULL, NULL);
    IxManager_Set_Folder(self->manager, folder);

    /* Get a write lock for this folder. */
    write_lock = IxManager_Make_Write_Lock(self->manager);
    Lock_Clear_Stale(write_lock);
    if (Lock_Obtain(write_lock)) {
        /* Only assign on success, so the destructor never releases a lock
         * that this Indexer does not hold. */
        self->write_lock = write_lock;
    }
    else {
        DECREF(write_lock);
        DECREF(latest_snapshot); /* previously leaked on this path */
        DECREF(self);
        RETHROW(INCREF(Err_get_error()));
    }

    /* Find the latest snapshot file, if any, and read it. */
    latest_snapfile = IxFileNames_latest_snapshot(folder);
    if (latest_snapfile) {
        Snapshot_Read_File(latest_snapshot, folder, latest_snapfile);
    }

    /* Use the supplied Schema, or load the one stored in the index. */
    if (schema) {
        self->schema = (Schema*)INCREF(schema);
    }
    else if (!latest_snapfile) {
        S_release_write_lock(self);
        THROW(ERR, "No Schema supplied, and can't find one in the index");
    }
    else {
        CharBuf *schema_file = S_find_schema_file(latest_snapshot);
        Hash    *dump;
        if (!schema_file) {
            /* The snapshot lists no schema_*.json entry; don't hand a NULL
             * filename to Json_slurp_json(). */
            S_release_write_lock(self);
            THROW(ERR, "No Schema supplied, and can't find one in the index");
        }
        dump = (Hash*)Json_slurp_json(folder, schema_file);
        if (!dump) {
            S_release_write_lock(self);
            THROW(ERR, "Failed to parse %o", schema_file);
        }
        self->schema = (Schema*)CERTIFY(
            VTable_Load_Obj(SCHEMA, (Obj*)dump), SCHEMA);
        schema = self->schema;
        DECREF(dump);
    }

    if (truncate) {
        self->snapshot   = Snapshot_new();
        self->polyreader = PolyReader_new(schema, folder, NULL, NULL, NULL);
        self->truncate   = true;
    }
    else {
        self->snapshot   = (Snapshot*)INCREF(latest_snapshot);
        self->polyreader = latest_snapfile
                         ? PolyReader_open((Obj*)folder, NULL, NULL)
                         : PolyReader_new(schema, folder, NULL, NULL, NULL);
        if (latest_snapfile) {
            /* Fold the on-disk schema into ours (Schema_Eat); presumably
             * this picks up fields added in earlier sessions. */
            Schema *old_schema = PolyReader_Get_Schema(self->polyreader);
            Schema_Eat(schema, old_schema);
        }
    }

    /* Purge files left over from previous sessions, judged against the
     * latest snapshot. */
    {
        FilePurger *file_purger
            = FilePurger_new(folder, latest_snapshot, self->manager);
        FilePurger_Purge(file_purger);
        DECREF(file_purger);
    }

    /* Create the new segment. */
    {
        int64_t new_seg_num
            = IxManager_Highest_Seg_Num(self->manager, latest_snapshot) + 1;
        Lock *merge_lock = IxManager_Make_Merge_Lock(self->manager);
        uint32_t i, max;

        if (Lock_Is_Locked(merge_lock)) {
            /* A background merge is in progress: number the new segment
             * past its cutoff. */
            Hash *merge_data = IxManager_Read_Merge_Data(self->manager);
            Obj  *cutoff_obj = merge_data
                             ? Hash_Fetch_Str(merge_data, "cutoff", 6)
                             : NULL;
            if (!cutoff_obj) {
                DECREF(merge_lock);
                DECREF(merge_data);
                S_release_write_lock(self);
                THROW(ERR, "Background merge detected, but can't read merge data");
            }
            else {
                int64_t cutoff = Obj_To_I64(cutoff_obj);
                if (cutoff >= new_seg_num) {
                    new_seg_num = cutoff + 1;
                }
            }
            DECREF(merge_data);
        }

        self->segment = Seg_new(new_seg_num);

        /* Register every field the Schema knows about with the segment. */
        {
            VArray *fields = Schema_All_Fields(schema);
            for (i = 0, max = VA_Get_Size(fields); i < max; i++) {
                Seg_Add_Field(self->segment, (CharBuf*)VA_Fetch(fields, i));
            }
            DECREF(fields);
        }

        DECREF(merge_lock);
    }

    /* This purger is bound to the snapshot that commit will write. */
    self->file_purger
        = FilePurger_new(folder, self->snapshot, self->manager);

    /* Set up the SegWriter and keep our own ref to its DeletionsWriter. */
    self->seg_writer = SegWriter_new(self->schema, self->snapshot,
                                     self->segment, self->polyreader);
    SegWriter_Prep_Seg_Dir(self->seg_writer);
    self->del_writer = (DeletionsWriter*)INCREF(
        SegWriter_Get_Del_Writer(self->seg_writer));

    DECREF(latest_snapfile);
    DECREF(latest_snapshot);

    return self;
}
/* Destructor. Releases any merge/write locks still held, drops every
 * reference the Indexer owns, and chains to the superclass destructor.
 * The DECREF order is deliberately kept as-is. */
void
Indexer_destroy(Indexer *self)
{
    /* Unlock before letting go of anything else. */
    S_release_merge_lock(self);
    S_release_write_lock(self);

    /* Owned references. */
    DECREF(self->schema);
    DECREF(self->folder);
    DECREF(self->segment);
    DECREF(self->manager);
    DECREF(self->stock_doc);
    DECREF(self->polyreader);
    DECREF(self->del_writer);
    DECREF(self->snapshot);
    DECREF(self->seg_writer);
    DECREF(self->file_purger);
    DECREF(self->write_lock);
    DECREF(self->snapfile);

    SUPER_DESTROY(self, INDEXER);
}
/* Turn `index` into a Folder, taking a new reference.
 *
 * `index` may be a Folder (INCREF'd) or a CharBuf path (wrapped in a new
 * FSFolder). Any other type THROWs.
 *
 * When `create` is true the folder is initialized. Otherwise it must pass
 * Folder_Check(), or this function THROWs.
 */
static Folder*
S_init_folder(Obj *index, bool_t create)
{
    Folder *folder;

    if (Obj_Is_A(index, FOLDER)) {
        folder = (Folder*)INCREF(index);
    }
    else if (Obj_Is_A(index, CHARBUF)) {
        folder = (Folder*)FSFolder_new((CharBuf*)index);
    }
    else {
        folder = NULL;
        THROW(ERR, "Invalid type for 'index': %o", Obj_Get_Class_Name(index));
    }

    if (create) {
        Folder_Initialize(folder);
    }
    else if (!Folder_Check(folder)) {
        THROW(ERR, "Folder '%o' failed check", Folder_Get_Path(folder));
    }

    return folder;
}
/* Pass a document, along with its boost, directly to the SegWriter. */
void
Indexer_add_doc(Indexer *self, Doc *doc, float boost)
{
    SegWriter *writer = self->seg_writer;
    SegWriter_Add_Doc(writer, doc, boost);
}
/* Mark documents containing `term` in `field` for deletion.
 *
 * THROWs unless `field` has an indexed FieldType.
 *
 * For FULLTEXTTYPE fields:
 *   - `term` must be a CharBuf.
 *   - It is run through the field's Analyzer.
 *   - Only the first resulting token, if any, is used.
 * Other field types pass `term` to the DeletionsWriter unchanged.
 */
void
Indexer_delete_by_term(Indexer *self, CharBuf *field, Obj *term)
{
    Schema    *schema = self->schema;
    FieldType *type   = Schema_Fetch_Type(schema, field);

    if (type == NULL || !FType_Indexed(type)) {
        THROW(ERR, "%o is not an indexed field", field);
    }

    if (!FType_Is_A(type, FULLTEXTTYPE)) {
        DelWriter_Delete_By_Term(self->del_writer, field, term);
        return;
    }

    CERTIFY(term, CHARBUF);
    {
        Analyzer *analyzer = Schema_Fetch_Analyzer(schema, field);
        VArray   *tokens   = Analyzer_Split(analyzer, (CharBuf*)term);
        Obj      *first    = VA_Fetch(tokens, 0);
        if (first != NULL) {
            DelWriter_Delete_By_Term(self->del_writer, field, first);
        }
        DECREF(tokens);
    }
}
/* Pass a deletion-by-query request to the DeletionsWriter. */
void
Indexer_delete_by_query(Indexer *self, Query *query)
{
    DeletionsWriter *writer = self->del_writer;
    DelWriter_Delete_By_Query(writer, query);
}
/* Add the contents of another index to the segment being written.
 *
 * `index` may be a Folder or a CharBuf path; any other type THROWs.
 * THROWs if no IndexReader can be opened on it.
 *
 * Steps:
 *   1. Merge the other index's Schema into ours via Schema_Eat().
 *   2. Register each of its fields with the current segment.
 *   3. Append each of its segments through the SegWriter, using a doc map
 *      built from that segment's deletions.
 */
void
Indexer_add_index(Indexer *self, Obj *index)
{
    Folder      *other_folder = NULL;
    IndexReader *reader       = NULL;

    if (Obj_Is_A(index, FOLDER)) {
        other_folder = (Folder*)INCREF(index);
    }
    else if (Obj_Is_A(index, CHARBUF)) {
        other_folder = (Folder*)FSFolder_new((CharBuf*)index);
    }
    else {
        THROW(ERR, "Invalid type for 'index': %o", Obj_Get_Class_Name(index));
    }

    reader = IxReader_open((Obj*)other_folder, NULL, NULL);
    if (reader == NULL) {
        /* THROW does not return; drop the folder ref first so it isn't
         * leaked. */
        DECREF(other_folder);
        THROW(ERR, "Index doesn't seem to contain any data");
    }

    {
        Schema   *schema       = self->schema;
        Schema   *other_schema = IxReader_Get_Schema(reader);
        VArray   *other_fields = Schema_All_Fields(other_schema);
        VArray   *seg_readers  = IxReader_Seg_Readers(reader);
        uint32_t  i, max;

        /* Absorb the other schema and register its fields. */
        Schema_Eat(schema, other_schema);
        for (i = 0, max = VA_Get_Size(other_fields); i < max; i++) {
            CharBuf *other_field = (CharBuf*)VA_Fetch(other_fields, i);
            Seg_Add_Field(self->segment, other_field);
        }
        DECREF(other_fields);

        /* Append every segment, remapping doc ids past the docs already in
         * our segment and skipping deleted docs. */
        for (i = 0, max = VA_Get_Size(seg_readers); i < max; i++) {
            SegReader *seg_reader = (SegReader*)VA_Fetch(seg_readers, i);
            DeletionsReader *del_reader = (DeletionsReader*)SegReader_Fetch(
                seg_reader, VTable_Get_Name(DELETIONSREADER));
            Matcher *deletions = del_reader
                               ? DelReader_Iterator(del_reader)
                               : NULL;
            I32Array *doc_map = DelWriter_Generate_Doc_Map(self->del_writer,
                deletions, SegReader_Doc_Max(seg_reader),
                (int32_t)Seg_Get_Count(self->segment));
            SegWriter_Add_Segment(self->seg_writer, seg_reader, doc_map);
            DECREF(deletions);
            DECREF(doc_map);
        }
        DECREF(seg_readers);
    }

    DECREF(reader);
    DECREF(other_folder);
}
/* Set the optimize flag. S_maybe_merge() passes it to
 * IxManager_Recycle(). */
void
Indexer_optimize(Indexer *self)
{
    self->optimize = true;
}
/* Return the first snapshot entry whose name starts with "schema_" and
 * ends with ".json", or NULL if there is none.
 *
 * The result is not a new reference. It is borrowed from the list, which
 * is DECREF'd here, so it is presumably kept alive by the snapshot
 * itself. Callers in this file do not DECREF it.
 */
static CharBuf*
S_find_schema_file(Snapshot *snapshot)
{
    VArray   *entries = Snapshot_List(snapshot);
    uint32_t  num     = VA_Get_Size(entries);
    uint32_t  tick    = 0;
    CharBuf  *found   = NULL;

    while (found == NULL && tick < num) {
        CharBuf *entry = (CharBuf*)VA_Fetch(entries, tick);
        tick++;
        if (CB_Starts_With_Str(entry, "schema_", 7)
            && CB_Ends_With_Str(entry, ".json", 5)) {
            found = entry;
        }
    }

    DECREF(entries);
    return found;
}
/* Ask the IndexManager which existing segments to recycle, then merge
 * them into the new segment.
 *
 * Cutoff selection:
 *   - If the merge lock is obtained, it is kept in self->merge_lock and
 *     the cutoff is 0.
 *   - Otherwise the cutoff is read from the manager's merge data. If it
 *     cannot be read, I64_MAX is used, so no segment can pass the
 *     `seg_num > cutoff` requirement.
 *
 * The whole IxManager_Recycle() result is validated before any merging
 * starts: no segment may appear twice, and none may be at or below the
 * cutoff. A bad result therefore THROWs without leaving a partially
 * merged segment, and without leaking per-segment deletion matchers or
 * doc maps.
 *
 * Returns true if at least one segment was merged.
 */
static bool_t
S_maybe_merge(Indexer *self, VArray *seg_readers)
{
    bool_t   merge_happened  = false;
    uint32_t num_seg_readers = VA_Get_Size(seg_readers);
    Lock    *merge_lock      = IxManager_Make_Merge_Lock(self->manager);
    bool_t   got_merge_lock  = Lock_Obtain(merge_lock);
    int64_t  cutoff;
    VArray  *to_merge;
    uint32_t i, max;

    if (got_merge_lock) {
        self->merge_lock = merge_lock;
        cutoff = 0;
    }
    else {
        /* Someone else holds the merge lock: honor their cutoff. */
        Hash *merge_data = IxManager_Read_Merge_Data(self->manager);
        if (merge_data) {
            Obj *cutoff_obj = Hash_Fetch_Str(merge_data, "cutoff", 6);
            cutoff = cutoff_obj ? Obj_To_I64(cutoff_obj) : I64_MAX;
            DECREF(merge_data);
        }
        else {
            cutoff = I64_MAX;
        }
        DECREF(merge_lock);
    }

    to_merge = IxManager_Recycle(self->manager, self->polyreader,
                                 self->del_writer, cutoff, self->optimize);

    /* Validate the entire list before touching the SegWriter. */
    {
        Hash *seen = Hash_new(VA_Get_Size(to_merge));
        for (i = 0, max = VA_Get_Size(to_merge); i < max; i++) {
            SegReader *seg_reader = (SegReader*)CERTIFY(
                VA_Fetch(to_merge, i), SEGREADER);
            CharBuf *seg_name = SegReader_Get_Seg_Name(seg_reader);
            int64_t  seg_num  = SegReader_Get_Seg_Num(seg_reader);
            if (Hash_Fetch(seen, (Obj*)seg_name)) {
                DECREF(seen);
                DECREF(to_merge);
                THROW(ERR, "Recycle() tried to merge segment '%o' twice",
                      seg_name);
            }
            if (seg_num <= cutoff) {
                DECREF(seen);
                DECREF(to_merge);
                THROW(ERR, "Segment %o violates cutoff (%i64 <= %i64)",
                      seg_name, seg_num, cutoff);
            }
            Hash_Store(seen, (Obj*)seg_name, INCREF(&EMPTY));
        }
        DECREF(seen);
    }

    /* Merge each recycled segment, dropping its deleted docs via the
     * doc map. */
    for (i = 0, max = VA_Get_Size(to_merge); i < max; i++) {
        SegReader *seg_reader = (SegReader*)VA_Fetch(to_merge, i);
        Matcher   *deletions
            = DelWriter_Seg_Deletions(self->del_writer, seg_reader);
        I32Array  *doc_map = DelWriter_Generate_Doc_Map(self->del_writer,
            deletions, SegReader_Doc_Max(seg_reader),
            (int32_t)Seg_Get_Count(self->segment));
        SegWriter_Merge_Segment(self->seg_writer, seg_reader, doc_map);
        merge_happened = true;
        DECREF(deletions);
        DECREF(doc_map);
    }

    /* Write out pending deletions. This is skipped when every existing
     * segment was merged: the doc maps above already dropped their
     * deleted docs, so presumably nothing is left to write. */
    if (DelWriter_Updated(self->del_writer)) {
        if (VA_Get_Size(to_merge) != num_seg_readers) {
            DelWriter_Finish(self->del_writer);
        }
    }

    DECREF(to_merge);
    return merge_happened;
}
/* First phase of a two-phase commit.
 *
 * THROWs if the write lock is gone or this has already been called.
 *
 * Possibly merges existing segments. Then, if anything needs to be
 * written (any of the following), it does the work below:
 *   - the segment has docs;
 *   - a merge happened;
 *   - the snapshot is empty;
 *   - deletions were updated.
 *
 * The work:
 *   1. Finish the SegWriter.
 *   2. Write the Schema under the next "schema_<base36 gen>.json" name.
 *   3. Swap that name into the snapshot.
 *   4. Write the snapshot to "<snapshot filename>.temp".
 *   5. Set needs_commit.
 *
 * Always closes the PolyReader and marks the Indexer as prepared.
 */
void
Indexer_prepare_commit(Indexer *self)
{
    VArray  *seg_readers     = PolyReader_Get_Seg_Readers(self->polyreader);
    uint32_t num_seg_readers = VA_Get_Size(seg_readers);
    bool_t   merge_happened  = false;
    bool_t   must_write;

    if (self->write_lock == NULL || self->prepared) {
        THROW(ERR, "Can't call Prepare_Commit() more than once");
    }

    if (num_seg_readers > 0) {
        merge_happened = S_maybe_merge(self, seg_readers);
    }

    must_write = Seg_Get_Count(self->segment)
              || merge_happened
              || !Snapshot_Num_Entries(self->snapshot)
              || DelWriter_Updated(self->del_writer);

    if (must_write) {
        Folder   *folder          = self->folder;
        Schema   *schema          = self->schema;
        Snapshot *snapshot        = self->snapshot;
        CharBuf  *old_schema_name = S_find_schema_file(snapshot);
        uint64_t  schema_gen;
        char      base36[StrHelp_MAX_BASE36_BYTES];
        CharBuf  *new_schema_name;

        /* The next schema generation follows the existing one, if any. */
        if (old_schema_name) {
            schema_gen = IxFileNames_extract_gen(old_schema_name) + 1;
        }
        else {
            schema_gen = 1;
        }
        StrHelp_to_base36(schema_gen, &base36);
        new_schema_name = CB_newf("schema_%s.json", base36);

        /* Finish the segment, write the schema, and record it in the
         * snapshot in place of the old one. */
        SegWriter_Finish(self->seg_writer);
        Schema_Write(schema, folder, new_schema_name);
        if (old_schema_name) {
            Snapshot_Delete_Entry(snapshot, old_schema_name);
        }
        Snapshot_Add_Entry(snapshot, new_schema_name);
        DECREF(new_schema_name);

        /* Write the snapshot under a ".temp" name; Commit() renames it. */
        DECREF(self->snapfile);
        self->snapfile = IxManager_Make_Snapshot_Filename(self->manager);
        CB_Cat_Trusted_Str(self->snapfile, ".temp", 5);
        Folder_Delete(folder, self->snapfile);
        Snapshot_Write_File(snapshot, folder, self->snapfile);
        self->needs_commit = true;
    }

    PolyReader_Close(self->polyreader);
    self->prepared = true;
}
/* Second phase of the commit.
 *
 * THROWs if the write lock has already been released, i.e. Commit() was
 * called before. Runs Prepare_Commit() first if needed.
 *
 * If a temp snapshot was written:
 *   1. Strip the ".temp" suffix from the filename.
 *   2. Point the Snapshot at the final name.
 *   3. Rename the file on disk, rethrowing on failure.
 *   4. Purge obsolete files.
 *
 * Finally releases the merge and write locks.
 */
void
Indexer_commit(Indexer *self)
{
    if (self->write_lock == NULL) {
        THROW(ERR, "Can't call commit() more than once");
    }

    if (!self->prepared) {
        Indexer_Prepare_Commit(self);
    }

    if (self->needs_commit) {
        CharBuf *temp_name = CB_Clone(self->snapfile);
        bool_t   renamed;

        /* Drop the ".temp" suffix appended by Prepare_Commit(). */
        CB_Chop(self->snapfile, sizeof(".temp") - 1);
        Snapshot_Set_Path(self->snapshot, self->snapfile);
        renamed = Folder_Rename(self->folder, temp_name, self->snapfile);
        DECREF(temp_name);
        if (!renamed) {
            RETHROW(INCREF(Err_get_error()));
        }

        FilePurger_Purge(self->file_purger);
    }

    S_release_merge_lock(self);
    S_release_write_lock(self);
}
/* Accessor: the Indexer's SegWriter (borrowed reference). */
SegWriter*
Indexer_get_seg_writer(Indexer *self)
{
    SegWriter *writer = self->seg_writer;
    return writer;
}
/* Accessor: the reusable Doc created in Indexer_init() (borrowed reference). */
Doc*
Indexer_get_stock_doc(Indexer *self)
{
    Doc *doc = self->stock_doc;
    return doc;
}
/* If a write lock is held: release it, drop the reference, and clear
 * the field. Safe to call repeatedly. */
static void
S_release_write_lock(Indexer *self)
{
    Lock *lock = self->write_lock;
    if (lock == NULL) {
        return;
    }
    Lock_Release(lock);
    DECREF(lock);
    self->write_lock = NULL;
}
/* If a merge lock is held: release it, drop the reference, and clear
 * the field. Safe to call repeatedly. */
static void
S_release_merge_lock(Indexer *self)
{
    Lock *lock = self->merge_lock;
    if (lock == NULL) {
        return;
    }
    Lock_Release(lock);
    DECREF(lock);
    self->merge_lock = NULL;
}