diff --git a/pkg/storage/etcd3/event.go b/pkg/storage/etcd3/event.go new file mode 100644 index 0000000000000..58072bd7b4f23 --- /dev/null +++ b/pkg/storage/etcd3/event.go @@ -0,0 +1,50 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package etcd3 + +import ( + "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/storage/storagepb" +) + +type event struct { + key string + value []byte + rev int64 + isDeleted bool + isCreated bool +} + +func parseKV(kv *storagepb.KeyValue) *event { + return &event{ + key: string(kv.Key), + value: kv.Value, + rev: kv.ModRevision, + isDeleted: false, + isCreated: kv.ModRevision == kv.CreateRevision, + } +} + +func parseEvent(e *clientv3.Event) *event { + return &event{ + key: string(e.Kv.Key), + value: e.Kv.Value, + rev: e.Kv.ModRevision, + isDeleted: e.Type == clientv3.EventTypeDelete, + isCreated: e.IsCreate(), + } +} diff --git a/pkg/storage/etcd3/store.go b/pkg/storage/etcd3/store.go index c9b420197a7e0..46f10ad892cdc 100644 --- a/pkg/storage/etcd3/store.go +++ b/pkg/storage/etcd3/store.go @@ -42,6 +42,7 @@ type store struct { codec runtime.Codec versioner storage.Versioner pathPrefix string + watcher *watcher } type elemForDecode struct { @@ -57,11 +58,13 @@ type objState struct { } func newStore(c *clientv3.Client, codec runtime.Codec, prefix string) *store { + versioner := etcd.APIObjectVersioner{} return &store{ client: c, - versioner: etcd.APIObjectVersioner{}, + versioner: versioner, codec: codec, pathPrefix: prefix, + watcher: newWatcher(c, codec, versioner), } } @@ -315,12 +318,21 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, filter st // Watch implements storage.Interface.Watch. func (s *store) Watch(ctx context.Context, key string, resourceVersion string, filter storage.FilterFunc) (watch.Interface, error) { - panic("TODO: unimplemented") + return s.watch(ctx, key, resourceVersion, filter, false) } // WatchList implements storage.Interface.WatchList. func (s *store) WatchList(ctx context.Context, key string, resourceVersion string, filter storage.FilterFunc) (watch.Interface, error) { - panic("TODO: unimplemented") + return s.watch(ctx, key, resourceVersion, filter, true) +} + +func (s *store) watch(ctx context.Context, key string, rv string, filter storage.FilterFunc, recursive bool) (watch.Interface, error) { + rev, err := storage.ParseWatchResourceVersion(rv) + if err != nil { + return nil, err + } + key = keyWithPrefix(s.pathPrefix, key) + return s.watcher.Watch(ctx, key, int64(rev), recursive, filter) } func (s *store) getState(getResp *clientv3.GetResponse, key string, v reflect.Value, ignoreNotFound bool) (*objState, error) { diff --git a/pkg/storage/etcd3/watcher.go b/pkg/storage/etcd3/watcher.go new file mode 100644 index 0000000000000..4473602e799b9 --- /dev/null +++ b/pkg/storage/etcd3/watcher.go @@ -0,0 +1,327 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package etcd3 + +import ( + "fmt" + "net/http" + "strings" + "sync" + + "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/storage" + "k8s.io/kubernetes/pkg/watch" + + "github.com/coreos/etcd/clientv3" + etcdrpc "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" + "github.com/golang/glog" + "golang.org/x/net/context" +) + +const ( + // We have set a buffer in order to reduce times of context switches. + incomingBufSize = 100 + outgoingBufSize = 100 +) + +type watcher struct { + client *clientv3.Client + codec runtime.Codec + versioner storage.Versioner +} + +// watchChan implements watch.Interface. +type watchChan struct { + watcher *watcher + key string + initialRev int64 + recursive bool + filter storage.FilterFunc + ctx context.Context + cancel context.CancelFunc + incomingEventChan chan *event + resultChan chan watch.Event + errChan chan error +} + +func newWatcher(client *clientv3.Client, codec runtime.Codec, versioner storage.Versioner) *watcher { + return &watcher{ + client: client, + codec: codec, + versioner: versioner, + } +} + +// Watch watches on a key and returns a watch.Interface that transfers relevant notifications. +// If rev is zero, it will return the existing object(s) and then start watching from +// the maximum revision+1 from returned objects. +// If rev is non-zero, it will watch events happened after given revision. +// If recursive is false, it watches on given key. +// If recursive is true, it watches any children and directories under the key, excluding the root key itself. +// filter must be non-nil. Only if filter returns true will the changes be returned. +func (w *watcher) Watch(ctx context.Context, key string, rev int64, recursive bool, filter storage.FilterFunc) (watch.Interface, error) { + if recursive && !strings.HasSuffix(key, "/") { + key += "/" + } + wc := &watchChan{ + watcher: w, + key: key, + initialRev: rev, + recursive: recursive, + filter: filter, + incomingEventChan: make(chan *event, incomingBufSize), + resultChan: make(chan watch.Event, outgoingBufSize), + errChan: make(chan error, 1), + } + wc.ctx, wc.cancel = context.WithCancel(ctx) + go wc.run() + return wc, nil +} + +func (wc *watchChan) run() { + go wc.startWatching() + + var resultChanWG sync.WaitGroup + resultChanWG.Add(1) + go wc.processEvent(&resultChanWG) + + select { + case err := <-wc.errChan: + errResult := parseError(err) + wc.cancel() + // error result is guaranteed to be received by user before closing ResultChan. + if errResult != nil { + wc.resultChan <- *errResult + } + case <-wc.ctx.Done(): + } + // we need to wait until resultChan wouldn't be sent to anymore + resultChanWG.Wait() + close(wc.resultChan) +} + +func (wc *watchChan) Stop() { + wc.cancel() +} + +func (wc *watchChan) ResultChan() <-chan watch.Event { + return wc.resultChan +} + +// sync tries to retrieve existing data and send them to process. +// The revision to watch will be set to the revision in response. +func (wc *watchChan) sync() error { + opts := []clientv3.OpOption{} + if wc.recursive { + opts = append(opts, clientv3.WithPrefix()) + } + getResp, err := wc.watcher.client.Get(wc.ctx, wc.key, opts...) + if err != nil { + return err + } + wc.initialRev = getResp.Header.Revision + + for _, kv := range getResp.Kvs { + wc.sendEvent(parseKV(kv)) + } + return nil +} + +// startWatching does: +// - get current objects if initialRev=0; set initialRev to current rev +// - watch on given key and send events to process. +func (wc *watchChan) startWatching() { + if wc.initialRev == 0 { + if err := wc.sync(); err != nil { + wc.sendError(err) + return + } + } + opts := []clientv3.OpOption{clientv3.WithRev(wc.initialRev + 1)} + if wc.recursive { + opts = append(opts, clientv3.WithPrefix()) + } + wch := wc.watcher.client.Watch(wc.ctx, wc.key, opts...) + for wres := range wch { + if wres.Err() != nil { + // If there is an error on server (e.g. compaction), the channel will return it before closed. + wc.sendError(wres.Err()) + return + } + for _, e := range wres.Events { + wc.sendEvent(parseEvent(e)) + } + } +} + +// processEvent processes events from etcd watcher and sends results to resultChan. +func (wc *watchChan) processEvent(wg *sync.WaitGroup) { + defer wg.Done() + + for { + select { + case e := <-wc.incomingEventChan: + res := wc.transform(e) + if res == nil { + continue + } + // If user couldn't receive results fast enough, we also block incoming events from watcher. + // Because storing events in local will cause more memory usage. + // The worst case would be closing the fast watcher. + select { + case wc.resultChan <- *res: + case <-wc.ctx.Done(): + return + } + case <-wc.ctx.Done(): + return + } + } +} + +// transform transforms an event into a result for user if not filtered. +// TODO (Optimization): +// - Save remote round-trip. +// Currently, DELETE and PUT event don't contain the previous value. +// We need to do another Get() in order to get previous object and have logic upon it. +// We could potentially do some optimizations: +// - For PUT, we can save current and previous objects into the value. +// - For DELETE, See https://github.com/coreos/etcd/issues/4620 +func (wc *watchChan) transform(e *event) (res *watch.Event) { + curObj, oldObj, err := prepareObjs(wc.ctx, e, wc.watcher.client, wc.watcher.codec, wc.watcher.versioner) + if err != nil { + wc.sendError(err) + return nil + } + + switch { + case e.isDeleted: + if !wc.filter(oldObj) { + return nil + } + res = &watch.Event{ + Type: watch.Deleted, + Object: oldObj, + } + case e.isCreated: + if !wc.filter(curObj) { + return nil + } + res = &watch.Event{ + Type: watch.Added, + Object: curObj, + } + default: + curObjPasses := wc.filter(curObj) + oldObjPasses := wc.filter(oldObj) + switch { + case curObjPasses && oldObjPasses: + res = &watch.Event{ + Type: watch.Modified, + Object: curObj, + } + case curObjPasses && !oldObjPasses: + res = &watch.Event{ + Type: watch.Added, + Object: curObj, + } + case !curObjPasses && oldObjPasses: + res = &watch.Event{ + Type: watch.Deleted, + Object: oldObj, + } + } + } + return res +} + +func parseError(err error) *watch.Event { + var status *unversioned.Status + switch { + case err == etcdrpc.ErrCompacted: + status = &unversioned.Status{ + Status: unversioned.StatusFailure, + Message: err.Error(), + Code: http.StatusGone, + Reason: unversioned.StatusReasonExpired, + } + default: + status = &unversioned.Status{ + Status: unversioned.StatusFailure, + Message: err.Error(), + Code: http.StatusInternalServerError, + Reason: unversioned.StatusReasonInternalError, + } + } + + return &watch.Event{ + Type: watch.Error, + Object: status, + } +} + +func (wc *watchChan) sendError(err error) { + select { + case wc.errChan <- err: + case <-wc.ctx.Done(): + } +} + +func (wc *watchChan) sendEvent(e *event) { + if len(wc.incomingEventChan) == incomingBufSize { + glog.V(2).Infof("Fast watcher, slow processing. Number of buffered events: %d."+ + "Probably caused by slow decoding, user not receiving fast, or other processing logic", + incomingBufSize) + } + select { + case wc.incomingEventChan <- e: + case <-wc.ctx.Done(): + } +} + +func prepareObjs(ctx context.Context, e *event, client *clientv3.Client, codec runtime.Codec, versioner storage.Versioner) (curObj runtime.Object, oldObj runtime.Object, err error) { + if !e.isDeleted { + curObj, err = decodeObj(codec, versioner, e.value, e.rev) + if err != nil { + return nil, nil, err + } + } + if e.isDeleted || !e.isCreated { + getResp, err := client.Get(ctx, e.key, clientv3.WithRev(e.rev-1)) + if err != nil { + return nil, nil, err + } + oldObj, err = decodeObj(codec, versioner, getResp.Kvs[0].Value, getResp.Kvs[0].ModRevision) + if err != nil { + return nil, nil, err + } + } + return curObj, oldObj, nil +} + +func decodeObj(codec runtime.Codec, versioner storage.Versioner, data []byte, rev int64) (runtime.Object, error) { + obj, err := runtime.Decode(codec, []byte(data)) + if err != nil { + return nil, err + } + // ensure resource version is set on the object we load from etcd + if err := versioner.UpdateObject(obj, uint64(rev)); err != nil { + return nil, fmt.Errorf("failure to version api object (%d) %#v: %v", rev, obj, err) + } + return obj, nil +} diff --git a/pkg/storage/etcd3/watcher_test.go b/pkg/storage/etcd3/watcher_test.go new file mode 100644 index 0000000000000..315d22a0ad7e4 --- /dev/null +++ b/pkg/storage/etcd3/watcher_test.go @@ -0,0 +1,209 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package etcd3 + +import ( + "errors" + "reflect" + "testing" + "time" + + "github.com/coreos/etcd/integration" + "golang.org/x/net/context" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/testapi" + "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/storage" + "k8s.io/kubernetes/pkg/util/wait" + "k8s.io/kubernetes/pkg/watch" +) + +func TestWatch(t *testing.T) { + testWatch(t, false) +} + +func TestWatchList(t *testing.T) { + testWatch(t, true) +} + +// It tests that +// - first occurrence of objects should notify Add event +// - +func testWatch(t *testing.T, recursive bool) { + ctx, store, cluster := testSetup(t) + defer cluster.Terminate(t) + + podFoo := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}} + podBar := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "bar"}} + + tests := []struct { + key string + filter storage.FilterFunc + watchTests []*testWatchStruct + }{{ // create a key + key: "/somekey-1", + watchTests: []*testWatchStruct{{podFoo, true, watch.Added}}, + filter: storage.Everything, + }, { // create a key but obj gets filtered + key: "/somekey-2", + watchTests: []*testWatchStruct{{podFoo, false, ""}}, + filter: func(runtime.Object) bool { return false }, + }, { // create a key but obj gets filtered. Then update it with unfiltered obj + key: "/somekey-3", + watchTests: []*testWatchStruct{{podFoo, false, ""}, {podBar, true, watch.Added}}, + filter: func(obj runtime.Object) bool { + pod := obj.(*api.Pod) + return pod.Name == "bar" + }, + }, { // update + key: "/somekey-4", + watchTests: []*testWatchStruct{{podFoo, true, watch.Added}, {podBar, true, watch.Modified}}, + filter: storage.Everything, + }, { // delete because of being filtered + key: "/somekey-5", + watchTests: []*testWatchStruct{{podFoo, true, watch.Added}, {podBar, true, watch.Deleted}}, + filter: func(obj runtime.Object) bool { + pod := obj.(*api.Pod) + return pod.Name != "bar" + }, + }} + for i, tt := range tests { + w, err := store.watch(ctx, tt.key, "0", tt.filter, recursive) + if err != nil { + t.Fatalf("Watch failed: %v", err) + } + for _, watchTest := range tt.watchTests { + out := &api.Pod{} + key := tt.key + if recursive { + key = key + "/item" + } + err := store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate( + func(runtime.Object) (runtime.Object, error) { + return watchTest.obj, nil + })) + if err != nil { + t.Fatalf("GuaranteedUpdate failed: %v", err) + } + if watchTest.expectEvent { + testCheckResult(t, i, watchTest.watchType, w, nil) + } + } + w.Stop() + testCheckStop(t, i, w) + } +} + +func TestDeleteTriggerWatch(t *testing.T) { + ctx, store, cluster := testSetup(t) + defer cluster.Terminate(t) + key, storedObj := testPropogateStore(t, store, ctx, &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}) + w, err := store.Watch(ctx, key, storedObj.ResourceVersion, storage.Everything) + if err != nil { + t.Fatalf("Watch failed: %v", err) + } + if err := store.Delete(ctx, key, &api.Pod{}, nil); err != nil { + t.Fatalf("Delete failed: %v", err) + } + testCheckResult(t, 0, watch.Deleted, w, storedObj) +} + +// TestWatchSync tests that +// - watch from 0 should sync up and grab the object added before +// - watch from non-0 should just watch changes after given version +func TestWatchFromZeroAndNoneZero(t *testing.T) { + ctx, store, cluster := testSetup(t) + defer cluster.Terminate(t) + key, storedObj := testPropogateStore(t, store, ctx, &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}) + + w, err := store.Watch(ctx, key, "0", storage.Everything) + if err != nil { + t.Fatalf("Watch failed: %v", err) + } + testCheckResult(t, 0, watch.Added, w, storedObj) + w.Stop() + testCheckStop(t, 0, w) + + w, err = store.Watch(ctx, key, storedObj.ResourceVersion, storage.Everything) + if err != nil { + t.Fatalf("Watch failed: %v", err) + } + out := &api.Pod{} + store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate( + func(runtime.Object) (runtime.Object, error) { + return &api.Pod{ObjectMeta: api.ObjectMeta{Name: "bar"}}, err + })) + testCheckResult(t, 0, watch.Modified, w, out) +} + +func TestWatchError(t *testing.T) { + cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer cluster.Terminate(t) + invalidStore := newStore(cluster.RandClient(), &testCodec{testapi.Default.Codec()}, "") + ctx := context.Background() + w, err := invalidStore.Watch(ctx, "/abc", "0", storage.Everything) + if err != nil { + t.Fatalf("Watch failed: %v", err) + } + validStore := newStore(cluster.RandClient(), testapi.Default.Codec(), "") + validStore.GuaranteedUpdate(ctx, "/abc", &api.Pod{}, true, nil, storage.SimpleUpdate( + func(runtime.Object) (runtime.Object, error) { + return &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}, nil + })) + testCheckResult(t, 0, watch.Error, w, nil) +} + +type testWatchStruct struct { + obj *api.Pod + expectEvent bool + watchType watch.EventType +} + +type testCodec struct { + runtime.Codec +} + +func (c *testCodec) Decode(data []byte, defaults *unversioned.GroupVersionKind, into runtime.Object) (runtime.Object, *unversioned.GroupVersionKind, error) { + return nil, nil, errors.New("Expected decoding failure") +} + +func testCheckResult(t *testing.T, i int, expectEventType watch.EventType, w watch.Interface, expectObj *api.Pod) { + select { + case res := <-w.ResultChan(): + if res.Type != expectEventType { + t.Errorf("#%d: event type want=%v, get=%v", i, expectEventType, res.Type) + return + } + if expectObj != nil && !reflect.DeepEqual(expectObj, res.Object) { + t.Errorf("#%d: obj want=\n%#v\nget=\n%#v", i, expectObj, res.Object) + } + case <-time.After(wait.ForeverTestTimeout): + t.Errorf("#%d: time out after waiting %v on ResultChan", i, wait.ForeverTestTimeout) + } +} + +func testCheckStop(t *testing.T, i int, w watch.Interface) { + select { + case _, ok := <-w.ResultChan(): + if ok { + t.Errorf("#%d: ResultChan should have been closed", i) + } + case <-time.After(wait.ForeverTestTimeout): + t.Errorf("#%d: time out after waiting 1s on ResultChan", i) + } +}