// Copyright 2012-present Oliver Eilhard. All rights reserved.
// Use of this source code is governed by a MIT-license.
// See http://olivere.mit-license.org/license.txt for details.

// SlicedScroll illustrates scrolling through a set of documents
// in parallel. It uses the sliced scrolling feature introduced
// in Elasticsearch 5.0 to create a number of goroutines, each
// scrolling through a slice of the total results. A second goroutine
// receives the hits from the set of goroutines scrolling through
// the slices and simply counts the total number and the number of
// documents received per slice.
//
// The speedup of sliced scrolling can be significant but is very
// dependent on the specific use case.
//
// See https://www.elastic.co/guide/en/elasticsearch/reference/5.2/search-request-scroll.html#sliced-scroll
// for details on sliced scrolling in Elasticsearch.
//
// Example
//
// Scroll with 4 parallel slices through an index called "products".
// Use "_uid" as the default field:
//
//    sliced_scroll -index=products -n=4
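//
// A second, purely illustrative invocation using the remaining flags
// (the type and field values "product" and "id" are placeholders; the
// slice field must be numeric):
//
//    sliced_scroll -index=products -type=product -field=id -n=8 -sniff=false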
//
package main

import (
	"context"
	"flag"
	"fmt"
	"io"
	"log"
	"sync"
	"sync/atomic"
	"time"

	"golang.org/x/sync/errgroup"
	"gopkg.in/olivere/elastic.v5"
)

func main() {
	var (
		url       = flag.String("url", "http://localhost:9200", "Elasticsearch URL")
		index     = flag.String("index", "", "Elasticsearch index name")
		typ       = flag.String("type", "", "Elasticsearch type name")
		field     = flag.String("field", "", "Slice field (must be numeric)")
		numSlices = flag.Int("n", 2, "Number of slices to use in parallel")
		sniff     = flag.Bool("sniff", true, "Enable or disable sniffing")
	)
	flag.Parse()
	log.SetFlags(0)

	if *url == "" {
		log.Fatal("missing url parameter")
	}
	if *index == "" {
		log.Fatal("missing index parameter")
	}
	if *numSlices <= 0 {
		log.Fatal("n must be greater than zero")
	}
	// Create an Elasticsearch client
	client, err := elastic.NewClient(elastic.SetURL(*url), elastic.SetSniff(*sniff))
	if err != nil {
		log.Fatal(err)
	}

	// Set up a group of goroutines from the excellent errgroup package
	g, ctx := errgroup.WithContext(context.TODO())
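	// The errgroup-derived ctx is canceled as soon as any goroutine started
	// via g.Go returns a non-nil error, which stops the remaining goroutines
	// (and their scroll requests) early.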
	// The hits channel is fed by the scrolling goroutines started below and
	// consumed by a single counting goroutine further down.
	type hit struct {
		Slice int
		Hit   elastic.SearchHit
	}
	hitsc := make(chan hit)
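	// hitsc is unbuffered, so each scrolling goroutine blocks on a send
	// until the counting goroutine has received the hit.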
	begin := time.Now()

	// Start a number of goroutines to parallelize scrolling. The WaitGroup
	// tracks only these scrolling goroutines so that the hits channel can be
	// closed once they have all finished.
	var wg sync.WaitGroup
	for i := 0; i < *numSlices; i++ {
		wg.Add(1)
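		// Capture the loop variable for the closure passed to g.Go below;
		// the closure runs concurrently, after the loop may have moved on.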
		slice := i

		// Prepare the query
		var query elastic.Query
		if *typ == "" {
			query = elastic.NewMatchAllQuery()
		} else {
			query = elastic.NewTypeQuery(*typ)
		}
		// Prepare the slice: Id is this slice's number (0..n-1) and Max is
		// the total number of slices. Elasticsearch slices on "_uid" unless
		// a (numeric) field is given explicitly.
		sliceQuery := elastic.NewSliceQuery().Id(i).Max(*numSlices)
		if *field != "" {
			sliceQuery = sliceQuery.Field(*field)
		}
		// Start goroutine for this sliced scroll
		g.Go(func() error {
			defer wg.Done()
			svc := client.Scroll(*index).Query(query).Slice(sliceQuery)
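			// Page through this slice; Do returns io.EOF once the scroll
			// has been exhausted.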
			for {
				res, err := svc.Do(ctx)
				if err == io.EOF {
					break
				}
				if err != nil {
					return err
				}
				for _, searchHit := range res.Hits.Hits {
					// Pass the hit to the hits channel, which will be consumed below
					select {
					case hitsc <- hit{Slice: slice, Hit: *searchHit}:
					case <-ctx.Done():
						return ctx.Err()
					}
				}
			}
			return nil
		})
	}
	go func() {
		// Wait until all scrolling is done, then close the hits channel so
		// that the consumer's range loop below can terminate.
		wg.Wait()
		close(hitsc)
	}()
	// A second goroutine consumes the hits sent by the scrolling goroutines
	// and counts them, in total and per slice.
	var total uint64
	totals := make([]uint64, *numSlices)
	g.Go(func() error {
		for hit := range hitsc {
			// We simply count the hits here.
			atomic.AddUint64(&totals[hit.Slice], 1)
			current := atomic.AddUint64(&total, 1)
			sec := int(time.Since(begin).Seconds())
			fmt.Printf("%8d | %02d:%02d\r", current, sec/60, sec%60)
			select {
			default:
			case <-ctx.Done():
				return ctx.Err()
			}
		}
		return nil
	})
	// Wait until all goroutines are finished
	if err := g.Wait(); err != nil {
		log.Fatal(err)
	}

	fmt.Printf("Scrolled through a total of %d documents in %v\n", total, time.Since(begin))
	for i := 0; i < *numSlices; i++ {
		fmt.Printf("Slice %2d received %d documents\n", i, totals[i])
	}
}