import json
import os
import time

import matplotlib.pyplot as plt
import openai
from sklearn.cluster import MiniBatchKMeans
from sklearn.decomposition import PCA
from sklearn.metrics import silhouette_score

from orion.utils.cache import redis_cacher
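
# Azure OpenAI embedding deployment name and an optional cap on how many
# entries to process (set LIMIT to a small integer for quick test runs).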
EMBEDDING = "text-embedding-3-small-azure"
LIMIT = None

openai_client = openai.AzureOpenAI(
    azure_endpoint="https://dolaapp.openai.azure.com",
    azure_deployment=EMBEDDING,
    api_key=os.environ["AZURE_OPENAI_EMBEDDING_KEY"],
    api_version="2024-06-01",
)
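
# Load the exported query results and build one combined text per entry from
# its name, search keywords, and search intention.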
with open("query_result_2024-11-11T13_19_24.360161+08_00.json") as f: | |
data = json.load(f) | |
if LIMIT: | |
data = data[:LIMIT] | |
def create_combined_text(entry): | |
return f"{entry['name']} {entry['search_keywords']} {entry['search_intention']}" | |
search_intentions = [create_combined_text(entry) for entry in data] | |
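
# Texts are embedded in batches of 100 per request; each batch call is cached
# via the project's redis_cacher decorator (TTL argument of 60 * 60,
# presumably seconds), and a one-second sleep between batches keeps the
# request rate modest.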
def chunk_list(lst, chunk_size=100):
    return [lst[i : i + chunk_size] for i in range(0, len(lst), chunk_size)]


@redis_cacher.cache(60 * 60)
def get_embeddings_batch(model, text_batch):
    response = openai_client.embeddings.create(model=model, input=text_batch)
    return [e.embedding for e in response.data]


def get_embeddings(text_list):
    embeddings = []
    for batch in chunk_list(text_list, 100):
        batch_embeddings = get_embeddings_batch(EMBEDDING, batch)
        embeddings.extend(batch_embeddings)
        time.sleep(1)
    return embeddings
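
# Persist the computed embeddings to a JSON file so reruns can skip the API
# calls; they are recomputed only when the cache file cannot be loaded.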
embedding_filename = f"embedding_{LIMIT or 'all'}-{EMBEDDING}.json"
try:
    with open(embedding_filename) as f:
        embeddings = json.load(f)
except Exception:
    print("calculate embedding")
    embeddings = get_embeddings(search_intentions)
    with open(embedding_filename, "w") as f:
        json.dump(embeddings, f)
    print("embedding saved")

print("embeddings len:", len(embeddings))

# Dimensionality reduction: project the embeddings onto 50 principal
# components before clustering, which makes MiniBatchKMeans fits and
# silhouette scoring considerably cheaper.
pca = PCA(n_components=50)  # Adjust the number of components as needed
reduced_embeddings = pca.fit_transform(embeddings)
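
# Sweep candidate cluster counts: fit MiniBatchKMeans for each k and record
# the silhouette score so the best k can be chosen afterwards.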
def find_optimal_clusters(embeddings, min_k, max_k, step=1):
    iters = list(range(min_k, max_k + 1, step))
    s = []
    for k in iters:
        start = time.time()
        model = MiniBatchKMeans(n_clusters=k, random_state=42)
        model.fit(embeddings)
        s.append(silhouette_score(embeddings, model.labels_))
        print("Time taken for k =", k, ":", time.time() - start)
    return iters, s
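
# Optional helper: plot silhouette score against the number of clusters and
# save the figure as a PNG.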
def plot_silhouette_scores(iters, s):
    plt.figure(figsize=(8, 6))
    plt.plot(iters, s, marker="o")
    plt.xlabel("Number of clusters")
    plt.ylabel("Silhouette Score")
    plt.title("Silhouette Score for Different Numbers of Clusters")
    plt.savefig(f"silhouette_score-{LIMIT or 'all'}-{EMBEDDING}.png")
    plt.show()
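
# Refit MiniBatchKMeans with the k that achieved the highest silhouette score
# and attach the resulting cluster label to each entry.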
def generate_best_clustered_data(iters, s):
    best_k = iters[s.index(max(s))]
    best_model = MiniBatchKMeans(n_clusters=best_k, random_state=42)
    best_model.fit(reduced_embeddings)
    best_clustered_data = []
    for i, entry in enumerate(data):
        entry["cluster"] = int(best_model.labels_[i])
        best_clustered_data.append(entry)
    return best_clustered_data
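
# Entry point: sweep k from 10,000 to 30,000 in steps of 25, dump the raw
# silhouette scores, then write the cluster-labelled entries to
# clustered_data.json.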
if __name__ == "__main__":
    iters, s = find_optimal_clusters(reduced_embeddings, 10000, 30000, step=25)
    with open("silhouette_score.json", "w") as f:
        json.dump({"iters": iters, "s": s}, f)
    # plot_silhouette_scores(iters, s)

    best = generate_best_clustered_data(iters, s)
    with open("clustered_data.json", "w") as f:
        json.dump(best, f, ensure_ascii=False, indent=2)