Skip to content

Commit

Permalink
Merge pull request #2220 from ericrrath/crd-source-event-handler
Browse files Browse the repository at this point in the history
CRD source: add event-handler support
  • Loading branch information
k8s-ci-robot authored Apr 14, 2023
2 parents e48ec6b + 929e618 commit e6ec8ea
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 5 deletions.
1 change: 1 addition & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ func main() {
RequestTimeout: cfg.RequestTimeout,
DefaultTargets: cfg.DefaultTargets,
OCPRouterName: cfg.OCPRouterName,
UpdateEvents: cfg.UpdateEvents,
}

// Lookup all the selected sources by names and pass them the desired configuration.
Expand Down
57 changes: 54 additions & 3 deletions source/crd.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ import (
"os"
"strings"

"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"

log "github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
Expand All @@ -44,6 +48,7 @@ type crdSource struct {
codec runtime.ParameterCodec
annotationFilter string
labelSelector labels.Selector
informer *cache.SharedInformer
}

func addKnownTypes(scheme *runtime.Scheme, groupVersion schema.GroupVersion) error {
Expand Down Expand Up @@ -103,18 +108,55 @@ func NewCRDClientForAPIVersionKind(client kubernetes.Interface, kubeConfig, apiS
}

// NewCRDSource creates a new crdSource with the given config.
func NewCRDSource(crdClient rest.Interface, namespace, kind string, annotationFilter string, labelSelector labels.Selector, scheme *runtime.Scheme) (Source, error) {
return &crdSource{
func NewCRDSource(crdClient rest.Interface, namespace, kind string, annotationFilter string, labelSelector labels.Selector, scheme *runtime.Scheme, startInformer bool) (Source, error) {
sourceCrd := crdSource{
crdResource: strings.ToLower(kind) + "s",
namespace: namespace,
annotationFilter: annotationFilter,
labelSelector: labelSelector,
crdClient: crdClient,
codec: runtime.NewParameterCodec(scheme),
}, nil
}
if startInformer {
// external-dns already runs its sync-handler periodically (controlled by `--interval` flag) to ensure any
// missed or dropped events are handled. specify a resync period 0 to avoid unnecessary sync handler invocations.
informer := cache.NewSharedInformer(
&cache.ListWatch{
ListFunc: func(lo metav1.ListOptions) (result runtime.Object, err error) {
return sourceCrd.List(context.TODO(), &lo)
},
WatchFunc: func(lo metav1.ListOptions) (watch.Interface, error) {
return sourceCrd.watch(context.TODO(), &lo)
},
},
&endpoint.DNSEndpoint{},
0)
sourceCrd.informer = &informer
go informer.Run(wait.NeverStop)
}
return &sourceCrd, nil
}

func (cs *crdSource) AddEventHandler(ctx context.Context, handler func()) {
if cs.informer != nil {
log.Debug("Adding event handler for CRD")
// Right now there is no way to remove event handler from informer, see:
// https://github.com/kubernetes/kubernetes/issues/79610
informer := *cs.informer
informer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
handler()
},
UpdateFunc: func(old interface{}, new interface{}) {
handler()
},
DeleteFunc: func(obj interface{}) {
handler()
},
},
)
}
}

// Endpoints returns endpoint objects.
Expand Down Expand Up @@ -189,6 +231,15 @@ func (cs *crdSource) setResourceLabel(crd *endpoint.DNSEndpoint, endpoints []*en
}
}

func (cs *crdSource) watch(ctx context.Context, opts *metav1.ListOptions) (watch.Interface, error) {
opts.Watch = true
return cs.crdClient.Get().
Namespace(cs.namespace).
Resource(cs.crdResource).
VersionedParams(opts, cs.codec).
Watch(ctx)
}

func (cs *crdSource) List(ctx context.Context, opts *metav1.ListOptions) (result *endpoint.DNSEndpointList, err error) {
result = &endpoint.DNSEndpointList{}
err = cs.crdClient.Get().
Expand Down
7 changes: 6 additions & 1 deletion source/crd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,12 @@ func testCRDSourceEndpoints(t *testing.T) {
labelSelector, err := labels.Parse(ti.labelFilter)
require.NoError(t, err)

cs, err := NewCRDSource(restClient, ti.namespace, ti.kind, ti.annotationFilter, labelSelector, scheme)
// At present, client-go's fake.RESTClient (used by crd_test.go) is known to cause race conditions when used
// with informers: https://github.com/kubernetes/kubernetes/issues/95372
// So don't start the informer during testing.
startInformer := false

cs, err := NewCRDSource(restClient, ti.namespace, ti.kind, ti.annotationFilter, labelSelector, scheme, startInformer)
require.NoError(t, err)

receivedEndpoints, err := cs.Endpoints(context.Background())
Expand Down
3 changes: 2 additions & 1 deletion source/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ type Config struct {
RequestTimeout time.Duration
DefaultTargets []string
OCPRouterName string
UpdateEvents bool
}

// ClientGenerator provides clients
Expand Down Expand Up @@ -308,7 +309,7 @@ func BuildWithConfig(ctx context.Context, source string, p ClientGenerator, cfg
if err != nil {
return nil, err
}
return NewCRDSource(crdClient, cfg.Namespace, cfg.CRDSourceKind, cfg.AnnotationFilter, cfg.LabelFilter, scheme)
return NewCRDSource(crdClient, cfg.Namespace, cfg.CRDSourceKind, cfg.AnnotationFilter, cfg.LabelFilter, scheme, cfg.UpdateEvents)
case "skipper-routegroup":
apiServerURL := cfg.APIServerURL
tokenPath := ""
Expand Down

0 comments on commit e6ec8ea

Please sign in to comment.