#############################################################################################################
# Title:  Gradio Interface to LLM-chatbot (for recommending AI) with RAG-functionality and ChromaDB on HF-Hub
# Author: Andreas Fischer
# Date:   December 30th, 2023
# Last update: May 27th, 2024
##############################################################################################################

import json
import os
import re
import subprocess

import chromadb
from chromadb.utils import embedding_functions
import gradio as gr


# Chroma-DB
#-----------

# Prefer the developer's local database directory; fall back to the app directory.
dbPath = "/home/af/Schreibtisch/gradio/Chroma/db"
if not os.path.exists(dbPath):
    dbPath = "/home/user/app/db"
print(dbPath)

chroma_client = chromadb.PersistentClient(path=dbPath)
print(chroma_client.heartbeat())
print(chroma_client.get_version())
print(chroma_client.list_collections())

default_ef = embedding_functions.DefaultEmbeddingFunction()  # not used below; kept from the original
sentence_transformer_ef = embedding_functions.SentenceTransformerEmbeddingFunction(
    model_name="T-Systems-onsite/cross-en-de-roberta-sentence-transformer"
)
print(str(chroma_client.list_collections()))

# The collection is looked up through the string representation of the
# collection list; it is created and seeded on first start only.
if "name=ChromaDB1" in str(chroma_client.list_collections()):
    print("ChromaDB1 found!")
    collection = chroma_client.get_collection(
        name="ChromaDB1", embedding_function=sentence_transformer_ef
    )
else:
    print("ChromaDB1 created!")
    collection = chroma_client.create_collection(
        "ChromaDB1",
        embedding_function=sentence_transformer_ef,
        metadata={"hnsw:space": "cosine"},
    )
    collection.add(
        documents=[
            "Text generating AI model mistralai/Mixtral-8x7B-Instruct-v0.1: Suitable for text generation, e.g., social media content, marketing copy, blog posts, short stories, etc.",
            "Image generating AI model stabilityai/sdxl-turbo: Suitable for image generation, e.g., illustrations, graphics, AI art, etc.",
            "Audio transcribing AI model openai/whisper-large-v3: Suitable for audio-transcription in different languages",
            "Speech synthesizing AI model coqui/XTTS-v2: Suitable for generating audio from text and for voice-cloning",
            "Code generating AI model deepseek-ai/deepseek-coder-6.7b-instruct: Suitable for programming in Python, JavaScript, PHP, Bash and many other programming languages.",
            "Translation AI model Helsinki-NLP/opus-mt: Suitable for translating text, e.g., from English to German or vice versa",
            "Search result-integrating AI model phind/phind-v9-model: Suitable for researching current topics and for obtaining precise and up-to-date answers to questions based on web search results",
        ],
        metadatas=[
            {"source": "AF"},
            {"source": "AF"},
            {"source": "AF"},
            {"source": "AF"},
            {"source": "AF"},
            {"source": "AF"},
            {"source": "AF"},
        ],
        ids=["ai1", "ai2", "ai3", "ai4", "ai5", "ai6", "ai7"],
    )

print("Database ready!")
print(collection.count())


# Model
#-------

onPrem = False
myModel = "mistralai/Mixtral-8x7B-Instruct-v0.1"

if not onPrem:
    # Remote inference through the Hugging Face Hub.
    modelPath = myModel
    from huggingface_hub import InferenceClient

    client = InferenceClient(
        model=modelPath,
        # token="hf_..."
    )
else:
    # Local inference: make sure a GGUF file exists, then start llama_cpp.server.
    import requests

    modelPath = "/home/af/gguf/models/Mixtral-8x7b-instruct-v0.1.Q4_0.gguf"
    if not os.path.exists(modelPath):
        url = "https://huggingface.co/TheBloke/Mixtral-8x7B-Instruct-v0.1-GGUF/resolve/main/mixtral-8x7b-instruct-v0.1.Q4_0.gguf?download=true"
        # Stream the download to disk instead of holding the whole model in
        # memory, and fail loudly rather than saving an HTTP error page.
        with requests.get(url, stream=True) as download:
            download.raise_for_status()
            with open("./Mixtral-8x7b-instruct.gguf", mode="wb") as file:
                for chunk in download.iter_content(chunk_size=1 << 20):
                    file.write(chunk)
        print("Model downloaded")
        modelPath = "./Mixtral-8x7b-instruct.gguf"
    print(modelPath)
    n = "20"
    if "Mixtral-8x7b-instruct" in modelPath:
        n = "0"  # mixtral seems to cause problems here...
    command = [
        "python3", "-m", "llama_cpp.server",
        "--model", modelPath,
        "--host", "0.0.0.0",
        "--port", "2600",
        "--n_threads", "8",
        "--n_gpu_layers", n,
    ]
    # The server is started in the background; nothing here waits for it to
    # finish loading the model.
    subprocess.Popen(command)
    print("Server ready!")


# Check template
#----------------
# Disabled debugging aid: prints how a tokenizer's chat template renders two
# sample conversations, for comparison with the hand-written templates below.
if False:
    from transformers import AutoTokenizer

    mod = "VAGOsolutions/Llama-3-SauerkrautLM-8b-Instruct"
    tok = AutoTokenizer.from_pretrained(mod)  # ,token="hf_...")
    cha = [{"role": "system", "content": "A"}, {"role": "user", "content": "B"}, {"role": "assistant", "content": "C"}]
    res = tok.apply_chat_template(cha)
    print(tok.decode(res))
    cha = [{"role": "user", "content": "U1"}, {"role": "assistant", "content": "A1"}, {"role": "user", "content": "U2"}, {"role": "assistant", "content": "A2"}]
    res = tok.apply_chat_template(cha)
    print(tok.decode(res))


# Gradio-GUI
#------------

# Marker that introduces the sources footer appended to every answer. The same
# marker is used to cut the footer off again when old answers are replayed as
# history, so both places must stay in sync.
# NOTE(review): the footer text looks like markup was stripped from it
# (bare "Sources" heading, doubled bullets) -- confirm against the deployed file.
_SOURCES_MARKER = "\n\n\nSources\n"

# Templates used when no model-specific entry matches: (system, user, assistant).
_DEFAULT_TEMPLATES = (
    " [INST]{system}\n [/INST] ",
    " [INST] {message} [/INST]",
    " {response}",
)

# Model-specific prompt templates: (substrings of modelPath, system, user, assistant).
# Entries are checked in order and a later match overrides an earlier one.
_PROMPT_TEMPLATES = (
    (   # https://huggingface.co/CohereForAI/c4ai-command-r-v01
        ("command-r",),
        "<|START_OF_TURN_TOKEN|><|SYSTEM_TOKEN|> {system}<|END_OF_TURN_TOKEN|>",
        "<|START_OF_TURN_TOKEN|><|USER_TOKEN|>{message}<|END_OF_TURN_TOKEN|><|START_OF_TURN_TOKEN|><|CHATBOT_TOKEN|>",
        "{response}<|END_OF_TURN_TOKEN|>",
    ),
    (   # NOTE(review): these literals carry no turn markers; they look
        # truncated -- confirm against the Gemma chat template.
        ("Gemma-",),
        "user{system}",
        "user{message}model",
        "{response}",
    ),
    (   # AutoTokenizer: [INST] U1[/INST] A1[INST] U2[/INST] A2
        ("Mixtral-8x22B-Instruct",),
        "[INST]{system}\n [/INST] ",
        "[INST] {message}[/INST]",
        " {response}",
    ),
    (   # https://huggingface.co/mistralai/Mixtral-8x7B-Instruct-v0.1
        # AutoTokenizer: [INST] U1 [/INST]A1 [INST] U2 [/INST]A2
        ("Mixtral-8x7b-instruct",),
        " [INST]{system}\n [/INST] ",
        " [INST] {message} [/INST]",
        " {response}",
    ),
    (   # https://huggingface.co/mistralai/Mistral-7B-Instruct-v0.2
        ("Mistral-7B-Instruct",),
        "[INST]{system}\n [/INST]",
        "[INST] {message} [/INST]",
        " {response}",
    ),
    (   # https://huggingface.co/TheBloke/openchat-3.5-0106-GGUF
        ("Openchat-3.5",),
        "GPT4 Correct User: {system}<|end_of_turn|>GPT4 Correct Assistant: Okay.<|end_of_turn|>",
        "GPT4 Correct User: {message}<|end_of_turn|>GPT4 Correct Assistant: ",
        "{response}<|end_of_turn|>",
    ),
    (   # https://huggingface.co/VAGOsolutions/SauerkrautLM-7b-HerO
        ("Discolm_german_7b", "SauerkrautLM-7b-HerO"),
        "<|im_start|>system\n{system}<|im_end|>\n",
        "<|im_start|>user\n{message}<|im_end|>\n<|im_start|>assistant\n",
        "{response}<|im_end|>\n",
    ),
    (
        ("Llama-3-SauerkrautLM-8b-Instruct",),
        "<|begin_of_text|><|start_header_id|>system<|end_header_id|>\n\n{system}<|eot_id|>",
        "<|start_header_id|>user<|end_header_id|>\n\n{message}<|eot_id|><|start_header_id|>assistant<|end_header_id|>\n\n",
        "{response}<|eot_id|>\n",
    ),
    (   # https://huggingface.co/WizardLM/WizardLM-13B-V1.2
        ("WizardLM-13B-V1.2",),
        "{system} ",
        "USER: {message} ASSISTANT: ",
        "{response}",
    ),
    (   # https://huggingface.co/TheBloke/phi-2-GGUF
        ("Phi-2",),
        "Instruct: {system}\nOutput: Okay.\n",
        "Instruct: {message}\nOutput:",
        "{response}\n",
    ),
)


def _strip_html(text):
    """Replace every ``<...>`` span with a newline (tags may break markdown rendering)."""
    return re.sub("<(.*?)>", "\n", text)


def _sources_footer(combination):
    """Build the footer that lists the retrieved database entries below an answer."""
    items = "".join(["\n  • " + s + "\n  • " for s in combination])
    return _SOURCES_MARKER + "    " + items + "\n"


def extend_prompt(message="", history=None, system=None, RAGAddon=None, system2=None, zeichenlimit=None, historylimit=4, removeHTML=True):
    """Assemble the full text prompt for the model selected by ``modelPath``.

    Args:
        message: Current user message; skipped when ``None``.
        history: Sequence of ``(user_message, bot_response)`` pairs; ``None``
            entries are treated as empty strings.
        system: System prompt; no system section is emitted when it is ``None``
            and there is no ``RAGAddon``.
        RAGAddon: Text appended to the system prompt (retrieved documents).
        system2: Text appended verbatim after the final user turn.
        zeichenlimit: Maximum number of characters kept per message
            (``None`` = unlimited).
        historylimit: Number of most recent history pairs to include.
        removeHTML: Replace ``<...>`` spans in past answers with newlines.

    Returns:
        The prompt string in the template format of the active model.
    """
    startOfString = ""

    # Pick the template set; the last matching table entry wins.
    template0, template1, template2 = _DEFAULT_TEMPLATES
    for markers, system_tpl, user_tpl, bot_tpl in _PROMPT_TEMPLATES:
        if any(marker in modelPath for marker in markers):
            template0, template1, template2 = system_tpl, user_tpl, bot_tpl

    prompt = ""
    if RAGAddon:
        # Previously `system += RAGAddon` raised TypeError for system=None.
        system = (system or "") + RAGAddon
    if system is not None:
        prompt += template0.format(system=system)

    if history is not None:
        for user_message, bot_response in history[-historylimit:]:
            user_message = "" if user_message is None else user_message
            bot_response = "" if bot_response is None else bot_response
            # Remove the RAG component (sources footer) from past answers. The
            # former pattern "\n\n.*?" ended in a lazy quantifier and therefore
            # only deleted the literal "\n\n", leaving the footer in place.
            bot_response = bot_response.split(_SOURCES_MARKER, 1)[0]
            if removeHTML:
                bot_response = _strip_html(bot_response)
            prompt += template1.format(message=user_message[:zeichenlimit])
            prompt += template2.format(response=bot_response[:zeichenlimit])

    if message is not None:
        prompt += template1.format(message=message[:zeichenlimit])
    if system2 is not None:
        prompt += system2
    return startOfString + prompt


def response(
    message,
    history,
    temperature=0.9,
    max_new_tokens=500,
    top_p=0.95,
    repetition_penalty=1.0,
):
    """Answer ``message`` with retrieval-augmented generation, streaming the result.

    The two closest database entries are retrieved for the user message, added
    to the system prompt, and the prompt is sent either to the Hugging Face
    inference client or to the local completion server (``onPrem``).

    Args:
        message: Current user message.
        history: Sequence of ``(user_message, bot_response)`` pairs.
        temperature, max_new_tokens, top_p, repetition_penalty: Sampling
            settings. NOTE(review): as in the original code, the Hub branch
            overrides these with fixed values and the on-prem branch does not
            use them.

    Yields:
        The answer accumulated so far; the last value additionally carries the
        sources footer.
    """
    addon = ""

    # Retrieve for the *current* message. The original used history[-1][0],
    # which is the previous turn -- and None on the first turn, because the
    # welcome row has no user message.
    query_text = message
    if not query_text:
        earlier = [turn[0] for turn in (history or []) if turn[0]]
        query_text = earlier[-1] if earlier else ""

    results = collection.query(
        query_texts=[query_text],
        n_results=2,
    )
    dists = ["\n(relevance: " + str(round((1 - d) * 100) / 100) + ";" for d in results['distances'][0]]
    sources = ["source: " + s["source"] + ")" for s in results['metadatas'][0]]
    documents = results['documents'][0]
    combination = [' '.join(triplet) for triplet in zip(documents, dists, sources)]
    print(combination)
    if len(documents) > 1:
        addon = " Bitte berücksichtige bei deiner Antwort auf die Fragen des Users ggf. folgende Auszüge aus unserer Datenbank, sofern sie für die Antwort erforderlich sind. Beantworte die Frage knapp und präzise. Ignoriere unpassende Datenbank-Auszüge OHNE sie zu kommentieren, zu erwähnen oder aufzulisten:\n" + "\n".join(documents)
    system = "Du bist ein deutschsprachiges KI-basiertes Assistenzsystem, das zu jedem Anliegen möglichst geeignete KI-Tools empfiehlt."

    removeHTML = True
    prompt = extend_prompt(
        message,                # current message of the user
        history,                # complete history
        system,                 # system prompt
        addon,                  # RAG-component added to the system prompt
        None,                   # fictive first words of the AI (neither displayed nor stored)
        historylimit=2,         # number of past messages to consider for response to current message
        removeHTML=removeHTML,  # remove HTML-components from history (to prevent bugs with markdown)
    )

    ## Request response from model
    #------------------------------

    print("AI running on prem!" if onPrem else "AI running HFHub!")
    print(prompt)
    answer = ""

    if not onPrem:
        # NOTE(review): fixed values replace whatever the caller passed in.
        temperature = float(0.9)
        max_new_tokens = 1000
        top_p = 0.95
        repetition_penalty = 1.0
        if temperature < 1e-2:
            temperature = 1e-2
        top_p = float(top_p)
        generate_kwargs = dict(
            temperature=temperature,
            max_new_tokens=max_new_tokens,
            top_p=top_p,
            repetition_penalty=repetition_penalty,
            do_sample=True,
            seed=42,
        )
        stream = client.text_generation(prompt, **generate_kwargs, stream=True, details=True, return_full_text=False)
        for event in stream:
            answer += event.token.text
            if removeHTML:
                answer = _strip_html(answer)
            yield answer
    else:
        url = "http://0.0.0.0:2600/v1/completions"
        body = {"prompt": prompt, "max_tokens": None, "echo": "False", "stream": "True"}  # e.g. Mixtral-Instruct
        if "Discolm_german_7b" in modelPath:
            body.update({"stop": ["<|im_end|>"]})  # fix stop-token of DiscoLM
        if "Gemma-" in modelPath:
            # NOTE(review): the second stop token is an empty string; it looks
            # truncated -- confirm the intended Gemma stop token.
            body.update({"stop": ["<|im_end|>", ""]})  # fix stop-token of Gemma
        # Parse the event stream line by line. The earlier hand-rolled buffer
        # decoded arbitrary 128-byte chunks, lost its separator when re-joining
        # and skipped buffers that contained more than one event.
        with requests.post(url, json=body, stream=True) as reply:
            for raw_line in reply.iter_lines():
                line = raw_line.decode("utf-8").strip()
                if not line.startswith("data: "):
                    continue  # blank separators and ": ping -" keep-alives
                payload = line[len("data: "):]
                if payload == "[DONE]":
                    # presumably the server ends the stream with this
                    # OpenAI-style sentinel; other non-JSON payloads are
                    # reported and skipped below
                    break
                try:
                    part = str(json.loads(payload)["choices"][0]["text"])
                except (ValueError, KeyError, IndexError, TypeError) as e:
                    print("Exception:" + str(e))
                    continue
                answer += part
                if removeHTML:
                    answer = _strip_html(answer)
                yield answer

    # Final update: the complete answer followed by the list of sources.
    answer = answer + _sources_footer(combination)
    yield answer


gr.ChatInterface(
    response,
    chatbot=gr.Chatbot(
        value=[[None, "Herzlich willkommen! Ich bin ein KI-basiertes Assistenzsystem, das für jede Anfrage die am besten geeigneten KI-Tools empfiehlt.\nAktuell bin ich wenig mehr als eine Tech-Demo und kenne nur 7 KI-Modelle - also sei bitte nicht zu streng mit mir.\nWas ist dein Anliegen?"]],
        render_markdown=True,
    ),
    title="German AI-RAG-Interface to the Hugging Face Hub",
).queue().launch(share=True)  # False, server_name="0.0.0.0", server_port=7864)
print("Interface up and running!")