r"""The solver module provides numerical ODE solver functions.
These functions are either implemented here or provide an interface
to other solvers.
"""
import fipy
import numpy as np
import matplotlib.pyplot as plt
from multiprocessing import Pool, cpu_count
from functools import partial
import progressbar
import logging
from scarce import silicon, tools
[docs]def solve(var, equation, **kwargs):
''' Interface to the fipy solver used for the 2d poisson equation.
Parameters
----------
var : fipy.CellVariable
Fipy meshed cell variables to solve the equation for
equation : fipy equation
e.g. fipy.DiffusionTerm() = 0
kwargs : kwargs
Arguments of fipy equation.solve(kwargs)
Notes
-----
A linear LU solver is forced here, since otherwise pysparse based solver
do not converge properly.
'''
# Force a linear solver
# The others do not always converge (?!)
solver = fipy.solvers.LinearLUSolver
# Reasonable and more strict convergence criteria
tolerance = 1e-20
iterations = 10000
equation.solve(var=var,
solver=solver(tolerance=tolerance, iterations=iterations),
**kwargs)
[docs]class DriftDiffusionSolver(object):
''' Solve the drift-diffusion equation for pseudo particles.
Solving via euler forward difference.
'''
def __init__(self, pot_descr, pot_w_descr,
T=300, geom_descr=None, diffusion=True,
t_e_trapping=0., t_h_trapping=0.,
t_e_t1=0., t_h_t1=0.,
t_r=0., save_frac=20):
'''
Parameters
----------
pot_descr : fields.Description
Description object to describe the potential
pot_w_descr : fields.Description
Description object to describe the weighting potential
T : number
Temperatur in Kelvin
geom_descr : scarce.geometry.SensorDescription3D
Describes the 3D sensor geometry
t_e_trapping : number
Trapping time for electrons in ns
t_h_trapping : number
Trapping time for holes in ns
t_e_t1 : number
E-Field dependent trapping time in ns / (V / um)
t_h_t1 : number
E-Field dependent trapping time in ns / (V / um)
t_r : number
Peaking time of CSA
save_frac : number
Fraction of time steps to save for each e-h pair.
E.g.: 100 means that the position and current is saved for every
100th time step. Be aware that the sampling might be too low to get
the maxima correctly. If you want to be sure to get the correct
single e-h pair values save_frac should be set to 1. Usually one
is interessted in the total current. This value is independent of
save_frac but not available for each e-h pair.
Notes
-----
'''
self.pot_descr = pot_descr
self.pot_w_descr = pot_w_descr
self.T = T # Temperatur in Kelvin
self.t_e_trapping = t_e_trapping # Trapping time in ns
self.t_h_trapping = t_h_trapping # Trapping time in ns
self.t_e_t1 = t_e_t1 # Trapping time in ns
self.t_h_t1 = t_h_t1 # Trapping time in ns
self.t_r = t_r # Rise time of Amplifier
self.geom_descr = geom_descr
self.diffusion = diffusion
self.save_frac = save_frac
def solve(self, p0, q0, dt, n_steps, multicore=True):
''' Solve the drift diffusion equation for quasi partciles and calculates
the total induced current.
Parameters
----------
p0 : array_like, shape = (n, 2)
n positions of e-h-pairs in x, y at t = 0
q0 : array_like, shape = (n, )
Charge of quasi particles at t = 0.
To give in electrons is a good choise.
dt : number
Time step in ns. Influences presicion. Should be <= 0.001 (ps).
n_steps : number
Number of steps in time. Has to be large enough that all
particles reach the readout electrode.
'''
if dt > 0.001:
logging.warning('A time step > 1 ps result in wrong diffusion')
if not multicore:
# E-h pairs Start positions
p_e_0, p_h_0 = p0.copy(), p0.copy()
return _solve_dd(p_e_0,
p_h_0,
q0=q0,
n_steps=n_steps,
dt=dt,
geom_descr=self.geom_descr,
pot_w_descr=self.pot_w_descr,
pot_descr=self.pot_descr,
temp=self.T,
diffusion=self.diffusion,
t_e_trapping=self.t_e_trapping,
t_h_trapping=self.t_h_trapping,
t_e_t1=self.t_e_t1,
t_h_t1=self.t_h_t1,
t_r=self.t_r,
save_frac=self.save_frac)
n_slices = min(4, cpu_count() - 1)
pool = Pool(n_slices)
# Split data and into cores - 1 slices, max 4
slices_p_e_0 = np.array_split(p0, n_slices, axis=1)
slices_p_h_0 = np.array_split(p0, n_slices, axis=1)
slices_q0 = np.array_split(q0, n_slices, axis=0)
logging.info('Calculate drift diffusion on %d cores', n_slices)
jobs = []
for index in range(n_slices):
job = tools.apply_async(pool=pool,
fun=_solve_dd,
p_e_0=slices_p_e_0[index],
p_h_0=slices_p_h_0[index],
q0=slices_q0[index],
n_steps=n_steps,
dt=dt,
geom_descr=self.geom_descr,
pot_w_descr=self.pot_w_descr,
pot_descr=self.pot_descr,
temp=self.T,
diffusion=self.diffusion,
t_e_trapping=self.t_e_trapping,
t_h_trapping=self.t_h_trapping,
t_e_t1=self.t_e_t1,
t_h_t1=self.t_h_t1,
t_r=self.t_r,
save_frac=self.save_frac
)
jobs.append(job)
# Gather results
results = []
for job in jobs:
results.append(job.get())
pool.close()
pool.join()
del pool
# Merge results
I_ind_tot = np.zeros(shape=(n_steps,))
traj_e = np.concatenate([i[0] for i in results], axis=2)
traj_h = np.concatenate([i[1] for i in results], axis=2)
I_ind_e = np.concatenate([i[2] for i in results], axis=1)
I_ind_h = np.concatenate([i[3] for i in results], axis=1)
T = np.concatenate([i[4] for i in results], axis=1)
for i in results:
I_ind_tot += i[5]
Q_ind_tot_e = np.concatenate([i[6] for i in results], axis=0)
Q_ind_tot_h = np.concatenate([i[7] for i in results], axis=0)
return traj_e, traj_h, I_ind_e, I_ind_h, T, I_ind_tot, Q_ind_tot_e, Q_ind_tot_h
# Drift diffusion iteration loop helper functions
def _in_boundary(geom_descr, pot_descr, x, y, sel):
''' Checks if the particles are still in the bulk and should be propagated
'''
sel_x = np.logical_and(x[sel] >= pot_descr.min_x,
x[sel] <= pot_descr.max_x)
sel_y = np.logical_and(y[sel] >= pot_descr.min_y,
y[sel] <= pot_descr.max_y)
old_sel = sel.copy()
sel[sel] = np.logical_and(sel_x, sel_y)
if geom_descr:
sel_col = geom_descr.position_in_column(x, y, incl_sides=True)
sel = np.logical_and(sel, ~sel_col)
return sel, np.where(old_sel != sel)[0]
def _correct_boundary(geom_descr, pot_descr, x, y, sel, is_electron):
''' Checks if the particles are still in the bulk and should be propagated
'''
if not geom_descr: # planar sensor
if is_electron:
y[sel][y[sel] > pot_descr.max_y] = pot_descr.max_y
else:
y[sel][y[sel] < 0.] = 0.
def _solve_dd(p_e_0, p_h_0, q0, n_steps, dt, geom_descr, pot_w_descr,
pot_descr, temp, diffusion, t_e_trapping, t_h_trapping,
t_e_t1, t_h_t1, t_r, save_frac):
p_e, p_h = p_e_0, p_h_0
# Result arrays initialized to NaN
n_store = int(n_steps / save_frac) # Steps to store
max_step_size = n_steps / n_store * 10
# Different store time step for each e-h pair
T = np.full(shape=(n_store, p_e.shape[1]),
fill_value=np.nan, dtype=np.float32)
# Stored trajectory for each eh pair
traj_e = np.full(shape=(n_store, p_e.shape[0], p_e.shape[1]),
fill_value=np.nan)
traj_h = np.full(shape=(n_store, p_h.shape[0], p_h.shape[1]),
fill_value=np.nan)
# Stored induced charge for each eh pair
I_ind_e = np.zeros(shape=(n_store, p_e.shape[1]))
I_ind_h = np.zeros_like(I_ind_e)
# Helper array(s) of actual step index and next step index
i_step = np.zeros(p_e.shape[1], dtype=np.int) # Result array indeces
next_step = np.zeros_like(i_step) # Next time step to store
# Summed induced charge/current with every time step
I_ind_tot = np.zeros(shape=(n_steps))
# Total induced charge
Q_ind_tot_e = np.zeros(shape=(p_e.shape[1]))
Q_ind_tot_h = np.zeros(shape=(p_h.shape[1]))
def add_diffusion(v_e, v_h):
# Calculate absolute thermal velocity
v_th_e = silicon.get_thermal_velocity(temperature=temp,
is_electron=True)
v_th_h = silicon.get_thermal_velocity(temperature=temp,
is_electron=False)
# Create thermal velocity distribution
# From: The Atomistic Simulation of Thermal Diffusion
# and Coulomb Drift in Semiconductor Detectors
# IEEE VOL. 56, NO. 3, JUNE 2009
v_th_e *= np.sqrt(2. / 3. * np.log(np.abs(
1. / (1. - np.random.uniform(size=v_e.shape[1])))))
v_th_h *= np.sqrt(2. / 3. * np.log(np.abs(
1. / (1. - np.random.uniform(size=v_h.shape[1])))))
# Calculate random direction in x, y
# Uniform random number 0 .. 2 Pi
eta = np.random.uniform(0., 2. * np.pi, size=v_e.shape[1])
direction_e = np.array([np.cos(eta), np.sin(eta)])
eta = np.random.uniform(0., 2. * np.pi, size=v_h.shape[1])
direction_h = np.array([np.cos(eta), np.sin(eta)])
v_th_e = v_th_e[np.newaxis, :] * direction_e
v_th_h = v_th_h[np.newaxis, :] * direction_h
v_e += v_th_e
v_h += v_th_h
return v_e, v_h
def cal_step_size(t, Q_ind_tot, dt, dydt, q_max, i_step, n_store):
''' Calculates the step size from the actual data
It is assumed that the actual slope stays constant.
The remaining time distance is calculated and the step size is
adjusted to fit the remaining steps.
'''
step_size = np.zeros_like(q_max)
# All steps are used, mark as done
sel_done = n_store - i_step - 1 <= 0
# All storage spaces used up
if step_size[~sel_done].size == 0:
return step_size.astype(np.int)
# Set next step to NaN if storing is done
step_size[sel_done] = np.NaN
# Calculate remaining x distance to cover
try:
# Case: increasing function (assume value = q_max at tmax)
t_max_exp = ((q_max - Q_ind_tot) / dydt + t)[~sel_done]
# Case: decreasing function (assume value = 0 at tmax)
t_max_exp[dydt[~sel_done] < 0] = ((-q_max - Q_ind_tot) /
dydt + t)[~sel_done][dydt[~sel_done] < 0]
# Correct expected time larger than simulation time
t_max_exp[t_max_exp > n_steps * dt] = n_steps * dt
# Remaining time to cover
t_left = t_max_exp - t
except (IndexError, ValueError):
logging.error('q_max.shape %s', str(q_max.shape))
logging.error('sel_done.shape %s', str(sel_done.shape))
logging.error('Q_ind_tot.shape %s', str(Q_ind_tot.shape))
logging.error('dydt.shape %s', str(dydt.shape))
logging.error('t_max_exp.shape %s', str(t_max_exp.shape))
logging.error('dydt[~sel_done].shape %s', str(dydt[~sel_done].shape))
logging.error('(dydt[~sel_done] < 0).shape %s', str((dydt[~sel_done] < 0).shape))
logging.error('(-q_max - Q_ind_tot) / dydt + t).shape %s', str(((-q_max - Q_ind_tot) / dydt + t).shape))
logging.error('(-q_max - Q_ind_tot) / dydt + t)[~sel_done].shape %s', str(((-q_max - Q_ind_tot) / dydt + t)[~sel_done].shape))
raise
# Calculate the step size
step_size[~sel_done] = t_left / dt / (n_store - i_step[~sel_done] - 1)
# Limit step size to max_step_size
# Needed for slope direction changing functions
sel_lim_max = step_size[~sel_done] > max_step_size
step_size[~sel_done][sel_lim_max] = max_step_size
# Minimum step size = 1
step_size[np.logical_and(~sel_done, step_size < 1.)] = 1
step_size = step_size.astype(np.int)
return step_size
def store_if_needed(step, next_step, Q_ind_tot_e, Q_ind_tot_h, dt, dQ_e,
dQ_h, T, I_ind_e, I_ind_h, p_e, p_h, q_max,
i_step, n_store, sel_e, sel_h):
''' Checks if charge carriers value needs to be stored and returns
next storage time step
'''
t = dt * step # Actual time step
# Select charges that need storing for full array (1 entry per e-h)
store = step == next_step
# Select charges that need storing and are not fully propagated yet
# for full array (1 entry per e-h)
store_e = np.logical_and(store, sel_e)
store_h = np.logical_and(store, sel_h)
# Select charges that need storing for reduced array (e-h that need
# propagating
s_e = store_e[sel_e]
s_h = store_h[sel_h]
# All carrier stored
if not np.any(store):
return
# All carriers needing storing are fully drifted
if not np.any(s_e) and not np.any(s_h):
return
# Set data of actual time step
T[i_step[store], store] = t
# Calculate induced charge for integrated time steps T
DT_e = T[i_step[store_e], store_e] - T[i_step[store_e] - 1, store_e]
DT_h = T[i_step[store_h], store_h] - T[i_step[store_h] - 1, store_h]
DT_e[np.isnan(DT_e)] = dt # First storing
DT_h[np.isnan(DT_h)] = dt # First storing
I_ind_e[i_step[store_e], store_e] = dQ_e_step[store_e] / DT_e
I_ind_h[i_step[store_h], store_h] = dQ_h_step[store_h] / DT_h
# Store data
# I_ind_e[i_step[store_e], store_e] = dQ_e[s_e] / dt
# I_ind_h[i_step[store_h], store_h] = dQ_h[s_h] / dt
traj_e[i_step[store_e], :, store_e] = p_e[:, store_e].T
traj_h[i_step[store_h], :, store_h] = p_h[:, store_h].T
# if np.max(i_step) == 145:
# plt.plot(T[:, 0], I_ind_e[:, 0], '.')
# plt.plot(T[:, 0], I_ind_h[:, 0], '.')
# plt.show()
# Calculate step size as the minimum of the e and h step size
d_step = np.zeros_like(next_step)
d_step_e = np.zeros_like(d_step)
d_step_h = np.zeros_like(d_step)
if np.any(store_e):
d_step_e[store_e] = cal_step_size(t, Q_ind_tot=Q_ind_tot_e[store_e], dt=dt,
dydt=I_ind_e[i_step[store_e], store_e],
q_max=q_max[store_e],
i_step=i_step[store_e],
n_store=n_store)
d_step[store_e] = d_step_e[store_e]
if np.any(store_h):
d_step_h[store_h] = cal_step_size(t, Q_ind_tot=Q_ind_tot_h[store_h], dt=dt,
dydt=I_ind_h[i_step[store_h], store_h],
q_max=q_max[store_h],
i_step=i_step[store_h],
n_store=n_store)
d_step[store_h] = d_step_h[store_h]
sel = np.logical_and(store_e, store_h)
d_step[sel] = np.minimum(d_step_e[sel], d_step_h[sel])
next_step += d_step
# Increase storage hists indeces
i_step[store] += 1
i_step[i_step >= T.shape[0]] = T.shape[0] - 1
# Reset tmp. variable
dQ_e_step[store_e] = 0.
dQ_h_step[store_h] = 0.
# Tmp. variables to store the total induced charge per save step
# Otherwise the resolution of induced current calculation
# is reduced
dQ_e_step = np.zeros(shape=p_e.shape[1])
dQ_h_step = np.zeros(shape=p_h.shape[1])
progress_bar = progressbar.ProgressBar(
widgets=['', progressbar.Percentage(), ' ',
progressbar.Bar(marker='*', left='|', right='|'), ' ',
progressbar.AdaptiveETA()],
maxval=n_steps, term_width=80)
progress_bar.start()
sel_e, _ = _in_boundary(geom_descr, pot_descr=pot_descr,
x=p_e[0, :], y=p_e[1, :],
sel=np.ones(p_e.shape[1], dtype=np.bool))
sel_h, _ = _in_boundary(geom_descr, pot_descr=pot_descr,
x=p_h[0, :], y=p_h[1, :],
sel=np.ones(p_h.shape[1], dtype=np.bool))
for step in range(n_steps):
# Check if all particles out of boundary
if not np.any(sel_h) and not np.any(sel_e):
break # Stop loop to safe time
# Electric field in V/cm
E_e = pot_descr.get_field(
p_e[0, sel_e], p_e[1, sel_e]) * 1e4
E_h = pot_descr.get_field(
p_h[0, sel_h], p_h[1, sel_h]) * 1e4
# Mobility in cm2 / Vs
mu_e = silicon.get_mobility(np.sqrt(E_e[0] ** 2 + E_e[1] ** 2),
temperature=temp, is_electron=True)
mu_h = silicon.get_mobility(np.sqrt(E_h[0] ** 2 + E_h[1] ** 2),
temperature=temp, is_electron=False)
# Drift velocity in cm / s
v_e, v_h = - E_e * mu_e, E_h * mu_h
if diffusion:
v_e, v_h = add_diffusion(v_e, v_h)
# Calculate induced current
# Only if electrons are still drifting
if np.any(sel_e):
# Weighting field in V/um
W_e = pot_w_descr.get_field(p_e[0, sel_e], p_e[1, sel_e])
# Induced charge in C/s, Q = E_w * v * q * dt
dQ_e = (W_e[0] * v_e[0] + W_e[1] * v_e[1]) * \
- q0[sel_e] * dt * 1e-5
# Reduce induced charge due to trapping
if t_e_trapping:
t_e = t_e_trapping + t_e_t1 * np.sqrt(E_e[0]**2 + E_e[1]**2) * 1e-4
dQ_e *= np.exp(-dt * step / t_e)
if t_r:
dQ_e *= np.exp(-dt * step / (t_r / 2.2))
dQ_e_step[sel_e] += dQ_e
Q_ind_tot_e[sel_e] += dQ_e
I_ind_tot[step] += dQ_e.sum() / dt
if np.any(sel_h): # Only if holes are still drifting
# Weighting field in V/um
W_h = pot_w_descr.get_field(p_h[0, sel_h], p_h[1, sel_h])
# Induced charge in C/s, Q = E_w * v * q * dt
dQ_h = (W_h[0] * v_h[0] + W_h[1] * v_h[1]) * \
q0[sel_h] * dt * 1e-5
# Reduce induced charge due to trapping
if t_h_trapping:
t_h = t_h_trapping + t_h_t1 * np.sqrt(E_h[0]**2 + E_h[1]**2) * 1e-4
dQ_h *= np.exp(-dt * step / t_h)
if t_r:
dQ_h *= np.exp(-dt * step / (t_r / 2.2))
dQ_h_step[sel_h] += dQ_h
Q_ind_tot_h[sel_h] += dQ_h
I_ind_tot[step] += dQ_h.sum() / dt
# Store
store_if_needed(step, next_step, Q_ind_tot_e=Q_ind_tot_e,
Q_ind_tot_h=Q_ind_tot_h, dt=dt,
dQ_e=dQ_e, dQ_h=dQ_h, T=T, I_ind_e=I_ind_e,
I_ind_h=I_ind_h, p_e=p_e, p_h=p_h, q_max=q0,
i_step=i_step, n_store=n_store,
sel_e=sel_e, sel_h=sel_h)
# Position change in um
d_p_e, d_p_h = v_e * dt * 1e-5, v_h * dt * 1e-5
# Update position
p_e[:, sel_e] = p_e[:, sel_e] + d_p_e
p_h[:, sel_h] = p_h[:, sel_h] + d_p_h
# Correct boundaries (e.g. leaving sensor due to diffusion)
_correct_boundary(geom_descr, pot_descr,
x=p_e[0, :], y=p_e[1, :],
sel=sel_e, is_electron=True)
_correct_boundary(geom_descr, pot_descr,
x=p_h[0, :], y=p_h[1, :],
sel=sel_h, is_electron=False)
# Check boundaries and update selection
sel_e, new_e = _in_boundary(geom_descr, pot_descr=pot_descr,
x=p_e[0, :], y=p_e[1, :],
sel=sel_e)
sel_h, new_h = _in_boundary(geom_descr, pot_descr=pot_descr,
x=p_h[0, :], y=p_h[1, :],
sel=sel_h)
# Force a storing step at for e-h pairs where one is finished
next_step[new_e] = step + 1
next_step[new_h] = step + 1
p_e[:, ~sel_e] = np.nan
p_h[:, ~sel_h] = np.nan
progress_bar.update(step)
progress_bar.finish()
return traj_e, traj_h, I_ind_e, I_ind_h, T, I_ind_tot, Q_ind_tot_e, Q_ind_tot_h