-
-
Save rubpy/6c57e9d12acd4b6ed84e9f205372631d to your computer and use it in GitHub Desktop.
import * as web3 from "@solana/web3.js";
////////////////////////////////////////////////// | |
/**
 * Returns a view of `length` bytes of `buf` starting at `offset`.
 *
 * The result is produced with `subarray`, so it shares memory with `buf`
 * rather than copying it.
 *
 * @param buf - Buffer to slice.
 * @param offset - Zero-based start position, in bytes.
 * @param length - Number of bytes to return.
 * @returns The requested byte range.
 * @throws RangeError if `offset` or `length` is negative or not an integer,
 *   or if the range extends past the end of `buf`.
 */
function readBytes(buf: Buffer, offset: number, length: number): Buffer {
  // `subarray` silently wraps negative indices and truncates fractions, so
  // reject them up front instead of returning an unexpected range.
  if (!Number.isInteger(offset) || !Number.isInteger(length) || offset < 0 || length < 0) {
    throw new RangeError("range out of bounds");
  }
  const end = offset + length;
  if (buf.byteLength < end) throw new RangeError("range out of bounds");
  return buf.subarray(offset, end);
}
/**
 * Reads an unsigned little-endian integer of `length` bytes from `buf`.
 *
 * @param buf - Buffer to read from.
 * @param offset - Zero-based start position, in bytes.
 * @param length - Width of the integer in bytes; must be 1, 2, 4 or 8.
 * @returns The decoded value as a `bigint`.
 * @throws Error if `length` is not one of the supported widths.
 */
function readBigUintLE(buf: Buffer, offset: number, length: number): bigint {
  switch (length) {
    case 1:
      return BigInt(buf.readUInt8(offset));
    case 2:
      return BigInt(buf.readUInt16LE(offset));
    case 4:
      return BigInt(buf.readUInt32LE(offset));
    case 8:
      return buf.readBigUInt64LE(offset);
    default:
      throw new Error(`unsupported data size (${length} bytes)`);
  }
}
/**
 * Interprets `length` bytes of `buf` starting at `offset` as a boolean.
 *
 * @param buf - Buffer to read from.
 * @param offset - Zero-based start position, in bytes.
 * @param length - Number of bytes that make up the flag.
 * @returns `true` if any byte in the range is non-zero, otherwise `false`
 *   (including when `length` is 0).
 * @throws RangeError (from `readBytes`) if the range is out of bounds.
 */
function readBoolean(buf: Buffer, offset: number, length: number): boolean {
  return readBytes(buf, offset, length).some((byte) => byte !== 0);
}
////////////////////////////////////////////////// | |
/** Decimal places used when scaling the curve's raw token reserves. */
const PUMP_CURVE_TOKEN_DECIMALS = 6;

/** Calculated as the first 8 bytes of: `sha256("account:BondingCurve")`. */
const PUMP_CURVE_STATE_SIGNATURE = Uint8Array.from([0x17, 0xb7, 0xf8, 0x37, 0x60, 0xd8, 0xac, 0x60]);

/**
 * Size in bytes of the curve state that follows the 8-byte signature:
 * five 8-byte integers plus one 1-byte flag.
 */
const PUMP_CURVE_STATE_SIZE = 0x29;

/** Byte offsets of each field, measured from the start of the account data. */
const PUMP_CURVE_STATE_OFFSETS = {
  VIRTUAL_TOKEN_RESERVES: 0x08,
  VIRTUAL_SOL_RESERVES: 0x10,
  REAL_TOKEN_RESERVES: 0x18,
  REAL_SOL_RESERVES: 0x20,
  TOKEN_TOTAL_SUPPLY: 0x28,
  COMPLETE: 0x30,
} as const;
/**
 * Deserialized contents of a bonding-curve account.
 * The integer fields are raw 64-bit values exactly as stored on chain.
 */
interface PumpCurveState {
  readonly virtualTokenReserves: bigint;
  readonly virtualSolReserves: bigint;
  readonly realTokenReserves: bigint;
  readonly realSolReserves: bigint;
  readonly tokenTotalSupply: bigint;
  /** Value of the `complete` flag byte stored after the integer fields. */
  readonly complete: boolean;
}
/**
 * Fetches account data of a Pump.fun bonding curve, and deserializes it
 * according to `accounts.BondingCurve` (see: Pump.fun program's Anchor IDL).
 *
 * @param conn - RPC connection used to fetch the account.
 * @param curveAddress - Address of the bonding-curve account (not the token mint).
 * @param commitment - Optional commitment level for this request; when omitted
 *   the connection's own default is used.
 * @returns The decoded curve state.
 * @throws Error if the account is missing, too short, or does not start with
 *   the expected 8-byte signature.
 */
async function getPumpCurveState(
  conn: web3.Connection,
  curveAddress: web3.PublicKey,
  commitment?: web3.Commitment,
): Promise<PumpCurveState> {
  const response = await conn.getAccountInfo(curveAddress, commitment);
  const minimumSize = PUMP_CURVE_STATE_SIGNATURE.byteLength + PUMP_CURVE_STATE_SIZE;
  if (!response || !response.data || response.data.byteLength < minimumSize) {
    throw new Error("unexpected curve state");
  }
  const { data } = response;

  // The leading bytes identify the account type; refuse to decode anything else.
  const idlSignature = readBytes(data, 0, PUMP_CURVE_STATE_SIGNATURE.byteLength);
  if (idlSignature.compare(PUMP_CURVE_STATE_SIGNATURE) !== 0) {
    throw new Error("unexpected curve state IDL signature");
  }

  return {
    virtualTokenReserves: readBigUintLE(data, PUMP_CURVE_STATE_OFFSETS.VIRTUAL_TOKEN_RESERVES, 8),
    virtualSolReserves: readBigUintLE(data, PUMP_CURVE_STATE_OFFSETS.VIRTUAL_SOL_RESERVES, 8),
    realTokenReserves: readBigUintLE(data, PUMP_CURVE_STATE_OFFSETS.REAL_TOKEN_RESERVES, 8),
    realSolReserves: readBigUintLE(data, PUMP_CURVE_STATE_OFFSETS.REAL_SOL_RESERVES, 8),
    tokenTotalSupply: readBigUintLE(data, PUMP_CURVE_STATE_OFFSETS.TOKEN_TOTAL_SUPPLY, 8),
    complete: readBoolean(data, PUMP_CURVE_STATE_OFFSETS.COMPLETE, 1),
  };
}
/**
 * Calculates token price (in SOL) of a Pump.fun bonding curve.
 *
 * The price is the ratio of the virtual SOL reserves (scaled by
 * `LAMPORTS_PER_SOL`) to the virtual token reserves (scaled by
 * `10 ** PUMP_CURVE_TOKEN_DECIMALS`). Reserves are converted to `number`
 * before dividing, so values above 2^53 lose precision.
 *
 * @param curveState - Curve state to price.
 * @returns Price of one whole token, in SOL.
 * @throws TypeError if `curveState` is not an object with `bigint` virtual reserves.
 * @throws RangeError if either virtual reserve is zero or negative.
 */
function calculatePumpCurvePrice(curveState: PumpCurveState): number {
  // Runtime guard: callers from plain JavaScript are not protected by the type.
  const hasBigintReserves =
    curveState !== null &&
    typeof curveState === "object" &&
    typeof curveState.virtualTokenReserves === "bigint" &&
    typeof curveState.virtualSolReserves === "bigint";
  if (!hasBigintReserves) {
    throw new TypeError("curveState must be a PumpCurveState");
  }

  if (curveState.virtualTokenReserves <= 0n || curveState.virtualSolReserves <= 0n) {
    throw new RangeError("curve state contains invalid reserve data");
  }

  const solReserves = Number(curveState.virtualSolReserves) / web3.LAMPORTS_PER_SOL;
  const tokenReserves = Number(curveState.virtualTokenReserves) / 10 ** PUMP_CURVE_TOKEN_DECIMALS;
  return solReserves / tokenReserves;
}
////////////////////////////////////////////////// | |
// Entry point: fetches one bonding curve's state and prints its token price.
// The RPC URL comes from SOL_RPC_URL, falling back to a placeholder endpoint.
(async (rpcUrl: string) => {
  const conn = new web3.Connection(rpcUrl, "confirmed");

  const curveAddress = new web3.PublicKey("5BwXbPNGbfd2UuE8rkvASmJYXWXSiqmrhqJ1FX6rQnKd");
  // getPumpCurveState throws on any failure, so the result needs no null check.
  const curveState = await getPumpCurveState(conn, curveAddress);

  const tokenPriceSol = calculatePumpCurvePrice(curveState);

  //////////////////////////////////////////////////

  console.log("Token price:");
  console.log(` ${(tokenPriceSol.toFixed(10))} SOL`);
})(process.env.SOL_RPC_URL || "https://mainnet.helius-rpc.com/?api-key=00000000-0000-0000-0000-000000000000").catch((err: unknown) => {
  // Report the failure and exit non-zero instead of leaving an unhandled rejection.
  console.error(err);
  process.exitCode = 1;
});
Thank you, but why does it show the value from a few seconds earlier? `import base64
import asyncio
from decimal import Decimal, getcontext
from solders.pubkey import Pubkey
from solana.rpc.async_api import AsyncClient
from borsh_construct import CStruct, U64, Bool, U8
# Addresses and endpoint used by the script, as plain strings.
PUMP_PROGRAM_ADDRESS_STR = "6EF8rrecthR5Dkzon8Nwu78hRvfCKubJ14M5uBEwF6P"
HELIUS_RPC_ENDPOINT = "https://mainnet.helius-rpc.com/?x"
MINT_ADDRESS_STR = "U6FMG6UUhVzaYfCa8h7ZrGkMyw5ipwAF9t1tQiPAMhR"  # Token mint address to monitor

# Decimal places used when scaling raw token reserves.
PUMP_CURVE_TOKEN_DECIMALS_COUNT = 6

# 8-byte prefix expected at the start of a bonding-curve account.
PUMP_CURVE_STATE_IDENTIFIER = bytes.fromhex("17b7f83760d8ac60")

# Byte offset of each field, measured from the start of the account data.
PUMP_CURVE_STATE_FIELD_OFFSETS = dict(
    VIRTUAL_TOKEN_RESERVES=8,
    VIRTUAL_SOL_RESERVES=16,
    REAL_TOKEN_RESERVES=24,
    REAL_SOL_RESERVES=32,
    TOKEN_TOTAL_SUPPLY=40,
    COMPLETE=48,
)
# --- Bonding curve schema ---
# Layout of the account data: an 8-byte signature followed by five
# little-endian u64 fields and a one-byte boolean.
PUMP_CURVE_DATA_LAYOUT = CStruct(
    "signature" / U8[8],
    # Evaluates to U8[0] with the current offsets (0x08 - 8), i.e. no padding.
    "padding1" / U8[PUMP_CURVE_STATE_FIELD_OFFSETS["VIRTUAL_TOKEN_RESERVES"] - 8],
    "virtualTokenReserves" / U64,
    "virtualSolReserves" / U64,
    "realTokenReserves" / U64,
    "realSolReserves" / U64,
    "tokenTotalSupply" / U64,
    "complete" / Bool,
)
# --- Program Derived Address (PDA) calculation ---
async def find_pump_curve_address(mint_address: Pubkey, program_address: Pubkey):
    """Derive the bonding-curve address for a token mint.

    The address is the program-derived address for the seeds
    ``[b"bonding-curve", <mint bytes>]`` under ``program_address``.

    Declared ``async`` although it awaits nothing; kept that way because the
    caller awaits it.

    Args:
        mint_address: Token mint whose curve address is wanted.
        program_address: Program the address is derived under.

    Returns:
        The derived address (the bump seed is discarded).
    """
    seeds = [b"bonding-curve", bytes(mint_address)]
    program_derived_address, _bump_seed = Pubkey.find_program_address(seeds, program_address)
    return program_derived_address
async def get_pump_curve_state(conn: AsyncClient, curve_address: Pubkey):
    """Fetch a bonding-curve account and decode its reserve fields.

    Any failure (missing account, unexpected data format, wrong signature,
    parse error, RPC error) is printed and reported as ``None`` rather than
    raised, so a polling caller can simply retry.

    Args:
        conn: RPC client used to fetch the account.
        curve_address: Address of the bonding-curve account.

    Returns:
        A dict with the keys ``virtualTokenReserves``, ``virtualSolReserves``,
        ``realTokenReserves``, ``realSolReserves``, ``tokenTotalSupply`` and
        ``complete``, or ``None`` on failure.
    """
    try:
        resp = await conn.get_account_info(curve_address)
        value = resp.value
        if not value or not value.data:
            raise ValueError(f"Curve hesabı verisi alınamadı: {curve_address}")

        # The payload is accepted in three shapes: a two-element list whose
        # first item is base64 text, a base64 string, or raw bytes.
        data_raw = value.data
        if isinstance(data_raw, list) and len(data_raw) == 2:
            b64_data = base64.b64decode(data_raw[0])
        elif isinstance(data_raw, str):
            b64_data = base64.b64decode(data_raw)
        elif isinstance(data_raw, bytes):
            b64_data = data_raw
        else:
            raise ValueError(f"Hesap verisi beklenmeyen formatta: {type(data_raw)}")

        # Refuse to decode accounts that do not start with the expected prefix.
        if b64_data[:8] != PUMP_CURVE_STATE_IDENTIFIER:
            raise ValueError("Geçersiz imza, Pump.fun curve değil.")

        parsed_data = PUMP_CURVE_DATA_LAYOUT.parse(b64_data)
        return {
            "virtualTokenReserves": parsed_data.virtualTokenReserves,
            "virtualSolReserves": parsed_data.virtualSolReserves,
            "realTokenReserves": parsed_data.realTokenReserves,
            "realSolReserves": parsed_data.realSolReserves,
            "tokenTotalSupply": parsed_data.tokenTotalSupply,
            "complete": parsed_data.complete,
        }
    except Exception as e:
        # Deliberate best-effort: log and let the caller poll again.
        print(f"Hata oluştu (get_pump_curve_state): {e}")
        return None
def calculate_pump_curve_price(curve_state):
    """Compute the token price in SOL from a decoded curve state.

    The price is ``(virtualSolReserves / 1e9) / (virtualTokenReserves /
    10 ** PUMP_CURVE_TOKEN_DECIMALS_COUNT)``, evaluated with 18 significant
    digits and returned as a float.

    Args:
        curve_state: Dict with ``virtualTokenReserves`` and
            ``virtualSolReserves`` entries, or a falsy value.

    Returns:
        The price as a float, or ``None`` when ``curve_state`` is falsy, a
        reserve is zero or negative, or the calculation fails.
    """
    # Imported here so the block does not depend on the file's import list.
    from decimal import localcontext

    if not curve_state:
        return None

    virtual_token_reserves = Decimal(curve_state["virtualTokenReserves"])
    virtual_sol_reserves = Decimal(curve_state["virtualSolReserves"])
    sol_decimals = Decimal(1_000_000_000)
    token_scaling_factor = Decimal(10 ** PUMP_CURVE_TOKEN_DECIMALS_COUNT)

    if virtual_token_reserves <= 0 or virtual_sol_reserves <= 0:
        print("Uyarı: Geçersiz rezerv verisi (sıfır veya negatif). Fiyat hesaplanamıyor.")
        return None

    try:
        # High precision, scoped to this calculation so the process-wide
        # decimal context is left untouched.
        with localcontext() as ctx:
            ctx.prec = 18
            price = (virtual_sol_reserves / sol_decimals) / (virtual_token_reserves / token_scaling_factor)
        return float(price)
    except Exception as e:
        print(f"Hata oluştu (calculate_pump_curve_price): {e}")
        return None
async def main():
    """Poll the bonding curve once per second and print the token price.

    Runs until interrupted. The RPC client is opened with ``async with`` so
    its connection is closed when the loop is cancelled or fails.
    """
    async with AsyncClient(HELIUS_RPC_ENDPOINT) as conn:
        mint_pubkey = Pubkey.from_string(MINT_ADDRESS_STR)
        pump_program_pubkey = Pubkey.from_string(PUMP_PROGRAM_ADDRESS_STR)
        pump_curve_address = await find_pump_curve_address(mint_pubkey, pump_program_pubkey)
        print(f"Bulunan Bonding Curve Adresi: {pump_curve_address}")

        while True:
            curve_state = await get_pump_curve_state(conn, pump_curve_address)
            if curve_state:
                price = calculate_pump_curve_price(curve_state)
                if price is not None:
                    print(f"Fiyatı: {price:.10f} SOL")
            # Sleep on every iteration, including after a failed fetch.
            await asyncio.sleep(1)
# Run the polling loop only when the file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())
Thanks for this. Just tested and I can guarantee that the SOL price is correct. Just make sure that you are using the token's bonding curve address and not the token address itself.