Created
January 17, 2025 15:09
-
-
Save YazzyYaz/dc2057ad3eea1ca8f6c87dd68328b01c to your computer and use it in GitHub Desktop.
Solana Code
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
from solana.rpc.api import Client | |
from solana.rpc.core import RPCException | |
from solana.publickey import PublicKey | |
from solana.transaction import Transaction | |
from solana.keypair import Keypair | |
from spl.token.instructions import transfer_checked, TransferCheckedParams | |
from spl.token.constants import TOKEN_PROGRAM_ID | |
# Initialize Solana RPC client | |
RPC_URL = "https://api.mainnet-beta.solana.com" # Replace with your RPC provider | |
client = Client(RPC_URL) | |
# Define the monitored wallet (the wallet launching tokens) | |
MONITORED_WALLET = "MonitoredWalletPublicKeyHere" # Replace with the wallet public key | |
# Define the buyer wallet | |
PRIVATE_KEY = [<your-private-key-array>] # Replace with your wallet private key | |
buyer_wallet = Keypair.from_secret_key(bytes(PRIVATE_KEY)) | |
# Define buy parameters | |
BUY_AMOUNT = 1_000_000 # Example: 1 token (adjust for decimals) | |
BUY_TOKEN_DECIMALS = 6 # Adjust based on token decimals | |
BUY_RECIPIENT = "RecipientWalletAddressHere" # Replace with the liquidity pool or seller wallet | |
def check_new_mints(monitored_wallet, last_signature=None): | |
"""Check for new token mint events from the monitored wallet.""" | |
try: | |
# Get recent transactions | |
params = {"limit": 10} | |
if last_signature: | |
params["before"] = last_signature | |
transactions = client.get_confirmed_signatures_for_address2( | |
PublicKey(monitored_wallet), **params | |
) | |
if transactions["result"]: | |
return transactions["result"] | |
else: | |
return [] | |
except RPCException as e: | |
print(f"Error checking transactions: {e}") | |
return [] | |
def parse_token_creation(transactions): | |
"""Parse transactions for token creation events.""" | |
new_mints = [] | |
for txn in transactions: | |
signature = txn["signature"] | |
# Fetch transaction details | |
txn_details = client.get_confirmed_transaction(signature) | |
instructions = txn_details.get("result", {}).get("transaction", {}).get("message", {}).get("instructions", []) | |
for instruction in instructions: | |
if instruction.get("programId") == str(TOKEN_PROGRAM_ID): | |
# Check if it contains a token mint creation | |
accounts = instruction.get("accounts", []) | |
if len(accounts) > 0: | |
mint_address = accounts[0] # The first account is usually the mint | |
new_mints.append(mint_address) | |
return new_mints | |
def buy_token(mint_address): | |
"""Trigger a buy for the detected token mint.""" | |
try: | |
# Find buyer's associated token account | |
response = client.get_token_accounts_by_owner( | |
buyer_wallet.public_key, | |
{"mint": mint_address}, | |
) | |
if response["result"]["value"]: | |
source_token_account = response["result"]["value"][0]["pubkey"] | |
else: | |
raise Exception(f"No associated token account found for mint: {mint_address}") | |
# Create transaction | |
txn = Transaction() | |
txn.add( | |
transfer_checked( | |
TransferCheckedParams( | |
program_id=TOKEN_PROGRAM_ID, | |
source=PublicKey(source_token_account), | |
dest=PublicKey(BUY_RECIPIENT), | |
owner=buyer_wallet.public_key, | |
amount=BUY_AMOUNT, | |
decimals=BUY_TOKEN_DECIMALS, | |
mint=PublicKey(mint_address), | |
) | |
) | |
) | |
# Send transaction | |
print(f"Buying token from mint: {mint_address}") | |
txn_signature = client.send_transaction(txn, buyer_wallet) | |
print(f"Transaction sent. Signature: {txn_signature}") | |
except Exception as e: | |
print(f"Failed to buy token: {e}") | |
def main(): | |
"""Main monitoring loop.""" | |
last_signature = None | |
while True: | |
print("Checking for new mints...") | |
transactions = check_new_mints(MONITORED_WALLET, last_signature) | |
if transactions: | |
# Update the last seen signature | |
last_signature = transactions[0]["signature"] | |
# Parse for token creation events | |
new_mints = parse_token_creation(transactions) | |
# Trigger buy for each new mint | |
for mint in new_mints: | |
buy_token(mint) | |
time.sleep(5) # Wait 5 seconds before checking again | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment