Skip to content
This repository has been archived by the owner on Sep 25, 2019. It is now read-only.

Commit

Permalink
add socket listener & writer (influxdata#2094)
Browse files Browse the repository at this point in the history
  • Loading branch information
phemmer authored and mlinde201 committed Feb 6, 2017
1 parent 71e9899 commit 5d01a0b
Show file tree
Hide file tree
Showing 9 changed files with 675 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ plugins, not just statsd.
- [#1980](https://github.com/influxdata/telegraf/issues/1980): Hide username/password from elasticsearch error log messages.
- [#2097](https://github.com/influxdata/telegraf/issues/2097): Configurable HTTP timeouts in Jolokia plugin
- [#2255](https://github.com/influxdata/telegraf/pull/2255): Allow changing jolokia attribute delimiter
- [#2094](https://github.com/influxdata/telegraf/pull/2094): Add generic socket listener & writer.

### Bugfixes

Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ Telegraf can also collect metrics via the following service plugins:
* [nsq_consumer](./plugins/inputs/nsq_consumer)
* [logparser](./plugins/inputs/logparser)
* [statsd](./plugins/inputs/statsd)
* [socket_listener](./plugins/inputs/socket_listener)
* [tail](./plugins/inputs/tail)
* [tcp_listener](./plugins/inputs/tcp_listener)
* [udp_listener](./plugins/inputs/udp_listener)
Expand Down Expand Up @@ -219,6 +220,7 @@ Telegraf can also collect metrics via the following service plugins:
* [nsq](./plugins/outputs/nsq)
* [opentsdb](./plugins/outputs/opentsdb)
* [prometheus](./plugins/outputs/prometheus_client)
* [socket_writer](./plugins/outputs/socket_writer)
* [riemann](./plugins/outputs/riemann)
* [riemann_legacy](./plugins/outputs/riemann_legacy)

Expand Down
1 change: 1 addition & 0 deletions plugins/inputs/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/sensors"
_ "github.com/influxdata/telegraf/plugins/inputs/snmp"
_ "github.com/influxdata/telegraf/plugins/inputs/snmp_legacy"
_ "github.com/influxdata/telegraf/plugins/inputs/socket_listener"
_ "github.com/influxdata/telegraf/plugins/inputs/sqlserver"
_ "github.com/influxdata/telegraf/plugins/inputs/statsd"
_ "github.com/influxdata/telegraf/plugins/inputs/sysstat"
Expand Down
240 changes: 240 additions & 0 deletions plugins/inputs/socket_listener/socket_listener.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
package socket_listener

import (
"bufio"
"fmt"
"io"
"log"
"net"
"strings"
"sync"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
)

type setReadBufferer interface {
SetReadBuffer(bytes int) error
}

type streamSocketListener struct {
net.Listener
*SocketListener

connections map[string]net.Conn
connectionsMtx sync.Mutex
}

func (ssl *streamSocketListener) listen() {
ssl.connections = map[string]net.Conn{}

for {
c, err := ssl.Accept()
if err != nil {
ssl.AddError(err)
break
}

ssl.connectionsMtx.Lock()
if ssl.MaxConnections > 0 && len(ssl.connections) >= ssl.MaxConnections {
ssl.connectionsMtx.Unlock()
c.Close()
continue
}
ssl.connections[c.RemoteAddr().String()] = c
ssl.connectionsMtx.Unlock()
go ssl.read(c)
}

ssl.connectionsMtx.Lock()
for _, c := range ssl.connections {
c.Close()
}
ssl.connectionsMtx.Unlock()
}

func (ssl *streamSocketListener) removeConnection(c net.Conn) {
ssl.connectionsMtx.Lock()
delete(ssl.connections, c.RemoteAddr().String())
ssl.connectionsMtx.Unlock()
}

func (ssl *streamSocketListener) read(c net.Conn) {
defer ssl.removeConnection(c)
defer c.Close()

scnr := bufio.NewScanner(c)
for scnr.Scan() {
metrics, err := ssl.Parse(scnr.Bytes())
if err != nil {
ssl.AddError(fmt.Errorf("unable to parse incoming line"))
//TODO rate limit
continue
}
for _, m := range metrics {
ssl.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
}
}

if err := scnr.Err(); err != nil {
ssl.AddError(err)
}
}

type packetSocketListener struct {
net.PacketConn
*SocketListener
}

func (psl *packetSocketListener) listen() {
buf := make([]byte, 64*1024) // 64kb - maximum size of IP packet
for {
n, _, err := psl.ReadFrom(buf)
if err != nil {
psl.AddError(err)
break
}

metrics, err := psl.Parse(buf[:n])
if err != nil {
psl.AddError(fmt.Errorf("unable to parse incoming packet"))
//TODO rate limit
continue
}
for _, m := range metrics {
psl.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
}
}
}

type SocketListener struct {
ServiceAddress string
MaxConnections int
ReadBufferSize int

parsers.Parser
telegraf.Accumulator
io.Closer
}

func (sl *SocketListener) Description() string {
return "Generic socket listener capable of handling multiple socket types."
}

func (sl *SocketListener) SampleConfig() string {
return `
## URL to listen on
# service_address = "tcp://:8094"
# service_address = "tcp://127.0.0.1:http"
# service_address = "tcp4://:8094"
# service_address = "tcp6://:8094"
# service_address = "tcp6://[2001:db8::1]:8094"
# service_address = "udp://:8094"
# service_address = "udp4://:8094"
# service_address = "udp6://:8094"
# service_address = "unix:///tmp/telegraf.sock"
# service_address = "unixgram:///tmp/telegraf.sock"
## Maximum number of concurrent connections.
## Only applies to stream sockets (e.g. TCP).
## 0 (default) is unlimited.
# max_connections = 1024
## Maximum socket buffer size in bytes.
## For stream sockets, once the buffer fills up, the sender will start backing up.
## For datagram sockets, once the buffer fills up, metrics will start dropping.
## Defaults to the OS default.
# read_buffer_size = 65535
## Data format to consume.
## Each data format has it's own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
# data_format = "influx"
`
}

func (sl *SocketListener) Gather(_ telegraf.Accumulator) error {
return nil
}

func (sl *SocketListener) SetParser(parser parsers.Parser) {
sl.Parser = parser
}

func (sl *SocketListener) Start(acc telegraf.Accumulator) error {
sl.Accumulator = acc
spl := strings.SplitN(sl.ServiceAddress, "://", 2)
if len(spl) != 2 {
return fmt.Errorf("invalid service address: %s", sl.ServiceAddress)
}

switch spl[0] {
case "tcp", "tcp4", "tcp6", "unix", "unixpacket":
l, err := net.Listen(spl[0], spl[1])
if err != nil {
return err
}

if sl.ReadBufferSize > 0 {
if srb, ok := l.(setReadBufferer); ok {
srb.SetReadBuffer(sl.ReadBufferSize)
} else {
log.Printf("W! Unable to set read buffer on a %s socket", spl[0])
}
}

ssl := &streamSocketListener{
Listener: l,
SocketListener: sl,
}

sl.Closer = ssl
go ssl.listen()
case "udp", "udp4", "udp6", "ip", "ip4", "ip6", "unixgram":
pc, err := net.ListenPacket(spl[0], spl[1])
if err != nil {
return err
}

if sl.ReadBufferSize > 0 {
if srb, ok := pc.(setReadBufferer); ok {
srb.SetReadBuffer(sl.ReadBufferSize)
} else {
log.Printf("W! Unable to set read buffer on a %s socket", spl[0])
}
}

psl := &packetSocketListener{
PacketConn: pc,
SocketListener: sl,
}

sl.Closer = psl
go psl.listen()
default:
return fmt.Errorf("unknown protocol '%s' in '%s'", spl[0], sl.ServiceAddress)
}

return nil
}

func (sl *SocketListener) Stop() {
if sl.Closer != nil {
sl.Close()
sl.Closer = nil
}
}

func newSocketListener() *SocketListener {
parser, _ := parsers.NewInfluxParser()

return &SocketListener{
Parser: parser,
}
}

func init() {
inputs.Add("socket_listener", func() telegraf.Input { return newSocketListener() })
}
122 changes: 122 additions & 0 deletions plugins/inputs/socket_listener/socket_listener_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package socket_listener

import (
"net"
"os"
"testing"
"time"

"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestSocketListener_tcp(t *testing.T) {
sl := newSocketListener()
sl.ServiceAddress = "tcp://127.0.0.1:0"

acc := &testutil.Accumulator{}
err := sl.Start(acc)
require.NoError(t, err)

client, err := net.Dial("tcp", sl.Closer.(net.Listener).Addr().String())
require.NoError(t, err)

testSocketListener(t, sl, client)
}

func TestSocketListener_udp(t *testing.T) {
sl := newSocketListener()
sl.ServiceAddress = "udp://127.0.0.1:0"

acc := &testutil.Accumulator{}
err := sl.Start(acc)
require.NoError(t, err)

client, err := net.Dial("udp", sl.Closer.(net.PacketConn).LocalAddr().String())
require.NoError(t, err)

testSocketListener(t, sl, client)
}

func TestSocketListener_unix(t *testing.T) {
defer os.Remove("/tmp/telegraf_test.sock")
sl := newSocketListener()
sl.ServiceAddress = "unix:///tmp/telegraf_test.sock"

acc := &testutil.Accumulator{}
err := sl.Start(acc)
require.NoError(t, err)

client, err := net.Dial("unix", "/tmp/telegraf_test.sock")
require.NoError(t, err)

testSocketListener(t, sl, client)
}

func TestSocketListener_unixgram(t *testing.T) {
defer os.Remove("/tmp/telegraf_test.sock")
sl := newSocketListener()
sl.ServiceAddress = "unixgram:///tmp/telegraf_test.sock"

acc := &testutil.Accumulator{}
err := sl.Start(acc)
require.NoError(t, err)

client, err := net.Dial("unixgram", "/tmp/telegraf_test.sock")
require.NoError(t, err)

testSocketListener(t, sl, client)
}

func testSocketListener(t *testing.T, sl *SocketListener, client net.Conn) {
mstr12 := "test,foo=bar v=1i 123456789\ntest,foo=baz v=2i 123456790\n"
mstr3 := "test,foo=zab v=3i 123456791"
client.Write([]byte(mstr12))
client.Write([]byte(mstr3))
if _, ok := client.(net.Conn); ok {
// stream connection. needs trailing newline to terminate mstr3
client.Write([]byte{'\n'})
}

acc := sl.Accumulator.(*testutil.Accumulator)

acc.Lock()
if len(acc.Metrics) < 1 {
acc.Wait()
}
require.True(t, len(acc.Metrics) >= 1)
m := acc.Metrics[0]
acc.Unlock()

assert.Equal(t, "test", m.Measurement)
assert.Equal(t, map[string]string{"foo": "bar"}, m.Tags)
assert.Equal(t, map[string]interface{}{"v": int64(1)}, m.Fields)
assert.True(t, time.Unix(0, 123456789).Equal(m.Time))

acc.Lock()
if len(acc.Metrics) < 2 {
acc.Wait()
}
require.True(t, len(acc.Metrics) >= 2)
m = acc.Metrics[1]
acc.Unlock()

assert.Equal(t, "test", m.Measurement)
assert.Equal(t, map[string]string{"foo": "baz"}, m.Tags)
assert.Equal(t, map[string]interface{}{"v": int64(2)}, m.Fields)
assert.True(t, time.Unix(0, 123456790).Equal(m.Time))

acc.Lock()
if len(acc.Metrics) < 3 {
acc.Wait()
}
require.True(t, len(acc.Metrics) >= 3)
m = acc.Metrics[2]
acc.Unlock()

assert.Equal(t, "test", m.Measurement)
assert.Equal(t, map[string]string{"foo": "zab"}, m.Tags)
assert.Equal(t, map[string]interface{}{"v": int64(3)}, m.Fields)
assert.True(t, time.Unix(0, 123456791).Equal(m.Time))
}
Loading

0 comments on commit 5d01a0b

Please sign in to comment.