Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

loki-canary: Add support for client-side TLS certs for Loki connection #6310

Merged
merged 4 commits into from
Jun 9, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
## Main

* [6310](https://github.com/grafana/loki/pull/6310) **chodges15**: Add support for client-side TLS certs in loki-canary for Loki connection
DylanGuedes marked this conversation as resolved.
Show resolved Hide resolved
* [6099](https://github.com/grafana/loki/pull/6099/files) **cstyan**: Drop lines with malformed JSON in Promtail JSON pipeline stage
* [6136](https://github.com/grafana/loki/pull/6136) **periklis**: Add support for alertmanager header authorization
* [6102](https://github.com/grafana/loki/pull/6102) **timchenko-a**: Add multi-tenancy support to lambda-promtail
Expand Down
30 changes: 26 additions & 4 deletions cmd/loki-canary/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,18 @@ package main
import (
"flag"
"fmt"
"net/http"
"os"
"os/signal"
"strconv"
"sync"
"syscall"
"time"

"crypto/tls"
"net/http"
"os/signal"

"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/prometheus/common/config"
"github.com/prometheus/common/version"

"github.com/grafana/loki/pkg/canary/comparator"
Expand All @@ -36,7 +39,10 @@ func main() {
sValue := flag.String("streamvalue", "stdout", "The unique stream value for this instance of loki-canary to use in the log selector")
port := flag.Int("port", 3500, "Port which loki-canary should expose metrics")
addr := flag.String("addr", "", "The Loki server URL:Port, e.g. loki:3100")
tls := flag.Bool("tls", false, "Does the loki connection use TLS?")
useTls := flag.Bool("tls", false, "Does the loki connection use TLS?")
certFile := flag.String("cert-file", "", "Client PEM encoded X.509 certificate for optional use with TLS connection to Loki")
keyFile := flag.String("key-file", "", "Client PEM encoded X.509 key for optional use with TLS connection to Loki")
caFile := flag.String("ca-file", "", "Client certificate authority for optional use with TLS connection to Loki")
user := flag.String("user", "", "Loki username.")
pass := flag.String("pass", "", "Loki password.")
tenantID := flag.String("tenant-id", "", "Tenant ID to be set in X-Scope-OrgID header.")
Expand Down Expand Up @@ -83,6 +89,22 @@ func main() {
os.Exit(1)
}

var tlsConfig *tls.Config
tc := config.TLSConfig{}
if *certFile != "" || *keyFile != "" || *caFile != "" {
tc.CAFile = *caFile
tc.CertFile = *certFile
tc.KeyFile = *keyFile
tc.InsecureSkipVerify = false

var err error
tlsConfig, err = config.NewTLSConfig(&tc)
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "TLS configuration error: %s\n", err.Error())
os.Exit(1)
}
}

sentChan := make(chan time.Time)
receivedChan := make(chan time.Time)

Expand All @@ -94,7 +116,7 @@ func main() {
defer c.lock.Unlock()

c.writer = writer.NewWriter(os.Stdout, sentChan, *interval, *outOfOrderMin, *outOfOrderMax, *outOfOrderPercentage, *size)
c.reader = reader.NewReader(os.Stderr, receivedChan, *tls, *addr, *user, *pass, *tenantID, *queryTimeout, *lName, *lVal, *sName, *sValue, *interval)
c.reader = reader.NewReader(os.Stderr, receivedChan, *useTls, tlsConfig, *caFile, *addr, *user, *pass, *tenantID, *queryTimeout, *lName, *lVal, *sName, *sValue, *interval)
c.comparator = comparator.NewComparator(os.Stderr, *wait, *maxWait, *pruneInterval, *spotCheckInterval, *spotCheckMax, *spotCheckQueryRate, *spotCheckWait, *metricTestInterval, *metricTestQueryRange, *interval, *buckets, sentChan, receivedChan, c.reader, true)
}

Expand Down
54 changes: 50 additions & 4 deletions pkg/canary/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package reader

import (
"context"
"crypto/tls"
"encoding/base64"
"fmt"
"io"
Expand All @@ -21,6 +22,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/config"

"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/logqlmodel"
Expand Down Expand Up @@ -50,6 +52,8 @@ type LokiReader interface {
type Reader struct {
header http.Header
tls bool
tlsConfig *tls.Config
caFile string
addr string
user string
pass string
Expand All @@ -74,6 +78,8 @@ type Reader struct {
func NewReader(writer io.Writer,
receivedChan chan time.Time,
tls bool,
tlsConfig *tls.Config,
caFile string,
address string,
user string,
pass string,
Expand Down Expand Up @@ -104,6 +110,8 @@ func NewReader(writer io.Writer,
rd := Reader{
header: h,
tls: tls,
tlsConfig: tlsConfig,
caFile: caFile,
addr: address,
user: user,
pass: pass,
Expand Down Expand Up @@ -189,7 +197,11 @@ func (r *Reader) QueryCountOverTime(queryRange string) (float64, error) {
}
req.Header.Set("User-Agent", userAgent)

resp, err := http.DefaultClient.Do(req)
httpClient, err := r.httpClient()
if err != nil {
return 0, err
}
resp, err := httpClient.Do(req)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -280,7 +292,11 @@ func (r *Reader) Query(start time.Time, end time.Time) ([]time.Time, error) {
}
req.Header.Set("User-Agent", userAgent)

resp, err := http.DefaultClient.Do(req)
httpClient, err := r.httpClient()
if err != nil {
return nil, err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would highly recommend wrapping the error here. (and also on the QueryCountOverTime) with some context.

Rationale is I see we use it on both Query and QueryCountOverTime and it can confuse the consumer of the error, where exactly it is coming from.

}
resp, err := httpClient.Do(req)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -329,7 +345,7 @@ func (r *Reader) Query(start time.Time, end time.Time) ([]time.Time, error) {
return tss, nil
}

// run uses the established websocket connection to tail logs from Loki and
// run uses the established websocket connection to tail logs from Loki
func (r *Reader) run() {
r.closeAndReconnect()

Expand Down Expand Up @@ -421,7 +437,8 @@ func (r *Reader) closeAndReconnect() {

fmt.Fprintf(r.w, "Connecting to loki at %v, querying for label '%v' with value '%v'\n", u.String(), r.lName, r.lVal)

c, _, err := websocket.DefaultDialer.Dial(u.String(), r.header)
dialer := r.webSocketDialer()
c, _, err := dialer.Dial(u.String(), r.header)
if err != nil {
fmt.Fprintf(r.w, "failed to connect to %s with err %s\n", u.String(), err)
<-time.After(10 * time.Second)
Expand All @@ -442,6 +459,35 @@ func (r *Reader) closeAndReconnect() {
}
}

// httpClient uses the config in Reader to return a http client.
// http.DefaultClient will be returned in the case that the connection to Loki is http or TLS without client certs.
// For the mTLS case, return a http.Client configured to use the client side certificates.
func (r *Reader) httpClient() (*http.Client, error) {
if r.tlsConfig == nil || r.tls == false {
return http.DefaultClient, nil
DylanGuedes marked this conversation as resolved.
Show resolved Hide resolved
}
rt, err := config.NewTLSRoundTripper(r.tlsConfig, r.caFile, func(tls *tls.Config) (http.RoundTripper, error) {
return &http.Transport{TLSClientConfig: tls}, nil
})
if err != nil {
return nil, err
}
return &http.Client{
Transport: rt,
}, nil
}

// webSocketDialer creates a dialer for the web socket connection to Loki
// websocket.DefaultDialer will be returned in the case that the connection to Loki is http or TLS without client certs.
// For the mTLS case, return a websocket.Dialer configured to use client side certificates.
func (r *Reader) webSocketDialer() *websocket.Dialer {
dialer := websocket.DefaultDialer
if r.tlsConfig != nil {
dialer.TLSClientConfig = r.tlsConfig
}
return dialer
}

func parseResponse(entry *loghttp.Entry) (*time.Time, error) {
sp := strings.Split(entry.Line, " ")
if len(sp) != 2 {
Expand Down