Last active
December 6, 2020 12:26
-
-
Save Zwork101/b36a05b336f146e1afcde5f905c260e2 to your computer and use it in GitHub Desktop.
steam bot
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncpg | |
import asyncio | |
class DB: | |
def __init__(self): | |
pass | |
def get_credits(self, id: str): | |
pass |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"steamid": "", | |
"shared_secret": "", | |
"identity_secret": "", | |
"bp_key": "" | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import gevent.monkey | |
gevent.monkey.patch_socket() | |
gevent.monkey.patch_ssl() | |
import gevent | |
from utils import ItemManager | |
from steam.client import SteamClient | |
from steampy.client import SteamClient as TradeClient | |
from steampy.guard import generate_one_time_code | |
from steampy.models import Asset, GameOptions | |
import json | |
import time | |
client = SteamClient() | |
trade_client = TradeClient("apikey") | |
key = None | |
HELP_MSG = """ | |
List of commands: | |
$help (displays this message) | |
$key-buy (displays price for buying keys from the bot) | |
$key-sell (displays price for selling key to the bot) | |
$stock (displays amount of keys, and amount of pure in bot) | |
$alert [msg] (sends a message to the bot's admin Zwork101 | |
$buy [amount] (sends a trade depending on the amount) | |
$sell [amount] (sends a trade depending on the amount) | |
""" | |
STOCK_MSG = "We have {} keys in stock!" | |
SELL_MSG = "We are buying keys for {}" | |
BUY_MSG = "We are selling keys for {}" | |
BUY_INV = "Invalid use of command, please do something like $buy 7" | |
SELL_INV = "Invalid use of command, please do something like $sell 3" | |
OUT_OF_STOCK_US = "Sorry, we only have {} keys, you wanted {} keys." | |
OUT_OF_STOCK_THEM = "Sorry, you ony have {} keys, you wanted {} keys." | |
ESCROW = "Sorry, we don't trade with people that have escrow,\ntry again when it's over!" | |
THANK = "Offer sent! Thank you for using me, have a great day!" | |
@client.on(client.EVENT_CHAT_MESSAGE) | |
def handle_chat_message(friend, msg): | |
if msg.startswith('$'): | |
cmd = msg[1:] | |
if cmd.startswith("help"): | |
friend.send_message(HELP_MSG) | |
elif cmd == "test": | |
pass | |
elif cmd.startswith("stock"): | |
amount = len(trade_client.get_my_keys()) | |
friend.send_message(STOCK_MSG.format(amount)) | |
elif cmd.startswith("key-buy"): | |
amount = trade_client.get_key_price() | |
friend.send_message(BUY_MSG.format(amount)) | |
elif cmd.startswith("key-sell"): | |
amount = trade_client.key_buy_price() | |
friend.send_message(SELL_MSG.format(amount)) | |
elif cmd.startswith("buy"): | |
print("Buying...") | |
cmds = msg.split(' ') | |
amount = trade_client.get_my_keys() | |
if len(cmds) != 2: | |
friend.send_message(BUY_INV) | |
elif not cmds[1].isdigit(): | |
friend.send_message(BUY_INV) | |
elif len(amount) < int(cmds[1]): | |
friend.send_message(OUT_OF_STOCK_US.format(len(amount), int(cmds[1]))) | |
elif trade_client.check_escrow(friend.steam_id.as_32): | |
friend.send_message(ESCROW) | |
else: | |
friend.send_message("Working...") | |
price = 0 | |
for i in range(int(cmds[1])): | |
print("Adding value") | |
price = trade_client.add(price, trade_client.key_buy_price()) | |
inv = trade_client.manager.get_partner_inventory(friend.steam_id.as_64, GameOptions.TF2) | |
resp = trade_client.get_price(price, inv) | |
if not resp[0]: | |
friend.send_message(resp[1]) | |
else: | |
mine = [Asset(item['assetid'], GameOptions.TF2) for item in amount[:int(cmds[1])]] | |
theirs = [Asset(item['assetid'], GameOptions.TF2) for item in resp[1]] | |
resp = trade_client.manager.make_offer(mine, theirs, friend.steam_id.as_64, f"Thank you for buying {cmds[1]} key(s)") | |
if "exceed the maximum number of items" in resp.get('strError', ''): | |
friend.send_message("Sorry, my backpack is full, maybe sell some keys, or come back later.") | |
print("INVENTORY FULL") | |
else: | |
friend.send_message(THANK) | |
elif cmd.startswith("sell"): | |
print("Selling...") | |
cmds = msg.split(' ') | |
amount = trade_client.get_their_keys(friend.steam_id) | |
if len(cmds) != 2: | |
friend.send_message(SELL_INV) | |
elif not cmds[1].isdigit(): | |
friend.send_message(SELL_INV) | |
elif len(amount) < int(cmds[1]): | |
friend.send_message(OUT_OF_STOCK_US.format(len(amount), int(cmds[1]))) | |
elif trade_client.check_escrow(friend.steam_id.as_32): | |
friend.send_message(ESCROW) | |
else: | |
friend.send_message("Working...") | |
price = 0 | |
for i in range(int(cmds[1])): | |
print("Adding value") | |
price = trade_client.add(price, trade_client.get_key_price()) | |
resp = trade_client.get_price(price, trade_client.inv()) | |
if not resp[0]: | |
friend.send_message(resp[1]) | |
else: | |
theirs = [Asset(item['assetid'], GameOptions.TF2) for item in amount[:int(cmds[1])]] | |
mine = [Asset(item['assetid'], GameOptions.TF2) for item in resp[1]] | |
resp = trade_client.manager.make_offer(mine, theirs, friend.steam_id.as_64, | |
f"Thank you for selling {cmds[1]} key(s)") | |
if "exceed the maximum number of items" in resp.get('strError', ''): | |
friend.send_message("Sorry, my backpack is full, maybe sell some keys, or come back later.") | |
print("INVENTORY FULL") | |
elif "strError" in resp: | |
print(resp) | |
friend.send_message("An error occurred. This incident has been reported.") | |
else: | |
friend.send_message(THANK) | |
@client.friends.on(client.friends.EVENT_FRIEND_INVITE) | |
def handle_friend_invite(friend): | |
print("Received friend invite from: %s (%s)" % (repr(friend.name), friend.steam_id.community_url)) | |
client.friends.add(friend.steam_id) | |
@client.friends.on(client.friends.EVENT_FRIEND_NEW) | |
def handle_new_friend(friend): | |
print("New friend: %s (%s)" % (repr(friend.name), friend.steam_id.community_url)) | |
friend.send_message("Welcome!\nType $help for a list of commands!\n($ is the prefix for all commands)") | |
@client.on(client.EVENT_LOGGED_ON) | |
def logged_in(): | |
global trade_client | |
print("+---------------------------------+") | |
print("| Logged in! |") | |
print(f"| Username: {client.username}{' '*(22-len(client.username))}|") | |
print(f"| Steam ID: {client.steam_id}{' '*(22-len(str(client.steam_id)))}|") | |
print("+---------------------------------+") | |
print("Waiting, delay until next login") | |
gevent.sleep(120) | |
trade_client.login("username", "password", "guard.json") | |
print("Logged in to web steam") | |
trade_client = ItemManager(trade_client, key) | |
with open('guard.json') as guard: | |
creds = json.loads(guard.read()) | |
key = creds["bp_key"] | |
code = generate_one_time_code(creds["shared_secret"], int(time.time())) | |
print("Using code: " + code) | |
client.login("username", "password", two_factor_code=code) | |
client.run_forever() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from steampy.client import SteamClient | |
from steampy.utils import GameOptions | |
import requests | |
import time | |
import re | |
def cache(timeout): | |
def func_wrap(func): | |
def wrapper(*args, **kwargs): | |
self = args[0] | |
if func.__name__ in self.cache_funcs: | |
if int(time.time() - self.cache_funcs[func.__name__][0]) > timeout: | |
return func(*args, **kwargs) | |
return self.cache_funcs[func.__name__][1] | |
self.cache_funcs[func.__name__] = [time.time(), func(*args, **kwargs)] | |
return self.cache_funcs[func.__name__][1] | |
return wrapper | |
return func_wrap | |
class ItemManager: | |
def __init__(self, client: SteamClient, key: str): | |
self.manager = client | |
self.cache_funcs = {} | |
self.key = key | |
def inv(self): | |
return self.manager.get_my_inventory(GameOptions.TF2) | |
def get_my_keys(self): | |
keys = [] | |
for asset, values in self.inv().items(): | |
if values['market_hash_name'] == "Mann Co. Supply Crate Key": | |
values["assetid"] = str(asset) | |
keys.append(values) | |
return keys | |
@cache(300) | |
def get_key_price(self): | |
resp = requests.get("http://backpack.tf/api/IGetCurrencies/v1", params={"key": self.key}) | |
return resp.json()['response']['currencies']["keys"]["price"]["value"] | |
def key_buy_price(self): | |
price = self.get_key_price() + 1 | |
return self.add(price, .11) | |
def check_escrow(self, accountid: int): | |
url = "https://steamcommunity.com/tradeoffer/new/?partner=" + str(accountid) | |
page = self.manager._session.get(url) | |
if re.match("var g_daysTheirEscrow = (\d+);", page.text) is None: | |
return False | |
return True | |
@staticmethod | |
def get_ref(inv): | |
ref = [] | |
for asset, values in inv.items(): | |
if values['market_hash_name'] == "Refined Metal": | |
values["assetid"] = str(asset) | |
ref.append(values) | |
return ref | |
@staticmethod | |
def get_rec(inv): | |
rec = [] | |
for asset, values in inv.items(): | |
if values['market_hash_name'] == "Reclaimed Metal": | |
values["assetid"] = str(asset) | |
rec.append(values) | |
return rec | |
@staticmethod | |
def get_scrap(inv): | |
scraps = [] | |
for asset, values in inv.items(): | |
if values['market_hash_name'] == "Scrap Metal": | |
values["assetid"] = str(asset) | |
scraps.append(values) | |
return scraps | |
@staticmethod | |
def add(num1, num2): | |
num1, num2 = float(num1), float(num2) | |
integer = int(num1) + int(num2) | |
num1 = int(("%.2f" % (num1 - int(num1)))[len("%.2f" % (num1 - int(num1)))-2:]) | |
num2 = int(("%.2f" % (num2 - int(num2)))[len("%.2f" % (num2 - int(num2)))-2:]) | |
while num2: | |
num2 -= 11 | |
if num1 == 88: | |
integer += 1 | |
num1 = 0 | |
num1 += 11 | |
return float("{}.{}".format(integer, num1)) | |
@staticmethod | |
def sub(num1, num2): | |
num1, num2 = float(num1), float(num2) | |
int1, int2 = int(num1), int(num2) | |
num1 = int(("%.2f" % (num1 - int(num1)))[len("%.2f" % (num1 - int(num1))) - 2:]) | |
num2 = int(("%.2f" % (num2 - int(num2)))[len("%.2f" % (num2 - int(num2))) - 2:]) | |
while num2: | |
if num1 == 0: | |
num1 = 88 | |
int2 += 1 | |
else: | |
num1 -= 11 | |
num2 -= 11 | |
return float("{}.{}".format(int1 - int2, num1)) | |
def get_price(self, price, inv_other): | |
print(price) | |
items = [] | |
ref = self.get_ref(inv_other) | |
rec = self.get_rec(inv_other) | |
scrap = self.get_scrap(inv_other) | |
while True: | |
if price - 1 < 0 or len(ref) <= 0: | |
break | |
print("Added Ref", price) | |
items.append(ref.pop(0)) | |
price = self.sub(price, 1) | |
while True: | |
if self.sub(price, .33) < 0 or len(rec) <= 0: | |
break | |
print("Added Rec", price) | |
items.append(rec.pop(0)) | |
price = self.sub(price, .33) | |
while True: | |
if self.sub(price, .11) < 0 or len(scrap) <= 0: | |
break | |
print("Added Scrap", price) | |
items.append(scrap.pop(0)) | |
price = self.sub(price, .11) | |
print(price) | |
print(len(ref + rec + scrap)) | |
if price <= 0: | |
return True, items | |
elif len(ref + rec + scrap) == 0: | |
return False, "You don't have enough metal!" | |
else: | |
return False, "You need change, unfortunately, I don't know how to do that. So if you could convert some" \ | |
" ref into scrap, that would be great!" | |
def get_their_keys(self, id): | |
keys = [] | |
inv = self.manager.get_partner_inventory(id.as_64, GameOptions.TF2) | |
for asset, values in inv.items(): | |
if values['market_hash_name'] == "Mann Co. Supply Crate Key": | |
values["assetid"] = str(asset) | |
keys.append(values) | |
return keys | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment