//
//  ConnectionHandler.cs
//
//  Copyright (c) František Boháček. All rights reserved.
//  Licensed under the MIT license. See LICENSE file in the project root for full license information.

using MessagePack;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using NosSmooth.Comms.Data;
using NosSmooth.Comms.Data.Messages;
using NosSmooth.Core.Contracts;
using NosSmooth.Core.Extensions;
using Remora.Results;

namespace NosSmooth.Comms.Core;

/// <summary>
/// Manages a connection, calls message handler when message is received.
/// Serializes and deserializes the messages from the stream.
/// </summary>
public class ConnectionHandler
{
    private readonly Contractor? _contractor;
    private readonly IConnection _connection;
    private readonly MessageHandler _messageHandler;
    private readonly MessagePackSerializerOptions _options;
    private readonly ILogger<ConnectionHandler> _logger;

    // Id assigned to the next outgoing message; the first message sent gets id 1.
    private long _messageId = 1;

    // The running receive loop, or null until StartHandler has been called.
    private Task<Result>? _task;

    /// <summary>
    /// Initializes a new instance of the <see cref="ConnectionHandler"/> class.
    /// </summary>
    /// <param name="contractor">The contractor, or null when contracts are not supported on this side.</param>
    /// <param name="connection">The connection to read messages from and write messages to.</param>
    /// <param name="messageHandler">The handler invoked for every received message.</param>
    /// <param name="options">The MessagePack serializer options.</param>
    /// <param name="logger">The logger.</param>
    public ConnectionHandler
    (
        Contractor? contractor,
        IConnection connection,
        MessageHandler messageHandler,
        IOptions<MessagePackSerializerOptions> options,
        ILogger<ConnectionHandler> logger
    )
    {
        _contractor = contractor;
        _connection = connection;
        _messageHandler = messageHandler;
        _options = options.Value;
        _logger = logger;
    }

    /// <summary>
    /// Gets the connection.
    /// </summary>
    public IConnection Connection => _connection;

    /// <summary>
    /// Run the handler and await the task.
    /// </summary>
    /// <remarks>
    /// If the handler was already started, the task of the already running handler is returned
    /// and <paramref name="stopToken"/> is ignored.
    /// </remarks>
    /// <param name="stopToken">The token used for stopping the handler and disconnecting the connection.</param>
    /// <returns>A result that may or may not have succeeded.</returns>
    public Task<Result> RunHandlerAsync(CancellationToken stopToken)
    {
        StartHandler(stopToken);

        // StartHandler always leaves _task assigned.
        return _task!;
    }

    /// <summary>
    /// Start the connection handler task, do not wait for it.
    /// </summary>
    /// <remarks>
    /// Does nothing if the handler was already started.
    /// </remarks>
    /// <param name="stopToken">The token used for stopping/disconnecting the connection and handling.</param>
    public void StartHandler(CancellationToken stopToken)
    {
        if (_task is not null)
        {
            return;
        }

        _task = HandlerTask(stopToken);
    }

    /// <summary>
    /// Receive loop: reads messages from the connection, deserializes them and passes them to the message handler
    /// until cancellation is requested or the stream ends. The connection is disconnected afterwards.
    /// </summary>
    /// <param name="ct">The token used for stopping the loop.</param>
    /// <returns>A successful result once the loop has stopped.</returns>
    private async Task<Result> HandlerTask(CancellationToken ct)
    {
        // The second argument keeps the connection's stream open when the reader is disposed.
        using var reader = new MessagePackStreamReader(_connection.ReadStream, true);

        while (!ct.IsCancellationRequested)
        {
            try
            {
                var read = await reader.ReadAsync(ct);
                if (!read.HasValue)
                {
                    // ReadAsync yields null only when the stream has ended. Reading again would
                    // return null immediately, so leave the loop instead of spinning on it.
                    _logger.LogWarning("Message not read? ...");
                    break;
                }

                var message = MessagePackSerializer.Typeless.Deserialize(read.Value, _options, ct);
                var result = await _messageHandler.HandleMessageAsync(this, message, ct);

                if (!result.IsSuccess)
                {
                    _logger.LogResultError(result);
                }
            }
            catch (OperationCanceledException) when (ct.IsCancellationRequested)
            {
                // A requested stop is the regular way of ending the handler, not an error.
                break;
            }
            catch (Exception e)
            {
                // A single broken message must not stop the handler; log it and read the next one.
                _logger.LogError(e, "An exception was thrown during deserialization of a message.");
            }
        }

        _connection.Disconnect();
        return Result.FromSuccess();
    }

    /// <summary>
    /// Create a contract for sending a message,
    /// <see cref="ResponseResult"/> will be returned back.
    /// </summary>
    /// <remarks>
    /// The contract sends the message when moved from <see cref="DefaultStates.None"/>, then waits in
    /// <see cref="DefaultStates.Requested"/> for a response carrying the id of the sent message.
    /// </remarks>
    /// <param name="message">The message.</param>
    /// <typeparam name="TMessage">The type of the message.</typeparam>
    /// <returns>A contract representing send message operation.</returns>
    /// <exception cref="InvalidOperationException">Thrown when this handler was created without a contractor.</exception>
    public IContract<Result, DefaultStates> ContractSendMessage<TMessage>(TMessage message)
    {
        if (_contractor is null)
        {
            throw new InvalidOperationException
            (
                "Contracting is not supported, the other side does not send responses. Only server sends responses back."
            );
        }

        // Filled in by the move action once the message was sent; read by the response filter.
        long messageId = 0;

        return new ContractBuilder<Result, DefaultStates, NoErrors>(_contractor, DefaultStates.None)
            .SetMoveAction
            (
                DefaultStates.None,
                async (a, ct) =>
                {
                    var result = await SendMessageAsync(message, ct);
                    if (!result.IsDefined(out messageId))
                    {
                        return Result<bool>.FromError(result);
                    }

                    return true;
                },
                DefaultStates.Requested
            )
            .SetMoveFilter<ResponseResult>
            (
                DefaultStates.Requested,
                (r) => r.MessageId == messageId,
                DefaultStates.ResponseObtained
            )
            .SetFillData<ResponseResult>(DefaultStates.ResponseObtained, r => r.Result)
            .Build();
    }

    /// <summary>
    /// Send message to the other end.
    /// </summary>
    /// <remarks>
    /// Exceptions thrown while serializing or flushing (including cancellation) are not propagated,
    /// they are returned as an error result.
    /// </remarks>
    /// <param name="message">The message to send. It will be wrapped before sending.</param>
    /// <param name="ct">The cancellation token used for cancelling the operation.</param>
    /// <typeparam name="TMessage">Type of the message to send.</typeparam>
    /// <returns>The id of the message sent or an error.</returns>
    public async Task<Result<long>> SendMessageAsync<TMessage>(TMessage message, CancellationToken ct = default)
    {
        var messageId = _messageId++;

        // NOTE(review): the leading 1 is presumably a protocol version - confirm against MessageWrapper.
        var messageWrapper = new MessageWrapper<TMessage>(1, messageId, message);

        try
        {
            await MessagePackSerializer.Typeless.SerializeAsync(_connection.WriteStream, messageWrapper, _options, ct);
            await _connection.WriteStream.FlushAsync(ct);
        }
        catch (Exception e)
        {
            return e;
        }

        return messageId;
    }
}