ug2024finals/bot.py

306 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

from aiogram import Bot, Dispatcher, executor, types
from aiogram.types import ChatType, ReplyKeyboardRemove, ReplyKeyboardMarkup, KeyboardButton, InlineKeyboardMarkup, InlineKeyboardButton
import socket, json
chat_id = -1001710050337
API_TOKEN = '7074773527:AAFlDwOONJtLoeQXpRpOCjdpEFRHh_YIyHw'
bot = Bot(token=API_TOKEN)
dp = Dispatcher(bot)
users = {
"1":[],
"2":[],
"3":[]
}
cdata = {}
def get_data():
global cdata
if open('data.json', 'r').read() != "{}":
with open('data.json', 'r') as w:
cdata = json.load(w)
print("GOT FROM FILE")
else:
if not(cdata):
# if session['loggedin']:
sensors = ['pozhar', 'pozharolikv', 'soundstatus','soundlikv', 'doorst', 'gaz', 'gazolikv']
cdata = {'1': {},
'2': {},
'3': {},
'potok': 100,
'earthquake': 1,
'techka': 1,
'tok': 100,
'nasostoggle': 1,
'lightst':1
}
for i in range(1,4):
for j in sensors:
if j in ['earthquake', 'techka', 'nasostoggle', 'pozhar', 'soundstatus', 'gazolikv', 'pozharolikv', 'gaz','soundlikv']:
cdata[str(i)][j+str(i)] = random.randint(1,1)
else:
cdata[str(i)][j+str(i)] = random.randint(50, 50)
with open("data.json", 'w') as f:
json.dump(cdata, f, indent=4)
return cdata
else:
with open("data.json", 'w') as f:
json.dump(cdata, f, indent=4)
print("not created")
return cdata
cdata = get_data()
def get_value(sensorid):
global cdata
if sensorid in cdata.keys():
return cdata[sensorid]
else:
for i in cdata.keys():
try:
if sensorid in cdata[str(i)].keys():
return cdata[str(i)][sensorid]
except:
break
def change_data(sensorid, value):
global cdata
if sensorid in cdata.keys():
cdata[sensorid] = value
with open('data.json', 'w') as r:
json.dump(cdata, r)
print("changed", sensorid, value)
else:
for i in cdata.keys():
try:
if sensorid in cdata[str(i)].keys():
cdata[str(i)][sensorid] = value
print("chengci", sensorid, value)
with open('data.json', 'w') as r:
json.dump(cdata, r)
break
except:
break
return '1'
@dp.message_handler(commands=['start'])
async def start(message: types.Message):
tt = message.from_user.id
k=1
for i in users.keys():
if tt in users[i]:
k = 0
if k:
keyboard = InlineKeyboardMarkup()
keyboard.add(
InlineKeyboardButton(text="1", callback_data="1"),
InlineKeyboardButton(text="2", callback_data="2"),
InlineKeyboardButton(text="3", callback_data="3")
)
await message.answer("В каком подъезде вы живёте?", reply_markup=keyboard)
else:
await message.answer("Вы уже зарегистрированы")
@dp.callback_query_handler(lambda query: query.data in ['1', '2', '3'])
async def process_callback(callback_query: types.CallbackQuery):
user_id = callback_query.from_user.id
users[callback_query.data].append(user_id)
await bot.delete_message(chat_id=callback_query.from_user.id, message_id=callback_query.message.message_id)
await bot.send_message(user_id, f"Вы добавлены в подъезд {callback_query.data}")
@dp.message_handler(content_types=types.ContentTypes.NEW_CHAT_MEMBERS)
async def add_chat_members(message: types.Message):
for member in message.new_chat_members:
user_id = member.id
if user_id not in users:
await message.answer(f"Пользователь {member.full_name}, зарегистрируйтесь у меня для работы с умным домом")
@dp.message_handler(lambda message: message.text == "Открыть дверь")
async def open_the_door(message: types.Message):
k = ''
for i in users.keys():
if message.from_user.id in users[i]:
k = i
change_data("doorst"+k, 0)
print(cdata[k]["doorst"+k])
data = get_data()
k = ''
keyboard = types.ReplyKeyboardMarkup(resize_keyboard=True)
button_open = types.KeyboardButton("Открыть дверь")
button_toggle = types.KeyboardButton("Включить освещение")
button_untoggle = types.KeyboardButton("Выключить освещение")
button_close = types.KeyboardButton("Закрыть дверь")
button_clear_snow = types.KeyboardButton("Очистка снега")
button_doctor = types.KeyboardButton("Врач")
keyboard.add(button_clear_snow, button_doctor)
if get_value("lightst"):
keyboard.add(button_untoggle)
else:
keyboard.add(button_toggle)
if not(get_value("doorst"+k)):
print("opened")
keyboard.add(button_close)
else:
print("closed")
keyboard.add(button_open)
await message.answer("Вы открыли дверь" + cdata,reply_markup=keyboard)
@dp.message_handler(lambda message: message.text == "Закрыть дверь")
async def close_the_door(message: types.Message):
k = ''
for i in users.keys():
if message.from_user.id in users[i]:
k = i
change_data("doorst"+k, 1)
print(cdata[k]["doorst"+k])
data = get_data()
k = ''
keyboard = types.ReplyKeyboardMarkup(resize_keyboard=True)
button_open = types.KeyboardButton("Открыть дверь")
button_toggle = types.KeyboardButton("Включить освещение")
button_untoggle = types.KeyboardButton("Выключить освещение")
button_close = types.KeyboardButton("Закрыть дверь")
button_clear_snow = types.KeyboardButton("Очистка снега")
button_doctor = types.KeyboardButton("Врач")
keyboard.add(button_clear_snow, button_doctor)
if get_value("lightst"):
keyboard.add(button_untoggle)
else:
keyboard.add(button_toggle)
if not(get_value("doorst"+k)):
print("opened")
keyboard.add(button_close)
else:
keyboard.add(button_open)
await message.answer("Вы закрыли дверь",reply_markup=keyboard)
@dp.message_handler(lambda message: message.text == "Включить освещение")
async def turn_on_light(message: types.Message):
for i in users.keys():
if message.from_user.id in users[i]:
k = i
change_data("lightst", 1)
data = get_data()
k = ''
keyboard = types.ReplyKeyboardMarkup(resize_keyboard=True)
button_open = types.KeyboardButton("Открыть дверь")
button_toggle = types.KeyboardButton("Включить освещение")
button_untoggle = types.KeyboardButton("Выключить освещение")
button_close = types.KeyboardButton("Закрыть дверь")
button_clear_snow = types.KeyboardButton("Очистка снега")
button_doctor = types.KeyboardButton("Врач")
keyboard.add(button_clear_snow, button_doctor)
if get_value("lightst"):
keyboard.add(button_untoggle)
else:
keyboard.add(button_toggle)
if not(get_value("doorst"+k)):
print("opened")
keyboard.add(button_close)
else:
keyboard.add(button_open)
print(cdata["lightst"])
await message.answer("Вы включили свет",reply_markup=keyboard)
@dp.message_handler(lambda message: message.text == "Выключить освещение")
async def turn_off_light(message: types.Message):
for i in users.keys():
if message.from_user.id in users[i]:
k = i
change_data("lightst", 0)
print(cdata["lightst"])
data = get_data()
k = ''
keyboard = types.ReplyKeyboardMarkup(resize_keyboard=True)
button_open = types.KeyboardButton("Открыть дверь")
button_toggle = types.KeyboardButton("Включить освещение")
button_untoggle = types.KeyboardButton("Выключить освещение")
button_close = types.KeyboardButton("Закрыть дверь")
button_clear_snow = types.KeyboardButton("Очистка снега")
button_doctor = types.KeyboardButton("Врач")
keyboard.add(button_clear_snow, button_doctor)
if get_value("lightst"):
keyboard.add(button_untoggle)
else:
keyboard.add(button_toggle)
if not(get_value("doorst"+k)):
keyboard.add(button_close)
else:
keyboard.add(button_open)
await message.answer("Вы выключили свет",reply_markup=keyboard)
@dp.message_handler(lambda message: message.text == "Очистка снега")
async def clear_snow(message: types.Message):
for i in users.keys():
if message.from_user.id in users[i]:
k = i
await message.answer("Вы вызвали очистку снега",reply_markup=keyboard)
@dp.message_handler(lambda message: message.text == "Врач")
async def call_doctor(message: types.Message):
for i in users.keys():
if message.from_user.id in users[i]:
k = i
await message.answer("Вы вызвали врача",reply_markup=keyboard)
@dp.message_handler(lambda message: message.chat.type == ChatType.PRIVATE)
async def private_chat(message: types.Message):
data = get_data()
k = ''
keyboard = types.ReplyKeyboardMarkup(resize_keyboard=True)
button_open = types.KeyboardButton("Открыть дверь")
button_toggle = types.KeyboardButton("Включить освещение")
button_untoggle = types.KeyboardButton("Выключить освещение")
button_close = types.KeyboardButton("Закрыть дверь")
button_clear_snow = types.KeyboardButton("Очистка снега")
button_doctor = types.KeyboardButton("Врач")
for i in users.keys():
if message.from_user.id in users[i]:
k = i
keyboard.add(button_clear_snow, button_doctor)
if get_value("lightst"):
keyboard.add(button_untoggle)
else:
keyboard.add(button_toggle)
if not(get_value("doorst"+k)):
keyboard.add(button_close)
else:
keyboard.add(button_open)
await message.answer("Выберите действие:", reply_markup=keyboard)
async def udp_listener():
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
sock.bind(('0.0.0.0', 9876))
while True:
data, addr = sock.recvfrom(1024)
message = data.decode('utf-8')
try:
lst = message.split(',')
if lst[1] == 'pozhar':
await bot.send_message(chat_id, f"У вас пожар")
if lst[1] == 'sound':
for user in users[lst[0]]:
await bot.send_message(user, f"У вас слишком громкий звук")
if lst[1] == 'gaz':
await bot.send_message(chat_id, f"У вас утечка газа")
if lst[1] == 'protechka':
await bot.send_message(chat_id, f"У вас протечка")
except:
continue
if __name__ == '__main__':
executor.start_polling(dp, skip_updates=True)
executor.start(udp_listener())