Skip to content

Instantly share code, notes, and snippets.

@rubpy
Created June 20, 2024 18:39
Show Gist options
  • Save rubpy/6c57e9d12acd4b6ed84e9f205372631d to your computer and use it in GitHub Desktop.
Save rubpy/6c57e9d12acd4b6ed84e9f205372631d to your computer and use it in GitHub Desktop.
Fetching Pump.fun bonding curve state and calculating price of token/SOL.
import * as web3 from "@solana/web3.js";
//////////////////////////////////////////////////
function readBytes(buf: Buffer, offset: number, length: number): Buffer {
const end = offset + length;
if (buf.byteLength < end) throw new RangeError("range out of bounds");
return buf.subarray(offset, end);
}
function readBigUintLE(buf: Buffer, offset: number, length: number): bigint {
switch (length) {
case 1: return BigInt(buf.readUint8(offset));
case 2: return BigInt(buf.readUint16LE(offset));
case 4: return BigInt(buf.readUint32LE(offset));
case 8: return buf.readBigUint64LE(offset);
}
throw new Error(`unsupported data size (${length} bytes)`);
}
function readBoolean(buf: Buffer, offset: number, length: number): boolean {
const data = readBytes(buf, offset, length);
for (const b of data) {
if (b) return true;
}
return false;
}
//////////////////////////////////////////////////
const PUMP_CURVE_TOKEN_DECIMALS = 6;
// Calculated as the first 8 bytes of: `sha256("account:BondingCurve")`.
const PUMP_CURVE_STATE_SIGNATURE = Uint8Array.from([0x17, 0xb7, 0xf8, 0x37, 0x60, 0xd8, 0xac, 0x60]);
const PUMP_CURVE_STATE_SIZE = 0x29;
const PUMP_CURVE_STATE_OFFSETS = {
VIRTUAL_TOKEN_RESERVES: 0x08,
VIRTUAL_SOL_RESERVES: 0x10,
REAL_TOKEN_RESERVES: 0x18,
REAL_SOL_RESERVES: 0x20,
TOKEN_TOTAL_SUPPLY: 0x28,
COMPLETE: 0x30,
};
interface PumpCurveState {
virtualTokenReserves: bigint
virtualSolReserves: bigint
realTokenReserves: bigint
realSolReserves: bigint
tokenTotalSupply: bigint
complete: boolean
}
// Fetches account data of a Pump.fun bonding curve, and deserializes it
// according to `accounts.BondingCurve` (see: Pump.fun program's Anchor IDL).
async function getPumpCurveState(conn: web3.Connection, curveAddress: web3.PublicKey): Promise<PumpCurveState> {
const response = await conn.getAccountInfo(curveAddress);
if (!response || !response.data || response.data.byteLength < PUMP_CURVE_STATE_SIGNATURE.byteLength + PUMP_CURVE_STATE_SIZE) {
throw new Error("unexpected curve state");
}
const idlSignature = readBytes(response.data, 0, PUMP_CURVE_STATE_SIGNATURE.byteLength);
if (idlSignature.compare(PUMP_CURVE_STATE_SIGNATURE) !== 0) {
throw new Error("unexpected curve state IDL signature");
}
return {
virtualTokenReserves: readBigUintLE(response.data, PUMP_CURVE_STATE_OFFSETS.VIRTUAL_TOKEN_RESERVES, 8),
virtualSolReserves: readBigUintLE(response.data, PUMP_CURVE_STATE_OFFSETS.VIRTUAL_SOL_RESERVES, 8),
realTokenReserves: readBigUintLE(response.data, PUMP_CURVE_STATE_OFFSETS.REAL_TOKEN_RESERVES, 8),
realSolReserves: readBigUintLE(response.data, PUMP_CURVE_STATE_OFFSETS.REAL_SOL_RESERVES, 8),
tokenTotalSupply: readBigUintLE(response.data, PUMP_CURVE_STATE_OFFSETS.TOKEN_TOTAL_SUPPLY, 8),
complete: readBoolean(response.data, PUMP_CURVE_STATE_OFFSETS.COMPLETE, 1),
};
}
// Calculates token price (in SOL) of a Pump.fun bonding curve.
function calculatePumpCurvePrice(curveState: PumpCurveState): number {
if (curveState === null || typeof curveState !== "object"
|| !(typeof curveState.virtualTokenReserves === "bigint" && typeof curveState.virtualSolReserves === "bigint")) {
throw new TypeError("curveState must be a PumpCurveState");
}
if (curveState.virtualTokenReserves <= 0n || curveState.virtualSolReserves <= 0n) {
throw new RangeError("curve state contains invalid reserve data");
}
return (Number(curveState.virtualSolReserves) / web3.LAMPORTS_PER_SOL) / (Number(curveState.virtualTokenReserves) / 10 ** PUMP_CURVE_TOKEN_DECIMALS);
}
//////////////////////////////////////////////////
(async (rpcUrl: string) => {
const conn = new web3.Connection(rpcUrl, "confirmed");
const curveAddress = new web3.PublicKey("5BwXbPNGbfd2UuE8rkvASmJYXWXSiqmrhqJ1FX6rQnKd");
const curveState = await getPumpCurveState(conn, curveAddress);
if (!curveState) return;
const tokenPriceSol = calculatePumpCurvePrice(curveState);
//////////////////////////////////////////////////
console.log("Token price:");
console.log(` ${(tokenPriceSol.toFixed(10))} SOL`);
})(process.env.SOL_RPC_URL || "https://mainnet.helius-rpc.com/?api-key=00000000-0000-0000-0000-000000000000");
@elmgo
Copy link

elmgo commented Apr 2, 2025

This works great! I was searching everywhere for this. thanks!

@DominicOrga
Copy link

Thanks for this. Just tested and I can guarantee that the SOL price is correct. Just make sure that you are using the token's bonding curve address and not the token address itself.

@noname85x
Copy link

noname85x commented May 9, 2025

thankyou but why shows the value of the previous few seconds `import base64
import asyncio
from decimal import Decimal, getcontext

from solders.pubkey import Pubkey
from solana.rpc.async_api import AsyncClient
from borsh_construct import CStruct, U64, Bool, U8
PUMP_PROGRAM_ADDRESS_STR = "6EF8rrecthR5Dkzon8Nwu78hRvfCKubJ14M5uBEwF6P"
HELIUS_RPC_ENDPOINT = "https://mainnet.helius-rpc.com/?x"

MINT_ADDRESS_STR = "U6FMG6UUhVzaYfCa8h7ZrGkMyw5ipwAF9t1tQiPAMhR" # İzlenecek token mint adresi

PUMP_CURVE_TOKEN_DECIMALS_COUNT = 6

PUMP_CURVE_STATE_IDENTIFIER = bytes([0x17, 0xb7, 0xf8, 0x37, 0x60, 0xd8, 0xac, 0x60])
PUMP_CURVE_STATE_FIELD_OFFSETS = {
"VIRTUAL_TOKEN_RESERVES": 0x08,
"VIRTUAL_SOL_RESERVES": 0x10,
"REAL_TOKEN_RESERVES": 0x18,
"REAL_SOL_RESERVES": 0x20,
"TOKEN_TOTAL_SUPPLY": 0x28,
"COMPLETE": 0x30,
}

--- Bonding Curve Şeması ---

PUMP_CURVE_DATA_LAYOUT = CStruct(
"signature" / U8[8],
"padding1" / U8[PUMP_CURVE_STATE_FIELD_OFFSETS["VIRTUAL_TOKEN_RESERVES"] - 8],
"virtualTokenReserves" / U64,
"virtualSolReserves" / U64,
"realTokenReserves" / U64,
"realSolReserves" / U64,
"tokenTotalSupply" / U64,
"complete" / Bool,
)

--- Program Derived Address (PDA) hesaplama ---

async def find_pump_curve_address(mint_address: Pubkey, program_address: Pubkey):
seeds = [b"bonding-curve", bytes(mint_address)]
program_derived_address, bump_seed = Pubkey.find_program_address(seeds, program_address)
return program_derived_address

async def get_pump_curve_state(conn: AsyncClient, curve_address: Pubkey):
try:
resp = await conn.get_account_info(curve_address)
value = resp.value
if not value or not value.data:
raise ValueError(f"Curve hesabı verisi alınamadı: {curve_address}")

    data_raw = value.data

    if isinstance(data_raw, list) and len(data_raw) == 2:
        b64_data = base64.b64decode(data_raw[0])
    elif isinstance(data_raw, str):
        b64_data = base64.b64decode(data_raw)
    elif isinstance(data_raw, bytes):
        b64_data = data_raw
    else:
        raise ValueError(f"Hesap verisi beklenmeyen formatta: {type(data_raw)}")

    if b64_data[:8] != PUMP_CURVE_STATE_IDENTIFIER:
        raise ValueError("Geçersiz imza, Pump.fun curve değil.")

    parsed_data = PUMP_CURVE_DATA_LAYOUT.parse(b64_data)
    return {
        "virtualTokenReserves": parsed_data.virtualTokenReserves,
        "virtualSolReserves": parsed_data.virtualSolReserves,
        "realTokenReserves": parsed_data.realTokenReserves,
        "realSolReserves": parsed_data.realSolReserves,
        "tokenTotalSupply": parsed_data.tokenTotalSupply,
        "complete": parsed_data.complete,
    }
except Exception as e:
    print(f"Hata oluştu (get_pump_curve_state): {e}")
    return None

def calculate_pump_curve_price(curve_state):
if not curve_state:
return None

getcontext().prec = 18  # Yüksek hassasiyet

virtual_token_reserves = Decimal(curve_state["virtualTokenReserves"])
virtual_sol_reserves = Decimal(curve_state["virtualSolReserves"])

sol_decimals = Decimal(1_000_000_000)
token_scaling_factor = Decimal(10 ** PUMP_CURVE_TOKEN_DECIMALS_COUNT)

if virtual_token_reserves <= 0 or virtual_sol_reserves <= 0:
    print("Uyarı: Geçersiz rezerv verisi (sıfır veya negatif). Fiyat hesaplanamıyor.")
    return None

try:
    price = (virtual_sol_reserves / sol_decimals) / (virtual_token_reserves / token_scaling_factor)
    return float(price)
except Exception as e:
    print(f"Hata oluştu (calculate_pump_curve_price): {e}")
    return None

async def main():
conn = AsyncClient(HELIUS_RPC_ENDPOINT)
mint_pubkey = Pubkey.from_string(MINT_ADDRESS_STR)
pump_program_pubkey = Pubkey.from_string(PUMP_PROGRAM_ADDRESS_STR)

pump_curve_address = await find_pump_curve_address(mint_pubkey, pump_program_pubkey)
print(f"Bulunan Bonding Curve Adresi: {pump_curve_address}")

while True:
    curve_state = await get_pump_curve_state(conn, pump_curve_address)
    if curve_state:
        price = calculate_pump_curve_price(curve_state)
        if price is not None:
            print(f"Fiyatı: {price:.10f} SOL")
    await asyncio.sleep(1)

if name == "main":
asyncio.run(main())`

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment