//
// ConnectionHandler.cs
//
// Copyright (c) František Boháček. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using MessagePack;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using NosSmooth.Comms.Data;
using NosSmooth.Comms.Data.Messages;
using NosSmooth.Core.Contracts;
using NosSmooth.Core.Extensions;
using Remora.Results;
namespace NosSmooth.Comms.Core;
///
/// Manages a connection, calls message handler when message is received.
/// Serializes and deserializes the messages from the stream.
///
public class ConnectionHandler
{
private readonly Contractor? _contractor;
private readonly IConnection _connection;
private readonly MessageHandler _messageHandler;
private readonly MessagePackSerializerOptions _options;
private readonly ILogger _logger;
private long _messageId = 1;
private Task? _task;
///
/// Initializes a new instance of the class.
///
/// The contractor.
/// The connection.
/// The message handler.
/// The options.
/// The logger.
public ConnectionHandler
(
Contractor? contractor,
IConnection connection,
MessageHandler messageHandler,
IOptions options,
ILogger logger
)
{
_contractor = contractor;
_connection = connection;
_messageHandler = messageHandler;
_options = options.Value;
_logger = logger;
}
///
/// Gets the connection.
///
public IConnection Connection => _connection;
///
/// Run the handler and await the task.
///
/// The token used for stopping the handler and disconnecting the connection.
/// A result that may or may not have succeeded.
public Task RunHandlerAsync(CancellationToken stopToken)
{
StartHandler(stopToken);
return _task!;
}
///
/// Start the connection handler task, do not wait for it.
///
/// The token used for stopping/disconnecting the connection and handling.
public void StartHandler(CancellationToken stopToken)
{
if (_task is not null)
{
return;
}
_task = HandlerTask(stopToken);
}
private async Task HandlerTask(CancellationToken ct)
{
using var reader = new MessagePackStreamReader(_connection.ReadStream, true);
while (!ct.IsCancellationRequested)
{
try
{
var read = await reader.ReadAsync(ct);
if (!read.HasValue)
{
_logger.LogWarning("Message not read? ...");
continue;
}
var message = MessagePackSerializer.Typeless.Deserialize
(read.Value, _options, ct);
var result = await _messageHandler.HandleMessageAsync(this, message, ct);
if (!result.IsSuccess)
{
_logger.LogResultError(result);
}
}
catch (Exception e)
{
_logger.LogError(e, "An exception was thrown during deserialization of a message.");
}
}
_connection.Disconnect();
return Result.FromSuccess();
}
///
/// Create a contract for sending a message,
/// will be returned back.
///
/// The message.
/// The type of the message.
/// A contract representing send message operation.
/// Thrown in case contract is created on the server. Clients do not send responses.
public IContract ContractSendMessage(TMessage message)
{
if (_contractor is null)
{
throw new InvalidOperationException
(
"Contracting is not supported, the other side does not send responses. Only server sends responses back."
);
}
long messageId = 0;
return new ContractBuilder(_contractor, DefaultStates.None)
.SetMoveAction
(
DefaultStates.None,
async (a, ct) =>
{
var result = await SendMessageAsync(message, ct);
if (!result.IsDefined(out messageId))
{
return Result.FromError(result);
}
return true;
},
DefaultStates.Requested
)
.SetMoveFilter
(DefaultStates.Requested, (r) => r.MessageId == messageId, DefaultStates.ResponseObtained)
.SetFillData(DefaultStates.ResponseObtained, r => r.Result)
.Build();
}
///
/// Send message to the other end.
///
/// The message to send. It will be wrapped before sending.
/// The cancellation token used for cancelling the operation.
/// Type of the message to send.
/// The id of the message sent or an error.
public async Task> SendMessageAsync(TMessage message, CancellationToken ct = default)
{
var messageId = _messageId++;
var messageWrapper = new MessageWrapper(1, messageId, message);
try
{
await MessagePackSerializer.Typeless.SerializeAsync(_connection.WriteStream, messageWrapper, _options, ct);
await _connection.WriteStream.FlushAsync(ct);
}
catch (Exception e)
{
return e;
}
return messageId;
}
}