Skip to content

Commit

Permalink
feat: add table and endpoint for tracking waku message sequences
Browse files Browse the repository at this point in the history
  • Loading branch information
adklempner committed Jun 24, 2024
1 parent 8fe3616 commit f9fef72
Show file tree
Hide file tree
Showing 7 changed files with 159 additions and 2 deletions.
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,16 @@ Then you can run the server with:
go run cmd/server/main.go -data-source-name postgres://telemetry:newPassword@127.0.0.1:5432/telemetry
```

If trying to run locally you receive the following error:
```
pq: SSL is not enabled on the server
```

Run this command instead:
```
go run cmd/server/main.go -data-source-name "postgres://telemetry:newPassword@127.0.0.1:5432/telemetry?sslmode=disable"
```

Finally, to run the test:
```
make test
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.20
require (
github.com/go-auxiliaries/shrinking-map v0.3.0
github.com/golang-migrate/migrate/v4 v4.15.2
github.com/gorilla/handlers v1.5.2
github.com/gorilla/mux v1.8.0
github.com/lib/pq v1.10.3
github.com/robfig/cron/v3 v3.0.1
Expand All @@ -15,6 +16,7 @@ require (

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/felixge/httpsnoop v1.0.3 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,8 @@ github.com/evanphx/json-patch v4.9.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLi
github.com/evanphx/json-patch v4.11.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/felixge/httpsnoop v1.0.3 h1:s/nj+GCswXYzN5v2DpNMuMQYe+0DDwt5WVCU6CWBdXk=
github.com/felixge/httpsnoop v1.0.3/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
github.com/fogleman/gg v1.3.0/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
Expand Down Expand Up @@ -595,6 +597,8 @@ github.com/googleapis/gnostic v0.5.5/go.mod h1:7+EbHbldMins07ALC74bsA81Ovc97Dwqy
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gorilla/handlers v0.0.0-20150720190736-60c7bfde3e33/go.mod h1:Qkdc/uu4tH4g6mTK6auzZ766c4CA0Ng8+o/OAirnOIQ=
github.com/gorilla/handlers v1.4.2/go.mod h1:Qkdc/uu4tH4g6mTK6auzZ766c4CA0Ng8+o/OAirnOIQ=
github.com/gorilla/handlers v1.5.2 h1:cLTUSsNkgcwhgRqvCNmdbRWG0A3N4F+M2nWKdScwyEE=
github.com/gorilla/handlers v1.5.2/go.mod h1:dX+xVpaxdSw+q0Qek8SSsl3dfMk3jNddUkMzo0GtH0w=
github.com/gorilla/mux v1.7.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/gorilla/mux v1.7.4/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
Expand Down
27 changes: 25 additions & 2 deletions telemetry/bindata.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

53 changes: 53 additions & 0 deletions telemetry/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func NewServer(db *sql.DB, logger *zap.Logger) *Server {

server.Router.HandleFunc("/protocol-stats", server.createProtocolStats).Methods("POST")
server.Router.HandleFunc("/received-messages", server.createReceivedMessages).Methods("POST")
server.Router.HandleFunc("/waku-metrics", server.createWakuTelemetry).Methods("POST")
server.Router.HandleFunc("/received-envelope", server.createReceivedEnvelope).Methods("POST")
server.Router.HandleFunc("/sent-envelope", server.createSentEnvelope).Methods("POST")
server.Router.HandleFunc("/update-envelope", server.updateEnvelope).Methods("POST")
Expand Down Expand Up @@ -349,7 +350,59 @@ func (s *Server) rateLimit(next http.Handler) http.Handler {
})
}

func (s *Server) createWakuTelemetry(w http.ResponseWriter, r *http.Request) {
start := time.Now()
var telemetryData []WakuTelemetryRequest
decoder := json.NewDecoder(r.Body)
if err := decoder.Decode(&telemetryData); err != nil {
log.Println(err)
http.Error(w, "Failed to decode telemetry data", http.StatusBadRequest)
return
}

var errorDetails []map[string]interface{}

for _, data := range telemetryData {
switch data.TelemetryType {
case LightPushFilter:
var pushFilter TelemetryPushFilter
if err := json.Unmarshal(*data.TelemetryData, &pushFilter); err != nil {
errorDetails = append(errorDetails, map[string]interface{}{"Id": data.Id, "Error": fmt.Sprintf("Error decoding lightpush/filter metric: %v", err)})
continue
}
if err := pushFilter.put(s.DB); err != nil {
errorDetails = append(errorDetails, map[string]interface{}{"Id": data.Id, "Error": fmt.Sprintf("Error saving lightpush/filter metric: %v", err)})
continue
}
default:
errorDetails = append(errorDetails, map[string]interface{}{"Id": data.Id, "Error": fmt.Sprintf("Unknown waku telemetry type: %s", data.TelemetryType)})
}
}

if len(errorDetails) > 0 {
log.Printf("Errors encountered: %v", errorDetails)
err := respondWithError(w, http.StatusInternalServerError, "Error processing telemetry requests")
if err != nil {
s.logger.Error("failed to respond", zap.Error(err))
}
return
}

err := respondWithJSON(w, http.StatusCreated, errorDetails)
if err != nil {
log.Println(err)
}

log.Printf(
"%s\t%s\t%s",
r.Method,
r.RequestURI,
time.Since(start),
)
}

func (s *Server) Start(port int) {
s.logger.Info("Starting server", zap.Int("port", port))

log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), s.Router))
}
15 changes: 15 additions & 0 deletions telemetry/sql/000007_waku_push_filter.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
CREATE TABLE IF NOT EXISTS wakuPushFilter (
id SERIAL PRIMARY KEY,
walletAddress VARCHAR(255),
peerIdSender VARCHAR(255) NOT NULL,
peerIdReporter VARCHAR(255) NOT NULL,
sequenceHash VARCHAR(255) NOT NULL,
sequenceTotal VARCHAR(255) NOT NULL,
sequenceIndex VARCHAR(255) NOT NULL,
contentTopic VARCHAR(255) NOT NULL,
pubsubTopic VARCHAR(255) NOT NULL,
timestamp INTEGER NOT NULL,
createdAt INTEGER NOT NULL,

CONSTRAINT wakuPushFilter_unique unique(peerIdSender, peerIdReporter, sequenceHash, sequenceIndex)
);
50 changes: 50 additions & 0 deletions telemetry/waku_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package telemetry

import (
"database/sql"
"encoding/json"
"time"
)

type WakuTelemetryType string

const (
LightPushFilter WakuTelemetryType = "LightPushFilter"
)

type WakuTelemetryRequest struct {
Id int `json:"id"`
TelemetryType WakuTelemetryType `json:"telemetryType"`
TelemetryData *json.RawMessage `json:"telemetryData"`
}

type TelemetryPushFilter struct {
ID int `json:"id"`
WalletAddress string `json:"walletAddress"`
PeerIDSender string `json:"peerIdSender"`
PeerIDReporter string `json:"peerIdReporter"`
SequenceHash string `json:"sequenceHash"`
SequenceTotal uint64 `json:"sequenceTotal"`
SequenceIndex uint64 `json:"sequenceIndex"`
ContentTopic string `json:"contentTopic"`
PubsubTopic string `json:"pubsubTopic"`
Timestamp int64 `json:"timestamp"`
CreatedAt int64 `json:"createdAt"`
}

func (r *TelemetryPushFilter) put(db *sql.DB) error {
stmt, err := db.Prepare("INSERT INTO wakuPushFilter (peerIdSender, peerIdReporter, sequenceHash, sequenceTotal, sequenceIndex, contentTopic, pubsubTopic, timestamp, createdAt) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) RETURNING id;")
if err != nil {
return err
}

r.CreatedAt = time.Now().Unix()
lastInsertId := 0
err = stmt.QueryRow(r.PeerIDSender, r.PeerIDReporter, r.SequenceHash, r.SequenceTotal, r.SequenceIndex, r.ContentTopic, r.PubsubTopic, r.Timestamp, r.CreatedAt).Scan(&lastInsertId)
if err != nil {
return err
}
r.ID = lastInsertId

return nil
}

0 comments on commit f9fef72

Please sign in to comment.