Skip to content

Instantly share code, notes, and snippets.

@ericflo
Created August 5, 2024 21:05
Show Gist options
  • Save ericflo/ec3affbd5d5be8f5b029c941f4095620 to your computer and use it in GitHub Desktop.
# Deploy 1x AMD MI300X
# python -m vllm.entrypoints.openai.api_server --port 8083 --host 127.0.0.1 --model meta-llama/Meta-Llama-3.1-70B-Instruct --max-model-len 120000
# NUM_WORKERS=32 MODEL_NAME="meta-llama/Meta-Llama-3.1-70B-Instruct" OPENAI_API_URL="http://127.0.0.1:8083/v1" python agent_instruction_database.py
import copy
import os
import json
import traceback
import random
from pprint import pprint
from typing import Union
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from functools import wraps
from general_function_calling import run_inference
DB_FILENAME = os.environ.get("DB_FILENAME", "db-multi-70b.json")
MAX_CATEGORIES = 60 # Start at 20, run for quite a while, then to 60
MAX_ITEMS = 100
MAX_TRIES = 8
CATEGORY_DB = {}
VERBOSE = True
LOCK = threading.Lock()
DEFAULT_DB_MULTI = """{
"mathematical_analysis": [],
"sentiment_analysis": [],
"text_summarization": [],
"code_generation": [],
"language_translation": [],
"literary_analysis": [],
"theoretical_science": [],
"emotional_intelligence": [],
"logic_puzzle_solving": [],
"creative_writing": [],
"data_visualization": [],
"music_composition": [],
"artistic_design": [],
"philosophical_discussion": [],
"historical_research": [],
"cultural_understanding": [],
"ethical_decision_making": [],
"legal_advice": [],
"medical_diagnosis": [],
"psychological_counseling": [],
"mathematical_proof_and_theorem_proving": [],
"financial_analysis": [],
"engineering_design": [],
"space_exploration": [],
"quantum_physics": [],
"machine_learning": []
}"""
def synchronized(func):
@wraps(func)
def wrapper(*args, **kwargs):
with LOCK:
return func(*args, **kwargs)
return wrapper
@synchronized
def list_categories() -> list[dict[str, list[dict]]]:
"""
List all categories in the database and their example lengths.
"""
if VERBOSE:
print(f"list_categories()")
categories = [{"name": k, "count": len(v)} for k, v in CATEGORY_DB.items()]
random.shuffle(categories)
return {"categories": categories, "max_categories": MAX_CATEGORIES}
@synchronized
def get_category_count(name: str) -> dict[str, int]:
"""
Get the count of examples of a category by its name.
"""
if VERBOSE:
print(f"get_category_count({name})")
return {name: len(CATEGORY_DB[name]), "max_examples": MAX_ITEMS}
@synchronized
def get_category_examples(name: str) -> dict[str, list[Union[str, int]]]:
"""
Get a subset of examples from a category in the database.
"""
if VERBOSE:
print(f"get_category_examples({name})")
examples = copy.deepcopy(CATEGORY_DB[name])
random.shuffle(examples)
return {name: examples, "max_examples": MAX_ITEMS}
@synchronized
def add_category_example(name: str, example: str) -> dict[str, int]:
"""
Add an example to a category.
"""
example = example.strip()
if VERBOSE:
print(f"add_category_example({name}, {example})")
if name not in CATEGORY_DB and len(CATEGORY_DB.keys()) >= MAX_CATEGORIES:
raise ValueError(
f"You may not add more than {MAX_CATEGORIES} categories, please add to an existing one."
)
items = CATEGORY_DB.get(name, [])
if len(items) >= MAX_ITEMS:
raise ValueError(
f"You may not add more than {max(MAX_ITEMS, len(items))} examples per category, please add a new category, add an example to another category, or delete a low-quality or low-diversity example."
)
if example in items:
raise ValueError(
"This exact example already exists in the category, and that is extremely dissappointing - strive hard for diversity"
)
items.append(example)
CATEGORY_DB[name] = items
print(f"Saving {sum([len(v) for v in CATEGORY_DB.values()])} entries")
with open(DB_FILENAME, "w") as f:
json.dump(
{k: sorted(v) for k, v in CATEGORY_DB.items()},
f,
indent=2,
sort_keys=True,
)
return {name: len(CATEGORY_DB[name]), "max_examples": MAX_ITEMS}
@synchronized
def delete_category_example(name: str, verbatim_example: str) -> dict[str, int]:
"""
Deletes the example from the category. Must provide the example to delete verbatim or it won't be found and deleted.
"""
if VERBOSE:
print(f"delete_category_example({name}, {verbatim_example})")
items = CATEGORY_DB.get(name, [])
prev_count = len(items)
CATEGORY_DB[name] = [
i for i in items if i.strip().lower() != verbatim_example.strip().lower()
]
print(f"Saving {sum([len(v) for v in CATEGORY_DB.values()])} entries")
with open(DB_FILENAME, "w") as f:
json.dump(
{k: sorted(v) for k, v in CATEGORY_DB.items()},
f,
indent=2,
sort_keys=True,
)
return {"deleted": len(CATEGORY_DB[name]) < prev_count, "max_examples": MAX_ITEMS}
@synchronized
def delete_category(name: str) -> dict[str, Union[bool, int]]:
"""
Deletes an entire category and all its examples. Only use if the category is too similar to another category, or it's too obscure, and you're truly absolutely certain of that fact. This tool call deletes a lot of (your) past work and cannot be undone, so do not do it without serious consideration first of the consequences.
"""
if VERBOSE:
print(f"delete_category({name})")
resp = {
"deleted": bool(CATEGORY_DB.pop(name, None)),
"max_categories": MAX_CATEGORIES,
}
print(f"Saving {sum([len(v) for v in CATEGORY_DB.values()])} entries")
with open(DB_FILENAME, "w") as f:
json.dump(
{k: sorted(v) for k, v in CATEGORY_DB.items()},
f,
indent=2,
sort_keys=True,
)
return resp
def worker():
messages = [
{
"role": "system",
"content": """You are joining onto a project whose mission is to create the most diverse, balanced, incredible, complex, varied dataset of LLM prompts/tasks/challenges/exams/tests/evaluations/banter/instructions. A single example might be something like:
* "Write a 10 paragraph essay on the history of AI." or
* "Given the following two math equations, solve the system of equations: x + y = 5 and 2x - y = 3" or
* "Write a Python function that calls out to a public stock API using nothing but the `requests` library, takes in a ticker symbol as an argument, and returns the current price."
* etc etc. the above is just a teeny tiny sliver subset of the awesome diverse examples I want you to create please!
You are expected to quickly get a lay of the land, then come up with a general idea for a new extremely high quality example to add. See if there's a category related to that high level concept (preferably a low-example-count category). Quickly scan the category to make sure a similar example doesn't already exist, and if one does, abandon your idea and come up with a new one (preferably in a low-example-count category).
In rare cases, if you notice a bad pattern in the already-generated data, or repeats, or something that needs correction, you may design and implement a cleanup pass where you filter out examples you think are poor or detrimental to the quality of the dataset, instead of adding a new example. Improving statistical diversity and excellence is welcome, although achieving that by adding volume is generally preferred to filtering. It's just that sometimes, after a lot of entries and many files, you simply have to do some filtering. As a heuristic let's say wait until at least 10 categories each with over 20 entries in it.
If an instruction or question requires content or a document, **be sure to include that document or content in the example directly** and it must be the actual content because placeholders will ruin the example.""",
}
]
try:
run_inference(
messages=messages,
tools=[
list_categories,
# get_category_count,
get_category_examples,
add_category_example,
delete_category_example,
# delete_category,
],
max_tries=MAX_TRIES,
)
print(messages[-1]["content"])
except Exception as e:
traceback.print_exception(e)
def main():
if os.path.exists(DB_FILENAME):
with LOCK:
with open(DB_FILENAME) as f:
CATEGORY_DB.clear()
CATEGORY_DB.update(json.load(f))
else:
CATEGORY_DB.clear()
CATEGORY_DB.update(json.loads(DEFAULT_DB_MULTI))
futures = []
with ThreadPoolExecutor(
max_workers=int(os.environ.get("NUM_WORKERS", 8))
) as executor:
for _ in range(20000):
futures.append(executor.submit(worker))
for future in as_completed(futures):
_ = future.result()
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment