Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor etcd scaler config #6199

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 48 additions & 92 deletions pkg/scalers/etcd_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"errors"
"fmt"
"strconv"
"strings"
"time"

"github.com/go-logr/logr"
Expand Down Expand Up @@ -38,18 +37,39 @@ type etcdScaler struct {
}

type etcdMetadata struct {
endpoints []string
watchKey string
value float64
activationValue float64
watchProgressNotifyInterval int
triggerIndex int
triggerIndex int

Endpoints []string `keda:"name=endpoints, order=triggerMetadata"`
WatchKey string `keda:"name=watchKey, order=triggerMetadata"`
Value float64 `keda:"name=value, order=triggerMetadata"`
ActivationValue float64 `keda:"name=activationValue, order=triggerMetadata, optional, default=0"`
WatchProgressNotifyInterval int `keda:"name=watchProgressNotifyInterval, order=triggerMetadata, optional, default=600"`

// TLS
enableTLS bool
cert string
key string
keyPassword string
ca string
EnableTLS string `keda:"name=tls, order=authParams, optional, default=disable"`
Cert string `keda:"name=cert, order=authParams, optional"`
Key string `keda:"name=key, order=authParams, optional"`
KeyPassword string `keda:"name=keyPassword, order=authParams, optional"`
Ca string `keda:"name=ca, order=authParams, optional"`
}

func (meta *etcdMetadata) Validate() error {
if meta.WatchProgressNotifyInterval <= 0 {
return errors.New("watchProgressNotifyInterval must be greater than 0")
}

if meta.EnableTLS == etcdTLSEnable {
if meta.Cert == "" && meta.Key != "" {
return errors.New("cert must be provided with key")
}
if meta.Key == "" && meta.Cert != "" {
return errors.New("key must be provided with cert")
}
} else if meta.EnableTLS != etcdTLSDisable {
return fmt.Errorf("incorrect value for TLS given: %s", meta.EnableTLS)
}

return nil
}

// NewEtcdScaler creates a new etcdScaler
Expand All @@ -76,75 +96,11 @@ func NewEtcdScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {
}, nil
}

func parseEtcdAuthParams(config *scalersconfig.ScalerConfig, meta *etcdMetadata) error {
meta.enableTLS = false
if val, ok := config.AuthParams["tls"]; ok {
val = strings.TrimSpace(val)
if val == etcdTLSEnable {
certGiven := config.AuthParams["cert"] != ""
keyGiven := config.AuthParams["key"] != ""
if certGiven && !keyGiven {
return errors.New("key must be provided with cert")
}
if keyGiven && !certGiven {
return errors.New("cert must be provided with key")
}
meta.ca = config.AuthParams["ca"]
meta.cert = config.AuthParams["cert"]
meta.key = config.AuthParams["key"]
if value, found := config.AuthParams["keyPassword"]; found {
meta.keyPassword = value
} else {
meta.keyPassword = ""
}
meta.enableTLS = true
} else if val != etcdTLSDisable {
return fmt.Errorf("err incorrect value for TLS given: %s", val)
}
}

return nil
}

func parseEtcdMetadata(config *scalersconfig.ScalerConfig) (*etcdMetadata, error) {
meta := &etcdMetadata{}
var err error
meta.endpoints = strings.Split(config.TriggerMetadata[endpoints], ",")
if len(meta.endpoints) == 0 || meta.endpoints[0] == "" {
return nil, fmt.Errorf("endpoints required")
}

meta.watchKey = config.TriggerMetadata[watchKey]
if len(meta.watchKey) == 0 {
return nil, fmt.Errorf("watchKey required")
}

value, err := strconv.ParseFloat(config.TriggerMetadata[value], 64)
if err != nil || value <= 0 {
return nil, fmt.Errorf("value must be a float greater than 0")
}
meta.value = value

meta.activationValue = 0
if val, ok := config.TriggerMetadata[activationValue]; ok {
activationValue, err := strconv.ParseFloat(val, 64)
if err != nil {
return nil, fmt.Errorf("activationValue must be a float")
}
meta.activationValue = activationValue
}

meta.watchProgressNotifyInterval = defaultWatchProgressNotifyInterval
if val, ok := config.TriggerMetadata[watchProgressNotifyInterval]; ok {
interval, err := strconv.Atoi(val)
if err != nil || interval <= 0 {
return nil, fmt.Errorf("watchProgressNotifyInterval must be a int greater than 0")
}
meta.watchProgressNotifyInterval = interval
}

if err = parseEtcdAuthParams(config, meta); err != nil {
return meta, err
if err := config.TypedConfig(meta); err != nil {
return nil, fmt.Errorf("error parsing redis metadata: %w", err)
}

meta.triggerIndex = config.TriggerIndex
Expand All @@ -154,15 +110,15 @@ func parseEtcdMetadata(config *scalersconfig.ScalerConfig) (*etcdMetadata, error
func getEtcdClients(metadata *etcdMetadata) (*clientv3.Client, error) {
var tlsConfig *tls.Config
var err error
if metadata.enableTLS {
tlsConfig, err = kedautil.NewTLSConfigWithPassword(metadata.cert, metadata.key, metadata.keyPassword, metadata.ca, false)
if metadata.EnableTLS == etcdTLSEnable {
tlsConfig, err = kedautil.NewTLSConfigWithPassword(metadata.Cert, metadata.Key, metadata.KeyPassword, metadata.Ca, false)
if err != nil {
return nil, err
}
}

cli, err := clientv3.New(clientv3.Config{
Endpoints: metadata.endpoints,
Endpoints: metadata.Endpoints,
DialTimeout: 5 * time.Second,
TLS: tlsConfig,
})
Expand All @@ -189,16 +145,16 @@ func (s *etcdScaler) GetMetricsAndActivity(ctx context.Context, metricName strin
}

metric := GenerateMetricInMili(metricName, v)
return append([]external_metrics.ExternalMetricValue{}, metric), v > s.metadata.activationValue, nil
return append([]external_metrics.ExternalMetricValue{}, metric), v > s.metadata.ActivationValue, nil
}

// GetMetricSpecForScaling returns the metric spec for the HPA.
func (s *etcdScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
externalMetric := &v2.ExternalMetricSource{
Metric: v2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, kedautil.NormalizeString(fmt.Sprintf("etcd-%s", s.metadata.watchKey))),
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, kedautil.NormalizeString(fmt.Sprintf("etcd-%s", s.metadata.WatchKey))),
},
Target: GetMetricTargetMili(s.metricType, s.metadata.value),
Target: GetMetricTargetMili(s.metricType, s.metadata.Value),
}
metricSpec := v2.MetricSpec{External: externalMetric, Type: etcdMetricType}
return []v2.MetricSpec{metricSpec}
Expand All @@ -209,16 +165,16 @@ func (s *etcdScaler) Run(ctx context.Context, active chan<- bool) {

// It's possible for the watch to get terminated anytime, we need to run this in a retry loop
runWithWatch := func() {
s.logger.Info("run watch", "watchKey", s.metadata.watchKey, "endpoints", s.metadata.endpoints)
s.logger.Info("run watch", "watchKey", s.metadata.WatchKey, "endpoints", s.metadata.Endpoints)
subCtx, cancel := context.WithCancel(ctx)
subCtx = clientv3.WithRequireLeader(subCtx)
rch := s.client.Watch(subCtx, s.metadata.watchKey, clientv3.WithProgressNotify())
rch := s.client.Watch(subCtx, s.metadata.WatchKey, clientv3.WithProgressNotify())

// rewatch to another etcd server when the network is isolated from the current etcd server.
progress := make(chan bool)
defer close(progress)
go func() {
delayDuration := time.Duration(s.metadata.watchProgressNotifyInterval) * 2 * time.Second
delayDuration := time.Duration(s.metadata.WatchProgressNotifyInterval) * 2 * time.Second
delay := time.NewTimer(delayDuration)
defer delay.Stop()
for {
Expand All @@ -228,7 +184,7 @@ func (s *etcdScaler) Run(ctx context.Context, active chan<- bool) {
case <-subCtx.Done():
return
case <-delay.C:
s.logger.Info("no watch progress notification in the interval", "watchKey", s.metadata.watchKey, "endpoints", s.metadata.endpoints)
s.logger.Info("no watch progress notification in the interval", "watchKey", s.metadata.WatchKey, "endpoints", s.metadata.Endpoints)
cancel()
return
}
Expand All @@ -240,7 +196,7 @@ func (s *etcdScaler) Run(ctx context.Context, active chan<- bool) {

// rewatch to another etcd server when there is an error form the current etcd server, such as 'no leader','required revision has been compacted'
if wresp.Err() != nil {
s.logger.Error(wresp.Err(), "an error occurred in the watch process", "watchKey", s.metadata.watchKey, "endpoints", s.metadata.endpoints)
s.logger.Error(wresp.Err(), "an error occurred in the watch process", "watchKey", s.metadata.WatchKey, "endpoints", s.metadata.Endpoints)
cancel()
return
}
Expand All @@ -251,7 +207,7 @@ func (s *etcdScaler) Run(ctx context.Context, active chan<- bool) {
s.logger.Error(err, "etcdValue invalid will be treated as 0")
v = 0
}
active <- v > s.metadata.activationValue
active <- v > s.metadata.ActivationValue
}
}
}
Expand Down Expand Up @@ -288,12 +244,12 @@ func (s *etcdScaler) Run(ctx context.Context, active chan<- bool) {
func (s *etcdScaler) getMetricValue(ctx context.Context) (float64, error) {
ctx, cancel := context.WithTimeout(ctx, time.Second*5)
defer cancel()
resp, err := s.client.Get(ctx, s.metadata.watchKey)
resp, err := s.client.Get(ctx, s.metadata.WatchKey)
if err != nil {
return 0, err
}
if resp.Kvs == nil {
return 0, fmt.Errorf("watchKey %s doesn't exist", s.metadata.watchKey)
return 0, fmt.Errorf("watchKey %s doesn't exist", s.metadata.WatchKey)
}
v, err := strconv.ParseFloat(string(resp.Kvs[0].Value), 64)
if err != nil {
Expand Down
44 changes: 22 additions & 22 deletions pkg/scalers/etcd_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type parseEtcdMetadataTestData struct {
type parseEtcdAuthParamsTestData struct {
authParams map[string]string
isError bool
enableTLS bool
enableTLS string
}

type etcdMetricIdentifier struct {
Expand Down Expand Up @@ -56,19 +56,19 @@ var parseEtcdMetadataTestDataset = []parseEtcdMetadataTestData{

var parseEtcdAuthParamsTestDataset = []parseEtcdAuthParamsTestData{
// success, TLS only
{map[string]string{"tls": "enable", "ca": "caaa", "cert": "ceert", "key": "keey"}, false, true},
{map[string]string{"tls": "enable", "ca": "caaa", "cert": "ceert", "key": "keey"}, false, etcdTLSEnable},
// success, TLS cert/key and assumed public CA
{map[string]string{"tls": "enable", "cert": "ceert", "key": "keey"}, false, true},
{map[string]string{"tls": "enable", "cert": "ceert", "key": "keey"}, false, etcdTLSEnable},
// success, TLS cert/key + key password and assumed public CA
{map[string]string{"tls": "enable", "cert": "ceert", "key": "keey", "keyPassword": "keeyPassword"}, false, true},
{map[string]string{"tls": "enable", "cert": "ceert", "key": "keey", "keyPassword": "keeyPassword"}, false, etcdTLSEnable},
// success, TLS CA only
{map[string]string{"tls": "enable", "ca": "caaa"}, false, true},
{map[string]string{"tls": "enable", "ca": "caaa"}, false, etcdTLSEnable},
// failure, TLS missing cert
{map[string]string{"tls": "enable", "ca": "caaa", "key": "keey"}, true, false},
{map[string]string{"tls": "enable", "ca": "caaa", "key": "keey"}, true, etcdTLSDisable},
// failure, TLS missing key
{map[string]string{"tls": "enable", "ca": "caaa", "cert": "ceert"}, true, false},
{map[string]string{"tls": "enable", "ca": "caaa", "cert": "ceert"}, true, etcdTLSDisable},
// failure, TLS invalid
{map[string]string{"tls": "yes", "ca": "caaa", "cert": "ceert", "key": "keey"}, true, false},
{map[string]string{"tls": "yes", "ca": "caaa", "cert": "ceert", "key": "keey"}, true, etcdTLSDisable},
}

var etcdMetricIdentifiers = []etcdMetricIdentifier{
Expand All @@ -83,10 +83,10 @@ func TestParseEtcdMetadata(t *testing.T) {
t.Error("Expected success but got error", err)
}
if testData.isError && err == nil {
t.Error("Expected error but got success")
t.Errorf("Expected error but got success %v", testData)
}
if err == nil && !reflect.DeepEqual(meta.endpoints, testData.endpoints) {
t.Errorf("Expected %v but got %v\n", testData.endpoints, meta.endpoints)
if err == nil && !reflect.DeepEqual(meta.Endpoints, testData.endpoints) {
t.Errorf("Expected %v but got %v\n", testData.endpoints, meta.Endpoints)
}
}
}
Expand All @@ -101,21 +101,21 @@ func TestParseEtcdAuthParams(t *testing.T) {
if testData.isError && err == nil {
t.Error("Expected error but got success")
}
if meta.enableTLS != testData.enableTLS {
t.Errorf("Expected enableTLS to be set to %v but got %v\n", testData.enableTLS, meta.enableTLS)
if meta != nil && meta.EnableTLS != testData.enableTLS {
t.Errorf("Expected enableTLS to be set to %v but got %v\n", testData.enableTLS, meta.EnableTLS)
}
if meta.enableTLS {
if meta.ca != testData.authParams["ca"] {
t.Errorf("Expected ca to be set to %v but got %v\n", testData.authParams["ca"], meta.enableTLS)
if meta != nil && meta.EnableTLS == etcdTLSEnable {
if meta.Ca != testData.authParams["ca"] {
t.Errorf("Expected ca to be set to %v but got %v\n", testData.authParams["ca"], meta.EnableTLS)
}
if meta.cert != testData.authParams["cert"] {
t.Errorf("Expected cert to be set to %v but got %v\n", testData.authParams["cert"], meta.cert)
if meta.Cert != testData.authParams["cert"] {
t.Errorf("Expected cert to be set to %v but got %v\n", testData.authParams["cert"], meta.Cert)
}
if meta.key != testData.authParams["key"] {
t.Errorf("Expected key to be set to %v but got %v\n", testData.authParams["key"], meta.key)
if meta.Key != testData.authParams["key"] {
t.Errorf("Expected key to be set to %v but got %v\n", testData.authParams["key"], meta.Key)
}
if meta.keyPassword != testData.authParams["keyPassword"] {
t.Errorf("Expected key to be set to %v but got %v\n", testData.authParams["keyPassword"], meta.key)
if meta.KeyPassword != testData.authParams["keyPassword"] {
t.Errorf("Expected key to be set to %v but got %v\n", testData.authParams["keyPassword"], meta.Key)
}
}
}
Expand Down
Loading