轮询:通过setInterval向后台请求数据,更新html
# Short-polling demo: the page periodically re-requests /get/vote (via
# setInterval on the client) and re-renders the tallies it gets back.
from flask import Flask, render_template, request, jsonify

app = Flask(__name__)

# In-memory vote tallies keyed by candidate id (a string).
USERS = {
    '1': {'name': '贝贝', 'count': 1},
    '2': {'name': '小东北', 'count': 0},
    '3': {'name': '何伟明', 'count': 0},
}


@app.route('/user/list')
def user_list():
    """Render the candidate list page with the current tallies."""
    return render_template('user_list.html', users=USERS)


@app.route('/vote', methods=['POST'])
def vote():
    """Add one vote to the candidate named by the ``uid`` form field.

    Returns:
        The success message, or an error message with HTTP 400 when
        ``uid`` is missing or does not match a known candidate.
    """
    uid = request.form.get('uid')
    # Reject missing/unknown ids explicitly instead of failing with a
    # KeyError (HTTP 500) on the lookup below.
    if uid not in USERS:
        return "无效的uid", 400
    USERS[uid]['count'] += 1
    return "投票成功"


@app.route('/get/vote', methods=['GET'])
def get_vote():
    """Return the current tallies for every candidate as JSON."""
    return jsonify(USERS)


if __name__ == '__main__':
    # Alternative: app.run(host='192.168.13.253', threaded=True)
    app.run(threaded=True)
Title
- {% for key,val in users.items() %}
- {{val.name}} ({{val.count}}) {% endfor %}
长轮询:基于时间循环和队列的请求等待,实现数据的实时更新
# Long-polling demo: every page load gets its own queue; /get/vote blocks on
# that queue until a vote is pushed or the wait times out.
import queue
import uuid

from flask import Flask, render_template, request, jsonify, session

app = Flask(__name__)
# NOTE(review): hard-coded secret key (used by Flask to sign the session
# cookie); load it from configuration for anything beyond a demo.
app.secret_key = 'asdfasdfasd'

# In-memory vote tallies keyed by candidate id (a string).
USERS = {
    '1': {'name': '贝贝', 'count': 1},
    '2': {'name': '小东北', 'count': 0},
    '3': {'name': '何伟明', 'count': 0},
}

# Maps the uuid stored in a visitor's session to that visitor's queue,
# e.g. {'<uuid4 string>': queue.Queue()}.
# NOTE(review): entries are never removed, so this grows on every page load.
QUEQUE_DICT = {}


@app.route('/user/list')
def user_list():
    """Register a fresh queue for this page load and render the list page."""
    user_uuid = str(uuid.uuid4())
    QUEQUE_DICT[user_uuid] = queue.Queue()
    session['current_user_uuid'] = user_uuid
    return render_template('user_list.html', users=USERS)


@app.route('/vote', methods=['POST'])
def vote():
    """Add one vote to the candidate named by ``uid`` and notify all waiters.

    Returns:
        The success message, or an error message with HTTP 400 when
        ``uid`` is missing or does not match a known candidate.
    """
    uid = request.form.get('uid')
    # Reject missing/unknown ids explicitly instead of failing with a
    # KeyError (HTTP 500) on the lookup below.
    if uid not in USERS:
        return "无效的uid", 400
    USERS[uid]['count'] += 1
    # Iterate over a snapshot: the app runs threaded, and a concurrent
    # /user/list request inserting a queue would otherwise raise
    # "dictionary changed size during iteration".
    for q in list(QUEQUE_DICT.values()):
        q.put(USERS)
    return "投票成功"


@app.route('/get/vote', methods=['GET'])
def get_vote():
    """Block for up to 5 seconds waiting for new tallies for this visitor.

    Returns:
        JSON ``{'status': True, 'data': <tallies>}`` when a vote arrived,
        ``{'status': False, 'data': None}`` on timeout, and the same
        payload with HTTP 400 when the visitor has no registered queue.
    """
    ret = {'status': True, 'data': None}
    # The session key is absent if /user/list was never visited, and the
    # registry is empty after a restart; both used to raise KeyError.
    user_uuid = session.get('current_user_uuid')
    q = QUEQUE_DICT.get(user_uuid)
    if q is None:
        ret['status'] = False
        return jsonify(ret), 400
    try:
        ret['data'] = q.get(timeout=5)
    except queue.Empty:
        ret['status'] = False
    return jsonify(ret)


if __name__ == '__main__':
    # NOTE(review): the bind address is specific to the author's machine.
    # Alternative: app.run(threaded=True)
    app.run(host='192.168.13.253', threaded=True)
Title
- {% for key,val in users.items() %}
- {{val.name}} ({{val.count}}) {% endfor %}
# Minimal blocking-queue demo: /get/vote waits on a shared queue that
# /vote feeds.
import queue

from flask import Flask, render_template, request, jsonify

app = Flask(__name__)

# Single queue shared by every request.
q = queue.Queue()


@app.route('/get/vote')
def get_vote():
    """Wait up to 20 seconds for a queued value; return it or a timeout notice."""
    try:
        return q.get(timeout=20)
    except queue.Empty:
        return "已超时"


@app.route('/vote')
def vote():
    """Push a fixed value onto the shared queue, releasing one waiter."""
    q.put('10')
    return '投票成功'


if __name__ == '__main__':
    # Alternative: app.run(host='192.168.13.253', threaded=True)
    app.run(threaded=True)