Skip to content

Commit

Permalink
config conflict
Browse files Browse the repository at this point in the history
  • Loading branch information
mrmohande3 committed Feb 23, 2023
2 parents dcaffc7 + dac8223 commit 6fb6c3c
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 29 deletions.
9 changes: 5 additions & 4 deletions source/Interfaces/IStompClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ namespace Netina.Stomp.Client.Interfaces
{
public interface IStompClient : IDisposable
{
event EventHandler OnConnect;
event EventHandler<DisconnectionInfo> OnClose;
event EventHandler<string> OnMessage;
event EventHandler<ReconnectionInfo> OnReconnect;
event EventHandler<StompMessage> OnConnect;
event EventHandler<DisconnectionInfo> OnClose;
event EventHandler<StompMessage> OnMessage;
event EventHandler<ReconnectionInfo> OnReconnect;
event EventHandler<StompMessage> OnError;

StompConnectionState StompState { get; }

Expand Down
9 changes: 1 addition & 8 deletions source/Netina.Stomp.Client.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,8 @@
<PackageProjectUrl>https://github.com/Netina/Netina.Stomp.Client</PackageProjectUrl>
<RepositoryUrl>https://github.com/Netina/Netina.Stomp.Client</RepositoryUrl>
<PackageTags>stomp;websocket</PackageTags>
<Version>2.0.4.0</Version>
<SignAssembly>false</SignAssembly>
<PackageId>Netina.Stomp.Client</PackageId>
<Product>Netina.Stomp.Client</Product>
<AssemblyName>Netina.Stomp.Client</AssemblyName>
<RootNamespace>Netina.Stomp.Client</RootNamespace>
<Version>2.0.5.0</Version>
<PackageIcon>NetinaLogo.jpg</PackageIcon>
<AssemblyVersion>2.0.4.0</AssemblyVersion>
<FileVersion>2.0.4.0</FileVersion>
<Description>
.NET nuget package for connecting STOMP server in async way over WebSocket.
Package documentation and usage - https://github.com/Netina/Netina.Stomp.Client
Expand Down
45 changes: 28 additions & 17 deletions source/StompClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using System.Net.WebSockets;
using Netina.Stomp.Client.Interfaces;
using Netina.Stomp.Client.Messages;
using Netina.Stomp.Client.Utils;
Expand All @@ -13,13 +14,15 @@ namespace Netina.Stomp.Client
{
public class StompClient : IStompClient
{
public event EventHandler OnConnect;
public event EventHandler<StompMessage> OnConnect;
public event EventHandler<DisconnectionInfo> OnClose;
public event EventHandler<string> OnMessage;
public event EventHandler<StompMessage> OnMessage;
public event EventHandler<ReconnectionInfo> OnReconnect;
public event EventHandler<string> OnError;
public event EventHandler<StompMessage> OnError;

public StompConnectionState StompState { get; private set; } = StompConnectionState.Closed;
public string Version { get; private set; }

private readonly WebsocketClient _socket;
private readonly StompMessageSerializer _stompSerializer = new StompMessageSerializer();
private readonly IDictionary<string, EventHandler<StompMessage>> _subscribers = new Dictionary<string, EventHandler<StompMessage>>();
Expand All @@ -28,18 +31,22 @@ public class StompClient : IStompClient
/// <summary>
/// StompClient Ctor
/// </summary>
/// <param name="url">Url of stomp websocket , start with wss or ws</param>
/// <param name="reconnectEnable">Set reconnect enable of disable</param>
/// <param name="stompVersion">Add stomp version in header for connecting , IF DONT SET VERSION HEADER SET 1.1,1.0 AUTOMATIC</param>
/// <param name="reconnectTimeOut">Time range in ms, how long to wait before reconnecting if last reconnection failed.Set null to disable this feature.Default: NULL</param>
/// <param name="heartBeat">If you set heat-beat null is set 0,1000 automatic</param>
/// <param name="url">Url of stomp websocket, start with wss or ws</param>
/// <param name="reconnectEnable">Set reconnect enable or disable</param>
/// <param name="stompVersion">Add stomp version in header for connecting, set "1.0,1.1,1.2" if nothing specified</param>
/// <param name="reconnectTimeOut">Time range in ms, how long to wait before reconnecting if last reconnection failed. Set null to disable this feature</param>
/// <param name="heartBeat">Set 0,1000 if nothing specified</param>
public StompClient(string url, bool reconnectEnable = true, string stompVersion = null, TimeSpan? reconnectTimeOut = null, string heartBeat = null)
{
_socket = new WebsocketClient(new Uri(url))
_socket = new WebsocketClient(new Uri(url), () => {
var ws = new ClientWebSocket();
ws.Options.AddSubProtocol("stomp");
return ws;
})
{
ReconnectTimeout = reconnectTimeOut,
IsReconnectionEnabled = reconnectEnable,
ErrorReconnectTimeout = TimeSpan.FromSeconds(2)
ErrorReconnectTimeout = TimeSpan.FromSeconds(2.0),
};

_socket.MessageReceived.Subscribe(HandleMessage);
Expand All @@ -48,7 +55,6 @@ public StompClient(string url, bool reconnectEnable = true, string stompVersion
StompState = StompConnectionState.Closed;
OnClose?.Invoke(this, info);
_subscribers.Clear();
});
_socket.ReconnectionHappened.Subscribe(async info =>
{
Expand All @@ -59,8 +65,13 @@ public StompClient(string url, bool reconnectEnable = true, string stompVersion
await Reconnect();
});

_connectingHeaders.Add("accept-version", string.IsNullOrEmpty(stompVersion) ? "1.1,1.0" : stompVersion);
_connectingHeaders.Add("heart-beat", string.IsNullOrEmpty(stompVersion) ? "0,1000" : heartBeat);
_connectingHeaders.Add("accept-version", string.IsNullOrEmpty(stompVersion) ? "1.0,1.1,1.2" : stompVersion);
_connectingHeaders.Add("heart-beat", string.IsNullOrEmpty(heartBeat) ? "0,1000" : heartBeat);

OnConnect += (sender, message) =>
{
Version = message.Headers["version"];
};
}

public async Task ConnectAsync(IDictionary<string, string> headers)
Expand Down Expand Up @@ -161,20 +172,20 @@ private async Task Acknowledge(bool isPositive, string id, string transaction =
{
{ "id", id }
};
if (string.IsNullOrEmpty(transaction))
if (!string.IsNullOrEmpty(transaction))
headers.Add("transaction", transaction);
var connectMessage = new StompMessage(isPositive ? StompCommand.Ack : StompCommand.Nack, headers);
await _socket.SendInstant(_stompSerializer.Serialize(connectMessage));
}

private void HandleMessage(ResponseMessage messageEventArgs)
{
OnMessage?.Invoke(this, messageEventArgs.Text);
var message = _stompSerializer.Deserialize(messageEventArgs.Text);
OnMessage?.Invoke(this, message);
if (message.Command == StompCommand.Connected)
OnConnect?.Invoke(this, new EventArgs());
OnConnect?.Invoke(this, message);
if (message.Command == StompCommand.Error)
OnError?.Invoke(this, message.Body);
OnError?.Invoke(this, message);
if (message.Headers.ContainsKey("destination"))
_subscribers[message.Headers["destination"]](this, message);
}
Expand Down

0 comments on commit 6fb6c3c

Please sign in to comment.