Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Collect from newly discovered/launched pods #5293

Merged
merged 3 commits into from
Jan 16, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 47 additions & 22 deletions plugins/inputs/prometheus/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func (p *Prometheus) start(ctx context.Context) error {
if err != nil {
return fmt.Errorf("Failed to get current user - %v", err)
}

configLocation := filepath.Join(u.HomeDir, ".kube/config")
if p.KubeConfig != "" {
configLocation = p.KubeConfig
Expand Down Expand Up @@ -76,6 +77,10 @@ func (p *Prometheus) start(ctx context.Context) error {
return nil
}

// An edge case exists if a pod goes offline at the same time a new pod (without
// the scrape annotations) is created: K8s may reassign the old pod's IP to the
// non-scrape pod, causing errors in the logs. This can only happen when the pod
// going offline was not directed to do so by K8s.
func (p *Prometheus) watch(ctx context.Context, client *k8s.Client) error {
pod := &corev1.Pod{}
watcher, err := client.Watch(ctx, "", &corev1.Pod{})
Expand All @@ -96,18 +101,44 @@ func (p *Prometheus) watch(ctx context.Context, client *k8s.Client) error {
return err
}

// If the pod is not "ready", there will be no IP associated with it.
if pod.GetMetadata().GetAnnotations()["prometheus.io/scrape"] != "true" ||
!podReady(pod.Status.GetContainerStatuses()) {
continue
}

switch eventType {
case k8s.EventAdded:
registerPod(pod, p)
case k8s.EventDeleted:
unregisterPod(pod, p)
case k8s.EventModified:
// To avoid multiple actions for each event, unregister on the first event
// in the delete sequence, when the containers are still "ready".
if pod.Metadata.GetDeletionTimestamp() != nil {
unregisterPod(pod, p)
} else {
registerPod(pod, p)
glinton marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
}
}

// podReady reports whether every container status in statuss is ready.
// An empty (or nil) status list is treated as not ready.
func podReady(statuss []*corev1.ContainerStatus) bool {
	// Start from "ready" only when there is at least one status to inspect,
	// then stop at the first container that is not ready.
	ready := len(statuss) > 0
	for i := 0; ready && i < len(statuss); i++ {
		ready = statuss[i].GetReady()
	}
	return ready
}

func registerPod(pod *corev1.Pod, p *Prometheus) {
if p.kubernetesPods == nil {
p.kubernetesPods = map[string]URLAndAddress{}
}
targetURL := getScrapeURL(pod)
if targetURL == nil {
return
Expand All @@ -116,6 +147,9 @@ func registerPod(pod *corev1.Pod, p *Prometheus) {
log.Printf("D! [inputs.prometheus] will scrape metrics from %s", *targetURL)
// add annotation as metrics tags
tags := pod.GetMetadata().GetAnnotations()
if tags == nil {
tags = map[string]string{}
}
tags["pod_name"] = pod.GetMetadata().GetName()
tags["namespace"] = pod.GetMetadata().GetNamespace()
// add labels as metrics tags
Expand All @@ -129,20 +163,16 @@ func registerPod(pod *corev1.Pod, p *Prometheus) {
}
podURL := p.AddressToURL(URL, URL.Hostname())
p.lock.Lock()
p.kubernetesPods = append(p.kubernetesPods,
URLAndAddress{
URL: podURL,
Address: URL.Hostname(),
OriginalURL: URL,
Tags: tags})
p.kubernetesPods[podURL.String()] = URLAndAddress{
URL: podURL,
Address: URL.Hostname(),
OriginalURL: URL,
Tags: tags,
}
p.lock.Unlock()
}

func getScrapeURL(pod *corev1.Pod) *string {
scrape := pod.GetMetadata().GetAnnotations()["prometheus.io/scrape"]
if scrape != "true" {
return nil
}
ip := pod.Status.GetPodIP()
if ip == "" {
// return as if scrape was disabled, we will be notified again once the pod
Expand Down Expand Up @@ -181,18 +211,13 @@ func unregisterPod(pod *corev1.Pod, p *Prometheus) {
return
}

p.lock.Lock()
defer p.lock.Unlock()
log.Printf("D! [inputs.prometheus] registered a delete request for %s in namespace %s",
pod.GetMetadata().GetName(), pod.GetMetadata().GetNamespace())
var result []URLAndAddress
for _, v := range p.kubernetesPods {
if v.URL.String() != *url {
result = append(result, v)
} else {
log.Printf("D! [inputs.prometheus] will stop scraping for %s", *url)
}

p.lock.Lock()
defer p.lock.Unlock()
if _, ok := p.kubernetesPods[*url]; ok {
delete(p.kubernetesPods, *url)
log.Printf("D! [inputs.prometheus] will stop scraping for %s", *url)
}
p.kubernetesPods = result
}
19 changes: 19 additions & 0 deletions plugins/inputs/prometheus/kubernetes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,29 @@ func TestScrapeURLNoAnnotations(t *testing.T) {
url := getScrapeURL(p)
assert.Nil(t, url)
}

func TestScrapeURLAnnotationsNoScrape(t *testing.T) {
p := &v1.Pod{Metadata: &metav1.ObjectMeta{}}
p.Metadata.Name = str("myPod")
p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "false"}
url := getScrapeURL(p)
assert.Nil(t, url)
}

// TestScrapeURLAnnotations asserts that a pod annotated with
// prometheus.io/scrape=true produces the expected scrape URL.
func TestScrapeURLAnnotations(t *testing.T) {
	p := pod()
	p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true"}
	url := getScrapeURL(p)
	// Guard the dereference: a nil result should fail this test, not panic
	// the whole test binary.
	if assert.NotNil(t, url) {
		assert.Equal(t, "http://127.0.0.1:9102/metrics", *url)
	}
}

// TestScrapeURLAnnotationsCustomPort asserts that the prometheus.io/port
// annotation is reflected in the scrape URL.
func TestScrapeURLAnnotationsCustomPort(t *testing.T) {
	p := pod()
	p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/port": "9000"}
	url := getScrapeURL(p)
	// Guard the dereference: a nil result should fail this test, not panic
	// the whole test binary.
	if assert.NotNil(t, url) {
		assert.Equal(t, "http://127.0.0.1:9000/metrics", *url)
	}
}

func TestScrapeURLAnnotationsCustomPath(t *testing.T) {
p := pod()
p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/path": "mymetrics"}
Expand All @@ -50,21 +54,36 @@ func TestScrapeURLAnnotationsCustomPathWithSep(t *testing.T) {

// TestAddPod registers one scrape-annotated pod on an empty Prometheus value
// and expects exactly one entry in kubernetesPods afterwards.
func TestAddPod(t *testing.T) {
	plugin := &Prometheus{}

	target := pod()
	scrapeOn := map[string]string{"prometheus.io/scrape": "true"}
	target.Metadata.Annotations = scrapeOn

	registerPod(target, plugin)

	registered := len(plugin.kubernetesPods)
	assert.Equal(t, 1, registered)
}

// TestAddMultipleDuplicatePods registers the same pod twice, changing only its
// name between the two calls, and expects a single entry in kubernetesPods.
func TestAddMultipleDuplicatePods(t *testing.T) {
	plugin := &Prometheus{}

	target := pod()
	scrapeOn := map[string]string{"prometheus.io/scrape": "true"}
	target.Metadata.Annotations = scrapeOn

	// First registration under the original name.
	registerPod(target, plugin)

	// Second registration: new name, everything else untouched.
	target.Metadata.Name = str("Pod2")
	registerPod(target, plugin)

	registered := len(plugin.kubernetesPods)
	assert.Equal(t, 1, registered)
}

// TestAddMultiplePods registers a pod, then registers it again with a new name
// and a new pod IP, and expects two entries in kubernetesPods.
func TestAddMultiplePods(t *testing.T) {
	plugin := &Prometheus{}

	target := pod()
	scrapeOn := map[string]string{"prometheus.io/scrape": "true"}
	target.Metadata.Annotations = scrapeOn

	// First registration with the pod as built by pod().
	registerPod(target, plugin)

	// Second registration: both the name and the IP differ.
	target.Metadata.Name = str("Pod2")
	target.Status.PodIP = str("127.0.0.2")
	registerPod(target, plugin)

	registered := len(plugin.kubernetesPods)
	assert.Equal(t, 2, registered)
}

func TestDeletePods(t *testing.T) {
prom := &Prometheus{}

Expand Down
25 changes: 17 additions & 8 deletions plugins/inputs/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type Prometheus struct {
// Should we scrape Kubernetes services for prometheus annotations
MonitorPods bool `toml:"monitor_kubernetes_pods"`
lock sync.Mutex
kubernetesPods []URLAndAddress
kubernetesPods map[string]URLAndAddress
cancel context.CancelFunc
wg sync.WaitGroup
}
Expand Down Expand Up @@ -115,21 +115,23 @@ type URLAndAddress struct {
Tags map[string]string
}

func (p *Prometheus) GetAllURLs() ([]URLAndAddress, error) {
allURLs := make([]URLAndAddress, 0)
func (p *Prometheus) GetAllURLs() (map[string]URLAndAddress, error) {
allURLs := make(map[string]URLAndAddress, 0)
for _, u := range p.URLs {
URL, err := url.Parse(u)
if err != nil {
log.Printf("prometheus: Could not parse %s, skipping it. Error: %s", u, err.Error())
continue
}

allURLs = append(allURLs, URLAndAddress{URL: URL, OriginalURL: URL})
allURLs[URL.String()] = URLAndAddress{URL: URL, OriginalURL: URL}
}

p.lock.Lock()
defer p.lock.Unlock()
// loop through all pods scraped via the prometheus annotation on the pods
allURLs = append(allURLs, p.kubernetesPods...)
for k, v := range p.kubernetesPods {
allURLs[k] = v
}

for _, service := range p.KubernetesServices {
URL, err := url.Parse(service)
Expand All @@ -144,7 +146,11 @@ func (p *Prometheus) GetAllURLs() ([]URLAndAddress, error) {
}
for _, resolved := range resolvedAddresses {
serviceURL := p.AddressToURL(URL, resolved)
allURLs = append(allURLs, URLAndAddress{URL: serviceURL, Address: resolved, OriginalURL: URL})
allURLs[serviceURL.String()] = URLAndAddress{
URL: serviceURL,
Address: resolved,
OriginalURL: URL,
}
}
}
return allURLs, nil
Expand Down Expand Up @@ -313,6 +319,9 @@ func (p *Prometheus) Stop() {

func init() {
inputs.Add("prometheus", func() telegraf.Input {
return &Prometheus{ResponseTimeout: internal.Duration{Duration: time.Second * 3}}
return &Prometheus{
ResponseTimeout: internal.Duration{Duration: time.Second * 3},
kubernetesPods: map[string]URLAndAddress{},
}
})
}