from os import getenv, environ import hvac from oracledb import Error, create_pool, init_oracle_client, CLOB from fastapi import HTTPException from dateutil import parser init_oracle_client() class SimpleDB: def __init__(self): self._connect_hvac() self.pool init_oracle_client() def _connect_hvac(self): """ Подключение к Vault и получение параметров БД """ try: client = hvac.Client( url='https://vlt.dataekb.ru:8222', token=environ.get('VAULT_TOKEN'), ) read_response = client.secrets.kv.v2.read_secret_version( path='oracledb', mount_point='kv' ) # Создаем пул соединений self.pool = create_pool( user=read_response['data']['data']['user'], password=read_response['data']['data']['password'], dsn=read_response['data']['data']['cs'], min=2, max=10, increment=1 ) except Exception as e: raise HTTPException(status_code=500, detail=f"Database connection failed: {e}") def data_transfer_in_database(self, dict_data: dict): """ Передача данных в базу Процедура на вставку: P_RK_GOVERNMENT_REPORTS_INSERS ( ID IN NUMBER, ORGAN IN VARCHAR2, NAMES IN VARCHAR2, DATE_FROM in date, DATE_TO in date, VERS IN VARCHAR2) """ if not self.pool: raise HTTPException(status_code=500, detail="Database pool not initialized") if not dict_data: raise ValueError(dict_data, "No data to process") try: with self.pool.acquire() as connection: with connection.cursor() as cursor: result_var = cursor.var(CLOB) # Только один параметр - выходной! cursor.callproc('P_RK_GOVERNMENT_REPORTS_INSERS', [result_var]) json_result = result_var.getvalue() print("JSON результат:", json_result) # Узнать сигнатуру процедуры cursor.execute(""" SELECT position, argument_name, data_type, in_out FROM all_arguments WHERE object_name = 'P_RK_GOVERNMENT_REPORTS_INSERS' ORDER BY position """) print("Сигнатура процедуры P_RK_GOVERNMENT_REPORTS_INSERS:") for arg in cursor: # Преобразуем все в строки для безопасного вывода pos_str = str(arg[0]) if arg[0] is not None else "0" name_str = str(arg[1]) if arg[1] is not None else "RETURN" type_str = str(arg[2]) if arg[2] is not None else "UNKNOWN" dir_str = str(arg[3]) if arg[3] is not None else "UNKNOWN" print(f"Позиция {pos_str}: {name_str} ({type_str}) - {dir_str}") for id, value in dict_data.items(): if len(value) != 5: continue organ, names, date_from_str, date_to_str, ver = value # Парсим даты date_from = parser.parse(date_from_str).date() date_to = parser.parse(date_to_str).date() if date_to_str else None print("Вывод отправляемых агрументов: ", int(id), organ, names, date_from, date_to, ver) print(type(int(id)), type(organ), type(names), type(date_from), type(date_to), type(ver)) result_var = cursor.var(CLOB) cursor.callproc('P_RK_GOVERNMENT_REPORTS_INSERS', [ int(id), organ, names, date_from, date_to, ver, result_var # OUT-параметр всегда последний ]) json_result = result_var.getvalue() print("JSON результат:", json_result) except Error as e: raise HTTPException(status_code=500, detail=f"Database error: {e}")