File structure

Name                | Extension                | Data structure             | Description
Segments File       | segments.gen, segments_N | SegmentInfos               | The set of all segment metadata for the current index; every live segment is recorded in the segments_N file
Lock File           | write.lock               |                            | Write lock that prevents multiple IndexWriters from writing to the same index
Segment Info        | .si                      | Lucene70SegmentInfoFormat  | Metadata about a segment, including which files belong to it
Compound File       | .cfs, .cfe               | Lucene50CompoundFormat     | When the compound-file option is enabled, the segment's files are packed into these two files
Fields              | .fnm                     | Lucene60FieldInfosFormat   | Which fields exist, plus per-field information
Field Index         | .fdx                     | Lucene50StoredFieldsFormat | Index into the field data file
Field Data          | .fdt                     | Lucene50StoredFieldsFormat | The stored field values
Term Dictionary     | .tim                     | BlockTreeTermsWriter       | The term dictionary
Term Index          | .tip                     | BlockTreeTermsWriter       | Index into the term dictionary
Frequencies         | .doc                     | Lucene50PostingsWriter     | The list of documents containing each term, along with term frequencies
Payloads            | .pay                     | Lucene50PostingsWriter     | Character offsets and additional payload data
Norms               | .nvd, .nvm               | Lucene70NormsFormat        | .nvm holds norms (scoring-factor) metadata; .nvd holds the norms data
Per-Document Values | .dvd, .dvm               | Lucene70DocValuesFormat    | .dvm holds doc-values (forward index) metadata; .dvd holds the doc-values data
Term Vector Index   | .tvx                     | Lucene50TermVectorsFormat  | Offsets into the .tvd file
Term Vector Data    | .tvd                     | Lucene50TermVectorsFormat  | The term vector data
Live Documents      | .liv                     | Lucene50LiveDocsFormat     | Bitmap of the documents that are still live (not deleted)
Point values        | .dii, .dim               | Lucene60PointsFormat       | Multi-dimensional data such as geo points, used for numeric queries
Writing data

Using the following code as an example, let's look at how Lucene writes data. The Lucene version used here is 8.1.0.

@Test
public void createIndexTest() throws IOException {
    Document document = new Document(); // a document
    Document document1 = new Document();
    Document document2 = new Document();
    Document document3 = new Document();
    // add fields to each document
    document.add(new TextField("desc", "common common common common common term", Field.Store.YES));
    document1.add(new TextField("desc", "common common common common common term term", Field.Store.YES));
    document2.add(new TextField("desc", "term term term common common common common common", Field.Store.YES));
    document3.add(new TextField("desc", "term", Field.Store.YES));

    StandardAnalyzer standardAnalyzer = new StandardAnalyzer();
    Directory directory = FSDirectory.open(Paths.get("D:/lucene/index"));
    IndexWriterConfig indexWriterConfig = new IndexWriterConfig(standardAnalyzer);
    indexWriterConfig.setUseCompoundFile(false);
    IndexWriter indexWriter = new IndexWriter(directory, indexWriterConfig);
    indexWriter.addDocument(document);
    indexWriter.addDocument(document1);
    indexWriter.addDocument(document2);
    indexWriter.addDocument(document3);
    indexWriter.close();
}

First, a Directory object is created for the index; all data will be written under the path configured here.

Directory directory = FSDirectory.open(Paths.get("D:/lucene/index"));

Next, a writer configuration is created with the standard analyzer:

IndexWriterConfig indexWriterConfig = new IndexWriterConfig(standardAnalyzer);
LiveIndexWriterConfig(Analyzer analyzer) {
    this.analyzer = analyzer;
    // maximum RAM the indexing buffer may use
    ramBufferSizeMB = IndexWriterConfig.DEFAULT_RAM_BUFFER_SIZE_MB;
    // maximum number of buffered documents
    maxBufferedDocs = IndexWriterConfig.DEFAULT_MAX_BUFFERED_DOCS;
    mergedSegmentWarmer = null;
    // index deletion policy
    delPolicy = new KeepOnlyLastCommitDeletionPolicy();
    commit = null;
    // compound files are used by default
    useCompoundFile = IndexWriterConfig.DEFAULT_USE_COMPOUND_FILE_SYSTEM;
    // open mode
    openMode = OpenMode.CREATE_OR_APPEND;
    // default similarity (scoring) is BM25
    similarity = IndexSearcher.getDefaultSimilarity();
    // segment merge scheduler
    mergeScheduler = new ConcurrentMergeScheduler();
    // indexing chain: builds both the stored (forward) and inverted data
    indexingChain = DocumentsWriterPerThread.defaultIndexingChain;
    // default codec is Lucene80; it can be extended via the SPI mechanism
    codec = Codec.getDefault();
    if (codec == null) {
      throw new NullPointerException();
    }
    infoStream = InfoStream.getDefault();
    // segment merge policy
    mergePolicy = new TieredMergePolicy();
    // flush policy
    flushPolicy = new FlushByRamOrCountsPolicy();
    readerPooling = IndexWriterConfig.DEFAULT_READER_POOLING;
    // pool of per-thread indexing states
    indexerThreadPool = new DocumentsWriterPerThreadPool();
    perThreadHardLimitMB = IndexWriterConfig.DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB;
  }

Then the IndexWriter object is created. IndexWriter is the core class for writing data. To make it easier to observe which files Lucene generates, we disable compound files (a small sketch for listing the generated files follows the snippet below); we also write a lot of repeated data so that we can focus on how Lucene builds the inverted index.

indexWriterConfig.setUseCompoundFile(false);
IndexWriter indexWriter = new IndexWriter(directory, indexWriterConfig);
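
After the test has run, the directory can be inspected to see the per-format files from the table at the top of this article. A minimal sketch for listing them, assuming the same D:/lucene/index path as the test:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;

public class ListIndexFiles {
    public static void main(String[] args) throws IOException {
        Path indexPath = Paths.get("D:/lucene/index"); // assumed: same path as the test above
        try (Stream<Path> files = Files.list(indexPath)) {
            // with compound files disabled, expect .fnm/.fdx/.fdt/.tim/.tip/.doc/... plus segments_N and write.lock
            files.map(Path::getFileName).forEach(System.out::println);
        }
    }
}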

The IndexWriter constructor:

public IndexWriter(Directory d, IndexWriterConfig conf) throws IOException {
    enableTestPoints = isEnableTestPoints();
    conf.setIndexWriter(this); // prevent reuse by other instances
    config = conf;
    infoStream = config.getInfoStream();
    softDeletesEnabled = config.getSoftDeletesField() != null;
    //obtain the write.lock file to prevent concurrent writers
    writeLock = d.obtainLock(WRITE_LOCK_NAME);
    
    boolean success = false;
    try {
      directoryOrig = d;
      directory = new LockValidatingDirectoryWrapper(d, writeLock);

      analyzer = config.getAnalyzer();
      mergeScheduler = config.getMergeScheduler();
      mergeScheduler.setInfoStream(infoStream);
      codec = config.getCodec();
      OpenMode mode = config.getOpenMode();
      final boolean indexExists;
      final boolean create;
      if (mode == OpenMode.CREATE) {
        indexExists = DirectoryReader.indexExists(directory);
        create = true;
      } else if (mode == OpenMode.APPEND) {
        indexExists = true;
        create = false;
      } else {
        indexExists = DirectoryReader.indexExists(directory);
        create = !indexExists;
      }
      //list all files under the directory
      String[] files = directory.listAll();

      //we did not set a commit in the config above, so this defaults to null
      IndexCommit commit = config.getIndexCommit();

      // Set up our initial SegmentInfos:
      StandardDirectoryReader reader;
      if (commit == null) {
        reader = null;
      } else {
        reader = commit.getReader();
      }
      //creating a new index
      if (create) {
        //sanity checks
        if (config.getIndexCommit() != null) {
          // We cannot both open from a commit point and create:
          if (mode == OpenMode.CREATE) {
            throw new IllegalArgumentException("cannot use IndexWriterConfig.setIndexCommit() with OpenMode.CREATE");
          } else {
            throw new IllegalArgumentException("cannot use IndexWriterConfig.setIndexCommit() when index has no commit");
          }
        }
		
        final SegmentInfos sis = new SegmentInfos(config.getIndexCreatedVersionMajor());
        if (indexExists) {
          final SegmentInfos previous = SegmentInfos.readLatestCommit(directory);
          sis.updateGenerationVersionAndCounter(previous);
        }
        segmentInfos = sis;
        rollbackSegments = segmentInfos.createBackupSegmentInfos();

        // Record that we have a change (zero out all
        // segments) pending:
        changed();

      } else if (reader != null) {
        // Init from an existing already opened NRT or non-NRT reader:
      
        if (reader.directory() != commit.getDirectory()) {
          throw new IllegalArgumentException("IndexCommit's reader must have the same directory as the IndexCommit");
        }

        if (reader.directory() != directoryOrig) {
          throw new IllegalArgumentException("IndexCommit's reader must have the same directory passed to IndexWriter");
        }

        if (reader.segmentInfos.getLastGeneration() == 0) {  
          // TODO: maybe we could allow this?  It's tricky...
          throw new IllegalArgumentException("index must already have an initial commit to open from reader");
        }

        // Must clone because we don't want the incoming NRT reader to "see" any changes this writer now makes:
        segmentInfos = reader.segmentInfos.clone();

        SegmentInfos lastCommit;
        try {
          lastCommit = SegmentInfos.readCommit(directoryOrig, segmentInfos.getSegmentsFileName());
        } catch (IOException ioe) {
          throw new IllegalArgumentException("the provided reader is stale: its prior commit file \"" + segmentInfos.getSegmentsFileName() + "\" is missing from index");
        }

        if (reader.writer != null) {

          // The old writer better be closed (we have the write lock now!):
          assert reader.writer.closed;

          // In case the old writer wrote further segments (which we are now dropping),
          // update SIS metadata so we remain write-once:
          segmentInfos.updateGenerationVersionAndCounter(reader.writer.segmentInfos);
          lastCommit.updateGenerationVersionAndCounter(reader.writer.segmentInfos);
        }

        rollbackSegments = lastCommit.createBackupSegmentInfos();
      } else {
        // Init from either the latest commit point, or an explicit prior commit point:

        String lastSegmentsFile = SegmentInfos.getLastCommitSegmentsFileName(files);
        if (lastSegmentsFile == null) {
          throw new IndexNotFoundException("no segments* file found in " + directory + ": files: " + Arrays.toString(files));
        }

        // Do not use SegmentInfos.read(Directory) since the spooky
        // retrying it does is not necessary here (we hold the write lock):
        segmentInfos = SegmentInfos.readCommit(directoryOrig, lastSegmentsFile);

        if (commit != null) {
          // Swap out all segments, but, keep metadata in
          // SegmentInfos, like version & generation, to
          // preserve write-once.  This is important if
          // readers are open against the future commit
          // points.
          if (commit.getDirectory() != directoryOrig) {
            throw new IllegalArgumentException("IndexCommit's directory doesn't match my directory, expected=" + directoryOrig + ", got=" + commit.getDirectory());
          }
          
          SegmentInfos oldInfos = SegmentInfos.readCommit(directoryOrig, commit.getSegmentsFileName());
          segmentInfos.replace(oldInfos);
          changed();

          if (infoStream.isEnabled("IW")) {
            infoStream.message("IW", "init: loaded commit \"" + commit.getSegmentsFileName() + "\"");
          }
        }

        rollbackSegments = segmentInfos.createBackupSegmentInfos();
      }



      commitUserData = new HashMap<>(segmentInfos.getUserData()).entrySet();

      pendingNumDocs.set(segmentInfos.totalMaxDoc());

      // start with previous field numbers, but new FieldInfos
      // NOTE: this is correct even for an NRT reader because we'll pull FieldInfos even for the un-committed segments:
      globalFieldNumberMap = getFieldNumberMap();

      validateIndexSort();

      config.getFlushPolicy().init(config);
      bufferedUpdatesStream = new BufferedUpdatesStream(infoStream);
      docWriter = new DocumentsWriter(flushNotifications, segmentInfos.getIndexCreatedVersionMajor(), pendingNumDocs,
          enableTestPoints, this::newSegmentName,
          config, directoryOrig, directory, globalFieldNumberMap);
      readerPool = new ReaderPool(directory, directoryOrig, segmentInfos, globalFieldNumberMap,
          bufferedUpdatesStream::getCompletedDelGen, infoStream, conf.getSoftDeletesField(), reader, config.getReaderAttributes());
      if (config.getReaderPooling()) {
        readerPool.enableReaderPooling();
      }
      // Default deleter (for backwards compatibility) is
      // KeepOnlyLastCommitDeleter:

      // Sync'd is silly here, but IFD asserts we sync'd on the IW instance:
      synchronized(this) {
        deleter = new IndexFileDeleter(files, directoryOrig, directory,
                                       config.getIndexDeletionPolicy(),
                                       segmentInfos, infoStream, this,
                                       indexExists, reader != null);

        // We incRef all files when we return an NRT reader from IW, so all files must exist even in the NRT case:
        assert create || filesExist(segmentInfos);
      }

      if (deleter.startingCommitDeleted) {
        // Deletion policy deleted the "head" commit point.
        // We have to mark ourself as changed so that if we
        // are closed w/o any further changes we write a new
        // segments_N file.
        changed();
      }

      if (reader != null) {
        // We always assume we are carrying over incoming changes when opening from reader:
        segmentInfos.changed();
        changed();
      }

      if (infoStream.isEnabled("IW")) {
        infoStream.message("IW", "init: create=" + create + " reader=" + reader);
        messageState();
      }

      success = true;

    } finally {
      if (!success) {
        if (infoStream.isEnabled("IW")) {
          infoStream.message("IW", "init: hit exception on init; releasing write lock");
        }
        IOUtils.closeWhileHandlingException(writeLock);
        writeLock = null;
      }
    }
  }

The constructor above does several things:

1. It obtains the write.lock file to prevent concurrent writers (see the sketch after this list).
2. It initializes according to the configured OpenMode and whether the index already exists; if index files are already present, they are loaded from the directory.
3. It initializes the DocumentsWriter object in preparation for writing document data.
4. It creates the ReaderPool object, which mainly holds SegmentReaders; later segment merges may need to read segment data through it.
5. It creates the IndexFileDeleter object, which walks the commit points already present in the directory, adds references to their files, and cleans up files not referenced by any commit point.
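
As a side note on the write.lock in step 1: with the default lock factory, opening a second IndexWriter on the same directory while another writer is still open fails. A hedged sketch, reusing the directory and standardAnalyzer objects from the test above (assumed imports: org.apache.lucene.index.IndexWriter, org.apache.lucene.index.IndexWriterConfig, org.apache.lucene.store.LockObtainFailedException):

try (IndexWriter first = new IndexWriter(directory, new IndexWriterConfig(standardAnalyzer))) {
    try {
        // the second writer cannot obtain write.lock while `first` holds it
        new IndexWriter(directory, new IndexWriterConfig(standardAnalyzer));
    } catch (LockObtainFailedException expected) {
        System.out.println("second writer rejected: " + expected.getMessage());
    }
}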
Once initialization is complete, addDocument is called to add documents:

indexWriter.addDocument(document);

Tracing into the code, this ends up calling updateDocument:

private long updateDocument(final DocumentsWriterDeleteQueue.Node<?> delNode,
                              Iterable<? extends IndexableField> doc) throws IOException {
    ensureOpen();
    boolean success = false;
    try {
      //write the document data
      long seqNo = docWriter.updateDocument(doc, analyzer, delNode);
      if (seqNo < 0) {
        seqNo = -seqNo;
        processEvents(true);
      }
      success = true;
      return seqNo;
    } catch (VirtualMachineError tragedy) {
      tragicEvent(tragedy, "updateDocument");
      throw tragedy;
    } finally {
      if (success == false) {
        if (infoStream.isEnabled("IW")) {
          infoStream.message("IW", "hit exception updating document");
        }
      }
      maybeCloseOnTragicEvent();
    }
  }

This in turn calls docWriter's updateDocument method:

long updateDocument(final Iterable<? extends IndexableField> doc, final Analyzer analyzer,
                      final DocumentsWriterDeleteQueue.Node<?> delNode) throws IOException {

    boolean hasEvents = preUpdate();
    //obtain and lock a ThreadState (a reentrant lock)
    final ThreadState perThread = flushControl.obtainAndLock();
    //the core per-thread writer class
    final DocumentsWriterPerThread flushingDWPT;
    long seqNo;
    try {
      // This must happen after we've pulled the ThreadState because IW.close
      // waits for all ThreadStates to be released:
      ensureOpen();
      //initialize the DocumentsWriterPerThread object
      ensureInitialized(perThread);
      assert perThread.isInitialized();
      final DocumentsWriterPerThread dwpt = perThread.dwpt;
      final int dwptNumDocs = dwpt.getNumDocsInRAM();
      try {
        //write the document data
        seqNo = dwpt.updateDocument(doc, analyzer, delNode, flushNotifications);
      } finally {
        if (dwpt.isAborted()) {
          flushControl.doOnAbort(perThread);
        }
        // We don't know whether the document actually
        // counted as being indexed, so we must subtract here to
        // accumulate our separate counter:
        numDocsInRAM.addAndGet(dwpt.getNumDocsInRAM() - dwptNumDocs);
      }
      final boolean isUpdate = delNode != null && delNode.isDelete();
      flushingDWPT = flushControl.doAfterDocument(perThread, isUpdate);

      assert seqNo > perThread.lastSeqNo: "seqNo=" + seqNo + " lastSeqNo=" + perThread.lastSeqNo;
      perThread.lastSeqNo = seqNo;

    } finally {
      perThreadPool.release(perThread);
    }

    if (postUpdate(flushingDWPT, hasEvents)) {
      seqNo = -seqNo;
    }
    
    return seqNo;
  }

The code above mainly does the following.

1. It obtains a ThreadState object; ThreadState extends ReentrantLock, so it is itself a reentrant lock.

ThreadState obtainAndLock() {
    final ThreadState perThread = perThreadPool.getAndLock();
    boolean success = false;
    try {
      if (perThread.isInitialized() && perThread.dwpt.deleteQueue != documentsWriter.deleteQueue) {
        addFlushableState(perThread);
      }
      success = true;
      return perThread;
    } finally {
      if (!success) { 
        perThreadPool.release(perThread);
      }
    }
  }

ThreadState getAndLock() {
    ThreadState threadState = null;
    synchronized (this) {
      if (freeList.isEmpty()) {
        return newThreadState();
      } else {
        threadState = freeList.remove(freeList.size()-1);

        if (threadState.dwpt == null) {
          for(int i=0;i<freeList.size();i++) {
            ThreadState ts = freeList.get(i);
            if (ts.dwpt != null) {
              // Use this one instead, and swap it with
              // the un-initialized one:
              freeList.set(i, threadState);
              threadState = ts;
              break;
            }
          }
        }
      }
    }

    threadState.lock();
    return threadState;
  }

getAndLock first tries to take a ThreadState directly from the free list; if the free list is empty, it calls newThreadState to create a new one.

private synchronized ThreadState newThreadState() {
    assert takenThreadStatePermits >= 0;
    while (takenThreadStatePermits > 0) {
      // we can't create new thread-states while not all permits are available
      try {
        wait();
      } catch (InterruptedException ie) {
        throw new ThreadInterruptedException(ie);
      }
    }
    ThreadState threadState = new ThreadState(null);
    threadState.lock(); // lock so nobody else will get this ThreadState
    threadStates.add(threadState);
    return threadState;
}

newThreadState creates a ThreadState and calls lock on it so that no other thread can use it concurrently, then adds it to the threadStates list.

2. It initializes the DocumentsWriterPerThread.

ensureInitialized(perThread);

To improve write throughput, Lucene writes data concurrently through DocumentsWriterPerThread (DWPT) instances. Each thread is isolated and runs its own write path, so no extra locking or other synchronization is needed for thread safety, which improves indexing performance.

private void ensureInitialized(ThreadState state) throws IOException {
    if (state.dwpt == null) {
        final FieldInfos.Builder infos = new FieldInfos.Builder(globalFieldNumberMap);
        state.dwpt = new DocumentsWriterPerThread(indexCreatedVersionMajor, segmentNameSupplier.get(), directoryOrig,
                                                  directory, config, infoStream, deleteQueue, infos,
                                                  pendingNumDocs, enableTestPoints);
    }
}

public DocumentsWriterPerThread(int indexVersionCreated, String segmentName, Directory directoryOrig, Directory directory, LiveIndexWriterConfig indexWriterConfig, InfoStream infoStream, DocumentsWriterDeleteQueue deleteQueue,
                                  FieldInfos.Builder fieldInfos, AtomicLong pendingNumDocs, boolean enableTestPoints) throws IOException {
    this.directoryOrig = directoryOrig;
    this.directory = new TrackingDirectoryWrapper(directory);
    this.fieldInfos = fieldInfos;
    this.indexWriterConfig = indexWriterConfig;
    this.infoStream = infoStream;
    this.codec = indexWriterConfig.getCodec();
    this.docState = new DocState(this, infoStream);
    this.docState.similarity = indexWriterConfig.getSimilarity();
    this.pendingNumDocs = pendingNumDocs;
    bytesUsed = Counter.newCounter();
    byteBlockAllocator = new DirectTrackingAllocator(bytesUsed);
    pendingUpdates = new BufferedUpdates(segmentName);
    intBlockAllocator = new IntBlockAllocator(bytesUsed);
    this.deleteQueue = deleteQueue;
    assert numDocsInRAM == 0 : "num docs " + numDocsInRAM;
    deleteSlice = deleteQueue.newSlice();
   
    segmentInfo = new SegmentInfo(directoryOrig, Version.LATEST, Version.LATEST, segmentName, -1, false, codec, Collections.emptyMap(), StringHelper.randomId(), Collections.emptyMap(), indexWriterConfig.getIndexSort());
    assert numDocsInRAM == 0;
    if (INFO_VERBOSE && infoStream.isEnabled("DWPT")) {
      infoStream.message("DWPT", Thread.currentThread().getName() + " init seg=" + segmentName + " delQueue=" + deleteQueue);  
    }
    // this should be the last call in the ctor 
    // it really sucks that we need to pull this within the ctor and pass this ref to the chain!
    consumer = indexWriterConfig.getIndexingChain().getChain(this);
    this.enableTestPoints = enableTestPoints;
    this.indexVersionCreated = indexVersionCreated;
  }

The DWPT's updateDocument method is then called:

seqNo = dwpt.updateDocument(doc, analyzer, delNode, flushNotifications);

Execution continues into the processDocument method:

public void processDocument() throws IOException {
    int fieldCount = 0;
    long fieldGen = nextFieldGen++;
    termsHash.startDocument();
    startStoredFields(docState.docID);
    try {
      //iterate over the document's fields and process each one
      for (IndexableField field : docState.doc) {
        fieldCount = processField(field, fieldGen, fieldCount);
      }
    } finally {
      if (docWriter.hasHitAbortingException() == false) {
        for (int i=0;i<fieldCount;i++) {
          fields[i].finish();
        }
        finishStoredFields();
      }
    }

    try {
      termsHash.finishDocument();
    } catch (Throwable th) {
      docWriter.onAbortingException(th);
      throw th;
    }
  }

This mainly does two things. 1. It calls startStoredFields(docState.docID), which lazily creates the stored-fields writer; that writer is provided by Lucene50StoredFieldsFormat.

void startDocument(int docID) throws IOException {
    assert lastDoc < docID;
    initStoredFieldsWriter();
    while (++lastDoc < docID) {
      writer.startDocument();
      writer.finishDocument();
    }
    writer.startDocument();
  }

protected void initStoredFieldsWriter() throws IOException {
    if (writer == null) {
      this.writer =
          docWriter.codec.storedFieldsFormat().fieldsWriter(docWriter.directory, docWriter.getSegmentInfo(),
              IOContext.DEFAULT);
    }
  }

2. It then iterates over all fields of the document and processes them.

for (IndexableField field : docState.doc) {
    fieldCount = processField(field, fieldGen, fieldCount);
}

processField is called to handle each field's data:

private int processField(IndexableField field, long fieldGen, int fieldCount) throws IOException {
    String fieldName = field.name();
    IndexableFieldType fieldType = field.fieldType();

    PerField fp = null;

    if (fieldType.indexOptions() == null) {
      throw new NullPointerException("IndexOptions must not be null (field: \"" + field.name() + "\")");
    }

    // Invert indexed fields:
    //handle fields that need to be indexed (inverted)
    if (fieldType.indexOptions() != IndexOptions.NONE) {
      fp = getOrAddField(fieldName, fieldType, true);
      boolean first = fp.fieldGen != fieldGen;
      //build the inverted index data
      fp.invert(field, first);

      if (first) {
        fields[fieldCount++] = fp;
        fp.fieldGen = fieldGen;
      }
    } else {
      verifyUnIndexedFieldType(fieldName, fieldType);
    }

    // Add stored fields:
    //fields whose values need to be stored
    if (fieldType.stored()) {
      if (fp == null) {
        fp = getOrAddField(fieldName, fieldType, false);
      }
      if (fieldType.stored()) {
        String value = field.stringValue();
        if (value != null && value.length() > IndexWriter.MAX_STORED_STRING_LENGTH) {
          throw new IllegalArgumentException("stored field \"" + field.name() + "\" is too large (" + value.length() + " characters) to store");
        }
        try {
          //store the field value
          storedFieldsConsumer.writeField(fp.fieldInfo, field);
        } catch (Throwable th) {
          docWriter.onAbortingException(th);
          throw th;
        }
      }
    }
    //
    DocValuesType dvType = fieldType.docValuesType();
    if (dvType == null) {
      throw new NullPointerException("docValuesType must not be null (field: \"" + fieldName + "\")");
    }
    //doc values (forward/columnar data) need to be written
    if (dvType != DocValuesType.NONE) {
      if (fp == null) {
        fp = getOrAddField(fieldName, fieldType, false);
      }
      indexDocValue(fp, dvType, field);
    }
    //point (dimensional) data
    if (fieldType.pointDataDimensionCount() != 0) {
      if (fp == null) {
        fp = getOrAddField(fieldName, fieldType, false);
      }
      indexPoint(fp, field);
    }
    
    return fieldCount;
  }

Depending on the field configuration (whether it should be indexed, stored, recorded as doc values, or as point data), different paths are taken. Here we focus on how the inverted index is built.

//check whether this field has been seen before; if so, reuse its PerField
fp = getOrAddField(fieldName, fieldType, true);
private PerField getOrAddField(String name, IndexableFieldType fieldType, boolean invert) {

    // Make sure we have a PerField allocated
    final int hashPos = name.hashCode() & hashMask;
    PerField fp = fieldHash[hashPos];
    while (fp != null && !fp.fieldInfo.name.equals(name)) {
      fp = fp.next;
    }

    if (fp == null) {
      // First time we are seeing this field in this segment

      FieldInfo fi = fieldInfos.getOrAdd(name);
      initIndexOptions(fi, fieldType.indexOptions());
      Map<String, String> attributes = fieldType.getAttributes();
      if (attributes != null) {
        attributes.forEach((k, v) -> fi.putAttribute(k, v));
      }

      fp = new PerField(docWriter.getIndexCreatedVersionMajor(), fi, invert);
      fp.next = fieldHash[hashPos];
      fieldHash[hashPos] = fp;
      totalFieldCount++;

      // At most 50% load factor:
      if (totalFieldCount >= fieldHash.length/2) {
        rehash();
      }

      if (totalFieldCount > fields.length) {
        PerField[] newFields = new PerField[ArrayUtil.oversize(totalFieldCount, RamUsageEstimator.NUM_BYTES_OBJECT_REF)];
        System.arraycopy(fields, 0, newFields, 0, fields.length);
        fields = newFields;
      }

    } else if (invert && fp.invertState == null) {
      initIndexOptions(fp.fieldInfo, fieldType.indexOptions());
      fp.setInvertState();
    }

    return fp;
  }

This is essentially a hash map: the field name's hashCode is masked to get an index into the array, then the chain at that slot is walked, comparing names with equals to see whether a PerField for this field already exists. If the field has not been seen in this segment before, a new FieldInfo is created, the mappings from field name to object and from field number to object are recorded, and the FieldInfo is attached to a new PerField.

public PerField(int indexCreatedVersionMajor, FieldInfo fieldInfo, boolean invert) {
    this.indexCreatedVersionMajor = indexCreatedVersionMajor;
    this.fieldInfo = fieldInfo;
    similarity = docState.similarity;
    //this field needs an inverted index
    if (invert) {
        setInvertState();
    }
}

void setInvertState() {
    invertState = new FieldInvertState(indexCreatedVersionMajor, fieldInfo.name, fieldInfo.getIndexOptions());
    //per-field inverted-index writer
    termsHashPerField = termsHash.addField(invertState, fieldInfo);
    if (fieldInfo.omitsNorms() == false) {
        assert norms == null;
        norms = new NormValuesWriter(fieldInfo, docState.docWriter.bytesUsed);
    }
}

public TermsHashPerField addField(FieldInvertState invertState, FieldInfo fieldInfo) {
    //records the field's term frequencies and positions
    return new FreqProxTermsWriterPerField(invertState, this, fieldInfo, nextTermsHash.addField(invertState, fieldInfo));
  }

 public FreqProxTermsWriterPerField(FieldInvertState invertState, TermsHash termsHash, FieldInfo fieldInfo, TermsHashPerField nextPerField) {
    super(fieldInfo.getIndexOptions().compareTo(IndexOptions.DOCS_AND_FREQS_AND_POSITIONS) >= 0 ? 2 : 1, invertState, termsHash, nextPerField, fieldInfo);
    IndexOptions indexOptions = fieldInfo.getIndexOptions();
    assert indexOptions != IndexOptions.NONE;
    //whether term frequencies are recorded
    hasFreq = indexOptions.compareTo(IndexOptions.DOCS_AND_FREQS) >= 0;
    //whether positions are recorded
    hasProx = indexOptions.compareTo(IndexOptions.DOCS_AND_FREQS_AND_POSITIONS) >= 0;
    //whether offsets are recorded
    hasOffsets = indexOptions.compareTo(IndexOptions.DOCS_AND_FREQS_AND_POSITIONS_AND_OFFSETS) >= 0;
  }

 public TermsHashPerField(int streamCount, FieldInvertState fieldState, TermsHash termsHash, TermsHashPerField nextPerField, FieldInfo fieldInfo) {
    //records each term's offsets into bytePool
    intPool = termsHash.intPool;
    //stores the term data itself: term bytes, docIDs, frequencies, positions, etc.
    bytePool = termsHash.bytePool;
    //same pool as bytePool
    termBytePool = termsHash.termBytePool;
    docState = termsHash.docState;
    this.termsHash = termsHash;
    bytesUsed = termsHash.bytesUsed;
    this.fieldState = fieldState;
    this.streamCount = streamCount;
    numPostingInt = 2*streamCount;
    this.fieldInfo = fieldInfo;
    this.nextPerField = nextPerField;
    PostingsBytesStartArray byteStarts = new PostingsBytesStartArray(this, bytesUsed);
    bytesHash = new BytesRefHash(termBytePool, HASH_INIT_SIZE, byteStarts);
  }

Once the PerField object has been created, it is inserted at the head of the chain in fieldHash.
Now the inverted index structure is built:

fp.invert(field, first);

The invert method is fairly long, so we analyze it in pieces.
1. On the first invocation for this field in the document, the state is reset:

if (first) {
	invertState.reset();
}
void reset() {
    position = -1;
    length = 0;
    numOverlap = 0;
    offset = 0;
    maxTermFrequency = 0;
    uniqueTermCount = 0;
    lastStartOffset = 0;
    lastPosition = 0;
  }

2. The field value is tokenized and the inverted index entries are created:

try (TokenStream stream = tokenStream = field.tokenStream(docState.analyzer, tokenStream)) {
    // reset the TokenStream to the first token
    stream.reset();
    //attach the token stream's attribute source
    invertState.setAttributeSource(stream);
    termsHashPerField.start(field, first);
    //iterate over the tokens
    while (stream.incrementToken()) {
        int posIncr = invertState.posIncrAttribute.getPositionIncrement();
        invertState.position += posIncr;
        //(validation code omitted here)
        invertState.lastPosition = invertState.position;
        if (posIncr == 0) {
            invertState.numOverlap++;
        }
        //start offset of the token
        int startOffset = invertState.offset + invertState.offsetAttribute.startOffset();
        //end offset of the token
        int endOffset = invertState.offset + invertState.offsetAttribute.endOffset();
        if (startOffset < invertState.lastStartOffset || endOffset < startOffset) {
            throw new IllegalArgumentException("startOffset must be non-negative, and endOffset must be >= startOffset, and offsets must not go backwards "
                                               + "startOffset=" + startOffset + ",endOffset=" + endOffset + ",lastStartOffset=" + invertState.lastStartOffset + " for field '" + field.name() + "'");
        }
        invertState.lastStartOffset = startOffset;

        try {
            invertState.length = Math.addExact(invertState.length, invertState.termFreqAttribute.getTermFrequency());
        } catch (ArithmeticException ae) {
            throw new IllegalArgumentException("too many tokens for field \"" + field.name() + "\"");
        }

        try {
            //record the token in the term hash
            termsHashPerField.add();
        } catch (MaxBytesLengthExceededException e) {
            ......
        } catch (Throwable th) {
            docWriter.onAbortingException(th);
            throw th;
        }
    }

    stream.end();

The add method is called:

void add() throws IOException {
    //1. check whether this term has been added before; if not, add it to the (map-like) BytesRefHash and assign a termID
    int termID = bytesHash.add(termAtt.getBytesRef());
    //a termID >= 0 means this is a newly added term
    if (termID >= 0) {// New posting
      bytesHash.byteStart(termID);
      // make sure intPool has room
      if (numPostingInt + intPool.intUpto > IntBlockPool.INT_BLOCK_SIZE) {
        intPool.nextBuffer();
      }
      //make sure bytePool has room
      if (ByteBlockPool.BYTE_BLOCK_SIZE - bytePool.byteUpto < numPostingInt*ByteBlockPool.FIRST_LEVEL_SIZE) {
        bytePool.nextBuffer();
      }

      intUptos = intPool.buffer;
      intUptoStart = intPool.intUpto;
      intPool.intUpto += streamCount;

      postingsArray.intStarts[termID] = intUptoStart + intPool.intOffset;
      //allocate two 5-byte slices: the first stores docID+termFreq, the second stores positions
      for(int i=0;i<streamCount;i++) {
        final int upto = bytePool.newSlice(ByteBlockPool.FIRST_LEVEL_SIZE);
        //intPool records the write position inside bytePool
        intUptos[intUptoStart+i] = upto + bytePool.byteOffset;
      }
      postingsArray.byteStarts[termID] = intUptos[intUptoStart];
      //brand-new term
      newTerm(termID);
    } else {
      //term already exists; recover its termID
      termID = (-termID)-1;
      int intStart = postingsArray.intStarts[termID];
      intUptos = intPool.buffers[intStart >> IntBlockPool.INT_BLOCK_SHIFT];
      intUptoStart = intStart & IntBlockPool.INT_BLOCK_MASK;
      //append to the existing term's postings
      addTerm(termID);
    }

    if (doNextCall) {
      nextPerField.add(postingsArray.textStarts[termID]);
    }
  }

1. bytesHash.add first checks whether the term already exists; if so, it returns the previously assigned termID (negated). If not, it checks whether the current buffer has enough room for the term bytes and switches to a new buffer if necessary, records the term's start position in bytesStart (the array index is the term's ID), stores the term with either a 1-byte or a 2-byte length prefix depending on its length, grows the hash table if count has reached hashHalfSize, and finally returns the new termID.

public int add(BytesRef bytes) {
    assert bytesStart != null : "Bytesstart is null - not initialized";
    //number of bytes the term occupies
    final int length = bytes.length;
    //hash the bytes to find the slot in the ids array
    final int hashPos = findHash(bytes);
    //look up the ID
    int e = ids[hashPos];
    //-1 means the term is not present yet
    if (e == -1) {
      // new entry
      final int len2 = 2 + bytes.length;
      //the current buffer cannot hold it; a new buffer is needed
      if (len2 + pool.byteUpto > BYTE_BLOCK_SIZE) {
        if (len2 > BYTE_BLOCK_SIZE) {
          throw new MaxBytesLengthExceededException("bytes can be at most "
              + (BYTE_BLOCK_SIZE - 2) + " in length; got " + bytes.length);
        }
        //move on to the next byte buffer to store the term bytes
        pool.nextBuffer();
      }
      final byte[] buffer = pool.buffer;
      //write position inside the byte buffer
      final int bufferUpto = pool.byteUpto;
      //grow bytesStart if the number of terms has reached its capacity
      if (count >= bytesStart.length) {
        bytesStart = bytesStartArray.grow();
        assert count < bytesStart.length + 1 : "count: " + count + " len: "
            + bytesStart.length;
      }
      //assign an ID; IDs are independent per segment
      e = count++;
      //record the term's start position
      bytesStart[e] = bufferUpto + pool.byteOffset;

      //store the length with a variable-length prefix to save space
      if (length < 128) {
        // 1 byte to store length
        buffer[bufferUpto] = (byte) length;
        pool.byteUpto += length + 1;
        assert length >= 0: "Length must be positive: " + length;
        System.arraycopy(bytes.bytes, bytes.offset, buffer, bufferUpto + 1,
            length);
      } else {
        // 2 byte to store length
        buffer[bufferUpto] = (byte) (0x80 | (length & 0x7f));
        buffer[bufferUpto + 1] = (byte) ((length >> 7) & 0xff);
        pool.byteUpto += length + 2;
        System.arraycopy(bytes.bytes, bytes.offset, buffer, bufferUpto + 2,
            length);
      }
      assert ids[hashPos] == -1;
      ids[hashPos] = e;
      //rehash (grow the hash table)
      if (count == hashHalfSize) {
        rehash(2 * hashSize, true);
      }
      return e;
    }
    return -(e + 1);
  }
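
To make the length prefix above concrete, here is a hedged sketch (values chosen purely for illustration, not Lucene code) of how a term's bytes are laid out in the buffer:

byte[] buffer = new byte[1 << 15];
int bufferUpto = 0;

// a short term such as "common" (6 bytes): one length byte, then the term bytes
byte[] shortTerm = "common".getBytes(java.nio.charset.StandardCharsets.UTF_8);
buffer[bufferUpto] = (byte) shortTerm.length;                 // 6
System.arraycopy(shortTerm, 0, buffer, bufferUpto + 1, shortTerm.length);
bufferUpto += shortTerm.length + 1;

// a term of length >= 128, e.g. 300 bytes: two length bytes
int length = 300;
buffer[bufferUpto]     = (byte) (0x80 | (length & 0x7f));     // low 7 bits, high bit set -> 0xAC
buffer[bufferUpto + 1] = (byte) ((length >> 7) & 0xff);       // remaining bits -> 2
bufferUpto += length + 2;                                     // the 300 term bytes would follow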

Back in the add method:

public final static int[] NEXT_LEVEL_ARRAY = {1, 2, 3, 4, 5, 6, 7, 8, 9, 9};

public final static int[] LEVEL_SIZE_ARRAY = {5, 14, 20, 30, 40, 40, 80, 80, 120, 200};
 
public final static int FIRST_LEVEL_SIZE = LEVEL_SIZE_ARRAY[0];

//a termID >= 0 means this is a new term
if (termID >= 0) {// New posting
    bytesHash.byteStart(termID);
    //make sure intPool has room; it holds the metadata (write positions) for data in bytePool
    if (numPostingInt + intPool.intUpto > IntBlockPool.INT_BLOCK_SIZE) {
        intPool.nextBuffer();
    }
    //make sure bytePool has room
    if (ByteBlockPool.BYTE_BLOCK_SIZE - bytePool.byteUpto < numPostingInt*ByteBlockPool.FIRST_LEVEL_SIZE) {
        bytePool.nextBuffer();
    }

    intUptos = intPool.buffer;
    //start position for this term's entries in intPool
    intUptoStart = intPool.intUpto;
    //advance by streamCount slots: one stream for doc/freq data, one for positions
    intPool.intUpto += streamCount;
    //record this term's start position in intPool
    postingsArray.intStarts[termID] = intUptoStart + intPool.intOffset;
    //allocate the initial slices
    for(int i=0;i<streamCount;i++) {
        //allocate a 5-byte slice whose end marker is 16
        final int upto = bytePool.newSlice(ByteBlockPool.FIRST_LEVEL_SIZE);
        //record its start position
        intUptos[intUptoStart+i] = upto + bytePool.byteOffset;
    }
    //record it in postingsArray
    postingsArray.byteStarts[termID] = intUptos[intUptoStart];

    newTerm(termID);

}

In the code above, intPool only records positions inside bytePool; bytePool is where the data is actually stored. NEXT_LEVEL_ARRAY defines 10 slice levels (the highest level is 9), and each level corresponds to a different slice length, as shown in the table below:

Level | Length | End marker
1     | 5      | 16
2     | 14     | 17
3     | 20     | 18
4     | 30     | 19
5     | 40     | 20
6     | 40     | 21
7     | 80     | 22
8     | 80     | 23
9     | 120    | 24
9     | 200    | 25

Here, two FIRST_LEVEL_SIZE slices of 5 bytes each are reserved, both ending with the marker 16: one stores the docID and term frequency, the other stores position information. When 5 bytes are no longer enough, the slice is upgraded from level 1 to level 2, which has a length of 14 and an end marker of 17, and so on up the levels.
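
A minimal sketch (plain Java, not Lucene's ByteBlockPool) of how a slice's end marker encodes its level and how the next size is looked up when the slice has to grow:

int[] NEXT_LEVEL_ARRAY = {1, 2, 3, 4, 5, 6, 7, 8, 9, 9};
int[] LEVEL_SIZE_ARRAY = {5, 14, 20, 30, 40, 40, 80, 80, 120, 200};

int endMarker = 16;                        // a first-level slice ends with the marker 16
int level = endMarker & 15;                // -> 0, the index into the two arrays
int newLevel = NEXT_LEVEL_ARRAY[level];    // -> 1
int newSize = LEVEL_SIZE_ARRAY[newLevel];  // -> 14, and its end marker will be 16 | 1 = 17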

void newTerm(final int termID) {
    // First time we're seeing this term since the last
    // flush
    //per-term postings metadata
    final FreqProxPostingsArray postings = freqProxPostingsArray;
    //record the docID in which this term was last seen
    postings.lastDocIDs[termID] = docState.docID;
    if (!hasFreq) {
        assert postings.termFreqs == null;
        postings.lastDocCodes[termID] = docState.docID;
        fieldState.maxTermFrequency = Math.max(1, fieldState.maxTermFrequency);
    } else {
        //shift the docID left by one bit; the low bit will indicate freq == 1 (a single vint) versus freq > 1 (docCode plus a separate freq vint)
        postings.lastDocCodes[termID] = docState.docID << 1;
        //record the term frequency
        postings.termFreqs[termID] = getTermFreq();
        //record position information
        if (hasProx) {
            //write the position into the second 5-byte slice (end marker 16) allocated above
            writeProx(termID, fieldState.position);
            //record offsets
            if (hasOffsets) {
                writeOffsets(termID, fieldState.offset);
            }
        } else {
            assert !hasOffsets;
        }
        //track the maximum term frequency
        fieldState.maxTermFrequency = Math.max(postings.termFreqs[termID], fieldState.maxTermFrequency);
    }
    fieldState.uniqueTermCount++;
}

Here the term frequency is recorded, and then, depending on the configured index options, positions and offsets are stored as well.
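
Before looking at writeProx, here is a hedged sketch (assumed values, not Lucene code) of the bit layout it writes: the position delta is shifted left by one bit, and the low bit flags whether a payload follows.

int proxCode = 3;                           // assumed position delta
int noPayload   = proxCode << 1;            // 6: low bit 0, no payload bytes follow
int withPayload = (proxCode << 1) | 1;      // 7: low bit 1, payload length + bytes follow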

//record position information
void writeProx(int termID, int proxCode) {
    if (payloadAttribute == null) {
      //shift left by 1; a low bit of 0 means there is no payload
      writeVInt(1, proxCode<<1);
    } else {
      BytesRef payload = payloadAttribute.getPayload();
      if (payload != null && payload.length > 0) {
        writeVInt(1, (proxCode<<1)|1);
        writeVInt(1, payload.length);
        writeBytes(1, payload.bytes, payload.offset, payload.length);
        sawPayloads = true;
      } else {
        writeVInt(1, proxCode<<1);
      }
    }

    assert postingsArray == freqProxPostingsArray;
    freqProxPostingsArray.lastPositions[termID] = fieldState.position;
  }

void writeVInt(int stream, int i) {
    assert stream < streamCount;
    while ((i & ~0x7F) != 0) {
      writeByte(stream, (byte)((i & 0x7f) | 0x80));
      i >>>= 7;
    }
    writeByte(stream, (byte) i);
  }

void writeByte(int stream, byte b) {
    //get the write position from intPool
    int upto = intUptos[intUptoStart+stream];
    byte[] bytes = bytePool.buffers[upto >> ByteBlockPool.BYTE_BLOCK_SHIFT];
    assert bytes != null;
    //compute the offset within the buffer
    int offset = upto & ByteBlockPool.BYTE_BLOCK_MASK;
    //the write position has reached the end of the slice; a larger slice is needed
    if (bytes[offset] != 0) {
      // End of slice; allocate a new one
      //not enough room left; allocate a larger slice
      offset = bytePool.allocSlice(bytes, offset);
      bytes = bytePool.buffer;
      intUptos[intUptoStart+stream] = offset + bytePool.byteOffset;
    }
    bytes[offset] = b;
    (intUptos[intUptoStart+stream])++;
  }

This writes the term's position data. Based on the write offset recorded in intPool, writeByte first works out which buffer inside bytePool to write to. bytePool keeps its data in a two-dimensional array, initially 10 rows, where each row is a byte array of 1 << 15 = 32768 bytes. The recorded upto determines which row (buffer) to write into and the actual position inside it. If the byte at that position is non-zero, the slice has been filled up to its end marker and must be grown; if it is 0, no growth is needed and the byte is simply written.
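
A small worked example (the upto value is assumed) of the shift/mask arithmetic used by writeByte above:

final int BYTE_BLOCK_SHIFT = 15;
final int BYTE_BLOCK_SIZE = 1 << BYTE_BLOCK_SHIFT;   // 32768 bytes per buffer
final int BYTE_BLOCK_MASK = BYTE_BLOCK_SIZE - 1;

int upto = 40000;                                    // hypothetical offset recorded in intPool
int bufferIndex = upto >> BYTE_BLOCK_SHIFT;          // 40000 / 32768 = 1 -> second buffer
int offset = upto & BYTE_BLOCK_MASK;                 // 40000 - 32768 = 7232 -> position within it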

So how are the slices that hold docID+termFreq and position data grown? Let's look at the allocSlice method:

public int allocSlice(final byte[] slice, final int upto) {
    //derive the current level from the end marker; the initial end marker is 16
    final int level = slice[upto] & 15;
    //the level after growing
    final int newLevel = NEXT_LEVEL_ARRAY[level];
    //the slice length after growing
    final int newSize = LEVEL_SIZE_ARRAY[newLevel];

    // Maybe allocate another block
    //the current buffer cannot hold the new slice; allocate a new buffer
    if (byteUpto > BYTE_BLOCK_SIZE-newSize) {
      nextBuffer();
    }
    //new write position
    final int newUpto = byteUpto;
    //global offset of the new slice
    final int offset = newUpto + byteOffset;
    //next free position after the new slice
    byteUpto += newSize;

    // Copy forward the past 3 bytes (which we are about
    // to overwrite with the forwarding address):
    //copy the last 3 bytes of the old slice to the start of the new slice
    buffer[newUpto] = slice[upto-3];
    buffer[newUpto+1] = slice[upto-2];
    buffer[newUpto+2] = slice[upto-1];

    // Write forwarding address at end of last slice:
    //the last 4 bytes of the old slice now store the forwarding address, a pointer to the new slice's start
    slice[upto-3] = (byte) (offset >>> 24);
    slice[upto-2] = (byte) (offset >>> 16);
    slice[upto-1] = (byte) (offset >>> 8);
    slice[upto] = (byte) offset;
        
    // Write new level:
    //write the new end marker
    buffer[byteUpto-1] = (byte) (16|newLevel);

    return newUpto+3;
  }

Slices grow one level at a time. For example, we start with a level-1 slice of 5 bytes whose end marker is 16; when it needs to grow, the next level is level 2 with a newSize of 14 (see the table above). A new write position is computed, the last three bytes of the old slice are copied to the new one, the freed 4 bytes at the end of the old slice are used to store a pointer to the start of the new slice, and the new write position is returned.
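
A hedged sketch (the offset value is assumed) of the 4-byte forwarding address that allocSlice writes into the end of the old slice, most significant byte first:

int offset = 40000;                  // hypothetical global start offset of the new slice
byte b0 = (byte) (offset >>> 24);    // 0x00
byte b1 = (byte) (offset >>> 16);    // 0x00
byte b2 = (byte) (offset >>> 8);     // 0x9C
byte b3 = (byte) offset;             // 0x40  (40000 == 0x9C40)
// a reader that reaches the end of the old slice follows these 4 bytes to the new slice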

Back in the newTerm method: if offsets need to be written, they are written in the same way as the position information above.

void writeOffsets(int termID, int offsetAccum) {
    final int startOffset = offsetAccum + offsetAttribute.startOffset();
    final int endOffset = offsetAccum + offsetAttribute.endOffset();
    assert startOffset - freqProxPostingsArray.lastOffsets[termID] >= 0;
    writeVInt(1, startOffset - freqProxPostingsArray.lastOffsets[termID]);
    writeVInt(1, endOffset - startOffset);
    freqProxPostingsArray.lastOffsets[termID] = startOffset;
  }

If the term has already been seen before, the addTerm(termID) path is taken instead:

void addTerm(final int termID) {
    final FreqProxPostingsArray postings = freqProxPostingsArray;
    assert !hasFreq || postings.termFreqs[termID] > 0;
    //frequencies are not recorded
    if (!hasFreq) {
      assert postings.termFreqs == null;
      if (termFreqAtt.getTermFrequency() != 1) {
        throw new IllegalStateException("field \"" + fieldInfo.name + "\": must index term freq while using custom TermFrequencyAttribute");
      }
      //only the docID is recorded
      if (docState.docID != postings.lastDocIDs[termID]) {
        // New document; now encode docCode for previous doc:
        assert docState.docID > postings.lastDocIDs[termID];
        writeVInt(0, postings.lastDocCodes[termID]);
        postings.lastDocCodes[termID] = docState.docID - postings.lastDocIDs[termID];
        postings.lastDocIDs[termID] = docState.docID;
        fieldState.uniqueTermCount++;
      }
    } else if (docState.docID != postings.lastDocIDs[termID]) {
      //the term appears in a new document, so the previous doc's docID and frequency can now be written
      assert docState.docID > postings.lastDocIDs[termID]:"id: "+docState.docID + " postings ID: "+ postings.lastDocIDs[termID] + " termID: "+termID;
      // Term not yet seen in the current doc but previously
      // seen in other doc(s) since the last flush

      // Now that we know doc freq for previous doc,
      // write it & lastDocCode
      //if the term occurred only once, docID and frequency are packed into a single vint
      if (1 == postings.termFreqs[termID]) {
        writeVInt(0, postings.lastDocCodes[termID]|1);
      } else {
        writeVInt(0, postings.lastDocCodes[termID]);
        writeVInt(0, postings.termFreqs[termID]);
      }

      // Init freq for the current document
      postings.termFreqs[termID] = getTermFreq();
      fieldState.maxTermFrequency = Math.max(postings.termFreqs[termID], fieldState.maxTermFrequency);
      //delta-encoded docID code
      postings.lastDocCodes[termID] = (docState.docID - postings.lastDocIDs[termID]) << 1;
      //remember this docID
      postings.lastDocIDs[termID] = docState.docID;
      if (hasProx) {
        //record position information
        writeProx(termID, fieldState.position);
        if (hasOffsets) {
          //record offsets
          postings.lastOffsets[termID] = 0;
          writeOffsets(termID, fieldState.offset);
        }
      } else {
        assert !hasOffsets;
      }
      fieldState.uniqueTermCount++;
    } else {
      //same document: just accumulate the term frequency
      postings.termFreqs[termID] = Math.addExact(postings.termFreqs[termID], getTermFreq());
      fieldState.maxTermFrequency = Math.max(fieldState.maxTermFrequency, postings.termFreqs[termID]);
      //record position and offset information
      if (hasProx) {
        writeProx(termID, fieldState.position-postings.lastPositions[termID]);
        if (hasOffsets) {
          writeOffsets(termID, fieldState.offset);
        }
      }
    }
  }

addTerm first checks whether frequencies are recorded. If not, it only compares the current docID with the last recorded one; if they differ, the previous document has been fully processed and its docID can be written. If frequencies are recorded, the same comparison is made: a different docID means the previous document is finished and its frequency is final, so its docID and frequency are written, followed by positions and offsets depending on the configuration. This loop repeats until all incoming data has been turned into in-memory inverted index data.
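
As a closing illustration, here is a hedged sketch (not Lucene code) of the docCode/frequency encoding described above. It assumes the term "common" from the test documents, which occurs 5 times in doc 0 and is then seen again in doc 1:

import java.io.ByteArrayOutputStream;

public class DocCodeSketch {
    // same variable-length int encoding as TermsHashPerField.writeVInt
    static void writeVInt(ByteArrayOutputStream out, int i) {
        while ((i & ~0x7F) != 0) {
            out.write((i & 0x7f) | 0x80);
            i >>>= 7;
        }
        out.write(i);
    }

    public static void main(String[] args) {
        ByteArrayOutputStream docStream = new ByteArrayOutputStream();

        // while doc 0 is being processed, "common" only accumulates in-memory state
        int lastDocID = 0;
        int lastDocCode = 0 << 1;   // docID shifted left; the low bit is reserved for the freq==1 flag
        int termFreq = 5;           // "common" occurs 5 times in doc 0

        // when "common" is next seen in doc 1, doc 0's entry is flushed
        if (termFreq == 1) {
            writeVInt(docStream, lastDocCode | 1);   // one vint, low bit set
        } else {
            writeVInt(docStream, lastDocCode);       // 0
            writeVInt(docStream, termFreq);          // 5
        }
        lastDocCode = (1 - lastDocID) << 1;          // delta to the new doc, shifted left
        lastDocID = 1;
        termFreq = 1;                                // restart the count for doc 1

        System.out.println("bytes written so far: " + docStream.size()); // 2 (one byte per vint here)
    }
}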
