"""Sweep Action."""
from finesse.exceptions import FinesseException, NotChangeableDuringSimulation
from finesse.components import DegreeOfFreedom
from ...parameter import Parameter, GeometricParameter, ParameterRef
from ...element import ModelElement
from ...solutions import ArraySolution
from ..runners import run_axes_scan
from .base import Action, convert_str_to_parameter
from .folder import Folder
import logging
import numpy as np
import warnings
from finesse.warnings import (
InvalidSweepVariableWarning,
ModelParameterSettingWarning,
)
from finesse.env import warn
LOGGER = logging.getLogger(__name__)
[docs]def get_sweep_array(start: float, stop: float, steps: int, mode="lin"):
start = float(start)
stop = float(stop)
steps = int(steps)
if steps <= 0:
raise Exception("Steps must be greater than 0")
if mode == "lin":
arr = np.linspace(start, stop, steps + 1)
elif mode == "log":
arr = np.logspace(np.log10(start), np.log10(stop), steps + 1)
else:
raise ValueError(f"{mode} should be either lin or log")
return arr
# IMPORTANT: renaming this class impacts the katscript spec and should be avoided!
[docs]class Sweep(Action):
"""An action that sweeps N number of parameters through the values in N arrays.
Parameters
----------
args : [Parameter, str], array, boolean
Expects 3 arguments per axis. The first is a full name of a Parameter or
a Parameter object. The second is an array of values to step this
parameter over, and lastly a boolean value to say whether this is a
relative step from the parameters initial value.
pre_step : Action, optional
An action to perform before the step is computed
post_step : Action, optional
An action to perform after the step is computed
reset_parameter : boolean, optional
When true this action will reset the all the parameters it changed to
the values before it ran.
name : str
Name of the action, used to find the solution in the final output.
"""
def __init__(
self, *args, pre_step=None, post_step=None, reset_parameter=True, name="sweep"
):
super().__init__(name)
if len(args) % 3 != 0:
raise Exception(
f"Sweep requires triplet of input arguments: parameter, array, relative_change. Not {args}"
)
self.args = args
self.pre_step = pre_step
self.post_step = post_step
self.reset_parameter = reset_parameter
def process_input_parameter(p):
if isinstance(p, ModelElement):
if p.default_parameter_name is None:
extra = ""
if isinstance(p, DegreeOfFreedom):
extra = f".\nDid you mean '{p.name}.DC'?"
raise ValueError(
f"{repr(p)} does not have a default parameter, please specify "
"one to use" + extra
)
p = getattr(p, p.default_parameter_name)
elif isinstance(p, ParameterRef):
p = p.parameter
if isinstance(p, Parameter):
if not p.changeable_during_simulation:
raise NotChangeableDuringSimulation(
f"Parameter {p.full_name} cannot be changed during a simulation"
)
return p.full_name
else:
return p
self.parameters = tuple(process_input_parameter(p) for p in args[::3])
self.axes = tuple(np.atleast_1d(_).astype(np.float64) for _ in args[1::3])
# Convert bool true or false to a 0 or 1, True meaning we sweep around the
# initial value of the parameter
self.fractional_offset = np.array(args[2::3], dtype=np.float64)
self.out_shape = tuple(np.size(_) for _ in self.axes)
def _requests(self, model, memo, first=True):
params = tuple(convert_str_to_parameter(model, _) for _ in self.parameters)
if self.reset_parameter:
# Get the actual parameter for this xaxis
for p in params:
if p.value is None:
raise FinesseException(
f"Parameters being changed in a simulation must start with a float value not None. Change {repr(p)} to a float value before running the simulation."
)
if any((not p.changeable_during_simulation for p in params)):
raise NotChangeableDuringSimulation(
f"The property {p.full_name} cannot be changed during a simulation"
)
memo["changing_parameters"].extend(self.parameters)
if self.pre_step:
self.pre_step._requests(model, memo)
if self.post_step:
self.post_step._requests(model, memo)
def _do(self, state):
if state.model is None:
raise Exception("No model was provided")
if state.sim is None:
raise Exception("No simulation was provided")
# Get all the parameters that need to be tuned in this action and
# any of its pre/post steps
rq = self.get_requests(state.model)
all_params = tuple(
convert_str_to_parameter(state.model, _) for _ in rq["changing_parameters"]
)
# Get the actual parameter for this sweep
params = tuple(
convert_str_to_parameter(state.model, _) for _ in self.parameters
)
if not all((p.is_tunable for p in all_params)):
raise Exception(
f"Not all parameters {params} are tunable in this simulation {state.sim}"
)
return self._run_sweep(state, params, changing_parameters=all_params)
def _run_sweep(self, state, params, changing_parameters):
# Record intial values of parameters before we go changing
# anything so we can reset them later
if self.reset_parameter:
initial = tuple(float(param.value) for param in changing_parameters)
float_params = np.array(params, dtype=np.float64)
nan_entries = np.isnan(float_params)
if np.any(nan_entries):
warn(
f"Parameters {np.array(params)[nan_entries]} have a NaN initial value and may cause issues",
InvalidSweepVariableWarning,
)
inf_entries = np.isinf(float_params)
inf_offset_sweep_entries = inf_entries & (self.fractional_offset != 0)
if np.any(inf_offset_sweep_entries):
warn(
f"Parameters {np.array(params)[inf_offset_sweep_entries]} have a Inf initial value and are being swept relative to it's self which may result in errors.",
InvalidSweepVariableWarning,
)
sol = ArraySolution(
self.name,
None,
self.out_shape,
self.axes,
params,
)
sol.enable_update(state.sim.detector_workspaces)
# compute actual offsets. As from issue 400 need to be more careful when
# computing offsets as inf * 0 gives nans, and you can't really offset
# sweep around an inf anyway
idx = self.fractional_offset != 0
offsets = np.zeros_like(float_params)
offsets[idx] = float_params[idx] * self.fractional_offset[idx]
# Make new folder structure in solution if we have any actions
# that branch off.
pre_step = Folder("pre_step", self.pre_step, sol) if self.pre_step else None
post_step = Folder("post_step", self.post_step, sol) if self.post_step else None
run_axes_scan(
state,
self.axes,
params,
offsets,
self.out_shape,
sol,
pre_step,
post_step,
progress_bar=True,
progress_bar_desc=self.name,
)
if self.reset_parameter:
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=ModelParameterSettingWarning)
# Reset all parameters and if we were changing a geometric parameter
# reset the beamtrace data to initial state
for i, param in zip(initial, changing_parameters):
param.value = i
# Ensure the __cvalue of each symbolic parameter gets reset accordingly
for param in state.sim.changing_parameters:
param._reset_cvalue()
if any(
type(p) is GeometricParameter and p.is_symbolic
for p in state.sim.changing_parameters
):
state.model._update_symbolic_abcds()
# Need to check all changing parameters incase of symbols
# if any(type(p) is GeometricParameter for p in state.sim.changing_parameters):
# state.model.beam_trace()
return sol