import logging
from libc.stdlib cimport free, calloc
cimport cython
from finesse import BeamParam
from finesse.env import warn
from finesse.frequency cimport Frequency, FrequencyContainer
from finesse.cymath cimport complex_t
from finesse.cymath.math cimport float_eq
from finesse.cymath.complex cimport conj
from finesse.cymath.gaussbeam cimport beam_param, transform_q, inv_transform_q
from finesse.symbols import Constant
from finesse.exceptions import BeamTraceException, NotChangeableDuringSimulation
from finesse.components.modal.cavity cimport CavityWorkspace
from finesse.warnings import CavityUnstableWarning
from finesse.utilities.collections import OrderedSet
from finesse.utilities.cyomp cimport determine_nthreads_even
from finesse.parameter cimport Parameter
from finesse.simulations.workspace cimport ABCDWorkspace
from finesse.element_workspace cimport ElementWorkspace
LOGGER = logging.getLogger(__name__)
[docs]cdef class BaseSimulation:
"""This base level class should be inherited by others to perform the exact
needs of a particular type of simulation. This BaseSimulation class contains
the methods to store common settings as well as functionality for beam tracing
through a model, and detecting which beam parameters will be changing.
Developers should ensure that methods raising `NotImplementedError` are defined
in any deriving classes.
"""
def __init__(self, model, unicode name, dict simulation_options):
self.model = model
self.name = name
self.initial_trace_sol = None
self.trace = NULL
self.changing_mismatch_couplings = ()
self.contingent_trace_forests = {}
self.needs_reflag_changing_q = False
self.simulation_options = simulation_options
self.workspace_name_map = {}
def __dealloc__(self):
if self.trace:
free(self.trace)
def build_carrier_solver(self, nodes, optical_frequencies):
raise NotImplementedError()
def build_signal_solver(self, nodes, optical_frequencies, signal_frequencies):
raise NotImplementedError()
def build(self):
cf = self.carrier_frequencies_to_use
self.carrier = self.build_carrier_solver(self.optical_nodes_to_use, cf)
if self.compute_signals:
osf, sf, =self.signal_optical_frequencies_to_use, self.signal_frequencies_to_use
self.signal = self.build_signal_solver(self.optical_nodes_to_use + self.signal_nodes_to_use, osf, sf)
self.setup_build()
def setup_build(self):
raise NotImplementedError()
def pre_build(self):
"""
Pre-build performs some common setup routine that should be applicable to
any deriving simulation class. This method performs the following tasks:
* Checks if the signal frequency (fsig) is changing and if its value
is None. If so, it sets a default value of 1 Hz.
* Creates a list of all the changing parameters in the simulation.
* Creates a set of tunable parameters from the changing parameters.
* Determines if signal computation is required based on the value of
fsig.f. Sets self.compute_signals
* Determines if the simulation is modal or not. Sets self.is_modal
* Initializes the model settings
* Initializes the simulation configuration data.
* Generates carrier frequencies based on the model.
* Initializes the trace forest if the simulation is modal.
* Determines the changing beam parameters if the simulation is modal.
"""
if self.model.fsig.f.is_changing and self.model.fsig.f.value is None:
warn(
"Signal frequency (fsig) was set to None but simulation needs it. "
"Setting default value of 1 Hz"
)
self.model.fsig.f.value = 1
self.changing_parameters = OrderedSet(
(p for p in self.model.all_parameters if p.is_changing)
)
for p in self.changing_parameters:
if not p.changeable_during_simulation:
raise NotChangeableDuringSimulation(
f"Parameter {p} cannot be changed during a simulation. Loop over this parameter changed with a Python for loop instead."
)
self.tunable_parameters = OrderedSet(
p for p in self.changing_parameters if p.is_tunable
)
self.compute_signals = self.model.fsig.f.value is not None
self.is_modal = self.model.is_modal
self.initialise_model_settings()
self.initialise_sim_config_data()
self.optical_nodes_to_use = self.model.optical_nodes
self.carrier_frequencies_to_use = self.generate_carrier_frequencies()
if self.compute_signals:
self.signal_nodes_to_use = list(self.model.get_active_signal_nodes())
self.signal_optical_frequencies_to_use, self.signal_frequencies_to_use = self.generate_signal_frequencies(
self.optical_nodes_to_use + self.signal_nodes_to_use,
self.carrier_frequencies_to_use
)
self.initialise_trace_forest(self.model.optical_nodes)
for el in self.model.elements.values():
el._setup_changing_params()
def post_build(self):
"""Post build calls functions that should be called after everything else has been
built. For example, detectors that might rely on components being set up.
"""
if self.carrier is None:
raise RuntimeError("No carrier simulation was made")
if self.compute_signals and self.signal is None:
raise RuntimeError("No carrier simulation was made")
self.setup_output_workspaces()
def unbuild(self):
if self.carrier is not None:
self.carrier.destruct()
if self.signal is not None:
self.signal.destruct()
for p in self.changing_parameters:
# Reset all changing parameters so their type can change again
(<Parameter>p).__disable_state_type_change = False
# Code below can be used in debug mode to determine if anyone is keeping any
# references to this matrix object, meaning its memory can't be freed.
# This takes ~20ms to do so makes a difference for quick models. Maybe we need
# a debug mode
#_ref = self._M
#self._M = None
# refs = gc.get_referrers(_ref)
# Nref = len(refs)
# if Nref > 0:
# warn(
# f"Something other than the Simulation object (N={Nref}) is keeping"
# f" a reference to the matrix object {repr(self._M)}."
# " Could lead to excessive memory usage if not released."
# )
# for _ in refs:
# warn(f" - {repr(_)}")
#del _ref
cpdef update_cavities(self):
cdef CavityWorkspace ws
for ws in self.cavity_workspaces.values():
ws.update()
cpdef update_map_data(self):
"""This will cycle through each map being used and if
it is defined by a function it will be evaluated again.
"""
cdef ABCDWorkspace ws
# This could be made more efficient by just storing the
# a list of those with changing maps
for ws in self.to_scatter_matrix_compute:
ws.update_map_data()
cdef int set_gouy_phases(self) except -1:
cdef ABCDWorkspace ws
cdef int rtn
for ws in self.gouy_phase_workspaces:
if ws.fn_gouy_c is not None:
rtn = ws.fn_gouy_c.func(ws)
elif ws.fn_gouy_py is not None:
rtn = ws.fn_gouy_py(ws)
if rtn:
return rtn
return 0
cpdef modal_update(self):
"""Updates HOM related dependencies / properties of the model.
These updates are as follows:
* Execute a beam trace on the changing trace trees
* Computes the changing scattering matrices
* Calculates the Gouy phase of Spaces and Laser power coefficients
Returns
-------
validity : bool
True if the modal update was successful, or False if an unstable
cavity combination prevented a beam trace from being performed.
"""
# Evaluate changing properties of cavity workspaces
self.update_cavities()
if self.retrace:
if not self.trace_beam():
return False
# Update the changing Gouy phases at spaces
# and TEM Gouy phases at lasers
self.set_gouy_phases()
return True
cdef int _determine_changing_beam_params(
self, TraceForest forest=None, bint set_tree_node_ids=True,
):
if self.trace == NULL:
raise RuntimeError("trace is NULL")
cdef:
Py_ssize_t i
Py_ssize_t num_nodes = len(self.model.optical_nodes)
ABCDWorkspace kws
# Re-set all beam parameter changing flags to false initially
for i in range(num_nodes):
self.trace[i].is_fixed = True
if self.retrace:
LOGGER.info("Flagging changing beam parameters.")
# Prepare the forest for simulation by setting all the node_id attributes
# and flag the corresponding self.trace entries as changing
self._setup_trace_forest(forest, set_tree_node_ids)
# Now tell each knm workspace whether it is changing or not
# so that only changing scattering matrices get recomputed
# from here on
if self.to_scatter_matrix_compute is not None:
for kws in self.to_scatter_matrix_compute:
kws.flag_changing_beam_parameters(self.changing_mismatch_edges)
return 0
def is_component_in_mismatch_couplings(self, comp):
"""Determines whether the connector `comp` is associated with any
of the node couplings in the stored changing mismatch couplings.
.. note::
This method can be replaced if connectors eventually use more
granular refill flags --- i.e. per coupling refill flags. Then
the check for refilling that coupling can simply include the
condition ``(from_node, to_node) in sim.changing_mismatch_couplings``.
"""
return any(node.component is comp for node, _ in self.changing_mismatch_couplings)
cdef _setup_trace_forest(self, TraceForest forest=None, bint set_tree_node_ids=True):
cdef:
Py_ssize_t tree_idx
TraceTree tree
if forest is None:
forest = self.trace_forest
for tree_idx in range(forest.size()):
tree = forest.forest[tree_idx]
self._setup_single_trace_tree(tree, set_tree_node_ids)
cdef _setup_single_trace_tree(self, TraceTree tree, bint set_tree_node_ids=True):
cdef:
TraceTree ltree = tree.left
TraceTree rtree = tree.right
# Only ever need to do this once, so avoid repeating when reflagging
# changing beam params after exiting unstable cavity regions
if set_tree_node_ids:
tree.node_id = self.trace_node_index[tree.node]
tree.opp_node_id = self.trace_node_index[tree.node.opposite]
self.trace[tree.node_id].is_fixed = False
self.trace[tree.opp_node_id].is_fixed = False
if ltree is not None:
self._setup_single_trace_tree(ltree)
if rtree is not None:
self._setup_single_trace_tree(rtree)
cdef tuple _find_new_unstable_cavities(self) :
cdef:
Py_ssize_t tree_idx
TraceTree tree
CavityWorkspace cav_ws
bint source_is_cav
double gx, gy
list ch_unstable_cavities = []
for tree_idx in range(self.trace_forest.size()):
tree = self.trace_forest.forest[tree_idx]
if tree.is_source:
cav_ws = self.cavity_workspaces.get(tree.dependency)
source_is_cav = cav_ws is not None
if source_is_cav: # Tree is an internal cavity tree
# The geometrically changing cavity has become unstable
# so inform that this is the case
if not cav_ws.is_stable:
ch_unstable_cavities.append(cav_ws.owner)
gx = cav_ws.gx
gy = cav_ws.gy
if float_eq(gx, gy):
warn(
f"Cavity {repr(tree.dependency.name)} is unstable with "
f"g = {gx}",
CavityUnstableWarning
)
else:
warn(
f"Cavity {repr(tree.dependency.name)} is unstable with "
f"gx = {gx}, gy = {gy}",
CavityUnstableWarning
)
if not ch_unstable_cavities:
return ()
# Return tuple of the unstable cavities sorted by name so that
# all permutations of the same combination of cavities give same
# tuple --- important for look-ups in contingent_trace_forests dict
return tuple(sorted(ch_unstable_cavities, key=lambda x: x.name))
cdef TraceForest _initialise_contingent_forest(self, tuple unstable_cavities) :
cdef TraceForest contingent_forest = TraceForest(self.model, self.trace_forest.symmetric)
cdef TraceForest model_trace_forest = self.model.trace_forest
cdef list order = model_trace_forest.dependencies.copy()
for uc in unstable_cavities:
order.remove(uc)
# If there are no dependencies left after disabling the
# unstable cavities then a beam trace cannot be performed
# at this data point so no forest can be planted
if not order:
warn(
"Cannot build a contingent trace forest as the simulation "
"is in a regime with no stable cavities nor Gauss objects.",
CavityUnstableWarning
)
return None
contingent_forest.plant(order)
if self._determine_changing_beam_params(contingent_forest):
return None
return contingent_forest
@cython.initializedcheck(False)
cdef void _propagate_trace(self, TraceTree tree, bint symmetric) noexcept:
cdef:
TraceTree ltree = tree.left
TraceTree rtree = tree.right
const NodeBeamParam* q1 = &self.trace[tree.node_id]
complex_t qx1 = q1.qx.q
complex_t qy1 = q1.qy.q
complex_t qx2, qy2
if ltree is not None:
# For non-symmetric traces we have some special checks
# to do on trees which couldn't be reached from the
# other dependency trees. Note these are only performed
# on the left tree; see TraceForest._add_backwards_nonsymm_trees
# for details.
if symmetric or (not tree.do_nonsymm_reverse and not tree.do_inv_transform):
qx2 = transform_q(tree.left_abcd_x, qx1, tree.nr, ltree.nr)
qy2 = transform_q(tree.left_abcd_y, qy1, tree.nr, ltree.nr)
elif tree.do_inv_transform:
# Can't reach tree directly but there is a coupling from ltree.node
# to tree.node so apply the inverse abcd law to get correct q
qx2 = inv_transform_q(tree.left_abcd_x, qx1, tree.nr, ltree.nr)
qy2 = inv_transform_q(tree.left_abcd_y, qy1, tree.nr, ltree.nr)
else:
# Really is no way to get to the node (no coupling from ltree.node to
# tree.node) so only option now is to reverse q for ltree node entry
qx2 = -conj(qx1)
qy2 = -conj(qy1)
self.trace[ltree.node_id].qx.q = qx2
self.trace[ltree.node_id].qy.q = qy2
if symmetric:
self.trace[ltree.opp_node_id].qx.q = -conj(qx2)
self.trace[ltree.opp_node_id].qy.q = -conj(qy2)
self._propagate_trace(ltree, symmetric)
if rtree is not None:
qx2 = transform_q(tree.right_abcd_x, qx1, tree.nr, rtree.nr)
qy2 = transform_q(tree.right_abcd_y, qy1, tree.nr, rtree.nr)
self.trace[rtree.node_id].qx.q = qx2
self.trace[rtree.node_id].qy.q = qy2
if symmetric:
self.trace[rtree.opp_node_id].qx.q = -conj(qx2)
self.trace[rtree.opp_node_id].qy.q = -conj(qy2)
self._propagate_trace(rtree, symmetric)
cpdef trace_beam(self) :
"""Traces the beam through the paths which are dependent upon changing
geometric parameter(s).
This method will modify those entries in the ``self.trace`` C array
which were previously determined to have changing beam parameter values.
Returns
-------
validity : bool
True if the tracing was successful, or False if an unstable
cavity combination prevented a beam trace from being performed.
Raises
------
ex : :class:`.BeamTraceException`
If the ``"unstable_handling"`` entry of the associated
:attr:`.Model.sim_trace_config` dict is ``"abort"`` and
any unstable cavities were encountered.
"""
cdef:
TraceTree tree
Py_ssize_t tree_idx
CavityWorkspace cav_ws
bint source_is_cav
complex_t qx_src, qy_src
# The actual trace forest which gets traced. In most circumstances
# this will be self.trace_forest but if changing cavities enter an
# unstable regime then this will be temporarily swapped out for the
# contingent forest for this data point (see below).
TraceForest trace_forest
# Objects necessary for dealing with newly unstable cavities
tuple ch_unstable_cavities
TraceForest contingent_forest
# No changing beam parameters, do nothing
if self.trace_forest.empty():
return True
# First we loop over the source trees and find any changing
# cavities which have become unstable
ch_unstable_cavities = self._find_new_unstable_cavities()
# If we did find any newly unstable cavities then the current trace_forest
# is invalidated at the current data point so we must build a new forest with
# the unstable cavities disabled
# NOTE (sjr) Don't worry about increased Python interaction in
# this block as this will rarely be executed anyway
if ch_unstable_cavities:
unstable_handling = self.model.sim_trace_config["unstable_handling"]
# Abort simulation if tracing config set-up to do so when
# encountering any unstable cavities
if unstable_handling == "abort":
raise BeamTraceException(
"Aborting simulation due to presence of unstable cavities: "
f"{', '.join([cav.name for cav in ch_unstable_cavities])}"
)
# Or if tracing config is set-up to abort only the retrace when
# any unstable cavities encountered, flag this to notify that
# appropriate detector outputs should be masked
if unstable_handling == "mask":
LOGGER.info(
"Aborting retrace as simulation tracing configuration is set-up "
"to mask at any data point(s) where unstable cavities occur."
)
return False
LOGGER.info(
"Attempting to use a contingent trace forest due "
"to the presence of unstable cavities"
)
# Look-up the combination of unstable cavities to see if a
# contingent forest was already built from this
contingent_forest = self.contingent_trace_forests.get(ch_unstable_cavities)
# No previous forest built from the given combination of disabled
# unstable cavities so need to build one here
if contingent_forest is None:
LOGGER.debug(
"For unstable cavity combination %s no cached contingent "
"trace forest found, now attempting to build a new one...",
[uc.name for uc in ch_unstable_cavities],
)
contingent_forest = self._initialise_contingent_forest(ch_unstable_cavities)
# If there are no dependencies left after disabling the
# unstable cavities then a beam trace cannot be performed
# at this data point so inform of this on return
if contingent_forest is None:
return False
# Cache the contingent forest for this combination of unstable
# cavities as these typically occur in blocks (or across strides)
# of data points so we don't want to keep rebuilding the same
# contingency forests for identical unstable cavity combos
self.contingent_trace_forests[ch_unstable_cavities] = contingent_forest
else:
LOGGER.debug(
"For unstable cavity combination %s found and using "
"cached contingent trace forest:%s",
[uc.name for uc in ch_unstable_cavities],
contingent_forest
)
# Make sure only the correctly changing beam parameters, according
# to self.trace_forest, get reflagged when exiting from the unstable
# region again
self.needs_reflag_changing_q = True
# Use the contingent forest for this data point
trace_forest = contingent_forest
# Otherwise we just use the standard changing trace forest of the simulation
else:
trace_forest = self.trace_forest
# If we've just exited an unstable cavity region where a contingent trace
# forest was being used, then we need to reflag the beam parameters which
# are changing
if self.needs_reflag_changing_q:
if self._determine_changing_beam_params(forest=None, set_tree_node_ids=False):
return False
self.needs_reflag_changing_q = False
# Now do the actual beam tracing by simply traversing the forest
# and propagating the beam through each tree
for tree_idx in range(trace_forest.size()):
tree = trace_forest.forest[tree_idx]
if tree.is_source:
cav_ws = self.cavity_workspaces.get(tree.dependency)
source_is_cav = cav_ws is not None
if not source_is_cav: # Source tree is from a Gauss
# TODO (sjr) Should probably make some workspace for Gauss objects
# which then uses cy_expr's for evaluating these things
# if they're symbolic. But for now this will do.
qx_src = complex(tree.dependency.qx.q)
qy_src = complex(tree.dependency.qy.q)
else: # Source tree is from a Cavity
qx_src = cav_ws.qx
qy_src = cav_ws.qy
self.trace[tree.node_id].qx.q = qx_src
self.trace[tree.node_id].qy.q = qy_src
if trace_forest.symmetric:
self.trace[tree.opp_node_id].qx.q = -conj(qx_src)
self.trace[tree.opp_node_id].qy.q = -conj(qy_src)
self._propagate_trace(tree, trace_forest.symmetric)
return True
cpdef run_carrier(self) :
"""Runs the carrier matrix solver for the current state of the model.
This will update all the C based structs with the current model state so
that filling and calculations can be performed.
Returns
-------
validity : bool
True if this was a valid run, or False if a recoverable error occurred
which results in the output being invalid for this call.
"""
# NOTE (sjr) Just updating all parameter values on each call to run for
# now. This may not be the most optimal thing to do, but it
# avoids duplicating these parameter update calls in different
# places (e.g. refill, compute_knm_matrices, set_gouy_phases) and
# should be safe in that no parameters get accidentally missed at
# any data point.
# ddb - this just updates everything, even things that are not changing as
# it acts on all the workspaces, probably not the best idea
self.update_all_parameter_values()
# Update HOM stuff
if self.is_modal:
# Immediately return if invalid beam trace region encountered
# no need to go ahead and fill or solve as they won't be used
if not self.modal_update():
return False
self.carrier.run()
return True
cpdef run_signal(self, solve_noises=True) :
"""Runs the signal matrix solver for the current state. This function should assume that
a call to the `run_carrier` method has preceeded it. Many modal and parameter updates
should happen in there already, so do not need to be repeated here.
"""
self.model_settings.fsig = float(self.model.fsig.f.value)
# Probably some other preparatory stuff needs to go here in the future
self.signal.run()
# Then ask components for their noise contributions
if solve_noises and self.signal.noise_sources:
self.signal.fill_noise_inputs()
self.signal.solve_noises()
def setup_output_workspaces(self):
from finesse.detectors.general import NoiseDetector
from finesse.components.readout import _Readout
from finesse.detectors.workspace import DetectorWorkspace
# Once the simulations are started we can tell all the detectors to
# prepare themselves
self.detector_workspaces = []
self.readout_workspaces = []
for rd in self.model.get_elements_of_type(_Readout):
# Readouts can emulate multiple detectors, so here we
# get a collection of them depending on what the readout
# is doing and add them to the list
wss = rd._get_output_workspaces(self)
if wss is not None:
# Get the name of the readout output for storing
# bit of a hack to get the attributes out of the
# SimpleNamespace object used
for name, ws in zip(rd.outputs.__dict__.values(), wss): # check we can iterate over the returned workspaces
if not isinstance(ws, DetectorWorkspace):
raise TypeError(f"Readout detector ({rd}) workspace ({ws}) not a DetectorWorkspace type")
self.readout_workspaces.append(ws)
self.workspace_name_map[name] = ws
if rd.output_detectors:
self.detector_workspaces.append(ws)
for det in self.model.detectors:
ws = det._get_workspace(self)
self.workspace_name_map[det.name] = ws
if ws is not None:
if not isinstance(ws, DetectorWorkspace):
raise TypeError(f"Detector ({det}) workspace ({ws}) not a DetectorWorkspace type")
self.detector_workspaces.append(ws)
if self.signal and isinstance(ws.owner, NoiseDetector):
self.signal.workspaces.noise_detectors.append(ws)
for _ in self.detector_workspaces:
_.compile_cy_exprs()
if self.signal:
self.signal.workspaces.detector_list_to_C()
def __enter__(self):
self.pre_build()
self.build()
self.post_build()
def __exit__(self, type_, value, traceback):
self.unbuild()
cpdef update_all_parameter_values(self) :
"""Loops through all workspaces to update the C structs so they
represent the current model element parameter values.
"""
cdef:
ElementWorkspace ws
Py_ssize_t i
# TODO (sjr) Should probably cache these or move away from
# lists for best performance
Py_ssize_t Ncws = len(self.workspaces)
for i in range(Ncws):
ws = self.workspaces[i]
ws.update_parameter_values()
def get_q(self, node):
"""Returns a tuple of (qx, qy) for a given node. The returned
value is only valid until this simulations trace forest has
been updated.
Parameters
----------
node : :class:`finesse.components.OpticalNode`
Node to get beam parameters at
Returns
-------
(qx, qy)
Tuple of x and y beam parameters
"""
cdef NodeBeamParam *nodebp
idx = self.trace_node_index[node]
if idx >= 0 and idx < len(self.model.optical_nodes):
nodebp = &self.trace[idx]
return (
BeamParam(q=nodebp.qx.q, nr=nodebp.qx.nr, wavelength=nodebp.qx.wavelength),
BeamParam(q=nodebp.qy.q, nr=nodebp.qy.nr, wavelength=nodebp.qy.wavelength),
)
else:
raise IndexError("Node index is not in simulation")
cdef initialise_trace_forest(self, optical_nodes) :
cdef TraceForest model_trace_forest
cdef double nr
self.trace_node_index = {n: i for i, n in enumerate(optical_nodes)}
# Before we setup the workspaces some initial beam trace must be done
# so that workspaces can initialise themselves
if self.is_modal:
# Make sure the model trace forest gets re-planted
# when building a new simulation
self.model._rebuild_trace_forest = True
LOGGER.info(
"Performing initial beam trace with configuration options:\n %s",
self.model.sim_trace_config,
)
# Plant the model trace_forest and execute initial beam trace
self.initial_trace_sol = self.model.beam_trace(**self.model.sim_initial_trace_args)
model_trace_forest = self.model.trace_forest
self.nodes_with_changing_q = model_trace_forest.get_nodes_with_changing_q()
self.changing_mismatch_edges = OrderedSet()
LOGGER.info(
"Nodes with changing q during simulation:\n %s",
self.nodes_with_changing_q,
)
self.retrace = self.model.sim_trace_config["retrace"]
self.trace = <NodeBeamParam*> calloc(len(optical_nodes), sizeof(NodeBeamParam))
if not self.trace:
raise MemoryError()
for i, n in enumerate(optical_nodes):
qx, qy = self.initial_trace_sol[n]
nr = qx.nr
self.trace[i] = NodeBeamParam(
beam_param(qx.q, nr, self.model_settings.lambda0),
beam_param(qy.q, nr, self.model_settings.lambda0),
n in self.nodes_with_changing_q
)
if self.retrace:
# Construct the forest of changing trace trees - it's important
# that this is done before initialising connector workspaces as
# they need the changing forest to be present for refill flag
self.trace_forest = model_trace_forest.make_changing_forest()
self.retrace &= not self.trace_forest.empty()
if self.retrace:
LOGGER.info(
"Determined changing trace trees:%s", self.trace_forest
)
# Get the nodes at which trees of the changing forest intersect
# with trees of the full forest which have different trace
# dependencies. These couplings will have potentially changing
# mode mismatches during the simulation.
self.changing_mismatch_couplings = self.trace_forest.find_potential_mismatch_couplings(
model_trace_forest
)
# The above returns node objects. It is more useful to also store
# the equivalent simulation specific "node IDs" which is just an
# integer and used more in the cythonised code as some nodes get
# dropped when not used in a simulation.
# Optical nodes for signal and carrier all have the same IDs
for i, (ni, no) in enumerate(self.changing_mismatch_couplings):
self.changing_mismatch_edges.add(
tuple((self.trace_node_index[ni], self.trace_node_index[no]))
)
if self.changing_mismatch_couplings:
LOGGER.info(
"Found changing mismatched node couplings: %s",
[f"{n1.full_name} -> {n2.full_name}"
for n1, n2 in self.changing_mismatch_couplings]
)
else:
self.trace_forest = TraceForest(self.model, self.model.sim_trace_config["symmetric"])
else:
# just make an empty TraceForest
self.trace_forest = TraceForest(self.model, self.model.sim_trace_config["symmetric"])
cdef initialise_model_settings(self) :
self.model_settings = self.model._settings
if self.model.fsig.f.value is None:
self.model_settings.fsig = 0
else:
self.model_settings.fsig = float(self.model.fsig.f.value)
cdef initialise_sim_config_data(self) :
# Nominal number of threads will be Nhoms / 10
self.config_data.nthreads_homs = determine_nthreads_even(self.model_settings.num_HOMs, 10)
LOGGER.info("Using %d threads for HOM parallel loops.", self.config_data.nthreads_homs)
def generate_carrier_frequencies(self):
"""Returns a list of Frequency objects that the model has requested"""
from finesse.frequency import Frequency, generate_frequency_list
if len(self.model._frequency_generators) == 0:
# Nothing in the model is generating a carrier frequency. Typical
# situation is when no laser is included for signal modelling.
# Simple solution, just use a default 0Hz
frequencies_to_use = [Constant(0.0)]
else:
frequencies_to_use = generate_frequency_list(self.model)
carrier_frequencies = list()
LOGGER.info("Generating simulation with user carrier frequencies %s", self.model.frequencies)
for i, f in enumerate(self.model.frequencies):
try:
f_name = str(f.eval(keep_changing_symbols=True))
except AttributeError:
f = Constant(f)
f_name = str(f)
carrier_frequencies.append(Frequency(f_name, f, index=i))
N = len(self.model.frequencies)
LOGGER.info("Generating simulation with carrier frequencies %s", frequencies_to_use)
for i, f in enumerate(frequencies_to_use):
carrier_frequencies.append(
Frequency(
str(f.eval(keep_changing_symbols=True)),
f,
index=N+i
)
)
fcnt = FrequencyContainer(carrier_frequencies)
return fcnt
def generate_signal_frequencies(self, nodes, FrequencyContainer carrier_optical_frequencies):
"""Generates the optical, mechanical, and electrical frequencies that should be
modelled by the signal simulation.
"""
from finesse.components.node import NodeType
optical_frequencies = [] # All optical frequencies are present at all nodes
# elec and mech can have different frequencies on a per node basis
signal_frequencies = {}
for i, f in enumerate(carrier_optical_frequencies.frequencies):
fp = f.f + self.model.fsig.f.ref
fm = f.f - self.model.fsig.f.ref
optical_frequencies.append(
Frequency(str(fp.eval(keep_changing_symbols=True)),
fp, index=2*i, audio_order=1,
audio=True, audio_carrier_index=i,
audio_carrier_object=f)
)
optical_frequencies.append(
Frequency(str(fm.eval(keep_changing_symbols=True)),
fm, index=2*i+1, audio_order=-1,
audio=True, audio_carrier_index=i,
audio_carrier_object=f)
)
fcnt = FrequencyContainer(optical_frequencies, carrier_cnt=carrier_optical_frequencies)
# Audio matrix frequencies are more complicated as they can have multiple frequencies
# in mechanical and electrical, on a per-node basis...
fsig = FrequencyContainer(
(Frequency("fsig", self.model.fsig.f.ref, index=0), )
)
for node in nodes:
if node.type == NodeType.OPTICAL:
continue
#-----------------------------------------------------------------------------------
# Mechanical frequencies
#-----------------------------------------------------------------------------------
# By default mechanical frequencies just have a single frequency at the Model.fsig.f
# However for more complicated systems we can have multiple frequencies.
elif node.type == NodeType.MECHANICAL:
fs = []
freqs = node.frequencies
if len(freqs) == 1 and freqs[0] == self.model.fsig.f.ref:
# Most components will just use a single fsig so reuse same object
# for efficient filling later
signal_frequencies[node] = fsig
else:
for i, sym in enumerate(node.frequencies):
fs.append(
Frequency(
str(fm.eval(keep_changing_symbols=True)),
sym,
index=i,
)
)
signal_frequencies[node] = FrequencyContainer(tuple(fs))
#-----------------------------------------------------------------------------------
# Electrical frequencies
#-----------------------------------------------------------------------------------
elif node.type == NodeType.ELECTRICAL:
signal_frequencies[node] = fsig
else:
raise ValueError("Unexpected")
return fcnt, signal_frequencies