diff --git a/plugins/inputs/prometheus/kubernetes.go b/plugins/inputs/prometheus/kubernetes.go index 4faf2d55efb02..87db15ffe0539 100644 --- a/plugins/inputs/prometheus/kubernetes.go +++ b/plugins/inputs/prometheus/kubernetes.go @@ -45,6 +45,7 @@ func (p *Prometheus) start(ctx context.Context) error { if err != nil { return fmt.Errorf("Failed to get current user - %v", err) } + configLocation := filepath.Join(u.HomeDir, ".kube/config") if p.KubeConfig != "" { configLocation = p.KubeConfig @@ -76,6 +77,10 @@ func (p *Prometheus) start(ctx context.Context) error { return nil } +// An edge case exists if a pod goes offline at the same time a new pod is created +// (without the scrape annotations). K8s may re-assign the old pod ip to the non-scrape +// pod, causing errors in the logs. This is only true if the pod going offline is not +// directed to do so by K8s. func (p *Prometheus) watch(ctx context.Context, client *k8s.Client) error { pod := &corev1.Pod{} watcher, err := client.Watch(ctx, "", &corev1.Pod{}) @@ -96,18 +101,44 @@ func (p *Prometheus) watch(ctx context.Context, client *k8s.Client) error { return err } + // If the pod is not "ready", there will be no ip associated with it. + if pod.GetMetadata().GetAnnotations()["prometheus.io/scrape"] != "true" || + !podReady(pod.Status.GetContainerStatuses()) { + continue + } + switch eventType { case k8s.EventAdded: registerPod(pod, p) - case k8s.EventDeleted: - unregisterPod(pod, p) case k8s.EventModified: + // To avoid multiple actions for each event, unregister on the first event + // in the delete sequence, when the containers are still "ready". + if pod.Metadata.GetDeletionTimestamp() != nil { + unregisterPod(pod, p) + } else { + registerPod(pod, p) + } } } } } +func podReady(statuss []*corev1.ContainerStatus) bool { + if len(statuss) == 0 { + return false + } + for _, cs := range statuss { + if !cs.GetReady() { + return false + } + } + return true +} + func registerPod(pod *corev1.Pod, p *Prometheus) { + if p.kubernetesPods == nil { + p.kubernetesPods = map[string]URLAndAddress{} + } targetURL := getScrapeURL(pod) if targetURL == nil { return @@ -116,6 +147,9 @@ func registerPod(pod *corev1.Pod, p *Prometheus) { log.Printf("D! [inputs.prometheus] will scrape metrics from %s", *targetURL) // add annotation as metrics tags tags := pod.GetMetadata().GetAnnotations() + if tags == nil { + tags = map[string]string{} + } tags["pod_name"] = pod.GetMetadata().GetName() tags["namespace"] = pod.GetMetadata().GetNamespace() // add labels as metrics tags @@ -129,20 +163,16 @@ func registerPod(pod *corev1.Pod, p *Prometheus) { } podURL := p.AddressToURL(URL, URL.Hostname()) p.lock.Lock() - p.kubernetesPods = append(p.kubernetesPods, - URLAndAddress{ - URL: podURL, - Address: URL.Hostname(), - OriginalURL: URL, - Tags: tags}) + p.kubernetesPods[podURL.String()] = URLAndAddress{ + URL: podURL, + Address: URL.Hostname(), + OriginalURL: URL, + Tags: tags, + } p.lock.Unlock() } func getScrapeURL(pod *corev1.Pod) *string { - scrape := pod.GetMetadata().GetAnnotations()["prometheus.io/scrape"] - if scrape != "true" { - return nil - } ip := pod.Status.GetPodIP() if ip == "" { // return as if scrape was disabled, we will be notified again once the pod @@ -181,18 +211,13 @@ func unregisterPod(pod *corev1.Pod, p *Prometheus) { return } - p.lock.Lock() - defer p.lock.Unlock() log.Printf("D! [inputs.prometheus] registered a delete request for %s in namespace %s", pod.GetMetadata().GetName(), pod.GetMetadata().GetNamespace()) - var result []URLAndAddress - for _, v := range p.kubernetesPods { - if v.URL.String() != *url { - result = append(result, v) - } else { - log.Printf("D! [inputs.prometheus] will stop scraping for %s", *url) - } + p.lock.Lock() + defer p.lock.Unlock() + if _, ok := p.kubernetesPods[*url]; ok { + delete(p.kubernetesPods, *url) + log.Printf("D! [inputs.prometheus] will stop scraping for %s", *url) } - p.kubernetesPods = result } diff --git a/plugins/inputs/prometheus/kubernetes_test.go b/plugins/inputs/prometheus/kubernetes_test.go index 2afdbc5ec53b5..c1bbe0a1f8aaa 100644 --- a/plugins/inputs/prometheus/kubernetes_test.go +++ b/plugins/inputs/prometheus/kubernetes_test.go @@ -15,6 +15,7 @@ func TestScrapeURLNoAnnotations(t *testing.T) { url := getScrapeURL(p) assert.Nil(t, url) } + func TestScrapeURLAnnotationsNoScrape(t *testing.T) { p := &v1.Pod{Metadata: &metav1.ObjectMeta{}} p.Metadata.Name = str("myPod") @@ -22,18 +23,21 @@ func TestScrapeURLAnnotationsNoScrape(t *testing.T) { url := getScrapeURL(p) assert.Nil(t, url) } + func TestScrapeURLAnnotations(t *testing.T) { p := pod() p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true"} url := getScrapeURL(p) assert.Equal(t, "http://127.0.0.1:9102/metrics", *url) } + func TestScrapeURLAnnotationsCustomPort(t *testing.T) { p := pod() p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/port": "9000"} url := getScrapeURL(p) assert.Equal(t, "http://127.0.0.1:9000/metrics", *url) } + func TestScrapeURLAnnotationsCustomPath(t *testing.T) { p := pod() p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/path": "mymetrics"} @@ -50,11 +54,24 @@ func TestScrapeURLAnnotationsCustomPathWithSep(t *testing.T) { func TestAddPod(t *testing.T) { prom := &Prometheus{} + p := pod() p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true"} registerPod(p, prom) assert.Equal(t, 1, len(prom.kubernetesPods)) } + +func TestAddMultipleDuplicatePods(t *testing.T) { + prom := &Prometheus{} + + p := pod() + p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true"} + registerPod(p, prom) + p.Metadata.Name = str("Pod2") + registerPod(p, prom) + assert.Equal(t, 1, len(prom.kubernetesPods)) +} + func TestAddMultiplePods(t *testing.T) { prom := &Prometheus{} @@ -62,9 +79,11 @@ func TestAddMultiplePods(t *testing.T) { p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true"} registerPod(p, prom) p.Metadata.Name = str("Pod2") + p.Status.PodIP = str("127.0.0.2") registerPod(p, prom) assert.Equal(t, 2, len(prom.kubernetesPods)) } + func TestDeletePods(t *testing.T) { prom := &Prometheus{} diff --git a/plugins/inputs/prometheus/prometheus.go b/plugins/inputs/prometheus/prometheus.go index eaadf14523650..65901a6bfc784 100644 --- a/plugins/inputs/prometheus/prometheus.go +++ b/plugins/inputs/prometheus/prometheus.go @@ -42,7 +42,7 @@ type Prometheus struct { // Should we scrape Kubernetes services for prometheus annotations MonitorPods bool `toml:"monitor_kubernetes_pods"` lock sync.Mutex - kubernetesPods []URLAndAddress + kubernetesPods map[string]URLAndAddress cancel context.CancelFunc wg sync.WaitGroup } @@ -115,21 +115,23 @@ type URLAndAddress struct { Tags map[string]string } -func (p *Prometheus) GetAllURLs() ([]URLAndAddress, error) { - allURLs := make([]URLAndAddress, 0) +func (p *Prometheus) GetAllURLs() (map[string]URLAndAddress, error) { + allURLs := make(map[string]URLAndAddress, 0) for _, u := range p.URLs { URL, err := url.Parse(u) if err != nil { log.Printf("prometheus: Could not parse %s, skipping it. Error: %s", u, err.Error()) continue } - - allURLs = append(allURLs, URLAndAddress{URL: URL, OriginalURL: URL}) + allURLs[URL.String()] = URLAndAddress{URL: URL, OriginalURL: URL} } + p.lock.Lock() defer p.lock.Unlock() // loop through all pods scraped via the prometheus annotation on the pods - allURLs = append(allURLs, p.kubernetesPods...) + for k, v := range p.kubernetesPods { + allURLs[k] = v + } for _, service := range p.KubernetesServices { URL, err := url.Parse(service) @@ -144,7 +146,11 @@ func (p *Prometheus) GetAllURLs() ([]URLAndAddress, error) { } for _, resolved := range resolvedAddresses { serviceURL := p.AddressToURL(URL, resolved) - allURLs = append(allURLs, URLAndAddress{URL: serviceURL, Address: resolved, OriginalURL: URL}) + allURLs[serviceURL.String()] = URLAndAddress{ + URL: serviceURL, + Address: resolved, + OriginalURL: URL, + } } } return allURLs, nil @@ -313,6 +319,9 @@ func (p *Prometheus) Stop() { func init() { inputs.Add("prometheus", func() telegraf.Input { - return &Prometheus{ResponseTimeout: internal.Duration{Duration: time.Second * 3}} + return &Prometheus{ + ResponseTimeout: internal.Duration{Duration: time.Second * 3}, + kubernetesPods: map[string]URLAndAddress{}, + } }) }