-
Notifications
You must be signed in to change notification settings - Fork 2
/
cme.py
374 lines (279 loc) · 12.7 KB
/
cme.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
import numpy as np
from tqdm import tqdm
import logging
logger = logging.getLogger(__name__)
import pdist
dtype = np.double
class Reaction:
""" Reaction base class
Every reaction has the following parameters:
- rate: float
Base rate of the reaction
- products: array or None (default: None)
List of products created during the reaction. Entries must be of the following forms:
* spec: int
Denotes one reactant of the given species
* (b: float, spec: int)
Denotes a burst of the given species following the geometric distribution with mean b
(by convention the geometric distribution starts at 0).
"""
def __init__(self, rate, products = None):
self.rate = rate
self.products = products
class GenReaction(Reaction):
"""
The GenReaction class represents reactions without educts.
"""
def __init__(self, rate, products = None):
super().__init__(rate, dist, products)
def __str__(self):
return "GenReaction(rate={}, products={})".format(self.rate, self.products)
class UniReaction(Reaction):
""" The UniReaction class represents unimolecular reactions.
Parameters:
- spec: int
The species undergoing the reaction
"""
def __init__(self, rate, spec, products = None):
super().__init__(rate, products)
self.spec = spec
def __str__(self):
return "UniReaction(rate={}, spec={}, products={})".format(self.rate, self.spec, self.products)
class BiReaction(Reaction):
"""
The BiReaction class represents bimolecular reactions.
Parameters:
- specA, specB: int
The species of the two particles undergoing the reaction
"""
def __init__(self, rate, specA, specB, products = None):
super().__init__(rate, products)
self.specA = specA
self.specB = specB
def __str__(self):
return "BiReaction(rate={}, specA={}, specB={}, products={})".format(self.rate, self.specA, self.specB, self.products)
class ReactionSystem:
""" This class stores information about a reaction system.
Arguments:
n_species: int
Number of species in the system. Species are
enumerated starting from 0.
reactions: array of reactions (default: ())
List of allowed reactions in the system
initial_state: array of ints
Initial number of particles per species.
"""
def __init__(self, n_species, reactions = (), initial_state = None):
self.n_species = n_species
if initial_state is None:
initial_state = np.zeros(n_species, dtype=int)
self.initial_state = initial_state
self.reactions = reactions
self.sort_reactions(reactions)
def sort_reactions(self, reactions):
""" Sort reactions by type during initialisation """
self.reactions_gen = []
self.reactions_uni = []
self.reactions_bi = []
for reac in reactions:
if isinstance(reac, GenReaction):
self.reactions_gen.append(reac)
elif isinstance(reac, UniReaction):
self.reactions_uni.append(reac)
elif isinstance(reac, BiReaction):
self.reactions_bi.append(reac)
else:
raise TypeError("Unknown reaction type for '{}'".format(reac))
def create_particle_system(self, seed=None):
""" Create a new instance of ParticleSystem associated to this reaction system """
return ParticleSystem(self, seed=seed)
class ParticleSystem:
""" This class represents the state of a particle system associated to a ReactionSystem
Args:
system: ReactionSystem
Associated reaction system defining the model to use
seed: int or None (default: None)
Optional seed for random number generator
Attributes:
counts: n_species array of ints
Per-species particle counts
t: float
Current time in the system
events: array of events
Ordered list of time-stamped events in the system
"""
def __init__(self, system, seed=None):
self.system = system
self.counts = np.zeros(system.n_species, dtype=int)
self.rng = np.random.RandomState(seed=seed)
# Precompute some things for ease of access during simulations
self.gen_rates = np.asarray([ r.rate for r in self.system.reactions_gen ])
self.uni_rates = np.asarray([ r.rate for r in self.system.reactions_uni ])
self.uni_spec = np.asarray([ r.spec for r in self.system.reactions_uni ], dtype=int)
self.t = 0
self.events = []
self.add_initial_molecules()
### INTERNALS ###
def add_products(self, products):
""" Add list of particles to the reaction system. Returns list of particles placed.
"""
# Create list of products to be placed
products = self.expand_products(products)
log = []
for product in products:
self.counts[product] += 1
log.append((product,))
return log
def expand_products(self, products):
"""
Convert description of reaction products into list of products to be
added, drawing particle numbers for products produced in bursts.
"""
ret = []
for prod in products:
if type(prod) == int:
ret.append(prod)
else:
b, spec = prod
p = 1 / b
# Numpy's geometric distribution starts at 1
n = self.rng.geometric(p) - 1
ret += [ spec for i in range(n) ]
return ret
def add_initial_molecules(self):
""" Place initial molecules in the system """
assert self.t == 0
for spec, n_init in enumerate(self.system.initial_state):
product_log = self.add_products([ spec for i in range(n_init) ])
event = ("gen", None, product_log)
self.events.append((0, event))
def compute_bi_rates(self):
""" Compute reaction rates for bimolecular reactions """
rates = np.empty(len(self.system.reactions_bi),)
for i, reac in enumerate(self.system.reactions_bi):
# Do not overcount reactant pairs if both educts are of the same species
if reac.specA == reac.specB:
combs = 0.5 * self.counts[reac.specA] * (self.counts[reac.specA] - 1)
else:
combs = self.counts[reac.specA] * self.counts[reac.specB]
rates[i] = reac.rate * combs
return rates
### UPDATES ###
def perform_gen_reaction(self, reaction):
""" Simulate one occurrence of the specified generative reaction """
product_log = self.add_products(reaction.products)
return ("gen", reaction, product_log)
def perform_uni_reaction(self, reaction):
""" Simulate one occurrence of the specified unimolecular reaction """
self.counts[reaction.spec] -= 1
product_log = ()
if reaction.products is not None:
product_log = self.add_products(reaction.products)
return ("uni", reaction, product_log)
def perform_bi_reaction(self, reaction, rates):
""" Simulate one occurrence of the specified bimolecular reaction.
Refer to perform_uni_reaction for more information. """
self.counts[reaction.specA] -= 1
self.counts[reaction.specB] -= 1
product_log = ()
if reaction.products is not None:
product_log = self.add_products(reaction.products)
return ("bi", reaction, product_log)
### MAIN LOOP ###
def run(self, tmax, disable_pbar=True):
""" Run simulation for tmax time units using the Gillespie algorithm """
t0 = self.t
gen_rates = self.gen_rates
with tqdm(total=tmax,
desc="Time simulated: ",
unit="s",
disable=disable_pbar) as pbar:
while True:
uni_rates = self.uni_rates * self.counts[self.uni_spec]
bi_rates = self.compute_bi_rates()
rate = np.sum(gen_rates) + np.sum(uni_rates) + np.sum(bi_rates)
# Nothing happening
if rate == 0.0 or not np.isfinite(rate):
if not np.isfinite(rate):
logger.warning("Numerical blow-up in CME simulation")
# Pretend the last reaction happened at time tmax.
# This is necessary for updating progress bar correctly.
dt = 0
break
dt = self.rng.exponential(1 / rate)
self.t += dt
if self.t > t0 + tmax:
break
pbar.update(dt)
# The Gillespie algorithm randomly samples a possible event
# with probabilities proportional to the rates
p = self.rng.uniform(0, rate)
# Zero-molecular reaction happening
if p <= np.sum(gen_rates):
for reac, rate in zip(self.system.reactions_gen, gen_rates):
if p >= rate:
p -= rate
continue
event = self.perform_gen_reaction(reac)
self.events.append((self.t, event))
break
# Unimolecular reaction happening
elif p <= np.sum(gen_rates) + np.sum(uni_rates):
p -= np.sum(gen_rates)
for reac in self.system.reactions_uni:
if p >= reac.rate * self.counts[reac.spec]:
p -= reac.rate * self.counts[reac.spec]
continue
event = self.perform_uni_reaction(reac)
self.events.append((self.t, event))
break
# Bimolecular reaction happening
else:
p -= np.sum(gen_rates) + np.sum(uni_rates)
for reac, rates in zip(self.system.reactions_bi, bi_rates):
if p >= np.sum(rates):
p -= np.sum(rates)
continue
event = self.perform_bi_reaction(reac, rates)
self.events.append((self.t, event))
break
# This set sprogress bar value to t0 + tmax
pbar.update(dt - (self.t - t0 - tmax))
self.t = t0 + tmax
def get_dist(self, t_max=None):
"""
Return the distribution of particle numbers over the lifetime of the system
using time-averaging.
"""
counts = np.zeros((self.system.n_species, len(self.events) + 1), dtype=int)
weights = np.zeros(len(self.events) + 1)
if t_max is None:
t_max = self.t
t_last = 0
i = 0
for t, e in self.events:
if t > t_max:
continue
weights[i] = t - t_last
t_last = t
i += 1
counts[:,i] = counts[:,i-1]
if e[0] == "gen":
product_log = e[2]
for spec_product in product_log:
counts[spec_product][i] += 1
if e[0] == "uni":
reac = e[1]
counts[reac.spec][i] -= 1
product_log = e[2]
for spec_product in product_log:
counts[spec_product][i] += 1
elif e[0] == "bi":
reac = e[1]
counts[reac.specA][i] -= 1
counts[reac.specB][i] -= 1
product_log = e[2]
for spec_product in product_log:
counts[spec_product][i] += 1
weights[i] = t_max - t_last
return pdist.ParticleDistribution(counts[:,:i+1], weights=weights[:i+1])