Last active
June 30, 2018 07:56
-
-
Save GlulkAlex/f5c8efaff38e04b7e30372b13c324663 to your computer and use it in GitHub Desktop.
Game theory behaviour strategies simulation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3.6 | |
# -*- coding: utf-8 -*- | |
import sys | |
from pprint import pprint, pformat | |
import logging | |
from collections import namedtuple | |
import random | |
from random import randint | |
from array import array | |
from enum import Enum#, unique | |
from typing import ( | |
Dict | |
, List | |
, Tuple | |
, Set | |
# Optional[X] is equivalent to Union[X, None] | |
, Optional | |
# an alternative to collections.namedtuple | |
# that supports type-checking. | |
, NamedTuple | |
# or | |
#?#, SimpleNamespace | |
, Type | |
# Callable[[int], str] is a function of (int) -> str. | |
, Callable | |
) | |
import inspect | |
from functools import wraps | |
"""merchants.py: | |
""" | |
__author__ = "GlukAlex" | |
task = """ | |
total 50 merchants | |
time span: 1 year or 365 days | |
each merchant made from 5 to 10 deals ( deals limit ) | |
with any ( every ) other merchant | |
during the time span | |
( as one tournament turn | iteration ) | |
Deal strategies (API): | |
fair ( honest ) | unfair ( dishonest | crooked ) | |
Deal outcomes: | |
1. ( fair, fair ) = ( 4, 4 ) | |
2. ( unfair, fair ) = ( 5, 1 ) or ( fair, unfair ) = ( 1, 5 ) | |
3. ( unfair, unfair ) = ( 2, 2 ) | |
@Done: how and when implement miscommunication ? | |
like each actor calculates its own outcome independently ? | |
based on interpretation of message from another party ? | |
Deal misunderstanding probability: 5% ( communication noise ) | |
when one or both merchants suspect a foul play . | |
Annual results: | |
20% of least successful ( profitable ) merchants | |
will be replaced by new ones | |
which adopt strategy of the top | |
20% of the most successful ( profitable ) merchants . | |
Note: no exchange ( disclosure ) of information | |
about the deals details happend | |
between merchants . <- inner state not directly observable ? | |
( Behavior (American English) or behaviour (Commonwealth English) ) | |
Merchants ( actors ) behaviours: | |
1. altruist - always honest | |
2. egoist - always dishonest | |
@Done: WTF ?!? and how trickster vs. trickster supposed to act ? | |
message exchange protocol is unclear | |
3. trickster - starts from fair | |
then copies the opponent | |
? what does this even mean ? | |
? most probable -> repeats the last opponent_Action ? | |
4. unpredictable - randomly chooses between fair and unfair | |
5. resentful | vindictive - starts as altruist ( always honest ) | |
from fair | |
then when opponent once deceive him | |
switched to | became egoist ( always dishonest ) | |
6. con man - starts from [ fair, unfair, fair, fair ] | |
at the time of next turn | deal | |
after fist 4 deals he was deceived | |
at least once | |
became egoist | |
else became trickster | |
initially it is ( almost ) equal amount of merchants | |
with each behaviour | |
50 / 6 = 8 * 5 + 10 * 1 | |
or 8 * 4 + 9 * 2 | |
or 7 * 1 + 8 * 2 + 9 * 3 | |
Goal: | |
1. implement task model . | |
2. find | estimate winning behaviour . | |
3. ( optional ) devise | invent new additional ( 7-th ) winning strategy | |
that beats all previous 6 | |
or way to create it | |
( to solve optimization problem ), | |
like ML | |
or use of The genetic algorithm . | |
Note: for every division operation floor result | |
""" | |
goal = """ | |
( optimization: minimax ) maximize ( overall ) profit | |
or ( dual ) | |
increase probability of highest outcome in | for | of each deal | |
""" | |
### @toDo: use history size per merchant limit as strategy trait | |
### like memory size | |
### @toDo: implement cloning from Prototype function
### @toDo: add `origin` field to strategy function for statistics | |
### like: preset or default | basic | average | breed or child | |
class LoggingContext:
    """Context manager that temporarily overrides a logger's level.

    On entry the logger's current level is remembered and replaced
    with `level`; on exit the remembered level is put back
    ( a small memento ) .
    When `level` is None the logger is left untouched .
    """

    def __init__( self, logger, level = None ) -> None:
        self.logger = logger
        self.level = level

    def __enter__( self ) -> None:
        """Apply the temporary level; nothing is bound by `as` ."""
        if self.level is None:
            return None
        # remember what to restore in __exit__
        self.old_level = self.logger.level
        self.logger.setLevel( self.level )
        return None

    def __exit__( self, et, ev, tb ):
        """Restore the remembered level.

        `et`, `ev`, `tb` are the exception type, value and traceback
        ( all None when the body finished normally ) .
        Returning None lets any exception propagate .
        """
        if self.level is None:
            return None
        self.logger.setLevel( self.old_level )
        return None
# Application-wide logger configuration, executed at import time .
# Records of level DEBUG and above are handed to the file handler below .
#logging.basicConfig(
#  level=logging.DEBUG
#, format='%(relativeCreated)6d %(threadName)s %(message)s' )
logger = logging.getLogger(
    'merchants_App'
    #__name__
)
#>print( logger.name )
logger.setLevel( logging.DEBUG )
# the log file 'run.log' is opened in write mode ( 'w' )
fh = logging.FileHandler( 'run.log', mode='w' )
fh.setLevel( logging.DEBUG )
#df = logging.Formatter('$asctime $name ${levelname} $message', style='$' )
# `style='$'` selects string.Template placeholders: module name, line number, message
formatter = logging.Formatter(
    #'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    #'%(levelname)s: %(name)s - %(message)s'
    #'%(module)s' '%(lineno)d'
    '$module:$lineno - $message', style='$'
)
fh.setFormatter( formatter )
#ch.setFormatter(formatter)
logger.addHandler( fh )
#logger.addHandler(ch)
def deal_Outcomes(
    self_Action: bool
    , other_Action: bool
) -> Tuple[ int, int ]:
    """ Pay-off table of a single deal .

    Actions are interpreted by truthiness: truthy means fair ( honest ),
    falsy means unfair ( dishonest ), so `1` / `0` behave like
    `True` / `False` .

    return Tuple[ int, int ]: ( self outcome, other outcome )

    >>> deal_Outcomes( True, True ) # doctest: +NORMALIZE_WHITESPACE -SKIP +REPORT_NDIFF +FAIL_FAST
    (4, 4)
    >>> deal_Outcomes( False, True ) # doctest: +NORMALIZE_WHITESPACE -SKIP
    (5, 1)
    >>> deal_Outcomes( True, False ) # doctest: +NORMALIZE_WHITESPACE -SKIP
    (1, 5)
    >>> deal_Outcomes( False, False ) # doctest: +NORMALIZE_WHITESPACE -SKIP
    (2, 2)
    """
    if self_Action and other_Action:
        # ( fair, fair )
        return ( 4, 4 )
    if other_Action:
        # ( unfair, fair ): the cheater wins
        return ( 5, 1 )
    if self_Action:
        # ( fair, unfair ): the cheated one loses
        return ( 1, 5 )
    # ( unfair, unfair )
    return ( 2, 2 )
### @Note: a generator, once instantiated ( primed ),
### does not need to allocate memory and recreate its state
### every time it is resumed with next(),
### but a function does . So a function call is more expensive .
def altruist_Fun(
    # accepted only to share the common behaviour interface; never read
    previous_State: dict = None
) -> bool:
    """ altruist - always honest

    The answer is fair ( True ) whatever state is passed in .

    >>> altruist_Fun( )# doctest: +NORMALIZE_WHITESPACE -SKIP +REPORT_NDIFF +FAIL_FAST
    True
    >>> altruist_Fun( { "history": [ True ] } )# doctest: +NORMALIZE_WHITESPACE -SKIP +REPORT_NDIFF +FAIL_FAST
    True
    >>> altruist_Fun( { "history": [ True, False ] } )# doctest: +NORMALIZE_WHITESPACE -SKIP +REPORT_NDIFF +FAIL_FAST
    True
    >>> altruist_Fun( dict() )# doctest: +NORMALIZE_WHITESPACE -SKIP +REPORT_NDIFF +FAIL_FAST
    True
    """
    always_Fair = True
    return always_Fair
def egoist_Fun(
    # accepted only to share the common behaviour interface; never read
    previous_State: dict = None
) -> bool:
    """ egoist - always dishonest

    The answer is unfair ( False ) whatever state is passed in .

    >>> egoist_Fun( )# doctest: +NORMALIZE_WHITESPACE -SKIP +REPORT_NDIFF +FAIL_FAST
    False
    >>> egoist_Fun( { "history": [ True ] } )# doctest: +NORMALIZE_WHITESPACE -SKIP +REPORT_NDIFF +FAIL_FAST
    False
    >>> egoist_Fun( { "history": [ True, False ] } )# doctest: +NORMALIZE_WHITESPACE -SKIP +REPORT_NDIFF +FAIL_FAST
    False
    >>> egoist_Fun( dict() )# doctest: +NORMALIZE_WHITESPACE -SKIP +REPORT_NDIFF +FAIL_FAST
    False
    """
    always_Unfair = False
    return always_Unfair
def unpredictable_Fun(
    # accepted only to share the common behaviour interface; never read
    previous_State: dict = None
) -> bool:
    """ unpredictable - a coin flip between fair ( True ) and unfair ( False )

    One number is drawn from the shared `random` generator per call .

    >>> type( unpredictable_Fun( ) ) is bool# doctest: +NORMALIZE_WHITESPACE -SKIP +REPORT_NDIFF +FAIL_FAST
    True
    """
    coin = random.random()
    return coin < 0.5
def trickster_Fun(
    previous_State: dict = None
) -> bool:
    """ trickster - tit for tat

    Opens with fair ( True ) while nothing is known about the opponent
    ( no state, or a missing / empty "history" ),
    afterwards mirrors the most recent entry of `previous_State["history"]` .

    >>> trickster_Fun( )# doctest: +NORMALIZE_WHITESPACE -SKIP +REPORT_NDIFF +FAIL_FAST
    True
    >>> state = { "history": [ ] }
    >>> trickster_Fun( state )# doctest: +NORMALIZE_WHITESPACE -SKIP +REPORT_NDIFF +FAIL_FAST
    True
    >>> state = { "history": [ True ] }
    >>> trickster_Fun( state )# doctest: +NORMALIZE_WHITESPACE -SKIP +REPORT_NDIFF +FAIL_FAST
    True
    >>> state = { "history": [ True, False ] }
    >>> trickster_Fun( state )# doctest: +NORMALIZE_WHITESPACE -SKIP +REPORT_NDIFF +FAIL_FAST
    False
    """
    opponent_Actions = (
        None if previous_State is None
        else previous_State.get( "history", [] ) )
    if not opponent_Actions:
        # first contact: give the opponent the benefit of the doubt
        return True
    # repeat whatever the opponent did last
    return opponent_Actions[-1]
def vindictive_Fun(
    previous_State: dict = None
    , strategies_Pool: Tuple[ Callable[ [ dict ], bool ], int ] = ( ( egoist_Fun, 1 ), )
) -> bool:
    """ vindictive ( resentful ) - honest until deceived once

    Answers fair ( True ) while there is no state, no recorded history,
    or every recorded opponent action is truthy .
    As soon as the history holds a falsy entry
    it writes the index of the first `strategies_Pool` entry
    into `previous_State["behaviour_i"]`
    and returns what that behaviour answers for the same state .

    `strategies_Pool` holds ( behaviour function, behaviour index ) pairs;
    only the first pair is used .

    >>> vindictive_Fun( )# doctest: +NORMALIZE_WHITESPACE -SKIP +REPORT_NDIFF +FAIL_FAST
    True
    >>> state = { "history": [ ] }
    >>> vindictive_Fun( state )# doctest: +NORMALIZE_WHITESPACE -SKIP +REPORT_NDIFF +FAIL_FAST
    True
    >>> state = { "history": [ True ] }
    >>> vindictive_Fun( state )# doctest: +NORMALIZE_WHITESPACE -SKIP +REPORT_NDIFF +FAIL_FAST
    True
    >>> state = { "history": [ True, False ] }
    >>> vindictive_Fun( state )# doctest: +NORMALIZE_WHITESPACE -SKIP +REPORT_NDIFF +FAIL_FAST
    False
    >>> state[ "history" ]
    [True, False]
    >>> state[ "behaviour_i" ]
    1
    """
    known_Actions = (
        None if previous_State is None
        else previous_State.get( "history", [] ) )
    if not known_Actions:
        # nothing recorded yet: stay honest
        return True
    was_Deceived = not all( known_Actions )
    logger.debug( f"vindictive was_Deceived:{ was_Deceived }" )
    if not was_Deceived:
        return True
    # deceived at least once: hand over to the fallback behaviour
    ( fallback, fallback_i ) = strategies_Pool[0]
    # the state is mutated in place, the caller sees the switch
    previous_State["behaviour_i"] = fallback_i
    logger.debug( f"vindictive next behaviour:{fallback}[{ fallback_i }]" )
    return fallback( previous_State )
# Private marker for "no state passed": lets `conMan_Fun()` build a fresh
# state per call while an explicit `None` is still rejected .
_CONMAN_NO_STATE = object()


def conMan_Fun(
    # `previous_State` is mutated inside
    previous_State: dict = _CONMAN_NO_STATE
    , initial_Actions: tuple = ( True, False, True, True )# Tuple[ of bool ]
    # ( behaviour function, behaviour index ) pairs:
    # [0] is used once deceived, [1] otherwise
    , strategies_Pool: Tuple[ Callable[ [ dict ], bool ], int ] = (
        ( egoist_Fun, 1 ), ( trickster_Fun, 2 ) )
) -> bool:
    """ con man - scripted opening, then egoist or trickster

    Plays the scripted `previous_State["actions_List"]` first
    ( seeded from `initial_Actions` when the key is missing ),
    popping one action per call .
    Once the script is exhausted:
    if the history holds at least `len( initial_Actions )` entries
    and one of them is falsy ( was deceived ),
    it switches to `strategies_Pool[0]` ( egoist by default ),
    else to `strategies_Pool[1]` ( trickster by default ) .
    The switch is recorded in `previous_State["behaviour_i"]`
    and the answer of the chosen behaviour is returned .

    Called without a state, a fresh one is built for that call only,
    so the default is no longer shared ( and consumed ) across calls .

    Raises
    ------
    AssertionError
        when `previous_State` is not a dict ( e.g. None )

    >>> conMan_Fun( None )# doctest: +NORMALIZE_WHITESPACE -SKIP +REPORT_NDIFF +FAIL_FAST
    Traceback (most recent call last):
    ...
    AssertionError
    >>> conMan_Fun( )
    True
    >>> conMan_Fun( )
    True
    >>> state = {
    ... "actions_List": [ True, False, True, True ]
    ... , "history": []
    ... }
    >>> conMan_Fun( state )# doctest: +NORMALIZE_WHITESPACE -SKIP +REPORT_NDIFF +FAIL_FAST
    True
    >>> state# doctest: +NORMALIZE_WHITESPACE -SKIP +REPORT_NDIFF +FAIL_FAST
    {'actions_List': [False, True, True], 'history': []}
    >>> state["history"] = [ True ]
    >>> conMan_Fun( state )# doctest: +NORMALIZE_WHITESPACE -SKIP +REPORT_NDIFF +FAIL_FAST
    False
    >>> conMan_Fun( state )# doctest: +NORMALIZE_WHITESPACE -SKIP +REPORT_NDIFF +FAIL_FAST
    True
    >>> conMan_Fun( state )# doctest: +NORMALIZE_WHITESPACE -SKIP +REPORT_NDIFF +FAIL_FAST
    True
    >>> conMan_Fun( state )# doctest: +NORMALIZE_WHITESPACE -SKIP +REPORT_NDIFF +FAIL_FAST
    True
    >>> state[ "behaviour_i" ]
    2
    >>> state["history"] = [ True, False, True, True ]
    >>> conMan_Fun( state )# doctest: +NORMALIZE_WHITESPACE -SKIP +REPORT_NDIFF +FAIL_FAST
    False
    >>> state[ "actions_List" ]# doctest: +NORMALIZE_WHITESPACE -SKIP +REPORT_NDIFF +FAIL_FAST
    []
    >>> state["history"]# doctest: +NORMALIZE_WHITESPACE -SKIP +REPORT_NDIFF +FAIL_FAST
    [True, False, True, True]
    >>> state[ "behaviour_i" ]
    1
    """
    if previous_State is _CONMAN_NO_STATE:
        # fresh script per call instead of one dict shared by every call
        previous_State = { "actions_List": [ True, False, True, True ] }
    # raised explicitly ( not `assert` ) so the check survives `python -O`;
    # the exception type and empty message are what callers already get
    if not isinstance( previous_State, dict ):
        raise AssertionError
    logger.debug( f"conMan.previous_State was:{ previous_State }" )
    scripted_Actions = previous_State.setdefault(
        "actions_List", list( initial_Actions ) )
    # truthiness instead of len(): a None script counts as exhausted
    if scripted_Actions:
        logger.debug(
            "conMan pop first from previous_State.actions_List:"
            f"{ scripted_Actions }" )
        return scripted_Actions.pop(0)
    # the script is exhausted: choose the follow-up behaviour
    logger.debug(
        "conMan pop previous_State.actions_List is empty:"
        f"{ previous_State[ 'actions_List' ] }" )
    history = previous_State.setdefault( "history", [ ] )
    was_Deceived = (
        len( history ) >= len( initial_Actions )
        and not all( history ) )
    logger.debug( f"conMan was_Deceived:{ was_Deceived }" )
    ( next_Behaviour, next_Behaviour_i ) = strategies_Pool[
        0 if was_Deceived else 1 ]
    # no `actions_List` for next_Behaviour here to set | reset
    ### @toDo: what about `actions_List` ( in general ) ?
    ### it must merge | update some fields
    ### from | with defaults for next strategy
    previous_State["behaviour_i"] = next_Behaviour_i
    logger.debug( f"conMan next behaviour:{ next_Behaviour }" )
    return next_Behaviour( previous_State )
class Msg_Type( Enum ):
    """Kind of a message passed to `Merchant.inform` ."""
    # sent by the merchant that opens the deal
    request = 1
    # sent back by the informed merchant to close the deal
    response = 2
class Msg( NamedTuple ):
    """case Class
    Represents a typed message with payload."""
    #type: str #= "deal request"
    # request or response
    type: Msg_Type
    # the sender's action as announced; the deal code treats True as fair, False as unfair
    action: bool
class Merchant( object ):
    """ base class
    Deal maker

    A merchant owns
    - `behaviour_i`: index of its initial behaviour in `behaviours`
      ( its "type", kept unchanged )
    - `profit`: sum of its own deal outcomes
    - `previous_State`: the dict handed to the behaviour functions
      ( current `behaviour_i`, `history` of the current partner
      and, for scripted behaviours, `actions_List` )
    - `deals_History_Map`: { partner: list of perceived partner actions }
    """
    # registry of known behaviour functions, shared by all merchants;
    # `with_Custom_Behavior` appends to it
    behaviours = [
        # 1. altruist - always honest
        altruist_Fun
        # 2. egoist - always dishonest
        , egoist_Fun
        # 3. trickster - starts from fair then copies the opponent
        , trickster_Fun
        # 4. unpredictable - randomly chooses between fair and unfair
        , unpredictable_Fun
        # 5. resentful | vindictive - starts from fair
        #    then when opponent once deceive him
        #    switched to | became egoist ( always dishonest )
        , vindictive_Fun
        # 6. con man - starts from [ fair, unfair, fair, fair ]
        #    then became egoist if deceived at least once
        #    else became trickster
        , conMan_Fun
    ]

    def __init__(
        self
        , behaviour_i: int = 0
    ):
        """ can create alternative constructors
        with @classmethod

        Parameters
        ----------
        behaviour_i : int
            It can represent merchant type by initial behaviour
            or serve for fast lookup for behaviour in behaviours
        """
        # initial behaviour like merchant type, expected to be immutable
        # to allow inheritance ( create anew ) next year
        self.behaviour_i = behaviour_i
        self.profit = 0
        # explicit behaviour state
        ### @toDo: limit history size per merchant ? e.g. 10 last items only ?
        ### because it could be
        ### from 5 * 49 = 245 up to 10 * 49 = 490 per year
        ### before merchants_Pool reset
        self.previous_State = {
            # current behaviour, may be switched by the behaviour functions
            "behaviour_i": behaviour_i
            # per current deal partner
            , "history": []
        }
        # all deals history: { merchant_Ref: history_List }
        self.deals_History_Map = dict()

    @classmethod
    def with_Custom_Behavior(
        cls
        , custom_Behaviour # type: Callable[ [ dict ], bool ]
    ):
        """genetic alternative constructor

        Registers `custom_Behaviour` in `behaviours` when it is not there yet
        and returns a merchant that starts with it .
        """
        if custom_Behaviour not in cls.behaviours:
            cls.behaviours.append( custom_Behaviour )
        return cls( cls.behaviours.index( custom_Behaviour ) )

    @classmethod
    def altruist( cls ):
        """alternative constructor for built-in strategy"""
        return cls.with_Custom_Behavior( altruist_Fun )

    @classmethod
    def egoist( cls ):
        """alternative constructor for built-in strategy"""
        return cls.with_Custom_Behavior( egoist_Fun )

    @classmethod
    def trickster( cls ):
        """alternative constructor for built-in strategy"""
        return cls.with_Custom_Behavior( trickster_Fun )

    @classmethod
    def unpredictable( cls ):
        """alternative constructor for built-in strategy"""
        return cls.with_Custom_Behavior( unpredictable_Fun )

    @classmethod
    def vindictive( cls ):
        """alternative constructor for built-in strategy"""
        return cls.with_Custom_Behavior( vindictive_Fun )

    @classmethod
    def conMan( cls ):
        """alternative constructor for built-in strategy"""
        return cls.with_Custom_Behavior( conMan_Fun )

    @staticmethod
    def add_Noise(
        action: bool
        , noise_Probability: float = 0.05
    ) -> bool:
        """ Push signal through noise

        With probability `noise_Probability` the action is perceived
        as unfair ( False ), otherwise it is passed through unchanged .
        """
        if random.random() < noise_Probability:
            return False
        return action

    @property
    def behaviour( self ):
        """ current behaviour function, looked up by `previous_State["behaviour_i"]` """
        return Merchant.behaviours[ self.previous_State["behaviour_i"] ]

    def info( self ) -> str:
        """ short label: class name, initial behaviour index, last 3 digits of id()
        """
        self_Name = self.__class__.__name__
        self_ID = str( id( self ) )[-3:]
        return f"{self_Name}:{self.behaviour_i}:{self_ID}"

    def inform(
        self
        # from whom message came
        # e.g. to reply back
        , sender: 'Merchant'
        , msg: Msg
    ) -> None:
        """Deal making method

        request: perceive the sender's action through noise, choose own action,
            record the perceived action, book the profit
            and reply to the sender with a response .
        response: perceive the replied action through noise, choose own action
            ( this call is the one that keeps the state changes ),
            book the profit and record the perceived action .
        Any other message type is logged as a warning .
        """
        logger.debug( (
            "{}.inform received: "
            "{} from: {}"
            ).format(
                self.info()
                , msg
                # was `sender.info`: logged the bound method, not the label
                , sender.info()
            )
        )
        if msg.type is Msg_Type.request:
            other_Action = self.add_Noise( msg.action )
            # get state
            current_History = self.deals_History_Map.setdefault( sender, [ ] )
            self.previous_State[ "history" ] = current_History
            logger.debug( f"add_Noise to {msg.action} got {other_Action}" )
            logger.debug( f"self.behaviour was: {self.behaviour.__name__}" )
            logger.debug( f"{self.info()}.inform.previous_State was: {self.previous_State}" )
            # expected possible change of
            # `behaviour_i`
            # and `actions_List`
            # but not the `history`
            # positional call: custom behaviours may name their parameter differently
            action = self.behaviour( self.previous_State )
            logger.debug( f"self.behaviour became: {self.behaviour.__name__}" )
            logger.debug( f"{self.info()}.inform.previous_State became: {self.previous_State}" )
            # update state
            current_History.append( other_Action )
            self.previous_State[ "history" ] = current_History
            outcomes = deal_Outcomes( action, other_Action )
            self.profit += outcomes[0]
            logger.debug( f"deal_Outcomes: {outcomes}" )
            logger.debug( f"update_State -> profit: {self.profit}" )
            logger.debug( f"about to reply to: {sender.info()}" )
            sender.inform(
                self
                , Msg(
                    type = Msg_Type.response
                    , action = action
                )
            )
        elif msg.type is Msg_Type.response:
            ### @toDo: disable check in production
            assert self.deals_History_Map.get( sender ) is not None, (
                f"self.deals_History_Map[ sender:{sender.info()} ] expected to be initialized"
                " when response arrived"
                f" {self.deals_History_Map}"
            )
            other_Action = self.add_Noise( msg.action )
            # expected possible change of
            # `behaviour_i`
            # and `actions_List`
            # but not the `history`
            ### @WARN: ! be cautious about mutation of `previous_State` here !
            self_Action = self.behaviour( self.previous_State )
            outcomes = deal_Outcomes(
                self_Action
                , other_Action
            )
            logger.debug( "got response to close the deal" )
            logger.debug( f"add_Noise to {msg.action} got {other_Action}" )
            logger.debug( f"self.behaviour was: {self.behaviour.__name__}" )
            logger.debug( "deal_Outcomes: " + str( outcomes ) )
            self.profit += outcomes[0]
            ### @toDo: maybe use string key instead of object reference ?
            self.deals_History_Map[ sender ].append( other_Action )
            self.previous_State[ "history" ] = self.deals_History_Map[ sender ]
            logger.debug( f"self.behaviour became: {self.behaviour.__name__}" )
            logger.debug( f"update_State -> profit: {self.profit}" )
            logger.debug(
                f"deal between {self.info()} "
                f"and {sender.info()} closed" )
        else:
            logger.warning( f"got unexpected Msg_Type: {msg.type}" )
        return None

    def make_Deal(
        self
        , opponent: 'Merchant'
    ):
        """ Open a deal with `opponent` .

        The own action is computed as a preview:
        the behaviour is called, then `history`, `behaviour_i`
        and `actions_List` are put back as they were,
        because the behaviour is called once more
        when the opponent's response arrives in `inform` .
        The previewed action is sent to the opponent as a request .
        """
        state = self.previous_State
        # lookup for history in deals_History_Map
        current_History = self.deals_History_Map.setdefault( opponent, [] )
        state[ "history" ] = current_History
        saved_Behaviour_i = state[ "behaviour_i" ]
        # Snapshot the scripted actions without creating the key:
        # an injected empty list would stop a scripted behaviour
        # from seeding its own initial actions,
        # and a bare reference would not undo in-place pops .
        no_Actions_List = object()
        saved_Actions_List = state.get( "actions_List", no_Actions_List )
        if isinstance( saved_Actions_List, list ):
            saved_Actions_List = list( saved_Actions_List )
        # expected possible change of
        # `behaviour_i`
        # and `actions_List`
        # but not the `history`
        self_Action = self.behaviour( state )
        # reset to initial | previous
        state[ "history" ] = current_History
        state[ "behaviour_i" ] = saved_Behaviour_i
        if saved_Actions_List is no_Actions_List:
            state.pop( "actions_List", None )
        else:
            state[ "actions_List" ] = saved_Actions_List
        logger.debug( f"{self.info()}.make_Deal with {opponent.info()}" )
        logger.debug( f"{self.info()}.make_Deal.previous_State was: {self.previous_State}" )
        logger.debug(
            f"{self.info()}.make_Deal.previous_State['history']: "
            f"{self.previous_State['history']}" )
        logger.debug(
            f"{self.info()}.make_Deal.previous_State['behaviour_i']: "
            f"{self.previous_State['behaviour_i']}" )
        logger.debug( f"\t{self.info()}.make_Deal.self_Action: {self_Action}" )
        # state update expected
        opponent.inform(
            self
            , Msg(
                type = Msg_Type.request
                , action = self_Action
            )
        )
        logger.debug( f"{self.info()}.make_Deal.previous_State became: {self.previous_State}" )
def strategy_Builder(
    # scripted opening actions, played one per call before anything else
    predefined_Actions: Tuple[ bool, ... ] = tuple()
    # In general inherited from Merchant class behaviours list indexes
    , strategies_Pool: Tuple[ int, ... ] = tuple( range( len( Merchant.behaviours ) ) )
    ### @toDo: implement switch_Strategy
    ### it must depend only on available history
    ### mainly on False / True ratio,
    ### or last | longest consecutive strike | sequence
    ### and possibly randomly pick arbitrary next_Strategy
    ### from `strategies_Pool`
    , switch_Condition: Callable = lambda h: False#[[history], bool]
    , switch_Strategy_Fun: Callable = lambda h: 0#[[history], int]
    # from 1 to deals_Per_Year_Max * 2
    , history_Lookup_Window_Size: int = 20
    , default_Action_Fun: Callable[[dict], bool] = lambda d: True
    , base_Class: Type[ Merchant ] = Merchant
) -> Callable[[dict], bool]:
    """ Brand new strategy Factory
    with Closure

    The returned behaviour function
    1. plays `predefined_Actions` first
       ( kept in `previous_State["actions_List"]`, one popped per call ),
    2. then looks at the last `history_Lookup_Window_Size` entries
       of `previous_State["history"]`
       ( a size of 0 or less means an empty window ),
    3. when `switch_Condition( window )` is truthy
       stores `switch_Strategy_Fun( window )` in `previous_State["behaviour_i"]`
       and delegates to `base_Class.behaviours` at that index,
    4. otherwise answers with `default_Action_Fun( previous_State )` .

    The build parameters are also exposed as attributes
    of the returned function .
    `strategies_Pool` is only stored as such an attribute;
    the index returned by `switch_Strategy_Fun` is not checked against it .
    """
    def generic_Fun(
        previous_State: dict
    ) -> bool:
        """ new strategy blueprint
        with state parameters from parent function
        """
        logger.debug( f"?.previous_State was:{ previous_State }" )
        scripted_Actions = previous_State.setdefault(
            "actions_List", list( predefined_Actions ) )
        # truthiness instead of len(): a None script counts as exhausted
        if scripted_Actions:
            logger.debug(
                "? pop first from previous_State.actions_List:"
                f"{ scripted_Actions }" )
            return scripted_Actions.pop(0)
        logger.debug(
            "? pop previous_State.actions_List is empty:"
            f"{ previous_State[ 'actions_List' ] }" )
        full_History = previous_State.setdefault( "history", [ ] )
        # `full_History[-0:]` would be the whole list, so guard the size
        if history_Lookup_Window_Size > 0:
            history = full_History[-history_Lookup_Window_Size:]
        else:
            history = []
        is_Switch_Strategy = switch_Condition( history )
        logger.debug( f"? is_Switch_Strategy:{ is_Switch_Strategy }" )
        if not is_Switch_Strategy:
            return default_Action_Fun( previous_State )
        # switch to next behaviour
        previous_State["behaviour_i"] = switch_Strategy_Fun( history )
        next_Strategy = base_Class.behaviours[ previous_State["behaviour_i"] ]
        logger.debug( f"? next behaviour:{ next_Strategy }" )
        return next_Strategy( previous_State )

    # setting public fields
    # using nameSpace_With_Function_Closure
    generic_Fun.predefined_Actions = predefined_Actions
    generic_Fun.strategies_Pool = strategies_Pool
    generic_Fun.switch_Condition = switch_Condition
    generic_Fun.switch_Strategy_Fun = switch_Strategy_Fun
    generic_Fun.history_Lookup_Window_Size = history_Lookup_Window_Size
    generic_Fun.default_Action_Fun = default_Action_Fun
    return generic_Fun
def merchant_Builder(
    base_Class: Type[ Merchant ] = Merchant
    , strategy_Builder: Callable[ ..., Callable[[dict], bool] ] = strategy_Builder
    , best_Strategies_Pool: Optional[ List[ Callable[[dict], bool] ] ] = None
) -> Merchant:
    """ Factory
    for creating new Merchant
    with brand new strategy
    by adding it to the base_Class

    Currently every merchant is built from `strategy_Builder()`
    with its default parameters;
    `best_Strategies_Pool` is accepted but not used yet .

    Planned methods ( @toDo ):
    Basic:
    ( no inheritance )
    - generate next_Strategy
      from generic_Strategy_Abstract_API_Draft
      with random .fields parameters
      within preset limits
    Average:
    ( with inheritance )
    - get pool of top winning strategies
    - average their .fields
      e.g. by list length
      or by combining | merging items inside
    - if not present
      randomly introduce new field(s)
      within preset mutation limit
      that can be randomized
    Breed:
    ( with inheritance )
    - get pool of top winning strategies
    - make | pick | choose random pair(s) of strategies
    - pick half of .fields from each parent
    - randomly introduce new field(s)
      if there is room for them
      or change inherited
      within preset mutation limit
      that can be randomized
    it can pick | choose any generation method randomly
    e.g: 0 <= and < 0.33 <= and < 0.66 <= and < 1
    """
    if best_Strategies_Pool is None:
        # fresh list per call instead of one default list shared by all calls
        best_Strategies_Pool = []
    # The method is drawn even though all three are still unimplemented,
    # so the shared random stream is consumed exactly as before .
    roll_Dice = random.random()
    ### @toDo: implement Basic, Average, Breed
    if roll_Dice < 0.33:
        generation_Method = "Basic"
    elif roll_Dice < 0.66:
        generation_Method = "Average"
    else:#if 0.66 <= roll_Dice < 1:
        generation_Method = "Breed"
    logger.debug( f"merchant_Builder.generation_Method: {generation_Method}" )
    brand_New_Strategy = strategy_Builder()
    return base_Class.with_Custom_Behavior( brand_New_Strategy )
### @Done?: implement `initialize_Tournament` | |
def initialize_Tournament(
    merchants_Pool_Size: int = 50
    # strategies count is derived from Merchant.behaviours
) -> List[ Merchant ]:
    """
    fill | create | populate the initial merchants pool

    Merchants are split into one group per entry of `Merchant.behaviours`.
    Every group gets `merchants_Pool_Size // strategies count` members
    ( but never less than one ),
    and what is left over is handed out one by one,
    starting from the last group and walking backwards.

    Example:
    --------
    divide 10 to 3 groups
    3 3 4 by at least 3 = 10//3 items per group
    ((1,1,1),(2,2,2),(3,3,3,3))
    divide 8 to 3 groups
    8 // 3 = 2 with 2 items left
    ((1,1),(2,2,2),(3,3,3))

    When `merchants_Pool_Size` is smaller than the strategies count
    every strategy still gets exactly one merchant,
    so the pool is then bigger than requested.

    Returns:
        list of merchants made with `Merchant.with_Custom_Behavior`,
        grouped by strategy index in ascending order
    """
    strategies_Pool_Size = len( Merchant.behaviours )
    assert strategies_Pool_Size > 0
    # expected at least one item in every group
    per_Strategy_Size_Min = 1
    per_Strategy_Size = max(
        merchants_Pool_Size // strategies_Pool_Size, per_Strategy_Size_Min )
    logger.debug( f"initialize_Tournament.merchants_Pool_Size: {merchants_Pool_Size}" )
    logger.debug( f"initialize_Tournament.per_Strategy_Size: {per_Strategy_Size}" )
    # what is still missing after every group got its base size;
    # equals `merchants_Pool_Size % strategies_Pool_Size` for a big enough pool
    # and 0 when the minimum group size already covers the request
    # ( a plain modulo would hand out the small pool a second time )
    items_Left = max(
        merchants_Pool_Size - per_Strategy_Size * strategies_Pool_Size, 0 )
    merchants_Groups_List = [
        [ group_i, per_Strategy_Size ] for group_i in range( 0, strategies_Pool_Size, 1 ) ]
    logger.debug( f"initialize_Tournament.merchants_Groups_List: {merchants_Groups_List}" )
    logger.debug( f"initialize_Tournament.items_Left: {items_Left}" )
    # items_Left is always less than strategies_Pool_Size here,
    # so one backward pass from the last group is enough
    last_Group_i = strategies_Pool_Size - 1
    for strategy_i in range( last_Group_i, last_Group_i - items_Left, -1 ):
        logger.debug( f"initialize_Tournament.group_i: {strategy_i}" )
        # group_Size
        merchants_Groups_List[ strategy_i ][-1] += 1
    logger.debug( f"initialize_Tournament.merchants_Groups_List: {merchants_Groups_List}" )
    merchants_Pool_List = []
    for ( strategy_i, group_Size ) in merchants_Groups_List:
        # populate group
        for _ in range( 0, group_Size, 1 ):
            merchants_Pool_List.append(
                Merchant.with_Custom_Behavior( Merchant.behaviours[ strategy_i ] )
            )
    logger.debug( f"Activated merchants poll size: {len( merchants_Pool_List )}" )
    return merchants_Pool_List
def round_Limits_Stat(
    merchants_Pool_Size: int = 50
    , deals_Per_Year_Min: int = 5
    , deals_Per_Year_Max: int = 10
) -> int:
    """ exploratory analysis of one round limits:
    how many pairs there are
    and what span the scores can fall into
    like
    - max possible score: 49 * 5 * 10 = 2450

    Everything is reported through `logger.debug`.

    Returns:
        number of ( i, j ) index pairs visited,
        that is `merchants_Pool_Size ** 2`
        because pairs with i == j are counted too
    """
    deals_Count_Total = 0
    # how many times each merchant index shows up in a pair
    freq_Map = dict()
    min_Outcome_Score = 1
    max_Outcome_Score = 5
    # step must be positive: with a negative step and 0 < merchants_Pool_Size
    # the range is empty and nothing gets counted at all
    for i in range( 0, merchants_Pool_Size, 1 ):
        #?#for j in range( i + 1, merchants_Pool_Size, 1 ):
        for j in range( 0, merchants_Pool_Size, 1 ):
            deals_Count_Total += 1
            freq_Map[ i ] = freq_Map.get( i, 0 ) + 1
            freq_Map[ j ] = freq_Map.get( j, 0 ) + 1
    # per merchant limits: deals with every other merchant
    deals_Count_Min = ( merchants_Pool_Size - 1 ) * deals_Per_Year_Min
    deals_Count_Max = ( merchants_Pool_Size - 1 ) * deals_Per_Year_Max
    logger.debug( f"deals_Count_Min({merchants_Pool_Size}): {deals_Count_Min}" )
    logger.debug( f"deals_Count_Max({merchants_Pool_Size}): {deals_Count_Max}" )
    logger.debug(
        f"20 % of deals_Count_Max({merchants_Pool_Size}): {deals_Count_Max * 0.2}" )
    logger.debug( "distinct values() in freq_Map({}): {}".format(
        len( freq_Map ), set( freq_Map.values() ) ) )
    # NOTE(review): deals_Count_Min | Max already include deals_Per_Year_*,
    # so these totals multiply by it twice
    # ( 5 * 490 * 10 = 24500 versus 2450 in the docstring ) - confirm the intent
    min_Outcome_Score_Total = (
        min_Outcome_Score * deals_Count_Min * deals_Per_Year_Min )
    logger.debug(
        (
            "min_Outcome_Score_Total = "
            "(min_Outcome_Score:{} * deals_Count_Min:{} * deals_Per_Year_Min:{:>2}): {:>5}"
        ).format(
            min_Outcome_Score
            , deals_Count_Min
            , deals_Per_Year_Min
            , min_Outcome_Score_Total
        ) )
    max_Outcome_Score_Total = (
        max_Outcome_Score * deals_Count_Max * deals_Per_Year_Max )
    logger.debug(
        (
            "max_Outcome_Score_Total = "
            "(max_Outcome_Score:{} * deals_Count_Max:{} * deals_Per_Year_Max:{}): {}"
        ).format(
            max_Outcome_Score
            , deals_Count_Max
            , deals_Per_Year_Max
            , max_Outcome_Score_Total
        ) )
    logger.debug(
        "scores span outcome max:{} - min:{} = {}".format(
            max_Outcome_Score_Total
            , min_Outcome_Score_Total
            , max_Outcome_Score_Total - min_Outcome_Score_Total
        ) )
    bucket_Size = int( (
        max_Outcome_Score_Total - min_Outcome_Score_Total ) * 0.2 )
    logger.debug(
        f"20 % of scores span: {bucket_Size}" )
    # range() raises ValueError on a zero step ( e.g. pool of one merchant )
    bucket_Step = bucket_Size or 1
    logger.debug(
        "scores buckets: {}".format(
            [ ( i, i + bucket_Size )
              for i in range(
                  min_Outcome_Score_Total
                  , max_Outcome_Score_Total
                  , bucket_Step ) ] ) )
    logger.debug(
        "scores buckets: {}".format(
            list( reversed(
                range(
                    max_Outcome_Score_Total
                    , min_Outcome_Score_Total
                    , -bucket_Step ) ) ) ) )
    return deals_Count_Total
### @Done?: implement `play_Tour` | `play_Round` | |
def play_Round(
    # it will be mutated inside
    merchants_Pool: List[ Merchant ]
    , merchants_Pool_Size: int = 50
    , deals_Per_Year_Min: int = 5
    , deals_Per_Year_Max: int = 10
):
    """
    play | make the next turn:
    every merchant initiates a random number of deals
    ( from `deals_Per_Year_Min` to `deals_Per_Year_Max`, both included )
    with every other merchant of the pool

    Note: all ordered pairs are visited,
    for 1 2 3 that is
    ( 1, 2 ), ( 1, 3 )
    ( 2, 1 ), ( 2, 3 )
    ( 3, 1 ), ( 3, 2 )
    so each couple of merchants trades in both directions

    `merchants_Pool_Size` is only used
    as the divisor of the logged average time.

    Returns:
        the same `merchants_Pool` list object
    """
    from time import monotonic

    started_At = monotonic()
    for ( initiator_i, initiator ) in enumerate( merchants_Pool ):
        for ( partner_i, partner ) in enumerate( merchants_Pool ):
            # nobody trades with itself
            if partner_i != initiator_i:
                deals_To_Make = random.randint( deals_Per_Year_Min, deals_Per_Year_Max )
                with LoggingContext( logger, level=logging.INFO ):
                    logger.debug( "deals_To_Make: {:_>16}".format( deals_To_Make ) )
                with LoggingContext( logger, level=logging.INFO ):
                    ### @WARN: deal making is the expensive part,
                    ### the logging context is entered around it on purpose
                    for _ in range( deals_To_Make ):
                        initiator.make_Deal( partner )
    total_Time = monotonic() - started_At
    average_Time_Per_Merchant = total_Time / merchants_Pool_Size
    logger.debug( f"Merchants.making_Deals(49*10*50) takes: {total_Time} second(s)" )
    logger.debug( f"average_Time_Per_Merchant: {average_Time_Per_Merchant} second(s)" )
    return merchants_Pool
def top_Fifth_Stat(
    merchants_Pool: List[ Merchant ]
    ### @forDeBugOnly: kept for callers, not used inside any more
    , deals_Per_Year_Min: int = 5
    , deals_Per_Year_Max: int = 10
) -> int:
    """ just for exploratory analysis
    also for showing the winner

    Sorts `merchants_Pool` in place by ascending profit,
    prints one line per merchant,
    then the top fifth and the strategy name of the last ( richest ) one.

    Parameters:
    - merchants_Pool: merchants to report on, will be re-ordered
    - deals_Per_Year_Min, deals_Per_Year_Max: ignored,
        present only to keep the signature stable

    Returns:
        size of the top fifth ( 20 % of the pool size, rounded down )
    """
    merchants_Pool_Size = len( merchants_Pool )
    # integer arithmetic, no float rounding surprises
    top_Fifth_Amount = merchants_Pool_Size * 20 // 100
    logger.debug( f"top_Fifth_Amount: {top_Fifth_Amount}" )
    #
    merchants_Pool.sort( key = lambda m: m.profit )
    print( f"merchants_Pool list.size of {merchants_Pool_Size} sorted by profit:" )
    for ( i, merchant ) in enumerate( merchants_Pool ):
        history_Map = merchant.deals_History_Map
        # sample: the most recently added partner and the deals made with it
        some_History = []
        some_Item_Info = None
        if len( history_Map ) > 0:
            ( some_Partner, some_History ) = tuple( history_Map.items() )[-1]
            some_Item_Info = some_Partner.info()
        total_Deals = sum( len( deals ) for deals in history_Map.values() )
        print(
            (
                "{:_<3}{}.profit:{} <- {}"
                ", total_Deals: {}"
                ", deals_History_Map.size: {}"
                ", some history({}) of {}"
            ).format(
                i
                , merchant.info()
                , merchant.profit
                , merchant.behaviour.__name__
                , total_Deals
                , len( history_Map )
                , len( some_History )
                , some_Item_Info
            )
        )
    print( f"Top fifth ({top_Fifth_Amount}) of {merchants_Pool_Size}:" )
    # an explicit start index: `[-0:]` would select the whole pool
    top_Fifth = merchants_Pool[ merchants_Pool_Size - top_Fifth_Amount: ]
    for ( i, merchant ) in enumerate( top_Fifth ):
        print(
            "{:_<3}{}.profit:{} <- {}".format(
                i, merchant.info(), merchant.profit
                , merchant.behaviour.__name__
            )
        )
    # no winner to show for an empty pool
    if merchants_Pool_Size > 0:
        print( f"The winner strategy {merchants_Pool[-1].behaviour.__name__}:" )
    return top_Fifth_Amount
def is_Only_One_Type_Left(
    merchants_List: List[ "Merchant" ]
) -> bool:
    """ helper
    to check whether all merchants share the same `behaviour_i`

    Returns:
        True when exactly one distinct `behaviour_i` is present,
        False otherwise ( including for an empty list )
    """
    # distinct strategy indexes;
    # a sum % size check would also accept mixes like 0, 2, 4
    distinct_Behaviours = { merchant.behaviour_i for merchant in merchants_List }
    return len( distinct_Behaviours ) == 1
def update_Merchants_Pool(
    # will be sorted and mutated inside
    merchants_List: List[ Merchant ]
    # %-age = 20%
    , fifth_Amount: int = 10
    , is_Use_Builder: bool = False
) -> List[ Merchant ]:
    """ helper
    replace the first worst `fifth_Amount` merchants
    with newcomers derived from the last best `fifth_Amount`
    and then
    reset everybody who stays in the list:
    - profit to 0
    - `deals_History_Map` to empty
    - `previous_State` to hold only its 'behaviour_i'

    Parameters:
    - merchants_List: the pool, sorted in place by ascending profit
    - fifth_Amount: how many of the worst to replace;
        values below 1 replace nobody,
        values above the list size replace everybody
    - is_Use_Builder: when True the newcomers come from `merchant_Builder()`,
        otherwise each one copies the behaviour of one of the best

    Returns:
        the same `merchants_List` object
    """
    merchants_List.sort( key = lambda m: m.profit )
    pool_Size = len( merchants_List )
    # clamp: a plain `[-0:]` slice would select the whole list,
    # not an empty tail
    replace_Amount = min( max( int( fifth_Amount ), 0 ), pool_Size )
    # a copy of the tail, so writing to the head below can not disturb it
    best_Merchants = merchants_List[ pool_Size - replace_Amount: ]
    for ( m_i, merchant ) in enumerate( best_Merchants ):
        # create and insert new, the i-th worst gives way
        if is_Use_Builder:
            merchants_List[m_i] = merchant_Builder()
        else:
            merchants_List[m_i] = Merchant.with_Custom_Behavior(
                Merchant.behaviours[ merchant.behaviour_i ] )
    # reset the survivors: the middle of the list and the best
    # NOTE(review): `previous_State` is left without a "history" key -
    # confirm that `Merchant.make_Deal` copes with that
    for merchant in merchants_List[ replace_Amount: ]:
        merchant.profit = 0
        merchant.deals_History_Map = dict()
        merchant.previous_State = { "behaviour_i": merchant.behaviour_i }
    return merchants_List
def get_Best(
    # any order, sorting is not required
    merchants_List: List[ "Merchant" ]
) -> Tuple[ int, int ]:
    """ helper
    to get | determine the best of the current round

    Returns:
        ( best profit, `behaviour_i` of the first merchant having it )
        or ( 0, -1 ) when nobody has a profit above zero
    """
    best_Score = 0
    best_Strategy_i = -1
    for merchant in merchants_List:
        # strict comparison: on a tie the earlier merchant is kept
        if merchant.profit > best_Score:
            best_Score = merchant.profit
            best_Strategy_i = merchant.behaviour_i
    return ( best_Score, best_Strategy_i )
### @Done?: implement `play_Game` | |
def play_Game(
    rounds_Max: int = 100
    , is_Use_Builder: bool = False
    #?#, merchants_Pool_Size: int = 50
    #?#, deals_Per_Year_Min: int = 5
    #?#, deals_Per_Year_Max: int = 10
) -> List[ Merchant ]:
    """ play game of rounds
    until `rounds_Max`
    or only one merchant type left

    Round 0 creates the pool with `initialize_Tournament()`.
    Every later round first stops the game if one type is left,
    otherwise replaces the worst fifth through `update_Merchants_Pool`.
    Then the round is played and the best score seen so far is tracked.

    Parameters:
    - rounds_Max: upper limit of rounds to play
    - is_Use_Builder: forwarded to `update_Merchants_Pool`

    Returns:
        the merchants pool as it is after the last played round
        ( an empty list when `rounds_Max` is less than 1 )
    """
    # developer switch, unrelated to the actual goal:
    # set to True to profile the whole game with cProfile
    is_Profiling_On = False
    profiler = None
    if is_Profiling_On:
        import cProfile
        # Directly using the Profile class
        # allows formatting profile results
        # without writing the profile data to a file
        profiler = cProfile.Profile()
        profiler.enable()
    # type : List[ Merchant ]
    merchants_Pool = []
    best_Score = 0
    best_Stratey_i = -1
    rounds_Played = 0
    for round_i in range( 0, rounds_Max, 1 ):
        if round_i == 0:
            merchants_Pool = initialize_Tournament()
        else:
            if is_Only_One_Type_Left( merchants_Pool ):
                with LoggingContext( logger, level = logging.DEBUG ):
                    logger.debug(
                        f"Early return at round: {round_i}"
                        " because of is_Only_One_Type_Left in merchants_Pool is True" )
                break # for loop
            ### @Done: replace worst fifth with best
            ### @toDo: replace worst fifth with brand new
            # keywords on purpose: passed positionally
            # the flag would land in `fifth_Amount`
            update_Merchants_Pool(
                merchants_Pool
                , fifth_Amount = len( merchants_Pool ) * 20 // 100
                , is_Use_Builder = is_Use_Builder
            )
        with LoggingContext( logger, level = logging.DEBUG ):
            logger.debug( f"round: {round_i}, pool.size: {len( merchants_Pool )}" )
            logger.debug( f"best_Score: {best_Score}, best_Stratey_i: {best_Stratey_i}" )
        play_Round( merchants_Pool )
        rounds_Played += 1
        ( running_Best_Score, running_best_Stratey_i ) = get_Best( merchants_Pool )
        if best_Score < running_Best_Score:
            best_Score = running_Best_Score
            best_Stratey_i = running_best_Stratey_i
    with LoggingContext( logger, level = logging.DEBUG ):
        # rounds actually played, it is less than rounds_Max after an early stop
        logger.debug(
            f"After {rounds_Played}"
            f" round(s) best_Score: {best_Score}, best_Stratey_i: {best_Stratey_i}" )
    if profiler is not None:
        import io
        import pstats
        # stop measuring profiling
        profiler.disable()
        s = io.StringIO()
        sortby = 'cumulative'
        # class pstats.Stats(*filenames or profile, stream=sys.stdout)
        ps = pstats.Stats(
            profiler, stream = s
        ).sort_stats( sortby )
        ps.print_stats()
        print( s.getvalue() )
    return merchants_Pool
def test( is_Run_Test = 1 == 1 ):
    """ run the unit tests written in doc strings ( doctest ),
    unless `is_Run_Test` is falsy

    Always returns None.
    """
    if not is_Run_Test:
        return None
    from doctest import testmod
    testmod(
        #>verbose = True
    )
    return None
# | |
### /// *** unit test *** /// ### | |
#$ python3.6 merchants.py | |
if __name__ == "__main__":
    # Script entry point: an ad-hoc test harness.
    # Every `test_N` below is run ( or skipped ) at the moment it is decorated,
    # the boolean literal given to `@run_If` is its on | off switch.
    def run_If(
        is_Run_Test: bool = 1 == 0
        , is_Run_Tests = 1 == 1
    ): # -> Callable #!#-> None:
        """ Decorator ( maker ) helper
        for conditional run | execution of the tests below

        Parameters:
        - is_Run_Test: switch of the single decorated test,
            when falsy only a "skipping ..." line is printed
        - is_Run_Tests: global switch,
            when falsy the decorator is replaced with `empty_Func`
            so nothing is run and nothing is printed

        Note:
        the decorator calls the wrapper right away,
        so the decorated function needs no actual call | invocation,
        just | only a declaration with the decorator,
        and the decorated name ends up bound to None
        """
        def empty_Func( *args, **kargs ):
            # accepts anything, does nothing, returns None
            pass
        def test_Footer( test, description, *args, **kargs ):
            """ run `test` with the given arguments
            and log a "completed ..." banner after it has returned
            """
            test( *args, **kargs )
            logger.debug( "completed {:*^80}".format(
                " " + test.__name__ + " " + description + " " ) )
            return None
        def decorator(
            test_To_Run# Callable
        ):
            # (Fun fact: functools.wraps() is a decorator! ☺)
            #?#@functools.wraps(func)
            @wraps( test_To_Run )
            def wrapper( *w_args, **w_kw_args ):
                """ announce and run ( or skip ) `test_To_Run`

                The default value of the test's `description` parameter
                is read from its signature and used as the banner text;
                ".?" is used when there is no such parameter.
                """
                import inspect
                description = inspect.signature( test_To_Run ).parameters.get( "description" )
                description_Default = ".?"
                if description is not None:
                    # assumes the default is a str ( every test below has one ) - TODO confirm
                    description_Default = "." + description.default
                if is_Run_Test:
                    info = "running {:*^80}".format(
                        " " + test_To_Run.__name__ + " " + description_Default + " " )
                    print( info )
                    logger.debug( info )
                    # executes test immediately
                    # either way
                    #>return test_To_Run()
                    #?#return test_To_Run
                    #>return test_To_Run( *w_args, **w_kw_args )
                    return test_Footer( test_To_Run, description_Default, *w_args, **w_kw_args )
                else:
                    #>return lambda : print( f"skipping {test_To_Run.__name__}" )
                    print(
                        f"skipping {test_To_Run.__name__}" +
                        #>"{defaults_Map.get( 'description' )}"
                        description_Default
                    )
                    return empty_Func( *w_args, **w_kw_args )
            # returning `wrapper` itself would suppress immediate `test_To_Run` execution
            # and an actual wrapped function call | invocation would be needed
            #>return wrapper
            # this evaluates the wrapped function right away
            return wrapper()
        if is_Run_Tests:
            return decorator
        else:
            return empty_Func
    # smoke test of the harness itself
    @run_If( is_Run_Test = 1 == 0 )
    def test_1( description: str = "assert equal" ):
        result = 10
        assert (
            result == 10
        ), "{} expected to be equal to {}".format( result, 10 )
    # two altruists exchange a request and a response message
    @run_If( is_Run_Test = 1 == 0 )
    def test_12( description: str = "Merchant.inform" ):
        m1 = Merchant.altruist()
        m2 = Merchant.altruist()
        #>print( "dir( m1 ):", dir( m1 ) )
        #>print( "m1.__class__:", m1.__class__ )
        #?#print( "dir( m1.__class__ ):", dir( m1.__class__ ) )
        #>print( "m1.__hash__:", m1.__hash__ )
        #>print( "dir( m1.__hash__ ):", dir( m1.__hash__ ) )
        m1.inform(
            m2
            , Msg(
                type = Msg_Type.request
                , action = m1.behaviour( )
            )
        )
        m2.inform(
            m1
            , Msg(
                type = Msg_Type.response
                , action = m2.behaviour( )
            )
        )
    # prints what `with_Custom_Behavior` produces for a known and for a new strategy
    @run_If( is_Run_Test = 1 == 0 )
    def test_13( description: str = "Merchant.with_Custom_Behavior" ):
        print( f"len( Merchant.behaviours ): {len( Merchant.behaviours )}" )
        print( "Merchant.with_Custom_Behavior( altruist_Fun )" )
        m = Merchant.with_Custom_Behavior( altruist_Fun )
        print( m.behaviour )
        print( m.behaviour.__name__ )
        print( f"m.behaviour_i: {m.behaviour_i}" )
        print( f"m.behaviour(): {m.behaviour()}" )
        print( f"len( instance.behaviours ): {len( m.behaviours )}" )
        print( "Merchant.with_Custom_Behavior( lambda s: False )" )
        m = Merchant.with_Custom_Behavior( lambda s: False )
        print( f"len( Merchant.behaviours ): {len( Merchant.behaviours )}" )
        print( f"len( instance.behaviours ): {len( m.behaviours )}" )
        print( f"m.behaviour_i: {m.behaviour_i}" )
        print( m.behaviour( dict() ) )
    # checks how `deals_History_Map` and `previous_State[ "history" ]`
    # of both parties grow with each `make_Deal`
    @run_If( is_Run_Test = 1 == 0 )
    def test_15( description: str = "Merchant.make_Deal" ):
        with LoggingContext(
            logger
            #>, level=logging.INFO
            , level = logging.DEBUG
        ):
            m0 = Merchant.altruist()
            m1 = Merchant.egoist()
            m2 = Merchant.trickster()
            m3 = Merchant.unpredictable()
            m4 = Merchant.vindictive()
            m5 = Merchant.conMan()
            logger.debug( f"Merchant.behaviours: {Merchant.behaviours}" )
            assert len( m0.deals_History_Map ) == 0
            assert len( m0.previous_State[ "history" ] ) == 0
            m0.make_Deal( m1 )
            assert len( m1.deals_History_Map ) == 1
            assert len( m1.previous_State[ "history" ] ) == 1
            m1.make_Deal( m0 )
            assert len( m0.deals_History_Map ) == 1
            assert len( m0.previous_State[ "history" ] ) == 2
            assert len( m2.deals_History_Map ) == 0
            assert len( m2.previous_State[ "history" ] ) == 0
            assert len( m3.deals_History_Map ) == 0
            assert len( m3.previous_State[ "history" ] ) == 0
            m2.make_Deal( m3 )
            assert len( m2.deals_History_Map ) == 1
            assert len( m2.previous_State[ "history" ] ) == 1
            assert len( m3.deals_History_Map ) == 1
            assert len( m3.previous_State[ "history" ] ) == 1
            m3.make_Deal( m2 )
            assert len( m2.deals_History_Map ) == 1
            assert len( m2.previous_State[ "history" ] ) == 2
            assert len( m3.deals_History_Map ) == 1
            expected = 2
            actual = len( m3.previous_State[ "history" ] )
            assert actual == 2, f"{actual} != {expected}"
            m2.make_Deal( m3 )
            assert len( m2.deals_History_Map ) == 1
            assert len( m2.previous_State[ "history" ] ) == 3
            assert len( m3.deals_History_Map ) == 1
            assert len( m3.previous_State[ "history" ] ) == 3
            # six deals in a row, but less than `deals_Per_Year_Max`
            for i in range( 0, 6, 1 ):
                logger.debug( f"i:{i} m4.make_Deal( m5 )" )
                m4.make_Deal( m5 )
            assert len( m4.deals_History_Map ) == 1
            #>assert len( m4.previous_State[ "history" ] ) == 5
            expected = 6
            actual = len( m4.previous_State[ "history" ] )
            assert actual == expected, f"{m4.previous_State[ 'history' ]}{actual} != {expected}"
            assert len( m5.deals_History_Map ) == 1
            assert len( m5.previous_State[ "history" ] ) == 6
    # logs `info()`, `id()` and the default representation of one merchant
    @run_If( is_Run_Test = 1 == 0 )
    def test_16( description: str = "Merchant.info" ):
        with LoggingContext( logger, level=logging.DEBUG ):
            m = Merchant.vindictive()
            #>logger.setLevel( logging.INFO )
            logger.debug( f"m.info(): {m.info()}, id( m ): {id( m )}, m: {m}" )
            #>logger.setLevel( logging.DEBUG )
    # prints the freshly created pool as ( index, "info -> strategy name" ) pairs
    @run_If( is_Run_Test = 1 == 0 )
    def test_17( description: str = "initialize_Tournament" ):
        with LoggingContext( logger, level=logging.DEBUG ):
            data_List = initialize_Tournament()
            logger.debug( f"type(data_List[0]): {type(data_List[0])}" )
            print(
                "initialize_Tournament():\n{}".format( pformat(
                    tuple( enumerate(
                        map(
                            lambda m: m.info() + " -> " + m.behaviours[ m.behaviour_i ].__name__,
                            data_List
                        ) ) )
                ) ) )
    # plays a single round and prints the resulting ranking
    @run_If( is_Run_Test = 1 == 0 )
    def test_18( description: str = "play_Round" ):
        with LoggingContext( logger, level=logging.DEBUG ):
            merchants_Pool_List = initialize_Tournament()
            #>print( "play_Round(50):", play_Round( initialize_Tournament() ) )
            #>play_Round( initialize_Tournament(), 50, 5, 10 )
            # mutate or | and return
            play_Round( merchants_Pool_List )
            top_Fifth_Amount = top_Fifth_Stat( merchants_Pool_List )
    # enabled: logs the theoretical limits of one round
    @run_If( is_Run_Test = 1 == 1 )
    def test_19( description: str = "round_Limits_Stat" ):
        with LoggingContext( logger, level=logging.DEBUG ):
            round_Limits_Stat()
    # enabled: plays up to 9 rounds and prints the final ranking
    @run_If( is_Run_Test = 1 == 1 )
    def test_20( description: str = "play_Game" ):
        with LoggingContext( logger, level=logging.INFO ):
            merchants_Pool_List = play_Game( rounds_Max = 9 )
            assert type( merchants_Pool_List ) is list
            assert len( merchants_Pool_List ) == 50
            top_Fifth_Amount = top_Fifth_Stat( merchants_Pool_List )
    # a mixed pool is not "one type", a pool rebuilt from behaviour 0 only is
    @run_If( is_Run_Test = 1 == 0 )
    def test_21( description: str = "is_Only_One_Type_Left" ):
        with LoggingContext( logger, level=logging.INFO ):
            merchants_List = initialize_Tournament()
            assert not is_Only_One_Type_Left( merchants_List )
            #for i in range( 0, len( merchants_List ), 1 ):
            for ( i, merchant ) in enumerate( merchants_List ):
                if merchant.behaviour_i != 0:
                    merchants_List[i] = Merchant.with_Custom_Behavior( Merchant.behaviours[ 0 ] )
            assert is_Only_One_Type_Left( merchants_List )
    # logs what a strategy made by `strategy_Builder` looks like
    @run_If( is_Run_Test = 1 == 0 )
    def test_22( description: str = "strategy_Builder" ):
        with LoggingContext(
            logger
            #>, level=logging.INFO
            , level = logging.DEBUG
        ):
            brand_New_Strategy = strategy_Builder(
            )
            logger.debug( f"brand_New_Strategy: {brand_New_Strategy}" )
            logger.debug( f"type(brand_New_Strategy): {type(brand_New_Strategy)}" )
            logger.debug(
                f"dir(brand_New_Strategy) filtered:"
                f" {[ prop for prop in dir(brand_New_Strategy) if not prop.startswith( '__' ) ]}" )
            logger.debug(
                f"brand_New_Strategy.get( 'history_Lookup_Window_Size' ):"
                f" {brand_New_Strategy.history_Lookup_Window_Size}" )
    ### @toDo: use getattr( instance, method_name )
    ### to "extract" | access an object method
    ### e.g. to assign it to a variable for testing
    ### also works as key lookup in dictionary
    ### with fail-safe default if one provided
    ### @toDo: use a Context Manager ( decorator ) e.g.
    ### for enabling debug logs while testing
    # finally run the doctests of the module
    test()
    #print( "50 * 50 = ", 50 * 50 )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment