Add 2FA support

This commit is contained in:
2025-11-05 20:10:01 +01:00
parent 12669a86fa
commit f9b092e456
4 changed files with 132 additions and 33 deletions

View File

@@ -8,7 +8,8 @@ import os
from shutil import copy
from socket import gethostname
from flask_bcrypt import Bcrypt
from tools.utils import email_disp, append_to_log, gen_token, valid_passwd, valid_token_register, get_user_by_token
from tools.utils import email_disp, append_to_log, gen_token, valid_passwd, valid_token_register, get_user_by_token, totp_is_valid
from pyotp import random_base32
profil = Blueprint('profil', __name__, template_folder='templates')
@@ -28,7 +29,7 @@ DATAS_USER = app.config['DOSSIER_APP']
MAIL_SERVER = app.config['MAIL_SERVER']
XMPP_SERVER = app.config['XMPP_SERVER']
SETUID = app.config['SETUID']
BASE_URL = app.config['BASE_URL']
BASE_URL= "http://"+app.config['HOST']+":"+app.config['PORT']
BACKUP_TIME = app.config['BACKUP_TIME']
##################################################################################################
@@ -135,34 +136,36 @@ def homepage():
@profil.route('/profil/change-password/', methods=['GET','POST'] )
def change_passwd() :
if 'username' in session:
UTILISATEUR='%s' % escape(session['username'])
user='%s' % escape(session['username'])
conn = sqlite3.connect(DATABASE) # Connexion à la base de donnée
cursor = conn.cursor() # Création de l'objet "curseur"
cursor.execute("""SELECT Mail, alias, xmpp FROM users WHERE name=?""", (UTILISATEUR,))
cursor.execute("""SELECT Mail, alias, xmpp, totp FROM users WHERE name=?""", (user,))
tmp = cursor.fetchone()
mailbox = dict()
mailbox['Mail'] = tmp[0]
mailbox['alias'] = tmp[1]
mailbox['xmpp'] = tmp[2]
shared_key_validate=True
account = dict()
account['Mail'] = tmp[0]
account['alias'] = tmp[1]
account['xmpp'] = tmp[2]
account['totp'] = tmp[3]
if request.method == 'POST' :
password = request.form['password']
password_confirm = request.form['passwd_confirm']
if password == password_confirm and valid_passwd(password):
if not(password == "") and password == password_confirm and valid_passwd(password):
mail_passwd_change = 0
xmpp_passwd_change = 0
passwd = request.form['password']
if MAIL_SERVER:
cmd = SETUID+ ' set_mail_passwd ' + '"'+mailbox['Mail']+'" '+ '"'+passwd+'"'
cmd = SETUID+ ' set_mail_passwd ' + '"'+account['Mail']+'" '+ '"'+passwd+'"'
mail_passwd_change = os.system(cmd)
if XMPP_SERVER:
tmp = mailbox['Mail'].split('@')
tmp = account['Mail'].split('@')
cmd = SETUID+ " prosodyctl register '"+tmp[0]+"' " + "'"+tmp[1]+"' " + "'"+passwd+"'"
xmpp_passwd_change = os.system(cmd)
if xmpp_passwd_change != 0:
@@ -172,26 +175,35 @@ def change_passwd() :
if mail_passwd_change == 0:
passwd_bcrypt = bcrypt.generate_password_hash(passwd)
cursor.execute("UPDATE users SET passwd=? WHERE name=?",
(passwd_bcrypt, UTILISATEUR))
(passwd_bcrypt, user))
conn.commit()
TIME=time.strftime("%A %d %B %Y %H:%M:%S")
IP=request.environ['REMOTE_ADDR']
CLIENT_PLATFORM=request.headers.get('User-Agent')
log=TIME + ' - ' + IP + ' - ' + UTILISATEUR + ' - ' + CLIENT_PLATFORM + '\n' + '---> ' + "Changement du mot de passe" + '\n'
append_to_log(log, UTILISATEUR)
flash(u'Votre mot de passe a été changé', 'succes')
log=TIME + ' - ' + IP + ' - ' + user + ' - ' + CLIENT_PLATFORM + '\n' + '---> ' + "Changement du mot de passe" + '\n'
append_to_log(log, user)
flash(u'Votre mot de passe a été changé', 'success')
else:
if not( valid_passwd(password) ):
flash(u'Le mot de passe ne peut pas contenir les caractères " et &', 'error')
elif password == "":
flash(u' Vous ne pouvez pas ne pas mettre de mot de passe ou un mot de passe vide', 'error')
else:
flash(u'Les mot de passes ne sont pas identique :/ ', 'error')
flash(u'Les mot de passes ne sont pas identiques :/ ', 'error')
conn.close()
return render_template('mailbox.html',
if not(account['totp']):
account['totp'] = random_base32()
shared_key_validate = False
return render_template('mypassword.html',
section="Profil",
address=mailbox['Mail'],
alias=mailbox['alias'],
username=UTILISATEUR)
address=account['Mail'],
alias=account['alias'],
totp_shared_key=account['totp'],
shared_key_validate=shared_key_validate,
username=user)
else :
return redirect(BASE_URL, code=401)
@@ -253,7 +265,8 @@ def change_passwd_lost(token) :
log=TIME + ' - ' + IP + ' - ' + user + ' - ' + CLIENT_PLATFORM + '\n' + '---> ' + "Changement du mot de passe" + '\n'
append_to_log(log, user)
flash(u'Votre mot de passe a été changé', 'succes')
cursor.execute("""UPDATE users set Lost_password_token='' where name=?""", (user,))
cursor.execute("""UPDATE users SET Lost_password_token='' where name=?""", (user,))
conn.commit()
conn.close()
resp = redirect(url_for('loginlogout.login'))
@@ -273,7 +286,42 @@ def change_passwd_lost(token) :
return redirect(BASE_URL, code=401)
@profil.route('/set_totp/', methods=['POST'])
def set_totp():
if 'username' in session:
user='%s' % escape(session['username'])
conn = sqlite3.connect(DATABASE) # Connexion à la base de donnée
cursor = conn.cursor() # Création de l'objet "curseur"
shared_key = request.form['shared_key']
code_totp = request.form['code_totp']
if totp_is_valid(shared_key, code_totp) and code_totp !="" and shared_key != "":
print("shared_key: " +shared_key)
cursor.execute("""UPDATE users SET totp=? WHERE name=?""", (shared_key, user,))
conn.commit()
flash(u'Votre mot de passe à usage unique est configuré et actif.', 'success')
else:
flash(u'Le code de validation totp n\'est pas valide.', 'error')
conn.close()
return redirect(url_for('profil.change_passwd', _external=True))
else:
return redirect(BASE_URL, code=401)
@profil.route('/del_totp/', methods=['GET'])
def del_totp():
if 'username' in session:
user='%s' % escape(session['username'])
conn = sqlite3.connect(DATABASE) # Connexion à la base de donnée
cursor = conn.cursor() # Création de l'objet "curseur"
cursor.execute("""UPDATE users SET totp="" WHERE name=?""", (user,))
conn.commit()
conn.close()
return redirect(url_for('profil.change_passwd', _external=True))
@profil.route('/deltoken-password-lost/<token>', methods=['GET','POST'] )
def deltoken_passwd_lost(token) :
@@ -415,6 +463,8 @@ def invitation():
else:
return redirect(BASE_URL, code=401)
@profil.route('/gen_token/', methods=['GET'])
def generate_token():
if 'username' in session: