~ruther/NosSmooth.Comms

ref: ecfc8cd7dc9143e9550044f3f5971bf8631f51b7 NosSmooth.Comms/src/Core/NosSmooth.Comms.Core/MessageHandler.cs -rw-r--r-- 4.6 KiB
ecfc8cd7 — Rutherther feat: add local injectable and injector to allow making connections to nostale processes 2 years ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
//
//  MessageHandler.cs
//
//  Copyright (c) František Boháček. All rights reserved.
//  Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System.Diagnostics.Contracts;
using System.Reflection;
using System.Runtime.InteropServices.JavaScript;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using NosSmooth.Comms.Core.Errors;
using NosSmooth.Comms.Data;
using NosSmooth.Comms.Data.Messages;
using NosSmooth.Comms.Data.Responders;
using NosSmooth.Core.Contracts;
using Remora.Results;

namespace NosSmooth.Comms.Core;

/// <summary>
/// An executor of message responders.
/// </summary>
/// <remarks>
/// Unwraps a <see cref="MessageWrapper{TMessage}"/>, passes its data to the <see cref="Contractor"/>
/// (if one is registered), calls every registered <see cref="IMessageResponder{TMessage}"/>
/// and optionally sends a <see cref="ResponseResult"/> back over the connection.
/// </remarks>
public class MessageHandler
{
    // Resolved against typeof(MessageHandler) instead of GetType(): reflection on a derived type
    // does not return private methods declared on its base type, so a GetType() based lookup
    // would yield null for any subclass of this (unsealed) class.
    private static readonly MethodInfo _genericHandleMessageMethod = typeof(MessageHandler).GetMethod
        (nameof(GenericHandleMessageAsync), BindingFlags.NonPublic | BindingFlags.Instance)
        ?? throw new InvalidOperationException($"Could not find method {nameof(GenericHandleMessageAsync)}.");

    private readonly IServiceProvider _services;
    private readonly bool _respond;

    /// <summary>
    /// Initializes a new instance of the <see cref="MessageHandler"/> class.
    /// </summary>
    /// <param name="services">The services.</param>
    /// <param name="respond">Whether to respond to the messages.</param>
    public MessageHandler(IServiceProvider services, bool respond)
    {
        _services = services;
        _respond = respond;
    }

    /// <summary>
    /// Handle the given message, call responders.
    /// </summary>
    /// <remarks>
    /// The message has to be an instance of a closed <see cref="MessageWrapper{TMessage}"/>,
    /// anything else results in a <see cref="GenericError"/>.
    /// </remarks>
    /// <param name="connection">The connection the message comes from.</param>
    /// <param name="wrappedMessage">The message to handle.</param>
    /// <param name="ct">The cancellation token used for cancelling the operation.</param>
    /// <returns>A result that may or may not have succeeded.</returns>
    public async Task<Result> HandleMessageAsync
        (ConnectionHandler connection, object wrappedMessage, CancellationToken ct)
    {
        var wrappedType = wrappedMessage.GetType();
        if (!wrappedType.IsGenericType || wrappedType.GetGenericTypeDefinition() != typeof(MessageWrapper<>))
        {
            return new GenericError($"Message type is not MessageWrapper<>, but {wrappedType.FullName}");
        }

        // MessageWrapper<> has exactly one type argument, the type of the wrapped data.
        var messageType = wrappedType.GetGenericArguments()[0];
        var handleMessageMethod = _genericHandleMessageMethod.MakeGenericMethod(messageType);

        var task = (Task<Result>)handleMessageMethod.Invoke(this, new object[] { connection, wrappedMessage, ct })!;
        return await task;
    }

    /// <summary>
    /// Collapses the given failed results into a single result.
    /// </summary>
    /// <param name="errors">The unsuccessful results.</param>
    /// <returns>
    /// A successful result if there are no errors, the error itself if there is exactly one,
    /// an <see cref="AggregateError"/> otherwise.
    /// </returns>
    private static Result AggregateErrors(List<IResult> errors)
        => errors.Count switch
        {
            0 => Result.FromSuccess(),
            1 => (Result)errors[0],
            _ => new AggregateError(errors)
        };

    /// <summary>
    /// Handle the given strongly typed message.
    /// </summary>
    /// <remarks>
    /// If no responder is registered for the message, the sent response carries
    /// a <see cref="MessageHandlerNotFoundError"/>, the returned result is not affected by that.
    /// </remarks>
    /// <param name="connection">The connection the message comes from.</param>
    /// <param name="wrappedMessage">The message to handle.</param>
    /// <param name="ct">The cancellation token used for cancelling the operation.</param>
    /// <typeparam name="TMessage">The type of the wrapped data.</typeparam>
    /// <returns>A result that may or may not have succeeded.</returns>
    private async Task<Result> GenericHandleMessageAsync<TMessage>
        (ConnectionHandler connection, MessageWrapper<TMessage> wrappedMessage, CancellationToken ct)
        where TMessage : notnull
    {
        var data = wrappedMessage.Data;

        // The contractor is optional, it is resolved from the root provider.
        var contractor = _services.GetService<Contractor>();
        if (contractor is not null)
        {
            var contractorResult = await contractor.Update(data, ct);
            if (!contractorResult.IsSuccess)
            {
                // NOTE(review): no ResponseResult is sent on this path, even if responding is enabled.
                // Confirm that is intended.
                return contractorResult;
            }
        }

        // The responders are resolved from a new scope, the injector hands them the current connection.
        await using var scope = _services.CreateAsyncScope();
        var injector = scope.ServiceProvider.GetRequiredService<ConnectionInjector>();
        injector.ConnectionHandler = connection;
        injector.Connection = connection.Connection;

        // ToArray starts every responder before any of them is awaited.
        var responders = scope.ServiceProvider
            .GetServices<IMessageResponder<TMessage>>()
            .Select(x => x.Respond(data, ct))
            .ToArray();

        var errors = (await Task.WhenAll(responders))
            .Where(x => !x.IsSuccess)
            .Cast<IResult>()
            .ToList();

        var result = AggregateErrors(errors);

        // A ResponseResult is never answered with another ResponseResult.
        if (_respond && data is not ResponseResult)
        {
            var responseResult = result;
            if (responders.Length == 0)
            {
                responseResult = new MessageHandlerNotFoundError();
            }

            var response = new ResponseResult(wrappedMessage.MessageId, responseResult);
            var sentMessageResult = await connection.SendMessageAsync(response, ct);
            if (!sentMessageResult.IsSuccess)
            {
                // Failing to send the response is reported along with the responder errors.
                errors.Add(Result.FromError(sentMessageResult));
                result = AggregateErrors(errors);
            }
        }

        return result;
    }
}
Do not follow this link