Skip to content

Commit

Permalink
[FIX] Storage implement the Surveyee interface
Browse files Browse the repository at this point in the history
This is a proper fix for #323
  • Loading branch information
Florimond committed Jan 29, 2024
1 parent 4418034 commit 2400eae
Show file tree
Hide file tree
Showing 6 changed files with 16 additions and 9 deletions.
6 changes: 1 addition & 5 deletions internal/broker/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,11 +157,7 @@ func NewService(ctx context.Context, cfg *config.Config) (s *Service, err error)
s.surveyor = survey.New(s.pubsub, s.cluster)
s.presence = presence.New(s, s.pubsub, s.surveyor, s.subscriptions)
if s.cluster != nil {
if s.storage.Name() == ssdstore.Name() {
s.surveyor.HandleFunc(s.presence, ssdstore)
} else if s.storage.Name() == memstore.Name() {
s.surveyor.HandleFunc(s.presence, memstore)
}
s.surveyor.HandleFunc(s.storage)
}

// Create a new cipher from the licence provided
Expand Down
2 changes: 1 addition & 1 deletion internal/provider/storage/memory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func TestInMemory_Query(t *testing.T) {
if tc.gathered == nil {
s.survey = nil
} else {
s.survey = survey(func(string, []byte) (message.Awaiter, error) {
s.survey = surveyFunc(func(string, []byte) (message.Awaiter, error) {
return &mockAwaiter{f: func(_ time.Duration) [][]byte { return [][]byte{tc.gathered} }}, nil
})
}
Expand Down
2 changes: 1 addition & 1 deletion internal/provider/storage/ssd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func TestSSD_QuerySurveyed(t *testing.T) {
if tc.gathered == nil {
s.survey = nil
} else {
s.survey = survey(func(string, []byte) (message.Awaiter, error) {
s.survey = surveyFunc(func(string, []byte) (message.Awaiter, error) {
return &mockAwaiter{f: func(_ time.Duration) [][]byte { return [][]byte{tc.gathered} }}, nil
})
}
Expand Down
7 changes: 7 additions & 0 deletions internal/provider/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/emitter-io/config"
"github.com/emitter-io/emitter/internal/message"
"github.com/emitter-io/emitter/internal/security"
"github.com/emitter-io/emitter/internal/service/survey"
)

var (
Expand All @@ -37,6 +38,7 @@ const (
type Storage interface {
config.Provider
io.Closer
survey.Surveyee

// Store is used to store a message, the SSID provided must be a full SSID
// SSID, where first element should be a contract ID. The time resolution
Expand Down Expand Up @@ -139,3 +141,8 @@ func (s *Noop) Query(ssid message.Ssid, from, until time.Time, startFromID messa
func (s *Noop) Close() error {
return nil
}

// OnSurvey handles an incoming cluster lookup request.
func (s *Noop) OnSurvey(surveyType string, payload []byte) ([]byte, bool) {
return []byte{}, true
}
4 changes: 2 additions & 2 deletions internal/provider/storage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ import (
"github.com/stretchr/testify/assert"
)

type survey func(string, []byte) (message.Awaiter, error)
type surveyFunc func(string, []byte) (message.Awaiter, error)

func (s survey) Query(q string, b []byte) (message.Awaiter, error) {
func (s surveyFunc) Query(q string, b []byte) (message.Awaiter, error) {
return s(q, b)
}

Expand Down
4 changes: 4 additions & 0 deletions internal/service/pubsub/subscribe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,3 +184,7 @@ func (s *buggyStore) Query(ssid message.Ssid, from, until time.Time, startFromID
func (s *buggyStore) Close() error {
return errors.New("not working")
}

func (s *buggyStore) OnSurvey(surveyType string, payload []byte) ([]byte, bool) {
return []byte{}, true
}

0 comments on commit 2400eae

Please sign in to comment.