gogo2/agent-py-bot/agent.py

182 lines
7.4 KiB
Python

import asyncio
import base64
import json
import logging
import os
import subprocess
import sys
from io import BytesIO

import nest_asyncio
import requests
from PIL import Image
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from telegram import Bot, Message, Update
from telegram.constants import ParseMode
from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes, CallbackContext
# import "gopkg.in/telebot.v3/middleware"
# nest_asyncio patches asyncio so an event loop can be re-entered while it is
# already running (presumably needed because main() starts polling from
# inside a coroutine -- confirm before removing).
nest_asyncio.apply()

# Configure logging once at import time; this module logs through `logger`.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)
# Telegram Bot Token
# t.me/kevin_ai_robot
# SECURITY: a bot token is a credential and should not live in source control.
# Set the TELEGRAM_BOT_TOKEN environment variable instead; the literal below is
# kept only as a backward-compatible fallback and should be rotated and removed.
TOKEN = os.environ.get('TELEGRAM_BOT_TOKEN', '6805059978:AAHNJKuOeazMSJHc3-BXRCsFfEVyFHeFnjw')
# t.me/artitherobot 6749075936:AAHUHiPTDEIu6JH7S2fQdibwsu6JVG3FNG0
# Chat that receives error reports. This can be your own ID, or one for a
# developer group/channel. Override with the DEVELOPER_CHAT_ID environment
# variable.
# NOTE: the /start handler in this file does not report the chat id, so it has
# to be obtained some other way.
DEVELOPER_CHAT_ID = os.environ.get('DEVELOPER_CHAT_ID', "777826553")
# LLM API Endpoint (the default is an address on a private network; override
# with the LLM_ENDPOINT environment variable when deploying elsewhere).
LLM_ENDPOINT = os.environ.get('LLM_ENDPOINT', "http://192.168.0.11:11434/api/chat")
#! Selenium WebDriver setup for screenshots
#chrome_options = Options()
#chrome_options.add_argument("--headless")
#driver = webdriver.Chrome(options=chrome_options)
async def start(update: Update, context: CallbackContext):
    """Handle /start: send a greeting to the chat the command came from."""
    await context.bot.send_message(
        chat_id=update.effective_chat.id,
        text="Hi! I'm your AI bot. Ask me anything with /ask",
    )
async def echo(update: Update, context: CallbackContext):
    """Handle a text message: query the LLM in ask mode, otherwise echo it.

    In ask mode (``chat_data['ask_mode']`` is truthy) the text is appended to
    ``chat_data['messages']``, all collected messages are joined with spaces
    and sent to the LLM, and the answer is sent back prefixed with ``[ask]``.
    Outside ask mode the text is replied back unchanged.
    """
    # effective_message also covers edited messages and channel posts, for
    # which update.message is None and the old code raised AttributeError.
    message = update.effective_message
    if message is None or message.text is None:
        return

    if not context.chat_data.get('ask_mode'):
        # Regular echo behavior
        await message.reply_text(message.text)
        return

    await context.bot.send_chat_action(chat_id=update.effective_chat.id, action="typing")
    # setdefault guards against ask_mode being set without a history list.
    history = context.chat_data.setdefault('messages', [])
    history.append(message.text)
    # The whole history is sent as one prompt so the LLM sees earlier messages.
    user_message = ' '.join(history)
    llm_response = await query_llm(user_message)
    await message.reply_text("[ask]" + llm_response)
async def ask(update: Update, context: CallbackContext):
    """Handle /ask: switch the chat into ask mode and answer the question.

    The command arguments form the question. It is stored as the first entry
    of ``chat_data['messages']`` so that follow-up messages handled by the
    text handler are sent to the LLM together with it. When no question is
    given, ask mode is still enabled but the LLM is not queried.

    Any exception is logged and reported to the user with a generic message.
    """
    # effective_message is also set for edited commands, where update.message
    # is None.
    message = update.effective_message
    try:
        user_message = ' '.join(context.args or [])
        context.chat_data['ask_mode'] = True
        context.chat_data['messages'] = [user_message] if user_message else []

        if not user_message:
            # Nothing to ask yet: avoid sending an empty prompt to the LLM.
            await message.reply_text(
                "Ask mode enabled. Send me your question, or /ok to exit."
            )
            return

        # Show the "typing" indicator while the LLM is working.
        await context.bot.send_chat_action(chat_id=update.effective_chat.id, action="typing")
        llm_response = await query_llm(user_message)
        await message.reply_text(llm_response)
    except Exception as e:
        # logger.exception records the traceback as well as the message.
        logger.exception("An error occurred: %s", e)
        await message.reply_text("An error occurred while processing your request.")
async def ok(update: Update, context: CallbackContext):
    """Handle /ok: leave ask mode and discard the collected messages."""
    context.chat_data['ask_mode'] = False
    context.chat_data['messages'] = []
    # effective_message is also set for edited commands, where update.message
    # is None.
    await update.effective_message.reply_text("Exiting ask mode.")
# https://github.com/jmorganca/ollama/blob/main/docs/api.md#generate-a-chat-completion
async def query_llm(user_message, *, model="llama2:latest", timeout=120):
    """Query the LLM with the user's message.

    Args:
        user_message: Text sent as a single ``user`` chat message.
        model: Model name placed in the request payload.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        The ``message.content`` field of the JSON response, or a
        user-friendly error string when the endpoint cannot be reached,
        answers with a non-200 status, returns a body that is not JSON, or
        reports an ``error`` field. This function does not raise for those
        failures.
    """
    payload = {
        "model": model,
        "messages": [{"role": "user", "content": user_message}],
        "stream": False,
    }

    loop = asyncio.get_running_loop()
    try:
        # requests is blocking; run it in the default executor so the bot keeps
        # handling other updates while the LLM is generating.
        response = await loop.run_in_executor(
            None,
            lambda: requests.post(LLM_ENDPOINT, json=payload, timeout=timeout),
        )
    except requests.RequestException as exc:
        # Connection refused, DNS failure, timeout, ...
        logger.error("Error reaching LLM: %s", exc)
        return "Error: Unable to reach the AI agent."

    if response.status_code != 200:
        logger.error("Error reaching LLM: %s", response.text)
        return "Error: Unable to reach the AI agent."

    try:
        response_data = response.json()
    except ValueError:
        # The JSON decode errors raised by requests derive from ValueError.
        logger.error("LLM Error: response body is not valid JSON: %s", response.text)
        return "Sorry, there was an error processing your request."

    if "error" in response_data:
        error_message = response_data.get('error', 'Unknown error')
        # Log the details, but only show the user a generic message.
        logger.error("LLM Error: %s", error_message)
        return "Sorry, there was an error processing your request."

    return response_data.get('message', {}).get('content', 'No response from AI')
async def execute_code(code_block, timeout=5):
    """Execute the given Python code in a separate interpreter process.

    SECURITY: this is NOT a sandbox. The child process runs with the same
    user, filesystem and network access as the bot. Do not pass untrusted
    code to it without real isolation (container, VM, seccomp, ...).

    Args:
        code_block: Python source passed to the interpreter via ``-c``.
        timeout: Seconds the child may run before it is killed.

    Returns:
        The child's stdout, or its stderr when stdout is empty;
        ``"Execution timed out."`` when the timeout is hit; or
        ``"An error occurred: ..."`` for any other failure.
    """
    def _run():
        # sys.executable is the interpreter running the bot; a bare 'python'
        # is not guaranteed to exist on PATH.
        return subprocess.run(
            [sys.executable, '-c', code_block],
            capture_output=True, text=True, timeout=timeout,
        )

    try:
        # subprocess.run blocks, so run it in the default executor to keep the
        # event loop responsive while the child is running.
        result = await asyncio.get_running_loop().run_in_executor(None, _run)
        return result.stdout or result.stderr
    except subprocess.TimeoutExpired:
        return "Execution timed out."
    except Exception as e:
        return f"An error occurred: {str(e)}"
async def main():
    """Start the bot.

    Builds the Application, registers the command, text and error handlers,
    and then polls for updates until the application is stopped.
    """
    # Create an Application instance
    application = Application.builder().token(TOKEN).build()

    # Command handlers are registered before the generic message handler.
    # TODO: re-add a "screenshot" command once an async handler for it exists.
    command_handlers = (
        ("start", start),
        ("ask", ask),
        ("ok", ok),
        ("bad_command", bad_command),
    )
    for command, callback in command_handlers:
        application.add_handler(CommandHandler(command, callback))

    # Most generic handler last. Commands are excluded so that unknown
    # /commands are not echoed back or forwarded to the LLM.
    application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, echo))

    # ...and the error handler
    application.add_error_handler(error_handler)

    # run_polling() is a blocking, synchronous call that returns None, so it
    # must not be awaited (awaiting None raises TypeError on shutdown).
    # close_loop=False leaves the event loop open, since the caller still owns
    # the loop this coroutine is running on.
    application.run_polling(close_loop=False)
import html
import traceback
# Maximum length of a Telegram text message, in characters.
_TELEGRAM_MAX_MESSAGE_LENGTH = 4096


def _pre_block(text: str, limit: int, keep_tail: bool = False) -> str:
    """Return *text* HTML-escaped inside <pre></pre>, at most *limit* chars long.

    Over-long text is cut and marked with ``[truncated]``. The end of the text
    is dropped by default; with ``keep_tail=True`` the beginning is dropped.
    """
    marker = "[truncated]"
    budget = limit - len("<pre></pre>")
    escaped = html.escape(text)
    if len(escaped) > budget:
        # Trim the raw text rather than the escaped text so that no HTML
        # entity is cut in half. Every raw character yields at least one
        # escaped character, so dropping `excess` raw characters is enough.
        room = budget - len(marker) - 1  # 1 for the newline next to the marker
        excess = len(escaped) - room
        if keep_tail:
            escaped = f"{marker}\n{html.escape(text[excess:])}"
        else:
            escaped = f"{html.escape(text[:max(0, len(text) - excess)])}\n{marker}"
    return f"<pre>{escaped}</pre>"


async def error_handler(update: object, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Log the error and send a telegram message to notify the developer.

    The report contains the update, ``context.chat_data``,
    ``context.user_data`` and the traceback, each in its own ``<pre>`` block.
    It is sent as one message when it fits into the Telegram message limit;
    otherwise every block is sent as a separate message, truncated if needed.
    """
    # Log the error before we do anything else, so we can see it even if something breaks.
    logger.error("Exception while handling an update:", exc_info=context.error)

    # traceback.format_exception returns a list of strings, so join them.
    tb_string = "".join(
        traceback.format_exception(None, context.error, context.error.__traceback__)
    )

    update_str = update.to_dict() if isinstance(update, Update) else str(update)
    header = "An exception was raised while handling an update\n"
    # Each block must fit into one message even when it shares it with the header.
    limit = _TELEGRAM_MAX_MESSAGE_LENGTH - len(header)
    blocks = [
        _pre_block(f"update = {json.dumps(update_str, indent=2, ensure_ascii=False)}", limit),
        _pre_block(f"context.chat_data = {context.chat_data}", limit),
        _pre_block(f"context.user_data = {context.user_data}", limit),
        # The end of a traceback names the exception, so keep the tail.
        _pre_block(tb_string, limit, keep_tail=True),
    ]

    message = header + "\n\n".join(blocks)
    if len(message) <= _TELEGRAM_MAX_MESSAGE_LENGTH:
        outgoing = [message]
    else:
        outgoing = [header + blocks[0], *blocks[1:]]

    # Finally, send the message(s)
    for text in outgoing:
        await context.bot.send_message(
            chat_id=DEVELOPER_CHAT_ID, text=text, parse_mode=ParseMode.HTML
        )
async def bad_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Fail on purpose so that the registered error handler gets triggered."""
    # Deliberately reference a bot attribute that is not expected to exist;
    # the resulting exception is the whole point of this command.
    nonexistent = context.bot.wrong_method_name  # type: ignore[attr-defined]
    await nonexistent()
if __name__ == '__main__':
    # asyncio.get_event_loop() is deprecated when no loop is running, so probe
    # for a running loop explicitly instead.
    try:
        running_loop = asyncio.get_running_loop()
    except RuntimeError:
        running_loop = None

    if running_loop is not None:
        # Already inside an event loop (e.g. a notebook): schedule the bot on
        # it. The reference is kept so the task cannot be garbage-collected
        # while it is still pending.
        _main_task = running_loop.create_task(main())
    else:
        asyncio.run(main())