Skip to content

Commit

Permalink
WIP Basic limiter working, tests todo
Browse files Browse the repository at this point in the history
  • Loading branch information
Florimond committed Sep 7, 2023
1 parent 007434d commit 7304a50
Show file tree
Hide file tree
Showing 11 changed files with 100 additions and 85 deletions.
8 changes: 4 additions & 4 deletions internal/provider/storage/memory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func TestInMemory_Query(t *testing.T) {
})
}

out, err := s.Query(tc.query, zero, zero, tc.limit)
out, err := s.Query(tc.query, zero, zero, nil, NewMessageNumberLimiter(int64(tc.limit)))
assert.NoError(t, err)

count := 0
Expand Down Expand Up @@ -152,7 +152,7 @@ func TestInMemory_lookup(t *testing.T) {
}

for _, tc := range tests {
matches := s.lookup(newLookupQuery(tc.query, zero, zero, tc.limit))
matches := s.lookup(newLookupQuery(tc.query, zero, zero, nil, NewMessageNumberLimiter(int64(tc.limit))))
assert.Equal(t, tc.count, len(matches))
}
}
Expand All @@ -172,13 +172,13 @@ func TestInMemory_OnSurvey(t *testing.T) {
{name: "ssdstore"},
{
name: "ssdstore",
query: newLookupQuery(message.Ssid{0, 1}, zero, zero, 1),
query: newLookupQuery(message.Ssid{0, 1}, zero, zero, nil, NewMessageNumberLimiter(1)),
expectOk: true,
expectCount: 1,
},
{
name: "ssdstore",
query: newLookupQuery(message.Ssid{0, 1}, zero, zero, 10),
query: newLookupQuery(message.Ssid{0, 1}, zero, zero, nil, NewMessageNumberLimiter(10)),
expectOk: true,
expectCount: 2,
},
Expand Down
15 changes: 11 additions & 4 deletions internal/provider/storage/ssd.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func (s *SSD) Query(ssid message.Ssid, from, untilTime time.Time, untilID messag
}
}

match.Limit(limit)
limiter.Limit(&match)
return match, nil
}

Expand Down Expand Up @@ -183,13 +183,20 @@ func (s *SSD) lookup(q lookupQuery) (matches message.Frame) {

// Since we're starting backwards, seek to the 'until' position first and then
// we'll iterate forward but have reverse time ('until' -> 'from')
prefix := q.UntilID
if len(prefix) == 0 {
var prefix message.ID
if len(q.UntilID) == 0 {
prefix = message.NewPrefix(q.Ssid, q.UntilTime)
it.Seek(prefix)
} else {
it.Seek(q.UntilID)
if !it.Valid() {
return nil
}
it.Next()
}

// Seek the prefix and check the key so we can quickly exit the iteration.
for it.Seek(prefix); it.Valid() &&
for ; it.Valid() &&
message.ID(it.Item().Key()).HasPrefix(q.Ssid, q.From); it.Next() {
if message.ID(it.Item().Key()).Match(q.Ssid, q.From, q.UntilTime) {
if msg, err := loadMessage(it.Item()); err == nil {
Expand Down
11 changes: 6 additions & 5 deletions internal/provider/storage/ssd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestSSD_Query(t *testing.T) {
assert.NoError(t, err)

zero := time.Unix(0, 0)
f, err := store.Query([]uint32{0, 3, 2, 6}, zero, zero, 5)
f, err := store.Query([]uint32{0, 3, 2, 6}, zero, zero, nil, NewMessageNumberLimiter(5))
assert.NoError(t, err)
assert.Len(t, f, 1)
})
Expand Down Expand Up @@ -121,7 +121,8 @@ func TestSSD_QuerySurveyed(t *testing.T) {
})
}

out, err := s.Query(tc.query, zero, zero, tc.limit, nil)
limiter := NewMessageNumberLimiter(int64(tc.limit))
out, err := s.Query(tc.query, zero, zero, nil, limiter)
assert.NoError(t, err)
count := 0
for range out {
Expand All @@ -146,13 +147,13 @@ func TestSSD_OnSurvey(t *testing.T) {
{name: "ssdstore"},
{
name: "ssdstore",
query: newLookupQuery(message.Ssid{0, 1}, zero, zero, 1, nil),
query: newLookupQuery(message.Ssid{0, 1}, zero, zero, nil, NewMessageNumberLimiter(1)),
expectOk: true,
expectCount: 1,
},
{
name: "ssdstore",
query: newLookupQuery(message.Ssid{0, 1}, zero, zero, 10, nil),
query: newLookupQuery(message.Ssid{0, 1}, zero, zero, nil, NewMessageNumberLimiter(10)),
expectOk: true,
expectCount: 2,
},
Expand Down Expand Up @@ -316,7 +317,7 @@ func benchmarkQuery(b *testing.B, store *SSD, last int, m *stats.Metric) {
return

default:
store.Query(ssid, t0, t1, last, nil)
store.Query(ssid, t0, t1, nil, NewMessageNumberLimiter(int64(last)))
m.Update(int32(last))
}
}
Expand Down
18 changes: 13 additions & 5 deletions internal/provider/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,20 +75,28 @@ type lookupQuery struct {

type Limiter interface {
Admit(*message.Message) bool
Limit(*message.Frame)
}

// MessageNumberLimiter provide an Limiter implementation to replace the "limit"
// parameter in the Query() function.
type MessageNumberLimiter struct {
count int64
limit int64
msgCount int64
MsgLimit int64
}

func (n *MessageNumberLimiter) Admit(m *message.Message) bool {
return n.count < n.limit
func (limiter *MessageNumberLimiter) Admit(m *message.Message) bool {
admit := limiter.msgCount < limiter.MsgLimit
limiter.msgCount += 1
return admit
}

func (limiter *MessageNumberLimiter) Limit(frame *message.Frame) {
frame.Limit(int(limiter.MsgLimit))
}

func NewMessageNumberLimiter(limit int64) Limiter {
return &MessageNumberLimiter{limit: limit}
return &MessageNumberLimiter{MsgLimit: limit}
}

// newLookupQuery creates a new lookup query
Expand Down
8 changes: 4 additions & 4 deletions internal/provider/storage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func TestNoop_Store(t *testing.T) {
func TestNoop_Query(t *testing.T) {
s := new(Noop)
zero := time.Unix(0, 0)
r, err := s.Query(testMessage(1, 2, 3).Ssid(), zero, zero, 10)
r, err := s.Query(testMessage(1, 2, 3).Ssid(), zero, zero, nil, NewMessageNumberLimiter(10))
assert.NoError(t, err)
for range r {
t.Errorf("Should be empty")
Expand Down Expand Up @@ -80,7 +80,7 @@ func testOrder(t *testing.T, store Storage) {

// Issue a query
zero := time.Unix(0, 0)
f, err := store.Query([]uint32{0, 1, 2}, zero, zero, 5)
f, err := store.Query([]uint32{0, 1, 2}, zero, zero, nil, NewMessageNumberLimiter(5))
assert.NoError(t, err)

assert.Len(t, f, 5)
Expand All @@ -103,7 +103,7 @@ func testRetained(t *testing.T, store Storage) {

// Issue a query
zero := time.Unix(0, 0)
f, err := store.Query([]uint32{0, 1, 2}, zero, zero, 1)
f, err := store.Query([]uint32{0, 1, 2}, zero, zero, nil, NewMessageNumberLimiter(1))
assert.NoError(t, err)

assert.Len(t, f, 1)
Expand All @@ -126,7 +126,7 @@ func testRange(t *testing.T, store Storage) {
}

// Issue a query
f, err := store.Query([]uint32{0, 1, 2}, time.Unix(t0, 0), time.Unix(t1, 0), 5)
f, err := store.Query([]uint32{0, 1, 2}, time.Unix(t0, 0), time.Unix(t1, 0), nil, NewMessageNumberLimiter(5))
assert.NoError(t, err)

assert.Len(t, f, 5)
Expand Down
16 changes: 9 additions & 7 deletions internal/service/history/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/emitter-io/emitter/internal/errors"
"github.com/emitter-io/emitter/internal/message"
"github.com/emitter-io/emitter/internal/provider/logging"
"github.com/emitter-io/emitter/internal/provider/storage"
"github.com/emitter-io/emitter/internal/security"
"github.com/emitter-io/emitter/internal/service"
)
Expand Down Expand Up @@ -80,17 +81,18 @@ func (s *Service) OnRequest(c service.Conn, payload []byte) (service.Response, b
}

limit := int64(3)
if v, ok := channel.Last(); ok {
limit = v
}
messageLimiter := &limiter{
maxCount: limit,
}
// if v, ok := channel.Last(); ok {
// limit = v
// }
// messageLimiter := &limiter{
// maxCount: limit,
// }

ssid := message.NewSsid(key.Contract(), channel.Query)
t0, t1 := channel.Window() // Get the window

msgs, err := s.store.LimitedQuery(ssid, t0, t1, messageLimiter, request.LastMessageID)
messageLimiter := storage.NewMessageNumberLimiter(limit)
msgs, err := s.store.Query(ssid, t0, t1, request.LastMessageID, messageLimiter)
if err != nil {
logging.LogError("conn", "query last messages", err)
return errors.ErrServerError, false
Expand Down
100 changes: 48 additions & 52 deletions internal/service/history/history_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,72 +15,68 @@
package history

import (
"errors"
"testing"
"time"

"github.com/emitter-io/emitter/internal/message"
"github.com/emitter-io/emitter/internal/provider/storage"
"github.com/emitter-io/emitter/internal/service/fake"
"github.com/stretchr/testify/assert"
)

func TestHistory(t *testing.T) {
ssid := message.Ssid{1, 3238259379, 500706888, 1027807523}
store := storage.NewInMemory(nil)
store.Configure(nil)
trie := message.NewTrie()
auth := &fake.Authorizer{
Contract: 1,
}

// Create new service
s := New(auth, store)
c := &fake.Conn{}

// Store a message
for i := 0; i < 10; i++ {
store.Store(&message.Message{
ID: message.NewID(ssid),
Channel: []byte("test/"),
Payload: []byte("hello"),
TTL: 30,
})
}

request :=
err := s.OnRequest(c, )
assert.Equal(t, tc.success, err == nil)
assert.Equal(t, tc.expectLoaded, len(c.Outgoing))
assert.Equal(t, tc.expectCount, trie.Count())
assert.True(t, true)
// ssid := message.Ssid{1, 3238259379, 500706888, 1027807523}
// store := storage.NewInMemory(nil)
// store.Configure(nil)
// trie := message.NewTrie()
// auth := &fake.Authorizer{
// Contract: 1,
// }

// // Create new service
// s := New(auth, store)
// c := &fake.Conn{}

// // Store a message
// for i := 0; i < 10; i++ {
// store.Store(&message.Message{
// ID: message.NewID(ssid),
// Channel: []byte("test/"),
// Payload: []byte("hello"),
// TTL: 30,
// })
// }

// request :=
// err := s.OnRequest(c, )
// assert.Equal(t, tc.success, err == nil)
// assert.Equal(t, tc.expectLoaded, len(c.Outgoing))
// assert.Equal(t, tc.expectCount, trie.Count())

}

// ------------------------------------------------------------------------------------

// Noop implements Storage contract.
var _ storage.Storage = new(buggyStore)
// // Noop implements Storage contract.
// var _ storage.Storage = new(buggyStore)

// Noop represents a storage which does nothing.
type buggyStore struct{}
// // Noop represents a storage which does nothing.
// type buggyStore struct{}

// Name returns the name of the provider.
func (s *buggyStore) Name() string {
return "noop"
}
// // Name returns the name of the provider.
// func (s *buggyStore) Name() string {
// return "noop"
// }

func (s *buggyStore) Configure(config map[string]interface{}) error {
return errors.New("not working")
}
// func (s *buggyStore) Configure(config map[string]interface{}) error {
// return errors.New("not working")
// }

func (s *buggyStore) Store(m *message.Message) error {
return errors.New("not working")
}
// func (s *buggyStore) Store(m *message.Message) error {
// return errors.New("not working")
// }

func (s *buggyStore) Query(ssid message.Ssid, from, until time.Time, limit int) (message.Frame, error) {
return nil, errors.New("not working")
}
// func (s *buggyStore) Query(ssid message.Ssid, from, until time.Time, limit int) (message.Frame, error) {
// return nil, errors.New("not working")
// }

func (s *buggyStore) Close() error {
return errors.New("not working")
}
// func (s *buggyStore) Close() error {
// return errors.New("not working")
// }
2 changes: 1 addition & 1 deletion internal/service/pubsub/lastwill_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func TestPubSub_LastWill(t *testing.T) {

// Query the storage
{
msgs, err := store.Query(ssid, time.Unix(0, 0), time.Now(), 100)
msgs, err := store.Query(ssid, time.Unix(0, 0), time.Now(), nil, storage.NewMessageNumberLimiter(100))
assert.NoError(t, err)
assert.Equal(t, tc.expectStored, len(msgs))
}
Expand Down
2 changes: 1 addition & 1 deletion internal/service/pubsub/publish_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func TestPubSub_Publish(t *testing.T) {

// Query the storage
{
msgs, err := store.Query(ssid, time.Unix(0, 0), time.Now(), 100)
msgs, err := store.Query(ssid, time.Unix(0, 0), time.Now(), nil, storage.NewMessageNumberLimiter(100))
assert.NoError(t, err)
assert.Equal(t, tc.expectStored, len(msgs))
}
Expand Down
3 changes: 2 additions & 1 deletion internal/service/pubsub/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/emitter-io/emitter/internal/event"
"github.com/emitter-io/emitter/internal/message"
"github.com/emitter-io/emitter/internal/provider/logging"
"github.com/emitter-io/emitter/internal/provider/storage"
"github.com/emitter-io/emitter/internal/security"
"github.com/emitter-io/emitter/internal/service"
"github.com/kelindar/binary/nocopy"
Expand Down Expand Up @@ -85,7 +86,7 @@ func (s *Service) OnSubscribe(c service.Conn, mqttTopic []byte) *errors.Error {
// Check if the key has a load permission (also applies for retained)
if key.HasPermission(security.AllowLoad) {
t0, t1 := channel.Window() // Get the window
msgs, err := s.store.Query(ssid, t0, t1, int(limit))
msgs, err := s.store.Query(ssid, t0, t1, nil, storage.NewMessageNumberLimiter(limit))
if err != nil {
logging.LogError("conn", "query last messages", err)
return errors.ErrServerError
Expand Down
2 changes: 1 addition & 1 deletion internal/service/pubsub/subscribe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func (s *buggyStore) Store(m *message.Message) error {
return errors.New("not working")
}

func (s *buggyStore) Query(ssid message.Ssid, from, until time.Time, limit int) (message.Frame, error) {
func (s *buggyStore) Query(ssid message.Ssid, from, untilTime time.Time, untilID message.ID, limiter storage.Limiter) (message.Frame, error) {
return nil, errors.New("not working")
}

Expand Down

0 comments on commit 7304a50

Please sign in to comment.