Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save oguzhanmeteozturk/e3d33d1948a1a032632f779b08e7e958 to your computer and use it in GitHub Desktop.
from urllib.parse import quote

from realtime.channel import Channel
from realtime.connection import Socket
def getURL(host: str, port: int, service_key: str, *, secure: bool = False) -> str:
    """Build the websocket URL of the realtime endpoint.

    Args:
        host: Host name or IP address of the realtime server.
        port: TCP port of the realtime server.
        service_key: API key sent as the ``apikey`` query parameter.
        secure: When true, use the ``wss`` scheme instead of ``ws``.

    Returns:
        The full websocket URL, including the ``apikey`` and ``vsn`` query parameters.
    """
    scheme = "wss" if secure else "ws"
    # Percent-encode the key so characters such as "&", "=", "+" or spaces cannot
    # break out of the ``apikey`` parameter. Keys made only of letters, digits and
    # "-._~" are left unchanged.
    encoded_key = quote(str(service_key), safe="")
    return f"{scheme}://{host}:{port}/realtime/v1/websocket?apikey={encoded_key}&vsn=1.0.0"
class EventTypes:
    """String constants naming the events this module subscribes to or dispatches on."""

    # Raw database-change event.
    RAW_INSERT: str = "INSERT"

    # Item lifecycle events.
    ITEM_LISTED: str = "item_listed"
    ITEM_SOLD: str = "item_sold"
    ITEM_TRANSFERRED: str = "item_transferred"
    ITEM_CANCELLED: str = "item_cancelled"
    ITEM_METADATA_UPDATED: str = "item_metadata_updated"

    # Offers and bids placed on an item.
    ITEM_RECEIVED_OFFER: str = "item_received_offer"
    ITEM_RECEIVED_BID: str = "item_received_bid"
class Logger:
    """One-shot console logger: wraps a message, then prints it with a level tag.

    Usage: ``Logger("something happened").info()``.
    """

    def __init__(self, message: str) -> None:
        self.message = message

    def _emit(self, level: str) -> None:
        """Print the stored message to stdout prefixed with ``[LEVEL]: ``."""
        print(f"[{level}]: {self.message}")

    def debug(self):
        """Print the message at DEBUG level."""
        self._emit("DEBUG")

    def info(self):
        """Print the message at INFO level."""
        self._emit("INFO")

    def warn(self):
        """Print the message at WARN level."""
        self._emit("WARN")

    def error(self):
        """Print the message at ERROR level."""
        self._emit("ERROR")
class Hook:
    """Extractors that pull identifiers out of realtime payloads."""

    def FlowUpdated(self, payload: dict):
        """Return ``payload["payload"]["new"]["id"]`` when it is truthy.

        Returns ``None`` when the id is falsy. If the lookup fails, the error is
        logged and ``None`` is returned as well.
        """
        try:
            new_record = payload["payload"]["new"]
            record_id = new_record["id"]
            if not record_id:
                return None
            return record_id
        except Exception as e:
            Logger(f"No matching Hook{e}").error()
class Webhook:
    """Build Discord embeds from item-event payloads and post them through a webhook.

    Every ``Item*`` method turns one event payload into an embed; ``send`` picks the
    right builder from ``payload["event_type"]`` and executes the webhook.

    NOTE(review): ``DiscordEmbed`` is used here but is not imported in the visible
    part of this file. It presumably comes from the ``discord_webhook`` package --
    confirm and add the import, otherwise every builder fails with ``NameError``
    (which is caught and logged).
    """

    OPENSEA_LOGO_URL = "https://pbs.twimg.com/profile_images/1544105652330631168/ZuvjfGkT_400x400.png"
    FOOTER = "Made by jerem.#3609"

    # Raw prices are divided by this before display.
    # Presumably converts wei to ETH (18 decimals) -- TODO confirm for other chains.
    _PRICE_DIVISOR = 10**18
    _ETHERSCAN_TX_URL = "https://etherscan.io/tx/"
    _ETHERSCAN_ADDRESS_URL = "https://etherscan.io/address/"
    _LINK_SEPARATOR = " • "

    def __init__(self, webhook, color: str = "03a9fc", thumbnail: str = None, footer: str = FOOTER):
        """Store the webhook and the embed styling options.

        Args:
            webhook: Object used by ``send``; it must provide ``remove_embeds()``,
                ``add_embed(embed=...)`` and ``execute()``.
            color: Color passed to every embed.
            thumbnail: Fallback thumbnail URL used when the item has no image.
            footer: Footer text; a falsy value disables the footer.
        """
        self.webhook = webhook
        self.color = color
        self.thumbnail = thumbnail
        self.footer = footer

    @staticmethod
    def _address(event: dict, account_key: str):
        """Return ``event[account_key]["address"]``, or ``None`` if the account is missing."""
        account = event.get(account_key)
        if isinstance(account, dict):
            return account.get("address")
        return None

    def _build_embed(
        self,
        title: str,
        payload: dict,
        *,
        price_key: str = None,
        with_quantity: bool = False,
        with_order: bool = False,
        maker_key: str = None,
        taker_key: str = None,
    ):
        """Build the embed shared by all event types.

        Args:
            title: Embed title.
            payload: Event message; ``payload["payload"]["item"]`` must hold the
                ``metadata`` and ``chain`` dicts.
            price_key: Key of the raw price in the event body, or ``None`` for no price.
            with_quantity: Add a "Quantity" field when the event carries one.
            with_order: Add an "Order" link when the event has a transaction hash.
            maker_key: Key of the account shown as "Maker", or ``None`` to omit it.
            taker_key: Key of the account shown as "Taker", or ``None`` to omit it.

        Returns:
            The embed, or ``None`` if building failed (the error is logged).
        """
        try:
            event = payload["payload"]
            item = event["item"]
            metadata = item["metadata"]

            embed = DiscordEmbed(title=title, color=self.color)

            if name := metadata.get("name"):
                embed.add_embed_field(name="Name :", value=str(name))
            if price_key and (raw_price := event.get(price_key)):
                embed.add_embed_field(name="Price :", value=str(int(raw_price) / self._PRICE_DIVISOR))
            if chain_name := item["chain"].get("name"):
                embed.add_embed_field(name="Chain :", value=str(chain_name))
            if with_quantity and (quantity := event.get("quantity")):
                embed.add_embed_field(name="Quantity :", value=str(quantity))

            # Thumbnail priority: item image, then configured fallback, then the default logo.
            thumbnail_url = metadata.get("image_url") or self.thumbnail or self.OPENSEA_LOGO_URL
            embed.set_thumbnail(url=str(thumbnail_url))

            # Collect the links as a list and join them once, so a missing trailing
            # entry never leaves a dangling separator.
            link_parts = []
            if with_order and (tx_hash := (event.get("transaction") or {}).get("hash")):
                link_parts.append(f"[Order]({self._ETHERSCAN_TX_URL}{tx_hash})")
            if maker_key and (maker := self._address(event, maker_key)):
                link_parts.append(f"[Maker]({self._ETHERSCAN_ADDRESS_URL}{maker})")
            if taker_key and (taker := self._address(event, taker_key)):
                link_parts.append(f"[Taker]({self._ETHERSCAN_ADDRESS_URL}{taker})")
            if metadata_url := metadata.get("metadata_url"):
                link_parts.append(f"[Metadata]({metadata_url})")
            if permalink := item.get("permalink"):
                link_parts.append(f"[Asset]({permalink})")
            if link_parts:
                embed.add_embed_field(
                    name="Links :",
                    value=self._LINK_SEPARATOR.join(link_parts),
                    inline=False,
                )

            if self.footer:
                embed.set_footer(text=str(self.footer))
            return embed
        except Exception as e:
            Logger(f"Failed to build embed {e}").error()
            return None

    def ItemMetadataUpdated(self, payload: dict):
        """Build the embed for a metadata-updated event (name, chain, links)."""
        return self._build_embed("New Metadata Updated !", payload)

    def ItemCancelled(self, payload: dict):
        """Build the embed for a cancelled event (base price, order and maker links)."""
        return self._build_embed(
            "New Item Cancelled !",
            payload,
            price_key="base_price",
            with_order=True,
            maker_key="maker",
        )

    def ItemListed(self, payload: dict):
        """Build the embed for a listed event (base price, maker link)."""
        return self._build_embed(
            "New Item Listed !",
            payload,
            price_key="base_price",
            maker_key="maker",
        )

    def ItemSold(self, payload: dict):
        """Build the embed for a sold event (sale price, order, maker and taker links)."""
        return self._build_embed(
            "New Item Sold !",
            payload,
            price_key="sale_price",
            with_order=True,
            maker_key="maker",
            taker_key="taker",
        )

    def ItemTransferred(self, payload: dict):
        """Build the embed for a transferred event.

        The sender (``from_account``) is shown as "Maker" and the receiver
        (``to_account``) as "Taker".
        """
        return self._build_embed(
            "New Item Transferred !",
            payload,
            price_key="sale_price",
            with_quantity=True,
            with_order=True,
            maker_key="from_account",
            taker_key="to_account",
        )

    def ItemReceivedOffer(self, payload: dict):
        """Build the embed for a received-offer event (base price, quantity, maker/taker)."""
        return self._build_embed(
            "New Item Received Offer !",
            payload,
            price_key="base_price",
            with_quantity=True,
            maker_key="maker",
            taker_key="taker",
        )

    def ItemReceivedBid(self, payload: dict):
        """Build the embed for a received-bid event (base price, quantity, maker/taker)."""
        return self._build_embed(
            "New Item Receive Bid !",
            payload,
            price_key="base_price",
            with_quantity=True,
            maker_key="maker",
            taker_key="taker",
        )

    def send(self, payload: dict):
        """Build the embed matching ``payload["event_type"]`` and execute the webhook.

        Unknown event types and embeds that failed to build are logged and skipped;
        nothing is sent in either case. Errors raised while sending are logged.
        """
        try:
            event_type = payload.get("event_type")
            builders = {
                EventTypes.ITEM_METADATA_UPDATED: self.ItemMetadataUpdated,
                EventTypes.ITEM_CANCELLED: self.ItemCancelled,
                EventTypes.ITEM_LISTED: self.ItemListed,
                EventTypes.ITEM_SOLD: self.ItemSold,
                EventTypes.ITEM_TRANSFERRED: self.ItemTransferred,
                EventTypes.ITEM_RECEIVED_OFFER: self.ItemReceivedOffer,
                EventTypes.ITEM_RECEIVED_BID: self.ItemReceivedBid,
            }
            builder = builders.get(event_type)
            if builder is None:
                Logger("Failed to get event type: " + str(event_type)).error()
                return

            embed = builder(payload)
            if embed is None:
                # The builder already logged the failure; do not post an empty embed.
                return

            # Drop embeds left over from a previous send before adding the new one.
            self.webhook.remove_embeds()
            self.webhook.add_embed(embed=embed)
            self.webhook.execute()
        except Exception as e:
            Logger(f"Failled to execute webhook : {e}").error()
class SupabaseClient:
    """Small wrapper around a realtime ``Socket``: connect, join channels, subscribe, listen.

    Every method logs failures through ``Logger`` instead of raising.
    """

    def __init__(self, url: str):
        """Create the socket for ``url``; no connection is opened yet."""
        self.socket = Socket(url)
        self.connected = False
        # Channels joined through this client, keyed by collection slug, so that a
        # second request for the same slug reuses the channel instead of failing.
        self._channels = {}

    def connect(self):
        """Open the socket connection and mark the client as connected."""
        try:
            Logger("Connecting to socket").debug()
            self.socket.connect()
            self.connected = True
        except Exception as e:
            Logger(f"Failed to connect to socket : {e}").error()

    def createChannel(self, collection_slug: str):
        """Create and join the ``realtime:{collection_slug}`` channel.

        Returns:
            The joined channel, or ``None`` if creating or joining failed.
        """
        try:
            channel = self.socket.set_channel(f"realtime:{collection_slug}")
            channel.join()
            self._channels[collection_slug] = channel
            Logger(f"Successfully joined channel {collection_slug}").info()
            return channel
        except Exception as e:
            Logger(f"Failed to join channel : {e}").error()
            return None

    def getChannels(self, collection_slug: str):
        """Return the channel for ``collection_slug``, creating it on first use.

        Returns:
            The existing or newly created channel, or ``None`` if creation failed.
        """
        try:
            existing = self._channels.get(collection_slug)
            if existing is not None:
                # Hand back the known channel so callers can still subscribe to it.
                Logger(f"Channel for topic: {collection_slug} already exist").warn()
                return existing
            Logger(f"Creating channel for topic: {collection_slug}").debug()
            return self.createChannel(collection_slug)
        except Exception as e:
            Logger(f"Failed to create channel for topic {collection_slug} : {e}").error()
            return None

    def on(self, event_type: str, collection_slug: str, callback):
        """Subscribe ``callback`` to ``event_type`` events on the slug's channel.

        Connects the socket first if this client has not connected yet.
        """
        try:
            if not self.connected:
                self.socket.connect()
                self.connected = True
            channel = self.getChannels(collection_slug)
            if channel is None:
                Logger(f"No channel available for {collection_slug}, cannot subscribe to {event_type}").error()
                return
            Logger(f"Subscribing to {event_type} events on {collection_slug}").info()
            channel.on(event_type, callback)
        except Exception as e:
            Logger(e).error()

    def startListening(self):
        """Log the socket's channels, then hand control to ``socket.listen()``."""
        try:
            Logger(f"Channel joined : {list(self.socket.channels)}").info()
            Logger("Listenning").warn()
            self.socket.listen()
        except Exception as e:
            Logger(e).error()

    def onItemMetadataUpdated(self, collection_slug: str, callback):
        """Subscribe ``callback`` to metadata-updated events on ``collection_slug``."""
        try:
            Logger(f"Listening for item metadata updates on {collection_slug}").debug()
            return self.on(EventTypes.ITEM_METADATA_UPDATED, collection_slug, callback)
        except Exception as e:
            Logger(e).error()

    def onEvents(self, slugs: list, event_type: list, callback):
        """Subscribe ``callback`` to every event type on every slug.

        Args:
            slugs: List of collection slugs.
            event_type: List of event names (despite the singular parameter name).
            callback: Callable invoked with each matching message payload.
        """
        try:
            if not isinstance(slugs, list):
                Logger("slugs params must be a list").error()
                return
            if not isinstance(event_type, list):
                Logger("event_type params must be a list").error()
                return
            for slug in slugs:
                for event in event_type:
                    self.on(event, slug, callback)
        except Exception as e:
            Logger(e).error()
# Graph state accumulated by ``callback`` from incoming records.
nodes = []
links = []


def callback(payload: dict):
    """Record an incoming row in the module-level ``nodes`` / ``links`` lists.

    ``payload["table"]` selects the handling and ``payload["record"]`` holds the row:

    * ``"Flow"``: print the flow id.
    * ``"Node"``: append ``{"id", "name"}`` to ``nodes``.
    * ``"Edge"``: append ``{"source", "target"}`` to ``links``, with the node ids
      resolved to names (``None`` for a node that has not been seen).

    Other tables are ignored. Malformed payloads are reported on stdout instead of
    raising, so one bad message cannot stop the listener.
    """

    def get_name_by_id(node_id):
        """Return the name of the recorded node with ``node_id``, or ``None``."""
        for node in nodes:
            if node["id"] == node_id:
                return node["name"]
        return None

    try:
        table = payload["table"]
        if table == "Flow":
            print(payload["record"]["id"])
        elif table == "Node":
            record = payload["record"]
            nodes.append({"id": record["id"], "name": record["name"]})
            print("Nodes:", nodes)
        elif table == "Edge":
            record = payload["record"]
            source_name = get_name_by_id(record["sourceId"])
            target_name = get_name_by_id(record["targetId"])
            links.append({"source": source_name, "target": target_name})
            print("Links:", links)
    except Exception as e:
        # Keep the listener alive, but say what went wrong.
        print(f"ERROR: {type(e).__name__}: {e}")
        return
if __name__ == "__main__":
    # Imported here, so ``decouple`` is only required when running this file as a script.
    from decouple import config

    # Connection settings: the service key is read via ``config``; host and port are fixed.
    SUPA_SERVICE_KEY = config("SUPA_SERVICE_KEY")
    HOST_PORT = 8000
    URL = getURL(host="localhost", port=HOST_PORT, service_key=SUPA_SERVICE_KEY)

    Client = SupabaseClient(URL)
    Client.connect()

    # Join the wildcard channel, then route every raw INSERT on it to ``callback``.
    Client.getChannels("*")
    Client.onEvents(slugs=["*"], event_type=[EventTypes.RAW_INSERT], callback=callback)

    # NOTE(review): the two calls below reference methods that SupabaseClient does
    # not define in this file; they are kept only as commented-out examples.
    # Client.onItemListed("collection-slug", callback)
    # Client.onItemSold("collection-slug", callback)

    Client.startListening()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment