123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687 |
- /*
- Copyright 2016 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package watch
- import (
- "errors"
- "time"
- "k8s.io/apimachinery/pkg/util/wait"
- )
- // ConditionFunc returns true if the condition has been reached, false if it has not been reached yet,
- // or an error if the condition cannot be checked and should terminate. In general, it is better to define
- // level driven conditions over edge driven conditions (pod has ready=true, vs pod modified and ready changed
- // from false to true).
- type ConditionFunc func(event Event) (bool, error)
// ErrWatchClosed is the sentinel error reported by Until when the watch's
// result channel is closed before the timeout has elapsed.
var ErrWatchClosed error = errors.New("watch closed before Until timeout")
- // Until reads items from the watch until each provided condition succeeds, and then returns the last watch
- // encountered. The first condition that returns an error terminates the watch (and the event is also returned).
- // If no event has been received, the returned event will be nil.
- // Conditions are satisfied sequentially so as to provide a useful primitive for higher level composition.
- // A zero timeout means to wait forever.
- func Until(timeout time.Duration, watcher Interface, conditions ...ConditionFunc) (*Event, error) {
- ch := watcher.ResultChan()
- defer watcher.Stop()
- var after <-chan time.Time
- if timeout > 0 {
- after = time.After(timeout)
- } else {
- ch := make(chan time.Time)
- defer close(ch)
- after = ch
- }
- var lastEvent *Event
- for _, condition := range conditions {
- // check the next condition against the previous event and short circuit waiting for the next watch
- if lastEvent != nil {
- done, err := condition(*lastEvent)
- if err != nil {
- return lastEvent, err
- }
- if done {
- continue
- }
- }
- ConditionSucceeded:
- for {
- select {
- case event, ok := <-ch:
- if !ok {
- return lastEvent, ErrWatchClosed
- }
- lastEvent = &event
- // TODO: check for watch expired error and retry watch from latest point?
- done, err := condition(event)
- if err != nil {
- return lastEvent, err
- }
- if done {
- break ConditionSucceeded
- }
- case <-after:
- return lastEvent, wait.ErrWaitTimeout
- }
- }
- }
- return lastEvent, nil
- }
|