Skip to content

Commit

Permalink
Pass stop channel to informer factory instances
Browse files Browse the repository at this point in the history
  • Loading branch information
alebedev87 committed Jan 22, 2022
1 parent 98393df commit 4f41229
Show file tree
Hide file tree
Showing 22 changed files with 79 additions and 61 deletions.
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func main() {
}

// Lookup all the selected sources by names and pass them the desired configuration.
sources, err := source.ByNames(&source.SingletonClientGenerator{
sources, err := source.ByNames(ctx, &source.SingletonClientGenerator{
KubeConfig: cfg.KubeConfig,
APIServerURL: cfg.APIServerURL,
// If update events are enabled, disable timeout.
Expand Down
5 changes: 2 additions & 3 deletions source/ambassador_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/informers"
Expand Down Expand Up @@ -64,6 +63,7 @@ type ambassadorHostSource struct {

// NewAmbassadorHostSource creates a new ambassadorHostSource with the given config.
func NewAmbassadorHostSource(
ctx context.Context,
dynamicKubeClient dynamic.Interface,
kubeClient kubernetes.Interface,
namespace string) (Source, error) {
Expand All @@ -82,8 +82,7 @@ func NewAmbassadorHostSource(
},
)

// TODO informer is not explicitly stopped since controller is not passing in its channel.
informerFactory.Start(wait.NeverStop)
informerFactory.Start(ctx.Done())

if err := waitForDynamicCacheSync(context.Background(), informerFactory); err != nil {
return nil, err
Expand Down
5 changes: 2 additions & 3 deletions source/contour_httpproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/informers"
Expand All @@ -53,6 +52,7 @@ type httpProxySource struct {

// NewContourHTTPProxySource creates a new contourHTTPProxySource with the given config.
func NewContourHTTPProxySource(
ctx context.Context,
dynamicKubeClient dynamic.Interface,
namespace string,
annotationFilter string,
Expand All @@ -78,8 +78,7 @@ func NewContourHTTPProxySource(
},
)

// TODO informer is not explicitly stopped since controller is not passing in its channel.
informerFactory.Start(wait.NeverStop)
informerFactory.Start(ctx.Done())

// wait for the local cache to be populated.
if err := waitForDynamicCacheSync(context.Background(), informerFactory); err != nil {
Expand Down
4 changes: 4 additions & 0 deletions source/contour_httpproxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ func (suite *HTTPProxySuite) SetupTest() {
var err error

suite.source, err = NewContourHTTPProxySource(
context.TODO(),
fakeDynamicClient,
"default",
"",
Expand Down Expand Up @@ -184,6 +185,7 @@ func TestNewContourHTTPProxySource(t *testing.T) {
fakeDynamicClient, _ := newDynamicKubernetesClient()

_, err := NewContourHTTPProxySource(
context.TODO(),
fakeDynamicClient,
"",
ti.annotationFilter,
Expand Down Expand Up @@ -1033,6 +1035,7 @@ func testHTTPProxyEndpoints(t *testing.T) {
}

httpProxySource, err := NewContourHTTPProxySource(
context.TODO(),
fakeDynamicClient,
ti.targetNamespace,
ti.annotationFilter,
Expand All @@ -1059,6 +1062,7 @@ func newTestHTTPProxySource() (*httpProxySource, error) {
fakeDynamicClient, _ := newDynamicKubernetesClient()

src, err := NewContourHTTPProxySource(
context.TODO(),
fakeDynamicClient,
"default",
"",
Expand Down
6 changes: 2 additions & 4 deletions source/ingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
log "github.com/sirupsen/logrus"
networkv1 "k8s.io/api/networking/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait"
kubeinformers "k8s.io/client-go/informers"
netinformers "k8s.io/client-go/informers/networking/v1"
"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -64,7 +63,7 @@ type ingressSource struct {
}

// NewIngressSource creates a new ingressSource with the given config.
func NewIngressSource(kubeClient kubernetes.Interface, namespace, annotationFilter string, fqdnTemplate string, combineFqdnAnnotation bool, ignoreHostnameAnnotation bool, ignoreIngressTLSSpec bool, ignoreIngressRulesSpec bool, labelSelector labels.Selector) (Source, error) {
func NewIngressSource(ctx context.Context, kubeClient kubernetes.Interface, namespace, annotationFilter string, fqdnTemplate string, combineFqdnAnnotation bool, ignoreHostnameAnnotation bool, ignoreIngressTLSSpec bool, ignoreIngressRulesSpec bool, labelSelector labels.Selector) (Source, error) {
tmpl, err := parseTemplate(fqdnTemplate)
if err != nil {
return nil, err
Expand All @@ -83,8 +82,7 @@ func NewIngressSource(kubeClient kubernetes.Interface, namespace, annotationFilt
},
)

// TODO informer is not explicitly stopped since controller is not passing in its channel.
informerFactory.Start(wait.NeverStop)
informerFactory.Start(ctx.Done())

// wait for the local cache to be populated.
if err := waitForCacheSync(context.Background(), informerFactory); err != nil {
Expand Down
3 changes: 3 additions & 0 deletions source/ingress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func (suite *IngressSuite) SetupTest() {
suite.NoError(err, "should succeed")

suite.sc, err = NewIngressSource(
context.TODO(),
fakeClient,
"",
"",
Expand Down Expand Up @@ -138,6 +139,7 @@ func TestNewIngressSource(t *testing.T) {
t.Parallel()

_, err := NewIngressSource(
context.TODO(),
fake.NewSimpleClientset(),
"",
ti.annotationFilter,
Expand Down Expand Up @@ -1225,6 +1227,7 @@ func testIngressEndpoints(t *testing.T) {
}

source, _ := NewIngressSource(
context.TODO(),
fakeClient,
ti.targetNamespace,
ti.annotationFilter,
Expand Down
7 changes: 3 additions & 4 deletions source/istio_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
networkingv1alpha3informer "istio.io/client-go/pkg/informers/externalversions/networking/v1alpha3"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait"
kubeinformers "k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
Expand All @@ -56,6 +55,7 @@ type gatewaySource struct {

// NewIstioGatewaySource creates a new gatewaySource with the given config.
func NewIstioGatewaySource(
ctx context.Context,
kubeClient kubernetes.Interface,
istioClient istioclient.Interface,
namespace string,
Expand Down Expand Up @@ -93,9 +93,8 @@ func NewIstioGatewaySource(
},
)

// TODO informer is not explicitly stopped since controller is not passing in its channel.
informerFactory.Start(wait.NeverStop)
istioInformerFactory.Start(wait.NeverStop)
informerFactory.Start(ctx.Done())
istioInformerFactory.Start(ctx.Done())

// wait for the local cache to be populated.
if err := waitForCacheSync(context.Background(), informerFactory); err != nil {
Expand Down
4 changes: 4 additions & 0 deletions source/istio_gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func (suite *GatewaySuite) SetupTest() {
}

suite.source, err = NewIstioGatewaySource(
context.TODO(),
fakeKubernetesClient,
fakeIstioClient,
"",
Expand Down Expand Up @@ -142,6 +143,7 @@ func TestNewIstioGatewaySource(t *testing.T) {
t.Parallel()

_, err := NewIstioGatewaySource(
context.TODO(),
fake.NewSimpleClientset(),
istiofake.NewSimpleClientset(),
"",
Expand Down Expand Up @@ -1165,6 +1167,7 @@ func testGatewayEndpoints(t *testing.T) {
}

gatewaySource, err := NewIstioGatewaySource(
context.TODO(),
fakeKubernetesClient,
fakeIstioClient,
ti.targetNamespace,
Expand Down Expand Up @@ -1201,6 +1204,7 @@ func newTestGatewaySource(loadBalancerList []fakeIngressGatewayService) (*gatewa
}

src, err := NewIstioGatewaySource(
context.TODO(),
fakeKubernetesClient,
fakeIstioClient,
"",
Expand Down
7 changes: 3 additions & 4 deletions source/istio_virtualservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
networkingv1alpha3informer "istio.io/client-go/pkg/informers/externalversions/networking/v1alpha3"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait"
kubeinformers "k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -60,6 +59,7 @@ type virtualServiceSource struct {

// NewIstioVirtualServiceSource creates a new virtualServiceSource with the given config.
func NewIstioVirtualServiceSource(
ctx context.Context,
kubeClient kubernetes.Interface,
istioClient istioclient.Interface,
namespace string,
Expand Down Expand Up @@ -97,9 +97,8 @@ func NewIstioVirtualServiceSource(
},
)

// TODO informer is not explicitly stopped since controller is not passing in its channel.
informerFactory.Start(wait.NeverStop)
istioInformerFactory.Start(wait.NeverStop)
informerFactory.Start(ctx.Done())
istioInformerFactory.Start(ctx.Done())

// wait for the local cache to be populated.
if err := waitForCacheSync(context.Background(), informerFactory); err != nil {
Expand Down
4 changes: 4 additions & 0 deletions source/istio_virtualservice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func (suite *VirtualServiceSuite) SetupTest() {
suite.NoError(err, "should succeed")

suite.source, err = NewIstioVirtualServiceSource(
context.TODO(),
fakeKubernetesClient,
fakeIstioClient,
"",
Expand Down Expand Up @@ -165,6 +166,7 @@ func TestNewIstioVirtualServiceSource(t *testing.T) {
t.Parallel()

_, err := NewIstioVirtualServiceSource(
context.TODO(),
fake.NewSimpleClientset(),
istiofake.NewSimpleClientset(),
"",
Expand Down Expand Up @@ -1482,6 +1484,7 @@ func testVirtualServiceEndpoints(t *testing.T) {
}

virtualServiceSource, err := NewIstioVirtualServiceSource(
context.TODO(),
fakeKubernetesClient,
fakeIstioClient,
ti.targetNamespace,
Expand Down Expand Up @@ -1557,6 +1560,7 @@ func newTestVirtualServiceSource(loadBalancerList []fakeIngressGatewayService, g
}

src, err := NewIstioVirtualServiceSource(
context.TODO(),
fakeKubernetesClient,
fakeIstioClient,
"",
Expand Down
6 changes: 2 additions & 4 deletions source/kong_tcpingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/informers"
Expand Down Expand Up @@ -57,7 +56,7 @@ type kongTCPIngressSource struct {
}

// NewKongTCPIngressSource creates a new kongTCPIngressSource with the given config.
func NewKongTCPIngressSource(dynamicKubeClient dynamic.Interface, kubeClient kubernetes.Interface, namespace string, annotationFilter string) (Source, error) {
func NewKongTCPIngressSource(ctx context.Context, dynamicKubeClient dynamic.Interface, kubeClient kubernetes.Interface, namespace string, annotationFilter string) (Source, error) {
var err error

// Use shared informer to listen for add/update/delete of Host in the specified namespace.
Expand All @@ -73,8 +72,7 @@ func NewKongTCPIngressSource(dynamicKubeClient dynamic.Interface, kubeClient kub
},
)

// TODO informer is not explicitly stopped since controller is not passing in its channel.
informerFactory.Start(wait.NeverStop)
informerFactory.Start(ctx.Done())

// wait for the local cache to be populated.
if err := waitForDynamicCacheSync(context.Background(), informerFactory); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion source/kong_tcpingress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ func TestKongTCPIngressEndpoints(t *testing.T) {
_, err = fakeDynamicClient.Resource(kongGroupdVersionResource).Namespace(defaultKongNamespace).Create(context.Background(), &tcpi, metav1.CreateOptions{})
assert.NoError(t, err)

source, err := NewKongTCPIngressSource(fakeDynamicClient, fakeKubernetesClient, defaultKongNamespace, "kubernetes.io/ingress.class=kong")
source, err := NewKongTCPIngressSource(context.TODO(), fakeDynamicClient, fakeKubernetesClient, defaultKongNamespace, "kubernetes.io/ingress.class=kong")
assert.NoError(t, err)
assert.NotNil(t, source)

Expand Down
6 changes: 2 additions & 4 deletions source/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait"
kubeinformers "k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
Expand All @@ -42,7 +41,7 @@ type nodeSource struct {
}

// NewNodeSource creates a new nodeSource with the given config.
func NewNodeSource(kubeClient kubernetes.Interface, annotationFilter, fqdnTemplate string) (Source, error) {
func NewNodeSource(ctx context.Context, kubeClient kubernetes.Interface, annotationFilter, fqdnTemplate string) (Source, error) {
tmpl, err := parseTemplate(fqdnTemplate)
if err != nil {
return nil, err
Expand All @@ -62,8 +61,7 @@ func NewNodeSource(kubeClient kubernetes.Interface, annotationFilter, fqdnTempla
},
)

// TODO informer is not explicitly stopped since controller is not passing in its channel.
informerFactory.Start(wait.NeverStop)
informerFactory.Start(ctx.Done())

// wait for the local cache to be populated.
if err := waitForCacheSync(context.Background(), informerFactory); err != nil {
Expand Down
2 changes: 2 additions & 0 deletions source/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func testNodeSourceNewNodeSource(t *testing.T) {
t.Parallel()

_, err := NewNodeSource(
context.TODO(),
fake.NewSimpleClientset(),
ti.annotationFilter,
ti.fqdnTemplate,
Expand Down Expand Up @@ -353,6 +354,7 @@ func testNodeSourceEndpoints(t *testing.T) {

// Create our object under test and get the endpoints.
client, err := NewNodeSource(
context.TODO(),
kubernetes,
tc.annotationFilter,
tc.fqdnTemplate,
Expand Down
5 changes: 2 additions & 3 deletions source/openshift_route.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
log "github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"

"sigs.k8s.io/external-dns/endpoint"
Expand All @@ -54,6 +53,7 @@ type ocpRouteSource struct {

// NewOcpRouteSource creates a new ocpRouteSource with the given config.
func NewOcpRouteSource(
ctx context.Context,
ocpClient versioned.Interface,
namespace string,
annotationFilter string,
Expand Down Expand Up @@ -81,8 +81,7 @@ func NewOcpRouteSource(
},
)

// TODO informer is not explicitly stopped since controller is not passing in its channel.
informerFactory.Start(wait.NeverStop)
informerFactory.Start(ctx.Done())

// wait for the local cache to be populated.
if err := waitForCacheSync(context.Background(), informerFactory); err != nil {
Expand Down
3 changes: 3 additions & 0 deletions source/openshift_route_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func (suite *OCPRouteSuite) SetupTest() {
var err error

suite.sc, err = NewOcpRouteSource(
context.TODO(),
fakeClient,
"",
"",
Expand Down Expand Up @@ -141,6 +142,7 @@ func testOcpRouteSourceNewOcpRouteSource(t *testing.T) {
t.Parallel()

_, err := NewOcpRouteSource(
context.TODO(),
fake.NewSimpleClientset(),
"",
ti.annotationFilter,
Expand Down Expand Up @@ -439,6 +441,7 @@ func testOcpRouteSourceEndpoints(t *testing.T) {
require.NoError(t, err)

source, err := NewOcpRouteSource(
context.TODO(),
fakeClient,
"",
"",
Expand Down
Loading

0 comments on commit 4f41229

Please sign in to comment.