engine.go 21 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790
  1. // Copyright 2013 Hui Chen
  2. // Copyright 2016 ego authors
  3. //
  4. // Licensed under the Apache License, Version 2.0 (the "License"): you may
  5. // not use this file except in compliance with the License. You may obtain
  6. // a copy of the License at
  7. //
  8. // http://www.apache.org/licenses/LICENSE-2.0
  9. //
  10. // Unless required by applicable law or agreed to in writing, software
  11. // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  12. // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  13. // License for the specific language governing permissions and limitations
  14. // under the License.
  15. /*
  16. Package riot is riot engine
  17. */
  18. package riot
  19. import (
  20. "fmt"
  21. "log"
  22. "os"
  23. "runtime"
  24. "sort"
  25. "strconv"
  26. "strings"
  27. "sync"
  28. "time"
  29. // "reflect"
  30. "sync/atomic"
  31. "github.com/go-ego/riot/core"
  32. "github.com/go-ego/riot/store"
  33. "github.com/go-ego/riot/types"
  34. "github.com/go-ego/riot/utils"
  35. "github.com/go-ego/gse"
  36. "github.com/go-ego/murmur"
  37. "github.com/shirou/gopsutil/mem"
  38. )
  // Version is the riot release string returned by GetVersion.
  const Version string = "v0.10.0.425, Danube River!"

  // NumNanosecondsInAMillisecond is the number of nanoseconds in one
  // millisecond.
  const NumNanosecondsInAMillisecond = 1000000

  // StoreFilePrefix is the file-name prefix of each persistent store shard;
  // the shard number is appended after a dot.
  const StoreFilePrefix = "riot"

  // DefaultPath is the store folder used when persistence is enabled but no
  // folder has been configured.
  const DefaultPath = "./riot-index"
  49. // GetVersion get the riot version
  50. func GetVersion() string {
  51. return Version
  52. }
  53. // Engine initialize the engine
  54. type Engine struct {
  55. loc sync.RWMutex
  56. // 计数器,用来统计有多少文档被索引等信息
  57. numDocsIndexed uint64
  58. numDocsRemoved uint64
  59. numDocsForceUpdated uint64
  60. numIndexingReqs uint64
  61. numRemovingReqs uint64
  62. numForceUpdatingReqs uint64
  63. numTokenIndexAdded uint64
  64. numDocsStored uint64
  65. // 记录初始化参数
  66. initOptions types.EngineOpts
  67. initialized bool
  68. indexers []core.Indexer
  69. rankers []core.Ranker
  70. segmenter gse.Segmenter
  71. loaded bool
  72. stopTokens StopTokens
  73. dbs []store.Store
  74. // 建立索引器使用的通信通道
  75. segmenterChan chan segmenterReq
  76. indexerAddDocChans []chan indexerAddDocReq
  77. indexerRemoveDocChans []chan indexerRemoveDocReq
  78. rankerAddDocChans []chan rankerAddDocReq
  79. // 建立排序器使用的通信通道
  80. indexerLookupChans []chan indexerLookupReq
  81. rankerRankChans []chan rankerRankReq
  82. rankerRemoveDocChans []chan rankerRemoveDocReq
  83. // 建立持久存储使用的通信通道
  84. storeIndexDocChans []chan storeIndexDocReq
  85. storeInitChan chan bool
  86. }
  87. // Indexer initialize the indexer channel
  88. func (engine *Engine) Indexer(options types.EngineOpts) {
  89. engine.indexerAddDocChans = make(
  90. []chan indexerAddDocReq, options.NumShards)
  91. engine.indexerRemoveDocChans = make(
  92. []chan indexerRemoveDocReq, options.NumShards)
  93. engine.indexerLookupChans = make(
  94. []chan indexerLookupReq, options.NumShards)
  95. for shard := 0; shard < options.NumShards; shard++ {
  96. engine.indexerAddDocChans[shard] = make(
  97. chan indexerAddDocReq, options.IndexerBufLen)
  98. engine.indexerRemoveDocChans[shard] = make(
  99. chan indexerRemoveDocReq, options.IndexerBufLen)
  100. engine.indexerLookupChans[shard] = make(
  101. chan indexerLookupReq, options.IndexerBufLen)
  102. }
  103. }
  104. // Ranker initialize the ranker channel
  105. func (engine *Engine) Ranker(options types.EngineOpts) {
  106. engine.rankerAddDocChans = make(
  107. []chan rankerAddDocReq, options.NumShards)
  108. engine.rankerRankChans = make(
  109. []chan rankerRankReq, options.NumShards)
  110. engine.rankerRemoveDocChans = make(
  111. []chan rankerRemoveDocReq, options.NumShards)
  112. for shard := 0; shard < options.NumShards; shard++ {
  113. engine.rankerAddDocChans[shard] = make(
  114. chan rankerAddDocReq, options.RankerBufLen)
  115. engine.rankerRankChans[shard] = make(
  116. chan rankerRankReq, options.RankerBufLen)
  117. engine.rankerRemoveDocChans[shard] = make(
  118. chan rankerRemoveDocReq, options.RankerBufLen)
  119. }
  120. }
  121. // InitStore initialize the persistent store channel
  122. func (engine *Engine) InitStore() {
  123. engine.storeIndexDocChans = make(
  124. []chan storeIndexDocReq, engine.initOptions.StoreShards)
  125. for shard := 0; shard < engine.initOptions.StoreShards; shard++ {
  126. engine.storeIndexDocChans[shard] = make(
  127. chan storeIndexDocReq)
  128. }
  129. engine.storeInitChan = make(
  130. chan bool, engine.initOptions.StoreShards)
  131. }
  132. // CheckMem check the memory when the memory is larger
  133. // than 99.99% using the store
  134. func (engine *Engine) CheckMem() {
  135. // Todo test
  136. if !engine.initOptions.UseStore {
  137. log.Println("Check virtualMemory...")
  138. vmem, _ := mem.VirtualMemory()
  139. log.Printf("Total: %v, Free: %v, UsedPercent: %f%%\n",
  140. vmem.Total, vmem.Free, vmem.UsedPercent)
  141. useMem := fmt.Sprintf("%.2f", vmem.UsedPercent)
  142. if useMem == "99.99" {
  143. engine.initOptions.UseStore = true
  144. engine.initOptions.StoreFolder = DefaultPath
  145. // os.MkdirAll(DefaultPath, 0777)
  146. }
  147. }
  148. }
  149. // Store start the persistent store work connection
  150. func (engine *Engine) Store() {
  151. // if engine.initOptions.UseStore {
  152. err := os.MkdirAll(engine.initOptions.StoreFolder, 0700)
  153. if err != nil {
  154. log.Fatalf("Can not create directory: %s ; %v",
  155. engine.initOptions.StoreFolder, err)
  156. }
  157. // 打开或者创建数据库
  158. engine.dbs = make([]store.Store, engine.initOptions.StoreShards)
  159. for shard := 0; shard < engine.initOptions.StoreShards; shard++ {
  160. dbPath := engine.initOptions.StoreFolder + "/" +
  161. StoreFilePrefix + "." + strconv.Itoa(shard)
  162. db, err := store.OpenStore(dbPath, engine.initOptions.StoreEngine)
  163. if db == nil || err != nil {
  164. log.Fatal("Unable to open database ", dbPath, ": ", err)
  165. }
  166. engine.dbs[shard] = db
  167. }
  168. // 从数据库中恢复
  169. for shard := 0; shard < engine.initOptions.StoreShards; shard++ {
  170. go engine.storeInitWorker(shard)
  171. }
  172. // 等待恢复完成
  173. for shard := 0; shard < engine.initOptions.StoreShards; shard++ {
  174. <-engine.storeInitChan
  175. }
  176. for {
  177. runtime.Gosched()
  178. engine.loc.RLock()
  179. numDoced := engine.numIndexingReqs == engine.numDocsIndexed
  180. engine.loc.RUnlock()
  181. if numDoced {
  182. break
  183. }
  184. }
  185. // 关闭并重新打开数据库
  186. for shard := 0; shard < engine.initOptions.StoreShards; shard++ {
  187. engine.dbs[shard].Close()
  188. dbPath := engine.initOptions.StoreFolder + "/" +
  189. StoreFilePrefix + "." + strconv.Itoa(shard)
  190. db, err := store.OpenStore(dbPath, engine.initOptions.StoreEngine)
  191. if db == nil || err != nil {
  192. log.Fatal("Unable to open database ", dbPath, ": ", err)
  193. }
  194. engine.dbs[shard] = db
  195. }
  196. for shard := 0; shard < engine.initOptions.StoreShards; shard++ {
  197. go engine.storeIndexDocWorker(shard)
  198. }
  199. // }
  200. }
  201. // WithGse Using user defined segmenter
  202. // If using a not nil segmenter and the dictionary is loaded,
  203. // the `opt.GseDict` will be ignore.
  204. func (engine *Engine) WithGse(segmenter gse.Segmenter) *Engine {
  205. if engine.initialized {
  206. log.Fatal(`Do not re-initialize the engine,
  207. WithGse should call before initialize the engine.`)
  208. }
  209. engine.segmenter = segmenter
  210. engine.loaded = true
  211. return engine
  212. }
  213. // Init initialize the engine
  214. func (engine *Engine) Init(options types.EngineOpts) {
  215. // 将线程数设置为CPU数
  216. // runtime.GOMAXPROCS(runtime.NumCPU())
  217. // runtime.GOMAXPROCS(128)
  218. // 初始化初始参数
  219. if engine.initialized {
  220. log.Fatal("Do not re-initialize the engine.")
  221. }
  222. if options.GseDict == "" && !options.NotUseGse && !engine.loaded {
  223. log.Printf("Dictionary file path is empty, load the default dictionary file.")
  224. options.GseDict = "zh"
  225. }
  226. if options.UseStore == true && options.StoreFolder == "" {
  227. log.Printf("Store file path is empty, use default folder path.")
  228. options.StoreFolder = DefaultPath
  229. // os.MkdirAll(DefaultPath, 0777)
  230. }
  231. options.Init()
  232. engine.initOptions = options
  233. engine.initialized = true
  234. if !options.NotUseGse {
  235. if !engine.loaded {
  236. // 载入分词器词典
  237. engine.segmenter.LoadDict(options.GseDict)
  238. engine.loaded = true
  239. }
  240. // 初始化停用词
  241. engine.stopTokens.Init(options.StopTokenFile)
  242. }
  243. // 初始化索引器和排序器
  244. for shard := 0; shard < options.NumShards; shard++ {
  245. engine.indexers = append(engine.indexers, core.Indexer{})
  246. engine.indexers[shard].Init(*options.IndexerOpts)
  247. engine.rankers = append(engine.rankers, core.Ranker{})
  248. engine.rankers[shard].Init(options.IDOnly)
  249. }
  250. // 初始化分词器通道
  251. engine.segmenterChan = make(
  252. chan segmenterReq, options.NumGseThreads)
  253. // 初始化索引器通道
  254. engine.Indexer(options)
  255. // 初始化排序器通道
  256. engine.Ranker(options)
  257. // engine.CheckMem(engine.initOptions.UseStore)
  258. engine.CheckMem()
  259. // 初始化持久化存储通道
  260. if engine.initOptions.UseStore {
  261. engine.InitStore()
  262. }
  263. // 启动分词器
  264. for iThread := 0; iThread < options.NumGseThreads; iThread++ {
  265. go engine.segmenterWorker()
  266. }
  267. // 启动索引器和排序器
  268. for shard := 0; shard < options.NumShards; shard++ {
  269. go engine.indexerAddDocWorker(shard)
  270. go engine.indexerRemoveDocWorker(shard)
  271. go engine.rankerAddDocWorker(shard)
  272. go engine.rankerRemoveDocWorker(shard)
  273. for i := 0; i < options.NumIndexerThreadsPerShard; i++ {
  274. go engine.indexerLookupWorker(shard)
  275. }
  276. for i := 0; i < options.NumRankerThreadsPerShard; i++ {
  277. go engine.rankerRankWorker(shard)
  278. }
  279. }
  280. // 启动持久化存储工作协程
  281. if engine.initOptions.UseStore {
  282. engine.Store()
  283. }
  284. atomic.AddUint64(&engine.numDocsStored, engine.numIndexingReqs)
  285. }
  286. // IndexDoc add the document to the index
  287. // 将文档加入索引
  288. //
  289. // 输入参数:
  290. // docId 标识文档编号,必须唯一,docId == 0 表示非法文档(用于强制刷新索引),[1, +oo) 表示合法文档
  291. // data 见 DocIndexData 注释
  292. // forceUpdate 是否强制刷新 cache,如果设为 true,则尽快添加到索引,否则等待 cache 满之后一次全量添加
  293. //
  294. // 注意:
  295. // 1. 这个函数是线程安全的,请尽可能并发调用以提高索引速度
  296. // 2. 这个函数调用是非同步的,也就是说在函数返回时有可能文档还没有加入索引中,因此
  297. // 如果立刻调用Search可能无法查询到这个文档。强制刷新索引请调用FlushIndex函数。
  298. func (engine *Engine) IndexDoc(docId uint64, data types.DocData,
  299. forceUpdate ...bool) {
  300. engine.Index(docId, data, forceUpdate...)
  301. }
  302. // Index add the document to the index
  303. func (engine *Engine) Index(docId uint64, data types.DocData,
  304. forceUpdate ...bool) {
  305. var force bool
  306. if len(forceUpdate) > 0 {
  307. force = forceUpdate[0]
  308. }
  309. // if engine.HasDoc(docId) {
  310. // engine.RemoveDoc(docId)
  311. // }
  312. // data.Tokens
  313. engine.internalIndexDoc(docId, data, force)
  314. hash := murmur.Sum32(fmt.Sprintf("%d", docId)) %
  315. uint32(engine.initOptions.StoreShards)
  316. if engine.initOptions.UseStore && docId != 0 {
  317. engine.storeIndexDocChans[hash] <- storeIndexDocReq{
  318. docId: docId, data: data}
  319. }
  320. }
  321. func (engine *Engine) internalIndexDoc(docId uint64, data types.DocData,
  322. forceUpdate bool) {
  323. if !engine.initialized {
  324. log.Fatal("The engine must be initialized first.")
  325. }
  326. if docId != 0 {
  327. atomic.AddUint64(&engine.numIndexingReqs, 1)
  328. }
  329. if forceUpdate {
  330. atomic.AddUint64(&engine.numForceUpdatingReqs, 1)
  331. }
  332. hash := murmur.Sum32(fmt.Sprintf("%d%s", docId, data.Content))
  333. engine.segmenterChan <- segmenterReq{
  334. docId: docId, hash: hash, data: data, forceUpdate: forceUpdate}
  335. }
  336. // RemoveDoc remove the document from the index
  337. // 将文档从索引中删除
  338. //
  339. // 输入参数:
  340. // docId 标识文档编号,必须唯一,docId == 0 表示非法文档(用于强制刷新索引),[1, +oo) 表示合法文档
  341. // forceUpdate 是否强制刷新 cache,如果设为 true,则尽快删除索引,否则等待 cache 满之后一次全量删除
  342. //
  343. // 注意:
  344. // 1. 这个函数是线程安全的,请尽可能并发调用以提高索引速度
  345. // 2. 这个函数调用是非同步的,也就是说在函数返回时有可能文档还没有加入索引中,因此
  346. // 如果立刻调用 Search 可能无法查询到这个文档。强制刷新索引请调用 FlushIndex 函数。
  347. func (engine *Engine) RemoveDoc(docId uint64, forceUpdate ...bool) {
  348. var force bool
  349. if len(forceUpdate) > 0 {
  350. force = forceUpdate[0]
  351. }
  352. if !engine.initialized {
  353. log.Fatal("The engine must be initialized first.")
  354. }
  355. if docId != 0 {
  356. atomic.AddUint64(&engine.numRemovingReqs, 1)
  357. }
  358. if force {
  359. atomic.AddUint64(&engine.numForceUpdatingReqs, 1)
  360. }
  361. for shard := 0; shard < engine.initOptions.NumShards; shard++ {
  362. engine.indexerRemoveDocChans[shard] <- indexerRemoveDocReq{
  363. docId: docId, forceUpdate: force}
  364. if docId == 0 {
  365. continue
  366. }
  367. engine.rankerRemoveDocChans[shard] <- rankerRemoveDocReq{docId: docId}
  368. }
  369. if engine.initOptions.UseStore && docId != 0 {
  370. // 从数据库中删除
  371. hash := murmur.Sum32(fmt.Sprintf("%d", docId)) %
  372. uint32(engine.initOptions.StoreShards)
  373. go engine.storeRemoveDocWorker(docId, hash)
  374. }
  375. }
  376. // // 获取文本的分词结果
  377. // func (engine *Engine) Tokens(text []byte) (tokens []string) {
  378. // querySegments := engine.segmenter.Segment(text)
  379. // for _, s := range querySegments {
  380. // token := s.Token().Text()
  381. // if !engine.stopTokens.IsStopToken(token) {
  382. // tokens = append(tokens, token)
  383. // }
  384. // }
  385. // return tokens
  386. // }
  387. // Segment get the word segmentation result of the text
  388. // 获取文本的分词结果, 只分词与过滤弃用词
  389. func (engine *Engine) Segment(content string) (keywords []string) {
  390. segments := engine.segmenter.ModeSegment([]byte(content),
  391. engine.initOptions.GseMode)
  392. for _, segment := range segments {
  393. token := segment.Token().Text()
  394. if !engine.stopTokens.IsStopToken(token) {
  395. keywords = append(keywords, token)
  396. }
  397. }
  398. return
  399. }
  400. // Tokens get the engine tokens
  401. func (engine *Engine) Tokens(request types.SearchReq) (tokens []string) {
  402. // 收集关键词
  403. // tokens := []string{}
  404. if request.Text != "" {
  405. request.Text = strings.ToLower(request.Text)
  406. if engine.initOptions.NotUseGse {
  407. tokens = strings.Split(request.Text, " ")
  408. } else {
  409. // querySegments := engine.segmenter.Segment([]byte(request.Text))
  410. // tokens = engine.Tokens([]byte(request.Text))
  411. tokens = engine.Segment(request.Text)
  412. }
  413. // 叠加 tokens
  414. for _, t := range request.Tokens {
  415. tokens = append(tokens, t)
  416. }
  417. return
  418. }
  419. for _, t := range request.Tokens {
  420. tokens = append(tokens, t)
  421. }
  422. return
  423. }
  424. // RankId rank docs by types.ScoredIDs
  425. func (engine *Engine) RankId(request types.SearchReq, RankOpts types.RankOpts,
  426. tokens []string, rankerReturnChan chan rankerReturnReq) (
  427. output types.SearchResp) {
  428. // 从通信通道读取排序器的输出
  429. numDocs := 0
  430. var rankOutput types.ScoredIDs
  431. // var rankOutput interface{}
  432. //**********/ begin
  433. timeout := request.Timeout
  434. isTimeout := false
  435. if timeout <= 0 {
  436. // 不设置超时
  437. for shard := 0; shard < engine.initOptions.NumShards; shard++ {
  438. rankerOutput := <-rankerReturnChan
  439. if !request.CountDocsOnly {
  440. if rankerOutput.docs != nil {
  441. for _, doc := range rankerOutput.docs.(types.ScoredIDs) {
  442. rankOutput = append(rankOutput, doc)
  443. }
  444. }
  445. }
  446. numDocs += rankerOutput.numDocs
  447. }
  448. } else {
  449. // 设置超时
  450. deadline := time.Now().Add(time.Nanosecond *
  451. time.Duration(NumNanosecondsInAMillisecond*request.Timeout))
  452. for shard := 0; shard < engine.initOptions.NumShards; shard++ {
  453. select {
  454. case rankerOutput := <-rankerReturnChan:
  455. if !request.CountDocsOnly {
  456. if rankerOutput.docs != nil {
  457. for _, doc := range rankerOutput.docs.(types.ScoredIDs) {
  458. rankOutput = append(rankOutput, doc)
  459. }
  460. }
  461. }
  462. numDocs += rankerOutput.numDocs
  463. case <-time.After(deadline.Sub(time.Now())):
  464. isTimeout = true
  465. break
  466. }
  467. }
  468. }
  469. // 再排序
  470. if !request.CountDocsOnly && !request.Orderless {
  471. if RankOpts.ReverseOrder {
  472. sort.Sort(sort.Reverse(rankOutput))
  473. } else {
  474. sort.Sort(rankOutput)
  475. }
  476. }
  477. // 准备输出
  478. output.Tokens = tokens
  479. // 仅当 CountDocsOnly 为 false 时才充填 output.Docs
  480. if !request.CountDocsOnly {
  481. if request.Orderless {
  482. // 无序状态无需对 Offset 截断
  483. output.Docs = rankOutput
  484. } else {
  485. var start, end int
  486. if RankOpts.MaxOutputs == 0 {
  487. start = utils.MinInt(RankOpts.OutputOffset, len(rankOutput))
  488. end = len(rankOutput)
  489. } else {
  490. start = utils.MinInt(RankOpts.OutputOffset, len(rankOutput))
  491. end = utils.MinInt(start+RankOpts.MaxOutputs, len(rankOutput))
  492. }
  493. output.Docs = rankOutput[start:end]
  494. }
  495. }
  496. output.NumDocs = numDocs
  497. output.Timeout = isTimeout
  498. return
  499. }
  500. // Ranks rank docs by types.ScoredDocs
  501. func (engine *Engine) Ranks(request types.SearchReq, RankOpts types.RankOpts,
  502. tokens []string, rankerReturnChan chan rankerReturnReq) (
  503. output types.SearchResp) {
  504. // 从通信通道读取排序器的输出
  505. numDocs := 0
  506. rankOutput := types.ScoredDocs{}
  507. //**********/ begin
  508. timeout := request.Timeout
  509. isTimeout := false
  510. if timeout <= 0 {
  511. // 不设置超时
  512. for shard := 0; shard < engine.initOptions.NumShards; shard++ {
  513. rankerOutput := <-rankerReturnChan
  514. if !request.CountDocsOnly {
  515. if rankerOutput.docs != nil {
  516. for _, doc := range rankerOutput.docs.(types.ScoredDocs) {
  517. rankOutput = append(rankOutput, doc)
  518. }
  519. }
  520. }
  521. numDocs += rankerOutput.numDocs
  522. }
  523. } else {
  524. // 设置超时
  525. deadline := time.Now().Add(time.Nanosecond *
  526. time.Duration(NumNanosecondsInAMillisecond*request.Timeout))
  527. for shard := 0; shard < engine.initOptions.NumShards; shard++ {
  528. select {
  529. case rankerOutput := <-rankerReturnChan:
  530. if !request.CountDocsOnly {
  531. if rankerOutput.docs != nil {
  532. for _, doc := range rankerOutput.docs.(types.ScoredDocs) {
  533. rankOutput = append(rankOutput, doc)
  534. }
  535. }
  536. }
  537. numDocs += rankerOutput.numDocs
  538. case <-time.After(deadline.Sub(time.Now())):
  539. isTimeout = true
  540. break
  541. }
  542. }
  543. }
  544. // 再排序
  545. if !request.CountDocsOnly && !request.Orderless {
  546. if RankOpts.ReverseOrder {
  547. sort.Sort(sort.Reverse(rankOutput))
  548. } else {
  549. sort.Sort(rankOutput)
  550. }
  551. }
  552. // 准备输出
  553. output.Tokens = tokens
  554. // 仅当 CountDocsOnly 为 false 时才充填 output.Docs
  555. if !request.CountDocsOnly {
  556. if request.Orderless {
  557. // 无序状态无需对 Offset 截断
  558. output.Docs = rankOutput
  559. } else {
  560. var start, end int
  561. if RankOpts.MaxOutputs == 0 {
  562. start = utils.MinInt(RankOpts.OutputOffset, len(rankOutput))
  563. end = len(rankOutput)
  564. } else {
  565. start = utils.MinInt(RankOpts.OutputOffset, len(rankOutput))
  566. end = utils.MinInt(start+RankOpts.MaxOutputs, len(rankOutput))
  567. }
  568. output.Docs = rankOutput[start:end]
  569. }
  570. }
  571. output.NumDocs = numDocs
  572. output.Timeout = isTimeout
  573. return
  574. }
  575. // Search find the document that satisfies the search criteria.
  576. // This function is thread safe
  577. // 查找满足搜索条件的文档,此函数线程安全
  578. func (engine *Engine) Search(request types.SearchReq) (output types.SearchResp) {
  579. if !engine.initialized {
  580. log.Fatal("The engine must be initialized first.")
  581. }
  582. tokens := engine.Tokens(request)
  583. var RankOpts types.RankOpts
  584. if request.RankOpts == nil {
  585. RankOpts = *engine.initOptions.DefaultRankOpts
  586. } else {
  587. RankOpts = *request.RankOpts
  588. }
  589. if RankOpts.ScoringCriteria == nil {
  590. RankOpts.ScoringCriteria = engine.initOptions.DefaultRankOpts.ScoringCriteria
  591. }
  592. // 建立排序器返回的通信通道
  593. rankerReturnChan := make(
  594. chan rankerReturnReq, engine.initOptions.NumShards)
  595. // 生成查找请求
  596. lookupRequest := indexerLookupReq{
  597. countDocsOnly: request.CountDocsOnly,
  598. tokens: tokens,
  599. labels: request.Labels,
  600. docIds: request.DocIds,
  601. options: RankOpts,
  602. rankerReturnChan: rankerReturnChan,
  603. orderless: request.Orderless,
  604. logic: request.Logic,
  605. }
  606. // 向索引器发送查找请求
  607. for shard := 0; shard < engine.initOptions.NumShards; shard++ {
  608. engine.indexerLookupChans[shard] <- lookupRequest
  609. }
  610. if engine.initOptions.IDOnly {
  611. output = engine.RankId(request, RankOpts, tokens, rankerReturnChan)
  612. return
  613. }
  614. output = engine.Ranks(request, RankOpts, tokens, rankerReturnChan)
  615. return
  616. }
  617. // Flush block wait until all indexes are added
  618. // 阻塞等待直到所有索引添加完毕
  619. func (engine *Engine) Flush() {
  620. for {
  621. runtime.Gosched()
  622. engine.loc.RLock()
  623. inxd := engine.numIndexingReqs == engine.numDocsIndexed
  624. rmd := engine.numRemovingReqs*uint64(engine.initOptions.NumShards) ==
  625. engine.numDocsRemoved
  626. stored := !engine.initOptions.UseStore || engine.numIndexingReqs ==
  627. engine.numDocsStored
  628. engine.loc.RUnlock()
  629. if inxd && rmd && stored {
  630. // 保证 CHANNEL 中 REQUESTS 全部被执行完
  631. break
  632. }
  633. }
  634. // 强制更新,保证其为最后的请求
  635. engine.IndexDoc(0, types.DocData{}, true)
  636. for {
  637. runtime.Gosched()
  638. engine.loc.RLock()
  639. forced := engine.numForceUpdatingReqs*uint64(engine.initOptions.NumShards) ==
  640. engine.numDocsForceUpdated
  641. engine.loc.RUnlock()
  642. if forced {
  643. return
  644. }
  645. }
  646. }
  647. // FlushIndex block wait until all indexes are added
  648. // 阻塞等待直到所有索引添加完毕
  649. func (engine *Engine) FlushIndex() {
  650. engine.Flush()
  651. }
  652. // Close close the engine
  653. // 关闭引擎
  654. func (engine *Engine) Close() {
  655. engine.Flush()
  656. if engine.initOptions.UseStore {
  657. for _, db := range engine.dbs {
  658. db.Close()
  659. }
  660. }
  661. }
  662. // 从文本hash得到要分配到的 shard
  663. func (engine *Engine) getShard(hash uint32) int {
  664. return int(hash - hash/uint32(engine.initOptions.NumShards)*
  665. uint32(engine.initOptions.NumShards))
  666. }