- Flask 默认基于 WSGI,只支持 HTTP 协议;如需使用 WebSocket,需要安装 gevent-websocket 模块,安装后 HTTP 和 WebSocket 两种协议都可以支持。
Django应用:channels
Tornado应用:自带 WebSocket 支持
安装:pip3 install gevent-websocket
app.py
# Voting demo: a Flask app served by gevent's WSGIServer with the
# gevent-websocket handler, so the same process answers plain HTTP requests
# and WebSocket connections.
import json
import uuid

from flask import Flask, request, render_template, redirect, session
from gevent.pywsgi import WSGIServer
from geventwebsocket.exceptions import WebSocketError
from geventwebsocket.handler import WebSocketHandler

app = Flask(__name__)
# NOTE(review): hard-coded session secret; load it from configuration or the
# environment before deploying anywhere real.
app.secret_key = ';lkjnfdidiclsjek'

# Candidates that can be voted for, keyed by the vote ID sent by the client.
GENTIEMAN = {
    '1': {'name': '钢弹', 'count': 0},
    '2': {'name': '铁锤', 'count': 0},
    '3': {'name': '闫帅', 'count': 0},
}

# Currently open WebSocket connections, keyed by the session's user id.
WEBSOCKET_DICT = {}


def _unregister(user_id, conn):
    """Remove ``conn`` from WEBSOCKET_DICT if it is still the one registered.

    The identity check prevents a closing connection from evicting a newer
    connection that was registered under the same user id.
    """
    if WEBSOCKET_DICT.get(user_id) is conn:
        del WEBSOCKET_DICT[user_id]


def _broadcast(payload):
    """JSON-encode ``payload`` once and push it to every registered client.

    Connections whose ``send`` fails are dropped from the registry so that a
    single dead client cannot abort delivery to the remaining ones.
    """
    text = json.dumps(payload)
    # Iterate over a snapshot: other handlers add/remove entries in
    # WEBSOCKET_DICT, and mutating a dict during iteration raises RuntimeError.
    for user_id, conn in list(WEBSOCKET_DICT.items()):
        try:
            conn.send(text)
        except WebSocketError:
            _unregister(user_id, conn)


@app.before_request
def before_reuqest():
    """Allow /login for everyone; redirect anonymous users to the login page."""
    if request.path == '/login':
        return None
    if session.get('user_info'):
        return None
    return redirect('/login')


@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form (GET) or create a session and go to /index (POST)."""
    if request.method == 'GET':
        return render_template('login.html')
    session['user_info'] = {
        'id': str(uuid.uuid4()),
        'name': request.form.get('user'),
    }
    return redirect('/index')


@app.route('/index')
def index():
    """Render the voting page with the current candidates."""
    return render_template('index.html', users=GENTIEMAN)


@app.route('/message')
def message():
    """WebSocket endpoint: receive vote IDs and broadcast updated counts.

    Returns 'use websocket' for plain HTTP requests, and '完毕' once the
    WebSocket connection has ended.
    """
    # 1. Plain HTTP requests carry no 'wsgi.websocket' entry in the environ.
    ws = request.environ.get('wsgi.websocket')
    if not ws:
        return 'use websocket'

    # Connection established: register it under the current user's id.
    current_user_id = session['user_info']['id']
    WEBSOCKET_DICT[current_user_id] = ws
    try:
        while True:
            # 2. Wait for the client to send the ID of the candidate voted for.
            vote_id = ws.receive()
            # An empty / None message means the client closed the connection.
            if not vote_id:
                break
            # 3. Look up the candidate; ignore non-text frames and unknown IDs
            #    instead of crashing the handler.
            candidate = GENTIEMAN.get(vote_id) if isinstance(vote_id, str) else None
            if candidate is None:
                continue
            candidate['count'] += 1
            # Push the new count to every connected client.
            _broadcast({
                'user_id': vote_id,
                'count': candidate['count'],
                'type': 'vote',
            })
    except WebSocketError:
        # The socket was already closed underneath us; treat it as a normal
        # disconnect and fall through to the cleanup below.
        pass
    finally:
        # Always unregister, whatever ended the loop, so no stale socket stays.
        _unregister(current_user_id, ws)
    return '完毕'


@app.route('/notify')
def notify():
    """Push an alert message to every connected WebSocket client."""
    _broadcast({'data': '订单生成', 'type': 'alert'})
    return '完毕'


if __name__ == '__main__':
    # HTTP requests are handled by the app through the regular WSGI path;
    # WebSocket upgrade requests are handled by WebSocketHandler.
    http_server = WSGIServer(('127.0.0.1', 5000), app, handler_class=WebSocketHandler)
    http_server.serve_forever()
index.html
投票系统:参与投票的人
- {% for k,v in users.items() %}