API requests instead of local json
BOT_open_sesam/pipeline/head This commit looks good Details

test
dl 2025-05-29 11:29:48 +05:00
parent a869a07eba
commit 8582bdca9e
7 changed files with 69 additions and 108 deletions

View File

@ -1,22 +1,4 @@
# Usage
Создать файл .env в корневой дирректории проекта, объявить и присвоить значения переменным:
TOKEN=
LOCK_IP=
CARD_ID=
AUTH_API=
, где
TOKEN - токен телеграм для взаимодействия с ботом
LOCK_IP - ip адресс замка
CARD_ID - уникальный номер ключ-карты
AUTH_API - уникальный набор символов, для взаимодействия с API замка
Дополнительные подробности можно найти в instruction_http_api_v5.pdf и исходном коде программы (см. main.py)
## TODO
# TODO
- [x] Написать базовый функционал
~~- [ ] Создать соотношение типа "id_пользователя: досутп_к_замку:"~~
@ -24,4 +6,6 @@ AUTH_API - уникальный набор символов, для взаимо
- [x] Удалить модуль getenv
- [x] Создать и парсить json с информацией о пользователях и их номерах
~~- [ ] Проверять, является ли пользователь администратором, если является, выводить дополнительную кнопку, предлагающую добавить номер в БД~~
- [ ] Сделать логирование о том, что кто-то открыл дверь в конкретное время
- [x] Изменить json хранящийся локально на запросы к API
- [ ] Изменить названия комнта в локальном json
- [ ] Ограничить логи внутри докера

65
auth.py
View File

@ -1,38 +1,55 @@
from config import config
import re
ALLOWED_PHONE_NUMBERS = list(config.get("users", {}).keys())
import aiohttp
AUTHORIZED_USERS = {}
def check_user_auth(phone: str) -> bool:
return phone in ALLOWED_PHONE_NUMBERS
def normalize_phone(phone: str) -> str:
phone = phone.strip()
phone = re.sub(r"[^\d+]", "", phone)
if not phone.startswith("+"):
phone = "+" + phone
return phone
print(phone)
digits = re.sub(r"\D", "", phone)
if len(digits) > 10:
digits = digits[-10:]
print(digits)
return digits
def authorize_user(user_id: int, phone: str) -> bool:
normalized_phone = normalize_phone(phone)
if normalized_phone in ALLOWED_PHONE_NUMBERS:
AUTHORIZED_USERS[user_id] = normalized_phone
# if check_user_auth(phone):
# AUTHORIZED_USERS[user_id] = phone
print(f"{user_id} авторизован с номером: {normalized_phone}")
return True
else:
print(
f"Пользователь {user_id} пытался авторизоваться с номером {normalized_phone}"
)
async def authorize_user(user_id: int, phone: str) -> bool:
normalized = normalize_phone(phone)
api_url = config.get("user_api_url")
if not api_url:
print("Не указан user_api_url в конфигурации")
return False
async with aiohttp.ClientSession() as session:
try:
async with session.get(api_url, params={"tel": normalized}) as response:
print(response.status)
if response.status != 200:
print(f"Ошибка запроса к API: статус {response.status}")
return False
data = await response.json()
except Exception as e:
print(f"Исключение при запросе к API: {str(e)}")
return False
if data.get("response") != "1":
print(f"Доступ запрещен для номера: {normalized}")
return False
zone_str = data["data"].get("zone", "")
zones = [zone.strip() for zone in zone_str.split(";") if zone.strip()]
card_code = data["data"].get("card-code", "")
if not zones or not card_code:
print("Некорректный ответ API: отсутствуют зоны или код карты.")
return False
AUTHORIZED_USERS[user_id] = {"tel": normalized, "card": card_code, "zones": zones}
print(
f"{user_id} авторизован с номером: {normalized}, карта: {card_code}, зоны: {zones}"
)
return True
def is_user_auth(user_id: int) -> bool:
return user_id in AUTHORIZED_USERS

View File

@ -1,23 +1,14 @@
{
"bot_token": "",
"user_api_url": "https://papi.dataekb.ru/check_access",
"locks": {
"Room418": {
"ЦОД418": {
"ip": "10.9.1.26",
"auth_api": "73B15D12"
"pass": "73B15D12"
},
"Floor4": {
"2634952899": {
"ip": "10.9.1.27",
"auth_api": "F901C40A"
}
},
"users": {
"+79000959392": {
"access_card": "000D001195DD",
"lock_id": ["Room418", "Floor4"]
},
"+79221716513": {
"access_card": "",
"lock_id": ["Room418", "Floor4"]
"pass": "F901C40A"
}
}
}

View File

@ -2,8 +2,7 @@ from aiogram import Dispatcher, F
from aiogram.types import Message
from keyboard import get_locks_keyboard, get_contact_keyboard
from auth import authorize_user, normalize_phone
from config import config
from auth import authorize_user, AUTHORIZED_USERS
def register_contact_handler(dp: Dispatcher):
@ -15,21 +14,18 @@ def register_contact_handler(dp: Dispatcher):
await msg.answer("Ошибка: номер телефона не получен")
return
phone = normalize_phone(msg.contact.phone_number)
if not authorize_user(user_id, phone):
phone = msg.contact.phone_number
if not await authorize_user(user_id, phone):
await msg.answer("Доступ запрещен, номер не идентифицирован")
return
user_conf = config.get("users", {}).get(phone)
print(f"***user_conf для {phone}: {user_conf}***")
if not user_conf:
await msg.answer("Пользователь не опознан")
user_data = AUTHORIZED_USERS.get(user_id)
if not user_data:
await msg.answer("Ошибка получения данных пользователя")
return
allowed_locks = user_conf.get("lock_id", [])
print(f"***allowed_locks = {allowed_locks}***")
reply_markup = get_locks_keyboard(allowed_locks)
allowed_zones = user_data["zones"]
reply_markup = get_locks_keyboard(allowed_zones)
await msg.answer(
"Номер подтвержден. Выберите дверь для открытия", reply_markup=reply_markup

View File

@ -11,46 +11,29 @@ from config import config
def register_open_door_handler(dp: Dispatcher):
@dp.message()
async def open_door_handler(msg: Message):
print(
f"DEBUG: Получено сообщение от пользователя {msg.from_user.id}: '{msg.text}'"
)
user_id = msg.from_user.id
if not is_user_auth(user_id):
await msg.answer(
"Доступ запрщен. Необходимо предоставить свой номер телефона."
)
else:
print("OK")
phone = AUTHORIZED_USERS.get(user_id)
print(AUTHORIZED_USERS)
print(f"***user_id={user_id},phone={phone}***")
user_conf = config.get("users", {}).get(phone)
print(f"***phone={phone}, user_conf={user_conf}***")
# allowed_locks = user_conf.get("locks", [])
# print(f"***allowed_locks={allowed_locks}***")
# if msg.text not in allowed_locks:
# print("**********************")
# print(f"***{allowed_locks}***")
# print(f"***{msg.text}***")
# print("**********************")
# return
user_data = AUTHORIZED_USERS.get(user_id)
if not user_data:
await msg.answer("Ошибка авторизации пользователя.")
return
lock_key = msg.text
lock_conf = config.get("locks", {}).get(msg.text)
if not lock_conf:
await msg.answer("Информации по замку не найдено")
return
url = f"http://{lock_conf['ip']}/cgi-bin/ext"
auth_info = ("ext", lock_conf["auth_api"])
payload = f"CARD={user_conf['access_card']}&DIR=0"
auth_info = ("ext", lock_conf["pass"])
payload = f"CARD={user_data['card']}&DIR=0"
headers = {"Content-Type": "application/x-www-form-urlencoded"}
try:
print(
f"***DEBUG: Отправляю запрос к {url} c payload: {payload} и auth: {auth_info}"
)
response = await asyncio.to_thread(
requests.post,
url,
@ -59,9 +42,6 @@ def register_open_door_handler(dp: Dispatcher):
headers=headers,
timeout=5,
)
print(
f"DEBUG: URL: {url}, status: {response.status_code}, response: {response.text}"
)
if response.status_code == 200:
await msg.answer("Открыто")
else:

View File

@ -4,7 +4,6 @@ from aiogram.filters import CommandStart
from keyboard import get_contact_keyboard, get_locks_keyboard
from auth import is_user_auth, AUTHORIZED_USERS
from config import config
def register_start_handler(dp: Dispatcher):
@ -12,18 +11,13 @@ def register_start_handler(dp: Dispatcher):
async def command_start_handler(msg: Message):
user_id = msg.from_user.id
if is_user_auth(user_id):
phone = AUTHORIZED_USERS.get(user_id)
if not phone:
user_data = AUTHORIZED_USERS.get(user_id)
if not user_data:
await msg.answer("Номер не найден")
return
user_conf = config.get("user", {}).get(phone)
if not user_conf:
await msg.answer("Пользователь не найден в конфигурации")
return
allowed_locks = user_conf.get("locks_id", [])
reply_markup = get_locks_keyboard(allowed_locks)
allowed_zones = user_data["zones"]
reply_markup = get_locks_keyboard(allowed_zones)
await msg.answer(
"Авторизация прошла успешно",
reply_markup=reply_markup,

View File

@ -3,7 +3,6 @@ from aiogram.types import KeyboardButton
def get_locks_keyboard(allowed_locks: list):
print(f"DEBUG: allowed_locks = {allowed_locks}")
kb = ReplyKeyboardBuilder()
for lock in allowed_locks:
kb.button(text=lock)