Fleck 을 이용한 WebSocket Server 구현 (with streamjsonrpc)

2023. 3. 20. 00:00CSharp

반응형

이전 강좌에서 streamjsonrpc library 와 .NET 자체적으로 지원하는 HttpListener 를 통해 WebSocket 을 구현해 보았다.

물론 system native 한 websocket 을 사용하는게 best 이지만

.NET version 등에 의해 사용하기 곤란 할수도 있고

이미 구현된 library 가 있을 수도 있다. 

또한 직접 구현한 websocket (tcp socket 을 통해) 도 있을 것이다. 

대부분의 이러한 구현체는 개인이 구현하다 보니 코드상에 bug 가 있을 수도 있고

websocket 표준을 따르지 못하는 경우들도 있다. 

그래서 이러한 문제를 해결하기 위해 3rd party library 를 통해서

앞에 구현한 websocket server 를 다시 구현해 보겠다.

 

여러가지 WebSocket 관련 library 가 있는데

그중에서 우리는 Fleck 을 사용해 보겠다. 

일단 사용법 자체가 simple 하다. 

단점이라면 더이상의 update 가 없어서 최신 websocket 표준을 지원하지 못한다. 

(websocket 2.0 지원 안함)

 

https://github.com/statianzo/Fleck

 

GitHub - statianzo/Fleck: C# Websocket Implementation

C# Websocket Implementation. Contribute to statianzo/Fleck development by creating an account on GitHub.

github.com

fleck 은 위 site 에서 download 가 가능하니 우리는 이것을 이용해서 개발을 해보자

 

예제를 보면 event 방식으로 처리 하도록 되어 있고 다음과 같은 아주 simple 한 예제가 있다. 

var server = new WebSocketServer("ws://0.0.0.0:8181");
server.Start(socket =>
{
  socket.OnOpen = () => Console.WriteLine("Open!");
  socket.OnClose = () => Console.WriteLine("Close!");
  socket.OnMessage = message => socket.Send(message);
});

websocket  Server 를 생성하고 Start (listen) 상태에 들어간 후

client 에 접속이 있다면 IWebSocketConnection 인 socket 을 통해서 event 를 처리 하고 있다. 

(OnOpen 등은 action 으로 구현되어 있음)

 

아주 간단하다. 

이제 기존코드를 fleck 으로 migration 해보자

 

기존 코드를 fleck 으로 migration 하기 위해서는

streamjsonrpc 에서 처리 하고는 JsonRPC, Non JsonRPC message 에 대한 처리를 수정해야 한다. 

이전 과정에서 우리는 WebSocketMessageHandler 를 상속한 WebSocketAllMessageHandler 를 이용했다.

https://yogingang.tistory.com/423

 

WebSocket 에 JsonRPC 및 일반메세지 함께 처리하기

StreamJsonRPC Library 를 이용해서 WebSocket 에서 JSON-Rpc 를 처리 하였다. 이렇게 하게 되면 JSON-RPC 는 처리가 되지만 일반적인 Plain Text 를 (Json rpc 형태가 아닌) 처리하지 못하게 된다. 그래서 이 두가지

yogingang.tistory.com

이번에는 fleck 을 사용할 것이니 WebSocketAllMessageHandler 를 그대로 사용할 수는 없다. 

우리는 이번에 Fleck 에서 receive 하는 부분을 StreamJsonRPC 에 MessageBaseHandler 를 상속한 FleckWebSocketConnectionMessageHandler 를 구현할 것이다.

 

Fleck 은 IWebSocketConnection 으로 client connection 을 관리 하고 있으므로 이것을 이용하자

FleckWebSocketConnectionMessageHandler.cs

// See https://aka.ms/new-console-template for more information
using Fleck;
using Microsoft;
using Nerdbank.Streams;
using StreamJsonRpc;
using StreamJsonRpc.Protocol;
using StreamJsonRpc.Reflection;
using System.Buffers;
using System.Net.WebSockets;
using System.Runtime.InteropServices;
using System.Text;
using System.Text.Json;

public class FleckWebSocketConnectionMessageHandler : MessageHandlerBase, IJsonRpcMessageBufferManager
{
    public event EventHandler<string> RawMessageArrived;
    private readonly int sizeHint;

    private readonly Sequence<byte> contentSequenceBuilder = new Sequence<byte>();

    private IJsonRpcMessageBufferManager? bufferedMessage;

    /// <summary>
    /// Initializes a new instance of the <see cref="WebSocketMessageHandler"/> class
    /// that uses the <see cref="JsonMessageFormatter"/> to serialize messages as textual JSON.
    /// </summary>
    /// <param name="webSocket">
    /// The <see cref="System.Net.WebSockets.WebSocket"/> used to communicate.
    /// This will <em>not</em> be automatically disposed of with this <see cref="WebSocketMessageHandler"/>.
    /// </param>
    public FleckWebSocketConnectionMessageHandler(IWebSocketConnection webSocket)
#pragma warning disable CA2000 // Dispose objects before losing scope
        : this(webSocket, new JsonMessageFormatter())
#pragma warning restore CA2000 // Dispose objects before losing scope
    {
    }

    /// <summary>
    /// Initializes a new instance of the <see cref="WebSocketMessageHandler"/> class.
    /// </summary>
    /// <param name="webSocket">
    /// The <see cref="System.Net.WebSockets.WebSocket"/> used to communicate.
    /// This will <em>not</em> be automatically disposed of with this <see cref="WebSocketMessageHandler"/>.
    /// </param>
    /// <param name="formatter">The formatter to use to serialize <see cref="JsonRpcMessage"/> instances.</param>
    /// <param name="sizeHint">
    /// The size of the buffer to use for reading JSON-RPC messages.
    /// Messages which exceed this size will be handled properly but may require multiple I/O operations.
    /// </param>
    public FleckWebSocketConnectionMessageHandler(IWebSocketConnection webSocket, IJsonRpcMessageFormatter formatter, int sizeHint = 4096)
        : base(formatter)
    {
        Requires.NotNull(webSocket, nameof(webSocket));
        Requires.Range(sizeHint > 0, nameof(sizeHint));

        this.WebSocket = webSocket;
        this.sizeHint = sizeHint;
        this.WebSocket.OnMessage = (message) =>
        {
            _receivedMessage = message;
            _resetEvent.Set();
        };
    }


    private string _receivedMessage;

    /// <inheritdoc />
    public override bool CanWrite => true;

    /// <inheritdoc />
    public override bool CanRead => true;

    /// <summary>
    /// Gets the <see cref="System.Net.WebSockets.WebSocket"/> used to communicate.
    /// </summary>
    public IWebSocketConnection WebSocket { get; }

    /// <inheritdoc/>
    void IJsonRpcMessageBufferManager.DeserializationComplete(JsonRpcMessage message)
    {
        if (message is object && this.bufferedMessage == message)
        {
            this.bufferedMessage.DeserializationComplete(message);
            this.bufferedMessage = null;
            this.contentSequenceBuilder.Reset();
        }
    }


    private AutoResetEvent _resetEvent = new AutoResetEvent(false);

    /// <inheritdoc />
    protected override async ValueTask<JsonRpcMessage?> ReadCoreAsync(CancellationToken cancellationToken)
    {
        try
        {
            _resetEvent.WaitOne();
            if (IsJsonRpcMessage(_receivedMessage))
            {
                var buffer = Encoding.UTF8.GetBytes(_receivedMessage);
                Memory<byte> memory = this.contentSequenceBuilder.GetMemory(buffer.Length);
                this.contentSequenceBuilder.Write(buffer.AsSpan());
                JsonRpcMessage message = this.Formatter.Deserialize(this.contentSequenceBuilder);
                this.bufferedMessage = message as IJsonRpcMessageBufferManager;
                if (this.bufferedMessage is null)
                {
                    this.contentSequenceBuilder.Reset();
                }

                return message;
            }
            else
            {
                RawMessageArrived?.Invoke(this, _receivedMessage);
                return null;
            }
        }
        finally
        {

        }
    }

    private bool IsJsonRpcMessage(string message)
    {
        if (string.IsNullOrWhiteSpace(message))
        {
            return false;
        }

        try
        {
            var jsonDoc = JsonDocument.Parse(message);

            if (!jsonDoc.RootElement.TryGetProperty("jsonrpc", out _))
            {
                return false;
            }

            if (!jsonDoc.RootElement.TryGetProperty("method", out _))
            {
                return false;
            }

            return true;
        }
        catch (JsonException)
        {
            return false;
        }
    }



    /// <inheritdoc />
    protected override async ValueTask WriteCoreAsync(JsonRpcMessage content, CancellationToken cancellationToken)
    {
        Requires.NotNull(content, nameof(content));

        using (var contentSequenceBuilder = new Sequence<byte>())
        {
            WebSocketMessageType messageType = this.Formatter is IJsonRpcMessageTextFormatter ? WebSocketMessageType.Text : WebSocketMessageType.Binary;
            this.Formatter.Serialize(contentSequenceBuilder, content);
            cancellationToken.ThrowIfCancellationRequested();

            // Some formatters (e.g. MessagePackFormatter) needs the encoded form in order to produce JSON for tracing.
            // Other formatters (e.g. JsonMessageFormatter) would prefer to do its own tracing while it still has a JToken.
            // We only help the formatters that need the byte-encoded form here. The rest can do it themselves.
            if (this.Formatter is IJsonRpcFormatterTracingCallbacks tracer)
            {
                tracer.OnSerializationComplete(content, contentSequenceBuilder);
            }

            int bytesCopied = 0;
            ReadOnlySequence<byte> contentSequence = contentSequenceBuilder.AsReadOnlySequence;
            foreach (ReadOnlyMemory<byte> memory in contentSequence)
            {
                bool endOfMessage = bytesCopied + memory.Length == contentSequence.Length;
#if NETSTANDARD2_1_OR_GREATER
                await this.WebSocket.SendAsync(memory, messageType, endOfMessage, cancellationToken).ConfigureAwait(false);
#else
                if (MemoryMarshal.TryGetArray(memory, out ArraySegment<byte> segment))
                {
                    await this.WebSocket.Send(segment.Array);
                }
                else
                {
                    byte[] array = ArrayPool<byte>.Shared.Rent(memory.Length);
                    try
                    {
                        memory.CopyTo(array);
                        await this.WebSocket.Send(new ArraySegment<byte>(array, 0, memory.Length).ToArray());
                    }
                    finally
                    {
                        ArrayPool<byte>.Shared.Return(array);
                    }
                }
#endif

                bytesCopied += memory.Length;
            }
        }
    }

    /// <inheritdoc />
    protected override ValueTask FlushAsync(CancellationToken cancellationToken) => default;
}

최대한 기존 websocket 에 source 를 활용해 보았다. 

다른 점이라면 send 및 receive 부분이 fleck 의 websocket connection 개체로 대체 되었다.

가장 중요한 부분이 있는데 AutoResetEvent 인 _resetEvent 변수 이다.

...
private AutoResetEvent _resetEvent = new AutoResetEvent(false);
...

선언은 위와 같이 되어 있다. 

그리고 ReadCoreAsync 에서 다음과 같이 사용되고 있다. 

protected override async ValueTask<JsonRpcMessage?> ReadCoreAsync(CancellationToken cancellationToken)
{
    try
    {
        _resetEvent.WaitOne();
        if (IsJsonRpcMessage(_receivedMessage))
        {
        ...

}

즉 초기에 MessageBaseHanlder 에서 ReadCoreAync 를 바로 호출 하고

ReadCoreAsync 내부에서 websocket 을 이용해 receive 를 처리 하던 방식에서

event 형태의 message 를 받는 방식으로 변경된 fleck 을 위해 reset event 를 통해 대기를 하게 하는 것이다.

 

그리고 이 reset event 를 처리하는 부분은 생성자에 있으며  다음과 같다. 

...
this.WebSocket.OnMessage = (message) =>
{
    _receivedMessage = message;
    _resetEvent.Set();
};
...

Fleck 의 Client Connection 인 IWebSocketConnection 을 통해 OnMessage 에 message 가 들어오면

receiveMessage 를 setting 한후 _resetEvent 를 Set(); 한다. 

이렇게 하여 message 를 처리 하게 되는 것이다. 

 

이제 Handler 를 확인했으니 사용하는 코드를 확인해 보자

Program.cs

// See https://aka.ms/new-console-template for more information
using Fleck;
using StreamJsonRpc;

WebSocketServer webSocketServer = new WebSocketServer("ws://0.0.0.0:8080");
webSocketServer.Start(socket =>
{
    socket.OnOpen = () =>
    {
        ProcessJsonRpcAsync(socket).ConfigureAwait(false).GetAwaiter().GetResult();
        Console.WriteLine($"Client 연결됨. {socket.ConnectionInfo} ");
    };
    socket.OnClose = () => Console.WriteLine($"Client 닫힘. {socket.ConnectionInfo}");
});
Console.WriteLine("Listening...");
Console.ReadLine();

static async Task ProcessJsonRpcAsync(IWebSocketConnection webSocket)
{
    var handler = new FleckWebSocketConnectionMessageHandler(webSocket);
    handler.RawMessageArrived += Handler_RawMessageArrived;
    var rpc = new JsonRpc(handler, new MyService());

    rpc.StartListening();
    await rpc.Completion;
}

static void Handler_RawMessageArrived(object? sender, string e)
{
    Console.WriteLine($"Handler_RawMessageArrived = {e}");
}


class MyService
{
    public string Echo(string message)
    {
        Console.WriteLine($"Received message: {message}");
        return "Echo: " + message;
    }
}

 

Client 코드는 이전과 같다. 

Fleck 은 WebSocketServer 를 위한 library 이니 client 는 ClientWebSocket 을  이용하자

Program.cs

//See https://aka.ms/new-console-template for more information
using StreamJsonRpc;
using System.Net.WebSockets;
using System.Text;

var webSocket = new ClientWebSocket();
await webSocket.ConnectAsync(new Uri("ws://localhost:8080"), CancellationToken.None);

Console.WriteLine("WebSocket connected");
var handler = new WebSocketMessageHandler(webSocket);
var rpc = new JsonRpc(new WebSocketMessageHandler(webSocket));
rpc.StartListening();
var myService = rpc.Attach<IMyService>();
var message = "Hello, World!";
var response = await myService.Echo(message);
Console.WriteLine(response);

var buffer = new byte[1024];

var sendBuffer = new ArraySegment<byte>(Encoding.UTF8.GetBytes(message));

await webSocket.SendAsync(sendBuffer, WebSocketMessageType.Text, true, CancellationToken.None);

Console.ReadLine();


interface IMyService
{
    Task<string> Echo(string message);
}

 

이제 이전과 같은 동작을 하게 된다. 

server 실행 화면

 

client 실행화면

 

 

관련영상

https://youtu.be/Cqpn35n0at0

 

반응형