
构建一个集成了WebRTC低延迟视频流与WebSocket实时业务数据的大屏可视化应用,首要任务是搭建一个清晰、可扩展且功能完整的开发环境。本节将基于当前(2024-2026年)的技术实践,明确项目所需的核心技术栈、关键依赖库,并提供初始化的配置指引。
在项目启动阶段,明确技术选型是奠定可扩展架构的基础。根据行业最佳实践,一个现代的大屏可视化项目通常采用分层、解耦的架构思想。
mitt)作为状态管理的补充,实现组件间松耦合通信。websockets库同时处理信令转发和业务数据广播。项目的依赖配置围绕 WebRTC媒体通信、WebSocket实时信令/数据、可视化渲染 以及 项目工程化 四个核心展开。
WebRTC能力由现代浏览器原生提供,无需额外安装库。核心配置在于正确初始化 RTCPeerConnection 并配置网络穿透服务器。
// 前端 WebRTC 配置示例 (基于资料中的代码模式)
const peerConnectionConfig = {
iceServers: [
// 公共STUN服务器,用于获取公网地址
{ urls: 'stun:stun.l.google.com:19302' },
// 自建或第三方TURN服务器,用于在对称NAT/防火墙下中继流量(关键)
// {
// urls: 'turn:your-turn-server.com:3478',
// username: 'username',
// credential: 'credential'
// }
]
};
const peerConnection = new RTCPeerConnection(peerConnectionConfig);关键点:必须配置 TURN服务器 以确保在所有网络环境下的连通性。这是实现高可靠性的关键,否则在对称型NAT等复杂网络下连接会失败。
WebSocket API 或更封装的库(如 socket.io-client)建立与信令服务器的连接。websockets 库。
# Python 依赖 pip install websockets
该服务器负责:
Naive UI、Element Plus,或基于React的 Ant Design。这些组件库能加速构建大屏的控制面板、布局容器等。@types/websocket)。create-vue、create-react-app)或基于Vite模板初始化项目。vite.config.ts 或 webpack.config.js 中,配置别名(alias)、代理(proxy)以方便开发,并为生产环境优化(代码分割、压缩)。websocket.service.ts 和 webrtc.service.ts 等文件,封装WebSocket连接管理、消息分发和WebRTC PeerConnection的创建、信令交换等通用逻辑,实现与业务组件的解耦。通过以上步骤,一个兼顾功能完整性、代码清晰度和未来可扩展性的大屏可视化项目基础环境便搭建完成,为后续集成低延迟视频流与实时数据打下了坚实的技术地基。
面向2024-2026年的大屏可视化项目,其架构设计的核心目标是构建一个能够从容应对数据量增长、业务需求频繁变化以及多场景灵活部署的系统。基于分层解耦与配置驱动的思想,本项目的可扩展架构旨在将WebRTC低延迟视频流、WebSocket实时业务数据、多引擎可视化渲染、统一状态管理以及插件化动态扩展等核心能力有机整合,形成一个高内聚、低耦合、易于维护和扩展的技术体系。
现代大屏系统正从“硬编码”向“配置化”演进。本架构采用清晰的分层设计,将系统解耦为可视化层、布局层、数据层、主题层和工具层,每一层均可独立演进。
插件化是支撑业务灵活性和技术栈解耦的核心。本架构参考微前端与模块联邦思想,实现前端功能的“热插拔”。
pluginCode、version、entryUrl)。entryUrl,利用 Webpack Module Federation 或 Vite的动态导入能力远程加载模块代码。加载成功后,利用框架的动态组件能力(如Vue的defineAsyncComponent)进行实例化与渲染。modelClicked事件,携带设备ID,而一个图表插件监听此事件并更新为对应设备的数据。这避免了组件间的直接依赖,实现了松耦合联动。复杂的大屏状态需要可预测、可调试的管理方案。综合当前最佳实践,本架构优先采用Zustand作为核心状态管理库。
可扩展性必须建立在稳定的性能基础之上。架构在数据流与渲染层面内置了优化策略。
为保障视觉一致性与跨端体验,建立统一的主题与适配系统。
通过以上五个维度的设计,本架构构建了一个以配置驱动为灵魂、以插件化为扩展手段、以高性能数据流与渲染为基石、以统一状态与主题为纽带的可扩展大屏可视化系统。它不仅能满足当前WebRTC视频与WebSocket数据实时可视化的需求,更为未来新增数据源、可视化形式或交互模式提供了清晰、低成本的集成路径。
在大屏可视化场景中,实时视频流(如监控画面、实时渲染视图)的接入要求毫秒级的端到端延迟,以保障监控与决策的即时性。WebRTC(Web实时通信)技术凭借其基于UDP的传输和内置的NAT穿透能力,成为实现这一目标的核心技术标准。本章将基于双协议协同架构,详细阐述如何将WebRTC低延迟视频流稳定、高效地接入大屏可视化系统。
实现低延迟视频流接入的核心在于采用 “信令与控制分离,媒体与数据并行” 的混合架构。该架构充分发挥了不同协议的优势:
RTCPeerConnection API是建立连接并接收/发送媒体流的基石。这种分工确保了信令的可靠性,同时让媒体流享有最低的网络传输延迟。
接入流程可分为初始化、信令协商、媒体流处理三个阶段。我们将基于前序架构中预留的 src/services/webrtc.service.ts 进行具体实现封装。
1. 服务初始化与配置 首先,创建WebRTC服务类,配置ICE服务器以保障在各类网络环境下的连通性。公共STUN服务器用于获取公网地址,TURN服务器则在对称型NAT等复杂环境下提供中继后备。
// src/services/webrtc.service.ts
export class WebRTCService {
private peerConnection: RTCPeerConnection | null = null;
private signalingSocket: WebSocket; // 假设已通过WebSocket服务注入
// ICE服务器配置(与前序配置一致)
private readonly rtcConfig: RTCConfiguration = {
iceServers: [
{ urls: 'stun:stun.l.google.com:19302' }, // 公共STUN
{
urls: 'turn:your-turn-server.com:3478', // 预留TURN服务器地址
username: 'your-username',
credential: 'your-credential'
}
]
};
constructor(signalingService: any) {
this.signalingSocket = signalingService.getSocket(); // 获取已建立的WebSocket连接
this.setupSignalingHandlers();
}
}2. 建立连接与信令交换 当大屏需要订阅某个视频源时,发起端创建RTCPeerConnection,并通过WebSocket交换SDP和ICE候选。
// 在 WebRTCService 类中
public async startConnection(streamId: string): Promise<void> {
// 1. 创建PeerConnection实例
this.peerConnection = new RTCPeerConnection(this.rtcConfig);
// 2. 处理ICE候选,并通过WebSocket发送给信令服务器
this.peerConnection.onicecandidate = (event) => {
if (event.candidate) {
this.signalingSocket.send(JSON.stringify({
type: 'webrtc_signal',
target: streamId, // 指定目标视频源或信令服务器
signal: { iceCandidate: event.candidate }
}));
}
};
// 3. 处理接收到的远端媒体流,并注入到统一数据适配层
this.peerConnection.ontrack = (event) => {
const remoteStream = event.streams[0];
// 关键:将原始MediaStream传递给统一数据适配层进行处理
window.dispatchEvent(new CustomEvent('webrtc-stream-received', {
detail: { streamId, mediaStream: remoteStream }
}));
// 同时,也可直接绑定到video元素进行预览(如需要)
const videoElement = document.getElementById(`video-${streamId}`) as HTMLVideoElement;
if (videoElement && videoElement.srcObject !== remoteStream) {
videoElement.srcObject = remoteStream;
}
};
// 4. 创建Offer,设置本地描述,并发送
try {
const offer = await this.peerConnection.createOffer();
await this.peerConnection.setLocalDescription(offer);
this.signalingSocket.send(JSON.stringify({
type: 'webrtc_signal',
target: streamId,
signal: { sdp: this.peerConnection.localDescription }
}));
} catch (error) {
console.error('创建Offer失败:', error);
}
}
// 处理从WebSocket收到的远端信令(Answer或ICE候选)
private async handleRemoteSignal(signal: any): Promise<void> {
if (!this.peerConnection) return;
if (signal.sdp) {
const remoteDesc = new RTCSessionDescription(signal.sdp);
await this.peerConnection.setRemoteDescription(remoteDesc);
// 如果收到的是Offer(作为接收端),则需要创建Answer
if (signal.sdp.type === 'offer') {
const answer = await this.peerConnection.createAnswer();
await this.peerConnection.setLocalDescription(answer);
this.signalingSocket.send(JSON.stringify({
type: 'webrtc_signal',
signal: { sdp: this.peerConnection.localDescription }
}));
}
} else if (signal.iceCandidate) {
await this.peerConnection.addIceCandidate(new RTCIceCandidate(signal.iceCandidate));
}
}注:以上代码展示了发起连接的核心逻辑。在实际的插件化架构中,信令服务器需正确路由消息至对应的对等端或SFU媒体服务器。
3. 媒体处理与性能优化 为了确保大屏显示的流畅与清晰,需在编码和渲染环节进行优化:
/dev/dri设备)。MediaStream对象赋值给<video>元素的srcObject属性,这是性能最佳的方式。
videoElement.srcObject = mediaStream; // 正确做法 // 避免使用已废弃的 URL.createObjectURL(stream)RTCPeerConnection内置的拥塞控制(如GCC算法),并根据接收到的RTCP反馈(可通过peerConnection.getStats()获取)来驱动前端的降级策略(如提示网络状况)。纯视频流之外,WebRTC还为业务数据同步提供了强大扩展能力。
RTCDataChannel。它与媒体流共享同一个传输通道,延迟极低。
const dataChannel = this.peerConnection.createDataChannel('opsData'); dataChannel.onmessage = (event) => { const opsCommand = JSON.parse(event.data); // 处理业务操作命令,如更新图表筛选条件 window.dispatchEvent(new CustomEvent('rtc-data-command', { detail: opsCommand })); };{objectId: 123, x: 100, y: 200})作为SEI信息插入。通过上述步骤,WebRTC视频流被成功接入并注入统一数据适配层。视频流本身作为一类特殊的“数据源”,与通过WebSocket接入的业务数据源(订单量、在线人数)一同,为后续的可视化组件渲染提供了实时、低延迟的输入。
在“信令与控制分离,媒体与数据并行”的双协议协同架构中,WebSocket 扮演着可靠的信令与控制通道角色。它基于 TCP,提供有序、可靠的双向通信,完美承接了前序架构设计中已就绪的信令通道职责,专门用于传输业务运营数据、控制指令及 WebRTC 建立连接所需的信令,与专司低延迟媒体流的 WebRTC 各司其职。
本系统采用统一的 JSON 消息格式在 WebSocket 通道上进行通信,格式约定为 { type, target, payload },这与前序架构中约定的信令格式一致,确保了协议的统一性。
type: 消息类型,用于在统一数据适配层进行路由和分类处理。例如:
webrtc_signal: WebRTC 信令(SDP Offer/Answer, ICE候选)。business_data: 实时业务数据(如订单量、在线人数)。control_command: 对大屏或视频源的控制指令。target: 消息目标,用于在广播场景下指定接收方,或用于区分不同的数据流。payload: 消息有效载荷,其结构根据 type 不同而变化。所有通过 WebSocket 接收的原始数据,均会流入前文已定义的统一数据适配层。该层作为数据枢纽,负责对原始业务数据进行清洗、格式转换与治理,然后注入 Zustand 全局状态库,驱动可视化组件更新。
前端通过已封装的 WebSocket 服务(如 src/services/websocket.service.ts)建立连接,并监听各类消息。
// 前端示例:建立连接与消息分发
class WebSocketService {
constructor(url) {
this.socket = new WebSocket(url);
this.setupEventListeners();
}
setupEventListeners() {
this.socket.onopen = () => {
console.log('WebSocket连接已建立,可进行身份认证或订阅');
// 触发连接成功事件,供其他模块响应
eventBus.emit('WS_CONNECTION_ESTABLISHED');
};
this.socket.onmessage = async (event) => {
try {
const rawData = JSON.parse(event.data);
// 将原始数据送入统一数据适配层进行处理
const processedData = await dataAdapter.process(rawData);
// 根据处理后的数据类型,更新对应的 Zustand Store 或触发事件
switch(processedData.type) {
case 'business_data':
// 更新业务数据Store,例如更新订单量、在线人数
useRealtimeDashboardStore.getState().updateMetrics(processedData.payload);
// 同时发布事件,供插件化组件订阅
eventBus.emit('BUSINESS_DATA_UPDATED', processedData.payload);
break;
case 'control_command':
// 执行控制命令,如切换视图、布局
executeControlCommand(processedData.payload);
eventBus.emit('CONTROL_COMMAND_RECEIVED', processedData.payload);
break;
case 'webrtc_signal':
// 将WebRTC信令传递给WebRTC管理模块
webRTCManager.handleSignal(processedData);
break;
}
} catch (error) {
console.error('WebSocket消息处理失败:', error);
// 触发错误事件,可由全局拦截器处理
eventBus.emit('DATA_PROCESSING_ERROR', { error, rawData: event.data });
}
};
this.socket.onerror = (error) => {
console.error('WebSocket错误:', error);
eventBus.emit('WS_CONNECTION_ERROR', error);
};
}
send(data) {
if (this.socket.readyState === WebSocket.OPEN) {
this.socket.send(JSON.stringify(data));
}
}
}后端 WebSocket 服务器(例如使用 Python websockets 库)承担着连接管理、消息广播和信令转发的核心任务。
# Python后端示例 (简化核心逻辑)
import asyncio, websockets, json
from typing import Set
class SignalingServer:
def __init__(self):
self.connected_clients = {} # client_id: websocket
self.rooms = {} # room_id: Set[client_id]
async def register(self, websocket, client_id):
"""客户端注册"""
self.connected_clients[client_id] = websocket
print(f"客户端 {client_id} 已连接")
async def broadcast_business_data(self):
"""模拟广播业务数据(如从消息队列Kafka中读取)"""
while True:
if self.connected_clients:
# 模拟获取实时业务数据
mock_data = {
"type": "business_data",
"payload": {
"timestamp": time.time(),
"orders_last_minute": random.randint(100, 200),
"online_users": random.randint(5000, 6000),
"gmv": random.uniform(100000, 200000)
}
}
message = json.dumps(mock_data)
# 广播给所有连接的客户端(或特定房间)
tasks = [client.send(message) for client in self.connected_clients.values()]
await asyncio.gather(*tasks, return_exceptions=True)
await asyncio.sleep(1) # 每秒广播一次
async def handler(self, websocket, path):
"""处理客户端连接"""
client_id = await self.authenticate(websocket) # 身份认证
await self.register(websocket, client_id)
try:
async for message in websocket:
data = json.loads(message)
# 1. 业务数据请求:直接回复或广播
if data.get('type') == 'subscribe_metrics':
await self.handle_subscription(websocket, data)
# 2. WebRTC信令:根据target进行点对点转发
elif data.get('type') == 'webrtc_signal':
target_client = self.connected_clients.get(data['target'])
if target_client:
await target_client.send(json.dumps(data))
# 3. 控制命令:广播给所有大屏或特定组
elif data.get('type') == 'control_command':
await self.broadcast_to_dashboard_clients(data)
finally:
# 连接断开处理
self.connected_clients.pop(client_id, None)
print(f"客户端 {client_id} 已断开")
async def main():
server = SignalingServer()
start_server = websockets.serve(server.handler, "0.0.0.0", 8765)
# 并行运行服务器和业务数据广播任务
await asyncio.gather(start_server, server.broadcast_business_data())为应对高频率数据推送可能带来的前端性能与体验问题,统一数据适配层内置了关键的数据治理策略:
策略 | 目的 | 实现方式 |
|---|---|---|
去重 (Deduplication) | 避免因网络抖动导致重复消息造成界面不必要的渲染。 | 为每条消息添加唯一序列号或时间戳,在适配层缓存近期消息ID进行过滤。 |
节流 (Throttling) | 防止高频数据(如每秒百次传感器读数)压垮前端渲染。 | 例如,无论后端每秒推送多少次,适配层保证最多每100毫秒向Store提交一次数据更新。 |
聚合 (Aggregation) | 将细粒度数据聚合成有业务意义的指标。 | 在适配层内对原始流水数据进行累加、求平均等计算,再输出聚合后的结果(如“过去10秒平均订单速率”)。 |
差值更新 | 减少传输数据量,仅发送变化的部分。 | 后端仅推送变化的字段,适配层负责将增量更新合并到完整的本地状态中。 |
经过适配层处理后的纯净业务数据,被注入到对应的 Zustand Store 中。例如,实时运营指标会更新 useRealtimeDashboardStore。同时,适配层或服务层会通过轻量级事件总线(mitt)发布相应的事件,例如 BUSINESS_DATA_UPDATED。这使得:
BUSINESS_DATA_UPDATED 事件,在无需修改核心代码的情况下,对数据做出自定义响应或渲染。control_command 类型消息,经适配层转换后,可直接调用相关函数或发布如 LAYOUT_CHANGE_REQUESTED 事件,由布局管理模块响应执行。至此,实时业务数据通过 WebSocket 通道稳定接入,并经由统一数据适配层的治理,被安全、高效地分发至整个应用的状态管理与组件渲染体系,为最终的大屏可视化呈现提供了动态的数据血液。
本章将基于前文构建的统一数据流与可扩展架构,具体阐述大屏可视化组件的实现模式。核心目标是构建一个配置驱动、高性能、可热插拔的组件生态系统,将接入的实时视频流与业务数据转化为直观、动态的视觉洞察。
现代大屏开发已从硬编码转向配置驱动开发(CDD),将界面布局、数据绑定与交互逻辑抽象为可配置的元数据,实现快速迭代与交付。
options)注入,实现 “容器与内容分离”。dataSourceId: “realtime_orders”)声明。组件内部不关心数据来自WebSocket还是WebRTC,它只消费经由统一数据适配层处理后的、格式规范的数据流。<BaseChart />)是自包含的,封装自身的渲染、resize和销毁逻辑。通过组合和配置这些原子组件,可以快速搭建复杂的业务大屏。为管理复杂的全局状态(如筛选条件、主题模式、用户权限)并实现高效组件通信,采用混合模式。
useDashboardStore 可以管理全局的筛选时间范围、高亮的数据维度等。mitt)。这实现了组件间的松耦合。例如,一个深层的3D模型插件可以抛出 modelClicked 事件,由顶层的控制面板监听并响应,而两者无需直接引用。为实现功能的“热插拔”与团队并行开发,采用基于模块联邦(Module Federation)或动态导入的插件化架构。
pluginCode、版本version和主入口组件。pluginCode从注册中心获取插件模块的入口地址(entryUrl),然后通过动态import()或模块联邦的loadRemoteModule方法异步加载。为平衡渲染性能与交互灵活性,支持Canvas与SVG双渲染引擎,并根据场景智能选择。
对于需要将业务数据(如订单热区、在线人数标签)叠加到WebRTC视频流上的场景,实现精准同步至关重要。
<video>元素上实时绘制叠加层(如框、线、文字)。这确保了数据与视频画面的帧级同步,实现“零延迟”叠加。requestAnimationFrame进行循环绘制。绘制数据来源于:
resize 事件,通过事件总线通知所有插件组件进行自适应调整。dataZoom 或采样。通过以上实现,大屏可视化组件成为一个高度模块化、可配置、可扩展的有机整体。它们消费统一的实时数据流,遵循一致的状态与通信规范,并能根据业务需求动态组合与替换,最终构建出既能“一眼看懂”业务全局,又能通过交互“深入洞察”的智能可视化界面。
本章将整合前文所述的所有技术要点,提供一个可直接运行的前端大屏可视化程序示例。该示例基于 Vue 3 + TypeScript + Vite 技术栈,完整实现了 WebRTC 低延迟视频流接入、WebSocket 实时业务数据接收、以及使用 ECharts 的动态数据可视化。
src/
├── main.ts # 应用入口
├── App.vue # 根组件
├── index.html
├── vite.config.ts # Vite 配置
├── styles/
│ └── global.css # 全局样式与主题变量
├── services/
│ ├── websocket.service.ts # WebSocket 服务封装
│ └── webrtc.service.ts # WebRTC 服务封装
├── stores/
│ └── dashboard.store.ts # Zustand 状态管理
├── components/
│ ├── VideoStream.vue # 视频流展示组件
│ ├── DataDashboard.vue # 数据可视化仪表盘组件
│ └── LayoutContainer.vue # 大屏布局容器
└── plugins/
└── echarts.plugin.ts # ECharts 插件注册与主题适配vite.config.ts & styles/global.css)vite.config.ts
import { defineConfig } from 'vite'
import vue from '@vitejs/plugin-vue'
import path from 'path'
export default defineConfig({
plugins: [vue()],
resolve: {
alias: {
'@': path.resolve(__dirname, './src'),
},
},
server: {
port: 5173,
host: true,
},
build: {
target: 'es2020',
rollupOptions: {
output: {
manualChunks: {
echarts: ['echarts'],
'vue-router': ['vue-router'],
},
},
},
},
})services/websocket.service.ts
import mitt, { Emitter } from 'mitt'
type WebSocketMessage = {
type: 'business_data' | 'webrtc_signal' | 'control_command'
target?: string
payload: any
}
type WebSocketEvents = {
WS_CONNECTION_ESTABLISHED: void
BUSINESS_DATA_UPDATED: { orders: number; onlineUsers: number; timestamp: number }
webrtc_signal_received: any
connection_error: Error
}
export class WebSocketService {
private socket: WebSocket | null = null
private eventBus: Emitter<WebSocketEvents> = mitt<WebSocketEvents>()
private reconnectAttempts = 0
private readonly maxReconnectAttempts = 5
private reconnectTimeout: number | null = null
constructor(private url: string = 'ws://localhost:8765') {}
connect(): Promise<void> {
return new Promise((resolve, reject) => {
if (this.socket?.readyState === WebSocket.OPEN) {
resolve()
return
}
try {
this.socket = new WebSocket(this.url)
} catch (error) {
reject(error)
return
}
this.socket.onopen = () => {
console.log('WebSocket连接已建立')
this.reconnectAttempts = 0
this.eventBus.emit('WS_CONNECTION_ESTABLISHED')
resolve()
}
this.socket.onmessage = (event) => {
try {
const data: WebSocketMessage = JSON.parse(event.data)
this.handleMessage(data)
} catch (error) {
console.error('解析WebSocket消息失败:', error)
}
}
this.socket.onerror = (error) => {
console.error('WebSocket错误:', error)
this.eventBus.emit('connection_error', new Error('WebSocket连接错误'))
}
this.socket.onclose = (event) => {
console.log(`WebSocket连接关闭,代码: ${event.code}, 原因: ${event.reason}`)
this.attemptReconnect()
}
})
}
private handleMessage(data: WebSocketMessage): void {
switch (data.type) {
case 'business_data':
// 假设 payload 格式为 { orders: number, onlineUsers: number }
this.eventBus.emit('BUSINESS_DATA_UPDATED', data.payload)
break
case 'webrtc_signal':
this.eventBus.emit('webrtc_signal_received', data.payload)
break
case 'control_command':
console.log('收到控制命令:', data.payload)
// 可根据命令类型分发到不同处理器
break
default:
console.warn('未知消息类型:', data.type)
}
}
send(data: object): void {
if (this.socket?.readyState === WebSocket.OPEN) {
this.socket.send(JSON.stringify(data))
} else {
console.warn('WebSocket未连接,消息发送失败:', data)
}
}
private attemptReconnect(): void {
if (this.reconnectAttempts >= this.maxReconnectAttempts) {
console.error('达到最大重连次数,停止重连')
return
}
if (this.reconnectTimeout) {
clearTimeout(this.reconnectTimeout)
}
const delay = Math.min(1000 * Math.pow(2, this.reconnectAttempts), 30000)
this.reconnectAttempts++
console.log(`将在 ${delay}ms 后尝试第 ${this.reconnectAttempts} 次重连...`)
this.reconnectTimeout = window.setTimeout(() => {
this.connect().catch((err) => console.error('重连失败:', err))
}, delay)
}
getEventBus(): Emitter<WebSocketEvents> {
return this.eventBus
}
disconnect(): void {
if (this.reconnectTimeout) {
clearTimeout(this.reconnectTimeout)
this.reconnectTimeout = null
}
if (this.socket) {
this.socket.close(1000, '客户端主动断开')
this.socket = null
}
}
}
// 导出单例
export const webSocketService = new WebSocketService()services/webrtc.service.ts
import { webSocketService, WebSocketService } from './websocket.service'
interface RTCConfig {
iceServers: RTCIceServer[]
}
export class WebRTCService {
private peerConnection: RTCPeerConnection | null = null
private localStream: MediaStream | null = null
private remoteStream: MediaStream | null = null
private dataChannel: RTCDataChannel | null = null
private readonly config: RTCConfig = {
iceServers: [
{ urls: 'stun:stun.l.google.com:19302' },
// 生产环境需配置 TURN 服务器
// { urls: 'turn:your-turn-server.com:3478', username: 'xxx', credential: 'xxx' }
],
}
constructor(private signalingService: WebSocketService = webSocketService) {
this.setupSignalingHandlers()
}
private setupSignalingHandlers(): void {
const eventBus = this.signalingService.getEventBus()
eventBus.on('webrtc_signal_received', this.handleRemoteSignal.bind(this))
}
// 作为接收方,发起连接请求
async startConnection(streamId: string): Promise<void> {
if (this.peerConnection) {
console.warn('已有存在的WebRTC连接,先关闭')
this.closeConnection()
}
try {
this.peerConnection = new RTCPeerConnection(this.config)
// 设置 ICE 候选处理
this.peerConnection.onicecandidate = (event) => {
if (event.candidate) {
this.signalingService.send({
type: 'webrtc_signal',
target: streamId,
payload: { iceCandidate: event.candidate },
})
}
}
// 接收远端流
this.peerConnection.ontrack = (event) => {
console.log('收到远端视频流轨道')
if (event.streams && event.streams[0]) {
this.remoteStream = event.streams[0]
// 触发自定义事件,让组件可以获取流
const streamEvent = new CustomEvent('webrtc-stream-received', {
detail: { stream: this.remoteStream },
})
window.dispatchEvent(streamEvent)
}
}
// 创建数据通道(可选,用于传输控制指令等)
this.dataChannel = this.peerConnection.createDataChannel('controlChannel')
this.setupDataChannel()
// 创建 Offer
const offer = await this.peerConnection.createOffer()
await this.peerConnection.setLocalDescription(offer)
// 发送 Offer 到信令服务器
this.signalingService.send({
type: 'webrtc_signal',
target: streamId,
payload: { sdp: this.peerConnection.localDescription },
})
console.log('WebRTC连接已发起,等待远端应答...')
} catch (error) {
console.error('创建WebRTC连接失败:', error)
throw error
}
}
// 处理远端信令(SDP Answer 或 ICE Candidate)
async handleRemoteSignal(signal: any): Promise<void> {
if (!this.peerConnection) {
console.warn('收到信令时,PeerConnection 未初始化')
return
}
try {
if (signal.sdp) {
const remoteDesc = new RTCSessionDescription(signal.sdp)
await this.peerConnection.setRemoteDescription(remoteDesc)
console.log('已设置远端SDP描述')
// 如果收到的是Offer,需要创建Answer(本例中我们是接收方,通常只处理Answer)
if (signal.sdp.type === 'offer') {
const answer = await this.peerConnection.createAnswer()
await this.peerConnection.setLocalDescription(answer)
this.signalingService.send({
type: 'webrtc_signal',
target: 'sender', // 应替换为实际发送方ID
payload: { sdp: this.peerConnection.localDescription },
})
}
} else if (signal.iceCandidate) {
await this.peerConnection.addIceCandidate(new RTCIceCandidate(signal.iceCandidate))
console.log('已添加ICE候选')
}
} catch (error) {
console.error('处理远端信令失败:', error)
}
}
private setupDataChannel(): void {
if (!this.dataChannel) return
this.dataChannel.onopen = () => {
console.log('RTCDataChannel 已打开')
// 可以发送控制指令
this.dataChannel?.send(JSON.stringify({ type: 'handshake', message: '通道就绪' }))
}
this.dataChannel.onmessage = (event) => {
console.log('收到DataChannel消息:', event.data)
// 处理来自远端的控制指令或数据
try {
const data = JSON.parse(event.data)
// 分发处理...
} catch (e) {
console.log('收到非JSON消息:', event.data)
}
}
this.dataChannel.onerror = (error) => {
console.error('DataChannel错误:', error)
}
}
sendDataViaChannel(data: object): void {
if (this.dataChannel?.readyState === 'open') {
this.dataChannel.send(JSON.stringify(data))
} else {
console.warn('DataChannel未就绪,消息发送失败')
}
}
closeConnection(): void {
if (this.dataChannel) {
this.dataChannel.close()
this.dataChannel = null
}
if (this.peerConnection) {
this.peerConnection.close()
this.peerConnection = null
}
if (this.localStream) {
this.localStream.getTracks().forEach((track) => track.stop())
this.localStream = null
}
this.remoteStream = null
console.log('WebRTC连接已关闭')
}
getRemoteStream(): MediaStream | null {
return this.remoteStream
}
}
// 导出单例
export const webRTCService = new WebRTCService()stores/dashboard.store.ts)import { create } from 'zustand'
import { subscribeWithSelector } from 'zustand/middleware'
interface DashboardState {
// 业务数据
orderCount: number
onlineUserCount: number
lastUpdateTime: number | null
// 系统状态
isWebSocketConnected: boolean
isVideoStreamActive: boolean
currentLayout: 'grid' | 'focus' | 'custom'
// 筛选条件
timeRange: 'realtime' | 'hourly' | 'daily'
selectedRegion: string | null
}
interface DashboardActions {
updateBusinessData: (orders: number, onlineUsers: number) => void
setWebSocketStatus: (connected: boolean) => void
setVideoStreamStatus: (active: boolean) => void
switchLayout: (layout: DashboardState['currentLayout']) => void
setTimeRange: (range: DashboardState['timeRange']) => void
setSelectedRegion: (region: string | null) => void
reset: () => void
}
const initialState: DashboardState = {
orderCount: 0,
onlineUserCount: 0,
lastUpdateTime: null,
isWebSocketConnected: false,
isVideoStreamActive: false,
currentLayout: 'grid',
timeRange: 'realtime',
selectedRegion: null,
}
export const useDashboardStore = create<DashboardState & DashboardActions>()(
subscribeWithSelector((set) => ({
...initialState,
updateBusinessData: (orders, onlineUsers) =>
set({
orderCount: orders,
onlineUserCount: onlineUsers,
lastUpdateTime: Date.now(),
}),
setWebSocketStatus: (connected) => set({ isWebSocketConnected: connected }),
setVideoStreamStatus: (active) => set({ isVideoStreamActive: active }),
switchLayout: (layout) => set({ currentLayout: layout }),
setTimeRange: (range) => set({ timeRange: range }),
setSelectedRegion: (region) => set({ selectedRegion: region }),
reset: () => set(initialState),
}))
)
// 可选:订阅状态变化,用于持久化或日志
useDashboardStore.subscribe(
(state) => [state.orderCount, state.onlineUserCount],
([orders, users]) => {
console.log(`业务数据更新 - 订单: ${orders}, 在线用户: ${users}`)
}
)components/VideoStream.vue
<template>
<div class="video-stream-container">
<div class="video-header">
<h3>{{ title }}</h3>
<div class="status-indicator" :class="{ active: isStreamActive }"></div>
</div>
<div class="video-wrapper">
<video
ref="videoElement"
autoplay
playsinline
muted
class="video-element"
:class="{ 'has-stream': isStreamActive }"
></video>
<div v-if="!isStreamActive" class="video-placeholder">
<div class="placeholder-icon">📹</div>
<p>等待视频流连接...</p>
<button v-if="showConnectButton" @click="emit('connect')" class="connect-btn">
连接视频流
</button>
</div>
</div>
<div v-if="showStats" class="video-stats">
<span>分辨率: {{ videoStats.resolution }}</span>
<span>帧率: {{ videoStats.frameRate }} fps</span>
</div>
</div>
</template>
<script setup lang="ts">
import { ref, onMounted, onUnmounted, watch } from 'vue'
import { webRTCService } from '@/services/webrtc.service'
interface Props {
title?: string
streamId?: string
showConnectButton?: boolean
showStats?: boolean
}
const props = withDefaults(defineProps<Props>(), {
title: '实时视频流',
streamId: 'default-stream',
showConnectButton: true,
showStats: true,
})
const emit = defineEmits<{
connect: []
streamActive: [isActive: boolean]
}>()
const videoElement = ref<HTMLVideoElement | null>(null)
const isStreamActive = ref(false)
const videoStats = ref({
resolution: 'N/A',
frameRate: 0,
})
let statsInterval: number | null = null
const handleStreamReceived = (event: Event) => {
const customEvent = event as CustomEvent<{ stream: MediaStream }>
if (videoElement.value && customEvent.detail?.stream) {
videoElement.value.srcObject = customEvent.detail.stream
isStreamActive.value = true
emit('streamActive', true)
startStatsMonitoring(customEvent.detail.stream)
}
}
const startStatsMonitoring = (stream: MediaStream) => {
if (statsInterval) clearInterval(statsInterval)
statsInterval = window.setInterval(() => {
if (videoElement.value && videoElement.value.videoWidth) {
videoStats.value = {
resolution: `${videoElement.value.videoWidth}x${videoElement.value.videoHeight}`,
frameRate: Math.round(getFrameRate()),
}
}
}, 1000)
}
const getFrameRate = (): number => {
// 简化实现,实际应使用 VideoFrameCallback API 或计算时间差
return 30 // 默认值
}
const connectToStream = async () => {
try {
await webRTCService.startConnection(props.streamId)
} catch (error) {
console.error('连接视频流失败:', error)
alert('无法连接视频流,请检查网络和后端服务。')
}
}
onMounted(() => {
window.addEventListener('webrtc-stream-received', handleStreamReceived)
// 组件挂载时自动连接(可选)
// connectToStream()
})
onUnmounted(() => {
window.removeEventListener('webrtc-stream-received', handleStreamReceived)
if (statsInterval) clearInterval(statsInterval)
webRTCService.closeConnection()
isStreamActive.value = false
emit('streamActive', false)
})
watch(
() => props.streamId,
(newId) => {
if (newId && isStreamActive.value) {
// 如果streamId变化且当前有活动流,重新连接
webRTCService.closeConnection()
connectToStream()
}
}
)
</script>
components/DataDashboard.vue
<template>
<div class="data-dashboard">
<div class="dashboard-header">
<h2>业务运营概览</h2>
<div class="controls">
<select v-model="selectedTimeRange" @change="handleTimeRangeChange">
<option value="realtime">实时</option>
<option value="hourly">小时</option>
<option value="daily">日度</option>
</select>
<button @click="refreshData" class="refresh-btn" :disabled="isLoading">
{{ isLoading ? '更新中...' : '刷新' }}
</button>
</div>
</div>
<div class="stats-cards">
<div class="stat-card">
<div class="stat-icon">📈</div>
<div class="stat-content">
<div class="stat-label">今日订单量</div>
<div class="stat-value">{{ formatNumber(orderCount) }}</div>
<div class="stat-trend" :class="orderTrendClass">
{{ orderTrend }}%
</div>
</div>
</div>
<div class="stat-card">
<div class="stat-icon">👥</div>
<div class="stat-content">
<div class="stat-label">当前在线人数</div>
<div class="stat-value">{{ formatNumber(onlineUserCount) }}</div>
<div class="stat-trend" :class="userTrendClass">
{{ userTrend }}%
</div>
</div>
</div>
<div class="stat-card">
<div class="stat-icon">💰</div>
<div class="stat-content">
<div class="stat-label">实时成交额</div>
<div class="stat-value">¥{{ formatNumber(realtimeAmount) }}</div>
<div class="stat-sub">每分钟更新</div>
</div>
</div>
</div>
<div class="charts-container">
<div class="chart-wrapper">
<div ref="ordersChartRef" class="chart"></div>
</div>
<div class="chart-wrapper">
<div ref="usersChartRef" class="chart"></div>
</div>
</div>
<div class="last-update">
最后更新: {{ lastUpdateTime ? formatTime(lastUpdateTime) : '--' }}
</div>
</div>
</template>
<script setup lang="ts">
import { ref, onMounted, onUnmounted, watch, nextTick } from 'vue'
import * as echarts from 'echarts'
import { useDashboardStore } from '@/stores/dashboard.store'
import { webSocketService } from '@/services/websocket.service'
// 状态管理
const dashboardStore = useDashboardStore()
const orderCount = ref(0)
const onlineUserCount = ref(0)
const lastUpdateTime = ref<number | null>(null)
const realtimeAmount = ref(0)
// 图表相关
const ordersChartRef = ref<HTMLElement | null>(null)
const usersChartRef = ref<HTMLElement | null>(null)
let ordersChart: echarts.ECharts | null = null
let usersChart: echarts.ECharts | null = null
// UI状态
const selectedTimeRange = ref<'realtime' | 'hourly' | 'daily'>('realtime')
const isLoading = ref(false)
const orderTrend = ref(0)
const userTrend = ref(0)
const orderTrendClass = computed(() => (orderTrend.value >= 0 ? 'positive' : 'negative'))
const userTrendClass = computed(() => (userTrend.value >= 0 ? 'positive' : 'negative'))
// 模拟历史数据(实际应从后端获取)
const historicalOrders = ref<number[]>([120, 135, 118, 145, 160, 155, 170, 165, 180, 175])
const historicalUsers = ref<number[]>([850, 920, 880, 950, 1000, 980, 1050, 1020, 1100, 1080])
onMounted(() => {
initCharts()
setupWebSocketListener()
// 模拟初始数据
simulateDataUpdate()
})
onUnmounted(() => {
if (ordersChart) ordersChart.dispose()
if (usersChart) usersChart.dispose()
})
const initCharts = () => {
nextTick(() => {
if (ordersChartRef.value) {
ordersChart = echarts.init(ordersChartRef.value)
updateOrdersChart()
}
if (usersChartRef.value) {
usersChart = echarts.init(usersChartRef.value)
updateUsersChart()
}
// 响应窗口大小变化
window.addEventListener('resize', handleResize)
})
}
const handleResize = () => {
ordersChart?.resize()
usersChart?.resize()
}
const setupWebSocketListener = () => {
const eventBus = webSocketService.getEventBus()
eventBus.on('BUSINESS_DATA_UPDATED', handleBusinessDataUpdate)
}
const handleBusinessDataUpdate = (data: { orders: number; onlineUsers: number; timestamp: number }) => {
orderCount.value = data.orders
onlineUserCount.value = data.onlineUsers
lastUpdateTime.value = data.timestamp
// 更新趋势(简化计算)
if (historicalOrders.value.length > 0) {
const lastOrder = historicalOrders.value[historicalOrders.value.length - 1]
orderTrend.value = ((data.orders - lastOrder) / lastOrder) * 100
}
if (historicalUsers.value.length > 0) {
const lastUser = historicalUsers.value[historicalUsers.value.length - 1]
userTrend.value = ((data.onlineUsers - lastUser) / lastUser) * 100
}
// 更新历史数据(模拟)
historicalOrders.value.push(data.orders)
historicalUsers.value.push(data.onlineUsers)
if (historicalOrders.value.length > 20) {
historicalOrders.value.shift()
historicalUsers.value.shift()
}
// 更新图表
updateOrdersChart()
updateUsersChart()
// 模拟实时成交额变化
realtimeAmount.value = data.orders * 158 // 假设平均客单价
}
const updateOrdersChart = () => {
if (!ordersChart) return
const option: echarts.EChartsOption = {
backgroundColor: 'transparent',
tooltip: {
trigger: 'axis',
formatter: '{b}<br/>订单量: {c}',
},
grid: {
left: '3%',
right: '4%',
bottom: '3%',
top: '10%',
containLabel: true,
},
xAxis: {
type: 'category',
data: historicalOrders.value.map((_, i) => `T-${historicalOrders.value.length - i}`),
axisLine: { lineStyle: { color: '#666' } },
axisLabel: { color: '#999' },
},
yAxis: {
type: 'value',
axisLine: { lineStyle: { color: '#666' } },
axisLabel: { color: '#999' },
splitLine: { lineStyle: { color: '#333', type: 'dashed' } },
},
series: [
{
name: '订单量',
type: 'line',
data: historicalOrders.value,
smooth: true,
lineStyle: { color: '#5470c6', width: 3 },
itemStyle: { color: '#5470c6' },
areaStyle: {
color: new echarts.graphic.LinearGradient(0, 0, 0, 1, [
{ offset: 0, color: 'rgba(84, 112, 198, 0.5)' },
{ offset: 1, color: 'rgba(84, 112, 198, 0.1)' },
]),
},
},
],
}
ordersChart.setOption(option)
}
const updateUsersChart = () => {
if (!usersChart) return
const option: echarts.EChartsOption = {
backgroundColor: 'transparent',
tooltip: {
trigger: 'axis',
formatter: '{b}<br/>在线人数: {c}',
},
grid: {
left: '3%',
right: '4%',
bottom: '3%',
top: '10%',
containLabel: true,
},
xAxis: {
type: 'category',
data: historicalUsers.value.map((_, i) => `T-${historicalUsers.value.length - i}`),
axisLine: { lineStyle: { color: '#666' } },
axisLabel: { color: '#999' },
},
yAxis: {
type: 'value',
axisLine: { lineStyle: { color: '#666' } },
axisLabel: { color: '#999' },
splitLine: { lineStyle: { color: '#333', type: 'dashed' } },
},
series: [
{
name: '在线人数',
type: 'bar',
data: historicalUsers.value,
itemStyle: {
color: new echarts.graphic.LinearGradient(0, 0, 0, 1, [
{ offset: 0, color: '#91cc75' },
{ offset: 1, color: '#fac858' },
]),
},
},
],
}
usersChart.setOption(option)
}
const handleTimeRangeChange = () => {
isLoading.value = true
// 模拟根据时间范围获取不同数据
setTimeout(() => {
// 这里实际应调用API获取对应时间范围的数据
console.log(`切换时间范围到: ${selectedTimeRange.value}`)
isLoading.value = false
}, 500)
}
const refreshData = () => {
isLoading.value = true
// 模拟手动刷新
setTimeout(() => {
simulateDataUpdate()
isLoading.value = false
}, 800)
}
const simulateDataUpdate = () => {
// 模拟WebSocket数据更新
const mockData = {
orders: Math.floor(Math.random() * 200) + 100, // 100-300
onlineUsers: Math.floor(Math.random() * 500) + 800, // 800-1300
timestamp: Date.now(),
}
handleBusinessDataUpdate(mockData)
}
const formatNumber = (num: number): string => {
return num.toLocaleString('zh-CN')
}
const formatTime = (timestamp: number): string => {
const date = new Date(timestamp)
return `${date.getHours().toString().padStart(2, '0')}:${date.getMinutes().toString().padStart(2, '0')}:${date.getSeconds().toString().padStart(2, '0')}`
}
</script>
<style scoped>
.data-dashboard {
background: var(--card-bg-color);
border-radius: var(--border-radius-base);
padding: var(--spacing-base);
height: 100%;
display: flex;
flex-direction: column;
}
.dashboard-header {
display: flex;
justify-content: space-between;
align-items: center;
margin-bottom: 24px;
}
.dashboard-header h2 {
font-size: 1.3rem;
font-weight: 600;
}
.controls {
display: flex;
gap: 12px;
align-items: center;
}
.controls select {
padding: 6px 12px;
background-color: #2a3a52;
color: var(--text-color-primary);
border: 1px solid #3a4a62;
border-radius: 4px;
font-size: 0.9rem;
}
.refresh-btn {
padding: 6px 16px;
background-color: var(--primary-color);
color: white;
border: none;
border-radius: 4px;
cursor: pointer;
font-size: 0.9rem;
transition: background-color 0.2s;
}
.refresh-btn:hover:not(:disabled) {
background-color: #1a7ad9;
}
.refresh-btn:disabled {
opacity: 0.6;
cursor: not-allowed;
}
.stats-cards {
display: grid;
grid-template-columns: repeat(3, 1fr);
gap: 16px;
margin-bottom: 24px;
}
.stat-card {
background: linear-gradient(135deg, #1e2a3e 0%, #253044 100%);
border-radius: 8px;
padding: 20px;
display: flex;
align-items: center;
border: 1px solid #2d3a4f;
}
.stat-icon {
font-size: 2rem;
margin-right: 16px;
}
.stat-content {
flex: 1;
}
.stat-label {
font-size: 0.9rem;
color: var(--text-color-secondary);
margin-bottom: 4px;
}
.stat-value {
font-size: 1.8rem;
font-weight: 700;
margin-bottom: 4px;
}
.stat-trend {
font-size: 0.85rem;
font-weight: 600;
}
.stat-trend.positive {
color: #52c41a;
}
.stat-trend.negative {
color: #ff4d4f;
}
.stat-sub {
font-size: 0.8rem;
color: var(--text-color-secondary);
}
.charts-container {
display: grid;
grid-template-columns: 1fr 1fr;
gap: 16px;
flex: 1;
min-height: 0;
}
.chart-wrapper {
background: #1a2332;
border-radius: 8px;
padding: 12px;
}
.chart {
width: 100%;
height: 250px;
}
.last-update {
margin-top: 16px;
text-align: right;
font-size: 0.85rem;
color: var(--text-color-secondary);
}
</style>App.vue
<template>
<LayoutContainer />
</template>
<script setup lang="ts">
import LayoutContainer from './components/LayoutContainer.vue'
</script>
<style>
#app {
width: 100vw;
height: 100vh;
overflow: hidden;
}
</style>main.ts
import { createApp } from 'vue'
import App from './App.vue'
import './styles/global.css'
// 注册ECharts插件
import { initECharts } from './plugins/echarts.plugin'
initECharts()
const app = createApp(App)
app.mount('#app')plugins/echarts.plugin.ts
import * as echarts from 'echarts'
export function initECharts() {
// 注册主题(可选)
echarts.registerTheme('dashboard-dark', {
backgroundColor: 'transparent',
textStyle: {
color: '#e5e6eb',
},
title: {
textStyle: {
color: '#e5e6eb',
},
},
line: {
itemStyle: {
borderWidth: 2,
},
lineStyle: {
width: 3,
},
symbolSize: 8,
symbol: 'circle',
},
})
// 全局配置
echarts.setOptions({
useUTC: false,
animationDuration: 300,
animationEasing: 'cubicOut',
})
console.log('ECharts插件已初始化,主题已注册')
}server.py)#!/usr/bin/env python3
"""
简易WebSocket服务器,模拟业务数据推送和WebRTC信令转发
运行: python server.py
前端连接: ws://localhost:8765
"""
import asyncio
import json
import random
import websockets
from datetime import datetime
from typing import Set
# 存储所有连接的客户端
connected_clients: Set[websockets.WebSocketServerProtocol] = set()
async def broadcast_business_data():
"""每秒广播一次模拟的业务数据"""
while True:
if connected_clients:
data = {
"type": "business_data",
"payload": {
"orders": random.randint(100, 300),
"onlineUsers": random.randint(800, 1300),
"timestamp": int(datetime.now().timestamp() * 1000)
}
}
message = json.dumps(data)
# 广播给所有客户端
await asyncio.gather(
*[client.send(message) for client in connected_clients],
return_exceptions=True
)
await asyncio.sleep(1) # 每秒更新一次
async def handle_client(websocket):
"""处理单个客户端连接"""
# 注册新客户端
connected_clients.add(websocket)
client_id = id(websocket)
print(f"客户端 {client_id} 已连接,当前连接数: {len(connected_clients)}")
try:
# 发送欢迎消息
welcome_msg = {
"type": "control_command",
"payload": {
"command": "welcome",
"message": f"已连接到服务器,您的ID: {client_id}",
"timestamp": int(datetime.now().timestamp() * 1000)
}
}
await websocket.send(json.dumps(welcome_msg))
# 处理来自客户端的消息
async for message in websocket:
try:
data = json.loads(message)
print(f"收到来自客户端 {client_id} 的消息: {data['type']}")
# 根据消息类型处理
if data["type"] == "webrtc_signal":
# WebRTC信令消息,广播给所有其他客户端(简单示例)
# 实际应用中应根据target字段定向转发
data["sender"] = client_id
broadcast_msg = json.dumps(data)
tasks = []
for client in connected_clients:
if client != websocket:
tasks.append(client.send(broadcast_msg))
if tasks:
await asyncio.gather(*tasks, return_exceptions=True)
elif data["type"] == "control_command":
# 控制命令,记录日志
print(f"控制命令: {data['payload']}")
# 可以在这里处理特定命令并广播响应
except json.JSONDecodeError:
print(f"客户端 {client_id} 发送了无效的JSON消息")
except KeyError:
print(f"客户端 {client_id} 发送的消息格式错误")
except websockets.exceptions.ConnectionClosed:
print(f"客户端 {client_id} 连接已关闭")
finally:
# 移除断开连接的客户端
connected_clients.remove(websocket)
print(f"客户端 {client_id} 已断开,当前连接数: {len(connected_clients)}")
async def main():
"""启动WebSocket服务器"""
# 启动业务数据广播任务
broadcast_task = asyncio.create_task(broadcast_business_data())
# 启动WebSocket服务器
server = await websockets.serve(
handle_client,
"0.0.0.0", # 监听所有接口
8765, # 端口
ping_interval=20, # 每20秒发送一次ping
ping_timeout=40 # 40秒无响应则断开
)
print("WebSocket服务器已启动,监听 ws://0.0.0.0:8765")
print("按 Ctrl+C 停止服务器")
try:
await server.wait_closed()
except KeyboardInterrupt:
print("\n正在关闭服务器...")
finally:
# 取消广播任务
broadcast_task.cancel()
try:
await broadcast_task
except asyncio.CancelledError:
pass
# 关闭所有客户端连接
if connected_clients:
print(f"正在关闭 {len(connected_clients)} 个客户端连接...")
await asyncio.gather(
*[client.close() for client in connected_clients],
return_exceptions=True
)
if __name__ == "__main__":
asyncio.run(main())http://localhost:5173 启动。ws://localhost:8765 监听。http://localhost:5173styles/global.css 中修改CSS变量可调整整体视觉风格server.py 中的 broadcast_business_data 函数可接入真实业务数据此完整示例展示了如何将WebRTC低延迟视频流、WebSocket实时数据通信与现代化大屏可视化组件相结合,构建一个功能完整、结构清晰且易于扩展的业务运营监控系统。所有代码均可直接运行,并提供了详细注释说明各模块功能。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。