""" This module implements specific pulse retrieval algorithms, e.g.,
COPRA, GPA, PCGPA, etc.
"""
import numpy as np
from scipy.optimize import minimize_scalar
from .retriever import BaseRetriever
from .. import lib
[docs]class StepRetriever(BaseRetriever):
def _retrieve(self):
# local rename
o = self.options
log = self.log
res = self._result
rs = self._retrieval_state
# store current guess in attribute
spectrum = self.initial_guess.copy()
# initialize R
R = res.trace_error
for i in range(o.maxiter):
# store trace error and spectrum for later analysis
if self.logging:
# if the trace error was only approximated, calculate it here
if rs.approximate_error:
R = self.trace_error(spectrum, store=False)
log.trace_error.append(R)
# Perform a single retriever step in one of the algorithms
R, new_spectrum = self._retrieve_step(i, spectrum.copy())
# update the solution if the result is better
if R < res.trace_error:
# R is calculated for the input, i.e., the old spectrum.
res.trace_error = R
res.approximate_error = rs.approximate_error
res.spectrum[:] = spectrum # store the old spectrum
rs.steps_since_improvement = 0
else:
rs.steps_since_improvement += 1
# accept the new spectrum
spectrum[:] = new_spectrum
if self.verbose:
if i == 0:
print("iteration".ljust(10) + "trace error".ljust(20))
s = "{:d}".format(i + 1).ljust(10)
if rs.approximate_error:
s += "~"
else:
s += " "
s += "{:.10e}".format(R)
if R == res.trace_error:
s += "*"
print(s)
if not rs.running:
break
if self.verbose:
print()
print("~ approximate trace error")
print("* accepted as best trace error")
print()
# return the retrieved spectrum
# for a more detailed analysis call self.result()
return res.spectrum
[docs]class COPRARetriever(StepRetriever):
""" This module implements the common pulse retrieval algorithm
[Geib2019]_.
"""
method = "copra"
[docs] def __init__(self, pnps, alpha=0.25, **kwargs):
""" For a full documentation of the arguments see :class:`Retriever`.
Parameters
----------
alpha : float, optional
Scales the step size in the global stage of COPRA. Higher values
mean potentially faster convergence but less accuracy. Lower
values provide higher accuracy for the cost of speed. Default is
0.25.
"""
super().__init__(pnps, alpha=alpha, **kwargs)
def _retrieve_begin(self, measurement, initial_guess, weights):
super()._retrieve_begin(measurement, initial_guess, weights)
pnps = self.pnps
rs = self._retrieval_state
rs.mode = "local" # COPRA starts with local mode
# calculate the maximum gradient norm
# self.trace_error() was called beforehand -> rs.Tmn and rs.Smk exist!
Smk2 = self._project(self.Tmn_meas / rs.mu, rs.Smk)
nablaZnm = pnps.gradient(Smk2, self.parameter)
rs.current_max_gradient = np.max(np.sum(lib.abs2(nablaZnm), axis=1))
def _retrieve_step(self, iteration, En):
""" Perform a single COPRA step.
Parameters
----------
iteration : int
The current iteration number - mainly for logging.
En : 1d-array
The current pulse spectrum.
"""
# local rename
ft = self.ft
options = self.options
pnps = self.pnps
rs = self._retrieval_state
Tmn_meas = self.Tmn_meas
# current gradient -> last gradient
rs.previous_max_gradient = rs.current_max_gradient
rs.current_max_gradient = 0.0
# switch iteration
if rs.steps_since_improvement == 5:
rs.mode = "global"
# local iteration
if rs.mode == "local":
# running estimate for the trace
Tmn = np.zeros((self.M, self.N))
for m in np.random.permutation(np.arange(self.M)):
p = self.parameter[m]
Tmn[m, :] = pnps.calculate(En, p)
Smk2 = self._project(Tmn_meas[m, :] / rs.mu, pnps.Smk)
nablaZnm = pnps.gradient(Smk2, p)
# calculate the step size
Zm = lib.norm2(Smk2 - pnps.Smk)
gradient_norm = lib.norm2(nablaZnm)
if gradient_norm > rs.current_max_gradient:
rs.current_max_gradient = gradient_norm
gamma = Zm / max(rs.current_max_gradient,
rs.previous_max_gradient)
# update the spectrum
En -= gamma * nablaZnm
# Tmn is only an approximation as En changed in the iteration!
rs.approximate_error = True
R = self._R(Tmn) # updates rs.mu!!!
# global iteration
elif rs.mode == "global":
Tmn = pnps.calculate(En, self.parameter)
r = self._r(Tmn)
R = self._Rr(r) # updates rs.mu!!!
rs.approximate_error = False
# gradient descent w.r.t. Smk
w2 = self._weights * self._weights
gradrmk = (-4 * ft.dt / (ft.dw * lib.twopi) *
ft.backward(rs.mu * ft.forward(pnps.Smk) *
(Tmn_meas - rs.mu * Tmn) * w2))
etar = options.alpha * r / lib.norm2(gradrmk)
Smk2 = pnps.Smk - etar * gradrmk
# gradient descent w.r.t. En
nablaZn = pnps.gradient(Smk2, self.parameter).sum(axis=0)
# calculate the step size
Z = lib.norm2(Smk2 - pnps.Smk)
etaz = options.alpha * Z / lib.norm2(nablaZn)
# update the spectrum
En -= etaz * nablaZn
return R, En
[docs]class PCGPARetriever(StepRetriever):
""" This class implements the principal components generalized projections
algorithm (PCGPA) for SHG-FROG.
We follow the algorithm as described in [Kane1999]_ but use the PNPS
formalism from [Geib2019]_ and some minor modifications:
- it supports both the singular value decomposition and the power
method to find/approximate the largest eigenvector.
- the projection includes the scaling factor µ. This makes the method
robust against initial guesses with the wrong magnitude. It should
have no adverse effect.
"""
method = "pcgpa"
supported_schemes = ["shg-frog"]
[docs] def __init__(self, pnps, decomposition="power", **kwargs):
""" For a full documentation of the arguments see :class:`Retriever`.
Parameters
----------
decomposition : str, optional
It specifies how the FROG signal is decomposed. If `power` (the
default) the power method is used to find the largest eigenvalue.
If `svd` a full singular value decomposition is performed. This
is potentially much slower but more accurate.
"""
super().__init__(pnps, decomposition=decomposition, **kwargs)
def _retrieve_begin(self, measurement, initial_guess, weights):
super()._retrieve_begin(measurement, initial_guess, weights)
if np.any(self.parameter != measurement.axes[0]):
raise ValueError("The delay has to be sampled exactly at the "
"temporal simulation grid.")
def _retrieve_step(self, iteration, En):
# local rename
ft = self.ft
options = self.options
pnps = self.pnps
Tmn_meas = self.Tmn_meas
rs = self._retrieval_state
R = self.trace_error(En) # updates rs.mu!!!
# project on measured intensity
Smk2 = self._project(Tmn_meas / rs.mu, pnps.Smk)
# to outer product form
for n in range(ft.N):
Smk2[:, n] = np.roll(Smk2[::-1, n], n)
if options.decomposition == "power":
# apply power method iteration once
Ek = ft.backward(En)
Ek[:] = Smk2.conj() @ Ek
Ek[:] = Ek.conj() / lib.norm(Ek)
elif options.decomposition == "svd":
# use full svd (slow!)
U, s, V = np.linalg.svd(Smk2)
Ek = U[:, 0] * np.sqrt(s[0]) # select U
ft.forward(Ek, out=En)
return R, En
[docs]class GPARetriever(StepRetriever):
""" Implements the classical generalized projections algorithm for
SHG-FROG as described in [DeLong1994]_ and [Trebino2000].
As far as I know the determination of the step size in GPA is not
made explicit in the publications. It is usually done in a line search.
In this implementation we offer three different options:
- an exact line search using a Brent style minimizer
- a backtracking (inexact) line search using the Armijo-Goldstein
condition with c=0.5 and tau=0.5.
- the same heuristic choice for the step size used in copra.
The last method is the fastest, but as the first is the classic choice
for GPA, it is the default.
"""
method = "gpa"
supported_schemes = ["shg-frog"]
[docs] def __init__(self, pnps, step_size="exact", **kwargs):
""" For a full documentation of the arguments see :class:`Retriever`.
Parameters
----------
step_size : str, optional
Specifies how the step size of the gradient step in GPA is
determined. Default is `exact` which performs an exact line search.
`inexact` performs a backtracking line search and `copra` uses the
ad-hoc estimates for the step size used in COPRA.
"""
super().__init__(pnps, step_size=step_size, **kwargs)
def _retrieve_begin(self, measurement, initial_guess):
super()._retrieve_begin(measurement, initial_guess)
if np.any(self.parameter != measurement.axes[0]):
raise ValueError("The delay has to be sampled exactly at the "
"temporal simulation grid.")
def _retrieve_step(self, iteration, En):
# local rename
ft = self.ft
options = self.options
pnps = self.pnps
Tmn_meas = self.Tmn_meas
rs = self._retrieval_state
R = self.trace_error(En) # updates rs.mu!!!
# obtain intermediate results
delay, Amk, Ek, Smk, Tmn = pnps.intermediate(self.parameter)
Ek = Ek[0, :] # the same for every parameter
# project on measured intensity
Smk2 = self._project(Tmn_meas / rs.mu, Smk)
# calculate the gradient of Z w.r.t. to the temporal pulse envelope
# by directly implementing (S58) of [Geib2019]_
dS = Smk2 - Smk
indices = np.array(np.rint(self.parameter / ft.dt), dtype=np.int32)
gradient = np.zeros((self.M, self.N), dtype=np.complex128)
for m in range(self.M):
gradient[m, :] = np.roll(dS[m, :] * Ek.conj(), -indices[m])
gradient = -2 * np.sum(gradient + dS * Amk.conj(), axis=0)
# approximate the step size with the "copra" step size
gamma0 = lib.norm2(dS) / lib.norm2(gradient)
if options.step_size == "copra":
# directly choose the copra step size
gamma = gamma0
else:
# do a line search
def objective(gamma):
self.trace_error(ft.forward(Ek - gamma * gradient))
return lib.norm2(Smk2 - pnps.Smk)
if options.step_size == "exact":
# perform an exact line search
bracket = [0.9 * gamma0, gamma0]
ret = minimize_scalar(objective,
bracket=bracket,
method="brent")
gamma = ret.x
elif options.step_size == "inexact":
# perform a back-tracking line search until the Armijo
# condition is fulfilled.
t = 0.5 * lib.norm2(gradient)
tau = 0.5
if iteration == 0:
rs.old_gamma = 5.0 * gamma0
gamma = 2 * rs.old_gamma
objective0 = objective(0.0)
while objective(gamma) - objective0 > -gamma * t:
gamma = gamma * tau
rs.old_gamma = gamma
Ek = Ek - gamma * gradient
ft.forward(Ek, out=En)
return R, En
[docs]class GPDSCANRetriever(StepRetriever):
""" This class implements a pulse retrieval algorithm for SHG and THG
d-scan based on the paper [Miranda2017]_.
In our tests we found that it does not converge in the noiseless case.
In other words the global solution to the least-squares problem is not a
fixed point of the iteration.
"""
supported_schemes = ["shg-dscan", "thg-dscan"]
method = "gp-dscan"
def _retrieve_begin(self, measurement, initial_guess):
super()._retrieve_begin(measurement, initial_guess)
pnps = self.pnps
rs = self._retrieval_state
# calculate phase mask once
rs.Hmn = np.zeros((self.M, self.N), dtype=np.complex128)
for i, p in enumerate(self.parameter):
rs.Hmn[i, :] = pnps.mask(p)
def _retrieve_step(self, iteration, En):
# local rename
ft = self.ft
pnps = self.pnps
Tmn_meas = self.Tmn_meas
rs = self._retrieval_state
Hmn = rs.Hmn
R = self.trace_error(En) # updates rs.mu!!!
# project on measured intensity
Smk2 = self._project(Tmn_meas / rs.mu, pnps.Smk)
# modify En
if pnps.process == "shg":
Smk2 *= ft.backward(Hmn * En).conj()
Smk2 /= np.abs(Smk2)**(2/3)
En = np.sum(Hmn.conj() * ft.forward(Smk2), axis=0)
elif pnps.process == "thg":
Smk2 *= ft.backward(Hmn * En).conj()**2
Smk2 /= np.abs(Smk2)**(4/5)
En = np.sum(Hmn.conj() * ft.forward(Smk2), axis=0)
return R, En
[docs]class PIERetriever(StepRetriever):
""" This class implements a pulse retrieval algorithm for SHG-FROG based on
the ptychographical iterative engine (PIE). It is based on the paper
[Sidorenko2016]_ and its erratum [Sidorenko2017]_.
We modified the algorithm to include the scaling factor µ in the
projection. This makes the method robust against initial guesses with the
wrong magnitude. It should have no adverse effect.
"""
method = "pie"
supported_schemes = ["shg-frog"]
def _retrieve_step(self, iteration, En):
# local rename
ft = self.ft
pnps = self.pnps
rs = self._retrieval_state
Tmn_meas = self.Tmn_meas
# running estimate for the trace
Tmn = np.zeros((self.M, self.N))
# random choice for the step size scaling
beta = np.random.uniform(0.1, 0.5)
for m in np.random.permutation(np.arange(self.M)):
p = self.parameter[m]
Tmn[m, :] = pnps.calculate(En, p)
# get intermediate results from private attribute
delay, Amk, Ek, Smk, _ = pnps._tmp[p]
# project
Smk2 = self._project(Tmn_meas[m, :] / rs.mu, Smk)
# perform update
Ek += beta * Amk.conj() * (Smk2 - Smk) / lib.abs2(Ek).max()
# update the spectrum
ft.forward(Ek, out=En)
# Tmn is only an approximation as En changed in the iteration!
rs.approximate_error = True
R = self._R(Tmn) # updates rs.mu!!!
return R, En