store_worker.go 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. // Copyright 2013 Hui Chen
  2. // Copyright 2016 ego authors
  3. //
  4. // Licensed under the Apache License, Version 2.0 (the "License"): you may
  5. // not use this file except in compliance with the License. You may obtain
  6. // a copy of the License at
  7. //
  8. // http://www.apache.org/licenses/LICENSE-2.0
  9. //
  10. // Unless required by applicable law or agreed to in writing, software
  11. // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  12. // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  13. // License for the specific language governing permissions and limitations
  14. // under the License.
  15. package riot
  16. import (
  17. "bytes"
  18. "encoding/binary"
  19. "encoding/gob"
  20. "sync/atomic"
  21. "github.com/go-ego/riot/types"
  22. )
  23. type storeIndexDocReq struct {
  24. docId uint64
  25. data types.DocData
  26. // data types.DocumentIndexData
  27. }
  28. func (engine *Engine) storeIndexDocWorker(shard int) {
  29. for {
  30. request := <-engine.storeIndexDocChans[shard]
  31. // 得到 key
  32. b := make([]byte, 10)
  33. length := binary.PutUvarint(b, request.docId)
  34. // 得到 value
  35. var buf bytes.Buffer
  36. enc := gob.NewEncoder(&buf)
  37. err := enc.Encode(request.data)
  38. if err != nil {
  39. atomic.AddUint64(&engine.numDocsStored, 1)
  40. continue
  41. }
  42. // has, err := engine.dbs[shard].Has(b[0:length])
  43. // if err != nil {
  44. // log.Println("engine.dbs[shard].Has(b[0:length]) ", err)
  45. // }
  46. // if has {
  47. // engine.dbs[shard].Delete(b[0:length])
  48. // }
  49. // 将 key-value 写入数据库
  50. engine.dbs[shard].Set(b[0:length], buf.Bytes())
  51. engine.loc.Lock()
  52. atomic.AddUint64(&engine.numDocsStored, 1)
  53. engine.loc.Unlock()
  54. }
  55. }
  56. func (engine *Engine) storeRemoveDocWorker(docId uint64, shard uint32) {
  57. // 得到 key
  58. b := make([]byte, 10)
  59. length := binary.PutUvarint(b, docId)
  60. // 从数据库删除该 key
  61. engine.dbs[shard].Delete(b[0:length])
  62. }
  63. // storageInitWorker persistent storage init worker
  64. func (engine *Engine) storeInitWorker(shard int) {
  65. engine.dbs[shard].ForEach(func(k, v []byte) error {
  66. key, value := k, v
  67. // 得到 docID
  68. docId, _ := binary.Uvarint(key)
  69. // 得到 data
  70. buf := bytes.NewReader(value)
  71. dec := gob.NewDecoder(buf)
  72. var data types.DocData
  73. err := dec.Decode(&data)
  74. if err == nil {
  75. // 添加索引
  76. engine.internalIndexDoc(docId, data, false)
  77. }
  78. return nil
  79. })
  80. engine.storeInitChan <- true
  81. }