12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697 |
- // Copyright 2013 Hui Chen
- // Copyright 2016 ego authors
- //
- // Licensed under the Apache License, Version 2.0 (the "License"): you may
- // not use this file except in compliance with the License. You may obtain
- // a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- // License for the specific language governing permissions and limitations
- // under the License.
- package riot
- import (
- "bytes"
- "encoding/binary"
- "encoding/gob"
- "sync/atomic"
- "github.com/go-ego/riot/types"
- )
- type storeIndexDocReq struct {
- docId uint64
- data types.DocData
- // data types.DocumentIndexData
- }
- func (engine *Engine) storeIndexDocWorker(shard int) {
- for {
- request := <-engine.storeIndexDocChans[shard]
- // 得到 key
- b := make([]byte, 10)
- length := binary.PutUvarint(b, request.docId)
- // 得到 value
- var buf bytes.Buffer
- enc := gob.NewEncoder(&buf)
- err := enc.Encode(request.data)
- if err != nil {
- atomic.AddUint64(&engine.numDocsStored, 1)
- continue
- }
- // has, err := engine.dbs[shard].Has(b[0:length])
- // if err != nil {
- // log.Println("engine.dbs[shard].Has(b[0:length]) ", err)
- // }
- // if has {
- // engine.dbs[shard].Delete(b[0:length])
- // }
- // 将 key-value 写入数据库
- engine.dbs[shard].Set(b[0:length], buf.Bytes())
- engine.loc.Lock()
- atomic.AddUint64(&engine.numDocsStored, 1)
- engine.loc.Unlock()
- }
- }
- func (engine *Engine) storeRemoveDocWorker(docId uint64, shard uint32) {
- // 得到 key
- b := make([]byte, 10)
- length := binary.PutUvarint(b, docId)
- // 从数据库删除该 key
- engine.dbs[shard].Delete(b[0:length])
- }
- // storageInitWorker persistent storage init worker
- func (engine *Engine) storeInitWorker(shard int) {
- engine.dbs[shard].ForEach(func(k, v []byte) error {
- key, value := k, v
- // 得到 docID
- docId, _ := binary.Uvarint(key)
- // 得到 data
- buf := bytes.NewReader(value)
- dec := gob.NewDecoder(buf)
- var data types.DocData
- err := dec.Decode(&data)
- if err == nil {
- // 添加索引
- engine.internalIndexDoc(docId, data, false)
- }
- return nil
- })
- engine.storeInitChan <- true
- }
|