# -*- coding: utf-8 -*-
|
|
|
|
from flask import Blueprint, render_template, session, redirect, url_for, request, flash, abort, Flask
|
|
import time
|
|
from markupsafe import escape
|
|
import sqlite3
|
|
from markdown import markdown
|
|
from tools.filesutils import getFileSizeKo
|
|
import string
|
|
from shutil import copy
|
|
from tools.utils import login_required
|
|
|
|
# Blueprint that every route in this module is registered on.
blog = Blueprint('blog', __name__, template_folder='templates')

# Flask object used in this module to load config.py and read its values.
app = Flask('pywallter')
app.config.from_pyfile('config.py')

# ----------------------------- Global variables ------------------------------
extensionimg = app.config['EXT_IMG']
DATABASE = app.config['DATABASE']
BASE_URL = app.config['BASE_URL']
# Both directory settings get a trailing slash appended.
DOSSIER_PERSO = app.config['DOSSIER_APP'] + '/'
DOSSIER_PUBLIC = app.config['DOSSIER_PUBLIC'] + '/'
TITLE_SERVER = app.config['TITLE_SERVER']
DESC_SERVER = app.config['DESC_SERVER']
# ------------------------------------------------------------------------------
|
|
|
|
@blog.route('/myblog/new-article/', methods=['GET', 'POST'])
@login_required
def new_article():
    """Show the new-article form (GET) or store a submitted article (POST).

    On POST the form fields ``title``, ``subtitle``, ``category``,
    ``content`` and ``status`` are inserted into ``Blog_posts`` together with
    the current local time (``dd/mm/YYYY HH:MM:SS``) and the session's
    username.  When the form contains a ``blog-unified`` field, ``_unified``
    is appended to the status before it is stored.

    Returns:
        The rendered ``new_article_blog.html`` template on GET, or a redirect
        to ``blog.list_articles_blog`` once the row has been committed.
    """
    user = '%s' % escape(session['username'])

    if request.method != 'POST':
        return render_template('new_article_blog.html')

    form = request.form
    title = form['title']
    subtitle = form['subtitle']
    category = form['category']
    content = form['content']
    status = form['status']
    post_date = time.strftime("%d/%m/%Y %H:%M:%S")
    if 'blog-unified' in form:
        status += '_unified'

    conn = sqlite3.connect(DATABASE)
    try:
        conn.execute(
            """INSERT INTO Blog_posts(title, subtitle, category, content, creation_date, author, status) VALUES(?, ?, ?, ?, ?, ?, ?)""",
            (title, subtitle, category, content, post_date, user, status),
        )
        conn.commit()
    finally:
        # Always release the connection, even when the INSERT fails.
        conn.close()

    return redirect(url_for('blog.list_articles_blog'))
|
|
|
|
@blog.route('/myblog/edit/<title>', methods=['GET', 'POST'])
@login_required
def edit(title):
    """Edit one of the current user's articles.

    Args:
        title: Title of the article to edit, taken from the URL.  It is the
            lookup key for both the SELECT (GET) and the UPDATE (POST).

    Returns:
        GET: the rendered ``edit_article.html`` template, pre-filled with the
        stored article.  POST: a redirect to ``blog.list_articles_blog``
        after the update has been committed.

    Raises:
        A 404 via ``abort`` on GET when the user has no article with that
        title.
    """
    user = '%s' % escape(session['username'])

    if request.method == 'POST':
        form = request.form
        new_title = form['title']
        subtitle = form['subtitle']
        category = form['category']
        new_content = form['content']
        new_status = form['status']
        updated = time.strftime("%d/%m/%Y %H:%M:%S")
        if 'blog-unified' in form:
            new_status += '_unified'

        conn = sqlite3.connect(DATABASE)
        try:
            # The WHERE clause must use the title from the URL: matching on
            # the submitted title would find no row as soon as the user
            # renames the article.
            conn.execute(
                """UPDATE Blog_posts SET title=?, subtitle=?, category=?, last_updated=?, status=?, content=? WHERE title=? AND author=?""",
                (new_title, subtitle, category, updated, new_status,
                 new_content, title, user),
            )
            conn.commit()
        finally:
            conn.close()
        return redirect(url_for('blog.list_articles_blog'))

    conn = sqlite3.connect(DATABASE)
    try:
        row = conn.execute(
            """SELECT title, subtitle, category, content, status FROM Blog_posts WHERE title=? AND author=?""",
            (title, user),
        ).fetchone()
    finally:
        conn.close()

    if row is None:
        # No such article for this user: answer 404 instead of crashing on
        # None[0].
        abort(404)

    post = dict(
        title=row[0],
        subtitle=row[1],
        category=row[2],
        # Misspelled key kept so a template that reads ``categoory`` still
        # works.
        categoory=row[2],
        content=row[3],
        status=row[4],
    )
    return render_template('edit_article.html',
                           section='Post-it',
                           oldpost=post)
|
|
|
|
|
|
@blog.route('/myblog/list-articles/', methods=['GET'])
@login_required
def list_articles_blog():
    """List every article written by the current user.

    Returns:
        The rendered ``list_articles.html`` template.  ``list_posts`` holds
        one dict per article (``title``, ``subtitle``, ``time``,
        ``last_updated``, ``status``) in the reverse of the order returned by
        the query, and ``nb_articles`` is the number of articles.
    """
    user = '%s' % escape(session['username'])

    conn = sqlite3.connect(DATABASE)
    try:
        rows = conn.execute(
            """SELECT title, subtitle, creation_date, last_updated, status FROM Blog_posts WHERE author=? """,
            (user,),
        ).fetchall()
    finally:
        # The connection was previously left open.
        conn.close()

    # Reversing once replaces the repeated "[new] + posts" prepending.
    posts = [
        dict(title=row[0],
             subtitle=row[1],
             time=row[2],
             last_updated=row[3],
             status=row[4])
        for row in reversed(rows)
    ]

    return render_template('list_articles.html',
                           section="Articles",
                           list_posts=posts,
                           nb_articles=len(posts))
|
|
|
|
@blog.route('/myblog/delete/<title>')
@login_required
def delete(title):
    """Delete the current user's article with the given title.

    Args:
        title: Title of the article to delete, taken from the URL.

    Returns:
        A redirect to ``blog.list_articles_blog``; this also happens when no
        row matched.
    """
    # NOTE(review): this route changes state on a plain GET request; consider
    # restricting it to POST once the templates that link here are updated.
    user = '%s' % escape(session['username'])

    conn = sqlite3.connect(DATABASE)
    try:
        # new_article() stores titles exactly as submitted, so the lookup
        # uses the raw title.  HTML-escaping it first would never match a
        # title containing &, <, >, ' or ".
        conn.execute(
            """DELETE FROM Blog_posts WHERE title=? AND author=?""",
            (title, user),
        )
        conn.commit()
    finally:
        conn.close()

    return redirect(url_for('blog.list_articles_blog'))
|
|
|
|
|
|
@blog.route('/myblog/personnalize/', methods=['GET', 'POST'])
@login_required
def personnalize_blog():
    """Show and update the current user's blog theme settings.

    On POST, a theme other than ``Default`` is copied from
    ``static/blog-<theme>.css`` into the user's area.  If a file was uploaded
    in ``personnal-blog-theme`` it is saved there as well; otherwise the
    chosen theme is written to the ``users`` table.

    Returns:
        The rendered ``personnalize_blog.html`` template, filled with the
        ``blog_unified`` and ``blog_theme`` values read at the start of the
        request.

    Raises:
        A 404 via ``abort`` when the user has no row in ``users``; a 400 when
        the submitted theme name or uploaded file name is unusable.
    """
    user = '%s' % escape(session['username'])

    conn = sqlite3.connect(DATABASE)
    try:
        blog_info = conn.execute(
            """SELECT blog_unified, blog_theme FROM users WHERE name=?""",
            (user,),
        ).fetchone()
    finally:
        conn.close()

    if blog_info is None:
        abort(404)
    blog_unified = blog_info[0]

    if request.method == 'POST':
        # secure_filename was called without being imported anywhere in this
        # module; Werkzeug ships with Flask.
        from werkzeug.utils import secure_filename

        upload = request.files.get('personnal-blog-theme')
        blog_theme = str(escape(request.form['blog-theme']))

        # The theme name becomes part of a file path, so refuse anything
        # that could leave the static/ directory.
        if not blog_theme or any(
                bad in blog_theme for bad in ('/', '\\', '..', '\x00')):
            abort(400)

        if blog_theme != "Default":
            # NOTE(review): there is no separator between the user name and
            # 'blog.css' (nor before the uploaded file name below), whereas
            # other routes build DOSSIER_PERSO + user + "/blog/..." paths.
            # Confirm the intended destination before changing it.
            copy("static/blog-" + blog_theme + ".css",
                 DOSSIER_PERSO + user + 'blog.css')

        if upload:  # a file was actually sent
            nom = secure_filename(upload.filename)
            if not nom:
                abort(400)
            upload.save(DOSSIER_PERSO + user + nom)
        else:
            conn = sqlite3.connect(DATABASE)
            try:
                conn.execute(
                    "UPDATE users SET blog_unified=?, blog_theme=? WHERE name=?",
                    (blog_unified, blog_theme, user),
                )
                conn.commit()
            finally:
                # This connection was previously left open.
                conn.close()

    return render_template('personnalize_blog.html',
                           section='personnalize_blog',
                           blog_unified=blog_unified,
                           blog_theme=blog_info[1])
|
|
|
|
@blog.route('/myblog/', methods=['GET'])
@login_required
def viewmyblog():
    """Render the current user's articles whose status is not ``draft``.

    Returns:
        The rendered ``index_blog.html`` template; ``posts`` lists the
        articles in the reverse of the order returned by the query.
    """
    author = '%s' % escape(session['username'])

    db = sqlite3.connect(DATABASE)
    cur = db.cursor()
    cur.execute("""SELECT title, subtitle, creation_date, author, status FROM Blog_posts WHERE author=? AND status!='draft'""", (author,))
    rows = cur.fetchall()
    db.close()

    # fetchall() always returns a list, so it can be consumed directly.
    columns = ('title', 'subtitle', 'creation_date', 'author', 'status')
    posts = [dict(zip(columns, row)) for row in reversed(rows)]

    return render_template('index_blog.html', section='Blog', posts=posts)
|
|
|
|
|
|
@blog.route('/private-blog/', methods=['GET'])
@login_required
def view_internal():
    """Render every article whose status is ``private_unified`` or
    ``public_unified``, whatever its author.

    Returns:
        The rendered ``index_blog.html`` template; ``posts`` lists the
        articles in the reverse of the order returned by the query and is
        empty when nothing matches.
    """
    conn = sqlite3.connect(DATABASE)
    try:
        rows = conn.execute(
            """SELECT title, subtitle, content, creation_date, last_updated, author, status FROM Blog_posts WHERE status='private_unified' OR status='public_unified' """
        ).fetchall()
    finally:
        conn.close()

    # fetchall() returns a list, never None, so the former "no result"
    # branch (a redirect sent with a 404 status) could not run and is gone.
    posts = [
        dict(title=row[0], subtitle=row[1], content=row[2],
             creation_date=row[3], last_updated=row[4], author=row[5],
             status=row[6])
        for row in reversed(rows)
    ]

    return render_template('index_blog.html', section='Blog', posts=posts)
|
|
|
|
@blog.route('/blog/', methods=['GET'])
def view():
    """Render every article whose status is ``public_unified``.

    Returns:
        The rendered ``index_blog.html`` template; ``posts`` lists the
        articles in the reverse of the order returned by the query.
    """
    db = sqlite3.connect(DATABASE)
    cur = db.cursor()
    cur.execute("""SELECT title, subtitle, creation_date, author, status FROM Blog_posts WHERE status='public_unified'""")
    rows = cur.fetchall()
    db.close()

    # fetchall() always returns a list, so it can be consumed directly.
    columns = ('title', 'subtitle', 'creation_date', 'author', 'status')
    posts = [dict(zip(columns, row)) for row in reversed(rows)]

    return render_template('index_blog.html', section='Blog', posts=posts)
|
|
|
|
|
|
@blog.route('/blog/<author>/', methods=['GET'])
def viewuser(author):
    """Render the article index of a single author.

    When the session holds a ``username``, every non-draft article of the
    author is listed; otherwise only articles whose status is ``public`` or
    ``public_unified``.

    Args:
        author: Author name taken from the URL.

    Returns:
        The rendered ``index_author_blog.html`` template; ``posts`` lists
        the articles in the reverse of the order returned by the query.
    """
    # Blog_posts.author is written by new_article() as the HTML-escaped
    # session username, so the URL value is escaped the same way.
    author = str(escape(author))

    if 'username' in session:
        query = (
            "SELECT title, subtitle, creation_date, last_updated, author, status "
            "FROM Blog_posts WHERE author=? AND status != 'draft'"
        )
    else:
        # AND binds tighter than OR in SQL.  Without the grouping, the old
        # "author=? AND status='public' OR status='public_unified'" returned
        # every author's public_unified articles.
        query = (
            "SELECT title, subtitle, creation_date, last_updated, author, status "
            "FROM Blog_posts WHERE author=? "
            "AND status IN ('public', 'public_unified')"
        )

    conn = sqlite3.connect(DATABASE)
    try:
        rows = conn.execute(query, (author,)).fetchall()
    finally:
        conn.close()

    posts = [
        dict(title=row[0], subtitle=row[1], creation_date=row[2],
             last_updated=row[3], author=row[4], status=row[5])
        for row in reversed(rows)
    ]

    return render_template('index_author_blog.html', section='Blog', posts=posts)
|
|
|
|
@blog.route('/blog/<author>/rss.xml', methods=['GET'])
def viewauthorrss(author):
    """Render the RSS feed of one author's public articles.

    Args:
        author: Author name taken from the URL; also used as the feed's
            ``blog_name``.

    Returns:
        The rendered ``blog_rss.xml`` template.  Each post's ``content`` is
        converted with ``markdown``; ``last_build_date`` is the
        ``MAX(creation_date)`` of the selected articles (``None`` when there
        are none).
    """
    # AND binds tighter than OR in SQL.  The previous
    # "author=? AND status='public_unified' OR status='public'" therefore
    # matched every author's 'public' articles.
    public_filter = "author=? AND status IN ('public_unified', 'public')"

    conn = sqlite3.connect(DATABASE)
    try:
        cursor = conn.cursor()
        # NOTE(review): new_article() stores creation_date as
        # "dd/mm/YYYY HH:MM:SS" text, so MAX() compares strings rather than
        # dates -- confirm whether this is acceptable for lastBuildDate.
        cursor.execute(
            "SELECT MAX (creation_date) FROM Blog_posts WHERE " + public_filter,
            (author,),
        )
        last_build = cursor.fetchone()[0]
        cursor.execute(
            "SELECT title, subtitle, content, creation_date, author, status "
            "FROM Blog_posts WHERE " + public_filter,
            (author,),
        )
        rows = cursor.fetchall()
    finally:
        conn.close()

    posts = [
        dict(title=row[0], subtitle=row[1], content=markdown(row[2]),
             creation_date=row[3], author=row[4], status=row[5])
        for row in reversed(rows)
    ]

    return render_template('blog_rss.xml',
                           base_url=BASE_URL,
                           blog_name=author,
                           last_build_date=last_build,
                           posts=posts)
|
|
|
|
|
|
@blog.route('/blog/private_unified/<username>/<title>', methods=['GET'])
@blog.route('/blog/private/<username>/<title>', methods=['GET'])
@login_required
def viewPrivateArticle(username, title):
    """Render one non-draft article.

    Args:
        username: Author name taken from the URL.
        title: Article title taken from the URL.

    Returns:
        The rendered ``blog.html`` template with the article metadata in
        ``post_info`` and its content converted with ``markdown``.

    Raises:
        A 404 via ``abort`` when no non-draft article matches.
    """
    # Authors are stored HTML-escaped by new_article(); titles are stored
    # exactly as submitted, so only the author is escaped for the lookup.
    user = str(escape(username))

    conn = sqlite3.connect(DATABASE)
    try:
        post = conn.execute(
            """SELECT title, subtitle, content, creation_date, last_updated, author, status FROM Blog_posts WHERE author=? AND title=? AND status!='draft' """,
            (user, title),
        ).fetchone()
    finally:
        conn.close()

    if post is None:
        # The old code called url_for('blog'), which is not an endpoint, and
        # sent a redirect with a 404 status; a plain 404 is what was meant.
        abort(404)

    post_info = dict(title=post[0], subtitle=post[1], creation_date=post[3],
                     last_updated=post[4], author=post[5])
    content = markdown(post[2])
    return render_template('blog.html', post_info=post_info, content=content)
|
|
|
|
|
|
@blog.route('/blog/public_unified/<username>/<title>', methods=['GET'])
@blog.route('/blog/public/<username>/<title>', methods=['GET'])
def viewArticle(username, title):
    """Render one public article.

    Args:
        username: Author name taken from the URL.
        title: Article title taken from the URL.

    Returns:
        The rendered ``blog.html`` template with the article metadata in
        ``post_info`` and its content converted with ``markdown``.

    Raises:
        A 404 via ``abort`` when no public article matches.
    """
    conn = sqlite3.connect(DATABASE)
    try:
        # The function also serves /blog/public/..., so 'public' articles
        # must match too; the query used to accept 'public_unified' only.
        post = conn.execute(
            "SELECT title, subtitle, content, creation_date, last_updated, author "
            "FROM Blog_posts WHERE author=? AND title=? "
            "AND status IN ('public', 'public_unified')",
            (username, title),
        ).fetchone()
    finally:
        conn.close()

    if post is None:
        # A redirect sent with a 404 status is not a redirect; answer with a
        # plain 404 instead.
        abort(404)

    post_info = dict(title=post[0], subtitle=post[1], creation_date=post[3],
                     last_updated=post[4], author=post[5])
    content = markdown(post[2])
    return render_template('blog.html', post_info=post_info, content=content)
|
|
|
|
@blog.route('/blog/rss.xml', methods=['GET'])
def viewrss():
    """Render the site-wide RSS feed of ``public_unified`` articles.

    Returns:
        The rendered ``blog_rss.xml`` template.  Each post's ``content`` is
        converted with ``markdown``; ``last_build_date`` is the
        ``MAX(creation_date)`` of the selected articles (``None`` when there
        are none).
    """
    conn = sqlite3.connect(DATABASE)
    try:
        cursor = conn.cursor()
        # NOTE(review): new_article() stores creation_date as
        # "dd/mm/YYYY HH:MM:SS" text, so MAX() compares strings rather than
        # dates -- confirm whether this is acceptable for lastBuildDate.
        cursor.execute("""SELECT MAX(creation_date) FROM Blog_posts WHERE status='public_unified'""")
        # The debug print of this value has been removed.
        last_build = cursor.fetchone()[0]
        cursor.execute("""SELECT title, subtitle, content, creation_date, author, status FROM Blog_posts WHERE status='public_unified'""")
        rows = cursor.fetchall()
    finally:
        conn.close()

    posts = [
        dict(title=row[0], subtitle=row[1], content=markdown(row[2]),
             creation_date=row[3], author=row[4], status=row[5])
        for row in reversed(rows)
    ]

    return render_template('blog_rss.xml',
                           base_url=BASE_URL,
                           blog_name=TITLE_SERVER,
                           last_build_date=last_build,
                           blog_description=DESC_SERVER,
                           posts=posts)
|
|
|