Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactored survey #315

Merged
merged 2 commits into from
May 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 18 additions & 7 deletions internal/broker/cluster/swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package cluster

import (
"context"
"errors"
"fmt"
"net"
"strings"
Expand Down Expand Up @@ -114,6 +115,16 @@ func (s *Swarm) Printf(format string, args ...interface{}) {
}
}

// findPeer retrieves a peer.
func (s *Swarm) findPeer(name mesh.PeerName) *Peer {
peer, added := s.members.GetOrAdd(name)
if added {
s.onPeerOnline(peer)
}

return peer
}

// onPeerOnline occurs when a new peer is created.
func (s *Swarm) onPeerOnline(peer *Peer) {
logging.LogTarget("swarm", "peer created", peer.name)
Expand All @@ -137,14 +148,14 @@ func (s *Swarm) onPeerOffline(name mesh.PeerName) {
}
}

// FindPeer retrieves a peer.
func (s *Swarm) FindPeer(name mesh.PeerName) *Peer {
peer, added := s.members.GetOrAdd(name)
if added {
s.onPeerOnline(peer)
// SendTo sends a message to a peer.
func (s *Swarm) SendTo(name mesh.PeerName, msg *message.Message) error {
peer := s.findPeer(name)
if !peer.IsActive() {
return errors.New("swarm: unable to reply to a request, peer is not active")
}

return peer
return peer.Send(msg)
}

// ID returns the local node ID.
Expand Down Expand Up @@ -236,7 +247,7 @@ func (s *Swarm) merge(buf []byte) (mesh.GossipData, error) {

// Find the active peer for this subscription event
encoded := ev.Encode()
peer := s.FindPeer(mesh.PeerName(ev.Peer))
peer := s.findPeer(mesh.PeerName(ev.Peer))

// If the subscription is added, notify (TODO: use channels)
if t.IsAdded() && peer.onSubscribe(encoded, ev.Ssid) && peer.IsActive() {
Expand Down
16 changes: 14 additions & 2 deletions internal/broker/cluster/swarm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
package cluster

import (
"github.com/emitter-io/emitter/internal/event"
"testing"

"github.com/emitter-io/emitter/internal/config"
"github.com/emitter-io/emitter/internal/event"
"github.com/emitter-io/emitter/internal/message"
"github.com/stretchr/testify/assert"
"github.com/weaveworks/mesh"
Expand Down Expand Up @@ -57,6 +57,7 @@ func TestOnGossipUnicast(t *testing.T) {
}

func TestNewSwarm_Scenario(t *testing.T) {
msg := newTestMessage(message.Ssid{1, 2, 3}, "a/b/c/", "hello abc")
cfg := config.ClusterConfig{
NodeName: "00:00:00:00:00:01",
ListenAddr: ":4000",
Expand All @@ -65,6 +66,8 @@ func TestNewSwarm_Scenario(t *testing.T) {

// Create a new swarm and check if it was constructed well
s := NewSwarm(&cfg)
s.update()

assert.Equal(t, 0, s.NumPeers())
assert.Equal(t, uint64(1), s.ID())
assert.NotNil(t, s.Gossip())
Expand All @@ -86,9 +89,18 @@ func TestNewSwarm_Scenario(t *testing.T) {
assert.Error(t, err)

// Find peer
peer := s.FindPeer(123)
peer := s.findPeer(123)
assert.NotNil(t, peer)

// Send to active peer
err = s.SendTo(123, &msg)
assert.NoError(t, err)

// Send to inactive peer
peer.activity = 0
err = s.SendTo(123, &msg)
assert.Error(t, err)

// Remove that peer, it should not be there
s.onPeerOffline(123)
assert.False(t, s.members.Contains(mesh.PeerName(123)))
Expand Down
31 changes: 11 additions & 20 deletions internal/broker/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ import (
"time"

"github.com/emitter-io/emitter/internal/broker/keygen"
"github.com/emitter-io/emitter/internal/event"
"github.com/emitter-io/emitter/internal/errors"
"github.com/emitter-io/emitter/internal/event"
"github.com/emitter-io/emitter/internal/message"
"github.com/emitter-io/emitter/internal/network/mqtt"
"github.com/emitter-io/emitter/internal/provider/contract"
Expand Down Expand Up @@ -69,7 +69,7 @@ func (s *Service) newConn(t net.Conn, readRate int) *Conn {
}

// Generate a globally unique id as well
c.guid = c.luid.Unique(uint64(s.LocalName()), "emitter")
c.guid = c.luid.Unique(uint64(s.ID()), "emitter")
if readRate == 0 {
readRate = defaultReadRate
}
Expand Down Expand Up @@ -271,16 +271,13 @@ func (c *Conn) Subscribe(ssid message.Ssid, channel []byte) {

// Add the subscription
if first := c.subs.Increment(ssid, channel); first {
ev := &event.Subscription{
Peer: c.service.LocalName(),
c.service.Subscribe(c, &event.Subscription{
Peer: c.service.ID(),
Conn: c.luid,
User: nocopy.String(c.username),
Ssid: ssid,
Channel: channel,
}

c.service.onSubscribe(c, ev) // Subscribe the subscriber
c.service.notifySubscribe(ev) // Broadcast the subscription within our cluster
})
}
}

Expand All @@ -291,16 +288,13 @@ func (c *Conn) Unsubscribe(ssid message.Ssid, channel []byte) {

// Decrement the counter and if there's no more subscriptions, notify everyone.
if last := c.subs.Decrement(ssid); last {
ev := &event.Subscription{
Peer: c.service.LocalName(),
c.service.Unsubscribe(c, &event.Subscription{
Peer: c.service.ID(),
Conn: c.luid,
User: nocopy.String(c.username),
Ssid: ssid,
Channel: channel,
}

c.service.onUnsubscribe(c, ev) // Unsubscribe the subscriber
c.service.notifyUnsubscribe(ev) // Broadcast the unsubscription within our cluster
})
}
}

Expand All @@ -313,16 +307,13 @@ func (c *Conn) Close() error {
// Unsubscribe from everything, no need to lock since each Unsubscribe is
// already locked. Locking the 'Close()' would result in a deadlock.
for _, counter := range c.subs.All() {
ev := &event.Subscription{
Peer: c.service.LocalName(),
c.service.Unsubscribe(c, &event.Subscription{
Peer: c.service.ID(),
Conn: c.luid,
User: nocopy.String(c.username),
Ssid: counter.Ssid,
Channel: counter.Channel,
}

c.service.onUnsubscribe(c, ev)
c.service.notifyUnsubscribe(ev)
})
}

// Close the transport and decrement the connection counter
Expand Down
2 changes: 1 addition & 1 deletion internal/broker/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func (c *Conn) onPublish(packet *mqtt.Publish) *errors.Error {
}

// Iterate through all subscribers and send them the message
size := c.service.publish(msg, func(s message.Subscriber) bool {
size := c.service.Publish(msg, func(s message.Subscriber) bool {
return s.ID() != exclude
})

Expand Down
84 changes: 0 additions & 84 deletions internal/broker/query_test.go

This file was deleted.

Loading