-
Notifications
You must be signed in to change notification settings - Fork 9
/
StompClient.cs
193 lines (171 loc) · 8.12 KB
/
StompClient.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using System.Net.WebSockets;
using Netina.Stomp.Client.Interfaces;
using Netina.Stomp.Client.Messages;
using Netina.Stomp.Client.Utils;
using Newtonsoft.Json;
using Websocket.Client;
using Websocket.Client.Models;
namespace Netina.Stomp.Client
{
public class StompClient : IStompClient
{
public event EventHandler<StompMessage> OnConnect;
public event EventHandler<DisconnectionInfo> OnClose;
public event EventHandler<StompMessage> OnMessage;
public event EventHandler<ReconnectionInfo> OnReconnect;
public event EventHandler<StompMessage> OnError;
public StompConnectionState StompState { get; private set; } = StompConnectionState.Closed;
public string Version { get; private set; }
private readonly WebsocketClient _socket;
private readonly StompMessageSerializer _stompSerializer = new StompMessageSerializer();
private readonly IDictionary<string, EventHandler<StompMessage>> _subscribers = new Dictionary<string, EventHandler<StompMessage>>();
private readonly IDictionary<string, string> _connectingHeaders = new Dictionary<string, string>();
/// <summary>
/// StompClient Ctor
/// </summary>
/// <param name="url">Url of stomp websocket, start with wss or ws</param>
/// <param name="reconnectEnable">Set reconnect enable or disable</param>
/// <param name="stompVersion">Add stomp version in header for connecting, set "1.0,1.1,1.2" if nothing specified</param>
/// <param name="reconnectTimeOut">Time range in ms, how long to wait before reconnecting if last reconnection failed. Set null to disable this feature</param>
/// <param name="heartBeat">Set 0,1000 if nothing specified</param>
public StompClient(string url, bool reconnectEnable = true, string stompVersion = null, TimeSpan? reconnectTimeOut = null, string heartBeat = null)
{
_socket = new WebsocketClient(new Uri(url), () => {
var ws = new ClientWebSocket();
ws.Options.AddSubProtocol("stomp");
return ws;
})
{
ReconnectTimeout = reconnectTimeOut,
IsReconnectionEnabled = reconnectEnable,
ErrorReconnectTimeout = TimeSpan.FromSeconds(2.0),
};
_socket.MessageReceived.Subscribe(HandleMessage);
_socket.DisconnectionHappened.Subscribe(info =>
{
StompState = StompConnectionState.Closed;
OnClose?.Invoke(this, info);
_subscribers.Clear();
});
_socket.ReconnectionHappened.Subscribe(async info =>
{
if (info.Type == ReconnectionType.Initial)
return;
OnReconnect?.Invoke(this, info);
StompState = StompConnectionState.Reconnecting;
await Reconnect();
});
_connectingHeaders.Add("accept-version", string.IsNullOrEmpty(stompVersion) ? "1.0,1.1,1.2" : stompVersion);
_connectingHeaders.Add("heart-beat", string.IsNullOrEmpty(heartBeat) ? "0,1000" : heartBeat);
OnConnect += (sender, message) =>
{
Version = message.Headers["version"];
};
}
public async Task ConnectAsync(IDictionary<string, string> headers)
{
if (!_socket.IsRunning)
await _socket.Start();
if (!_socket.IsRunning)
throw new Exception("Connection is not open");
if (StompState != StompConnectionState.Closed)
return;
foreach (var header in headers)
{
_connectingHeaders.Add(header);
}
var connectMessage = new StompMessage(StompCommand.Connect, _connectingHeaders);
await _socket.SendInstant(_stompSerializer.Serialize(connectMessage));
StompState = StompConnectionState.Open;
}
public async Task Reconnect()
{
if (!_socket.IsRunning)
await _socket.Start();
if (StompState == StompConnectionState.Open)
return;
var connectMessage = new StompMessage(StompCommand.Connect, _connectingHeaders);
await _socket.SendInstant(_stompSerializer.Serialize(connectMessage));
StompState = StompConnectionState.Open;
}
public async Task SendAsync(object body, string destination, IDictionary<string, string> headers)
{
var jsonPayload = JsonConvert.SerializeObject(body);
headers.Add("content-type", "application/json;charset=UTF-8");
headers.Add("content-length", Encoding.UTF8.GetByteCount(jsonPayload).ToString());
await SendAsync(jsonPayload, destination, headers);
}
public async Task SendAsync(string body, string destination, IDictionary<string, string> headers)
{
if (StompState != StompConnectionState.Open)
await Reconnect();
headers.Add("destination", destination);
var connectMessage = new StompMessage(StompCommand.Send, body, headers);
await _socket.SendInstant(_stompSerializer.Serialize(connectMessage));
}
public async Task SubscribeAsync<T>(string topic, IDictionary<string, string> headers, EventHandler<T> handler)
{
await SubscribeAsync(topic, headers, (sender, message) => handler(this, JsonConvert.DeserializeObject<T>(message.Body)));
}
public async Task SubscribeAsync(string topic, IDictionary<string, string> headers, EventHandler<StompMessage> handler)
{
if (StompState != StompConnectionState.Open)
await Reconnect();
headers.Add("destination", topic);
headers.Add("id", $"sub-{_subscribers.Count}");
var subscribeMessage = new StompMessage(StompCommand.Subscribe, headers);
await _socket.SendInstant(_stompSerializer.Serialize(subscribeMessage));
_subscribers.Add(topic, handler);
}
public async Task AckAsync(string id, string transaction = null)
{
await Acknowledge(true, id, transaction);
}
public async Task NackAsync(string id, string transaction = null)
{
await Acknowledge(false, id, transaction);
}
public async Task DisconnectAsync()
{
var connectMessage = new StompMessage(StompCommand.Disconnect);
await _socket.SendInstant(_stompSerializer.Serialize(connectMessage));
StompState = StompConnectionState.Closed;
_socket.Dispose();
_subscribers.Clear();
}
public void Dispose()
{
StompState = StompConnectionState.Closed;
((IDisposable)_socket).Dispose();
_subscribers.Clear();
}
private async Task Acknowledge(bool isPositive, string id, string transaction = null)
{
if (StompState != StompConnectionState.Open)
await Reconnect();
var headers = new Dictionary<string, string>()
{
{ "id", id }
};
if (!string.IsNullOrEmpty(transaction))
headers.Add("transaction", transaction);
var connectMessage = new StompMessage(isPositive ? StompCommand.Ack : StompCommand.Nack, headers);
await _socket.SendInstant(_stompSerializer.Serialize(connectMessage));
}
private void HandleMessage(ResponseMessage messageEventArgs)
{
var message = _stompSerializer.Deserialize(messageEventArgs.Text);
OnMessage?.Invoke(this, message);
if (message.Command == StompCommand.Connected)
OnConnect?.Invoke(this, message);
if (message.Command == StompCommand.Error)
OnError?.Invoke(this, message);
if (message.Headers.ContainsKey("destination"))
_subscribers[message.Headers["destination"]](this, message);
}
}
}