Lucene Data Writing - Part 3: Flushing Data to Disk
In the previous two articles we looked at how the inverted (postings) data is organized in memory; now let's see how that data is saved to files.
At the end, indexWriter.close() is called; the IndexWriter performs its shutdown, writing the in-memory data to files and flushing them to disk.
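For context, a minimal indexing driver that ends up in this code path might look like the sketch below. It is my own illustration rather than code from this series: the directory path, analyzer choice, and field name are assumptions, while the Lucene classes themselves (FSDirectory, IndexWriterConfig, IndexWriter, TextField) are standard API.
import java.nio.file.Paths;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
public class IndexingDriver {
  public static void main(String[] args) throws Exception {
    // hypothetical index directory
    try (Directory dir = FSDirectory.open(Paths.get("/tmp/lucene-demo"))) {
      IndexWriterConfig iwc = new IndexWriterConfig(new StandardAnalyzer());
      IndexWriter indexWriter = new IndexWriter(dir, iwc);
      Document doc = new Document();
      // "content" is an illustrative field name
      doc.add(new TextField("content", "lucene flush walkthrough", Field.Store.NO));
      indexWriter.addDocument(doc);
      // closing the writer flushes the in-memory segment to disk
      indexWriter.close();
    }
  }
}
Inside the shutdown sequence, IndexWriter eventually calls: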
flush(true, true);
final void flush(boolean triggerMerge, boolean applyAllDeletes) throws IOException {
ensureOpen(false);
if (doFlush(applyAllDeletes) && triggerMerge) {
// a merge may be needed after the flush
maybeMerge(config.getMergePolicy(), MergeTrigger.FULL_FLUSH, UNBOUNDED_MAX_MERGE_SEGMENTS);
}
}
private boolean doFlush(boolean applyAllDeletes) throws IOException {
if (tragedy.get() != null) {
throw new IllegalStateException("this writer hit an unrecoverable error; cannot flush", tragedy.get());
}
// callback invoked before the flush
doBeforeFlush();
testPoint("startDoFlush");
boolean success = false;
try {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", " start flush: applyAllDeletes=" + applyAllDeletes);
infoStream.message("IW", " index before flush " + segString());
}
boolean anyChanges = false;
synchronized (fullFlushLock) {
boolean flushSuccess = false;
try {
// flush all indexing threads (DWPTs)
long seqNo = docWriter.flushAllThreads();
if (seqNo < 0) {
seqNo = -seqNo;
anyChanges = true;
} else {
anyChanges = false;
}
if (!anyChanges) {
// flushCount is incremented in flushAllThreads
flushCount.incrementAndGet();
}
// publish the flushed segments
publishFlushedSegments(true);
flushSuccess = true;
} finally {
assert holdsFullFlushLock();
docWriter.finishFullFlush(flushSuccess);
processEvents(false);
}
}
if (applyAllDeletes) {
applyAllDeletesAndUpdates();
}
anyChanges |= maybeMerge.getAndSet(false);
synchronized(this) {
writeReaderPool(applyAllDeletes);
doAfterFlush();
success = true;
return anyChanges;
}
} catch (VirtualMachineError tragedy) {
tragicEvent(tragedy, "doFlush");
throw tragedy;
} finally {
if (!success) {
if (infoStream.isEnabled("IW")) {
infoStream.message("IW", "hit exception during flush");
}
maybeCloseOnTragicEvent();
}
}
}
Continuing to trace through org.apache.lucene.index.DocumentsWriter#doFlush, we eventually reach org.apache.lucene.index.DefaultIndexingChain#flush:
public Sorter.DocMap flush(SegmentWriteState state) throws IOException {
// NOTE: caller (DocumentsWriterPerThread) handles
// aborting on any exception from this method
Sorter.DocMap sortMap = maybeSortSegment(state);
int maxDoc = state.segmentInfo.maxDoc();
long t0 = System.nanoTime();
// write norms
writeNorms(state, sortMap);
if (docState.infoStream.isEnabled("IW")) {
docState.infoStream.message("IW", ((System.nanoTime()-t0)/1000000) + " msec to write norms");
}
SegmentReadState readState = new SegmentReadState(state.directory, state.segmentInfo, state.fieldInfos, true, IOContext.READ, state.segmentSuffix, Collections.emptyMap());
t0 = System.nanoTime();
// write doc values
writeDocValues(state, sortMap);
if (docState.infoStream.isEnabled("IW")) {
docState.infoStream.message("IW", ((System.nanoTime()-t0)/1000000) + " msec to write docValues");
}
t0 = System.nanoTime();
// write points
writePoints(state, sortMap);
if (docState.infoStream.isEnabled("IW")) {
docState.infoStream.message("IW", ((System.nanoTime()-t0)/1000000) + " msec to write points");
}
// it's possible all docs hit non-aborting exceptions...
t0 = System.nanoTime();
storedFieldsConsumer.finish(maxDoc);
storedFieldsConsumer.flush(state, sortMap);
if (docState.infoStream.isEnabled("IW")) {
docState.infoStream.message("IW", ((System.nanoTime()-t0)/1000000) + " msec to finish stored fields");
}
t0 = System.nanoTime();
Map<String,TermsHashPerField> fieldsToFlush = new HashMap<>();
for (int i=0;i<fieldHash.length;i++) {
PerField perField = fieldHash[i];
while (perField != null) {
if (perField.invertState != null) {
fieldsToFlush.put(perField.fieldInfo.name, perField.termsHashPerField);
}
perField = perField.next;
}
}
try (NormsProducer norms = readState.fieldInfos.hasNorms()
? state.segmentInfo.getCodec().normsFormat().normsProducer(readState)
: null) {
NormsProducer normsMergeInstance = null;
if (norms != null) {
// Use the merge instance in order to reuse the same IndexInput for all terms
normsMergeInstance = norms.getMergeInstance();
}
// write the inverted (postings) data
termsHash.flush(fieldsToFlush, state, sortMap, normsMergeInstance);
}
if (docState.infoStream.isEnabled("IW")) {
docState.infoStream.message("IW", ((System.nanoTime()-t0)/1000000) + " msec to write postings and finish vectors");
}
// Important to save after asking consumer to flush so
// consumer can alter the FieldInfo* if necessary. EG,
// FreqProxTermsWriter does this with
// FieldInfo.storePayload.
t0 = System.nanoTime();
// write the field infos
docWriter.codec.fieldInfosFormat().write(state.directory, state.segmentInfo, "", state.fieldInfos, IOContext.DEFAULT);
if (docState.infoStream.isEnabled("IW")) {
docState.infoStream.message("IW", ((System.nanoTime()-t0)/1000000) + " msec to write fieldInfos");
}
return sortMap;
}
As you can see, several kinds of data are written here. We are mainly concerned with how the inverted data is written; readers interested in the other writers can explore them on their own.
The entry point for writing the inverted data:
// write the inverted data
termsHash.flush(fieldsToFlush, state, sortMap, normsMergeInstance);
Execution continues into org.apache.lucene.index.FreqProxTermsWriter#flush:
public void flush(Map<String,TermsHashPerField> fieldsToFlush, final SegmentWriteState state,
Sorter.DocMap sortMap, NormsProducer norms) throws IOException {
super.flush(fieldsToFlush, state, sortMap, norms);
// Gather all fields that saw any postings:
List<FreqProxTermsWriterPerField> allFields = new ArrayList<>();
for (TermsHashPerField f : fieldsToFlush.values()) {
final FreqProxTermsWriterPerField perField = (FreqProxTermsWriterPerField) f;
if (perField.bytesHash.size() > 0) {
perField.sortPostings();
assert perField.fieldInfo.getIndexOptions() != IndexOptions.NONE;
allFields.add(perField);
}
}
// Sort by field name
CollectionUtil.introSort(allFields);
Fields fields = new FreqProxFields(allFields);
applyDeletes(state, fields);
if (sortMap != null) {
fields = new SortingLeafReader.SortingFields(fields, state.fieldInfos, sortMap);
}
FieldsConsumer consumer = state.segmentInfo.getCodec().postingsFormat().fieldsConsumer(state);
boolean success = false;
try {
// write the postings data
consumer.write(fields, norms);
success = true;
} finally {
if (success) {
IOUtils.close(consumer);
} else {
IOUtils.closeWhileHandlingException(consumer);
}
}
}
Before writing, the fields are sorted by name, and a FieldsConsumer obtained from the codec's postings format is used to write the data; this ends up in org.apache.lucene.codecs.blocktree.BlockTreeTermsWriter#write:
public void write(Fields fields, NormsProducer norms) throws IOException {
//if (DEBUG) System.out.println("\nBTTW.write seg=" + segment);
String lastField = null;
for(String field : fields) {
assert lastField == null || lastField.compareTo(field) < 0;
lastField = field;
//if (DEBUG) System.out.println("\nBTTW.write seg=" + segment + " field=" + field);
Terms terms = fields.terms(field);
if (terms == null) {
continue;
}
// iterate over every term of this field
TermsEnum termsEnum = terms.iterator();
TermsWriter termsWriter = new TermsWriter(fieldInfos.fieldInfo(field));
while (true) {
// get the next term
BytesRef term = termsEnum.next();
//if (DEBUG) System.out.println("BTTW: next term " + term);
if (term == null) {
break;
}
//if (DEBUG) System.out.println("write field=" + fieldInfo.name + " term=" + brToString(term));
// record the term's information
termsWriter.write(term, termsEnum, norms);
}
// build the term index structure
termsWriter.finish();
//if (DEBUG) System.out.println("\nBTTW.write done seg=" + segment + " field=" + field);
}
}
As you can see, the method loops over every field, obtains that field's terms, and writes them out. Let's go through the code above section by section.
TermsEnum termsEnum = terms.iterator();
This calls org.apache.lucene.index.FreqProxFields.FreqProxTerms#iterator:
public TermsEnum iterator() {
FreqProxTermsEnum termsEnum = new FreqProxTermsEnum(terms);
termsEnum.reset();
return termsEnum;
}
public FreqProxTermsEnum(FreqProxTermsWriterPerField terms) {
this.terms = terms;
this.numTerms = terms.bytesHash.size();
sortedTermIDs = terms.sortedTermIDs;
assert sortedTermIDs != null;
// the postings metadata kept in memory
postingsArray = (FreqProxPostingsArray) terms.postingsArray;
}
Each term is then fetched in a loop and written out:
BytesRef term = termsEnum.next();
public BytesRef next() {
ord++;
if (ord >= numTerms) {
return null;
} else {
// byte offset where this term's text starts
int textStart = postingsArray.textStarts[sortedTermIDs[ord]];
terms.bytePool.setBytesRef(scratch, textStart);
return scratch;
}
}
public void setBytesRef(BytesRef term, int textStart) {
final byte[] bytes = term.bytes = buffers[textStart >> BYTE_BLOCK_SHIFT];
int pos = textStart & BYTE_BLOCK_MASK;
// the term length is stored in 1 or 2 bytes before the text
if ((bytes[pos] & 0x80) == 0) {
// length is 1 byte
term.length = bytes[pos];
term.offset = pos+1;
} else {
// length is 2 bytes
term.length = (bytes[pos]&0x7f) + ((bytes[pos+1]&0xff)<<7);
term.offset = pos+2;
}
assert term.length >= 0;
}
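For reference, the encode side of this 1-or-2-byte length prefix lives, if I remember correctly, in BytesRefHash#add, which copies the term bytes into the pool when the term is first seen. Below is a minimal sketch of both directions (my own illustration, not Lucene code):
public class LengthPrefixSketch {
  // write the term length in front of the term bytes: 1 byte if length < 128, otherwise 2 bytes
  static int writeLength(byte[] buffer, int pos, int length) {
    if (length < 128) {
      buffer[pos] = (byte) length;                      // high bit clear: 1-byte length
      return 1;
    } else {
      buffer[pos] = (byte) (0x80 | (length & 0x7f));    // low 7 bits, high bit set
      buffer[pos + 1] = (byte) ((length >>> 7) & 0xff); // remaining bits
      return 2;
    }
  }
  public static void main(String[] args) {
    byte[] buffer = new byte[4];
    writeLength(buffer, 0, 300);
    // decode exactly the way setBytesRef above does
    int length = (buffer[0] & 0x80) == 0
        ? buffer[0]
        : (buffer[0] & 0x7f) + ((buffer[1] & 0xff) << 7);
    System.out.println(length); // prints 300
  }
}
Since an indexed term can be at most 32766 bytes (IndexWriter.MAX_TERM_LENGTH), two bytes are always enough for the length.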
This initializes the term's text length and starting offset; termsWriter.write is then called to write the term's information:
// record the term
termsWriter.write(term, termsEnum, norms);
public void write(BytesRef text, TermsEnum termsEnum, NormsProducer norms) throws IOException {
// produce the term's state object (postings stats and file pointers)
BlockTermState state = postingsWriter.writeTerm(text, termsEnum, docsSeen, norms);
if (state != null) {
assert state.docFreq != 0;
assert fieldInfo.getIndexOptions() == IndexOptions.DOCS || state.totalTermFreq >= state.docFreq: "postingsWriter=" + postingsWriter;
// push the term; this may trigger writing a block of previous terms
pushTerm(text);
PendingTerm term = new PendingTerm(text, state);
pending.add(term);
//if (DEBUG) System.out.println(" add pending term = " + text + " pending.size()=" + pending.size());
sumDocFreq += state.docFreq;
sumTotalTermFreq += state.totalTermFreq;
numTerms++;
if (firstPendingTerm == null) {
firstPendingTerm = term;
}
lastPendingTerm = term;
}
}
Let's take the code above apart. First, postingsWriter.writeTerm is called to produce a state object for the term:
// produce the term's state object
BlockTermState state = postingsWriter.writeTerm(text, termsEnum, docsSeen, norms);
public final BlockTermState writeTerm(BytesRef term, TermsEnum termsEnum, FixedBitSet docsSeen, NormsProducer norms) throws IOException {
NumericDocValues normValues;
if (fieldInfo.hasNorms() == false) {
normValues = null;
} else {
normValues = norms.getNorms(fieldInfo);
}
startTerm(normValues);
// the postings for this term
postingsEnum = termsEnum.postings(postingsEnum, enumFlags);
assert postingsEnum != null;
int docFreq = 0;
long totalTermFreq = 0;
while (true) {
// the document ID
int docID = postingsEnum.nextDoc();
// no more documents: break out of the loop
if (docID == PostingsEnum.NO_MORE_DOCS) {
break;
}
docFreq++;
docsSeen.set(docID);
int freq;
// accumulate the total term frequency
if (writeFreqs) {
freq = postingsEnum.freq();
totalTermFreq += freq;
} else {
freq = -1;
}
startDoc(docID, freq);
// write positions
if (writePositions) {
for(int i=0;i<freq;i++) {
// get the position
int pos = postingsEnum.nextPosition();
BytesRef payload = writePayloads ? postingsEnum.getPayload() : null;
int startOffset;
int endOffset;
// offsets
if (writeOffsets) {
startOffset = postingsEnum.startOffset();
endOffset = postingsEnum.endOffset();
} else {
startOffset = -1;
endOffset = -1;
}
// record the position
addPosition(pos, payload, startOffset, endOffset);
}
}
finishDoc();
}
if (docFreq == 0) {
return null;
} else {
BlockTermState state = newTermState();
state.docFreq = docFreq;
state.totalTermFreq = writeFreqs ? totalTermFreq : -1;
finishTerm(state);
return state;
}
}
This is essentially the process of reading the postings held in memory and writing them out to the different files.
// the postings for this term
postingsEnum = termsEnum.postings(postingsEnum, enumFlags);
This calls org.apache.lucene.index.FreqProxFields.FreqProxTermsEnum#postings, which creates a FreqProxPostingsEnum and initializes the reader used for docIDs + freqs as well as the posReader used for reading position data.
public void reset(int termID) {
this.termID = termID;
// reader for docID + freq (stream 0)
terms.initReader(reader, termID, 0);
// reader for positions (stream 1)
terms.initReader(posReader, termID, 1);
ended = false;
docID = -1;
posLeft = 0;
}
public void initReader(ByteSliceReader reader, int termID, int stream) {
assert stream < streamCount;
// start position of this term's entry in the intPool
int intStart = postingsArray.intStarts[termID];
// locate the intPool buffer
final int[] ints = intPool.buffers[intStart >> IntBlockPool.INT_BLOCK_SHIFT];
final int upto = intStart & IntBlockPool.INT_BLOCK_MASK;
// initialize the reader with its start and end positions
reader.init(bytePool,
postingsArray.byteStarts[termID]+stream*ByteBlockPool.FIRST_LEVEL_SIZE,
ints[upto+stream]);
}
public void init(ByteBlockPool pool, int startIndex, int endIndex) {
assert endIndex-startIndex >= 0;
assert startIndex >= 0;
assert endIndex >= 0;
this.pool = pool;
this.endIndex = endIndex;
level = 0;
bufferUpto = startIndex / ByteBlockPool.BYTE_BLOCK_SIZE;
bufferOffset = bufferUpto * ByteBlockPool.BYTE_BLOCK_SIZE;
buffer = pool.buffers[bufferUpto];
// starting read offset
upto = startIndex & ByteBlockPool.BYTE_BLOCK_MASK;
final int firstSize = ByteBlockPool.LEVEL_SIZE_ARRAY[0];
// decide whether this slice holds only real data or ends with a forwarding pointer, and set the read limit accordingly
if (startIndex+firstSize >= endIndex) {
// There is only this one slice to read
limit = endIndex & ByteBlockPool.BYTE_BLOCK_MASK;
} else
limit = upto+firstSize-4;
}
When the docID + freqs reader is initialized, the read start position upto and the end position limit are computed from the offsets recorded in the intPool.
Back in the writeTerm method:
// the document ID
int docID = postingsEnum.nextDoc();
public int nextDoc() throws IOException {
// initialize docID
if (docID == -1) {
docID = 0;
}
while (posLeft != 0) {
nextPosition();
}
// upto has reached endIndex: all slices have been consumed
if (reader.eof()) {
if (ended) {
return NO_MORE_DOCS;
} else {
// the last docID and freq come from the postings array
ended = true;
docID = postingsArray.lastDocIDs[termID];
freq = postingsArray.termFreqs[termID];
}
} else {
// read from the bytePool
int code = reader.readVInt();
docID += code >>> 1;
// if the lowest bit of code is 1, the freq is 1
if ((code & 1) != 0) {
freq = 1;
} else {
// read the frequency
freq = reader.readVInt();
}
assert docID != postingsArray.lastDocIDs[termID];
}
posLeft = freq;
pos = 0;
startOffset = 0;
return docID;
}
This mainly reads the docID and freq values back out of the bytePool and records them.
public byte readByte() {
assert !eof();
assert upto <= limit;
// reached the end of this slice: follow the forwarding pointer to the next one
if (upto == limit)
nextSlice();
return buffer[upto++];
}
public void nextSlice() {
// jump to the next slice using the 4-byte forwarding address
final int nextIndex = ((buffer[limit]&0xff)<<24) + ((buffer[1+limit]&0xff)<<16) + ((buffer[2+limit]&0xff)<<8) + (buffer[3+limit]&0xff);
// slice level
level = ByteBlockPool.NEXT_LEVEL_ARRAY[level];
// slice size
final int newSize = ByteBlockPool.LEVEL_SIZE_ARRAY[level];
bufferUpto = nextIndex / ByteBlockPool.BYTE_BLOCK_SIZE;
bufferOffset = bufferUpto * ByteBlockPool.BYTE_BLOCK_SIZE;
buffer = pool.buffers[bufferUpto];
// compute the new read start position
upto = nextIndex & ByteBlockPool.BYTE_BLOCK_MASK;
// set limit depending on whether this slice ends with a forwarding address or holds real data to its end
if (nextIndex + newSize >= endIndex) {
// We are advancing to the final slice
assert endIndex - nextIndex > 0;
limit = endIndex - bufferOffset;
} else {
// This is not the final slice (subtract 4 for the
// forwarding address at the end of this new slice)
limit = upto+newSize-4;
}
}
During this read, the last bytes of a slice may hold a forwarding pointer; in that case a new read offset is computed from it and the docIDs and freqs continue to be read from there. Position data is read in exactly the same way.
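To make the slice sizes concrete, the following standalone sketch (my own, not from the original post) prints the slice size for each level; each non-final slice reserves its last 4 bytes for the forwarding address of the next slice.
import org.apache.lucene.util.ByteBlockPool;
public class SliceLevels {
  public static void main(String[] args) {
    int level = 0;
    // walk the level chain: when a slice fills up, the next slice is allocated with the next level's size
    for (int i = 0; i < ByteBlockPool.LEVEL_SIZE_ARRAY.length; i++) {
      System.out.println("level " + level + " -> slice size " + ByteBlockPool.LEVEL_SIZE_ARRAY[level] + " bytes");
      level = ByteBlockPool.NEXT_LEVEL_ARRAY[level];
    }
  }
}
If I read the constants correctly, the sizes grow from 5 bytes (FIRST_LEVEL_SIZE) up to 200 bytes and then stop growing, so long postings lists end up in a chain of progressively larger slices.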
Back in org.apache.lucene.codecs.PushPostingsWriterBase#writeTerm, the loop gathers every docID, freq, and position at which the term occurs and buffers them.
First, the delta-encoded docID and the frequency are recorded:
startDoc(docID, freq);
public void startDoc(int docID, int termDocFreq) throws IOException {
if (lastBlockDocID != -1 && docBufferUpto == 0) {
skipWriter.bufferSkip(lastBlockDocID, competitiveFreqNormAccumulator, docCount,
lastBlockPosFP, lastBlockPayFP, lastBlockPosBufferUpto, lastBlockPayloadByteUpto);
competitiveFreqNormAccumulator.clear();
}
// delta encoding
final int docDelta = docID - lastDocID;
if (docID < 0 || (docCount > 0 && docDelta <= 0)) {
throw new CorruptIndexException("docs out of order (" + docID + " <= " + lastDocID + " )", docOut);
}
// buffer the docID delta
docDeltaBuffer[docBufferUpto] = docDelta;
// buffer the frequency
if (writeFreqs) {
freqBuffer[docBufferUpto] = termDocFreq;
}
docBufferUpto++;
docCount++;
// once 128 entries are buffered, write them out as one packed block
if (docBufferUpto == BLOCK_SIZE) {
forUtil.writeBlock(docDeltaBuffer, encoded, docOut);
if (writeFreqs) {
forUtil.writeBlock(freqBuffer, encoded, docOut);
}
}
lastDocID = docID;
lastPosition = 0;
lastStartOffset = 0;
long norm;
if (fieldHasNorms) {
boolean found = norms.advanceExact(docID);
if (found == false) {
norm = 1L;
} else {
norm = norms.longValue();
assert norm != 0 : docID;
}
} else {
norm = 1L;
}
competitiveFreqNormAccumulator.add(writeFreqs ? termDocFreq : 1, norm);
}
Then the delta-encoded positions are recorded:
// record the position
addPosition(pos, payload, startOffset, endOffset);
public void addPosition(int position, BytesRef payload, int startOffset, int endOffset) throws IOException {
if (position > IndexWriter.MAX_POSITION) {
throw new CorruptIndexException("position=" + position + " is too large (> IndexWriter.MAX_POSITION=" + IndexWriter.MAX_POSITION + ")", docOut);
}
if (position < 0) {
throw new CorruptIndexException("position=" + position + " is < 0", docOut);
}
// position delta
posDeltaBuffer[posBufferUpto] = position - lastPosition;
if (writePayloads) {
if (payload == null || payload.length == 0) {
// no payload
payloadLengthBuffer[posBufferUpto] = 0;
} else {
payloadLengthBuffer[posBufferUpto] = payload.length;
if (payloadByteUpto + payload.length > payloadBytes.length) {
payloadBytes = ArrayUtil.grow(payloadBytes, payloadByteUpto + payload.length);
}
System.arraycopy(payload.bytes, payload.offset, payloadBytes, payloadByteUpto, payload.length);
payloadByteUpto += payload.length;
}
}
// record the offsets
if (writeOffsets) {
assert startOffset >= lastStartOffset;
assert endOffset >= startOffset;
offsetStartDeltaBuffer[posBufferUpto] = startOffset - lastStartOffset;
offsetLengthBuffer[posBufferUpto] = endOffset - startOffset;
lastStartOffset = startOffset;
}
posBufferUpto++;
lastPosition = position;
// once 128 entries are buffered, write them out as one packed block
if (posBufferUpto == BLOCK_SIZE) {
forUtil.writeBlock(posDeltaBuffer, encoded, posOut);
if (writePayloads) {
forUtil.writeBlock(payloadLengthBuffer, encoded, payOut);
payOut.writeVInt(payloadByteUpto);
payOut.writeBytes(payloadBytes, 0, payloadByteUpto);
payloadByteUpto = 0;
}
if (writeOffsets) {
forUtil.writeBlock(offsetStartDeltaBuffer, encoded, payOut);
forUtil.writeBlock(offsetLengthBuffer, encoded, payOut);
}
posBufferUpto = 0;
}
}
Finally, org.apache.lucene.codecs.lucene50.Lucene50PostingsWriter#finishTerm is called to write this buffered information into the different files:
public void finishTerm(BlockTermState _state) throws IOException {
IntBlockTermState state = (IntBlockTermState) _state;
assert state.docFreq > 0;
// TODO: wasteful we are counting this (counting # docs
// for this term) in two places?
assert state.docFreq == docCount: state.docFreq + " vs " + docCount;
// docFreq == 1, don't write the single docid/freq to a separate file along with a pointer to it.
final int singletonDocID;
// the term appears in only one document
if (state.docFreq == 1) {
// pulse the singleton docid into the term dictionary, freq is implicitly totalTermFreq
singletonDocID = docDeltaBuffer[0];
} else {
singletonDocID = -1;
// vInt encode the remaining doc deltas and freqs:
// write the remaining doc deltas and freqs as vInts
for(int i=0;i<docBufferUpto;i++) {
final int docDelta = docDeltaBuffer[i];
final int freq = freqBuffer[i];
// if freqs are not stored, only the docID delta is written
if (!writeFreqs) {
docOut.writeVInt(docDelta);
} else if (freqBuffer[i] == 1) {
// freq == 1: encode the doc delta and the implied freq in a single vInt (low bit set)
docOut.writeVInt((docDelta<<1)|1);
} else {
// write the docID delta and the freq to the .doc file
docOut.writeVInt(docDelta<<1);
docOut.writeVInt(freq);
}
}
}
final long lastPosBlockOffset;
// write the positions
if (writePositions) {
// totalTermFreq is just total number of positions(or payloads, or offsets)
// associated with current term.
assert state.totalTermFreq != -1;
if (state.totalTermFreq > BLOCK_SIZE) {
// record file offset for last pos in last block
lastPosBlockOffset = posOut.getFilePointer() - posStartFP;
} else {
lastPosBlockOffset = -1;
}
if (posBufferUpto > 0) {
int lastPayloadLength = -1; // force first payload length to be written
int lastOffsetLength = -1; // force first offset length to be written
int payloadBytesReadUpto = 0;
for(int i=0;i<posBufferUpto;i++) {
final int posDelta = posDeltaBuffer[i];
// check whether payloads need to be written
if (writePayloads) {
final int payloadLength = payloadLengthBuffer[i];
if (payloadLength != lastPayloadLength) {
lastPayloadLength = payloadLength;
posOut.writeVInt((posDelta<<1)|1);
posOut.writeVInt(payloadLength);
} else {
posOut.writeVInt(posDelta<<1);
}
if (payloadLength != 0) {
posOut.writeBytes(payloadBytes, payloadBytesReadUpto, payloadLength);
payloadBytesReadUpto += payloadLength;
}
} else {
// write the position delta to the .pos file
posOut.writeVInt(posDelta);
}
// whether offsets need to be written
if (writeOffsets) {
int delta = offsetStartDeltaBuffer[i];
int length = offsetLengthBuffer[i];
if (length == lastOffsetLength) {
posOut.writeVInt(delta << 1);
} else {
posOut.writeVInt(delta << 1 | 1);
posOut.writeVInt(length);
lastOffsetLength = length;
}
}
}
if (writePayloads) {
assert payloadBytesReadUpto == payloadByteUpto;
payloadByteUpto = 0;
}
}
} else {
lastPosBlockOffset = -1;
}
long skipOffset;
// when more than 128 docs were seen, the skip list is written to the .doc file
if (docCount > BLOCK_SIZE) {
skipOffset = skipWriter.writeSkip(docOut) - docStartFP;
} else {
skipOffset = -1;
}
// file pointers
state.docStartFP = docStartFP;
state.posStartFP = posStartFP;
state.payStartFP = payStartFP;
state.singletonDocID = singletonDocID;
state.skipOffset = skipOffset;
state.lastPosBlockOffset = lastPosBlockOffset;
docBufferUpto = 0;
posBufferUpto = 0;
lastDocID = 0;
docCount = 0;
}
The delta-encoded docIDs and the frequencies are written to the .doc file and the positions to the .pos file; the file pointers for these files are then stored in the state object, which is returned.
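As a concrete illustration of the vInt tail encoding in finishTerm (my own example, not from the post): assume a term occurs in documents 3, 7 and 12 with frequencies 1, 2 and 1. With fewer than 128 documents no packed block is written, and the loop above emits the vInt sequence 7, 8, 2, 11:
import java.util.ArrayList;
import java.util.List;
public class DocDeltaEncodingSketch {
  public static void main(String[] args) {
    int[] docIDs = {3, 7, 12}; // hypothetical docIDs containing the term
    int[] freqs  = {1, 2, 1};  // hypothetical per-document frequencies
    int lastDocID = 0;
    List<Integer> vints = new ArrayList<>();
    for (int i = 0; i < docIDs.length; i++) {
      int docDelta = docIDs[i] - lastDocID;
      if (freqs[i] == 1) {
        vints.add((docDelta << 1) | 1); // doc delta and implied freq=1 share one vInt
      } else {
        vints.add(docDelta << 1);       // low bit 0: an explicit freq vInt follows
        vints.add(freqs[i]);
      }
      lastDocID = docIDs[i];
    }
    System.out.println(vints); // prints [7, 8, 2, 11]
  }
}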
Back in org.apache.lucene.codecs.blocktree.BlockTreeTermsWriter.TermsWriter#write: the postings described above have now been stored, and the term dictionary and term index data are built next.
public void write(BytesRef text, TermsEnum termsEnum, NormsProducer norms) throws IOException {
// produce the term's state object
BlockTermState state = postingsWriter.writeTerm(text, termsEnum, docsSeen, norms);
if (state != null) {
assert state.docFreq != 0;
assert fieldInfo.getIndexOptions() == IndexOptions.DOCS || state.totalTermFreq >= state.docFreq: "postingsWriter=" + postingsWriter;
// decide whether pending entries should be grouped into a block
pushTerm(text);
PendingTerm term = new PendingTerm(text, state);
pending.add(term);
//if (DEBUG) System.out.println(" add pending term = " + text + " pending.size()=" + pending.size());
sumDocFreq += state.docFreq;
sumTotalTermFreq += state.totalTermFreq;
numTerms++;
if (firstPendingTerm == null) {
firstPendingTerm = term;
}
lastPendingTerm = term;
}
}
private void pushTerm(BytesRef text) throws IOException {
int limit = Math.min(lastTerm.length(), text.length);
// Find common prefix between last term and current term:
// find the common prefix
int pos = 0;
while (pos < limit && lastTerm.byteAt(pos) == text.bytes[text.offset+pos]) {
pos++;
}
// if (DEBUG) System.out.println(" shared=" + pos + " lastTerm.length=" + lastTerm.length);
// Close the "abandoned" suffix now:
for(int i=lastTerm.length()-1;i>=pos;i--) {
// How many items on top of the stack share the current suffix
// we are closing:
// number of pending entries that share this prefix
int prefixTopSize = pending.size() - prefixStarts[i];
// enough entries share this prefix: write them out as a block
if (prefixTopSize >= minItemsInBlock) {
// if (DEBUG) System.out.println("pushTerm i=" + i + " prefixTopSize=" + prefixTopSize + " minItemsInBlock=" + minItemsInBlock);
// write a block
writeBlocks(i+1, prefixTopSize);
prefixStarts[i] -= prefixTopSize-1;
}
}
if (prefixStarts.length < text.length) {
prefixStarts = ArrayUtil.grow(prefixStarts, text.length);
}
// Init new tail:
for(int i=pos;i<text.length;i++) {
prefixStarts[i] = pending.size();
}
lastTerm.copyBytes(text);
}
This computes the length of the common prefix between the previously written term and the current term; for each prefix position that is now closed, if the number of pending entries sharing that prefix reaches the minItemsInBlock threshold, those entries are written out as a block.
void writeBlocks(int prefixLength, int count) throws IOException {
assert count > 0;
//if (DEBUG2) {
// BytesRef br = new BytesRef(lastTerm.bytes());
// br.length = prefixLength;
// System.out.println("writeBlocks: seg=" + segment + " prefix=" + brToString(br) + " count=" + count);
//}
// Root block better write all remaining pending entries:
assert prefixLength > 0 || count == pending.size();
int lastSuffixLeadLabel = -1;
// True if we saw at least one term in this block (we record if a block
// only points to sub-blocks in the terms index so we can avoid seeking
// to it when we are looking for a term):
boolean hasTerms = false;
boolean hasSubBlocks = false;
// start index into the pending list
int start = pending.size()-count;
// end index
int end = pending.size();
int nextBlockStart = start;
int nextFloorLeadLabel = -1;
for (int i=start; i<end; i++) {
PendingEntry ent = pending.get(i);
// lead label of the suffix
int suffixLeadLabel;
// the entry is a term
if (ent.isTerm) {
PendingTerm term = (PendingTerm) ent;
// the term is exactly the prefix
if (term.termBytes.length == prefixLength) {
// Suffix is 0, i.e. prefix 'foo' and term is
// 'foo' so the term has empty string suffix
// in this block
assert lastSuffixLeadLabel == -1: "i=" + i + " lastSuffixLeadLabel=" + lastSuffixLeadLabel;
suffixLeadLabel = -1;
} else {
// first byte of the suffix
suffixLeadLabel = term.termBytes[prefixLength] & 0xff;
}
} else {
// the entry is a sub-block
PendingBlock block = (PendingBlock) ent;
assert block.prefix.length > prefixLength;
suffixLeadLabel = block.prefix.bytes[block.prefix.offset + prefixLength] & 0xff;
}
// if (DEBUG) System.out.println(" i=" + i + " ent=" + ent + " suffixLeadLabel=" + suffixLeadLabel);
// a new suffix lead label starts here
if (suffixLeadLabel != lastSuffixLeadLabel) {
int itemsInBlock = i - nextBlockStart;
// too many entries remain for a single block, so cut a "floor" block here; the greedy split happens
// every minItemsInBlock entries, which is not always optimal and can leave a too-small final block
if (itemsInBlock >= minItemsInBlock && end-nextBlockStart > maxItemsInBlock) {
// The count is too large for one block, so we must break it into "floor" blocks, where we record
// the leading label of the suffix of the first term in each floor block, so at search time we can
// jump to the right floor block. We just use a naive greedy segmenter here: make a new floor
// block as soon as we have at least minItemsInBlock. This is not always best: it often produces
// a too-small block as the final block:
boolean isFloor = itemsInBlock < count;
newBlocks.add(writeBlock(prefixLength, isFloor, nextFloorLeadLabel, nextBlockStart, i, hasTerms, hasSubBlocks));
hasTerms = false;
hasSubBlocks = false;
nextFloorLeadLabel = suffixLeadLabel;
nextBlockStart = i;
}
lastSuffixLeadLabel = suffixLeadLabel;
}
if (ent.isTerm) {
hasTerms = true;
} else {
hasSubBlocks = true;
}
}
// Write last block, if any:
if (nextBlockStart < end) {
int itemsInBlock = end - nextBlockStart;
boolean isFloor = itemsInBlock < count;
newBlocks.add(writeBlock(prefixLength, isFloor, nextFloorLeadLabel, nextBlockStart, end, hasTerms, hasSubBlocks));
}
assert newBlocks.isEmpty() == false;
PendingBlock firstBlock = newBlocks.get(0);
assert firstBlock.isFloor || newBlocks.size() == 1;
// compile the index (an FST)
firstBlock.compileIndex(newBlocks, scratchBytes, scratchIntsRef);
// Remove slice from the top of the pending stack, that we just wrote:
// remove the entries we just wrote from the pending stack
pending.subList(pending.size()-count, pending.size()).clear();
// Append new block
// push the new first block back onto the stack in their place
pending.add(firstBlock);
newBlocks.clear();
}
The entries that need to be written out are grouped into a block here; if too many entries still share the prefix for a single block, they are written as a floor structure, split every minItemsInBlock entries.
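A rough sketch of just the sizing rule follows. It is my own simplification: it ignores the lead-label bookkeeping in the real code, and the thresholds are, as far as I know, the BlockTreeTermsWriter defaults (minItemsInBlock = 25, maxItemsInBlock = 48):
public class BlockSizingSketch {
  public static void main(String[] args) {
    int minItemsInBlock = 25; // assumed default
    int maxItemsInBlock = 48; // assumed default
    int count = 120;          // pending entries that share the closed prefix
    if (count <= maxItemsInBlock) {
      System.out.println("one non-floor block with " + count + " entries");
    } else {
      // greedy: a new floor block is cut roughly every minItemsInBlock entries
      int blocks = (count + minItemsInBlock - 1) / minItemsInBlock;
      System.out.println(count + " entries -> roughly " + blocks + " floor blocks");
    }
  }
}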
Writing the term index data:
// compile the index
firstBlock.compileIndex(newBlocks, scratchBytes, scratchIntsRef);
public void compileIndex(List<PendingBlock> blocks, RAMOutputStream scratchBytes, IntsRefBuilder scratchIntsRef) throws IOException {
assert (isFloor && blocks.size() > 1) || (isFloor == false && blocks.size() == 1): "isFloor=" + isFloor + " blocks=" + blocks;
assert this == blocks.get(0);
assert scratchBytes.getFilePointer() == 0;
// TODO: try writing the leading vLong in MSB order
// (opposite of what Lucene does today), for better
// outputs sharing in the FST
// record the output: the block's file pointer encoded together with the hasTerms/isFloor flags
scratchBytes.writeVLong(encodeOutput(fp, hasTerms, isFloor));
// floor block data
if (isFloor) {
scratchBytes.writeVInt(blocks.size()-1);
for (int i=1;i<blocks.size();i++) {
PendingBlock sub = blocks.get(i);
assert sub.floorLeadByte != -1;
//if (DEBUG) {
// System.out.println(" write floorLeadByte=" + Integer.toHexString(sub.floorLeadByte&0xff));
//}
scratchBytes.writeByte((byte) sub.floorLeadByte);
assert sub.fp > fp;
scratchBytes.writeVLong((sub.fp - fp) << 1 | (sub.hasTerms ? 1 : 0));
}
}
// build the term index FST
final ByteSequenceOutputs outputs = ByteSequenceOutputs.getSingleton();
final Builder<BytesRef> indexBuilder = new Builder<>(FST.INPUT_TYPE.BYTE1,
0, 0, true, false, Integer.MAX_VALUE,
outputs, true, 15);
//if (DEBUG) {
// System.out.println(" compile index for prefix=" + prefix);
//}
//indexBuilder.DEBUG = false;
final byte[] bytes = new byte[(int) scratchBytes.getFilePointer()];
assert bytes.length > 0;
scratchBytes.writeTo(bytes, 0);
// add the prefix -> encoded output entry to the FST
indexBuilder.add(Util.toIntsRef(prefix, scratchIntsRef), new BytesRef(bytes, 0, bytes.length));
scratchBytes.reset();
// Copy over index for all sub-blocks
// iterate over the blocks
for(PendingBlock block : blocks) {
if (block.subIndices != null) {
// merge the FSTs of all sub-blocks into this FST
for(FST<BytesRef> subIndex : block.subIndices) {
append(indexBuilder, subIndex, scratchIntsRef);
}
block.subIndices = null;
}
}
// freeze the FST to produce the index
index = indexBuilder.finish();
assert subIndices == null;
}
As you can see, the term index is written into an FST whose outputs are offsets into the terms file. While building the FST, the FSTs of any sub-blocks are merged in as well; finally finish() freezes the FST to produce the index.
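To get a feel for the FST building API used in compileIndex, here is a small standalone sketch. It is my own example with made-up terms and outputs; it uses PositiveIntOutputs (long outputs) instead of the ByteSequenceOutputs seen above, and inputs must be added in sorted order:
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IntsRefBuilder;
import org.apache.lucene.util.fst.Builder;
import org.apache.lucene.util.fst.FST;
import org.apache.lucene.util.fst.PositiveIntOutputs;
import org.apache.lucene.util.fst.Util;
public class FstSketch {
  public static void main(String[] args) throws Exception {
    PositiveIntOutputs outputs = PositiveIntOutputs.getSingleton();
    Builder<Long> builder = new Builder<>(FST.INPUT_TYPE.BYTE1, outputs);
    IntsRefBuilder scratch = new IntsRefBuilder();
    // the outputs play the role of the file pointers stored by the term index
    builder.add(Util.toIntsRef(new BytesRef("cat"), scratch), 5L);
    builder.add(Util.toIntsRef(new BytesRef("dog"), scratch), 17L);
    builder.add(Util.toIntsRef(new BytesRef("dogs"), scratch), 21L);
    FST<Long> fst = builder.finish(); // freeze the FST, just like compileIndex does
    System.out.println(Util.get(fst, new BytesRef("dog"))); // prints 17
  }
}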
Back in org.apache.lucene.codecs.blocktree.BlockTreeTermsWriter#write, once all terms have been processed the writer calls:
// build the index structure
termsWriter.finish();
public void finish() throws IOException {
if (numTerms > 0) {
// push an empty term to close out all remaining pending entries
pushTerm(new BytesRef());
// handle the case where the root block's prefix is empty
pushTerm(new BytesRef());
// write everything left on the stack as a single block
writeBlocks(0, pending.size());
// the root block
final PendingBlock root = (PendingBlock) pending.get(0);
assert root.prefix.length == 0;
assert root.index.getEmptyOutput() != null;
// save the term index (FST) to the .tip output
indexStartFP = indexOut.getFilePointer();
root.index.save(indexOut);
assert firstPendingTerm != null;
BytesRef minTerm = new BytesRef(firstPendingTerm.termBytes);
assert lastPendingTerm != null;
BytesRef maxTerm = new BytesRef(lastPendingTerm.termBytes);
// save the field metadata
fields.add(new FieldMetaData(fieldInfo,
((PendingBlock) pending.get(0)).index.getEmptyOutput(),
numTerms,
indexStartFP,
sumTotalTermFreq,
sumDocFreq,
docsSeen.cardinality(),
longsSize,
minTerm, maxTerm));
} else {
assert sumTotalTermFreq == 0 || fieldInfo.getIndexOptions() == IndexOptions.DOCS && sumTotalTermFreq == -1;
assert sumDocFreq == 0;
assert docsSeen.cardinality() == 0;
}
}
pushTerm(new BytesRef()) is called to close out all pending entries, and everything that remains is written as a single block, the root block. root.index.save(indexOut) then writes the index held in the FST into indexOut; it ends up in the .tip file when the close method is called later.
Back in org.apache.lucene.index.FreqProxTermsWriter#flush: once the write succeeds, close is called on the consumer, which ultimately goes to org.apache.lucene.codecs.blocktree.BlockTreeTermsWriter#close:
if (success) {
IOUtils.close(consumer);
} else {
IOUtils.closeWhileHandlingException(consumer);
}
public void close() throws IOException {
if (closed) {
return;
}
closed = true;
boolean success = false;
try {
final long dirStart = termsOut.getFilePointer();
final long indexDirStart = indexOut.getFilePointer();
// write the number of fields
termsOut.writeVInt(fields.size());
for(FieldMetaData field : fields) {
//System.out.println(" field " + field.fieldInfo.name + " " + field.numTerms + " terms");
// write the field number
termsOut.writeVInt(field.fieldInfo.number);
assert field.numTerms > 0;
// number of terms
termsOut.writeVLong(field.numTerms);
// length of the root code
termsOut.writeVInt(field.rootCode.length);
termsOut.writeBytes(field.rootCode.bytes, field.rootCode.offset, field.rootCode.length);
assert field.fieldInfo.getIndexOptions() != IndexOptions.NONE;
// total term frequency
if (field.fieldInfo.getIndexOptions() != IndexOptions.DOCS) {
termsOut.writeVLong(field.sumTotalTermFreq);
}
termsOut.writeVLong(field.sumDocFreq);
termsOut.writeVInt(field.docCount);
termsOut.writeVInt(field.longsSize);
// file pointer where this field's index starts
indexOut.writeVLong(field.indexStartFP);
writeBytesRef(termsOut, field.minTerm);
writeBytesRef(termsOut, field.maxTerm);
}
// write the trailers and footers
writeTrailer(termsOut, dirStart);
CodecUtil.writeFooter(termsOut);
writeIndexTrailer(indexOut, indexDirStart);
CodecUtil.writeFooter(indexOut);
success = true;
} finally {
if (success) {
IOUtils.close(termsOut, indexOut, postingsWriter);
} else {
IOUtils.closeWhileHandlingException(termsOut, indexOut, postingsWriter);
}
}
}
This writes the metadata into the .tim and .tip files, and then the postingsWriter's close method is called:
public void close() throws IOException {
// TODO: add a finish() at least to PushBase? DV too...?
boolean success = false;
try {
// write footers
if (docOut != null) {
CodecUtil.writeFooter(docOut);
}
if (posOut != null) {
CodecUtil.writeFooter(posOut);
}
if (payOut != null) {
CodecUtil.writeFooter(payOut);
}
success = true;
} finally {
if (success) {
// close the outputs, flushing them to disk
IOUtils.close(docOut, posOut, payOut);
} else {
IOUtils.closeWhileHandlingException(docOut, posOut, payOut);
}
docOut = posOut = payOut = null;
}
}
Footers are written to docOut and posOut (and payOut if present), and the outputs are then closed.
Back in org.apache.lucene.index.DefaultIndexingChain#flush:
// write the field infos
docWriter.codec.fieldInfosFormat().write(state.directory, state.segmentInfo, "", state.fieldInfos, IOContext.DEFAULT);
This writes the field information out to its own file (the .fnm file).
So when is the .si file written? Back in org.apache.lucene.index.DocumentsWriterPerThread#flush: once the flow above completes, a FlushedSegment is created and the following is called:
sealFlushedSegment(fs, sortMap, flushNotifications);
codec.segmentInfoFormat().write(directory, newSegment.info, context);
public void write(Directory dir, SegmentInfo si, IOContext ioContext) throws IOException {
// name of the .si file
final String fileName = IndexFileNames.segmentFileName(si.name, "", Lucene70SegmentInfoFormat.SI_EXTENSION);
try (IndexOutput output = dir.createOutput(fileName, ioContext)) {
// Only add the file once we've successfully created it, else IFD assert can trip:
si.addFile(fileName);
// write the header
CodecUtil.writeIndexHeader(output,
Lucene70SegmentInfoFormat.CODEC_NAME,
Lucene70SegmentInfoFormat.VERSION_CURRENT,
si.getId(),
"");
// validate the version
Version version = si.getVersion();
if (version.major < 7) {
throw new IllegalArgumentException("invalid major version: should be >= 7 but got: " + version.major + " segment=" + si);
}
// Write the Lucene version that created this segment, since 3.1
// write the version
output.writeInt(version.major);
output.writeInt(version.minor);
output.writeInt(version.bugfix);
// Write the min Lucene version that contributed docs to the segment, since 7.0
// write the minimum version
if (si.getMinVersion() != null) {
output.writeByte((byte) 1);
Version minVersion = si.getMinVersion();
output.writeInt(minVersion.major);
output.writeInt(minVersion.minor);
output.writeInt(minVersion.bugfix);
} else {
output.writeByte((byte) 0);
}
assert version.prerelease == 0;
// write the document count
output.writeInt(si.maxDoc());
// write whether a compound file is used
output.writeByte((byte) (si.getUseCompoundFile() ? SegmentInfo.YES : SegmentInfo.NO));
// write the diagnostics map
output.writeMapOfStrings(si.getDiagnostics());
// the files referenced by this segment
Set<String> files = si.files();
for (String file : files) {
if (!IndexFileNames.parseSegmentName(file).equals(si.name)) {
throw new IllegalArgumentException("invalid files: expected segment=" + si.name + ", got=" + files);
}
}
output.writeSetOfStrings(files);
// write the attributes
output.writeMapOfStrings(si.getAttributes());
Sort indexSort = si.getIndexSort();
int numSortFields = indexSort == null ? 0 : indexSort.getSort().length;
output.writeVInt(numSortFields);
for (int i = 0; i < numSortFields; ++i) {
SortField sortField = indexSort.getSort()[i];
SortField.Type sortType = sortField.getType();
output.writeString(sortField.getField());
int sortTypeID;
switch (sortField.getType()) {
case STRING:
sortTypeID = 0;
break;
case LONG:
sortTypeID = 1;
break;
case INT:
sortTypeID = 2;
break;
case DOUBLE:
sortTypeID = 3;
break;
case FLOAT:
sortTypeID = 4;
break;
case CUSTOM:
if (sortField instanceof SortedSetSortField) {
sortTypeID = 5;
sortType = SortField.Type.STRING;
} else if (sortField instanceof SortedNumericSortField) {
sortTypeID = 6;
sortType = ((SortedNumericSortField) sortField).getNumericType();
} else {
throw new IllegalStateException("Unexpected SortedNumericSortField " + sortField);
}
break;
default:
throw new IllegalStateException("Unexpected sort type: " + sortField.getType());
}
output.writeVInt(sortTypeID);
if (sortTypeID == 5) {
SortedSetSortField ssf = (SortedSetSortField) sortField;
if (ssf.getSelector() == SortedSetSelector.Type.MIN) {
output.writeByte((byte) 0);
} else if (ssf.getSelector() == SortedSetSelector.Type.MAX) {
output.writeByte((byte) 1);
} else if (ssf.getSelector() == SortedSetSelector.Type.MIDDLE_MIN) {
output.writeByte((byte) 2);
} else if (ssf.getSelector() == SortedSetSelector.Type.MIDDLE_MAX) {
output.writeByte((byte) 3);
} else {
throw new IllegalStateException("Unexpected SortedSetSelector type: " + ssf.getSelector());
}
} else if (sortTypeID == 6) {
SortedNumericSortField snsf = (SortedNumericSortField) sortField;
if (snsf.getNumericType() == SortField.Type.LONG) {
output.writeByte((byte) 0);
} else if (snsf.getNumericType() == SortField.Type.INT) {
output.writeByte((byte) 1);
} else if (snsf.getNumericType() == SortField.Type.DOUBLE) {
output.writeByte((byte) 2);
} else if (snsf.getNumericType() == SortField.Type.FLOAT) {
output.writeByte((byte) 3);
} else {
throw new IllegalStateException("Unexpected SortedNumericSelector type: " + snsf.getNumericType());
}
if (snsf.getSelector() == SortedNumericSelector.Type.MIN) {
output.writeByte((byte) 0);
} else if (snsf.getSelector() == SortedNumericSelector.Type.MAX) {
output.writeByte((byte) 1);
} else {
throw new IllegalStateException("Unexpected sorted numeric selector type: " + snsf.getSelector());
}
}
output.writeByte((byte) (sortField.getReverse() ? 0 : 1));
// write missing value
Object missingValue = sortField.getMissingValue();
if (missingValue == null) {
output.writeByte((byte) 0);
} else {
switch(sortType) {
case STRING:
if (missingValue == SortField.STRING_LAST) {
output.writeByte((byte) 1);
} else if (missingValue == SortField.STRING_FIRST) {
output.writeByte((byte) 2);
} else {
throw new AssertionError("unrecognized missing value for STRING field \"" + sortField.getField() + "\": " + missingValue);
}
break;
case LONG:
output.writeByte((byte) 1);
output.writeLong(((Long) missingValue).longValue());
break;
case INT:
output.writeByte((byte) 1);
output.writeInt(((Integer) missingValue).intValue());
break;
case DOUBLE:
output.writeByte((byte) 1);
output.writeLong(Double.doubleToLongBits(((Double) missingValue).doubleValue()));
break;
case FLOAT:
output.writeByte((byte) 1);
output.writeInt(Float.floatToIntBits(((Float) missingValue).floatValue()));
break;
default:
throw new IllegalStateException("Unexpected sort type: " + sortField.getType());
}
}
}
// write the footer
CodecUtil.writeFooter(output);
}
}
This records the segment's metadata. For this walkthrough we disabled compound-file storage so that the various files Lucene writes are easier to see individually; under the default configuration these files would be packed into a compound file.
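If you want to reproduce this, a minimal configuration sketch (mine, not from the post) that keeps segments out of compound files looks roughly like the following; after indexing you should then see the individual .si, .fnm, .tim, .tip, .doc and .pos (and possibly .pay) files discussed above:
import java.nio.file.Paths;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.FSDirectory;
public class NoCompoundFileConfig {
  public static void main(String[] args) throws Exception {
    IndexWriterConfig iwc = new IndexWriterConfig(new StandardAnalyzer());
    iwc.setUseCompoundFile(false);           // do not pack newly flushed segments into .cfs/.cfe
    iwc.getMergePolicy().setNoCFSRatio(0.0); // keep merged segments uncompounded as well
    try (IndexWriter writer = new IndexWriter(FSDirectory.open(Paths.get("/tmp/lucene-demo")), iwc)) {
      // ... add documents and let close() flush the segment ...
    }
  }
}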