123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448 |
- /*
- Copyright 2015 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package protobuf
- import (
- "bytes"
- "fmt"
- "io"
- "reflect"
- "github.com/gogo/protobuf/proto"
- "k8s.io/apimachinery/pkg/runtime"
- "k8s.io/apimachinery/pkg/runtime/schema"
- "k8s.io/apimachinery/pkg/runtime/serializer/recognizer"
- "k8s.io/apimachinery/pkg/util/framer"
- )
- var (
- // protoEncodingPrefix serves as a magic number for an encoded protobuf message on this serializer. All
- // proto messages serialized by this schema will be preceded by the bytes 0x6b 0x38 0x73, with the fourth
- // byte being reserved for the encoding style. The only encoding style defined is 0x00, which means that
- // the rest of the byte stream is a message of type k8s.io.kubernetes.pkg.runtime.Unknown (proto2).
- //
- // See k8s.io/apimachinery/pkg/runtime/generated.proto for details of the runtime.Unknown message.
- //
- // This encoding scheme is experimental, and is subject to change at any time.
- protoEncodingPrefix = []byte{0x6b, 0x38, 0x73, 0x00}
- )
- type errNotMarshalable struct {
- t reflect.Type
- }
- func (e errNotMarshalable) Error() string {
- return fmt.Sprintf("object %v does not implement the protobuf marshalling interface and cannot be encoded to a protobuf message", e.t)
- }
- func IsNotMarshalable(err error) bool {
- _, ok := err.(errNotMarshalable)
- return err != nil && ok
- }
- // NewSerializer creates a Protobuf serializer that handles encoding versioned objects into the proper wire form. If a typer
- // is passed, the encoded object will have group, version, and kind fields set. If typer is nil, the objects will be written
- // as-is (any type info passed with the object will be used).
- //
- // This encoding scheme is experimental, and is subject to change at any time.
- func NewSerializer(creater runtime.ObjectCreater, typer runtime.ObjectTyper, defaultContentType string) *Serializer {
- return &Serializer{
- prefix: protoEncodingPrefix,
- creater: creater,
- typer: typer,
- contentType: defaultContentType,
- }
- }
- type Serializer struct {
- prefix []byte
- creater runtime.ObjectCreater
- typer runtime.ObjectTyper
- contentType string
- }
- var _ runtime.Serializer = &Serializer{}
- var _ recognizer.RecognizingDecoder = &Serializer{}
- // Decode attempts to convert the provided data into a protobuf message, extract the stored schema kind, apply the provided default
- // gvk, and then load that data into an object matching the desired schema kind or the provided into. If into is *runtime.Unknown,
- // the raw data will be extracted and no decoding will be performed. If into is not registered with the typer, then the object will
- // be straight decoded using normal protobuf unmarshalling (the MarshalTo interface). If into is provided and the original data is
- // not fully qualified with kind/version/group, the type of the into will be used to alter the returned gvk. On success or most
- // errors, the method will return the calculated schema kind.
- func (s *Serializer) Decode(originalData []byte, gvk *schema.GroupVersionKind, into runtime.Object) (runtime.Object, *schema.GroupVersionKind, error) {
- if versioned, ok := into.(*runtime.VersionedObjects); ok {
- into = versioned.Last()
- obj, actual, err := s.Decode(originalData, gvk, into)
- if err != nil {
- return nil, actual, err
- }
- // the last item in versioned becomes into, so if versioned was not originally empty we reset the object
- // array so the first position is the decoded object and the second position is the outermost object.
- // if there were no objects in the versioned list passed to us, only add ourselves.
- if into != nil && into != obj {
- versioned.Objects = []runtime.Object{obj, into}
- } else {
- versioned.Objects = []runtime.Object{obj}
- }
- return versioned, actual, err
- }
- prefixLen := len(s.prefix)
- switch {
- case len(originalData) == 0:
- // TODO: treat like decoding {} from JSON with defaulting
- return nil, nil, fmt.Errorf("empty data")
- case len(originalData) < prefixLen || !bytes.Equal(s.prefix, originalData[:prefixLen]):
- return nil, nil, fmt.Errorf("provided data does not appear to be a protobuf message, expected prefix %v", s.prefix)
- case len(originalData) == prefixLen:
- // TODO: treat like decoding {} from JSON with defaulting
- return nil, nil, fmt.Errorf("empty body")
- }
- data := originalData[prefixLen:]
- unk := runtime.Unknown{}
- if err := unk.Unmarshal(data); err != nil {
- return nil, nil, err
- }
- actual := unk.GroupVersionKind()
- copyKindDefaults(&actual, gvk)
- if intoUnknown, ok := into.(*runtime.Unknown); ok && intoUnknown != nil {
- *intoUnknown = unk
- if ok, _, _ := s.RecognizesData(bytes.NewBuffer(unk.Raw)); ok {
- intoUnknown.ContentType = s.contentType
- }
- return intoUnknown, &actual, nil
- }
- if into != nil {
- types, _, err := s.typer.ObjectKinds(into)
- switch {
- case runtime.IsNotRegisteredError(err):
- pb, ok := into.(proto.Message)
- if !ok {
- return nil, &actual, errNotMarshalable{reflect.TypeOf(into)}
- }
- if err := proto.Unmarshal(unk.Raw, pb); err != nil {
- return nil, &actual, err
- }
- return into, &actual, nil
- case err != nil:
- return nil, &actual, err
- default:
- copyKindDefaults(&actual, &types[0])
- // if the result of defaulting did not set a version or group, ensure that at least group is set
- // (copyKindDefaults will not assign Group if version is already set). This guarantees that the group
- // of into is set if there is no better information from the caller or object.
- if len(actual.Version) == 0 && len(actual.Group) == 0 {
- actual.Group = types[0].Group
- }
- }
- }
- if len(actual.Kind) == 0 {
- return nil, &actual, runtime.NewMissingKindErr(fmt.Sprintf("%#v", unk.TypeMeta))
- }
- if len(actual.Version) == 0 {
- return nil, &actual, runtime.NewMissingVersionErr(fmt.Sprintf("%#v", unk.TypeMeta))
- }
- return unmarshalToObject(s.typer, s.creater, &actual, into, unk.Raw)
- }
- // Encode serializes the provided object to the given writer.
- func (s *Serializer) Encode(obj runtime.Object, w io.Writer) error {
- prefixSize := uint64(len(s.prefix))
- var unk runtime.Unknown
- switch t := obj.(type) {
- case *runtime.Unknown:
- estimatedSize := prefixSize + uint64(t.Size())
- data := make([]byte, estimatedSize)
- i, err := t.MarshalTo(data[prefixSize:])
- if err != nil {
- return err
- }
- copy(data, s.prefix)
- _, err = w.Write(data[:prefixSize+uint64(i)])
- return err
- default:
- kind := obj.GetObjectKind().GroupVersionKind()
- unk = runtime.Unknown{
- TypeMeta: runtime.TypeMeta{
- Kind: kind.Kind,
- APIVersion: kind.GroupVersion().String(),
- },
- }
- }
- switch t := obj.(type) {
- case bufferedMarshaller:
- // this path performs a single allocation during write but requires the caller to implement
- // the more efficient Size and MarshalTo methods
- encodedSize := uint64(t.Size())
- estimatedSize := prefixSize + estimateUnknownSize(&unk, encodedSize)
- data := make([]byte, estimatedSize)
- i, err := unk.NestedMarshalTo(data[prefixSize:], t, encodedSize)
- if err != nil {
- return err
- }
- copy(data, s.prefix)
- _, err = w.Write(data[:prefixSize+uint64(i)])
- return err
- case proto.Marshaler:
- // this path performs extra allocations
- data, err := t.Marshal()
- if err != nil {
- return err
- }
- unk.Raw = data
- estimatedSize := prefixSize + uint64(unk.Size())
- data = make([]byte, estimatedSize)
- i, err := unk.MarshalTo(data[prefixSize:])
- if err != nil {
- return err
- }
- copy(data, s.prefix)
- _, err = w.Write(data[:prefixSize+uint64(i)])
- return err
- default:
- // TODO: marshal with a different content type and serializer (JSON for third party objects)
- return errNotMarshalable{reflect.TypeOf(obj)}
- }
- }
- // RecognizesData implements the RecognizingDecoder interface.
- func (s *Serializer) RecognizesData(peek io.Reader) (bool, bool, error) {
- prefix := make([]byte, 4)
- n, err := peek.Read(prefix)
- if err != nil {
- if err == io.EOF {
- return false, false, nil
- }
- return false, false, err
- }
- if n != 4 {
- return false, false, nil
- }
- return bytes.Equal(s.prefix, prefix), false, nil
- }
- // copyKindDefaults defaults dst to the value in src if dst does not have a value set.
- func copyKindDefaults(dst, src *schema.GroupVersionKind) {
- if src == nil {
- return
- }
- // apply kind and version defaulting from provided default
- if len(dst.Kind) == 0 {
- dst.Kind = src.Kind
- }
- if len(dst.Version) == 0 && len(src.Version) > 0 {
- dst.Group = src.Group
- dst.Version = src.Version
- }
- }
- // bufferedMarshaller describes a more efficient marshalling interface that can avoid allocating multiple
- // byte buffers by pre-calculating the size of the final buffer needed.
- type bufferedMarshaller interface {
- proto.Sizer
- runtime.ProtobufMarshaller
- }
- // estimateUnknownSize returns the expected bytes consumed by a given runtime.Unknown
- // object with a nil RawJSON struct and the expected size of the provided buffer. The
- // returned size will not be correct if RawJSOn is set on unk.
- func estimateUnknownSize(unk *runtime.Unknown, byteSize uint64) uint64 {
- size := uint64(unk.Size())
- // protobuf uses 1 byte for the tag, a varint for the length of the array (at most 8 bytes - uint64 - here),
- // and the size of the array.
- size += 1 + 8 + byteSize
- return size
- }
- // NewRawSerializer creates a Protobuf serializer that handles encoding versioned objects into the proper wire form. If typer
- // is not nil, the object has the group, version, and kind fields set. This serializer does not provide type information for the
- // encoded object, and thus is not self describing (callers must know what type is being described in order to decode).
- //
- // This encoding scheme is experimental, and is subject to change at any time.
- func NewRawSerializer(creater runtime.ObjectCreater, typer runtime.ObjectTyper, defaultContentType string) *RawSerializer {
- return &RawSerializer{
- creater: creater,
- typer: typer,
- contentType: defaultContentType,
- }
- }
- // RawSerializer encodes and decodes objects without adding a runtime.Unknown wrapper (objects are encoded without identifying
- // type).
- type RawSerializer struct {
- creater runtime.ObjectCreater
- typer runtime.ObjectTyper
- contentType string
- }
- var _ runtime.Serializer = &RawSerializer{}
- // Decode attempts to convert the provided data into a protobuf message, extract the stored schema kind, apply the provided default
- // gvk, and then load that data into an object matching the desired schema kind or the provided into. If into is *runtime.Unknown,
- // the raw data will be extracted and no decoding will be performed. If into is not registered with the typer, then the object will
- // be straight decoded using normal protobuf unmarshalling (the MarshalTo interface). If into is provided and the original data is
- // not fully qualified with kind/version/group, the type of the into will be used to alter the returned gvk. On success or most
- // errors, the method will return the calculated schema kind.
- func (s *RawSerializer) Decode(originalData []byte, gvk *schema.GroupVersionKind, into runtime.Object) (runtime.Object, *schema.GroupVersionKind, error) {
- if into == nil {
- return nil, nil, fmt.Errorf("this serializer requires an object to decode into: %#v", s)
- }
- if versioned, ok := into.(*runtime.VersionedObjects); ok {
- into = versioned.Last()
- obj, actual, err := s.Decode(originalData, gvk, into)
- if err != nil {
- return nil, actual, err
- }
- if into != nil && into != obj {
- versioned.Objects = []runtime.Object{obj, into}
- } else {
- versioned.Objects = []runtime.Object{obj}
- }
- return versioned, actual, err
- }
- if len(originalData) == 0 {
- // TODO: treat like decoding {} from JSON with defaulting
- return nil, nil, fmt.Errorf("empty data")
- }
- data := originalData
- actual := &schema.GroupVersionKind{}
- copyKindDefaults(actual, gvk)
- if intoUnknown, ok := into.(*runtime.Unknown); ok && intoUnknown != nil {
- intoUnknown.Raw = data
- intoUnknown.ContentEncoding = ""
- intoUnknown.ContentType = s.contentType
- intoUnknown.SetGroupVersionKind(*actual)
- return intoUnknown, actual, nil
- }
- types, _, err := s.typer.ObjectKinds(into)
- switch {
- case runtime.IsNotRegisteredError(err):
- pb, ok := into.(proto.Message)
- if !ok {
- return nil, actual, errNotMarshalable{reflect.TypeOf(into)}
- }
- if err := proto.Unmarshal(data, pb); err != nil {
- return nil, actual, err
- }
- return into, actual, nil
- case err != nil:
- return nil, actual, err
- default:
- copyKindDefaults(actual, &types[0])
- // if the result of defaulting did not set a version or group, ensure that at least group is set
- // (copyKindDefaults will not assign Group if version is already set). This guarantees that the group
- // of into is set if there is no better information from the caller or object.
- if len(actual.Version) == 0 && len(actual.Group) == 0 {
- actual.Group = types[0].Group
- }
- }
- if len(actual.Kind) == 0 {
- return nil, actual, runtime.NewMissingKindErr("<protobuf encoded body - must provide default type>")
- }
- if len(actual.Version) == 0 {
- return nil, actual, runtime.NewMissingVersionErr("<protobuf encoded body - must provide default type>")
- }
- return unmarshalToObject(s.typer, s.creater, actual, into, data)
- }
- // unmarshalToObject is the common code between decode in the raw and normal serializer.
- func unmarshalToObject(typer runtime.ObjectTyper, creater runtime.ObjectCreater, actual *schema.GroupVersionKind, into runtime.Object, data []byte) (runtime.Object, *schema.GroupVersionKind, error) {
- // use the target if necessary
- obj, err := runtime.UseOrCreateObject(typer, creater, *actual, into)
- if err != nil {
- return nil, actual, err
- }
- pb, ok := obj.(proto.Message)
- if !ok {
- return nil, actual, errNotMarshalable{reflect.TypeOf(obj)}
- }
- if err := proto.Unmarshal(data, pb); err != nil {
- return nil, actual, err
- }
- return obj, actual, nil
- }
- // Encode serializes the provided object to the given writer. Overrides is ignored.
- func (s *RawSerializer) Encode(obj runtime.Object, w io.Writer) error {
- switch t := obj.(type) {
- case bufferedMarshaller:
- // this path performs a single allocation during write but requires the caller to implement
- // the more efficient Size and MarshalTo methods
- encodedSize := uint64(t.Size())
- data := make([]byte, encodedSize)
- n, err := t.MarshalTo(data)
- if err != nil {
- return err
- }
- _, err = w.Write(data[:n])
- return err
- case proto.Marshaler:
- // this path performs extra allocations
- data, err := t.Marshal()
- if err != nil {
- return err
- }
- _, err = w.Write(data)
- return err
- default:
- return errNotMarshalable{reflect.TypeOf(obj)}
- }
- }
- var LengthDelimitedFramer = lengthDelimitedFramer{}
- type lengthDelimitedFramer struct{}
- // NewFrameWriter implements stream framing for this serializer
- func (lengthDelimitedFramer) NewFrameWriter(w io.Writer) io.Writer {
- return framer.NewLengthDelimitedFrameWriter(w)
- }
- // NewFrameReader implements stream framing for this serializer
- func (lengthDelimitedFramer) NewFrameReader(r io.ReadCloser) io.ReadCloser {
- return framer.NewLengthDelimitedFrameReader(r)
- }
|