~ruther/NosSmooth.Comms

ref: 105312436687e9edc6d9a5627831c6ef66278b1a NosSmooth.Comms/src/Core/NosSmooth.Comms.Core/MessageHandler.cs -rw-r--r-- 4.8 KiB
10531243 — Rutherther feat: add client state 2 years ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
//
//  MessageHandler.cs
//
//  Copyright (c) František Boháček. All rights reserved.
//  Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System.Diagnostics.Contracts;
using System.Reflection;
using System.Runtime.InteropServices.JavaScript;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using NosSmooth.Comms.Core.Errors;
using NosSmooth.Comms.Data;
using NosSmooth.Comms.Data.Messages;
using NosSmooth.Comms.Data.Responders;
using NosSmooth.Core.Contracts;
using Remora.Results;

namespace NosSmooth.Comms.Core;

/// <summary>
/// An executor of message responders.
/// </summary>
/// <remarks>
/// Takes a boxed <see cref="MessageWrapper{TMessage}"/>, resolves every registered
/// <see cref="IMessageResponder{TMessage}"/> for the wrapped message type from a fresh service scope,
/// runs them concurrently and folds their failures into a single <see cref="Result"/>.
/// </remarks>
public class MessageHandler
{
    private readonly IServiceProvider _services;
    private readonly bool _respond;

    /// <summary>
    /// Initializes a new instance of the <see cref="MessageHandler"/> class.
    /// </summary>
    /// <param name="services">The services used for resolving the contractor, the scope and the responders.</param>
    /// <param name="respond">Whether to send a <see cref="ResponseResult"/> back for each handled message.</param>
    public MessageHandler(IServiceProvider services, bool respond)
    {
        _services = services;
        _respond = respond;
    }

    /// <summary>
    /// Handle the given message, call responders.
    /// </summary>
    /// <remarks>
    /// The message type is only known at runtime, so the strongly typed handler is closed over it
    /// and invoked using reflection.
    /// </remarks>
    /// <param name="connection">The connection the message comes from.</param>
    /// <param name="wrappedMessage">The message to handle, must be a <see cref="MessageWrapper{TMessage}"/>.</param>
    /// <param name="ct">The cancellation token used for cancelling the operation.</param>
    /// <returns>A result that may or may not have succeeded.</returns>
    public async Task<Result> HandleMessageAsync
        (ConnectionHandler connection, object wrappedMessage, CancellationToken ct)
    {
        var wrappedType = wrappedMessage.GetType();
        if (!wrappedType.IsGenericType || wrappedType.GetGenericTypeDefinition() != typeof(MessageWrapper<>))
        {
            return new GenericError($"Message type is not MessageWrapper<>, but {wrappedType.FullName}");
        }

        var messageType = wrappedType.GetGenericArguments()[0];

        // Look the method up on typeof(MessageHandler), not GetType(): reflection does not return
        // private members of a base class when queried through a derived type, so the lookup
        // would yield null for any subclass of this (unsealed) class.
        var handleMessageMethod = typeof(MessageHandler)
            .GetMethod(nameof(GenericHandleMessageAsync), BindingFlags.NonPublic | BindingFlags.Instance)!
            .MakeGenericMethod(messageType);

        var task = (Task<Result>)handleMessageMethod.Invoke(this, new object[] { connection, wrappedMessage, ct })!;
        return await task;
    }

    /// <summary>
    /// Handle the given strongly typed message.
    /// </summary>
    /// <remarks>
    /// Steps, in order:
    /// 1. If a <see cref="Contractor"/> is registered, it is updated with the message data;
    ///    a failed update is returned immediately and no responder is called.
    /// 2. A new async scope is created and its <see cref="ConnectionInjector"/> is filled with the connection.
    /// 3. All responders from the scope are started and awaited together.
    /// 4. If responding is enabled and the message is not itself a <see cref="ResponseResult"/>,
    ///    a <see cref="ResponseResult"/> is sent back through the connection.
    /// </remarks>
    /// <param name="connection">The connection the message comes from.</param>
    /// <param name="wrappedMessage">The wrapped message.</param>
    /// <param name="ct">The cancellation token used for cancelling the operation.</param>
    /// <typeparam name="TMessage">The type of the message data.</typeparam>
    /// <returns>A result that may or may not have succeeded.</returns>
    private async Task<Result> GenericHandleMessageAsync<TMessage>
        (ConnectionHandler connection, MessageWrapper<TMessage> wrappedMessage, CancellationToken ct)
        where TMessage : notnull
    {
        var data = wrappedMessage.Data;

        var contractor = _services.GetService<Contractor>();
        if (contractor is not null)
        {
            var contractorResult = await contractor.Update(data, ct);
            if (!contractorResult.IsSuccess)
            {
                // NOTE(review): no ResponseResult is sent on this path even when responding is enabled.
                return contractorResult;
            }
        }

        await using var scope = _services.CreateAsyncScope();
        var injector = scope.ServiceProvider.GetRequiredService<ConnectionInjector>();
        injector.ConnectionHandler = connection;
        injector.Connection = connection.Connection;

        // The call to Respond is deferred into SafeCall so that a responder throwing synchronously
        // is turned into an error result as well, instead of aborting the remaining responders.
        var responders = scope.ServiceProvider
            .GetServices<IMessageResponder<TMessage>>()
            .Select(responder => SafeCall(() => responder.Respond(data, ct)))
            .ToArray();

        var errors = (await Task.WhenAll(responders))
            .Where(x => !x.IsSuccess)
            .Cast<IResult>()
            .ToList();

        var result = AggregateErrors(errors);

        // A ResponseResult is never answered; presumably to avoid an endless exchange of responses.
        if (_respond && data is not ResponseResult)
        {
            // The remote side is told when nobody handled the message,
            // the result returned to the local caller is not affected by that.
            Result responseResult = responders.Length == 0
                ? new MessageHandlerNotFoundError()
                : result;

            var response = new ResponseResult(wrappedMessage.MessageId, responseResult);
            var sentMessageResult = await connection.SendMessageAsync(response, ct);
            if (!sentMessageResult.IsSuccess)
            {
                errors.Add(Result.FromError(sentMessageResult));
                result = AggregateErrors(errors);
            }
        }

        return result;
    }

    /// <summary>
    /// Folds the given unsuccessful results into one result.
    /// </summary>
    /// <param name="errors">The unsuccessful results.</param>
    /// <returns>
    /// A successful result for no errors, the error itself for exactly one error,
    /// an <see cref="AggregateError"/> otherwise.
    /// </returns>
    private Result AggregateErrors(List<IResult> errors)
        => errors.Count switch
        {
            0 => Result.FromSuccess(),
            1 => (Result)errors[0],
            _ => new AggregateError(errors)
        };

    /// <summary>
    /// Invokes the given responder call, converting any thrown exception into an error result.
    /// </summary>
    /// <remarks>
    /// The delegate is invoked inside the try block, so both exceptions thrown synchronously
    /// and exceptions thrown from the returned task are caught.
    /// </remarks>
    /// <param name="respond">The responder call to execute.</param>
    /// <returns>The result of the responder, or an error wrapping the thrown exception.</returns>
    private async Task<Result> SafeCall(Func<Task<Result>> respond)
    {
        try
        {
            return await respond();
        }
        catch (Exception e)
        {
            return e;
        }
    }
}
Do not follow this link