Closed wu-clan closed 6 days ago
pip install "python-socketio[asyncio]"
from socketio import AsyncServer
def create_socketio(
    cors_allowed_origins: str | list = '*',
    async_mode: str = 'asgi',
    **kwargs,
) -> AsyncServer:
    """Create a Socket.IO ``AsyncServer`` with CORS credentials enabled.

    Args:
        cors_allowed_origins: Passed through as the server's
            ``cors_allowed_origins`` option; defaults to ``'*'``.
        async_mode: Passed through as the server's ``async_mode`` option;
            defaults to ``'asgi'``.
        **kwargs: Any further keyword arguments, forwarded unchanged to
            ``AsyncServer``.

    Returns:
        The newly constructed ``AsyncServer`` instance.
    """
    # `cors_credentials=True` is fixed here; everything else is caller-controlled.
    return AsyncServer(
        cors_credentials=True,
        async_mode=async_mode,
        cors_allowed_origins=cors_allowed_origins,
        **kwargs,
    )
# Module-level Socket.IO server used by the handler below.
sio = create_socketio()


@sio.event
async def connect(sid, environ, auth):
    """Log a new connection, then emit a 'hello' event to that session id."""
    print(f'connected auth={auth} sid={sid}')
    greeting = {'hello': 'you'}
    # `to=sid` targets the emit at the session id passed to this handler.
    await sio.emit('hello', greeting, to=sid)
## 设置一个 socketio 的路由
- backend/main.py
```py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import socketio
import uvicorn

from backend.core.registrar import register_app
from backend.core.socketio import sio

# register_app() supplies the application that Socket.IO is layered on top of
# (presumably the FastAPI app, going by the variable name -- confirm in registrar).
fastapi_app = register_app()

# Single ASGI entry point: the Socket.IO server wrapped around the other app.
combined_asgi_app = socketio.ASGIApp(socketio_server=sio, other_asgi_app=fastapi_app)

# Also register the Socket.IO endpoint on the inner app, for both plain HTTP
# requests (GET/POST) and WebSocket connections.
fastapi_app.add_route('/socket.io/', route=combined_asgi_app, methods=['GET', 'POST'])
fastapi_app.add_websocket_route('/socket.io/', combined_asgi_app)

if __name__ == '__main__':
    # uvicorn only honours `reload=True` when the application is given as an
    # import string; handed an app object it logs a warning and exits instead
    # of serving. This file is backend/main.py, hence the module path below.
    uvicorn.run(
        'backend.main:combined_asgi_app',
        host='127.0.0.1',
        port=8000,
        reload=True,
        log_level='info',
    )
```
pnpm add socket.io-client
demo.vue
import { io } from 'socket.io-client'
// Create the Socket.IO client inside the component's mounted hook.
onMounted(() => {
  // Connection settings handed to the Socket.IO client.
  const options = {
    autoConnect: true, // connect automatically
    path: '/socket.io',
    reconnection: true, // reconnect automatically
    reconnectionAttempts: 3, // number of reconnection attempts
    reconnectionDelay: 1000, // delay between reconnection attempts (ms)
    transports: ['websocket'], // transport(s) to use, e.g. WebSocket
  }
  const socket = io('http://127.0.0.1:8000', options)

  // Print whatever payload arrives with the server's 'hello' event.
  function handleHello(message) {
    console.log(message) // {hello: 'you'}
  }

  socket.on('hello', handleHello)
})
@elkon028 非常感谢您提供帮助
Done.
related: #210 #212 #328