import json
import os
import time

import matplotlib.pyplot as plt
import openai
from sklearn.cluster import MiniBatchKMeans
from sklearn.decomposition import PCA
from sklearn.metrics import silhouette_score

from orion.utils.cache import redis_cacher

EMBEDDING = "text-embedding-3-small-azure"
LIMIT = None

openai_client = openai.AzureOpenAI(
    azure_endpoint="https://dolaapp.openai.azure.com",
    azure_deployment=EMBEDDING,
    api_key=os.environ["AZURE_OPENAI_EMBEDDING_KEY"],
    api_version="2024-06-01",
)

with open("query_result_2024-11-11T13_19_24.360161+08_00.json") as f:
data = json.load(f)
if LIMIT:
data = data[:LIMIT]
def create_combined_text(entry):
return f"{entry['name']} {entry['search_keywords']} {entry['search_intention']}"
search_intentions = [create_combined_text(entry) for entry in data]

def chunk_list(lst, chunk_size=100):
    """Split a list into consecutive chunks of at most chunk_size items."""
    return [lst[i : i + chunk_size] for i in range(0, len(lst), chunk_size)]


@redis_cacher.cache(60 * 60)
def get_embeddings_batch(model, text_batch):
    """Embed one batch of texts; results are cached in Redis for an hour."""
    response = openai_client.embeddings.create(model=model, input=text_batch)
    return [e.embedding for e in response.data]


def get_embeddings(text_list):
    """Embed all texts in batches of 100, pausing between calls to avoid rate limits."""
    embeddings = []
    for batch in chunk_list(text_list, 100):
        batch_embeddings = get_embeddings_batch(EMBEDDING, batch)
        embeddings.extend(batch_embeddings)
        time.sleep(1)
    return embeddings

embedding_filename = f"embedding_{LIMIT or 'all'}-{EMBEDDING}.json"
try:
    # Reuse previously computed embeddings when the cache file exists.
    with open(embedding_filename) as f:
        embeddings = json.load(f)
except Exception:
    print("calculate embedding")
    embeddings = get_embeddings(search_intentions)
    with open(embedding_filename, "w") as f:
        json.dump(embeddings, f)
    print("embedding saved")

print("embeddings len:", len(embeddings))

# Dimensionality reduction
pca = PCA(n_components=50) # Adjust the number of components as needed
reduced_embeddings = pca.fit_transform(embeddings)


def find_optimal_clusters(embeddings, min_k, max_k, step=1):
    """Fit MiniBatchKMeans for each candidate k and collect its silhouette score."""
    iters = list(range(min_k, max_k + 1, step))
    s = []
    for k in iters:
        start = time.time()
        model = MiniBatchKMeans(n_clusters=k, random_state=42)
        model.fit(embeddings)
        s.append(silhouette_score(embeddings, model.labels_))
        print("Time taken for k =", k, ":", time.time() - start)
    return iters, s


def plot_silhouette_scores(iters, s):
    plt.figure(figsize=(8, 6))
    plt.plot(iters, s, marker="o")
    plt.xlabel("Number of clusters")
    plt.ylabel("Silhouette Score")
    plt.title("Silhouette Score for Different Number of Clusters")
    plt.savefig(f"silhouette_score-{LIMIT or 'all'}-{EMBEDDING}.png")
    plt.show()


def generate_best_clustered_data(iters, s):
    """Refit with the k that maximizes the silhouette score and tag each entry with its cluster."""
    best_k = iters[s.index(max(s))]
    best_model = MiniBatchKMeans(n_clusters=best_k, random_state=42)
    best_model.fit(reduced_embeddings)
    best_clustered_data = []
    for i, entry in enumerate(data):
        entry["cluster"] = int(best_model.labels_[i])
        best_clustered_data.append(entry)
    return best_clustered_data


if __name__ == "__main__":
    iters, s = find_optimal_clusters(reduced_embeddings, 10000, 30000, step=25)
    with open("silhouette_score.json", "w") as f:
        json.dump({"iters": iters, "s": s}, f)
    # plot_silhouette_scores(iters, s)
    best = generate_best_clustered_data(iters, s)
    with open("clustered_data.json", "w") as f:
        json.dump(best, f, ensure_ascii=False, indent=2)