Data Provider and tests

This commit is contained in:
2019-01-04 15:51:02 +05:00
parent 5d239307ad
commit 752826d872
9 changed files with 404 additions and 1 deletions

2
.gitignore vendored
View File

@ -155,3 +155,5 @@ fabric.properties
# Editor-based Rest Client
.idea/httpRequests
.doit.*

View File

@ -27,5 +27,5 @@
5. Запустите прожект
```
python 3 app.py
python3 app.py
```

0
data/__init__.py Executable file
View File

224
data/data_provider.py Executable file
View File

@ -0,0 +1,224 @@
import os
from typing import List
import psycopg2
from data.models import Enterprise, Activity, Division, PhoneNumber, \
ContactInfo, Head, Location
class DataProvider:
def __init__(self, params):
self._connection: psycopg2 = psycopg2.connect(params)
self.params = params
def migrate(self):
with self._connection.cursor() as cur:
f = open(os.path.join("migrations", "201831231.2049_initial.sql"),
"r")
cur.execute(f.read())
f.close()
def get_enterprises(self) -> List[Enterprise]:
with self._connection.cursor() as cursor:
cursor.execute("SELECT id, name, short_name FROM enterprise;")
res = [Enterprise(
ent_id,
self.get_activities_by_enterprpise(ent_id),
self.get_divisions_by_enterprise(ent_id),
self.get_contact_info_by_enterprise(ent_id),
name,
short_name
) for ent_id, name, short_name in cursor.fetchall()]
return res
def get_enterprise_by_id(self, ent_id: int):
with self._connection.cursor() as cursor:
cursor.execute("SELECT id, name, short_name "
"FROM enterprise where id=%s;", (ent_id,))
ent_id, name, short_name = cursor.fetchone()
return Enterprise(
ent_id,
self.get_activities_by_enterprpise(ent_id),
self.get_divisions_by_enterprise(ent_id),
self.get_contact_info_by_enterprise(ent_id),
name,
short_name
)
def get_activities_by_enterprpise(self, ent_id: int) -> List[Activity]:
with self._connection.cursor() as cursor:
cursor.execute(
"SELECT activity.id, name FROM activity "
"right join do_activity da on activity.id = da.activity "
"where enterprise = %s",
(ent_id,))
return [Activity(act_id, name) for act_id, name
in cursor.fetchall()]
def get_divisions_by_enterprise(self, ent_id: int) -> List[Division]:
with self._connection.cursor() as cur:
cur.execute(
"SELECT id from division where enterprise = %s", (ent_id,)
)
return [Division(div_id,
self.get_contact_info_by_division(div_id),
self.get_activities_by_division(div_id)
) for div_id in cur.fetchall()]
def get_contact_info_by_enterprise(self,
ent_id: int) -> ContactInfo:
with self._connection.cursor() as cur:
cur.execute("SELECT id, email from contact_info "
"where id = (SELECT contact_info "
"FROM enterprise WHERE enterprise.id = %s)",
(ent_id,))
con_id, email = cur.fetchone()
return ContactInfo(
con_id,
email,
self.get_phone_numbers_by_contact(con_id),
self.get_head_by_contact(con_id),
self.get_locations_by_contact(con_id)
)
def get_contact_info_by_division(self,
div_id: int) -> ContactInfo:
with self._connection.cursor() as cur:
cur.execute("SELECT id, email "
"from contact_info where id = (SELECT contact_info "
"FROM division WHERE division.id = %s)",
(div_id,))
con_id, email = cur.fetchone()
return ContactInfo(
con_id,
email,
self.get_phone_numbers_by_contact(con_id),
self.get_head_by_contact(con_id),
self.get_locations_by_contact(con_id)
)
def get_phone_numbers_by_contact(self, con_id: int) -> List[PhoneNumber]:
with self._connection.cursor() as cur:
cur.execute("SELECT id, date_of_start, date_of_end "
"FROM phone_number where contact_info = %s",
(con_id,))
return [PhoneNumber(
ph_id,
date_start,
date_end
) for ph_id, date_start, date_end in cur.fetchall()]
def get_head_by_contact(self, con_id: int) -> List[Head]:
with self._connection.cursor() as cur:
cur.execute("SELECT id, date_of_start, date_of_end "
"FROM head WHERE contact_info = %s",
(con_id,))
cur.fetchall()
return [Head(id, date_start, date_end) for id, date_start, date_end
in cur]
def get_locations_by_contact(self, con_id: int) -> List[Location]:
with self._connection.cursor() as cur:
cur.execute(
"SELECT id, country, city, street, house, room, "
"date_of_start, date_of_end FROM location "
"where contact_info = %s",
(con_id,))
return [
Location(id, country, city, street, house, room, date_of_start,
date_of_end) for
id, country, city, street, house, room,
date_of_start, date_of_end
in cur.fetchall()]
def get_activities_by_division(self, div_id: int) -> List[Activity]:
with self._connection.cursor() as cursor:
cursor.execute(
"SELECT activity.id, name FROM activity "
"right join do_activity da on activity.id = da.activity "
"where division = %s",
(div_id,))
return [Activity(act_id, name) for act_id, name
in cursor.fetchall()]
def create_contact_info(self, info: ContactInfo):
with self._connection.cursor() as cur:
cur.execute("INSERT INTO contact_info (email) VALUES (%s) "
"RETURNING id", (info.email, ))
info.id = cur.fetchone()[0]
for head in info.head:
self.create_head(head, info.id)
for loc in info.locations:
self.create_location(loc, info.id)
for phone in info.phone_numbers:
self.create_phone_number(phone, info.id)
def create_head(self, head: Head, con_id: int):
with self._connection.cursor() as cur:
cur.execute("INSERT INTO head "
"(date_of_start, date_of_end, contact_info) "
"VALUES (%s, %s, %s) RETURNING id",
(head.date_of_start, head.date_of_end, con_id))
head.id = cur.fetchone()[0]
def create_location(self, loc: Location, con_id: int):
with self._connection.cursor() as cur:
cur.execute("INSERT INTO location (country, city, street, house, "
"room, date_of_start, date_of_end, contact_info) "
"VALUES (%s, %s, %s, %s, %s, %s, %s, %s) RETURNING id",
(loc.country, loc.city, loc.street, loc.house,
loc.room, loc.date_of_start, loc.date_of_end, con_id))
loc.id = cur.fetchone()[0]
def create_phone_number(self, phone: PhoneNumber, con_id: int):
with self._connection.cursor() as cur:
cur.execute("INSERT INTO phone_number "
"(contact_info, date_of_start, date_of_end) VALUES "
"(%s, %s, %s) RETURNING id",
(con_id, phone.date_of_start, phone.date_of_end))
phone.id = cur.fetchone()[0]
def create_division(self, division: Division, ent_id: int):
with self._connection.cursor() as cur:
self.create_contact_info(division.contact_info)
cur.execute("INSERT INTO division (enterprise, contact_info) "
"VALUES (%s, %s) RETURNING id",
(ent_id, division.contact_info.id))
division.id = cur.fetchone()[0]
for act in division.activities:
self.add_activity_to_division(division, act)
def create_enterprise(self, enterprise: Enterprise):
with self._connection.cursor() as cur:
self.create_contact_info(enterprise.contact_info)
cur.execute("INSERT INTO enterprise "
"(name, short_name, contact_info) "
"VALUES (%s, %s, %s) RETURNING id",
(enterprise.name,
enterprise.short_name,
enterprise.contact_info.id))
enterprise.id = cur.fetchone()[0]
for act in enterprise.activities:
self.add_activity_to_enterprise(enterprise, act)
for division in enterprise.divisions:
self.create_division(division, enterprise.id)
def create_activity(self, activity: Activity):
with self._connection.cursor() as cur:
cur.execute("INSERT INTO activity (name) VALUES (%s) RETURNING id",
(activity.name, ))
activity.id = cur.fetchone()[0]
def add_activity_to_division(self, division: Division, activity: Activity):
with self._connection.cursor() as cur:
cur.execute("INSERT INTO do_activity (activity, division) "
"VALUES (%s, %s)", (activity.id, division.id))
def add_activity_to_enterprise(self, enterprise: Enterprise,
activity: Activity):
with self._connection.cursor() as cur:
cur.execute("INSERT INTO do_activity (activity, enterprise) "
"VALUES (%s, %s)", (activity.id, enterprise.id))

75
data/models.py Executable file
View File

@ -0,0 +1,75 @@
from dataclasses import dataclass
from datetime import date
from typing import Optional, List
class Enterprise:
def __init__(
self,
_id: Optional[int],
activities: List['Activity'],
divisions: List['Division'],
contact_info: 'ContactInfo',
name: str,
short_name: str
):
self.id = _id
self.activities = activities
self.divisions = divisions
self.contact_info = contact_info
self.name = name
self.short_name = short_name
@dataclass
class PhoneNumber:
id: Optional[int]
date_of_start: date
date_of_end: Optional[date]
class ContactInfo:
def __init__(self, _id: Optional[int],
email: str,
phone_numbers: List[PhoneNumber],
head: List['Head'],
locations: List['Location']
):
self.id = _id
self.phone_numbers = phone_numbers
self.head = head
self.email = email
self.locations = locations
@dataclass
class Activity:
id: Optional[int]
name: str
@dataclass
class Division:
id: Optional[int]
contact_info: ContactInfo
activities: List[Activity]
@dataclass
class Head:
id: Optional[int]
date_of_start: date
date_of_end: Optional[date]
@dataclass
class Location:
id: Optional[int]
country: str
city: str
street: str
house: str
room: str
date_of_start: date
date_of_end: Optional[date]

0
data/test/__init__.py Normal file
View File

View File

@ -0,0 +1,85 @@
from datetime import date
import testing.postgresql
from unittest import TestCase
from data.data_provider import DataProvider
from data.models import ContactInfo, Head, Location, PhoneNumber, Division, \
Enterprise, Activity
class TestDataProvider(TestCase):
def setUp(self):
self.db = testing.postgresql.Postgresql()
self.dp = DataProvider(self.db.url())
self.dp.migrate()
def test_create_contact(self):
head = Head(None, date(1998, 10, 11), None)
loc = Location(None, "Russia", "Orenburg", "Elovaya", "16", "259A",
date(2010, 11, 28), None)
num = PhoneNumber(None, date(2018, 10, 10), None)
cont = ContactInfo(None, "hello@gmail.com", [num], [head], [loc])
self.dp.create_contact_info(cont)
print(f"ContactInfo id is {cont.id}")
self.assertIsNotNone(cont.id)
print(f"Head id is {head.id}")
self.assertIsNotNone(head.id)
print(f"Location id is {loc.id}")
self.assertIsNotNone(loc.id)
print(f"PhoneNumber id is {num.id}")
self.assertIsNotNone(num.id)
def test_create_enterprise(self):
head = Head(None, date(1998, 10, 11), None)
loc = Location(None, "Russia", "Orenburg", "Elovaya", "16", "259A",
date(2010, 11, 28), None)
num = PhoneNumber(None, date(2018, 10, 10), None)
cont = ContactInfo(None, "hello@gmail.com", [num], [head], [loc])
head_div = Head(None, date(1998, 10, 11), None)
loc_div = Location(None, "Russia", "Orenburg", "Elovaya", "16", "259A",
date(2010, 11, 28), None)
num_div = PhoneNumber(None, date(2018, 10, 10), None)
cont_div = ContactInfo(None, "eeeeee", [num_div],
[head_div], [loc_div])
div = Division(None, cont_div, [])
enterprise = Enterprise(None, [], [div], cont, "hello", "world")
self.dp.create_enterprise(enterprise)
self.assertIsNotNone(div.id)
self.assertIsNotNone(enterprise.id)
self.assertIsNotNone(cont.id)
def test_create_activity(self):
act = Activity(None, "some activity")
self.dp.create_activity(act)
self.assertIsNotNone(act.id)
def test_get_enterprise_activities(self):
act = Activity(None, "some activity")
self.dp.create_activity(act)
head = Head(None, date(1998, 10, 11), None)
loc = Location(None, "Russia", "Orenburg", "Elovaya", "16", "259A",
date(2010, 11, 28), None)
num = PhoneNumber(None, date(2018, 10, 10), None)
cont = ContactInfo(None, "hello@gmail.com", [num], [head], [loc])
head_div = Head(None, date(1998, 10, 11), None)
loc_div = Location(None, "Russia", "Orenburg", "Elovaya", "16", "259A",
date(2010, 11, 28), None)
num_div = PhoneNumber(None, date(2018, 10, 10), None)
cont_div = ContactInfo(None, "eeeeee", [num_div],
[head_div], [loc_div])
div = Division(None, cont_div, [act])
enterprise = Enterprise(None, [act], [div], cont, "hello", "world")
self.dp.create_enterprise(enterprise)
test_enterprise = self.dp.get_enterprise_by_id(enterprise.id)
self.assertGreater(len(test_enterprise.activities), 0)
self.assertEqual(test_enterprise.activities[0].name, act.name)
self.assertEqual(len(test_enterprise.divisions), 1)
self.assertEqual(len(test_enterprise.divisions[0].activities), 1)
self.assertEqual(test_enterprise.divisions[0].activities[0].name,
act.name)
self.assertEqual(len(self.dp.get_enterprises()), 1)
def tearDown(self):
self.db.stop()

14
dodo.py Executable file
View File

@ -0,0 +1,14 @@
#! /usr/bin/env python3
def task_test():
return {
'actions': ['pytest'],
'verbosity': 2,
}
if __name__ == '__main__':
import doit
doit.run(globals())

3
requirements-dev.txt Normal file
View File

@ -0,0 +1,3 @@
pytest==4.0.2
doit==0.31.1
testing.postgresql==1.3.0