From 0e8ada1eff8799437c9be1bc2af10d6198fa8cad Mon Sep 17 00:00:00 2001 From: jwansek Date: Tue, 29 Apr 2025 18:51:40 +0100 Subject: Added inline HTML into blogposts, refactored a bit --- database.py | 248 ------------------------------------------------------------ 1 file changed, 248 deletions(-) delete mode 100644 database.py (limited to 'database.py') diff --git a/database.py b/database.py deleted file mode 100644 index 49de9ef..0000000 --- a/database.py +++ /dev/null @@ -1,248 +0,0 @@ -from urllib.parse import urlparse -from dataclasses import dataclass -from lxml import html -import configparser -import threading -import services -import operator -import datetime -import requests -import twython -import pymysql -import random -import os -import re - -@dataclass -class Database: - safeLogin:bool = True #automatically login with the user in the config file, who is read only - user:str = None #otherwise, login with the given username and passwd - passwd:str = None - - def __enter__(self): - self.config = configparser.ConfigParser(interpolation = None) - self.config.read(os.path.join(os.path.dirname(__file__), "edaweb.conf")) - - if self.safeLogin: - self.__connection = pymysql.connect( - **self.config["mysql"], - charset = "utf8mb4" - ) - else: - self.__connection = pymysql.connect( - user = self.user, - passwd = self.passwd, - host = self.config["mysql"]["host"], - db = self.config["mysql"]["db"], - charset = "utf8mb4" - ) - return self - - def __exit__(self, type, value, traceback): - self.__connection.commit() - self.__connection.close() - - def get_header_links(self): - with self.__connection.cursor() as cursor: - cursor.execute("SELECT name, link FROM headerLinks ORDER BY name;") - return cursor.fetchall() - - def get_image(self, imageName): - with self.__connection.cursor() as cursor: - cursor.execute("SELECT alt, url FROM images WHERE imageName = %s;", (imageName, )) - return cursor.fetchone() - - def get_pfp_images(self): - with self.__connection.cursor() as cursor: - cursor.execute("SELECT alt, url FROM images WHERE pfp_img = 1;") - return cursor.fetchall() - - def get_sidebar_images(self): - with self.__connection.cursor() as cursor: - cursor.execute("SELECT alt, url FROM images WHERE sidebar_image = 1;") - return cursor.fetchall() - - def get_header_articles(self): - with self.__connection.cursor() as cursor: - cursor.execute("SELECT articleName, link FROM headerArticles;") - return cursor.fetchall() - - def get_all_categories(self): - with self.__connection.cursor() as cursor: - cursor.execute("SELECT category_name FROM categories;") - return [i[0] for i in cursor.fetchall()] - - def add_category(self, category): - if not category in self.get_all_categories(): - with self.__connection.cursor() as cursor: - cursor.execute("INSERT INTO categories (category_name) VALUES (%s);", (category, )) - - self.__connection.commit() - return True - - return False - - def add_thought(self, category, title, markdown): - with self.__connection.cursor() as cursor: - cursor.execute(""" - INSERT INTO thoughts (category_id, title, markdown_text) - VALUES (( - SELECT category_id FROM categories WHERE category_name = %s - ), %s, %s);""", (category, title, markdown)) - self.__connection.commit() - - def get_thought(self, id_): - with self.__connection.cursor() as cursor: - cursor.execute(""" - SELECT categories.category_name, thoughts.title, thoughts.dt, thoughts.markdown_text, thoughts.redirect - FROM thoughts INNER JOIN categories - ON thoughts.category_id = categories.category_id - WHERE thought_id = %s;""", (id_, )) - return cursor.fetchone() - - def get_similar_thoughts(self, category, id_): - with self.__connection.cursor() as cursor: - cursor.execute(""" - SELECT thought_id, title, dt, category_name FROM thoughts - INNER JOIN categories ON thoughts.category_id = categories.category_id - WHERE category_name = %s AND thought_id != %s;""", - (category, id_)) - return cursor.fetchall() - - def get_featured_thoughts(self): - with self.__connection.cursor() as cursor: - cursor.execute("SELECT thought_id, title FROM thoughts WHERE featured = 1;") - return cursor.fetchall() - - def update_thought_markdown(self, id_, markdown): - with self.__connection.cursor() as cursor: - cursor.execute("UPDATE thoughts SET markdown_text = %s WHERE thought_id = %s;", (markdown, id_)) - self.__connection.commit() - - def get_categories_not(self, category_name): - with self.__connection.cursor() as cursor: - cursor.execute("SELECT category_name FROM categories WHERE category_name != %s;", (category_name, )) - return [i[0] for i in cursor.fetchall()] - - def get_all_thoughts(self): - with self.__connection.cursor() as cursor: - cursor.execute(""" - SELECT thought_id, title, dt, category_name FROM thoughts - INNER JOIN categories ON thoughts.category_id = categories.category_id; - """) - return cursor.fetchall() - - def get_cached_tweets(self, numToGet = None): - with self.__connection.cursor() as cursor: - sql = "SELECT tweet, tweet_id, account FROM diary WHERE account = %s ORDER BY tweeted_at DESC" - args = (self.config.get("twitter", "main_account"), ) - if numToGet is not None: - sql += " LIMIT %s;" - args = (self.config.get("twitter", "main_account"), numToGet) - else: - sql += ";" - cursor.execute(sql, args) - - return [(i[0], "https://%s/%s/status/%d" % (self.config.get("nitter", "outsideurl"), i[2], i[1])) for i in cursor.fetchall()] - - def get_cached_commits(self, since = None, recurse = True): - with self.__connection.cursor() as cursor: - if since is not None: - cursor.execute("SELECT DISTINCT message, url, commitTime, additions, deletions, total FROM commitCache WHERE commitTime > %s ORDER BY commitTime DESC;", (since, )) - else: - cursor.execute("SELECT DISTINCT message, url, commitTime, additions, deletions, total FROM commitCache ORDER BY commitTime DESC;") - # i think i might have spent too long doing functional programming - return [{ - "repo": urlparse(i[1]).path.split("/")[2], - "github_repo_url": "https://github.com" + "/".join(urlparse(i[1]).path.split("/")[:3]), - "git_repo_url": "https://%s/%s.git/about" % (self.config.get("github", "personal_domain"), urlparse(i[1]).path.split("/")[2]), - "message": i[0], - "github_commit_url": i[1], - "git_commit_url": "https://%s/%s.git/commit/?id=%s" % ( - self.config.get("github", "personal_domain"), - urlparse(i[1]).path.split("/")[2], - urlparse(i[1]).path.split("/")[-1] - ), - "datetime": i[2], - "stats": { - "additions": i[3], - "deletions": i[4], - "total": i[5] - } - } for i in cursor.fetchall()] - - def update_commit_cache(self, requested): - with self.__connection.cursor() as cursor: - for commit in requested: - cursor.execute("SELECT DISTINCT url FROM commitCache;") - urls = [i[0] for i in cursor.fetchall()] - - if commit["url"] not in urls: - cursor.execute(""" - INSERT INTO commitCache (message, url, commitTime, additions, deletions, total) - VALUES (%s, %s, %s, %s, %s, %s)""", - (commit["message"], commit["url"], commit["datetime"], commit["stats"]["additions"], commit["stats"]["deletions"], commit["stats"]["total"]) - ) - self.__connection.commit() - - def get_last_commit_time(self): - with self.__connection.cursor() as cursor: - cursor.execute("SELECT MAX(commitTime) FROM commitCache;") - return cursor.fetchone()[0] - - def get_my_twitter(self): - with self.__connection.cursor() as cursor: - cursor.execute("SELECT link FROM headerLinks WHERE name = 'twitter';") - return cursor.fetchone()[0] - - def get_my_diary_twitter(self): - return self.config.get("twitter", "diary_account") - - def get_iso_cd_options(self): - iso_dir = self.config.get("cds", "location") - return [ - i - for i in os.listdir(iso_dir) - if os.path.splitext(i)[-1].lower() in [".iso"] - and os.path.getsize(os.path.join(iso_dir, i)) < self.config.getint("cds", "maxsize") - ] - - def append_cd_orders(self, iso, email, house, street, city, county, postcode, name): - with self.__connection.cursor() as cursor: - cursor.execute(""" - INSERT INTO cd_orders_2 (iso, email, house, street, city, county, postcode, name) - VALUES (%s, %s, %s, %s, %s, %s, %s, %s); - """, (iso, email, house, street, city, county, postcode, name)) - id_ = cursor.lastrowid - self.__connection.commit() - return id_ - - def append_qnas(self, qnas): - with self.__connection.cursor() as cursor: - for qna in qnas: - cursor.execute("SELECT curiouscat_id FROM qnas WHERE curiouscat_id = %s;", (qna["id"], )) - if cursor.fetchone() is None: - - cursor.execute("INSERT INTO `qnas` VALUES (%s, %s, %s, %s, %s, %s);", ( - qna["id"], qna["link"], qna["datetime"], qna["question"], qna["answer"], qna["host"] - )) - print("Appended question with timestamp %s" % qna["datetime"].isoformat()) - - else: - print("Skipped question with timestamp %s" % qna["datetime"].isoformat()) - self.__connection.commit() - - def get_oldest_qna(self): - with self.__connection.cursor() as cursor: - cursor.execute("SELECT MAX(timestamp) FROM qnas;") - return cursor.fetchone()[0] - - def get_qnas(self): - with self.__connection.cursor() as cursor: - cursor.execute("SELECT * FROM qnas;") - return sorted(cursor.fetchall(), key = operator.itemgetter(2), reverse = True) - - - - - -- cgit v1.2.3