Skip to content

Instantly share code, notes, and snippets.

@jongan69
Last active March 20, 2025 06:02
Show Gist options
  • Save jongan69/eed8957f8721f596bb70e548469b5dee to your computer and use it in GitHub Desktop.
Save jongan69/eed8957f8721f596bb70e548469b5dee to your computer and use it in GitHub Desktop.
A bunch of Python Solana code, not sure what I was doing
from solders.pubkey import Pubkey # type: ignore
from spl.token.instructions import get_associated_token_address
from construct import Padding, Struct, Int64ul, Flag
from walletTradingFunctions.config import client
from walletTradingFunctions.constants import PUMP_FUN_PROGRAM
# from config import client
# from constants import PUMP_FUN_PROGRAM
def get_virtual_reserves(bonding_curve: Pubkey):
    """Fetch a bonding-curve account and decode its reserve fields.

    The account data is read through the shared RPC ``client`` and parsed
    with a fixed layout: 8 skipped bytes, five ``Int64ul`` fields, then a
    ``Flag``.

    Args:
        bonding_curve: Address of the bonding-curve account to read.

    Returns:
        The parsed container exposing ``virtualTokenReserves``,
        ``virtualSolReserves``, ``realTokenReserves``, ``realSolReserves``,
        ``tokenTotalSupply`` and ``complete``, or ``None`` when the RPC call
        or the parse raises.
    """
    u64_fields = (
        "virtualTokenReserves",
        "virtualSolReserves",
        "realTokenReserves",
        "realSolReserves",
        "tokenTotalSupply",
    )
    layout = Struct(
        Padding(8),
        *(field_name / Int64ul for field_name in u64_fields),
        "complete" / Flag,
    )
    try:
        raw = client.get_account_info(bonding_curve).value.data
        return layout.parse(raw)
    except Exception:
        return None
def derive_bonding_curve_accounts(mint_str: str):
    """Derive the bonding-curve address and its associated token account.

    The bonding-curve address is the program-derived address for the seeds
    ``b"bonding-curve"`` + mint bytes under ``PUMP_FUN_PROGRAM``; the second
    address is the associated token account of that address for the mint.

    Args:
        mint_str: Base58 string of the token mint.

    Returns:
        ``(bonding_curve, associated_bonding_curve)``, or ``(None, None)``
        when anything raises (for example an invalid mint string).
    """
    try:
        mint_key = Pubkey.from_string(mint_str)
        seeds = [b"bonding-curve", bytes(mint_key)]
        curve_address = Pubkey.find_program_address(seeds, PUMP_FUN_PROGRAM)[0]
        curve_token_account = get_associated_token_address(curve_address, mint_key)
    except Exception:
        return None, None
    return curve_address, curve_token_account
def get_coin_data(mint_str: str):
    """Collect the bonding-curve addresses and reserve figures for a mint.

    Args:
        mint_str: Base58 string of the token mint.

    Returns:
        A dict with ``mint``, ``bonding_curve``, ``associated_bonding_curve``
        (addresses as strings), ``virtual_token_reserves``,
        ``virtual_sol_reserves``, ``token_total_supply`` (ints) and
        ``complete`` (bool); ``None`` when the addresses cannot be derived,
        the reserves cannot be read, or a field conversion raises.
    """
    curve, curve_token_account = derive_bonding_curve_accounts(mint_str)
    if curve is None or curve_token_account is None:
        return None

    reserves = get_virtual_reserves(curve)
    if reserves is None:
        return None

    try:
        return {
            "mint": mint_str,
            "bonding_curve": str(curve),
            "associated_bonding_curve": str(curve_token_account),
            "virtual_token_reserves": int(reserves.virtualTokenReserves),
            "virtual_sol_reserves": int(reserves.virtualSolReserves),
            "token_total_supply": int(reserves.tokenTotalSupply),
            "complete": bool(reserves.complete),
        }
    except Exception:
        return None
from solana.rpc.api import Client
from solders.keypair import Keypair #type: ignore
import os
from dotenv import load_dotenv
# Load environment variables from .env
load_dotenv()
# Signing key and RPC endpoint come from the environment; os.getenv yields
# None for a variable that is not set.
PRIV_KEY = os.getenv("PrivateKey")
RPC = os.getenv("RPC_HTTPS_URL")
# Values the buy/sell functions pass to set_compute_unit_limit and
# set_compute_unit_price respectively.
UNIT_BUDGET = 100_000
UNIT_PRICE = 1_000_000
# Shared RPC client and signing keypair, both created at import time.
client = Client(RPC)
payer_keypair = Keypair.from_base58_string(PRIV_KEY)
from solders.pubkey import Pubkey #type: ignore
# All constants needed when interacting with the pumpfun program.
# The Pubkey values below are the fixed accounts placed in the account list
# of the buy and sell instructions.
GLOBAL = Pubkey.from_string("4wTV1YmiEkRvAtNtsSGPtUrqRYQMe5SKy2uB4Jjaxnjf")
FEE_RECIPIENT = Pubkey.from_string("CebN5WGQ4jvEPvsVU4EoHEpgzq1VV7AbicfhtW4xC9iM")
SYSTEM_PROGRAM = Pubkey.from_string("11111111111111111111111111111111")
TOKEN_PROGRAM = Pubkey.from_string("TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA")
ASSOC_TOKEN_ACC_PROG = Pubkey.from_string("ATokenGPvbdGVxr1b2hvZbsiqW5xWH25efTNsLJA8knL")
RENT = Pubkey.from_string("SysvarRent111111111111111111111111111111111")
EVENT_AUTHORITY = Pubkey.from_string("Ce6TQqeHC9p8KetsN6JsjHK7UTZk7nasjjnr7XxXp9F1")
# Program that the swap instructions are addressed to and that owns the
# bonding-curve PDA.
PUMP_FUN_PROGRAM = Pubkey.from_string("6EF8rrecthR5Dkzon8Nwu78hRvfCKubJ14M5uBEwF6P")
# Multiplier used by the trading functions to turn SOL amounts into lamports.
SOL_DECIMAL = 10**9
import struct
from solana.transaction import AccountMeta, Transaction
from solana.rpc.types import TokenAccountOpts, TxOpts
from spl.token.instructions import (
create_associated_token_account,
get_associated_token_address,
close_account,
CloseAccountParams
)
from solders.pubkey import Pubkey # type: ignore
from solders.instruction import Instruction # type: ignore
from solders.compute_budget import set_compute_unit_limit, set_compute_unit_price # type: ignore
from walletTradingFunctions.config import client, payer_keypair, UNIT_BUDGET, UNIT_PRICE
from walletTradingFunctions.constants import *
from walletTradingFunctions.pumputils import get_token_balance, confirm_txn
from walletTradingFunctions.coin_data import get_coin_data
# from config import client, payer_keypair, UNIT_BUDGET, UNIT_PRICE
# from constants import *
# from pumputils import get_token_balance, confirm_txn
# from coin_data import get_coin_data
def buy(mint_str: str, sol_in: float = 0.01, slippage: int = 25) -> bool:
    """Buy a token from its bonding curve with the configured payer wallet.

    The token amount requested is ``sol_in`` (in lamports) multiplied by the
    ratio of virtual token reserves to virtual SOL reserves; the maximum SOL
    cost is ``sol_in`` increased by ``slippage`` percent. If the payer has no
    token account for the mint, an instruction creating the associated token
    account is added ahead of the swap.

    Args:
        mint_str: Base58 string of the token mint.
        sol_in: Amount of SOL to spend.
        slippage: Allowed overspend, in percent.

    Returns:
        ``True`` only when ``confirm_txn`` reports success. Every failure
        path (no coin data, zero reserves, any exception, unconfirmed
        transaction) returns ``False``, so the declared ``bool`` return type
        holds and callers never mistake ``None`` for success.
    """
    try:
        print(f"Starting buy transaction for mint: {mint_str}")
        coin_data = get_coin_data(mint_str)
        print("Coin data retrieved:", coin_data)
        if not coin_data:
            print("Failed to retrieve coin data...")
            return False

        virtual_sol_reserves = coin_data['virtual_sol_reserves']
        virtual_token_reserves = coin_data['virtual_token_reserves']
        print(f"Virtual Sol Reserves: {virtual_sol_reserves}")
        print(f"Virtual Token Reserves: {virtual_token_reserves}")
        if virtual_sol_reserves == 0 or virtual_token_reserves == 0:
            print(f"Error: Virtual reserves are zero, cannot proceed with transaction: {coin_data}")
            return False

        owner = payer_keypair.pubkey()
        mint = Pubkey.from_string(mint_str)

        # Reuse an existing token account; any lookup failure (including an
        # empty result) falls back to creating the associated token account.
        try:
            account_data = client.get_token_accounts_by_owner(owner, TokenAccountOpts(mint))
            token_account = account_data.value[0].pubkey
            token_account_instructions = None
            print("Token account retrieved:", token_account)
        except Exception:
            token_account = get_associated_token_address(owner, mint)
            token_account_instructions = create_associated_token_account(owner, owner, mint)
            print("Token account created:", token_account)

        print("Calculating transaction amounts...")
        sol_in_lamports = sol_in * SOL_DECIMAL
        amount = int(sol_in_lamports * virtual_token_reserves / virtual_sol_reserves)
        slippage_adjustment = 1 + (slippage / 100)
        max_sol_cost = int(sol_in * slippage_adjustment * SOL_DECIMAL)
        print(f"Amount: {amount} | Max Sol Cost: {max_sol_cost}")

        bonding_curve = Pubkey.from_string(coin_data['bonding_curve'])
        associated_bonding_curve = Pubkey.from_string(coin_data['associated_bonding_curve'])

        print("Creating swap instructions...")
        keys = [
            AccountMeta(pubkey=GLOBAL, is_signer=False, is_writable=False),
            AccountMeta(pubkey=FEE_RECIPIENT, is_signer=False, is_writable=True),
            AccountMeta(pubkey=Pubkey.from_string(coin_data['mint']), is_signer=False, is_writable=False),
            AccountMeta(pubkey=bonding_curve, is_signer=False, is_writable=True),
            AccountMeta(pubkey=associated_bonding_curve, is_signer=False, is_writable=True),
            AccountMeta(pubkey=token_account, is_signer=False, is_writable=True),
            AccountMeta(pubkey=owner, is_signer=True, is_writable=True),
            AccountMeta(pubkey=SYSTEM_PROGRAM, is_signer=False, is_writable=False),
            AccountMeta(pubkey=TOKEN_PROGRAM, is_signer=False, is_writable=False),
            AccountMeta(pubkey=RENT, is_signer=False, is_writable=False),
            AccountMeta(pubkey=EVENT_AUTHORITY, is_signer=False, is_writable=False),
            AccountMeta(pubkey=PUMP_FUN_PROGRAM, is_signer=False, is_writable=False),
        ]

        # Instruction data: 8-byte prefix, then amount and max SOL cost as
        # little-endian unsigned 64-bit integers.
        data = bytes.fromhex("66063d1201daebea") + struct.pack('<QQ', amount, max_sol_cost)
        swap_instruction = Instruction(PUMP_FUN_PROGRAM, data, keys)

        print("Building transaction...")
        recent_blockhash = client.get_latest_blockhash().value.blockhash
        txn = Transaction(recent_blockhash=recent_blockhash, fee_payer=owner)
        txn.add(set_compute_unit_price(UNIT_PRICE))
        txn.add(set_compute_unit_limit(UNIT_BUDGET))
        if token_account_instructions:
            txn.add(token_account_instructions)
        txn.add(swap_instruction)

        print("Signing and sending transaction...")
        txn.sign(payer_keypair)
        txn_sig = client.send_transaction(txn, payer_keypair, opts=TxOpts(skip_preflight=True)).value
        print("Transaction Signature:", txn_sig)

        print("Confirming transaction...")
        confirmed = confirm_txn(txn_sig)
        print("Transaction confirmed:", confirmed)
        # confirm_txn may yield None on timeout; normalise to a strict bool.
        return bool(confirmed)
    except Exception as e:
        print("Error:", e)
        return False
def sell(mint_str: str, percentage: int = 100, slippage: int = 25, close_token_account: bool = True) -> bool:
    """Sell a percentage of the payer's token balance into the bonding curve.

    The token price is the ratio of virtual SOL reserves to virtual token
    reserves (scaled by 10**9 and 10**6 respectively). The minimum SOL output
    is the expected proceeds reduced by ``slippage`` percent.

    Args:
        mint_str: Base58 string of the token mint.
        percentage: Share of the current balance to sell, 1-100.
        slippage: Allowed shortfall on the SOL received, in percent.
        close_token_account: When ``True`` and ``percentage`` is 100, a
            close-account instruction is appended after the swap.

    Returns:
        ``True`` only when ``confirm_txn`` reports success; ``False`` for
        every failure path (invalid percentage, no coin data, unknown or zero
        balance, any exception, unconfirmed transaction).
    """
    try:
        print(f"Starting sell transaction for mint: {mint_str}")
        if not (1 <= percentage <= 100):
            print("Percentage must be between 1 and 100.")
            return False

        coin_data = get_coin_data(mint_str)
        print("Coin data retrieved:", coin_data)
        if not coin_data:
            print("Failed to retrieve coin data...")
            return False

        owner = payer_keypair.pubkey()
        mint = Pubkey.from_string(mint_str)
        token_account = get_associated_token_address(owner, mint)

        print("Calculating token price...")
        sol_decimal = 10**9
        token_decimal = 10**6
        virtual_sol_reserves = coin_data['virtual_sol_reserves'] / sol_decimal
        virtual_token_reserves = coin_data['virtual_token_reserves'] / token_decimal
        token_price = virtual_sol_reserves / virtual_token_reserves
        print(f"Token Price: {token_price:.20f} SOL")

        print("Retrieving token balance...")
        token_balance = get_token_balance(mint_str)
        print("Token Balance:", token_balance)
        if token_balance is None:
            # get_token_balance signals lookup failure with None.
            print("Failed to retrieve token balance...")
            return False
        if token_balance == 0:
            print("Token Balance is zero, nothing to sell")
            return False

        print("Calculating transaction amounts...")
        token_balance = token_balance * (percentage / 100)
        # round() rather than int(): the balance is a float, and truncation
        # can leave one raw unit behind, which would make the close-account
        # instruction below fail on a non-empty account.
        amount = int(round(token_balance * token_decimal))
        sol_out = float(token_balance) * float(token_price)
        slippage_adjustment = 1 - (slippage / 100)
        min_sol_output = int(sol_out * slippage_adjustment * SOL_DECIMAL)
        print(f"Amount: {amount} | Minimum Sol Out: {min_sol_output}")

        bonding_curve = Pubkey.from_string(coin_data['bonding_curve'])
        associated_bonding_curve = Pubkey.from_string(coin_data['associated_bonding_curve'])

        print("Creating swap instructions...")
        keys = [
            AccountMeta(pubkey=GLOBAL, is_signer=False, is_writable=False),
            AccountMeta(pubkey=FEE_RECIPIENT, is_signer=False, is_writable=True),
            AccountMeta(pubkey=Pubkey.from_string(coin_data['mint']), is_signer=False, is_writable=False),
            AccountMeta(pubkey=bonding_curve, is_signer=False, is_writable=True),
            AccountMeta(pubkey=associated_bonding_curve, is_signer=False, is_writable=True),
            AccountMeta(pubkey=token_account, is_signer=False, is_writable=True),
            AccountMeta(pubkey=owner, is_signer=True, is_writable=True),
            AccountMeta(pubkey=SYSTEM_PROGRAM, is_signer=False, is_writable=False),
            AccountMeta(pubkey=ASSOC_TOKEN_ACC_PROG, is_signer=False, is_writable=False),
            AccountMeta(pubkey=TOKEN_PROGRAM, is_signer=False, is_writable=False),
            AccountMeta(pubkey=EVENT_AUTHORITY, is_signer=False, is_writable=False),
            AccountMeta(pubkey=PUMP_FUN_PROGRAM, is_signer=False, is_writable=False),
        ]

        # Instruction data: 8-byte prefix, then amount and minimum SOL output
        # as little-endian unsigned 64-bit integers.
        data = bytes.fromhex("33e685a4017f83ad") + struct.pack('<QQ', amount, min_sol_output)
        swap_instruction = Instruction(PUMP_FUN_PROGRAM, data, keys)

        print("Building transaction...")
        recent_blockhash = client.get_latest_blockhash().value.blockhash
        txn = Transaction(recent_blockhash=recent_blockhash, fee_payer=owner)
        txn.add(set_compute_unit_price(UNIT_PRICE))
        txn.add(set_compute_unit_limit(UNIT_BUDGET))
        txn.add(swap_instruction)

        # Honour the caller's close_token_account flag (previously ignored).
        if percentage == 100 and close_token_account:
            print("Preparing to close token account after swap...")
            close_account_instructions = close_account(CloseAccountParams(TOKEN_PROGRAM, token_account, owner, owner))
            txn.add(close_account_instructions)

        print("Signing and sending transaction...")
        txn.sign(payer_keypair)
        txn_sig = client.send_transaction(txn, payer_keypair, opts=TxOpts(skip_preflight=True)).value
        print("Transaction Signature:", txn_sig)

        print("Confirming transaction...")
        confirmed = confirm_txn(txn_sig)
        print("Transaction confirmed:", confirmed)
        # confirm_txn may yield None on timeout; normalise to a strict bool.
        return bool(confirmed)
    except Exception as e:
        print("Error:", e)
        return False
import json
import time
from typing import Optional, Union
import requests
from solana.transaction import Signature
from walletTradingFunctions.config import RPC, payer_keypair, client
from walletTradingFunctions.coin_data import get_coin_data
# from config import RPC, payer_keypair, client
# from coin_data import get_coin_data
def find_data(data: Union[dict, list], field: str) -> Optional[str]:
    """Return the first value stored under ``field`` in a nested structure.

    Dicts are checked for the key directly before their values are searched;
    lists are searched element by element. The search is depth-first and
    skips nested results that are ``None``. A ``None`` stored directly under
    ``field`` at the current level is returned as-is, ending the search.

    Args:
        data: Nested combination of dicts and lists (other types yield None).
        field: Key to look for.

    Returns:
        The value found, or ``None`` when the key is absent.
    """
    if isinstance(data, dict):
        if field in data:
            return data[field]
        children = data.values()
    elif isinstance(data, list):
        children = data
    else:
        return None

    for child in children:
        found = find_data(child, field)
        if found is not None:
            return found
    return None
def get_token_balance(mint_str: str):
    """Return the payer's UI balance for a mint via ``getTokenAccountsByOwner``.

    The JSON-RPC request is posted to ``RPC`` and the first ``uiAmount``
    value found anywhere in the response is used.

    Args:
        mint_str: Base58 string of the token mint.

    Returns:
        The balance as a float, or ``None`` when the request fails, times
        out, or the response contains no ``uiAmount``.
    """
    try:
        payload = {
            "id": 1,
            "jsonrpc": "2.0",
            "method": "getTokenAccountsByOwner",
            "params": [
                str(payer_keypair.pubkey()),
                {"mint": mint_str},
                {"encoding": "jsonParsed"},
            ],
        }
        headers = {"accept": "application/json", "content-type": "application/json"}
        # A timeout keeps an unresponsive RPC node from blocking forever.
        response = requests.post(RPC, json=payload, headers=headers, timeout=10)
        ui_amount = find_data(response.json(), "uiAmount")
        if ui_amount is None:
            return None
        return float(ui_amount)
    except Exception:
        return None
def confirm_txn(txn_sig: "Signature", max_retries: int = 20, retry_interval: int = 3) -> bool:
    """Poll the RPC node until a transaction's outcome is known.

    Each attempt fetches the transaction at ``confirmed`` commitment and
    inspects the ``err`` field of its metadata. A fetch or parse failure is
    treated as "not available yet" and retried after ``retry_interval``
    seconds.

    Args:
        txn_sig: Signature of the transaction to look up.
        max_retries: Maximum number of lookup attempts (exactly this many
            are made; zero or fewer means no attempt).
        retry_interval: Seconds to sleep between attempts.

    Returns:
        ``True`` when the transaction is found with ``err`` equal to
        ``None``; ``False`` when it is found with an error or when every
        attempt fails.
    """
    for attempt in range(1, max_retries + 1):
        try:
            txn_res = client.get_transaction(
                txn_sig,
                encoding="json",
                commitment="confirmed",
                max_supported_transaction_version=0,
            )
            txn_json = json.loads(txn_res.value.transaction.meta.to_json())
            err = txn_json['err']
        except Exception:
            print("Awaiting confirmation... try count:", attempt)
            # No point sleeping after the final attempt.
            if attempt < max_retries:
                time.sleep(retry_interval)
            continue

        if err is None:
            print("Transaction confirmed... try count:", attempt)
            return True
        # Any non-None err is final; retrying cannot change it.
        print("Transaction failed.")
        return False

    print("Max retries reached. Transaction confirmation failed.")
    return False
def get_token_price(mint_str: str) -> float:
    """Compute a token's price in SOL from its virtual reserves.

    The price is virtual SOL reserves divided by 10**9, over virtual token
    reserves divided by 10**6.

    Args:
        mint_str: Base58 string of the token mint.

    Returns:
        The price as a float, or ``None`` when coin data is unavailable or
        the calculation raises.
    """
    try:
        coin_data = get_coin_data(mint_str)
        if coin_data:
            sol_side = coin_data['virtual_sol_reserves'] / 10**9
            token_side = coin_data['virtual_token_reserves'] / 10**6
            token_price = sol_side / token_side
            print(f"Token Price: {token_price:.20f} SOL")
            return token_price
        print("Failed to retrieve coin data...")
        return None
    except Exception as e:
        print(f"Error calculating token price: {e}")
        return None
import asyncio
import os
import logging
import requests
from dotenv import load_dotenv
from solathon import Client
from solana.rpc.api import Keypair
# from telegramMessaging import send_split_message, send_telegram_message
from tweeting import create_tweet
from utils import format_large_number, get_price, get_market_cap, get_token_details, truncate_text
from tabulate import tabulate
from datetime import datetime
import json
from walletPnl import get_wallet_pnl
from walletTradingFunctions.simpleBuy import buy
from walletTradingFunctions.pumpbuy import pumpbuy
from walletTradingFunctions.pumpsell import pumpsell
from walletTradingFunctions.simpleSell import sell
# Load environment variables from .env
load_dotenv()
logging.basicConfig(level=logging.INFO)
# The RPC endpoint is hard-coded to the public mainnet URL; no environment
# override is read here.
solana_rpc_url = "https://api.mainnet-beta.solana.com"
solana_client = Client(solana_rpc_url)
# Wallet whose trades are recorded in the transaction history.
wallet_address = os.getenv("SOLANA_WALLET_ADDRESS")
# Signing keypair handed to the non-pump buy/sell helpers; built at import time.
PAYER = Keypair.from_base58_string(os.getenv("PrivateKey"))
TOKEN_PROGRAM_ID = "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"
# Mint address passed to get_price when valuing SOL spent on a buy.
SOLANA_MINT_ADDRESS = os.getenv("SOLANA_MINT_ADDRESS")
# Mints already bought; buy_with_env_wallet consults this to avoid re-buying.
purchased_coins = set()
# Seed the set from the transaction history file, whose layout is
# {wallet: {mint: [transactions]}}. A missing file just means no history yet.
try:
    with open('transaction_history.json', 'r') as file:
        transaction_data = json.load(file)
except FileNotFoundError:
    logging.info("Transaction history file not found. Starting with an empty set of purchased coins.")
else:
    for wallet, tokens in transaction_data.items():
        for mint_address in tokens.keys():
            purchased_coins.add(mint_address)
async def get_token_mint_addresses(wallet_address):
    """List a wallet's token accounts, split into priced and unpriced holdings.

    Calls the ``getTokenAccountsByOwner`` JSON-RPC method and, for every
    returned account, looks up token details, price and market cap.

    Args:
        wallet_address: Base58 address of the wallet to inspect.

    Returns:
        ``(tokens, dogshit)``. ``tokens`` holds one dict per account that has
        both details and a price (keys: ``pubkey``, ``mint``, ``amount``,
        ``usd_value``, ``usd_formatted_value``, ``market_cap``,
        ``market_cap_formatted``, ``image_uri``, ``name``, ``symbol``).
        ``dogshit`` holds ``{"name", "amount"}`` dicts for accounts lacking
        details or a price.

    Raises:
        Exception: When the RPC response has no ``result`` field.
    """
    # Request payload to get token accounts by owner
    payload = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "getTokenAccountsByOwner",
        "params": [
            wallet_address,
            {"programId": TOKEN_PROGRAM_ID},
            {"encoding": "jsonParsed"},
        ],
    }

    # A timeout keeps an unresponsive RPC node from blocking forever.
    response = requests.post(solana_rpc_url, json=payload, timeout=30)
    response_data = response.json()
    if "result" not in response_data:
        raise Exception("Error fetching token accounts")

    tokens = []
    dogshit = []
    for account in response_data["result"]["value"]:
        if not ("account" in account and "data" in account["account"]):
            continue

        pubkey = account["pubkey"]
        info = account["account"]["data"]["parsed"]["info"]
        amount = info["tokenAmount"]["uiAmount"]
        mint_address = info["mint"]

        details, metadata = get_token_details(mint_address)
        if details is None:
            dogshit.append({"name": truncate_text(mint_address, 10), "amount": amount})
            continue

        # Lazy %-style argument; the previous call passed an argument without
        # a placeholder, which makes the logging module report a format error.
        logging.info("Token Details: %s", details)
        price = get_price(mint_address)
        market_cap = get_market_cap(mint_address)
        if price is None:
            fallback_name = metadata["name"] if metadata is not None else truncate_text(mint_address, 10)
            dogshit.append({"name": fallback_name, "amount": amount})
            continue

        # Reuse the price already fetched: a second lookup could return None
        # (or a different value) after the check above.
        value = amount * price
        has_market_cap = market_cap is not None
        tokens.append({
            "pubkey": pubkey,
            "mint": mint_address,
            "amount": amount,
            "usd_value": f"{value:.2f}",
            "usd_formatted_value": f"${value:.2f}",
            "market_cap": f"{market_cap:.2f}" if has_market_cap else "N/A",
            "market_cap_formatted": f"${format_large_number(market_cap)}" if has_market_cap else "N/A",
            "image_uri": details["imageUri"],
            "name": details["name"],
            "symbol": f"${details['symbol']}",
        })
    return tokens, dogshit
async def get_portfolio_table(solana_wallet, wallet_name):
    """Build HTML-formatted portfolio summaries for a wallet.

    Args:
        solana_wallet: Base58 wallet address; a falsy value aborts early.
        wallet_name: Label used in the holdings heading.

    Returns:
        ``(formatted_table, formatted_dogshit, total_value, token_values)``.
        ``formatted_table`` is ``None`` when the wallet has no priced tokens
        and ``formatted_dogshit`` is ``None`` when it has no unpriced ones.
        Returns ``None`` (after printing a message) when ``solana_wallet`` is
        falsy.
    """
    if not solana_wallet:
        print("SOLANA_WALLET_ADDRESS environment variable is not set.")
        return None

    wallet_pnl = get_wallet_pnl(solana_wallet)
    tokens, dogshit = await get_token_mint_addresses(solana_wallet)

    # Defaults cover the "no priced tokens" case. Previously table_data was
    # only assigned when tokens existed, so an empty wallet raised
    # UnboundLocalError at the later `if table_data:` check.
    formatted_table = None
    formatted_dogshit = None
    total_value = 0
    token_values = []

    if tokens:
        headers = [""]  # single unnamed column
        table_data = [
            [f"{token['symbol']}: {token['amount']:.2f} (${float(token['usd_value']):.2f})"]
            for token in tokens
        ]
        token_values = [
            {
                "name": token["name"],
                "symbol": token["symbol"],
                "amount": token["amount"],
                "usd_value": float(token["usd_value"]),
                "mint": token["mint"],
            }
            for token in tokens
        ]
        total_value = sum(float(token["usd_value"]) for token in tokens)
        table = tabulate(table_data, headers=headers, tablefmt="plain")
        pnl_line = f"Total PnL: ${wallet_pnl}" if wallet_pnl is not None else ""
        formatted_table = (
            f"<b>{wallet_name} Total Value:</b> ${total_value:.2f}\n"
            f"{pnl_line}\n"
            f"<b>Holdings:</b>\n{table}"
        )
    else:
        print("No tokens found.")

    if dogshit:
        dogshit_table = tabulate(
            [[item["name"], item["amount"]] for item in dogshit],
            headers=["Name", "Amount"],
            tablefmt="plain",
        )
        formatted_dogshit = f"\n<b>Rugged Shitcoins:</b>\n\n{dogshit_table}"

    return formatted_table, formatted_dogshit, total_value, token_values
async def get_wallet_positions_with_pnl(wallet_address):
    """Compute per-token profit and loss for a wallet's priced holdings.

    For each priced token, the purchase date is taken from the recorded
    transaction history and the purchase price is looked up for that date.
    Tokens lacking either a current or a purchase price are skipped.

    Args:
        wallet_address: Base58 address of the wallet.

    Returns:
        A list of dicts with ``name``, ``symbol``, ``amount``,
        ``current_value``, ``purchase_value``, ``pnl`` and
        ``pnl_percentage`` (0 when the purchase value is 0).
    """
    held_tokens, _ = await get_token_mint_addresses(wallet_address)
    positions = []
    for token in held_tokens:
        mint_address = token["mint"]
        current_price = get_price(mint_address)

        # Purchase date comes from the recorded history, then the price for
        # that date is looked up.
        history = await get_transaction_history(wallet_address, mint_address)
        purchase_price = get_historical_price(mint_address, get_purchase_date(history))

        if current_price is None or purchase_price is None:
            continue

        amount = token["amount"]
        current_value = amount * current_price
        purchase_value = amount * purchase_price
        pnl = current_value - purchase_value
        pnl_percentage = (pnl / purchase_value) * 100 if purchase_value != 0 else 0
        positions.append({
            "name": token["name"],
            "symbol": token["symbol"],
            "amount": amount,
            "current_value": current_value,
            "purchase_value": purchase_value,
            "pnl": pnl,
            "pnl_percentage": pnl_percentage,
        })
    return positions
async def get_transaction_history(wallet_address, mint_address):
    """Return the recorded transactions for one wallet and mint.

    Reads ``transaction_history.json`` from the working directory, laid out
    as ``{wallet: {mint: [transactions]}}``.

    Args:
        wallet_address: Wallet key to look up.
        mint_address: Mint key to look up under that wallet.

    Returns:
        The list of transaction dicts, or an empty list when the wallet or
        mint is not recorded or the history file does not exist yet.
    """
    try:
        with open('transaction_history.json', 'r') as file:
            data = json.load(file)
    except FileNotFoundError:
        # A missing file means nothing has been recorded yet, matching how
        # the other history readers in this module treat it.
        return []
    return data.get(wallet_address, {}).get(mint_address, [])
def get_purchase_date(transaction_history):
    """Return the date of the first recorded transaction.

    The purchase date is assumed to be the ``date`` field of the first entry,
    parsed with the ``%Y-%m-%d`` format.

    Args:
        transaction_history: List of transaction dicts; may be empty or None.

    Returns:
        A ``datetime``, or ``None`` when there is no history or the first
        entry carries no ``date``.

    Raises:
        ValueError: When the stored date does not match ``%Y-%m-%d``.
    """
    if not transaction_history:
        return None
    purchase_date = transaction_history[0].get('date')
    if purchase_date is None:
        # strptime(None, ...) would raise TypeError; treat as unknown date.
        return None
    return datetime.strptime(purchase_date, '%Y-%m-%d')
async def display_wallet_positions_with_pnl(wallet_address, wallet_name):
    """Format a wallet's positions and PnL as a plain-text table and log it.

    Args:
        wallet_address: Base58 address of the wallet.
        wallet_name: Label used in the table heading.

    Returns:
        The formatted table string, or ``None`` (after printing a message)
        when the wallet has no positions.
    """
    positions = await get_wallet_positions_with_pnl(wallet_address)
    if not positions:
        print("No positions found.")
        return None

    rows = []
    for position in positions:
        rows.append([
            truncate_text(position["name"], 10, padding_on=True),
            position["amount"],
            f"${position['current_value']:.2f}",
            f"${position['purchase_value']:.2f}",
            f"${position['pnl']:.2f}",
            f"{position['pnl_percentage']:.2f}%",
        ])
    column_names = ["Name", "Amount", "Current Value", "Purchase Value", "PnL", "PnL %"]
    table = tabulate(rows, headers=column_names, tablefmt="plain")
    formatted_table = f"\n<b>{wallet_name} Positions:</b>\n{table}"
    logging.info(formatted_table)
    return formatted_table
def save_transaction(wallet_address, mint_address, transaction_details):
    """Append one transaction to ``transaction_history.json``.

    The file is laid out as ``{wallet: {mint: [transactions]}}``; missing
    levels are created, and a missing file is treated as an empty history.
    The whole file is rewritten with 4-space indentation.

    Args:
        wallet_address: Wallet key the transaction is stored under.
        mint_address: Mint key under that wallet.
        transaction_details: JSON-serialisable record to append.
    """
    history_path = 'transaction_history.json'
    try:
        with open(history_path, 'r') as file:
            history = json.load(file)
    except FileNotFoundError:
        history = {}

    # Create the wallet and mint levels on demand, then append.
    wallet_entries = history.setdefault(wallet_address, {})
    wallet_entries.setdefault(mint_address, []).append(transaction_details)

    with open(history_path, 'w') as file:
        json.dump(history, file, indent=4)
async def sell_with_env_wallet(token_to_sell, amount):
    """Sell a token with the environment wallet and record the sale.

    Mints whose address ends in ``"pump"`` are sold through ``pumpsell``
    first, falling back to the regular ``sell`` when that returns ``False``;
    all other mints go straight to ``sell``. A sale is recorded (with a
    negative amount) when a price is available and the result is not
    ``False``.

    NOTE(review): only a literal ``False`` counts as failure here, so a
    ``None`` result is treated as success - confirm against the return
    contracts of ``pumpsell`` and ``sell``.

    Args:
        token_to_sell: Base58 mint address of the token.
        amount: Amount passed to the sell helpers and recorded in history.
    """
    if token_to_sell.endswith("pump"):
        sell_transaction = pumpsell(token_to_sell, amount, 10)
        logging.info(f"Pumpfun Sell Transaction Result: {sell_transaction}")
        if sell_transaction is False:
            logging.error(f"Failed to sell {amount} of mint {token_to_sell}, trying with normal sell")
            sell_transaction = await sell(PAYER, token_to_sell, amount)
            if sell_transaction is False:
                logging.error(f"Failed to sell {token_to_sell}")
            else:
                logging.info(f"Normal Sell Transaction Result: {sell_transaction}")
        else:
            logging.info(f"Pumpfun Sell Transaction Result: {sell_transaction}")
    else:
        sell_transaction = await sell(PAYER, token_to_sell, amount)

    # Current token price in USD, used to value the sale.
    token_price = get_price(token_to_sell)
    if token_price is None or sell_transaction is False:
        logging.error(f"Failed to sell token {amount} of mint {token_to_sell}")
        return

    total_usd_value = amount * token_price
    save_transaction(wallet_address, token_to_sell, {
        "date": datetime.now().strftime('%Y-%m-%d'),
        "amount": -amount,  # Negative value for selling
        "price": token_price,
        "cost": total_usd_value,
    })
async def buy_with_env_wallet(token_to_buy, amount):
    """Buy a token once with the environment wallet and record the purchase.

    Mints already present in ``purchased_coins`` are skipped. Mints whose
    address ends in ``"pump"`` are bought through ``pumpbuy`` first, falling
    back to the regular ``buy`` when that returns ``False``; all other mints
    go straight to ``buy``. On success a tweet is posted, the purchase is
    written to the transaction history and the mint is added to
    ``purchased_coins``.

    NOTE(review): only a literal ``False`` counts as failure here, so a
    ``None`` result is treated as success - confirm against the return
    contracts of ``pumpbuy`` and ``buy``.

    Args:
        token_to_buy: Base58 mint address of the token.
        amount: Amount of SOL to spend.
    """
    # Check if the coin has already been purchased
    if token_to_buy in purchased_coins:
        logging.info(f"Coin {token_to_buy} has already been purchased. Skipping.")
        return

    if token_to_buy.endswith("pump"):
        buy_transaction = pumpbuy(token_to_buy, amount, 10)
        logging.info(f"Pumpfun Buy Transaction Result: {buy_transaction}")
        if buy_transaction is not False:
            logging.info(f"Pumpfun Buy Transaction Result: {buy_transaction}")
        else:
            logging.error(f"Failed to buy token {amount} of mint {token_to_buy}, trying with normal buy")
            buy_transaction = await buy(PAYER, token_to_buy, amount)
            logging.info(f"Normal Buy Transaction Result: {buy_transaction}")
            if buy_transaction is not False:
                logging.info(f"Normal Buy Transaction Result: {buy_transaction}")
            else:
                logging.error(f"Failed to buy {token_to_buy}")
    else:
        buy_transaction = await buy(PAYER, token_to_buy, amount)  # Enter amount of sol you wish to spend
    print(buy_transaction)

    # Current price of SOL in USD, used to value the SOL spent.
    sol_price = get_price(SOLANA_MINT_ADDRESS)
    if sol_price is None or buy_transaction is False:
        logging.error(f"Failed to buy token {amount} of mint {token_to_buy}")
        return

    total_usd_value = amount * sol_price
    await create_tweet(f"The bot aped into https://dexscreener.com/solana/{token_to_buy}")

    transaction_details = {
        "date": datetime.now().strftime('%Y-%m-%d'),
        "amount": amount,  # Positive value for buying
        "price": sol_price,
        "cost": total_usd_value,
    }
    # save_transaction performs the read-append-write on the history file.
    # The inline copy of that logic that used to follow appended the same
    # record a second time, so every purchase was stored twice.
    save_transaction(wallet_address, token_to_buy, transaction_details)

    # Add the coin to the set of purchased coins
    purchased_coins.add(token_to_buy)
def get_historical_price(mint_address, date):
    """Look up the recorded price of a mint on a given date.

    Reads ``transaction_history.json`` and returns the ``price`` of the first
    transaction for ``mint_address`` whose ``date`` matches. The file written
    by ``save_transaction`` is laid out as ``{wallet: {mint: [transactions]}}``,
    so every wallet is searched; a flat ``{mint: [transactions]}`` layout is
    also accepted.

    Args:
        mint_address: Mint whose transactions are searched.
        date: Either a ``'%Y-%m-%d'`` string or an object with ``strftime``
            (such as the ``datetime`` returned by ``get_purchase_date``).

    Returns:
        The recorded price, or ``None`` when no matching transaction exists
        or the history file is missing.
    """
    # Dates are stored as '%Y-%m-%d' strings; normalise datetime-like input
    # so the equality test below can actually match.
    date_key = date.strftime('%Y-%m-%d') if hasattr(date, "strftime") else date

    try:
        with open('transaction_history.json', 'r') as file:
            data = json.load(file)
    except FileNotFoundError:
        logging.error("Transaction history file not found for historical price.")
        return None

    candidates = []
    flat_entries = data.get(mint_address)
    if isinstance(flat_entries, list):
        candidates.extend(flat_entries)
    for tokens in data.values():
        if isinstance(tokens, dict):
            nested_entries = tokens.get(mint_address)
            if isinstance(nested_entries, list):
                candidates.extend(nested_entries)

    for transaction in candidates:
        if transaction.get('date') == date_key:
            return transaction.get('price')

    logging.warning(f"No historical price found for {mint_address} on {date}")
    return None
def calculate_pnl():
    """Compute profit and loss per mint from the transaction history file.

    For every wallet and mint in ``transaction_history.json`` the recorded
    ``amount * price`` values are summed into a total cost and the amounts
    into a total amount; the total amount is then valued at the current
    price from ``get_price``. Mints without a current price are skipped.

    NOTE(review): buys record SOL spent at the SOL price while sells record
    token amounts at the token price, so the summed totals mix units -
    confirm the intended schema.

    Returns:
        A dict keyed by mint address with ``total_cost``, ``total_amount``,
        ``current_value``, ``pnl`` and ``pnl_percentage`` (0 when the total
        cost is 0); ``None`` when the history file is missing.
    """
    try:
        with open('transaction_history.json', 'r') as file:
            history = json.load(file)

        pnl_results = {}
        for wallet_address, tokens in history.items():
            for mint_address, transactions in tokens.items():
                logging.debug(f"Transactions for {wallet_address} - {mint_address}: {transactions}")

                total_cost = 0
                total_amount = 0
                for transaction in transactions:
                    logging.debug(f"Processing transaction: {transaction}")
                    print(transaction.get('amount'))
                    if not transaction:
                        logging.error(f"Unexpected transaction format: {transaction}")
                        continue
                    # All four fields are read, so a record missing any of
                    # them raises KeyError.
                    date, amount, price, _cost = (
                        transaction[key] for key in ("date", "amount", "price", "cost")
                    )
                    print(f" Date: {date}, Amount: {amount}, Price: {price}")
                    total_cost += amount * price
                    total_amount += amount

                current_price = get_price(mint_address)
                if current_price is None:
                    logging.warning(f"Failed to get current price for {mint_address}")
                    continue

                current_value = total_amount * current_price
                pnl = current_value - total_cost
                pnl_results[mint_address] = {
                    "total_cost": total_cost,
                    "total_amount": total_amount,
                    "current_value": current_value,
                    "pnl": pnl,
                    "pnl_percentage": (pnl / total_cost) * 100 if total_cost != 0 else 0,
                }
        return pnl_results
    except FileNotFoundError:
        logging.error("Transaction history file not found for PnL calculation.")
        return None
async def check_and_sell_if_profitable():
    """Sell the initial investment of every position that is up over 100%.

    Uses ``calculate_pnl`` for the per-mint figures. For each mint whose PnL
    percentage exceeds 100, the amount sold is the total cost divided by the
    current price, capped at the total amount held.
    """
    pnl_results = calculate_pnl()
    if not pnl_results:
        print("No PnL data available.")
        return

    for mint_address, pnl_data in pnl_results.items():
        pnl = pnl_data["pnl"]

        # Check if the PnL percentage is over 100%
        if pnl_data["pnl_percentage"] <= 100:
            logging.info(f"No action needed for {mint_address}, PnL: ${pnl:.2f}")
            continue

        # Fetch the price once, and only when a sale is actually considered;
        # a missing or zero price cannot be divided by.
        current_price = get_price(mint_address)
        if not current_price:
            logging.warning(f"Failed to get current price for {mint_address}")
            continue

        # calculate_pnl already reports the amount held, so there is no need
        # to re-derive it from current_value with another price lookup.
        total_amount = pnl_data["total_amount"]
        initial_investment_amount = pnl_data["total_cost"] / current_price
        # Ensure the amount to sell is less than or equal to the total amount
        amount_to_sell = min(initial_investment_amount, total_amount)

        logging.info(f"Selling initial investment for {mint_address}: {amount_to_sell} tokens. PnL: ${pnl:.2f}")
        await sell_with_env_wallet(mint_address, amount_to_sell)
# Example usage
# asyncio.run(check_and_sell_if_profitable())

Comprehensive Tutorial: Solana Trading Script

Overview

This script demonstrates essential Solana operations: deriving and reading a Pump.fun bonding-curve account (its virtual reserves), buying and selling tokens against that curve, managing SPL associated token accounts, and calculating portfolio performance.

Required Libraries

Ensure you have the following Python libraries installed:

  • solders for Solana data structures
  • solana for the Solana RPC API (also provides the spl.token helpers)
  • requests for API calls
  • construct for data parsing
  • python-dotenv for loading the .env file

You can install them using:

pip install solders solana requests construct python-dotenv

Setup

  1. Environment Variables: Store sensitive data like private keys and RPC URLs in a .env file.
  2. Constants: Define key program IDs and addresses for Solana operations in a constants file.
# config.py
from solana.rpc.api import Client
from solders.keypair import Keypair
import os
from dotenv import load_dotenv

# Load environment variables from .env
load_dotenv()

PRIV_KEY = os.getenv("PrivateKey")
RPC = os.getenv("RPC_HTTPS_URL")
# Compute-budget settings; the buy example below imports both from this module.
UNIT_BUDGET = 100_000
UNIT_PRICE = 1_000_000
# Shared RPC client and signing keypair, created at import time.
client = Client(RPC)
payer_keypair = Keypair.from_base58_string(PRIV_KEY)
# constants.py
from solders.pubkey import Pubkey

# Program under which the bonding-curve PDA is derived in the examples below.
PUMP_FUN_PROGRAM = Pubkey.from_string("6EF8rrecthR5Dkzon8Nwu78hRvfCKubJ14M5uBEwF6P")
TOKEN_PROGRAM = Pubkey.from_string("TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA")

Main Functions

1. Fetching Virtual Reserves

This function reads on-chain data from a bonding curve account.

from solders.pubkey import Pubkey
from construct import Struct, Int64ul, Padding, Flag
from config import client

# Layout used to parse bonding-curve account data: 8 skipped bytes, five
# Int64ul fields, then a Flag.
bonding_curve_struct = Struct(
    Padding(8),
    "virtualTokenReserves" / Int64ul,
    "virtualSolReserves" / Int64ul,
    "realTokenReserves" / Int64ul,
    "realSolReserves" / Int64ul,
    "tokenTotalSupply" / Int64ul,
    "complete" / Flag
)

def get_virtual_reserves(bonding_curve: Pubkey):
    try:
        account_info = client.get_account_info(bonding_curve)
        data = account_info.value.data
        parsed_data = bonding_curve_struct.parse(data)
        return parsed_data
    except Exception as e:
        print("Error fetching virtual reserves:", e)
        return None

2. Deriving Bonding Curve Accounts

Derives accounts related to the bonding curve using Solana program-derived addresses (PDAs).

from spl.token.instructions import get_associated_token_address

def derive_bonding_curve_accounts(mint_str: str):
    """Derive the bonding-curve PDA for a mint and that PDA's associated token account.

    Args:
        mint_str: Base58 string of the token mint.

    Returns:
        ``(bonding_curve, associated_bonding_curve)`` on success, or
        ``(None, None)`` if parsing or derivation raised (the error is printed).
    """
    try:
        mint = Pubkey.from_string(mint_str)
        # Seeds: the literal "bonding-curve" followed by the mint's raw bytes.
        seeds = [b"bonding-curve", bytes(mint)]
        curve_address, _bump = Pubkey.find_program_address(seeds, PUMP_FUN_PROGRAM)
        curve_token_account = get_associated_token_address(curve_address, mint)
    except Exception as e:
        print("Error deriving bonding curve accounts:", e)
        return None, None
    return curve_address, curve_token_account

3. Buying Tokens

A function to purchase tokens from the bonding curve.

from solana.transaction import Transaction
from solders.instruction import Instruction
from solders.compute_budget import set_compute_unit_limit, set_compute_unit_price
from config import client, payer_keypair, UNIT_BUDGET, UNIT_PRICE

def buy(mint_str: str, sol_in: float = 0.01, slippage: int = 25) -> bool:
    """Build and submit a buy instruction for ``mint_str`` to PUMP_FUN_PROGRAM.

    Args:
        mint_str: Base58 mint address of the token to buy.
        sol_in: Amount of SOL to spend; converted to lamports with a 10**9 factor.
        slippage: Percentage added on top of ``sol_in`` to form the maximum cost.

    Returns:
        True if the transaction was submitted without raising; False if the
        coin data could not be loaded or any exception occurred (printed).
    """
    # NOTE(review): this snippet uses AccountMeta, struct, GLOBAL, FEE_RECIPIENT,
    # TOKEN_PROGRAM, PUMP_FUN_PROGRAM and get_coin_data without importing them;
    # they must already be in scope wherever this function is pasted.
    try:
        coin_data = get_coin_data(mint_str)
        if not coin_data:
            print("Failed to retrieve coin data.")
            return False

        owner = payer_keypair.pubkey()
        mint = Pubkey.from_string(mint_str)
        # The payer's associated token account for this mint receives the tokens.
        token_account = get_associated_token_address(owner, mint)

        sol_in_lamports = int(sol_in * 10**9)
        # Token amount estimated from the current ratio of virtual reserves.
        amount = int(sol_in_lamports * coin_data['virtual_token_reserves'] / coin_data['virtual_sol_reserves'])
        # Upper bound on lamports spent: the input plus the slippage percentage.
        max_sol_cost = int(sol_in_lamports * (1 + slippage / 100))

        # NOTE(review): the order of these accounts is presumably dictated by the
        # on-chain program — confirm the list is complete and correctly ordered.
        keys = [
            AccountMeta(pubkey=GLOBAL, is_signer=False, is_writable=False),
            AccountMeta(pubkey=FEE_RECIPIENT, is_signer=False, is_writable=True),
            AccountMeta(pubkey=mint, is_signer=False, is_writable=False),
            AccountMeta(pubkey=coin_data['bonding_curve'], is_signer=False, is_writable=True),
            AccountMeta(pubkey=coin_data['associated_bonding_curve'], is_signer=False, is_writable=True),
            AccountMeta(pubkey=token_account, is_signer=False, is_writable=True),
            AccountMeta(pubkey=owner, is_signer=True, is_writable=True),
            AccountMeta(pubkey=TOKEN_PROGRAM, is_signer=False, is_writable=False),
        ]

        # Payload: two little-endian unsigned 64-bit integers (amount, max cost).
        # NOTE(review): confirm whether the program expects an instruction
        # discriminator in front of these two values.
        data = struct.pack('<Q', amount) + struct.pack('<Q', max_sol_cost)
        swap_instruction = Instruction(PUMP_FUN_PROGRAM, data, keys)

        # Compute-budget instructions are added ahead of the swap instruction.
        txn = Transaction(fee_payer=owner)
        txn.add(set_compute_unit_limit(UNIT_BUDGET))
        txn.add(set_compute_unit_price(UNIT_PRICE))
        txn.add(swap_instruction)

        # NOTE(review): the keypair is passed both to sign() and to
        # send_transaction(); confirm the explicit sign() is needed with the
        # installed solana-py version.
        txn.sign(payer_keypair)
        txn_sig = client.send_transaction(txn, payer_keypair).value

        # NOTE(review): the message says "confirmed", but nothing here waits for
        # or checks confirmation of the submitted signature.
        print("Transaction confirmed:", txn_sig)
        return True
    except Exception as e:
        print("Error buying tokens:", e)
        return False

4. Selling Tokens

Implements the selling logic: tokens held in the payer's associated token account are sold back to the bonding curve.

def sell(mint_str: str, percentage: int = 100, slippage: int = 25) -> bool:
    """Build and submit a sell instruction for ``mint_str`` to PUMP_FUN_PROGRAM.

    Args:
        mint_str: Base58 mint address of the token to sell.
        percentage: Percentage of the current token balance to sell.
        slippage: Percentage subtracted from the estimated SOL output to form
            the minimum acceptable output.

    Returns:
        True if the transaction was submitted without raising; False if coin
        data is unavailable, the balance is zero, or an exception occurred.
    """
    # NOTE(review): this snippet uses AccountMeta, struct, GLOBAL, FEE_RECIPIENT,
    # TOKEN_PROGRAM, PUMP_FUN_PROGRAM and get_coin_data without importing them;
    # they must already be in scope wherever this function is pasted.
    try:
        coin_data = get_coin_data(mint_str)
        if not coin_data:
            print("Failed to retrieve coin data.")
            return False

        owner = payer_keypair.pubkey()
        mint = Pubkey.from_string(mint_str)
        # The payer's associated token account for this mint is the token source.
        token_account = get_associated_token_address(owner, mint)

        token_balance = get_token_balance(mint_str)
        if token_balance == 0:
            print("No tokens available to sell.")
            return False

        # NOTE(review): get_token_balance returns the `uiAmount` field, while the
        # value below is packed as a u64 — confirm whether it must first be
        # scaled to raw base units using the mint's decimals.
        amount = int(token_balance * (percentage / 100))
        # Estimated output from the virtual-reserve ratio, reduced by slippage.
        min_sol_output = int((amount / coin_data['virtual_token_reserves']) * coin_data['virtual_sol_reserves'] * (1 - slippage / 100))

        # NOTE(review): the order of these accounts is presumably dictated by the
        # on-chain program — confirm the list is complete and correctly ordered.
        keys = [
            AccountMeta(pubkey=GLOBAL, is_signer=False, is_writable=False),
            AccountMeta(pubkey=FEE_RECIPIENT, is_signer=False, is_writable=True),
            AccountMeta(pubkey=mint, is_signer=False, is_writable=False),
            AccountMeta(pubkey=coin_data['bonding_curve'], is_signer=False, is_writable=True),
            AccountMeta(pubkey=coin_data['associated_bonding_curve'], is_signer=False, is_writable=True),
            AccountMeta(pubkey=token_account, is_signer=False, is_writable=True),
            AccountMeta(pubkey=owner, is_signer=True, is_writable=True),
            AccountMeta(pubkey=TOKEN_PROGRAM, is_signer=False, is_writable=False),
        ]

        # Payload: two little-endian unsigned 64-bit integers (amount, min output).
        # NOTE(review): confirm whether the program expects an instruction
        # discriminator in front of these two values.
        data = struct.pack('<Q', amount) + struct.pack('<Q', min_sol_output)
        swap_instruction = Instruction(PUMP_FUN_PROGRAM, data, keys)

        # Compute-budget instructions are added ahead of the swap instruction.
        txn = Transaction(fee_payer=owner)
        txn.add(set_compute_unit_limit(UNIT_BUDGET))
        txn.add(set_compute_unit_price(UNIT_PRICE))
        txn.add(swap_instruction)

        # NOTE(review): the keypair is passed both to sign() and to
        # send_transaction(); confirm the explicit sign() is needed with the
        # installed solana-py version.
        txn.sign(payer_keypair)
        txn_sig = client.send_transaction(txn, payer_keypair).value

        # NOTE(review): the message says "confirmed", but nothing here waits for
        # or checks confirmation of the submitted signature.
        print("Transaction confirmed:", txn_sig)
        return True
    except Exception as e:
        print("Error selling tokens:", e)
        return False

Utility Functions

  1. Fetch token balances
def get_token_balance(mint_str: str):
    """Return the payer's UI-unit balance of the token identified by ``mint_str``.

    Args:
        mint_str: Base58 string of the token mint.

    Returns:
        The ``uiAmount`` of the payer's first token account for this mint, or
        0 when the payer holds no such account, the amount is null, or the
        lookup raised (the error is printed).
    """
    # Imported here because the surrounding snippet never imports it.
    from solana.rpc.types import TokenAccountOpts

    try:
        owner = payer_keypair.pubkey()
        # The mint filter takes a Pubkey, not the raw base58 string.
        mint = Pubkey.from_string(mint_str)
        # Only the JSON-parsed variant of this RPC call returns decoded account
        # data; the plain variant returns raw bytes with no `parsed` field.
        response = client.get_token_accounts_by_owner_json_parsed(
            owner, TokenAccountOpts(mint=mint)
        )
        accounts = response.value
        if not accounts:
            # No token account for this mint: a zero balance, not an error.
            return 0
        # `parsed` is a plain dict, so fields are read by key.
        token_amount = accounts[0].account.data.parsed["info"]["tokenAmount"]
        return token_amount["uiAmount"] or 0
    except Exception as e:
        print("Error fetching token balance:", e)
        return 0
  2. Confirm transactions
def confirm_txn(txn_sig: str):
    """Report whether a transaction is found at "confirmed" commitment without an error.

    Args:
        txn_sig: Signature of the transaction to look up through ``client``.

    Returns:
        True when the fetched transaction's ``meta.err`` is None; False when it
        carries an error or when the lookup raised (the error is printed).
    """
    try:
        lookup = client.get_transaction(txn_sig, commitment="confirmed")
        failure = lookup.value.transaction.meta.err
        return failure is None
    except Exception as e:
        print("Error confirming transaction:", e)
        return False

Conclusion

This script integrates key Solana blockchain functions, enabling efficient interactions with token accounts and bonding curves. Modify the code as needed for specific use cases like portfolio management, trading bots, or DeFi applications.

To expand this tutorial, you can integrate additional features like:

  • Automating trades based on market conditions.
  • Tracking and reporting portfolio performance.
  • Adding more robust error handling and logging.

Feel free to reach out for further enhancements or explanations!

import requests
import logging
from requests.exceptions import RequestException
from urllib3.exceptions import ProtocolError
from http.client import RemoteDisconnected
import sys
import time
# Configure logging at import time: INFO and above, timestamped, to stdout.
logging.basicConfig(
level=logging.INFO, # or DEBUG for more detail
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[logging.StreamHandler(sys.stdout)]
)
logger = logging.getLogger(__name__)
# Price lookup URL; get_price appends the mint address to it.
PRICE_ENDPOINT = "https://api.jup.ag/price/v2?ids="
# Queried with GET for the 'currentHolders' attribute.
LOCKIN_HOLDER_DATA_ENDPOINT = "https://lockin.chat/api/holderscan"
# Queried with POST, sending COUSIN_TOKEN_ADDRESS as 'tokenAddress'.
COUSIN_FLOOR_PRICE_ENDPOINT = "https://www.retardio.exposed/api/floor"
# Address sent to COUSIN_FLOOR_PRICE_ENDPOINT.
COUSIN_TOKEN_ADDRESS = "GrRjEpwHbLE1KY3uxtAMU4ravHfbMGzWEL8HcERPb3Ad"
# Address whose price is looked up through get_price.
LOCKIN_TOKEN_ADDRESS = "8Ki8DpuWNxu9VsS3kQbarsCWMcFGWkzzA8pUPto9zBd5"
# Base URL; get_token_details appends the mint address.
DEGENCDN_NFT_DETAILS_ENDPOINT = "https://api.degencdn.com/v1/nfts/"
# Base URL; get_token_supply appends "<mint>/report".
RUG_CHECK_TOKEN_ENDPOINT = "https://api.rugcheck.xyz/v1/tokens/"
# Helper Function for Formatting
def format_large_number(value):
    """Format a number with a magnitude suffix (k, M, B) and two decimals.

    The suffix is chosen from the absolute value, so negative numbers are
    abbreviated the same way as positive ones (e.g. -2500000 -> "-2.50M").

    Parameters:
    value (int | float): The number to format.
    Returns:
    str: The scaled number followed by its suffix, or the plain number with
    two decimals when its magnitude is below 1,000.
    """
    magnitude = abs(value)
    # Ordered from largest to smallest so the first match is the right scale.
    for threshold, suffix in ((1_000_000_000, "B"), (1_000_000, "M"), (1_000, "k")):
        if magnitude >= threshold:
            return f"{value / threshold:.2f}{suffix}"
    return f"{value:.2f}"
def truncate_text(text, max_length, suffix="...", padding_on=False, suffix_on=True):
    """
    Truncates or pads the given text to a specified maximum length.

    The result is never longer than max_length: when the suffix itself would
    not fit, the text is cut hard without it.

    Parameters:
    text (str): The text to truncate or pad.
    max_length (int): The maximum length of the returned string; negative
        values are treated as 0.
    suffix (str): The string to append if truncation occurs. Defaults to "...".
    padding_on (bool): Whether to pad the text with spaces if it's shorter than max_length. Defaults to False.
    suffix_on (bool): Whether to append a suffix if truncation occurs. Defaults to True.
    Returns:
    str: The text truncated to max_length (ending in the suffix when it fits),
    padded with spaces up to max_length when padding_on is set, or unchanged.
    """
    limit = max(max_length, 0)
    if len(text) > limit:
        # Only use the suffix when it fits; otherwise a negative slice index
        # would produce a result longer than the requested length.
        if suffix_on and len(suffix) <= limit:
            return text[:limit - len(suffix)] + suffix
        return text[:limit]
    if padding_on:
        return text.ljust(limit)
    return text
# API Caller Helper Function
def api_caller(endpoint, method, params=None, attribute=None, retries=5, timeout=10):
    """Call a JSON HTTP endpoint with retries and return the decoded payload.

    Parameters:
    endpoint (str): The URL to call.
    method (str): 'GET' (params sent as query string) or 'POST' (params sent as JSON body).
    params (dict | None): Query parameters or JSON body, depending on method.
    attribute (str | None): If given, return only this key of the JSON object.
    retries (int): Maximum number of attempts.
    timeout (float): Seconds to wait for each request before giving up on it.
    Returns:
    The decoded JSON (or the requested attribute of it), or None when the
    method is unsupported, the body is not valid JSON, the attribute cannot be
    read, or every attempt failed.
    """
    if method not in ('GET', 'POST'):
        logger.error("Unsupported HTTP method")
        return None

    headers = {'Content-Type': 'application/json'}
    for attempt in range(retries):
        is_last_attempt = attempt == retries - 1
        try:
            # A timeout keeps a stalled server from blocking the caller forever.
            if method == 'GET':
                response = requests.get(endpoint, params=params, timeout=timeout)
            else:
                response = requests.post(endpoint, json=params, headers=headers, timeout=timeout)
            response.raise_for_status()  # Raises an HTTPError for bad responses
        except requests.exceptions.HTTPError as http_err:
            logger.error(f"HTTP error occurred: {http_err} - {response.text}")
            # Back off only when another attempt will follow; sleeping after
            # the final attempt just delays the None result.
            if response.status_code == 429 and not is_last_attempt:
                logger.warning(f"Rate limit exceeded for {endpoint}. Retrying {attempt + 1}/{retries}...")
                time.sleep(3 ** attempt)  # Exponential backoff
            continue
        except (RequestException, ProtocolError, RemoteDisconnected) as conn_err:
            logger.error(f"Connection error on {endpoint}: {conn_err}")
            continue
        except Exception as err:
            logger.error(f"An unexpected error occurred: {err}")
            continue

        try:
            data = response.json()
        except ValueError:
            logger.error(f"Response content is not valid JSON from {endpoint}")
            return None
        if not attribute:
            return data
        if not isinstance(data, dict):
            # A list or scalar body has no keys; retrying would not change that.
            logger.error(f"Cannot read attribute {attribute!r}: response from {endpoint} is not a JSON object")
            return None
        return data.get(attribute)
    return None
def is_valid_image(uri):
    """Check with a HEAD request whether ``uri`` serves an image.

    Parameters:
    uri (str): The URL to probe.
    Returns:
    bool: True when the final response has status 200 and a Content-Type
    containing 'image'; False otherwise or when the request fails.
    """
    try:
        # requests.head() does not follow redirects unless asked to, so without
        # this flag a redirected image URL would report 3xx and be rejected.
        response = requests.head(uri, timeout=5, allow_redirects=True)
        content_type = response.headers.get('Content-Type', '')
        return response.status_code == 200 and 'image' in content_type
    except requests.RequestException as e:
        logger.error(f"Error validating image URI {uri}: {e}")
        return False
# Fetch $lockin and Retardio Cousin Data
def fetch_lockin_and_cousin_data():
    """Fetch additional data for $lockin holders, price, and Retardio Cousin floor price.

    Returns:
    tuple: (lockin_holders, lockin_price, cousin_floor_price). Any element is
    None when its lookup failed; lockin_price is rounded to 5 decimals.
    """
    logger.info("Fetching $Lockin and Retardio Cousin data")
    lockin_price_response = get_price(LOCKIN_TOKEN_ADDRESS)
    lockin_holders = api_caller(LOCKIN_HOLDER_DATA_ENDPOINT, 'GET', attribute='currentHolders')
    cousin_floor_price = api_caller(
        COUSIN_FLOOR_PRICE_ENDPOINT, 'POST',
        params={'tokenAddress': COUSIN_TOKEN_ADDRESS},
        attribute='uiFormmatted'
    )

    # Bind the price up front so the return below never hits an unbound local
    # when one of the lookups failed.
    lockin_price = None
    if lockin_price_response is not None:
        lockin_price = round(lockin_price_response, 5)

    if lockin_price is not None and lockin_holders is not None and cousin_floor_price is not None:
        logger.debug(f"Lockin Price: ${lockin_price_response:.2f}")
        logger.debug(f"Lockin Holder Count: {lockin_holders}")
        logger.debug(f"Retardio Cousin Floor Price: {cousin_floor_price}")
    else:
        logger.error("Failed to fetch data.")
    return lockin_holders, lockin_price, cousin_floor_price
def get_token_details(mint):
    """Look up a token's details by mint address.

    Returns:
    tuple: (details, metadata), where metadata is the "metadata" entry of the
    details (None if absent); (None, None) when the lookup returned nothing.
    """
    logger.info(f"Fetching details for mint: {mint}")
    details = api_caller(f"{DEGENCDN_NFT_DETAILS_ENDPOINT}{mint}", 'GET')
    if details is None:
        logger.error(f"Failed to fetch details for {mint}")
        return None, None

    metadata = details.get("metadata")
    logger.info(f"Token Metadata: {metadata}")
    return details, metadata
def get_token_supply(mint):
    """Look up a token's total supply by mint address.

    Returns:
    The reported supply divided by 10 ** decimals (decimals treated as 0 when
    missing or falsy), or None when the report, its "token" section, or the
    supply value is unavailable.
    """
    logger.info(f"Fetching supply for mint: {mint}")
    report = api_caller(f"{RUG_CHECK_TOKEN_ENDPOINT}{mint}/report", 'GET')
    if not report:
        logger.error(f"No supply response received for {mint}")
        return None

    token_section = report.get("token", {})
    if not token_section:
        logger.error(f"No token data found in response for {mint}")
        return None

    raw_supply = token_section.get("supply")
    if raw_supply is None:
        logger.error(f"No supply data found for {mint}")
        return None

    # A missing or null "decimals" leaves the supply unscaled.
    decimals = token_section.get("decimals") or 0
    return raw_supply / (10 ** decimals)
def get_market_cap(mint):
    """Compute a token's market cap as price times supply.

    Returns:
    price * supply, or None when either value is unavailable or a lookup
    raised (the error is logged).
    """
    logger.info(f"Fetching market cap for mint: {mint}")
    try:
        # Both lookups run before the check, matching the order of their logs.
        price = get_price(mint)
        supply = get_token_supply(mint)
        if price is None or supply is None:
            return None
        return price * supply
    except Exception as e:
        logger.error(f"Error fetching market cap for {mint}: {e}")
        return None
def get_price(mint):
    """Look up a token's price by mint address.

    Returns:
    float: The 'price' value of the entry keyed by the mint in the response's
    'data' section, or None when that data or entry is missing (logged).
    """
    price_data = api_caller(PRICE_ENDPOINT + mint, 'GET', attribute='data')
    if not price_data or mint not in price_data:
        logger.error(f"Failed to fetch price data for {mint}")
        return None

    entry = price_data[mint]
    if not entry or 'price' not in entry:
        logger.error(f"Price information not found for {mint}")
        return None

    return float(entry['price'])
@jongan69
Copy link
Author

Can probably run this through ai to create a walk through of interacting with solana using python

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment