WebSocket 에 JsonRPC 및 일반메세지 함께 처리하기

2023. 3. 13. 00:00CSharp

반응형

StreamJsonRPC Library 를 이용해서 WebSocket 에서 JSON-Rpc 를 처리 하였다.

이렇게 하게 되면 JSON-RPC 는 처리가 되지만

일반적인 Plain Text 를 (Json rpc 형태가 아닌) 처리하지 못하게 된다. 

그래서 이 두가지를 다 활용하기 위해서 StreamJsonRPC 에 WebSocketMessageHandler 를 

조금 수정할 것이다. 

그런데 이 Library 가 확장에 전혀 신경을 쓰지 않은듯한 느낌이다.

어쩔 수 없이 해당  Library 의 WebSocketMessageHandler 부분을 참고하여

ReadCoreAsync 부분을 수정한 후 작업해 볼것이다. 

 

EventHandler 형태의 event 를 노출할 것이다. RawMessageArrived 라는 형태로 만들고

사용하는 측에서 저 event 를 등록하면 string 형태로 받은 message 를 전달해 줄것이다. 

 

그럼 일단 구현한 코드를 확인해 보자!!!

 

 WbSocketMessageHandler 를 상속받아서 다시 구현할 것이다.

WebSocketAllMessageHandler.cs 

public class WebSocketAllMessageHandler : WebSocketMessageHandler
{
    public event EventHandler<string> RawMessageArrived;
    private readonly int sizeHint;

    private readonly Sequence<byte> contentSequenceBuilder = new Sequence<byte>();

    private IJsonRpcMessageBufferManager? bufferedMessage;

    public WebSocketAllMessageHandler(WebSocket webSocket)
       : this(webSocket, new JsonMessageFormatter())
    {
    }

    public WebSocketAllMessageHandler(WebSocket webSocket, IJsonRpcMessageFormatter formatter, int sizeHint = 4096)
        : base(webSocket, formatter, sizeHint)
    {
        this.sizeHint = sizeHint;
    }

    /// <inheritdoc />
    protected override async ValueTask<JsonRpcMessage?> ReadCoreAsync(CancellationToken cancellationToken)
    {
#if NETSTANDARD2_1_OR_GREATER
        ValueWebSocketReceiveResult result;
#else
        WebSocketReceiveResult result;
#endif
        do
        {
#if NETSTANDARD2_1_OR_GREATER
            Memory<byte> memory = this.contentSequenceBuilder.GetMemory(this.sizeHint);
            result = await this.WebSocket.ReceiveAsync(memory, cancellationToken).ConfigureAwait(false);
            this.contentSequenceBuilder.Advance(result.Count);
#else
            ArrayPool<byte> pool = ArrayPool<byte>.Shared;
            byte[] segment = pool.Rent(this.sizeHint);
            try
            {
                result = await this.WebSocket.ReceiveAsync(new ArraySegment<byte>(segment), cancellationToken).ConfigureAwait(false);
                this.contentSequenceBuilder.Write(segment.AsSpan(0, result.Count));
            }
            finally
            {
                pool.Return(segment);
            }
#endif
            if (result.MessageType == WebSocketMessageType.Close)
            {
                await this.WebSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closed as requested.", CancellationToken.None).ConfigureAwait(false);
                return null;
            }
        }
        while (!result.EndOfMessage);

        if (this.contentSequenceBuilder.AsReadOnlySequence.Length > 0)
        {
            var receivedMessage = Encoding.UTF8.GetString(contentSequenceBuilder.AsReadOnlySequence.ToArray());
            if (IsJsonRpcMessage(receivedMessage))
            {
                JsonRpcMessage message = this.Formatter.Deserialize(this.contentSequenceBuilder);
                this.bufferedMessage = message as IJsonRpcMessageBufferManager;
                if (this.bufferedMessage is null)
                {
                    this.contentSequenceBuilder.Reset();
                }

                return message;
            }
            else
            {
                RawMessageArrived?.Invoke(this, receivedMessage);
                return null;
            }
        }
        else
        {
            return null;
        }

    }

    private bool IsJsonRpcMessage(string message)
    {
        if (string.IsNullOrWhiteSpace(message))
        {
            return false;
        }

        try
        {
            var jsonDoc = JsonDocument.Parse(message);

            if (!jsonDoc.RootElement.TryGetProperty("jsonrpc", out _))
            {
                return false;
            }

            if (!jsonDoc.RootElement.TryGetProperty("method", out _))
            {
                return false;
            }

            return true;
        }
        catch (JsonException)
        {
            return false;
        }
    }


}

코드는 IsJsonRpcMessage method 를 통해 받은 message 가 JSON-RPC 인지 일반 메세지 인지 구분한다. 

그리고 JSON-RPC 메세지라면 기존 WebSocketMessageHandler 에서 처리 하던데로 처리 하고

아니라면 RawMessageArrived?.Invoke(this, receivedMessage); 를 통해 받은 메세지를 전달한다.

 

WebSocketMessageHandler.cs 에 코드를 참조하여 작성하였다. 

상속으로 확장 가능 할 것으로 생각했는데...

protected 로 있어야 할 field 들이나 method , property 들이 ...

무슨생각인지 전부 private 으로 선언되어 있었다...

다른 방식이 있는지 모르겠지만... 

override 가능한 method 는 ReadCoreAsync, WriteCoreAsync 정도고..

솔직히 말해서 확장에 대해서는 별로 신경을 쓴것 같지 않다.

(아니면 내가 잘 모르는 건지...)

 

이제 사용하는 코드를 보자

// See https://aka.ms/new-console-template for more information
using StreamJsonRpc;
using System.Net;
using System.Net.WebSockets;

var listener = new HttpListener();
listener.Prefixes.Add("http://localhost:8080/");
listener.Start();

Console.WriteLine("Listening...");

while (true)
{
    var context = await listener.GetContextAsync();

    if (context.Request.IsWebSocketRequest)
    {
        Console.WriteLine("WebSocket request received");

        var webSocketContext = await context.AcceptWebSocketAsync(subProtocol: null);
        var webSocket = webSocketContext.WebSocket;

        await ProcessJsonRpcAsync(webSocket);

        Console.ReadLine();
    }
    else
    {
        context.Response.StatusCode = (int)HttpStatusCode.BadRequest;
        context.Response.Close();
    }
}

static async Task ProcessJsonRpcAsync(WebSocket webSocket)
{
    var handler = new WebSocketAllMessageHandler(webSocket);
    handler.RawMessageArrived += Handler_RawMessageArrived;
    var rpc = new JsonRpc(handler, new MyService(webSocket));

    rpc.StartListening();
    await rpc.Completion;
}

static void Handler_RawMessageArrived(object? sender, string e)
{
    Console.WriteLine($"Handler_RawMessageArrived = {e}");
}


class MyService
{
    private readonly WebSocket _socket;

    public MyService(WebSocket socket)
    {
        _socket = socket;
    }

    public string Echo(string message)
    {
        Console.WriteLine($"Received message: {message}");
        return "Echo: " + message;
    }
}

이 소스체서는 Handler_RawMessageArrived 를 통해서 일반메세지를 처리 하고 있다. 

JSON-RPC 는 기존처럼 똑같이 StreamJsonRPC Library 가 처리 한다. 

 

Client 는 다음과 같다. 

//See https://aka.ms/new-console-template for more information
using StreamJsonRpc;
using StreamJsonRpc.Protocol;
using System.Net.WebSockets;
using System.Text;

var webSocket = new ClientWebSocket();
await webSocket.ConnectAsync(new Uri("ws://localhost:8080"), CancellationToken.None);

Console.WriteLine("WebSocket connected");
var handler = new WebSocketMessageHandler(webSocket);
var rpc = new JsonRpc(new WebSocketMessageHandler(webSocket));
rpc.StartListening();
var myService = rpc.Attach<IMyService>();
var message = "Hello, World!";
var response = await myService.Echo(message);
Console.WriteLine(response);

var buffer = new byte[1024];

var sendBuffer = new ArraySegment<byte>(Encoding.UTF8.GetBytes(message));

await webSocket.SendAsync(sendBuffer, WebSocketMessageType.Text, true, CancellationToken.None);

Console.ReadLine();


interface IMyService
{
    Task<string> Echo(string message);
}

 

JSON-RPC 를 이용해서 Echo 메소드를 통해 "Hello, World!" 를 전달 하고 response 를 받는다.

websocket 에서 직접 SendAsync 를 통해 "Hello, World!" 를 전달한다. 

 

위와 같이 작업해서 JSON-RPC 와 Non JSON-RPC 메세지를 하나의 WebSocket 에서 처리 하였다. 

좀더 쉬운 방법이 있는지 모르겠지만.. 일단 내가 발견한 것은 위와 같다. 

더 좋은 방법이 있다면 공유 바란다. 

 

PS: 물론 StreamJSONRPC 같은 library 를 사용하지 않고 직접 구현한다면
이런 작업은 하지 않아도 된다.

 

관련영상

https://youtu.be/9rnEGQswidI

 

반응형