первая тестовая рабочия версия и тестирование докер.

main
Лев 2025-08-27 10:13:20 +05:00
parent eecaab9775
commit 3b753c1722
4 changed files with 141 additions and 85 deletions

11
Jenkinsfile vendored
View File

@ -4,16 +4,19 @@ pipeline {
stages { stages {
stage('recreate > build > reun container') { stage('recreate > build > reun container') {
steps { steps {
sh "docker images"
sh "docker-compose down --rmi all"
sh "docker images"
sh "docker-compose up --force-recreate --build -d parse_saby" sh "docker-compose up --force-recreate --build -d parse_saby"
sh "docker image prune -f" sh "docker images"
sh "docker image remove -f"
sh "docker images"
} }
} }
stage('show logs') { stage('show logs') {
steps { steps {
sh ''' sh "docker-compose logs -f"
docker-compose logs -f
'''
} }
} }
} }

View File

@ -1,5 +1,6 @@
import working_database import working_database
import parse_saby import parse_saby
#parse_saby.process_reports_parse() result_dict_data = parse_saby.process_reports_parse()
working_database.connect_hvac() test = working_database.SimpleDB()
test.data_transfer_in_database(result_dict_data)

View File

@ -13,6 +13,7 @@ def parse_html(url: str):
if not(200 <= response.status_code <= 299): if not(200 <= response.status_code <= 299):
# Повторный запрос # Повторный запрос
response = requests.get(url) response = requests.get(url)
if not(200 <= response.status_code <= 299):
print("Ошибка при запросе: ", response.status_code) print("Ошибка при запросе: ", response.status_code)
return response.status_code return response.status_code
# Создание обьекта BeautifulSoup(HTML страница) # Создание обьекта BeautifulSoup(HTML страница)
@ -30,53 +31,56 @@ def parse_date_report(url: str):
""" """
# HTML report # HTML report
soup = parse_html(url) soup = parse_html(url)
# Если не удалось отправить запрос, ворзвращаем URL и код ошибки. # Проверка на ошибку:
# Они будут занесы в файл. if soup == int: raise ValueError("Объект soup не должен быть None")
if soup == int:
return url, soup
# Поиск в HTML строки ввида: 'Действующий формат (с 10.01.23) 5.01' # Поиск в HTML строки ввида: 'Действующий формат (с 10.01.23) 5.01'
div_element = soup.find('div', class_='controls-Dropdown__text') div_element = soup.find('div', class_='controls-Dropdown__text')
# Извлекаем текст из элемента # Извлекаем текст из элемента
text = div_element.get_text() text = div_element.get_text()
# Парсим нужные данные # Парсим нужные данные.
# Уловия ловит: text = 'Действующий формат 02-04-2025 _2025_001' if '_' in text: # Уовия ловит: text = 'Действующий формат 02-04-2025 _2025_001
if '_' in text:
regex = r'\w+\s\w+\s(\S+) (\S+)' regex = r'\w+\s\w+\s(\S+) (\S+)'
date, id_date = re.search(regex, text).groups() date, id_date = re.search(regex, text).groups()
return date, id_date return date, None, id_date
# Все остальные else: # Все остальные
else: regex = r'(\d{2}\D\d{2}\D\d{2})' \
regex = r'(\d{2}\D\d{2}\D\d{2})(?:(?:.+)?\))? #?(\d+(?:\D\d+)?)' r'(?:\D+(\d{1,2}\D\d{2}\D\d{2}))?.*?\)' \
date, id_date = re.search(regex, text).groups() r'\s*#?\s*(\d+(?:\D\d+)?)'
return date, id_date match = re.search(regex, text)
from_date = match.group(1) # Первая дата (обязательная)
to_date = match.group(2) # Вторая дата (может быть None)
version = match.group(3) # Число (обязательное)
return from_date, to_date, version
def parse_reports(soup=BeautifulSoup, # HTML объект def parse_reports(soup:BeautifulSoup, # HTML объект
report_title = str, # строка ввида: 'report/fns' report_title:str, # строка ввида: 'report/fns'
url_formats = str, # Строка ввида: 'https://formats.saby.ru' url_formats:str, # Строка ввида: 'https://formats.saby.ru'
name_title = str): # имя тайтла: 'fns' name_title:str): # имя тайтла: 'fns'
""" """
Достаются все необходимые данные, возвращаются в ввиде словаря: Достаются все необходимые данные, возвращаются в ввиде словаря:
{106538: ('fns', 'НД по косвенным налогам', '01.08.23', '5.04')} {106538: ('fns', 'НД по косвенным налогам', '01.08.23', '5.04')}
""" """
result_dict_data = {} result_dict_data = {}
# Перебарает все URL, ищутся по тегу 'a' # Перебарает все URL, ищутся по тегу 'a'
for link in soup.find_all('a'): for link in soup.find_all('a'):
try:
# Ищет по тегу: href # Ищет по тегу: href
href = link.get('href') href = link.get('href')
if f'{report_title}/' in href: if f'{report_title}/' in href:
# id report # id report
id = href.rstrip('/').split('/')[-1] id = href.rstrip('/').split('/')[-1]
#URL report # URL report
url_report = f'{url_formats}{href}' url_report = f'{url_formats}{href}'
link = soup.find('a', href=href) link = soup.find('a', href=href)
# Name report # Name report
span = link.find('span', class_="ProxySbisRu__registry-BrowserItem_typeName") span = link.find('span', class_="ProxySbisRu__registry-BrowserItem_typeName")
# Данные получены из url после парсинга # Данные получены из url после парсинга
date, id_date = parse_date_report(url_report) from_date, to_date, version = parse_date_report(url_report)
# Добавление всех данных в итоговый словарь # Добавление всех данных в итоговый словарь
result_dict_data.update({id: (name_title, span.text, date, id_date)}) result_dict_data.update({id: (name_title, span.text, from_date, to_date, version)})
except Exception as e:
print(f"Ошибка при обработке отчета {report_title}: {str(e)}")
continue
return result_dict_data return result_dict_data
def write_report_data(dict_name:dict, name_title:str): def write_report_data(dict_name:dict, name_title:str):
@ -87,8 +91,6 @@ def write_report_data(dict_name:dict, name_title:str):
... ...
""" """
#Блок для красивого офорлмения файла #Блок для красивого офорлмения файла
# Вычисляем количество подчёркиваний слева и справа
def center_text(): def center_text():
""" """
Функия парсит сколько нужно подчеркивания, Функия парсит сколько нужно подчеркивания,
@ -107,37 +109,34 @@ def write_report_data(dict_name:dict, name_title:str):
list_result.append('_' * left + text + '_' * right) list_result.append('_' * left + text + '_' * right)
return list_result[:2] return list_result[:2]
dash_start, dash_end = center_text() dash_start, dash_end = center_text()
#Конец блока #Конец блока
#Запись в файл с красивым офрмление в виде нижнего подчеркивания
#Вывод с красивым офрмление в виде нижнего подчеркивания
print(f'\n{dash_start}\n') print(f'\n{dash_start}\n')
for key, value in dict_name.items(): for key, value in dict_name.items():
str_k_v = f'{key}: {value}\n' str_k_v = f'{key}: {value}\n'
print(str_k_v, end='') print(str_k_v, end='')
print(f'{dash_end}\n') print(f'{dash_end}\n')
def search_title(): def search_title(url_format_report = 'https://formats.saby.ru/report'):
""" """
Функция ищет все тайтлы на странице formats.saby.ru/report. Функция ищет все тайтлы на странице formats.saby.ru/report.
Парамметры функции:
url по которому в котором будет происходить поиск
Возвращает: Возвращает:
Список URL-путей, например: ['/report/fns', '/report/example', ...] Список URL-путей, например: ['/report/fns', '/report/example', ...]
Исключения: Исключения:
ValueError: Если запрос к странице завершился с ошибкой (неверный статус). ValueError: Если запрос к странице завершился с ошибкой (неверный статус).
""" """
# url по которому в котором будет происходить поиск,
url_format_report = 'https://formats.saby.ru/report'
# Получаем HTML-cтраницу # Получаем HTML-cтраницу
html = parse_html(url_format_report) html = parse_html(url_format_report)
# Проверяем, что html не является кодом ошибки (int) # Проверяем, что html не является кодом ошибки (int)
if isinstance(html, int): if isinstance(html, int):
error_message = f'Ошибка при запросе {url_format_report}: {html}' error_message = f'Ошибка при запросе {url_format_report}: {html}'
raise Exception(error_message) raise Exception(error_message)
# < в который будут заноситься тайтлы # Множество в который будут заноситься тайтлы
report_urls = set() report_urls = set()
# Ищем все ссылки <a> с href, соответствующие шаблону /report/{title} # Ищем все ссылки <a> с href, соответствующие шаблону /report/{title}
@ -146,43 +145,44 @@ def search_title():
href = link['href'] href = link['href']
# Проверям что href содержит: /report/{title} # Проверям что href содержит: /report/{title}
if re.search(r'\/report\/(\w+)$', href): if re.search(r'\/report\/(\w+)$', href):
report_urls .add(href) report_urls.add(href)
return report_urls return report_urls
def process_reports_parse(): def process_reports_parse(url_formats = 'https://formats.saby.ru'):
""" """
Функция пробегается по каждому тайтлу. Функция пробегается по каждому тайтлу.
Для всех записей(reports) выполняется запрос HTML страницы, Для всех записей(reports) выполняется запрос HTML страницы,
которая парситься в объект BeautifulSoup(HTML страница). которая парситься в объект BeautifulSoup(HTML страница).
Из это обьекта достаются не обходимые данные, Из это обьекта достаются не обходимые данные
которые записываются в текстовый файл. Параметр функции:
url_formats - используется для создание полных URL
""" """
# Лист имеет вид: ['/report/fns', '/report/sfr'...] # Лист имеет вид: ['/report/fns', '/report/sfr'...]
list_title = search_title() set_title = search_title()
dict_result = {}
for report_title in list_title: for report_title in set_title:
try: try:
# используется для создание полных URL
url_formats = 'https://formats.saby.ru'
# Получаем имя тайтла через парсинг # Получаем имя тайтла через парсинг
name_title = report_title.rstrip('/').split('/')[-1] name_title = report_title.rstrip('/').split('/')[-1]
# URL на конкретный title # URL на конкретный title
url_title = f'{url_formats}{report_title}' url_title = f'{url_formats}{report_title}'
# Объект HTML, конкретного title # Объект HTML, одного(конкретного) title
soup = parse_html(url_title) soup = parse_html(url_title)
if isinstance(soup, int):
print(f'Не удалось установить соедение c {url_title}')
continue
# Словарь с нужными данными # Словарь с нужными данными
dict_result = parse_reports(soup, report_title, url_formats, name_title) dict_result_title = parse_reports(soup, report_title, url_formats, name_title)
dict_result.update(dict_result_title)
# Запись данных в текстовый файл # Вывод на стандратный поток вывода
write_report_data(dict_result, name_title) write_report_data(dict_result, name_title)
except Exception as e: except Exception as e:
print(f"Ошибка при обработке отчета {report_title}: {str(e)}") print(f"Ошибка при обработке отчета {report_title}: {str(e)}")
continue continue
return dict_result
if __name__ == '__main__': if __name__ == '__main__':
process_reports_parse() process_reports_parse()

View File

@ -1,21 +1,73 @@
from os import getenv, environ from os import getenv, environ
import hvac import hvac
from oracledb import Error, create_pool
from fastapi import HTTPException
from dateutil import parser
class SimpleDB:
def __init__(self):
self._connect_hvac()
self.pool
def connect_hvac(): def _connect_hvac(self):
"""
Подключение к Vault и получение параметров БД
"""
try:
client = hvac.Client( client = hvac.Client(
url='https://vlt.dataekb.ru:8222', url='https://vlt.dataekb.ru:8222',
token= environ.get('VAULT_TOKEN'), token=environ.get('VAULT_TOKEN'),
) )
print(f"Token exists: {bool(getenv('VAULT_TOKEN'))}")
print(f"Authenticated: {client.is_authenticated()}")
read_response = client.secrets.kv.v2.read_secret_version(path='oracledb', mount_point='kv') read_response = client.secrets.kv.v2.read_secret_version(
pw = read_response['data']['data']['password'] path='oracledb',
un = read_response['data']['data']['user'] mount_point='kv'
cs = read_response['data']['data']['cs'] )
print(read_response) # Создаем пул соединений
print(pw) self.pool = create_pool(
print(un) user=read_response['data']['data']['user'],
print(cs) password=read_response['data']['data']['password'],
dsn=read_response['data']['data']['cs'],
min=2,
max=10,
increment=1
)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Database connection failed: {e}")
def data_transfer_in_database(self, dict_data: dict):
"""
Передача данных в базу
Процедура на вставку:
P_RK_GOVERNMENT_REPORTS_INSERS
(
ID IN NUMBER,
ORGAN IN VARCHAR2,
NAMES IN VARCHAR2,
DATE_FROM in date,
DATE_TO in date,
VERS IN VARCHAR2)
"""
if not self.pool:
raise HTTPException(status_code=500, detail="Database pool not initialized")
if not dict_data:
raise ValueError(dict_data, "No data to process")
try:
with self.pool.acquire() as connection:
with connection.cursor() as cursor:
for id, value in dict_data.items():
if len(value) != 5:
continue
organ, names, date_from_str, date_to_str, ver = value
# Парсим даты
date_from = parser.parse(date_from_str)
if date_to != None: date_to = parser.parse(date_to_str)
# тестого
print(value)
cursor.callproc('P_RK_SEND_JSON_LIST_FACEACC', [int(id), organ,names, date_from, date_to, ver])
except Error as e:
raise HTTPException(status_code=500, detail=f"Database error: {e}")