Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

loki-canary: Add support for client-side TLS certs for Loki connection #6310

Merged
merged 4 commits into from
Jun 9, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@
* [5711](https://github.com/grafana/loki/pull/5711) **MichelHollands**: Update fluent-bit output name

#### Loki Canary
* [6310](https://github.com/grafana/loki/pull/6310) **chodges15**: Add support for client-side TLS certs in loki-canary for Loki connection
* [5568](https://github.com/grafana/loki/pull/5568) **afayngelerindbx**: canary: Adds locking to prevent multiple concurrent invocations of `confirmMissing` from clobbering each other
### Notes

Expand Down
34 changes: 30 additions & 4 deletions cmd/loki-canary/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,18 @@ package main
import (
"flag"
"fmt"
"net/http"
"os"
"os/signal"
"strconv"
"sync"
"syscall"
"time"

"crypto/tls"
"net/http"
"os/signal"

"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/prometheus/common/config"
"github.com/prometheus/common/version"

"github.com/grafana/loki/pkg/canary/comparator"
Expand All @@ -36,7 +39,10 @@ func main() {
sValue := flag.String("streamvalue", "stdout", "The unique stream value for this instance of loki-canary to use in the log selector")
port := flag.Int("port", 3500, "Port which loki-canary should expose metrics")
addr := flag.String("addr", "", "The Loki server URL:Port, e.g. loki:3100")
tls := flag.Bool("tls", false, "Does the loki connection use TLS?")
useTLS := flag.Bool("tls", false, "Does the loki connection use TLS?")
certFile := flag.String("cert-file", "", "Client PEM encoded X.509 certificate for optional use with TLS connection to Loki")
keyFile := flag.String("key-file", "", "Client PEM encoded X.509 key for optional use with TLS connection to Loki")
caFile := flag.String("ca-file", "", "Client certificate authority for optional use with TLS connection to Loki")
user := flag.String("user", "", "Loki username.")
pass := flag.String("pass", "", "Loki password.")
tenantID := flag.String("tenant-id", "", "Tenant ID to be set in X-Scope-OrgID header.")
Expand Down Expand Up @@ -83,6 +89,26 @@ func main() {
os.Exit(1)
}

var tlsConfig *tls.Config
tc := config.TLSConfig{}
if *certFile != "" || *keyFile != "" || *caFile != "" {
if !*useTLS {
_, _ = fmt.Fprintf(os.Stderr, "Must set --tls when specifying client certs\n")
os.Exit(1)
}
tc.CAFile = *caFile
tc.CertFile = *certFile
tc.KeyFile = *keyFile
tc.InsecureSkipVerify = false

var err error
tlsConfig, err = config.NewTLSConfig(&tc)
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "TLS configuration error: %s\n", err.Error())
os.Exit(1)
}
}

Comment on lines +92 to +111
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct me if I'm missing anything. We could simplify this initialization code bit.

the gist is

  1. to first use if useTLS to do all this and skip everything completely (even before checking for *certFile, *keyFile, *caFile.
  2. And trusting config.NewTLSConfig for checking if all the files are invalid (even if its empty)

Something simple like

	if useTLS {
		tls, err := config.NewTLSConfig(
			&config.TLSConfig{
				CAFile, *caFile,
				CertFile, *certFile,
				KeyFile, *keyFile,
				InsecureSkipVerify: false,
			},
		)
		if err != nil {
			// print error
			os.Exit(1)
		}
	}
}

No need to check for any empty string on those cert files. NewTLSConfig() should return error in that case correct?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The main reason for doing this was to ensure valid configuration. Setting the client certs requires -tls to be set otherwise it is a noop. I actually made this mistake in my own testing and was glad to have the error message. On the other hand, when setting -tls it does not require setting client certs.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On the other hand, when setting -tls it does not require setting client certs.

Ah make sense. In that case we just use default client. thanks for the clarification.

sentChan := make(chan time.Time)
receivedChan := make(chan time.Time)

Expand All @@ -94,7 +120,7 @@ func main() {
defer c.lock.Unlock()

c.writer = writer.NewWriter(os.Stdout, sentChan, *interval, *outOfOrderMin, *outOfOrderMax, *outOfOrderPercentage, *size)
c.reader = reader.NewReader(os.Stderr, receivedChan, *tls, *addr, *user, *pass, *tenantID, *queryTimeout, *lName, *lVal, *sName, *sValue, *interval)
c.reader = reader.NewReader(os.Stderr, receivedChan, *useTLS, tlsConfig, *caFile, *addr, *user, *pass, *tenantID, *queryTimeout, *lName, *lVal, *sName, *sValue, *interval)
c.comparator = comparator.NewComparator(os.Stderr, *wait, *maxWait, *pruneInterval, *spotCheckInterval, *spotCheckMax, *spotCheckQueryRate, *spotCheckWait, *metricTestInterval, *metricTestQueryRange, *interval, *buckets, sentChan, receivedChan, c.reader, true)
}

Expand Down
134 changes: 90 additions & 44 deletions pkg/canary/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package reader

import (
"context"
"crypto/tls"
"encoding/base64"
"fmt"
"io"
Expand All @@ -21,6 +22,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/config"

"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/logqlmodel"
Expand Down Expand Up @@ -48,32 +50,36 @@ type LokiReader interface {
}

type Reader struct {
header http.Header
tls bool
addr string
user string
pass string
tenantID string
queryTimeout time.Duration
sName string
sValue string
lName string
lVal string
backoff *backoff.Backoff
nextQuery time.Time
backoffMtx sync.RWMutex
interval time.Duration
conn *websocket.Conn
w io.Writer
recv chan time.Time
quit chan struct{}
shuttingDown bool
done chan struct{}
header http.Header
tls bool
clientTLSConfig *tls.Config
caFile string
addr string
user string
pass string
tenantID string
queryTimeout time.Duration
sName string
sValue string
lName string
lVal string
backoff *backoff.Backoff
nextQuery time.Time
backoffMtx sync.RWMutex
interval time.Duration
conn *websocket.Conn
w io.Writer
recv chan time.Time
quit chan struct{}
shuttingDown bool
done chan struct{}
}

func NewReader(writer io.Writer,
receivedChan chan time.Time,
tls bool,
tlsConfig *tls.Config,
caFile string,
address string,
user string,
pass string,
Expand Down Expand Up @@ -102,25 +108,27 @@ func NewReader(writer io.Writer,
bkoff := backoff.New(context.Background(), bkcfg)

rd := Reader{
header: h,
tls: tls,
addr: address,
user: user,
pass: pass,
tenantID: tenantID,
queryTimeout: queryTimeout,
sName: streamName,
sValue: streamValue,
lName: labelName,
lVal: labelVal,
nextQuery: next,
backoff: bkoff,
interval: interval,
w: writer,
recv: receivedChan,
quit: make(chan struct{}),
done: make(chan struct{}),
shuttingDown: false,
header: h,
tls: tls,
clientTLSConfig: tlsConfig,
caFile: caFile,
addr: address,
user: user,
pass: pass,
tenantID: tenantID,
queryTimeout: queryTimeout,
sName: streamName,
sValue: streamValue,
lName: labelName,
lVal: labelVal,
nextQuery: next,
backoff: bkoff,
interval: interval,
w: writer,
recv: receivedChan,
quit: make(chan struct{}),
done: make(chan struct{}),
shuttingDown: false,
}

go rd.run()
Expand Down Expand Up @@ -189,7 +197,11 @@ func (r *Reader) QueryCountOverTime(queryRange string) (float64, error) {
}
req.Header.Set("User-Agent", userAgent)

resp, err := http.DefaultClient.Do(req)
httpClient, err := r.httpClient()
if err != nil {
return 0, err
}
resp, err := httpClient.Do(req)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -280,7 +292,11 @@ func (r *Reader) Query(start time.Time, end time.Time) ([]time.Time, error) {
}
req.Header.Set("User-Agent", userAgent)

resp, err := http.DefaultClient.Do(req)
httpClient, err := r.httpClient()
if err != nil {
return nil, err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would highly recommend wrapping the error here. (and also on the QueryCountOverTime) with some context.

Rationale is I see we use it on both Query and QueryCountOverTime and it can confuse the consumer of the error, where exactly it is coming from.

}
resp, err := httpClient.Do(req)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -329,7 +345,7 @@ func (r *Reader) Query(start time.Time, end time.Time) ([]time.Time, error) {
return tss, nil
}

// run uses the established websocket connection to tail logs from Loki and
// run uses the established websocket connection to tail logs from Loki
func (r *Reader) run() {
r.closeAndReconnect()

Expand Down Expand Up @@ -421,7 +437,8 @@ func (r *Reader) closeAndReconnect() {

fmt.Fprintf(r.w, "Connecting to loki at %v, querying for label '%v' with value '%v'\n", u.String(), r.lName, r.lVal)

c, _, err := websocket.DefaultDialer.Dial(u.String(), r.header)
dialer := r.webSocketDialer()
c, _, err := dialer.Dial(u.String(), r.header)
if err != nil {
fmt.Fprintf(r.w, "failed to connect to %s with err %s\n", u.String(), err)
<-time.After(10 * time.Second)
Expand All @@ -442,6 +459,35 @@ func (r *Reader) closeAndReconnect() {
}
}

// httpClient uses the config in Reader to return a http client.
// http.DefaultClient will be returned in the case that the connection to Loki is http or TLS without client certs.
// For the mTLS case, return a http.Client configured to use the client side certificates.
func (r *Reader) httpClient() (*http.Client, error) {
if r.clientTLSConfig == nil {
return http.DefaultClient, nil
DylanGuedes marked this conversation as resolved.
Show resolved Hide resolved
}
rt, err := config.NewTLSRoundTripper(r.clientTLSConfig, r.caFile, func(tls *tls.Config) (http.RoundTripper, error) {
return &http.Transport{TLSClientConfig: tls}, nil
})
if err != nil {
return nil, err
}
return &http.Client{
Transport: rt,
}, nil
}

// webSocketDialer creates a dialer for the web socket connection to Loki
// websocket.DefaultDialer will be returned in the case that the connection to Loki is http or TLS without client certs.
// For the mTLS case, return a websocket.Dialer configured to use client side certificates.
func (r *Reader) webSocketDialer() *websocket.Dialer {
dialer := websocket.DefaultDialer
if r.clientTLSConfig != nil {
dialer.TLSClientConfig = r.clientTLSConfig
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We probably shouldn't be mutating websocket.DefaultDialer in this library package.

Instead, let's create a new one like:

var dialer = &Dialer{
	Proxy:            http.ProxyFromEnvironment,
	HandshakeTimeout: 45 * time.Second,
	TLSClientConfig = r.clientTLSConfig,
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My understanding is that dialer is a copy of websocket.DefaultDialer and there is no mutation happening outside this package. However, I like your way better as it keeps me from having to do local mutation.

}
return dialer
}

func parseResponse(entry *loghttp.Entry) (*time.Time, error) {
sp := strings.Split(entry.Line, " ")
if len(sp) != 2 {
Expand Down