Created
March 19, 2025 17:37
-
-
Save NeoHBz/2398810c167ca51a75bafcf0ba29ea49 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import socket | |
import sys | |
import threading | |
import time | |
import os | |
target_host = '' | |
target_port = 443 | |
def clear_screen(): | |
os.system('cls' if os.name == 'nt' else 'clear') | |
def print_banner(): | |
clear_screen() | |
print("=" * 40) | |
print(" Port Forwarding Utility") | |
print("=" * 40) | |
print("Forwarding localhost:9999 to " + target_host + ":" + str(target_port)) | |
print("Press Ctrl+C to disconnect") | |
print("=" * 40) | |
print("") | |
def forward_connection(client_socket, target_host, target_port): | |
# Connect to the target | |
target_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
try: | |
target_socket.connect((target_host, target_port)) | |
except Exception as e: | |
print(f"Error connecting to {target_host}:{target_port} - {str(e)}") | |
client_socket.close() | |
return | |
# Forward data in both directions | |
client_to_target = threading.Thread( | |
target=forward_data, | |
args=(client_socket, target_socket, "client → target") | |
) | |
target_to_client = threading.Thread( | |
target=forward_data, | |
args=(target_socket, client_socket, "target → client") | |
) | |
client_to_target.start() | |
target_to_client.start() | |
def forward_data(source, destination, direction): | |
try: | |
while True: | |
data = source.recv(4096) | |
if not data: | |
break | |
destination.sendall(data) | |
except: | |
pass | |
finally: | |
try: | |
source.close() | |
destination.close() | |
except: | |
pass | |
def main(): | |
try: | |
print_banner() | |
# Configuration | |
local_host = '127.0.0.1' | |
local_port = 9999 | |
# Create server socket | |
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | |
try: | |
server_socket.bind((local_host, local_port)) | |
except OSError as e: | |
if e.errno == 48 or e.errno == 10048: # Address already in use | |
print(f"Error: Port {local_port} is already in use.") | |
print("Please close any application using this port and try again.") | |
sys.exit(1) | |
raise | |
server_socket.listen(5) | |
print(f"Listening on {local_host}:{local_port}") | |
connection_count = 0 | |
while True: | |
client_socket, client_address = server_socket.accept() | |
connection_count += 1 | |
print(f"New connection from {client_address[0]}:{client_address[1]} (#{connection_count})") | |
# Start forwarding in a new thread | |
proxy_thread = threading.Thread( | |
target=forward_connection, | |
args=(client_socket, target_host, target_port) | |
) | |
proxy_thread.daemon = True | |
proxy_thread.start() | |
except KeyboardInterrupt: | |
print("\nShutting down...") | |
except Exception as e: | |
print(f"Error: {str(e)}") | |
finally: | |
try: | |
server_socket.close() | |
except: | |
pass | |
print("Port forwarding stopped.") | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment