Research any topic across X (Twitter) + Reddit using Bird CLI + Perplexity API
#!/usr/bin/env python3
"""
last30days_combo.py - Research any topic across X (Twitter) + Reddit

Uses:
  - Bird CLI for X/Twitter (free, uses browser cookies)
  - Perplexity API for Reddit (has real URLs + upvotes)

Setup:
  1. Install Bird: npm install -g @steipete/bird
  2. Log in to X in your browser, then run: bird whoami
  3. Get a Perplexity API key from: https://www.perplexity.ai/settings/api
  4. Set environment variables (or edit the script)

Usage:
  python last30days_combo.py "your search topic"
  python last30days_combo.py "AI agent security" --x-only
  python last30days_combo.py "OpenClaw hosting" --reddit-only

Environment Variables:
  AUTH_TOKEN          - X/Twitter auth token (from browser cookies)
  CT0                 - X/Twitter ct0 token (from browser cookies)
  PERPLEXITY_API_KEY  - Perplexity API key

Author: @daemon_agent / @StraughterG
License: MIT
"""

import argparse
import json
import os
import re
import subprocess
import sys
from datetime import datetime

import requests
# =============================================================================
# Configuration
# =============================================================================

PERPLEXITY_API_KEY = os.environ.get('PERPLEXITY_API_KEY', '')
PERPLEXITY_API_URL = "https://api.perplexity.ai/chat/completions"

# Default subreddits to search
DEFAULT_SUBREDDITS = [
    "LocalLLaMA",
    "selfhosted",
    "MachineLearning",
    "artificial",
    "ClaudeAI",
    "ChatGPT",
]
# =============================================================================
# X/Twitter via Bird CLI
# =============================================================================

def search_x(topic: str, limit: int = 15) -> list[dict]:
    """Search X/Twitter using Bird CLI.

    Args:
        topic: Search query
        limit: Max results to return

    Returns:
        List of post dicts with keys: text, url, author, date, engagement
    """
    try:
        result = subprocess.run(
            ["npx", "@steipete/bird", "search", topic, "-n", str(limit), "--json"],
            capture_output=True,
            text=True,
            timeout=60,
        )
        if result.returncode != 0:
            # Non-zero exit: fall back to parsing Bird's human-readable output
            return _parse_bird_text_output(result.stdout or result.stderr)
        return json.loads(result.stdout)
    except subprocess.TimeoutExpired:
        print("⚠️ Bird search timed out", file=sys.stderr)
        return []
    except FileNotFoundError:
        print("⚠️ Bird CLI not found. Install with: npm install -g @steipete/bird", file=sys.stderr)
        return []
    except json.JSONDecodeError:
        # stdout wasn't JSON; parse the text format instead
        return _parse_bird_text_output(result.stdout)
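
# A quick sketch of calling search_x directly (assumes Bird is installed and
# your browser is logged in to X; the exact fields in each dict come from
# Bird's --json output, or from the text parser below as a fallback):
#
#   posts = search_x("AI agent security", limit=5)
#   for p in posts:
#       print(p.get("author"), p.get("url"))
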
def _parse_bird_text_output(text: str) -> list[dict]:
    """Parse Bird's human-readable output format."""
    posts = []
    current_post = {}
    for line in text.split('\n'):
        line = line.strip()
        if line.startswith('@') and '(' in line:
            # New post starts with @handle (Name):
            if current_post:
                posts.append(current_post)
            parts = line.split('(', 1)
            handle = parts[0].strip()
            name = parts[1].rstrip('):') if len(parts) > 1 else ''
            current_post = {'author': handle, 'author_name': name, 'text': ''}
        elif line.startswith('📅 '):
            current_post['date'] = line[2:].strip()
        elif line.startswith('🔗'):
            current_post['url'] = line[2:].strip()
        elif line.startswith('─'):
            # Separator, finalize post
            if current_post and current_post.get('url'):
                posts.append(current_post)
            current_post = {}
        elif current_post and not line.startswith(('🖼️', '💬', '──', '—', '——')):
            # Regular text content
            if current_post.get('text'):
                current_post['text'] += ' ' + line
            else:
                current_post['text'] = line
    # Don't forget the last post
    if current_post and current_post.get('url'):
        posts.append(current_post)
    return posts
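
# A worked example of what the text parser above handles, traced by hand
# (hypothetical Bird output; the real decoration characters may differ):
#
#   @alice (Alice A):
#   Great thread on agent security.
#   📅 2026-02-01
#   🔗 https://x.com/alice/status/123
#   ─────
#
# parses to:
#
#   [{'author': '@alice', 'author_name': 'Alice A',
#     'text': 'Great thread on agent security.',
#     'date': '2026-02-01', 'url': 'https://x.com/alice/status/123'}]
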
# =============================================================================
# Reddit via Perplexity API
# =============================================================================

def search_reddit(
    topic: str,
    subreddits: list[str] | None = None,
    limit: int = 15,
) -> list[dict]:
    """Search Reddit using Perplexity API.

    Args:
        topic: Search query
        subreddits: List of subreddit names to search
        limit: Target number of results

    Returns:
        List of thread dicts with keys: title, url, subreddit, upvotes, summary
    """
    if not PERPLEXITY_API_KEY:
        print("⚠️ PERPLEXITY_API_KEY not set", file=sys.stderr)
        return []
    if subreddits is None:
        subreddits = DEFAULT_SUBREDDITS
    headers = {
        "Authorization": f"Bearer {PERPLEXITY_API_KEY}",
        "Content-Type": "application/json",
    }
    subreddit_str = ", ".join([f"r/{s}" for s in subreddits[:3]])  # Limit for query clarity
    prompt = f"""Search Reddit {subreddit_str} for discussions about: {topic}

Return {limit} relevant threads with:
- Thread title
- Full Reddit URL (https://www.reddit.com/r/.../comments/...)
- Subreddit name
- Upvote count
- Brief summary

Format as a markdown table."""
    payload = {
        "model": "sonar",
        "messages": [{"role": "user", "content": prompt}],
    }
    try:
        response = requests.post(
            PERPLEXITY_API_URL,
            headers=headers,
            json=payload,
            timeout=120,
        )
        response.raise_for_status()
        data = response.json()
        content = data.get("choices", [{}])[0].get("message", {}).get("content", "")
        return _parse_perplexity_reddit(content)
    except requests.RequestException as e:
        print(f"⚠️ Perplexity API error: {e}", file=sys.stderr)
        return []
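
# The content extraction above assumes Perplexity returns an OpenAI-style
# chat completions body; a sketch of the shape the code relies on (fields
# beyond choices/message/content omitted):
#
#   {"choices": [{"message": {"content": "| Thread | ... markdown table ..."}}]}
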
def _parse_perplexity_reddit(content: str) -> list[dict]:
    """Parse Perplexity's markdown response into structured data."""
    threads = []
    # Look for markdown links with Reddit URLs (table rows or list items)
    url_pattern = r'\[([^\]]+)\]\((https://[^\)]*reddit\.com[^\)]+)\)'
    matches = re.findall(url_pattern, content)
    for title, url in matches:
        # Try to extract subreddit from URL
        sub_match = re.search(r'/r/(\w+)/', url)
        subreddit = sub_match.group(1) if sub_match else ""
        # Try to find upvotes near this URL in the content
        upvotes = 0
        upvote_match = re.search(
            rf'{re.escape(title)}[^\d]*(\d+(?:,\d+)?(?:\.\d+)?k?)\s*(?:upvotes?|pts?|points?)',
            content,
            re.IGNORECASE,
        )
        if upvote_match:
            upvote_str = upvote_match.group(1).replace(',', '')
            if 'k' in upvote_str.lower():
                upvotes = int(float(upvote_str.lower().replace('k', '')) * 1000)
            else:
                upvotes = int(float(upvote_str))
        threads.append({
            "title": title.strip(),
            "url": url.strip(),
            "subreddit": subreddit,
            "upvotes": upvotes,
        })
    # Fall back to the raw content for manual parsing if nothing matched
    if not threads and content:
        threads.append({
            "title": "Raw Perplexity Response",
            "raw_content": content,
            "url": "",
            "subreddit": "",
            "upvotes": 0,
        })
    return threads
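
# A hand-traced example of the regex extraction above (made-up content):
#
#   content = "| [Best local LLM?](https://www.reddit.com/r/LocalLLaMA/comments/abc/x) | 1.2k upvotes |"
#
# yields:
#
#   [{'title': 'Best local LLM?',
#     'url': 'https://www.reddit.com/r/LocalLLaMA/comments/abc/x',
#     'subreddit': 'LocalLLaMA', 'upvotes': 1200}]
#
# ('1.2k' is expanded to 1200 by the k-suffix branch.)
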
# =============================================================================
# Combined Research
# =============================================================================

def research(
    topic: str,
    x_limit: int = 15,
    reddit_limit: int = 15,
    subreddits: list[str] | None = None,
    x_only: bool = False,
    reddit_only: bool = False,
) -> dict:
    """Research a topic across X and Reddit.

    Args:
        topic: Search query
        x_limit: Max X results
        reddit_limit: Max Reddit results
        subreddits: Reddit subreddits to search
        x_only: Only search X
        reddit_only: Only search Reddit

    Returns:
        Dict with 'x' and 'reddit' keys containing results
    """
    results = {
        "topic": topic,
        "timestamp": datetime.now().isoformat(),
        "x": [],
        "reddit": [],
    }
    if not reddit_only:
        print(f"🔵 Searching X for: {topic}", file=sys.stderr)
        results["x"] = search_x(topic, x_limit)
        print(f"   Found {len(results['x'])} posts", file=sys.stderr)
    if not x_only:
        print(f"🟠 Searching Reddit for: {topic}", file=sys.stderr)
        results["reddit"] = search_reddit(topic, subreddits, reddit_limit)
        print(f"   Found {len(results['reddit'])} threads", file=sys.stderr)
    return results
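
# A minimal sketch of programmatic use (module name taken from the filename;
# assumes Bird and PERPLEXITY_API_KEY are set up as described in the header):
#
#   from last30days_combo import research, format_results
#   results = research("AI agent security", x_limit=5, reddit_limit=5)
#   print(format_results(results, "json"))
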
def format_results(results: dict, output_format: str = "markdown") -> str:
    """Format research results for display.

    Args:
        results: Output from research()
        output_format: 'markdown', 'json', or 'compact'

    Returns:
        Formatted string
    """
    if output_format == "json":
        return json.dumps(results, indent=2, default=str)
    if output_format == "compact":
        # One line per result: source, author/subreddit, URL
        lines = [f"{results['topic']} ({results['timestamp']})"]
        for post in results.get("x", []):
            lines.append(f"X  {post.get('author', '?')}  {post.get('url', '')}")
        for thread in results.get("reddit", []):
            lines.append(f"R  r/{thread.get('subreddit', '?')}  {thread.get('url', '')}")
        return "\n".join(lines)
    lines = [
        f"# Research: {results['topic']}",
        f"*Generated: {results['timestamp']}*",
        "",
    ]
    # X Results
    if results.get("x"):
        lines.append("## 🔵 X/Twitter")
        lines.append("")
        for post in results["x"]:
            author = post.get("author", "Unknown")
            text = post.get("text", "")[:200]
            url = post.get("url", "")
            date = post.get("date", "")
            lines.append(f"**{author}** ({date})")
            lines.append(f"> {text}")
            if url:
                lines.append(f"[Link]({url})")
            lines.append("")
    # Reddit Results
    if results.get("reddit"):
        lines.append("## 🟠 Reddit")
        lines.append("")
        if results["reddit"][0].get("raw_content"):
            # Unstructured fallback: include Perplexity's raw response as-is
            lines.append(results["reddit"][0]["raw_content"])
            lines.append("")
        else:
            lines.append("| Thread | Subreddit | Upvotes |")
            lines.append("|--------|-----------|---------|")
            for thread in results["reddit"]:
                title = thread.get("title", "")[:60]
                url = thread.get("url", "")
                sub = thread.get("subreddit", "")
                upvotes = thread.get("upvotes", 0)
                if url:
                    lines.append(f"| [{title}]({url}) | r/{sub} | {upvotes} |")
                else:
                    lines.append(f"| {title} | r/{sub} | {upvotes} |")
            lines.append("")
    return "\n".join(lines)
# =============================================================================
# CLI
# =============================================================================

def main():
    parser = argparse.ArgumentParser(
        description="Research any topic across X (Twitter) and Reddit",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python last30days_combo.py "AI agent security"
  python last30days_combo.py "OpenClaw hosting" --format json
  python last30days_combo.py "LLM fine-tuning" --reddit-only
  python last30days_combo.py "Claude Code tips" --subreddits ClaudeAI,LocalLLaMA
""",
    )
    parser.add_argument("topic", help="Search topic")
    parser.add_argument("--x-limit", type=int, default=15, help="Max X results (default: 15)")
    parser.add_argument("--reddit-limit", type=int, default=15, help="Max Reddit results (default: 15)")
    parser.add_argument("--subreddits", help="Comma-separated subreddit names")
    parser.add_argument("--x-only", action="store_true", help="Only search X")
    parser.add_argument("--reddit-only", action="store_true", help="Only search Reddit")
    parser.add_argument("--format", choices=["markdown", "json", "compact"], default="markdown", help="Output format")
    parser.add_argument("--output", "-o", help="Output file (default: stdout)")
    args = parser.parse_args()

    subreddits = args.subreddits.split(",") if args.subreddits else None
    results = research(
        topic=args.topic,
        x_limit=args.x_limit,
        reddit_limit=args.reddit_limit,
        subreddits=subreddits,
        x_only=args.x_only,
        reddit_only=args.reddit_only,
    )
    output = format_results(results, args.format)
    if args.output:
        with open(args.output, "w", encoding="utf-8") as f:
            f.write(output)
        print(f"✅ Saved to {args.output}", file=sys.stderr)
    else:
        print(output)


if __name__ == "__main__":
    main()