# Created March 22, 2023 23:02
# Source gist: oguzhanmeteozturk/e3d33d1948a1a032632f779b08e7e958
# Note: GitHub flagged this file as containing hidden or bidirectional Unicode
# text that may be interpreted or compiled differently than it appears below.
# Review it in an editor that reveals hidden Unicode characters.
from urllib.parse import quote

from realtime.channel import Channel
from realtime.connection import Socket
def getURL(host: str, port: int, service_key: str):
    """Return the realtime websocket URL for the given host, port and API key.

    Args:
        host: Host name or address of the realtime server.
        port: TCP port of the realtime server.
        service_key: API key sent as the ``apikey`` query parameter.

    Returns:
        A ``ws://`` URL pointing at ``/realtime/v1/websocket`` with the
        ``apikey`` and ``vsn=1.0.0`` query parameters.
    """
    # Percent-encode the key so characters such as "&", "=", "+" or "#" cannot
    # break or truncate the query string. Keys made only of unreserved
    # characters (letters, digits, "-", "_", ".", "~") are left untouched.
    encoded_key = quote(str(service_key), safe="")
    return f"ws://{host}:{port}/realtime/v1/websocket?apikey={encoded_key}&vsn=1.0.0"
class EventTypes:
    """String constants naming the event types used when subscribing and dispatching."""

    # Raw row-insert event name.
    RAW_INSERT: str = "INSERT"

    # Item-level event names; Webhook.send() dispatches on these values.
    ITEM_METADATA_UPDATED: str = "item_metadata_updated"
    ITEM_LISTED: str = "item_listed"
    ITEM_SOLD: str = "item_sold"
    ITEM_TRANSFERRED: str = "item_transferred"
    ITEM_RECEIVED_OFFER: str = "item_received_offer"
    ITEM_RECEIVED_BID: str = "item_received_bid"
    ITEM_CANCELLED: str = "item_cancelled"
class Logger:
    """Minimal one-shot logger: wraps a message and prints it with a level tag."""

    def __init__(self, message: str) -> None:
        # The message is stored as given and formatted only when printed.
        self.message = message

    def _emit(self, level: str) -> None:
        """Print the stored message to stdout as ``[LEVEL]: message``."""
        print(f"[{level}]: {self.message}")

    def debug(self):
        """Print the message with the DEBUG tag."""
        self._emit("DEBUG")

    def info(self):
        """Print the message with the INFO tag."""
        self._emit("INFO")

    def warn(self):
        """Print the message with the WARN tag."""
        self._emit("WARN")

    def error(self):
        """Print the message with the ERROR tag."""
        self._emit("ERROR")
class Hook:
    """Extracts identifiers from realtime change payloads."""

    def FlowUpdated(self, payload: dict):
        """Return ``payload["payload"]["new"]["id"]`` when it is truthy.

        Returns ``None`` when the id is falsy, or when the lookup raises (the
        error is then reported through ``Logger``).
        """
        try:
            new_row = payload["payload"]["new"]
            # A falsy id (0, "", None) is treated the same as "no id".
            return new_row["id"] or None
        except Exception as err:
            Logger(f"No matching Hook{err}").error()
class Webhook:
    """Build Discord embeds for item events and post them through a webhook.

    Each public ``Item*`` method turns one event payload into a
    ``DiscordEmbed`` (or ``None`` when building fails); ``send`` picks the
    builder from ``payload["event_type"]`` and executes the webhook.

    NOTE(review): ``DiscordEmbed`` is used here but is not imported anywhere in
    the visible file. It presumably comes from the ``discord_webhook`` package;
    confirm and add the import, otherwise every builder logs a NameError and
    returns ``None``.
    """

    OPENSEA_LOGO_URL = "https://pbs.twimg.com/profile_images/1544105652330631168/ZuvjfGkT_400x400.png"
    FOOTER = "Made by jerem.#3609"

    # Raw prices are divided by this before display. The original hard-coded
    # 10**18 everywhere (presumably wei -> ETH; confirm for other chains).
    _PRICE_UNIT = 10**18
    # Separator placed between the markdown links of the "Links :" field.
    _LINK_SEPARATOR = " • "

    def __init__(self, webhook, color: str = "03a9fc", thumbnail: str = None, footer: str = FOOTER):
        """Store the webhook and the embed styling options.

        Args:
            webhook: Object used to deliver embeds. ``send`` calls
                ``remove_embeds()``, ``add_embed(embed=...)`` and ``execute()``
                on it, so it must be a webhook object rather than a URL string.
            color: Embed colour passed to ``DiscordEmbed``.
            thumbnail: Fallback thumbnail URL used when the item has no image.
            footer: Footer text; a falsy value disables the footer.
        """
        self.webhook = webhook
        self.color = color
        self.thumbnail = thumbnail
        self.footer = footer

    @staticmethod
    def _format_price(raw_price) -> str:
        """Convert a raw integer-like price to its display string."""
        return str(int(raw_price) / Webhook._PRICE_UNIT)

    @staticmethod
    def _address(event: dict, key):
        """Return ``event[key]["address"]``, or None when the key, account or address is absent."""
        if not key:
            return None
        account = event.get(key) or {}
        return account.get("address")

    @staticmethod
    def _join_links(parts) -> str:
        """Render ``(label, url)`` pairs as markdown links, skipping pairs without a URL.

        Joining (instead of appending " • " after each link) avoids the
        dangling separator the original left behind when the last link was
        missing.
        """
        return Webhook._LINK_SEPARATOR.join(f"[{label}]({url})" for label, url in parts if url)

    def _build_embed(
        self,
        payload: dict,
        title: str,
        *,
        price_key=None,
        show_quantity: bool = False,
        with_order: bool = False,
        maker_key=None,
        taker_key=None,
    ):
        """Build the embed shared by every event type.

        Args:
            payload: Event message; the event body is read from ``payload["payload"]``.
            title: Embed title.
            price_key: Key of the raw price in the event body, or None for no price field.
            show_quantity: Whether to add a "Quantity :" field when present.
            with_order: Whether to link the transaction hash on etherscan.
            maker_key: Key of the account shown as "Maker", or None.
            taker_key: Key of the account shown as "Taker", or None.

        Returns:
            The populated embed, or None when building fails (the error is logged).
        """
        try:
            event = payload["payload"]
            item = event["item"]
            # Optional sub-objects are treated as empty rather than failing the
            # whole embed; only the event body and its item are mandatory.
            metadata = item.get("metadata") or {}
            chain = item.get("chain") or {}

            embed = DiscordEmbed(title=title, color=self.color)

            if name := metadata.get("name"):
                embed.add_embed_field(name="Name :", value=str(name))
            if price_key and (raw_price := event.get(price_key)):
                embed.add_embed_field(name="Price :", value=self._format_price(raw_price))
            if chain_name := chain.get("name"):
                embed.add_embed_field(name="Chain :", value=str(chain_name))
            if show_quantity and (quantity := event.get("quantity")):
                embed.add_embed_field(name="Quantity :", value=str(quantity))

            # Thumbnail preference: item image, then configured fallback, then logo.
            thumbnail = metadata.get("image_url") or self.thumbnail or self.OPENSEA_LOGO_URL
            embed.set_thumbnail(url=str(thumbnail))

            tx_hash = (event.get("transaction") or {}).get("hash") if with_order else None
            maker = self._address(event, maker_key)
            taker = self._address(event, taker_key)
            links = self._join_links(
                [
                    ("Order", f"https://etherscan.io/tx/{tx_hash}" if tx_hash else None),
                    ("Maker", f"https://etherscan.io/address/{maker}" if maker else None),
                    ("Taker", f"https://etherscan.io/address/{taker}" if taker else None),
                    ("Metadata", metadata.get("metadata_url")),
                    ("Asset", item.get("permalink")),
                ]
            )
            if links:
                embed.add_embed_field(name="Links :", value=links, inline=False)

            if self.footer:
                embed.set_footer(text=str(self.footer))
            return embed
        except Exception as e:
            Logger(f"Failed to build embed {e}").error()
            return None

    def ItemMetadataUpdated(self, payload: dict):
        """Build the embed for a metadata-updated event (name, chain, links)."""
        return self._build_embed(payload, "New Metadata Updated !")

    def ItemCancelled(self, payload: dict):
        """Build the embed for a cancelled event (base price, order and maker links)."""
        return self._build_embed(
            payload,
            "New Item Cancelled !",
            price_key="base_price",
            with_order=True,
            maker_key="maker",
        )

    def ItemListed(self, payload: dict):
        """Build the embed for a listed event (base price, maker link)."""
        return self._build_embed(
            payload,
            "New Item Listed !",
            price_key="base_price",
            maker_key="maker",
        )

    def ItemSold(self, payload: dict):
        """Build the embed for a sold event (sale price, order, maker and taker links)."""
        return self._build_embed(
            payload,
            "New Item Sold !",
            price_key="sale_price",
            with_order=True,
            maker_key="maker",
            taker_key="taker",
        )

    def ItemTransferred(self, payload: dict):
        """Build the embed for a transferred event.

        The "Maker"/"Taker" links are taken from ``from_account`` and
        ``to_account`` respectively.
        """
        return self._build_embed(
            payload,
            "New Item Transferred !",
            price_key="sale_price",
            show_quantity=True,
            with_order=True,
            maker_key="from_account",
            taker_key="to_account",
        )

    def ItemReceivedOffer(self, payload: dict):
        """Build the embed for a received-offer event (base price, quantity, maker/taker links).

        The stray debugging ``print(payload)`` of the original was removed.
        """
        return self._build_embed(
            payload,
            "New Item Received Offer !",
            price_key="base_price",
            show_quantity=True,
            maker_key="maker",
            taker_key="taker",
        )

    def ItemReceivedBid(self, payload: dict):
        """Build the embed for a received-bid event (base price, quantity, maker/taker links)."""
        return self._build_embed(
            payload,
            "New Item Receive Bid !",
            price_key="base_price",
            show_quantity=True,
            maker_key="maker",
            taker_key="taker",
        )

    def send(self, payload: dict):
        """Build the embed matching ``payload["event_type"]`` and execute the webhook.

        Unknown event types are logged and ignored. When the builder fails
        (returns None) nothing is sent; the original passed ``None`` on to
        ``add_embed``. Any error raised while sending is logged, not raised.
        """
        try:
            event_type = payload.get("event_type")
            builders = {
                EventTypes.ITEM_METADATA_UPDATED: self.ItemMetadataUpdated,
                EventTypes.ITEM_CANCELLED: self.ItemCancelled,
                EventTypes.ITEM_LISTED: self.ItemListed,
                EventTypes.ITEM_SOLD: self.ItemSold,
                EventTypes.ITEM_TRANSFERRED: self.ItemTransferred,
                EventTypes.ITEM_RECEIVED_OFFER: self.ItemReceivedOffer,
                EventTypes.ITEM_RECEIVED_BID: self.ItemReceivedBid,
            }
            # Guard the lookup so an unhashable event_type is reported as
            # "unknown" rather than as a webhook failure.
            builder = builders.get(event_type) if isinstance(event_type, str) else None
            if builder is None:
                Logger("Failed to get event type: " + str(event_type)).error()
                return

            embed = builder(payload)
            if embed is None:
                # The builder already logged why it failed.
                return

            self.webhook.remove_embeds()
            self.webhook.add_embed(embed=embed)
            self.webhook.execute()
        except Exception as e:
            Logger(f"Failled to execute webhook : {e}").error()
class SupabaseClient:
    """Thin wrapper around a realtime ``Socket``: connect, join channels, subscribe callbacks.

    Every method logs failures through ``Logger`` instead of raising.
    """

    def __init__(self, url: str):
        """Create the socket for ``url`` without connecting yet."""
        self.socket = Socket(url)
        self.connected = False
        # Channels joined through this client, keyed by collection slug, so a
        # second request for the same slug reuses the channel. The original
        # checked the bare slug against ``socket.channels`` (channels are
        # registered as "realtime:{slug}") and returned None on a match.
        self._channels = {}

    def connect(self):
        """Connect the socket and mark the client as connected; errors are logged."""
        try:
            Logger("Connecting to socket").debug()
            self.socket.connect()
            self.connected = True
        except Exception as e:
            Logger(f"Failed to connect to socket : {e}").error()

    def createChannel(self, collection_slug: str):
        """Register and join the channel ``realtime:{collection_slug}``.

        Returns:
            The joined channel, or None when joining fails (the error is logged).
        """
        try:
            channel = self.socket.set_channel(f"realtime:{collection_slug}")
            channel.join()
            self._channels[collection_slug] = channel
            Logger(f"Successfully joined channel {collection_slug}").info()
            return channel
        except Exception as e:
            Logger(f"Failed to join channel : {e}").error()
            return None

    def getChannels(self, collection_slug: str):
        """Return the channel for ``collection_slug``, creating it on first use.

        Returns:
            The existing or newly created channel, or None when creation fails.
        """
        try:
            existing = self._channels.get(collection_slug)
            if existing is not None:
                Logger(f"Channel for topic: {collection_slug} already exist").warn()
                return existing
            Logger(f"Creating channel for topic: {collection_slug}").debug()
            return self.createChannel(collection_slug)
        except Exception as e:
            Logger(f"Failed to create channel for topic {collection_slug} : {e}").error()
            return None

    def on(self, event_type: str, collection_slug: str, callback):
        """Subscribe ``callback`` to ``event_type`` on the channel for ``collection_slug``.

        Connects the socket first when needed. Nothing is subscribed when the
        channel could not be obtained; errors are logged.
        """
        try:
            if not self.connected:
                self.socket.connect()
                self.connected = True
            channel = self.getChannels(collection_slug)
            if channel is None:
                Logger(f"No channel available for {collection_slug}, cannot subscribe to {event_type}").error()
                return
            Logger(f"Subscribing to {event_type} events on {collection_slug}").info()
            channel.on(event_type, callback)
        except Exception as e:
            Logger(e).error()

    def startListening(self):
        """Log the joined channels and hand control to ``socket.listen()``.

        NOTE(review): ``listen()`` presumably blocks while receiving messages — confirm.
        """
        try:
            Logger(f"Channel joined : {list(self.socket.channels)}").info()
            Logger("Listenning").warn()
            self.socket.listen()
        except Exception as e:
            Logger(e).error()

    def onItemMetadataUpdated(self, collection_slug: str, callback):
        """Subscribe ``callback`` to metadata-updated events on ``collection_slug``."""
        try:
            Logger(f"Listening for item metadata updates on {collection_slug}").debug()
            return self.on(EventTypes.ITEM_METADATA_UPDATED, collection_slug, callback)
        except Exception as e:
            Logger(e).error()

    def onEvents(self, slugs: list, event_type: list, callback):
        """Subscribe ``callback`` to every event type on every slug.

        Args:
            slugs: List of collection slugs.
            event_type: List of event-type strings (despite the singular name).
            callback: Callable invoked with each event payload.
        """
        try:
            if not isinstance(slugs, list):
                Logger("slugs params must be a list").error()
                return
            if not isinstance(event_type, list):
                # The original reported "slugs" here too, which pointed at the
                # wrong argument.
                Logger("event_type params must be a list").error()
                return
            for slug in slugs:
                for event in event_type:
                    self.on(event, slug, callback)
        except Exception as e:
            Logger(e).error()
# Module-level state filled in by callback(): known nodes and the edges between them.
nodes = []
links = []


def callback(payload: dict):
    """Record Node/Edge rows from a change payload and print the running state.

    ``payload["table"]`` selects the action:
      * "Flow": print the record id.
      * "Node": append ``{"id", "name"}`` to ``nodes`` and print the list.
      * "Edge": resolve source/target ids to node names (None when unknown),
        append the pair to ``links`` and print the list.

    Any exception is swallowed and reported by printing ``ERROR``.
    """

    def name_for(node_id):
        # First recorded node with this id wins; None when the id is unknown.
        return next((entry["name"] for entry in nodes if entry["id"] == node_id), None)

    try:
        table = payload["table"]
        if table == "Flow":
            print(payload["record"]["id"])
        elif table == "Node":
            record = payload["record"]
            nodes.append({"id": record["id"], "name": record["name"]})
            print("Nodes:", nodes)
        elif table == "Edge":
            record = payload["record"]
            source_id = record["sourceId"]
            target_id = record["targetId"]
            links.append({"source": name_for(source_id), "target": name_for(target_id)})
            print("Links:", links)
    except Exception:
        print("ERROR")
        return
if __name__ == "__main__":
    from decouple import config

    # The service key is supplied by config() under the name SUPA_SERVICE_KEY.
    SUPA_SERVICE_KEY = config("SUPA_SERVICE_KEY")
    HOST_PORT = 8000
    URL = getURL("localhost", HOST_PORT, SUPA_SERVICE_KEY)

    Client = SupabaseClient(URL)
    Client.connect()

    # Join the wildcard channel, then route every raw INSERT event to callback().
    Client.getChannels("*")
    Client.onEvents(["*"], [EventTypes.RAW_INSERT], callback)

    # Per-event helpers such as Client.onItemMetadataUpdated(slug, callback)
    # can be used instead of the wildcard subscription.
    Client.startListening()
# (GitHub page footer, not part of the program: "Sign up for free to join this
# conversation on GitHub. Already have an account? Sign in to comment")