import os
from typing import List

import psycopg2

from data.models import Enterprise, Activity, Division, PhoneNumber, \
    ContactInfo, Head, Location


class DataProvider:
    """PostgreSQL-backed loader/saver for enterprises and related records.

    Every method opens its own short-lived cursor on one shared connection.

    NOTE(review): no method here commits. Unless autocommit is enabled on
    the connection elsewhere, callers presumably have to commit the
    inserts themselves -- confirm against the call sites.
    """

    def __init__(self, params):
        """Open the database connection.

        :param params: passed unchanged to ``psycopg2.connect``.
        """
        self._connection: "psycopg2.extensions.connection" = \
            psycopg2.connect(params)
        self.params = params

    def migrate(self):
        """Execute the initial migration script on the connection.

        The script path is relative to the current working directory.
        """
        path = os.path.join("migrations", "201831231.2049_initial.sql")
        # Read the script first so the file handle is closed even when
        # executing it fails.
        with open(path, "r") as f:
            script = f.read()
        with self._connection.cursor() as cur:
            cur.execute(script)

    def _build_enterprise(self, ent_id: int, name, short_name) -> Enterprise:
        """Assemble an Enterprise with its activities, divisions, contact."""
        return Enterprise(
            ent_id,
            self.get_activities_by_enterprpise(ent_id),
            self.get_divisions_by_enterprise(ent_id),
            self.get_contact_info_by_enterprise(ent_id),
            name,
            short_name
        )

    def _build_contact_info(self, con_id: int, email) -> ContactInfo:
        """Assemble a ContactInfo with its phones, heads and locations."""
        return ContactInfo(
            con_id,
            email,
            self.get_phone_numbers_by_contact(con_id),
            self.get_head_by_contact(con_id),
            self.get_locations_by_contact(con_id)
        )

    def get_enterprises(self) -> List[Enterprise]:
        """Return every enterprise, each with its related records loaded."""
        with self._connection.cursor() as cursor:
            cursor.execute("SELECT id, name, short_name FROM enterprise;")
            rows = cursor.fetchall()
        return [self._build_enterprise(ent_id, name, short_name)
                for ent_id, name, short_name in rows]

    def get_enterprise_by_id(self, ent_id: int) -> Enterprise:
        """Return the enterprise with the given id, fully loaded.

        :raises TypeError: when no such row exists (``fetchone`` returns
            ``None``, which cannot be unpacked).
        """
        with self._connection.cursor() as cursor:
            cursor.execute("SELECT id, name, short_name "
                           "FROM enterprise where id=%s;", (ent_id,))
            ent_id, name, short_name = cursor.fetchone()
        return self._build_enterprise(ent_id, name, short_name)

    def get_activities_by_enterprpise(self, ent_id: int) -> List[Activity]:
        """Return the activities linked to an enterprise via do_activity."""
        with self._connection.cursor() as cursor:
            cursor.execute(
                "SELECT activity.id, name FROM activity "
                "right join do_activity da on activity.id = da.activity "
                "where enterprise = %s", (ent_id,))
            return [Activity(act_id, name)
                    for act_id, name in cursor.fetchall()]

    # Correctly spelled alias; the original (misspelled) name is kept so
    # existing callers continue to work.
    get_activities_by_enterprise = get_activities_by_enterprpise

    def get_divisions_by_enterprise(self, ent_id: int) -> List[Division]:
        """Return the divisions of an enterprise, fully loaded."""
        with self._connection.cursor() as cur:
            cur.execute(
                "SELECT id from division where enterprise = %s", (ent_id,)
            )
            rows = cur.fetchall()
        # Each row is a 1-tuple; unpack it so a plain id (not a tuple) is
        # stored on the Division and passed to the nested lookups.
        return [Division(div_id,
                         self.get_contact_info_by_division(div_id),
                         self.get_activities_by_division(div_id))
                for (div_id,) in rows]

    def get_contact_info_by_enterprise(self, ent_id: int) -> ContactInfo:
        """Return the contact info referenced by an enterprise.

        :raises TypeError: when the lookup yields no row.
        """
        with self._connection.cursor() as cur:
            cur.execute("SELECT id, email from contact_info "
                        "where id = (SELECT contact_info "
                        "FROM enterprise WHERE enterprise.id = %s)",
                        (ent_id,))
            con_id, email = cur.fetchone()
        return self._build_contact_info(con_id, email)

    def get_contact_info_by_division(self, div_id: int) -> ContactInfo:
        """Return the contact info referenced by a division.

        :raises TypeError: when the lookup yields no row.
        """
        with self._connection.cursor() as cur:
            cur.execute("SELECT id, email "
                        "from contact_info where id = (SELECT contact_info "
                        "FROM division WHERE division.id = %s)", (div_id,))
            con_id, email = cur.fetchone()
        return self._build_contact_info(con_id, email)

    def get_phone_numbers_by_contact(self, con_id: int) -> List[PhoneNumber]:
        """Return the phone numbers attached to a contact info record."""
        with self._connection.cursor() as cur:
            cur.execute("SELECT id, date_of_start, date_of_end "
                        "FROM phone_number where contact_info = %s",
                        (con_id,))
            return [PhoneNumber(ph_id, date_start, date_end)
                    for ph_id, date_start, date_end in cur.fetchall()]

    def get_head_by_contact(self, con_id: int) -> List[Head]:
        """Return the heads attached to a contact info record."""
        with self._connection.cursor() as cur:
            cur.execute("SELECT id, date_of_start, date_of_end "
                        "FROM head WHERE contact_info = %s", (con_id,))
            # Build from the fetched rows: the previous code discarded the
            # fetchall() result and then iterated the drained cursor, so
            # the list was always empty.
            return [Head(head_id, date_start, date_end)
                    for head_id, date_start, date_end in cur.fetchall()]

    def get_locations_by_contact(self, con_id: int) -> List[Location]:
        """Return the locations attached to a contact info record."""
        with self._connection.cursor() as cur:
            cur.execute(
                "SELECT id, country, city, street, house, room, "
                "date_of_start, date_of_end FROM location "
                "where contact_info = %s", (con_id,))
            return [
                Location(loc_id, country, city, street, house, room,
                         date_of_start, date_of_end)
                for loc_id, country, city, street, house, room,
                date_of_start, date_of_end in cur.fetchall()]

    def get_activities_by_division(self, div_id: int) -> List[Activity]:
        """Return the activities linked to a division via do_activity."""
        with self._connection.cursor() as cursor:
            cursor.execute(
                "SELECT activity.id, name FROM activity "
                "right join do_activity da on activity.id = da.activity "
                "where division = %s", (div_id,))
            return [Activity(act_id, name)
                    for act_id, name in cursor.fetchall()]

    def create_contact_info(self, info: ContactInfo):
        """Insert a contact info row plus its heads, locations and phones.

        The generated id is written back to ``info.id``.
        """
        with self._connection.cursor() as cur:
            cur.execute("INSERT INTO contact_info (email) VALUES (%s) "
                        "RETURNING id", (info.email, ))
            info.id = cur.fetchone()[0]
        # Children reference the parent row, so they are inserted only
        # after its id is known.
        for head in info.head:
            self.create_head(head, info.id)
        for loc in info.locations:
            self.create_location(loc, info.id)
        for phone in info.phone_numbers:
            self.create_phone_number(phone, info.id)

    def create_head(self, head: Head, con_id: int):
        """Insert a head row for a contact info; sets ``head.id``."""
        with self._connection.cursor() as cur:
            cur.execute("INSERT INTO head "
                        "(date_of_start, date_of_end, contact_info) "
                        "VALUES (%s, %s, %s) RETURNING id",
                        (head.date_of_start, head.date_of_end, con_id))
            head.id = cur.fetchone()[0]

    def create_location(self, loc: Location, con_id: int):
        """Insert a location row for a contact info; sets ``loc.id``."""
        with self._connection.cursor() as cur:
            cur.execute("INSERT INTO location (country, city, street, house, "
                        "room, date_of_start, date_of_end, contact_info) "
                        "VALUES (%s, %s, %s, %s, %s, %s, %s, %s) RETURNING id",
                        (loc.country, loc.city, loc.street, loc.house,
                         loc.room, loc.date_of_start, loc.date_of_end,
                         con_id))
            loc.id = cur.fetchone()[0]

    def create_phone_number(self, phone: PhoneNumber, con_id: int):
        """Insert a phone number row for a contact info; sets ``phone.id``."""
        with self._connection.cursor() as cur:
            cur.execute("INSERT INTO phone_number "
                        "(contact_info, date_of_start, date_of_end) VALUES "
                        "(%s, %s, %s) RETURNING id",
                        (con_id, phone.date_of_start, phone.date_of_end))
            phone.id = cur.fetchone()[0]

    def create_division(self, division: Division, ent_id: int):
        """Insert a division (and its contact info) under an enterprise.

        Sets ``division.id`` and links each of the division's activities.
        """
        # The division row references the contact info, so create it first.
        self.create_contact_info(division.contact_info)
        with self._connection.cursor() as cur:
            cur.execute("INSERT INTO division (enterprise, contact_info) "
                        "VALUES (%s, %s) RETURNING id",
                        (ent_id, division.contact_info.id))
            division.id = cur.fetchone()[0]
        for act in division.activities:
            self.add_activity_to_division(division, act)

    def create_enterprise(self, enterprise: Enterprise):
        """Insert an enterprise with its contact info and divisions.

        Sets ``enterprise.id`` and links each of its activities.
        """
        # The enterprise row references the contact info, so create it
        # first.
        self.create_contact_info(enterprise.contact_info)
        with self._connection.cursor() as cur:
            cur.execute("INSERT INTO enterprise "
                        "(name, short_name, contact_info) "
                        "VALUES (%s, %s, %s) RETURNING id",
                        (enterprise.name, enterprise.short_name,
                         enterprise.contact_info.id))
            enterprise.id = cur.fetchone()[0]
        for act in enterprise.activities:
            self.add_activity_to_enterprise(enterprise, act)
        for division in enterprise.divisions:
            self.create_division(division, enterprise.id)

    def create_activity(self, activity: Activity):
        """Insert an activity row; sets ``activity.id``."""
        with self._connection.cursor() as cur:
            cur.execute("INSERT INTO activity (name) VALUES (%s) RETURNING id",
                        (activity.name, ))
            activity.id = cur.fetchone()[0]

    def add_activity_to_division(self, division: Division,
                                 activity: Activity):
        """Link an existing activity to an existing division."""
        with self._connection.cursor() as cur:
            cur.execute("INSERT INTO do_activity (activity, division) "
                        "VALUES (%s, %s)", (activity.id, division.id))

    def add_activity_to_enterprise(self, enterprise: Enterprise,
                                   activity: Activity):
        """Link an existing activity to an existing enterprise."""
        with self._connection.cursor() as cur:
            cur.execute("INSERT INTO do_activity (activity, enterprise) "
                        "VALUES (%s, %s)", (activity.id, enterprise.id))