~ruther/NosSmooth.Comms

ref: 59f89c10d3b19d2dff02b651d893181519a7f3e0 NosSmooth.Comms/src/Core/NosSmooth.Comms.Core/MessageHandler.cs -rw-r--r-- 3.9 KiB
59f89c10 — František Boháček feat: add named pipes implementation 2 years ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
//
//  MessageHandler.cs
//
//  Copyright (c) František Boháček. All rights reserved.
//  Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System.Reflection;
using System.Runtime.InteropServices.JavaScript;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using NosSmooth.Comms.Data;
using NosSmooth.Comms.Data.Messages;
using NosSmooth.Comms.Data.Responders;
using Remora.Results;

namespace NosSmooth.Comms.Core;

/// <summary>
/// An executor of message responders.
/// </summary>
/// <remarks>
/// For every handled message a new service scope is created, all registered
/// <see cref="IMessageResponder{TMessage}"/> services for the message type are called,
/// and (when configured to respond) a <see cref="ResponseResult"/> carrying the combined
/// outcome is sent back over the connection the message came from.
/// </remarks>
public class MessageHandler
{
    private readonly IServiceProvider _services;
    private readonly bool _respond;

    /// <summary>
    /// Initializes a new instance of the <see cref="MessageHandler"/> class.
    /// </summary>
    /// <param name="services">The services.</param>
    /// <param name="respond">Whether to respond to the messages.</param>
    public MessageHandler(IServiceProvider services, bool respond)
    {
        _services = services;
        _respond = respond;
    }

    /// <summary>
    /// Handle the given message, call responders.
    /// </summary>
    /// <remarks>
    /// The message has to be a closed <see cref="MessageWrapper{TMessage}"/>; its generic argument
    /// decides which responders are called. Any other type yields an error result.
    /// </remarks>
    /// <param name="connection">The connection the message comes from.</param>
    /// <param name="wrappedMessage">The message to handle.</param>
    /// <param name="ct">The cancellation token used for cancelling the operation.</param>
    /// <returns>A result that may or may not have succeeded.</returns>
    public async Task<Result> HandleMessageAsync
        (ConnectionHandler connection, object wrappedMessage, CancellationToken ct)
    {
        var wrappedType = wrappedMessage.GetType();
        if (!wrappedType.IsGenericType || wrappedType.GetGenericTypeDefinition() != typeof(MessageWrapper<>))
        {
            return new GenericError($"Message type is not MessageWrapper<>, but {wrappedType.FullName}");
        }

        var messageType = wrappedType.GetGenericArguments()[0];

        // Look the method up on typeof(MessageHandler) instead of GetType(): reflection does not
        // return private methods of a base class when asked on a derived type, so the lookup
        // would come back null (and crash) for any subclass of this handler.
        var handleMessageMethod = typeof(MessageHandler)
            .GetMethod(nameof(GenericHandleMessageAsync), BindingFlags.NonPublic | BindingFlags.Instance)!
            .MakeGenericMethod(messageType);

        var task = (Task<Result>)handleMessageMethod.Invoke
            (this, new object[] { connection, wrappedMessage, ct })!;
        return await task;
    }

    /// <summary>
    /// Collapses the given failed results into one result.
    /// </summary>
    /// <param name="failures">The unsuccessful results collected so far.</param>
    /// <returns>
    /// A successful result if there are no failures, the failure itself if there is exactly one,
    /// an <see cref="AggregateError"/> wrapping all of them otherwise.
    /// </returns>
    private static Result CombineFailures(IReadOnlyList<IResult> failures)
        => failures.Count switch
        {
            0 => Result.FromSuccess(),
            1 => (Result)failures[0],
            _ => new AggregateError(failures)
        };

    /// <summary>
    /// Calls the responder and converts an exception thrown by it into an error result.
    /// </summary>
    /// <remarks>
    /// Without this, one throwing responder would abort the whole handling before the response
    /// is sent back and would hide failures of the other responders.
    /// Cancellation is not converted, it propagates to the caller.
    /// </remarks>
    /// <param name="responder">The responder to call.</param>
    /// <param name="data">The message data passed to the responder.</param>
    /// <param name="ct">The cancellation token used for cancelling the operation.</param>
    /// <typeparam name="TMessage">The type of the message.</typeparam>
    /// <returns>The result of the responder, or an error wrapping the thrown exception.</returns>
    private static async Task<Result> RespondSafelyAsync<TMessage>
        (IMessageResponder<TMessage> responder, TMessage data, CancellationToken ct)
    {
        try
        {
            return await responder.Respond(data, ct);
        }
        catch (Exception e) when (e is not OperationCanceledException)
        {
            return e;
        }
    }

    /// <summary>
    /// Calls all responders registered for <typeparamref name="TMessage"/> and optionally sends a response.
    /// </summary>
    /// <remarks>
    /// Invoked through reflection from <see cref="HandleMessageAsync"/>; keep the name, the
    /// accessibility and the parameter list in sync with that call.
    /// </remarks>
    /// <param name="connection">The connection the message comes from.</param>
    /// <param name="wrappedMessage">The wrapped message to handle.</param>
    /// <param name="ct">The cancellation token used for cancelling the operation.</param>
    /// <typeparam name="TMessage">The type of the message inside the wrapper.</typeparam>
    /// <returns>
    /// The combined result of all responders, including a failure to send the response, if any.
    /// </returns>
    private async Task<Result> GenericHandleMessageAsync<TMessage>
        (ConnectionHandler connection, MessageWrapper<TMessage> wrappedMessage, CancellationToken ct)
    {
        var data = wrappedMessage.Data;

        await using var scope = _services.CreateAsyncScope();

        // Store the connection in the scoped injector before resolving the responders
        // (presumably so that services resolved from this scope can obtain it).
        var injector = scope.ServiceProvider.GetRequiredService<ConnectionInjector>();
        injector.ConnectionHandler = connection;
        injector.Connection = connection.Connection;

        var responders = scope.ServiceProvider
            .GetServices<IMessageResponder<TMessage>>()
            .Select(x => RespondSafelyAsync(x, data, ct));

        // Only the failures are kept, successful results carry no information.
        var results = (await Task.WhenAll(responders))
            .Where(x => !x.IsSuccess)
            .Cast<IResult>()
            .ToList();

        var result = CombineFailures(results);

        // A response is never answered with another response.
        if (_respond && data is not ResponseResult)
        {
            var response = new ResponseResult(wrappedMessage.MessageId, result);
            var sentMessageResult = await connection.SendMessageAsync(response, ct);
            if (!sentMessageResult.IsSuccess)
            {
                // Report the failed send to the caller along with the responder failures.
                results.Add(sentMessageResult);
                result = CombineFailures(results);
            }
        }

        return result;
    }
}