Add login_required decorator

This commit is contained in:
2025-12-01 02:19:46 +01:00
parent 57c2fb4ce9
commit e14677e701
10 changed files with 569 additions and 584 deletions

View File

@@ -8,7 +8,7 @@ import os
from shutil import copy
from socket import gethostname
from flask_bcrypt import Bcrypt
from tools.utils import email_disp, append_to_log, gen_token, valid_passwd, valid_token_register, get_user_by_token, totp_is_valid
from tools.utils import email_disp, append_to_log, gen_token, valid_passwd, valid_token_register, get_user_by_token, totp_is_valid, login_required
from pyotp import random_base32
import qrcode
@@ -38,54 +38,51 @@ BACKUP_TIME = app.config['BACKUP_TIME']
@profil.route( '/profil/<user>/<img>', methods=['GET'] )
@login_required
def profil_img(user, img) :
if 'username' in session :
return send_from_directory( os.path.join(DOSSIER_PERSO, user, 'profile'), img )
else:
return redirect(BASE_URL, code=401)
return send_from_directory( os.path.join(DOSSIER_PERSO, user, 'profile'), img )
@profil.route('/profil/', methods=['GET','POST'])
@login_required
def profile() :
if 'username' in session :
user='%s' % escape(session['username'])
conn = sqlite3.connect(DATABASE) # Connexion à la base de donnée
cursor = conn.cursor() # Création de l'objet "curseur"
cursor.execute("""SELECT avatar, nom, prenom, age, Mail_rescue FROM users WHERE name=?""", (user,))
tmp = (cursor.fetchone())
profil_user = dict()
profil_user['avatar'] = tmp[0]
profil_user['nom'] = tmp[1]
profil_user['prenom'] = tmp[2]
profil_user['age'] = tmp[3]
profil_user['mail_rescue'] = tmp[4]
conn.close()
user='%s' % escape(session['username'])
conn = sqlite3.connect(DATABASE) # Connexion à la base de donnée
cursor = conn.cursor() # Création de l'objet "curseur"
cursor.execute("""SELECT avatar, nom, prenom, age, Mail_rescue FROM users WHERE name=?""", (user,))
tmp = (cursor.fetchone())
profil_user = dict()
profil_user['avatar'] = tmp[0]
profil_user['nom'] = tmp[1]
profil_user['prenom'] = tmp[2]
profil_user['age'] = tmp[3]
profil_user['mail_rescue'] = tmp[4]
conn.close()
if request.method == 'POST' :
if request.method == 'POST' :
f = request.files['fic']
f = request.files['fic']
if request.form['theme'] != "Default":
copy( "static/vendors/picocss/pico.fluid.classless."+request.form['theme']+".min.css",
DOSSIER_PERSO+ user +'/theme.min.css' )
if request.form['theme'] != "Default":
copy( "static/vendors/picocss/pico.fluid.classless."+request.form['theme']+".min.css",
DOSSIER_PERSO+ user +'/theme.min.css' )
if request.form['nom']:
if request.form['nom']:
profil_user['nom'] = request.form['nom']
if request.form['prenom']:
if request.form['prenom']:
profil_user['prenom'] = request.form['prenom']
if request.form['age']:
if request.form['age']:
profil_user['age'] = request.form['age']
if '@' in request.form['mail_rescue']:
if '@' in request.form['mail_rescue']:
if len(request.form['mail_rescue']) > 4:
profil_user['mail_rescue'] = request.form['mail_rescue']
profil_user['mail_rescue'] = request.form['mail_rescue']
else:
flash(u'Adresse de courriel invalide', 'error')
else:
flash(u'Adresse de courriel invalide', 'error')
else:
flash(u'Adresse de courriel de secour invalide', 'error')
if f: # On vérifie qu'un fichier a bien été envoyé
if f: # On vérifie qu'un fichier a bien été envoyé
nom = secure_filename(f.filename)
f.save(DOSSIER_PERSO + user + '/profile/' + nom)
image = DOSSIER_PERSO + user + '/profile/' + nom
@@ -102,7 +99,7 @@ def profile() :
conn.close()
flash(u'Image de profil mise à jour', 'success')
else:
else:
conn = sqlite3.connect(DATABASE) # Connexion à la base de donnée
cursor = conn.cursor() # Création de l\'objet "curseur"
cursor.execute("UPDATE users SET nom=?, prenom=?, age=?, mail_rescue=? WHERE name=?",
@@ -114,56 +111,53 @@ def profile() :
return render_template('profil.html',
return render_template('profil.html',
section="Profil",
profil=profil_user,
username=user)
else :
return redirect(BASE_URL, code=401)
@profil.route('/profil/homepage', methods=['GET'] )
@login_required
def homepage():
if 'username' in session :
username='%s' % escape(session['username'])
return render_template('homepage.html',
section="Profil",
username=username)
username='%s' % escape(session['username'])
return render_template('homepage.html',
section="Profil",
username=username)
@profil.route('/profil/change-password/', methods=['GET','POST'] )
@login_required
def change_passwd() :
if 'username' in session:
user='%s' % escape(session['username'])
conn = sqlite3.connect(DATABASE) # Connexion à la base de donnée
cursor = conn.cursor() # Création de l'objet "curseur"
cursor.execute("""SELECT Mail, alias, xmpp, totp FROM users WHERE name=?""", (user,))
tmp = cursor.fetchone()
shared_key_validate=True
account = dict()
account['Mail'] = tmp[0]
account['alias'] = tmp[1]
account['xmpp'] = tmp[2]
account['totp'] = tmp[3]
user='%s' % escape(session['username'])
conn = sqlite3.connect(DATABASE) # Connexion à la base de donnée
cursor = conn.cursor() # Création de l'objet "curseur"
cursor.execute("""SELECT Mail, alias, xmpp, totp FROM users WHERE name=?""", (user,))
tmp = cursor.fetchone()
shared_key_validate=True
account = dict()
account['Mail'] = tmp[0]
account['alias'] = tmp[1]
account['xmpp'] = tmp[2]
account['totp'] = tmp[3]
if request.method == 'POST' :
if request.method == 'POST' :
password = request.form['password']
password_confirm = request.form['passwd_confirm']
password = request.form['password']
password_confirm = request.form['passwd_confirm']
if not(password == "") and password == password_confirm and valid_passwd(password):
if not(password == "") and password == password_confirm and valid_passwd(password):
mail_passwd_change = 0
xmpp_passwd_change = 0
passwd = request.form['password']
if MAIL_SERVER:
cmd = SETUID+ ' set_mail_passwd ' + '"'+account['Mail']+'" '+ '"'+passwd+'"'
mail_passwd_change = os.system(cmd)
cmd = SETUID+ ' set_mail_passwd ' + '"'+account['Mail']+'" '+ '"'+passwd+'"'
mail_passwd_change = os.system(cmd)
if XMPP_SERVER:
@@ -185,34 +179,32 @@ def change_passwd() :
log=TIME + ' - ' + IP + ' - ' + user + ' - ' + CLIENT_PLATFORM + '\n' + '---> ' + "Changement du mot de passe" + '\n'
append_to_log(log, user)
flash(u'Votre mot de passe a été changé', 'success')
else:
if not( valid_passwd(password) ):
flash(u'Le mot de passe ne peut pas contenir les caractères " et &', 'error')
elif password == "":
flash(u' Vous ne pouvez pas ne pas mettre de mot de passe ou un mot de passe vide', 'error')
else:
flash(u'Les mot de passes ne sont pas identiques :/ ', 'error')
else:
if not( valid_passwd(password) ):
flash(u'Le mot de passe ne peut pas contenir les caractères " et &', 'error')
elif password == "":
flash(u' Vous ne pouvez pas ne pas mettre de mot de passe ou un mot de passe vide', 'error')
else:
flash(u'Les mot de passes ne sont pas identiques :/ ', 'error')
conn.close()
conn.close()
if not(account['totp']):
account['totp'] = random_base32()
img = qrcode.make('otpauth://totp/'+BASE_URL+'?secret='+account['totp'])
img.save(DOSSIER_PERSO + user + "/totp.png")
shared_key_validate = False
if not(account['totp']):
account['totp'] = random_base32()
img = qrcode.make('otpauth://totp/'+BASE_URL+'?secret='+account['totp'])
img.save(DOSSIER_PERSO + user + "/totp.png")
shared_key_validate = False
return render_template('mypassword.html',
section="Profil",
address=account['Mail'],
alias=account['alias'],
totp_shared_key=account['totp'],
shared_key_validate=shared_key_validate,
username=user,
base_url=BASE_URL)
else :
return redirect(BASE_URL, code=401)
return render_template('mypassword.html',
section="Profil",
address=account['Mail'],
alias=account['alias'],
totp_shared_key=account['totp'],
shared_key_validate=shared_key_validate,
username=user,
base_url=BASE_URL)
@profil.route('/change-password-lost/<token>', methods=['GET','POST'] )
def change_passwd_lost(token) :
@@ -292,56 +284,51 @@ def change_passwd_lost(token) :
return redirect(BASE_URL, code=401)
@profil.route('/set_totp/', methods=['POST'])
@login_required
def set_totp():
if 'username' in session:
user='%s' % escape(session['username'])
conn = sqlite3.connect(DATABASE) # Connexion à la base de donnée
cursor = conn.cursor() # Création de l'objet "curseur"
user='%s' % escape(session['username'])
conn = sqlite3.connect(DATABASE) # Connexion à la base de donnée
cursor = conn.cursor() # Création de l'objet "curseur"
shared_key = request.form['shared_key']
code_totp = request.form['code_totp']
shared_key = request.form['shared_key']
code_totp = request.form['code_totp']
if totp_is_valid(shared_key, code_totp) and code_totp !="" and shared_key != "":
print("shared_key: " +shared_key)
cursor.execute("""UPDATE users SET totp=? WHERE name=?""", (shared_key, user,))
conn.commit()
img = qrcode.make('otpauth://totp/'+BASE_URL+'?secret='+shared_key)
img.save(DOSSIER_PERSO + user + "/totp.png")
flash(u'Votre mot de passe à usage unique est configuré et actif.', 'success')
else:
flash(u'Le code de validation totp n\'est pas valide.', 'error')
conn.close()
return redirect(url_for('profil.change_passwd', _external=True))
if totp_is_valid(shared_key, code_totp) and code_totp !="" and shared_key != "":
print("shared_key: " +shared_key)
cursor.execute("""UPDATE users SET totp=? WHERE name=?""", (shared_key, user,))
conn.commit()
img = qrcode.make('otpauth://totp/'+BASE_URL+'?secret='+shared_key)
img.save(DOSSIER_PERSO + user + "/totp.png")
flash(u'Votre mot de passe à usage unique est configuré et actif.', 'success')
else:
return redirect(BASE_URL, code=401)
flash(u'Le code de validation totp n\'est pas valide.', 'error')
conn.close()
return redirect(url_for('profil.change_passwd', _external=True))
@profil.route('/del_totp/', methods=['GET'])
@login_required
def del_totp():
if 'username' in session:
user='%s' % escape(session['username'])
conn = sqlite3.connect(DATABASE) # Connexion à la base de donnée
cursor = conn.cursor() # Création de l'objet "curseur"
cursor.execute("""UPDATE users SET totp="" WHERE name=?""", (user,))
conn.commit()
conn.close()
return redirect(url_for('profil.change_passwd', _external=True))
user='%s' % escape(session['username'])
conn = sqlite3.connect(DATABASE) # Connexion à la base de donnée
cursor = conn.cursor() # Création de l'objet "curseur"
cursor.execute("""UPDATE users SET totp="" WHERE name=?""", (user,))
conn.commit()
conn.close()
return redirect(url_for('profil.change_passwd', _external=True))
@profil.route('/totp.png', methods=['GET'])
@login_required
def totp_qrcode():
if 'username' in session :
user='%s' % escape(session['username'])
return send_file(
os.path.join(DOSSIER_PERSO, user, "totp.png"), "totp.png")
else :
return redirect(BASE_URL, code=401)
user='%s' % escape(session['username'])
return send_file(
os.path.join(DOSSIER_PERSO, user, "totp.png"), "totp.png")
@profil.route('/deltoken-password-lost/<token>', methods=['GET','POST'] )
def deltoken_passwd_lost(token) :
if valid_token_register(token, "Lost password"):
user = get_user_by_token(token, "Lost password")
conn = sqlite3.connect(DATABASE) # Connexion à la base de donnée
@@ -358,106 +345,102 @@ def deltoken_passwd_lost(token) :
@profil.route('/invitation/', methods=['GET'])
@login_required
def invitation():
if 'username' in session:
UTILISATEUR='%s' % escape(session['username'])
conn = sqlite3.connect(DATABASE) # Connexion à la base de donnée
cursor = conn.cursor() # Création de l'objet "curseur"
cursor.execute("""SELECT Token, invitations FROM users WHERE name=?""", (UTILISATEUR,))
tmp = cursor.fetchone()
token = tmp[0]
if token:
url_invitation = BASE_URL + 'inscription/' + token
else:
url_invitation = ""
invitations_count = tmp[1]
conn.close()
return render_template('invitation.html',
section='Profil',
nb_invitation=invitations_count,
token=token,
url_invitation=url_invitation)
UTILISATEUR='%s' % escape(session['username'])
conn = sqlite3.connect(DATABASE) # Connexion à la base de donnée
cursor = conn.cursor() # Création de l'objet "curseur"
cursor.execute("""SELECT Token, invitations FROM users WHERE name=?""", (UTILISATEUR,))
tmp = cursor.fetchone()
token = tmp[0]
if token:
url_invitation = BASE_URL + 'inscription/' + token
else:
return redirect(BASE_URL, code=401)
url_invitation = ""
invitations_count = tmp[1]
conn.close()
return render_template('invitation.html',
section='Profil',
nb_invitation=invitations_count,
token=token,
url_invitation=url_invitation)
@profil.route('/gen_token/', methods=['GET'])
@login_required
def generate_token():
if 'username' in session:
UTILISATEUR='%s' % escape(session['username'])
conn = sqlite3.connect(DATABASE) # Connexion à la base de donnée
cursor = conn.cursor() # Création de l'objet "curseur"
token = gen_token("Invitation")
cursor.execute("UPDATE users SET Token=? WHERE name=?",
(token, UTILISATEUR))
conn.commit()
conn.close()
return redirect(BASE_URL+'invitation/')
else:
return redirect(BASE_URL, code=401)
UTILISATEUR='%s' % escape(session['username'])
conn = sqlite3.connect(DATABASE) # Connexion à la base de donnée
cursor = conn.cursor() # Création de l'objet "curseur"
token = gen_token("Invitation")
cursor.execute("UPDATE users SET Token=? WHERE name=?",
(token, UTILISATEUR))
conn.commit()
conn.close()
return redirect(BASE_URL+'invitation/')
@profil.route( '/delete_me/', methods=['GET','POST'])
@login_required
def delete_account():
if 'username' in session :
UTILISATEUR='%s'% escape(session['username'])
resp = render_template('delete_account.html', time_backup=BACKUP_TIME)
if request.method == 'POST' :
conn = sqlite3.connect(DATABASE) # Connexion à la base de donnée
cursor = conn.cursor() # Création de l'objet "curseur"
cursor.execute("""SELECT passwd FROM users WHERE name=?""", (UTILISATEUR,))
passwd = cursor.fetchone()[0]
cursor.execute("""SELECT mail FROM users WHERE name=?""", (UTILISATEUR,))
mail = cursor.fetchone()[0]
conn.close()
password = request.form['passwd']
if bcrypt.check_password_hash(passwd, password) is True:
not_error = True
UTILISATEUR='%s'% escape(session['username'])
resp = render_template('delete_account.html', time_backup=BACKUP_TIME)
if request.method == 'POST' :
conn = sqlite3.connect(DATABASE) # Connexion à la base de donnée
cursor = conn.cursor() # Création de l'objet "curseur"
cursor.execute("""SELECT passwd FROM users WHERE name=?""", (UTILISATEUR,))
passwd = cursor.fetchone()[0]
cursor.execute("""SELECT mail FROM users WHERE name=?""", (UTILISATEUR,))
mail = cursor.fetchone()[0]
conn.close()
password = request.form['passwd']
if bcrypt.check_password_hash(passwd, password) is True:
not_error = True
if MAIL_SERVER:
try:
cmd = SETUID + ' set_mail_passwd del ' + '"'+mail+'"'
print(cmd)
os.system(cmd)
except:
not_error = False
flash(u'Erreur lors de la suppression de votre compte Mail.', 'error')
if MAIL_SERVER:
try:
cmd = SETUID + ' set_mail_passwd del ' + '"'+mail+'"'
print(cmd)
os.system(cmd)
except:
not_error = False
flash(u'Erreur lors de la suppression de votre compte Mail.', 'error')
if XMPP_SERVER:
try:
tmp = mail.split('@')
cmd = SETUID+ ' prosodyctl deluser ' "'"+tmp[0]+"' " + "'"+tmp[1]+"'"
os.system(cmd)
except:
not_error = False
flash(u'Erreur lors de la suppression de votre compte XMPP.', 'error')
if XMPP_SERVER:
try:
tmp = mail.split('@')
cmd = SETUID+ ' prosodyctl deluser ' "'"+tmp[0]+"' " + "'"+tmp[1]+"'"
os.system(cmd)
except:
not_error = False
flash(u'Erreur lors de la suppression de votre compte XMPP.', 'error')
if not_error:
try:
cmd = 'rm -r ' + DATAS_USER + '/' + UTILISATEUR
if os.system(cmd) != 0:
raise TypeError("Remove directory error")
except:
flash(u'Erreur lors de la suppression de votre dossier utilisateur.', 'error')
if not_error:
try:
cmd = 'rm -r ' + DATAS_USER + '/' + UTILISATEUR
if os.system(cmd) != 0:
raise TypeError("Remove directory error")
except:
flash(u'Erreur lors de la suppression de votre dossier utilisateur.', 'error')
try:
conn = sqlite3.connect(DATABASE)
cursor = conn.cursor()
cursor.execute("""DELETE FROM users WHERE name=?""", (UTILISATEUR,))
cursor.execute("""DELETE FROM posts WHERE author=?""", (UTILISATEUR,))
conn.commit()
conn.close()
except:
flash(u'Erreur lors de la suppression de votre compte.', 'error')
else:
flash(u'Désinscription réalisé avec succés, y\'a plus rien !', 'succes')
resp = redirect(url_for('loginlogout.logout'))
else:
flash(u'Mauvais mot de passe', 'error')
return resp
try:
conn = sqlite3.connect(DATABASE)
cursor = conn.cursor()
cursor.execute("""DELETE FROM users WHERE name=?""", (UTILISATEUR,))
cursor.execute("""DELETE FROM posts WHERE author=?""", (UTILISATEUR,))
conn.commit()
conn.close()
except:
flash(u'Erreur lors de la suppression de votre compte.', 'error')
else:
flash(u'Désinscription réalisé avec succés, y\'a plus rien !', 'succes')
resp = redirect(url_for('loginlogout.logout'))
else:
flash(u'Mauvais mot de passe', 'error')
return resp