2023. 3. 20. 00:00ㆍCSharp
이전 강좌에서 streamjsonrpc library 와 .NET 자체적으로 지원하는 HttpListener 를 통해 WebSocket 을 구현해 보았다.
물론 system native 한 websocket 을 사용하는게 best 이지만
.NET version 등에 의해 사용하기 곤란 할수도 있고
이미 구현된 library 가 있을 수도 있다.
또한 직접 구현한 websocket (tcp socket 을 통해) 도 있을 것이다.
대부분의 이러한 구현체는 개인이 구현하다 보니 코드상에 bug 가 있을 수도 있고
websocket 표준을 따르지 못하는 경우들도 있다.
그래서 이러한 문제를 해결하기 위해 3rd party library 를 통해서
앞에 구현한 websocket server 를 다시 구현해 보겠다.
여러가지 WebSocket 관련 library 가 있는데
그중에서 우리는 Fleck 을 사용해 보겠다.
일단 사용법 자체가 simple 하다.
단점이라면 더이상의 update 가 없어서 최신 websocket 표준을 지원하지 못한다.
(websocket 2.0 지원 안함)
https://github.com/statianzo/Fleck
GitHub - statianzo/Fleck: C# Websocket Implementation
C# Websocket Implementation. Contribute to statianzo/Fleck development by creating an account on GitHub.
github.com
fleck 은 위 site 에서 download 가 가능하니 우리는 이것을 이용해서 개발을 해보자
예제를 보면 event 방식으로 처리 하도록 되어 있고 다음과 같은 아주 simple 한 예제가 있다.
var server = new WebSocketServer("ws://0.0.0.0:8181");
server.Start(socket =>
{
socket.OnOpen = () => Console.WriteLine("Open!");
socket.OnClose = () => Console.WriteLine("Close!");
socket.OnMessage = message => socket.Send(message);
});
websocket Server 를 생성하고 Start (listen) 상태에 들어간 후
client 에 접속이 있다면 IWebSocketConnection 인 socket 을 통해서 event 를 처리 하고 있다.
(OnOpen 등은 action 으로 구현되어 있음)
아주 간단하다.
이제 기존코드를 fleck 으로 migration 해보자
기존 코드를 fleck 으로 migration 하기 위해서는
streamjsonrpc 에서 처리 하고는 JsonRPC, Non JsonRPC message 에 대한 처리를 수정해야 한다.
이전 과정에서 우리는 WebSocketMessageHandler 를 상속한 WebSocketAllMessageHandler 를 이용했다.
https://yogingang.tistory.com/423
WebSocket 에 JsonRPC 및 일반메세지 함께 처리하기
StreamJsonRPC Library 를 이용해서 WebSocket 에서 JSON-Rpc 를 처리 하였다. 이렇게 하게 되면 JSON-RPC 는 처리가 되지만 일반적인 Plain Text 를 (Json rpc 형태가 아닌) 처리하지 못하게 된다. 그래서 이 두가지
yogingang.tistory.com
이번에는 fleck 을 사용할 것이니 WebSocketAllMessageHandler 를 그대로 사용할 수는 없다.
우리는 이번에 Fleck 에서 receive 하는 부분을 StreamJsonRPC 에 MessageBaseHandler 를 상속한 FleckWebSocketConnectionMessageHandler 를 구현할 것이다.
Fleck 은 IWebSocketConnection 으로 client connection 을 관리 하고 있으므로 이것을 이용하자
FleckWebSocketConnectionMessageHandler.cs
// See https://aka.ms/new-console-template for more information
using Fleck;
using Microsoft;
using Nerdbank.Streams;
using StreamJsonRpc;
using StreamJsonRpc.Protocol;
using StreamJsonRpc.Reflection;
using System.Buffers;
using System.Net.WebSockets;
using System.Runtime.InteropServices;
using System.Text;
using System.Text.Json;
public class FleckWebSocketConnectionMessageHandler : MessageHandlerBase, IJsonRpcMessageBufferManager
{
public event EventHandler<string> RawMessageArrived;
private readonly int sizeHint;
private readonly Sequence<byte> contentSequenceBuilder = new Sequence<byte>();
private IJsonRpcMessageBufferManager? bufferedMessage;
/// <summary>
/// Initializes a new instance of the <see cref="WebSocketMessageHandler"/> class
/// that uses the <see cref="JsonMessageFormatter"/> to serialize messages as textual JSON.
/// </summary>
/// <param name="webSocket">
/// The <see cref="System.Net.WebSockets.WebSocket"/> used to communicate.
/// This will <em>not</em> be automatically disposed of with this <see cref="WebSocketMessageHandler"/>.
/// </param>
public FleckWebSocketConnectionMessageHandler(IWebSocketConnection webSocket)
#pragma warning disable CA2000 // Dispose objects before losing scope
: this(webSocket, new JsonMessageFormatter())
#pragma warning restore CA2000 // Dispose objects before losing scope
{
}
/// <summary>
/// Initializes a new instance of the <see cref="WebSocketMessageHandler"/> class.
/// </summary>
/// <param name="webSocket">
/// The <see cref="System.Net.WebSockets.WebSocket"/> used to communicate.
/// This will <em>not</em> be automatically disposed of with this <see cref="WebSocketMessageHandler"/>.
/// </param>
/// <param name="formatter">The formatter to use to serialize <see cref="JsonRpcMessage"/> instances.</param>
/// <param name="sizeHint">
/// The size of the buffer to use for reading JSON-RPC messages.
/// Messages which exceed this size will be handled properly but may require multiple I/O operations.
/// </param>
public FleckWebSocketConnectionMessageHandler(IWebSocketConnection webSocket, IJsonRpcMessageFormatter formatter, int sizeHint = 4096)
: base(formatter)
{
Requires.NotNull(webSocket, nameof(webSocket));
Requires.Range(sizeHint > 0, nameof(sizeHint));
this.WebSocket = webSocket;
this.sizeHint = sizeHint;
this.WebSocket.OnMessage = (message) =>
{
_receivedMessage = message;
_resetEvent.Set();
};
}
private string _receivedMessage;
/// <inheritdoc />
public override bool CanWrite => true;
/// <inheritdoc />
public override bool CanRead => true;
/// <summary>
/// Gets the <see cref="System.Net.WebSockets.WebSocket"/> used to communicate.
/// </summary>
public IWebSocketConnection WebSocket { get; }
/// <inheritdoc/>
void IJsonRpcMessageBufferManager.DeserializationComplete(JsonRpcMessage message)
{
if (message is object && this.bufferedMessage == message)
{
this.bufferedMessage.DeserializationComplete(message);
this.bufferedMessage = null;
this.contentSequenceBuilder.Reset();
}
}
private AutoResetEvent _resetEvent = new AutoResetEvent(false);
/// <inheritdoc />
protected override async ValueTask<JsonRpcMessage?> ReadCoreAsync(CancellationToken cancellationToken)
{
try
{
_resetEvent.WaitOne();
if (IsJsonRpcMessage(_receivedMessage))
{
var buffer = Encoding.UTF8.GetBytes(_receivedMessage);
Memory<byte> memory = this.contentSequenceBuilder.GetMemory(buffer.Length);
this.contentSequenceBuilder.Write(buffer.AsSpan());
JsonRpcMessage message = this.Formatter.Deserialize(this.contentSequenceBuilder);
this.bufferedMessage = message as IJsonRpcMessageBufferManager;
if (this.bufferedMessage is null)
{
this.contentSequenceBuilder.Reset();
}
return message;
}
else
{
RawMessageArrived?.Invoke(this, _receivedMessage);
return null;
}
}
finally
{
}
}
private bool IsJsonRpcMessage(string message)
{
if (string.IsNullOrWhiteSpace(message))
{
return false;
}
try
{
var jsonDoc = JsonDocument.Parse(message);
if (!jsonDoc.RootElement.TryGetProperty("jsonrpc", out _))
{
return false;
}
if (!jsonDoc.RootElement.TryGetProperty("method", out _))
{
return false;
}
return true;
}
catch (JsonException)
{
return false;
}
}
/// <inheritdoc />
protected override async ValueTask WriteCoreAsync(JsonRpcMessage content, CancellationToken cancellationToken)
{
Requires.NotNull(content, nameof(content));
using (var contentSequenceBuilder = new Sequence<byte>())
{
WebSocketMessageType messageType = this.Formatter is IJsonRpcMessageTextFormatter ? WebSocketMessageType.Text : WebSocketMessageType.Binary;
this.Formatter.Serialize(contentSequenceBuilder, content);
cancellationToken.ThrowIfCancellationRequested();
// Some formatters (e.g. MessagePackFormatter) needs the encoded form in order to produce JSON for tracing.
// Other formatters (e.g. JsonMessageFormatter) would prefer to do its own tracing while it still has a JToken.
// We only help the formatters that need the byte-encoded form here. The rest can do it themselves.
if (this.Formatter is IJsonRpcFormatterTracingCallbacks tracer)
{
tracer.OnSerializationComplete(content, contentSequenceBuilder);
}
int bytesCopied = 0;
ReadOnlySequence<byte> contentSequence = contentSequenceBuilder.AsReadOnlySequence;
foreach (ReadOnlyMemory<byte> memory in contentSequence)
{
bool endOfMessage = bytesCopied + memory.Length == contentSequence.Length;
#if NETSTANDARD2_1_OR_GREATER
await this.WebSocket.SendAsync(memory, messageType, endOfMessage, cancellationToken).ConfigureAwait(false);
#else
if (MemoryMarshal.TryGetArray(memory, out ArraySegment<byte> segment))
{
await this.WebSocket.Send(segment.Array);
}
else
{
byte[] array = ArrayPool<byte>.Shared.Rent(memory.Length);
try
{
memory.CopyTo(array);
await this.WebSocket.Send(new ArraySegment<byte>(array, 0, memory.Length).ToArray());
}
finally
{
ArrayPool<byte>.Shared.Return(array);
}
}
#endif
bytesCopied += memory.Length;
}
}
}
/// <inheritdoc />
protected override ValueTask FlushAsync(CancellationToken cancellationToken) => default;
}
최대한 기존 websocket 에 source 를 활용해 보았다.
다른 점이라면 send 및 receive 부분이 fleck 의 websocket connection 개체로 대체 되었다.
가장 중요한 부분이 있는데 AutoResetEvent 인 _resetEvent 변수 이다.
...
private AutoResetEvent _resetEvent = new AutoResetEvent(false);
...
선언은 위와 같이 되어 있다.
그리고 ReadCoreAsync 에서 다음과 같이 사용되고 있다.
protected override async ValueTask<JsonRpcMessage?> ReadCoreAsync(CancellationToken cancellationToken)
{
try
{
_resetEvent.WaitOne();
if (IsJsonRpcMessage(_receivedMessage))
{
...
}
즉 초기에 MessageBaseHanlder 에서 ReadCoreAync 를 바로 호출 하고
ReadCoreAsync 내부에서 websocket 을 이용해 receive 를 처리 하던 방식에서
event 형태의 message 를 받는 방식으로 변경된 fleck 을 위해 reset event 를 통해 대기를 하게 하는 것이다.
그리고 이 reset event 를 처리하는 부분은 생성자에 있으며 다음과 같다.
...
this.WebSocket.OnMessage = (message) =>
{
_receivedMessage = message;
_resetEvent.Set();
};
...
Fleck 의 Client Connection 인 IWebSocketConnection 을 통해 OnMessage 에 message 가 들어오면
receiveMessage 를 setting 한후 _resetEvent 를 Set(); 한다.
이렇게 하여 message 를 처리 하게 되는 것이다.
이제 Handler 를 확인했으니 사용하는 코드를 확인해 보자
Program.cs
// See https://aka.ms/new-console-template for more information
using Fleck;
using StreamJsonRpc;
WebSocketServer webSocketServer = new WebSocketServer("ws://0.0.0.0:8080");
webSocketServer.Start(socket =>
{
socket.OnOpen = () =>
{
ProcessJsonRpcAsync(socket).ConfigureAwait(false).GetAwaiter().GetResult();
Console.WriteLine($"Client 연결됨. {socket.ConnectionInfo} ");
};
socket.OnClose = () => Console.WriteLine($"Client 닫힘. {socket.ConnectionInfo}");
});
Console.WriteLine("Listening...");
Console.ReadLine();
static async Task ProcessJsonRpcAsync(IWebSocketConnection webSocket)
{
var handler = new FleckWebSocketConnectionMessageHandler(webSocket);
handler.RawMessageArrived += Handler_RawMessageArrived;
var rpc = new JsonRpc(handler, new MyService());
rpc.StartListening();
await rpc.Completion;
}
static void Handler_RawMessageArrived(object? sender, string e)
{
Console.WriteLine($"Handler_RawMessageArrived = {e}");
}
class MyService
{
public string Echo(string message)
{
Console.WriteLine($"Received message: {message}");
return "Echo: " + message;
}
}
Client 코드는 이전과 같다.
Fleck 은 WebSocketServer 를 위한 library 이니 client 는 ClientWebSocket 을 이용하자
Program.cs
//See https://aka.ms/new-console-template for more information
using StreamJsonRpc;
using System.Net.WebSockets;
using System.Text;
var webSocket = new ClientWebSocket();
await webSocket.ConnectAsync(new Uri("ws://localhost:8080"), CancellationToken.None);
Console.WriteLine("WebSocket connected");
var handler = new WebSocketMessageHandler(webSocket);
var rpc = new JsonRpc(new WebSocketMessageHandler(webSocket));
rpc.StartListening();
var myService = rpc.Attach<IMyService>();
var message = "Hello, World!";
var response = await myService.Echo(message);
Console.WriteLine(response);
var buffer = new byte[1024];
var sendBuffer = new ArraySegment<byte>(Encoding.UTF8.GetBytes(message));
await webSocket.SendAsync(sendBuffer, WebSocketMessageType.Text, true, CancellationToken.None);
Console.ReadLine();
interface IMyService
{
Task<string> Echo(string message);
}
이제 이전과 같은 동작을 하게 된다.
server 실행 화면
client 실행화면
관련영상
'CSharp' 카테고리의 다른 글
C# 12 - Collection Expression (1) | 2023.12.25 |
---|---|
Expression Tree (0) | 2023.09.04 |
WebSocket 에 JsonRPC 및 일반메세지 함께 처리하기 (0) | 2023.03.13 |
.NET Core Console 에서 WebSocket Server 에 JsonRPC 적용하기 (0) | 2023.03.07 |
.NET Core Console 에서 WebSocket Server 만들기 (0) | 2023.03.07 |