Skip to content

Instantly share code, notes, and snippets.

@jul
Last active August 9, 2025 14:16
Show Gist options
  • Save jul/0c9bc59c954bc5e9829599a6cd501b6b to your computer and use it in GitHub Desktop.
text indexation
import json
import os
import sys
import spacy
from langdetect import detect
from archery import mdict, vdict
from time import time
import re
from json import load, dumps, loads
import psycopg2 as sq
from nltk.corpus import stopwords
from nltk.stem.snowball import SnowballStemmer
# French NLP toolchain: Snowball stemmer, NLTK stop-word list, small spaCy model.
stemmer = SnowballStemmer(language='french')
stop_words = set(stopwords.words('french'))
nlp = spacy.load("fr_core_news_sm")
# Postgres connection holding the labelled posts (table `posts`, columns
# `post` (JSON) and `is_spam` (nullable boolean label)).
con = sq.connect(dbname="trollo", user="jul")
cur = con.cursor()
def counter(list_of_words):
    """Return a word-frequency vector: each word mapped to its occurrence count."""
    frequencies = mdict()
    for word in list_of_words:
        frequencies += vdict({word: 1})
    return frequencies
def return_token_sent(sentence):
    """Split *sentence* into sentences with the French spaCy pipeline.

    Returns the raw text of each detected sentence.
    """
    document = nlp(sentence)
    return [segment.text for segment in document.sents]
def return_stem(sentence):
    """Stem every token of *sentence* that is not a French stop word."""
    tokens = nlp(sentence)
    return [stemmer.stem(token.text) for token in tokens if token.text not in stop_words]
def dbg(msg):
    """Write *msg* to stderr and flush immediately (debug trace helper)."""
    sys.stderr.write(str(msg))
    sys.stderr.flush()
# Fetch every labelled post; rows come back as (post_json, is_spam_bool).
cur.execute("select post, is_spam from posts where is_spam is not NULL");
# Per-class accumulated feature vectors (summed over all posts of the class).
av_vect = vdict(ham=vdict(), spam=vdict())
# NOTE(review): import buried mid-file; used by parse() below.
from emoji import is_emoji
def parse(post):
    """Turn a post into a sparse feature vector (vdict of feature -> count).

    Features: stemmed tokens per sentence, #hashtags/@mentions (each tag also
    bumps a 'weired' marker feature), and emoji (per-emoji counts plus a
    global 'emoji' counter).

    Returns an empty vdict when the post is falsy, the text is not detected
    as French, or language detection fails.
    """
    # Bug fix: the original read post["record"]["text"] *before* testing
    # `not post`, so a falsy post raised instead of returning an empty vector.
    if not post:
        return vdict()
    text = post["record"]["text"]
    res = vdict()
    try:
        if detect(text) != "fr":
            return vdict()
    except Exception:  # langdetect raises on empty/undetectable text
        return res
    if text:
        # One bag-of-stems per sentence, summed into a single vector.
        res = vdict(sum(map(counter, map(return_stem, return_token_sent(text)))))
        for w in text.split():
            if w.startswith(("#", "@")):
                res += vdict({w: 1})
                res += vdict(weired=1)  # [sic] feature key kept for compatibility
        for c in text:
            if is_emoji(c):
                res += vdict({"emoji": 1})
                res += vdict({c: 1})
    return res
# Pass 1: sum the feature vector of every labelled post into its class vector.
while res := cur.fetchone():
    post, is_spam = res
    # is_spam indexes the label pair: False -> "ham", True -> "spam".
    av_vect[["ham", "spam"][is_spam]] += parse(post)
print(av_vect)
#from pdb import set_trace; set_trace()
# Pass 2: measure class sizes and a first cosine-similarity detection baseline
# against the raw class sum vectors.
ratio_detection = vdict()
avg_len = vdict(ham=vdict(total=0, n=0), spam=vdict(total=0, n=0))
cur.execute("select post, is_spam from posts where is_spam is not NULL");
while res := cur.fetchone():
    post, is_spam = res
    if post["record"]["text"]:
        text = parse(post)
        label = ["ham", "spam"][is_spam]  # hoisted: was computed three times per row
        avg_len[label]["n"] += 1
        avg_len[label]["total"] += len(text)
        if abs(text):  # skip zero vectors: cosine is undefined for them
            # Predict the class whose sum vector is most similar, tally the outcome
            # under the true label ("ham" entry = correct for ham, etc.).
            predicted_ham = text.cos(av_vect["ham"]) > text.cos(av_vect["spam"])
            ratio_detection += vdict({label: vdict({["spam", "ham"][predicted_ham]: 1})})
print(ratio_detection)
# Average vector length per class; guard against an empty class
# (the original divided unconditionally and would raise ZeroDivisionError).
for _cls in ("ham", "spam"):
    _n = avg_len[_cls]["n"]
    avg_len[_cls]["avg"] = avg_len[_cls]["total"] / _n if _n else 0
# Laplace-smoothed per-word class probabilities (multinomial Naive-Bayes style).
alpha = 1
voc = av_vect["ham"] + av_vect["spam"]  # union of both vocabularies
n_vocabulary = len(voc)
parameters = vdict(ham=vdict(), spam=vdict())  # redundant vdict(vdict(...)) wrap removed
for word in voc:
    # Removed the unused (and incorrect) n_word_given_* locals: the original
    # called av_vect.get("spam", 0), which returns the whole class vector,
    # not a word count, and the result was never used.
    # NOTE(review): the denominator uses the class *average post length*
    # rather than the class total word count as textbook Naive Bayes would;
    # kept as-is to preserve the model's behavior.
    p_word_given_spam = (av_vect["spam"].get(word, 0) + alpha) / (avg_len["spam"]["avg"] + alpha * n_vocabulary)
    parameters["spam"] += vdict({word: p_word_given_spam})
    p_word_given_ham = (av_vect["ham"].get(word, 0) + alpha) / (avg_len["ham"]["avg"] + alpha * n_vocabulary)
    parameters["ham"] += vdict({word: p_word_given_ham})
# Pass 3: re-run detection against the smoothed parameter vectors and print
# every misclassified post for manual inspection.
ratio_detection = vdict()
cur.execute("select post, is_spam from posts where is_spam is not NULL");
while res := cur.fetchone():
    post, is_spam = res
    if post["record"]["text"]:
        text = parse(post)
        if abs(text):  # zero vectors have no cosine; skip them
            # Hoisted: each cosine was recomputed up to three times per row.
            cos_ham = text.cos(parameters["ham"])
            cos_spam = text.cos(parameters["spam"])
            label = ["ham", "spam"][is_spam]
            # "ham" entry under a class = correctly kept / detected for that class.
            ratio_detection += vdict({label: vdict({["spam", "ham"][cos_ham > cos_spam]: 1})})
            if is_spam and cos_ham > cos_spam:
                print("wrong spam")
                print(text)
                print(post["uri"])
            if not is_spam and cos_ham < cos_spam:
                print("wrong ham")
                print(text)
                print(post["uri"])
print(ratio_detection)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment