import asyncio
import copy
import hashlib
import json
import os
import random
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

import numpy as np
import requests
import torch
from neural_cherche import models, retrieve, train, utils
from openai import AsyncOpenAI

import wandb

TRAINING_BATCH_SIZE = 8
LR = 10e-5  # i.e., 1e-4
N_EPOCHS = 20
INFERENCE_BATCH_SIZE = 32
DATA_SAVE_PATH = "colbert-finetuning/data"
MODEL_SAVE_PATH = "colbert-finetuning/models"
CHECKPOINT_DIR = "colbert-finetuning/checkpoints"
LLM_MODEL = "gpt-3.5-turbo-0125"
EMBEDDING_MODEL = "text-embedding-3-large"
N2P_RATIO = 10  # negatives sampled per positive

client = AsyncOpenAI(
    api_key=os.environ.get("OPENAI_API_KEY"),
)
# Boilerplate cached LLM wrapper - replace in your own code
class LLM:
    model_name: str
    embeddings_cache_path: str = "embeddings_cache.json"
    generations_cache_path: str = "generations_cache.json"
    embeddings_cache: dict
    generations_cache: dict

    def __init__(self, model_name: str, embeddings_model: str, temperature: Optional[float] = 0):
        self.model_name = model_name
        self.embeddings_model = embeddings_model
        self.temperature = temperature
        # Load caches
        self.embeddings_cache = self.load_cache(self.embeddings_cache_path)
        self.generations_cache = self.load_cache(self.generations_cache_path)

    def load_cache(self, path):
        if os.path.exists(path):
            with open(path, "r") as file:
                return json.load(file)
        return {}

    def save_cache(self, path, cache):
        with open(path, "w") as file:
            json.dump(cache, file)
    async def generate(self, system_prompt, user_prompt):
        cache_key = json.dumps({"system_prompt": system_prompt, "user_prompt": user_prompt})
        if cache_key in self.generations_cache:
            return self.generations_cache[cache_key]
        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ]
        response = await client.chat.completions.create(
            messages=messages,
            model=self.model_name,
            temperature=self.temperature,
        )
        generated_content = response.choices[0].message.content
        self.generations_cache[cache_key] = generated_content
        self.save_cache(self.generations_cache_path, self.generations_cache)
        return generated_content

    async def embed(self, text):
        if text in self.embeddings_cache:
            return self.embeddings_cache[text]
        response = await client.embeddings.create(
            model=self.embeddings_model,
            input=text,
        )
        embedding = response.data[0].embedding
        self.embeddings_cache[text] = embedding
        self.save_cache(self.embeddings_cache_path, self.embeddings_cache)
        return embedding
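
# A minimal sketch (hypothetical helper, not called anywhere below) showing how the
# two JSON caches above are keyed: generations by the serialized prompt pair,
# embeddings by the raw input text. Handy when inspecting or pruning cache files.
def _inspect_caches(llm: "LLM", n: int = 3) -> None:
    for key in list(llm.generations_cache)[:n]:
        prompts = json.loads(key)  # {"system_prompt": ..., "user_prompt": ...}
        print("generation key:", prompts["user_prompt"][:60])
    for text in list(llm.embeddings_cache)[:n]:
        print("embedding key:", text[:60])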
@dataclass
class ExampleDocument:
    text: str
    chunks: List[str]

class ColberFinetuningDataBuilder:
    documents: List[ExampleDocument]
    llm: LLM
    data_save_path: str

    def __init__(self, documents_as_dictionaries: List[Dict[str, Any]], data_save_path=DATA_SAVE_PATH):
        self.documents = [ExampleDocument(**doc) for doc in documents_as_dictionaries]
        self.llm = LLM(LLM_MODEL, EMBEDDING_MODEL)
        self.train_triplets = []
        self.test_triplets = []
        self.data_save_path = data_save_path
    # Mining hard negatives with embeddings improves over random negatives
    async def build_synthetic_triplets(self, n_positive: int = 20, ratio_negative_to_positive: int = N2P_RATIO, negative_type: str = "random"):
        if os.path.exists(self.data_save_path + "/train_triplets.json"):
            with open(self.data_save_path + "/train_triplets.json") as f:
                self.train_triplets = json.load(f)
            with open(self.data_save_path + "/test_triplets.json") as f:
                self.test_triplets = json.load(f)
            return
        triplets = []
        n_positives_remaining = n_positive
        while n_positives_remaining > 0:
            doc_index = random.randint(0, len(self.documents) - 1)
            random_document = self.documents[doc_index]
            random_positive_index = random.randint(0, len(random_document.chunks) - 1)
            random_positive = random_document.chunks[random_positive_index]
            all_negatives = [chunk for document in self.documents for chunk in document.chunks]
            query = await self.llm.generate(
                system_prompt="Given the following text chunk, formulate a question that is non-trivial to answer and that depends on the information in the text chunk",
                user_prompt=f"# Text Chunk\n{random_positive}\nYour question:",
            )
            if negative_type == "random":
                # Sample chunks at least 5 positions away from the positive
                random_negative_indices = random.sample([i for i in range(len(random_document.chunks)) if abs(i - random_positive_index) > 5], ratio_negative_to_positive)
                negatives = [random_document.chunks[i] for i in random_negative_indices]
            else:
                # Hard negatives: the chunks most similar to the query, excluding the positive itself
                query_embedding = await self.llm.embed(query)
                negative_embeddings = [await self.llm.embed(negative) for negative in all_negatives]
                similarities = [np.dot(query_embedding, negative_embedding) / (np.linalg.norm(query_embedding) * np.linalg.norm(negative_embedding)) for negative_embedding in negative_embeddings]
                ranked = np.argsort(similarities)[::-1]
                negatives = [all_negatives[i] for i in ranked if all_negatives[i] != random_positive][:ratio_negative_to_positive]
            for negative in negatives:
                triplets.append((query, random_positive, negative))
            n_positives_remaining -= 1
        self.train_triplets = triplets[: int(len(triplets) * 0.5)]
        self.test_triplets = triplets[int(len(triplets) * 0.5):]
        with open(self.data_save_path + "/train_triplets.json", "w") as file:
            json.dump(self.train_triplets, file)
        with open(self.data_save_path + "/test_triplets.json", "w") as file:
            json.dump(self.test_triplets, file)
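
# A hedged sketch of the (query, positive, negative) triplet layout built above;
# `preview_triplets` is a hypothetical debugging helper, not part of the pipeline.
def preview_triplets(triplets: List[Any], n: int = 3) -> None:
    for query, positive, negative in triplets[:n]:
        print(f"Q: {query[:80]}\n+ {positive[:80]}\n- {negative[:80]}\n")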
class ColbertRetriever:
    def __init__(self, document: ExampleDocument, path_to_model: str = "raphaelsty/neural-cherche-colbert"):
        self.document = document
        self.colbert_model = models.ColBERT(
            model_name_or_path=path_to_model,
            device="cuda" if torch.cuda.is_available() else ("mps" if torch.backends.mps.is_available() else "cpu"),
        )
        self.retriever = retrieve.ColBERT(
            key="id",
            on=["chunk"],  # the field(s) to search on; can be a list of fields
            model=self.colbert_model,
        )
        documents_embeddings = self.retriever.encode_documents(
            documents=[{"id": i, "chunk": chunk} for i, chunk in enumerate(document.chunks)],
            batch_size=INFERENCE_BATCH_SIZE,
        )
        self.retriever = self.retriever.add(
            documents_embeddings=documents_embeddings,
        )

    def batch_retrieve(self, queries: List[str], k: int = 20) -> List[List[str]]:
        queries_embeddings = self.retriever.encode_queries(
            queries=queries,  # list of queries
            batch_size=INFERENCE_BATCH_SIZE,
        )
        chunk_scores_by_query: List[List[Dict[str, Any]]] = self.retriever(
            queries_embeddings=queries_embeddings,
            batch_size=INFERENCE_BATCH_SIZE,
            k=k,  # number of chunks to retrieve per query
        )
        # Return one ranked list of chunks per query (not a flattened list across queries)
        return [[self.document.chunks[chunk_score["id"]] for chunk_score in query_chunk_scores] for query_chunk_scores in chunk_scores_by_query]
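
# Usage sketch (a hypothetical convenience wrapper, assuming the per-query nesting
# above): unwraps batch_retrieve for a single query and returns its ranked chunk list.
def retrieve_one(retriever: "ColbertRetriever", query: str, k: int = 5) -> List[str]:
    return retriever.batch_retrieve([query], k=k)[0]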
class OpenaiVectorRetriever:
    def __init__(self, document: ExampleDocument):
        self.document = document
        self.llm = LLM(LLM_MODEL, EMBEDDING_MODEL)
        self.chunk_embeddings = asyncio.run(self.batch_embed(self.document.chunks))

    async def batch_embed(self, chunks: List[str]) -> List[List[float]]:
        embeddings = []
        for chunk in chunks:
            embeddings.append(await self.llm.embed(chunk))
        return embeddings

    def cosine_similarity(self, a, b):
        return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))

    async def retrieve(self, query: str, k: int = 20) -> List[str]:
        query_embedding = await self.llm.embed(query)
        similarities = [self.cosine_similarity(query_embedding, chunk_embedding) for chunk_embedding in self.chunk_embeddings]
        retrieved = [chunk for chunk, _ in sorted(zip(self.document.chunks, similarities), key=lambda x: x[1], reverse=True)[:k]]
        return retrieved
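
# Optional optimization sketch (an assumption, not in the original): the per-chunk
# cosine loop in `retrieve` can be replaced by a single matrix product, which
# matters once a document has thousands of chunks.
def batch_cosine_similarities(query_vec: List[float], chunk_matrix: List[List[float]]) -> np.ndarray:
    q = np.asarray(query_vec)
    M = np.asarray(chunk_matrix)
    # Row-wise dot products, normalized by the vector norms
    return (M @ q) / (np.linalg.norm(M, axis=1) * np.linalg.norm(q))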
class ColbertFineTuner:
    data_builder: ColberFinetuningDataBuilder
    base_model: models.ColBERT
    optimizer: torch.optim.AdamW

    def __init__(
        self,
        documents: List[ExampleDocument],
        finetuning_databuilder: ColberFinetuningDataBuilder,
        model_save_dir_path,
    ):
        self.data_builder = finetuning_databuilder
        self.base_model = models.ColBERT(
            model_name_or_path="raphaelsty/neural-cherche-colbert",
            device="cuda" if torch.cuda.is_available() else ("mps" if torch.backends.mps.is_available() else "cpu"),
        )
        self.optimizer = torch.optim.AdamW(self.base_model.parameters(), lr=LR)  # optionally add weight_decay=0.01
        self.documents = documents
        self.model_save_dir_path = model_save_dir_path
    def local_finetuning(self):
        if os.path.exists(self.model_save_dir_path + "/model.safetensors"):
            return
        wandb.init(project="colbert_finetuning", name="local_finetuning_run")
        model = models.ColBERT(
            model_name_or_path="raphaelsty/neural-cherche-colbert",
            device="cuda" if torch.cuda.is_available() else ("mps" if torch.backends.mps.is_available() else "cpu"),
        )
        optimizer = torch.optim.AdamW(model.parameters(), lr=LR)
        X = copy.deepcopy(self.data_builder.train_triplets)
        model_weight_sums = []  # track weight drift across logging steps
        for step, (anchor, positive, negative) in enumerate(
            utils.iter(
                X,
                epochs=N_EPOCHS,  # number of epochs
                batch_size=TRAINING_BATCH_SIZE,  # number of triplets per batch
                shuffle=True,
            )
        ):
            loss = train.train_colbert(
                model=model,
                optimizer=optimizer,
                anchor=anchor,
                positive=positive,
                negative=negative,
                step=step,
                gradient_accumulation_steps=50,
            )
            if step % 10 == 0:
                loss_f = loss["loss"].detach().cpu().numpy()
                wandb.log({"loss": loss_f, "step": step + 1})
                model_weight_sums.append(sum(param.data.cpu().sum() for param in model.parameters()))
            if step % 50 == 0:
                model.save_pretrained(CHECKPOINT_DIR)
        loss_f = loss["loss"].detach().cpu().numpy()
        print(f"Final Step {step + 1}: Loss = {loss_f:.4f}")
        model.save_pretrained(self.model_save_dir_path)
        wandb.finish()
        if torch.backends.mps.is_available():
            torch.mps.empty_cache()
    def benchmarking(self, k: int = 10):
        baseline_vector_hits = {}
        baseline_colbert_hits = {}
        finetuned_colbert_hits = {}
        base_vector_retriever = OpenaiVectorRetriever(self.data_builder.documents[0])
        base_retriever = ColbertRetriever(self.data_builder.documents[0])
        fine_tuned_retriever = ColbertRetriever(self.data_builder.documents[0], path_to_model=self.model_save_dir_path)
        for triplet in self.data_builder.test_triplets:
            query, positive, _ = triplet
            pair_hash = hashlib.sha256((query + positive).encode()).hexdigest()
            if pair_hash in baseline_vector_hits:
                continue
            base_vector_retrieved = asyncio.run(base_vector_retriever.retrieve(query, k=k))
            base_colbert_retrieved = base_retriever.batch_retrieve([query], k=k)
            fine_tuned_retrieved = fine_tuned_retriever.batch_retrieve([query], k=k)
            # retrieve() returns a flat chunk list; batch_retrieve() returns one list per query
            baseline_vector_hits[pair_hash] = positive in base_vector_retrieved
            baseline_colbert_hits[pair_hash] = positive in base_colbert_retrieved[0]
            finetuned_colbert_hits[pair_hash] = positive in fine_tuned_retrieved[0]
            if baseline_vector_hits[pair_hash] != finetuned_colbert_hits[pair_hash]:
                print("Query: ", query)
                print("Positive: ", positive)
                print("Baseline Vector: ", base_vector_retrieved)
                print("Baseline Colbert: ", base_colbert_retrieved[0])
                print("Fine-tuned Colbert: ", fine_tuned_retrieved[0])
                print(baseline_vector_hits[pair_hash])
                print(baseline_colbert_hits[pair_hash])
        print(f"Mean top-{k} hit-rate for baseline Vector: ", sum(baseline_vector_hits.values()) / len(baseline_vector_hits))
        print(f"Mean top-{k} hit-rate for baseline Colbert: ", sum(baseline_colbert_hits.values()) / len(baseline_colbert_hits))
        print(f"Mean top-{k} hit-rate for fine-tuned Colbert: ", sum(finetuned_colbert_hits.values()) / len(finetuned_colbert_hits))
if __name__ == "__main__":
    os.makedirs("colbert-finetuning", exist_ok=True)
    os.makedirs("colbert-finetuning/data/pg", exist_ok=True)
    os.makedirs("colbert-finetuning/models/pg", exist_ok=True)
    os.makedirs(CHECKPOINT_DIR, exist_ok=True)
    # Ensure you source a .env with your OpenAI API key at OPENAI_API_KEY
    # Ensure you have wandb installed and have run `wandb login`
    # Ensure you have the neural_cherche package installed: https://github.com/raphaelsty/neural-cherche
    # (pip install neural-cherche)

    ## REPLACE WITH YOUR OWN DOCUMENT(S) - admittedly, fine-tuning on a single document, as we do with the PG essay here, doesn't have a huge impact
    ## This is llama-index's text file - please verify yourself :-)
    essay_url = "https://raw.githubusercontent.com/run-llama/llama_index/main/docs/examples/data/paul_graham/paul_graham_essay.txt"
    response = requests.get(essay_url)
    response.raise_for_status()  # fail loudly instead of training on an error string
    document_text = response.text
    chunks = document_text.split("\n\n")
    example_document = {
        "text": document_text,
        "chunks": [chunk for chunk in chunks if len(chunk.strip()) > 1],
    }
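
    # Alternative chunking sketch (an assumption, not the author's method): fixed-size
    # windows with overlap instead of blank-line splitting, for corpora that lack clean
    # paragraph breaks. Hypothetical helper; the pipeline below keeps the split above.
    def overlapping_chunks(text: str, size: int = 800, overlap: int = 200) -> List[str]:
        step = size - overlap
        return [text[i:i + size] for i in range(0, max(len(text) - overlap, 1), step)]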
    # Run the fine-tuning job
    N_POSITIVE_EXAMPLES = 300
    data_builder = ColberFinetuningDataBuilder([example_document], data_save_path="colbert-finetuning/data/pg")
    asyncio.run(data_builder.build_synthetic_triplets(n_positive=N_POSITIVE_EXAMPLES))
    finetuner = ColbertFineTuner([example_document], data_builder, "colbert-finetuning/models/pg")
    finetuner.local_finetuning()

    # Evaluate fine-tuned ColBERT vs. baseline ColBERT vs. baseline vector retriever
    TOP_K_CHUNKS_TO_RETRIEVE = 1
    finetuner.benchmarking(k=TOP_K_CHUNKS_TO_RETRIEVE)

    # Spoiler, you might get something like this (when doing 200 examples and 20 epochs @ 3e-6 lr):
    # Mean top-1 hit-rate for baseline Vector: 0.9259259259259259
    # Mean top-1 hit-rate for baseline Colbert: 0.8271604938271605
    # Mean top-1 hit-rate for fine-tuned Colbert: 0.8765432098765432

    # After 400 examples and 25 epochs @ 5e-5 lr:
    # Mean top-1 hit-rate for baseline Vector: 0.9152542372881356
    # Mean top-1 hit-rate for baseline Colbert: 0.7796610169491526
    # Mean top-1 hit-rate for fine-tuned Colbert: 0.9152542372881356

    # After 400 examples and 40 epochs @ 10e-5 lr:
    # Mean top-1 hit-rate for baseline Vector: 0.925531914893617
    # Mean top-1 hit-rate for baseline Colbert: 0.8191489361702128
    # Mean top-1 hit-rate for fine-tuned Colbert: 0.9361702127659575

    # Fine-tuning works! We can see that vector embedding is pretty good for this task, though.
    # In general, I think you'll find that for harder tasks the fine-tuned ColBERT will outperform
    # the vector embedding, and that longer training runs with more examples will help.