Skip to content

Commit

Permalink
Merge pull request #30 from szuecs/init-cache
Browse files Browse the repository at this point in the history
Initialize cache at startup
  • Loading branch information
szuecs authored Jul 8, 2019
2 parents 750f426 + 568372b commit c447ceb
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 28 deletions.
15 changes: 6 additions & 9 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,22 +1,19 @@
os:
- linux

language: go

go:
- 1.x
- "1.12.x"
- tip

matrix:
allow_failures:
- go: tip

before_install:
- go get github.com/Masterminds/glide
env:
- GO111MODULE=on

install:
- glide install --strip-vendor
before_install:
- go get honnef.co/go/tools/cmd/staticcheck

script:
- make
- make build.docker
- make test
43 changes: 32 additions & 11 deletions controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,39 +8,61 @@ import (
"github.com/szuecs/kube-static-egress-controller/provider"
)

type EgressConfigSource interface {
ListConfigs() ([]provider.EgressConfig, error)
Config() <-chan provider.EgressConfig
}

// EgressController is the controller for creating Egress configuration via a
// provider.
type EgressController struct {
interval time.Duration
configsChan <-chan provider.EgressConfig
configsCache map[provider.Resource]map[string]struct{}
provider provider.Provider
interval time.Duration
configSource EgressConfigSource
configsCache map[provider.Resource]map[string]struct{}
provider provider.Provider
cacheInitialized bool
}

// NewEgressController initializes a new EgressController.
func NewEgressController(prov provider.Provider, configsChan <-chan provider.EgressConfig, interval time.Duration) *EgressController {
func NewEgressController(prov provider.Provider, configSource EgressConfigSource, interval time.Duration) *EgressController {
return &EgressController{
interval: interval,
configsChan: configsChan,
provider: prov,
configSource: configSource,
configsCache: make(map[provider.Resource]map[string]struct{}),
}
}

// Run runs the EgressController main loop.
func (c *EgressController) Run(ctx context.Context) {
log.Info("Running controller")
interval := c.interval

for {
if !c.cacheInitialized {
configs, err := c.configSource.ListConfigs()
if err != nil {
log.Errorf("Failed to list Egress configurations: %v", err)
time.Sleep(3 * time.Second)
continue
}

c.cacheInitialized = true
for _, config := range configs {
if len(config.IPAddresses) > 0 {
c.configsCache[config.Resource] = config.IPAddresses
}
}
continue
}

select {
case <-time.After(interval):
case <-time.After(c.interval):
err := c.provider.Ensure(c.configsCache)
if err != nil {
log.Errorf("Failed to ensure configuration: %v", err)
continue
}
interval = c.interval
case config := <-c.configsChan:
case config := <-c.configSource.Config():
if len(config.IPAddresses) == 0 {
delete(c.configsCache, config.Resource)
} else {
Expand All @@ -53,7 +75,6 @@ func (c *EgressController) Run(ctx context.Context) {
log.Errorf("Failed to ensure configuration: %v", err)
continue
}
interval = c.interval
case <-ctx.Done():
log.Info("Terminating controller loop.")
return
Expand Down
47 changes: 44 additions & 3 deletions controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,55 @@ import (
"github.com/szuecs/kube-static-egress-controller/provider/noop"
)

type mockEgressConfigSource struct {
configs []provider.EgressConfig
configsChan <-chan provider.EgressConfig
}

func (s mockEgressConfigSource) ListConfigs() ([]provider.EgressConfig, error) {
return s.configs, nil
}

func (s mockEgressConfigSource) Config() <-chan provider.EgressConfig {
return s.configsChan
}

func TestControllerRun(t *testing.T) {
prov := noop.NewNoopProvider()
configsChan := make(chan provider.EgressConfig)
controller := NewEgressController(prov, configsChan, 0)
configSource := mockEgressConfigSource{
configs: []provider.EgressConfig{
{
Resource: provider.Resource{
Name: "a",
Namespace: "y",
},
IPAddresses: map[string]struct{}{
"10.0.0.1": struct{}{},
},
},
},
configsChan: configsChan,
}
controller := NewEgressController(prov, configSource, 0)

// test adding the an egress config.
ctx, cancel := context.WithCancel(context.Background())

go func() {
cancel()
}()
controller.Run(ctx)

require.Len(t, controller.configsCache, 1)
require.Contains(t, controller.configsCache, provider.Resource{
Name: "a",
Namespace: "y",
})

// test adding the an egress config.
ctx, cancel = context.WithCancel(context.Background())

go func() {
configsChan <- provider.EgressConfig{
Resource: provider.Resource{
Expand All @@ -31,7 +72,7 @@ func TestControllerRun(t *testing.T) {
}()
controller.Run(ctx)

require.Len(t, controller.configsCache, 1)
require.Len(t, controller.configsCache, 2)
require.Contains(t, controller.configsCache, provider.Resource{
Name: "a",
Namespace: "x",
Expand All @@ -51,5 +92,5 @@ func TestControllerRun(t *testing.T) {
}()
controller.Run(ctx)

require.Len(t, controller.configsCache, 0)
require.Len(t, controller.configsCache, 1)
}
29 changes: 25 additions & 4 deletions kube/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ type ConfigMapWatcher struct {
client kubernetes.Interface
namespace string
selector fields.Selector
configs chan<- provider.EgressConfig
configs chan provider.EgressConfig
}

func NewConfigMapWatcher(client kubernetes.Interface, namespace, selectorStr string, configs chan<- provider.EgressConfig) (*ConfigMapWatcher, error) {
func NewConfigMapWatcher(client kubernetes.Interface, namespace, selectorStr string, configs chan provider.EgressConfig) (*ConfigMapWatcher, error) {
selector, err := fields.ParseSelector(selectorStr)
if err != nil {
return nil, err
Expand Down Expand Up @@ -79,7 +79,7 @@ func (c *ConfigMapWatcher) add(obj interface{}) {
c.configs <- configMapToEgressConfig(cm)
}

func (c *ConfigMapWatcher) update(newObj, oldObj interface{}) {
func (c *ConfigMapWatcher) update(oldObj, newObj interface{}) {
newCM, ok := newObj.(*v1.ConfigMap)
if !ok {
log.Errorf("Failed to get new ConfigMap object")
Expand All @@ -104,12 +104,33 @@ func (c *ConfigMapWatcher) del(obj interface{}) {
}
}

func (c *ConfigMapWatcher) ListConfigs() ([]provider.EgressConfig, error) {
opts := metav1.ListOptions{
LabelSelector: c.selector.String(),
}

configMaps, err := c.client.CoreV1().ConfigMaps(c.namespace).List(opts)
if err != nil {
return nil, err
}

configs := make([]provider.EgressConfig, 0, len(configMaps.Items))
for _, cm := range configMaps.Items {
configs = append(configs, configMapToEgressConfig(&cm))
}
return configs, nil
}

func (c *ConfigMapWatcher) Config() <-chan provider.EgressConfig {
return c.configs
}

func configMapToEgressConfig(cm *v1.ConfigMap) provider.EgressConfig {
ipAddresses := make(map[string]struct{})
for key, cidr := range cm.Data {
_, ipnet, err := net.ParseCIDR(cidr)
if err != nil {
log.Errorf("Failed to parse CIDR %v from %s in ConfigMap %s/%s", cidr, key, cm.Namespace, cm.Name)
log.Errorf("Failed to parse CIDR '%s' from '%s' in ConfigMap %s/%s", cidr, key, cm.Namespace, cm.Name)
continue
}
ipAddresses[ipnet.String()] = struct{}{}
Expand Down
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func main() {

go cmWatcher.Run(ctx)

controller := controller.NewEgressController(p, configsChan, cfg.ResyncInterval)
controller := controller.NewEgressController(p, cmWatcher, cfg.ResyncInterval)
controller.Run(ctx)
}

Expand Down

0 comments on commit c447ceb

Please sign in to comment.