Что нового?

Статья Чат-бот на питоне. Часть 2.

Lex

New member
Новичок

Lex

New member
Новичок
Статус
Оффлайн
Регистрация
20 Мар 2019
Сообщения
4
Реакции
2
В прошлом гайде мы создали класс для ведения логов и запускатор нашего бота.
В этой части мы создадим "движок" и тело бота, так же добавим базу для плагинов, приступим.
Перед тем, как продолжить, сразу создадим класс для удобного использования апи вк, я буду использоваю
You cant view this link please login.
(Много кода, но это не весь):
Python:
from Settings import BotSettings as Set
from Log import Log
import random
import vk_api
import time


vk = vk_api.VkApi(login=Set.Bot.Login, password=Set.Bot.Password)
vk.auth()


class VK(object):
    def __init__(self):
        self.vk = vk

    def exec(self, method, parameters):
        time.sleep(Set.Bot.SleepTime)
        return self.vk.method(method, parameters)

    class Messages(object):
        def __init__(self):
            self.chatID = Set.Chat.ChatID
            self.chatPeer = Set.Chat.ChatPeer
            self.parameters = dict()
            self.method = ''

        def send(self, message=None, lat=None, long=None, attachment=None, forward_messages=None):
            self.method = 'messages.send'
            self.parameters = {'peer_id': self.chatPeer, 'chat_id': self.chatID, 'random_id': random.randint(1, 2**64)}
            if message:
                self.parameters['message'] = message
            if lat:
                self.parameters['lat'] = lat
            if long:
                self.parameters['long'] = long
            if attachment:
                self.parameters['attachment'] = attachment
            if forward_messages:
                self.parameters['forward_messages'] = forward_messages
            try:
                return VK().exec(self.method, self.parameters)
            except Exception as e:
                Log.show_error(e)
                return False

        def get_history(self, count=None, offset=None, start_message_id=None, fields=None, rev=None):
            self.method = 'messages.getHistory'
            self.parameters['peer_id'] = self.chatPeer

            if count is not None:
                self.parameters['count'] = count
            else:
                self.parameters['count'] = 5

            if offset:
                self.parameters['offset'] = offset

            if fields:
                self.parameters['fields'] = fields

            if rev:
                self.parameters['rev'] = rev

            if start_message_id:
                self.parameters['start_message_id'] = start_message_id

            try:
                return VK().exec(self.method, self.parameters)
            except Exception as e:
                Log.show_error(e)
                return False

        def remove_chat_user(self, member_id=None):
            try:
                self.method = 'messages.removeChatUser'
                self.parameters['chat_id'] = self.chatID
                self.parameters['member_id'] = member_id
                return VK().exec(self.method, self.parameters)
            except Exception as e:
                Log.show_error(e)
                return False

        def set_chat_photo(self, file=None):
            try:
                self.method = 'messages.setChatPhoto'
                if file:
                    self.parameters['file'] = file
                else:
                    self.parameters['file'] = Set.Chat.Photo
                return VK().exec(self.method, self.parameters)
            except Exception as e:
                Log.show_error(e)
                return False

        def edit_chat(self, title=None):
            try:
                self.method = 'messages.editChat'
                self.parameters['chat_id'] = self.chatID
                if title:
                    self.parameters['title'] = title
                else:
                    self.parameters['title'] = Set.Chat.Title
                return VK().exec(self.method, self.parameters)
            except Exception as e:
                Log.show_error(e)
                return False

        def add_chat_user(self, from_id=None):
            self.method = 'messages.addChatUser'
            self.parameters['chat_id'] = self.chatID
            if from_id:
                self.parameters['from_id'] = from_id
            try:
                return VK().exec(self.method, self.parameters)
            except Exception as e:
                Log.show_error(e)
                return False

        def edit(self, message=None, message_id=None, lat=None, long=None, attachment=None):
            self.method = 'messages.edit'
            self.parameters = {'peer_id': self.chatPeer}
            if message:
                self.parameters['message'] = message
            if lat:
                self.parameters['lat'] = lat
            if long:
                self.parameters['long'] = long
            if attachment:
                self.parameters['attachment'] = attachment
            if message_id:
                self.parameters['message_id'] = message_id
            try:
                return VK().exec(self.method, self.parameters)
            except Exception as e:
                Log.show_error(e)
                return False

        def get_by_id(self, message_ids=None):
            self.method = 'messages.getById'
            self.parameters['message_ids'] = message_ids
            try:
                return VK().exec(self.method, self.parameters)
            except Exception as e:
                Log.show_error(e)
                return False

        def get_chat(self, chat_id=None, fields=None):
            self.method = 'messages.getChat'
            self.parameters['chat_id'] = self.chatID
            if chat_id:
                self.parameters['chat_id'] = chat_id
            if fields:
                self.parameters['fields'] = fields
            try:
                return VK().exec(self.method, self.parameters)
            except Exception as e:
                Log.show_error(e)
                return False

        def get_invite_link(self, reset=None):
            self.method = 'messages.getInviteLink'
            self.parameters['peer_id'] = self.chatPeer
            if reset:
                self.parameters['reset'] = reset
            try:
                return VK().exec(self.method, self.parameters)
            except Exception as e:
                Log.show_error(e)
                return False

        def pin(self, message_id=None):
            self.method = 'messages.pin'
            self.parameters['peer_id'] = self.chatPeer
            if message_id:
                self.parameters['message_id'] = message_id
            try:
                return VK().exec(self.method, self.parameters)
            except Exception as e:
                Log.show_error(e)
                return False

        def unpin(self, group_id=None):
            self.method = 'messages.pin'
            self.parameters['peer_id'] = self.chatPeer
            if group_id:
                self.parameters['group_id'] = group_id
            try:
                return VK().exec(self.method, self.parameters)
            except Exception as e:
                Log.show_error(e)
                return False

    class Users(object):
        def __init__(self):
            self.parameters = dict()
            self.method = ''

        def get(self, user_ids=None, fields=None, name_case=None):
            self.method = 'users.get'
            if user_ids:
                self.parameters['user_ids'] = user_ids
            if fields:
                self.parameters['fields'] = fields
            if name_case:
                self.parameters['name_case'] = name_case
            try:
                return VK().exec(self.method, self.parameters)
            except Exception as e:
                Log.show_error(e)
                return False

        def get_followers(self):
            self.method = 'users.getFollowers'
            try:
                return VK().exec(self.method, self.parameters)
            except Exception as e:
                Log.show_error(e)
                return False

        def get_nearby(self, latitude=None, longitude=None, radius=None, fields=None):
            self.method = 'users.getNearby'
            if latitude:
                self.parameters['latitude'] = latitude
            if longitude:
                self.parameters['longitude'] = longitude
            if radius:
                self.parameters['radius'] = radius
            if fields:
                self.parameters['fields'] = fields
            try:
                return VK().exec(self.method, self.parameters)
            except Exception as e:
                Log.show_error(e)
                return False
Теперь приступим к Main.py:
Здесь будет основное "тело" бота, с которого начинается прочитывание сообщений и передача данных движку (Опять много кода)
Python:
from Settings import BotSettings as Set
from Settings import DataBase as DB
from threading import Thread
from Engine import Engine
from Log import Log
from VK import VK
import Utils


class Main(object):
    @staticmethod
    def start():
        try:
            Log.show_info(f'Бот запущен.')
            if not Set.Bot.Start:
                VK().Messages().send(message=f'{Set.Bot.Name}: Успешный запуск.')
                Set.Bot.GlobalID = VK().Messages().get_history(count=1)['items'][0]['id']
                Set.Bot.Start = True
            Set.Bot.Users = VK().Messages().get_chat()['users']
            while True:
                chat_title = VK().Messages().get_chat()['title']
                if chat_title != Set.Chat.Title:
                    VK().Messages().edit_chat(title=Set.Chat.Title)
                response = VK().Messages().get_history(count=10, offset=-10, start_message_id=Set.Chat.GlobalID, fields='first_name, last_name')
                if response is not False:  # Если запрос правильный
                    response = response['items'][::-1]
                    if len(response):
                        for item in response:
                            if int(item['id']) > Set.Chat.GlobalID and item['from_id'] != Set.Bot.MainID:
                                Set.Chat.GlobalID = int(item['id'])  # проверяем только новые сообщения
                                s = ''
                                p = None
                                if 'text' in item:  # если сообщение содержит текст
                                    if len(item['text']) > 0:
                                        s += str.encode(item['text']).decode('utf-8')
                                    else:
                                        s += '(None)'
                                log = s
                                u = item['from_id']
                                if 'attachments' in item:  # если в сообщении есть прикрепленные материалы
                                    if len(item['attachments']):
                                        log += ' ('
                                        for a in item['attachments']:
                                            tp = a['type']
                                            a = a[str(tp)]
                                            if tp == 'doc':  # документ
                                                log += f'doc: {a["title"]} (doc{a["owner_id"]}_{a["id"]}'
                                                if 'access_key' in a:
                                                    log += '_' + a["access_key"]
                                                log += ')'
                                            if tp == 'photo':  # фото
                                                p = a['sizes'][len(a['sizes']) - 1]['url']
                                                log += f'photo: photo{a["owner_id"]}_{a["id"]}'
                                                if 'access_key' in a:
                                                    log += '_' + a["access_key"]
                                            if tp == 'audio':  # аудио
                                                log += f'audio: {a["artist"]} - ' \
                                                       f'{a["title"]} (audio{a["owner_id"]}_{a["id"]}'
                                                if 'access_key' in a:
                                                    log += '_' + a["access_key"]
                                                log += ')'
                                            if tp == 'video':  # видео
                                                log += f'video: {a["title"]} (video{a["owner_id"]}_{a["id"]}'
                                                if 'access_key' in a:
                                                    log += '_' + a["access_key"]
                                                log += ')'
                                            if tp == 'link':  # ссылка
                                                log += f'link: {a["title"]} ({a["url"]})'
                                            if tp == 'sticker':  # стикер
                                                log += f'sticker: product_{a["product_id"]} id_{a["sticker_id"]}'
                                            log += ', '
                                        log = log[:len(log) - 2]
                                        log += ')'
                                if 'geo' in item:  # если кто-то отправил гео-положение
                                    log += f' (geo: {item["geo"]["coordinates"]})'
                                    if 'place' in item['geo']:
                                        log += f' ({item["geo"]["place"]["country"]}, {item["geo"]["place"]["city"]})'
                                if 'fwd_messages' in item:  # если кто-то переслал сообщение
                                    if len(item['fwd_messages']):
                                        log += f' (forward: '
                                        for mes in item['fwd_messages']:
                                            log += f'{Utils.VKEngine.get_username_by_id(mes["from_id"])} ({mes["text"]}), '
                                        log = log[:len(log) - 2]
                                        log += ')'
                                Log.show_vk(Utils.VKEngine.get_username_by_id(u) + ': ' + log)
                                if 'action' in item:  # если произошло какое-то действие
                                    if item['action']['type'] == 'chat_title_update':  # если изменили имя чата
                                        VK().Messages().send(message='Название менять нельзя.')
                                        VK().Messages().remove_chat_user(member_id=item['action']['member_id'])
                                        VK().Messages().edit_chat(title=Set.Chat.Title)
                                    if item['action']['type'] == 'chat_photo_update':  # если изменили фото чата
                                        VK().Messages().send(message='Фото менять нельзя.')
                                        VK().Messages().remove_chat_user(member_id=item['action']['member_id'])
                                        VK().Messages().set_chat_photo(file=Set.Chat.Photo)
                                    if item['action']['type'] == 'chat_invite_user_by_link':  # если пользователь зашел в чат по ссылке
                                        if not int(item['from_id']) in DB.BlackList:
                                            VK().Messages().send(message='Добро пожаловать, ' + Utils.VKEngine.get_link_by_id(item['from_id']) + '.')
                                        else:
                                            VK().Messages().send(message='Сорен, ты в ЧС.')
                                            VK().Messages().remove_chat_user(member_id=item['action']['member_id'])
                                    if item['action']['type'] == 'chat_invite_user' or item['action']['type'] == 'chat_kick_user':  # если кто-то пригласил или кикнул кого-то
                                        kick = False
                                        if not int(item['from_id']) in DB.Privilege:
                                            VK().Messages().remove_chat_user(member_id=item['from_id'])
                                            kick = True
                                            if int(item['from_id']) != int(item['action']['member_id']):
                                                VK().Messages().remove_chat_user(member_id=item['action']['member_id'])
                                        response = VK().Messages().get_chat()['users']
                                        for user in response:
                                            if int(user) in DB.BlackList:
                                                kick = True
                                                VK().Messages().remove_chat_user(member_id=user)
                                        if not kick:
                                            VK().Messages().send(message='Добро пожаловать, ' + Utils.VKEngine.get_username_by_id(item['action']['member_id'] + '.'))
                                    if item['action']['type'] == 'chat_pin_message':  # если кто-то закрепил сообщение
                                        if not int(item['action']['member_id']) in DB.Admins:
                                            VK().Messages().send(message='Закреп менять нельзя.')
                                            VK().Messages().remove_chat_user(member_id=item['action']['member_id'])
                                            VK().Messages().unpin()
                                            VK().Messages().pin(message_id=Set.Chat.PinID['message_id'])
                                        else:
                                            Set.Chat.PinID['message_id'] = item['id']
                                            open('DB\\pin.txt', 'w').write(str(Set.Chat.PinID))
                                    if item['action'] == 'chat_unpin_message':  # если кто-то открепил сообщение
                                        if not int(item['action_mid']) in DB.Admins:
                                            VK().Messages().send(message='Закреп удалять нельзя.')
                                            VK().Messages().remove_chat_user(member_id=item['action']['member_id'])
                                        VK().Messages().pin(message_id=Set.Chat.PinID['message_id'])
                                elif len(s) > 2:  # если длина сообщения больше 2, то она будет обрабатываться
                                    if (Set.Bot.Debug and int(u) == Set.Bot.CreatorID) or not Set.Bot.Debug:
                                        c = s[0]
                                        if c in Set.Chat.Prefixes:
                                            s = s[1:len(s)]
                                            md = 'a' if int(u) in DB.Admins else ('m' if int(u) in DB.Moders else 'u')

                                            def start_thread():  # запускаем обработку в отдельном потоке, чтобы бот дальше принимал сообщения
                                                Engine().cmd(s, p, u, md, item['id'])

                                            Thread(target=start_thread).start()
                                    else:
                                        if s[0] in Set.Chat.Prefixes:
                                            VK().Messages().send('Бот в режиме дебага.')
        except Exception as e:
            Log.show_error(e)
Теперь бот умеет запускаться, но обрабатывать сообщения он не будет, исправим это:
Движок будет инициализировать плагины при старте и обрабатывать сообщения, если это команда, то отправлять плагину
Python:
import os
import inspect
from Plugins.BasePlugin import BasePlugin as Base
from Settings import BotSettings as Set
from Log import Log
from VK import VK
import Utils
import collections


class Engine(object):
    def __init__(self):
        self.plugin_dir = Set.Bot.PluginsDir
        self.modules = []
        self.package_obj = None

    def initialize(self):
        check1 = []
        check2 = []
        check3 = []
        if not Set.Bot.Run:
            tr = 0
            fl = 0
            al = 0
            Set.Bot.Run = True
            len_plugins = len(os.listdir(self.plugin_dir)) - 4
            counter_plugins = 1
            for fname in os.listdir(self.plugin_dir):
                if fname.endswith(".py"):
                    module_name = fname[: -3]
                    if module_name != "Base" and module_name != "__init__":
                        try:
                            self.package_obj = __import__(self.plugin_dir + "." + module_name)
                            self.modules.append(module_name)
                            Log.show_service(f"[{counter_plugins}/{len_plugins}] {module_name} loaded")
                            tr += 1
                        except Exception as e:
                            Log.show_error(f"[{counter_plugins}/{len_plugins}] {module_name} not loaded: {e}")
                            fl += 1
                        al += 1
                        counter_plugins += 1
            Log.show_service(f'Loaded: {tr}/{al} ({fl} errors)')

        for module_name in self.modules:
            module_obj = getattr(self.package_obj, module_name)

            for elem in dir(module_obj):
                obj = getattr(module_obj, elem)
                if inspect.isclass(obj):
                    if issubclass(obj, Base):
                        a = obj()
                        if 'BasePlugin' not in str(a):
                            Set.Bot.Plugins.append(a)
                            name = str(a)[str(a).find('.')+1:str(a).find(' ')]
                            name = name[name.find('.')+1:]
                            check1.append(name)
                            check3.append(a.name)
                            for word in a.words:
                                check2.append(word)
        if len(check1) != len(set(check1)):
            _mas = [item for item, count in collections.Counter(check1).items() if count > 1]
            Log.show_error('There are plugins with the same plugin name: ' + str(_mas))  # проверяем, есть ли плагины с одинаковым именем
            raise KeyboardInterrupt
        if len(check2) != len(set(check2)):
            _mas = [item for item, count in collections.Counter(check2).items() if count > 1]
            Log.show_error('There are plugins with the same keywords: ' + str(_mas))  # проверяем, есть ли плагины с одинаковыми ключевыми словами
            raise KeyboardInterrupt
        if len(check3) != len(set(check3)):
            _mas = [item for item, count in collections.Counter(check1).items() if count > 1]
            Log.show_error('There are plugins with the same name: ' + str(_mas))  # проверяем, есть ли плагины с одинаковым названием
            raise KeyboardInterrupt
        Set.Bot.Words = check2

    @staticmethod
    def cmd(t=None, p=None, u=None, m=None, d=None):
        md = f"{Set.Bot.Name} [v{Set.Bot.Version}]"
        Set.Bot.Answer = False
        Set.Bot.StartNow = False
        for a in Set.Bot.Plugins:
            res = dict()
            res['message'] = ''
            res['attachment'] = ''
            res = a.cmd(t, p, u, m, d)  # тут он отправляет плагину сообщение для обработки
            if res is not False:  # если ответ есть, то обрабатывает дальше
                if len(str(res)) > 0:
                    p = str(a)[str(a).find('.')+1:str(a).find(' ')]
                    p = p[p.find('.')+1:]
                    Log.show_plugin(res['plugin'] + ' (' + p + ')')
                    Set.Bot.Answer = True
                    if not res['message'] is None or not res['attachment'] is None:
                        if res['message']:  # отправляет текст
                            res['message'] = f"{res['message']}\n{md}"
                            VK().Messages().send(message=res['message'], forward_messages=d)
                        elif res['attachment']:  # или приложение
                            VK().Messages().send(message=md, attachment=res['attachment'], forward_messages=d)
                    if not res['lat'] is None and not res['long'] is None:
                        Set.Bot.Answer = True  # здесь местоположение
                        VK().Messages().send(message=md, lat=res['lat'], long=res['long'], forward_messages=d)
                    if not res['forward_messages'] is None:  # пересланные сообщения
                        VK().Messages().send(message=md, forward_messages=res['forward_messages'])
        if not Set.Bot.Answer and m != 'b' and not Set.Bot.StartNow:
            Utils.HelpEngine.what()  # если такой команды нет
            Log.show_plugin('None')
Основное есть, теперь он будет обрабатывать все сообщения, которые являются командами, напишем теперь базу для плагинов:
Python:
import Utils
from Settings import BotSettings as Set


class BasePlugin(object):
    def __init__(self):  # тут основные свойства плагина
        self.name = 'name'  # его имя
        self.description = 'description'  # описание
        self.words = ['word']  # ключевые слова
        self.mode = ''  # режим
        self.text = ''  # текст
        self.photo = ''  # фото
        self.result = {'message': '', 'attachment': None, 'lat': None, 'long': None, 'forward_messages': None, 'plugin': None}
        self.level = 'bamu'  # уровень доступа
        self.message = None  # сообщение
        self.ulvl = None  # опять доступ
        self.user = None  # пользователь
        self.hide = False  # если True, то плагин скрывается
        self.job = True  # если False, то плагин не работает
        self.debug = False  # если True, то плагин в режиме дебага, с ним работать могут только админы

    def run(self):
        self.result['plugin'] = self.name
        if self.mode == '?':  # отправляет информацию о плагине
            self.get_info()
        else:  # иначе запускает его
            self.func()
        return self.result

    def cmd(self, text, photo, user, mode, message_id):
        self.message = message_id
        self.result['message'] = ''
        self.result['attachment'] = ''
        self.user = user
        self.photo = photo
        self.text = str.encode(text).decode('utf-8')
        if text[0] == '?':
            self.mode = '?'
            text = text[1:len(text)]
        elif text[0] == '-':
            self.mode = '-'
            text = text[1:len(text)]
        elif text[0] == '+':
            self.mode = '+'
            text = text[1:len(text)]
        else:
            self.mode = 'None'
        helping = ''
        if ' ' in text:
            helping = text[text.find(' ')+1:len(text)]
            text = text[0:text.find(' ')]
        if text.lower() in self.words:
            if self.job or self.debug:
                if not self.debug:
                    if mode in self.level:
                        self.ulvl = mode
                        self.text = helping
                        return self.run()
                    else:
                        Set.Bot.StartNow = True
                        Utils.HelpEngine.non()
                        return False
                else:
                    if mode == 'a':
                        self.ulvl = mode
                        self.text = helping
                        return self.run()
                    else:
                        Set.Bot.StartNow = True
                        Utils.HelpEngine.debug()
                        return False
            else:
                Utils.HelpEngine.job()
        return False

    def func(self):
        pass

    def get_info(self):
        self.result['message'] = 'Название плагина: ' + self.name + \
                                 '\nОписание: ' + self.description + \
                                 '\nКоманды: ' + ', '.join(self.words)
Все готово, Settings.py изменился по ходу написания на:
Python:
class BotSettings:
    class Log:
        All = True
        Text = True
        Console = True

    class Bot:
        Login = ''  # логин
        Password = ''  # пароль
        Start = False
        Name = 'DataStockBot'
        Version = '1'
        SleepTime = 0.25
        Plugins = list()
        MainID = None  # ид бота
        Debug = False
        CreatorID = None  # ваш ид
        StartNow = False
        Run = False
        Answer = False
        Words = None
        PluginsDir = 'Plugins'  # папка с плагинами

    class Chat:
        Users = list()
        Title = 'DataStock Chat'  # название чата
        Photo = ''  # тут должно быть фото, как его загрузить я покажу в следующей части
        ChatID = 1  # ид чата от лица бота
        ChatPeer = 2000000000 + ChatID
        GlobalID = None  # ид последнего сообщения
        PinID = None  # ид закрепленного сообщения
        Prefixes = ('!', '/', '\\')


class DataBase:
    BlackList = list()
    WhiteList = list()
    Admins = list()
    Moders = list()
    Stuff = Admins + Moders
    Privilege = WhiteList + Stuff
На этом все, в следующей части мы напишем первые простенькие плагины и посмотрим как работает наш бот.
 
  • Мне нравится
Реакции: Dust

QS1SL

New member
Новичок

QS1SL

New member
Новичок
Статус
Оффлайн
Регистрация
15 Мар 2020
Сообщения
1
Реакции
0
Ждём продолжение-с
 
Сверху