Module chillow.controller

Expand source code
from chillow.controller.online_controller import OnlineController
from chillow.controller.offline_controller import OfflineController
from chillow.controller.ai_evaluation_controller import AIEvaluationController

__all__ = ['OnlineController', 'OfflineController', 'AIEvaluationController']

Sub-modules

chillow.controller.ai_evaluation_controller
chillow.controller.controller
chillow.controller.offline_controller
chillow.controller.online_controller

Classes

class AIEvaluationController (runs: int, db_path: str, evaluation_type: int)

Executes multiple games after each other with randomly created games and players.

The result of every game and the execution time for each player in each round is saved in an SQLite database.

Creates a new AI evaluation controller.

Args

runs
The number of games to be simulated.
db_path
The path of the SQLite database file.
evaluation_type
Defines which evaluation should be performed
Expand source code
class AIEvaluationController(OfflineController):
    """Executes multiple games after each other with randomly created games and players.

    The result of every game and the execution time for each player in each round is saved in an SQLite database."""

    def __init__(self, runs: int, db_path: str, evaluation_type: int):
        """ Creates a new AI evaluation controller.

        Args:
            runs: The number of games to be simulated.
            db_path: The path of the SQLite database file.
            evaluation_type: Defines which evaluation should be performed
        """
        super().__init__(HeadlessView())
        self.__runs = runs
        self.__db_path = db_path
        if 1 <= evaluation_type <= 2:
            self.__evaluation_type = evaluation_type
        else:
            self.__evaluation_type = 1
        self.__connection = None
        self.__cursor = None
        self.__current_game_id = None

    def play(self):
        """See base class."""
        with closing(sqlite3.connect(self.__db_path)) as connection:
            with closing(connection.cursor()) as cursor:
                self.__connection = connection
                self.__cursor = cursor

                self.__create_db_tables()

                max_game_id = self.__cursor.execute("SELECT MAX(id) FROM games").fetchone()[0]
                if max_game_id is None:
                    max_game_id = 0

                self.__run_simulations(max_game_id)

    def _create_game(self) -> None:
        height = randint(30, 70)
        width = randint(30, 70)

        player_count = randint(3, 6)
        players = []
        occupied_coordinates = []
        for i in range(1, player_count + 1):
            next_coordinate = (randint(0, width - 1), randint(0, height - 1))
            while next_coordinate in occupied_coordinates:
                next_coordinate = (randint(0, width - 1), randint(0, height - 1))
            occupied_coordinates.append(next_coordinate)
            player = Player(i, next_coordinate[0], next_coordinate[1], Direction.get_random_direction(), 1, True,
                            str(i))
            players.append(player)

        cells = [[Cell() for _ in range(width)] for _ in range(height)]
        for player in players:
            cells[player.y][player.x] = Cell([player])

        self._game = Game(width, height, cells, players, 1, True, datetime.now() + timedelta(5, 15))
        self._game_round = 0

        self._ais = []

        if self.__evaluation_type == 1:
            self.__generate_ais_for_first_evaluation(player_count, players)
        elif self.__evaluation_type == 2:
            self.__generate_ais_for_second_evaluation(player_count, players)

    def __generate_ais_for_first_evaluation(self, player_count: int, players: List[Player]) -> None:
        self._ais.append(PathfindingAI(players[0], randint(1, 3), randint(1, 3) * 25))
        self._ais.append(PathfindingSearchTreeAI(players[1], randint(1, 3), randint(1, 3) * 25, randint(2, 3), 0.75,
                                                 randint(1, 3) * 10))
        self._ais.append(SearchTreePathfindingAI(players[2], randint(1, 3), randint(1, 3) * 25, 2,
                                                 randint(1, 3) * 10))
        if player_count > 3:
            self._ais.append(SearchTreeAI(players[3], randint(1, 3), randint(2, 3), True, randint(1, 3) * 10))
            if player_count > 4:
                self._ais.append(NotKillingItselfAI(players[4], [AIOptions.max_distance], randint(1, 3), 0,
                                                    randint(1, 3)))
                if player_count > 5:
                    self._ais.append(RandomAI(players[5], randint(1, 3)))

    def __generate_ais_for_second_evaluation(self, player_count: int, players: List[Player]) -> None:
        used_ai_indices = []
        for i in range(player_count):
            ai_index = randint(0, len(best_ais_configurations) - 1)
            # Prevent that the same AI configuration is used in one game
            while ai_index in used_ai_indices:
                ai_index = randint(0, len(best_ais_configurations) - 1)

            used_ai_indices.append(ai_index)
            ai = best_ais_configurations[ai_index]
            self._ais.append(globals()[ai[0]](players[i], *ai[1]))

    def __run_simulations(self, max_game_id):
        for i in range(self.__runs):
            self.__current_game_id = i + 1 + max_game_id
            super().play()

            self.__cursor.execute("INSERT INTO games VALUES ({}, {}, {}, '{}', NULL)"
                                  .format(self.__current_game_id, self._game.width, self._game.height,
                                          datetime.now(timezone.utc)))

            winner_player = self._game.get_winner()
            for ai in self._ais:
                ai_class = ai.__class__.__name__
                ai_info = ai.get_information()
                player_id = self.__get_player_id(ai_class, ai_info)

                # Save how often an AI configuration participated in a game
                self.__cursor.execute("INSERT INTO participants VALUES ({}, {})"
                                      .format(player_id, self.__current_game_id))

                # Save how often an AI configuration won a game
                if ai.player == winner_player:
                    self.__cursor.execute("UPDATE games SET winner_id = {} WHERE id = {}"
                                          .format(player_id, self.__current_game_id))

            self.__connection.commit()

    def __create_db_tables(self):
        self.__cursor.execute("CREATE TABLE IF NOT EXISTS players ("
                              "id INTEGER NOT NULL PRIMARY KEY,"
                              "class TEXT NOT NULL,"
                              "info TEXT)")
        self.__cursor.execute("CREATE TABLE IF NOT EXISTS games ("
                              "id INTEGER NOT NULL PRIMARY KEY,"
                              "width INTEGER NOT NULL,"
                              "height INTEGER NOT NULL,"
                              "date TEXT NOT NULL,"
                              "winner_id INTEGER,"
                              "FOREIGN KEY (winner_id) REFERENCES players (id))")
        self.__cursor.execute("CREATE TABLE IF NOT EXISTS participants ("
                              "player_id INTEGER NOT NULL,"
                              "game_id INTEGER NOT NULL,"
                              "PRIMARY KEY(player_id, game_id),"
                              "FOREIGN KEY (player_id) REFERENCES players (id),"
                              "FOREIGN KEY (game_id) REFERENCES games (id))")
        self.__cursor.execute("CREATE TABLE IF NOT EXISTS execution_times ("
                              "player_id INTEGER NOT NULL,"
                              "game_id INTEGER NOT NULL,"
                              "game_round INTEGER NOT NULL,"
                              "execution REAL NOT NULL,"
                              "PRIMARY KEY(player_id, game_id, game_round),"
                              "FOREIGN KEY (player_id) REFERENCES players (id),"
                              "FOREIGN KEY (game_id) REFERENCES games (id))")

    def _log_execution_time(self, ai: ArtificialIntelligence, execution_time: float):
        ai_class = ai.__class__.__name__
        ai_info = ai.get_information()
        player_id = self.__get_player_id(ai_class, ai_info)

        self.__cursor.execute("INSERT INTO execution_times VALUES ({}, {}, {}, {})"
                              .format(player_id, self.__current_game_id, self._game_round, execution_time))

    def __get_player_id(self, ai_class: str, ai_info: str) -> int:
        player_id = self.__cursor.execute(
            "SELECT MAX(id) FROM players p WHERE p.class = '{}' AND p.info = '{}'"
            .format(ai_class, ai_info)).fetchone()[0]

        if player_id is None:
            max_player_id = self.__cursor.execute("SELECT MAX(id) FROM players").fetchone()[0]
            if max_player_id is None:
                max_player_id = 0
            player_id = max_player_id + 1
            self.__cursor.execute("INSERT INTO players VALUES ({}, '{}', '{}')".format(player_id, ai_class, ai_info))

        return player_id

Ancestors

Inherited members

class OfflineController (view: View)

Connects the services to execute the game logic and controls an UI.

Creates a new offline controller.

Args

view
The UI that should be used.
Expand source code
class OfflineController(Controller):

    def __init__(self, view: View):
        """Creates a new offline controller.

        Args:
            view: The UI that should be used.
        """
        super().__init__(view)
        self.__you = None
        self._game = None
        self._game_round = 0
        self._ais = []

    def play(self):
        """See base class."""
        self._create_game()
        game_service = GameService(self._game, ignore_deadline=False)

        self._view.update(self._game)

        while self._game.running:
            self._game_round += 1
            time_to_react = randint(4, 16)
            self.__reset_game_deadline(time_to_react)

            # Read input from user if there is a human player
            player_action = None
            if self.__you is not None and self.__you.active:
                player_action = self._view.read_next_action()
                if datetime.now(time_zone) > self._game.deadline:
                    player_action = Action.get_default()
                self.__reset_game_deadline(time_to_react)

            for ai in self._ais:
                if ai is not None and ai.player.active:
                    action = self.__choose_ai_action(ai, time_to_react)
                    game_service.do_action(ai.player, action)
                    self.__reset_game_deadline(time_to_react)

            # Perform action of human player after AIs finished their calculations
            # Otherwise the AIs would already know the players move
            if self.__you is not None and player_action is not None:
                game_service.do_action(self.__you, player_action)

            self._view.update(self._game)

    def _create_game(self) -> None:
        player1 = Player(1, 5, 5, Direction.down, 1, True, "Human Player 1")
        player2 = Player(2, 25, 5, Direction.down, 1, True, "AI Player 1")
        player3 = Player(3, 5, 15, Direction.up, 1, True, "AI Player 2")
        player4 = Player(4, 25, 15, Direction.up, 1, True, "AI Player 4")
        players = [player1, player2, player3, player4]
        height = 20
        width = 30
        cells = [[Cell() for _ in range(width)] for _ in range(height)]
        cells[player1.y][player1.x] = Cell([player1])
        cells[player2.y][player2.x] = Cell([player2])
        cells[player3.y][player3.x] = Cell([player3])
        cells[player4.y][player4.x] = Cell([player4])

        self._game = Game(width, height, cells, players, 1, True, datetime.now(time_zone))
        self._game_round = 0

        self.__you = None
        ai0 = ai_classes.RandomAI(player1)
        # Make a comment out of the next two lines if you do not want to play on your own.
        self.__you = player1
        ai0 = None

        ai1 = ai_classes.PathfindingAI(player2, 2, 75)
        ai2 = ai_classes.NotKillingItselfAI(player3, [ai_classes.AIOptions.max_distance], 1, 0, 3)
        ai3 = ai_classes.SearchTreeAI(player4, 2, 1, True, 10)

        self._ais = [ai0, ai1, ai2, ai3]

    def _log_execution_time(self, ai: ArtificialIntelligence, execution_time: float):
        pass

    def __reset_game_deadline(self, time_to_react: int):
        self._game.deadline = datetime.now(time_zone) + timedelta(0, time_to_react)

    def __choose_ai_action(self, ai: ArtificialIntelligence, time_to_react: int) -> Action:
        return_value = multiprocessing.Value('i', Action.get_default().get_index())

        process = multiprocessing.Process(target=Controller.call_ai, args=(ai, self._game.copy(), return_value,))
        start = datetime.now(time_zone)
        process.start()
        process.join(time_to_react - 1)  # wait time_to_react seconds minus one for the calculation to be finished

        if process.is_alive():
            # If an execution is terminated, the execution time is set to 1 minute.
            start = datetime.now(time_zone) - timedelta(seconds=60)
            process.terminate()

        self._log_execution_time(ai, (datetime.now(time_zone) - start).total_seconds())
        return Action.get_by_index(return_value.value)

Ancestors

Subclasses

Methods

def play(self)

See base class.

Expand source code
def play(self):
    """See base class."""
    self._create_game()
    game_service = GameService(self._game, ignore_deadline=False)

    self._view.update(self._game)

    while self._game.running:
        self._game_round += 1
        time_to_react = randint(4, 16)
        self.__reset_game_deadline(time_to_react)

        # Read input from user if there is a human player
        player_action = None
        if self.__you is not None and self.__you.active:
            player_action = self._view.read_next_action()
            if datetime.now(time_zone) > self._game.deadline:
                player_action = Action.get_default()
            self.__reset_game_deadline(time_to_react)

        for ai in self._ais:
            if ai is not None and ai.player.active:
                action = self.__choose_ai_action(ai, time_to_react)
                game_service.do_action(ai.player, action)
                self.__reset_game_deadline(time_to_react)

        # Perform action of human player after AIs finished their calculations
        # Otherwise the AIs would already know the players move
        if self.__you is not None and player_action is not None:
            game_service.do_action(self.__you, player_action)

        self._view.update(self._game)
class OnlineController (view: View, url: str, key: str, server_time_url: str, data_loader: DataLoader, data_writer: DataWriter, ai_class: str, ai_params)

Connects the services to execute the game logic and controls an UI.

Creates a new online controller.

Args

view
The UI that should be used.
url
The URL of the spe_ed server.
key
The API key.
server_time_url
The URL to request the current time of the server.
data_loader
Object to load data.
data_writer
Object to write data.
ai_class
The name of the AI class to be used.
ai_params
The parameters of the AI.
Expand source code
class OnlineController(Controller):

    def __init__(self, view: View, url: str, key: str, server_time_url: str, data_loader: DataLoader,
                 data_writer: DataWriter, ai_class: str, ai_params):
        """Creates a new online controller.

        Args:
            view: The UI that should be used.
            url: The URL of the spe_ed server.
            key: The API key.
            server_time_url: The URL to request the current time of the server.
            data_loader: Object to load data.
            data_writer: Object to write data.
            ai_class: The name of the AI class to be used.
            ai_params: The parameters of the AI.
        """

        super().__init__(view)
        self.__url = url
        self.__key = key
        self.__server_time_url = server_time_url
        self.__data_loader = data_loader
        self.__data_writer = data_writer
        self.__ai = None
        self.__default_ai = None
        self.__ai_class = ai_class
        self.__ai_params = ai_params

    def play(self):
        """See base class."""
        asyncio.get_event_loop().run_until_complete(self.__play())
        self._view.end()
        self.__ai = None
        self.__default_ai = None

    async def __play(self):
        async with websockets.connect(f"{self.__url}?key={self.__key}") as websocket:
            while True:
                game_data = await websocket.recv()
                game = self.__data_loader.load(game_data)

                self._view.update(game)

                if not game.running:
                    break

                try:
                    time_data = requests.get(self.__server_time_url).text
                    server_time = self.__data_loader.read_server_time(time_data)
                except (RequestException, ValueError):
                    server_time = datetime.now(timezone.utc)
                own_time = datetime.now(server_time.tzinfo)
                game.normalize_deadline(server_time, own_time)

                if self.__ai is None:
                    self.__ai = globals()[self.__ai_class](game.you, *self.__ai_params)
                    self.__default_ai = NotKillingItselfAI(game.you, [AIOptions.max_distance], 1, 0, 3)  # noqa: F405

                if game.you.active:
                    action = self.__choose_action(game, server_time.tzinfo)
                    data_out = self.__data_writer.write(action)
                    await websocket.send(data_out)

    def __choose_action(self, game: Game, time_zone: datetime.tzinfo) -> Action:
        return_value = multiprocessing.Value('i')
        self.__default_ai.create_next_action(game, return_value)

        own_time = datetime.now(time_zone)
        seconds_for_calculation = (game.deadline - own_time).seconds

        process = multiprocessing.Process(target=Controller.call_ai, args=(self.__ai, game, return_value,))
        process.start()
        process.join(seconds_for_calculation - 1)

        if process.is_alive():
            process.terminate()

        return Action.get_by_index(return_value.value)

Ancestors

Methods

def play(self)

See base class.

Expand source code
def play(self):
    """See base class."""
    asyncio.get_event_loop().run_until_complete(self.__play())
    self._view.end()
    self.__ai = None
    self.__default_ai = None