最近在做一个 mongoDB 的存储数据到 es 中进行检索的工作,也是通过此文学习一下 monstache 是如何做同步,以及管理可用性和一致性的,同时学习 golang 相关的 chan 的生命周期
当我们想实现检索 mongoDB 中的数据,通过 like 的方式在 DB 中检索肯定是不行的,如何根据关键词搜索出对应用户之前相关的记录,最好的方式还是放到搜索引擎 es 中,由于之前数据从没有同步到 monstache 过,所以需要使用一些工具来实现同步数据的功能,可能在 mongoDB 中的数据,不一定是检索想要的,也可以通过编写工具来进行多个集合的关联处理,在mapping 过数据之后,再同步到 es 中来方便检索。
使用工具到 es 有多种方式:
monstache
Monstache基于MongoDB的oplog实现实时数据同步及订阅,支持MongoDB与高版本Elasticsearch之间的数据同步,同时支持MongoDB的变更流和聚合管道功能,并且拥有丰富的特性。
Monstache不仅支持软删除和硬删除,还支持数据库删除和集合删除,能够确保Elasticsearch端实时与源端数据保持一致。
flink cdc
支持数据的全量同步和增量同步,同时支持灵活的多目标源数据库的写入,
logstache
支持数据的全量同步或增量同步,不够灵活的处理多数据的关联处理和映射。
使用 cursor 对monstache 进行总结:
全量同步
可以在 config.toml 中配置toml:"direct-read-namespaces"
,使用 mongoDB 客户端直接查询指定的 namespace
将查询结果按批次处理并索引到 Elasticsearch
增量同步
支持 oplog 和 change stream 两种增量同步方式。
文档处理流程
mongoDB 文档经过以下处理后同步到 ES:
筛选: 通过 filterWithRegex 等函数确定哪些文档需要处理
转换: 通过 JavaScript 脚本、Go 插件或内联规则转换文档
关联: 处理关联的文档
索引: 最终将文档批量发送到 Elasticsearch
高可用/集群实现
clusterName 配置: 同一集群的节点共享 ClusterName
worker 配置: 同一集群不同worker 节点就是 clusterName + workerName
leader election: 同一 worker 可以有多个备用节点,使用 mongoDB 作为锁(使用 index ttl ) + 心跳来实现领导选举
状态管理: 共享最后同步的时间戳(lastTs)到 mongoDB 集合中
断点续传
monstache 支持断点续传,保证重启后能从上次处理的位置继续:
根据 ResumeStrategy 选择时间戳或 token 方式
将处理状态(lastTs或 token)存储在 mongoDB,重启时进行检索
监控服务器
提供 http 服务器用于监控和管理
健康检查
同步统计
实例信息
eventLoop 处理
处理各种事件和信号
回退处理
当 es 同步数据出错后,会基于熔断的机制,进行恢复
下面基于这几个模块进行分析:
数据处理
管道处理以及优雅下线
回退处理
断点续传
插件管理
高可用处理
1. 数据处理
不管是 directNamespace 进行全量同步,还是监听增量的变更,都会写入到同一管道进行处理:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 func (ic *indexClient) eventLoop() { ... for { select { ... case op, open := <-ic.gtmCtx.OpC: if !ic.enabled { break } if op == nil { if !open && !allOpsVisited { allOpsVisited = true ic.opsConsumed <- true } break } if op.IsSourceOplog() { ic.lastTs = op.Timestamp if ic.config.ResumeStrategy == tokenResumeStrategy { ic.tokens[op.ResumeToken.StreamID] = op.ResumeToken.ResumeToken } } if err = ic.routeOp(op); err != nil { ic.processErr(err) } } } } func (ic *indexClient) routeOp(op *gtm.Op) (err error ) { if processPlugin != nil { err = ic.routeProcess(op) } if op.IsDrop() { err = ic.routeDrop(op) } else if op.IsDelete() { err = ic.routeDelete(op) } else if op.Data != nil { err = ic.routeData(op) } return } func (ic *indexClient) routeData(op *gtm.Op) (err error ) { skip := false if op.IsSourceOplog() && len (ic.config.Relate) > 0 { skip, err = ic.routeDataRelate(op) } if !skip { if ic.hasFileContent(op) { ic.fileC <- op } else { ic.indexC <- op } } return } func (ic *indexClient) startIndex() { for i := 0 ; i < 5 ; i++ { ic.indexWg.Add(1 ) go func () { defer ic.indexWg.Done() for op := range ic.indexC { if err := ic.doIndex(op); err != nil { ic.processErr(err) } } }() } }
2. 管道以及处理生命周期
这里涉及多管道数据处理,以及 graceful shut down 等处理,需了解到各管道之间的数据流向和处理关系,下面是一个大概的管道数据关系
这里重点说明一下优雅关闭下的各管道处理流程,首先,是在什么情况下会进行优雅关闭:
设置了全量读取完成后,直接退出
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 func (ic *indexClient) startReadWait() { directReadsEnabled := len (ic.config.DirectReadNs) > 0 if directReadsEnabled { exitAfterDirectReads := ic.config.ExitAfterDirectReads go func () { ic.gtmCtx.DirectReadWg.Wait() if ic.config.Resume { ic.saveTimestampFromReplStatus() } if exitAfterDirectReads { var exit bool ic.rwmutex.RLock() exit = !ic.externalShutdown ic.rwmutex.RUnlock() if exit { ic.stopAllWorkers() ic.doneC <- 30 } } }() } }
monstache 会在最开始的时候,监听系统信号,当监听到退出信号,也会进行优雅退出
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 func (sh *sigHandler) start() { go func () { sigs := make (chan os.Signal, 1 ) signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM, syscall.SIGKILL) select { case <-sigs: os.Exit(0 ) case ic := <-sh.clientStartedC: <-sigs ic.onExternalShutdown() go func () { <-sigs infoLog.Println("Forcing shutdown, bye bye..." ) os.Exit(1 ) }() infoLog.Println("Starting clean shutdown" ) ic.stopAllWorkers() ic.doneC <- 10 } }() }
onExternalShutdown处理的是当意外退出后,需要根据是否开启全量写入状态,等待 500ms 后,在停止所有 workers 和客户端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 func (ic *indexClient) onExternalShutdown() { ic.rwmutex.Lock() defer ic.rwmutex.Unlock() ic.externalShutdown = true ic.checkDirectReads() }func (ic *indexClient) checkDirectReads() { if len (ic.config.DirectReadNs) == 0 { return } drc := ic.directReadChan() t := time.NewTimer(time.Duration(500 ) * time.Millisecond) defer t.Stop() select { case <-t.C: ic.directReadsPending = true case <-drc: } }func (ic *indexClient) directReadChan() chan struct {} { c := make (chan struct {}) go func () { ic.gtmCtx.DirectReadWg.Wait() close (c) }() return c }
stopAllWorkers 做的事情就是:
先停止监听数据变化
直到没有数据会过来后,等待关闭相关的文档处理chan
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 func (ic *indexClient) stopAllWorkers() { infoLog.Println("Stopping all workers" ) ic.gtmCtx.Stop() <-ic.opsConsumed close (ic.relateC) ic.relateWg.Wait() close (ic.fileC) ic.fileWg.Wait() close (ic.indexC) ic.indexWg.Wait() close (ic.processC) ic.processWg.Wait() }
doneC 就是一个监听开始关闭客户端的信道,同时设置超时关闭时间,当收到要关闭客户端后,进行超时关闭
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 func (ic *indexClient) eventLoop() { ... for { select { ... case timeout := <-ic.doneC: ic.enabled = false ic.shutdown(timeout) return } } }func (ic *indexClient) shutdown(timeout int ) { infoLog.Println("Shutting down" ) go ic.closeClient() doneC := make (chan bool ) go func () { closeT := time.NewTimer(time.Duration(timeout) * time.Second) defer closeT.Stop() done := false for !done { select { case <-ic.closeC: done = true close (doneC) case <-closeT.C: done = true close (doneC) } } }() <-doneC os.Exit(exitStatus) }func (ic *indexClient) closeClient() { if ic.mongo != nil && ic.config.ClusterName != "" { ic.resetClusterState() } if ic.hsc != nil { ic.hsc.shutdown = true ic.hsc.httpServer.Shutdown(context.Background()) } if ic.bulk != nil { ic.bulk.Close() } if ic.bulkStats != nil { ic.bulkStats.Close() } if len (ic.config.DirectReadNs) > 0 { ic.rwmutex.RLock() if !ic.directReadsPending { infoLog.Println("Direct reads completed" ) if ic.config.DirectReadStateful { if err := ic.saveDirectReadNamespaces(); err != nil { errorLog.Printf("Error saving direct read state: %s" , err) } } } ic.rwmutex.RUnlock() } close (ic.closeC) }
3. 回退处理
es bulk server 设置 backoff 函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 func (ic *indexClient) newBulkProcessor(client *elastic.Client) (bulk *elastic.BulkProcessor, err error ) { config := ic.config bulkService := client.BulkProcessor().Name("monstache" ) bulkService.Workers(config.ElasticMaxConns) bulkService.Stats(config.Stats) bulkService.BulkActions(config.ElasticMaxDocs) bulkService.BulkSize(config.ElasticMaxBytes) if config.ElasticRetry == false { bulkService.Backoff(&elastic.StopBackoff{}) } bulkService.After(ic.afterBulk()) bulkService.FlushInterval(time.Duration(config.ElasticMaxSeconds) * time.Second) return bulkService.Do(context.Background()) } func (ic *indexClient) afterBulk() func (int64 , []elastic.BulkableRequest, *elastic.BulkResponse, error ) { return func (executionID int64 , requests []elastic.BulkableRequest, response *elastic.BulkResponse, err error ) { if response == nil || !response.Errors { ic.bulkErrs.Store(0 ) return } if failed := response.Failed(); failed != nil { backoff := false for _, item := range failed { if item.Status == http.StatusConflict { continue } logFailedResponseItem(item) if item.Status == http.StatusNotFound { continue } backoff = true } if backoff { wait := ic.backoffDuration() infoLog.Printf("Backing off for %.1f minutes after bulk indexing failures." , wait.Minutes()) ic.bulkBackoffC <- wait ic.backoff(wait) ic.bulkErrs.Add(1 ) } } } }
主进程 eventLoop 中会监听这个 bulkBackoffC ,当监听到之后,会根据设置的超时时间,自旋一段时间, 当收到信号,也会主动退出自旋,然后主进程监听到信号,也会主动进行 shutdown,也会响应状态探测函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 func (ic *indexClient) backoffDuration() time.Duration { consecutiveErrors := int (ic.bulkErrs.Load()) wait, ok := ic.bulkBackoff.Next(consecutiveErrors) if !ok { wait = ic.bulkBackoffMax } return wait }func (ic *indexClient) backoff(wait time.Duration) { timer := time.NewTimer(wait) defer timer.Stop() sigs := make (chan os.Signal, 1 ) signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) defer signal.Stop(sigs) for { select { case <-timer.C: return case <-sigs: return case req := <-ic.statusReqC: enabled, lastTs := ic.enabled, ic.lastTs statusResp := &statusResponse{ enabled: enabled, lastTs: lastTs, backoff: true , } req.responseC <- statusResp } } }func (ic *indexClient) eventLoop() { ... for { select { case wait := <-ic.bulkBackoffC: ic.backoff(wait) } } }
4. 断点续传
保存时间戳的核心函数: saveTimestamp() , 每次保存最新的时间戳到ConfigDatabaseName = cluster + worker
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 func (ic *indexClient) saveTimestamp() error { col := ic.mongoWriter.Database(ic.config.ConfigDatabaseName).Collection("monstache" ) doc := map [string ]interface {}{ "ts" : ic.lastTs, } opts := options.Update() opts.SetUpsert(true ) _, err := col.UpdateOne(context.Background(), bson.M{ "_id" : ic.config.ResumeName, }, bson.M{ "$set" : doc, }, opts) return err }
时间戳更新机制: 每隔 10s 执行一次,根据重启策略,来进行保存时间戳或是 token
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 func (ic *indexClient) eventLoop() { timestampTicker := time.NewTicker(10 * time.Second) if !ic.config.Resume { timestampTicker.Stop() } for { select { case <-timestampTicker.C: if !ic.enabled { break } if ic.config.ResumeStrategy == tokenResumeStrategy { ic.nextTokens() } else { ic.nextTimestamp() } } } }
主要解释时间戳机制: nextTimestamp()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 func (ic *indexClient) nextTimestamp() { if ic.hasNewEvents() { ic.bulk.Flush() if err := ic.saveTimestamp(); err == nil { ic.lastTsSaved = ic.lastTs } else { ic.processErr(err) } } }func (ic *indexClient) hasNewEvents() bool { if ic.lastTs.T > ic.lastTsSaved.T || (ic.lastTs.T == ic.lastTsSaved.T && ic.lastTs.I > ic.lastTsSaved.I) { return true } return false }
在优雅关闭时,也会保存一次时间戳,避免心跳还没有执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 func (ic *indexClient) shutdown(timeout int ) { if ic.config.Resume { if ic.config.ResumeStrategy == tokenResumeStrategy { if err := ic.saveTokens(); err != nil { errorLog.Printf("Unable to save tokens: %s" , err) } } else { if err := ic.saveTimestamp(); err != nil { errorLog.Printf("Unable to save timestamp: %s" , err) } } } }
什么时候获取lastTs 呢,在每次有新的更新数据来的时候,就会赋值 lastTs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 func (ic *indexClient) eventLoop() { ... for { select { ... case op, open := <-ic.gtmCtx.OpC: if !ic.enabled { break } if op == nil { if !open && !allOpsVisited { allOpsVisited = true ic.opsConsumed <- true } break } if op.IsSourceOplog() { ic.lastTs = op.Timestamp if ic.config.ResumeStrategy == tokenResumeStrategy { ic.tokens[op.ResumeToken.StreamID] = op.ResumeToken.ResumeToken } } if err = ic.routeOp(op); err != nil { ic.processErr(err) } } } }
如果是全量同步时,则会在完成全量同步时,保存副本集中最小提交的时间戳,用于重启增量同步
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 func (ic *indexClient) startReadWait() { directReadsEnabled := len (ic.config.DirectReadNs) > 0 if directReadsEnabled { exitAfterDirectReads := ic.config.ExitAfterDirectReads go func () { ic.gtmCtx.DirectReadWg.Wait() if ic.config.Resume { ic.saveTimestampFromReplStatus() } if exitAfterDirectReads { var exit bool ic.rwmutex.RLock() exit = !ic.externalShutdown ic.rwmutex.RUnlock() if exit { ic.stopAllWorkers() ic.doneC <- 30 } } }() } }func (ic *indexClient) saveTimestampFromReplStatus() { if rs, err := gtm.GetReplStatus(ic.mongo); err == nil { if ic.lastTs, err = rs.GetLastCommitted(); err == nil { if err = ic.saveTimestamp(); err != nil { ic.processErr(err) } } else { ic.processErr(err) } } else { ic.saveTimestampFromServerStatus() } }
启动时,也会获取时间戳
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 func (ic *indexClient) buildTimestampGen() gtm.TimestampGenerator { var after gtm.TimestampGenerator config := ic.config if config.ResumeStrategy != timestampResumeStrategy { return after } ... else if config.Resume { after = func (client *mongo.Client, options *gtm.Options) (primitive.Timestamp, error ) { var candidateTs primitive.Timestamp var tsSource string var err error col := client.Database(config.ConfigDatabaseName).Collection("monstache" ) result := col.FindOne(context.Background(), bson.M{ "_id" : config.ResumeName, }) if err = result.Err(); err == nil { doc := make (map [string ]interface {}) if err = result.Decode(&doc); err == nil { if doc["ts" ] != nil { candidateTs = doc["ts" ].(primitive.Timestamp) candidateTs.I++ tsSource = oplog.TS_SOURCE_MONSTACHE } } } if candidateTs.T == 0 { candidateTs, _ = gtm.LastOpTimestamp(client, options) tsSource = oplog.TS_SOURCE_OPLOG } ts := <-ic.oplogTsResolver.GetResumeTimestamp(candidateTs, tsSource) infoLog.Printf("Resuming from timestamp %+v" , ts) return ts, nil } } return after }
startListen 后,当成为主节点,开始链接 mongo,同时配置数据,启动服务监听来自 mongo 的数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 func (ic *indexClient) startListen() { config := ic.config conns := ic.buildConnections() if config.ResumeStrategy == timestampResumeStrategy { if config.ResumeFromEarliestTimestamp { ic.oplogTsResolver = oplog.NewTimestampResolverEarliest(len (conns), infoLog) } else { ic.oplogTsResolver = oplog.TimestampResolverSimple{} } } gtmOpts := ic.buildGtmOptions() ic.gtmCtx = gtm.StartMulti(conns, gtmOpts) if config.readShards() && !config.DisableChangeEvents { ic.gtmCtx.AddShardListener(ic.mongoConfig, gtmOpts, config.makeShardInsertHandler()) } }
下面的时间戳处理器,是给分片集群使用的,此时connectionsTotal 会大于 1,然后算出最小的时间戳,每个分片连接都会调用这个 after 函数,此时会给每个mongo分片传递这个最早的时间戳(有可能每个分片的数据有关联,然后使用最小时间戳,会把关联的文档都会处理了),从最小时间戳开始同步
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 func (resolver *TimestampResolverEarliest) GetResumeTimestamp(candidateTs primitive.Timestamp, source string ) chan primitive.Timestamp { resolver.m.Lock() defer resolver.m.Unlock() if resolver.connectionsQueried >= resolver.connectionsTotal { resolver.logger.Printf( "Earliest oplog resume timestamp is already calculated: %s" , tsToString(resolver.earliestTs), ) tmpResultChan := make (chan primitive.Timestamp, 1 ) tmpResultChan <- resolver.earliestTs return tmpResultChan } resolver.connectionsQueried++ resolver.updateEarliestTs(source, candidateTs) if resolver.connectionsQueried == resolver.connectionsTotal { resolver.logger.Printf( "Earliest oplog resume timestamp calculated: %s, source: %s" , tsToString(resolver.earliestTs), resolver.earliestTsSource, ) for i := 0 ; i < resolver.connectionsTotal; i++ { resolver.resultChan <- resolver.earliestTs } } return resolver.resultChan }
monstache 保存的时间戳 > oplog 最新时间戳
buildConnections 处理监听比如分片集群的connection,副本集不会出现这种场景
处理恢复时间戳的策略
NewTimestampResolverEarliest 处理基于副本集最早时间戳开始恢复
TimestampResolverSimple 从记录的时间戳后面开始恢复
buildGtmOptions
配置各种 filter chain, 直接读取的 namespace
开始同步数据的时间戳生成器
ResumeFromTimestamp 基于给定的时间戳开始同步
Resume 为 true,从上次保存的时间戳开始同步
基于时间戳策略,读取 opLog 的时间戳,然后进行同步
Replay 则是从 mongo 记录的 oplog 的时间戳开始同步
StartMulti 开启多个 client 的监听
如果配置了分片,则监听分片集群AddShardListener
5. 插件管理
monstache 支持使用自定义的插件,来处理数据流
各插件的作用
Pipeline : 作用于数据源端(MongoDB 侧),在数据被读取之前应用,修改从 MongoDB 获取的数据流
Map : 作用于单个文档上,在文档读取后、索引前应用
Filter : 决定是否处理某个文档,在读取文档后立即应用
Process : 对clone 的文档做处理
在有数据更新时,会进行 routeOp 处理:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 func (ic *indexClient) routeOp(op *gtm.Op) (err error ) { if processPlugin != nil { err = ic.routeProcess(op) } if op.IsDrop() { err = ic.routeDrop(op) } else if op.IsDelete() { err = ic.routeDelete(op) } else if op.Data != nil { err = ic.routeData(op) } return }
在 routeData 中,会使用 mapperPlugin 来处理数据:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 func (ic *indexClient) routeData(op *gtm.Op) (err error ) { skip := false if op.IsSourceOplog() && len (ic.config.Relate) > 0 { skip, err = ic.routeDataRelate(op) } if !skip { if ic.hasFileContent(op) { ic.fileC <- op } else { ic.indexC <- op } } return }func (ic *indexClient) doIndex(op *gtm.Op) (err error ) { if err = ic.mapData(op); err == nil { if op.Data != nil { err = ic.doIndexing(op) } else if op.IsUpdate() { ic.doDelete(op) } } return }func (ic *indexClient) mapData(op *gtm.Op) error { if mapperPlugin != nil { return ic.mapDataGolang(op) } return ic.mapDataJavascript(op) }func (ic *indexClient) mapDataGolang(op *gtm.Op) error { input := &monstachemap.MapperPluginInput{ Document: op.Data, Namespace: op.Namespace, Database: op.GetDatabase(), Collection: op.GetCollection(), Operation: op.Operation, MongoClient: ic.mongo, UpdateDescription: op.UpdateDescription, } output, err := mapperPlugin(input) if err != nil { return err } if output == nil { return nil } if output.Drop { op.Data = nil } else { if output.Skip { op.Data = map [string ]interface {}{} } else if !output.Passthrough { if output.Document == nil { return errors.New("Map function must return a non-nil document" ) } op.Data = output.Document } meta := make (map [string ]interface {}) if output.Skip { meta["skip" ] = true } if output.Index != "" { meta["index" ] = output.Index } if output.ID != "" { meta["id" ] = output.ID } if output.Type != "" { meta["type" ] = output.Type } if output.Routing != "" { meta["routing" ] = output.Routing } if output.Parent != "" { meta["parent" ] = output.Parent } if output.Version != 0 { meta["version" ] = output.Version } if output.VersionType != "" { meta["versionType" ] = output.VersionType } if output.Pipeline != "" { meta["pipeline" ] = output.Pipeline } if output.RetryOnConflict != 0 { meta["retryOnConflict" ] = output.RetryOnConflict } if len (meta) > 0 { op.Data["_meta_monstache" ] = meta } } return nil }
filter plugin 会在开始的时候,在读取 directNameSpace 和 监听新来的数据是作为 filter chain 进行处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 func (ic *indexClient) buildGtmOptions() *gtm.Options { var nsFilter, filter, directReadFilter gtm.OpFilter config := ic.config filterChain := ic.buildFilterChain() filterArray := ic.buildFilterArray() nsFilter = gtm.ChainOpFilters(filterChain...) filter = gtm.ChainOpFilters(filterArray...) directReadFilter = gtm.ChainOpFilters(filterArray...) } func (ic *indexClient) buildFilterArray() []gtm.OpFilter { config := ic.config filterArray := []gtm.OpFilter{} var pluginFilter gtm.OpFilter if config.Worker != "" { workerFilter, err := consistent.ConsistentHashFilter(config.Worker, config.Workers) if err != nil { errorLog.Fatalln(err) } filterArray = append (filterArray, workerFilter) } else if config.Workers != nil { errorLog.Fatalln("Workers configured but this worker is undefined. worker must be set to one of the workers." ) } if filterPlugin != nil { pluginFilter = filterWithPlugin(ic.mongo) filterArray = append (filterArray, pluginFilter) } else if len (filterEnvs) > 0 { pluginFilter = filterWithScript() filterArray = append (filterArray, pluginFilter) } if pluginFilter != nil { ic.filter = pluginFilter } return filterArray }
6. 高可用处理
可以在启动时,设置不同的 workers 来并行处理不同的 opLog 的更新,这个时候需要将 opLog 分派给不同的 worker,使得数据一致性,不能重复处理,这个是通过设置 hash 来进行处理
在 buildFilterArray 的时候,通过设置 workerFilter,opLog 经过hash 函数处理后,只会被路由到某一个 worker
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 func (ic *indexClient) buildFilterArray() []gtm.OpFilter { config := ic.config filterArray := []gtm.OpFilter{} var pluginFilter gtm.OpFilter if config.Worker != "" { workerFilter, err := consistent.ConsistentHashFilter(config.Worker, config.Workers) if err != nil { errorLog.Fatalln(err) } filterArray = append (filterArray, workerFilter) } else if config.Workers != nil { errorLog.Fatalln("Workers configured but this worker is undefined. worker must be set to one of the workers." ) } if filterPlugin != nil { pluginFilter = filterWithPlugin(ic.mongo) filterArray = append (filterArray, pluginFilter) } else if len (filterEnvs) > 0 { pluginFilter = filterWithScript() filterArray = append (filterArray, pluginFilter) } if pluginFilter != nil { ic.filter = pluginFilter } return filterArray }
最后在赋一张 cursor 生成的全流程图的描述: