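# -*- coding: utf-8 -*-
"""Flask blueprint for the per-user blog.

Covers authoring (create / edit / update / delete), theme selection, and the
HTML and RSS views of articles stored in the ``Blog_posts`` table.

Statuses compared against in this module: ``draft``, ``public``,
``public_unified`` and ``private_unified``. The ``_unified`` suffix marks a
post for the server-wide listing.
"""

from flask import Blueprint, render_template, session, redirect, url_for, request, flash, abort, Flask, make_response
import re
import time
from contextlib import closing
from markupsafe import escape
import sqlite3
from markdown import markdown
from tools.filesutils import getFileSizeKo
import string
from shutil import copy
from werkzeug.utils import secure_filename
from tools.utils import login_required

blog = Blueprint('blog', __name__, template_folder='templates')
app = Flask( 'pywallter' )
app.config.from_pyfile('config.py')

########################### Global variables #################################
extensionimg = app.config.get('EXT_IMG')
DATABASE = app.config.get('DATABASE')
BASE_URL = app.config.get('BASE_URL')
DOSSIER_PERSO = app.config.get('DOSSIER_APP')+'/'
DOSSIER_PUBLIC = app.config.get('DOSSIER_PUBLIC')+'/'
TITLE_SERVER = app.config.get('TITLE_SERVER')
DESC_SERVER = app.config.get('DESC_SERVER')
MARKDOWN_EXT=["extra", "toc", "codehilite", "nl2br", "extra", "admonition", "sane_lists", "smarty"]
################################################################################

# Theme names are spliced into a filesystem path, so path separators are never
# allowed in them.
_THEME_NAME_RE = re.compile(r'[A-Za-z0-9][A-Za-z0-9 _.-]*')


def _current_user():
    """Return the session's username, HTML-escaped (the form stored as ``author``)."""
    return '%s' % escape(session['username'])


def _fetch_all(sql, params=()):
    """Run a SELECT and return every row; the connection is always closed."""
    with closing(sqlite3.connect(DATABASE)) as conn:
        return conn.execute(sql, params).fetchall()


def _fetch_one(sql, params=()):
    """Run a SELECT and return the first row, or None when nothing matches."""
    with closing(sqlite3.connect(DATABASE)) as conn:
        return conn.execute(sql, params).fetchone()


def _write(statements):
    """Run ``(sql, params)`` pairs in one transaction (rolled back on error)."""
    with closing(sqlite3.connect(DATABASE)) as conn:
        with conn:
            for sql, params in statements:
                conn.execute(sql, params)


def _form_status(form):
    """Return the status to store for a submitted article form.

    The ``_unified`` suffix is never added to a draft: the read views only
    exclude the exact value ``draft``, so ``draft_unified`` would be shown to
    every logged-in user. It is also not appended twice.
    """
    status = str(form['status'])
    if 'blog-unified' in form and status != 'draft' and not status.endswith('_unified'):
        status += '_unified'
    return status


def _posts(rows, keys):
    """Map rows to dicts keyed by *keys*, last row first (the order the views expect)."""
    return [dict(zip(keys, row)) for row in reversed(rows)]


def _render_markdown(text):
    """Convert article Markdown to HTML.

    NOTE(review): python-markdown passes raw HTML in the source through to its
    output. If the templates mark this content as safe, article authors can
    inject script into pages and feeds read by others -- confirm the output is
    sanitised before display.
    """
    return markdown(text, extensions=MARKDOWN_EXT)


def _rss_posts(rows):
    """Build RSS items from rows of (title_id, title, subtitle, content, creation_date, author, status)."""
    return [dict(title_id=row[0], title=row[1], subtitle=row[2],
                 content=_render_markdown(row[3]), creation_date=row[4],
                 author=row[5], status=row[6])
            for row in reversed(rows)]


def _render_article(post):
    """Render one article row, or answer 404 when *post* is None.

    *post* is (title_id, title, subtitle, content, creation_date,
    last_updated, author, ...).
    """
    if post is None:
        abort(404)
    post_info = dict(title_id=post[0], title=post[1], subtitle=post[2],
                     creation_date=post[4], last_updated=post[5], author=post[6])
    return render_template('blog.html', post_info=post_info,
                           content=_render_markdown(post[3]))


@blog.route('/myblog/new-article/', methods=['GET', 'POST'])
@login_required
def new_article():
    """Show the new-article form (GET) or store the submitted article (POST)."""
    user = _current_user()
    if request.method != 'POST':
        return render_template('new_article_blog.html')

    title = request.form['title'].rstrip()
    title_id = str(escape(title))
    post_date = time.strftime("%d/%m/%Y %H:%M:%S")
    _write([(
        """INSERT INTO Blog_posts(title_id, title, subtitle, category, content, creation_date, author, status) VALUES(?, ?, ?, ?, ?, ?, ?, ?)""",
        (title_id, title, request.form['subtitle'], request.form['category'],
         request.form['content'], post_date, user, _form_status(request.form)),
    )])
    return redirect(url_for('blog.list_articles_blog'))


@blog.route('/myblog/edit/<title_id>', methods=['GET'])
@login_required
def edit(title_id):
    """Show the edit form for one of the current user's own articles."""
    user = _current_user()
    oldpost = _fetch_one(
        """SELECT title_id, title, subtitle, creation_date, category, content, status FROM Blog_posts WHERE title_id=? AND author=?""",
        (title_id, user))
    if oldpost is None:
        # Unknown article, or one owned by somebody else.
        abort(404)
    post = dict(title_id=oldpost[0], title=oldpost[1], subtitle=oldpost[2],
                creation_date=oldpost[3], category=oldpost[4],
                content=oldpost[5], status=oldpost[6])
    return render_template('edit_article.html', section='Post-it', oldpost=post)


@blog.route('/myblog/update_blogpost', methods=['POST'])
@login_required
def update():
    """Replace one of the current user's articles with the submitted version."""
    user = _current_user()
    title_id = str(request.form['title_id'])
    title = str(request.form['title'])
    subtitle = str(request.form['subtitle'])
    creation_date = str(request.form['creation_date'])
    category = str(request.form['category'])
    content = str(request.form['content'])
    status = _form_status(request.form)
    updated = time.strftime("%d/%m/%Y %H:%M:%S")

    # Both statements are scoped to the session user and run in a single
    # transaction, so a failed INSERT cannot leave the article deleted.
    _write([
        ("""DELETE FROM Blog_posts WHERE title_id=? AND author=?""",
         (title_id, user)),
        ("""INSERT INTO Blog_posts(title_id, title, subtitle, category, content, creation_date, last_updated, author, status)VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?)""",
         (title_id, title, subtitle, category, content, creation_date, updated, user, status)),
    ])
    post = dict(title_id=title_id, title=title, subtitle=subtitle,
                category=category, content=content, status=status)
    flash(u'Article mis à jour avec succès le: '+ updated , 'success' )
    return render_template('edit_article.html', section='Post-it', oldpost=post)


@blog.route('/myblog/list-articles/', methods=['GET'])
@login_required
def list_articles_blog():
    """List every article (drafts included) written by the current user."""
    user = _current_user()
    rows = _fetch_all(
        """SELECT title_id, title, subtitle, creation_date, last_updated, status FROM Blog_posts WHERE author=? """,
        (user,))
    posts = _posts(rows, ('title_id', 'title', 'subtitle', 'time', 'last_updated', 'status'))
    return render_template('list_articles.html', section="Articles",
                           list_posts=posts, nb_articles=len(posts))


@blog.route('/myblog/delete/<title_id>')
@login_required
def delete(title_id):
    """Delete one of the current user's articles and return to the list.

    NOTE(review): this changes state on a GET request and no CSRF protection
    is visible in this file -- confirm one is applied elsewhere, or move the
    action to a POST form.
    """
    title_id = '%s' % str(title_id).rstrip()
    user = _current_user()
    _write([("""DELETE FROM Blog_posts WHERE title_id=? AND author=?""",
             (title_id, user))])
    return redirect(url_for('blog.list_articles_blog'))


@blog.route('/myblog/personnalize/', methods=['GET', 'POST'])
@login_required
def personnalize_blog():
    """Show the theme settings and, on POST, apply a stock or uploaded theme."""
    user = _current_user()
    blog_info = _fetch_one("""SELECT blog_theme FROM users WHERE name=?""", (user,))
    current_theme = blog_info[0] if blog_info else None

    if request.method == 'POST' :
        f = request.files.get('personnal-blog-theme')
        blog_theme = str(request.form['blog-theme'])
        if blog_theme != "Default":
            # The theme name comes from the client and becomes part of a
            # path; refuse anything that could leave the static directory.
            if not _THEME_NAME_RE.fullmatch(blog_theme):
                abort(400)
            copy( "static/blog-"+blog_theme+".css", DOSSIER_PERSO+ user +'/blog.css' )
        if f:  # a file was actually uploaded
            nom = secure_filename(f.filename)
            if not nom:
                abort(400)
            # NOTE(review): no restriction on the uploaded file's type is
            # applied here -- consider an extension allowlist.
            f.save(DOSSIER_PERSO + user + '/' + nom)
        else:
            _write([("UPDATE users SET blog_theme=? WHERE name=?", (blog_theme, user))])
    return render_template('personnalize_blog.html', section='personnalize_blog',
                           blog_theme=current_theme)


@blog.route('/myblog/', methods=['GET'])
@login_required
def viewmyblog():
    """Show the current user's own blog, drafts excluded."""
    user = _current_user()
    rows = _fetch_all(
        """SELECT title_id, title, subtitle, creation_date, author, status FROM Blog_posts WHERE author=? AND status!='draft'""",
        (user,))
    posts = _posts(rows, ('title_id', 'title', 'subtitle', 'creation_date', 'author', 'status'))
    return render_template('index_blog.html', section='Blog', posts=posts, author=user)


@blog.route('/private-blog/', methods=['GET'])
@login_required
def view_internal():
    """Show the server-wide listing (private and public unified posts) to logged-in users."""
    rows = _fetch_all(
        """SELECT title_id, title, subtitle, content, creation_date, last_updated, author, status FROM Blog_posts WHERE status='private_unified' OR status='public_unified' """)
    posts = _posts(rows, ('title_id', 'title', 'subtitle', 'content',
                          'creation_date', 'last_updated', 'author', 'status'))
    return render_template('index_blog.html', section='Blog', posts=posts)


@blog.route('/blog/', methods=['GET'])
def view():
    """Show the public server-wide listing (``public_unified`` posts only)."""
    rows = _fetch_all(
        """SELECT title_id, title, subtitle, creation_date, author, status FROM Blog_posts WHERE status='public_unified'""")
    posts = _posts(rows, ('title_id', 'title', 'subtitle', 'creation_date', 'author', 'status'))
    return render_template('index_blog.html', section='Blog', posts=posts)


@blog.route('/blog/<author>/', methods=['GET'])
def viewuser(author):
    """Show one author's blog: all non-draft posts when logged in, public posts otherwise."""
    author = escape(author)
    if 'username' in session :
        sql = """SELECT title_id, title, subtitle, creation_date, last_updated, author, status FROM Blog_posts WHERE author=? AND status != 'draft' """
    else:
        # Parentheses are required: AND binds tighter than OR, so without
        # them every author's public_unified posts would match.
        sql = """SELECT title_id, title, subtitle, creation_date, last_updated, author, status FROM Blog_posts WHERE author=? AND (status='public' OR status='public_unified') """
    rows = _fetch_all(sql, (author,))
    posts = _posts(rows, ('title_id', 'title', 'subtitle', 'creation_date',
                          'last_updated', 'author', 'status'))
    return render_template('index_blog.html', section='Blog', posts=posts, author=author)


@blog.route('/blog/<author>/rss.xml', methods=['GET'])
def viewauthorrss(author):
    """Serve the RSS feed of one author's public posts."""
    # Authors are stored HTML-escaped, so look them up the same way.
    author = escape(author)
    # NOTE(review): creation_date is stored as "%d/%m/%Y %H:%M:%S" text, so
    # MAX() compares strings and may not return the most recent date.
    last_article_date = _fetch_one(
        """SELECT MAX (creation_date) FROM Blog_posts WHERE author=? AND (status='public_unified' OR status='public')""",
        (author, ))
    rows = _fetch_all(
        """SELECT title_id, title, subtitle, content, creation_date, author, status FROM Blog_posts WHERE author=? AND (status='public_unified' OR status='public')""",
        (author, ))
    return render_template('blog_rss.xml', base_url=BASE_URL, blog_name=author,
                           last_build_date=last_article_date[0],
                           posts=_rss_posts(rows))


@blog.route('/blog/private_unified/<username>/<title_id>', methods=['GET'])
@blog.route('/blog/private/<username>/<title_id>', methods=['GET'])
@login_required
def viewPrivateArticle(username, title_id):
    """Show any non-draft article of *username* to a logged-in user."""
    user = '%s' % escape(username)
    post = _fetch_one(
        """SELECT title_id, title, subtitle, content, creation_date, last_updated, author, status FROM Blog_posts WHERE author=? AND title_id=? AND status!='draft' """,
        (user, title_id))
    return _render_article(post)


@blog.route('/blog/public_unified/<username>/<title_id>', methods=['GET'])
@blog.route('/blog/public/<username>/<title_id>', methods=['GET'])
def viewArticle(username, title_id):
    """Show a public article of *username* to anyone.

    Both routes are served, so both public statuses are accepted; private
    posts and drafts are never returned here.
    """
    user = '%s' % escape(username)
    post = _fetch_one(
        """SELECT title_id, title, subtitle, content, creation_date, last_updated, author FROM Blog_posts WHERE author=? AND title_id=? AND (status='public' OR status='public_unified') """,
        (user, title_id) )
    return _render_article(post)


@blog.route('/blog/rss.xml', methods=['GET'])
def viewrss():
    """Serve the server-wide RSS feed of ``public_unified`` posts."""
    # NOTE(review): creation_date is stored as "%d/%m/%Y %H:%M:%S" text, so
    # MAX() compares strings and may not return the most recent date.
    last_article_date = _fetch_one(
        """SELECT MAX(creation_date) FROM Blog_posts WHERE status='public_unified'""")
    rows = _fetch_all(
        """SELECT title_id, title, subtitle, content, creation_date, author, status FROM Blog_posts WHERE status='public_unified'""")
    return render_template('blog_rss.xml', base_url=BASE_URL, blog_name= TITLE_SERVER,
                           last_build_date=last_article_date[0],
                           blog_description=DESC_SERVER, posts=_rss_posts(rows))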