from typing import TypedDict, Literal, Optional, Tuple
from langgraph.graph import StateGraph, END, START
from langchain_core.tools import tool
from langchain_ollama import ChatOllama
from bs4 import BeautifulSoup
from langchain_core.runnables import RunnablePassthrough
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
import os
import requests
import uuid

MODEL_NAME = "llama3.2"
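# Pipeline: take a public URL, derive a filename from it, fetch and parse the page,
# summarize the text with a local Ollama model, and persist the summary to a .txt file.
# Assumes an Ollama server is running locally with the "llama3.2" model pulled
# (e.g. `ollama pull llama3.2`).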
# --- Define the Tools ---

# Tool 1: Extracts the base filename from a URL string.
@tool
def extract_html_name(url: str) -> str:
    """
    Extracts the filename (without extension) from a public link URL.
    Example: 'https://example.com/path/to/my_file.html' -> 'my_file'
    """
    print(f"Extracting filename from URL: {url}")
    try:
        base_name = os.path.basename(url)
        if '.' in base_name:
            file_name = base_name.split('.')[0]
        else:
            file_name = base_name
        print(f"Extracted filename: {file_name}")
        return file_name
    except Exception as e:
        return f"Error extracting filename: {e}"
# Tool 2: Fetches and parses the content of a remote HTML page.
@tool
def fetch_webpage_content(url: str) -> Tuple[Optional[str], Optional[str]]:
    """
    Fetches a public URL and extracts the human-readable text content.
    Returns a (text, error) tuple; exactly one of the two is None.
    """
    print(f"Fetching and parsing content from URL: {url}")
    try:
        # NOTE: verify=False disables TLS certificate verification.
        response = requests.get(url, timeout=10, verify=False)
        response.raise_for_status()
        # Use BeautifulSoup to parse the HTML and extract text
        soup = BeautifulSoup(response.text, 'html.parser')
        text = soup.get_text(separator=' ', strip=True)
        print("Content fetched and parsed successfully.")
        return text, None
    except requests.exceptions.RequestException as e:
        return None, f"Error fetching content: {e}"
    except Exception as e:
        return None, f"Error parsing HTML: {e}"
# Tool 3: Summarizes the provided text using an LLM.
# NOTE: This tool uses the same Ollama model configured above (MODEL_NAME).
@tool
def summarized_text(text: str) -> str:
    """
    Summarizes the provided text using a language model in not more than 10 lines.
    """
    print("Summarizing content with LLM...")
    # Initialize the LLM specifically for summarization
    llm = ChatOllama(model=MODEL_NAME, temperature=0.3)
    # Create a prompt template for summarization
    summary_prompt = PromptTemplate.from_template(
        "Please provide a concise summary of the following text in not more than 10 lines as plain continuous text only:\n\n{text}"
    )
    summarization_chain = {"text": RunnablePassthrough()} | summary_prompt | llm | StrOutputParser()
    summary = summarization_chain.invoke(text)
    return summary
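# In the LCEL chain above, {"text": RunnablePassthrough()} forwards the raw input
# string into the prompt's {text} variable, and StrOutputParser() reduces the chat
# model's message to a plain string.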
# Tool 4: Writes content to a local text file.
@tool
def write_content_to_file(content: str) -> str:
    """
    Persists the provided content to a local text file with a randomly generated name.
    """
    file_name = str(uuid.uuid4()) + ".txt"
    try:
        with open(file_name, 'w', encoding='utf-8') as f:
            f.write(content)
        print(f"Successfully created and wrote content to file: {file_name}")
        return f"File '{file_name}' created successfully with summarized content."
    except Exception as e:
        print(f"Error creating file: {e}")
        return f"Error creating file: {e}"
# Define the AgentState, which is passed between nodes in the graph.
class AgentState(TypedDict):
    remote_webpage: str
    html_page_content: str
    page_content: str
    file_name: str
    page_summary: str
    error_message: str
    final_message: str
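# Field provenance: remote_webpage is the caller's input; tool1 fills file_name,
# tool2 fills page_content (or error_message + final_message on failure), tool3 fills
# page_summary, and tool4 fills final_message. html_page_content is currently unused.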
# --------------------------
# WRAPPER FUNCTIONS FOR TOOLS
# --------------------------
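# Each node reads the fields it needs from AgentState, invokes the corresponding tool,
# and writes the result back into the state before returning it.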
def tool1_node(state: AgentState) -> AgentState:
    print(f"Invoked tool1 - {state}")
    file_name = extract_html_name.invoke(state["remote_webpage"])
    state["file_name"] = file_name + ".txt"
    return state
def tool2_node(state: AgentState) -> AgentState:
    print(f"Invoked tool2 - {state}")
    (page_content, error_message) = fetch_webpage_content.invoke(state["remote_webpage"])
    if error_message:
        state["error_message"] = error_message
        state["final_message"] = "ERROR: Failed to fetch content. Check error_message for more information!!"
    else:
        state["page_content"] = page_content
    return state
def tool3_node(state: AgentState) -> AgentState:
    print(f"Invoked tool3 - {state['file_name']}")
    page_summary = summarized_text.invoke(state["page_content"])
    state["page_summary"] = page_summary
    return state
def tool4_node(state: AgentState) -> AgentState:
    print(f"Invoked tool4 - {state['file_name']}")
    final_message = write_content_to_file.invoke(state["page_summary"])
    state["final_message"] = final_message
    return state
def should_continue(state: AgentState) -> Literal["END", "Continue"]:
    if "error_message" in state:
        return "END"
    return "Continue"
# --------------------------
# BUILD GRAPH
# --------------------------
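# Flow: START -> tool1 (derive filename) -> tool2 (fetch page)
#       -> END on fetch error, otherwise tool3 (summarize) -> tool4 (write file) -> END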
def main():
    graph = StateGraph(AgentState)
    graph.add_node("tool1", tool1_node)
    graph.add_node("tool2", tool2_node)
    graph.add_node("tool3", tool3_node)
    graph.add_node("tool4", tool4_node)
    graph.add_edge(START, "tool1")
    graph.add_edge("tool1", "tool2")
    graph.add_conditional_edges("tool2", should_continue, {"END": END, "Continue": "tool3"})
    graph.add_edge("tool3", "tool4")
    graph.add_edge("tool4", END)
    app = graph.compile()
    return app
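# Optional: inspect the compiled topology, e.g. print(app.get_graph().draw_mermaid())
# (illustrative; assumes a langgraph version that exposes the Mermaid renderer).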
if __name__ == "__main__":
    # --------------------------
    # TEST RUN
    # --------------------------
    app = main()
    print("=== Example: summarize a remote webpage ===")
    result1 = app.invoke({"remote_webpage": "https://www.usbank.com/credit-cards/bank-smartly-visa-signature-credit-card.html"})
    print(result1["final_message"])
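# To exercise a single tool in isolation (illustrative; the URL is a placeholder):
#   text, err = fetch_webpage_content.invoke("https://example.com/index.html")
#   if err is None:
#       print(summarized_text.invoke(text))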