Lucene Data Writing - 01: Write Flow
File structure
Name | Extension | Data structure | Description |
---|---|---|---|
Segments File | segments.gen, segments_N | SegmentInfos | The collection of metadata for all segments in the index; every live segment is recorded in the segments_N file |
Lock File | write.lock | | Write lock that prevents more than one IndexWriter from writing to the same index |
Segment Info | .si | Lucene70SegmentInfoFormat | Per-segment metadata, including which files belong to the segment |
Compound File | .cfs, .cfe | Lucene50CompoundFormat | If the compound-file option is enabled, the segment's files are combined into these two files |
Fields | .fnm | Lucene60FieldInfosFormat | Which fields exist, together with per-field metadata |
Field Index | .fdx | Lucene50StoredFieldsFormat | Index into the stored-field data file |
Field Data | .fdt | Lucene50StoredFieldsFormat | Stored-field data |
Term Dictionary | .tim | BlockTreeTermsWriter | The term dictionary |
Term Index | .tip | BlockTreeTermsWriter | Index into the term dictionary |
Frequencies | .doc | Lucene50PostingsWriter | Postings lists: the documents containing each term, along with term frequencies |
Payloads | .pay | Lucene50PostingsWriter | Offset and payload data |
Norms | .nvd, .nvm | Lucene70NormsFormat | .nvm stores norms metadata; .nvd stores the norms data |
Per-Document Values | .dvd, .dvm | Lucene70DocValuesFormat | .dvm stores doc-values (forward index) metadata; .dvd stores the doc-values data |
Term Vector Index | .tvx | Lucene50TermVectorsFormat | Offsets into the .tvd file |
Term Vector Data | .tvd | Lucene50TermVectorsFormat | Term vector data |
Live Documents | .liv | Lucene50LiveDocsFormat | Bitmap of live (not deleted) documents |
Point Values | .dii, .dim | Lucene60PointsFormat | Multi-dimensional data such as geo points, used for numeric and range queries |
Writing data
Let's use the following code as an example to see how Lucene writes data. The Lucene version is 8.1.0.
@Test
public void createIndexTest() throws IOException {
Document document = new Document(); // a document
Document document1 = new Document();
Document document2 = new Document();
Document document3 = new Document();
//add a field to each document
document.add(new TextField("desc", "common common common common common term", Field.Store.YES));
document1.add(new TextField("desc", "common common common common common term term", Field.Store.YES));
document2.add(new TextField("desc", "term term term common common common common common", Field.Store.YES));
document3.add(new TextField("desc", "term", Field.Store.YES));
StandardAnalyzer standardAnalyzer = new StandardAnalyzer();
Directory directory = FSDirectory.open(Paths.get("D:/lucene/index"));
IndexWriterConfig indexWriterConfig = new IndexWriterConfig(standardAnalyzer);
indexWriterConfig.setUseCompoundFile(false);
IndexWriter indexWriter = new IndexWriter(directory, indexWriterConfig);
indexWriter.addDocument(document);
indexWriter.addDocument(document1);
indexWriter.addDocument(document2);
indexWriter.addDocument(document3);
indexWriter.close();
}
First, create a Directory for writing; all index data will be written under the path configured here.
Directory directory = FSDirectory.open(Paths.get("D:/lucene/index"));
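Everything the writer produces ends up under this path. A minimal hedged sketch for inspecting the result after the test has run; it only assumes the same path, and uses Directory.listAll() to list the files of the index directory:
import java.nio.file.Paths;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;

public class ListIndexFiles {
    public static void main(String[] args) throws Exception {
        // List whatever files the writer has produced so far.
        try (Directory dir = FSDirectory.open(Paths.get("D:/lucene/index"))) {
            for (String file : dir.listAll()) {
                System.out.println(file); // e.g. segments_N, write.lock, _0.fnm, _0.fdt, ...
            }
        }
    }
}
With compound files disabled (as we do below), most of the extensions from the table above should show up here.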
Then create a writer configuration and give it the standard analyzer.
IndexWriterConfig indexWriterConfig = new IndexWriterConfig(standardAnalyzer);
LiveIndexWriterConfig(Analyzer analyzer) {
this.analyzer = analyzer;
//maximum RAM the in-memory buffer may use before a flush
ramBufferSizeMB = IndexWriterConfig.DEFAULT_RAM_BUFFER_SIZE_MB;
//maximum number of buffered documents before a flush
maxBufferedDocs = IndexWriterConfig.DEFAULT_MAX_BUFFERED_DOCS;
mergedSegmentWarmer = null;
//index deletion policy (which commits to keep)
delPolicy = new KeepOnlyLastCommitDeletionPolicy();
commit = null;
//compound files are used by default
useCompoundFile = IndexWriterConfig.DEFAULT_USE_COMPOUND_FILE_SYSTEM;
//open mode
openMode = OpenMode.CREATE_OR_APPEND;
//the default similarity (scoring) is BM25
similarity = IndexSearcher.getDefaultSimilarity();
//segment merge scheduler
mergeScheduler = new ConcurrentMergeScheduler();
//indexing chain: builds both the stored (forward) and inverted data
indexingChain = DocumentsWriterPerThread.defaultIndexingChain;
//the default codec is Lucene80; it can be extended through the SPI mechanism
codec = Codec.getDefault();
if (codec == null) {
throw new NullPointerException();
}
infoStream = InfoStream.getDefault();
//segment merge policy
mergePolicy = new TieredMergePolicy();
//flush policy
flushPolicy = new FlushByRamOrCountsPolicy();
readerPooling = IndexWriterConfig.DEFAULT_READER_POOLING;
//pool of per-thread indexing states (DocumentsWriterPerThread)
indexerThreadPool = new DocumentsWriterPerThreadPool();
perThreadHardLimitMB = IndexWriterConfig.DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB;
}
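All of these defaults can be overridden on IndexWriterConfig before the writer is built. A hedged sketch of a few common knobs, continuing the test above (the values are arbitrary examples, not recommendations):
IndexWriterConfig indexWriterConfig = new IndexWriterConfig(standardAnalyzer);
indexWriterConfig.setRAMBufferSizeMB(64);                         // flush to a new segment once ~64 MB is buffered
indexWriterConfig.setUseCompoundFile(false);                      // keep the per-extension files instead of .cfs/.cfe
indexWriterConfig.setMergePolicy(new TieredMergePolicy());        // explicit, same as the default
indexWriterConfig.setMergeScheduler(new ConcurrentMergeScheduler());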
Finally create the IndexWriter, the core class for writing data. To make it easier to observe which files Lucene generates, we disable compound files, and the documents deliberately contain many repeated terms so that we can watch how Lucene builds the inverted index.
indexWriterConfig.setUseCompoundFile(false);
IndexWriter indexWriter = new IndexWriter(directory, indexWriterConfig);
The IndexWriter constructor:
public IndexWriter(Directory d, IndexWriterConfig conf) throws IOException {
enableTestPoints = isEnableTestPoints();
conf.setIndexWriter(this); // prevent reuse by other instances
config = conf;
infoStream = config.getInfoStream();
softDeletesEnabled = config.getSoftDeletesField() != null;
//acquire the write.lock file to prevent concurrent writers
writeLock = d.obtainLock(WRITE_LOCK_NAME);
boolean success = false;
try {
directoryOrig = d;
directory = new LockValidatingDirectoryWrapper(d, writeLock);
analyzer = config.getAnalyzer();
mergeScheduler = config.getMergeScheduler();
mergeScheduler.setInfoStream(infoStream);
codec = config.getCodec();
OpenMode mode = config.getOpenMode();
final boolean indexExists;
final boolean create;
if (mode == OpenMode.CREATE) {
indexExists = DirectoryReader.indexExists(directory);
create = true;
} else if (mode == OpenMode.APPEND) {
indexExists = true;
create = false;
} else {
indexExists = DirectoryReader.indexExists(directory);
create = !indexExists;
}
//list all files in the index directory
String[] files = directory.listAll();
//we did not set a commit in the config above, so this defaults to null
IndexCommit commit = config.getIndexCommit();
// Set up our initial SegmentInfos:
StandardDirectoryReader reader;
if (commit == null) {
reader = null;
} else {
reader = commit.getReader();
}
//creating a brand-new index
if (create) {
//sanity check
if (config.getIndexCommit() != null) {
// We cannot both open from a commit point and create:
if (mode == OpenMode.CREATE) {
throw new IllegalArgumentException("cannot use IndexWriterConfig.setIndexCommit() with OpenMode.CREATE");
} else {
throw new IllegalArgumentException("cannot use IndexWriterConfig.setIndexCommit() when index has no commit");
}
}
final SegmentInfos sis = new SegmentInfos(config.getIndexCreatedVersionMajor());
if (indexExists) {
final SegmentInfos previous = SegmentInfos.readLatestCommit(directory);
sis.updateGenerationVersionAndCounter(previous);
}
segmentInfos = sis;
rollbackSegments = segmentInfos.createBackupSegmentInfos();
// Record that we have a change (zero out all
// segments) pending:
changed();
} else if (reader != null) {
// Init from an existing already opened NRT or non-NRT reader:
if (reader.directory() != commit.getDirectory()) {
throw new IllegalArgumentException("IndexCommit's reader must have the same directory as the IndexCommit");
}
if (reader.directory() != directoryOrig) {
throw new IllegalArgumentException("IndexCommit's reader must have the same directory passed to IndexWriter");
}
if (reader.segmentInfos.getLastGeneration() == 0) {
// TODO: maybe we could allow this? It's tricky...
throw new IllegalArgumentException("index must already have an initial commit to open from reader");
}
// Must clone because we don't want the incoming NRT reader to "see" any changes this writer now makes:
segmentInfos = reader.segmentInfos.clone();
SegmentInfos lastCommit;
try {
lastCommit = SegmentInfos.readCommit(directoryOrig, segmentInfos.getSegmentsFileName());
} catch (IOException ioe) {
throw new IllegalArgumentException("the provided reader is stale: its prior commit file \"" + segmentInfos.getSegmentsFileName() + "\" is missing from index");
}
if (reader.writer != null) {
// The old writer better be closed (we have the write lock now!):
assert reader.writer.closed;
// In case the old writer wrote further segments (which we are now dropping),
// update SIS metadata so we remain write-once:
segmentInfos.updateGenerationVersionAndCounter(reader.writer.segmentInfos);
lastCommit.updateGenerationVersionAndCounter(reader.writer.segmentInfos);
}
rollbackSegments = lastCommit.createBackupSegmentInfos();
} else {
// Init from either the latest commit point, or an explicit prior commit point:
String lastSegmentsFile = SegmentInfos.getLastCommitSegmentsFileName(files);
if (lastSegmentsFile == null) {
throw new IndexNotFoundException("no segments* file found in " + directory + ": files: " + Arrays.toString(files));
}
// Do not use SegmentInfos.read(Directory) since the spooky
// retrying it does is not necessary here (we hold the write lock):
segmentInfos = SegmentInfos.readCommit(directoryOrig, lastSegmentsFile);
if (commit != null) {
// Swap out all segments, but, keep metadata in
// SegmentInfos, like version & generation, to
// preserve write-once. This is important if
// readers are open against the future commit
// points.
if (commit.getDirectory() != directoryOrig) {
throw new IllegalArgumentException("IndexCommit's directory doesn't match my directory, expected=" + directoryOrig + ", got=" + commit.getDirectory());
}
SegmentInfos oldInfos = SegmentInfos.readCommit(directoryOrig, commit.getSegmentsFileName());
segmentInfos.replace(oldInfos);
changed();
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "init: loaded commit \"" + commit.getSegmentsFileName() + "\"");
}
}
rollbackSegments = segmentInfos.createBackupSegmentInfos();
}
commitUserData = new HashMap<>(segmentInfos.getUserData()).entrySet();
pendingNumDocs.set(segmentInfos.totalMaxDoc());
// start with previous field numbers, but new FieldInfos
// NOTE: this is correct even for an NRT reader because we'll pull FieldInfos even for the un-committed segments:
globalFieldNumberMap = getFieldNumberMap();
validateIndexSort();
config.getFlushPolicy().init(config);
bufferedUpdatesStream = new BufferedUpdatesStream(infoStream);
docWriter = new DocumentsWriter(flushNotifications, segmentInfos.getIndexCreatedVersionMajor(), pendingNumDocs,
enableTestPoints, this::newSegmentName,
config, directoryOrig, directory, globalFieldNumberMap);
readerPool = new ReaderPool(directory, directoryOrig, segmentInfos, globalFieldNumberMap,
bufferedUpdatesStream::getCompletedDelGen, infoStream, conf.getSoftDeletesField(), reader, config.getReaderAttributes());
if (config.getReaderPooling()) {
readerPool.enableReaderPooling();
}
// Default deleter (for backwards compatibility) is
// KeepOnlyLastCommitDeleter:
// Sync'd is silly here, but IFD asserts we sync'd on the IW instance:
synchronized(this) {
deleter = new IndexFileDeleter(files, directoryOrig, directory,
config.getIndexDeletionPolicy(),
segmentInfos, infoStream, this,
indexExists, reader != null);
// We incRef all files when we return an NRT reader from IW, so all files must exist even in the NRT case:
assert create || filesExist(segmentInfos);
}
if (deleter.startingCommitDeleted) {
// Deletion policy deleted the "head" commit point.
// We have to mark ourself as changed so that if we
// are closed w/o any further changes we write a new
// segments_N file.
changed();
}
if (reader != null) {
// We always assume we are carrying over incoming changes when opening from reader:
segmentInfos.changed();
changed();
}
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "init: create=" + create + " reader=" + reader);
messageState();
}
success = true;
} finally {
if (!success) {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "init: hit exception on init; releasing write lock");
}
IOUtils.closeWhileHandlingException(writeLock);
writeLock = null;
}
}
}
The constructor above does several things:
1. Acquires the write.lock file to prevent concurrent writers.
2. Initializes the SegmentInfos according to the OpenMode and whether an index already exists; if one exists, the segment files under the path are loaded (see the sketch below).
3. Initializes the DocumentsWriter that the subsequent document writes will go through.
4. Creates the ReaderPool, which caches SegmentReaders; later segment merges may need to read segment data through it.
5. Creates the IndexFileDeleter, which walks the commit points already in the directory, adds references to the files they use, and deletes files not referenced by any commit point.
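The create/append decision in the constructor is driven entirely by the OpenMode set on the config. A hedged sketch of the three modes, with the behaviour as described by the branch above:
// CREATE: always start a new, empty index, discarding any existing segments
indexWriterConfig.setOpenMode(IndexWriterConfig.OpenMode.CREATE);

// APPEND: require an existing index; the constructor throws IndexNotFoundException if none is found
indexWriterConfig.setOpenMode(IndexWriterConfig.OpenMode.APPEND);

// CREATE_OR_APPEND (the default): create only when no index exists yet
indexWriterConfig.setOpenMode(IndexWriterConfig.OpenMode.CREATE_OR_APPEND);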
Once initialization is complete, addDocument is called to add a document:
indexWriter.addDocument(document);
Following the call, it ends up in updateDocument:
private long updateDocument(final DocumentsWriterDeleteQueue.Node<?> delNode,
Iterable<? extends IndexableField> doc) throws IOException {
ensureOpen();
boolean success = false;
try {
//write the document data
long seqNo = docWriter.updateDocument(doc, analyzer, delNode);
if (seqNo < 0) {
seqNo = -seqNo;
processEvents(true);
}
success = true;
return seqNo;
} catch (VirtualMachineError tragedy) {
tragicEvent(tragedy, "updateDocument");
throw tragedy;
} finally {
if (success == false) {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "hit exception updating document");
}
}
maybeCloseOnTragicEvent();
}
}
This in turn calls docWriter.updateDocument:
long updateDocument(final Iterable<? extends IndexableField> doc, final Analyzer analyzer,
final DocumentsWriterDeleteQueue.Node<?> delNode) throws IOException {
boolean hasEvents = preUpdate();
//obtain a ThreadState (a reentrant lock)
final ThreadState perThread = flushControl.obtainAndLock();
//the core per-thread writer
final DocumentsWriterPerThread flushingDWPT;
long seqNo;
try {
// This must happen after we've pulled the ThreadState because IW.close
// waits for all ThreadStates to be released:
ensureOpen();
//lazily initialize the DocumentsWriterPerThread
ensureInitialized(perThread);
assert perThread.isInitialized();
final DocumentsWriterPerThread dwpt = perThread.dwpt;
final int dwptNumDocs = dwpt.getNumDocsInRAM();
try {
//write the document data
seqNo = dwpt.updateDocument(doc, analyzer, delNode, flushNotifications);
} finally {
if (dwpt.isAborted()) {
flushControl.doOnAbort(perThread);
}
// We don't know whether the document actually
// counted as being indexed, so we must subtract here to
// accumulate our separate counter:
numDocsInRAM.addAndGet(dwpt.getNumDocsInRAM() - dwptNumDocs);
}
final boolean isUpdate = delNode != null && delNode.isDelete();
flushingDWPT = flushControl.doAfterDocument(perThread, isUpdate);
assert seqNo > perThread.lastSeqNo: "seqNo=" + seqNo + " lastSeqNo=" + perThread.lastSeqNo;
perThread.lastSeqNo = seqNo;
} finally {
perThreadPool.release(perThread);
}
if (postUpdate(flushingDWPT, hasEvents)) {
seqNo = -seqNo;
}
return seqNo;
}
The code above mainly does the following.
1. It obtains a ThreadState object; ThreadState extends ReentrantLock, so it is itself a reentrant lock:
ThreadState obtainAndLock() {
final ThreadState perThread = perThreadPool.getAndLock();
boolean success = false;
try {
if (perThread.isInitialized() && perThread.dwpt.deleteQueue != documentsWriter.deleteQueue) {
addFlushableState(perThread);
}
success = true;
return perThread;
} finally {
if (!success) {
perThreadPool.release(perThread);
}
}
}
ThreadState getAndLock() {
ThreadState threadState = null;
synchronized (this) {
if (freeList.isEmpty()) {
return newThreadState();
} else {
threadState = freeList.remove(freeList.size()-1);
if (threadState.dwpt == null) {
for(int i=0;i<freeList.size();i++) {
ThreadState ts = freeList.get(i);
if (ts.dwpt != null) {
// Use this one instead, and swap it with
// the un-initialized one:
freeList.set(i, threadState);
threadState = ts;
break;
}
}
}
}
}
threadState.lock();
return threadState;
}
getAndLock first tries to take a ThreadState straight from the free list; if the free list is empty, newThreadState creates a new one:
private synchronized ThreadState newThreadState() {
assert takenThreadStatePermits >= 0;
while (takenThreadStatePermits > 0) {
// we can't create new thread-states while not all permits are available
try {
wait();
} catch (InterruptedException ie) {
throw new ThreadInterruptedException(ie);
}
}
ThreadState threadState = new ThreadState(null);
threadState.lock(); // lock so nobody else will get this ThreadState
threadStates.add(threadState);
return threadState;
}
newThreadState creates the ThreadState, immediately calls lock() so that no other thread can grab it, and adds it to the threadStates list.
2. Initialize the DocumentsWriterPerThread:
ensureInitialized(perThread);
To improve write throughput, Lucene indexes through DocumentsWriterPerThread (DWPT) objects. Each thread works on its own DWPT in isolation and runs its own write logic, so the per-thread write path needs no locking or other synchronization, which improves indexing performance.
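From the caller's side this means a single IndexWriter can safely be shared by many indexing threads, each of which is routed to its own DWPT. A hedged sketch (the thread count and documents are made up; indexWriter is the writer from the test):
ExecutorService pool = Executors.newFixedThreadPool(4);
for (int i = 0; i < 4; i++) {
    pool.submit(() -> {
        Document doc = new Document();
        doc.add(new TextField("desc", "common term", Field.Store.YES));
        try {
            // IndexWriter is thread-safe; internally each thread gets its own DocumentsWriterPerThread
            indexWriter.addDocument(doc);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    });
}
pool.shutdown();
Back to ensureInitialized: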
private void ensureInitialized(ThreadState state) throws IOException {
if (state.dwpt == null) {
final FieldInfos.Builder infos = new FieldInfos.Builder(globalFieldNumberMap);
state.dwpt = new DocumentsWriterPerThread(indexCreatedVersionMajor, segmentNameSupplier.get(), directoryOrig,
directory, config, infoStream, deleteQueue, infos,
pendingNumDocs, enableTestPoints);
}
}
public DocumentsWriterPerThread(int indexVersionCreated, String segmentName, Directory directoryOrig, Directory directory, LiveIndexWriterConfig indexWriterConfig, InfoStream infoStream, DocumentsWriterDeleteQueue deleteQueue,
FieldInfos.Builder fieldInfos, AtomicLong pendingNumDocs, boolean enableTestPoints) throws IOException {
this.directoryOrig = directoryOrig;
this.directory = new TrackingDirectoryWrapper(directory);
this.fieldInfos = fieldInfos;
this.indexWriterConfig = indexWriterConfig;
this.infoStream = infoStream;
this.codec = indexWriterConfig.getCodec();
this.docState = new DocState(this, infoStream);
this.docState.similarity = indexWriterConfig.getSimilarity();
this.pendingNumDocs = pendingNumDocs;
bytesUsed = Counter.newCounter();
byteBlockAllocator = new DirectTrackingAllocator(bytesUsed);
pendingUpdates = new BufferedUpdates(segmentName);
intBlockAllocator = new IntBlockAllocator(bytesUsed);
this.deleteQueue = deleteQueue;
assert numDocsInRAM == 0 : "num docs " + numDocsInRAM;
deleteSlice = deleteQueue.newSlice();
segmentInfo = new SegmentInfo(directoryOrig, Version.LATEST, Version.LATEST, segmentName, -1, false, codec, Collections.emptyMap(), StringHelper.randomId(), Collections.emptyMap(), indexWriterConfig.getIndexSort());
assert numDocsInRAM == 0;
if (INFO_VERBOSE && infoStream.isEnabled("DWPT")) {
infoStream.message("DWPT", Thread.currentThread().getName() + " init seg=" + segmentName + " delQueue=" + deleteQueue);
}
// this should be the last call in the ctor
// it really sucks that we need to pull this within the ctor and pass this ref to the chain!
consumer = indexWriterConfig.getIndexingChain().getChain(this);
this.enableTestPoints = enableTestPoints;
this.indexVersionCreated = indexVersionCreated;
}
dwpt.updateDocument is then called:
seqNo = dwpt.updateDocument(doc, analyzer, delNode, flushNotifications);
Execution continues into processDocument:
public void processDocument() throws IOException {
int fieldCount = 0;
long fieldGen = nextFieldGen++;
termsHash.startDocument();
startStoredFields(docState.docID);
try {
//loop over the document's fields and process each one
for (IndexableField field : docState.doc) {
fieldCount = processField(field, fieldGen, fieldCount);
}
} finally {
if (docWriter.hasHitAbortingException() == false) {
for (int i=0;i<fieldCount;i++) {
fields[i].finish();
}
finishStoredFields();
}
}
try {
termsHash.finishDocument();
} catch (Throwable th) {
docWriter.onAbortingException(th);
throw th;
}
}
Two things happen here. 1. startStoredFields(docState.docID) lazily creates the stored-fields writer, which is provided by Lucene50StoredFieldsFormat:
void startDocument(int docID) throws IOException {
assert lastDoc < docID;
initStoredFieldsWriter();
while (++lastDoc < docID) {
writer.startDocument();
writer.finishDocument();
}
writer.startDocument();
}
protected void initStoredFieldsWriter() throws IOException {
if (writer == null) {
this.writer =
docWriter.codec.storedFieldsFormat().fieldsWriter(docWriter.directory, docWriter.getSegmentInfo(),
IOContext.DEFAULT);
}
}
2. Then every field of the document is iterated and processed:
for (IndexableField field : docState.doc) {
fieldCount = processField(field, fieldGen, fieldCount);
}
processField handles a single field:
private int processField(IndexableField field, long fieldGen, int fieldCount) throws IOException {
String fieldName = field.name();
IndexableFieldType fieldType = field.fieldType();
PerField fp = null;
if (fieldType.indexOptions() == null) {
throw new NullPointerException("IndexOptions must not be null (field: \"" + field.name() + "\")");
}
// Invert indexed fields:
//fields that need to be indexed
if (fieldType.indexOptions() != IndexOptions.NONE) {
fp = getOrAddField(fieldName, fieldType, true);
boolean first = fp.fieldGen != fieldGen;
//build the inverted-index data
fp.invert(field, first);
if (first) {
fields[fieldCount++] = fp;
fp.fieldGen = fieldGen;
}
} else {
verifyUnIndexedFieldType(fieldName, fieldType);
}
// Add stored fields:
//fields that need to be stored
if (fieldType.stored()) {
if (fp == null) {
fp = getOrAddField(fieldName, fieldType, false);
}
if (fieldType.stored()) {
String value = field.stringValue();
if (value != null && value.length() > IndexWriter.MAX_STORED_STRING_LENGTH) {
throw new IllegalArgumentException("stored field \"" + field.name() + "\" is too large (" + value.length() + " characters) to store");
}
try {
//write the stored field value
storedFieldsConsumer.writeField(fp.fieldInfo, field);
} catch (Throwable th) {
docWriter.onAbortingException(th);
throw th;
}
}
}
//doc values
DocValuesType dvType = fieldType.docValuesType();
if (dvType == null) {
throw new NullPointerException("docValuesType must not be null (field: \"" + fieldName + "\")");
}
//fields that need doc values (the forward index)
if (dvType != DocValuesType.NONE) {
if (fp == null) {
fp = getOrAddField(fieldName, fieldType, false);
}
indexDocValue(fp, dvType, field);
}
//point (dimensional) values
if (fieldType.pointDataDimensionCount() != 0) {
if (fp == null) {
fp = getOrAddField(fieldName, fieldType, false);
}
indexPoint(fp, field);
}
return fieldCount;
}
Depending on the field's configuration, different paths are taken: whether the field is indexed, whether it is stored, whether it has doc values (the forward index), and whether it has point values (see the sketch below). Here we focus on how the inverted index is created.
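As a side note, which of these four branches run is decided purely by the Field subclass (more precisely, its FieldType). A hedged illustration using standard field classes (the field names are arbitrary):
Document doc = new Document();
// indexed + stored: goes through fp.invert() and storedFieldsConsumer.writeField()
doc.add(new TextField("desc", "common term", Field.Store.YES));
// stored only: never touches the inverted index
doc.add(new StoredField("raw", "not searchable"));
// doc values only: handled by indexDocValue(), the column-stride forward index
doc.add(new NumericDocValuesField("price", 42L));
// point values only: handled by indexPoint(), used for numeric and range queries
doc.add(new IntPoint("count", 7));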
//check whether this field was created before; if so, reuse it
fp = getOrAddField(fieldName, fieldType, true);
private PerField getOrAddField(String name, IndexableFieldType fieldType, boolean invert) {
// Make sure we have a PerField allocated
final int hashPos = name.hashCode() & hashMask;
PerField fp = fieldHash[hashPos];
while (fp != null && !fp.fieldInfo.name.equals(name)) {
fp = fp.next;
}
if (fp == null) {
// First time we are seeing this field in this segment
FieldInfo fi = fieldInfos.getOrAdd(name);
initIndexOptions(fi, fieldType.indexOptions());
Map<String, String> attributes = fieldType.getAttributes();
if (attributes != null) {
attributes.forEach((k, v) -> fi.putAttribute(k, v));
}
fp = new PerField(docWriter.getIndexCreatedVersionMajor(), fi, invert);
fp.next = fieldHash[hashPos];
fieldHash[hashPos] = fp;
totalFieldCount++;
// At most 50% load factor:
if (totalFieldCount >= fieldHash.length/2) {
rehash();
}
if (totalFieldCount > fields.length) {
PerField[] newFields = new PerField[ArrayUtil.oversize(totalFieldCount, RamUsageEstimator.NUM_BYTES_OBJECT_REF)];
System.arraycopy(fields, 0, newFields, 0, fields.length);
fields = newFields;
}
} else if (invert && fp.invertState == null) {
initIndexOptions(fp.fieldInfo, fieldType.indexOptions());
fp.setInvertState();
}
return fp;
}
This structure works like a hash map. The field name's hash code is masked to find a slot in the fieldHash array, and the chain hanging off that slot is walked (separate chaining), comparing names with equals, to find the field's PerField object. If the field has not been seen in this segment before, a new FieldInfo is created and registered (both the name-to-info and the number-to-info mappings are stored), and the FieldInfo is wrapped in a new PerField:
public PerField(int indexCreatedVersionMajor, FieldInfo fieldInfo, boolean invert) {
this.indexCreatedVersionMajor = indexCreatedVersionMajor;
this.fieldInfo = fieldInfo;
similarity = docState.similarity;
//the field needs an inverted index
if (invert) {
setInvertState();
}
}
void setInvertState() {
invertState = new FieldInvertState(indexCreatedVersionMajor, fieldInfo.name, fieldInfo.getIndexOptions());
//per-field inverted-index writer
termsHashPerField = termsHash.addField(invertState, fieldInfo);
if (fieldInfo.omitsNorms() == false) {
assert norms == null;
norms = new NormValuesWriter(fieldInfo, docState.docWriter.bytesUsed);
}
}
public TermsHashPerField addField(FieldInvertState invertState, FieldInfo fieldInfo) {
//records term frequencies and positions for this field
return new FreqProxTermsWriterPerField(invertState, this, fieldInfo, nextTermsHash.addField(invertState, fieldInfo));
}
public FreqProxTermsWriterPerField(FieldInvertState invertState, TermsHash termsHash, FieldInfo fieldInfo, TermsHashPerField nextPerField) {
super(fieldInfo.getIndexOptions().compareTo(IndexOptions.DOCS_AND_FREQS_AND_POSITIONS) >= 0 ? 2 : 1, invertState, termsHash, nextPerField, fieldInfo);
IndexOptions indexOptions = fieldInfo.getIndexOptions();
assert indexOptions != IndexOptions.NONE;
//record term frequencies
hasFreq = indexOptions.compareTo(IndexOptions.DOCS_AND_FREQS) >= 0;
//record positions
hasProx = indexOptions.compareTo(IndexOptions.DOCS_AND_FREQS_AND_POSITIONS) >= 0;
//record offsets
hasOffsets = indexOptions.compareTo(IndexOptions.DOCS_AND_FREQS_AND_POSITIONS_AND_OFFSETS) >= 0;
}
public TermsHashPerField(int streamCount, FieldInvertState fieldState, TermsHash termsHash, TermsHashPerField nextPerField, FieldInfo fieldInfo) {
//records, per term, where its data starts in bytePool
intPool = termsHash.intPool;
//stores the postings data itself: term bytes, docIDs, frequencies, positions, ...
bytePool = termsHash.bytePool;
//same underlying pool as bytePool
termBytePool = termsHash.termBytePool;
docState = termsHash.docState;
this.termsHash = termsHash;
bytesUsed = termsHash.bytesUsed;
this.fieldState = fieldState;
this.streamCount = streamCount;
numPostingInt = 2*streamCount;
this.fieldInfo = fieldInfo;
this.nextPerField = nextPerField;
PostingsBytesStartArray byteStarts = new PostingsBytesStartArray(this, bytesUsed);
bytesHash = new BytesRefHash(termBytePool, HASH_INIT_SIZE, byteStarts);
}
Once the PerField has been created it is inserted at the head of its chain in fieldHash.
Now the inverted-index structures are built:
fp.invert(field, first);
The invert method is fairly long, so we will go through it in pieces.
1. On the first occurrence of the field in this document, the invert state is reset:
if (first) {
invertState.reset();
}
void reset() {
position = -1;
length = 0;
numOverlap = 0;
offset = 0;
maxTermFrequency = 0;
uniqueTermCount = 0;
lastStartOffset = 0;
lastPosition = 0;
}
2. The field value is tokenized, and inverted-index data is built for each token:
try (TokenStream stream = tokenStream = field.tokenStream(docState.analyzer, tokenStream)) {
// reset the TokenStream to the first token
stream.reset();
//expose the stream's attributes to the invert state
invertState.setAttributeSource(stream);
termsHashPerField.start(field, first);
//iterate over the tokens
while (stream.incrementToken()) {
int posIncr = invertState.posIncrAttribute.getPositionIncrement();
invertState.position += posIncr;
//(validation code omitted here)
invertState.lastPosition = invertState.position;
if (posIncr == 0) {
invertState.numOverlap++;
}
//start offset of this token
int startOffset = invertState.offset + invertState.offsetAttribute.startOffset();
//end offset of this token
int endOffset = invertState.offset + invertState.offsetAttribute.endOffset();
if (startOffset < invertState.lastStartOffset || endOffset < startOffset) {
throw new IllegalArgumentException("startOffset must be non-negative, and endOffset must be >= startOffset, and offsets must not go backwards "
+ "startOffset=" + startOffset + ",endOffset=" + endOffset + ",lastStartOffset=" + invertState.lastStartOffset + " for field '" + field.name() + "'");
}
invertState.lastStartOffset = startOffset;
try {
invertState.length = Math.addExact(invertState.length, invertState.termFreqAttribute.getTermFrequency());
} catch (ArithmeticException ae) {
throw new IllegalArgumentException("too many tokens for field \"" + field.name() + "\"");
}
try {
//record this token in the inverted index
termsHashPerField.add();
} catch (MaxBytesLengthExceededException e) {
......
} catch (Throwable th) {
docWriter.onAbortingException(th);
throw th;
}
}
stream.end();
termsHashPerField.add() records the token:
void add() throws IOException {
//1. check whether this term was added before; if not, add it to the hash (BytesRefHash) and assign a termID
int termID = bytesHash.add(termAtt.getBytesRef());
//termID >= 0 means this is a brand-new term
if (termID >= 0) {// New posting
bytesHash.byteStart(termID);
// make room in intPool
if (numPostingInt + intPool.intUpto > IntBlockPool.INT_BLOCK_SIZE) {
intPool.nextBuffer();
}
//make room in bytePool
if (ByteBlockPool.BYTE_BLOCK_SIZE - bytePool.byteUpto < numPostingInt*ByteBlockPool.FIRST_LEVEL_SIZE) {
bytePool.nextBuffer();
}
intUptos = intPool.buffer;
intUptoStart = intPool.intUpto;
intPool.intUpto += streamCount;
postingsArray.intStarts[termID] = intUptoStart + intPool.intOffset;
//allocate two 5-byte slices: the first holds docID + termFreq, the second holds positions
for(int i=0;i<streamCount;i++) {
final int upto = bytePool.newSlice(ByteBlockPool.FIRST_LEVEL_SIZE);
//intPool records where in bytePool this stream starts writing
intUptos[intUptoStart+i] = upto + bytePool.byteOffset;
}
postingsArray.byteStarts[termID] = intUptos[intUptoStart];
//brand-new term
newTerm(termID);
} else {
//the term already exists; recover its termID
termID = (-termID)-1;
int intStart = postingsArray.intStarts[termID];
intUptos = intPool.buffers[intStart >> IntBlockPool.INT_BLOCK_SHIFT];
intUptoStart = intStart & IntBlockPool.INT_BLOCK_MASK;
//append to the existing postings for this term
addTerm(termID);
}
if (doNextCall) {
nextPerField.add(postingsArray.textStarts[termID]);
}
}
1. bytesHash.add first checks whether this term has been seen before; if it has, the previously assigned termID is returned (encoded as a negative number). If it is new, the method checks whether the current buffer has enough room for the term bytes and switches to a new buffer if not, records the term's start position in bytesStart (the array index is the termID), stores the term length in either 1 or 2 bytes depending on its size, and finally checks whether count has reached hashHalfSize, rehashing to double the capacity if so. The new termID is then returned.
public int add(BytesRef bytes) {
assert bytesStart != null : "Bytesstart is null - not initialized";
//length of the term in bytes
final int length = bytes.length;
//hash the bytes to find the slot in the ids array
final int hashPos = findHash(bytes);
//the id stored in that slot
int e = ids[hashPos];
//-1 means the term is not in the hash yet
if (e == -1) {
// new entry
final int len2 = 2 + bytes.length;
//the current buffer cannot hold this entry
if (len2 + pool.byteUpto > BYTE_BLOCK_SIZE) {
if (len2 > BYTE_BLOCK_SIZE) {
throw new MaxBytesLengthExceededException("bytes can be at most "
+ (BYTE_BLOCK_SIZE - 2) + " in length; got " + bytes.length);
}
//move on to the next byte buffer to store the term
pool.nextBuffer();
}
final byte[] buffer = pool.buffer;
//write position inside the byte buffer
final int bufferUpto = pool.byteUpto;
//the number of terms has reached the size of bytesStart, so grow it
if (count >= bytesStart.length) {
bytesStart = bytesStartArray.grow();
assert count < bytesStart.length + 1 : "count: " + count + " len: "
+ bytesStart.length;
}
//assign an id; ids are independent per segment
e = count++;
//record the start position of the term bytes
bytesStart[e] = bufferUpto + pool.byteOffset;
//store the length with a variable-length encoding to save space
if (length < 128) {
// 1 byte to store length
buffer[bufferUpto] = (byte) length;
pool.byteUpto += length + 1;
assert length >= 0: "Length must be positive: " + length;
System.arraycopy(bytes.bytes, bytes.offset, buffer, bufferUpto + 1,
length);
} else {
// 2 byte to store length
buffer[bufferUpto] = (byte) (0x80 | (length & 0x7f));
buffer[bufferUpto + 1] = (byte) ((length >> 7) & 0xff);
pool.byteUpto += length + 2;
System.arraycopy(bytes.bytes, bytes.offset, buffer, bufferUpto + 2,
length);
}
assert ids[hashPos] == -1;
ids[hashPos] = e;
//rehash (grow) once the hash is half full
if (count == hashHalfSize) {
rehash(2 * hashSize, true);
}
return e;
}
return -(e + 1);
}
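The 1-byte versus 2-byte length prefix can be reproduced in isolation. A hedged sketch of the same arithmetic (this is not a Lucene API, just the encoding from the branch above, with a worked example):
// 1 byte if length < 128, otherwise 2 bytes (low 7 bits first, high bit set as a marker).
static int writeLengthPrefix(byte[] buffer, int offset, int length) {
    if (length < 128) {
        buffer[offset] = (byte) length;                      // high bit 0: single-byte length
        return 1;
    } else {
        buffer[offset] = (byte) (0x80 | (length & 0x7f));    // low 7 bits, marker bit set
        buffer[offset + 1] = (byte) ((length >> 7) & 0xff);  // remaining bits
        return 2;
    }
}
// Example: length = 300 -> bytes 0xAC, 0x02 (300 & 0x7f = 0x2C, 300 >> 7 = 2)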
Back in the add method, here is the new-term branch again, together with the slice-level constants:
public final static int[] NEXT_LEVEL_ARRAY = {1, 2, 3, 4, 5, 6, 7, 8, 9, 9};
public final static int[] LEVEL_SIZE_ARRAY = {5, 14, 20, 30, 40, 40, 80, 80, 120, 200};
public final static int FIRST_LEVEL_SIZE = LEVEL_SIZE_ARRAY[0];
//termID >= 0 means this is a new term
if (termID >= 0) {// New posting
bytesHash.byteStart(termID);
//make room in intPool, which stores metadata about where the data lives in bytePool
if (numPostingInt + intPool.intUpto > IntBlockPool.INT_BLOCK_SIZE) {
intPool.nextBuffer();
}
//make room in bytePool
if (ByteBlockPool.BYTE_BLOCK_SIZE - bytePool.byteUpto < numPostingInt*ByteBlockPool.FIRST_LEVEL_SIZE) {
bytePool.nextBuffer();
}
intUptos = intPool.buffer;
//start position for this term's entries in intPool
intUptoStart = intPool.intUpto;
//reserve streamCount (two) slots: one stream for docID + freq, one for positions
intPool.intUpto += streamCount;
//record the term's start position in intPool
postingsArray.intStarts[termID] = intUptoStart + intPool.intOffset;
//initialize the byte slices
for(int i=0;i<streamCount;i++) {
//allocate a 5-byte slice whose end marker is 16
final int upto = bytePool.newSlice(ByteBlockPool.FIRST_LEVEL_SIZE);
//record the slice's start position
intUptos[intUptoStart+i] = upto + bytePool.byteOffset;
}
//record the byte start position in postingsArray
postingsArray.byteStarts[termID] = intUptos[intUptoStart];
newTerm(termID);
}
intPool only records positions inside bytePool; bytePool is where the data is actually stored. NEXT_LEVEL_ARRAY defines 10 slice levels, 0 through 9 (level 9 is the cap), and each level maps to a slice length in LEVEL_SIZE_ARRAY:
Level | Slice length | End marker |
---|---|---|
0 | 5 | 16 |
1 | 14 | 17 |
2 | 20 | 18 |
3 | 30 | 19 |
4 | 40 | 20 |
5 | 40 | 21 |
6 | 80 | 22 |
7 | 80 | 23 |
8 | 120 | 24 |
9 | 200 | 25 |
Two level-0 slices of FIRST_LEVEL_SIZE = 5 bytes are reserved up front, each terminated by the end marker 16: one stores docID + term frequency, the other stores positions. When the 5 bytes run out, the slice is upgraded to the next level, i.e. from level 0 to level 1 with length 14 and end marker 17, and so on up the table.
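The growth path can be traced directly from the two arrays shown earlier. A small hedged sketch that prints each level, its slice length and its end marker (16 | level):
int[] NEXT_LEVEL_ARRAY = {1, 2, 3, 4, 5, 6, 7, 8, 9, 9};
int[] LEVEL_SIZE_ARRAY = {5, 14, 20, 30, 40, 40, 80, 80, 120, 200};

int level = 0;                            // a fresh slice starts at level 0: size 5, end marker 16
for (int step = 0; step < 10; step++) {
    System.out.println("level=" + level
            + " size=" + LEVEL_SIZE_ARRAY[level]
            + " endMarker=" + (16 | level));
    level = NEXT_LEVEL_ARRAY[level];      // allocSlice() moves to this level when the slice overflows
}
Once level 9 is reached, further growth keeps allocating 200-byte slices. Now for newTerm itself: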
void newTerm(final int termID) {
// First time we're seeing this term since the last
// flush
//per-term postings metadata
final FreqProxPostingsArray postings = freqProxPostingsArray;
//remember the docID in which this term was last seen
postings.lastDocIDs[termID] = docState.docID;
if (!hasFreq) {
assert postings.termFreqs == null;
postings.lastDocCodes[termID] = docState.docID;
fieldState.maxTermFrequency = Math.max(1, fieldState.maxTermFrequency);
} else {
//docID shifted left by one; the low bit will later say whether freq == 1 (one VInt) or freq > 1 (docCode plus a second VInt for the freq)
postings.lastDocCodes[termID] = docState.docID << 1;
//record the term frequency
postings.termFreqs[termID] = getTermFreq();
//record position information
if (hasProx) {
//write the position into the second 5-byte slice (end marker 16) allocated earlier
writeProx(termID, fieldState.position);
//record offsets
if (hasOffsets) {
writeOffsets(termID, fieldState.offset);
}
} else {
assert !hasOffsets;
}
//track the maximum term frequency
fieldState.maxTermFrequency = Math.max(postings.termFreqs[termID], fieldState.maxTermFrequency);
}
fieldState.uniqueTermCount++;
}
newTerm records the term frequency and, depending on the index options configured for the field, also writes position and offset information.
//record position information
void writeProx(int termID, int proxCode) {
if (payloadAttribute == null) {
//position delta shifted left by one; a low bit of 0 means no payload
writeVInt(1, proxCode<<1);
} else {
BytesRef payload = payloadAttribute.getPayload();
if (payload != null && payload.length > 0) {
writeVInt(1, (proxCode<<1)|1);
writeVInt(1, payload.length);
writeBytes(1, payload.bytes, payload.offset, payload.length);
sawPayloads = true;
} else {
writeVInt(1, proxCode<<1);
}
}
assert postingsArray == freqProxPostingsArray;
freqProxPostingsArray.lastPositions[termID] = fieldState.position;
}
void writeVInt(int stream, int i) {
assert stream < streamCount;
while ((i & ~0x7F) != 0) {
writeByte(stream, (byte)((i & 0x7f) | 0x80));
i >>>= 7;
}
writeByte(stream, (byte) i);
}
void writeByte(int stream, byte b) {
//read this stream's current write position from intPool
int upto = intUptos[intUptoStart+stream];
byte[] bytes = bytePool.buffers[upto >> ByteBlockPool.BYTE_BLOCK_SHIFT];
assert bytes != null;
//offset inside the selected buffer
int offset = upto & ByteBlockPool.BYTE_BLOCK_MASK;
//a non-zero byte here is the slice's end marker: the slice is full
if (bytes[offset] != 0) {
// End of slice; allocate a new one
//the slice is too small; grow it and get the new write offset
offset = bytePool.allocSlice(bytes, offset);
bytes = bytePool.buffer;
intUptos[intUptoStart+stream] = offset + bytePool.byteOffset;
}
bytes[offset] = b;
(intUptos[intUptoStart+stream])++;
}
This is where a term's position data is actually written. The write offset recorded in intPool determines which bytePool buffer to write into: bytePool keeps a two-dimensional byte array (initially 10 buffers, grown as needed), each buffer being 1 << 15 = 32768 bytes long. The recorded upto selects the buffer row and then the offset inside that buffer. If the byte at that offset is non-zero, the slice's end marker has been reached and the slice must grow; if it is zero, the byte is written in place.
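The mapping from a global write position (upto) to a concrete buffer and in-buffer offset is plain shift-and-mask arithmetic. A hedged worked example using the real constants:
int BYTE_BLOCK_SHIFT = 15;
int BYTE_BLOCK_SIZE = 1 << BYTE_BLOCK_SHIFT;   // 32768
int BYTE_BLOCK_MASK = BYTE_BLOCK_SIZE - 1;     // 32767

int upto = 40000;                              // hypothetical global write position
int bufferIndex = upto >> BYTE_BLOCK_SHIFT;    // 40000 / 32768 = 1  -> the second buffer
int offset = upto & BYTE_BLOCK_MASK;           // 40000 - 32768 = 7232 -> offset inside that buffer
System.out.println(bufferIndex + " / " + offset);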
So how does the slice holding docID + termFreq (or positions) actually grow? Let's step into allocSlice:
public int allocSlice(final byte[] slice, final int upto) {
//current level, decoded from the end marker (the initial marker 16 encodes level 0)
final int level = slice[upto] & 15;
//level after growing
final int newLevel = NEXT_LEVEL_ARRAY[level];
//slice length after growing
final int newSize = LEVEL_SIZE_ARRAY[newLevel];
// Maybe allocate another block
//the current buffer cannot fit the new slice; allocate a fresh buffer
if (byteUpto > BYTE_BLOCK_SIZE-newSize) {
nextBuffer();
}
//start of the new slice
final int newUpto = byteUpto;
//absolute offset of the new slice (including the buffer offset)
final int offset = newUpto + byteOffset;
//advance the pool's write position past the new slice
byteUpto += newSize;
// Copy forward the past 3 bytes (which we are about
// to overwrite with the forwarding address):
//copy the last 3 bytes of the old slice to the start of the new one
buffer[newUpto] = slice[upto-3];
buffer[newUpto+1] = slice[upto-2];
buffer[newUpto+2] = slice[upto-1];
// Write forwarding address at end of last slice:
//the last 4 bytes of the old slice become a forwarding address pointing at the start of the new slice
slice[upto-3] = (byte) (offset >>> 24);
slice[upto-2] = (byte) (offset >>> 16);
slice[upto-1] = (byte) (offset >>> 8);
slice[upto] = (byte) offset;
// Write new level:
//write the new end marker (16 | newLevel)
buffer[byteUpto-1] = (byte) (16|newLevel);
return newUpto+3;
}
Growth happens one level at a time. For example, the initial slice is level 0 with length 5 and end marker 16; when it overflows, the decoded level is 0, newLevel becomes 1, and newSize becomes 14. A new slice is allocated at the pool's current write position, the last three bytes of the old slice are copied to the start of the new one, and the four bytes they occupied (three data bytes plus the end marker) are overwritten with a forwarding address that points at the new slice. The method then returns the offset at which writing continues.
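The 4-byte forwarding address at the end of the old slice can be decoded with the mirror-image shifts. A hedged sketch of the round trip (the offset value and array are made up):
int offset = 70000;                     // hypothetical absolute address of the new slice
byte[] slice = new byte[8];
int upto = 7;                           // hypothetical position of the old slice's end marker

// encode: the last 4 bytes of the old slice now point at the new slice
slice[upto - 3] = (byte) (offset >>> 24);
slice[upto - 2] = (byte) (offset >>> 16);
slice[upto - 1] = (byte) (offset >>> 8);
slice[upto] = (byte) offset;

// decode: a reader that hits the end of the slice follows this pointer
int decoded = ((slice[upto - 3] & 0xff) << 24)
        | ((slice[upto - 2] & 0xff) << 16)
        | ((slice[upto - 1] & 0xff) << 8)
        | (slice[upto] & 0xff);
System.out.println(decoded);            // 70000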
Back in newTerm, if offsets also need to be stored, they are written the same way as positions:
void writeOffsets(int termID, int offsetAccum) {
final int startOffset = offsetAccum + offsetAttribute.startOffset();
final int endOffset = offsetAccum + offsetAttribute.endOffset();
assert startOffset - freqProxPostingsArray.lastOffsets[termID] >= 0;
writeVInt(1, startOffset - freqProxPostingsArray.lastOffsets[termID]);
writeVInt(1, endOffset - startOffset);
freqProxPostingsArray.lastOffsets[termID] = startOffset;
}
If the term has already been seen before, the addTerm(termID) branch runs instead:
void addTerm(final int termID) {
final FreqProxPostingsArray postings = freqProxPostingsArray;
assert !hasFreq || postings.termFreqs[termID] > 0;
//frequencies are not recorded
if (!hasFreq) {
assert postings.termFreqs == null;
if (termFreqAtt.getTermFrequency() != 1) {
throw new IllegalStateException("field \"" + fieldInfo.name + "\": must index term freq while using custom TermFrequencyAttribute");
}
//only docIDs are recorded
if (docState.docID != postings.lastDocIDs[termID]) {
// New document; now encode docCode for previous doc:
assert docState.docID > postings.lastDocIDs[termID];
writeVInt(0, postings.lastDocCodes[termID]);
postings.lastDocCodes[termID] = docState.docID - postings.lastDocIDs[termID];
postings.lastDocIDs[termID] = docState.docID;
fieldState.uniqueTermCount++;
}
} else if (docState.docID != postings.lastDocIDs[termID]) {
//the term shows up in a new document, so the previous document's docID and freq can now be written
assert docState.docID > postings.lastDocIDs[termID]:"id: "+docState.docID + " postings ID: "+ postings.lastDocIDs[termID] + " termID: "+termID;
// Term not yet seen in the current doc but previously
// seen in other doc(s) since the last flush
// Now that we know doc freq for previous doc,
// write it & lastDocCode
//freq == 1: a single VInt encodes the docCode with the low bit set
if (1 == postings.termFreqs[termID]) {
writeVInt(0, postings.lastDocCodes[termID]|1);
} else {
writeVInt(0, postings.lastDocCodes[termID]);
writeVInt(0, postings.termFreqs[termID]);
}
// Init freq for the current document
postings.termFreqs[termID] = getTermFreq();
fieldState.maxTermFrequency = Math.max(postings.termFreqs[termID], fieldState.maxTermFrequency);
//docCode: delta to the previous docID, shifted left by one
postings.lastDocCodes[termID] = (docState.docID - postings.lastDocIDs[termID]) << 1;
//remember the current docID
postings.lastDocIDs[termID] = docState.docID;
if (hasProx) {
//record position information
writeProx(termID, fieldState.position);
if (hasOffsets) {
//record offsets
postings.lastOffsets[termID] = 0;
writeOffsets(termID, fieldState.offset);
}
} else {
assert !hasOffsets;
}
fieldState.uniqueTermCount++;
} else {
//same document: just accumulate the term frequency
postings.termFreqs[termID] = Math.addExact(postings.termFreqs[termID], getTermFreq());
fieldState.maxTermFrequency = Math.max(fieldState.maxTermFrequency, postings.termFreqs[termID]);
//record position (as a delta to the last position) and offsets
if (hasProx) {
writeProx(termID, fieldState.position-postings.lastPositions[termID]);
if (hasOffsets) {
writeOffsets(termID, fieldState.offset);
}
}
}
}
If frequencies are not being recorded, the code only checks whether the current docID differs from the last docID seen for this term; a different docID means the previous document is finished, so its docCode can be written. If frequencies are recorded, the same check applies: a different docID means the previous document's frequency is now final, so its docCode and frequency are written out, followed, depending on the index options, by position and offset information. This repeats token after token until all buffered documents have been turned into in-memory inverted-index data.
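To make the docCode scheme concrete, here is a hedged sketch of just the encoding (buffering and flush timing are left out), applied to the term "term" from the test documents, which appears with frequency 1 in doc 0, 2 in doc 1, 3 in doc 2 and 1 in doc 3:
// Not a Lucene API: a sketch of how one (docID, freq) pair is turned into VInt values.
static void encodePosting(int prevDocID, int docID, int termFreq, List<Integer> out) {
    int docCode = (docID - prevDocID) << 1;  // delta to the previous doc, shifted left by one
    if (termFreq == 1) {
        out.add(docCode | 1);                // low bit set: freq == 1, no second VInt
    } else {
        out.add(docCode);                    // low bit clear: a second VInt carries the freq
        out.add(termFreq);
    }
}
// "term": doc 0 -> 1; doc 1 -> 2, 2; doc 2 -> 2, 3; doc 3 -> 3
// By the end of the test the slice holds the VInts 1, 2, 2, 2, 3; doc 3's entry is still
// pending in lastDocCodes/termFreqs and is only picked up when the segment is flushed.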