import os import json import re import gradio as gr import requests from duckduckgo_search import DDGS from typing import List from pydantic import BaseModel, Field from langchain_community.vectorstores import FAISS from langchain_community.embeddings import HuggingFaceEmbeddings from langchain_core.documents import Document from huggingface_hub import InferenceClient import logging import pandas as pd import tempfile # Set up basic configuration for logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') # Environment variables and configurations huggingface_token = os.environ.get("HUGGINGFACE_TOKEN") MODELS = [ "mistralai/Mistral-7B-Instruct-v0.3", "mistralai/Mixtral-8x7B-Instruct-v0.1", "mistralai/Mistral-Nemo-Instruct-2407", "meta-llama/Meta-Llama-3.1-8B-Instruct", "meta-llama/Meta-Llama-3.1-70B-Instruct" ] MODEL_TOKEN_LIMITS = { "mistralai/Mistral-7B-Instruct-v0.3": 32768, "mistralai/Mixtral-8x7B-Instruct-v0.1": 32768, "mistralai/Mistral-Nemo-Instruct-2407": 32768, "meta-llama/Meta-Llama-3.1-8B-Instruct": 8192, "meta-llama/Meta-Llama-3.1-70B-Instruct": 8192, } DEFAULT_SYSTEM_PROMPT = """You are a world-class financial AI assistant, capable of complex reasoning and reflection. Reason through the query inside tags, and then provide your final response inside tags. Providing comprehensive and accurate information based on web search results is essential. Your goal is to synthesize the given context into a coherent and detailed response that directly addresses the user's query. Please ensure that your response is well-structured, factual. If you detect that you made a mistake in your reasoning at any point, correct yourself inside tags.""" def process_excel_file(file, model, temperature, num_calls, use_embeddings, system_prompt): try: df = pd.read_excel(file.name) results = [] for _, row in df.iterrows(): question = row['Question'] custom_system_prompt = row['System Prompt'] # Use the existing get_response_with_search function response_generator = get_response_with_search(question, model, num_calls, temperature, use_embeddings, custom_system_prompt) full_response = "" for partial_response, _ in response_generator: full_response = partial_response # Keep updating with the latest response if not full_response: full_response = "No response generated. Please check the input parameters and try again." results.append(full_response) df['Response'] = results # Save to a temporary file with tempfile.NamedTemporaryFile(delete=False, suffix='.xlsx') as tmp: df.to_excel(tmp.name, index=False) return tmp.name except Exception as e: logging.error(f"Error processing Excel file: {str(e)}") return None def upload_file(file): return file.name if file else None def download_file(file_path): return file_path def get_embeddings(): return HuggingFaceEmbeddings(model_name="sentence-transformers/stsb-roberta-large") def duckduckgo_search(query): with DDGS() as ddgs: results = ddgs.text(query, max_results=5) return results class CitingSources(BaseModel): sources: List[str] = Field( ..., description="List of sources to cite. Should be an URL of the source." ) def chatbot_interface(message, history, model, temperature, num_calls, use_embeddings, system_prompt): if not message.strip(): return "", history history = history + [(message, "")] try: for response in respond(message, history, model, temperature, num_calls, use_embeddings, system_prompt): history[-1] = (message, response) yield history except gr.CancelledError: yield history except Exception as e: logging.error(f"Unexpected error in chatbot_interface: {str(e)}") history[-1] = (message, f"An unexpected error occurred: {str(e)}") yield history def retry_last_response(history, model, temperature, num_calls, use_embeddings, system_prompt): if not history: return history last_user_msg = history[-1][0] history = history[:-1] # Remove the last response return chatbot_interface(last_user_msg, history, model, temperature, num_calls, use_embeddings, system_prompt) def respond(message, history, model, temperature, num_calls, use_embeddings, system_prompt): logging.info(f"User Query: {message}") logging.info(f"Model Used: {model}") logging.info(f"Use Embeddings: {use_embeddings}") logging.info(f"System Prompt: {system_prompt}") try: for main_content, sources in get_response_with_search(message, model, num_calls=num_calls, temperature=temperature, use_embeddings=use_embeddings, system_prompt=system_prompt): response = f"{main_content}\n\n{sources}" first_line = response.split('\n')[0] if response else '' yield response except Exception as e: logging.error(f"Error with {model}: {str(e)}") yield f"An error occurred with the {model} model: {str(e)}. Please try again or select a different model." def create_web_search_vectors(search_results): embed = get_embeddings() documents = [] for result in search_results: if 'body' in result: content = f"{result['title']}\n{result['body']}\nSource: {result['href']}" documents.append(Document(page_content=content, metadata={"source": result['href']})) return FAISS.from_documents(documents, embed) def get_response_with_search(query, model, num_calls=3, temperature=0.2, use_embeddings=True, system_prompt=DEFAULT_SYSTEM_PROMPT): search_results = duckduckgo_search(query) if use_embeddings: web_search_database = create_web_search_vectors(search_results) if not web_search_database: yield "No web search results available. Please try again.", "" return retriever = web_search_database.as_retriever(search_kwargs={"k": 5}) relevant_docs = retriever.get_relevant_documents(query) context = "\n".join([doc.page_content for doc in relevant_docs]) else: context = "\n".join([f"{result['title']}\n{result['body']}\nSource: {result['href']}" for result in search_results]) prompt = f"""Using the following context from web search results: {context} Write a detailed and complete research document that fulfills the following user request: '{query}' After writing the document, please provide a list of sources with their URLs used in your response.""" # Use Hugging Face API client = InferenceClient(model, token=huggingface_token) # Calculate input tokens (this is an approximation, you might need a more accurate method) input_tokens = len(prompt.split()) // 4 # Get the token limit for the current model model_token_limit = MODEL_TOKEN_LIMITS.get(model, 8192) # Default to 8192 if model not found # Calculate max_new_tokens max_new_tokens = min(model_token_limit - input_tokens, 6500) # Cap at 4096 to be safe main_content = "" for i in range(num_calls): try: response = client.chat_completion( messages=[ {"role": "system", "content": system_prompt}, {"role": "user", "content": prompt} ], max_tokens=max_new_tokens, temperature=temperature, stream=False, top_p=0.8, ) # Log the raw response for debugging logging.info(f"Raw API response: {response}") # Check if the response is a string (which might be an error message) if isinstance(response, str): logging.error(f"API returned an unexpected string response: {response}") yield f"An error occurred: {response}", "" return # If it's not a string, assume it's the expected object structure if hasattr(response, 'choices') and response.choices: for choice in response.choices: if hasattr(choice, 'message') and hasattr(choice.message, 'content'): chunk = choice.message.content main_content += chunk yield main_content, "" # Yield partial main content without sources else: logging.error(f"Unexpected response structure: {response}") yield "An unexpected error occurred. Please try again.", "" except Exception as e: logging.error(f"Error in API call: {str(e)}") yield f"An error occurred: {str(e)}", "" return def vote(data: gr.LikeData): if data.liked: print(f"You upvoted this response: {data.value}") else: print(f"You downvoted this response: {data.value}") css = """ /* Fine-tune chatbox size */ """ def initial_conversation(): return [ (None, "Welcome! I'm your AI assistant for web search. Here's how you can use me:\n\n" "1. Ask me any question, and I'll search the web for information.\n" "2. You can adjust the system prompt for fine-tuned responses, whether to use embeddings, and the temperature.\n" "To get started, ask me a question!") ] # Modify the Gradio interface with gr.Blocks() as demo: gr.Markdown("# AI-powered Web Search Assistant") gr.Markdown("Ask questions and get answers from web search results.") with gr.Row(): chatbot = gr.Chatbot( show_copy_button=True, likeable=True, layout="bubble", height=400, value=initial_conversation() ) with gr.Row(): message = gr.Textbox(placeholder="Ask a question", container=False, scale=7) submit_button = gr.Button("Submit") with gr.Accordion("⚙️ Parameters", open=False): model = gr.Dropdown(choices=MODELS, label="Select Model", value=MODELS[3]) temperature = gr.Slider(minimum=0.1, maximum=1.0, value=0.2, step=0.1, label="Temperature") num_calls = gr.Slider(minimum=1, maximum=5, value=1, step=1, label="Number of API Calls") use_embeddings = gr.Checkbox(label="Use Embeddings", value=False) system_prompt = gr.Textbox(label="System Prompt", lines=5, value=DEFAULT_SYSTEM_PROMPT) with gr.Accordion("Batch Processing", open=False): excel_file = gr.File(label="Upload Excel File", file_types=[".xlsx"]) process_button = gr.Button("Process Excel File") download_button = gr.File(label="Download Processed File") # Event handlers submit_button.click(chatbot_interface, inputs=[message, chatbot, model, temperature, num_calls, use_embeddings, system_prompt], outputs=chatbot) message.submit(chatbot_interface, inputs=[message, chatbot, model, temperature, num_calls, use_embeddings, system_prompt], outputs=chatbot) # Excel processing excel_file.change(upload_file, inputs=[excel_file], outputs=[excel_file]) process_button.click( process_excel_file, inputs=[excel_file, model, temperature, num_calls, use_embeddings, system_prompt], outputs=[download_button] ) if __name__ == "__main__": demo.launch(share=True)