Skip to content

Instantly share code, notes, and snippets.

@NeoHBz
Created March 19, 2025 17:37
Show Gist options
  • Save NeoHBz/2398810c167ca51a75bafcf0ba29ea49 to your computer and use it in GitHub Desktop.
Save NeoHBz/2398810c167ca51a75bafcf0ba29ea49 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import socket
import sys
import threading
import time
import os
# Destination that every accepted local connection is relayed to.
# NOTE(review): the host is an empty string here - presumably a placeholder
# that must be filled in before the script is useful; confirm.
target_host: str = ""
target_port: int = 443
def clear_screen():
    """Clear the terminal by running the platform's clear-screen command."""
    # os.name is 'nt' on Windows, where the command is `cls`; everything
    # else is given `clear`.
    if os.name == 'nt':
        command = 'cls'
    else:
        command = 'clear'
    os.system(command)
def print_banner():
    """Clear the screen and print the start-up banner.

    The banner names the module-level ``target_host`` / ``target_port`` as
    the forwarding destination; the local endpoint shown is a fixed string.
    """
    clear_screen()
    rule = "=" * 40
    banner_lines = (
        rule,
        " Port Forwarding Utility",
        rule,
        f"Forwarding localhost:9999 to {target_host}:{target_port}",
        "Press Ctrl+C to disconnect",
        rule,
        "",
    )
    for line in banner_lines:
        print(line)
def forward_connection(client_socket, target_host, target_port):
    """Connect to the target and relay traffic between it and the client.

    Args:
        client_socket: Already-accepted socket for the local client.
        target_host: Host to open the outbound IPv4 TCP connection to.
        target_port: Port to open the outbound connection to.

    On a failed connect the error is printed, both sockets are closed and
    the function returns without starting any relay. On success two threads
    are started, one per direction, each running ``forward_data``; this
    function returns immediately and does not wait for them.
    """
    target_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        target_socket.connect((target_host, target_port))
    except Exception as e:
        print(f"Error connecting to {target_host}:{target_port} - {str(e)}")
        # Close the outbound socket too; otherwise every failed connection
        # attempt leaks one file descriptor.
        target_socket.close()
        client_socket.close()
        return

    # One relay thread per direction. The threads are not explicitly marked
    # daemon: a new thread takes its daemon flag from the thread creating it.
    relays = (
        (client_socket, target_socket, "client → target"),
        (target_socket, client_socket, "target → client"),
    )
    workers = [
        threading.Thread(target=forward_data, args=relay) for relay in relays
    ]
    for worker in workers:
        worker.start()
def forward_data(source, destination, direction):
    """Copy bytes from ``source`` to ``destination`` until the stream ends.

    Args:
        source: Connected socket to read from.
        destination: Connected socket to write to.
        direction: Label for this relay direction; not used by the body.

    The loop stops when ``source`` reports end-of-stream (an empty read) or
    when a socket operation fails. Both sockets are then shut down and
    closed, which tears down the whole connection, including the opposite
    direction.
    """
    try:
        while True:
            data = source.recv(4096)
            if not data:
                break
            destination.sendall(data)
    except OSError:
        # Best effort: a reset or already-closed socket just ends the relay.
        pass
    finally:
        for sock in (source, destination):
            # shutdown() before close(): close() alone does not reliably wake
            # another thread blocked in recv() on the same socket, which
            # would leave the opposite relay thread hanging.
            try:
                sock.shutdown(socket.SHUT_RDWR)
            except OSError:
                pass  # not connected any more, or already closed
            # Each socket is closed in its own try so a failure on one
            # cannot prevent the other from being released.
            try:
                sock.close()
            except OSError:
                pass
def main():
    """Run the forwarder: listen on 127.0.0.1:9999 and relay each client.

    Prints the banner, binds the listening socket and then accepts
    connections forever, handing each one to ``forward_connection`` on its
    own daemon thread. Ctrl+C stops the loop. If the local port is already
    in use, a hint is printed and the process exits with status 1. Any other
    error is printed. The listening socket is closed on every exit path.
    """
    # Function-scope import so this block does not depend on the file header.
    import errno

    # Configuration
    local_host = '127.0.0.1'
    local_port = 9999

    # Bound before the try so the finally clause can tell whether a socket
    # was ever created.
    server_socket = None
    try:
        print_banner()

        # Create server socket
        server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        try:
            server_socket.bind((local_host, local_port))
        except OSError as e:
            # errno.EADDRINUSE is the current platform's value (the literal
            # 48 / 10048 alone miss Linux); the literals are kept so the
            # previously recognised codes still match everywhere.
            if e.errno in (errno.EADDRINUSE, 48, 10048):
                print(f"Error: Port {local_port} is already in use.")
                print("Please close any application using this port and try again.")
                sys.exit(1)
            raise

        server_socket.listen(5)
        print(f"Listening on {local_host}:{local_port}")

        connection_count = 0
        while True:
            client_socket, client_address = server_socket.accept()
            connection_count += 1
            print(f"New connection from {client_address[0]}:{client_address[1]} (#{connection_count})")

            # Start forwarding in a new thread; daemon so it does not keep
            # the process alive after shutdown.
            proxy_thread = threading.Thread(
                target=forward_connection,
                args=(client_socket, target_host, target_port)
            )
            proxy_thread.daemon = True
            proxy_thread.start()
    except KeyboardInterrupt:
        print("\nShutting down...")
    except Exception as e:
        print(f"Error: {str(e)}")
    finally:
        if server_socket is not None:
            try:
                server_socket.close()
            except OSError:
                pass
        print("Port forwarding stopped.")
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment