client.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662
  1. // Package client (v2) is the current official Go client for InfluxDB.
  2. package client // import "github.com/influxdata/influxdb/client/v2"
  3. import (
  4. "bytes"
  5. "crypto/tls"
  6. "encoding/json"
  7. "errors"
  8. "fmt"
  9. "io"
  10. "io/ioutil"
  11. "mime"
  12. "net/http"
  13. "net/url"
  14. "path"
  15. "strconv"
  16. "strings"
  17. "time"
  18. "github.com/influxdata/influxdb/models"
  19. )
  20. // HTTPConfig is the config data needed to create an HTTP Client.
  21. type HTTPConfig struct {
  22. // Addr should be of the form "http://host:port"
  23. // or "http://[ipv6-host%zone]:port".
  24. Addr string
  25. // Username is the influxdb username, optional.
  26. Username string
  27. // Password is the influxdb password, optional.
  28. Password string
  29. // UserAgent is the http User Agent, defaults to "InfluxDBClient".
  30. UserAgent string
  31. // Timeout for influxdb writes, defaults to no timeout.
  32. Timeout time.Duration
  33. // InsecureSkipVerify gets passed to the http client, if true, it will
  34. // skip https certificate verification. Defaults to false.
  35. InsecureSkipVerify bool
  36. // TLSConfig allows the user to set their own TLS config for the HTTP
  37. // Client. If set, this option overrides InsecureSkipVerify.
  38. TLSConfig *tls.Config
  39. // Proxy configures the Proxy function on the HTTP client.
  40. Proxy func(req *http.Request) (*url.URL, error)
  41. }
  42. // BatchPointsConfig is the config data needed to create an instance of the BatchPoints struct.
  43. type BatchPointsConfig struct {
  44. // Precision is the write precision of the points, defaults to "ns".
  45. Precision string
  46. // Database is the database to write points to.
  47. Database string
  48. // RetentionPolicy is the retention policy of the points.
  49. RetentionPolicy string
  50. // Write consistency is the number of servers required to confirm write.
  51. WriteConsistency string
  52. }
  53. // Client is a client interface for writing & querying the database.
  54. type Client interface {
  55. // Ping checks that status of cluster, and will always return 0 time and no
  56. // error for UDP clients.
  57. Ping(timeout time.Duration) (time.Duration, string, error)
  58. // Write takes a BatchPoints object and writes all Points to InfluxDB.
  59. Write(bp BatchPoints) error
  60. // Query makes an InfluxDB Query on the database. This will fail if using
  61. // the UDP client.
  62. Query(q Query) (*Response, error)
  63. // Close releases any resources a Client may be using.
  64. Close() error
  65. }
  66. // NewHTTPClient returns a new Client from the provided config.
  67. // Client is safe for concurrent use by multiple goroutines.
  68. func NewHTTPClient(conf HTTPConfig) (Client, error) {
  69. if conf.UserAgent == "" {
  70. conf.UserAgent = "InfluxDBClient"
  71. }
  72. u, err := url.Parse(conf.Addr)
  73. if err != nil {
  74. return nil, err
  75. } else if u.Scheme != "http" && u.Scheme != "https" {
  76. m := fmt.Sprintf("Unsupported protocol scheme: %s, your address"+
  77. " must start with http:// or https://", u.Scheme)
  78. return nil, errors.New(m)
  79. }
  80. tr := &http.Transport{
  81. TLSClientConfig: &tls.Config{
  82. InsecureSkipVerify: conf.InsecureSkipVerify,
  83. },
  84. Proxy: conf.Proxy,
  85. }
  86. if conf.TLSConfig != nil {
  87. tr.TLSClientConfig = conf.TLSConfig
  88. }
  89. return &client{
  90. url: *u,
  91. username: conf.Username,
  92. password: conf.Password,
  93. useragent: conf.UserAgent,
  94. httpClient: &http.Client{
  95. Timeout: conf.Timeout,
  96. Transport: tr,
  97. },
  98. transport: tr,
  99. }, nil
  100. }
  101. // Ping will check to see if the server is up with an optional timeout on waiting for leader.
  102. // Ping returns how long the request took, the version of the server it connected to, and an error if one occurred.
  103. func (c *client) Ping(timeout time.Duration) (time.Duration, string, error) {
  104. now := time.Now()
  105. u := c.url
  106. u.Path = path.Join(u.Path, "ping")
  107. req, err := http.NewRequest("GET", u.String(), nil)
  108. if err != nil {
  109. return 0, "", err
  110. }
  111. req.Header.Set("User-Agent", c.useragent)
  112. if c.username != "" {
  113. req.SetBasicAuth(c.username, c.password)
  114. }
  115. if timeout > 0 {
  116. params := req.URL.Query()
  117. params.Set("wait_for_leader", fmt.Sprintf("%.0fs", timeout.Seconds()))
  118. req.URL.RawQuery = params.Encode()
  119. }
  120. resp, err := c.httpClient.Do(req)
  121. if err != nil {
  122. return 0, "", err
  123. }
  124. defer resp.Body.Close()
  125. body, err := ioutil.ReadAll(resp.Body)
  126. if err != nil {
  127. return 0, "", err
  128. }
  129. if resp.StatusCode != http.StatusNoContent {
  130. var err = fmt.Errorf(string(body))
  131. return 0, "", err
  132. }
  133. version := resp.Header.Get("X-Influxdb-Version")
  134. return time.Since(now), version, nil
  135. }
  136. // Close releases the client's resources.
  137. func (c *client) Close() error {
  138. c.transport.CloseIdleConnections()
  139. return nil
  140. }
  141. // client is safe for concurrent use as the fields are all read-only
  142. // once the client is instantiated.
  143. type client struct {
  144. // N.B - if url.UserInfo is accessed in future modifications to the
  145. // methods on client, you will need to synchronize access to url.
  146. url url.URL
  147. username string
  148. password string
  149. useragent string
  150. httpClient *http.Client
  151. transport *http.Transport
  152. }
  153. // BatchPoints is an interface into a batched grouping of points to write into
  154. // InfluxDB together. BatchPoints is NOT thread-safe, you must create a separate
  155. // batch for each goroutine.
  156. type BatchPoints interface {
  157. // AddPoint adds the given point to the Batch of points.
  158. AddPoint(p *Point)
  159. // AddPoints adds the given points to the Batch of points.
  160. AddPoints(ps []*Point)
  161. // Points lists the points in the Batch.
  162. Points() []*Point
  163. // Precision returns the currently set precision of this Batch.
  164. Precision() string
  165. // SetPrecision sets the precision of this batch.
  166. SetPrecision(s string) error
  167. // Database returns the currently set database of this Batch.
  168. Database() string
  169. // SetDatabase sets the database of this Batch.
  170. SetDatabase(s string)
  171. // WriteConsistency returns the currently set write consistency of this Batch.
  172. WriteConsistency() string
  173. // SetWriteConsistency sets the write consistency of this Batch.
  174. SetWriteConsistency(s string)
  175. // RetentionPolicy returns the currently set retention policy of this Batch.
  176. RetentionPolicy() string
  177. // SetRetentionPolicy sets the retention policy of this Batch.
  178. SetRetentionPolicy(s string)
  179. }
  180. // NewBatchPoints returns a BatchPoints interface based on the given config.
  181. func NewBatchPoints(conf BatchPointsConfig) (BatchPoints, error) {
  182. if conf.Precision == "" {
  183. conf.Precision = "ns"
  184. }
  185. if _, err := time.ParseDuration("1" + conf.Precision); err != nil {
  186. return nil, err
  187. }
  188. bp := &batchpoints{
  189. database: conf.Database,
  190. precision: conf.Precision,
  191. retentionPolicy: conf.RetentionPolicy,
  192. writeConsistency: conf.WriteConsistency,
  193. }
  194. return bp, nil
  195. }
  196. type batchpoints struct {
  197. points []*Point
  198. database string
  199. precision string
  200. retentionPolicy string
  201. writeConsistency string
  202. }
  203. func (bp *batchpoints) AddPoint(p *Point) {
  204. bp.points = append(bp.points, p)
  205. }
  206. func (bp *batchpoints) AddPoints(ps []*Point) {
  207. bp.points = append(bp.points, ps...)
  208. }
  209. func (bp *batchpoints) Points() []*Point {
  210. return bp.points
  211. }
  212. func (bp *batchpoints) Precision() string {
  213. return bp.precision
  214. }
  215. func (bp *batchpoints) Database() string {
  216. return bp.database
  217. }
  218. func (bp *batchpoints) WriteConsistency() string {
  219. return bp.writeConsistency
  220. }
  221. func (bp *batchpoints) RetentionPolicy() string {
  222. return bp.retentionPolicy
  223. }
  224. func (bp *batchpoints) SetPrecision(p string) error {
  225. if _, err := time.ParseDuration("1" + p); err != nil {
  226. return err
  227. }
  228. bp.precision = p
  229. return nil
  230. }
  231. func (bp *batchpoints) SetDatabase(db string) {
  232. bp.database = db
  233. }
  234. func (bp *batchpoints) SetWriteConsistency(wc string) {
  235. bp.writeConsistency = wc
  236. }
  237. func (bp *batchpoints) SetRetentionPolicy(rp string) {
  238. bp.retentionPolicy = rp
  239. }
  240. // Point represents a single data point.
  241. type Point struct {
  242. pt models.Point
  243. }
  244. // NewPoint returns a point with the given timestamp. If a timestamp is not
  245. // given, then data is sent to the database without a timestamp, in which case
  246. // the server will assign local time upon reception. NOTE: it is recommended to
  247. // send data with a timestamp.
  248. func NewPoint(
  249. name string,
  250. tags map[string]string,
  251. fields map[string]interface{},
  252. t ...time.Time,
  253. ) (*Point, error) {
  254. var T time.Time
  255. if len(t) > 0 {
  256. T = t[0]
  257. }
  258. pt, err := models.NewPoint(name, models.NewTags(tags), fields, T)
  259. if err != nil {
  260. return nil, err
  261. }
  262. return &Point{
  263. pt: pt,
  264. }, nil
  265. }
  266. // String returns a line-protocol string of the Point.
  267. func (p *Point) String() string {
  268. return p.pt.String()
  269. }
  270. // PrecisionString returns a line-protocol string of the Point,
  271. // with the timestamp formatted for the given precision.
  272. func (p *Point) PrecisionString(precision string) string {
  273. return p.pt.PrecisionString(precision)
  274. }
  275. // Name returns the measurement name of the point.
  276. func (p *Point) Name() string {
  277. return string(p.pt.Name())
  278. }
  279. // Tags returns the tags associated with the point.
  280. func (p *Point) Tags() map[string]string {
  281. return p.pt.Tags().Map()
  282. }
  283. // Time return the timestamp for the point.
  284. func (p *Point) Time() time.Time {
  285. return p.pt.Time()
  286. }
  287. // UnixNano returns timestamp of the point in nanoseconds since Unix epoch.
  288. func (p *Point) UnixNano() int64 {
  289. return p.pt.UnixNano()
  290. }
  291. // Fields returns the fields for the point.
  292. func (p *Point) Fields() (map[string]interface{}, error) {
  293. return p.pt.Fields()
  294. }
  295. // NewPointFrom returns a point from the provided models.Point.
  296. func NewPointFrom(pt models.Point) *Point {
  297. return &Point{pt: pt}
  298. }
  299. func (c *client) Write(bp BatchPoints) error {
  300. var b bytes.Buffer
  301. for _, p := range bp.Points() {
  302. if p == nil {
  303. continue
  304. }
  305. if _, err := b.WriteString(p.pt.PrecisionString(bp.Precision())); err != nil {
  306. return err
  307. }
  308. if err := b.WriteByte('\n'); err != nil {
  309. return err
  310. }
  311. }
  312. u := c.url
  313. u.Path = path.Join(u.Path, "write")
  314. req, err := http.NewRequest("POST", u.String(), &b)
  315. if err != nil {
  316. return err
  317. }
  318. req.Header.Set("Content-Type", "")
  319. req.Header.Set("User-Agent", c.useragent)
  320. if c.username != "" {
  321. req.SetBasicAuth(c.username, c.password)
  322. }
  323. params := req.URL.Query()
  324. params.Set("db", bp.Database())
  325. params.Set("rp", bp.RetentionPolicy())
  326. params.Set("precision", bp.Precision())
  327. params.Set("consistency", bp.WriteConsistency())
  328. req.URL.RawQuery = params.Encode()
  329. resp, err := c.httpClient.Do(req)
  330. if err != nil {
  331. return err
  332. }
  333. defer resp.Body.Close()
  334. body, err := ioutil.ReadAll(resp.Body)
  335. if err != nil {
  336. return err
  337. }
  338. if resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusOK {
  339. var err = fmt.Errorf(string(body))
  340. return err
  341. }
  342. return nil
  343. }
  344. // Query defines a query to send to the server.
  345. type Query struct {
  346. Command string
  347. Database string
  348. RetentionPolicy string
  349. Precision string
  350. Chunked bool
  351. ChunkSize int
  352. Parameters map[string]interface{}
  353. }
  354. // NewQuery returns a query object.
  355. // The database and precision arguments can be empty strings if they are not needed for the query.
  356. func NewQuery(command, database, precision string) Query {
  357. return Query{
  358. Command: command,
  359. Database: database,
  360. Precision: precision,
  361. Parameters: make(map[string]interface{}),
  362. }
  363. }
  364. // NewQueryWithRP returns a query object.
  365. // The database, retention policy, and precision arguments can be empty strings if they are not needed
  366. // for the query. Setting the retention policy only works on InfluxDB versions 1.6 or greater.
  367. func NewQueryWithRP(command, database, retentionPolicy, precision string) Query {
  368. return Query{
  369. Command: command,
  370. Database: database,
  371. RetentionPolicy: retentionPolicy,
  372. Precision: precision,
  373. Parameters: make(map[string]interface{}),
  374. }
  375. }
  376. // NewQueryWithParameters returns a query object.
  377. // The database and precision arguments can be empty strings if they are not needed for the query.
  378. // parameters is a map of the parameter names used in the command to their values.
  379. func NewQueryWithParameters(command, database, precision string, parameters map[string]interface{}) Query {
  380. return Query{
  381. Command: command,
  382. Database: database,
  383. Precision: precision,
  384. Parameters: parameters,
  385. }
  386. }
  387. // Response represents a list of statement results.
  388. type Response struct {
  389. Results []Result
  390. Err string `json:"error,omitempty"`
  391. }
  392. // Error returns the first error from any statement.
  393. // It returns nil if no errors occurred on any statements.
  394. func (r *Response) Error() error {
  395. if r.Err != "" {
  396. return fmt.Errorf(r.Err)
  397. }
  398. for _, result := range r.Results {
  399. if result.Err != "" {
  400. return fmt.Errorf(result.Err)
  401. }
  402. }
  403. return nil
  404. }
  405. // Message represents a user message.
  406. type Message struct {
  407. Level string
  408. Text string
  409. }
  410. // Result represents a resultset returned from a single statement.
  411. type Result struct {
  412. Series []models.Row
  413. Messages []*Message
  414. Err string `json:"error,omitempty"`
  415. }
  416. // Query sends a command to the server and returns the Response.
  417. func (c *client) Query(q Query) (*Response, error) {
  418. u := c.url
  419. u.Path = path.Join(u.Path, "query")
  420. jsonParameters, err := json.Marshal(q.Parameters)
  421. if err != nil {
  422. return nil, err
  423. }
  424. req, err := http.NewRequest("POST", u.String(), nil)
  425. if err != nil {
  426. return nil, err
  427. }
  428. req.Header.Set("Content-Type", "")
  429. req.Header.Set("User-Agent", c.useragent)
  430. if c.username != "" {
  431. req.SetBasicAuth(c.username, c.password)
  432. }
  433. params := req.URL.Query()
  434. params.Set("q", q.Command)
  435. params.Set("db", q.Database)
  436. if q.RetentionPolicy != "" {
  437. params.Set("rp", q.RetentionPolicy)
  438. }
  439. params.Set("params", string(jsonParameters))
  440. if q.Chunked {
  441. params.Set("chunked", "true")
  442. if q.ChunkSize > 0 {
  443. params.Set("chunk_size", strconv.Itoa(q.ChunkSize))
  444. }
  445. }
  446. if q.Precision != "" {
  447. params.Set("epoch", q.Precision)
  448. }
  449. req.URL.RawQuery = params.Encode()
  450. resp, err := c.httpClient.Do(req)
  451. if err != nil {
  452. return nil, err
  453. }
  454. defer resp.Body.Close()
  455. // If we lack a X-Influxdb-Version header, then we didn't get a response from influxdb
  456. // but instead some other service. If the error code is also a 500+ code, then some
  457. // downstream loadbalancer/proxy/etc had an issue and we should report that.
  458. if resp.Header.Get("X-Influxdb-Version") == "" && resp.StatusCode >= http.StatusInternalServerError {
  459. body, err := ioutil.ReadAll(resp.Body)
  460. if err != nil || len(body) == 0 {
  461. return nil, fmt.Errorf("received status code %d from downstream server", resp.StatusCode)
  462. }
  463. return nil, fmt.Errorf("received status code %d from downstream server, with response body: %q", resp.StatusCode, body)
  464. }
  465. // If we get an unexpected content type, then it is also not from influx direct and therefore
  466. // we want to know what we received and what status code was returned for debugging purposes.
  467. if cType, _, _ := mime.ParseMediaType(resp.Header.Get("Content-Type")); cType != "application/json" {
  468. // Read up to 1kb of the body to help identify downstream errors and limit the impact of things
  469. // like downstream serving a large file
  470. body, err := ioutil.ReadAll(io.LimitReader(resp.Body, 1024))
  471. if err != nil || len(body) == 0 {
  472. return nil, fmt.Errorf("expected json response, got empty body, with status: %v", resp.StatusCode)
  473. }
  474. return nil, fmt.Errorf("expected json response, got %q, with status: %v and response body: %q", cType, resp.StatusCode, body)
  475. }
  476. var response Response
  477. if q.Chunked {
  478. cr := NewChunkedResponse(resp.Body)
  479. for {
  480. r, err := cr.NextResponse()
  481. if err != nil {
  482. // If we got an error while decoding the response, send that back.
  483. return nil, err
  484. }
  485. if r == nil {
  486. break
  487. }
  488. response.Results = append(response.Results, r.Results...)
  489. if r.Err != "" {
  490. response.Err = r.Err
  491. break
  492. }
  493. }
  494. } else {
  495. dec := json.NewDecoder(resp.Body)
  496. dec.UseNumber()
  497. decErr := dec.Decode(&response)
  498. // ignore this error if we got an invalid status code
  499. if decErr != nil && decErr.Error() == "EOF" && resp.StatusCode != http.StatusOK {
  500. decErr = nil
  501. }
  502. // If we got a valid decode error, send that back
  503. if decErr != nil {
  504. return nil, fmt.Errorf("unable to decode json: received status code %d err: %s", resp.StatusCode, decErr)
  505. }
  506. }
  507. // If we don't have an error in our json response, and didn't get statusOK
  508. // then send back an error
  509. if resp.StatusCode != http.StatusOK && response.Error() == nil {
  510. return &response, fmt.Errorf("received status code %d from server", resp.StatusCode)
  511. }
  512. return &response, nil
  513. }
  514. // duplexReader reads responses and writes it to another writer while
  515. // satisfying the reader interface.
  516. type duplexReader struct {
  517. r io.Reader
  518. w io.Writer
  519. }
  520. func (r *duplexReader) Read(p []byte) (n int, err error) {
  521. n, err = r.r.Read(p)
  522. if err == nil {
  523. r.w.Write(p[:n])
  524. }
  525. return n, err
  526. }
  527. // ChunkedResponse represents a response from the server that
  528. // uses chunking to stream the output.
  529. type ChunkedResponse struct {
  530. dec *json.Decoder
  531. duplex *duplexReader
  532. buf bytes.Buffer
  533. }
  534. // NewChunkedResponse reads a stream and produces responses from the stream.
  535. func NewChunkedResponse(r io.Reader) *ChunkedResponse {
  536. resp := &ChunkedResponse{}
  537. resp.duplex = &duplexReader{r: r, w: &resp.buf}
  538. resp.dec = json.NewDecoder(resp.duplex)
  539. resp.dec.UseNumber()
  540. return resp
  541. }
  542. // NextResponse reads the next line of the stream and returns a response.
  543. func (r *ChunkedResponse) NextResponse() (*Response, error) {
  544. var response Response
  545. if err := r.dec.Decode(&response); err != nil {
  546. if err == io.EOF {
  547. return nil, nil
  548. }
  549. // A decoding error happened. This probably means the server crashed
  550. // and sent a last-ditch error message to us. Ensure we have read the
  551. // entirety of the connection to get any remaining error text.
  552. io.Copy(ioutil.Discard, r.duplex)
  553. return nil, errors.New(strings.TrimSpace(r.buf.String()))
  554. }
  555. r.buf.Reset()
  556. return &response, nil
  557. }