Skip to content

Commit

Permalink
Add ActiveMQ input plugin (influxdata#2689)
Browse files Browse the repository at this point in the history
  • Loading branch information
mlabouardy authored and rgitzel committed Oct 17, 2018
1 parent a880f0b commit 47d805b
Show file tree
Hide file tree
Showing 5 changed files with 488 additions and 0 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ configuration options.

## Input Plugins

* [activemq](./plugins/inputs/activemq)
* [aerospike](./plugins/inputs/aerospike)
* [amqp_consumer](./plugins/inputs/amqp_consumer) (rabbitmq)
* [apache](./plugins/inputs/apache)
Expand Down
86 changes: 86 additions & 0 deletions plugins/inputs/activemq/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# Telegraf Input Plugin: ActiveMQ

This plugin gather queues, topics & subscribers metrics using ActiveMQ Console API.

### Configuration:

```toml
# Description
[[inputs.activemq]]
## Required ActiveMQ Endpoint
# server = "192.168.50.10"

## Required ActiveMQ port
# port = 8161

## Credentials for basic HTTP authentication
# username = "admin"
# password = "admin"

## Required ActiveMQ webadmin root path
# webadmin = "admin"

## Maximum time to receive response.
# response_timeout = "5s"

## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
```

### Measurements & Fields:

Every effort was made to preserve the names based on the XML response from the ActiveMQ Console API.

- activemq_queues:
- size
- consumer_count
- enqueue_count
- dequeue_count
- activemq_topics:
- size
- consumer_count
- enqueue_count
- dequeue_count
- subscribers_metrics:
- pending_queue_size
- dispatched_queue_size
- dispatched_counter
- enqueue_counter
- dequeue_counter

### Tags:

- activemq_queues:
- name
- source
- port
- activemq_topics:
- name
- source
- port
- activemq_subscribers:
- client_id
- subscription_name
- connection_id
- destination_name
- selector
- active
- source
- port

### Example Output:

```
$ ./telegraf -config telegraf.conf -input-filter activemq -test
activemq_queues,name=sandra,host=88284b2fe51b,source=localhost,port=8161 consumer_count=0i,enqueue_count=0i,dequeue_count=0i,size=0i 1492610703000000000
activemq_queues,name=Test,host=88284b2fe51b,source=localhost,port=8161 dequeue_count=0i,size=0i,consumer_count=0i,enqueue_count=0i 1492610703000000000
activemq_topics,name=ActiveMQ.Advisory.MasterBroker\ ,host=88284b2fe51b,source=localhost,port=8161 size=0i,consumer_count=0i,enqueue_count=1i,dequeue_count=0i 1492610703000000000
activemq_topics,host=88284b2fe51b,name=AAA\,source=localhost,port=8161 size=0i,consumer_count=1i,enqueue_count=0i,dequeue_count=0i 1492610703000000000
activemq_topics,name=ActiveMQ.Advisory.Topic\,source=localhost,port=8161 ,host=88284b2fe51b enqueue_count=1i,dequeue_count=0i,size=0i,consumer_count=0i 1492610703000000000
activemq_topics,name=ActiveMQ.Advisory.Queue\,source=localhost,port=8161 ,host=88284b2fe51b size=0i,consumer_count=0i,enqueue_count=2i,dequeue_count=0i 1492610703000000000
activemq_topics,name=AAAA\ ,host=88284b2fe51b,source=localhost,port=8161 consumer_count=0i,enqueue_count=0i,dequeue_count=0i,size=0i 1492610703000000000
activemq_subscribers,connection_id=NOTSET,destination_name=AAA,,source=localhost,port=8161,selector=AA,active=no,host=88284b2fe51b,client_id=AAA,subscription_name=AAA pending_queue_size=0i,dispatched_queue_size=0i,dispatched_counter=0i,enqueue_counter=0i,dequeue_counter=0i 1492610703000000000
```
261 changes: 261 additions & 0 deletions plugins/inputs/activemq/activemq.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,261 @@
package activemq

import (
"encoding/xml"
"fmt"
"io/ioutil"
"net/http"
"strconv"
"time"

"strings"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/tls"
"github.com/influxdata/telegraf/plugins/inputs"
)

type ActiveMQ struct {
Server string `json:"server"`
Port int `json:"port"`
Username string `json:"username"`
Password string `json:"password"`
Webadmin string `json:"webadmin"`
ResponseTimeout internal.Duration
tls.ClientConfig

client *http.Client
}

type Topics struct {
XMLName xml.Name `xml:"topics"`
TopicItems []Topic `xml:"topic"`
}

type Topic struct {
XMLName xml.Name `xml:"topic"`
Name string `xml:"name,attr"`
Stats Stats `xml:"stats"`
}

type Subscribers struct {
XMLName xml.Name `xml:"subscribers"`
SubscriberItems []Subscriber `xml:"subscriber"`
}

type Subscriber struct {
XMLName xml.Name `xml:"subscriber"`
ClientId string `xml:"clientId,attr"`
SubscriptionName string `xml:"subscriptionName,attr"`
ConnectionId string `xml:"connectionId,attr"`
DestinationName string `xml:"destinationName,attr"`
Selector string `xml:"selector,attr"`
Active string `xml:"active,attr"`
Stats Stats `xml:"stats"`
}

type Queues struct {
XMLName xml.Name `xml:"queues"`
QueueItems []Queue `xml:"queue"`
}

type Queue struct {
XMLName xml.Name `xml:"queue"`
Name string `xml:"name,attr"`
Stats Stats `xml:"stats"`
}

type Stats struct {
XMLName xml.Name `xml:"stats"`
Size int `xml:"size,attr"`
ConsumerCount int `xml:"consumerCount,attr"`
EnqueueCount int `xml:"enqueueCount,attr"`
DequeueCount int `xml:"dequeueCount,attr"`
PendingQueueSize int `xml:"pendingQueueSize,attr"`
DispatchedQueueSize int `xml:"dispatchedQueueSize,attr"`
DispatchedCounter int `xml:"dispatchedCounter,attr"`
EnqueueCounter int `xml:"enqueueCounter,attr"`
DequeueCounter int `xml:"dequeueCounter,attr"`
}

const (
QUEUES_STATS = "queues"
TOPICS_STATS = "topics"
SUBSCRIBERS_STATS = "subscribers"
)

var sampleConfig = `
## Required ActiveMQ Endpoint
# server = "192.168.50.10"
## Required ActiveMQ port
# port = 8161
## Credentials for basic HTTP authentication
# username = "admin"
# password = "admin"
## Required ActiveMQ webadmin root path
# webadmin = "admin"
## Maximum time to receive response.
# response_timeout = "5s"
## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
`

func (a *ActiveMQ) Description() string {
return "Gather ActiveMQ metrics"
}

func (a *ActiveMQ) SampleConfig() string {
return sampleConfig
}

func (a *ActiveMQ) createHttpClient() (*http.Client, error) {
tlsCfg, err := a.ClientConfig.TLSConfig()
if err != nil {
return nil, err
}

client := &http.Client{
Transport: &http.Transport{
TLSClientConfig: tlsCfg,
},
Timeout: a.ResponseTimeout.Duration,
}

return client, nil
}

func (a *ActiveMQ) GetMetrics(keyword string) ([]byte, error) {
if a.ResponseTimeout.Duration < time.Second {
a.ResponseTimeout.Duration = time.Second * 5
}

if a.client == nil {
client, err := a.createHttpClient()
if err != nil {
return nil, err
}
a.client = client
}
url := fmt.Sprintf("http://%s:%d/%s/xml/%s.jsp", a.Server, a.Port, a.Webadmin, keyword)

req, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil, err
}

req.SetBasicAuth(a.Username, a.Password)
resp, err := a.client.Do(req)
if err != nil {
return nil, err
}

defer resp.Body.Close()
return ioutil.ReadAll(resp.Body)
}

func (a *ActiveMQ) GatherQueuesMetrics(acc telegraf.Accumulator, queues Queues) {
for _, queue := range queues.QueueItems {
records := make(map[string]interface{})
tags := make(map[string]string)

tags["name"] = strings.TrimSpace(queue.Name)
tags["source"] = a.Server
tags["port"] = strconv.Itoa(a.Port)

records["size"] = queue.Stats.Size
records["consumer_count"] = queue.Stats.ConsumerCount
records["enqueue_count"] = queue.Stats.EnqueueCount
records["dequeue_count"] = queue.Stats.DequeueCount

acc.AddFields("activemq_queues", records, tags)
}
}

func (a *ActiveMQ) GatherTopicsMetrics(acc telegraf.Accumulator, topics Topics) {
for _, topic := range topics.TopicItems {
records := make(map[string]interface{})
tags := make(map[string]string)

tags["name"] = topic.Name
tags["source"] = a.Server
tags["port"] = strconv.Itoa(a.Port)

records["size"] = topic.Stats.Size
records["consumer_count"] = topic.Stats.ConsumerCount
records["enqueue_count"] = topic.Stats.EnqueueCount
records["dequeue_count"] = topic.Stats.DequeueCount

acc.AddFields("activemq_topics", records, tags)
}
}

func (a *ActiveMQ) GatherSubscribersMetrics(acc telegraf.Accumulator, subscribers Subscribers) {
for _, subscriber := range subscribers.SubscriberItems {
records := make(map[string]interface{})
tags := make(map[string]string)

tags["client_id"] = subscriber.ClientId
tags["subscription_name"] = subscriber.SubscriptionName
tags["connection_id"] = subscriber.ConnectionId
tags["destination_name"] = subscriber.DestinationName
tags["selector"] = subscriber.Selector
tags["active"] = subscriber.Active
tags["source"] = a.Server
tags["port"] = strconv.Itoa(a.Port)

records["pending_queue_size"] = subscriber.Stats.PendingQueueSize
records["dispatched_queue_size"] = subscriber.Stats.DispatchedQueueSize
records["dispatched_counter"] = subscriber.Stats.DispatchedCounter
records["enqueue_counter"] = subscriber.Stats.EnqueueCounter
records["dequeue_counter"] = subscriber.Stats.DequeueCounter

acc.AddFields("activemq_subscribers", records, tags)
}
}

func (a *ActiveMQ) Gather(acc telegraf.Accumulator) error {
dataQueues, err := a.GetMetrics(QUEUES_STATS)
queues := Queues{}
err = xml.Unmarshal(dataQueues, &queues)
if err != nil {
return err
}

dataTopics, err := a.GetMetrics(TOPICS_STATS)
topics := Topics{}
err = xml.Unmarshal(dataTopics, &topics)
if err != nil {
return err
}

dataSubscribers, err := a.GetMetrics(SUBSCRIBERS_STATS)
subscribers := Subscribers{}
err = xml.Unmarshal(dataSubscribers, &subscribers)
if err != nil {
return err
}

a.GatherQueuesMetrics(acc, queues)
a.GatherTopicsMetrics(acc, topics)
a.GatherSubscribersMetrics(acc, subscribers)

return nil
}

func init() {
inputs.Add("activemq", func() telegraf.Input {
return &ActiveMQ{
Server: "localhost",
Port: 8161,
}
})
}
Loading

0 comments on commit 47d805b

Please sign in to comment.