// // ServerManager.cs // // Copyright (c) František Boháček. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using NosSmooth.Comms.Data; using NosSmooth.Core.Extensions; using Remora.Results; namespace NosSmooth.Comms.Core; /// /// Manages a server, awaits connections, handles messages. /// public class ServerManager { private readonly IServer _server; private readonly MessageHandler _messageHandler; private readonly IOptions _options; private readonly ILogger _logger; private readonly ILogger _handlerLogger; private readonly List _connectionHandlers; private readonly ReaderWriterLockSlim _readerWriterLock; private Task? _task; private CancellationTokenSource? _ctSource; /// /// Initializes a new instance of the class. /// /// The server to manage. /// The message handler. /// The options. /// The logger. /// The logger for message handler. public ServerManager ( IServer server, MessageHandler messageHandler, IOptions options, ILogger logger, ILogger handlerLogger ) { _server = server; _connectionHandlers = new List(); _readerWriterLock = new ReaderWriterLockSlim(); _messageHandler = messageHandler; _options = options; _logger = logger; _handlerLogger = handlerLogger; } /// /// Gets connection handlers. /// public IReadOnlyList ConnectionHandlers { get { _readerWriterLock.EnterReadLock(); try { return _connectionHandlers.ToArray(); } finally { _readerWriterLock.ExitReadLock(); } } } /// /// Run the manager and await the task. /// /// The token used for stopping the handler and disconnecting the connection. /// A result that may or may not have succeeded. public Task RunManagerAsync(CancellationToken stopToken) { StartManager(stopToken); return _task!; } /// /// Broadcast the given message to all clients. /// /// The message to broadcast. /// The cancellation token used for cancelling the operation. /// The type of the message. /// A result that may or may not have succeeded. public async Task BroadcastAsync(TMessage message, CancellationToken ct = default) { var errors = new List(); foreach (var handler in _connectionHandlers) { var result = await handler.SendMessageAsync(message, ct); if (!result.IsSuccess) { errors.Add(Result.FromError(result)); } } return errors.Count switch { 0 => Result.FromSuccess(), 1 => (Result)errors[0], _ => new AggregateError(errors) }; } /// /// Run the handler without awaiting the task. /// /// The token used for stopping the handler and disconnecting the connection. public void StartManager(CancellationToken stopToken = default) { if (_task is not null) { return; } _ctSource = CancellationTokenSource.CreateLinkedTokenSource(stopToken); _task = ManagerTask(); } /// /// Request stop the server. /// public void RequestStop() { _ctSource?.Cancel(); } private async Task ManagerTask() { if (_ctSource is null) { throw new InvalidOperationException("The ct source is not initialized."); } await _server.ListenAsync(_ctSource!.Token); while (!_ctSource.IsCancellationRequested) { var connectionResult = await _server.WaitForConnectionAsync(_ctSource.Token); if (!connectionResult.IsDefined(out var connection)) { _logger.LogResultError(connectionResult); continue; } var handler = new ConnectionHandler ( null, connection, _messageHandler, _options, _handlerLogger ); _readerWriterLock.EnterWriteLock(); try { _connectionHandlers.Add(handler); } finally { _readerWriterLock.ExitWriteLock(); } handler.Closed += (o, e) => { _logger.LogInformation("A connection ({ConnectionId}) has been closed", handler.Id); _readerWriterLock.EnterWriteLock(); try { _connectionHandlers.Remove(handler); } finally { _readerWriterLock.ExitWriteLock(); } }; _logger.LogInformation("A connection ({ConnectionId}) has been established", handler.Id); handler.StartHandler(_ctSource.Token); } List errors = new List(); foreach (var handler in ConnectionHandlers) { var handlerResult = await handler.RunHandlerAsync(_ctSource.Token); if (!handlerResult.IsSuccess) { errors.Add(handlerResult); } } _readerWriterLock.EnterWriteLock(); try { _connectionHandlers.Clear(); } finally { _readerWriterLock.ExitWriteLock(); } _server.Close(); return errors.Count switch { 0 => Result.FromSuccess(), 1 => (Result)errors[0], _ => new AggregateError(errors) }; } }