from dataclasses import dataclass
from io import StringIO
from lxml import html, etree
from github import Github
import multiprocessing
import paramiko.client
from APiHole import PiHole
import transmission_rpc
import configparser
import math as maths
import requests
import datetime
import urllib
import docker
import random
import subprocess
import fabric
import queue
import json
import time
import os
theLastId = 0
CONFIG = configparser.ConfigParser(interpolation = None)
CONFIG.read("edaweb.conf")
def humanbytes(B):
'Return the given bytes as a human friendly KB, MB, GB, or TB string'
B = float(B)
KB = float(1024)
MB = float(KB ** 2) # 1,048,576
GB = float(KB ** 3) # 1,073,741,824
TB = float(KB ** 4) # 1,099,511,627,776
if B < KB:
return '{0} {1}'.format(B,'Bytes' if 0 == B > 1 else 'Byte')
elif KB <= B < MB:
return '{0:.2f} KB'.format(B/KB)
elif MB <= B < GB:
return '{0:.2f} MB'.format(B/MB)
elif GB <= B < TB:
return '{0:.2f} GB'.format(B/GB)
elif TB <= B:
return '{0:.2f} TB'.format(B/TB)
@dataclass
class SafebooruImage:
id_: int
url: str
searchTags: list
tags: list
source: str
imurl: str
def remove_tag(self, tag):
return list(set(self.searchTags).difference(set([tag])))
@dataclass
class DownloadedImage:
imurl: str
def __enter__(self):
self.filename = os.path.join("static", "images", "random.jpg")
req = urllib.request.Request(self.imurl, headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_5_8) AppleWebKit/534.50.2 (KHTML, like Gecko) Version/5.0.6 Safari/533.22.3'})
mediaContent = urllib.request.urlopen(req).read()
with open(self.filename, "wb") as f:
f.write(mediaContent)
return self.filename
def __exit__(self, type, value, traceback):
os.remove(self.filename)
def get_num_pages(tags):
pages_url = "https://safebooru.org/index.php?page=post&s=list&tags=%s" % "+".join(tags)
tree = html.fromstring(requests.get(pages_url).content)
try:
finalpage_element = tree.xpath("/html/body/div[6]/div/div[2]/div[2]/div/a[12]")[0]
except IndexError:
return 1
else:
return int(int(urllib.parse.parse_qs(finalpage_element.get("href"))["pid"][0]) / (5*8))
def get_id_from_url(url):
return int(urllib.parse.parse_qs(url)["id"][0])
def get_random_image(tags):
global theLastId
searchPage = random.randint(1, get_num_pages(tags)) * 5 * 8
url = "https://safebooru.org/index.php?page=post&s=list&tags=%s&pid=%i" % ("+".join(tags), searchPage)
tree = html.fromstring(requests.get(url).content)
imageElements = [e for e in tree.xpath("/html/body/div[6]/div/div[2]/div[1]")[0].iter(tag = "a")]
try:
element = random.choice(imageElements)
except IndexError:
# raise ConnectionError("Couldn't find any images")
return get_random_image(tags)
url = "https://safebooru.org/" + element.get("href")
if get_id_from_url(url) == theLastId:
return get_random_image(tags)
theLastId = get_id_from_url(url)
try:
sbi = SafebooruImage(
id_ = get_id_from_url(url),
url = url,
tags = element.find("img").get("alt").split(),
searchTags = tags,
source = fix_source_url(get_source(url)),
imurl = get_imurl(url)
)
except (ConnectionError, KeyError) as e:
print("[ERROR]", e)
return get_random_image(tags)
if link_deleted(sbi.url):
print("Retried since the source was deleted...")
return get_random_image(tags)
return sbi
def get_source(url):
tree = html.fromstring(requests.get(url).content)
for element in tree.xpath('//*[@id="stats"]')[0].iter("li"):
if element.text.startswith("Source: h"):
return element.text[8:]
elif element.text.startswith("Source:"):
for child in element.iter():
if child.get("href") is not None:
return child.get("href")
raise ConnectionError("Couldn't find source image for id %i" % get_id_from_url(url))
def fix_source_url(url):
parsed = urllib.parse.urlparse(url)
if parsed.netloc == "www.pixiv.net":
return "https://www.pixiv.net/en/artworks/" + urllib.parse.parse_qs(parsed.query)["illust_id"][0]
elif parsed.netloc in ["bishie.booru.org", "www.secchan.net"]:
return ConnectionError("Couldn't get source")
elif "pximg.net" in parsed.netloc or "pixiv.net" in parsed.netloc:
return "https://www.pixiv.net/en/artworks/" + parsed.path.split("/")[-1][:8]
elif parsed.netloc == "twitter.com":
return url.replace("twitter.com", "nitter.eda.gay")
return url
def get_imurl(url):
tree = html.fromstring(requests.get(url).content)
return tree.xpath('//*[@id="image"]')[0].get("src")
def link_deleted(url):
text = requests.get(url).text
return text[text.find("
") + 7 : text.find("")] in ["Error | nitter", "イラストコミュニケーションサービス[pixiv]"]
def request_recent_commits(since = datetime.datetime.now() - datetime.timedelta(days=7)):
g = Github(CONFIG.get("github", "access_code"))
out = []
for repo in g.get_user().get_repos():
# print(repo.name, list(repo.get_branches()))
try:
for commit in repo.get_commits(since = since):
out.append({
"repo": repo.name,
"message": commit.commit.message,
"url": commit.html_url,
"datetime": commit.commit.author.date,
"stats": {
"additions": commit.stats.additions,
"deletions": commit.stats.deletions,
"total": commit.stats.total
}
})
except Exception as e:
print(repo, e)
return sorted(out, key = lambda a: a["datetime"], reverse = True)
def scrape_nitter(username, get_until:int):
new_tweets = []
nitter_url = CONFIG.get("nitter", "internalurl")
nitter_port = CONFIG.getint("nitter", "internalport")
scrape_new_pages = True
url = "http://%s:%d/%s" % (nitter_url, nitter_port, username)
while scrape_new_pages:
tree = html.fromstring(requests.get(url).content)
for i, tweetUrlElement in enumerate(tree.xpath('//*[@class="tweet-link"]'), 0):
if i > 0 and tweetUrlElement.get("href").split("/")[1] == username:
id_ = int(urllib.parse.urlparse(tweetUrlElement.get("href")).path.split("/")[-1])
tweet_link = "http://%s:%d%s" % (nitter_url, nitter_port, tweetUrlElement.get("href"))
if id_ == get_until:
scrape_new_pages = False
break
try:
dt, replying_to, text, images = parse_tweet(tweet_link)
new_tweets.append((id_, dt, replying_to, text, username, images))
print(dt, "'%s'" % text)
except IndexError:
print("Couldn't get any more tweets")
scrape_new_pages = False
break
except ConnectionError:
print("Rate limited, try again later")
return []
try:
cursor = tree.xpath('//*[@class="show-more"]/a')[0].get("href")
except IndexError:
# no more elements
break
url = "http://%s:%d/%s%s" % (nitter_url, nitter_port, username, cursor)
return new_tweets
def parse_tweet(tweet_url):
# print(tweet_url)
tree = html.fromstring(requests.get(tweet_url).content)
# with open("2images.html", "r") as f:
# tree = html.fromstring(f.read())
rate_limited_elem = tree.xpath("/html/body/div/div/div/span")
if rate_limited_elem != []:
if rate_limited_elem[0].text == "Instance has been rate limited.":
raise ConnectionError("Instance has been rate limited.")
main_tweet_elem = tree.xpath('//*[@class="main-tweet"]')[0]
dt_str = main_tweet_elem.xpath('//*[@class="tweet-published"]')[0].text
dt = datetime.datetime.strptime(dt_str.replace("Â", ""), "%b %d, %Y · %I:%M %p UTC")
text = tree.xpath('//*[@class="main-tweet"]/div/div/div[2]')[0].text_content()
if text == "":
text = "[Image only]"
replying_to_elems = tree.xpath('//*[@class="before-tweet thread-line"]/div/a')
if replying_to_elems != []:
replying_to = int(urllib.parse.urlparse(replying_to_elems[-1].get("href")).path.split("/")[-1])
else:
replying_to = None
images = []
images_elems = tree.xpath('//*[@class="main-tweet"]/div/div/div[3]/div/div/a/img')
for image_elem in images_elems:
images.append("https://" + CONFIG.get("nitter", "outsideurl") + urllib.parse.urlparse(image_elem.get("src")).path)
return dt, replying_to, text, images
def scrape_whispa(whispa_url, since):
tree = html.fromstring(requests.get(whispa_url).content.decode())
qnas = []
# we're not doing proper HTML scraping here really... since the site uses client side rendering
# we rather parse the JS scripts to get the JSON payload of useful information... sadly this looks horrible
for i, script in enumerate(tree.xpath("/html/body/script"), 0):
js = str(script.text)
if "receivedFeedback" in js:
# my god this is horrible...
for j in json.loads(json.loads(js[19:-1])[1][2:])[0][3]["loadedUser"]["receivedFeedback"]:
dt = datetime.datetime.fromisoformat(j["childFeedback"][0]["createdAt"][:-1])
qnas.append({
# "id": int(str(maths.modf(maths.log(int(j["id"], 16)))[0])[2:]),
"id": int(dt.timestamp()),
"link": None,
"datetime": dt,
"question": j["content"],
"answer": j["childFeedback"][0]["content"],
"host": "whispa.sh"
})
return qnas
def get_docker_containers(host, ssh_key_path):
result = fabric.Connection(
host = host,
user = "root",
connect_kwargs = {
"key_filename": ssh_key_path,
"look_for_keys": False
}
).run('docker ps -a -s --format "table {{.Names}};{{.Status}};{{.Image}}"', hide = True)
return [line.split(";") for line in result.stdout.split("\n")[1:-1]]
def get_all_docker_containers(ssh_key_path):
containers = {}
for host, name in CONFIG["docker_hosts"].items():
print(host)
containers[(host, name)] = get_docker_containers(host, ssh_key_path)
return containers
def timeout(func):
# cant get this to work with queue.Queue() for some reason?
# this works but Manager() uses an extra thread than Queue()
manager = multiprocessing.Manager()
returnVan = manager.list()
# ti = time.time()
def runFunc(q, func):
q.append(func())
def beginTimeout():
t = multiprocessing.Process(target = runFunc, args = (returnVan, func))
t.start()
t.join(timeout = CONFIG["servicetimeout"].getint("seconds"))
# print("Request took:", time.time() - ti)
try:
return returnVan[0]
except IndexError:
if t.is_alive():
t.terminate()
return beginTimeout
@timeout
def get_torrent_stats():
client = transmission_rpc.client.Client(
host = CONFIG.get("transmission", "host")
)
s = vars(client.session_stats())["fields"]
return {
"Active torrents:": s["activeTorrentCount"],
"Downloaded:": humanbytes(s["cumulative-stats"]["downloadedBytes"]),
"Uploaded:": humanbytes(s["cumulative-stats"]["uploadedBytes"]),
"Active time:": str(datetime.timedelta(seconds = s["cumulative-stats"]["secondsActive"])),
"Files added:": s["cumulative-stats"]["filesAdded"],
"Current upload speed": humanbytes(s["uploadSpeed"]) + "s/S",
"Current download speed:": humanbytes(s["downloadSpeed"]) + "s/S"
}
@timeout
def get_pihole_stats():
return PiHole.GetSummary(CONFIG.get("pihole", "url"), CONFIG.get("pihole", "key"), True)
if __name__ == "__main__":
# print(get_trans_stats())
#print(scrape_nitter(CONFIG.get("twitter", "diary_account"), 1697430888617840909))
# print(scrape_nitter("estrogenizedboy", 1698107440489734640))
# print(parse_tweet("https://nitter.net/HONMISGENDERER/status/1694231618443981161#m"))
# print(request_recent_commits(since = datetime.datetime.now() - datetime.timedelta(days=30)))
# print(scrape_whispa(CONFIG.get("qnas", "url"), datetime.datetime.fromtimestamp(0.0)))
# print(get_all_docker_containers(os.path.join(os.path.dirname(__file__), "edaweb-docker.pem")))
print(get_torrent_stats())