"""Poll the Formula 1 news feed, extract attributed quotes and tweet them.

Pipeline: fetch the RSS/XML feed, scrape each new article, pull out quoted
passages, ask a question-answering model who said each quote, keep only
answers that a NER tagger also recognised as a person, and post the result
to Twitter as a tweet (or thread) followed by a "Source" reply.
"""

import functools
import os
import re
import string
import textwrap
import time

import black
import flair
import gradio as gr
import isort
import numpy as np
import pandas as pd
import requests
import tweepy
from bs4 import BeautifulSoup
from flair.data import Sentence
from flair.models import SequenceTagger
from transformers import AutoModelForQuestionAnswering, AutoTokenizer, pipeline

URL = "https://www.formula1.com/content/fom-website/en/latest/all.xml"

# Twitter credentials are read from the environment at import time; a missing
# variable raises KeyError immediately.
api_key = os.environ['api_key']
secret_api_key = os.environ['secret_api_key']
access_token = os.environ['access_token']
secret_access_token = os.environ['secret_access_token']
bearer_token = os.environ['bearer_token']

# Seconds to wait for an article page before giving up on it.
_REQUEST_TIMEOUT = 30

# Quotes longer than this many characters are truncated before being embedded
# in the question sent to the QA model.
# max_seq_len == 384 : https://huggingface.co/deepset/roberta-base-squad2
# NOTE(review): that limit is in tokens, this cut-off is in characters.
_MAX_QUOTE_CHARS_IN_QUESTION = 380

_TWEET_CHAR_LIMIT = 280
# Room reserved for the emoji prefix, ": '" and the closing quote mark.
_SINGLE_TWEET_OVERHEAD = 10
# The first tweet of a thread additionally ends in "...", so reserve more.
_THREAD_TWEET_OVERHEAD = 15


def get_xml(url):
    """Read the feed at *url* and return its ``channel/item`` rows as a DataFrame."""
    # xpath is only for formula1
    # use urllib.parse to check for formula1.com website or other news
    return pd.read_xml(url, xpath='channel/item')


cols_list = ['title', 'description', 'link', 'creator', 'guid']
# Feed snapshot from the previous polling cycle; replaced by check_updates().
previous_xml = pd.DataFrame(columns=cols_list)


def extract_quote(string):
    """Return the quoted passages in *string* that contain more than three words.

    Text between straight or curly double quotes is matched non-greedily.
    Short matches are dropped so that single quoted words or brief phrases
    are not treated as quotes.
    """
    candidates = re.findall(r'[“\"](.*?)[”\"]', string)
    return [candidate for candidate in candidates if len(candidate.split()) > 3]


@functools.lru_cache(maxsize=1)
def _load_ner_tagger():
    """Load the flair 'ner' tagger once and reuse it on later calls."""
    return SequenceTagger.load('ner')


def get_names(text):
    """Return the distinct person names ("PER" labels) the NER tagger finds in *text*.

    The order of the returned list is not defined because duplicates are
    removed through a set.
    """
    sentence = Sentence(text)
    _load_ner_tagger().predict(sentence)
    people = {
        label.data_point.text
        for label in sentence.get_labels('ner')
        if label.value == "PER"
    }
    return list(people)


def get_text(new_articles_df):
    """Scrape each article in *new_articles_df* and collect its quotes.

    Returns a dict keyed by the article ``guid`` (used as the URL to fetch).
    Each value is ``{'context': str, 'quotes': list, 'speakers': list or ()}``.
    Articles that cannot be downloaded, or whose page lacks the expected
    content container, are reported and skipped.
    """
    dfs_dict = {}
    for _, article in new_articles_df.iterrows():
        link = article["guid"]
        try:
            response = requests.get(link, timeout=_REQUEST_TIMEOUT)
        except requests.RequestException as exc:
            print(f"Could not fetch {link}: {exc}")
            continue

        soup = BeautifulSoup(response.content, "html.parser")
        # class_ below will be different for different websites
        container = soup.find(
            "div", class_="col-lg-8 col-xl-7 offset-xl-1 f1-article--content"
        )
        if container is None:
            print(f"No article content found at {link}")
            continue

        paragraphs = [paragraph.text for paragraph in container.find_all("p")]
        # The first <strong> element is deliberately not excluded; paragraphs
        # whose text equals any later <strong> text are dropped.
        excluded = {strong.text for strong in container.find_all("strong")[1:]}

        # Combine the remaining paragraphs into one context string.
        context = "".join(f" {text}" for text in paragraphs if text not in excluded)
        quotes = extract_quote(context)

        # To save some time, NER is only run when there is a quote to attribute.
        speakers = get_names(context) if quotes else ()
        dfs_dict[link] = {'context': context, 'quotes': quotes, 'speakers': speakers}
    return dfs_dict


def load_speaker_model():
    """Build the question-answering pipeline used to attribute quotes."""
    model_name = "deepset/xlm-roberta-large-squad2"
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForQuestionAnswering.from_pretrained(model_name)
    return pipeline("question-answering", model=model, tokenizer=tokenizer)


question_answerer = load_speaker_model()


def remove_punctuations(text):
    """Return *text* without ASCII punctuation and without leading/trailing spaces."""
    kept = "".join(
        character for character in text if character not in string.punctuation
    )
    return kept.strip(" ")


def get_speaker_quotes(dfs_dict, question_answerer):
    """Attribute every quote in *dfs_dict* to a speaker.

    For each quote the QA pipeline is asked "Who said '<quote>'?" against the
    article context. The answer is kept only when, after punctuation removal,
    it is one of the names found by NER for that article.

    Returns a list of ``{'speaker', 'quote', 'source'}`` dicts, where
    ``quote`` is the full, untruncated quote.
    """
    speaker_quote = []
    for link, article in dfs_dict.items():
        context = article['context']
        potential_speakers = article['speakers']
        for full_quote in article['quotes']:
            # Only the question is shortened; the full quote is what gets stored.
            asked_quote = full_quote[:_MAX_QUOTE_CHARS_IN_QUESTION]
            answer = question_answerer(
                question=f"Who said '{asked_quote}'?", context=context
            )
            speaker = remove_punctuations(answer['answer'])
            # An empty answer is never in potential_speakers, so it is skipped too.
            if speaker in potential_speakers:
                speaker_quote.append(
                    {'speaker': speaker, 'quote': full_quote, 'source': link}
                )
    return speaker_quote


def post_to_twitter():
    """Return a tweepy Client built from the module-level credentials."""
    return tweepy.Client(
        bearer_token=bearer_token,
        consumer_key=api_key,
        consumer_secret=secret_api_key,
        access_token=access_token,
        access_token_secret=secret_access_token,
        wait_on_rate_limit=True,
    )


def split_near_space(string, max_length):
    """Split *string* into lines of at most *max_length* characters, breaking at spaces."""
    return textwrap.wrap(string, width=max_length)


def _build_tweet_texts(speaker, quote):
    """Return the tweet text(s) for one quote: one tweet, or the parts of a thread."""
    single = f"🗣️ | {speaker}: '{quote}'"
    if len(speaker) + len(quote) + _SINGLE_TWEET_OVERHEAD < _TWEET_CHAR_LIMIT:
        return [single]

    # max(1, ...) keeps textwrap from raising on a non-positive width.
    width = max(1, _TWEET_CHAR_LIMIT - len(speaker) - _THREAD_TWEET_OVERHEAD)
    chunks = split_near_space(quote, width)
    if len(chunks) < 2:
        # Nothing to thread; keep the speaker prefix instead of losing it.
        return [f"🗣️ | {speaker}: '{chunks[0]}'"] if chunks else [single]

    last_index = len(chunks) - 1
    texts = []
    for index, chunk in enumerate(chunks):
        if index == 0:
            texts.append(f"🗣️ | {speaker}: '{chunk}...'")
        elif index == last_index:
            texts.append(f"'...{chunk}'")
        else:
            texts.append(f"'...{chunk}...'")
    return texts


def send_tweets(speaker_quote):
    """Tweet every speaker/quote pair, each followed by a "Source" reply.

    Quotes too long for one tweet are posted as a thread. Posting is
    best-effort: a failure for one pair is reported and the remaining pairs
    are still attempted.
    """
    if not speaker_quote:
        return

    # One client is enough for the whole batch.
    api = post_to_twitter()
    for pair in speaker_quote:
        source = pair['source']
        thread_id = None
        try:
            for tweet_text in _build_tweet_texts(pair['speaker'], pair['quote']):
                posted = api.create_tweet(
                    text=tweet_text, in_reply_to_tweet_id=thread_id
                )
                thread_id = posted.data['id']
            api.create_tweet(
                text=f"Source: {source}", in_reply_to_tweet_id=thread_id
            )
        except Exception as exc:  # best-effort boundary: report and move on
            print(f"Failed to tweet quote from {source}: {exc}")


def check_updates(every=300):
    """Poll the feed forever, tweeting quotes from articles not seen before.

    Sleeps *every* seconds between polls. This function never returns.
    """
    global previous_xml
    while True:
        time.sleep(every)
        latest_xml = get_xml(URL)
        if previous_xml.equals(latest_xml):
            print('No New article is found')
            continue

        print('New articles found')
        new_articles_df = latest_xml[~latest_xml["guid"].isin(previous_xml["guid"])]
        # Loop through new articles and get the necessary text, quotes and speakers.
        dfs_dict = get_text(new_articles_df)
        speaker_quote = get_speaker_quotes(dfs_dict, question_answerer)
        send_tweets(speaker_quote)
        # Remember this snapshot so the same articles are not tweeted again.
        previous_xml = latest_xml


demo = gr.Interface(fn=check_updates, inputs="number", outputs="text", analytics_enabled=True)
demo.launch(max_threads=1, show_api=False)