Module chillow.controller
Expand source code
from chillow.controller.online_controller import OnlineController
from chillow.controller.offline_controller import OfflineController
from chillow.controller.ai_evaluation_controller import AIEvaluationController
__all__ = ['OnlineController', 'OfflineController', 'AIEvaluationController']
Sub-modules
chillow.controller.ai_evaluation_controller
chillow.controller.controller
chillow.controller.offline_controller
chillow.controller.online_controller
Classes
class AIEvaluationController (runs: int, db_path: str, evaluation_type: int)
-
Executes multiple games after each other with randomly created games and players.
The result of every game and the execution time for each player in each round is saved in an SQLite database.
Creates a new AI evaluation controller.
Args
runs
- The number of games to be simulated.
db_path
- The path of the SQLite database file.
evaluation_type
- Defines which evaluation should be performed
Expand source code
class AIEvaluationController(OfflineController): """Executes multiple games after each other with randomly created games and players. The result of every game and the execution time for each player in each round is saved in an SQLite database.""" def __init__(self, runs: int, db_path: str, evaluation_type: int): """ Creates a new AI evaluation controller. Args: runs: The number of games to be simulated. db_path: The path of the SQLite database file. evaluation_type: Defines which evaluation should be performed """ super().__init__(HeadlessView()) self.__runs = runs self.__db_path = db_path if 1 <= evaluation_type <= 2: self.__evaluation_type = evaluation_type else: self.__evaluation_type = 1 self.__connection = None self.__cursor = None self.__current_game_id = None def play(self): """See base class.""" with closing(sqlite3.connect(self.__db_path)) as connection: with closing(connection.cursor()) as cursor: self.__connection = connection self.__cursor = cursor self.__create_db_tables() max_game_id = self.__cursor.execute("SELECT MAX(id) FROM games").fetchone()[0] if max_game_id is None: max_game_id = 0 self.__run_simulations(max_game_id) def _create_game(self) -> None: height = randint(30, 70) width = randint(30, 70) player_count = randint(3, 6) players = [] occupied_coordinates = [] for i in range(1, player_count + 1): next_coordinate = (randint(0, width - 1), randint(0, height - 1)) while next_coordinate in occupied_coordinates: next_coordinate = (randint(0, width - 1), randint(0, height - 1)) occupied_coordinates.append(next_coordinate) player = Player(i, next_coordinate[0], next_coordinate[1], Direction.get_random_direction(), 1, True, str(i)) players.append(player) cells = [[Cell() for _ in range(width)] for _ in range(height)] for player in players: cells[player.y][player.x] = Cell([player]) self._game = Game(width, height, cells, players, 1, True, datetime.now() + timedelta(5, 15)) self._game_round = 0 self._ais = [] if self.__evaluation_type == 1: self.__generate_ais_for_first_evaluation(player_count, players) elif self.__evaluation_type == 2: self.__generate_ais_for_second_evaluation(player_count, players) def __generate_ais_for_first_evaluation(self, player_count: int, players: List[Player]) -> None: self._ais.append(PathfindingAI(players[0], randint(1, 3), randint(1, 3) * 25)) self._ais.append(PathfindingSearchTreeAI(players[1], randint(1, 3), randint(1, 3) * 25, randint(2, 3), 0.75, randint(1, 3) * 10)) self._ais.append(SearchTreePathfindingAI(players[2], randint(1, 3), randint(1, 3) * 25, 2, randint(1, 3) * 10)) if player_count > 3: self._ais.append(SearchTreeAI(players[3], randint(1, 3), randint(2, 3), True, randint(1, 3) * 10)) if player_count > 4: self._ais.append(NotKillingItselfAI(players[4], [AIOptions.max_distance], randint(1, 3), 0, randint(1, 3))) if player_count > 5: self._ais.append(RandomAI(players[5], randint(1, 3))) def __generate_ais_for_second_evaluation(self, player_count: int, players: List[Player]) -> None: used_ai_indices = [] for i in range(player_count): ai_index = randint(0, len(best_ais_configurations) - 1) # Prevent that the same AI configuration is used in one game while ai_index in used_ai_indices: ai_index = randint(0, len(best_ais_configurations) - 1) used_ai_indices.append(ai_index) ai = best_ais_configurations[ai_index] self._ais.append(globals()[ai[0]](players[i], *ai[1])) def __run_simulations(self, max_game_id): for i in range(self.__runs): self.__current_game_id = i + 1 + max_game_id super().play() self.__cursor.execute("INSERT INTO games VALUES ({}, {}, {}, '{}', NULL)" .format(self.__current_game_id, self._game.width, self._game.height, datetime.now(timezone.utc))) winner_player = self._game.get_winner() for ai in self._ais: ai_class = ai.__class__.__name__ ai_info = ai.get_information() player_id = self.__get_player_id(ai_class, ai_info) # Save how often an AI configuration participated in a game self.__cursor.execute("INSERT INTO participants VALUES ({}, {})" .format(player_id, self.__current_game_id)) # Save how often an AI configuration won a game if ai.player == winner_player: self.__cursor.execute("UPDATE games SET winner_id = {} WHERE id = {}" .format(player_id, self.__current_game_id)) self.__connection.commit() def __create_db_tables(self): self.__cursor.execute("CREATE TABLE IF NOT EXISTS players (" "id INTEGER NOT NULL PRIMARY KEY," "class TEXT NOT NULL," "info TEXT)") self.__cursor.execute("CREATE TABLE IF NOT EXISTS games (" "id INTEGER NOT NULL PRIMARY KEY," "width INTEGER NOT NULL," "height INTEGER NOT NULL," "date TEXT NOT NULL," "winner_id INTEGER," "FOREIGN KEY (winner_id) REFERENCES players (id))") self.__cursor.execute("CREATE TABLE IF NOT EXISTS participants (" "player_id INTEGER NOT NULL," "game_id INTEGER NOT NULL," "PRIMARY KEY(player_id, game_id)," "FOREIGN KEY (player_id) REFERENCES players (id)," "FOREIGN KEY (game_id) REFERENCES games (id))") self.__cursor.execute("CREATE TABLE IF NOT EXISTS execution_times (" "player_id INTEGER NOT NULL," "game_id INTEGER NOT NULL," "game_round INTEGER NOT NULL," "execution REAL NOT NULL," "PRIMARY KEY(player_id, game_id, game_round)," "FOREIGN KEY (player_id) REFERENCES players (id)," "FOREIGN KEY (game_id) REFERENCES games (id))") def _log_execution_time(self, ai: ArtificialIntelligence, execution_time: float): ai_class = ai.__class__.__name__ ai_info = ai.get_information() player_id = self.__get_player_id(ai_class, ai_info) self.__cursor.execute("INSERT INTO execution_times VALUES ({}, {}, {}, {})" .format(player_id, self.__current_game_id, self._game_round, execution_time)) def __get_player_id(self, ai_class: str, ai_info: str) -> int: player_id = self.__cursor.execute( "SELECT MAX(id) FROM players p WHERE p.class = '{}' AND p.info = '{}'" .format(ai_class, ai_info)).fetchone()[0] if player_id is None: max_player_id = self.__cursor.execute("SELECT MAX(id) FROM players").fetchone()[0] if max_player_id is None: max_player_id = 0 player_id = max_player_id + 1 self.__cursor.execute("INSERT INTO players VALUES ({}, '{}', '{}')".format(player_id, ai_class, ai_info)) return player_id
Ancestors
Inherited members
class OfflineController (view: View)
-
Connects the services to execute the game logic and controls an UI.
Creates a new offline controller.
Args
view
- The UI that should be used.
Expand source code
class OfflineController(Controller): def __init__(self, view: View): """Creates a new offline controller. Args: view: The UI that should be used. """ super().__init__(view) self.__you = None self._game = None self._game_round = 0 self._ais = [] def play(self): """See base class.""" self._create_game() game_service = GameService(self._game, ignore_deadline=False) self._view.update(self._game) while self._game.running: self._game_round += 1 time_to_react = randint(4, 16) self.__reset_game_deadline(time_to_react) # Read input from user if there is a human player player_action = None if self.__you is not None and self.__you.active: player_action = self._view.read_next_action() if datetime.now(time_zone) > self._game.deadline: player_action = Action.get_default() self.__reset_game_deadline(time_to_react) for ai in self._ais: if ai is not None and ai.player.active: action = self.__choose_ai_action(ai, time_to_react) game_service.do_action(ai.player, action) self.__reset_game_deadline(time_to_react) # Perform action of human player after AIs finished their calculations # Otherwise the AIs would already know the players move if self.__you is not None and player_action is not None: game_service.do_action(self.__you, player_action) self._view.update(self._game) def _create_game(self) -> None: player1 = Player(1, 5, 5, Direction.down, 1, True, "Human Player 1") player2 = Player(2, 25, 5, Direction.down, 1, True, "AI Player 1") player3 = Player(3, 5, 15, Direction.up, 1, True, "AI Player 2") player4 = Player(4, 25, 15, Direction.up, 1, True, "AI Player 4") players = [player1, player2, player3, player4] height = 20 width = 30 cells = [[Cell() for _ in range(width)] for _ in range(height)] cells[player1.y][player1.x] = Cell([player1]) cells[player2.y][player2.x] = Cell([player2]) cells[player3.y][player3.x] = Cell([player3]) cells[player4.y][player4.x] = Cell([player4]) self._game = Game(width, height, cells, players, 1, True, datetime.now(time_zone)) self._game_round = 0 self.__you = None ai0 = ai_classes.RandomAI(player1) # Make a comment out of the next two lines if you do not want to play on your own. self.__you = player1 ai0 = None ai1 = ai_classes.PathfindingAI(player2, 2, 75) ai2 = ai_classes.NotKillingItselfAI(player3, [ai_classes.AIOptions.max_distance], 1, 0, 3) ai3 = ai_classes.SearchTreeAI(player4, 2, 1, True, 10) self._ais = [ai0, ai1, ai2, ai3] def _log_execution_time(self, ai: ArtificialIntelligence, execution_time: float): pass def __reset_game_deadline(self, time_to_react: int): self._game.deadline = datetime.now(time_zone) + timedelta(0, time_to_react) def __choose_ai_action(self, ai: ArtificialIntelligence, time_to_react: int) -> Action: return_value = multiprocessing.Value('i', Action.get_default().get_index()) process = multiprocessing.Process(target=Controller.call_ai, args=(ai, self._game.copy(), return_value,)) start = datetime.now(time_zone) process.start() process.join(time_to_react - 1) # wait time_to_react seconds minus one for the calculation to be finished if process.is_alive(): # If an execution is terminated, the execution time is set to 1 minute. start = datetime.now(time_zone) - timedelta(seconds=60) process.terminate() self._log_execution_time(ai, (datetime.now(time_zone) - start).total_seconds()) return Action.get_by_index(return_value.value)
Ancestors
Subclasses
Methods
def play(self)
-
See base class.
Expand source code
def play(self): """See base class.""" self._create_game() game_service = GameService(self._game, ignore_deadline=False) self._view.update(self._game) while self._game.running: self._game_round += 1 time_to_react = randint(4, 16) self.__reset_game_deadline(time_to_react) # Read input from user if there is a human player player_action = None if self.__you is not None and self.__you.active: player_action = self._view.read_next_action() if datetime.now(time_zone) > self._game.deadline: player_action = Action.get_default() self.__reset_game_deadline(time_to_react) for ai in self._ais: if ai is not None and ai.player.active: action = self.__choose_ai_action(ai, time_to_react) game_service.do_action(ai.player, action) self.__reset_game_deadline(time_to_react) # Perform action of human player after AIs finished their calculations # Otherwise the AIs would already know the players move if self.__you is not None and player_action is not None: game_service.do_action(self.__you, player_action) self._view.update(self._game)
class OnlineController (view: View, url: str, key: str, server_time_url: str, data_loader: DataLoader, data_writer: DataWriter, ai_class: str, ai_params)
-
Connects the services to execute the game logic and controls an UI.
Creates a new online controller.
Args
view
- The UI that should be used.
url
- The URL of the spe_ed server.
key
- The API key.
server_time_url
- The URL to request the current time of the server.
data_loader
- Object to load data.
data_writer
- Object to write data.
ai_class
- The name of the AI class to be used.
ai_params
- The parameters of the AI.
Expand source code
class OnlineController(Controller): def __init__(self, view: View, url: str, key: str, server_time_url: str, data_loader: DataLoader, data_writer: DataWriter, ai_class: str, ai_params): """Creates a new online controller. Args: view: The UI that should be used. url: The URL of the spe_ed server. key: The API key. server_time_url: The URL to request the current time of the server. data_loader: Object to load data. data_writer: Object to write data. ai_class: The name of the AI class to be used. ai_params: The parameters of the AI. """ super().__init__(view) self.__url = url self.__key = key self.__server_time_url = server_time_url self.__data_loader = data_loader self.__data_writer = data_writer self.__ai = None self.__default_ai = None self.__ai_class = ai_class self.__ai_params = ai_params def play(self): """See base class.""" asyncio.get_event_loop().run_until_complete(self.__play()) self._view.end() self.__ai = None self.__default_ai = None async def __play(self): async with websockets.connect(f"{self.__url}?key={self.__key}") as websocket: while True: game_data = await websocket.recv() game = self.__data_loader.load(game_data) self._view.update(game) if not game.running: break try: time_data = requests.get(self.__server_time_url).text server_time = self.__data_loader.read_server_time(time_data) except (RequestException, ValueError): server_time = datetime.now(timezone.utc) own_time = datetime.now(server_time.tzinfo) game.normalize_deadline(server_time, own_time) if self.__ai is None: self.__ai = globals()[self.__ai_class](game.you, *self.__ai_params) self.__default_ai = NotKillingItselfAI(game.you, [AIOptions.max_distance], 1, 0, 3) # noqa: F405 if game.you.active: action = self.__choose_action(game, server_time.tzinfo) data_out = self.__data_writer.write(action) await websocket.send(data_out) def __choose_action(self, game: Game, time_zone: datetime.tzinfo) -> Action: return_value = multiprocessing.Value('i') self.__default_ai.create_next_action(game, return_value) own_time = datetime.now(time_zone) seconds_for_calculation = (game.deadline - own_time).seconds process = multiprocessing.Process(target=Controller.call_ai, args=(self.__ai, game, return_value,)) process.start() process.join(seconds_for_calculation - 1) if process.is_alive(): process.terminate() return Action.get_by_index(return_value.value)
Ancestors
Methods
def play(self)
-
See base class.
Expand source code
def play(self): """See base class.""" asyncio.get_event_loop().run_until_complete(self.__play()) self._view.end() self.__ai = None self.__default_ai = None