Our project needs a fuzzy lookup of player names inside the game. Pointing Sphinx at a MySQL source would give us a working fuzzy-match setup almost out of the box, but all of our games use tcaplus as the storage engine, and I did not want to drag an extra MySQL environment into the project just for this. The plan is therefore to study the Sphinx source and either implement a similar fuzzy-matching service from scratch or build it as secondary development on top of that code. These notes record my reading of the Sphinx source so I can review and summarize it later. The source version referenced is sphinx-2.3.2-beta.
Index creation lives mainly in indexer.cpp. This walkthrough only covers the code I consider key and skips the parts I think are less important, so the reading below is my personal understanding and may contain mistakes; if you spot any, corrections and discussion are welcome.
//
CSphConfigParser cp;
CSphConfig & hConf = cp.m_tConf;
sOptConfig = sphLoadConfig ( sOptConfig, g_bQuiet, cp );
if ( !hConf ( "source" ) )
sphDie ( "no indexes found in config file '%s'", sOptConfig );
sphCheckDuplicatePaths ( hConf );
if ( hConf("indexer") && hConf["indexer"]("indexer") )
{
CSphConfigSection & hIndexer = hConf["indexer"]["indexer"];
g_iMemLimit = hIndexer.GetSize ( "mem_limit", g_iMemLimit );
g_iMaxXmlpipe2Field = hIndexer.GetSize ( "max_xmlpipe2_field", 2*1024*1024 );
g_iWriteBuffer = hIndexer.GetSize ( "write_buffer", 1024*1024 );
g_iMaxFileFieldBuffer = Max ( 1024*1024, hIndexer.GetSize ( "max_file_field_buffer", 8*1024*1024 ) );
if ( hIndexer("on_file_field_error") )
{
const CSphString & sVal = hIndexer["on_file_field_error"].strval();
if ( sVal=="ignore_field" )
g_eOnFileFieldError = FFE_IGNORE_FIELD;
else if ( sVal=="skip_document" )
g_eOnFileFieldError = FFE_SKIP_DOCUMENT;
else if ( sVal=="fail_index" )
g_eOnFileFieldError = FFE_FAIL_INDEX;
else
sphDie ( "unknown on_field_field_error value (must be one of ignore_field, skip_document, fail_index)" );
}
bool bJsonStrict = false;
bool bJsonAutoconvNumbers = false;
bool bJsonKeynamesToLowercase = false;
if ( hIndexer("on_json_attr_error") )
{
const CSphString & sVal = hIndexer["on_json_attr_error"].strval();
if ( sVal=="ignore_attr" )
bJsonStrict = false;
else if ( sVal=="fail_index" )
bJsonStrict = true;
else
sphDie ( "unknown on_json_attr_error value (must be one of ignore_attr, fail_index)" );
}
if ( hIndexer("json_autoconv_keynames") )
{
const CSphString & sVal = hIndexer["json_autoconv_keynames"].strval();
if ( sVal=="lowercase" )
bJsonKeynamesToLowercase = true;
else
sphDie ( "unknown json_autoconv_keynames value (must be 'lowercase')" );
}
bJsonAutoconvNumbers = ( hIndexer.GetInt ( "json_autoconv_numbers", 0 )!=0 );
sphSetJsonOptions ( bJsonStrict, bJsonAutoconvNumbers, bJsonKeynamesToLowercase );
sphSetThrottling ( hIndexer.GetInt ( "max_iops", 0 ), hIndexer.GetSize ( "max_iosize", 0 ) );
sphAotSetCacheSize ( hIndexer.GetSize ( "lemmatizer_cache", 262144 ) );
}
// simple config file
class CSphConfigParser
{
public:
CSphConfig m_tConf;
protected:
CSphString m_sFileName;
int m_iLine;
CSphString m_sSectionType;
CSphString m_sSectionName;
char m_sError [ 1024 ];
int m_iWarnings;
static const int WARNS_THRESH = 5;
};
/// config section type (hash of sections)
typedef SmallStringHash_T < CSphConfigSection > CSphConfigType;
/// config (hash of section types)
typedef SmallStringHash_T < CSphConfigType > CSphConfig;
/// small hash with string keys
template < typename T >
class SmallStringHash_T : public CSphOrderedHash < T, CSphString, CSphStrHashFunc, 256 > {};
/// config section (hash of variant values)
class CSphConfigSection : public SmallStringHash_T < CSphVariant > { /* GetInt(), GetSize(), strval() etc. omitted */ };
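So the parsed config is just a two-level string hash: section type ("indexer", "index", "source") maps to section name, which maps to key/value pairs. That is why the code above can chain lookups like hConf["indexer"]["indexer"].GetSize("mem_limit", ...). A minimal sketch of the same lookup shape using the standard library (my illustration, not Sphinx code; the keys and values are made up):

```cpp
#include <iostream>
#include <map>
#include <string>

using ConfigSection = std::map<std::string, std::string>;   // key -> value
using ConfigType    = std::map<std::string, ConfigSection>; // section name -> section
using Config        = std::map<std::string, ConfigType>;    // section type -> sections

int main ()
{
    Config hConf;
    hConf["indexer"]["indexer"]["mem_limit"] = "128M";
    hConf["index"]["test1"]["path"] = "/var/data/test1";

    // mirrors: if ( hConf("indexer") && hConf["indexer"]("indexer") ) { ... }
    if ( hConf.count ( "indexer" ) && hConf["indexer"].count ( "indexer" ) )
        std::cout << hConf["indexer"]["indexer"]["mem_limit"] << std::endl;
    return 0;
}
```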
int iIndexed = 0;
int iFailed = 0;
if ( bMerge )
{
if ( dIndexes.GetLength()!=2 )
sphDie ( "there must be 2 indexes to merge specified" );
if ( !hConf["index"](dIndexes[0]) )
sphDie ( "no merge destination index '%s'", dIndexes[0] );
if ( !hConf["index"](dIndexes[1]) )
sphDie ( "no merge source index '%s'", dIndexes[1] );
bool bLastOk = DoMerge (
hConf["index"][dIndexes[0]], dIndexes[0],
hConf["index"][dIndexes[1]], dIndexes[1], dMergeDstFilters, g_bRotate, bMergeKillLists );
if ( bLastOk )
iIndexed++;
else
iFailed++;
} else if ( bIndexAll )
{
uint64_t tmRotated = sphMicroTimer();
hConf["index"].IterateStart ();
while ( hConf["index"].IterateNext() )
{
bool bLastOk = DoIndex ( hConf["index"].IterateGet (), hConf["index"].IterateGetKey().cstr(), hConf["source"], bVerbose, fpDumpRows );
if ( bLastOk && ( sphMicroTimer() - tmRotated > ROTATE_MIN_INTERVAL ) && g_bSendHUP && SendRotate ( hConf, false ) )
tmRotated = sphMicroTimer();
if ( bLastOk )
iIndexed++;
}
} else
{
uint64_t tmRotated = sphMicroTimer();
ARRAY_FOREACH ( j, dIndexes )
{
if ( !hConf["index"](dIndexes[j]) )
fprintf ( stdout, "WARNING: no such index '%s', skipping.\n", dIndexes[j] );
else
{
bool bLastOk = DoIndex ( hConf["index"][dIndexes[j]], dIndexes[j], hConf["source"], bVerbose, fpDumpRows );
if ( bLastOk && ( sphMicroTimer() - tmRotated > ROTATE_MIN_INTERVAL ) && g_bSendHUP && SendRotate ( hConf, false ) )
tmRotated = sphMicroTimer();
if ( bLastOk )
iIndexed++;
else
iFailed++;
}
}
}
CSphTokenizerSettings tTokSettings;
sphConfTokenizer ( hIndex, tTokSettings );
CSphDictSettings tDictSettings;
sphConfDictionary ( hIndex, tDictSettings );
ISphTokenizer * pTokenizer = ISphTokenizer::Create ( tTokSettings, NULL, sError );
if ( !pTokenizer )
sphDie ( "index '%s': %s", sIndexName, sError.cstr() );
// enable sentence indexing on tokenizer
// (not in Create() because search time tokenizer does not care)
bool bIndexSP = ( hIndex.GetInt ( "index_sp" )!=0 );
if ( bIndexSP )
if ( !pTokenizer->EnableSentenceIndexing ( sError ) )
sphDie ( "index '%s': %s", sIndexName, sError.cstr() );
if ( hIndex("index_zones") )
if ( !pTokenizer->EnableZoneIndexing ( sError ) )
sphDie ( "index '%s': %s", sIndexName, sError.cstr() );
// parse all sources
CSphVector<CSphSource*> dSources;
bool bGotAttrs = false;
bool bSpawnFailed = false;
for ( CSphVariant * pSourceName = hIndex("source"); pSourceName; pSourceName = pSourceName->m_pNext )
{
if ( !hSources ( pSourceName->cstr() ) )
{
fprintf ( stdout, "ERROR: index '%s': source '%s' not found.\n", sIndexName, pSourceName->cstr() );
continue;
}
const CSphConfigSection & hSource = hSources [ pSourceName->cstr() ];
CSphSource * pSource = SpawnSource ( hSource, pSourceName->cstr(), tSettings.m_eChineseRLP==SPH_RLP_BATCHED );
if ( !pSource )
{
bSpawnFailed = true;
continue;
}
if ( pSource->HasAttrsConfigured() )
bGotAttrs = true;
if ( bHtmlStrip )
{
if ( !pSource->SetStripHTML ( sHtmlIndexAttrs.cstr(), sHtmlRemoveElements.cstr(), bIndexSP, hIndex.GetStr("index_zones"), sError ) )
{
fprintf ( stdout, "ERROR: source '%s': %s.\n", pSourceName->cstr(), sError.cstr() );
return false;
}
}
pSource->SetTokenizer ( pTokenizer );
pSource->SetFieldFilter ( pFieldFilter );
pSource->SetDumpRows ( fpDumpRows );
dSources.Add ( pSource );
}
// if searchd is running, we want to reindex to .tmp files
CSphString sIndexPath;
sIndexPath.SetSprintf ( g_bRotate ? "%s.tmp" : "%s", hIndex["path"].cstr() );
// do index
CSphIndex * pIndex = sphCreateIndexPhrase ( sIndexName, sIndexPath.cstr() );
assert ( pIndex );
// check lock file
if ( !pIndex->Lock() )
{
fprintf ( stdout, "FATAL: %s, will not index. Try --rotate option.\n", pIndex->GetLastError().cstr() );
exit ( 1 );
}
pIndex->SetFieldFilter ( pFieldFilter );
pIndex->SetTokenizer ( pTokenizer );
pIndex->SetDictionary ( pDict );
if ( g_bKeepAttrs )
{
if ( g_sKeepAttrsPath.IsEmpty() )
pIndex->SetKeepAttrs ( hIndex["path"].strval(), g_dKeepAttrs );
else
pIndex->SetKeepAttrs ( g_sKeepAttrsPath, g_dKeepAttrs );
}
pIndex->Setup ( tSettings );
bOK = pIndex->Build ( dSources, g_iMemLimit, g_iWriteBuffer )!=0;
if ( bOK && g_bRotate && g_bSendHUP )
{
sIndexPath.SetSprintf ( "%s.new", hIndex["path"].cstr() );
bOK = pIndex->Rename ( sIndexPath.cstr() );
}
pIndex->Unlock ();
SafeDelete ( pIndex );
int CSphIndex_VLN::Build ( const CSphVector<CSphSource*> & dSources, int iMemoryLimit, int iWriteBuffer )
{
// setup sources
ARRAY_FOREACH ( iSource, dSources )
{
CSphSource * pSource = dSources[iSource];
assert ( pSource );
pSource->SetDict ( m_pDict );
pSource->Setup ( m_tSettings );
}
// connect 1st source and fetch its schema
if ( !dSources[0]->Connect ( m_sLastError )
|| !dSources[0]->IterateStart ( m_sLastError )
|| !dSources[0]->UpdateSchema ( &m_tSchema, m_sLastError ) )
{
return 0;
}
// adjust memory requirements
int iOldLimit = iMemoryLimit;
// book memory to store at least 64K attribute rows
const int iDocinfoStride = DOCINFO_IDSIZE + m_tSchema.GetRowSize();
int iDocinfoMax = Max ( iMemoryLimit/16/iDocinfoStride/sizeof(DWORD), 65536ul );
if ( m_tSettings.m_eDocinfo==SPH_DOCINFO_NONE )
iDocinfoMax = 1;
// book at least 32 KB for field MVAs, if needed
int iFieldMVAPoolSize = Max ( 32768, iMemoryLimit/16 );
if ( bHaveFieldMVAs==0 )
iFieldMVAPoolSize = 0;
// book at least 2 MB for keywords dict, if needed
int iDictSize = 0;
if ( m_pDict->GetSettings().m_bWordDict )
iDictSize = Max ( MIN_KEYWORDS_DICT, iMemoryLimit/8 );
// do we have enough left for hits?
int iHitsMax = 1048576;
iMemoryLimit -= iDocinfoMax*iDocinfoStride*sizeof(DWORD) + iFieldMVAPoolSize + iDictSize;
if ( iMemoryLimit < iHitsMax*(int)sizeof(CSphWordHit) )
{
iMemoryLimit = iOldLimit + iHitsMax*sizeof(CSphWordHit) - iMemoryLimit;
sphWarn ( "collect_hits: mem_limit=%d kb too low, increasing to %d kb",
iOldLimit/1024, iMemoryLimit/1024 );
} else
{
iHitsMax = iMemoryLimit / sizeof(CSphWordHit);
}
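To get a feel for the budgeting above, here is a worked example with assumed numbers (128 MB mem_limit, a 5-DWORD docinfo stride, keywords dict enabled, field MVAs ignored). It is only my sketch of the arithmetic, not Sphinx code:

```cpp
#include <algorithm>
#include <cstdio>

int main ()
{
    const int iMemoryLimit   = 128*1024*1024; // mem_limit = 128M (assumed)
    const int iDocinfoStride = 2 + 3;         // docid (2 DWORDs) + 3 attribute DWORDs (assumed schema)
    const int iWordHitSize   = 20;            // rough sizeof(CSphWordHit): 8+8+4 bytes, ignoring padding

    // book memory to store at least 64K attribute rows
    int iDocinfoMax = std::max ( iMemoryLimit/16/iDocinfoStride/4, 65536 );
    // book at least 2 MB for the keywords dict (dict=keywords assumed on)
    int iDictSize   = std::max ( 2*1024*1024, iMemoryLimit/8 );

    // whatever is left buffers raw hits before each sorted flush
    int iLeft    = iMemoryLimit - iDocinfoMax*iDocinfoStride*4 - iDictSize;
    int iHitsMax = iLeft / iWordHitSize;

    printf ( "docinfo rows=%d, dict=%d MB, hits buffered per block=%d\n",
        iDocinfoMax, iDictSize/(1024*1024), iHitsMax );
    return 0;
}
```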
// allocate raw hits block
CSphFixedVector<CSphWordHit> dHits ( iHitsMax + MAX_SOURCE_HITS );
CSphWordHit * pHits = dHits.Begin();
CSphWordHit * pHitsMax = dHits.Begin() + iHitsMax;
// after finishing with hits this pool will be used to sort strings
int iPoolSize = dHits.GetSizeBytes();
// allocate docinfos buffer
CSphFixedVector<DWORD> dDocinfos ( iDocinfoMax*iDocinfoStride );
DWORD * pDocinfo = dDocinfos.Begin();
const DWORD * pDocinfoMax = dDocinfos.Begin() + iDocinfoMax*iDocinfoStride;
if ( m_tSettings.m_eDocinfo==SPH_DOCINFO_NONE )
{
pDocinfo = NULL;
pDocinfoMax = NULL;
}
// create temp files
CSphAutofile fdLock ( GetIndexFileName("tmp0"), SPH_O_NEW, m_sLastError, true );
CSphAutofile fdHits ( GetIndexFileName ( m_bInplaceSettings ? "spp" : "tmp1" ), SPH_O_NEW, m_sLastError, !m_bInplaceSettings );
CSphAutofile fdDocinfos ( GetIndexFileName ( m_bInplaceSettings ? "spa" : "tmp2" ), SPH_O_NEW, m_sLastError, !m_bInplaceSettings );
CSphAutofile fdTmpFieldMVAs ( GetIndexFileName("tmp7"), SPH_O_NEW, m_sLastError, true );
CSphWriter tStrWriter;
CSphWriter tStrFinalWriter;
if ( !tStrWriter.OpenFile ( GetIndexFileName("tmps"), m_sLastError ) )
return 0;
tStrWriter.PutByte ( 0 ); // dummy byte, to reserve magic zero offset
if ( !tStrFinalWriter.OpenFile ( GetIndexFileName("sps"), m_sLastError ) )
return 0;
tStrFinalWriter.PutByte ( 0 ); // dummy byte, to reserve magic zero offset
// fetch documents
for ( ;; )
{
// get next doc, and handle errors
bool bGotDoc = pSource->IterateDocument ( m_sLastError );
if ( !bGotDoc )
return 0;
// ensure docid is sane
if ( pSource->m_tDocInfo.m_uDocID==DOCID_MAX )
{
m_sLastError.SetSprintf ( "docid==DOCID_MAX (source broken?)" );
return 0;
}
// check for eof
if ( !pSource->m_tDocInfo.m_uDocID )
break;
const DWORD * pPrevDocinfo = NULL;
if ( m_tSettings.m_eDocinfo==SPH_DOCINFO_EXTERN && pPrevIndex.Ptr() )
pPrevDocinfo = pPrevIndex->FindDocinfo ( pSource->m_tDocInfo.m_uDocID );
}
bool CSphSource_Document::IterateDocument ( CSphString & sError )
{
// fetch next document
for ( ;; )
{
m_tState.m_dFields = NextDocument ( sError );
if ( m_tDocInfo.m_uDocID==0 )
return true;
const int * pFieldLengths = GetFieldLengths ();
for ( int iField=0; iField<m_tState.m_iEndField; iField++ )
m_tState.m_dFieldLengths[iField] = pFieldLengths[iField];
// moved that here as docid==0 means eof for regular query
// but joined might produce doc with docid==0 and breaks delta packing
if ( HasJoinedFields() )
m_dAllIds.Add ( m_tDocInfo.m_uDocID );
if ( !m_tState.m_dFields )
return false;
// we're good
break;
}
m_tStats.m_iTotalDocuments++;
return true;
}
BYTE ** CSphSource_SQL::NextDocument ( CSphString & sError )
{
assert ( m_bSqlConnected );
// get next non-zero-id row
do
{
// try to get next row
bool bGotRow = SqlFetchRow ();
// get him!
m_tDocInfo.m_uDocID = VerifyID ( sphToDocid ( SqlColumn(0) ) );
m_uMaxFetchedID = Max ( m_uMaxFetchedID, m_tDocInfo.m_uDocID );
} while ( !m_tDocInfo.m_uDocID );
// cleanup attrs
for ( int i=0; i<m_tSchema.GetRowSize(); i++ )
m_tDocInfo.m_pDynamic[i] = 0;
// split columns into fields and attrs
for ( int i=0; i<m_iPlainFieldsLength; i++ )
{
// get that field
#if USE_ZLIB
if ( m_dUnpack[i]!=SPH_UNPACK_NONE )
{
DWORD uUnpackedLen = 0;
m_dFields[i] = (BYTE*) SqlUnpackColumn ( i, uUnpackedLen, m_dUnpack[i] );
m_dFieldLengths[i] = (int)uUnpackedLen;
continue;
}
#endif
m_dFields[i] = (BYTE*) SqlColumn ( m_tSchema.m_dFields[i].m_iIndex );
m_dFieldLengths[i] = SqlColumnLength ( m_tSchema.m_dFields[i].m_iIndex );
}
for ( int i=0; i<m_tSchema.GetAttrsCount(); i++ )
{
const CSphColumnInfo & tAttr = m_tSchema.GetAttr(i); // shortcut
if ( tAttr.m_eAttrType==SPH_ATTR_UINT32SET || tAttr.m_eAttrType==SPH_ATTR_INT64SET )
{
int uOff = 0;
if ( tAttr.m_eSrc==SPH_ATTRSRC_FIELD )
{
uOff = ParseFieldMVA ( m_dMva, SqlColumn ( tAttr.m_iIndex ), tAttr.m_eAttrType==SPH_ATTR_INT64SET );
}
m_tDocInfo.SetAttr ( tAttr.m_tLocator, uOff );
continue;
}
switch ( tAttr.m_eAttrType )
{
case SPH_ATTR_STRING:
case SPH_ATTR_JSON:
// memorize string, fixup NULLs
m_dStrAttrs[i] = SqlColumn ( tAttr.m_iIndex );
if ( !m_dStrAttrs[i].cstr() )
m_dStrAttrs[i] = "";
m_tDocInfo.SetAttr ( tAttr.m_tLocator, 0 );
break;
case SPH_ATTR_FLOAT:
m_tDocInfo.SetAttrFloat ( tAttr.m_tLocator, sphToFloat ( SqlColumn ( tAttr.m_iIndex ) ) ); // FIXME? report conversion errors maybe?
break;
case SPH_ATTR_BIGINT:
m_tDocInfo.SetAttr ( tAttr.m_tLocator, sphToInt64 ( SqlColumn ( tAttr.m_iIndex ) ) ); // FIXME? report conversion errors maybe?
break;
case SPH_ATTR_TOKENCOUNT:
// reset, and the value will be filled by IterateHits()
m_tDocInfo.SetAttr ( tAttr.m_tLocator, 0 );
break;
default:
// just store as uint by default
m_tDocInfo.SetAttr ( tAttr.m_tLocator, sphToDword ( SqlColumn ( tAttr.m_iIndex ) ) ); // FIXME? report conversion errors maybe?
break;
}
}
return m_dFields;
}
// store hits
while ( const ISphHits * pDocHits = pSource->IterateHits ( m_sLastWarning ) )
{
int iDocHits = pDocHits->Length();
#if PARANOID
for ( int i=0; i<iDocHits; i++ )
{
assert ( pDocHits->m_dData[i].m_uDocID==pSource->m_tDocInfo.m_uDocID );
assert ( pDocHits->m_dData[i].m_uWordID );
assert ( pDocHits->m_dData[i].m_iWordPos );
}
#endif
assert ( ( pHits+iDocHits )<=( pHitsMax+MAX_SOURCE_HITS ) );
memcpy ( pHits, pDocHits->First(), iDocHits*sizeof(CSphWordHit) );
pHits += iDocHits;
// sort hits
int iHits = pHits - dHits.Begin();
{
sphSort ( dHits.Begin(), iHits, CmpHit_fn() );
m_pDict->HitblockPatch ( dHits.Begin(), iHits );
}
pHits = dHits.Begin();
{
// we're not inlining, so only flush hits, docs are flushed independently
dHitBlocks.Add ( tHitBuilder.cidxWriteRawVLB ( fdHits.GetFD(), dHits.Begin(), iHits,
NULL, 0, 0 ) );
}
m_pDict->HitblockReset ();
if ( dHitBlocks.Last()<0 )
return 0;
}
/// hit info
struct CSphWordHit
{
SphDocID_t m_uDocID; ///< document ID
SphWordID_t m_uWordID; ///< word ID in current dictionary
Hitpos_t m_uWordPos; ///< word position in current document
};
class ISphHits
{
public:
CSphVector<CSphWordHit> m_dData;
};
if ( m_tSettings.m_eDocinfo==SPH_DOCINFO_INLINE )
{
// we're inlining, so let's flush both hits and docs
int iDocs = ( pDocinfo - dDocinfos.Begin() ) / iDocinfoStride;
pDocinfo = dDocinfos.Begin();
sphSortDocinfos ( dDocinfos.Begin(), iDocs, iDocinfoStride );
dHitBlocks.Add ( tHitBuilder.cidxWriteRawVLB ( fdHits.GetFD(), dHits.Begin(), iHits,
dDocinfos.Begin(), iDocs, iDocinfoStride ) );
// we are inlining, so if there are more hits in this document,
// we'll need to know it's info next flush
if ( iDocHits )
{
DOCINFOSETID ( pDocinfo, pSource->m_tDocInfo.m_uDocID );
memcpy ( DOCINFO2ATTRS ( pDocinfo ), pSource->m_tDocInfo.m_pDynamic, sizeof(CSphRowitem)*m_tSchema.GetRowSize() );
pDocinfo += iDocinfoStride;
}
} else
{
// we're not inlining, so only flush hits, docs are flushed independently
dHitBlocks.Add ( tHitBuilder.cidxWriteRawVLB ( fdHits.GetFD(), dHits.Begin(), iHits,
NULL, 0, 0 ) );
}
WordID | DocID | WordPos
---|---|---
m_uWordID1 | m_uDocID1 | m_uWordPos1, m_uWordPos2, m_uWordPos3
 | m_uDocID2 | m_uWordPos1, m_uWordPos2, m_uWordPos3
 | m_uDocID3 | m_uWordPos1, m_uWordPos2, m_uWordPos3
m_uWordID2 | m_uDocID1 | m_uWordPos1, m_uWordPos2, m_uWordPos3
 | m_uDocID2 | m_uWordPos1, m_uWordPos2, m_uWordPos3
 | m_uDocID3 | m_uWordPos1, m_uWordPos2, m_uWordPos3
…… | …… | ……
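The table shows the effect of the sort: after sphSort with CmpHit_fn, every hit of the same word sits together, ordered by document and then by position, which is exactly the order the doclist/hitlist writer wants. I did not paste CmpHit_fn itself, but the ordering it has to impose looks like this sketch (my own code, with shortened field names):

```cpp
#include <algorithm>
#include <cstdint>
#include <vector>

struct WordHit
{
    uint64_t uDocID;   // document ID
    uint64_t uWordID;  // word ID in the current dictionary
    uint32_t uWordPos; // word position inside the document
};

static bool CmpHit ( const WordHit & a, const WordHit & b )
{
    if ( a.uWordID!=b.uWordID ) return a.uWordID < b.uWordID;
    if ( a.uDocID!=b.uDocID )   return a.uDocID < b.uDocID;
    return a.uWordPos < b.uWordPos;
}

// usage: std::sort ( dHits.begin(), dHits.end(), CmpHit );
```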
// store docinfo
// with the advent of SPH_ATTR_TOKENCOUNT, now MUST be done AFTER iterating the hits
// because field lengths are computed during that iterating
if ( m_tSettings.m_eDocinfo==SPH_DOCINFO_EXTERN )
{
// store next entry
DOCINFOSETID ( pDocinfo, pSource->m_tDocInfo.m_uDocID );
CSphRowitem * pAttr = DOCINFO2ATTRS ( pDocinfo );
if ( !pPrevDocinfo )
{
memcpy ( pAttr, pSource->m_tDocInfo.m_pDynamic, sizeof(CSphRowitem)*m_tSchema.GetRowSize() );
} else
{
if ( !m_dKeepAttrs.GetLength() )
{
// copy whole row from old index
memcpy ( pAttr, DOCINFO2ATTRS ( pPrevDocinfo ), sizeof(CSphRowitem)*m_tSchema.GetRowSize() );
// copy some strings attributes
// 2nd stage - copy offsets from source, data already copied at string indexing
if ( dStringAttrs.GetLength() )
CopyRow ( pSource->m_tDocInfo.m_pDynamic, m_tSchema, dStringAttrs, pAttr );
} else
{
// copy new attributes, however keep some of them from old index
memcpy ( pAttr, pSource->m_tDocInfo.m_pDynamic, sizeof(CSphRowitem)*m_tSchema.GetRowSize() );
// copy some plain attributes
if ( dPrevAttrsPlain.GetLength() )
CopyRow ( DOCINFO2ATTRS ( pPrevDocinfo ), m_tSchema, dPrevAttrsPlain, pAttr );
// copy some strings attributes
// 2nd stage - copy offsets from source, data already copied at string indexing
if ( dStringAttrs.GetLength() )
CopyRow ( pSource->m_tDocInfo.m_pDynamic, m_tSchema, dStringAttrs, pAttr );
}
}
pDocinfo += iDocinfoStride;
// if not inlining, flush buffer if it's full
// (if inlining, it will flushed later, along with the hits)
if ( pDocinfo>=pDocinfoMax )
{
assert ( pDocinfo==pDocinfoMax );
int iLen = iDocinfoMax*iDocinfoStride*sizeof(DWORD);
sphSortDocinfos ( dDocinfos.Begin(), iDocinfoMax, iDocinfoStride );
if ( !sphWriteThrottled ( fdDocinfos.GetFD(), dDocinfos.Begin(), iLen, "raw_docinfos", m_sLastError, &g_tThrottle ) )
return 0;
pDocinfo = dDocinfos.Begin();
iDocinfoBlocks++;
}
}
DocID | attr0 | attr1
---|---|---
m_uDocID1 | attr0 | attr1
m_uDocID2 | attr0 | attr1
…… | … | …
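So with extern docinfo, every row in the raw docinfo file is simply the document id followed by the fixed-width attribute values, iDocinfoStride DWORDs per document; whole blocks get sorted by docid and flushed when the buffer fills. A tiny sketch of that row layout (my illustration with assumed sizes, not the actual DOCINFOSETID/DOCINFO2ATTRS macros):

```cpp
#include <cstdint>
#include <cstring>
#include <vector>

using DWORD = uint32_t;

int main ()
{
    const int DOCINFO_IDSIZE = 2;     // 64-bit docid stored as 2 DWORDs (build-dependent)
    const int iRowSize       = 3;     // e.g. three 32-bit attributes (assumed schema)
    const int iStride        = DOCINFO_IDSIZE + iRowSize;

    std::vector<DWORD> dDocinfos ( 65536 * iStride );
    DWORD * pDocinfo = dDocinfos.data();

    uint64_t uDocID = 42;
    DWORD dAttrs[3] = { 7, 8, 9 };

    std::memcpy ( pDocinfo, &uDocID, sizeof(uDocID) );                 // store docid first
    std::memcpy ( pDocinfo + DOCINFO_IDSIZE, dAttrs, sizeof(dAttrs) ); // then the attribute row
    pDocinfo += iStride;                                               // advance to the next row
    return 0;
}
```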
// vars shared between phases
CSphVector<CSphBin*> dBins;
SphOffset_t iSharedOffset = -1;
int iBinSize = CSphBin::CalcBinSize ( int ( iMemoryLimit * fReadFactor ),
dHitBlocks.GetLength() + m_pDict->GetSettings().m_bWordDict, "sort_hits" );
CSphFixedVector <BYTE> dRelocationBuffer ( iRelocationSize );
iSharedOffset = -1;
ARRAY_FOREACH ( i, dHitBlocks )
{
dBins.Add ( new CSphBin ( m_tSettings.m_eHitless, m_pDict->GetSettings().m_bWordDict ) );
dBins[i]->m_iFileLeft = dHitBlocks[i];
dBins[i]->m_iFilePos = ( i==0 ) ? iHitsGap : dBins[i-1]->m_iFilePos + dBins[i-1]->m_iFileLeft;
dBins[i]->Init ( fdHits.GetFD(), &iSharedOffset, iBinSize );
}
//////////////////////////////
// create new index files set
//////////////////////////////
tHitBuilder.CreateIndexFiles ( GetIndexFileName("spd").cstr(), GetIndexFileName("spp").cstr(),
GetIndexFileName("spe").cstr(), m_bInplaceSettings, iWriteBuffer, fdHits, &iSharedOffset );
// dict files
CSphAutofile fdTmpDict ( GetIndexFileName("tmp8"), SPH_O_NEW, m_sLastError, true );
CSphAutofile fdDict ( GetIndexFileName("spi"), SPH_O_NEW, m_sLastError, false );
bool CSphHitBuilder::CreateIndexFiles ( const char * sDocName, const char * sHitName, const char * sSkipName,
bool bInplace, int iWriteBuffer, CSphAutofile & tHit, SphOffset_t * pSharedOffset )
{
if ( !m_wrDoclist.OpenFile ( sDocName, *m_pLastError ) )
return false;
if ( bInplace )
{
sphSeek ( tHit.GetFD(), 0, SEEK_SET );
m_wrHitlist.SetFile ( tHit, pSharedOffset, *m_pLastError );
} else
{
if ( !m_wrHitlist.OpenFile ( sHitName, *m_pLastError ) )
return false;
}
if ( !m_wrSkiplist.OpenFile ( sSkipName, *m_pLastError ) )
return false;
return true;
}
CSphHitQueue tQueue ( iRawBlocks );
CSphAggregateHit tHit;
// initial fill
int iRowitems = ( m_tSettings.m_eDocinfo==SPH_DOCINFO_INLINE ) ? m_tSchema.GetRowSize() : 0;
CSphFixedVector<CSphRowitem> dInlineAttrs ( iRawBlocks*iRowitems );
CSphFixedVector<BYTE> dActive ( iRawBlocks );
for ( int i=0; i<iRawBlocks; i++ )
{
if ( !dBins[i]->ReadHit ( &tHit, iRowitems, dInlineAttrs.Begin() + i * iRowitems ) )
{
m_sLastError.SetSprintf ( "sort_hits: warmup failed (io error?)" );
return 0;
}
dActive[i] = ( tHit.m_uWordID!=0 );
if ( dActive[i] )
tQueue.Push ( tHit, i );
}
// while the queue has data for us
// FIXME! analyze binsRead return code
int iHitsSorted = 0;
iMinBlock = -1;
while ( tQueue.m_iUsed )
{
int iBin = tQueue.m_pData->m_iBin;
// pack and emit queue root
tQueue.m_pData->m_uDocID -= m_uMinDocid;
tHitBuilder.cidxHit ( tQueue.m_pData, iRowitems ? dInlineAttrs.Begin() + iBin * iRowitems : NULL );
if ( tHitBuilder.IsError() )
return 0;
// pop queue root and push next hit from popped bin
tQueue.Pop ();
if ( dActive[iBin] )
{
dBins[iBin]->ReadHit ( &tHit, iRowitems, dInlineAttrs.Begin() + iBin * iRowitems );
dActive[iBin] = ( tHit.m_uWordID!=0 );
if ( dActive[iBin] )
tQueue.Push ( tHit, iBin );
}
}
void CSphHitBuilder::cidxHit ( CSphAggregateHit * pHit, const CSphRowitem * pAttrs )
/////////////
// next word
/////////////
bool bNextWord = ( m_tLastHit.m_uWordID!=pHit->m_uWordID ||
( m_pDict->GetSettings().m_bWordDict && strcmp ( (char*)m_tLastHit.m_sKeyword, (char*)pHit->m_sKeyword ) ) ); // OPTIMIZE?
bool bNextDoc = bNextWord || ( m_tLastHit.m_uDocID!=pHit->m_uDocID );
if ( bNextDoc )
{
// finish hitlist, if any
Hitpos_t uLastPos = m_tLastHit.m_iWordPos;
if ( m_tLastHit.m_iWordPos!=EMPTY_HIT )
{
m_wrHitlist.ZipInt ( 0 );
m_tLastHit.m_iWordPos = EMPTY_HIT;
m_iPrevHitPos = EMPTY_HIT;
}
// finish doclist entry, if any
if ( m_tLastHit.m_uDocID )
DoclistEndEntry ( uLastPos );
}
void CSphHitBuilder::DoclistEndEntry ( Hitpos_t uLastPos )
{
// end doclist entry
{
assert ( m_eHitFormat==SPH_HIT_FORMAT_PLAIN );
m_wrDoclist.ZipOffset ( m_iLastHitlistDelta );
m_wrDoclist.ZipInt ( m_dLastDocFields.GetMask32() );
m_wrDoclist.ZipInt ( m_uLastDocHits );
}
m_dLastDocFields.UnsetAll();
m_uLastDocHits = 0;
// update keyword stats
m_tWord.m_iDocs++;
}
if ( bNextWord )
{
// finish doclist, if any
if ( m_tLastHit.m_uDocID )
{
// emit end-of-doclist marker
DoclistEndList ();
// emit dict entry
m_tWord.m_uWordID = m_tLastHit.m_uWordID;
m_tWord.m_sKeyword = m_tLastHit.m_sKeyword;
m_tWord.m_iDoclistLength = m_wrDoclist.GetPos() - m_tWord.m_iDoclistOffset;
m_pDict->DictEntry ( m_tWord );
// reset trackers
m_tWord.m_iDocs = 0;
m_tWord.m_iHits = 0;
m_tLastHit.m_uDocID = 0;
m_iLastHitlistPos = 0;
}
// flush wordlist, if this is the end
if ( pHit->m_iWordPos==EMPTY_HIT )
{
m_pDict->DictEndEntries ( m_wrDoclist.GetPos() );
return;
}
}
void CSphDictKeywords::DictEntry ( const CSphDictEntry & tEntry )
{
DictKeyword_t * pWord = NULL;
int iLen = strlen ( (char*)tEntry.m_sKeyword ) + 1;
for ( ;; )
{
// alloc dict entry
if ( !m_iDictChunkFree )
{
if ( m_iDictLimit && ( m_iMemUse + (int)sizeof(DictKeyword_t)*DICT_CHUNK )>m_iDictLimit )
DictFlush ();
m_pDictChunk = new DictKeyword_t [ DICT_CHUNK ];
m_iDictChunkFree = DICT_CHUNK;
m_dDictChunks.Add ( m_pDictChunk );
m_iMemUse += sizeof(DictKeyword_t)*DICT_CHUNK;
}
// alloc keyword
if ( m_iKeywordChunkFree < iLen )
{
if ( m_iDictLimit && ( m_iMemUse + KEYWORD_CHUNK )>m_iDictLimit )
{
DictFlush ();
continue; // because we just flushed pWord
}
m_pKeywordChunk = new BYTE [ KEYWORD_CHUNK ];
m_iKeywordChunkFree = KEYWORD_CHUNK;
m_dKeywordChunks.Add ( m_pKeywordChunk );
m_iMemUse += KEYWORD_CHUNK;
}
// aw kay
break;
}
pWord = m_pDictChunk++;
m_iDictChunkFree--;
pWord->m_sKeyword = (char*)m_pKeywordChunk;
memcpy ( m_pKeywordChunk, tEntry.m_sKeyword, iLen );
m_pKeywordChunk[iLen-1] = '\0';
m_pKeywordChunk += iLen;
m_iKeywordChunkFree -= iLen;
pWord->m_uOff = tEntry.m_iDoclistOffset;
pWord->m_iDocs = tEntry.m_iDocs;
pWord->m_iHits = tEntry.m_iHits;
pWord->m_uHint = sphDoclistHintPack ( tEntry.m_iDocs, tEntry.m_iDoclistLength );
pWord->m_iSkiplistPos = 0;
if ( tEntry.m_iDocs > SPH_SKIPLIST_BLOCK )
pWord->m_iSkiplistPos = (int)( tEntry.m_iSkiplistOffset );
}
// write em
DictBlock_t & tBlock = m_dDictBlocks.Add();
tBlock.m_iPos = m_wrTmpDict.GetPos ();
ARRAY_FOREACH ( i, dWords )
{
const DictKeyword_t * pWord = dWords[i];
int iLen = strlen ( pWord->m_sKeyword );
m_wrTmpDict.PutByte ( iLen );
m_wrTmpDict.PutBytes ( pWord->m_sKeyword, iLen );
m_wrTmpDict.ZipOffset ( pWord->m_uOff );
m_wrTmpDict.ZipInt ( pWord->m_iDocs );
m_wrTmpDict.ZipInt ( pWord->m_iHits );
m_wrTmpDict.PutByte ( pWord->m_uHint );
assert ( ( pWord->m_iDocs > SPH_SKIPLIST_BLOCK )==( pWord->m_iSkiplistPos!=0 ) );
if ( pWord->m_iDocs > SPH_SKIPLIST_BLOCK )
m_wrTmpDict.ZipInt ( pWord->m_iSkiplistPos );
}
tBlock.m_iLen = (int)( m_wrTmpDict.GetPos() - tBlock.m_iPos );
void CSphHitBuilder::DoclistEndList ()
{
if ( m_tWord.m_iDocs>SPH_SKIPLIST_BLOCK )
{
m_tWord.m_iSkiplistOffset = m_wrSkiplist.GetPos();
SkiplistEntry_t tLast = m_dSkiplist[0]; // running state for the delta encoding; the first entry itself is not written
for ( int i=1; i<m_dSkiplist.GetLength(); i++ )
{
const SkiplistEntry_t & t = m_dSkiplist[i];
assert ( t.m_iBaseDocid - tLast.m_iBaseDocid>=SPH_SKIPLIST_BLOCK );
assert ( t.m_iOffset - tLast.m_iOffset>=4*SPH_SKIPLIST_BLOCK );
m_wrSkiplist.ZipOffset ( t.m_iBaseDocid - tLast.m_iBaseDocid - SPH_SKIPLIST_BLOCK );
m_wrSkiplist.ZipOffset ( t.m_iOffset - tLast.m_iOffset - 4*SPH_SKIPLIST_BLOCK );
m_wrSkiplist.ZipOffset ( t.m_iBaseHitlistPos - tLast.m_iBaseHitlistPos );
tLast = t;
}
}
// in any event, reset skiplist
m_dSkiplist.Resize ( 0 );
}
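In other words, one SkiplistEntry_t is remembered every SPH_SKIPLIST_BLOCK (128) documents of a word, and when the doclist ends the entries are written as deltas with the guaranteed minimum steps subtracted so they zip into fewer bytes. Reading them back would just reverse that, roughly like this sketch (my assumption about the read side, not Sphinx code):

```cpp
#include <cstdint>
#include <vector>

struct SkipEntry { uint64_t uBaseDocid, uOffset, uHitlistPos; };

// dZipped holds the unzipped varints in write order, 3 values per stored entry.
// tFirst is the first block's state, which the writer above never stores
// (its loop starts at i=1); iBlock is SPH_SKIPLIST_BLOCK, i.e. 128.
std::vector<SkipEntry> DecodeSkiplist ( const std::vector<uint64_t> & dZipped,
    SkipEntry tFirst, uint64_t iBlock )
{
    std::vector<SkipEntry> dOut;
    dOut.push_back ( tFirst );
    SkipEntry tLast = tFirst;
    for ( size_t i=0; i+3<=dZipped.size(); i+=3 )
    {
        SkipEntry t;
        t.uBaseDocid  = tLast.uBaseDocid  + dZipped[i]   + iBlock;   // docid delta
        t.uOffset     = tLast.uOffset     + dZipped[i+1] + 4*iBlock; // doclist offset delta
        t.uHitlistPos = tLast.uHitlistPos + dZipped[i+2];            // hitlist position delta
        dOut.push_back ( t );
        tLast = t;
    }
    return dOut;
}
```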
//
if ( bNextDoc )
{
// begin new doclist entry for new doc id
assert ( pHit->m_uDocID>m_tLastHit.m_uDocID );
assert ( m_wrHitlist.GetPos()>=m_iLastHitlistPos );
DoclistBeginEntry ( pHit->m_uDocID, pAttrs );
m_iLastHitlistDelta = m_wrHitlist.GetPos() - m_iLastHitlistPos;
m_tLastHit.m_uDocID = pHit->m_uDocID;
m_iLastHitlistPos = m_wrHitlist.GetPos();
}
void CSphHitBuilder::DoclistBeginEntry ( SphDocID_t uDocid, const DWORD * pAttrs )
{
// build skiplist
// that is, save decoder state and doclist position per every 128 documents
if ( ( m_tWord.m_iDocs & ( SPH_SKIPLIST_BLOCK-1 ) )==0 )
{
SkiplistEntry_t & tBlock = m_dSkiplist.Add();
tBlock.m_iBaseDocid = m_tLastHit.m_uDocID;
tBlock.m_iOffset = m_wrDoclist.GetPos();
tBlock.m_iBaseHitlistPos = m_iLastHitlistPos;
}
// begin doclist entry
m_wrDoclist.ZipOffset ( uDocid - m_tLastHit.m_uDocID );
assert ( !pAttrs || m_dMinRow.GetLength() );
if ( pAttrs )
{
ARRAY_FOREACH ( i, m_dMinRow )
m_wrDoclist.ZipInt ( pAttrs[i] - m_dMinRow[i] );
}
Hitpos_t iHitPosPure = HITMAN::GetPosWithField ( pHit->m_iWordPos );
// add hit delta without field end marker
// or postpone adding to hitlist till got another uniq hit
if ( iHitPosPure==pHit->m_iWordPos )
{
m_wrHitlist.ZipInt ( pHit->m_iWordPos - m_tLastHit.m_iWordPos );
m_tLastHit.m_iWordPos = pHit->m_iWordPos;
} else
{
assert ( HITMAN::IsEnd ( pHit->m_iWordPos ) );
m_bGotFieldEnd = true;
m_iPrevHitPos = m_tLastHit.m_iWordPos;
m_tLastHit.m_iWordPos = HITMAN::GetPosWithField ( pHit->m_iWordPos );
}
// update matched fields mask
m_dLastDocFields.Set ( HITMAN::GetField ( pHit->m_iWordPos ) );
m_uLastDocHits++;
m_tWord.m_iHits++;
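Putting DoclistBeginEntry and DoclistEndEntry together, the per-document doclist entry that ends up on disk is roughly: docid delta, optional inline attribute deltas, hitlist offset delta, matched-fields mask, and hit count, each var-length zipped. Summarized as a sketch struct (my notation, not an actual Sphinx type):

```cpp
#include <cstdint>

// Sketch of one doclist entry as emitted by the code above (not a real Sphinx struct);
// every field is written with ZipOffset/ZipInt, i.e. variable-length encoded.
struct DoclistEntrySketch
{
    uint64_t uDocidDelta;   // uDocid minus the previous docid in this word's doclist
    // (inline attribute deltas would go here when docinfo=inline)
    uint64_t uHitlistDelta; // distance from the previous document's hitlist start
    uint32_t uFieldMask;    // m_dLastDocFields: fields the word occurred in
    uint32_t uDocHits;      // m_uLastDocHits: number of hits in this document
};
```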
bool CSphHitBuilder::cidxDone ( int iMemLimit, int iMinInfixLen, int iMaxCodepointLen, DictHeader_t * pDictHeader )
{
assert ( pDictHeader );
if ( m_bGotFieldEnd )
{
HITMAN::SetEndMarker ( &m_tLastHit.m_iWordPos );
m_wrHitlist.ZipInt ( m_tLastHit.m_iWordPos - m_iPrevHitPos );
m_bGotFieldEnd = false;
}
// finalize dictionary
// in dict=crc mode, just flushes wordlist checkpoints
// in dict=keyword mode, also creates infix index, if needed
if ( iMinInfixLen>0 && m_pDict->GetSettings().m_bWordDict )
pDictHeader->m_iInfixCodepointBytes = iMaxCodepointLen;
if ( !m_pDict->DictEnd ( pDictHeader, iMemLimit, *m_pLastError, m_pThrottle ) )
return false;
// close all data files
m_wrDoclist.CloseFile ();
m_wrHitlist.CloseFile ( true );
return !IsError();
}
bool CSphDictKeywords::DictEnd ( DictHeader_t * pHeader, int iMemLimit, CSphString & sError, ThrottleState_t * pThrottle )
// initialize readers
CSphVector<CSphBin*> dBins ( m_dDictBlocks.GetLength() );
int iMaxBlock = 0;
ARRAY_FOREACH ( i, m_dDictBlocks )
iMaxBlock = Max ( iMaxBlock, m_dDictBlocks[i].m_iLen );
iMemLimit = Max ( iMemLimit, iMaxBlock*m_dDictBlocks.GetLength() );
int iBinSize = CSphBin::CalcBinSize ( iMemLimit, m_dDictBlocks.GetLength(), "sort_dict" );
SphOffset_t iSharedOffset = -1;
ARRAY_FOREACH ( i, m_dDictBlocks )
{
dBins[i] = new CSphBin();
dBins[i]->m_iFileLeft = m_dDictBlocks[i].m_iLen;
dBins[i]->m_iFilePos = m_dDictBlocks[i].m_iPos;
dBins[i]->Init ( m_iTmpFD, &iSharedOffset, iBinSize );
dBins[i]->SetThrottle ( pThrottle );
}
// sort em
int iTotalWords = m_dDictChunks.GetLength()*DICT_CHUNK - m_iDictChunkFree;
CSphVector<DictKeyword_t*> dWords ( iTotalWords );
int iIdx = 0;
ARRAY_FOREACH ( i, m_dDictChunks )
{
int iWords = DICT_CHUNK;
if ( i==m_dDictChunks.GetLength()-1 )
iWords -= m_iDictChunkFree;
DictKeyword_t * pWord = m_dDictChunks[i];
for ( int j=0; j<iWords; j++ )
dWords[iIdx++] = pWord++;
}
dWords.Sort ( DictKeywordCmp_fn() );
bool bHasMorphology = HasMorphology();
CSphKeywordDeltaWriter tLastKeyword;
int iWords = 0;
while ( qWords.GetLength() )
{
const DictKeywordTagged_t & tWord = qWords.Root();
const int iLen = strlen ( tWord.m_sKeyword ); // OPTIMIZE?
// store checkpoints as needed
if ( ( iWords % SPH_WORDLIST_CHECKPOINT )==0 )
{
// emit a checkpoint, unless we're at the very dict beginning
if ( iWords )
{
m_wrDict.ZipInt ( 0 );
m_wrDict.ZipInt ( 0 );
}
BYTE * sClone = new BYTE [ iLen+1 ]; // OPTIMIZE? pool these?
memcpy ( sClone, tWord.m_sKeyword, iLen+1 );
sClone[iLen] = '\0';
CSphWordlistCheckpoint & tCheckpoint = m_dCheckpoints.Add ();
tCheckpoint.m_sWord = (char*) sClone;
tCheckpoint.m_iWordlistOffset = m_wrDict.GetPos();
tLastKeyword.Reset();
}
iWords++;
tLastKeyword.PutDelta ( m_wrDict, (const BYTE *)tWord.m_sKeyword, iLen );
m_wrDict.ZipOffset ( tWord.m_uOff );
m_wrDict.ZipInt ( tWord.m_iDocs );
m_wrDict.ZipInt ( tWord.m_iHits );
if ( tWord.m_uHint )
m_wrDict.PutByte ( tWord.m_uHint );
if ( tWord.m_iDocs > SPH_SKIPLIST_BLOCK )
m_wrDict.ZipInt ( tWord.m_iSkiplistPos );
// build infixes
if ( pInfixer )
pInfixer->AddWord ( (const BYTE*)tWord.m_sKeyword, iLen, m_dCheckpoints.GetLength(), bHasMorphology );
// next
int iBin = tWord.m_iBlock;
qWords.Pop ();
if ( !dBins[iBin]->IsDone() )
{
DictReadEntry ( dBins[iBin], tEntry, pKeywords + iBin*MAX_KEYWORD_BYTES );
if ( dBins[iBin]->IsError() )
{
sError.SetSprintf ( "entry read error in dictionary sort (bin %d of %d)", iBin, dBins.GetLength() );
LOC_CLEANUP();
return false;
}
tEntry.m_iBlock = iBin;
qWords.Push ( tEntry );
}
}
// flush infix hash entries, if any
if ( pInfixer )
pInfixer->SaveEntries ( m_wrDict );
// flush wordlist checkpoints (blocks)
pHeader->m_iDictCheckpointsOffset = m_wrDict.GetPos();
pHeader->m_iDictCheckpoints = m_dCheckpoints.GetLength();
ARRAY_FOREACH ( i, m_dCheckpoints )
{
const int iLen = strlen ( m_dCheckpoints[i].m_sWord );
assert ( m_dCheckpoints[i].m_iWordlistOffset>0 );
assert ( iLen>0 && iLen<MAX_KEYWORD_BYTES );
m_wrDict.PutDword ( iLen );
m_wrDict.PutBytes ( m_dCheckpoints[i].m_sWord, iLen );
m_wrDict.PutOffset ( m_dCheckpoints[i].m_iWordlistOffset );
SafeDeleteArray ( m_dCheckpoints[i].m_sWord );
}
// flush infix hash blocks
if ( pInfixer )
{
pHeader->m_iInfixBlocksOffset = pInfixer->SaveEntryBlocks ( m_wrDict );
pHeader->m_iInfixBlocksWordsSize = pInfixer->GetBlocksWordsSize();
if ( pHeader->m_iInfixBlocksOffset>UINT_MAX ) // FIXME!!! change to int64
sphDie ( "INTERNAL ERROR: dictionary size " INT64_FMT " overflow at dictend save", pHeader->m_iInfixBlocksOffset );
}
// flush header
// mostly for debugging convenience
// primary storage is in the index wide header
m_wrDict.PutBytes ( "dict-header", 11 );
m_wrDict.ZipInt ( pHeader->m_iDictCheckpoints );
m_wrDict.ZipOffset ( pHeader->m_iDictCheckpointsOffset );
m_wrDict.ZipInt ( pHeader->m_iInfixCodepointBytes );
m_wrDict.ZipInt ( (DWORD)pHeader->m_iInfixBlocksOffset );
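The checkpoints flushed above are what makes lookups cheap at search time: with one checkpoint kept every SPH_WORDLIST_CHECKPOINT words, a query can binary-search the checkpoint keywords and then scan a single dictionary block starting at that checkpoint's offset. That read-side idea, sketched (my assumption, not the actual searchd code):

```cpp
#include <algorithm>
#include <cstdint>
#include <string>
#include <vector>

struct Checkpoint { std::string sWord; uint64_t uOffset; };

// dCheckpoints must be sorted by keyword, which they are by construction above.
// Returns the dictionary offset of the block that could contain sWord, or 0 if
// the word sorts before the very first checkpoint (then it cannot be in the dict).
uint64_t FindBlockOffset ( const std::vector<Checkpoint> & dCheckpoints, const std::string & sWord )
{
    auto it = std::upper_bound ( dCheckpoints.begin(), dCheckpoints.end(), sWord,
        [] ( const std::string & a, const Checkpoint & b ) { return a < b.sWord; } );
    if ( it==dCheckpoints.begin() )
        return 0;
    return ( it-1 )->uOffset; // scan this block linearly for sWord
}
```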
tBuildHeader.m_sHeaderExtension = "sph";
tBuildHeader.m_pMinRow = m_dMinRow.Begin();
tBuildHeader.m_uMinDocid = m_uMinDocid;
tBuildHeader.m_pThrottle = &g_tThrottle;
tBuildHeader.m_uKillListSize = uKillistSize;
tBuildHeader.m_iMinMaxIndex = m_iMinMaxIndex;
tBuildHeader.m_iTotalDups = iDupes;
// we're done
if ( !BuildDone ( tBuildHeader, m_sLastError ) )
return 0;