Compare commits

..

No commits in common. "main" and "@v1" have entirely different histories.
main ... @v1

12 changed files with 120 additions and 322 deletions

View File

@ -1,16 +1,30 @@
# Наследуем от Oracle образа и добавляем Python
FROM proxy.docker.dataekb.ru/local_cache/oracleclient_docker:stable
# отключает буферизацию вывода Python.
FROM python:3.11-slim-bookworm
ENV PYTHONUNBUFFERED=1
RUN apt-get update
RUN apt-get -y install wget libaio1 unzip alien
RUN cd /home && wget -q https://download.oracle.com/otn_software/linux/instantclient/2112000/oracle-instantclient-basic-21.12.0.0.0-1.el8.x86_64.rpm &&\
alien -i --scripts /home/oracle-instantclient-basic-21.12.0.0.0-1.el8.x86_64.rpm &&\
rm /home/oracle-instantclient-basic-21.12.0.0.0-1.el8.x86_64.rpm &&\
export LD_LIBRARY_PATH=/usr/lib/oracle/21.12/client64/lib/${LD_LIBRARY_PATH:+:$LD_LIBRARY_PATH}
RUN cd /home/ && wget -q https://download.oracle.com/otn_software/linux/instantclient/2112000/oracle-instantclient-sqlplus-21.12.0.0.0-1.el8.x86_64.rpm &&\
alien -i --scripts /home/oracle-instantclient-sqlplus-21.12.0.0.0-1.el8.x86_64.rpm &&\
rm /home/oracle-instantclient-sqlplus-21.12.0.0.0-1.el8.x86_64.rpm
RUN cd /home/ && wget -q https://download.oracle.com/otn_software/linux/instantclient/2112000/oracle-instantclient-devel-21.12.0.0.0-1.el8.x86_64.rpm && \
alien -i --scripts /home/oracle-instantclient-devel-21.12.0.0.0-1.el8.x86_64.rpm &&\
rm /home/oracle-instantclient-devel-21.12.0.0.0-1.el8.x86_64.rpm &&\
ldconfig
EXPOSE 8000
WORKDIR /code/app
COPY ./requirements.txt /code/requirements.txt
COPY ./app /code/app
RUN python3 -m pip install --upgrade pip
RUN echo Y | python3 -m pip install --no-cache-dir --upgrade -r /code/requirements.txt
RUN python3.11 -m pip install --upgrade pip
RUN echo Y | python3.11 -m pip install --no-cache-dir --upgrade -r /code/requirements.txt
CMD ["python", "main.py"]

10
Jenkinsfile vendored
View File

@ -2,9 +2,15 @@ pipeline {
agent { label 'agent_smith' }
stages {
stage('recreate > build > run container') {
stage('recreate > build > reun container') {
steps {
sh "docker-compose up -d --force-recreate --build parse_saby"
sh "docker-compose up --force-recreate --build -d parse_saby"
}
}
stage('show logs') {
steps {
sh "docker-compose logs -f"
}
}
}

View File

@ -1,78 +0,0 @@
# About
- Получаем и парсим данные с https://formats.saby.ru/
- Выполняем валидацию данных для БД
- Отправляем данные в БД oracle
- Планировщик настроенный на определеное время. Повторящий все процессы описанные в About сверху вниз.
# Usage
**Боевой** запуск через **main.py**
**Тестовый** запуск чере **test.py**. В test.py не выполняется подключение к БД,
выполяется лишь подготовка данных. Благодоря этому **не имеет значение где запщуен код**.
Можно полностью отследить процесс парсинга и валидации.
# How it's works
Запуск кода осущетсвляется через main.py или test.py
**Важно:** в test.py не выполняется 4 и 5 шаг.
## 1. Получение данных с сайта.
C перва запускается скрипт parse_saby.py
```
result_dict_data = parse_saby.process_reports_parse()
```
Основным модулем для забора нужных данных является bs4 с классом BeautifulSoup.
BeautifulSoup представляет объект html страницы, это позволяет обращаеться по тэгам,
что бы достать нужные данные. Модуль возвращает cписок словарей. Пример:
```
result_dict_data = [{128513: ('fns', 'Уведомление о налогах для ЕНП', '01.07.22', None, '5.03')},
{132526: ('sfr', 'АДВ-1 Анкета застрахованного лица', '09.01.23', '31.12.34', '2.0')},
{...}]
```
## 2. parse_data_in_list()
```
list_data = parse_data_in_list(result_dict_data)
```
Легкий парсиннг для преобразование, нужное в дальнейшем.
## 3. Валидация данных
```
list_data_validated = DataValid.validate_data(list_data)
```
Тут из листа по листу передаем данные в класс DataValid из модуля validation.py.
DataValid наследуется от класса BaseModul модуля pydantic.
Сначало парсится дата, вторая дата в списке может быть None.
Потом проверяется соответсвие типов, и может выполняется явное преобразование.
Забираем валидные данные в новый лист.
Класс в рамках цикла пересоздается, для валидации следующего листа.
## 4. Отправка данных в БД
```
working_database.SimpleDB().data_transfer_in_database(list_data_validated)
```
### 4.1. Инциализация модуля working_database.py, подключение к hvac
_Сервер hvac настрое на работу в тихом режиме, реализуется с помощью: init_oracle_client()_
Тут выполняется сначало подключение к hvac серверу, получение секретов,
необохдимых для подключения к БД ```_create_db_pool_from_vault()```. Что бы подключиться к серверу hvac
используется секретный токен. Он забирается из переменной окружения ОС,
передается при создание контейнера(определенно в docker-compouse.yaml).
### 4.2. Подключение и отправка данных в БД
Метод класса ```data_transfer_in_database()``` получает данные для отправки в БД.
Данные имеют структуру лист словарей. Выполняется подключение к БД используя пул секретов из шага 4.1.
После чего передаются данные в процедуру P_RK_GOVERNMENT_REPORTS_INSERS.
## 5. Планировщик заданий.
```
scheduler.launch_the_scheduler()
```
Планировщик работает в фоновом режиме, пока не наступит заданое время.
Когда наступает заданое время запукает main.py. Время запуска по умолчанию 6 часов 0 минут.
Время можно изменить например на 9:30 следующим образом:
```
scheduler.launch_the_scheduler(h=9, m=30)
```

View File

@ -1,36 +1,6 @@
import working_database
import parse_saby
from validation import DataValid
import scheduler
import write_error_to_log
print("Запуск main.py")
def parse_data_in_list(dict_data: dict) -> list:
"""
argument:
[{128513: ('fns', 'Уведомление о налогах для ЕНП', '01.07.22', None, '5.03')},
{...}]
return:
[[128513, 'fns', 'Уведомление о налогах для ЕНП', '01.07.22', None, '5.03'],
[...]]
"""
result = []
for key_id, value_data in dict_data.items():
if len(value_data) != 5: continue
result.append([key_id, *value_data])
return result
print("Очиcтка лог файла")
write_error_to_log.clear_to_log()
print("1/5. Запуск парсинга сайта")
result_dict_data = parse_saby.process_reports_parse()
print('2/5. Предварительный парсинг')
list_data = parse_data_in_list(result_dict_data)
print("3/5. Запуск валидации")
list_data_validated = DataValid.validate_data(list_data)
print("4/5 Отправка данных в БД")
working_database.SimpleDB().data_transfer_in_database(list_data_validated)
print("5/5. Запуск планировщика")
scheduler.launch_the_scheduler()
test = working_database.SimpleDB()
test.data_transfer_in_database(result_dict_data)

View File

@ -1,11 +1,6 @@
# pyright: reportOptionalMemberAccess=false, reportAttributeAccessIssue=false, reportOperatorIssue=false
# pyright: reportArgumentType=false, reportIndexIssue=false, reportCallIssue=false, reportGeneralTypeIssues=false
# Подсветка синтаксиса отключена, т.к. тип данных везде кооректены и обрабатывается if.
import requests
from bs4 import BeautifulSoup
import re
import write_error_to_log
def parse_html(url: str):
"""
@ -14,13 +9,13 @@ def parse_html(url: str):
"""
# Запрос страницы
response = requests.get(url)
# Проверка статуса
# Проверка статуса в ответе на запрос
if not(200 <= response.status_code <= 299):
# Повторный запрос
response = requests.get(url)
if not(200 <= response.status_code <= 299):
print("Ошибка при запросе: ", response.status_code)
return response.status_code
if not(200 <= response.status_code <= 299):
print("Ошибка при запросе: ", response.status_code)
return response.status_code
# Создание обьекта BeautifulSoup(HTML страница)
soup = BeautifulSoup(response.text, 'html.parser')
return soup
@ -37,7 +32,7 @@ def parse_date_report(url: str):
# HTML report
soup = parse_html(url)
# Проверка на ошибку:
if soup == int: return None # Вызовит ошибку, что бы пропустить данный url
if soup == int: raise ValueError("Объект soup не должен быть None")
# Поиск в HTML строки ввида: 'Действующий формат (с 10.01.23) 5.01'
div_element = soup.find('div', class_='controls-Dropdown__text')
# Извлекаем текст из элемента
@ -52,15 +47,15 @@ def parse_date_report(url: str):
r'(?:\D+(\d{1,2}\D\d{2}\D\d{2}))?.*?\)' \
r'\s*#?\s*(\d+(?:\D\d+)?)'
match = re.search(regex, text)
from_date = match.group(1) # Первая дата (обязательная)
to_date = match.group(2) # Вторая дата (может быть None)
version = match.group(3) # Число (обязательное)
from_date = match.group(1) # Первая дата (обязательная)
to_date = match.group(2) # Вторая дата (может быть None)
version = match.group(3) # Число (обязательное)
return from_date, to_date, version
def parse_reports(soup:BeautifulSoup, # HTML объект
report_title:str, # Строка ввида: 'report/fns'
report_title:str, # строка ввида: 'report/fns'
url_formats:str, # Строка ввида: 'https://formats.saby.ru'
name_title:str): # Имя тайтла: 'fns'
name_title:str): # имя тайтла: 'fns'
"""
Достаются все необходимые данные, возвращаются в ввиде словаря:
{106538: ('fns', 'НД по косвенным налогам', '01.08.23', '5.04')}
@ -79,20 +74,21 @@ def parse_reports(soup:BeautifulSoup, # HTML объект
link = soup.find('a', href=href)
# Name report
span = link.find('span', class_="ProxySbisRu__registry-BrowserItem_typeName")
try:
# Данные получены из url после парсинга
from_date, to_date, version = parse_date_report(url_report)
except Exception: continue # Может быть ошибка если url не доступен
# Данные получены из url после парсинга
from_date, to_date, version = parse_date_report(url_report)
# Добавление всех данных в итоговый словарь
result_dict_data.update({id: (name_title, span.text, from_date, to_date, version)})
except Exception as e:
print(f"Ошибка при обработке отчета {report_title}: ", e)
print(f"Ошибка при обработке отчета {report_title}: {str(e)}")
continue
return result_dict_data
def print_report_data(dict_name:dict, name_title:str):
def write_report_data(dict_name:dict, name_title:str):
"""
Вывод на стандартный поток вывода итоговых данных
Сохраняем запись, каждая запись с новой строки:
'ключ: значение'
'ключ: значение'
...
"""
#Блок для красивого офорлмения файла
def center_text():
@ -126,17 +122,19 @@ def search_title(url_format_report = 'https://formats.saby.ru/report'):
"""
Функция ищет все тайтлы на странице formats.saby.ru/report.
Парамметры функции:
url в котором будет происходить поиск
url по которому в котором будет происходить поиск
Возвращает:
Список URL-путей, например: ['/report/fns', '/report/example', ...]
Исключения:
ValueError: Если запрос к странице завершился с ошибкой (неверный статус).
"""
# Получаем HTML-cтраницу
html = parse_html(url_format_report)
# Проверяем, что html не является кодом ошибки (int)
if isinstance(html, int):
raise Exception(f'Ошибка при запросе {url_format_report}: {html}')
error_message = f'Ошибка при запросе {url_format_report}: {html}'
raise Exception(error_message)
# Множество в который будут заноситься тайтлы
report_urls = set()
@ -159,6 +157,7 @@ def process_reports_parse(url_formats = 'https://formats.saby.ru'):
Параметр функции:
url_formats - используется для создание полных URL
"""
# Лист имеет вид: ['/report/fns', '/report/sfr'...]
set_title = search_title()
dict_result = {}
@ -179,15 +178,13 @@ def process_reports_parse(url_formats = 'https://formats.saby.ru'):
dict_result_title = parse_reports(soup, report_title, url_formats, name_title)
dict_result.update(dict_result_title)
# Вывод на стандратный поток вывода
print_report_data(dict_result_title, name_title)
write_report_data(dict_result, name_title)
except Exception as e:
print(f"Ошибка при обработке отчета {report_title}: {str(e)}")
error_message = f"ERROR-PARSE_SABY: {e} DATA: {report_title}"
continue
return dict_result
if __name__ == '__main__':
# Можно запустить отдельно от всего проекта
process_reports_parse()

View File

@ -1,20 +0,0 @@
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
import subprocess
def run_parser(filename = 'main.py'):
print(f"Запуск {filename}...")
subprocess.run(['python', filename])
def launch_the_scheduler(h=6, m=0):
scheduler = BlockingScheduler() # Создание планировщика
# Каждый день в 6:00 утра запуск run_parser()
scheduler.add_job(run_parser, trigger=CronTrigger(hour=h, minute=m))
print("Планировщик запущен. Нажмите Ctrl+C для остановки.")
try:
scheduler.start()
except KeyboardInterrupt:
print("Планировщик остановлен")
except Exception as e:
print("Не непредвиденная ошибка: ", e)

5
app/test.py 100644
View File

@ -0,0 +1,5 @@
import working_database
test_dict = {128513: ('fns', 'Уведомление о налогах для ЕНП', '01.01.2000', '01.01.2001', '1.0')}
Test = working_database.SimpleDB()
Test.data_transfer_in_database(test_dict)

View File

@ -1,54 +0,0 @@
from pydantic import BaseModel, field_validator
from datetime import date
from dateutil import parser
class DataValid(BaseModel):
id: int
organ: str
names: str
date_from: date
date_to: date | None
ver: str
# Дополнительный конструктор для списка
@classmethod
def from_list(cls, data_list: list):
return cls(
id=data_list[0],
organ=data_list[1],
names=data_list[2],
date_from=data_list[3],
date_to=data_list[4],
ver=data_list[5]
)
@field_validator('date_from', mode='before')
def parse_date_to(cls, date):
if isinstance(date, str):
try:
return parser.parse(date)
except (ValueError, TypeError): return date
return date
@field_validator('date_to', mode='before')
def parse_date_from(date):
if isinstance(date, str):
try:
return parser.parse(date)
except(ValueError, TypeError): return date
return date
@staticmethod
def validate_data(list_data: list[list]) -> list:
"""
Функция принимает список списков, выполняет парсинг и валидацию
с пмошью класса DataValid из файла validation.py.
Возвращает распршеный валидный список словарей для БД.
"""
result = []
for intem_list in list_data:
try:
validated_data = DataValid.from_list(intem_list) # Передаем один лист
result.append(validated_data.model_dump()) # Добаляем валидный и распаршеный словарь
except: continue
return result

View File

@ -1,76 +1,45 @@
from os import environ
from os import getenv, environ
import hvac
import hvac.exceptions
from oracledb import Error, create_pool, init_oracle_client
import write_error_to_log
from oracledb import Error, create_pool, init_oracle_client, DatabaseError
from fastapi import HTTPException
from dateutil import parser
import datetime
init_oracle_client()
class SimpleDB:
def __init__(self):
self._create_db_pool_from_vault()
self._connect_hvac()
self.pool
def _handle_vault_exception(self, e: Exception, message: str):
init_oracle_client()
def _connect_hvac(self):
"""
Обработка исключений Vault с возвратом понятного сообщения.
"""
print(message)
if isinstance(e, hvac.exceptions.InvalidPath):
raise hvac.exceptions.InvalidPath("Database configuration not found in Vault")
elif isinstance(e, hvac.exceptions.Forbidden):
raise hvac.exceptions.Forbidden("Permission denied to access Vault secrets")
elif isinstance(e, hvac.exceptions.Unauthorized):
raise hvac.exceptions.Unauthorized("Invalid Vault token")
elif isinstance(e, hvac.exceptions.VaultError):
raise hvac.exceptions.VaultError(f"Vault secret retrieval failed: {e}")
elif isinstance(e, hvac.exceptions.InvalidRequest):
raise hvac.exceptions.InvalidRequest(f"Missing database parameter in Vault response: {e}")
elif isinstance(e, hvac.exceptions.VaultDown):
raise hvac.exceptions.VaultDown(f"Database server not available: {e}")
else:
raise Exception (f'Unexpected error reading from Vault: {e}')
def _create_db_pool_from_vault(self):
"""
Подключение к Vault и создание пула соеденения.
Подключение к Vault и получение параметров БД
"""
try:
# Подключение к Vault
client = hvac.Client(
url='https://vlt.dataekb.ru:8222',
token=environ.get('VAULT_TOKEN'),
)
except Exception as e:
self._handle_vault_exception(e, "Ошибка при создание покдлючения c Vault")
# Проверка авторизации в Vault
if not client.is_authenticated():
raise Exception("Vault authentication failed")
try:
# Чтение секретов из Vault
secret_read_response = client.secrets.kv.v2.read_secret_version(
read_response = client.secrets.kv.v2.read_secret_version(
path='oracledb',
mount_point='kv'
)
except Exception as e:
self._handle_vault_exception(e, "Ошибка чтение скретов из Vault")
try:
# Создаем пул соединений
self.pool = create_pool(
user=secret_read_response['data']['data']['user'],
password=secret_read_response['data']['data']['password'],
dsn=secret_read_response['data']['data']['cs'],
user=read_response['data']['data']['user'],
password=read_response['data']['data']['password'],
dsn=read_response['data']['data']['cs'],
min=2,
max=10,
increment=1
)
except Exception as e:
self._handle_vault_exception(e, "Ошибка при создание пула для подключение к Oracle")
def data_transfer_in_database(self, list_data: list):
raise HTTPException(status_code=500, detail=f"Database connection failed: {e}")
def data_transfer_in_database(self, dict_data: dict):
"""
Передача данных в базу
Процедура на вставку:
@ -84,36 +53,41 @@ class SimpleDB:
DATE_TO in date,
VERS IN VARCHAR2)
"""
# Данные для БД не могут быть пустыми
if not list_data:
raise ValueError(list_data, "No data to process")
if not self.pool:
raise HTTPException(status_code=500, detail="Database pool not initialized")
if not dict_data:
raise ValueError(dict_data, "No data to process")
try:
with self.pool.acquire() as connection:
with connection.cursor() as cursor:
for dict_argument_bd in list_data:
print("Отправляемые аргрументы: ", dict_argument_bd)
print("Типы данных: ", *map(type, dict_argument_bd.values()))
try:
cursor.callproc('P_RK_GOVERNMENT_REPORTS_INSERS', [
dict_argument_bd['id'],
dict_argument_bd['organ'],
dict_argument_bd['names'],
dict_argument_bd['date_from'],
dict_argument_bd['date_to'], # Может быть None
dict_argument_bd['ver'],
])
except Error as e:
print(e) # нужно убрать, проверка работат ли.
# Проверка является ли запись дублирующей
if 'ORA-00001' in str(e): continue
# В остальных случаях запись ошибки и пропуск данных.
else:
error_message = f"ERROR_DB-WRITE: {e} DATA: {dict_argument_bd}"
# Запись логов
write_error_to_log.write_to_log(error_message)
continue
except Exception as e:
error_message = f"ERROR_DB-GLOBAL: {e}"
write_error_to_log.write_to_log(error_message)
raise Error(f'Неожиданная ошибка: {e}')
# Передача данных в БД
for id, value in dict_data.items():
if len(value) != 5:
continue
organ, names, date_from_str, date_to_str, ver = value
# Парсим даты
if date_from_str:
date_from = parser.parse(date_from_str).date()
#date_from = str(date_from.strftime('%d.%m.%Y'))
else: date_from = parser.parse('01.01.2000')
if date_to_str:
date_to = parser.parse(date_to_str).date()
#date_to = str(date_to.strftime('%d.%m.%Y'))
else: date_to = parser.parse('01.01.2000')
print("Вывод отправляемых агрументов: ", id, organ, names, date_from, date_to, ver)
print(type(int(id)), type(organ), type(names), type(date_from), type(date_to), type(ver))
cursor.callproc('P_RK_GOVERNMENT_REPORTS_INSERS', [
int(id),
organ,
names,
date_from,
date_to,
ver,
])
except Error as e:
raise HTTPException(status_code=500, detail=f"Database error: {e}")

View File

@ -1,19 +0,0 @@
import datetime
TIMESTAMP = datetime.datetime.now().strftime("%d-%m-%Y %H:%M:%S")
LOG_FILE = "error_log.txt"
def write_to_log(error_message, log_file=LOG_FILE):
"""
Записывает ошибку в лог-файл с временной меткой
"""
print("Сообщение ошибки перед записю в файл: ", error_message) # Отладочная информация
with open(log_file, 'a', encoding='utf-8') as f:
f.write(f"[{TIMESTAMP}] {error_message}\n")
def clear_to_log(log_file=LOG_FILE):
"""
Очищает лог-файл и записывает новую шапку
"""
with open(log_file, 'w', encoding='utf-8') as f:
f.write(f'=== Лог ошибок создан в {TIMESTAMP} ===\n\n')

View File

@ -2,7 +2,7 @@ version: "3.8"
services:
parse_saby:
# image: git.dataekb.ru/sadikov/parse_saby/parse_saby_main:latest
image: git.dataekb.ru/sadikov/parse_saby/parse_saby_main:latest
container_name: parse_saby
build: .
volumes:

View File

@ -1,11 +1,12 @@
annotated-types==0.7.0
APScheduler==3.11.0
anyio==4.10.0
beautifulsoup4==4.13.5
bs4==0.0.2
certifi==2025.8.3
cffi==1.17.1
charset-normalizer==3.4.3
cryptography==45.0.7
cryptography==45.0.6
fastapi==0.116.1
hvac==2.3.0
idna==3.10
oracledb==3.3.0
@ -15,8 +16,10 @@ pydantic_core==2.33.2
python-dateutil==2.9.0.post0
requests==2.32.5
six==1.17.0
soupsieve==2.8
sniffio==1.3.1
soupsieve==2.7
starlette==0.47.3
typing-inspection==0.4.1
typing_extensions==4.15.0
tzlocal==5.3.1
urllib3==2.5.0
datetime