# gogo2/agent-py-bot/agent.py


import logging
import asyncio, nest_asyncio
from telegram import Bot, Message, Update
from telegram.constants import ParseMode
from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes, CallbackContext
import os
import requests
import json
import base64
import subprocess
# The selenium / PIL imports below support the (currently commented-out) /screenshot command
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from io import BytesIO
from PIL import Image
# Apply nest_asyncio
nest_asyncio.apply()
# Set up logging
logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO)
logger = logging.getLogger(__name__)
# Telegram Bot Token
# t.me/kevin_ai_robot
TOKEN = '6805059978:AAHNJKuOeazMSJHc3-BXRCsFfEVyFHeFnjw'
# t.me/artitherobot 6749075936:AAHUHiPTDEIu6JH7S2fQdibwsu6JVG3FNG0
# This can be your own ID, or one for a developer group/channel.
# You can use the /start command of this bot to see your chat id.
DEVELOPER_CHAT_ID = "777826553"
# LLM API Endpoint
LLM_ENDPOINT = "http://192.168.0.11:11434/api/chat"
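# Ollama's /api/chat (with "stream": false) is expected to return a single JSON object shaped
# roughly like {"model": "...", "message": {"role": "assistant", "content": "..."}, "done": true};
# query_llm below only relies on the "message.content" field.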
# When APPEND_RESULTS is true, the output of each executed code block is appended after the
# block in the reply; otherwise the block is replaced by its output (see query_llm)
APPEND_RESULTS = os.getenv('APPEND_RESULTS', 'True') == 'True'
async def start(update: Update, context: CallbackContext):
    await context.bot.send_message(chat_id=update.effective_chat.id, text="Hi! I'm your AI bot. Ask me anything with /ask")

async def echo(update: Update, context: CallbackContext):
    # Check if in ask mode
    if context.chat_data.get('ask_mode'):
        await context.bot.send_chat_action(chat_id=update.effective_chat.id, action="typing")
        # Process as if it's an ask command
        context.chat_data['messages'].append(update.message.text)
        # Process the concatenated messages
        user_message = ' '.join(context.chat_data['messages'])
        llm_response = await query_llm(user_message)
        await update.message.reply_text("[ask] " + llm_response)
    else:
        # Regular echo behavior
        await update.message.reply_text(update.message.text)
        # await context.bot.send_message(chat_id=update.effective_chat.id, text=update.message.text)
async def ask(update: Update, context: CallbackContext):
    try:
        context.chat_data['ask_mode'] = True
        context.chat_data['messages'] = []
        # Send typing action
        # await context.bot.send_chat_action(chat_id=update.effective_chat.id, action=telegram.ChatAction.TYPING)
        await context.bot.send_chat_action(chat_id=update.effective_chat.id, action="typing")
        user_message = ' '.join(context.args)
        llm_response = await query_llm(user_message)
        await update.message.reply_text(llm_response)
    except Exception as e:
        # Log the exception
        logger.error(f"An error occurred: {e}")
        # Optionally, send a message to the user about the error
        await update.message.reply_text("An error occurred while processing your request.")

async def ok(update: Update, context: CallbackContext):
    context.chat_data['ask_mode'] = False
    context.chat_data['messages'] = []
    await update.message.reply_text("Exiting ask mode.")
# CODE RUNNER
import re
from agents.runner import execute_python_code
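# Assumption: execute_python_code(code: str) -> str runs the snippet and returns its captured
# output (stdout or error text); see agents/runner.py for the actual contract.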
#https://github.com/jmorganca/ollama/blob/main/docs/api.md#generate-a-completion
async def query_llm(user_message):
    """Query the LLM with the user's message."""
    data = {
        "model": "llama2:latest",
        "messages": [{"role": "user", "content": user_message}],
        "stream": False
    }
    # Note: requests.post is blocking and stalls the event loop for the duration of the
    # LLM call; acceptable for a single-user bot.
    response = requests.post(LLM_ENDPOINT, json=data)
    if response.status_code == 200:
        response_data = response.json()
        if "error" in response_data:
            error_message = response_data.get('error', 'Unknown error')
            # Log the error
            logger.error(f"LLM Error: {error_message}")
            # Return a user-friendly error message
            return "Sorry, there was an error processing your request."
        # Handle the response
        content = response_data.get('message', {}).get('content', 'No response')
        # Find and execute all fenced code blocks (the language tag, if any, is part of the capture)
        code_blocks = re.findall(r"```(.*?)```", content, re.DOTALL)
        for code in code_blocks:
            execution_result = execute_python_code(code.strip())
            if APPEND_RESULTS:
                # Append the result after the code block
                content = content.replace(f"```{code}```", f"```{code}```\n```{execution_result}```")
            else:
                # Replace the code block with its result
                content = content.replace(f"```{code}```", f"```{execution_result}```")
        return content
    else:
        logger.error(f"Error reaching LLM: {response.text}")
        return "Error: Unable to reach the AI agent."
async def execute_code(code_block):
    """
    Execute the given Python code in a separate, sandboxed environment.
    Returns the output or any errors encountered.
    """
    try:
        # Example: using subprocess to run code in an isolated interpreter.
        # This is a basic example and not secure. Use a proper sandbox setup.
        result = subprocess.run(['python', '-c', code_block],
                                capture_output=True, text=True, timeout=5)
        return result.stdout or result.stderr
    except subprocess.TimeoutExpired:
        return "Execution timed out."
    except Exception as e:
        return f"An error occurred: {str(e)}"
async def main():
    """Start the bot."""
    # Create an Application instance
    application = Application.builder().token(TOKEN).build()
    # Add handlers to the application
    # Command handlers should be registered before the generic message handler
    application.add_handler(CommandHandler("start", start))
    # application.add_handler(CommandHandler("screenshot", screenshot))  # Ensure screenshot function is async
    application.add_handler(CommandHandler("ask", ask))
    application.add_handler(CommandHandler("ok", ok))
    application.add_handler(CommandHandler("bad_command", bad_command))
    # This handler should be last as it's the most generic; exclude commands so that
    # unrecognised commands are not echoed back
    application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, echo))
    # ...and the error handler
    application.add_error_handler(error_handler)
    # Run the bot. run_polling() is a blocking convenience method in python-telegram-bot v20+;
    # nest_asyncio lets it re-enter the already-running event loop.
    application.run_polling()
import html
import traceback
async def error_handler(update: object, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Log the error and send a telegram message to notify the developer."""
    # Log the error before we do anything else, so we can see it even if something breaks.
    logger.error("Exception while handling an update:", exc_info=context.error)
    # traceback.format_exception returns the usual python message about an exception, but as a
    # list of strings rather than a single string, so we have to join them together.
    tb_list = traceback.format_exception(None, context.error, context.error.__traceback__)
    tb_string = "".join(tb_list)
    # Build the message with some markup and additional information about what happened.
    # You might need to add some logic to deal with messages longer than the 4096 character limit.
    update_str = update.to_dict() if isinstance(update, Update) else str(update)
    message = (
        "An exception was raised while handling an update\n"
        f"<pre>update = {html.escape(json.dumps(update_str, indent=2, ensure_ascii=False))}"
        "</pre>\n\n"
        f"<pre>context.chat_data = {html.escape(str(context.chat_data))}</pre>\n\n"
        f"<pre>context.user_data = {html.escape(str(context.user_data))}</pre>\n\n"
        f"<pre>{html.escape(tb_string)}</pre>"
    )
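    # Telegram caps messages at 4096 characters. A naive guard (assumption: plain truncation
    # is acceptable for this developer notification) would be:
    # message = message[:4096]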
    # Finally, send the message
    await context.bot.send_message(
        chat_id=DEVELOPER_CHAT_ID, text=message, parse_mode=ParseMode.HTML
    )
async def bad_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Raise an error to trigger the error handler."""
    await context.bot.wrong_method_name()  # type: ignore[attr-defined]
#------------------------- webagent --------------------------#
import schedule
import time
from agents.webagent import run_web_agent
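# Assumption: run_web_agent(topic, folder) scrapes recent items for `topic`, stores the raw
# results under `folder`, and returns the scraped data (sync_main below also expects a
# (data, summary) tuple on later calls); see agents/webagent.py for the actual contract.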
async def async_main():
    # Asynchronous entry point: schedule main() on the running loop, or await it directly
    loop = asyncio.get_event_loop()
    if loop.is_running():
        loop.create_task(main())
    else:
        await main()
def sync_main():
    # Synchronous part for scheduling
    topic = "tesla news"
    interval = 1  # in minutes
    folder = "agent-py-bot/scrape/raw"
    schedule.every(interval).minutes.do(run_web_agent, topic=topic, folder=folder)
    # Run once at the start
    news_json = run_web_agent(topic=topic, folder=folder)
    while True:
        schedule.run_pending()
        time.sleep(1)
        # Check if there's new data obtained from the web agent
        new_data, new_summary = run_web_agent(topic=topic, folder=folder)
        # Use the new data to call the async query function; query_llm is a coroutine, so it
        # must be driven to completion (nest_asyncio allows the nested run_until_complete)
        user_message = f"New data received: {new_data}"
        query_result = asyncio.get_event_loop().run_until_complete(query_llm(user_message))
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    # Run the asynchronous part
    if loop.is_running():
        loop.create_task(async_main())
    else:
        loop.run_until_complete(async_main())
    # Run the synchronous part
    sync_main()