decoder.go 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package yaml
  14. import (
  15. "bufio"
  16. "bytes"
  17. "encoding/json"
  18. "fmt"
  19. "io"
  20. "io/ioutil"
  21. "strings"
  22. "unicode"
  23. "github.com/ghodss/yaml"
  24. "github.com/golang/glog"
  25. )
  26. // ToJSON converts a single YAML document into a JSON document
  27. // or returns an error. If the document appears to be JSON the
  28. // YAML decoding path is not used (so that error messages are
  29. // JSON specific).
  30. func ToJSON(data []byte) ([]byte, error) {
  31. if hasJSONPrefix(data) {
  32. return data, nil
  33. }
  34. return yaml.YAMLToJSON(data)
  35. }
  36. // YAMLToJSONDecoder decodes YAML documents from an io.Reader by
  37. // separating individual documents. It first converts the YAML
  38. // body to JSON, then unmarshals the JSON.
  39. type YAMLToJSONDecoder struct {
  40. reader Reader
  41. }
  42. // NewYAMLToJSONDecoder decodes YAML documents from the provided
  43. // stream in chunks by converting each document (as defined by
  44. // the YAML spec) into its own chunk, converting it to JSON via
  45. // yaml.YAMLToJSON, and then passing it to json.Decoder.
  46. func NewYAMLToJSONDecoder(r io.Reader) *YAMLToJSONDecoder {
  47. reader := bufio.NewReader(r)
  48. return &YAMLToJSONDecoder{
  49. reader: NewYAMLReader(reader),
  50. }
  51. }
  52. // Decode reads a YAML document as JSON from the stream or returns
  53. // an error. The decoding rules match json.Unmarshal, not
  54. // yaml.Unmarshal.
  55. func (d *YAMLToJSONDecoder) Decode(into interface{}) error {
  56. bytes, err := d.reader.Read()
  57. if err != nil && err != io.EOF {
  58. return err
  59. }
  60. if len(bytes) != 0 {
  61. err := yaml.Unmarshal(bytes, into)
  62. if err != nil {
  63. return YAMLSyntaxError{err}
  64. }
  65. }
  66. return err
  67. }
  // YAMLDecoder is the io.ReadCloser built by NewDocumentDecoder. It hands out
  // one YAML document per Read. When the caller's buffer is too small, Read
  // returns io.ErrShortBuffer and keeps the unread tail for the next call.
  type YAMLDecoder struct {
  	r         io.ReadCloser  // underlying stream, closed by Close
  	scanner   *bufio.Scanner // splits r into documents
  	remaining []byte         // unread tail of the current document
  }
  75. // NewDocumentDecoder decodes YAML documents from the provided
  76. // stream in chunks by converting each document (as defined by
  77. // the YAML spec) into its own chunk. io.ErrShortBuffer will be
  78. // returned if the entire buffer could not be read to assist
  79. // the caller in framing the chunk.
  80. func NewDocumentDecoder(r io.ReadCloser) io.ReadCloser {
  81. scanner := bufio.NewScanner(r)
  82. scanner.Split(splitYAMLDocument)
  83. return &YAMLDecoder{
  84. r: r,
  85. scanner: scanner,
  86. }
  87. }
  88. // Read reads the previous slice into the buffer, or attempts to read
  89. // the next chunk.
  90. // TODO: switch to readline approach.
  91. func (d *YAMLDecoder) Read(data []byte) (n int, err error) {
  92. left := len(d.remaining)
  93. if left == 0 {
  94. // return the next chunk from the stream
  95. if !d.scanner.Scan() {
  96. err := d.scanner.Err()
  97. if err == nil {
  98. err = io.EOF
  99. }
  100. return 0, err
  101. }
  102. out := d.scanner.Bytes()
  103. d.remaining = out
  104. left = len(out)
  105. }
  106. // fits within data
  107. if left <= len(data) {
  108. copy(data, d.remaining)
  109. d.remaining = nil
  110. return left, nil
  111. }
  112. // caller will need to reread
  113. copy(data, d.remaining[:len(data)])
  114. d.remaining = d.remaining[len(data):]
  115. return len(data), io.ErrShortBuffer
  116. }
  117. func (d *YAMLDecoder) Close() error {
  118. return d.r.Close()
  119. }
  const (
  	// yamlSeparator is the document boundary searched for in a raw byte
  	// stream by splitYAMLDocument: "---" at the start of a line.
  	yamlSeparator = "\n---"
  	// separator is the document marker as it appears at the start of a
  	// single line, for line-by-line readers.
  	separator = "---"
  )

  // splitYAMLDocument is a bufio.SplitFunc for splitting YAML streams into
  // individual documents.
  //
  // A document ends at the first "\n---". The returned token excludes that
  // separator, and the scanner advances past the rest of the separator line;
  // any trailing text on that line, such as a comment, is discarded. Text
  // after the last separator is returned as the final token at EOF.
  func splitYAMLDocument(data []byte, atEOF bool) (advance int, token []byte, err error) {
  	if atEOF && len(data) == 0 {
  		return 0, nil, nil
  	}
  	sepLen := len(yamlSeparator)
  	i := bytes.Index(data, []byte(yamlSeparator))
  	if i < 0 {
  		if atEOF {
  			// Final, non-terminated document.
  			return len(data), data, nil
  		}
  		// Request more data.
  		return 0, nil, nil
  	}
  	doc := data[:i]
  	after := data[i+sepLen:]
  	if j := bytes.IndexByte(after, '\n'); j >= 0 {
  		// Consume the whole separator line, including its newline.
  		return i + sepLen + j + 1, doc, nil
  	}
  	if atEOF {
  		// The separator line is the last line of the stream, possibly with
  		// trailing text such as "--- # end". Return the document before it.
  		// Returning no token here would make bufio.Scanner stop and drop it.
  		return len(data), doc, nil
  	}
  	// The separator line is incomplete; request more data.
  	return 0, nil, nil
  }
  // decoder is the minimal decoding interface that YAMLOrJSONDecoder delegates
  // to. Both *json.Decoder and *YAMLToJSONDecoder satisfy it.
  type decoder interface {
  	// Decode reads the next value from the stream into into.
  	Decode(into interface{}) error
  }
  155. // YAMLOrJSONDecoder attempts to decode a stream of JSON documents or
  156. // YAML documents by sniffing for a leading { character.
  157. type YAMLOrJSONDecoder struct {
  158. r io.Reader
  159. bufferSize int
  160. decoder decoder
  161. rawData []byte
  162. }
  // JSONSyntaxError is a JSON syntax error annotated with the line on which it
  // was detected.
  type JSONSyntaxError struct {
  	Line int   // line of the error as computed by the producer of this value
  	Err  error // underlying error; must be non-nil for Error to work
  }

  // Error formats the error as "json: line <Line>: <Err>".
  func (e JSONSyntaxError) Error() string {
  	detail := e.Err.Error()
  	return fmt.Sprintf("json: line %d: %s", e.Line, detail)
  }
  // YAMLSyntaxError wraps an error produced while unmarshalling a YAML
  // document. Its message is the wrapped error's message, unchanged.
  type YAMLSyntaxError struct {
  	err error
  }

  // Error returns the wrapped error's message.
  func (e YAMLSyntaxError) Error() string {
  	wrapped := e.err
  	return wrapped.Error()
  }
  176. // NewYAMLOrJSONDecoder returns a decoder that will process YAML documents
  177. // or JSON documents from the given reader as a stream. bufferSize determines
  178. // how far into the stream the decoder will look to figure out whether this
  179. // is a JSON stream (has whitespace followed by an open brace).
  180. func NewYAMLOrJSONDecoder(r io.Reader, bufferSize int) *YAMLOrJSONDecoder {
  181. return &YAMLOrJSONDecoder{
  182. r: r,
  183. bufferSize: bufferSize,
  184. }
  185. }
  186. // Decode unmarshals the next object from the underlying stream into the
  187. // provide object, or returns an error.
  188. func (d *YAMLOrJSONDecoder) Decode(into interface{}) error {
  189. if d.decoder == nil {
  190. buffer, origData, isJSON := GuessJSONStream(d.r, d.bufferSize)
  191. if isJSON {
  192. glog.V(4).Infof("decoding stream as JSON")
  193. d.decoder = json.NewDecoder(buffer)
  194. d.rawData = origData
  195. } else {
  196. glog.V(4).Infof("decoding stream as YAML")
  197. d.decoder = NewYAMLToJSONDecoder(buffer)
  198. }
  199. }
  200. err := d.decoder.Decode(into)
  201. if jsonDecoder, ok := d.decoder.(*json.Decoder); ok {
  202. if syntax, ok := err.(*json.SyntaxError); ok {
  203. data, readErr := ioutil.ReadAll(jsonDecoder.Buffered())
  204. if readErr != nil {
  205. glog.V(4).Infof("reading stream failed: %v", readErr)
  206. }
  207. js := string(data)
  208. // if contents from io.Reader are not complete,
  209. // use the original raw data to prevent panic
  210. if int64(len(js)) <= syntax.Offset {
  211. js = string(d.rawData)
  212. }
  213. start := strings.LastIndex(js[:syntax.Offset], "\n") + 1
  214. line := strings.Count(js[:start], "\n")
  215. return JSONSyntaxError{
  216. Line: line,
  217. Err: fmt.Errorf(syntax.Error()),
  218. }
  219. }
  220. }
  221. return err
  222. }
  // Reader produces a stream one unit per call. In this package LineReader
  // yields one line per call and YAMLReader one YAML document per call.
  type Reader interface {
  	// Read returns the next unit. The implementations in this package
  	// report the end of the stream with io.EOF.
  	Read() ([]byte, error)
  }
  226. type YAMLReader struct {
  227. reader Reader
  228. }
  229. func NewYAMLReader(r *bufio.Reader) *YAMLReader {
  230. return &YAMLReader{
  231. reader: &LineReader{reader: r},
  232. }
  233. }
  234. // Read returns a full YAML document.
  235. func (r *YAMLReader) Read() ([]byte, error) {
  236. var buffer bytes.Buffer
  237. for {
  238. line, err := r.reader.Read()
  239. if err != nil && err != io.EOF {
  240. return nil, err
  241. }
  242. sep := len([]byte(separator))
  243. if i := bytes.Index(line, []byte(separator)); i == 0 {
  244. // We have a potential document terminator
  245. i += sep
  246. after := line[i:]
  247. if len(strings.TrimRightFunc(string(after), unicode.IsSpace)) == 0 {
  248. if buffer.Len() != 0 {
  249. return buffer.Bytes(), nil
  250. }
  251. if err == io.EOF {
  252. return nil, err
  253. }
  254. }
  255. }
  256. if err == io.EOF {
  257. if buffer.Len() != 0 {
  258. // If we're at EOF, we have a final, non-terminated line. Return it.
  259. return buffer.Bytes(), nil
  260. }
  261. return nil, err
  262. }
  263. buffer.Write(line)
  264. }
  265. }
  // LineReader is a Reader that returns the content of a bufio.Reader one line
  // per call.
  type LineReader struct {
  	reader *bufio.Reader
  }

  // Read returns the next line from the underlying reader, always terminated
  // by a single '\n'. The original line ending ("\n" or "\r\n") is dropped by
  // bufio.Reader.ReadLine, and lines longer than the reader's buffer are
  // reassembled from their fragments.
  //
  // The underlying reader's error is returned alongside the line. At the end
  // of input this is io.EOF together with a line containing only "\n".
  func (r *LineReader) Read() ([]byte, error) {
  	var out bytes.Buffer
  	for {
  		chunk, more, err := r.reader.ReadLine()
  		out.Write(chunk)
  		if err != nil || !more {
  			out.WriteByte('\n')
  			return out.Bytes(), err
  		}
  	}
  }
  285. // GuessJSONStream scans the provided reader up to size, looking
  286. // for an open brace indicating this is JSON. It will return the
  287. // bufio.Reader it creates for the consumer.
  288. func GuessJSONStream(r io.Reader, size int) (io.Reader, []byte, bool) {
  289. buffer := bufio.NewReaderSize(r, size)
  290. b, _ := buffer.Peek(size)
  291. return buffer, b, hasJSONPrefix(b)
  292. }
  293. var jsonPrefix = []byte("{")
  294. // hasJSONPrefix returns true if the provided buffer appears to start with
  295. // a JSON open brace.
  296. func hasJSONPrefix(buf []byte) bool {
  297. return hasPrefix(buf, jsonPrefix)
  298. }
  // hasPrefix reports whether buf, after skipping leading Unicode whitespace,
  // begins with prefix.
  func hasPrefix(buf []byte, prefix []byte) bool {
  	return bytes.HasPrefix(bytes.TrimLeftFunc(buf, unicode.IsSpace), prefix)
  }