import spaces
from datetime import datetime, timedelta
import logging
import urllib.parse
import asyncio
import threading
import schedule
import os
from huggingface_hub import InferenceClient
import gradio as gr
from jinja2 import Environment, FileSystemLoader
import json
import re
import requests
import httpx
from bs4 import BeautifulSoup
from urllib.request import Request, urlopen
import time
import pandas as pd
import concurrent.futures
from duckduckgo_search import DDGS
from supabase import create_client, Client
from requests_html import AsyncHTMLSession
from groq import Groq
import google.generativeai as genai
# client = InferenceClient("meta-llama/Meta-Llama-3.1-8B-Instruct") | |
# Required for saving the query & response in DB | |
db_url: str = os.environ.get("SUPABASE_URL") | |
db_key: str = os.environ.get("SUPABASE_KEY") | |
supabase: Client = create_client(db_url, db_key) | |
logging.basicConfig(level=logging.INFO, format='%(message)s') | |
display_ticker=[] | |
part = "day" | |
genai.configure(api_key=os.environ["GEMINI_KEY"]) | |
client = genai.GnerativeModel(model_name='gemini-1.5-flash') | |
# client = InferenceClient("mistralai/Mistral-7B-Instruct-v0.3") | |
# client = InferenceClient("meta-llama/Meta-Llama-3-8B-Instruct") | |
# client = InferenceClient("google/gemma-2-2b-it") | |
client_func_call = InferenceClient("mistralai/Mistral-7B-Instruct-v0.1") | |
# client_func_call = InferenceClient("microsoft/Phi-3-mini-4k-instruct") | |
def time_logger(func):
    async def wrapper(*args, **kwargs):
        start_time = time.time()
        result = await func(*args, **kwargs)
        end_time = time.time()
        elapsed_time = end_time - start_time
        logging.info(f"{func.__name__} took {elapsed_time:.2f} seconds to complete")
        return result
    return wrapper
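# Usage sketch for @time_logger (illustrative; fetch_something is a hypothetical name):
#
#     @time_logger
#     async def fetch_something():
#         await asyncio.sleep(1)
#
#     asyncio.run(fetch_something())  # logs "fetch_something took 1.00 seconds to complete"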
async def latest_earning():
    earning_link = []
    # URL of the webpage to scrape
    url = "https://www.moneycontrol.com/markets/earnings/india-inc-earnings/?selected=all"
    # Send a GET request to fetch the raw HTML content
    response = requests.get(url, headers={'User-Agent': 'Mozilla/5.0'})
    # Parse the content using BeautifulSoup
    soup = BeautifulSoup(response.content, "html.parser")
    # Find all elements with the earnings-card class
    elements_with_class = soup.find_all(class_='rapidResCardWeb_blkTxtOne__cigbf')
    # Iterate over all the elements found
    for element in elements_with_class:
        anchor_tag = element.find('a')  # Find the first anchor tag within each element
        if anchor_tag and 'href' in anchor_tag.attrs:
            href = anchor_tag['href']
            earning_link.append(f"<a href='{href}'>{href.split('/')[-2]}</a>")
    return '\n'.join(earning_link)
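# latest_earning() returns newline-joined HTML anchors, one per result, e.g.
# "<a href='https://.../xyz-ltd/'>xyz-ltd</a>" (slug taken from the second-to-last
# URL segment). Illustrative shape only; the CSS class above is site-generated
# and may change when moneycontrol redesigns the page.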
async def todays_news():
    url = 'https://trendlyne.com/markets-today/'
    # logging.info("getting news from %s", url)
    # Fetch the HTML content of the webpage
    html_content = requests.get(url, headers={'User-Agent': 'Mozilla/5.0'}).text
    soup = BeautifulSoup(html_content, 'html.parser')
    insights = soup.find_all(class_='insight-box')
    timestamps = []
    stock_names = []
    stock_href = []
    insight_label = []
    notification = []
    timestampo = ""  # fallback so the first insight without a timestamp doesn't raise NameError
    for insight in insights:
        timestamp = insight.find(class_='insight-timestamp')
        # Reuse the previous timestamp when an insight box omits its own
        timestampo = timestamp.text.strip() if timestamp else timestampo
        timestamps.append(timestampo)
        stock_names.append(f"[{insight.find(class_='stock-name').text.strip()}](https://trendlyne.com{insight.find(class_='stock-name').find('a')['href']})")
        insight_label.append(insight.find(class_='stock-insight-label').text.strip())
        notification.append(insight.find(class_='insight-notification').text.strip())
    df = pd.DataFrame({"Timestamp": timestamps, "Stock": stock_names, "Label": insight_label, "Notification": notification})
    # logging.info("Dataframe created for stocks in news today")
    # logging.info(df.head(3))
    df_dict = df.to_dict('records')
    return df_dict
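# Each record returned by todays_news() looks like (illustrative values):
# {"Timestamp": "10:05 AM", "Stock": "[INFY](https://trendlyne.com/...)",
#  "Label": "Result Update", "Notification": "Infosys declares Q1 results"}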
async def get_moneycontrol_news():
    # Extract article links and bodies from moneycontrol's stocks-news page
    # Send a GET request to the URL
    response = requests.get("https://www.moneycontrol.com/news/business/stocks/")
    linkx = []
    filtered_links = []  # so the loop below is safe even if the request fails
    # Check if the request was successful
    if response.status_code == 200:
        # Parse the HTML content of the page
        soup = BeautifulSoup(response.content, 'html.parser')
        # Find all <li> tags
        li_tags = soup.find_all('li')
        # Extract links from <a> tags within <li> tags that contain <p> tags
        for li_tag in li_tags:
            if li_tag.find('p'):  # Check if <li> tag contains <p> tag
                a_tags = li_tag.find_all('a', href=True)
                for a_tag in a_tags:
                    linkx.append(a_tag['href'])
        filtered_links = list(set([link for link in linkx if link.endswith('.html')]))
    else:
        # If the request was not successful, print an error message
        print("Failed to retrieve article links from moneycontrol")
    common_content = ""
    # Iterate over each link
    for link in filtered_links:
        # Send a GET request to the link
        response = requests.get(link)
        # Check if the request was successful
        if response.status_code == 200:
            # Parse the HTML content of the page
            soup = BeautifulSoup(response.text, 'html.parser')
            scripts = soup.find_all('script')
            for script in scripts:
                if 'articleBody' in script.text:
                    split_article = script.text.split('articleBody', 1)[1]
                    split_author = split_article.split('author', 1)[0]
                    # print(split_author)
                    heading = "Heading -" + link.split('/')[-1].replace("-", " ")
                    # Hyphen escaped so the character class matches it literally
                    body = "Body -" + re.sub(r'[:\n\-";]|amp', ' ', split_author)
                    print(heading)
                    common_content = common_content + str({heading: body}) + "," + "\n"
                    print(f"Article scraped successfully from {link}")
    print("Creating context file...")
    today = datetime.now()
    formatted_date = today.strftime('%d%B%Y')
    filename = f"templates/{formatted_date}" + '.txt'
    with open(filename, 'w') as file:
        file.write(f"The news given below was available on moneycontrol on {formatted_date}:\n")
        file.write(common_content)
    print(f"{filename} file generated")
    return common_content, filename
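# The generated context file is plain text: one dated header line followed by
# the accumulated common_content, i.e. one "{'Heading -...': 'Body -...'},"
# entry per scraped article (shape shown for illustration).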
greet = f"Good {part}!"
PLACEHOLDER = f"""<div class="message-bubble-border" style="display: flex; max-width: 700px; border-width: 1px; border-radius: 8px; box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1); backdrop-filter: blur(10px);">
    <figure style="margin: 0; width: 200px; flex-shrink: 0; height: auto;">
        <img src="https://i.pinimg.com/originals/02/55/6a/02556a88bdc3d4e89787be346c6faa00.jpg" alt="Logo" style="width: 100%; height: 100%; border-top-left-radius: 8px; border-bottom-left-radius: 8px; object-fit: cover;">
    </figure>
    <div style="padding: 1rem; flex-grow: 1;">
        <h3 style="text-align: left; font-size: 1.2rem; font-weight: 700; margin-bottom: 0.5rem;">Hi, {greet}</h3>
        <p style="text-align: left; font-size: 16px; line-height: 1.5; margin-bottom: 15px;">Welcome! I'm your AI assistant for Indian market research and stock analysis. Ask away things like</p>
        <ul style="text-align: left; padding-left: 20px; margin-bottom: 15px;">
            <li>What is the market news today?</li>
            <li>How is HCL share performing?</li>
            <li>Compare the financial performance of Infosys and Cyient?</li>
            <li>Who are the promoters of Brightcomm Group?</li>
        </ul>
        <div style="display: flex; justify-content: space-between; align-items: center;">
            <div style="display: flex; flex-flow: column; justify-content: space-between;">
                <span style="display: inline-flex; align-items: center; border-radius: 0.375rem; background-color: rgba(229, 70, 77, 0.1); padding: 0.1rem 0.75rem; font-size: 0.75rem; font-weight: 500; color: #f88181; margin-bottom: 2.5px;">
                    Mixtral 8x7B Instruct v0.1
                </span>
            </div>
            <div style="display: flex; justify-content: flex-end; align-items: center;">
                <a href="https://in.linkedin.com/in/sharad-deep-shukla" target="_blank" rel="noreferrer" style="padding: 0.5rem;">
                    <svg xmlns="http://www.w3.org/2000/svg" width="24" height="24" fill="currentColor" viewBox="0 0 24 24">
                        <title>LinkedIn</title>
                        <path d="M20.447 20.452h-3.554v-5.569c0-1.328-.027-3.037-1.85-3.037-1.851 0-2.135 1.445-2.135 2.935v5.671h-3.554v-11.5h3.413v1.571h.048c.475-.899 1.637-1.85 3.368-1.85 3.601 0 4.268 2.369 4.268 5.451v6.328zm-14.454-13.497c-1.145 0-2.072-.928-2.072-2.073 0-1.145.928-2.073 2.072-2.073 1.145 0 2.073.928 2.073 2.073-.001 1.145-.928 2.073-2.073 2.073zm1.777 13.497h-3.554v-11.5h3.554v11.5zm15.23-24h-18.141c-1.423 0-2.583 1.16-2.583 2.583v18.833c0 1.423 1.16 2.583 2.583 2.583h18.141c1.422 0 2.583-1.16 2.583-2.583v-18.833c-.001-1.423-1.161-2.583-2.584-2.583z"/>
                    </svg>
                </a>
            </div>
        </div>
    </div>
</div>
"""
def get_the_ticker(stock_name):
    final_matches = []
    for stock in stock_name:
        raw_query = f"YAHOO FINANCE TICKER SYMBOL OF {stock.upper()}"
        query = raw_query.replace(" ", "+")
        url = f'https://www.google.com/search?q={query}&FORM=HDRSC7'
        # logging.info("searching ticker using url: %s", url)
        # Fetch the HTML content of the search results page
        html_content = requests.get(url, headers={'User-Agent': 'Mozilla/5.0'}).text
        soup = BeautifulSoup(html_content, "html.parser")
        # Match any (possibly percent-encoded) symbol followed by .NS
        pattern = re.compile(r'(\w+%[0-9A-Fa-f]{2}(?:[0-9A-Fa-f]{2}|[0-9A-Fa-f])*|[\w\.&%-]+)\.NS')
        matches = pattern.findall(str(soup))
        # Unquote twice in case the symbol was double percent-encoded
        step1 = [urllib.parse.unquote(i) for i in matches]
        matches = [urllib.parse.unquote(i) for i in step1]
        matches = list(set(matches[:2]))
        final_matches.extend(matches)
    logging.info("List of matches obtained: %s", final_matches)
    return final_matches
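# Illustrative call (actual values depend on live search results):
#     get_the_ticker(["Infosys"])   # -> e.g. ["INFY"]; the ".NS" suffix is
#     outside the capture group, so findall() returns the bare symbol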
async def get_the_ticker_stat(stock):
    try:
        combination = []
        url = f'https://www.google.com/search?q={urllib.parse.quote(stock)}+site:businesstoday.in/stocks/&num=1&sca_esv=28795b6719ac1a08&sxsrf=ACQVn08xDA1EP1V6hJ-q4jLjjXSWWxgHTw:1711450545062&source=lnt&tbs=li:1&sa=X&ved=2ahUKEwj426eO4pGFAxX4n2MGHRXqBTUQpwV6BAgBEBM&biw=1280&bih=567&dpr=1.5'
        # logging.info("getting ticker stat url from: %s", url)
        # Fetch the HTML content of the search results page
        html_content = requests.get(url, headers={'User-Agent': 'Mozilla/5.0'}).text
        pattern = r'href="/url[?]q=(https://www.businesstoday.in/stocks/[^"]+)"'
        # Find all matches using re.findall
        links = re.findall(pattern, html_content)
        links = list(set(links))
        # logging.info("List of links obtained for ticker stat: %s", links)
        url = (links[0].split("&"))[0]
        # logging.info("Final URL to fetch stats %s", url)
        # Fetch the HTML content of the stock page
        html_content = requests.get(url, headers={'User-Agent': 'Mozilla/5.0', 'Cache-Control': 'no-cache'}).content
        soup = BeautifulSoup(html_content, "html.parser")
        script = soup.find("script", type="application/ld+json")
        # Parse the JSON-LD script
        json_data = json.loads(script.text)
        # logging.info(json_data)
        # Iterate over the "mainEntity" array, collecting question/answer pairs
        qa_dict = {}
        for entity in json_data["mainEntity"]:
            question = entity["name"].replace("'", "")
            answer = entity["acceptedAnswer"]["text"].replace("'", "")
            qa_dict[question] = answer
        combination.append(qa_dict)
        return combination
    except Exception as e:
        logging.warning('get_the_ticker_stat failed due to %s', e)
        return []
def get_the_ticker_stat_sync(stock):
    # Synchronous fallback that mirrors get_the_ticker_stat()
    try:
        combination = []
        url = f'https://www.google.com/search?q={urllib.parse.quote(stock)}+site:businesstoday.in/stocks/&num=1&sca_esv=28795b6719ac1a08&sxsrf=ACQVn08xDA1EP1V6hJ-q4jLjjXSWWxgHTw:1711450545062&source=lnt&tbs=li:1&sa=X&ved=2ahUKEwj426eO4pGFAxX4n2MGHRXqBTUQpwV6BAgBEBM&biw=1280&bih=567&dpr=1.5'
        # logging.info("getting ticker stat url from: %s", url)
        # Fetch the HTML content of the search results page
        html_content = requests.get(url, headers={'User-Agent': 'Safari/605.1.1'}).text
        pattern = r'href="/url[?]q=(https://www.businesstoday.in/stocks/[^"]+)"'
        # Find all matches using re.findall
        links = re.findall(pattern, html_content)
        links = list(set(links))
        # logging.info("List of links obtained for ticker stat: %s", links)
        url = (links[0].split("&"))[0]
        # logging.info("Final URL to fetch stats %s", url)
        # Fetch the HTML content of the stock page
        html_content = requests.get(url, headers={'User-Agent': 'Safari/605.1.1'}).text
        soup = BeautifulSoup(html_content, "html.parser")
        script = soup.find("script", type="application/ld+json")
        # Parse the JSON-LD script
        json_data = json.loads(script.text)
        # logging.info(json_data)
        # Iterate over the "mainEntity" array, collecting question/answer pairs
        qa_dict = {}
        for entity in json_data["mainEntity"]:
            question = entity["name"].replace("'", "")
            answer = entity["acceptedAnswer"]["text"].replace("'", "")
            qa_dict[question] = answer
        combination.append(qa_dict)
        return combination
    except Exception as e:
        logging.warning('get_the_ticker_stat failed due to %s', e)
        return []
async def get_the_ticker_news(stock):
    try:
        all_news = []
        url = f'https://www.google.com/search?q={urllib.parse.quote(stock)}+site:trendlyne.com/research-reports&num=3&sca_esv=28795b6719ac1a08&sxsrf=ACQVn08xDA1EP1V6hJ-q4jLjjXSWWxgHTw:1711450545062&source=lnt&tbs=li:1&sa=X&ved=2ahUKEwj426eO4pGFAxX4n2MGHRXqBTUQpwV6BAgBEBM&biw=1280&bih=567&dpr=1.5'
        # logging.info(url)
        # Fetch the HTML content of the search results page
        html_content = requests.get(url, headers={'User-Agent': 'Mozilla/5.0'}).text
        pattern = r'href="/url[?]q=(https://trendlyne.com/research-reports/[^"]+)"'
        # Find all matches using re.findall
        links = re.findall(pattern, html_content)
        links = list(set(links))
        # logging.info("Links fetched to get trendlyne research report: %s", links)
        if "/%" in links[0]:
            fetched_reports_url = links[0].split("%")[0]
        else:
            fetched_reports_url = links[0].split("&")[0]
        # fetched url may look like this - https://trendlyne.com/research-reports/post/ROLTA/1146/rolta-india-ltd/
        # logging.info("finalised url: %s", fetched_reports_url)
        pattern = r'//.*?(\d+).*/'
        match = re.search(pattern, fetched_reports_url)
        if match:
            # Unique number identified in the url; rebuild the canonical reports url
            split_url = fetched_reports_url.split("/")
            # logging.info(split_url)
            unique_no = match.group(1)
            # logging.info("Unique no identified: %s", unique_no)
            company_name = split_url[-2]
            # logging.info("Company name identified: %s", company_name)
            reports_url = f"https://trendlyne.com/research-reports/stock/{unique_no}/{urllib.parse.quote(stock)}/{company_name}/"
        else:
            # Unique number not identified in the url; continue with the basic flow
            reports_url = fetched_reports_url
        financials_url = reports_url.replace("research-reports/stock", "equity")
        url = reports_url.replace("research-reports/stock", "latest-news")
        # logging.info("\nURL to fetch news links: %s", url)
        # logging.info(f"\nURL to fetch health of financial insights:\n {financials_url}")
        # logging.info(f"\nURL to fetch rating info:\n {reports_url}")
        # req = Request(
        #     url=url,
        #     headers={'User-Agent': 'Mozilla/5.0'}
        # )
        # webpage = urlopen(req).read()
        # logging.info(webpage)
        html_content = requests.get(url, headers={'User-Agent': 'Mozilla/5.0'}).text
        html_content_financials = requests.get(financials_url, headers={'User-Agent': 'Mozilla/5.0'}).text
        soup = BeautifulSoup(html_content, "html.parser")
        a_tags = soup.find_all('a', class_='newslink')
        if a_tags is not None:
            links = [a_tag["href"] for a_tag in a_tags]
            # logging.info(f"\nNews Links:\n{links}")
        fin_soup = BeautifulSoup(html_content_financials, "html.parser")
        matches = re.findall(r'data-companyinsights="\[(.*?)\]"', str(fin_soup))
        # Strip HTML-escaped quotes and rupee-sign escapes from the embedded JSON,
        # then rename keys to friendlier labels
        company_insight = matches[0].replace("&quot;", "").replace("\\u20b", "").replace("parameter", "Metric").replace("insight_color", "Trend").replace("insight_text", "Insight")
        all_news.append(company_insight)
        # logging.info("All news insights obtained: %s", all_news)
        return all_news, reports_url
    except Exception as e:
        logging.warning('get_the_ticker_news failed due to %s', e)
        return [], ""
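# get_the_ticker_news() returns a (list, str) pair: the cleaned
# data-companyinsights payload (a single stringified list of
# Metric/Trend/Insight entries) and the trendlyne research-reports URL,
# or ([], "") on any failure. Shape described here for reference only.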
async def trade_setup():
    today = datetime.now()
    plus_one_day = today + timedelta(days=1)
    todays = today.strftime('%B %d')
    tomorrow = plus_one_day.strftime('%B %d')
    market_data = []
    results = DDGS().text(f'intitle:Trade Setup {tomorrow} site:cnbctv18.com', max_results=1, timelimit='w')
    todays_url = "https://www.cnbctv18.com/market-live/"
    todays_response = requests.get(todays_url, headers={'User-Agent': 'Mozilla/5.0'})
    soup = BeautifulSoup(todays_response.content, 'html.parser')
    paragraphs = soup.find_all('p')
    for p in paragraphs:
        market_data.append(p.get_text())
    url = results[0]['href']
    logging.warning(url)
    response = requests.get(url, headers={'User-Agent': 'Mozilla/5.0'})
    # Parse the content with BeautifulSoup
    soup = BeautifulSoup(response.content, 'html.parser')
    # Find all <script> tags with type "application/ld+json"
    script_tags = soup.find_all('script', type='application/ld+json')
    # Extract and parse JSON-LD data from each script tag
    for script in script_tags:
        try:
            json_data = json.loads(script.string)
            # Check if the JSON data is a dictionary with '@type': 'NewsArticle'
            if isinstance(json_data, dict) and json_data.get('@type') == 'NewsArticle':
                summary = {"How is market right now?": market_data, "refer market": "https://www.cnbctv18.com/market-live/", "Trade setup": json_data['articleBody'], "refer setup": url}
                return summary
        except json.JSONDecodeError as e:
            print("Error decoding JSON: ", e)
    return {}
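# trade_setup() output (illustrative): a dict with four keys - the scraped
# market-live paragraphs, their source URL, the cnbctv18 "Trade Setup"
# articleBody, and that article's URL - or {} if no NewsArticle JSON-LD
# block was found.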
async def get_google_news(queries, max_results):
    try:
        results = []
        task = []
        async def duckduckgo_search(query, max_results):
            query = query + "+ available on NSE"
            # Pass max_results by keyword: the second positional parameter of
            # DDGS.news() is region, not max_results (and callers pass it as a string)
            results = DDGS().news(query, timelimit="w", max_results=int(max_results))
            news = [{f"[{doc['title']}]({doc['url']})": doc['body'] for doc in results}]
            return news
        for query in queries:
            task.append(duckduckgo_search(query, max_results))
        results = await asyncio.gather(*task)
        if not results:
            logging.info("No news from duckduckgo on %s", queries)
        return results
    except Exception as e:
        logging.warning('get_google_news failed due to %s', e)
        return []
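# Each gathered entry is a one-element list holding a dict of
# "[title](url)" -> body pairs, e.g. (illustrative):
# [{"[Infosys Q1 beats estimates](https://...)": "Infosys reported..."}]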
async def get_duckai_news(queries):
    results = DDGS().news(queries[0] + " +blogs", region='in-en', max_results=4, timelimit="w")
    prompt = f"""#Instruction:
    Summarise the impactful points for {queries[0]} from the input context given and mention the news link and date of publication at the end if available
    #Format:
    Output needs to be in JSON format
    #Input:
    """
    context = prompt + str(results)
    try:
        results = DDGS().chat(keywords=context, model="gpt-4o-mini")
        print("Ai news", results)
        return results
    except Exception as e:
        logging.warning("duckduckgo ai chat failed to bring news: %s", e)
        # Fall back to the raw search results if the chat summarisation fails
        return results
# Function to scrape the SWOT widget data from a trendlyne widget page
async def scrape_webpage(url):
    # Fetch the HTML content of the webpage
    html_content = requests.get(url, headers={'User-Agent': 'Mozilla/5.0'}).text
    # Parse HTML using BeautifulSoup
    soup = BeautifulSoup(html_content, 'html.parser')
    swot_dict = {}
    try:
        swot_div = soup.find('div', id='swot-widget')
        # Extract the value of the 'data-swotparams' attribute
        data_swotparams = swot_div.get('data-swotparams')
        # Decode the JSON data
        swot_data = json.loads(data_swotparams)
        for swot in swot_data:
            new_dict = {swot['name']: [sublist[1] for sublist in swot['z']]}
            swot_dict.update(new_dict)
    except Exception as e:
        logging.warning('scrape_webpage for swot failed due to %s', e)
        swot_dict = {"info": "no data found"}
    return swot_dict
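# scrape_webpage() returns, illustratively, something like
# {"Strengths": ["Rising net profit", ...], "Weaknesses": [...], ...}:
# the widget's category names mapped to their bullet texts,
# or {"info": "no data found"} when the widget is absent.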
async def raw_news(raw_query, subqueries, todays_news_func_call, ticker):
    swot_analysis_link = f'https://widgets.trendlyne.com/web-widget/swot-widget/Poppins/{urllib.parse.quote(ticker)}/'
    tasks = [get_the_ticker_stat(ticker),
             get_the_ticker_news(ticker),
             get_google_news(subqueries, str(10)),
             scrape_webpage(swot_analysis_link)]
    # Defaults so a failed gather doesn't leave these names unbound
    ticker_stats, ticker_news, google_news, swot_analysis = [], None, [], {}
    ticker_financials, reports = [], ""
    try:
        ticker_stats, ticker_news, google_news, swot_analysis = await asyncio.gather(*tasks)
    except Exception as exc:
        logging.error(f'gathering all data in parallel failed with an exception: {exc}')
    if ticker_news:
        ticker_financials, reports = ticker_news
        # Re-fetch the stats synchronously as a fallback for the async fetch
        ticker_stats = get_the_ticker_stat_sync(ticker)
        print(ticker_stats)
    # logging.info("Brokers report link %s", reports)
    ticker_stats_str = ''
    for ticker_stat in ticker_stats:
        # Reconstructed entity cleanup: strip HTML-escaped ampersands/quotes
        # from the dumped stats before templating
        ticker_stats_str += json.dumps(ticker_stat).replace("&amp;", "'").replace("&#39;", "'").replace("&quot;", "'").replace("Link", "").replace("Heading", "").replace("Body", "").replace("Text", "").replace("{", "").replace("}", "")
    return swot_analysis, ticker_stats_str, ticker_financials, reports, google_news
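# raw_news() fans out four scrapers for one ticker and returns
# (swot_analysis, ticker_stats_str, ticker_financials, reports, google_news);
# generate_final_response() below consumes the tuple in that order.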
def format_prompt(message, history):
    prompt = ""
    for user_prompt, bot_response in history:
        prompt += f'{user_prompt}'
        prompt += f" {bot_response}"
    prompt += f"{message}"
    # Note: the accumulated history prompt is built but currently not
    # returned; only the latest message is forwarded
    return message
async def generate_function_call(prompt, tomorrow, todays):
    generate_kwargs = dict(
        temperature=0.001,
        max_new_tokens=200,
        top_p=0.88,
        repetition_penalty=1.0,
        do_sample=True,
        seed=42,
    )
    env = Environment(loader=FileSystemLoader("templates/"), autoescape=True)
    template = env.get_template("function_calling.txt")
    content = template.render(question=prompt, tomorrow=tomorrow, todays=todays)
    # Use the dedicated function-calling client: the global `client` is a Gemini
    # GenerativeModel and has no text_generation() method
    stream = client_func_call.text_generation(content, **generate_kwargs, stream=True, details=True, return_full_text=True)
    output = ""
    for response in stream:
        output += response.token.text
    # Extract the JSON object between the last "{" and the first "}" in the output
    start_index = output.rfind("{")
    end_index = output.find("}")
    if start_index != -1 and end_index != -1 and start_index < end_index:
        json_string = output[start_index:end_index + 1]
        try:
            # Attempt to parse the trimmed string as JSON
            parsed_json = json.loads(json_string)
            return parsed_json
        except json.JSONDecodeError:
            return {"error": "Invalid JSON format"}
    else:
        return {"error": "No valid JSON found in output"}
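# Expected parse result (illustrative; keys are the ones consumed downstream):
# {"alternate_query": [...], "stock_name": [...],
#  "todays_news_flag": false, "follow_up_query": false}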
def count_words(text):
    words = text.split()
    return f"{len(words)} words"
def insert_in_db(query, ticker_financials, context_files, ticker_stats, reports, news_link, news_googles, content, output):
    try:
        response = (
            supabase.table("stockx")
            .insert({"query": query, "ticker_financials": ticker_financials, 'swot_analysis': context_files, 'ticker_stats': ticker_stats, 'reports': reports, 'other_links': news_link, 'google_news': news_googles, 'final_prompt': content, 'answer': output})
            .execute()
        )
        return response
    except Exception as e:
        logging.warning("some error occurred in saving data to db %s", e)
        return None
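# Assumed Supabase schema for "stockx" (inferred from the insert above, not
# confirmed by the source): text/jsonb columns query, ticker_financials,
# swot_analysis, ticker_stats, reports, other_links, google_news,
# final_prompt, answer.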
def generate_final_response(prompt, history):
    global display_ticker
    # Log the dates
    today = datetime.now()
    logging.info("Todays date: %s", today)
    plus_one_day = today + timedelta(days=1)
    todays = today.strftime('%B %d')
    tomorrow = plus_one_day.strftime('%B %d')
    context_files = []
    ticker_stats = []
    reports = []
    ticker_financials = []
    news_link = []
    news_googles = []
    generate_kwargs = dict(temperature=0.001, max_new_tokens=2048, top_p=0.99, repetition_penalty=1.0, do_sample=False, seed=42)
    todays_date = today.strftime('%d%B%Y')
    question = format_prompt(prompt, history)
    chat_completion_params = asyncio.run(generate_function_call(question, tomorrow, todays))
    logging.info(chat_completion_params)
    subqueries = chat_completion_params['alternate_query']
    ticker = []
    stock_names = chat_completion_params["stock_name"]
    # logging.info("Getting into get_the_ticker()")
    ticker = get_the_ticker(stock_names)
    # logging.info("Final Ticker: %s", ticker)
    try:
        if len(ticker) < 1:
            # No ticker resolved: fall back to general market news
            news_link.append(asyncio.run(trade_setup()))
            news_link.append(asyncio.run(get_duckai_news(subqueries)))
        elif chat_completion_params['todays_news_flag'] and len(ticker) > 0:
            for tick in chat_completion_params["stock_name"]:
                news_googles.append(f"Latest News for {tick}\n\n {asyncio.run(get_google_news(subqueries, str(10)))}")
        elif (chat_completion_params['follow_up_query'] and ticker_stats != []) or (display_ticker == ticker and ticker_stats != []):
            # logging.info("\n\nAssigned into a followup query\n\n")
            chat_completion_params['follow_up_query'] = True
        else:
            # logging.info("prompt & ticker: %s, %s", question, ticker)
            # logging.info("Getting into raw_news()")
            for stock in ticker:
                context_file, ticker_stat, ticker_financial, report, news_google = asyncio.run(raw_news(raw_query=question, subqueries=subqueries, todays_news_func_call=chat_completion_params["todays_news_flag"], ticker=stock))
                # Append each detail to its corresponding list
                context_files.append({f"SWOT signals of {stock}": context_file})
                ticker_stats.append({f"Stock stats of {stock}": ticker_stat})
                ticker_financials.append({f"Financial stats of {stock}": ticker_financial})
                reports.append({f"Brokers report on {stock}": report})
                news_googles.append({f"News on {stock}": news_google})
        logging.info(f"Generating response for **{question}**")
        env = Environment(loader=FileSystemLoader("templates/"), autoescape=True)
        template = env.get_template("system_prompt.txt")
        content = template.render(todays_date=todays_date, ticker_financials=ticker_financials, response_type="Response-1", chat_completion_params=chat_completion_params, context_file=context_files, question=question, ticker=ticker, ticker_stats=ticker_stats, reports=reports, news_link=news_link, news_googles=news_googles)
        token_size = count_words(content)
        logging.info("Total context sent to llm: %s \n\n\n", token_size)
        output = ""
        try:
            ## LLAMA
            # for message in client.chat_completion(
            #     messages=[{"role": "user", "content": f"{content}"}],
            #     max_tokens=500,
            #     stream=True,
            # ):
            #     stream = message.choices[0].delta.content
            ## GEMINI
            stream = client.generate_content(content, stream=True)
            ## MIXTRAL
            # stream = client.text_generation(content, **generate_kwargs, stream=True, details=True, return_full_text=True)
            output = ""
            for response in stream:
                output += response.text
                # output += response.token.text
                yield output
        except StopAsyncIteration:
            yield "Sorry, could you provide more details to clarify your query"
        finally:
            db_response = insert_in_db(question, ticker_financials, context_files, ticker_stats, reports, news_link, news_googles, content, output)
            logging.info("Data stored in db successfully" if db_response else "Failed to save the response in db")
    except Exception as e:
        yield f"Sorry, your query couldn't be processed. Retry with the correct stock name - an error occurred: {e}"
theme = "JohnSmith9982/small_and_pretty"
js_func = """
function refresh() {
    const url = new URL(window.location);
    if (url.searchParams.get('__theme') !== 'dark') {
        url.searchParams.set('__theme', 'dark');
        window.location.href = url.href;
    }
}
"""
my_chatbot = gr.Chatbot(
    label="Ask Anything",
    show_label=True,
    container=True,
    scale=2,
    min_width=160,
    visible=True,
    elem_id="my-chatbot",
    render=True,
    height="400%",
    show_share_button=True,
    avatar_images=[None, "./agenttt.png"],
    sanitize_html=True,
    render_markdown=True,
    bubble_full_width=False,
    line_breaks=False,
    likeable=True,
    layout="panel",
    placeholder=PLACEHOLDER
)
demo = gr.ChatInterface(
    fn=generate_final_response,
    chatbot=my_chatbot,
    title='<h1 style="color: #FFFFFF; font-weight: bold; font-family: \'Arial\', sans-serif; text-align: center;">StockX</h1>',
    theme=theme,
    js=js_func,
    css=""".gradio-container {
        background-image: url('https://mir-s3-cdn-cf.behance.net/project_modules/max_1200/db907386019783.5d8cd86e1ce2b.jpg');
        background-size: auto;
    }"""
)
demo.queue(max_size=10).launch(show_api=False)