Skip to content

Instantly share code, notes, and snippets.

@alimuradov
Last active February 25, 2023 09:11
Show Gist options
  • Save alimuradov/8d0ad59676e334f8585e3def96029041 to your computer and use it in GitHub Desktop.
Save alimuradov/8d0ad59676e334f8585e3def96029041 to your computer and use it in GitHub Desktop.
import csv
from datetime import datetime
from fastapi import FastAPI, File, UploadFile, APIRouter, Security, Depends
from fastapi.responses import JSONResponse
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.user import User
from app.api.deps import get_current_active_user, get_session
from app.core.constants import Roles
from app.models.user import User
from app.models.appointment import Appointment
from app.models.patient import Patient
from app.utils import generate_phone_number, generate_random_email, generate_random_color, convert_uuid1_to_uuid4
from app.core import settings, security
# Router on which the CSV import endpoints in this module are registered.
router = APIRouter()
@router.post("/csv/upload_users")
async def upload_users_from_csv(
    db: AsyncSession = Depends(get_session),
    file: UploadFile = File(...),
    current_user: User = Security(
        get_current_active_user,
        scopes=[Roles.super_admin, Roles.admin, Roles.chief],
    ),
):
    """Import users from an uploaded pipe-delimited CSV file.

    The upload is decoded as UTF-8 and parsed with ``|`` as the delimiter.
    The first line is treated as a header and discarded; the remaining
    columns are read positionally as ``id``, ``first_name``, ``last_name``,
    ``patronymic``, ``phone``, ``gender`` and ``birthday`` (ISO format).

    Every row becomes an active ``User`` with the role ``"doctor"``,
    attached to the clinic and account of the requesting user. A phone
    number is generated when the column is empty, and e-mail and colour are
    always generated. All rows are added to the session and committed once
    after the loop.

    Args:
        db: Async database session.
        file: The uploaded CSV file.
        current_user: Authenticated user; must hold the super-admin, admin
            or chief scope.

    Returns:
        ``JSONResponse("Ok")`` once the rows are committed.

    Raises:
        UnicodeDecodeError: If the upload is not valid UTF-8.
        ValueError: If a ``birthday`` value is not a valid ISO date string.
    """
    contents = await file.read()
    lines = contents.decode("utf-8").splitlines()
    fieldnames = [
        "id",
        "first_name",
        "last_name",
        "patronymic",
        "phone",
        "gender",
        "birthday",
    ]
    reader = csv.DictReader(lines, fieldnames=fieldnames, delimiter="|")

    # Skip the header row. The default keeps an empty upload from raising
    # StopIteration, which inside a coroutine surfaces as RuntimeError.
    next(reader, None)

    for row in reader:
        phone_number = row["phone"] or generate_phone_number()
        user = User(
            id=convert_uuid1_to_uuid4(row["id"]),
            first_name=row["first_name"],
            last_name=row["last_name"],
            patronymic=row["patronymic"],
            email=generate_random_email(),
            phone=phone_number,
            color=generate_random_color(),
            # NOTE(review): the initial password is derived from the phone
            # number, which is easy to guess. Consider a random secret or a
            # forced password reset on first login.
            hashed_password=security.get_hashed_password(phone_number),
            is_active=True,
            role="doctor",
            gender=row["gender"],
            birthday=datetime.fromisoformat(row["birthday"]),
            clinic_id=current_user.clinic_id,
            account_id=current_user.account_id,
        )
        db.add(user)

    await db.commit()
    return JSONResponse("Ok")
@router.post("/csv/upload_patients")
async def upload_patients_from_csv(
    db: AsyncSession = Depends(get_session),
    file: UploadFile = File(...),
    current_user: User = Security(
        get_current_active_user,
        scopes=[Roles.super_admin, Roles.admin, Roles.chief],
    ),
):
    """Import patients from an uploaded pipe-delimited CSV file.

    The upload is decoded as UTF-8 and parsed with ``|`` as the delimiter.
    The first line is treated as a header and discarded; the remaining
    columns are read positionally as ``id``, ``first_name``, ``last_name``,
    ``patronymic``, ``phone``, ``gender``, ``birthday`` (ISO format) and
    ``address``.

    Every row becomes a ``Patient`` attached to the requesting user's
    clinic. All rows are added to the session and committed once after the
    loop.

    Args:
        db: Async database session.
        file: The uploaded CSV file.
        current_user: Authenticated user; must hold the super-admin, admin
            or chief scope.

    Returns:
        ``JSONResponse("Ok")`` once the rows are committed.

    Raises:
        UnicodeDecodeError: If the upload is not valid UTF-8.
        ValueError: If a ``birthday`` value is not a valid ISO date string.
    """
    contents = await file.read()
    lines = contents.decode("utf-8").splitlines()
    fieldnames = [
        "id",
        "first_name",
        "last_name",
        "patronymic",
        "phone",
        "gender",
        "birthday",
        "address",
    ]
    reader = csv.DictReader(lines, fieldnames=fieldnames, delimiter="|")

    # Skip the header row. The default keeps an empty upload from raising
    # StopIteration, which inside a coroutine surfaces as RuntimeError.
    next(reader, None)

    for row in reader:
        patient = Patient(
            id=convert_uuid1_to_uuid4(row["id"]),
            first_name=row["first_name"],
            last_name=row["last_name"],
            patronymic=row["patronymic"],
            phone=row["phone"],
            address=row["address"],
            gender=row["gender"],
            birthday=datetime.fromisoformat(row["birthday"]),
            clinic_id=current_user.clinic_id,
        )
        db.add(patient)

    await db.commit()
    return JSONResponse("Ok")
@router.post("/csv/upload_appointments")
async def upload_appointments_from_csv(
    db: AsyncSession = Depends(get_session),
    file: UploadFile = File(...),
    current_user: User = Security(
        get_current_active_user,
        scopes=[Roles.super_admin, Roles.admin, Roles.chief],
    ),
):
    """Import appointments from an uploaded pipe-delimited CSV file.

    The upload is decoded as UTF-8 and parsed with ``|`` as the delimiter.
    The first line is treated as a header and discarded; the remaining
    columns are read positionally as ``id``, ``number``, ``patient_id``,
    ``doctor_id``, ``start``, ``end`` (both ISO format) and ``status``.

    Rows whose ``number`` is missing, not an integer, or zero are skipped.
    Every other row becomes an ``Appointment`` attached to the requesting
    user's clinic with all notification flags disabled. All rows are added
    to the session and committed once after the loop.

    Args:
        db: Async database session.
        file: The uploaded CSV file.
        current_user: Authenticated user; must hold the super-admin, admin
            or chief scope.

    Returns:
        ``JSONResponse("Ok")`` once the rows are committed.

    Raises:
        UnicodeDecodeError: If the upload is not valid UTF-8.
        ValueError: If ``start``/``end`` are not valid ISO datetimes or
            ``status`` is not an integer on a row that is not skipped.
    """
    contents = await file.read()
    lines = contents.decode("utf-8").splitlines()
    fieldnames = [
        "id",
        "number",
        "patient_id",
        "doctor_id",
        "start",
        "end",
        "status",
    ]
    reader = csv.DictReader(lines, fieldnames=fieldnames, delimiter="|")

    # Skip the header row. The default keeps an empty upload from raising
    # StopIteration, which inside a coroutine surfaces as RuntimeError.
    next(reader, None)

    for row in reader:
        try:
            number = int(row["number"])
        except (TypeError, ValueError):
            # TypeError covers short rows, where DictReader fills the
            # missing column with None; ValueError covers non-numeric text.
            continue
        if not number:
            # A zero number is treated like a missing one, as before.
            continue

        appointment = Appointment(
            id=convert_uuid1_to_uuid4(row["id"]),
            number=number,
            patient_id=convert_uuid1_to_uuid4(row["patient_id"]),
            doctor_id=convert_uuid1_to_uuid4(row["doctor_id"]),
            start=datetime.fromisoformat(row["start"]),
            end=datetime.fromisoformat(row["end"]),
            status=int(row["status"]),
            clinic_id=current_user.clinic_id,
            notify_to_whatsapp=False,
            notify_to_telegram=False,
            notify_to_sms=False,
        )
        db.add(appointment)

    await db.commit()
    return JSONResponse("Ok")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment