Skip to content

Instantly share code, notes, and snippets.

@LaurentiuGabriel
Last active December 11, 2024 08:34
Show Gist options
  • Save LaurentiuGabriel/1ee1b20becd101504a81b6340b3b2bf7 to your computer and use it in GitHub Desktop.
Save LaurentiuGabriel/1ee1b20becd101504a81b6340b3b2bf7 to your computer and use it in GitHub Desktop.
Alpaca Trading
import alpaca_trade_api as tradeapi
import time
from datetime import datetime, timedelta
import pandas_ta as taa
# Alpaca API credentials (paper account)
API_KEY = ""
API_SECRET = ""
BASE_URL = "https://api.alpaca.markets"
# Initialize Alpaca API
api = tradeapi.REST(API_KEY, API_SECRET, BASE_URL, api_version='v2')
# Trading configuration
SYMBOL = 'BTC/USD'
ORDER_SIZE = 0.00005 # Order size in BTC
CHECK_INTERVAL = 60 # Check interval in seconds
SPREAD_BUFFER = 0.2 # Spread buffer percentage
def fetch_recent_bars(symbol, timeframe='1Min', limit=1):
"""Fetch the most recent bar data for the symbol."""
try:
bars = api.get_crypto_bars(symbol, timeframe).df
return bars.tail(limit)
except Exception as e:
print(f"Error fetching bars: {e}")
return None
def calculate_bid_ask(bars):
"""Estimate bid and ask prices from bar data."""
if bars is None or bars.empty:
return None, None
latest_bar = bars.iloc[-1]
bid = latest_bar['low']
ask = latest_bar['high']
return bid, ask
def get_sma(data, symbol):
data.ta.ema(close='close', length=50, append=True)
data.ta.ema(close='close', length=20, append=True)
crossover_signal = data['EMA_50'] < data['EMA_20']
signal = (
crossover_signal
)
return 'buy' if signal.iloc[-1] else 'sell'
def calculate_stop_loss_price(buy_price, stop_loss_percentage=1.0):
"""Calculate stop-loss price based on a percentage."""
return buy_price * (1 - stop_loss_percentage / 100)
def place_limit_order(symbol, qty, side, price):
"""Place a limit order."""
try:
order = api.submit_order(
symbol=symbol,
qty=qty,
side=side,
type='limit',
time_in_force='gtc',
limit_price=price
)
return order
print(f"{side.capitalize()} order placed: {order}")
except Exception as e:
print(f"Error placing {side} order: {e}")
def cancel_old_orders():
"""Cancel orders older than 2 minutes."""
try:
orders = api.list_orders(status='open')
current_time = datetime.utcnow()
for order in orders:
order_time = order.created_at.replace(tzinfo=None) # Remove timezone info for comparison
if (current_time - order_time) > timedelta(minutes=2):
print(f"Cancelling order {order.id} (older than 2 minutes)")
api.cancel_order(order.id)
except Exception as e:
print(f"Error cancelling old orders: {e}")
def place_bracket_order(symbol, qty, buy_price, sell_price, stop_loss_price):
"""Place a bracket order with profit target and stop-loss."""
try:
order = api.submit_order(
symbol=symbol,
qty=qty,
side='buy',
type='limit',
time_in_force='gtc',
limit_price=buy_price,
order_class='bracket',
take_profit={'limit_price': sell_price},
stop_loss={'stop_price': stop_loss_price}
)
print(f"Bracket order placed: {order}")
except Exception as e:
print(f"Error placing bracket order: {e}")
def fetch_data(symbol, start_date, end_date):
try:
now = datetime.utcnow()
print(now, "trying to load the data")
# Fetch historical data for the crypto
timeframe = '1Min' # Minute data
barset = api.get_crypto_bars(symbol, timeframe, start=start_date, end=end_date).df
return barset
except Exception as e:
print(f"Error in the fetch data method: {e}")
time.sleep(10)
def trade_bid_ask():
"""Execute bid-ask spread trading strategy."""
print("Starting bid-ask spread strategy...")
start_date = '2024-06-27'
end_date = datetime.now().strftime('%Y-%m-%d')
data = fetch_data(SYMBOL, start_date, end_date)
signal = get_sma(data, SYMBOL)
while True:
try:
# Cancel orders older than 5 minutes
cancel_old_orders()
bars = fetch_recent_bars(SYMBOL)
bid, ask = calculate_bid_ask(bars)
if signal == 'buy':
print("signal is to buy")
if bid is None or ask is None:
print("Could not fetch bid-ask prices. Retrying...")
time.sleep(CHECK_INTERVAL)
continue
# Calculate target prices
spread = ask - bid
print(f"Spread is: {spread}")
buy_price = bid + (spread * SPREAD_BUFFER / 100)
print(f"Buy price is: {buy_price}")
sell_price = ask - (spread * SPREAD_BUFFER / 100)
print(f"Sell price is: {sell_price}")
print(f"Bid: {bid}, Ask: {ask}, Buy Price: {buy_price}, Sell Price: {sell_price}")
if spread > 0.0:
buy_order = place_limit_order(SYMBOL, ORDER_SIZE, 'buy', buy_price)
# Step 2: Wait for the buy order to be filled
order_filled = False
wait_end_time = datetime.now() + timedelta(minutes=2)
while not order_filled and datetime.now() < wait_end_time:
order = api.get_order(buy_order.id)
if order.filled_at:
order_filled = True
place_limit_order(SYMBOL, ORDER_SIZE, 'sell', buy_price * 1.0002)
print(f"Buy order filled at {order.filled_at}.")
else:
print("Waiting for buy order to be filled...")
time.sleep(5)
# Step 3: Place a sell order
# Extract the quantity from the position
#qty_to_sell = position.qty
#api.submit_order(symbol=SYMBOL, qty=ORDER_SIZE, side='sell', type='market', time_in_force='gtc')
#stop_loss_percentage = 1.0 # Example: 1%
#stop_loss_price = calculate_stop_loss_price(buy_price, stop_loss_percentage)
#place_bracket_order(SYMBOL, ORDER_SIZE, buy_price, sell_price, stop_loss_price)
# Wait before the next iteration
time.sleep(350)
print("slept for a few minutes")
else:
print("signal is to sell")
time.sleep(CHECK_INTERVAL)
start_date = '2024-06-27'
end_date = datetime.now().strftime('%Y-%m-%d')
data = fetch_data(SYMBOL, start_date, end_date)
crossover = get_sma(data, SYMBOL)
except Exception as e:
print(e)
if __name__ == '__main__':
try:
trade_bid_ask()
except Exception as e:
print(f"Error in the main function: {e}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment