parse_saby/app/working_database.py

108 lines
4.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

from os import environ
import hvac
import hvac.exceptions
from oracledb import Error, create_pool, init_oracle_client
init_oracle_client()
class SimpleDB:
def __init__(self):
self._create_db_pool_from_vault()
self.pool
init_oracle_client()
def _handle_vault_exception(self, e: Exception, message: str):
"""
Обработка исключений Vault с возвратом понятного сообщения.
"""
print(message)
if isinstance(e, hvac.exceptions.InvalidPath):
raise hvac.exceptions.InvalidPath("Database configuration not found in Vault")
elif isinstance(e, hvac.exceptions.Forbidden):
raise hvac.exceptions.Forbidden("Permission denied to access Vault secrets")
elif isinstance(e, hvac.exceptions.Unauthorized):
raise hvac.exceptions.Unauthorized("Invalid Vault token")
elif isinstance(e, hvac.exceptions.VaultError):
raise hvac.exceptions.VaultError(f"Vault secret retrieval failed: {e}")
elif isinstance(e, hvac.exceptions.InvalidRequest):
raise hvac.exceptions.InvalidRequest(f"Missing database parameter in Vault response: {e}")
elif isinstance(e, hvac.exceptions.VaultDown):
raise hvac.exceptions.VaultDown(f"Database server not available: {e}")
else:
raise Exception (f'Unexpected error reading from Vault: {e}')
#TODO: rename +
def _create_db_pool_from_vault(self):
"""
Подключение к Vault и получение параметров для подключеник к БД
"""
#TODO разнести в разные try_exception
try:
# Подключение к Vault
client = hvac.Client(
url='https://vlt.dataekb.ru:8222',
token=environ.get('VAULT_TOKEN'),
)
except Exception as e:
self._handle_vault_exception(e, "Ошибка при создание покдлючения c Vault")
try:
# Чтение секретов из Vault
secret_read_response = client.secrets.kv.v2.read_secret_version(
path='oracledb',
mount_point='kv'
)
except Exception as e:
self._handle_vault_exception(e, "Ошибка чтение скретов из Vault")
try:
# Создаем пул соединений
self.pool = create_pool(
user=secret_read_response['data']['data']['user'],
password=secret_read_response['data']['data']['password'],
dsn=secret_read_response['data']['data']['cs'],
min=2,
max=10,
increment=1
)
except Exception as e:
self._handle_vault_exception(e, "Ошибка при создание пула для подключение к Oracle")
def data_transfer_in_database(self, list_data: list):
"""
Передача данных в базу
Процедура на вставку:
P_RK_GOVERNMENT_REPORTS_INSERS
(
ID IN NUMBER,
ORGAN IN VARCHAR2,
NAMES IN VARCHAR2,
DATE_FROM in date,
DATE_TO in date,
VERS IN VARCHAR2)
"""
if not self.pool:
raise ValueError(self.pool, "Database pool not initialized")
if not list_data:
raise ValueError(list_data, "No data to process")
try:
with self.pool.acquire() as connection:
with connection.cursor() as cursor:
for list_argument_bd in list_data:
id, organ, names, date_from, date_to, ver = list_argument_bd
cursor.callproc('P_RK_GOVERNMENT_REPORTS_INSERS', [
int(id),
organ,
names,
date_from,
date_to,
ver,
])
except Error as e:
raise Error(f'Ошибка при отправке данных в БД: {e}')
except Exception as e:
raise Error(f'Неожиданная ошибка: {e}')