baoxiang/test_websocket.py

50 lines
1.5 KiB
Python
Raw Normal View History

2025-12-16 18:06:50 +08:00
#!/usr/bin/env python3
"""
测试WebSocket连接的简单脚本
"""
import asyncio
import websockets
import json
import sys
async def test_websocket():
"""测试WebSocket连接"""
uri = "ws://localhost:8000/socket.io/?role=streamer&id=1&token=test"
try:
print(f"尝试连接到: {uri}")
async with websockets.connect(uri) as websocket:
print("✅ WebSocket连接成功")
# 发送测试消息
test_message = {
"type": "ping",
"data": "test"
}
await websocket.send(json.dumps(test_message))
print(f"📤 发送消息: {test_message}")
# 接收消息超时5秒
try:
response = await asyncio.wait_for(websocket.recv(), timeout=5.0)
print(f"📥 收到消息: {response}")
except asyncio.TimeoutError:
print("⏱️ 5秒内未收到消息这是正常的后端可能会广播消息")
print("\n✅ WebSocket测试通过")
return True
except websockets.exceptions.InvalidURI as e:
print(f"❌ 无效的URI: {e}")
return False
except websockets.exceptions.ConnectionRefused as e:
print(f"❌ 连接被拒绝,请确保服务器正在运行: {e}")
return False
except Exception as e:
print(f"❌ 连接错误: {type(e).__name__}: {e}")
return False
if __name__ == "__main__":
result = asyncio.run(test_websocket())
sys.exit(0 if result else 1)