LOGO OA教程 ERP教程 模切知识交流 PMS教程 CRM教程 开发文档 其他文档  
 
网站管理员

一分钟实现.NET与飞书长连接的WebSocket架构

freeflydom
2025年12月9日 15:39 本文热度 16

飞书服务端SDK已全面支持Java、Go、Python与Node.js等主流开发语言,然而.NET生态系统的开发者们却面临着官方SDK缺失的困境,这无疑为.NET社区接入飞书平台带来了不便。

一、.net中如何实现飞书WebSocket长连接

为什么选择飞书WebSocket?

相较于传统的Webhook模式,长连接模式大大降低了接入成本,将原先1周左右的开发周期降低到5分钟。

核心优势:

  • 开发便捷:无需公网IP或域名,本地环境即可接收回调
  • 安全传输:内置加密和鉴权,无需额外安全处理
  • 实时性强:消息延迟从分钟级降至毫秒级
  • 资源高效:避免频繁HTTP请求,连接复用多种事件类型

企业级应用场景

飞书平台提供丰富的事件类型,支持:

  • 用户事件:员工入职/离职自动化处理
  • 消息事件:智能客服、消息机器人
  • 审批事件:OA系统集成、自动审批
  • 部门事件:组织架构同步管理
  • 其它事件:.....

Mud.Feishu架构设计

 

二、抽象层设计(Mud.Feishu.Abstractions)

事件处理策略模式

Mud.Feishu采用策略模式实现灵活的事件处理机制,核心接口设计简洁而强大:

/// <summary>
/// 飞书事件处理器接口
/// </summary>
public interface IFeishuEventHandler
{
    /// <summary>
    /// 支持的事件类型
    /// </summary>
    string SupportedEventType { get; }
    /// <summary>
    /// 处理事件
    /// </summary>
    Task HandleAsync(EventData eventData, CancellationToken cancellationToken = default);
}

策略模式核心优势:

  • 🎯 单一职责:每个处理器专注特定事件类型
  • 🔧 开闭原则:对扩展开放,对修改封闭
  • 🧪 可测试性:独立测试,依赖清晰
  • ⚡ 运行时多态:动态选择处理策略

实际应用示例:

// 用户创建事件处理器
public class UserCreatedEventHandler : IFeishuEventHandler
{
    public string SupportedEventType => "contact.user.created_v3";
    
    public async Task HandleAsync(EventData eventData, CancellationToken cancellationToken = default)
    {
        var user = JsonSerializer.Deserialize<UserData>(eventData.EventJson);
        await _userService.CreateUserAsync(user, cancellationToken);
        _logger.LogInformation("用户创建事件处理完成: {UserId}", user.UserId);
    }
}
// 消息接收事件处理器
public class MessageReceiveEventHandler : IFeishuEventHandler
{
    public string SupportedEventType => "im.message.receive_v1";
    
    public async Task HandleAsync(EventData eventData, CancellationToken cancellationToken = default)
    {
        var message = JsonSerializer.Deserialize<MessageData>(eventData.EventJson);
        await _messageService.ProcessMessageAsync(message, cancellationToken);
    }
}

事件类型与数据模型

EventData统一事件模型

/// <summary>
/// 飞书事件数据模型 - 统一的事件载体
/// </summary>
public class EventData
{
    public string EventId { get; set; } = string.Empty;
    public string EventType { get; set; } = string.Empty;
    public DateTime EventTime { get; set; } = DateTime.UtcNow;
    public object? Event { get; set; }
    public string EventJson { get; set; } = string.Empty;
}

主要事件类型

类别事件类型说明
用户管理contact.user.created_v3用户创建

contact.user.updated_v3用户更新

contact.user.deleted_v3用户删除
消息事件im.message.receive_v1接收消息

im.message.message_read_v1消息已读
部门管理contact.department.created_v3部门创建

contact.department.updated_v3部门更新
审批流程approval.approval.approved_v1审批通过

approval.approval.rejected_v1审批拒绝
.........

强类型数据模型示例

// 用户创建事件数据
public class UserCreatedEvent
{
    [JsonPropertyName("user_id")]
    public string UserId { get; set; }
    
    [JsonPropertyName("name")]
    public string Name { get; set; }
    
    [JsonPropertyName("email")]
    public string Email { get; set; }
}
// 消息接收事件数据
public class MessageReceiveEvent
{
    [JsonPropertyName("message_id")]
    public string MessageId { get; set; }
    
    [JsonPropertyName("chat_id")]
    public string ChatId { get; set; }
    
    [JsonPropertyName("content")]
    public string Content { get; set; }
}

工厂模式应用

事件处理器工厂

/// <summary>
/// 事件处理器工厂接口
/// </summary>
public interface IFeishuEventHandlerFactory
{
    IFeishuEventHandler? GetHandler(string eventType);
    void RegisterHandler(IFeishuEventHandler handler);
    IReadOnlyList<string> GetRegisteredEventTypes();
}

使用示例:

// 注册事件处理器
var factory = serviceProvider.GetService<IFeishuEventHandlerFactory>();
factory.RegisterHandler(new UserCreatedEventHandler());
factory.RegisterHandler(new MessageReceiveEventHandler());
// 获取处理器处理事件
var handler = factory.GetHandler("contact.user.created_v3");
if (handler != null)
{
    await handler.HandleAsync(eventData);
}

三、核心实现层(Mud.Feishu.WebSocket)

组件化架构设计

Mud.Feishu.WebSocket采用严格的组件化设计,包含四个核心组件:

3.1 连接管理器

核心职责

  • 连接生命周期管理:建立、维护、恢复WebSocket连接
  • 线程安全:使用SemaphoreSlim确保并发安全
  • 超时控制:多级取消令牌支持精确超时控制
  • 自动重连:智能重连策略,支持指数退避算法

连接状态管理

 

关键实现特点

  • 事件驱动通知机制(Connected/Disconnected/Error事件)
  • 线程安全的连接操作
  • 灵活的重连策略配置

3.2 认证管理器

认证流程设计

 

核心功能

  • 令牌验证:应用访问令牌有效性检查
  • 消息构建:标准化认证消息格式
  • 状态管理:认证状态实时跟踪
  • 自动重试:认证失败智能重试机制

认证消息格式

{
    "timestamp": 1703920800,
    "data": {
        "app_access_token": "your_app_access_token_here"
    }
}

认证机制架构设计

认证管理器采用命令模式和回调机制,将认证逻辑与网络通信解耦,确保认证过程的可测试性和可扩展性。

/// <summary>
/// 认证管理器 - 处理WebSocket认证相关逻辑
/// </summary>
public class AuthenticationManager
{
    private readonly ILogger<AuthenticationManager> _logger;
    private readonly Func<string, Task> _sendMessageCallback;
    private bool _isAuthenticated = false;
    private readonly FeishuWebSocketOptions _options;
    
    public event EventHandler<EventArgs>? Authenticated;
    public event EventHandler<WebSocketErrorEventArgs>? AuthenticationFailed;
    
    public bool IsAuthenticated => _isAuthenticated;
    
    public AuthenticationManager(
        ILogger<AuthenticationManager> logger,
        FeishuWebSocketOptions options,
        Func<string, Task> sendMessageCallback)
    {
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
        _sendMessageCallback = sendMessageCallback ?? throw new ArgumentNullException(nameof(sendMessageCallback));
        _options = options;
    }
    
    /// <summary>
    /// 发送认证消息
    /// </summary>
    public async Task AuthenticateAsync(string appAccessToken, CancellationToken cancellationToken = default)
    {
        if (string.IsNullOrEmpty(appAccessToken))
            throw new ArgumentException("应用访问令牌不能为空", nameof(appAccessToken));
        
        try
        {
            _logger.LogInformation("正在进行WebSocket认证...");
            _isAuthenticated = false; // 重置认证状态
            
            // 创建认证消息
            var authMessage = new AuthMessage
            {
                Timestamp = DateTimeOffset.UtcNow.ToUnixTimeSeconds(),
                Data = new AuthData
                {
                    AppAccessToken = appAccessToken
                }
            };
            
            var jsonOptions = new JsonSerializerOptions
            {
                DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull,
                WriteIndented = false
            };
            
            var authJson = JsonSerializer.Serialize(authMessage, jsonOptions);
            await _sendMessageCallback(authJson);
            
            _logger.LogInformation("已发送认证消息,等待响应...");
        }
        catch (Exception ex)
        {
            _isAuthenticated = false;
            _logger.LogError(ex, "WebSocket认证失败");
            
            var errorArgs = new WebSocketErrorEventArgs
            {
                Exception = ex,
                ErrorMessage = $"WebSocket认证失败: {ex.Message}",
                ErrorType = ex.GetType().Name,
                IsAuthError = true
            };
            
            AuthenticationFailed?.Invoke(this, errorArgs);
            throw;
        }
    }
    
    /// <summary>
    /// 处理认证响应
    /// </summary>
    public void HandleAuthResponse(string responseMessage)
    {
        try
        {
            var authResponse = JsonSerializer.Deserialize<AuthResponseMessage>(responseMessage);
            
            if (authResponse?.Code == 0)
            {
                _isAuthenticated = true;
                _logger.LogInformation("WebSocket认证成功: {Message}", authResponse.Message);
                Authenticated?.Invoke(this, EventArgs.Empty);
            }
            else
            {
                _isAuthenticated = false;
                _logger.LogError("WebSocket认证失败: {Code} - {Message}", 
                    authResponse?.Code, authResponse?.Message);
                
                var errorArgs = new WebSocketErrorEventArgs
                {
                    ErrorMessage = $"WebSocket认证失败: {authResponse?.Code} - {authResponse?.Message}",
                    IsAuthError = true
                };
                
                AuthenticationFailed?.Invoke(this, errorArgs);
            }
        }
        catch (JsonException ex)
        {
            _isAuthenticated = false;
            _logger.LogError(ex, "解析认证响应失败: {Message}", responseMessage);
            
            var errorArgs = new WebSocketErrorEventArgs
            {
                Exception = ex,
                ErrorMessage = $"解析认证响应失败: {ex.Message}",
                ErrorType = ex.GetType().Name,
                IsAuthError = true
            };
            
            AuthenticationFailed?.Invoke(this, errorArgs);
        }
        catch (Exception ex)
        {
            _isAuthenticated = false;
            _logger.LogError(ex, "处理认证响应时发生错误");
            
            var errorArgs = new WebSocketErrorEventArgs
            {
                Exception = ex,
                ErrorMessage = $"处理认证响应时发生错误: {ex.Message}",
                ErrorType = ex.GetType().Name,
                IsAuthError = true
            };
            
            AuthenticationFailed?.Invoke(this, errorArgs);
        }
    }
    
    /// <summary>
    /// 重置认证状态
    /// </summary>
    public void ResetAuthentication()
    {
        _isAuthenticated = false;
        _logger.LogDebug("已重置认证状态");
    }
}

认证流程详细机制

认证管理器的实现遵循安全性和可靠性原则,确保认证过程的安全和稳定:

  1. 参数验证机制:在认证开始前进行严格的参数验证,包括:

    • AppAccessToken的非空检查
    • 令牌格式验证
    • 权限范围确认
  2. 消息构建策略:采用标准化的认证消息格式,包含:

    • 时间戳(防重放攻击)
    • 应用访问令牌
    • 协议版本信息
  3. 状态管理:通过内部状态标志确保认证状态的一致性:

    • _isAuthenticated标志控制认证状态
    • 状态变化时触发相应事件
    • 支持状态查询和重置
  4. 异常处理机制:完善的错误处理和恢复策略:

    • 网络异常处理
    • 认证失败处理
    • 超时和取消支持

认证交互时序图

核心认证消息格式

飞书WebSocket认证使用标准化的JSON消息格式:

{
    "timestamp": 1703920800,
    "data": {
        "app_access_token": "your_app_access_token_here"
    }
}

响应消息格式

{
    "code": 0,
    "msg": "success",
    "data": {
        "session_id": "websocket_session_id",
        "expires_in": 3600
    }
}

认证安全考虑

  1. 令牌安全

    • AppAccessToken不应在代码中硬编码
    • 支持令牌自动刷新机制
    • 令牌存储和传输加密
  2. 防重放攻击

    • 使用时间戳验证消息新鲜度
    • 支持消息签名机制
    • 会话唯一性标识
  3. 错误处理

    • 敏感信息不记录到日志
    • 认证失败不暴露具体错误
    • 支持优雅降级处理

3.3 消息路由器(MessageRouter)

消息路由器是飞书WebSocket框架的消息分发核心,负责识别消息类型、版本信息,并将消息精确路由到对应的处理器。它采用策略模式和责任链模式的组合,确保消息处理的高效性和可扩展性。

消息路由架构

 

消息类型识别

消息路由器首先需要对接收到的消息进行类型和版本识别,以确定合适的处理策略。飞书WebSocket支持多种消息格式,包括v1.0和v2.0协议版本。

/// <summary>
/// 消息处理器接口
/// </summary>
public interface IMessageHandler
{
    /// <summary>
    /// 是否可以处理指定类型的消息
    /// </summary>
    /// <param name="messageType">消息类型</param>
    /// <returns>是否可以处理</returns>
    bool CanHandle(string messageType);
    /// <summary>
    /// 处理消息
    /// </summary>
    /// <param name="message">消息内容</param>
    /// <param name="cancellationToken">取消令牌</param>
    /// <returns>处理任务</returns>
    Task HandleAsync(string message, CancellationToken cancellationToken = default);
}
/// <summary>
/// 消息路由器 - 负责将消息分发给合适的处理器
/// </summary>
public class MessageRouter
{
    private readonly ILogger<MessageRouter> _logger;
    private readonly List<IMessageHandler> _handlers;
    private readonly FeishuWebSocketOptions _options;
    public MessageRouter(ILogger<MessageRouter> logger, FeishuWebSocketOptions options)
    {
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
        _handlers = new List<IMessageHandler>();
        _options = options;
    }
    /// <summary>
    /// 注册消息处理器
    /// </summary>
    public void RegisterHandler(IMessageHandler handler)
    {
        if (handler == null)
            throw new ArgumentNullException(nameof(handler));
        _handlers.Add(handler);
        _logger.LogDebug("已注册消息处理器: {HandlerType}", handler.GetType().Name);
    }
    /// <summary>
    /// 移除消息处理器
    /// </summary>
    public bool UnregisterHandler(IMessageHandler handler)
    {
        var removed = _handlers.Remove(handler);
        if (removed)
        {
            if (_options.EnableLogging)
            {
                _logger.LogDebug("已移除消息处理器: {HandlerType}", handler.GetType().Name);
            }
        }
        return removed;
    }
    /// <summary>
    /// 路由消息到合适的处理器
    /// </summary>
    public async Task RouteMessageAsync(string message, CancellationToken cancellationToken = default)
    {
        if (string.IsNullOrWhiteSpace(message))
        {
            if (_options.EnableLogging)
            {
                _logger.LogWarning("收到空消息,跳过路由");
            }
            return;
        }
        await RouteMessageInternalAsync(message, "Text", cancellationToken);
    }
    /// <summary>
    /// 路由从二进制消息转换而来的JSON消息到合适的处理器
    /// </summary>
    public async Task RouteBinaryMessageAsync(string jsonContent, string messageType, CancellationToken cancellationToken = default)
    {
        if (string.IsNullOrWhiteSpace(jsonContent))
        {
            if (_options.EnableLogging)
            {
                _logger.LogWarning("收到空的二进制转换消息,跳过路由");
            }
            return;
        }
        await RouteMessageInternalAsync(jsonContent, $"Binary_{messageType}", cancellationToken);
    }
    /// <summary>
    /// 提取消息类型
    /// </summary>
    private string ExtractMessageType(string message)
    {
        try
        {
            using var jsonDoc = System.Text.Json.JsonDocument.Parse(message);
            var root = jsonDoc.RootElement;
            // 检查是否为v2.0版本
            if (root.TryGetProperty("schema", out var schemaElement) &&
                schemaElement.GetString() == "2.0")
            {
                if (root.TryGetProperty("header", out var headerElement) &&
                    headerElement.TryGetProperty("event_type", out var eventTypeElement))
                {
                    return "event"; // v2.0主要是事件消息
                }
            }
            // v1.0版本处理
            if (root.TryGetProperty("type", out var typeElement))
            {
                return typeElement.GetString()?.ToLowerInvariant() ?? string.Empty;
            }
            return string.Empty;
        }
        catch (System.Text.Json.JsonException ex)
        {
            _logger.LogError(ex, "解析消息JSON失败: {Message}", message);
            return string.Empty;
        }
    }
    /// <summary>
    /// 内部消息路由处理
    /// </summary>
    private async Task RouteMessageInternalAsync(string message, string sourceType, CancellationToken cancellationToken)
    {
        try
        {
            // 提取消息类型
            var messageType = ExtractMessageType(message);
            if (string.IsNullOrEmpty(messageType))
            {
                _logger.LogWarning("无法提取消息类型 (来源: {SourceType}): {Message}", sourceType, message);
                return;
            }
            // 查找能处理该消息类型的处理器
            var handler = _handlers.FirstOrDefault(h => h.CanHandle(messageType));
            if (handler == null)
            {
                _logger.LogWarning("未找到能处理消息类型 {MessageType} 的处理器 (来源: {SourceType})", messageType, sourceType);
                return;
            }
            _logger.LogDebug("将消息路由到处理器: {HandlerType} (来源: {SourceType}, 消息类型: {MessageType})",
                    handler.GetType().Name, sourceType, messageType);
            await handler.HandleAsync(message, cancellationToken);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "路由消息时发生错误 (来源: {SourceType}): {Message}", sourceType, message);
        }
    }
}

消息路由流程详解

消息路由器的核心职责是根据消息特征进行智能分发,其工作流程包括以下几个关键步骤:

  1. 消息预验证:检查消息的基本格式和有效性
  2. 版本识别:通过schema字段或type字段确定消息版本
  3. 类型匹配:根据消息内容找到对应的处理器
  4. 异步分发:将消息异步发送给匹配的处理器
  5. 结果聚合:收集处理结果并进行后续处理

路由策略设计

消息路由器采用多维度匹配策略,确保消息能够精确路由:

  1. 基于版本的匹配

    • v1.0协议:通过type字段识别消息类型
    • v2.0协议:通过schema字段确认版本
    • 向后兼容:支持旧版本消息格式
  2. 基于类型的匹配

    • 事件消息:event类型,如消息接收、状态变更
    • 响应消息:response类型,如认证响应、操作结果
    • 通知消息:notification类型,如系统通知
  3. 基于优先级的匹配

    • 高优先级处理器:认证、心跳等关键消息
    • 中优先级处理器:业务事件、用户消息
    • 低优先级处理器:统计数据、日志信息

处理器注册机制

消息路由器支持动态注册和注销消息处理器:

// 注册处理器
router.RegisterHandler(new AuthMessageHandler());
router.RegisterHandler(new EventMessageHandler());
router.RegisterHandler(new BinaryMessageHandler());
// 处理器接口定义
public interface IMessageHandler
{
    bool CanHandle(string messageType, MessageVersionInfo version);
    Task<HandlerResult> HandleAsync(string message, CancellationToken cancellationToken);
    int Priority { get; }
}

性能优化策略

  1. 并行处理:支持多个消息并行处理,提高吞吐量
  2. 缓存机制:缓存处理器匹配结果,减少重复计算
  3. 批处理:支持批量消息处理,减少网络开销
  4. 连接池:复用连接资源,提高连接效率

错误处理机制

  1. 容错设计:单个处理器异常不影响其他处理器
  2. 降级策略:无匹配处理器时使用默认处理逻辑
  3. 重试机制:处理失败时支持自动重试
  4. 监控告警:记录处理异常并触发告警机制

处理器分发机制

/// <summary>
/// 路由策略实现
/// </summary>
public class RoutingStrategy
{
    private readonly ILogger<MessageRouter> _logger;
    private readonly FeishuWebSocketOptions _options;
    
    public async Task RouteMessageAsync(string message, string sourceType, 
        List<IMessageHandler> handlers, CancellationToken cancellationToken)
    {
        // 提取消息类型
        var messageType = ExtractMessageType(message);
        if (string.IsNullOrEmpty(messageType))
        {
            if (_options.EnableLogging)
                _logger.LogWarning("无法提取消息类型 (来源: {SourceType}): {Message}", sourceType, message);
            return;
        }
        
        // 查找能处理该消息类型的处理器
        var handler = handlers.FirstOrDefault(h => h.CanHandle(messageType));
        if (handler == null)
        {
            if (_options.EnableLogging)
                _logger.LogWarning("未找到能处理消息类型 {MessageType} 的处理器 (来源: {SourceType})", 
                    messageType, sourceType);
            return;
        }
        
        if (_options.EnableLogging)
            _logger.LogDebug("将消息路由到处理器: {HandlerType} (来源: {SourceType}, 消息类型: {MessageType})",
                handler.GetType().Name, sourceType, messageType);
        
        await handler.HandleAsync(message, cancellationToken);
    }
    
    /// <summary>
    /// 提取消息类型
    /// </summary>
    private string ExtractMessageType(string message)
    {
        try
        {
            using var jsonDoc = JsonDocument.Parse(message);
            var root = jsonDoc.RootElement;
            
            // 检查是否为v2.0版本
            if (root.TryGetProperty("schema", out var schemaElement) &&
                schemaElement.GetString() == "2.0")
            {
                if (root.TryGetProperty("header", out var headerElement) &&
                    headerElement.TryGetProperty("event_type", out var eventTypeElement))
                {
                    return "event"; // v2.0主要是事件消息
                }
            }
            
            // v1.0版本处理
            if (root.TryGetProperty("type", out var typeElement))
            {
                return typeElement.GetString()?.ToLowerInvariant() ?? string.Empty;
            }
            
            return string.Empty;
        }
        catch (JsonException ex)
        {
            _logger.LogError(ex, "解析消息JSON失败");
            return string.Empty;
        }
    }
}

3.4 二进制消息处理器(BinaryMessageProcessor)

二进制消息处理器是飞书WebSocket框架中专门处理二进制数据流的核心组件,负责增量接收、数据组装、格式解析和消息分发。它采用流式处理架构,支持大消息的分片接收和解析。

二进制消息处理架构

 

增量数据接收

/// <summary>
/// 二进制消息处理器 - 负责处理二进制数据的增量接收和解析
/// </summary>
public class BinaryMessageProcessor : IDisposable
{
    private readonly ILogger<BinaryMessageProcessor> _logger;
    private readonly FeishuWebSocketOptions _options;
    private MemoryStream? _binaryDataStream;
    private readonly object _binaryDataStreamLock = new object();
    private DateTime _binaryDataReceiveStartTime = DateTime.MinValue;
    private bool _disposed = false;
    private readonly MessageRouter? _messageRouter;
    private readonly WebSocketConnectionManager? _connectionManager;
    public event EventHandler<WebSocketBinaryMessageEventArgs>? BinaryMessageReceived;
    public event EventHandler<WebSocketErrorEventArgs>? Error;
    /// <summary>
    /// 处理二进制数据
    /// </summary>
    public async Task ProcessBinaryDataAsync(byte[] data, int offset, int count, 
        bool endOfMessage, CancellationToken cancellationToken = default)
    {
        try
        {
            lock (_binaryDataStreamLock)
            {
                // 如果是新消息的开始,初始化内存流
                if (_binaryDataStream == null)
                {
                    _binaryDataStream = new MemoryStream();
                    _binaryDataReceiveStartTime = DateTime.UtcNow;
                    
                    if (_options.EnableLogging)
                        _logger.LogDebug("开始接收新的二进制消息");
                }
                
                // 写入数据片段
                _binaryDataStream.Write(data, offset, count);
                
                // 检查数据大小限制
                if (_binaryDataStream.Length > _options.MaxBinaryMessageSize)
                {
                    var errorMessage = $"二进制消息大小超过限制 ({_binaryDataStream.Length} > {_options.MaxBinaryMessageSize})";
                    _logger.LogError(errorMessage);
                    
                    // 清理当前数据流
                    _binaryDataStream.Dispose();
                    _binaryDataStream = null;
                    
                    // 触发错误事件
                    OnError(errorMessage, "MessageSizeExceeded");
                    return;
                }
                
                // 如果消息接收完成
                if (endOfMessage)
                {
                    var completeData = _binaryDataStream.ToArray();
                    var receiveDuration = DateTime.UtcNow - _binaryDataReceiveStartTime;
                    
                    if (_options.EnableLogging)
                        _logger.LogInformation("二进制消息接收完成,大小: {Size} 字节,耗时: {Duration}ms",
                            completeData.Length, receiveDuration.TotalMilliseconds);
                    
                    // 异步处理完整的二进制消息
                    _ = Task.Run(async () =>
                    {
                        await ProcessCompleteBinaryMessageAsync(completeData, cancellationToken);
                    }, cancellationToken);
                    
                    // 清理资源
                    _binaryDataStream.Dispose();
                    _binaryDataStream = null;
                }
                else
                {
                    if (_options.EnableLogging)
                        _logger.LogDebug("已接收二进制消息片段,当前总大小: {Size} 字节", 
                            _binaryDataStream.Length);
                }
            }
        }
        catch (Exception ex)
        {
            // 发生异常时清理资源
            lock (_binaryDataStreamLock)
            {
                _binaryDataStream?.Dispose();
                _binaryDataStream = null;
            }
            
            if (_options.EnableLogging)
                _logger.LogError(ex, "处理二进制消息时发生错误");
            OnError($"处理二进制消息时发生错误: {ex.Message}", ex.GetType().Name);
        }
    }
}

核心特性

  • 📦 流式处理:支持大消息分片接收
  • 🔄 双格式支持:ProtoBuf优先,JSON回退
  • 📊 大小限制:可配置的消息大小限制
  • 🎯 自动路由:解析后自动路由到消息处理器

处理流程

  1. 增量接收:分片接收二进制数据,写入内存流
  2. 大小检查:实时监控数据大小,防止内存溢出
  3. 格式解析:ProtoBuf反序列化,失败则回退到JSON
  4. 消息路由:提取JSON Payload,路由到对应处理器
  5. ACK确认:向服务器发送处理确认

主客户端集成(FeishuWebSocketClient)

FeishuWebSocketClient是整个框架的门面组件,负责协调和编排各个子组件的工作。它采用组件化设计模式,将复杂的WebSocket连接管理功能分解为独立的、可复用的组件,并通过统一的事件系统进行集成。

客户端架构集成

组件协调与编排

/// <summary>
/// 飞书WebSocket客户端 - 采用组件化设计提高可维护性
/// </summary>
public sealed class FeishuWebSocketClient : IFeishuWebSocketClient, IDisposable
{
    private readonly ILogger<FeishuWebSocketClient> _logger;
    private readonly FeishuWebSocketOptions _options;
    private readonly IFeishuEventHandlerFactory _eventHandlerFactory;
    private readonly WebSocketConnectionManager _connectionManager;
    private readonly AuthenticationManager _authManager;
    private readonly MessageRouter _messageRouter;
    private readonly BinaryMessageProcessor _binaryProcessor;
    private readonly ConcurrentQueue<string> _messageQueue = new();
    private readonly List<Func<string, Task>> _messageProcessors = new();
    private Task? _messageProcessingTask;
    private ILoggerFactory _loggerFactory;
    private bool _disposed = false;
    private CancellationTokenSource? _cancellationTokenSource;
    /// <inheritdoc/>
    public WebSocketState State => _connectionManager.State;
    /// <inheritdoc/>
    public bool IsAuthenticated => _authManager.IsAuthenticated;
    /// <inheritdoc/>
    public event EventHandler<EventArgs>? Connected;
    /// <inheritdoc/>
    public event EventHandler<WebSocketCloseEventArgs>? Disconnected;
    /// <inheritdoc/>
    public event EventHandler<WebSocketMessageEventArgs>? MessageReceived;
    /// <inheritdoc/>
    public event EventHandler<WebSocketErrorEventArgs>? Error;
    /// <inheritdoc/>
    public event EventHandler<EventArgs>? Authenticated;
    /// <inheritdoc/>
    public event EventHandler<WebSocketPingEventArgs>? PingReceived;
    /// <inheritdoc/>
    public event EventHandler<WebSocketPongEventArgs>? PongReceived;
    /// <inheritdoc/>
    public event EventHandler<WebSocketHeartbeatEventArgs>? HeartbeatReceived;
    /// <inheritdoc/>
    public event EventHandler<WebSocketFeishuEventArgs>? FeishuEventReceived;
    /// <inheritdoc/>
    public event EventHandler<WebSocketBinaryMessageEventArgs>? BinaryMessageReceived;
    /// <summary>
    /// 默认构造函数
    /// </summary>
    public FeishuWebSocketClient(
        ILogger<FeishuWebSocketClient> logger,
        IFeishuEventHandlerFactory eventHandlerFactory,
        ILoggerFactory loggerFactory,
        FeishuWebSocketOptions? options = null)
    {
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
        _eventHandlerFactory = eventHandlerFactory ?? throw new ArgumentNullException(nameof(eventHandlerFactory));
        _options = options ?? new FeishuWebSocketOptions();
        _loggerFactory = loggerFactory;
        
        // 初始化组件
        _connectionManager = new WebSocketConnectionManager(_loggerFactory.CreateLogger<WebSocketConnectionManager>(), _options);
        _authManager = new AuthenticationManager(_loggerFactory.CreateLogger<AuthenticationManager>(), _options, (message) => SendMessageAsync(message));
        _messageRouter = new MessageRouter(_loggerFactory.CreateLogger<MessageRouter>(), _options);
        _binaryProcessor = new BinaryMessageProcessor(_loggerFactory.CreateLogger<BinaryMessageProcessor>(), _connectionManager, _options, _messageRouter);
        // 订阅组件事件
        SubscribeToComponentEvents();
        // 注册消息处理器
        RegisterMessageHandlers();
    }
    /// <summary>
    /// 订阅组件事件
    /// </summary>
    private void SubscribeToComponentEvents()
    {
        // 连接管理器事件
        _connectionManager.Connected += (s, e) => Connected?.Invoke(this, e);
        _connectionManager.Disconnected += (s, e) => Disconnected?.Invoke(this, e);
        _connectionManager.Error += (s, e) => Error?.Invoke(this, e);
        // 认证管理器事件
        _authManager.Authenticated += (s, e) => Authenticated?.Invoke(this, e);
        _authManager.AuthenticationFailed += (s, e) => Error?.Invoke(this, e);
        // 二进制处理器事件
        _binaryProcessor.BinaryMessageReceived += (s, e) => BinaryMessageReceived?.Invoke(this, e);
        _binaryProcessor.Error += (s, e) => Error?.Invoke(this, e);
    }
    /// <summary>
    /// 注册消息处理器
    /// </summary>
    private void RegisterMessageHandlers()
    {
        var pingPongHandler = new PingPongMessageHandler(
            _loggerFactory.CreateLogger<PingPongMessageHandler>(),
            _options,
            (message) => SendMessageAsync(message));
        var authHandler = new AuthMessageHandler(
            _loggerFactory.CreateLogger<AuthMessageHandler>(),
            (success) =>
            {
                if (success)
                {
                    _authManager.HandleAuthResponse("{\"code\":0,\"msg\":\"Authentication successful\"}");
                }
                else
                {
                    _authManager.HandleAuthResponse("{\"code\":-1,\"msg\":\"Authentication failed\"}");
                }
            });
        var heartbeatHandler = new HeartbeatMessageHandler(_loggerFactory.CreateLogger<HeartbeatMessageHandler>(), _options);
        _messageRouter.RegisterHandler(pingPongHandler);
        _messageRouter.RegisterHandler(authHandler);
        _messageRouter.RegisterHandler(heartbeatHandler);
    }
    /// <summary>
    /// 建立WebSocket连接
    /// </summary>
    public async Task ConnectAsync(WsEndpointResult endpoint, CancellationToken cancellationToken = default)
    {
        if (endpoint == null)
            throw new ArgumentNullException(nameof(endpoint));
        await _connectionManager.ConnectAsync(endpoint.Url, cancellationToken);
        // 启动消息接收
        _cancellationTokenSource = new CancellationTokenSource();
        _ = Task.Run(() => StartReceivingAsyncInternal(_cancellationTokenSource.Token), _cancellationTokenSource.Token);
        // 启动心跳
        _ = Task.Run(() => StartHeartbeatAsync(_cancellationTokenSource.Token), _cancellationTokenSource.Token);
        // 启动消息队列处理
        if (_options.EnableMessageQueue)
        {
            _messageProcessingTask = ProcessMessageQueueAsync(_cancellationTokenSource.Token);
        }
    }
    /// <summary>
    /// 建立WebSocket连接并进行认证
    /// </summary>
    public async Task ConnectAsync(WsEndpointResult endpoint, string appAccessToken, CancellationToken cancellationToken = default)
    {
        await ConnectAsync(endpoint, cancellationToken);
        await _authManager.AuthenticateAsync(appAccessToken, cancellationToken);
    }
    /// <summary>
    /// 断开WebSocket连接
    /// </summary>
    public async Task DisconnectAsync(CancellationToken cancellationToken = default)
    {
        _cancellationTokenSource?.Cancel();
        await _connectionManager.DisconnectAsync(cancellationToken);
    }
    /// <summary>
    /// 发送消息
    /// </summary>
    public async Task SendMessageAsync(string message, CancellationToken cancellationToken = default)
    {
        await _connectionManager.SendMessageAsync(message, cancellationToken);
    }
    /// <summary>
    /// 注册消息处理器
    /// </summary>
    public void RegisterMessageProcessor(Func<string, Task> processor)
    {
        if (processor == null)
            throw new ArgumentNullException(nameof(processor));
        _messageProcessors.Add(processor);
    }
}

客户端生命周期管理

组件协作时序图

核心设计模式

  1. 门面模式(Facade Pattern)

    • FeishuWebSocketClient作为统一入口
    • 隐藏内部组件复杂性
    • 提供简洁的API接口
  2. 观察者模式(Observer Pattern)

    • 事件驱动的组件通信
    • 松耦合的组件协作
    • 支持多订阅者监听
  3. 策略模式(Strategy Pattern)

    • 可插拔的消息处理器
    • 灵活的路由策略
    • 动态处理器注册
  4. 工厂模式(Factory Pattern)

    • EventHandlerFactory创建处理器
    • 统一的组件初始化
    • 依赖注入支持

四、应用示例

4.1 事件处理器实现

用户创建事件处理器示例

/// <summary>
/// 演示用户事件处理器 - 继承预定义的用户创建事件处理器
/// </summary>
public class DemoUserEventHandler : DefaultFeishuEventHandler<UserCreatedResult>
{
    private readonly DemoEventService _eventService;
    private readonly INotificationService _notificationService;
    
    public DemoUserEventHandler(
        ILogger<DemoUserEventHandler> logger,
        INotificationService notificationService) : base(logger)
    {
        _eventService = eventService ?? throw new ArgumentNullException(nameof(eventService));
        _notificationService = notificationService ?? throw new ArgumentNullException(nameof(notificationService));
    }
    
    protected override async Task ProcessBusinessLogicAsync(
        EventData eventData, 
        UserCreatedResult? userCreated, 
        CancellationToken cancellationToken = default)
    {
        if (eventData == null)
            throw new ArgumentNullException(nameof(eventData));
        _logger.LogInformation("👤 [用户事件] 开始处理用户创建事件: {EventId}", eventData.EventId);
        try
        {
            // 1. 数据验证和转换
            var user = ValidateAndTransformUser(userCreated?.Object);
            
            // 2. 保存到数据库
            await _eventService.RecordUserEventAsync(user, cancellationToken);
            
            // 3. 业务逻辑处理
            await ProcessUserEventAsync(user, cancellationToken);
            
            // 4. 更新统计信息
            _eventService.IncrementUserCount();
            _logger.LogInformation("✅ [用户事件] 用户创建事件处理完成: 用户ID {UserId}, 用户名 {UserName}",
                user.UserId, user.UserName);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "❌ [用户事件] 处理用户创建事件失败: {EventId}", eventData.EventId);
            throw;
        }
    }
    
    private UserData ValidateAndTransformUser(UserCreatedResult? userCreated)
    {
        if (userCreated == null)
            throw new ArgumentException("用户创建数据不能为空");
        
        if (string.IsNullOrWhiteSpace(userCreated.UserId))
            throw new ArgumentException("用户ID不能为空");
        
        if (string.IsNullOrWhiteSpace(userCreated.Name))
            throw new ArgumentException("用户名不能为空");
        
        return new UserData
        {
            UserId = userCreated.UserId,
            UserName = userCreated.Name,
            Email = userCreated.Email ?? string.Empty,
            Department = userCreated.Department ?? string.Empty,
            EmployeeType = userCreated.EmployeeType ?? string.Empty,
            CreatedAt = userCreated.CreatedAt
        };
    }
    
    private async Task ProcessUserEventAsync(UserData user, CancellationToken cancellationToken)
    {
        // 模拟异步业务操作
        await Task.Delay(100, cancellationToken);
        
        // 验证必要字段
        if (string.IsNullOrWhiteSpace(user.UserId))
        {
            throw new ArgumentException("用户ID不能为空");
        }
        
        // 模拟发送欢迎通知
        if (_options.EnableLogging)
            _logger.LogInformation("📧 [用户事件] 发送欢迎通知给用户: {UserName} ({Email})",
                user.UserName, user.Email);
        
        await _notificationService.SendWelcomeEmailAsync(user, cancellationToken);
        
        // 模拟用户配置文件创建
        if (_options.EnableLogging)
            _logger.LogInformation("⚙️ [用户事件] 创建用户配置文件: {UserId}", user.UserId);
        
        await _eventService.CreateUserProfileAsync(user, cancellationToken);
        
        // 模拟权限初始化
        if (_options.EnableLogging)
            _logger.LogInformation("🔐 [用户事件] 初始化用户权限: {UserId}", user.UserId);
        
        await _eventService.InitializeUserPermissionsAsync(user, cancellationToken);
        
        await Task.CompletedTask;
    }
}

4.2 继承预定义事件处理器

/// <summary>
/// 演示部门事件处理器 - 继承预定义的部门创建事件处理器
/// </summary>
public class DemoDepartmentEventHandler : DepartmentCreatedEventHandler
{
    private readonly DemoEventService _eventService;
    private readonly INotificationService _notificationService;
    private readonly IPermissionService _permissionService;
    public DemoDepartmentEventHandler(
        ILogger<DemoDepartmentEventHandler> logger, 
        DemoEventService eventService,
        INotificationService notificationService,
        IPermissionService permissionService) : base(logger)
    {
        _eventService = eventService ?? throw new ArgumentNullException(nameof(eventService));
        _notificationService = notificationService ?? throw new ArgumentNullException(nameof(notificationService));
        _permissionService = permissionService ?? throw new ArgumentNullException(nameof(permissionService));
    }
    protected override async Task ProcessBusinessLogicAsync(
        EventData eventData, 
        ObjectEventResult<DepartmentCreatedResult>? departmentData, 
        CancellationToken cancellationToken = default)
    {
        if (eventData == null)
            throw new ArgumentNullException(nameof(eventData));
        _logger.LogInformation("[部门事件] 开始处理部门创建事件: {EventId}", eventData.EventId);
        try
        {
            // 1. 验证部门数据
            var department = ValidateDepartmentData(departmentData?.Object);
            
            // 2. 记录事件到服务
            await _eventService.RecordDepartmentEventAsync(department, cancellationToken);
            
            // 3. 处理部门业务逻辑
            await ProcessDepartmentEventAsync(department, cancellationToken);
            
            // 4. 初始化部门权限
            await InitializeDepartmentPermissionsAsync(department, cancellationToken);
            
            // 5. 更新统计信息
            _eventService.IncrementDepartmentCount();
            _logger.LogInformation("[部门事件] 部门创建事件处理完成: 部门ID {DepartmentId}, 部门名 {DepartmentName}",
                department.DepartmentId, department.Name);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "[部门事件] 处理部门创建事件失败: {EventId}", eventData.EventId);
            throw;
        }
    }
    
    private DepartmentData ValidateDepartmentData(DepartmentCreatedResult? departmentResult)
    {
        if (departmentResult == null)
            throw new ArgumentException("部门创建数据不能为空");
        
        if (string.IsNullOrWhiteSpace(departmentResult.DepartmentId))
            throw new ArgumentException("部门ID不能为空");
        
        if (string.IsNullOrWhiteSpace(departmentResult.Name))
            throw new ArgumentException("部门名不能为空");
        
        return new DepartmentData
        {
            DepartmentId = departmentResult.DepartmentId,
            Name = departmentResult.Name,
            ParentDepartmentId = departmentResult.ParentDepartmentId,
            LeaderUserId = departmentResult.LeaderUserId,
            DepartmentLevel = departmentResult.DepartmentLevel,
            CreatedAt = DateTime.UtcNow
        };
    }
    
    private async Task ProcessDepartmentEventAsync(DepartmentData department, CancellationToken cancellationToken)
    {
        // 模拟异步业务操作
        await Task.Delay(100, cancellationToken);
        
        // 模拟设置部门配置
        if (_options.EnableLogging)
            _logger.LogInformation("[部门事件] 设置部门配置: {DepartmentName}", department.Name);
        
        await _eventService.ConfigureDepartmentSettingsAsync(department, cancellationToken);
        
        // 通知部门主管
        if (!string.IsNullOrWhiteSpace(department.LeaderUserId))
        {
            if (_options.EnableLogging)
                _logger.LogInformation("[部门事件] 通知部门主管: {LeaderUserId}", department.LeaderUserId);
            
            await _notificationService.NotifyDepartmentLeaderAsync(department, cancellationToken);
        }
        
        // 处理层级关系
        if (!string.IsNullOrWhiteSpace(department.ParentDepartmentId))
        {
            if (_options.EnableLogging)
                _logger.LogInformation("[部门事件] 建立层级关系: {DepartmentId} -> {ParentDepartmentId}",
                    department.DepartmentId, department.ParentDepartmentId);
            
            await _eventService.UpdateDepartmentHierarchyAsync(department, cancellationToken);
        }
        
        await Task.CompletedTask;
    }
    
    private async Task InitializeDepartmentPermissionsAsync(DepartmentData department, CancellationToken cancellationToken)
    {
        if (_options.EnableLogging)
            _logger.LogInformation("[部门事件] 初始化部门权限: {DepartmentName}", department.Name);
        
        // 创建部门默认权限
        var defaultPermissions = new[]
        {
            "department.view",
            "department.edit",
            "department.member.manage"
        };
        
        foreach (var permission in defaultPermissions)
        {
            await _permissionService.GrantPermissionAsync(
                department.DepartmentId, permission, cancellationToken);
        }
        
        // 为部门主管分配管理员权限
        if (!string.IsNullOrWhiteSpace(department.LeaderUserId))
        {
            await _permissionService.GrantPermissionAsync(
                department.DepartmentId, "department.admin", cancellationToken);
        }
    }
}

4.3 依赖注入配置

建造者模式应用

/// <summary>
/// 飞书WebSocket服务注册扩展
/// </summary>
public static class FeishuWebSocketServiceExtensions
{
    /// <summary>
    /// 使用建造者模式注册飞书WebSocket服务
    /// </summary>
    public static IFeishuWebSocketBuilder AddFeishuWebSocketBuilder(this IServiceCollection services)
    {
        return new FeishuWebSocketBuilder(services);
    }
}
/// <summary>
/// 飞书WebSocket服务建造者
/// </summary>
public class FeishuWebSocketBuilder
{
    private readonly IServiceCollection _services;
    private readonly List<Type> _handlerTypes = new();
    private FeishuWebSocketOptions _options = new();
    private bool _useMultiHandler = false;
    
    public FeishuWebSocketBuilder(IServiceCollection services)
    {
        _services = services ?? throw new ArgumentNullException(nameof(services));
    }
    
    /// <summary>
    /// 从配置文件读取配置
    /// </summary>
    public FeishuWebSocketBuilder ConfigureFrom(IConfiguration configuration)
    {
        configuration.GetSection("Feishu:WebSocket").Bind(_options);
        return this;
    }
    
    /// <summary>
    /// 配置选项
    /// </summary>
    public FeishuWebSocketBuilder ConfigureOptions(Action<FeishuWebSocketOptions> configure)
    {
        configure?.Invoke(_options);
        return this;
    }
    
    /// <summary>
    /// 启用多处理器模式
    /// </summary>
    public FeishuWebSocketBuilder UseMultiHandler()
    {
        _useMultiHandler = true;
        return this;
    }
    
    /// <summary>
    /// 添加事件处理器
    /// </summary>
    public FeishuWebSocketBuilder AddHandler<THandler>() where THandler : class, IFeishuEventHandler
    {
        _handlerTypes.Add(typeof(THandler));
        return this;
    }
    
    /// <summary>
    /// 构建服务注册
    /// </summary>
    public IServiceCollection Build()
    {
        // 注册配置选项
        _services.Configure(_options);
        
        if (_useMultiHandler)
        {
            // 多处理器模式
            _services.AddSingleton<IFeishuEventHandlerFactory, DefaultFeishuEventHandlerFactory>();
            
            // 注册所有处理器类型
            foreach (var handlerType in _handlerTypes)
            {
                _services.AddSingleton(typeof(IFeishuEventHandler), handlerType);
            }
            
            _services.AddSingleton<IFeishuWebSocketManager, FeishuWebSocketManager>();
        }
        else
        {
            // 单处理器模式
            var handlerType = _handlerTypes.FirstOrDefault();
            if (handlerType != null)
            {
                _services.AddSingleton(typeof(IFeishuEventHandler), handlerType);
                _services.AddSingleton<IFeishuWebSocketManager, FeishuWebSocketManager>();
            }
        }
        
        return _services;
    }
}

实际使用示例

// Program.cs 中的配置示例
var builder = WebApplication.CreateBuilder(args);
// 方式一:建造者模式注册(推荐)
builder.Services.AddFeishuWebSocketBuilder()
    .ConfigureFrom(builder.Configuration)           // 从配置文件读取
    .UseMultiHandler()                              // 启用多处理器模式
    .AddHandler<ReceiveMessageEventHandler>()        // 添加消息处理器
    .AddHandler<UserCreatedEventHandler>()           // 添加用户事件处理器
    .AddHandler<DepartmentCreatedEventHandler>()      // 添加部门事件处理器
    .Build();                                       // 构建服务注册
// 方式二:简化注册
builder.Services.AddFeishuWebSocketServiceWithSingleHandler<ReceiveMessageEventHandler>(
    options => {
        options.AutoReconnect = true;
        options.MaxReconnectAttempts = 5;
        options.HeartbeatIntervalMs = 30000;
        options.EnableLogging = true;
    });
// 方式三:从配置文件注册
builder.Services.AddFeishuWebSocketService(builder.Configuration);

4.4 Web API集成

RESTful API设计

/// <summary>
/// WebSocket管理控制器
/// </summary>
[ApiController]
[Route("api/[controller]")]
public class WebSocketController : ControllerBase
{
    private readonly IFeishuWebSocketManager _webSocketManager;
    private readonly ILogger<WebSocketController> _logger;
    public WebSocketController(IFeishuWebSocketManager webSocketManager,
        ILogger<WebSocketController> logger)
    {
        _webSocketManager = webSocketManager ?? throw new ArgumentNullException(nameof(webSocketManager));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }
    
    /// <summary>
    /// 启动WebSocket连接
    /// </summary>
    [HttpPost("connect")]
    public async Task<IActionResult> ConnectAsync()
    {
        try
        {
            await _webSocketManager.StartAsync();
            
            return Ok(new 
            { 
                Success = true,
                Message = "WebSocket连接启动成功",
                Timestamp = DateTime.UtcNow
            });
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "启动WebSocket连接失败");
            return BadRequest(new 
            { 
                Success = false,
                Message = $"WebSocket连接启动失败: {ex.Message}",
                Timestamp = DateTime.UtcNow
            });
        }
    }
    
    /// <summary>
    /// 断开WebSocket连接
    /// </summary>
    [HttpPost("disconnect")]
    public async Task<IActionResult> DisconnectAsync()
    {
        try
        {
            await _webSocketManager.StopAsync();
            
            return Ok(new 
            { 
                Success = true,
                Message = "WebSocket连接断开成功",
                Timestamp = DateTime.UtcNow
            });
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "断开WebSocket连接失败");
            return BadRequest(new 
            { 
                Success = false,
                Message = $"WebSocket连接断开失败: {ex.Message}",
                Timestamp = DateTime.UtcNow
            });
        }
    }
    
    /// <summary>
    /// 获取连接状态
    /// </summary>
    [HttpGet("status")]
    public IActionResult GetStatus()
    {
        var stats = _webSocketManager.GetConnectionStats();
        var state = _webSocketManager.GetConnectionState();
        
        return Ok(new WebSocketStatusResponse
        {
            IsConnected = _webSocketManager.IsConnected,
            State = state.Status.ToString(),
            Uptime = stats.Uptime,
            ReconnectCount = stats.ReconnectCount,
            LastError = stats.LastError?.Message,
            Timestamp = DateTime.UtcNow
        });
    }
}

实时状态监控

/// <summary>
/// WebSocket状态监控服务
/// </summary>
public class WebSocketMonitoringService : IHostedService
{
    private readonly IFeishuWebSocketManager _webSocketManager;
    private readonly ILogger<WebSocketMonitoringService> _logger;
    private readonly Timer _monitoringTimer;
    private readonly ConcurrentQueue<ConnectionSnapshot> _snapshots = new();
    
    public WebSocketMonitoringService(IFeishuWebSocketManager webSocketManager,
        ILogger<WebSocketMonitoringService> logger)
    {
        _webSocketManager = webSocketManager;
        _logger = logger;
        
        // 每30秒收集一次状态快照
        _monitoringTimer = new Timer(CollectStatusSnapshot, null,
            TimeSpan.Zero, TimeSpan.FromSeconds(30));
    }
    
    /// <summary>
    /// 收集状态快照
    /// </summary>
    private void CollectStatusSnapshot(object? state)
    {
        var stats = _webSocketManager.GetConnectionStats();
        var connectionState = _webSocketManager.GetConnectionState();
        
        var snapshot = new ConnectionSnapshot
        {
            Timestamp = DateTime.UtcNow,
            IsConnected = stats.IsConnected,
            Uptime = stats.Uptime,
            ReconnectCount = stats.ReconnectCount,
            LastError = stats.LastError,
            ConnectionState = connectionState.Status.ToString()
        };
        
        _snapshots.Enqueue(snapshot);
        
        // 保持最近100个快照
        while (_snapshots.Count > 100)
        {
            _snapshots.TryDequeue(out _);
        }
        
        // 分析连接质量
        AnalyzeConnectionQuality();
    }
    
    /// <summary>
    /// 分析连接质量
    /// </summary>
    private void AnalyzeConnectionQuality()
    {
        var recentSnapshots = _snapshots.TakeLast(10).ToList();
        if (recentSnapshots.Count < 5) return;
        
        var connectedCount = recentSnapshots.Count(s => s.IsConnected);
        var connectivityRate = (double)connectedCount / recentSnapshots.Count;
        
        if (connectivityRate < 0.9)
        {
            _logger.LogWarning("连接质量较差 - 连接率: {ConnectivityRate:P2}", connectivityRate);
        }
        
        // 分析重连频率
        var reconnectEvents = recentSnapshots
            .Where(s => s.ReconnectCount > 0)
            .ToList();
        
        if (reconnectEvents.Count > 3)
        {
            _logger.LogWarning("重连频率过高 - 最近10次检测中有{Count}次重连", 
                reconnectEvents.Count);
        }
    }
}

🎯 总结

使用 Mud.Feishu.Abstractions 和 Mud.Feishu.WebSocket 两个核心项目构建企业级的飞书WebSocket长连接应用。通过组件化架构设计、策略模式的事件处理、完善的错误隔离机制和全面的监控体系,开发者可以快速构建稳定、可靠的实时事件处理系统。

总的来说,Mud.Feishu.Abstractions 和 Mud.Feishu.WebSocket 两个核心项目的目标是提供一个可靠、易于集成、高度可扩展的飞书WebSocket长连接应用框架,大大降低了飞书WebSocket集成的开发复杂度,让开发者能够专注于业务逻辑的实现。

转自https://www.cnblogs.com/mudtools/p/19320597


该文章在 2025/12/9 15:58:44 编辑过
关键字查询
相关文章
正在查询...
点晴ERP是一款针对中小制造业的专业生产管理软件系统,系统成熟度和易用性得到了国内大量中小企业的青睐。
点晴PMS码头管理系统主要针对港口码头集装箱与散货日常运作、调度、堆场、车队、财务费用、相关报表等业务管理,结合码头的业务特点,围绕调度、堆场作业而开发的。集技术的先进性、管理的有效性于一体,是物流码头及其他港口类企业的高效ERP管理信息系统。
点晴WMS仓储管理系统提供了货物产品管理,销售管理,采购管理,仓储管理,仓库管理,保质期管理,货位管理,库位管理,生产管理,WMS管理系统,标签打印,条形码,二维码管理,批号管理软件。
点晴免费OA是一款软件和通用服务都免费,不限功能、不限时间、不限用户的免费OA协同办公管理系统。
Copyright 2010-2025 ClickSun All Rights Reserved