diff options
Diffstat (limited to 'src/database.py')
-rw-r--r-- | src/database.py | 434 |
1 files changed, 434 insertions, 0 deletions
diff --git a/src/database.py b/src/database.py new file mode 100644 index 0000000..d67b96c --- /dev/null +++ b/src/database.py @@ -0,0 +1,434 @@ +from dataclasses import dataclass +import operator +import datetime +import pymysql +import pandas +import app +import os + +@dataclass +class PayGapDatabase: + + postcode_lookup_obj = None + host: str = "db" + user: str = "root" + passwd: str = None + db: str = "paygap" + port: int = 3306 + + def __enter__(self): + if self.passwd is None: + self.passwd = os.environ["MYSQL_ROOT_PASSWORD"] + + try: + self.__connection = self.__get_connection() + except Exception as e: + print(e) + if e.args[0] == 1049: + self.__connection = self.__build_db() + return self + + def __exit__(self, type, value, traceback): + self.__connection.close() + + def __get_connection(self): + return pymysql.connect( + host = self.host, + port = self.port, + user = self.user, + passwd = self.passwd, + charset = "utf8mb4", + database = self.db + ) + + def __build_db(self): + print("Building database...") + self.__connection = pymysql.connect( + host = self.host, + port = self.port, + user = self.user, + passwd = self.passwd, + charset = "utf8mb4", + ) + with self.__connection.cursor() as cursor: + # unsafe: + cursor.execute("CREATE DATABASE %s" % self.db) + cursor.execute("USE %s" % self.db) + + cursor.execute(""" + CREATE TABLE sic_sections( + sic_section_id CHAR(1) NOT NULL PRIMARY KEY, + sic_section_name VARCHAR(128) NOT NULL + ); + """) + + cursor.execute(""" + CREATE TABLE sic( + sic_code INT UNSIGNED NOT NULL PRIMARY KEY, + sic_description VARCHAR(512) NOT NULL, + sic_section CHAR(1) NOT NULL, + FOREIGN KEY (sic_section) REFERENCES sic_sections(sic_section_id) + ); + """) + + cursor.execute(""" + CREATE TABLE employer( + company_number CHAR(8) NOT NULL PRIMARY KEY, + name VARCHAR(512) NOT NULL, + address TEXT NOT NULL, + postcode VARCHAR(8) NOT NULL, + policy_link VARCHAR(256) NULL, + responsible_person VARCHAR(128) NOT NULL, + size VARCHAR(20) NOT NULL, + current_name VARCHAR(512) NULL, + status VARCHAR(32) NULL, + type_ VARCHAR(128) NULL, + incorporated DATETIME NULL + ) + """) + + cursor.execute(""" + CREATE TABLE employer_sic( + company_number CHAR(8) NOT NULL, + sic_code INT UNSIGNED NOT NULL, + PRIMARY KEY (company_number, sic_code), + FOREIGN KEY (company_number) REFERENCES employer(company_number), + FOREIGN KEY (sic_code) REFERENCES sic(sic_code) + ); + """) + + cursor.execute(""" + CREATE TABLE pay( + pay_id INT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY, + company_number CHAR(8) NOT NULL, + source VARCHAR(64) NOT NULL, + date_submitted DATETIME NOT NULL, + DiffMeanHourlyPercent DECIMAL(8,3) NOT NULL, + DiffMedianHourlyPercent DECIMAL(8,3) NOT NULL, + DiffMeanBonusPercent DECIMAL(8,3) NOT NULL, + DiffMedianBonusPercent DECIMAL(8,3) NOT NULL, + MaleBonusPercent DECIMAL(8,3) NOT NULL, + FemaleBonusPercent DECIMAL(8,3) NOT NULL, + MaleLowerQuartile DECIMAL(8,3) NOT NULL, + FemaleLowerQuartile DECIMAL(8,3) NOT NULL, + MaleLowerMiddleQuartile DECIMAL(8,3) NOT NULL, + FemaleLowerMiddleQuartile DECIMAL(8,3) NOT NULL, + MaleUpperMiddleQuartile DECIMAL(8,3) NOT NULL, + FemaleUpperMiddleQuartile DECIMAL(8,3) NOT NULL, + MaleTopQuartile DECIMAL(8,3) NOT NULL, + FemaleTopQuartile DECIMAL(8,3) NOT NULL, + FOREIGN KEY (company_number) REFERENCES employer(company_number) + ); + """) + + self.__connection.commit() + return self.__connection + + def _wrap_percent(self, word): + return "%%%s%%" % (word) + + def append_sic_sections(self, section_id, description): + # print("Section ID: '%s', Description: '%s'" % (section_id, description)) + with self.__connection.cursor() as cursor: + cursor.execute(""" + INSERT INTO sic_sections VALUES (%s, %s) ON DUPLICATE KEY UPDATE sic_section_name = %s; + """, (section_id, description, description)) + self.__connection.commit() + + def append_sic(self, code, description, section_id): + print("Appended code %d (%s) under section %s" % (code, description, section_id)) + with self.__connection.cursor() as cursor: + cursor.execute(""" + INSERT INTO sic VALUES (%s, %s, %s) ON DUPLICATE KEY UPDATE sic_description = %s, sic_section = %s; + """, (code, description, section_id, description, section_id)) + self.__connection.commit() + + def append_employer(self, company_number, name, address, postcode, policy_link, responsible_person, size, current_name, \ + status, type_, incorporated, sics): + + # print("incorporated: %s" % str(incorporated)) + # print("sics", sics) + # print("name: %s" % name) + with self.__connection.cursor() as cursor: + cursor.execute(""" + INSERT INTO employer (company_number, name, address, postcode, policy_link, responsible_person, size, current_name, status, type_, incorporated) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) + ON DUPLICATE KEY UPDATE + name = %s, address = %s, postcode = %s, policy_link = %s, responsible_person = %s, size = %s, + current_name = %s, status = %s, type_ = %s, incorporated = %s; + """, ( + company_number, name, address, postcode, policy_link, responsible_person, size, current_name, status, type_, incorporated, + name, address, postcode, policy_link, responsible_person, size, current_name, status, type_, incorporated + )) + # sql = """INSERT INTO employer (company_number, name, address, postcode, policy_link, responsible_person, size, current_name, status, type_, incorporated) + # VALUES ('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s');""" % ( + # company_number, name, address, postcode, policy_link, responsible_person, size, current_name, status, type_, incorporated + # ) + # print(sql) + + self.append_employer_sics(company_number, sics) + self.__connection.commit() + + def append_pay_info(self, company_number, source, date_submitted, diff_mean_hourly_percent, diff_median_hourly_percent, \ + diff_mean_bonus_percent, diff_median_bonus_percent, male_bonus_percent, female_bonus_percent, male_lower_quartile, \ + female_lower_quartile, male_lower_middle_quartile, female_lower_middle_quartile, male_upper_middle_quartile, \ + female_upper_middle_quartile, male_top_quartile, female_top_quartile): + + try: + float(diff_mean_hourly_percent) + except ValueError: + diff_mean_hourly_percent = None + + + with self.__connection.cursor() as cursor: + cursor.execute("DELETE FROM pay WHERE company_number = %s AND source = %s;", (company_number, source)) + + try: + cursor.execute(""" + INSERT INTO pay (company_number, source, date_submitted, DiffMeanHourlyPercent, DiffMedianHourlyPercent, + DiffMeanBonusPercent, DiffMedianBonusPercent, MaleBonusPercent, FemaleBonusPercent, MaleLowerQuartile, + FemaleLowerQuartile, MaleLowerMiddleQuartile, FemaleLowerMiddleQuartile, MaleUpperMiddleQuartile, + FemaleUpperMiddleQuartile, MaleTopQuartile, FemaleTopQuartile) VALUES ( + %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s + ); + """, ( + company_number, source, date_submitted, diff_mean_hourly_percent, diff_median_hourly_percent, + diff_mean_bonus_percent, diff_median_bonus_percent, male_bonus_percent, female_bonus_percent, male_lower_quartile, + female_lower_quartile, male_lower_middle_quartile, female_lower_middle_quartile, male_upper_middle_quartile, + female_upper_middle_quartile, male_top_quartile, female_top_quartile + )) + except pymysql.err.DataError: + return + + self.__connection.commit() + + + def append_employer_sics(self, company_number, sics): + with self.__connection.cursor() as cursor: + cursor.execute("DELETE FROM employer_sic WHERE company_number = %s", (company_number, )) + + for sic in sics: + cursor.execute("SELECT * FROM sic WHERE sic_code = %s", (sic, )) + if cursor.fetchone() != None: + cursor.execute("INSERT INTO employer_sic VALUES (%s, %s);", (company_number, sic)) + + def search_company(self, company_prefix): + with self.__connection.cursor() as cursor: + cursor.execute(""" + SELECT name, company_number FROM employer + WHERE name LIKE '%s' OR current_name LIKE '%s'; + """ % ( + self._wrap_percent(company_prefix), + self._wrap_percent(company_prefix) + )) + + return [(i[0].title(), i[1]) for i in cursor.fetchall()] + + def get_company_types(self): + with self.__connection.cursor() as cursor: + cursor.execute("SELECT DISTINCT type_ FROM employer WHERE type_ IS NOT NULL;") + return [i[0] for i in cursor.fetchall()] + + def get_company_sizes(self): + return [ + "Not Provided", + "Less than 250", + "250 to 499", + "500 to 999", + "1000 to 4999", + "5000 to 19,999", + "20,000 or more" + ] + + def get_sic_sections(self): + with self.__connection.cursor() as cursor: + cursor.execute("SELECT sic_section_name FROM sic_sections") + return [i[0] for i in cursor.fetchall()] + + def _source_name_to_year(self, source): + return os.path.splitext(source)[0].split("-")[-1].strip().replace("to", "-") + + def get_pay_by_year(self, pay_type, sic_section_name = None, employer_type = None, employer_size = None): + sql = "SELECT source, -AVG(" + if pay_type.lower() == "hourly": + sql += "DiffMedianHourlyPercent" + elif pay_type.lower() == "bonuses": + sql += "DiffMedianBonusPercent" + sql += ") FROM pay" + + subqueries = [] + args = [] + if sic_section_name is not None: + subqueries.append(""" + company_number IN ( + SELECT DISTINCT company_number FROM employer_sic WHERE sic_code IN ( + SELECT DISTINCT sic_code FROM sic WHERE sic_section = ( + SELECT sic_section_id FROM sic_sections WHERE sic_section_name = %s + ) + ) + )""") + args.append(sic_section_name) + if employer_type is not None: + subqueries.append(""" + company_number IN ( + SELECT company_number FROM employer WHERE type_ = %s + ) + """) + args.append(employer_type) + if employer_size is not None: + subqueries.append(""" + company_number IN ( + SELECT company_number FROM employer WHERE size = %s + ) + """) + args.append(employer_size) + + with self.__connection.cursor() as cursor: + if sic_section_name is not None or employer_type is not None or employer_size is not None: + sql += " WHERE {}".format(" OR ".join(subqueries)) + + sql += " GROUP BY source ORDER BY source;" + cursor.execute(sql, tuple(args)) + + else: + sql += " GROUP BY source ORDER BY source;" + cursor.execute(sql) + + # print(sql) + # print(tuple(args)) + return [(self._source_name_to_year(i[0]), float(i[1])) for i in cursor.fetchall()] + + def get_years(self): + with self.__connection.cursor() as cursor: + cursor.execute("SELECT DISTINCT source FROM pay;") + return [self._source_name_to_year(i[0]) for i in cursor.fetchall()] + + def get_pay_by_sic_section(self, pay_type, year = None): + pay = [] + for section_name in self.get_sic_sections(): + sql = "SELECT -AVG(" + if pay_type.lower() == "hourly": + sql += "DiffMedianHourlyPercent" + elif pay_type.lower() == "bonuses": + sql += "DiffMedianBonusPercent" + sql += """ + ) FROM pay WHERE company_number IN ( + SELECT DISTINCT company_number FROM employer_sic WHERE sic_code IN ( + SELECT DISTINCT sic_code FROM sic WHERE sic_section = ( + SELECT sic_section_id FROM sic_sections WHERE sic_section_name = %s + ) + ) + ) + """ + + if year is not None: + sql += " AND source LIKE %s" + + sql += ";" + + with self.__connection.cursor() as cursor: + # print(sql, (section_name, "%" + year.replace("to", "-") + "%")) + if year is None: + cursor.execute(sql, (section_name, )) + else: + cursor.execute(sql, (section_name, "%" + year.replace("-", "to") + "%")) + + f = cursor.fetchone()[0] + if f is not None: + pay.append((section_name, float(f))) + + return sorted(pay, key = operator.itemgetter(1), reverse = True) + + def get_heatmap_data(self, pay_type, year = None): + sql = "SELECT insinuated_loc, COUNT(insinuated_loc), -AVG(" + if pay_type.lower() == "hourly": + sql += "DiffMedianHourlyPercent" + elif pay_type.lower() == "bonuses": + sql += "DiffMedianBonusPercent" + sql += """ + ) FROM employer INNER JOIN pay ON pay.company_number = employer.company_number + WHERE insinuated_loc_type IS NOT NULL + """ + if year is not None: + sql += " AND source LIKE %s" + + sql += " GROUP BY insinuated_loc;" + + with self.__connection.cursor() as cursor: + if year is None: + cursor.execute(sql) + else: + cursor.execute(sql, ("%" + year.replace("-", "to") + "%", )) + + return [[i[0], i[1], float(i[2])] for i in cursor.fetchall()] + + def _get_postcode_lookup_obj(self, path_): + return pandas.read_csv(path_) + + def _get_counties(self): + return {feature["properties"]["name"] for feature in app.UK_GEOJSON["features"]} + + def append_counties(self, path_): + if self.postcode_lookup_obj is None: + self.postcode_lookup_obj = self._get_postcode_lookup_obj(path_) + + counties = self._get_counties() + postcodes = self._get_postcodes() + + with self.__connection.cursor() as cursor: + + cursor.execute("ALTER TABLE employer ADD COLUMN IF NOT EXISTS insinuated_loc VARCHAR(69) DEFAULT NULL;") + cursor.execute("ALTER TABLE employer ADD COLUMN IF NOT EXISTS insinuated_loc_type VARCHAR(25) DEFAULT NULL;") + + for i, j in enumerate(postcodes, 1): + id_, postcode = j + found_locations = self.postcode_lookup_obj[ + (self.postcode_lookup_obj["Postcode 1"] == postcode) | + (self.postcode_lookup_obj["Postcode 2"] == postcode) | + (self.postcode_lookup_obj["Postcode 3"] == postcode) + ] + if len(found_locations) == 1: + county, la = found_locations[["County Name", "Local Authority Name"]].values[0] + if la in counties: + cursor.execute("UPDATE employer SET insinuated_loc = %s, insinuated_loc_type = 'Local Authority' WHERE company_number = %s", (la, id_)) + + print("[%d/%d] Using local authority '%s' for postcode '%s'" % (i, len(postcodes), la, postcode)) + elif county in counties: + cursor.execute("UPDATE employer SET insinuated_loc = %s, insinuated_loc_type = 'County' WHERE company_number = %s", (county, id_)) + + print("[%d/%d] Using county '%s' for postcode '%s'" % (i, len(postcodes), county, postcode)) + elif "Northamptonshire" in la: + print("Manually fixing Northamptonshire...") + cursor.execute("UPDATE employer SET insinuated_loc = %s, insinuated_loc_type = 'County' WHERE company_number = %s", ("Northamptonshire", id_)) + elif "Bournemouth" in la: + print("Manually fixing Bournemouth...") + cursor.execute("UPDATE employer SET insinuated_loc = %s, insinuated_loc_type = 'County' WHERE company_number = %s", ("Bournemouth", id_)) + else: + print("[%d/%d] Didn't recoginse the local authority '%s' or the county '%s'" % (i, len(postcodes), la, county)) + else: + print("[%d/%d] Couldn't find a county for postcode '%s' (company id '%s')" % (i, len(postcodes), postcode, id_)) + + # break + self.__connection.commit() + + def _get_postcodes(self): + with self.__connection.cursor() as cursor: + cursor.execute("SELECT company_number, TRIM(SUBSTRING_INDEX(address, ',', -1)) FROM employer;") + return cursor.fetchall() + + + +if __name__ == "__main__": + if not os.path.exists(".docker"): + import dotenv + dotenv.load_dotenv(dotenv_path = "db.env") + host = "srv.home" + else: + host = "db" + + with PayGapDatabase(host = host) as db: + # print(db.get_years()) + # print(db.get_pay_by_sic_section("bonuses", None)) + print(db.get_heatmap_data("hourly", db.get_years()[0])) + # print(db.append_counties()) + |