Pillar框架---websocketServer
最开始设计这个框架的时候是用了开发即时通讯系统,性能稳健。它是一个基于 .NET 8.0 的 WebSocket 服务器库,提供了可扩展的 WebSocket 通信功能。
主要特性
- 支持客户端证书验证
- 连接管理和限流
- 事件驱动的消息处理
- 支持广播和定向消息
- 用户会话管理
- 可扩展的认证机制
安装
dotnet add package Pillar.WebSocketServer
services.AddChatServer(options =>
{
options.Ssl.Enable = useSSL;
options.Ssl.CertificatePath = "server.crt";
options.Ssl.PrivateKeyPath = "server.key";
options.Ssl.CACertificatePath = "ca.crt";
options.Ssl.RequireClientCertificate = true;
options.Environment=EnvironmentType.Development;
});
// 配置服务器
var server = new ChatServer(options =>
{
options.MaxConnections = 1000;
options.MaxConcurrentOperations = 100;
options.Environment = EnvironmentType.Production;
});
// 注册事件处理
server.OnClientConnected += (sender, e) =>
{
Console.WriteLine($"Client {e.Client.ClientId} connected");
};
// 启动服务器
await server.StartAsync(new IPEndPoint(IPAddress.Any, 8080));
// 广播消息
await server.BroadcastAsync("Hello World!");
// 发送给特定用户
await server.SendToUserAsync("userId", "Private message");
public class ChatServerOptions
{
/// <summary>
/// 服务器监听端口
/// </summary>
public int Port { get; set; } = 8080;
/// <summary>
/// 服务器监听地址
/// </summary>
public string IPAddress { get; set; } = "0.0.0.0";
/// <summary>
/// 最大连接数
/// </summary>
public int MaxConnections { get; set; } = 1000;
/// <summary>
/// 最大并发操作数
/// </summary>
public int MaxConcurrentOperations { get; set; } = 100;
public ChatClientOptions ChatClientOptions { get; set; } = new ChatClientOptions();
/// <summary>
/// 运行环境
/// </summary>
public EnvironmentType Environment { get; set; } = EnvironmentType.Development;
/// <summary>
/// SSL/TLS配置
/// </summary>
public SslOptions Ssl { get; set; } = new SslOptions();
/// <summary>
/// 重启策略配置
/// </summary>
public RestartOptions Restart { get; set; } = new RestartOptions();
/// <summary>
/// 心跳配置
/// </summary>
public HeartbeatOptions Heartbeat { get; set; } = new HeartbeatOptions();
/// <summary>
/// 连接配置
/// </summary>
public ConnectionOptions Connection { get; set; } = new ConnectionOptions();
}
public class ChatClientOptions
{
/// <summary>
/// 客户端连接的消息缓冲队列大小,当客户端收到新消息时,会先放入这个队列,如果队列已满(达到1000条消息),新消息会被拒绝或阻塞
/// </summary>
public int MaxQueueSize { get; set; } = 1000;
// 服务器不会一收到消息就立即处理,而是等待积累到20条消息后,一次性处理
public int BatchSize { get; set; } = 20;
//如果消息数量还没到BatchSize(20条)但已经等待了20毫秒,也会触发一次处理
public int BatchTimeout { get; set; } = 20;
}
public class SslOptions
{
/// <summary>
/// 是否启用SSL/TLS
/// </summary>
public bool Enable { get; set; } = false;
/// <summary>
/// 证书文件路径(.crt文件)
/// </summary>
public string CertificatePath { get; set; }
/// <summary>
/// 私钥文件路径(.key文件)
/// </summary>
public string PrivateKeyPath { get; set; }
/// <summary>
/// CA证书路径(.crt文件)
/// </summary>
public string CACertificatePath { get; set; }
/// <summary>
/// PEM格式证书路径(如果使用.pem文件)
/// </summary>
public string PemCertificatePath { get; set; }
/// <summary>
/// 证书密码(如果有的话)
/// </summary>
public string CertificatePassword { get; set; }
/// <summary>
/// 是否需要客户端证书
/// </summary>
public bool RequireClientCertificate { get; set; } = false;
#if NET5_0_OR_GREATER
/// <summary>
/// 允许的SSL/TLS协议版本
/// </summary>
public SslProtocols EnabledProtocols { get; set; } = SslProtocols.Tls12 | SslProtocols.Tls13;
#else
/// <summary>
/// 允许的SSL/TLS协议版本
/// </summary>
public SslProtocols EnabledProtocols { get; set; } = SslProtocols.Tls12;
#endif
}
public class RestartOptions
{
/// <summary>
/// 是否启用自动重启
/// </summary>
public bool Enable { get; set; } = true;
/// <summary>
/// 最大重试次数,-1表示无限重试
/// </summary>
public int MaxRetries { get; set; } = 3;
/// <summary>
/// 基础重试间隔(毫秒)
/// </summary>
public int BaseIntervalMs { get; set; } = 5000;
/// <summary>
/// 最大重试间隔(毫秒)
/// </summary>
public int MaxIntervalMs { get; set; } = 300000; // 5分钟
/// <summary>
/// 重启策略
/// </summary>
public RestartStrategy Strategy { get; set; } = RestartStrategy.ExponentialBackoff;
/// <summary>
/// 重启时是否等待现有连接关闭
/// </summary>
public bool WaitForExistingConnections { get; set; } = true;
/// <summary>
/// 等待现有连接关闭的超时时间(毫秒)
/// </summary>
public int ConnectionGracePeriodMs { get; set; } = 30000;
}
public enum RestartStrategy
{
/// <summary>
/// 固定间隔
/// </summary>
FixedInterval,
/// <summary>
/// 指数退避
/// </summary>
ExponentialBackoff,
/// <summary>
/// 线性递增
/// </summary>
LinearBackoff
}
public class HeartbeatOptions
{
/// <summary>
/// 心跳超时时间(秒)
/// </summary>
public int TimeoutSeconds { get; set; } = 90;
}
核心组件
IChatServer 接口
WebSocket 服务器的主要接口,定义了服务器的基本功能:
public interface IChatServer
{
// 事件
event EventHandler<ChatEventArgs> OnClientConnected;
event EventHandler<ChatEventArgs> OnClientDisconnected;
event EventHandler<ClientAuthEventArgs> OnClientAuthenticating;
// 服务器控制
Task StartAsync(IPEndPoint endpoint, CancellationToken cancellationToken = default);
Task StopAsync();
// 消息发送
Task BroadcastAsync(string message);
Task SendToUserAsync(string userId, string message);
// 客户端管理
Task DisconnectUserAsync(string userId);
int GetConnectedClientCount();
IEnumerable<ChatClient> GetAllClients();
}
ConnectionManager 类
负责管理 WebSocket 连接的核心组件:
- 连接数量限制
- 并发操作控制
- 用户会话跟踪
- 连接生命周期管理
public class ConnectionManager
{
// 添加客户端连接
public async Task<bool> TryAddClientAsync(string clientId, ChatClient client);
// 广播消息
public async Task BroadcastAsync(byte[] message, string excludeClientId = null);
// 发送消息给指定用户
public async Task SendToUserAsync(string userId, byte[] message);
// 断开用户连接
public async Task DisconnectUserAsync(string userId);
}
证书验证
提供三种环境的证书验证策略:
开发环境 (Development)
- 宽松的验证规则
- 忽略证书链验证错误
- 适合开发测试使用
测试环境 (Staging)
- 中等严格度的验证
- 忽略吊销相关错误
- 验证证书链
生产环境 (Production)
- 严格的证书验证
- 在线吊销检查
- 验证客户端认证用途
- 完整的证书链验证
- 有效期检查
认证与扩展
提供可扩展的认证和用户信息机制:
public interface IUserIdProvider
{
string GetUserId(ChatClient client);
}
public interface IClientExtrasProvider
{
IDictionary<string, object> GetExtras(ChatClient client);
}
事件系统
连接事件
- ChatEventArgs: 客户端连接/断开事件
- ClientAuthEventArgs: 客户端认证事件
技术特性
System.IO.Pipelines
- 高性能的异步I/O处理
- 零拷贝的内存管理
- 高效的缓冲区管理
- 自动内存管理
- 支持多种I/O模型
public class WebSocketHandler
{
private readonly PipeReader _reader;
private readonly PipeWriter _writer;
public async Task ProcessMessageAsync()
{
ReadResult result = await _reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
// 处理消息...
}
}
Channel
- 异步消息队列
- 生产者-消费者模式
- 线程安全的消息传递
private readonly Channel<WebSocketMessage> _messageChannel;
public async Task EnqueueMessageAsync(WebSocketMessage message)
{
await _messageChannel.Writer.WriteAsync(message);
}
ValueTask
- 高性能的异步任务返回
- 减少内存分配
- 提高性能
public ValueTask<bool> TrySendMessageAsync(string userId, byte[] message)
{
if (!_connections.TryGetValue(userId, out var client))
return new ValueTask<bool>(false);
return new ValueTask<bool>(SendCoreAsync(client, message));
}
SemaphoreSlim
- 并发连接限制
- 资源访问控制
private readonly SemaphoreSlim _connectionSemaphore;
public async Task<bool> TryAddConnectionAsync(WebSocketConnection connection)
{
if (!await _connectionSemaphore.WaitAsync(TimeSpan.FromSeconds(5)))
return false;
try
{
// 添加连接...
}
finally
{
_connectionSemaphore.Release();
}
}
ConcurrentDictionary
- 线程安全的集合
- 高性能并发操作
private readonly ConcurrentDictionary<string, WebSocketConnection> _connections;
ArrayPool
- 内存池化
- 减少 GC 压力
private readonly ArrayPool<byte> _arrayPool;
public byte[] RentBuffer()
{
return _arrayPool.Rent(4096);
}
Memory 和 Span
- 高效内存操作
- 零拷贝切片
public async Task ProcessMessageAsync(Memory<byte> buffer)
{
if (TryParseMessage(buffer.Span, out var message))
{
await HandleMessageAsync(message);
}
}
ObjectPool
- 对象池化
- 减少对象创建开销
private readonly ObjectPool<WebSocketMessage> _messagePool;