123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662 |
- // Package client (v2) is the current official Go client for InfluxDB.
- package client // import "github.com/influxdata/influxdb/client/v2"
- import (
- "bytes"
- "crypto/tls"
- "encoding/json"
- "errors"
- "fmt"
- "io"
- "io/ioutil"
- "mime"
- "net/http"
- "net/url"
- "path"
- "strconv"
- "strings"
- "time"
- "github.com/influxdata/influxdb/models"
- )
- // HTTPConfig is the config data needed to create an HTTP Client.
- type HTTPConfig struct {
- // Addr should be of the form "http://host:port"
- // or "http://[ipv6-host%zone]:port".
- Addr string
- // Username is the influxdb username, optional.
- Username string
- // Password is the influxdb password, optional.
- Password string
- // UserAgent is the http User Agent, defaults to "InfluxDBClient".
- UserAgent string
- // Timeout for influxdb writes, defaults to no timeout.
- Timeout time.Duration
- // InsecureSkipVerify gets passed to the http client, if true, it will
- // skip https certificate verification. Defaults to false.
- InsecureSkipVerify bool
- // TLSConfig allows the user to set their own TLS config for the HTTP
- // Client. If set, this option overrides InsecureSkipVerify.
- TLSConfig *tls.Config
- // Proxy configures the Proxy function on the HTTP client.
- Proxy func(req *http.Request) (*url.URL, error)
- }
- // BatchPointsConfig is the config data needed to create an instance of the BatchPoints struct.
- type BatchPointsConfig struct {
- // Precision is the write precision of the points, defaults to "ns".
- Precision string
- // Database is the database to write points to.
- Database string
- // RetentionPolicy is the retention policy of the points.
- RetentionPolicy string
- // Write consistency is the number of servers required to confirm write.
- WriteConsistency string
- }
- // Client is a client interface for writing & querying the database.
- type Client interface {
- // Ping checks that status of cluster, and will always return 0 time and no
- // error for UDP clients.
- Ping(timeout time.Duration) (time.Duration, string, error)
- // Write takes a BatchPoints object and writes all Points to InfluxDB.
- Write(bp BatchPoints) error
- // Query makes an InfluxDB Query on the database. This will fail if using
- // the UDP client.
- Query(q Query) (*Response, error)
- // Close releases any resources a Client may be using.
- Close() error
- }
- // NewHTTPClient returns a new Client from the provided config.
- // Client is safe for concurrent use by multiple goroutines.
- func NewHTTPClient(conf HTTPConfig) (Client, error) {
- if conf.UserAgent == "" {
- conf.UserAgent = "InfluxDBClient"
- }
- u, err := url.Parse(conf.Addr)
- if err != nil {
- return nil, err
- } else if u.Scheme != "http" && u.Scheme != "https" {
- m := fmt.Sprintf("Unsupported protocol scheme: %s, your address"+
- " must start with http:// or https://", u.Scheme)
- return nil, errors.New(m)
- }
- tr := &http.Transport{
- TLSClientConfig: &tls.Config{
- InsecureSkipVerify: conf.InsecureSkipVerify,
- },
- Proxy: conf.Proxy,
- }
- if conf.TLSConfig != nil {
- tr.TLSClientConfig = conf.TLSConfig
- }
- return &client{
- url: *u,
- username: conf.Username,
- password: conf.Password,
- useragent: conf.UserAgent,
- httpClient: &http.Client{
- Timeout: conf.Timeout,
- Transport: tr,
- },
- transport: tr,
- }, nil
- }
- // Ping will check to see if the server is up with an optional timeout on waiting for leader.
- // Ping returns how long the request took, the version of the server it connected to, and an error if one occurred.
- func (c *client) Ping(timeout time.Duration) (time.Duration, string, error) {
- now := time.Now()
- u := c.url
- u.Path = path.Join(u.Path, "ping")
- req, err := http.NewRequest("GET", u.String(), nil)
- if err != nil {
- return 0, "", err
- }
- req.Header.Set("User-Agent", c.useragent)
- if c.username != "" {
- req.SetBasicAuth(c.username, c.password)
- }
- if timeout > 0 {
- params := req.URL.Query()
- params.Set("wait_for_leader", fmt.Sprintf("%.0fs", timeout.Seconds()))
- req.URL.RawQuery = params.Encode()
- }
- resp, err := c.httpClient.Do(req)
- if err != nil {
- return 0, "", err
- }
- defer resp.Body.Close()
- body, err := ioutil.ReadAll(resp.Body)
- if err != nil {
- return 0, "", err
- }
- if resp.StatusCode != http.StatusNoContent {
- var err = fmt.Errorf(string(body))
- return 0, "", err
- }
- version := resp.Header.Get("X-Influxdb-Version")
- return time.Since(now), version, nil
- }
- // Close releases the client's resources.
- func (c *client) Close() error {
- c.transport.CloseIdleConnections()
- return nil
- }
- // client is safe for concurrent use as the fields are all read-only
- // once the client is instantiated.
- type client struct {
- // N.B - if url.UserInfo is accessed in future modifications to the
- // methods on client, you will need to synchronize access to url.
- url url.URL
- username string
- password string
- useragent string
- httpClient *http.Client
- transport *http.Transport
- }
- // BatchPoints is an interface into a batched grouping of points to write into
- // InfluxDB together. BatchPoints is NOT thread-safe, you must create a separate
- // batch for each goroutine.
- type BatchPoints interface {
- // AddPoint adds the given point to the Batch of points.
- AddPoint(p *Point)
- // AddPoints adds the given points to the Batch of points.
- AddPoints(ps []*Point)
- // Points lists the points in the Batch.
- Points() []*Point
- // Precision returns the currently set precision of this Batch.
- Precision() string
- // SetPrecision sets the precision of this batch.
- SetPrecision(s string) error
- // Database returns the currently set database of this Batch.
- Database() string
- // SetDatabase sets the database of this Batch.
- SetDatabase(s string)
- // WriteConsistency returns the currently set write consistency of this Batch.
- WriteConsistency() string
- // SetWriteConsistency sets the write consistency of this Batch.
- SetWriteConsistency(s string)
- // RetentionPolicy returns the currently set retention policy of this Batch.
- RetentionPolicy() string
- // SetRetentionPolicy sets the retention policy of this Batch.
- SetRetentionPolicy(s string)
- }
- // NewBatchPoints returns a BatchPoints interface based on the given config.
- func NewBatchPoints(conf BatchPointsConfig) (BatchPoints, error) {
- if conf.Precision == "" {
- conf.Precision = "ns"
- }
- if _, err := time.ParseDuration("1" + conf.Precision); err != nil {
- return nil, err
- }
- bp := &batchpoints{
- database: conf.Database,
- precision: conf.Precision,
- retentionPolicy: conf.RetentionPolicy,
- writeConsistency: conf.WriteConsistency,
- }
- return bp, nil
- }
- type batchpoints struct {
- points []*Point
- database string
- precision string
- retentionPolicy string
- writeConsistency string
- }
- func (bp *batchpoints) AddPoint(p *Point) {
- bp.points = append(bp.points, p)
- }
- func (bp *batchpoints) AddPoints(ps []*Point) {
- bp.points = append(bp.points, ps...)
- }
- func (bp *batchpoints) Points() []*Point {
- return bp.points
- }
- func (bp *batchpoints) Precision() string {
- return bp.precision
- }
- func (bp *batchpoints) Database() string {
- return bp.database
- }
- func (bp *batchpoints) WriteConsistency() string {
- return bp.writeConsistency
- }
- func (bp *batchpoints) RetentionPolicy() string {
- return bp.retentionPolicy
- }
- func (bp *batchpoints) SetPrecision(p string) error {
- if _, err := time.ParseDuration("1" + p); err != nil {
- return err
- }
- bp.precision = p
- return nil
- }
- func (bp *batchpoints) SetDatabase(db string) {
- bp.database = db
- }
- func (bp *batchpoints) SetWriteConsistency(wc string) {
- bp.writeConsistency = wc
- }
- func (bp *batchpoints) SetRetentionPolicy(rp string) {
- bp.retentionPolicy = rp
- }
- // Point represents a single data point.
- type Point struct {
- pt models.Point
- }
- // NewPoint returns a point with the given timestamp. If a timestamp is not
- // given, then data is sent to the database without a timestamp, in which case
- // the server will assign local time upon reception. NOTE: it is recommended to
- // send data with a timestamp.
- func NewPoint(
- name string,
- tags map[string]string,
- fields map[string]interface{},
- t ...time.Time,
- ) (*Point, error) {
- var T time.Time
- if len(t) > 0 {
- T = t[0]
- }
- pt, err := models.NewPoint(name, models.NewTags(tags), fields, T)
- if err != nil {
- return nil, err
- }
- return &Point{
- pt: pt,
- }, nil
- }
- // String returns a line-protocol string of the Point.
- func (p *Point) String() string {
- return p.pt.String()
- }
- // PrecisionString returns a line-protocol string of the Point,
- // with the timestamp formatted for the given precision.
- func (p *Point) PrecisionString(precision string) string {
- return p.pt.PrecisionString(precision)
- }
- // Name returns the measurement name of the point.
- func (p *Point) Name() string {
- return string(p.pt.Name())
- }
- // Tags returns the tags associated with the point.
- func (p *Point) Tags() map[string]string {
- return p.pt.Tags().Map()
- }
- // Time return the timestamp for the point.
- func (p *Point) Time() time.Time {
- return p.pt.Time()
- }
- // UnixNano returns timestamp of the point in nanoseconds since Unix epoch.
- func (p *Point) UnixNano() int64 {
- return p.pt.UnixNano()
- }
- // Fields returns the fields for the point.
- func (p *Point) Fields() (map[string]interface{}, error) {
- return p.pt.Fields()
- }
- // NewPointFrom returns a point from the provided models.Point.
- func NewPointFrom(pt models.Point) *Point {
- return &Point{pt: pt}
- }
- func (c *client) Write(bp BatchPoints) error {
- var b bytes.Buffer
- for _, p := range bp.Points() {
- if p == nil {
- continue
- }
- if _, err := b.WriteString(p.pt.PrecisionString(bp.Precision())); err != nil {
- return err
- }
- if err := b.WriteByte('\n'); err != nil {
- return err
- }
- }
- u := c.url
- u.Path = path.Join(u.Path, "write")
- req, err := http.NewRequest("POST", u.String(), &b)
- if err != nil {
- return err
- }
- req.Header.Set("Content-Type", "")
- req.Header.Set("User-Agent", c.useragent)
- if c.username != "" {
- req.SetBasicAuth(c.username, c.password)
- }
- params := req.URL.Query()
- params.Set("db", bp.Database())
- params.Set("rp", bp.RetentionPolicy())
- params.Set("precision", bp.Precision())
- params.Set("consistency", bp.WriteConsistency())
- req.URL.RawQuery = params.Encode()
- resp, err := c.httpClient.Do(req)
- if err != nil {
- return err
- }
- defer resp.Body.Close()
- body, err := ioutil.ReadAll(resp.Body)
- if err != nil {
- return err
- }
- if resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusOK {
- var err = fmt.Errorf(string(body))
- return err
- }
- return nil
- }
- // Query defines a query to send to the server.
- type Query struct {
- Command string
- Database string
- RetentionPolicy string
- Precision string
- Chunked bool
- ChunkSize int
- Parameters map[string]interface{}
- }
- // NewQuery returns a query object.
- // The database and precision arguments can be empty strings if they are not needed for the query.
- func NewQuery(command, database, precision string) Query {
- return Query{
- Command: command,
- Database: database,
- Precision: precision,
- Parameters: make(map[string]interface{}),
- }
- }
- // NewQueryWithRP returns a query object.
- // The database, retention policy, and precision arguments can be empty strings if they are not needed
- // for the query. Setting the retention policy only works on InfluxDB versions 1.6 or greater.
- func NewQueryWithRP(command, database, retentionPolicy, precision string) Query {
- return Query{
- Command: command,
- Database: database,
- RetentionPolicy: retentionPolicy,
- Precision: precision,
- Parameters: make(map[string]interface{}),
- }
- }
- // NewQueryWithParameters returns a query object.
- // The database and precision arguments can be empty strings if they are not needed for the query.
- // parameters is a map of the parameter names used in the command to their values.
- func NewQueryWithParameters(command, database, precision string, parameters map[string]interface{}) Query {
- return Query{
- Command: command,
- Database: database,
- Precision: precision,
- Parameters: parameters,
- }
- }
- // Response represents a list of statement results.
- type Response struct {
- Results []Result
- Err string `json:"error,omitempty"`
- }
- // Error returns the first error from any statement.
- // It returns nil if no errors occurred on any statements.
- func (r *Response) Error() error {
- if r.Err != "" {
- return fmt.Errorf(r.Err)
- }
- for _, result := range r.Results {
- if result.Err != "" {
- return fmt.Errorf(result.Err)
- }
- }
- return nil
- }
- // Message represents a user message.
- type Message struct {
- Level string
- Text string
- }
- // Result represents a resultset returned from a single statement.
- type Result struct {
- Series []models.Row
- Messages []*Message
- Err string `json:"error,omitempty"`
- }
- // Query sends a command to the server and returns the Response.
- func (c *client) Query(q Query) (*Response, error) {
- u := c.url
- u.Path = path.Join(u.Path, "query")
- jsonParameters, err := json.Marshal(q.Parameters)
- if err != nil {
- return nil, err
- }
- req, err := http.NewRequest("POST", u.String(), nil)
- if err != nil {
- return nil, err
- }
- req.Header.Set("Content-Type", "")
- req.Header.Set("User-Agent", c.useragent)
- if c.username != "" {
- req.SetBasicAuth(c.username, c.password)
- }
- params := req.URL.Query()
- params.Set("q", q.Command)
- params.Set("db", q.Database)
- if q.RetentionPolicy != "" {
- params.Set("rp", q.RetentionPolicy)
- }
- params.Set("params", string(jsonParameters))
- if q.Chunked {
- params.Set("chunked", "true")
- if q.ChunkSize > 0 {
- params.Set("chunk_size", strconv.Itoa(q.ChunkSize))
- }
- }
- if q.Precision != "" {
- params.Set("epoch", q.Precision)
- }
- req.URL.RawQuery = params.Encode()
- resp, err := c.httpClient.Do(req)
- if err != nil {
- return nil, err
- }
- defer resp.Body.Close()
- // If we lack a X-Influxdb-Version header, then we didn't get a response from influxdb
- // but instead some other service. If the error code is also a 500+ code, then some
- // downstream loadbalancer/proxy/etc had an issue and we should report that.
- if resp.Header.Get("X-Influxdb-Version") == "" && resp.StatusCode >= http.StatusInternalServerError {
- body, err := ioutil.ReadAll(resp.Body)
- if err != nil || len(body) == 0 {
- return nil, fmt.Errorf("received status code %d from downstream server", resp.StatusCode)
- }
- return nil, fmt.Errorf("received status code %d from downstream server, with response body: %q", resp.StatusCode, body)
- }
- // If we get an unexpected content type, then it is also not from influx direct and therefore
- // we want to know what we received and what status code was returned for debugging purposes.
- if cType, _, _ := mime.ParseMediaType(resp.Header.Get("Content-Type")); cType != "application/json" {
- // Read up to 1kb of the body to help identify downstream errors and limit the impact of things
- // like downstream serving a large file
- body, err := ioutil.ReadAll(io.LimitReader(resp.Body, 1024))
- if err != nil || len(body) == 0 {
- return nil, fmt.Errorf("expected json response, got empty body, with status: %v", resp.StatusCode)
- }
- return nil, fmt.Errorf("expected json response, got %q, with status: %v and response body: %q", cType, resp.StatusCode, body)
- }
- var response Response
- if q.Chunked {
- cr := NewChunkedResponse(resp.Body)
- for {
- r, err := cr.NextResponse()
- if err != nil {
- // If we got an error while decoding the response, send that back.
- return nil, err
- }
- if r == nil {
- break
- }
- response.Results = append(response.Results, r.Results...)
- if r.Err != "" {
- response.Err = r.Err
- break
- }
- }
- } else {
- dec := json.NewDecoder(resp.Body)
- dec.UseNumber()
- decErr := dec.Decode(&response)
- // ignore this error if we got an invalid status code
- if decErr != nil && decErr.Error() == "EOF" && resp.StatusCode != http.StatusOK {
- decErr = nil
- }
- // If we got a valid decode error, send that back
- if decErr != nil {
- return nil, fmt.Errorf("unable to decode json: received status code %d err: %s", resp.StatusCode, decErr)
- }
- }
- // If we don't have an error in our json response, and didn't get statusOK
- // then send back an error
- if resp.StatusCode != http.StatusOK && response.Error() == nil {
- return &response, fmt.Errorf("received status code %d from server", resp.StatusCode)
- }
- return &response, nil
- }
- // duplexReader reads responses and writes it to another writer while
- // satisfying the reader interface.
- type duplexReader struct {
- r io.Reader
- w io.Writer
- }
- func (r *duplexReader) Read(p []byte) (n int, err error) {
- n, err = r.r.Read(p)
- if err == nil {
- r.w.Write(p[:n])
- }
- return n, err
- }
- // ChunkedResponse represents a response from the server that
- // uses chunking to stream the output.
- type ChunkedResponse struct {
- dec *json.Decoder
- duplex *duplexReader
- buf bytes.Buffer
- }
- // NewChunkedResponse reads a stream and produces responses from the stream.
- func NewChunkedResponse(r io.Reader) *ChunkedResponse {
- resp := &ChunkedResponse{}
- resp.duplex = &duplexReader{r: r, w: &resp.buf}
- resp.dec = json.NewDecoder(resp.duplex)
- resp.dec.UseNumber()
- return resp
- }
- // NextResponse reads the next line of the stream and returns a response.
- func (r *ChunkedResponse) NextResponse() (*Response, error) {
- var response Response
- if err := r.dec.Decode(&response); err != nil {
- if err == io.EOF {
- return nil, nil
- }
- // A decoding error happened. This probably means the server crashed
- // and sent a last-ditch error message to us. Ensure we have read the
- // entirety of the connection to get any remaining error text.
- io.Copy(ioutil.Discard, r.duplex)
- return nil, errors.New(strings.TrimSpace(r.buf.String()))
- }
- r.buf.Reset()
- return &response, nil
- }
|