Created
June 20, 2024 18:39
-
-
Save rubpy/6c57e9d12acd4b6ed84e9f205372631d to your computer and use it in GitHub Desktop.
Fetching Pump.fun bonding curve state and calculating price of token/SOL.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import * as web3 from "@solana/web3.js"; | |
////////////////////////////////////////////////// | |
function readBytes(buf: Buffer, offset: number, length: number): Buffer { | |
const end = offset + length; | |
if (buf.byteLength < end) throw new RangeError("range out of bounds"); | |
return buf.subarray(offset, end); | |
} | |
function readBigUintLE(buf: Buffer, offset: number, length: number): bigint { | |
switch (length) { | |
case 1: return BigInt(buf.readUint8(offset)); | |
case 2: return BigInt(buf.readUint16LE(offset)); | |
case 4: return BigInt(buf.readUint32LE(offset)); | |
case 8: return buf.readBigUint64LE(offset); | |
} | |
throw new Error(`unsupported data size (${length} bytes)`); | |
} | |
function readBoolean(buf: Buffer, offset: number, length: number): boolean { | |
const data = readBytes(buf, offset, length); | |
for (const b of data) { | |
if (b) return true; | |
} | |
return false; | |
} | |
////////////////////////////////////////////////// | |
const PUMP_CURVE_TOKEN_DECIMALS = 6; | |
// Calculated as the first 8 bytes of: `sha256("account:BondingCurve")`. | |
const PUMP_CURVE_STATE_SIGNATURE = Uint8Array.from([0x17, 0xb7, 0xf8, 0x37, 0x60, 0xd8, 0xac, 0x60]); | |
const PUMP_CURVE_STATE_SIZE = 0x29; | |
const PUMP_CURVE_STATE_OFFSETS = { | |
VIRTUAL_TOKEN_RESERVES: 0x08, | |
VIRTUAL_SOL_RESERVES: 0x10, | |
REAL_TOKEN_RESERVES: 0x18, | |
REAL_SOL_RESERVES: 0x20, | |
TOKEN_TOTAL_SUPPLY: 0x28, | |
COMPLETE: 0x30, | |
}; | |
interface PumpCurveState { | |
virtualTokenReserves: bigint | |
virtualSolReserves: bigint | |
realTokenReserves: bigint | |
realSolReserves: bigint | |
tokenTotalSupply: bigint | |
complete: boolean | |
} | |
// Fetches account data of a Pump.fun bonding curve, and deserializes it | |
// according to `accounts.BondingCurve` (see: Pump.fun program's Anchor IDL). | |
async function getPumpCurveState(conn: web3.Connection, curveAddress: web3.PublicKey): Promise<PumpCurveState> { | |
const response = await conn.getAccountInfo(curveAddress); | |
if (!response || !response.data || response.data.byteLength < PUMP_CURVE_STATE_SIGNATURE.byteLength + PUMP_CURVE_STATE_SIZE) { | |
throw new Error("unexpected curve state"); | |
} | |
const idlSignature = readBytes(response.data, 0, PUMP_CURVE_STATE_SIGNATURE.byteLength); | |
if (idlSignature.compare(PUMP_CURVE_STATE_SIGNATURE) !== 0) { | |
throw new Error("unexpected curve state IDL signature"); | |
} | |
return { | |
virtualTokenReserves: readBigUintLE(response.data, PUMP_CURVE_STATE_OFFSETS.VIRTUAL_TOKEN_RESERVES, 8), | |
virtualSolReserves: readBigUintLE(response.data, PUMP_CURVE_STATE_OFFSETS.VIRTUAL_SOL_RESERVES, 8), | |
realTokenReserves: readBigUintLE(response.data, PUMP_CURVE_STATE_OFFSETS.REAL_TOKEN_RESERVES, 8), | |
realSolReserves: readBigUintLE(response.data, PUMP_CURVE_STATE_OFFSETS.REAL_SOL_RESERVES, 8), | |
tokenTotalSupply: readBigUintLE(response.data, PUMP_CURVE_STATE_OFFSETS.TOKEN_TOTAL_SUPPLY, 8), | |
complete: readBoolean(response.data, PUMP_CURVE_STATE_OFFSETS.COMPLETE, 1), | |
}; | |
} | |
// Calculates token price (in SOL) of a Pump.fun bonding curve. | |
function calculatePumpCurvePrice(curveState: PumpCurveState): number { | |
if (curveState === null || typeof curveState !== "object" | |
|| !(typeof curveState.virtualTokenReserves === "bigint" && typeof curveState.virtualSolReserves === "bigint")) { | |
throw new TypeError("curveState must be a PumpCurveState"); | |
} | |
if (curveState.virtualTokenReserves <= 0n || curveState.virtualSolReserves <= 0n) { | |
throw new RangeError("curve state contains invalid reserve data"); | |
} | |
return (Number(curveState.virtualSolReserves) / web3.LAMPORTS_PER_SOL) / (Number(curveState.virtualTokenReserves) / 10 ** PUMP_CURVE_TOKEN_DECIMALS); | |
} | |
////////////////////////////////////////////////// | |
(async (rpcUrl: string) => { | |
const conn = new web3.Connection(rpcUrl, "confirmed"); | |
const curveAddress = new web3.PublicKey("5BwXbPNGbfd2UuE8rkvASmJYXWXSiqmrhqJ1FX6rQnKd"); | |
const curveState = await getPumpCurveState(conn, curveAddress); | |
if (!curveState) return; | |
const tokenPriceSol = calculatePumpCurvePrice(curveState); | |
////////////////////////////////////////////////// | |
console.log("Token price:"); | |
console.log(` ${(tokenPriceSol.toFixed(10))} SOL`); | |
})(process.env.SOL_RPC_URL || "https://mainnet.helius-rpc.com/?api-key=00000000-0000-0000-0000-000000000000"); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
thankyou but why shows the value of the previous few seconds `import base64
import asyncio
from decimal import Decimal, getcontext
from solders.pubkey import Pubkey
from solana.rpc.async_api import AsyncClient
from borsh_construct import CStruct, U64, Bool, U8
PUMP_PROGRAM_ADDRESS_STR = "6EF8rrecthR5Dkzon8Nwu78hRvfCKubJ14M5uBEwF6P"
HELIUS_RPC_ENDPOINT = "https://mainnet.helius-rpc.com/?x"
MINT_ADDRESS_STR = "U6FMG6UUhVzaYfCa8h7ZrGkMyw5ipwAF9t1tQiPAMhR" # İzlenecek token mint adresi
PUMP_CURVE_TOKEN_DECIMALS_COUNT = 6
PUMP_CURVE_STATE_IDENTIFIER = bytes([0x17, 0xb7, 0xf8, 0x37, 0x60, 0xd8, 0xac, 0x60])
PUMP_CURVE_STATE_FIELD_OFFSETS = {
"VIRTUAL_TOKEN_RESERVES": 0x08,
"VIRTUAL_SOL_RESERVES": 0x10,
"REAL_TOKEN_RESERVES": 0x18,
"REAL_SOL_RESERVES": 0x20,
"TOKEN_TOTAL_SUPPLY": 0x28,
"COMPLETE": 0x30,
}
--- Bonding Curve Şeması ---
PUMP_CURVE_DATA_LAYOUT = CStruct(
"signature" / U8[8],
"padding1" / U8[PUMP_CURVE_STATE_FIELD_OFFSETS["VIRTUAL_TOKEN_RESERVES"] - 8],
"virtualTokenReserves" / U64,
"virtualSolReserves" / U64,
"realTokenReserves" / U64,
"realSolReserves" / U64,
"tokenTotalSupply" / U64,
"complete" / Bool,
)
--- Program Derived Address (PDA) hesaplama ---
async def find_pump_curve_address(mint_address: Pubkey, program_address: Pubkey):
seeds = [b"bonding-curve", bytes(mint_address)]
program_derived_address, bump_seed = Pubkey.find_program_address(seeds, program_address)
return program_derived_address
async def get_pump_curve_state(conn: AsyncClient, curve_address: Pubkey):
try:
resp = await conn.get_account_info(curve_address)
value = resp.value
if not value or not value.data:
raise ValueError(f"Curve hesabı verisi alınamadı: {curve_address}")
def calculate_pump_curve_price(curve_state):
if not curve_state:
return None
async def main():
conn = AsyncClient(HELIUS_RPC_ENDPOINT)
mint_pubkey = Pubkey.from_string(MINT_ADDRESS_STR)
pump_program_pubkey = Pubkey.from_string(PUMP_PROGRAM_ADDRESS_STR)
if name == "main":
asyncio.run(main())`