Last active
February 25, 2023 09:11
-
-
Save alimuradov/8d0ad59676e334f8585e3def96029041 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import csv | |
from datetime import datetime | |
from fastapi import FastAPI, File, UploadFile, APIRouter, Security, Depends | |
from fastapi.responses import JSONResponse | |
from sqlalchemy.ext.asyncio import AsyncSession | |
from app.models.user import User | |
from app.api.deps import get_current_active_user, get_session | |
from app.core.constants import Roles | |
from app.models.user import User | |
from app.models.appointment import Appointment | |
from app.models.patient import Patient | |
from app.utils import generate_phone_number, generate_random_email, generate_random_color, convert_uuid1_to_uuid4 | |
from app.core import settings, security | |
# Router on which the CSV import endpoints below are registered.
router = APIRouter()
@router.post("/csv/upload_users")
async def upload_users_from_csv(
    db: AsyncSession = Depends(get_session),
    file: UploadFile = File(...),
    current_user: User = Security(
        get_current_active_user,
        scopes=[Roles.super_admin, Roles.admin, Roles.chief],
    ),
):
    """Import users from an uploaded CSV file.

    The file is decoded as UTF-8 and parsed as ``|``-delimited rows with the
    columns ``id, first_name, last_name, patronymic, phone, gender,
    birthday``. The first line is skipped. Every remaining row becomes a
    ``User`` with role ``"doctor"`` attached to the clinic and account of
    ``current_user``; all rows are committed together at the end.

    Args:
        db: Database session used to persist the new users.
        file: Uploaded CSV file.
        current_user: Authenticated user; must hold one of the super_admin,
            admin or chief scopes.

    Returns:
        ``JSONResponse("Ok")`` once the rows have been committed.

    Raises:
        UnicodeDecodeError: If the upload is not valid UTF-8.
        ValueError: If a ``birthday`` value is not an ISO-format date.
    """
    contents = await file.read()
    decoded_file = contents.decode("utf-8").splitlines()
    fieldnames = [
        'id', 'first_name', 'last_name', 'patronymic', 'phone', 'gender',
        'birthday',
    ]
    reader = csv.DictReader(decoded_file, fieldnames=fieldnames, delimiter='|')

    # Skip the first line. The default keeps an empty upload from raising
    # StopIteration here.
    next(reader, None)

    # Loop-invariant values: read them once, before anything is flushed.
    clinic_id = current_user.clinic_id
    account_id = current_user.account_id

    for row in reader:
        # Rows without a phone get a generated one.
        phone_number = row['phone'] if row['phone'] else generate_phone_number()
        user = User(
            id=convert_uuid1_to_uuid4(row['id']),
            first_name=row['first_name'],
            last_name=row['last_name'],
            patronymic=row['patronymic'],
            email=generate_random_email(),
            phone=phone_number,
            color=generate_random_color(),
            # NOTE(review): the initial password is the phone number, which is
            # easy to guess -- consider forcing a password reset on first login.
            hashed_password=security.get_hashed_password(phone_number),
            is_active=True,
            role="doctor",
            gender=row['gender'],
            birthday=datetime.fromisoformat(row['birthday']),
            clinic_id=clinic_id,
            account_id=account_id,
        )
        db.add(user)

    # Single commit so the import is all-or-nothing.
    await db.commit()
    return JSONResponse("Ok")
@router.post("/csv/upload_patients")
async def upload_patients_from_csv(
    db: AsyncSession = Depends(get_session),
    file: UploadFile = File(...),
    current_user: User = Security(
        get_current_active_user,
        scopes=[Roles.super_admin, Roles.admin, Roles.chief],
    ),
):
    """Import patients from an uploaded CSV file.

    The file is decoded as UTF-8 and parsed as ``|``-delimited rows with the
    columns ``id, first_name, last_name, patronymic, phone, gender, birthday,
    address``. The first line is skipped. Every remaining row becomes a
    ``Patient`` attached to the clinic of ``current_user``; all rows are
    committed together at the end.

    Args:
        db: Database session used to persist the new patients.
        file: Uploaded CSV file.
        current_user: Authenticated user; must hold one of the super_admin,
            admin or chief scopes.

    Returns:
        ``JSONResponse("Ok")`` once the rows have been committed.

    Raises:
        UnicodeDecodeError: If the upload is not valid UTF-8.
        ValueError: If a ``birthday`` value is not an ISO-format date.
    """
    contents = await file.read()
    decoded_file = contents.decode("utf-8").splitlines()
    fieldnames = [
        'id', 'first_name', 'last_name', 'patronymic', 'phone', 'gender',
        'birthday', 'address',
    ]
    reader = csv.DictReader(decoded_file, fieldnames=fieldnames, delimiter='|')

    # Skip the first line. The default keeps an empty upload from raising
    # StopIteration here.
    next(reader, None)

    # Loop-invariant value: read it once, before anything is flushed.
    clinic_id = current_user.clinic_id

    for row in reader:
        patient = Patient(
            id=convert_uuid1_to_uuid4(row['id']),
            first_name=row['first_name'],
            last_name=row['last_name'],
            patronymic=row['patronymic'],
            phone=row['phone'],
            address=row['address'],
            gender=row['gender'],
            birthday=datetime.fromisoformat(row['birthday']),
            clinic_id=clinic_id,
        )
        db.add(patient)

    # Single commit so the import is all-or-nothing.
    await db.commit()
    return JSONResponse("Ok")
@router.post("/csv/upload_appointments")
async def upload_appointments_from_csv(
    db: AsyncSession = Depends(get_session),
    file: UploadFile = File(...),
    current_user: User = Security(
        get_current_active_user,
        scopes=[Roles.super_admin, Roles.admin, Roles.chief],
    ),
):
    """Import appointments from an uploaded CSV file.

    The file is decoded as UTF-8 and parsed as ``|``-delimited rows with the
    columns ``id, number, patient_id, doctor_id, start, end, status``. The
    first line is skipped. Rows whose ``number`` is missing, not an integer,
    or zero are ignored; every other row becomes an ``Appointment`` attached
    to the clinic of ``current_user`` with all notification flags disabled.
    All rows are committed together at the end.

    Args:
        db: Database session used to persist the new appointments.
        file: Uploaded CSV file.
        current_user: Authenticated user; must hold one of the super_admin,
            admin or chief scopes.

    Returns:
        ``JSONResponse("Ok")`` once the rows have been committed.

    Raises:
        UnicodeDecodeError: If the upload is not valid UTF-8.
        ValueError: If ``start``/``end`` are not ISO-format datetimes or
            ``status`` is not an integer.
    """
    contents = await file.read()
    decoded_file = contents.decode("utf-8").splitlines()
    fieldnames = [
        'id', 'number', 'patient_id', 'doctor_id', 'start', 'end',
        'status',
    ]
    reader = csv.DictReader(decoded_file, fieldnames=fieldnames, delimiter='|')

    # Skip the first line. The default keeps an empty upload from raising
    # StopIteration here.
    next(reader, None)

    # Loop-invariant value: read it once, before anything is flushed.
    clinic_id = current_user.clinic_id

    for row in reader:
        try:
            number = int(row['number'])
        except (TypeError, ValueError):
            # TypeError: a short row leaves the field as None (DictReader's
            # restval); ValueError: the field is not an integer.
            number = None

        # Same filter as before: rows without a usable (non-zero) number are
        # skipped rather than imported.
        if not number:
            continue

        appointment = Appointment(
            id=convert_uuid1_to_uuid4(row['id']),
            number=number,
            patient_id=convert_uuid1_to_uuid4(row['patient_id']),
            doctor_id=convert_uuid1_to_uuid4(row['doctor_id']),
            start=datetime.fromisoformat(row['start']),
            end=datetime.fromisoformat(row['end']),
            status=int(row['status']),
            clinic_id=clinic_id,
            notify_to_whatsapp=False,
            notify_to_telegram=False,
            notify_to_sms=False,
        )
        db.add(appointment)

    # Single commit so the import is all-or-nothing.
    await db.commit()
    return JSONResponse("Ok")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment