Created
December 5, 2024 04:04
-
-
Save fr0gger/8da9a77f862ad73cac3ded7ac9a9908e to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Model Chaining CTI | |
# Author: Thomas Roccia @fr0gger_ | |
# pip install aisuite requests beautifulsoup4 argparse | |
# for ollama you need a local server and the model specified | |
import aisuite as ai | |
import requests | |
from bs4 import BeautifulSoup | |
import argparse | |
import os | |
# Configure your API key | |
os.environ['ANTHROPIC_API_KEY'] ="" | |
os.environ['OPENAI_API_KEY'] = "" | |
os.environ['HUGGINGFACE_TOKEN'] = "" | |
client = ai.Client() | |
# Define the models for chaining | |
models = [ | |
"openai:gpt-4o-mini", | |
"anthropic:claude-3-5-sonnet-20240620", | |
"ollama:llama3.2:latest", | |
"openai:o1-mini" | |
] | |
def scrape_url_content(url: str) -> str: | |
""" | |
Scrapes the content of a URL and extracts the main text content. | |
Args: | |
url (str): The URL to scrape. | |
Returns: | |
str: The extracted text content from the URL. | |
""" | |
try: | |
response = requests.get(url, timeout=10) | |
response.raise_for_status() | |
soup = BeautifulSoup(response.text, "html.parser") | |
return soup.get_text(separator="\n", strip=True) | |
except requests.RequestException as e: | |
print(f"Error fetching URL content: {e}") | |
return "" | |
except Exception as e: | |
print(f"Error processing URL content: {e}") | |
return "" | |
# Function to chain prompts and responses through models | |
def model_chaining(client, url, models): | |
print(f"Step 1: Scraping content from URL: {url}") | |
url_content = scrape_url_content(url) | |
if not url_content: | |
print("Failed to scrape content from URL.") | |
return None | |
step_1_prompt = f"Summarize the following threat report and extract key findings, including TTPs and IOCs:\n\n{url_content}" | |
step_2_prompt_template = ( | |
"Based on the findings, create a markdown table of MITRE ATT&CK techniques and a separate table of IOCs:\n\n" | |
"Findings:\n{findings}" | |
) | |
step_3_prompt_template = ( | |
"Based on the provided information, generate an action plan with recommendations and potential mitigations:\n\n" | |
"Findings:\n{findings}" | |
) | |
step_4_prompt_template = ( | |
"Compile the following into a structured threat intelligence report. Include sections for Executive Summary, " | |
"Technical Analysis, MITRE Mapping, IOCs, Recommendations, and Action Plan:\n\n" | |
"Findings:\n{findings}\n\n" | |
"Recommendations and Action Plan:\n{recommendations}" | |
) | |
messages = [{"role": "user", "content": step_1_prompt}] | |
results = {} | |
for i, model in enumerate(models): | |
print(f"\n{'⭐️' * (i + 1)} Model #{i + 1}: {model}") | |
print("-" * 32) | |
print(f"📝 Current Prompt: {messages[-1]['content']}") | |
response = client.chat.completions.create( | |
model=model, | |
messages=messages, | |
temperature=1, | |
) | |
result = response.choices[0].message.content.strip() | |
print(f"🎯 Response:\n{result}") | |
if i == 0: | |
results["step_1_findings"] = result | |
step_2_prompt = step_2_prompt_template.format(findings=result) | |
messages = [{"role": "user", "content": step_2_prompt}] | |
elif i == 1: | |
results["step_2_tables"] = result | |
step_3_prompt = step_3_prompt_template.format(findings=result) | |
messages = [{"role": "user", "content": step_3_prompt}] | |
elif i == 2: | |
results["step_3_recommendations"] = result | |
step_4_prompt = step_4_prompt_template.format( | |
findings=results["step_1_findings"], | |
recommendations=result, | |
) | |
messages = [{"role": "user", "content": step_4_prompt}] | |
else: | |
results["final_report"] = result | |
return results.get("final_report") | |
if __name__ == "__main__": | |
parser = argparse.ArgumentParser(description="Process a threat report URL.") | |
parser.add_argument("url", type=str, help="The URL of the threat report") | |
args = parser.parse_args() | |
final_report = model_chaining(client, args.url, models) | |
if final_report: | |
print("\nFinal Report:") | |
print(final_report) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment