sliced_scroll.go

// Copyright 2012-present Oliver Eilhard. All rights reserved.
// Use of this source code is governed by a MIT-license.
// See http://olivere.mit-license.org/license.txt for details.

// SlicedScroll illustrates scrolling through a set of documents
// in parallel. It uses the sliced scrolling feature introduced
// in Elasticsearch 5.0 to create a number of goroutines, each
// scrolling through a slice of the total results. A second goroutine
// receives the hits from the set of goroutines scrolling through
// the slices and simply counts the total number and the number of
// documents received per slice.
//
// The speedup of sliced scrolling can be significant but depends
// heavily on the specific use case.
//
// See https://www.elastic.co/guide/en/elasticsearch/reference/5.2/search-request-scroll.html#sliced-scroll
// for details on sliced scrolling in Elasticsearch.
//
// Example
//
// Scroll with 4 parallel slices through an index called "products",
// using the default slice field ("_uid"):
//
//	sliced_scroll -index=products -n=4
//
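// Scroll with 8 parallel slices, slicing on a numeric field. The field
// name "id" below is only an illustration; pass any numeric field of
// the index:
//
//	sliced_scroll -index=products -n=8 -field=id
//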
package main

import (
	"context"
	"flag"
	"fmt"
	"io"
	"log"
	"sync"
	"sync/atomic"
	"time"

	"golang.org/x/sync/errgroup"
	"gopkg.in/olivere/elastic.v5"
)
func main() {
	var (
		url       = flag.String("url", "http://localhost:9200", "Elasticsearch URL")
		index     = flag.String("index", "", "Elasticsearch index name")
		typ       = flag.String("type", "", "Elasticsearch type name")
		field     = flag.String("field", "", "Slice field (must be numeric)")
		numSlices = flag.Int("n", 2, "Number of slices to use in parallel")
		sniff     = flag.Bool("sniff", true, "Enable or disable sniffing")
	)
	flag.Parse()
	log.SetFlags(0)

	if *url == "" {
		log.Fatal("missing url parameter")
	}
	if *index == "" {
		log.Fatal("missing index parameter")
	}
	if *numSlices <= 0 {
		log.Fatal("n must be greater than zero")
	}

	// Create an Elasticsearch client
	client, err := elastic.NewClient(elastic.SetURL(*url), elastic.SetSniff(*sniff))
	if err != nil {
		log.Fatal(err)
	}

	// Set up a group of goroutines from the excellent errgroup package
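	// The context returned by errgroup.WithContext is canceled as soon as
	// any goroutine in the group returns a non-nil error, letting the
	// remaining slices stop early.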
	g, ctx := errgroup.WithContext(context.TODO())

	// The hits channel is written to by the first set of goroutines and
	// consumed by the second
	type hit struct {
		Slice int
		Hit   elastic.SearchHit
	}
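	// hitsc is unbuffered, so the scrolling goroutines can only make
	// progress as fast as the consumer below reads from it.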
	hitsc := make(chan hit)

	begin := time.Now()

	// Start a number of goroutines to parallelize scrolling
	var wg sync.WaitGroup
	for i := 0; i < *numSlices; i++ {
		wg.Add(1)
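		// Copy the loop counter so the closure launched below sees this
		// iteration's slice number rather than the shared loop variable.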
		slice := i

		// Prepare the query
		var query elastic.Query
		if *typ == "" {
			query = elastic.NewMatchAllQuery()
		} else {
			query = elastic.NewTypeQuery(*typ)
		}

		// Prepare the slice
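		// Each slice gets an id in [0, numSlices) and the same Max, so
		// Elasticsearch partitions the scroll into disjoint subsets of
		// documents. Without an explicit -field, ES 5.x slices on "_uid".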
		sliceQuery := elastic.NewSliceQuery().Id(i).Max(*numSlices)
		if *field != "" {
			sliceQuery = sliceQuery.Field(*field)
		}

		// Start goroutine for this sliced scroll
		g.Go(func() error {
			defer wg.Done()
			svc := client.Scroll(*index).Query(query).Slice(sliceQuery)
			for {
				res, err := svc.Do(ctx)
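				// io.EOF signals that this slice's scroll is exhausted;
				// any other error is returned and cancels the group context.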
				if err == io.EOF {
					break
				}
				if err != nil {
					return err
				}
				for _, searchHit := range res.Hits.Hits {
					// Pass the hit to the hits channel, which will be consumed below
					select {
					case hitsc <- hit{Slice: slice, Hit: *searchHit}:
					case <-ctx.Done():
						return ctx.Err()
					}
				}
			}
			return nil
		})
	}

	go func() {
		// Wait until all scrolling is done
		wg.Wait()
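		// Closing the channel ends the consumer's range loop below.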
		close(hitsc)
	}()

	// Second goroutine will consume the hits sent from the workers in the
	// first set of goroutines
	var total uint64
	totals := make([]uint64, *numSlices)
	g.Go(func() error {
		for hit := range hitsc {
			// We simply count the hits here.
			atomic.AddUint64(&totals[hit.Slice], 1)
			current := atomic.AddUint64(&total, 1)
			sec := int(time.Since(begin).Seconds())
			fmt.Printf("%8d | %02d:%02d\r", current, sec/60, sec%60)
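			// Non-blocking check: stop if one of the scroll goroutines
			// failed and the group context was canceled.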
			select {
			default:
			case <-ctx.Done():
				return ctx.Err()
			}
		}
		return nil
	})

	// Wait until all goroutines are finished
	if err := g.Wait(); err != nil {
		log.Fatal(err)
	}

	fmt.Printf("Scrolled through a total of %d documents in %v\n", total, time.Since(begin))
	for i := 0; i < *numSlices; i++ {
		fmt.Printf("Slice %2d received %d documents\n", i, totals[i])
	}
}