Last active: March 15, 2020 05:42
Revisions
jskDr renamed this gist
Mar 15, 2020. 1 changed file with 52 additions and 1 deletion.
@@ -228,6 +228,57 @@ def play(self, P_no):
        S = np.zeros((N_A,), dtype='int16')  # #state == #action
        if self.disp_flag:
            print('S:', S)

        done = False
        while done == False:
            action, done = self.get_action(P_no, S)
            Buff['P_no'].append(P_no)
            Buff['S'].append(S.copy())
            Buff['a'].append(action)
            set_state_inplace(S, action, P_no)
            Buff['S_next'].append(S.copy())
            if self.disp_flag:
                print('S:', S)
            win_player = calc_reward_tf(S)
            reward = 0 if win_player == 0 else 1
            Buff['r'].append(reward)
            P_no = 1 if P_no == 2 else 2
            if win_player:
                done = True

        if self.disp_flag:
            if win_player:
                print(f'player {win_player} win')
            else:
                print(f'Tie game')

        return Buff

    def play_with_random(self, P_no):
        """
        Buff = play_with_random(self, P_no)

        [Inputs]
        P_no: player number, which is 1 or 2

        [Returns]
        Buff = {'P_no': [], 'S':[], 'a':[], 'r':[], 'S_next': []}: gathered information during learning,
        where S, a, r, S_next are state, action, reward, and next state

        [Examples]
        1. Buff = self.play_with_random(1)
        2. Buff = self.play_with_random(2)
        """
        N_A = self.N_A
        Buff = {'P_no': [], 'S':[], 'a':[], 'r':[], 'S_next': []}
        S = np.zeros((N_A,), dtype='int16')  # #state == #action
        if self.disp_flag:
            print('S:', S)

@@ -561,7 +612,7 @@ def plot_cnt_trace(cnt_trace):
    plt.ylabel('Count')
    plt.legend(loc=0)
    plt.title('Learned (P#1) vs. Random (P#2) policies during learning')
    plt.show()

def learning_stage(N_episodes=100, save_flag=True, fig_flag=False):
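In this revision, play() calls get_action(), so both players follow the learned Q-tables, while the newly added play_with_random() presumably keeps the earlier behavior of a learned player 1 against a random player 2 (its body is truncated in the diff above). A minimal sketch of how the two episode generators might be compared after training; compare_policies, agent, and n_games are illustrative names, not part of the gist, and the winner check copies the one used in Q_System.learning():

def compare_policies(agent, n_games=100):
    # Hypothetical helper: agent is assumed to be a trained Q_System instance.
    # Counts how often player 1 wins under self-play vs. against a random player 2.
    wins = {'self_play': 0, 'vs_random': 0}
    for _ in range(n_games):
        buff = agent.play(1)              # both players use the learned Q-tables
        if buff['r'][-1] != 0 and buff['P_no'][-1] == 1:
            wins['self_play'] += 1
        buff = agent.play_with_random(1)  # player 1 learned, player 2 random
        if buff['r'][-1] != 0 and buff['P_no'][-1] == 1:
            wins['vs_random'] += 1
    return wins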
jskDr renamed this gist
Mar 15, 2020. 1 changed file with 0 additions and 0 deletions.
File renamed without changes.
jskDr created this gist
Mar 15, 2020.
@@ -0,0 +1,622 @@
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import pickle

# The TicTacToe game has nine positions with nine actions. A user can put a stone on any position on the board except the occupied ones.

def set_state_inplace(S, action, P_no):
    ''' S is a numpy array.'''
    assert S[action] == 0, 'position should be empty to put a new stone'
    S[action] = P_no  # Use numpy to insert the action at the specific position

def calc_reward(S):
    mask_l = tf.constant([[1,1,1,0,0,0,0,0,0],
                          [0,0,0,1,1,1,0,0,0],
                          [0,0,0,0,0,0,1,1,1],
                          [1,0,0,1,0,0,1,0,0],
                          [0,1,0,0,1,0,0,1,0],
                          [0,0,1,0,0,1,0,0,1],
                          [1,0,0,0,1,0,0,0,1],
                          [0,0,1,0,1,0,1,0,0]], dtype=tf.int16)

    for mask in mask_l:
        # print(mask)
        mask_S = mask * S
        # print(mask_S)
        for player in [1,2]:
            abs_err = tf.reduce_sum(tf.abs(mask_S - player * mask))
            # print(abs_err)
            if abs_err == 0:
                # print(f'Player{player} wins')
                return player
    return 0

def calc_reward_tf(S):
    mask_l = tf.constant([[1,1,1,0,0,0,0,0,0],
                          [0,0,0,1,1,1,0,0,0],
                          [0,0,0,0,0,0,1,1,1],
                          [1,0,0,1,0,0,1,0,0],
                          [0,1,0,0,1,0,0,1,0],
                          [0,0,1,0,0,1,0,0,1],
                          [1,0,0,0,1,0,0,0,1],
                          [0,0,1,0,1,0,1,0,0]], dtype=tf.int32)
    S = tf.constant(S, dtype=tf.int32)
    S = tf.reshape(S, shape=(1,-1))
    S_cp = tf.matmul(tf.ones((mask_l.shape[0],1), dtype=tf.int32), S)
    mask_S = mask_l * S_cp
    for player in [1, 2]:
        if tf.reduce_any(tf.reduce_sum(tf.abs(mask_S - player * mask_l), axis=1) == 0):
            return player
    return 0

def one_of_amax(arr, disp_flag=False):
    results = np.where(arr == np.amax(arr))[0]
    if disp_flag:
        print('Equally max actions:', results)
    action = results[np.random.randint(0, len(results), 1)[0]]
    return action

class Q_System:
    def __init__(self, N_A=9, N_Symbols=3, epsilon=0.01, disp_flag=False):
        """
        N_A : Number of actions
        N_Symbols : Number of possible symbols in each point: 0, 1, 2, representing empty, player1, player2
        N_S : Number of states
        """
        if N_A is not None:
            self.disp_flag = disp_flag
            N_S = N_Symbols**N_A
            self.Qsa = [np.zeros((N_S,N_A)), np.zeros((N_S,N_A))]
            self.N_A = N_A
            self.N_Symbols = N_Symbols
            self.epsilon = epsilon
        else:
            self.disp_flag = False
            self.Qsa = None
            self.N_A = None
            self.N_Symbols = None
            self.epsilon = None

    def save(self):
        f = open('tictactoe_data.pckl', 'wb')
        obj = [self.N_A, self.N_Symbols, self.epsilon, self.Qsa]
        pickle.dump(obj, f)
        f.close()

    def load(self):
        f = open('tictactoe_data.pckl', 'rb')
        obj = pickle.load(f)
        [self.N_A, self.N_Symbols, self.epsilon, self.Qsa] = obj
        f.close()

    def calc_S_idx(self, S):
        S_idx = 0
        unit = 1
        for s in S:
            S_idx += s*unit
            unit *= self.N_Symbols
        return S_idx

    def _policy_random(self, S, a):
        return 1 / self.N_A

    def policy_random(self, P_no, S, action_list):
        action_prob = []
        S_idx = self.calc_S_idx(S)
        for _ in action_list:
            action_prob.append(1/len(action_list))

        action_idx = tf.squeeze(tf.random.categorical(tf.math.log([action_prob]),1)).numpy()
        if action_idx == len(action_prob):  # if all zeros in action_prob
            action = action_list[tf.squeeze(np.random.randint(0, len(action_list), 1))]
        else:
            action = action_list[action_idx]

        if self.disp_flag:
            print('S_idx', S_idx, 'action', action, 'action_list', action_list, 'action_prob', action_prob)

        return action

    def policy(self, P_no, S, action_list):
        action_prob = []
        S_idx = self.calc_S_idx(S)
        for a in action_list:
            action_prob.append(self.Qsa[P_no-1][S_idx,a])

        # We consider max Q with epsilon greedy
        if tf.squeeze(tf.random.uniform([1,1])) > self.epsilon:
            action = action_list[one_of_amax(action_prob)]
        else:
            action = action_list[np.random.randint(0,len(action_list),1)[0]]

        if self.disp_flag:
            print('S_idx', S_idx, 'action', action, 'action_list', action_list, 'action_prob', action_prob)

        return action

    def _r0_policy(self, P_no, S, action_list):
        action_prob = []
        S_idx = self.calc_S_idx(S)
        for a in action_list:
            action_prob.append(self.Qsa[P_no-1][S_idx, a])

        action_idx = tf.squeeze(tf.random.categorical(tf.math.log([action_prob]),1)).numpy()
        if action_idx == len(action_prob):  # if all zeros in action_prob
            action = action_list[tf.squeeze(np.random.randint(0, len(action_list), 1))]
        else:
            action = action_list[action_idx]

        if self.disp_flag:
            print('S_idx', S_idx, 'action', action, 'action_list', action_list, 'action_prob', action_prob)

        return action

    def find_action_list(self, S):
        action_list = []
        no_occupied = 0
        for a in range(self.N_A):
            if S[a] == 0:
                action_list.append(a)
            else:
                no_occupied += 1
        return action_list, no_occupied

    # Take action_prob at the given state
    def get_action(self, P_no, S):
        """Return action, done
        """
        action_list, no_occupied = self.find_action_list(S)
        # Since the number of possible actions is reduced,
        # the denominator is also updated.
        action = self.policy(P_no, S, action_list)
        done = no_occupied == (self.N_A - 1)
        return action, done

    def get_action_with_random(self, P_no, S):
        """Return action, done
        """
        action_list, no_occupied = self.find_action_list(S)
        # Since the number of possible actions is reduced,
        # the denominator is also updated.
        if P_no == 1:
            action = self.policy(P_no, S, action_list)
        else:
            action = self.policy_random(P_no, S, action_list)
        done = no_occupied == (self.N_A - 1)
        return action, done

    def get_action_with_human(self, P_no, S):
        """
        action, done = get_action_with_human(self, P_no, S)
        - Playing with a human

        [Inputs]
        P_no : Human player index, which represents 1=first, 2=second playing
        """
        action_list, no_occupied = self.find_action_list(S)
        # Since the number of possible actions is reduced,
        # the denominator is also updated.
        print('The current game state is:')
        print(S.reshape(3,3))
        print('Action index:')
        print(np.array(range(9)).reshape(3,3))
        print('Available actions: ', action_list)
        rand_idx = np.random.randint(0, len(action_list))
        random_action = action_list[int(rand_idx)]
        action = None
        while action not in action_list:
            action = input_default(f'Type your action (default={random_action}): ', random_action, int)
            if action not in action_list:
                print('Type your action again from the available action list:', action_list)
        done = no_occupied == (self.N_A - 1)
        return action, done

    def play(self, P_no):
        """
        Buff = play(self, P_no)

        [Inputs]
        P_no: player number, which is 1 or 2

        [Returns]
        Buff = {'P_no': [], 'S':[], 'a':[], 'r':[], 'S_next': []}: gathered information during learning,
        where S, a, r, S_next are state, action, reward, and next state

        [Examples]
        1. Buff = self.play(1)
        2. Buff = self.play(2)
        """
        N_A = self.N_A
        Buff = {'P_no': [], 'S':[], 'a':[], 'r':[], 'S_next': []}
        S = np.zeros((N_A,), dtype='int16')  # #state == #action
        if self.disp_flag:
            print('S:', S)

        done = False
        while done == False:
            action, done = self.get_action_with_random(P_no, S)
            Buff['P_no'].append(P_no)
            Buff['S'].append(S.copy())
            Buff['a'].append(action)
            set_state_inplace(S, action, P_no)
            Buff['S_next'].append(S.copy())
            if self.disp_flag:
                print('S:', S)
            win_player = calc_reward_tf(S)
            reward = 0 if win_player == 0 else 1
            Buff['r'].append(reward)
            P_no = 1 if P_no == 2 else 2
            if win_player:
                done = True

        if self.disp_flag:
            if win_player:
                print(f'player {win_player} win')
            else:
                print(f'Tie game')

        return Buff

    def play_with_human(self, player_human=1):
        """
        Buff = play_with_human(self, player_human=1)
        - Playing with a human

        [Inputs]
        player_human: human player number, which is 1 or 2

        [Returns]
        Buff = {'P_no': [], 'S':[], 'a':[], 'r':[], 'S_next': []}: gathered information during learning,
        where S, a, r, S_next are state, action, reward, and next state

        [Examples]
        1. Buff = self.play_with_human(1)
        2. Buff = self.play_with_human(2)
        """
        N_A = self.N_A
        Buff = {'P_no': [], 'S':[], 'a':[], 'r':[], 'S_next': []}
        S = np.zeros((N_A,), dtype='int16')  # #state == #action
        if self.disp_flag:
            print('S:', S)

        P_no = 1  # set P_no = 1 while player_human could be 1 or 2
        done = False
        while done == False:
            if player_human == P_no:
                action, done = self.get_action_with_human(P_no, S)
            else:
                P_no_trained_agent = 1  # random agent is 2
                action, done = self.get_action_with_random(P_no_trained_agent, S)
            Buff['P_no'].append(P_no)
            Buff['S'].append(S.copy())
            Buff['a'].append(action)
            set_state_inplace(S, action, P_no)
            Buff['S_next'].append(S.copy())
            if self.disp_flag:
                print('S:', S)
            win_player = calc_reward_tf(S)
            reward = 0 if win_player == 0 else 1
            Buff['r'].append(reward)
            P_no = 1 if P_no == 2 else 2
            if win_player:
                done = True

        print(S.reshape(3,3))
        if win_player == player_human:
            print('You win')
        elif win_player != 0:
            print('You lose')
        else:
            print('Tie game')

        return Buff

    def update_Qsa_inplace(self, Buff, ff=0.9, lr=0.01):
        def discounted_inplace(Buff_r):
            """discounted_inplace(Buff_r):
            Convert a reward vector to a discounted return vector using ff,
            where ff means forgetting factor.

            [Input]
            Buff_r = Buff['r']: stores rewards in each episode
            """
            g_prev = 0
            for i, r_l in enumerate(reversed(Buff_r)):
                Buff_r[-i-1] = r_l + ff * g_prev
                g_prev = Buff_r[-i-1]

        def updateQsa_inplace(Qsa_player, Buff_player):
            if self.disp_flag:
                print('---------------------------------------')
                print('S, S_idx, a, lr * r, Qsa_player[S_idx,a]')
            for S, a, r in zip(Buff_player['S'], Buff_player['a'], Buff_player['r']):
                S_idx = self.calc_S_idx(S)
                Qsa_player[S_idx,a] += lr * r
                if self.disp_flag:
                    print(S, S_idx, a, lr * r, Qsa_player[S_idx,a])

        # def updateQsa_stages_inplace(player, Qsa_player, Buff_player, disp_flag=True):

        def buff_depart(Buff):
            Buff_dual = [{'S':[], 'a':[], 'r':[]}, {'S':[], 'a':[], 'r':[]}]
            for i, (p, S, a, r) in enumerate(zip(Buff['P_no'], Buff['S'], Buff['a'], Buff['r'])):
                if i > 0:
                    # the final reward for a player is the reward of the next player
                    prev_p = 2 if p==1 else 1
                    Buff_dual[prev_p-1]['r'][-1] = -r  # 1 for player#2 --> -1 for player#1, vice versa
                    if self.disp_flag:
                        print('i, prev_p, Buff_dual[prev_p-1]')
                        print(i, prev_p, Buff_dual[prev_p-1])
                Buff_dual[p-1]['S'].append(S)
                Buff_dual[p-1]['a'].append(a)
                Buff_dual[p-1]['r'].append(r)
                if self.disp_flag:
                    print('i, p, Buff_dual[p-1]')
                    print(i, p, Buff_dual[p-1])
            return Buff_dual

        Buff_dual = buff_depart(Buff)

        # player#1
        for player in [1,2]:
            discounted_inplace(Buff_dual[player-1]['r'])  # for player#1
            if self.disp_flag:
                print('player:', player)
                print("Buff_dual[player-1]['r']", Buff_dual[player-1]['r'])
            updateQsa_inplace(self.Qsa[player-1], Buff_dual[player-1])
            # updateQsa_stages_inplace(player, self.Qsa_stages[player-1], Buff_dual[player-1])

    def learning(self, N_episodes=2, ff=0.9, lr=0.01, print_cnt=10):
        """Return: cnt_trace = [cnt, ...]: cnt vectors are stacked in cnt_trace
        """
        cnt = [0, 0, 0]  # tie, p1, p2
        cnt_trace = [cnt.copy()]
        player = 1
        for episode in range(N_episodes):
            # print('===================================')
            # Can save this data for play 2 as well

            # Decrease epsilon with respect to episode
            self.epsilon = 1 / (1 + episode/100)
            # self.epsilon = 1 / (1 + episode)

            Buff = self.play(player)
            self.update_Qsa_inplace(Buff, ff=ff, lr=lr)

            win_player = 0 if Buff['r'][-1] == 0 else Buff['P_no'][-1]
            cnt[win_player] += 1
            cnt_trace.append(cnt.copy())

            player = 2 if player == 1 else 1
            if episode % print_cnt == 0:
                print(episode, cnt)
                print('S = [0,0,0, 0,0,0, 0,0,0]')
                print('Qsa[0][0,:]', [f'{self.Qsa[0][0,a]:.1e}' for a in range(9)])
                print('Qsa[1][0,:]', [f'{self.Qsa[1][0,a]:.1e}' for a in range(9)])

                S = [1,1,0, 2,1,2, 1,2,2]
                S_idx = self.calc_S_idx(S)
                print('S = ', S)
                print(f'Qsa[0][{S_idx},:]', [f'{self.Qsa[0][S_idx,a]:.1e}' for a in range(9)])
                print(f'Qsa[1][{S_idx},:]', [f'{self.Qsa[1][S_idx,a]:.1e}' for a in range(9)])

                S = [1,1,0, 2,0,0, 2,0,0]
                S_idx = self.calc_S_idx(S)
                print('S = ', S)
                print(f'Qsa[0][{S_idx},:]', [f'{self.Qsa[0][S_idx,a]:.1e}' for a in range(9)])
                print(f'Qsa[1][{S_idx},:]', [f'{self.Qsa[1][S_idx,a]:.1e}' for a in range(9)])

        return cnt_trace


class Testing:
    def __init__(self, fn_name):
        '''Usages:
        - Testing('calc_reward_tf')
        '''
        if fn_name == 'calc_reward_tf':
            self.test_calc_reward_tf()
        elif fn_name == 'find_action_list':
            self.test_find_action_list()
        elif fn_name == 'get_action':
            self.test_get_action()
        elif fn_name == 'all':
            self.test_calc_reward_tf()
            self.test_find_action_list()
            self.test_get_action()

    def test_calc_reward_tf(self):
        S_examples = tf.constant([[0,0,0, 0,0,0, 0,0,0],
                                  [1,1,1, 2,0,2, 2,0,0],
                                  [0,0,2, 1,2,1, 2,0,0]])
        print('===================================')
        print('Testing: calc_reward_tf')
        print('[Answer]')
        answer = [0, 1, 2]
        print(answer)
        print('-------------------------------------')
        print('[Test]')
        test = [calc_reward_tf(S) for S in S_examples]
        print(test)
        if test == answer:
            print('Test Ok')
        else:
            print('Test fail')

    def test_find_action_list(self):
        print('===================================')
        print('Testing: test_find_action_list')
        print('[Answer]')
        print('''[[0 0 0]
 [0 0 0]
 [0 0 0]] [0, 1, 2, 3, 4, 5, 6, 7, 8] 0
[[0 2 0]
 [0 1 0]
 [1 0 2]] [0, 2, 3, 5, 7] 4''')
        N_A = 9
        N_Symbols = 3
        my_Q_System = Q_System(N_A, N_Symbols)
        print('-------------------------------------')
        print('[Test]')
        S_l = [[0,0,0, 0,0,0, 0,0,0],
               [0,2,0, 0,1,0, 1,0,2]]
        for S in S_l:
            action_list, no_occupied = my_Q_System.find_action_list(S)
            print(np.reshape(S,(3,3)), action_list, no_occupied)

    def test_get_action(self):
        print('===================================')
        print('Testing: get_action')
        print('''[Answer]
Equally max actions: [0]
S_idx 0 action 0 action_list [0, 1, 2, 3, 4, 5, 6, 7, 8] action_prob [1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
[[0 0 0]
 [0 0 0]
 [0 0 0]] 1 0
Equally max actions: [0]
S_idx 13950 action 0 action_list [0, 1, 3, 5, 7] action_prob [1.0, 0.0, 0.0, 0.0, 0.0]
[[0 0 2]
 [0 1 0]
 [1 0 2]] 1 0
Equally max actions: [0]
S_idx 0 action 0 action_list [0, 1, 2, 3, 4, 5, 6, 7, 8] action_prob [1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
[[0 0 0]
 [0 0 0]
 [0 0 0]] 2 0
Equally max actions: [0]
S_idx 13950 action 0 action_list [0, 1, 3, 5, 7] action_prob [1.0, 0.0, 0.0, 0.0, 0.0]
[[0 0 2]
 [0 1 0]
 [1 0 2]] 2 0''')
        N_A = 9
        N_Symbols = 3
        my_Q_System = Q_System(N_A, N_Symbols)
        print('-------------------------------------')
        print('[Test]')
        S_l = [[0,0,0, 0,0,0, 0,0,0],
               [0,0,2, 0,1,0, 1,0,2]]
        for P_no in [1,2]:
            for S in S_l:
                S_idx = my_Q_System.calc_S_idx(S)
                my_Q_System.Qsa[P_no-1][S_idx,:] = np.array([1.0,0.0,0, 0,0,0, 0,0,0])
                action, _ = my_Q_System.get_action(P_no, S)
                print(np.reshape(S,(3,3)), P_no, action)


def _main():
    ff = 0.9
    lr = 0.01
    N_episodes = 2
    N_Symbols = 3  # 0=empty, 1=player1, 2=player2
    N_A = 9  # (0,0), (0,1), ..., (2,2)
    my_Q_System = Q_System(N_A, N_Symbols)

    cnt = [0, 0, 0]  # tie, p1, p2
    player = 1
    for episode in range(N_episodes):
        # print('===================================')
        # Can save this data for play 2 as well
        Buff = my_Q_System.play(player)
        my_Q_System.update_Qsa_inplace(Buff, ff=ff, lr=lr)

        win_player = 0 if Buff['r'][-1] == 0 else Buff['P_no'][-1]
        cnt[win_player] += 1

        player = 2 if player == 1 else 1
        if episode % 10 == 0:
            print(episode, cnt)

    print(cnt)


def plot_cnt_trace(cnt_trace):
    N_cnt = len(cnt_trace)
    cnt_d = {'Equal':np.zeros(N_cnt,dtype=int), 'P1':np.zeros(N_cnt,dtype=int), 'P2':np.zeros(N_cnt,dtype=int)}
    for i, cnt in enumerate(cnt_trace):
        cnt_d['Equal'][i] = cnt[0]
        cnt_d['P1'][i] = cnt[1]
        cnt_d['P2'][i] = cnt[2]
    plt.plot(range(N_cnt), cnt_d['Equal'], label='Equal')
    plt.plot(range(N_cnt), cnt_d['P1'], label='Player1 wins')
    plt.plot(range(N_cnt), cnt_d['P2'], label='Player2 wins')
    plt.xlabel('Episode')
    plt.ylabel('Count')
    plt.legend(loc=0)
    plt.title('Learned (P#1) vs. Random (P#2) policies during learning')
    plt.show(True)


def learning_stage(N_episodes=100, save_flag=True, fig_flag=False):
    ff = 0.9
    lr = 0.01
    N_Symbols = 3  # 0=empty, 1=player1, 2=player2
    N_A = 9  # (0,0), (0,1), ..., (2,2)
    print_cnt = N_episodes / 10

    my_Q_System = Q_System(N_A, N_Symbols)
    cnt_trace = my_Q_System.learning(N_episodes=N_episodes, ff=ff, lr=lr, print_cnt=print_cnt)
    print('-------------------')
    cnt = cnt_trace[-1]
    print(N_episodes, cnt)

    if save_flag:
        my_Q_System.save()

    if fig_flag:
        plot_cnt_trace(cnt_trace)

    return my_Q_System


def input_default(str, default_value, dtype=int):
    answer = input(str)
    if answer == '':
        return default_value
    else:
        return dtype(answer)


def main():
    Q1 = input_default('1. Loading a trained agent (0) or Learning a new agent (1)? (default=0) ', 0, int)
    if Q1 == 0:
        print('Loading the trained agent...')
        Q2 = input_default('2. Do you want to start first?(0=yes,1=no,default=0) ', 0, int)
        player_human = Q2 + 1
        if player_human == 1:
            print('You=1, Agent=2')
        else:
            print('Agent=1, You=2')
        trained_Q_System = Q_System(None)
        trained_Q_System.load()
        trained_Q_System.play_with_human(player_human)
        # print(len(trained_Q_System.Qsa))
    else:
        print('Start to learn a new agent...')
        Q2 = input_default('2. How many episodes do you want to learn?(default=10000) ', 10000, int)
        # my_Q_System = learning_stage(N_episodes=Q2, fig_flag=True)
        _ = learning_stage(N_episodes=Q2, fig_flag=True)
        # print(len(my_Q_System.Qsa))


if __name__ == "__main__":
    main()
    # Testing('all')
    pass
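A small, self-contained sketch (not part of the gist) of the base-3 state encoding that calc_S_idx uses to index the 3**9 = 19683 rows of each Q-table; state_index is a hypothetical standalone rewrite of that method:

def state_index(S, n_symbols=3):
    # Board cell i holding symbol s contributes s * n_symbols**i, so every
    # board S = [s0, ..., s8] with si in {0, 1, 2} maps to a unique row index.
    idx, unit = 0, 1
    for s in S:
        idx += s * unit
        unit *= n_symbols
    return idx

# Matches the S_idx value quoted in Testing.test_get_action for this board.
assert state_index([0, 0, 2, 0, 1, 0, 1, 0, 2]) == 13950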