protobuf.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448
  1. /*
  2. Copyright 2015 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package protobuf
  14. import (
  15. "bytes"
  16. "fmt"
  17. "io"
  18. "reflect"
  19. "github.com/gogo/protobuf/proto"
  20. "k8s.io/apimachinery/pkg/runtime"
  21. "k8s.io/apimachinery/pkg/runtime/schema"
  22. "k8s.io/apimachinery/pkg/runtime/serializer/recognizer"
  23. "k8s.io/apimachinery/pkg/util/framer"
  24. )
  25. var (
  26. // protoEncodingPrefix serves as a magic number for an encoded protobuf message on this serializer. All
  27. // proto messages serialized by this schema will be preceded by the bytes 0x6b 0x38 0x73, with the fourth
  28. // byte being reserved for the encoding style. The only encoding style defined is 0x00, which means that
  29. // the rest of the byte stream is a message of type k8s.io.kubernetes.pkg.runtime.Unknown (proto2).
  30. //
  31. // See k8s.io/apimachinery/pkg/runtime/generated.proto for details of the runtime.Unknown message.
  32. //
  33. // This encoding scheme is experimental, and is subject to change at any time.
  34. protoEncodingPrefix = []byte{0x6b, 0x38, 0x73, 0x00}
  35. )
  36. type errNotMarshalable struct {
  37. t reflect.Type
  38. }
  39. func (e errNotMarshalable) Error() string {
  40. return fmt.Sprintf("object %v does not implement the protobuf marshalling interface and cannot be encoded to a protobuf message", e.t)
  41. }
  42. func IsNotMarshalable(err error) bool {
  43. _, ok := err.(errNotMarshalable)
  44. return err != nil && ok
  45. }
  46. // NewSerializer creates a Protobuf serializer that handles encoding versioned objects into the proper wire form. If a typer
  47. // is passed, the encoded object will have group, version, and kind fields set. If typer is nil, the objects will be written
  48. // as-is (any type info passed with the object will be used).
  49. //
  50. // This encoding scheme is experimental, and is subject to change at any time.
  51. func NewSerializer(creater runtime.ObjectCreater, typer runtime.ObjectTyper, defaultContentType string) *Serializer {
  52. return &Serializer{
  53. prefix: protoEncodingPrefix,
  54. creater: creater,
  55. typer: typer,
  56. contentType: defaultContentType,
  57. }
  58. }
  59. type Serializer struct {
  60. prefix []byte
  61. creater runtime.ObjectCreater
  62. typer runtime.ObjectTyper
  63. contentType string
  64. }
  65. var _ runtime.Serializer = &Serializer{}
  66. var _ recognizer.RecognizingDecoder = &Serializer{}
  67. // Decode attempts to convert the provided data into a protobuf message, extract the stored schema kind, apply the provided default
  68. // gvk, and then load that data into an object matching the desired schema kind or the provided into. If into is *runtime.Unknown,
  69. // the raw data will be extracted and no decoding will be performed. If into is not registered with the typer, then the object will
  70. // be straight decoded using normal protobuf unmarshalling (the MarshalTo interface). If into is provided and the original data is
  71. // not fully qualified with kind/version/group, the type of the into will be used to alter the returned gvk. On success or most
  72. // errors, the method will return the calculated schema kind.
  73. func (s *Serializer) Decode(originalData []byte, gvk *schema.GroupVersionKind, into runtime.Object) (runtime.Object, *schema.GroupVersionKind, error) {
  74. if versioned, ok := into.(*runtime.VersionedObjects); ok {
  75. into = versioned.Last()
  76. obj, actual, err := s.Decode(originalData, gvk, into)
  77. if err != nil {
  78. return nil, actual, err
  79. }
  80. // the last item in versioned becomes into, so if versioned was not originally empty we reset the object
  81. // array so the first position is the decoded object and the second position is the outermost object.
  82. // if there were no objects in the versioned list passed to us, only add ourselves.
  83. if into != nil && into != obj {
  84. versioned.Objects = []runtime.Object{obj, into}
  85. } else {
  86. versioned.Objects = []runtime.Object{obj}
  87. }
  88. return versioned, actual, err
  89. }
  90. prefixLen := len(s.prefix)
  91. switch {
  92. case len(originalData) == 0:
  93. // TODO: treat like decoding {} from JSON with defaulting
  94. return nil, nil, fmt.Errorf("empty data")
  95. case len(originalData) < prefixLen || !bytes.Equal(s.prefix, originalData[:prefixLen]):
  96. return nil, nil, fmt.Errorf("provided data does not appear to be a protobuf message, expected prefix %v", s.prefix)
  97. case len(originalData) == prefixLen:
  98. // TODO: treat like decoding {} from JSON with defaulting
  99. return nil, nil, fmt.Errorf("empty body")
  100. }
  101. data := originalData[prefixLen:]
  102. unk := runtime.Unknown{}
  103. if err := unk.Unmarshal(data); err != nil {
  104. return nil, nil, err
  105. }
  106. actual := unk.GroupVersionKind()
  107. copyKindDefaults(&actual, gvk)
  108. if intoUnknown, ok := into.(*runtime.Unknown); ok && intoUnknown != nil {
  109. *intoUnknown = unk
  110. if ok, _, _ := s.RecognizesData(bytes.NewBuffer(unk.Raw)); ok {
  111. intoUnknown.ContentType = s.contentType
  112. }
  113. return intoUnknown, &actual, nil
  114. }
  115. if into != nil {
  116. types, _, err := s.typer.ObjectKinds(into)
  117. switch {
  118. case runtime.IsNotRegisteredError(err):
  119. pb, ok := into.(proto.Message)
  120. if !ok {
  121. return nil, &actual, errNotMarshalable{reflect.TypeOf(into)}
  122. }
  123. if err := proto.Unmarshal(unk.Raw, pb); err != nil {
  124. return nil, &actual, err
  125. }
  126. return into, &actual, nil
  127. case err != nil:
  128. return nil, &actual, err
  129. default:
  130. copyKindDefaults(&actual, &types[0])
  131. // if the result of defaulting did not set a version or group, ensure that at least group is set
  132. // (copyKindDefaults will not assign Group if version is already set). This guarantees that the group
  133. // of into is set if there is no better information from the caller or object.
  134. if len(actual.Version) == 0 && len(actual.Group) == 0 {
  135. actual.Group = types[0].Group
  136. }
  137. }
  138. }
  139. if len(actual.Kind) == 0 {
  140. return nil, &actual, runtime.NewMissingKindErr(fmt.Sprintf("%#v", unk.TypeMeta))
  141. }
  142. if len(actual.Version) == 0 {
  143. return nil, &actual, runtime.NewMissingVersionErr(fmt.Sprintf("%#v", unk.TypeMeta))
  144. }
  145. return unmarshalToObject(s.typer, s.creater, &actual, into, unk.Raw)
  146. }
  147. // Encode serializes the provided object to the given writer.
  148. func (s *Serializer) Encode(obj runtime.Object, w io.Writer) error {
  149. prefixSize := uint64(len(s.prefix))
  150. var unk runtime.Unknown
  151. switch t := obj.(type) {
  152. case *runtime.Unknown:
  153. estimatedSize := prefixSize + uint64(t.Size())
  154. data := make([]byte, estimatedSize)
  155. i, err := t.MarshalTo(data[prefixSize:])
  156. if err != nil {
  157. return err
  158. }
  159. copy(data, s.prefix)
  160. _, err = w.Write(data[:prefixSize+uint64(i)])
  161. return err
  162. default:
  163. kind := obj.GetObjectKind().GroupVersionKind()
  164. unk = runtime.Unknown{
  165. TypeMeta: runtime.TypeMeta{
  166. Kind: kind.Kind,
  167. APIVersion: kind.GroupVersion().String(),
  168. },
  169. }
  170. }
  171. switch t := obj.(type) {
  172. case bufferedMarshaller:
  173. // this path performs a single allocation during write but requires the caller to implement
  174. // the more efficient Size and MarshalTo methods
  175. encodedSize := uint64(t.Size())
  176. estimatedSize := prefixSize + estimateUnknownSize(&unk, encodedSize)
  177. data := make([]byte, estimatedSize)
  178. i, err := unk.NestedMarshalTo(data[prefixSize:], t, encodedSize)
  179. if err != nil {
  180. return err
  181. }
  182. copy(data, s.prefix)
  183. _, err = w.Write(data[:prefixSize+uint64(i)])
  184. return err
  185. case proto.Marshaler:
  186. // this path performs extra allocations
  187. data, err := t.Marshal()
  188. if err != nil {
  189. return err
  190. }
  191. unk.Raw = data
  192. estimatedSize := prefixSize + uint64(unk.Size())
  193. data = make([]byte, estimatedSize)
  194. i, err := unk.MarshalTo(data[prefixSize:])
  195. if err != nil {
  196. return err
  197. }
  198. copy(data, s.prefix)
  199. _, err = w.Write(data[:prefixSize+uint64(i)])
  200. return err
  201. default:
  202. // TODO: marshal with a different content type and serializer (JSON for third party objects)
  203. return errNotMarshalable{reflect.TypeOf(obj)}
  204. }
  205. }
  206. // RecognizesData implements the RecognizingDecoder interface.
  207. func (s *Serializer) RecognizesData(peek io.Reader) (bool, bool, error) {
  208. prefix := make([]byte, 4)
  209. n, err := peek.Read(prefix)
  210. if err != nil {
  211. if err == io.EOF {
  212. return false, false, nil
  213. }
  214. return false, false, err
  215. }
  216. if n != 4 {
  217. return false, false, nil
  218. }
  219. return bytes.Equal(s.prefix, prefix), false, nil
  220. }
  221. // copyKindDefaults defaults dst to the value in src if dst does not have a value set.
  222. func copyKindDefaults(dst, src *schema.GroupVersionKind) {
  223. if src == nil {
  224. return
  225. }
  226. // apply kind and version defaulting from provided default
  227. if len(dst.Kind) == 0 {
  228. dst.Kind = src.Kind
  229. }
  230. if len(dst.Version) == 0 && len(src.Version) > 0 {
  231. dst.Group = src.Group
  232. dst.Version = src.Version
  233. }
  234. }
  235. // bufferedMarshaller describes a more efficient marshalling interface that can avoid allocating multiple
  236. // byte buffers by pre-calculating the size of the final buffer needed.
  237. type bufferedMarshaller interface {
  238. proto.Sizer
  239. runtime.ProtobufMarshaller
  240. }
  241. // estimateUnknownSize returns the expected bytes consumed by a given runtime.Unknown
  242. // object with a nil RawJSON struct and the expected size of the provided buffer. The
  243. // returned size will not be correct if RawJSOn is set on unk.
  244. func estimateUnknownSize(unk *runtime.Unknown, byteSize uint64) uint64 {
  245. size := uint64(unk.Size())
  246. // protobuf uses 1 byte for the tag, a varint for the length of the array (at most 8 bytes - uint64 - here),
  247. // and the size of the array.
  248. size += 1 + 8 + byteSize
  249. return size
  250. }
  251. // NewRawSerializer creates a Protobuf serializer that handles encoding versioned objects into the proper wire form. If typer
  252. // is not nil, the object has the group, version, and kind fields set. This serializer does not provide type information for the
  253. // encoded object, and thus is not self describing (callers must know what type is being described in order to decode).
  254. //
  255. // This encoding scheme is experimental, and is subject to change at any time.
  256. func NewRawSerializer(creater runtime.ObjectCreater, typer runtime.ObjectTyper, defaultContentType string) *RawSerializer {
  257. return &RawSerializer{
  258. creater: creater,
  259. typer: typer,
  260. contentType: defaultContentType,
  261. }
  262. }
  263. // RawSerializer encodes and decodes objects without adding a runtime.Unknown wrapper (objects are encoded without identifying
  264. // type).
  265. type RawSerializer struct {
  266. creater runtime.ObjectCreater
  267. typer runtime.ObjectTyper
  268. contentType string
  269. }
  270. var _ runtime.Serializer = &RawSerializer{}
  271. // Decode attempts to convert the provided data into a protobuf message, extract the stored schema kind, apply the provided default
  272. // gvk, and then load that data into an object matching the desired schema kind or the provided into. If into is *runtime.Unknown,
  273. // the raw data will be extracted and no decoding will be performed. If into is not registered with the typer, then the object will
  274. // be straight decoded using normal protobuf unmarshalling (the MarshalTo interface). If into is provided and the original data is
  275. // not fully qualified with kind/version/group, the type of the into will be used to alter the returned gvk. On success or most
  276. // errors, the method will return the calculated schema kind.
  277. func (s *RawSerializer) Decode(originalData []byte, gvk *schema.GroupVersionKind, into runtime.Object) (runtime.Object, *schema.GroupVersionKind, error) {
  278. if into == nil {
  279. return nil, nil, fmt.Errorf("this serializer requires an object to decode into: %#v", s)
  280. }
  281. if versioned, ok := into.(*runtime.VersionedObjects); ok {
  282. into = versioned.Last()
  283. obj, actual, err := s.Decode(originalData, gvk, into)
  284. if err != nil {
  285. return nil, actual, err
  286. }
  287. if into != nil && into != obj {
  288. versioned.Objects = []runtime.Object{obj, into}
  289. } else {
  290. versioned.Objects = []runtime.Object{obj}
  291. }
  292. return versioned, actual, err
  293. }
  294. if len(originalData) == 0 {
  295. // TODO: treat like decoding {} from JSON with defaulting
  296. return nil, nil, fmt.Errorf("empty data")
  297. }
  298. data := originalData
  299. actual := &schema.GroupVersionKind{}
  300. copyKindDefaults(actual, gvk)
  301. if intoUnknown, ok := into.(*runtime.Unknown); ok && intoUnknown != nil {
  302. intoUnknown.Raw = data
  303. intoUnknown.ContentEncoding = ""
  304. intoUnknown.ContentType = s.contentType
  305. intoUnknown.SetGroupVersionKind(*actual)
  306. return intoUnknown, actual, nil
  307. }
  308. types, _, err := s.typer.ObjectKinds(into)
  309. switch {
  310. case runtime.IsNotRegisteredError(err):
  311. pb, ok := into.(proto.Message)
  312. if !ok {
  313. return nil, actual, errNotMarshalable{reflect.TypeOf(into)}
  314. }
  315. if err := proto.Unmarshal(data, pb); err != nil {
  316. return nil, actual, err
  317. }
  318. return into, actual, nil
  319. case err != nil:
  320. return nil, actual, err
  321. default:
  322. copyKindDefaults(actual, &types[0])
  323. // if the result of defaulting did not set a version or group, ensure that at least group is set
  324. // (copyKindDefaults will not assign Group if version is already set). This guarantees that the group
  325. // of into is set if there is no better information from the caller or object.
  326. if len(actual.Version) == 0 && len(actual.Group) == 0 {
  327. actual.Group = types[0].Group
  328. }
  329. }
  330. if len(actual.Kind) == 0 {
  331. return nil, actual, runtime.NewMissingKindErr("<protobuf encoded body - must provide default type>")
  332. }
  333. if len(actual.Version) == 0 {
  334. return nil, actual, runtime.NewMissingVersionErr("<protobuf encoded body - must provide default type>")
  335. }
  336. return unmarshalToObject(s.typer, s.creater, actual, into, data)
  337. }
  338. // unmarshalToObject is the common code between decode in the raw and normal serializer.
  339. func unmarshalToObject(typer runtime.ObjectTyper, creater runtime.ObjectCreater, actual *schema.GroupVersionKind, into runtime.Object, data []byte) (runtime.Object, *schema.GroupVersionKind, error) {
  340. // use the target if necessary
  341. obj, err := runtime.UseOrCreateObject(typer, creater, *actual, into)
  342. if err != nil {
  343. return nil, actual, err
  344. }
  345. pb, ok := obj.(proto.Message)
  346. if !ok {
  347. return nil, actual, errNotMarshalable{reflect.TypeOf(obj)}
  348. }
  349. if err := proto.Unmarshal(data, pb); err != nil {
  350. return nil, actual, err
  351. }
  352. return obj, actual, nil
  353. }
  354. // Encode serializes the provided object to the given writer. Overrides is ignored.
  355. func (s *RawSerializer) Encode(obj runtime.Object, w io.Writer) error {
  356. switch t := obj.(type) {
  357. case bufferedMarshaller:
  358. // this path performs a single allocation during write but requires the caller to implement
  359. // the more efficient Size and MarshalTo methods
  360. encodedSize := uint64(t.Size())
  361. data := make([]byte, encodedSize)
  362. n, err := t.MarshalTo(data)
  363. if err != nil {
  364. return err
  365. }
  366. _, err = w.Write(data[:n])
  367. return err
  368. case proto.Marshaler:
  369. // this path performs extra allocations
  370. data, err := t.Marshal()
  371. if err != nil {
  372. return err
  373. }
  374. _, err = w.Write(data)
  375. return err
  376. default:
  377. return errNotMarshalable{reflect.TypeOf(obj)}
  378. }
  379. }
  380. var LengthDelimitedFramer = lengthDelimitedFramer{}
  381. type lengthDelimitedFramer struct{}
  382. // NewFrameWriter implements stream framing for this serializer
  383. func (lengthDelimitedFramer) NewFrameWriter(w io.Writer) io.Writer {
  384. return framer.NewLengthDelimitedFrameWriter(w)
  385. }
  386. // NewFrameReader implements stream framing for this serializer
  387. func (lengthDelimitedFramer) NewFrameReader(r io.ReadCloser) io.ReadCloser {
  388. return framer.NewLengthDelimitedFrameReader(r)
  389. }