Created
May 22, 2024 07:16
-
-
Save wooparadog/88fac497cf9bf5fef3cf1e0e500eb050 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime | |
import logging | |
from typing import List, Optional | |
from orion.biz.common import UserTrafficData | |
from orion.biz.handlers.message.message import query_message | |
from orion.biz.handlers.user import get_user, update_user | |
from orion.biz.models.user_traffic import UserTraffic | |
from orion.common import Role | |
from orion.utils.cache import redis_cacher | |
from orion.utils.db import DBSession, transactional_session | |
from orion.utils.random import random_lower_string | |
logger = logging.getLogger(__name__) | |
def create_user_traffic( | |
medium: str, | |
source: str, | |
campaign: str, | |
created_by: int, | |
emoji: str, | |
end_transaction=True, | |
) -> UserTraffic: | |
""" | |
Create a new UserTraffic record. | |
""" | |
with transactional_session( | |
UserTraffic.GetDBSession(), end_transaction=end_transaction | |
) as session: | |
new_traffic = UserTraffic( | |
medium=medium, | |
source=source, | |
campaign=campaign, | |
emoji=emoji, | |
uuid=random_lower_string(5), | |
created_by=created_by, | |
) | |
session.add(new_traffic) | |
return new_traffic | |
def query_user_traffic( | |
medium: Optional[str] = None, | |
source: Optional[str] = None, | |
campaign: Optional[str] = None, | |
) -> UserTrafficData: | |
return UserTraffic.get_first_or_raise( | |
UserTraffic.source == source, UserTraffic.campaign == campaign | |
).as_data() | |
def get_all_user_traffic() -> List[UserTrafficData]: | |
""" | |
Retrieve all UserTraffic records. | |
""" | |
return [ut.as_data() for ut in UserTraffic.query_by()] | |
@redis_cacher.cache( | |
int(datetime.timedelta(minutes=5).total_seconds()), | |
) | |
def _get_all_user_traffic_with_cache() -> List[UserTrafficData]: | |
return get_all_user_traffic() | |
def update_user_traffic( | |
traffic_id: int, | |
updated_by: int, | |
medium: Optional[str] = None, | |
source: Optional[str] = None, | |
campaign: Optional[str] = None, | |
emoji: Optional[str] = None, | |
end_transaction: bool = True, | |
) -> UserTraffic: | |
""" | |
Update an existing UserTraffic record. | |
""" | |
with transactional_session( | |
UserTraffic.GetDBSession(), end_transaction=end_transaction | |
): | |
traffic = UserTraffic.get_by_id_or_raise(traffic_id) | |
if medium: | |
traffic.medium = medium # type: ignore | |
if source: | |
traffic.source = source # type: ignore | |
if campaign: | |
traffic.campaign = campaign # type: ignore | |
if emoji: | |
traffic.emoji = emoji # type: ignore | |
traffic.created_by = updated_by # type: ignore | |
return traffic | |
def detect_user_traffic(user_id: int, end_transaction: bool = True): | |
first_messages, _ = query_message( | |
user_id=user_id, role=Role.USER, page_size=1, valid_only=False | |
) | |
if not first_messages: | |
return | |
first_message = first_messages[0] | |
logger.info(f"Try to detect where the user come from: {user_id}") | |
all_uts = _get_all_user_traffic_with_cache() | |
for ut in sorted(all_uts, key=lambda x: -x.id): | |
if ut.emoji in first_message.content or ut.uuid in first_message.content: | |
with transactional_session(DBSession, end_transaction=end_transaction): | |
user = get_user(user_id) | |
if not user: | |
logger.error(f"Failed to detect user: {user_id}") | |
return | |
user_data = user.as_data() | |
user_data.settings.properties["utm"] = str(ut.id) | |
update_user(user_id, user_settings=user_data.settings) | |
return |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment