Files
yaric359/data/data_provider.py
2019-01-04 16:43:39 +05:00

252 lines
11 KiB
Python
Executable File

import os
from typing import List
import psycopg2
from data.models import Enterprise, Activity, Division, PhoneNumber, \
ContactInfo, Head, Location
class DataProvider:
def __init__(self, params):
self._connection: psycopg2 = psycopg2.connect(params)
self.params = params
def migrate(self):
with self._connection.cursor() as cur:
f = open(os.path.join("migrations", "201831231.2049_initial.sql"),
"r")
cur.execute(f.read())
f.close()
self._connection.commit()
def get_enterprises(self) -> List[Enterprise]:
with self._connection.cursor() as cursor:
cursor.execute("SELECT id, name, short_name FROM enterprise;")
res = [Enterprise(
ent_id,
self.get_activities_by_enterprpise(ent_id),
self.get_divisions_by_enterprise(ent_id),
self.get_contact_info_by_enterprise(ent_id),
name,
short_name
) for ent_id, name, short_name in cursor.fetchall()]
self._connection.commit()
return res
def get_enterprise_by_id(self, ent_id: int):
with self._connection.cursor() as cursor:
cursor.execute("SELECT id, name, short_name "
"FROM enterprise where id=%s;", (ent_id,))
ent_id, name, short_name = cursor.fetchone()
self._connection.commit()
return Enterprise(
ent_id,
self.get_activities_by_enterprpise(ent_id),
self.get_divisions_by_enterprise(ent_id),
self.get_contact_info_by_enterprise(ent_id),
name,
short_name
)
def get_activities_by_enterprpise(self, ent_id: int) -> List[Activity]:
with self._connection.cursor() as cursor:
cursor.execute(
"SELECT activity.id, name FROM activity "
"right join do_activity da on activity.id = da.activity "
"where enterprise = %s",
(ent_id,))
self._connection.commit()
return [Activity(act_id, name) for act_id, name
in cursor.fetchall()]
def get_divisions_by_enterprise(self, ent_id: int) -> List[Division]:
with self._connection.cursor() as cur:
cur.execute(
"SELECT id from division where enterprise = %s", (ent_id,)
)
self._connection.commit()
return [Division(div_id,
self.get_contact_info_by_division(div_id),
self.get_activities_by_division(div_id)
) for div_id in cur.fetchall()]
def get_contact_info_by_enterprise(self,
ent_id: int) -> ContactInfo:
with self._connection.cursor() as cur:
cur.execute("SELECT id, email from contact_info "
"where id = (SELECT contact_info "
"FROM enterprise WHERE enterprise.id = %s)",
(ent_id,))
con_id, email = cur.fetchone()
self._connection.commit()
return ContactInfo(
con_id,
email,
self.get_phone_numbers_by_contact(con_id),
self.get_head_by_contact(con_id),
self.get_locations_by_contact(con_id)
)
def get_contact_info_by_division(self,
div_id: int) -> ContactInfo:
with self._connection.cursor() as cur:
cur.execute("SELECT id, email "
"from contact_info where id = (SELECT contact_info "
"FROM division WHERE division.id = %s)",
(div_id,))
con_id, email = cur.fetchone()
self._connection.commit()
return ContactInfo(
con_id,
email,
self.get_phone_numbers_by_contact(con_id),
self.get_head_by_contact(con_id),
self.get_locations_by_contact(con_id)
)
def get_phone_numbers_by_contact(self, con_id: int) -> List[PhoneNumber]:
with self._connection.cursor() as cur:
cur.execute("SELECT id, date_of_start, date_of_end "
"FROM phone_number where contact_info = %s",
(con_id,))
self._connection.commit()
return [PhoneNumber(
ph_id,
date_start,
date_end
) for ph_id, date_start, date_end in cur.fetchall()]
def get_head_by_contact(self, con_id: int) -> List[Head]:
with self._connection.cursor() as cur:
cur.execute("SELECT id, date_of_start, date_of_end "
"FROM head WHERE contact_info = %s",
(con_id,))
cur.fetchall()
self._connection.commit()
return [Head(id, date_start, date_end) for id, date_start, date_end
in cur]
def get_locations_by_contact(self, con_id: int) -> List[Location]:
with self._connection.cursor() as cur:
cur.execute(
"SELECT id, country, city, street, house, room, "
"date_of_start, date_of_end FROM location "
"where contact_info = %s",
(con_id,))
self._connection.commit()
return [
Location(id, country, city, street, house, room, date_of_start,
date_of_end) for
id, country, city, street, house, room,
date_of_start, date_of_end
in cur.fetchall()]
def get_activities_by_division(self, div_id: int) -> List[Activity]:
with self._connection.cursor() as cursor:
cursor.execute(
"SELECT activity.id, name FROM activity "
"right join do_activity da on activity.id = da.activity "
"where division = %s",
(div_id,))
self._connection.commit()
return [Activity(act_id, name) for act_id, name
in cursor.fetchall()]
def create_contact_info(self, info: ContactInfo):
with self._connection.cursor() as cur:
cur.execute("INSERT INTO contact_info (email) VALUES (%s) "
"RETURNING id", (info.email, ))
info.id = cur.fetchone()[0]
self._connection.commit()
for head in info.head:
self.create_head(head, info.id)
for loc in info.locations:
self.create_location(loc, info.id)
for phone in info.phone_numbers:
self.create_phone_number(phone, info.id)
def create_head(self, head: Head, con_id: int):
with self._connection.cursor() as cur:
cur.execute("INSERT INTO head "
"(date_of_start, date_of_end, contact_info) "
"VALUES (%s, %s, %s) RETURNING id",
(head.date_of_start, head.date_of_end, con_id))
head.id = cur.fetchone()[0]
self._connection.commit()
def create_location(self, loc: Location, con_id: int):
with self._connection.cursor() as cur:
cur.execute("INSERT INTO location (country, city, street, house, "
"room, date_of_start, date_of_end, contact_info) "
"VALUES (%s, %s, %s, %s, %s, %s, %s, %s) RETURNING id",
(loc.country, loc.city, loc.street, loc.house,
loc.room, loc.date_of_start, loc.date_of_end, con_id))
loc.id = cur.fetchone()[0]
self._connection.commit()
def create_phone_number(self, phone: PhoneNumber, con_id: int):
with self._connection.cursor() as cur:
cur.execute("INSERT INTO phone_number "
"(contact_info, date_of_start, date_of_end) VALUES "
"(%s, %s, %s) RETURNING id",
(con_id, phone.date_of_start, phone.date_of_end))
phone.id = cur.fetchone()[0]
self._connection.commit()
def create_division(self, division: Division, ent_id: int):
with self._connection.cursor() as cur:
self.create_contact_info(division.contact_info)
cur.execute("INSERT INTO division (enterprise, contact_info) "
"VALUES (%s, %s) RETURNING id",
(ent_id, division.contact_info.id))
division.id = cur.fetchone()[0]
for act in division.activities:
self.add_activity_to_division(division, act)
self._connection.commit()
def create_enterprise(self, enterprise: Enterprise):
with self._connection.cursor() as cur:
self.create_contact_info(enterprise.contact_info)
cur.execute("INSERT INTO enterprise "
"(name, short_name, contact_info) "
"VALUES (%s, %s, %s) RETURNING id",
(enterprise.name,
enterprise.short_name,
enterprise.contact_info.id))
enterprise.id = cur.fetchone()[0]
for act in enterprise.activities:
self.add_activity_to_enterprise(enterprise, act)
for division in enterprise.divisions:
self.create_division(division, enterprise.id)
self._connection.commit()
def create_activity(self, activity: Activity):
with self._connection.cursor() as cur:
cur.execute("INSERT INTO activity (name) VALUES (%s) RETURNING id",
(activity.name, ))
activity.id = cur.fetchone()[0]
self._connection.commit()
def add_activity_to_division(self, division: Division, activity: Activity):
with self._connection.cursor() as cur:
cur.execute("INSERT INTO do_activity (activity, division) "
"VALUES (%s, %s)", (activity.id, division.id))
self._connection.commit()
def add_activity_to_enterprise(self, enterprise: Enterprise,
activity: Activity):
with self._connection.cursor() as cur:
cur.execute("INSERT INTO do_activity (activity, enterprise) "
"VALUES (%s, %s)", (activity.id, enterprise.id))
self._connection.commit()
def get_activities(self):
with self._connection.cursor() as cur:
cur.execute("SELECT id, name FROM activity")
res = [Activity(id, name) for id, name in cur.fetchall()]
self._connection.commit()
return res