cluster-test.go 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361
  1. // Copyright 2012-present Oliver Eilhard. All rights reserved.
  2. // Use of this source code is governed by a MIT-license.
  3. // See http://olivere.mit-license.org/license.txt for details.
  4. package main
  5. import (
  6. "context"
  7. "encoding/json"
  8. "errors"
  9. "flag"
  10. "fmt"
  11. "log"
  12. "math/rand"
  13. "os"
  14. "runtime"
  15. "strings"
  16. "sync/atomic"
  17. "time"
  18. elastic "gopkg.in/olivere/elastic.v5"
  19. )
  20. type Tweet struct {
  21. User string `json:"user"`
  22. Message string `json:"message"`
  23. Retweets int `json:"retweets"`
  24. Image string `json:"image,omitempty"`
  25. Created time.Time `json:"created,omitempty"`
  26. Tags []string `json:"tags,omitempty"`
  27. Location string `json:"location,omitempty"`
  28. Suggest *elastic.SuggestField `json:"suggest_field,omitempty"`
  29. }
  30. var (
  31. nodes = flag.String("nodes", "", "comma-separated list of ES URLs (e.g. 'http://192.168.2.10:9200,http://192.168.2.11:9200')")
  32. n = flag.Int("n", 5, "number of goroutines that run searches")
  33. index = flag.String("index", "twitter", "name of ES index to use")
  34. errorlogfile = flag.String("errorlog", "", "error log file")
  35. infologfile = flag.String("infolog", "", "info log file")
  36. tracelogfile = flag.String("tracelog", "", "trace log file")
  37. retries = flag.Int("retries", 0, "number of retries")
  38. sniff = flag.Bool("sniff", elastic.DefaultSnifferEnabled, "enable or disable sniffer")
  39. sniffer = flag.Duration("sniffer", elastic.DefaultSnifferInterval, "sniffer interval")
  40. healthcheck = flag.Bool("healthcheck", elastic.DefaultHealthcheckEnabled, "enable or disable healthchecks")
  41. healthchecker = flag.Duration("healthchecker", elastic.DefaultHealthcheckInterval, "healthcheck interval")
  42. )
  43. func main() {
  44. flag.Parse()
  45. runtime.GOMAXPROCS(runtime.NumCPU())
  46. if *nodes == "" {
  47. log.Fatal("no nodes specified")
  48. }
  49. urls := strings.SplitN(*nodes, ",", -1)
  50. testcase, err := NewTestCase(*index, urls)
  51. if err != nil {
  52. log.Fatal(err)
  53. }
  54. testcase.SetErrorLogFile(*errorlogfile)
  55. testcase.SetInfoLogFile(*infologfile)
  56. testcase.SetTraceLogFile(*tracelogfile)
  57. testcase.SetMaxRetries(*retries)
  58. testcase.SetHealthcheck(*healthcheck)
  59. testcase.SetHealthcheckInterval(*healthchecker)
  60. testcase.SetSniff(*sniff)
  61. testcase.SetSnifferInterval(*sniffer)
  62. if err := testcase.Run(*n); err != nil {
  63. log.Fatal(err)
  64. }
  65. select {}
  66. }
  67. type RunInfo struct {
  68. Success bool
  69. }
  70. type TestCase struct {
  71. nodes []string
  72. client *elastic.Client
  73. runs int64
  74. failures int64
  75. runCh chan RunInfo
  76. index string
  77. errorlogfile string
  78. infologfile string
  79. tracelogfile string
  80. maxRetries int
  81. healthcheck bool
  82. healthcheckInterval time.Duration
  83. sniff bool
  84. snifferInterval time.Duration
  85. }
  86. func NewTestCase(index string, nodes []string) (*TestCase, error) {
  87. if index == "" {
  88. return nil, errors.New("no index name specified")
  89. }
  90. return &TestCase{
  91. index: index,
  92. nodes: nodes,
  93. runCh: make(chan RunInfo),
  94. }, nil
  95. }
  96. func (t *TestCase) SetIndex(name string) {
  97. t.index = name
  98. }
  99. func (t *TestCase) SetErrorLogFile(name string) {
  100. t.errorlogfile = name
  101. }
  102. func (t *TestCase) SetInfoLogFile(name string) {
  103. t.infologfile = name
  104. }
  105. func (t *TestCase) SetTraceLogFile(name string) {
  106. t.tracelogfile = name
  107. }
  108. func (t *TestCase) SetMaxRetries(n int) {
  109. t.maxRetries = n
  110. }
  111. func (t *TestCase) SetSniff(enabled bool) {
  112. t.sniff = enabled
  113. }
  114. func (t *TestCase) SetSnifferInterval(d time.Duration) {
  115. t.snifferInterval = d
  116. }
  117. func (t *TestCase) SetHealthcheck(enabled bool) {
  118. t.healthcheck = enabled
  119. }
  120. func (t *TestCase) SetHealthcheckInterval(d time.Duration) {
  121. t.healthcheckInterval = d
  122. }
  123. func (t *TestCase) Run(n int) error {
  124. if err := t.setup(); err != nil {
  125. return err
  126. }
  127. for i := 1; i < n; i++ {
  128. go t.search()
  129. }
  130. go t.monitor()
  131. return nil
  132. }
  133. func (t *TestCase) monitor() {
  134. print := func() {
  135. fmt.Printf("\033[32m%5d\033[0m; \033[31m%5d\033[0m: %s%s\r", t.runs, t.failures, t.client.String(), " ")
  136. }
  137. for {
  138. select {
  139. case run := <-t.runCh:
  140. atomic.AddInt64(&t.runs, 1)
  141. if !run.Success {
  142. atomic.AddInt64(&t.failures, 1)
  143. fmt.Println()
  144. }
  145. print()
  146. case <-time.After(5 * time.Second):
  147. // Print stats after some inactivity
  148. print()
  149. break
  150. }
  151. }
  152. }
  153. func (t *TestCase) setup() error {
  154. var errorlogger *log.Logger
  155. if t.errorlogfile != "" {
  156. f, err := os.OpenFile(t.errorlogfile, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0664)
  157. if err != nil {
  158. return err
  159. }
  160. errorlogger = log.New(f, "", log.Ltime|log.Lmicroseconds|log.Lshortfile)
  161. }
  162. var infologger *log.Logger
  163. if t.infologfile != "" {
  164. f, err := os.OpenFile(t.infologfile, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0664)
  165. if err != nil {
  166. return err
  167. }
  168. infologger = log.New(f, "", log.LstdFlags)
  169. }
  170. // Trace request and response details like this
  171. var tracelogger *log.Logger
  172. if t.tracelogfile != "" {
  173. f, err := os.OpenFile(t.tracelogfile, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0664)
  174. if err != nil {
  175. return err
  176. }
  177. tracelogger = log.New(f, "", log.LstdFlags)
  178. }
  179. client, err := elastic.NewClient(
  180. elastic.SetURL(t.nodes...),
  181. elastic.SetErrorLog(errorlogger),
  182. elastic.SetInfoLog(infologger),
  183. elastic.SetTraceLog(tracelogger),
  184. elastic.SetMaxRetries(t.maxRetries),
  185. elastic.SetSniff(t.sniff),
  186. elastic.SetSnifferInterval(t.snifferInterval),
  187. elastic.SetHealthcheck(t.healthcheck),
  188. elastic.SetHealthcheckInterval(t.healthcheckInterval))
  189. if err != nil {
  190. // Handle error
  191. return err
  192. }
  193. t.client = client
  194. ctx := context.Background()
  195. // Use the IndexExists service to check if a specified index exists.
  196. exists, err := t.client.IndexExists(t.index).Do(ctx)
  197. if err != nil {
  198. return err
  199. }
  200. if exists {
  201. deleteIndex, err := t.client.DeleteIndex(t.index).Do(ctx)
  202. if err != nil {
  203. return err
  204. }
  205. if !deleteIndex.Acknowledged {
  206. return errors.New("delete index not acknowledged")
  207. }
  208. }
  209. // Create a new index.
  210. createIndex, err := t.client.CreateIndex(t.index).Do(ctx)
  211. if err != nil {
  212. return err
  213. }
  214. if !createIndex.Acknowledged {
  215. return errors.New("create index not acknowledged")
  216. }
  217. // Index a tweet (using JSON serialization)
  218. tweet1 := Tweet{User: "olivere", Message: "Take Five", Retweets: 0}
  219. _, err = t.client.Index().
  220. Index(t.index).
  221. Type("tweet").
  222. Id("1").
  223. BodyJson(tweet1).
  224. Do(ctx)
  225. if err != nil {
  226. return err
  227. }
  228. // Index a second tweet (by string)
  229. tweet2 := `{"user" : "olivere", "message" : "It's a Raggy Waltz"}`
  230. _, err = t.client.Index().
  231. Index(t.index).
  232. Type("tweet").
  233. Id("2").
  234. BodyString(tweet2).
  235. Do(ctx)
  236. if err != nil {
  237. return err
  238. }
  239. // Flush to make sure the documents got written.
  240. _, err = t.client.Flush().Index(t.index).Do(ctx)
  241. if err != nil {
  242. return err
  243. }
  244. return nil
  245. }
  246. func (t *TestCase) search() {
  247. ctx := context.Background()
  248. // Loop forever to check for connection issues
  249. for {
  250. // Get tweet with specified ID
  251. get1, err := t.client.Get().
  252. Index(t.index).
  253. Type("tweet").
  254. Id("1").
  255. Do(ctx)
  256. if err != nil {
  257. //failf("Get failed: %v", err)
  258. t.runCh <- RunInfo{Success: false}
  259. continue
  260. }
  261. if !get1.Found {
  262. //log.Printf("Document %s not found\n", "1")
  263. //fmt.Printf("Got document %s in version %d from index %s, type %s\n", get1.Id, get1.Version, get1.Index, get1.Type)
  264. t.runCh <- RunInfo{Success: false}
  265. continue
  266. }
  267. // Search with a term query
  268. searchResult, err := t.client.Search().
  269. Index(t.index). // search in index t.index
  270. Query(elastic.NewTermQuery("user", "olivere")). // specify the query
  271. Sort("user", true). // sort by "user" field, ascending
  272. From(0).Size(10). // take documents 0-9
  273. Pretty(true). // pretty print request and response JSON
  274. Do(ctx) // execute
  275. if err != nil {
  276. //failf("Search failed: %v\n", err)
  277. t.runCh <- RunInfo{Success: false}
  278. continue
  279. }
  280. // searchResult is of type SearchResult and returns hits, suggestions,
  281. // and all kinds of other information from Elasticsearch.
  282. //fmt.Printf("Query took %d milliseconds\n", searchResult.TookInMillis)
  283. // Number of hits
  284. if searchResult.Hits.TotalHits > 0 {
  285. //fmt.Printf("Found a total of %d tweets\n", searchResult.Hits.TotalHits)
  286. // Iterate through results
  287. for _, hit := range searchResult.Hits.Hits {
  288. // hit.Index contains the name of the index
  289. // Deserialize hit.Source into a Tweet (could also be just a map[string]interface{}).
  290. var tweet Tweet
  291. err := json.Unmarshal(*hit.Source, &tweet)
  292. if err != nil {
  293. // Deserialization failed
  294. //failf("Deserialize failed: %v\n", err)
  295. t.runCh <- RunInfo{Success: false}
  296. continue
  297. }
  298. // Work with tweet
  299. //fmt.Printf("Tweet by %s: %s\n", t.User, t.Message)
  300. }
  301. } else {
  302. // No hits
  303. //fmt.Print("Found no tweets\n")
  304. }
  305. t.runCh <- RunInfo{Success: true}
  306. // Sleep some time
  307. time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond)
  308. }
  309. }