-
-
Save cyperdev/7e943ad8123248cd2c84f8b030fafeb9 to your computer and use it in GitHub Desktop.
Detect postmessage
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from playwright.async_api import async_playwright | |
import asyncio, json, hashlib, sys | |
init_script = """ | |
// Override addEventListener to detect 'message' handlers | |
const originalAddEventListener = window.addEventListener; | |
window.addEventListener = function(type, listener, options){ | |
if (typeof type == 'string' && type.toLowerCase() === 'message'){ | |
const stack = new Error(); | |
const stacker = stack.stack.split('\\n'); | |
const scriptLocation = stacker.find(line => | |
line.includes('.js') || line.includes('://') | |
) || 'Unknown location'; | |
console.warn(JSON.stringify({ | |
"type":"detected_event_message", | |
"listener": listener.toString(), | |
"stack": stack.stack | |
})); | |
} | |
return originalAddEventListener.call(this, type, listener, options); | |
}; | |
""" | |
console_logs = [] | |
hashes = [] | |
def log_console(msg, url): | |
msg = msg.text | |
if 'detected_event_message' in msg: | |
msg = json.loads(msg) | |
hash_struct = { | |
"lines": msg['stack'].split('\n')[0:5] | |
} | |
hashx = hashlib.md5(json.dumps(hash_struct).encode()).hexdigest() | |
if hashx not in hashes: | |
with open('results.json', 'a+') as f: | |
json.dump({'url':url, "full_stack":msg['stack']}, f, indent=2, ensure_ascii=False) | |
print("[Detected]", url, msg['stack']) | |
hashes.append(hashx) | |
async def worker(context, url, semaphore): | |
async with semaphore: | |
page = await context.new_page() | |
try: | |
# Setup monitoring | |
await page.add_init_script(init_script) | |
page.on("console", lambda msg: log_console(msg, url)) | |
# Navigate with aggressive timeouts | |
await page.goto(url, wait_until='domcontentloaded', timeout=20000) | |
await page.wait_for_timeout(1500) | |
except Exception as e: | |
print(f"Error on {url}: {str(e)}") | |
finally: | |
await page.close() | |
async def main(): | |
urls = [line.strip() for line in sys.stdin] | |
async with async_playwright() as p: | |
browser = await p.chromium.launch( | |
headless=False, | |
args=["--disable-blink-features=AutomationControlled"] | |
) | |
# Single shared context for all pages | |
context = await browser.new_context() | |
# control concurrency with semaphore | |
semaphore = asyncio.Semaphore(10) | |
tasks = [worker(context, url, semaphore) for url in urls] | |
await asyncio.gather(*tasks) | |
await context.close() | |
await browser.close() | |
if __name__ == "__main__": | |
asyncio.run(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment