Skip to content

Commit

Permalink
[DRAFT][IMP] keyban now disconnect the client
Browse files Browse the repository at this point in the history
This has the effect of unsubsribing the client from all channels.
Once the key is banned, the key user cannot publish anymore. Now he can't
receive anymore either.

WON'T WORK
-> As it won't disconnect other clients on different brokers. The ban is just
an event that is propagated but doesn't trigger any function
Too bad, as this would be more efficient than the alternative: check for a
banned key before every send()
  • Loading branch information
Florimond committed Aug 12, 2024
1 parent 996fa4a commit 3436dad
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 9 deletions.
17 changes: 15 additions & 2 deletions internal/broker/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type Conn struct {
connect *event.Connection // The associated connection event.
username string // The username provided by the client during MQTT connect.
links map[string]string // The map of all pre-authorized links.
banned bool // Close() will behave differently if this flag is true. Note: can't modify Close's signature as it has to respect the one from the MQTT lib.
}

// NewConn creates a new connection.
Expand Down Expand Up @@ -88,6 +89,16 @@ func (s *Service) newConn(t net.Conn, readRate int) *Conn {
return c
}

// IsBanned checks if the connection is banned.
func (c *Conn) IsBanned() bool {
return c.banned
}

// Ban bans the connection.
func (c *Conn) Ban() {
c.banned = true
}

// ID returns the unique identifier of the subsriber.
func (c *Conn) ID() string {
return c.guid
Expand Down Expand Up @@ -365,8 +376,10 @@ func (c *Conn) Close() error {
})
}

// Publish last will
c.service.pubsub.OnLastWill(c, c.connect)
if !c.IsBanned() {
// Publish last will
c.service.pubsub.OnLastWill(c, c.connect)
}

//logging.LogTarget("conn", "closed", c.guid)
return c.socket.Close()
Expand Down
8 changes: 8 additions & 0 deletions internal/service/fake/fakes.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,14 @@ func (f *Conn) initialize() {
}
}

func (f *Conn) Ban() {

}

func (f *Conn) IsBanned() bool {
return false
}

// Close provides a fake implementation.
func (f *Conn) Close() error {
return nil
Expand Down
4 changes: 3 additions & 1 deletion internal/service/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type Surveyee interface {
OnSurvey(string, []byte) ([]byte, bool)
}

//Surveyor issues the surveys.
// Surveyor issues the surveys.
type Surveyor interface {
Query(string, []byte) (message.Awaiter, error)
}
Expand All @@ -66,6 +66,8 @@ type Conn interface {
Links() map[string]string
GetLink([]byte) []byte
AddLink(string, *security.Channel)
Ban()
IsBanned() bool
}

// Replicator replicates an event withih the cluster
Expand Down
51 changes: 45 additions & 6 deletions internal/service/keyban/keyban.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
package keyban

import (
"context"
"encoding/json"
"fmt"
"regexp"

"github.com/emitter-io/emitter/internal/errors"
"github.com/emitter-io/emitter/internal/event"
"github.com/emitter-io/emitter/internal/message"
"github.com/emitter-io/emitter/internal/security"
"github.com/emitter-io/emitter/internal/service"
"github.com/kelindar/binary"
Expand All @@ -32,21 +34,56 @@ var (

// Service represents a key blacklisting service.
type Service struct {
auth service.Authorizer // The authorizer to use.
keygen service.Decryptor // The key generator to use.
cluster service.Replicator // The cluster service to use.
connection service.Conn
auth service.Authorizer // The authorizer to use.
keygen service.Decryptor // The key generator to use.
cluster service.Replicator // The cluster service to use.
queue chan *Notification // The channel for keyban notifications.
context context.Context // The context for the service.
cancel context.CancelFunc // The cancellation function.
trie *message.Trie // The subscription matching trie.
}

// New creates a new key blacklisting service.
func New(auth service.Authorizer, keygen service.Decryptor, cluster service.Replicator) *Service {
return &Service{
func New(auth service.Authorizer, keygen service.Decryptor, cluster service.Replicator, trie *message.Trie) *Service {
ctx, cancel := context.WithCancel(context.Background())
s := &Service{
context: ctx,
cancel: cancel,
auth: auth,
keygen: keygen,
cluster: cluster,
trie: trie,
queue: make(chan *Notification, 100),
}

s.pollKeybanChange()
return s
}

func (s *Service) pollKeybanChange() {
go func() {
for {
select {
case <-s.context.Done():
return
case notif := <-s.queue:
// Depending on the flag, ban or unban the key
bannedKey := event.Ban(notif.Key)
switch {
case notif.Banned && !s.cluster.Contains(&bannedKey):
s.connection.Ban()
s.connection.Close()
s.cluster.Notify(&bannedKey, true)
case !notif.Banned && s.cluster.Contains(&bannedKey):
s.cluster.Notify(&bannedKey, false)
}
}
}
}()
}

// OnRequest handles a request to create a link.
// OnRequest handles a request to ban or unban a key.
func (s *Service) OnRequest(c service.Conn, payload []byte) (service.Response, bool) {
var message Request
if err := json.Unmarshal(payload, &message); err != nil {
Expand All @@ -71,6 +108,8 @@ func (s *Service) OnRequest(c service.Conn, payload []byte) (service.Response, b
bannedKey := event.Ban(message.Target)
switch {
case message.Banned && !s.cluster.Contains(&bannedKey):
c.Ban()
c.Close()
s.cluster.Notify(&bannedKey, true)
case !message.Banned && s.cluster.Contains(&bannedKey):
s.cluster.Notify(&bannedKey, false)
Expand Down
33 changes: 33 additions & 0 deletions internal/service/keyban/requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@

package keyban

import (
"time"

"github.com/emitter-io/emitter/internal/message"
)

// Request represents a key ban request.
type Request struct {
Secret string `json:"secret"` // The master key to use.
Expand All @@ -22,6 +28,16 @@ type Request struct {
}

// ------------------------------------------------------------------------------------
/*
// EventType represents a presence event type
type EventType string
// Various event types
const (
EventTypeKeyban = EventType("keyban")
EventTypeKeyunban = EventType("keyunban")
)
*/

// Response represents a key ban response.
type Response struct {
Expand All @@ -34,3 +50,20 @@ type Response struct {
func (r *Response) ForRequest(id uint16) {
r.Request = id
}

type Notification struct {
Time int64 `json:"time"` // The UNIX timestamp.
Banned bool `json:"banned"` // The event, must be "status", "subscribe" or "unsubscribe".
Key string `json:"key"` // The target channel for the notification.
filter func(message.Subscriber) bool // The filter function (optional)
}

// newNotification creates a new notification payload.
func newNotification(banned bool, key string, filter func(message.Subscriber) bool) *Notification {
return &Notification{
filter: filter,
Time: time.Now().UTC().Unix(),
Key: key,
Banned: banned,
}
}

0 comments on commit 3436dad

Please sign in to comment.