monstache 同步 es 学习


  最近在做一个 mongoDB 的存储数据到 es 中进行检索的工作,也是通过此文学习一下 monstache 是如何做同步,以及管理可用性和一致性的,同时学习 golang 相关的 chan 的生命周期

  当我们想实现检索 mongoDB 中的数据,通过 like 的方式在 DB 中检索肯定是不行的,如何根据关键词搜索出对应用户之前相关的记录,最好的方式还是放到搜索引擎 es 中,由于之前数据从没有同步到 monstache 过,所以需要使用一些工具来实现同步数据的功能,可能在 mongoDB 中的数据,不一定是检索想要的,也可以通过编写工具来进行多个集合的关联处理,在mapping 过数据之后,再同步到 es 中来方便检索。

使用工具到 es 有多种方式:

  1. monstache
    Monstache基于MongoDB的oplog实现实时数据同步及订阅,支持MongoDB与高版本Elasticsearch之间的数据同步,同时支持MongoDB的变更流和聚合管道功能,并且拥有丰富的特性。

     Monstache不仅支持软删除和硬删除,还支持数据库删除和集合删除,能够确保Elasticsearch端实时与源端数据保持一致。
    
  2. flink cdc
    支持数据的全量同步和增量同步,同时支持灵活的多目标源数据库的写入,

  3. logstache
    支持数据的全量同步或增量同步,不够灵活的处理多数据的关联处理和映射。

使用 cursor 对monstache 进行总结:

  1. 全量同步

    1. 可以在 config.toml 中配置toml:"direct-read-namespaces",使用 mongoDB 客户端直接查询指定的 namespace
    2. 将查询结果按批次处理并索引到 Elasticsearch
  2. 增量同步

    1. 支持 oplog 和 change stream 两种增量同步方式。
  3. 文档处理流程
    mongoDB 文档经过以下处理后同步到 ES:

    1. 筛选: 通过 filterWithRegex 等函数确定哪些文档需要处理
    2. 转换: 通过 JavaScript 脚本、Go 插件或内联规则转换文档
    3. 关联: 处理关联的文档
    4. 索引: 最终将文档批量发送到 Elasticsearch
  4. 高可用/集群实现

    1. clusterName 配置: 同一集群的节点共享 ClusterName
    2. worker 配置: 同一集群不同worker 节点就是 clusterName + workerName
    3. leader election: 同一 worker 可以有多个备用节点,使用 mongoDB 作为锁(使用 index ttl ) + 心跳来实现领导选举
    4. 状态管理: 共享最后同步的时间戳(lastTs)到 mongoDB 集合中
  5. 断点续传

    1. monstache 支持断点续传,保证重启后能从上次处理的位置继续:
      1. 根据 ResumeStrategy 选择时间戳或 token 方式
      2. 将处理状态(lastTs或 token)存储在 mongoDB,重启时进行检索
  6. 监控服务器

    1. 提供 http 服务器用于监控和管理
      1. 健康检查
      2. 同步统计
      3. 实例信息
  7. eventLoop 处理

    1. 处理各种事件和信号
  8. 回退处理

    1. 当 es 同步数据出错后,会基于熔断的机制,进行恢复

下面基于这几个模块进行分析:

  1. 数据处理
  2. 管道处理以及优雅下线
  3. 回退处理
  4. 断点续传
  5. 插件管理
  6. 高可用处理

1. 数据处理

不管是 directNamespace 进行全量同步,还是监听增量的变更,都会写入到同一管道进行处理:


    func (ic *indexClient) eventLoop() {
	
	...

	    for {
	    	select {
	    	...

            // 监听到有数据过来
	    	case op, open := <-ic.gtmCtx.OpC:
	    		if !ic.enabled {
	    			break
	    		}
	    		if op == nil {
	    			if !open && !allOpsVisited {
	    				allOpsVisited = true
	    				ic.opsConsumed <- true
	    			}
	    			break
	    		}
	    		// 判断是不是增量变更
	    		if op.IsSourceOplog() {
                    // 每次增量变更,都会保存赋值最新处理的时间戳
	    			ic.lastTs = op.Timestamp
	    			if ic.config.ResumeStrategy == tokenResumeStrategy {
	    				ic.tokens[op.ResumeToken.StreamID] = op.ResumeToken.ResumeToken
	    			}
	    		}
                // 根据更新的类型,进行不同的 route 来处理
	    		if err = ic.routeOp(op); err != nil {
	    			ic.processErr(err)
	    		}
	    	}
	    }
    }


    func (ic *indexClient) routeOp(op *gtm.Op) (err error) {
	    if processPlugin != nil {
            // 插件处理数据
	    	err = ic.routeProcess(op)
	    }
	    if op.IsDrop() {
	    	err = ic.routeDrop(op)
	    } else if op.IsDelete() {
	    	err = ic.routeDelete(op)
	    } else if op.Data != nil {
	    	err = ic.routeData(op)
	    }
	    return
    }

    func (ic *indexClient) routeData(op *gtm.Op) (err error) {
	    skip := false
	    if op.IsSourceOplog() && len(ic.config.Relate) > 0 {
	    	skip, err = ic.routeDataRelate(op)
	    }
	    if !skip {
	    	if ic.hasFileContent(op) {
	    		ic.fileC <- op
	    	} else {
	    		ic.indexC <- op
	    	}
	    }
	    return
    }

    func (ic *indexClient) startIndex() {
	    for i := 0; i < 5; i++ {
	    	ic.indexWg.Add(1)
	    	go func() {
                //  这里会开 5 个线程进行处理
	    		defer ic.indexWg.Done()
	    		for op := range ic.indexC {
	    			if err := ic.doIndex(op); err != nil {
	    				ic.processErr(err)
	    			}
	    		}
	    	}()
	    }
    }

2. 管道以及处理生命周期

这里涉及多管道数据处理,以及 graceful shut down 等处理,需了解到各管道之间的数据流向和处理关系,下面是一个大概的管道数据关系

chan_process

这里重点说明一下优雅关闭下的各管道处理流程,首先,是在什么情况下会进行优雅关闭:

  1. 设置了全量读取完成后,直接退出
func (ic *indexClient) startReadWait() {
	directReadsEnabled := len(ic.config.DirectReadNs) > 0
	if directReadsEnabled {
		exitAfterDirectReads := ic.config.ExitAfterDirectReads
		go func() {
			ic.gtmCtx.DirectReadWg.Wait()
			if ic.config.Resume {
				ic.saveTimestampFromReplStatus()
			}
			if exitAfterDirectReads {
				var exit bool
				ic.rwmutex.RLock()
				exit = !ic.externalShutdown
				ic.rwmutex.RUnlock()
				if exit {
                    // 先等待所有 worker 处理,在关闭客户端和服务端
					ic.stopAllWorkers()
					ic.doneC <- 30
				}
			}
		}()
	}
}

  1. monstache 会在最开始的时候,监听系统信号,当监听到退出信号,也会进行优雅退出
func (sh *sigHandler) start() {
	go func() {
		sigs := make(chan os.Signal, 1)
		signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM, syscall.SIGKILL)
		select {
		case <-sigs:
            // 当客户端还未开始,则直接退出
			// we never got started so simply exit
			os.Exit(0)
		case ic := <-sh.clientStartedC:
            // 当客户端已经开始,则需要等待处理完成数据后,优雅退出
			<-sigs
			ic.onExternalShutdown()
			go func() {
				// forced shutdown on 2nd signal
				<-sigs
				infoLog.Println("Forcing shutdown, bye bye...")
				os.Exit(1)
			}()
			// we started processing events so do a clean shutdown
			infoLog.Println("Starting clean shutdown")
			ic.stopAllWorkers()
			ic.doneC <- 10
		}
	}()
}


onExternalShutdown处理的是当意外退出后,需要根据是否开启全量写入状态,等待 500ms 后,在停止所有 workers 和客户端

func (ic *indexClient) onExternalShutdown() {
	ic.rwmutex.Lock()
	defer ic.rwmutex.Unlock()
	ic.externalShutdown = true
	ic.checkDirectReads()
}

func (ic *indexClient) checkDirectReads() {
	if len(ic.config.DirectReadNs) == 0 {
		return
	}
	drc := ic.directReadChan()
	t := time.NewTimer(time.Duration(500) * time.Millisecond)
	defer t.Stop()
	select {
	case <-t.C:
		ic.directReadsPending = true
	case <-drc:
	}
}

func (ic *indexClient) directReadChan() chan struct{} {
	c := make(chan struct{})
	go func() {
		ic.gtmCtx.DirectReadWg.Wait()
		close(c)
	}()
	return c
}

stopAllWorkers 做的事情就是:

  1. 先停止监听数据变化
  2. 直到没有数据会过来后,等待关闭相关的文档处理chan

func (ic *indexClient) stopAllWorkers() {
	infoLog.Println("Stopping all workers")
	ic.gtmCtx.Stop()
	<-ic.opsConsumed
	close(ic.relateC)
	ic.relateWg.Wait()
	close(ic.fileC)
	ic.fileWg.Wait()
	close(ic.indexC)
	ic.indexWg.Wait()
	close(ic.processC)
	ic.processWg.Wait()
}

doneC 就是一个监听开始关闭客户端的信道,同时设置超时关闭时间,当收到要关闭客户端后,进行超时关闭


// main 的 eventLoop 会监听 doneC

func (ic *indexClient) eventLoop() {
    ...
	for {
		select {
		...
		case timeout := <-ic.doneC:
			ic.enabled = false
			ic.shutdown(timeout)
			return

        }
    }
}


func (ic *indexClient) shutdown(timeout int) {
	infoLog.Println("Shutting down")
    // 关闭客户端
	go ic.closeClient()
	doneC := make(chan bool)
	go func() {
		closeT := time.NewTimer(time.Duration(timeout) * time.Second)
		defer closeT.Stop()
		done := false
		for !done {
			select {
			case <-ic.closeC:
				done = true
				close(doneC)
            //  超时直接关闭
			case <-closeT.C:
				done = true
				close(doneC)
			}
		}
	}()
	<-doneC
	os.Exit(exitStatus)
}


func (ic *indexClient) closeClient() {
	if ic.mongo != nil && ic.config.ClusterName != "" {
		ic.resetClusterState()
	}
    // 关闭 开启的服务端状态响应
	if ic.hsc != nil {
		ic.hsc.shutdown = true
		ic.hsc.httpServer.Shutdown(context.Background())
	}
    // 关闭 es 客户端
	if ic.bulk != nil {
		ic.bulk.Close()
	}
    // 关闭 es 状态统计 客户端
	if ic.bulkStats != nil {
		ic.bulkStats.Close()
	}
    // 如果全量写入完成,则会根据是否开启全量写入状态,来保存写入状态
	if len(ic.config.DirectReadNs) > 0 {
		ic.rwmutex.RLock()
		if !ic.directReadsPending {
			infoLog.Println("Direct reads completed")
			if ic.config.DirectReadStateful {
				if err := ic.saveDirectReadNamespaces(); err != nil {
					errorLog.Printf("Error saving direct read state: %s", err)
				}
			}
		}
		ic.rwmutex.RUnlock()
	}
	close(ic.closeC)
}

3. 回退处理

es bulk server 设置 backoff 函数

    func (ic *indexClient) newBulkProcessor(client *elastic.Client) (bulk *elastic.BulkProcessor,   err error) {
    	config := ic.config
    	bulkService := client.BulkProcessor().Name("monstache")
    	bulkService.Workers(config.ElasticMaxConns)   //初始化elastic 连接数
    	bulkService.Stats(config.Stats)  // 开启统计
    	bulkService.BulkActions(config.ElasticMaxDocs) //批量 flush 进 elastic 的数量
    	bulkService.BulkSize(config.ElasticMaxBytes) // 批量 flush 进 elastic 的最大 size
    	if config.ElasticRetry == false {
    		bulkService.Backoff(&elastic.StopBackoff{})
    	}
    	bulkService.After(ic.afterBulk()) // 回退处理,其实也是熔断处理
        // 设置 flush 的间隔
    	bulkService.FlushInterval(time.Duration(config.ElasticMaxSeconds) * time.Second)
    	return bulkService.Do(context.Background())
    }

    func (ic *indexClient) afterBulk() func(int64, []elastic.BulkableRequest, *elastic.BulkResponse, error) {
	return func(executionID int64, requests []elastic.BulkableRequest, response *elastic.BulkResponse, err error) {
		if response == nil || !response.Errors {
			ic.bulkErrs.Store(0)
			return
		}
		if failed := response.Failed(); failed != nil {
			backoff := false
			for _, item := range failed {
				if item.Status == http.StatusConflict {
					// ignore version conflict since this simply means the doc
					// is already in the index
					continue
				}
				logFailedResponseItem(item)
				if item.Status == http.StatusNotFound {
					// status not found should not initiate back off
					continue
				}
				backoff = true
			}
            //  主要是在这里,会给 bulkBackoffC 管道 传递信号,告诉主进程需要等待一段时间,再处理
			if backoff {
				wait := ic.backoffDuration()
				infoLog.Printf("Backing off for %.1f minutes after bulk indexing failures.", wait.Minutes())
				// signal the event loop to pause pulling new events for a duration
				ic.bulkBackoffC <- wait
				// pause the bulk worker for a duration
				ic.backoff(wait)
				ic.bulkErrs.Add(1)
			}
		}
	}
}


主进程 eventLoop 中会监听这个 bulkBackoffC ,当监听到之后,会根据设置的超时时间,自旋一段时间, 当收到信号,也会主动退出自旋,然后主进程监听到信号,也会主动进行 shutdown,也会响应状态探测函数

func (ic *indexClient) backoffDuration() time.Duration {
	consecutiveErrors := int(ic.bulkErrs.Load())
	wait, ok := ic.bulkBackoff.Next(consecutiveErrors)
	if !ok {
		wait = ic.bulkBackoffMax
	}
	return wait
}


func (ic *indexClient) backoff(wait time.Duration) {
	timer := time.NewTimer(wait)
	defer timer.Stop()
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	defer signal.Stop(sigs)
	for {
		select {
		case <-timer.C:
			return
		case <-sigs:
			return
		case req := <-ic.statusReqC:
			enabled, lastTs := ic.enabled, ic.lastTs
			statusResp := &statusResponse{
				enabled: enabled,
				lastTs:  lastTs,
				backoff: true,
			}
			req.responseC <- statusResp
		}
	}
}

func (ic *indexClient) eventLoop() {


    ...

    for {
		select {
		case wait := <-ic.bulkBackoffC:
			ic.backoff(wait)
        }
    }

}


4. 断点续传

保存时间戳的核心函数: saveTimestamp() , 每次保存最新的时间戳到ConfigDatabaseName = cluster + worker

func (ic *indexClient) saveTimestamp() error {
    col := ic.mongoWriter.Database(ic.config.ConfigDatabaseName).Collection("monstache")
    doc := map[string]interface{}{
        "ts": ic.lastTs,
    }
    opts := options.Update()
    opts.SetUpsert(true)
    _, err := col.UpdateOne(context.Background(), bson.M{
        "_id": ic.config.ResumeName,
    }, bson.M{
        "$set": doc,
    }, opts)
    return err
}


时间戳更新机制: 每隔 10s 执行一次,根据重启策略,来进行保存时间戳或是 token


func (ic *indexClient) eventLoop() {

	timestampTicker := time.NewTicker(10 * time.Second)
	if !ic.config.Resume {
		timestampTicker.Stop()
	}


	for {
		select {

		case <-timestampTicker.C:
			if !ic.enabled {
				break
			}
			if ic.config.ResumeStrategy == tokenResumeStrategy {
				ic.nextTokens()
			} else {
				ic.nextTimestamp()
			}
        }
    }
}



主要解释时间戳机制: nextTimestamp()


func (ic *indexClient) nextTimestamp() {
	if ic.hasNewEvents() {
		ic.bulk.Flush()
		if err := ic.saveTimestamp(); err == nil {
			ic.lastTsSaved = ic.lastTs
		} else {
			ic.processErr(err)
		}
	}
}

// 只保存时间最新的时间,同时把数据 flush 进 es
func (ic *indexClient) hasNewEvents() bool {
	if ic.lastTs.T > ic.lastTsSaved.T ||
		(ic.lastTs.T == ic.lastTsSaved.T && ic.lastTs.I > ic.lastTsSaved.I) {
		return true
	}
	return false
}



在优雅关闭时,也会保存一次时间戳,避免心跳还没有执行

func (ic *indexClient) shutdown(timeout int) {
    // ...
    if ic.config.Resume {
        if ic.config.ResumeStrategy == tokenResumeStrategy {
            if err := ic.saveTokens(); err != nil {
                errorLog.Printf("Unable to save tokens: %s", err)
            }
        } else {
            if err := ic.saveTimestamp(); err != nil {
                errorLog.Printf("Unable to save timestamp: %s", err)
            }
        }
    }
    // ...
}


什么时候获取lastTs 呢,在每次有新的更新数据来的时候,就会赋值 lastTs


    func (ic *indexClient) eventLoop() {
	
	...

	    for {
	    	select {
	    	...

            // 监听到有数据过来
	    	case op, open := <-ic.gtmCtx.OpC:
	    		if !ic.enabled {
	    			break
	    		}
	    		if op == nil {
	    			if !open && !allOpsVisited {
	    				allOpsVisited = true
	    				ic.opsConsumed <- true
	    			}
	    			break
	    		}
	    		// 判断是不是增量变更
	    		if op.IsSourceOplog() {
                    // 每次增量变更,都会保存赋值最新处理的时间戳
	    			ic.lastTs = op.Timestamp
	    			if ic.config.ResumeStrategy == tokenResumeStrategy {
	    				ic.tokens[op.ResumeToken.StreamID] = op.ResumeToken.ResumeToken
	    			}
	    		}
                // 根据更新的类型,进行不同的 route 来处理
	    		if err = ic.routeOp(op); err != nil {
	    			ic.processErr(err)
	    		}
	    	}
	    }
    }

如果是全量同步时,则会在完成全量同步时,保存副本集中最小提交的时间戳,用于重启增量同步


func (ic *indexClient) startReadWait() {
	directReadsEnabled := len(ic.config.DirectReadNs) > 0
	if directReadsEnabled {
		exitAfterDirectReads := ic.config.ExitAfterDirectReads
		go func() {
			ic.gtmCtx.DirectReadWg.Wait()
			// 完成全量同步后,保存副本集最小时间戳
            if ic.config.Resume {
				ic.saveTimestampFromReplStatus()
			}
			if exitAfterDirectReads {
				var exit bool
				ic.rwmutex.RLock()
				exit = !ic.externalShutdown
				ic.rwmutex.RUnlock()
				if exit {
					ic.stopAllWorkers()
					ic.doneC <- 30
				}
			}
		}()
	}
}


func (ic *indexClient) saveTimestampFromReplStatus() {
	if rs, err := gtm.GetReplStatus(ic.mongo); err == nil {
		if ic.lastTs, err = rs.GetLastCommitted(); err == nil {
			if err = ic.saveTimestamp(); err != nil {
				ic.processErr(err)
			}
		} else {
			ic.processErr(err)
		}
	} else {
		ic.saveTimestampFromServerStatus()
	}
}

启动时,也会获取时间戳


func (ic *indexClient) buildTimestampGen() gtm.TimestampGenerator {
	var after gtm.TimestampGenerator
	config := ic.config
	if config.ResumeStrategy != timestampResumeStrategy {
		return after
	}
	
    
    ...

    else if config.Resume {
		after = func(client *mongo.Client, options *gtm.Options) (primitive.Timestamp, error) {
			var candidateTs primitive.Timestamp
			var tsSource string
			var err error
			col := client.Database(config.ConfigDatabaseName).Collection("monstache")
			result := col.FindOne(context.Background(), bson.M{
				"_id": config.ResumeName,
			})
			if err = result.Err(); err == nil {
				doc := make(map[string]interface{})
				if err = result.Decode(&doc); err == nil {
					if doc["ts"] != nil {
						candidateTs = doc["ts"].(primitive.Timestamp)
						candidateTs.I++
						tsSource = oplog.TS_SOURCE_MONSTACHE
					}
				}
			}
            // 如果从 configDBName 中没有获取到时间戳,就取mongo 中最新的时间戳
			if candidateTs.T == 0 {
				candidateTs, _ = gtm.LastOpTimestamp(client, options)
				tsSource = oplog.TS_SOURCE_OPLOG
			}

			ts := <-ic.oplogTsResolver.GetResumeTimestamp(candidateTs, tsSource)
			infoLog.Printf("Resuming from timestamp %+v", ts)
			return ts, nil
		}
	}
	return after
}



startListen 后,当成为主节点,开始链接 mongo,同时配置数据,启动服务监听来自 mongo 的数据

func (ic *indexClient) startListen() {
	config := ic.config
	conns := ic.buildConnections()

	if config.ResumeStrategy == timestampResumeStrategy {
        // 从最早副本集的记录的时间戳开始恢复
		if config.ResumeFromEarliestTimestamp {
			ic.oplogTsResolver = oplog.NewTimestampResolverEarliest(len(conns), infoLog)
		} else {
			ic.oplogTsResolver = oplog.TimestampResolverSimple{}
		}
	}


	gtmOpts := ic.buildGtmOptions()
	ic.gtmCtx = gtm.StartMulti(conns, gtmOpts)
	if config.readShards() && !config.DisableChangeEvents {
		ic.gtmCtx.AddShardListener(ic.mongoConfig, gtmOpts, config.makeShardInsertHandler())
	}
}

下面的时间戳处理器,是给分片集群使用的,此时connectionsTotal 会大于 1,然后算出最小的时间戳,每个分片连接都会调用这个 after 函数,此时会给每个mongo分片传递这个最早的时间戳(有可能每个分片的数据有关联,然后使用最小时间戳,会把关联的文档都会处理了),从最小时间戳开始同步

func (resolver *TimestampResolverEarliest) GetResumeTimestamp(candidateTs primitive.Timestamp, source string) chan primitive.Timestamp {
	resolver.m.Lock()
	defer resolver.m.Unlock()

	if resolver.connectionsQueried >= resolver.connectionsTotal {
		// in this case, an earliest timestamp is already calculated,
		// so it is just returned in a temporary channel
		resolver.logger.Printf(
			"Earliest oplog resume timestamp is already calculated: %s",
			tsToString(resolver.earliestTs),
		)
		tmpResultChan := make(chan primitive.Timestamp, 1)
		tmpResultChan <- resolver.earliestTs
		return tmpResultChan
	}

	resolver.connectionsQueried++
	resolver.updateEarliestTs(source, candidateTs)

	// if this function has been called for every mongodb connection,
	// then a final earliest resume timestamp can be returned to every caller
	if resolver.connectionsQueried == resolver.connectionsTotal {
		resolver.logger.Printf(
			"Earliest oplog resume timestamp calculated: %s, source: %s",
			tsToString(resolver.earliestTs),
			resolver.earliestTsSource,
		)

		for i := 0; i < resolver.connectionsTotal; i++ {
			resolver.resultChan <- resolver.earliestTs
		}
	}

	return resolver.resultChan
}
  1. monstache 保存的时间戳 > oplog 最新时间戳
  2. buildConnections 处理监听比如分片集群的connection,副本集不会出现这种场景
  3. 处理恢复时间戳的策略
    1. NewTimestampResolverEarliest 处理基于副本集最早时间戳开始恢复
    2. TimestampResolverSimple 从记录的时间戳后面开始恢复
  4. buildGtmOptions
    1. 配置各种 filter chain, 直接读取的 namespace
    2. 开始同步数据的时间戳生成器
      3. ResumeFromTimestamp 基于给定的时间戳开始同步
      4. Resume 为 true,从上次保存的时间戳开始同步
      5. 基于时间戳策略,读取 opLog 的时间戳,然后进行同步
      6. Replay 则是从 mongo 记录的 oplog 的时间戳开始同步
  5. StartMulti 开启多个 client 的监听
  6. 如果配置了分片,则监听分片集群AddShardListener

5. 插件管理

monstache 支持使用自定义的插件,来处理数据流

各插件的作用

  • Pipeline: 作用于数据源端(MongoDB 侧),在数据被读取之前应用,修改从 MongoDB 获取的数据流
  • Map: 作用于单个文档上,在文档读取后、索引前应用
  • Filter: 决定是否处理某个文档,在读取文档后立即应用
  • Process: 对clone 的文档做处理

在有数据更新时,会进行 routeOp 处理:


func (ic *indexClient) routeOp(op *gtm.Op) (err error) {
	//  如果设置了 processPlugin
    if processPlugin != nil {
		err = ic.routeProcess(op)
	}
	if op.IsDrop() {
		err = ic.routeDrop(op)
	} else if op.IsDelete() {
		err = ic.routeDelete(op)
	} else if op.Data != nil {
		err = ic.routeData(op)
	}
	return
}

在 routeData 中,会使用 mapperPlugin 来处理数据:

func (ic *indexClient) routeData(op *gtm.Op) (err error) {
   skip := false
   if op.IsSourceOplog() && len(ic.config.Relate) > 0 {
   	skip, err = ic.routeDataRelate(op)
   }
   if !skip {
   	if ic.hasFileContent(op) {
   		ic.fileC <- op
   	} else {
   		ic.indexC <- op
   	}
   }
   return
}


// 在 indexC 有数据后,就会执行doIndex
func (ic *indexClient) doIndex(op *gtm.Op) (err error) {
   if err = ic.mapData(op); err == nil {
   	if op.Data != nil {
   		err = ic.doIndexing(op)
   	} else if op.IsUpdate() {
   		ic.doDelete(op)
   	}
   }
   return
}

func (ic *indexClient) mapData(op *gtm.Op) error {
   if mapperPlugin != nil {
   	return ic.mapDataGolang(op)
   }
   return ic.mapDataJavascript(op)
}


func (ic *indexClient) mapDataGolang(op *gtm.Op) error {
   input := &monstachemap.MapperPluginInput{
   	Document:          op.Data,
   	Namespace:         op.Namespace,
   	Database:          op.GetDatabase(),
   	Collection:        op.GetCollection(),
   	Operation:         op.Operation,
   	MongoClient:       ic.mongo,
   	UpdateDescription: op.UpdateDescription,
   }
   // 这里会执行 mapperPlugin
   output, err := mapperPlugin(input)
   if err != nil {
   	return err
   }
   if output == nil {
   	return nil
   }
   if output.Drop {
   	op.Data = nil
   } else {
   	if output.Skip {
   		op.Data = map[string]interface{}{}
   	} else if !output.Passthrough {
   		if output.Document == nil {
   			return errors.New("Map function must return a non-nil document")
   		}
   		op.Data = output.Document
   	}
   	meta := make(map[string]interface{})
   	if output.Skip {
   		meta["skip"] = true
   	}
   	if output.Index != "" {
   		meta["index"] = output.Index
   	}
   	if output.ID != "" {
   		meta["id"] = output.ID
   	}
   	if output.Type != "" {
   		meta["type"] = output.Type
   	}
   	if output.Routing != "" {
   		meta["routing"] = output.Routing
   	}
   	if output.Parent != "" {
   		meta["parent"] = output.Parent
   	}
   	if output.Version != 0 {
   		meta["version"] = output.Version
   	}
   	if output.VersionType != "" {
   		meta["versionType"] = output.VersionType
   	}
   	if output.Pipeline != "" {
   		meta["pipeline"] = output.Pipeline
   	}
   	if output.RetryOnConflict != 0 {
   		meta["retryOnConflict"] = output.RetryOnConflict
   	}
   	if len(meta) > 0 {
   		op.Data["_meta_monstache"] = meta
   	}
   }
   return nil
}

filter plugin 会在开始的时候,在读取 directNameSpace 和 监听新来的数据是作为 filter chain 进行处理


func (ic *indexClient) buildGtmOptions() *gtm.Options {
   var nsFilter, filter, directReadFilter gtm.OpFilter
   config := ic.config
   filterChain := ic.buildFilterChain()
   filterArray := ic.buildFilterArray()
   nsFilter = gtm.ChainOpFilters(filterChain...)
   filter = gtm.ChainOpFilters(filterArray...)
   directReadFilter = gtm.ChainOpFilters(filterArray...)

}


func (ic *indexClient) buildFilterArray() []gtm.OpFilter {
   config := ic.config
   filterArray := []gtm.OpFilter{}
   var pluginFilter gtm.OpFilter
   if config.Worker != "" {
   	workerFilter, err := consistent.ConsistentHashFilter(config.Worker, config.Workers)
   	if err != nil {
   		errorLog.Fatalln(err)
   	}
   	filterArray = append(filterArray, workerFilter)
   } else if config.Workers != nil {
   	errorLog.Fatalln("Workers configured but this worker is undefined. worker must be set to one of the workers.")
   }
   if filterPlugin != nil {
       // 设置 filter plugin
   	pluginFilter = filterWithPlugin(ic.mongo)
   	filterArray = append(filterArray, pluginFilter)
   } else if len(filterEnvs) > 0 {
   	pluginFilter = filterWithScript()
   	filterArray = append(filterArray, pluginFilter)
   }
   if pluginFilter != nil {
   	ic.filter = pluginFilter
   }
   return filterArray
}


6. 高可用处理

可以在启动时,设置不同的 workers 来并行处理不同的 opLog 的更新,这个时候需要将 opLog 分派给不同的 worker,使得数据一致性,不能重复处理,这个是通过设置 hash 来进行处理

在 buildFilterArray 的时候,通过设置 workerFilter,opLog 经过hash 函数处理后,只会被路由到某一个 worker


func (ic *indexClient) buildFilterArray() []gtm.OpFilter {
	config := ic.config
	filterArray := []gtm.OpFilter{}
	var pluginFilter gtm.OpFilter
	if config.Worker != "" {
        // opLog 经过hash 函数处理后,只会被路由到某一个 worker
		workerFilter, err := consistent.ConsistentHashFilter(config.Worker, config.Workers)
		if err != nil {
			errorLog.Fatalln(err)
		}
		filterArray = append(filterArray, workerFilter)
	} else if config.Workers != nil {
		errorLog.Fatalln("Workers configured but this worker is undefined. worker must be set to one of the workers.")
	}
	if filterPlugin != nil {
        // 设置 filter plugin
		pluginFilter = filterWithPlugin(ic.mongo)
		filterArray = append(filterArray, pluginFilter)
	} else if len(filterEnvs) > 0 {
		pluginFilter = filterWithScript()
		filterArray = append(filterArray, pluginFilter)
	}
	if pluginFilter != nil {
		ic.filter = pluginFilter
	}
	return filterArray
}


最后在赋一张 cursor 生成的全流程图的描述:

detail_process