policy.py
import numpy as np


class Policy:
    """Base class for multi-armed bandit policies."""

    def __init__(self, **kwargs):
        pass

    def initialize(self):
        raise NotImplementedError

    def update_memory(self, i, reward):
        # memory[i] holds [successes, failures] for arm i.
        if reward == 1:
            self.memory[i][0] += 1
        else:
            self.memory[i][1] += 1

    def choose_action(self):
        raise NotImplementedError

    def step(self):
        raise NotImplementedError

    def run(self, num_steps):
        # Reset state, play num_steps rounds, and return the per-step rewards.
        self.initialize()
        rewards = []
        for _ in range(num_steps):
            rewards.append(self.step())
        return rewards


class GreedyPolicy(Policy):
    """Epsilon-greedy: exploit the best empirical estimate, explore with probability eps."""

    def __init__(self, memory=None, estimators=None, bandit=None, eps=0.0):
        self.bandit = bandit
        if memory is None:
            self.memory = {i: [0, 0] for i in range(self.bandit.n)}
        else:
            self.memory = memory
        self.eps = eps
        if estimators is None:
            self.estimators = [0.5] * self.bandit.n
        else:
            self.estimators = estimators

    def initialize(self):
        self.memory = {i: [0, 0] for i in range(self.bandit.n)}
        self.estimators = [0.5] * self.bandit.n

    def update_estimator(self, i):
        # Empirical success rate of arm i; memory is updated first, so the denominator is >= 1.
        self.estimators[i] = self.memory[i][0] / (self.memory[i][0] + self.memory[i][1])

    def choose_action(self):
        # With probability eps pick a random arm, otherwise the arm with the best estimate.
        if np.random.random() < self.eps:
            action = np.random.randint(self.bandit.n)
        else:
            action = np.argmax(self.estimators)
        return action

    def step(self):
        action = self.choose_action()
        reward = self.bandit.generate_reward(action)
        self.update_memory(action, reward)
        self.update_estimator(action)
        return reward


class ThompsonSampling(Policy):
    """Thompson sampling with Beta(successes + 1, failures + 1) posteriors for Bernoulli arms."""

    def __init__(self, memory=None, bandit=None):
        self.bandit = bandit
        if memory is None:
            # Start every arm from a uniform Beta(1, 1) prior.
            self.memory = {i: [1, 1] for i in range(self.bandit.n)}
        else:
            self.memory = memory

    def initialize(self):
        self.memory = {i: [1, 1] for i in range(self.bandit.n)}

    def choose_action(self):
        # Draw one sample from each arm's posterior and play the arm with the largest draw.
        samples = [np.random.beta(self.memory[i][0], self.memory[i][1]) for i in range(self.bandit.n)]
        return np.argmax(samples)

    def step(self):
        action = self.choose_action()
        reward = self.bandit.generate_reward(action)
        self.update_memory(action, reward)
        return reward


class UCB(Policy):
    """UCB1: play the arm maximizing empirical mean plus an exploration bonus."""

    def __init__(self, memory=None, bandit=None):
        self.bandit = bandit
        if memory is None:
            self.memory = {i: [0, 0] for i in range(self.bandit.n)}
        else:
            self.memory = memory
        self.counter = [1] * self.bandit.n
        self.estimators = [0.5] * self.bandit.n

    def initialize(self):
        self.memory = {i: [0, 0] for i in range(self.bandit.n)}
        # Pull counts start at 1 so the exploration bonus is defined before an arm's first pull.
        self.counter = [1] * self.bandit.n
        self.estimators = [0.5] * self.bandit.n

    def update_counter(self, i):
        self.counter[i] += 1

    def update_estimator(self, i):
        self.estimators[i] = self.memory[i][0] / np.sum(self.memory[i])

    def choose_action(self, num_step):
        # UCB1 index: empirical mean + sqrt(2 * ln(t + 1) / pulls).
        bonus = np.sqrt(2 * np.log(num_step + 1) / np.array(self.counter))
        return np.argmax(np.array(self.estimators) + bonus)

    def step(self, num_step):
        action = self.choose_action(num_step)
        reward = self.bandit.generate_reward(action)
        self.update_memory(action, reward)
        self.update_counter(action)
        self.update_estimator(action)
        return reward

    def run(self, num_steps):
        # Same loop as Policy.run, but step needs the current round index for the log term.
        self.initialize()
        rewards = []
        for i in range(num_steps):
            rewards.append(self.step(i))
        return rewards
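

# ----------------------------------------------------------------------
# Usage sketch (illustration only): the policies above only assume a bandit
# object exposing `n` (the number of arms) and `generate_reward(action)`
# returning 0 or 1. The BernoulliBandit below is a hypothetical stand-in,
# not part of the original module.
if __name__ == "__main__":
    class BernoulliBandit:
        def __init__(self, probs):
            self.probs = probs  # true success probability of each arm
            self.n = len(probs)

        def generate_reward(self, action):
            # Return 1 with probability probs[action], else 0.
            return int(np.random.random() < self.probs[action])

    bandit = BernoulliBandit([0.2, 0.5, 0.8])
    for policy in (GreedyPolicy(bandit=bandit, eps=0.1),
                   ThompsonSampling(bandit=bandit),
                   UCB(bandit=bandit)):
        rewards = policy.run(1000)
        print(type(policy).__name__, "average reward:", np.mean(rewards))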