import logging
import numpy as np
cimport numpy as np
from libc.stdlib cimport free, calloc
from finesse.components.node import NodeType
from finesse.frequency cimport FrequencyContainer
LOGGER = logging.getLogger(__name__)
[docs]cdef class MatrixSystemWorkspaces:
def __cinit__(self):
self.ptr_to_refill = NULL
self.ptr_to_rhs_refill = NULL
self.ptr_to_noise_refill = NULL
self.ptr_to_noise_input_refill = NULL
self.ptr_noise_detectors = NULL
def __init__(self):
self.to_initial_fill = []
self.to_refill = []
self.to_rhs_refill = []
self.to_noise_refill = []
self.to_noise_input_refill = []
self.noise_detectors = []
def list_to_C(self):
"""Converts the python lists of workspaces into C Pyobject arrays for
fast loop access.
"""
if (
self.ptr_to_refill != NULL
or self.ptr_to_rhs_refill != NULL
or self.ptr_to_noise_refill != NULL
or self.ptr_to_noise_input_refill != NULL
):
raise MemoryError()
self.num_to_refill = len(self.to_refill)
self.ptr_to_refill = <PyObject**> calloc(self.num_to_refill, sizeof(PyObject*))
if not self.ptr_to_refill:
raise MemoryError()
cdef int i
for i in range(self.num_to_refill):
self.ptr_to_refill[i] = <PyObject*>self.to_refill[i]
self.num_to_rhs_refill = len(self.to_rhs_refill)
self.ptr_to_rhs_refill = <PyObject**> calloc(self.num_to_rhs_refill, sizeof(PyObject*))
if not self.ptr_to_rhs_refill:
raise MemoryError()
for i in range(self.num_to_rhs_refill):
self.ptr_to_rhs_refill[i] = <PyObject*>self.to_rhs_refill[i]
self.num_to_noise_refill = len(self.to_noise_refill)
self.ptr_to_noise_refill = <PyObject**> calloc(self.num_to_noise_refill, sizeof(PyObject*))
if not self.ptr_to_noise_refill:
raise MemoryError()
for i in range(self.num_to_noise_refill):
self.ptr_to_noise_refill[i] = <PyObject*>self.to_noise_refill[i]
self.num_to_noise_input_refill = len(self.to_noise_input_refill)
self.ptr_to_noise_input_refill = <PyObject**> calloc(self.num_to_noise_input_refill, sizeof(PyObject*))
if not self.ptr_to_noise_input_refill:
raise MemoryError()
for i in range(self.num_to_noise_input_refill):
self.ptr_to_noise_input_refill[i] = <PyObject*>self.to_noise_input_refill[i]
def clear_workspaces(self):
self.to_initial_fill.clear()
self.to_refill.clear()
self.to_rhs_refill.clear()
self.to_noise_refill.clear()
self.to_noise_input_refill.clear()
self.noise_detectors.clear()
def detector_list_to_C(self):
self.num_noise_detectors = len(self.noise_detectors)
if self.ptr_noise_detectors != NULL:
raise MemoryError()
self.ptr_noise_detectors = <PyObject**> calloc(self.num_noise_detectors, sizeof(PyObject*))
if not self.ptr_noise_detectors:
raise MemoryError()
for i in range(self.num_noise_detectors):
self.ptr_noise_detectors[i] = <PyObject*>self.noise_detectors[i]
def __dealloc__(self):
if self.ptr_to_refill:
free(self.ptr_to_refill)
if self.ptr_to_rhs_refill:
free(self.ptr_to_rhs_refill)
if self.ptr_to_noise_refill:
free(self.ptr_to_noise_refill)
if self.ptr_to_noise_input_refill:
free(self.ptr_to_noise_input_refill)
if self.ptr_noise_detectors:
free(self.ptr_noise_detectors)
[docs]cdef class BaseSolver:
"""A linear set of systems can be represented as a matrix, each equation
in this system is a particular state which we want to compute. The system
is solved by applying some inputs into various states, or the right hand
side (RHS) vector, and solving the system.
The underlying matrix can be either a sparse or dense matrix. This class
should not assume either, but merely call upon a standard matrix interface.
Therefore the algorithm used for solving can vary significantly.
The overall matrix is sectioned into submatricies which connect various
states together.
Nodes represent a physical location in the model in which some state of
the system must be computed. Some nodes can have multiple states, such
as multiple optical modes.
"""
def __init__(
self,
str name,
list nodes,
FrequencyContainer optical_frequencies,
dict signal_frequencies,
bint is_signal_matrix,
bint forced_refill,
dict node_aliases
):
if is_signal_matrix:
if signal_frequencies is None:
raise Exception("Signal frequency containers not provided")
else:
if signal_frequencies is not None:
raise Exception("Signal frequency container incorrectly provided for carrier simulation")
self.is_signal_matrix = is_signal_matrix
self.manual_rhs = False
self.workspaces = MatrixSystemWorkspaces()
self.forced_refill = forced_refill
self.optical_frequencies = optical_frequencies
self.signal_frequencies = signal_frequencies
if is_signal_matrix:
# Get the unique FrequencyContainer objects for filling the info of.
# This could be more optimal and dive into the references and check if
# frequency containers are actually unique, but this mostly just stops
# there being 100s of fsig containers from each node
# TODO - optimise this away from a tuple
self.unique_elec_mech_fcnts = tuple(set(signal_frequencies.values()))
self.noise_sources = dict()
self.any_frequencies_changing = False
# Global flag for if any frequency is changing
for _f in self.optical_frequencies.frequencies:
if _f.symbol.is_changing:
self.any_frequencies_changing = True
break
self.setup_nodes(nodes, node_aliases)
def input_components(self):
"""Components that are injecting something into the simulation"""
components = set()
for ws in self.workspaces.to_rhs_refill:
components.add(ws.owner)
return components
cpdef tuple get_node_frequencies(self, node):
if node.type == NodeType.OPTICAL:
return self.optical_frequencies.frequencies
elif node.type == NodeType.MECHANICAL:
return self.signal_frequencies[node].frequencies
elif node.type == NodeType.ELECTRICAL:
return self.signal_frequencies[node].frequencies
else:
raise ValueError()
cpdef setup_nodes(self, list all_nodes, dict node_aliases):
self.nodes = {n.full_name : n for n in all_nodes}
self.num_nodes = len(self.nodes)
self.node_aliases = {a.full_name: b.full_name for a, b, in node_aliases.items()}
self.node_2_index = {}
self.index_2_node = {}
i = 0
for n in self.nodes.values():
if n in node_aliases:
continue
self.node_2_index[n.full_name] = i
self.index_2_node[i] = n.full_name
i += 1
for n, a in node_aliases.items():
# Map node n to node a details, essentially just an alias
self.node_2_index[n.full_name] = self.node_2_index[a.full_name]
cpdef assign_operators(self, connector_workspaces):
"""An important function. This takes all the connector workspaces - i.e. model elements
that have requested some type of connection in the model - and ensures they have the
correct submatrix allocated to them in for this solver.
"""
raise NotImplementedError()
cpdef assign_noise_operators(self, connector_workspaces):
raise NotImplementedError()
cpdef add_rhs(self, unicode key):
raise NotImplementedError
cpdef factor(self):
raise NotImplementedError()
cpdef refactor(self):
raise NotImplementedError()
cpdef solve(self):
raise NotImplementedError()
cpdef solve_noises(self):
raise NotImplementedError()
def initialise(self, sim):
pass
cpdef initial_fill(self):
raise NotImplementedError()
cpdef refill(self):
raise NotImplementedError()
cpdef refill_rhs(self):
raise NotImplementedError()
cpdef fill_rhs(self):
raise NotImplementedError()
cpdef fill_noise_inputs(self):
raise NotImplementedError()
cpdef construct(self):
"""This is called when workspaces and submatrices have been setup. Calling
construct should now go and allocate the memory for the matrix and RHS.
This method should be overwritten by an inheriting solver class with
specfics of the solving technique.
"""
raise NotImplementedError()
cpdef destruct(self):
"""This is called when finishing and unbuilding the simulation.
Classes that override this call should mindful of what this method is doing
to and call it.
"""
self.workspaces.clear_workspaces()
cpdef initial_run(self):
"""Once a solver has been constructed it will most likely need to be initially
filled and ran. Some sparse solvers for example must do a full factor first, then
can perform faster refactors.
This method should be overwritten by an inheriting solver class with
specfics of the solving technique.
"""
raise NotImplementedError()
cpdef run(self):
"""Executes the simulation for model in its current state.
Takes the following steps to compute an output:
* If self.manual_rhs:
* Clears the RHS vector
* Fills the RHS vector
* Fills the matrix
* Solves
"""
if not self.manual_rhs:
self.clear_rhs()
self.refill_rhs()
self.refill()
self.solve()
def print_matrix(self):
raise NotImplementedError()
cpdef clear_rhs(self):
raise NotImplementedError()
cpdef Py_ssize_t findex(self, object node, Py_ssize_t freq):
"""
Returns simulation unique index for a given frequency at this node.
Used to refer to submatrices of HOMs in the interferometer matrix.
Parameters
----------
node : :class:`.Node`
Node object to get the index of.
freq : int
Frequency index.
Returns
-------
index : int
Index of the `node` for a given frequency.
"""
raise NotImplementedError()
cpdef Py_ssize_t node_id(self, object node):
if type(node) is str:
return self.node_2_index[node]
else:
return self.node_2_index[node.full_name]
cpdef get_node_info(self, node):
"""For a given node (object or name) the key parameters for where this node is
represented in the matrix of linear equations.
Parameters
----------
node : [str | Node]
The name or the Node object of the node.
Returns
-------
dict: A dictionary containing the following information about the node:
- index: The index of the node.
- rhs_index: The index of the right-hand side vector associated with the node.
- freq_index: The index of the frequency vector associated with the node.
- nfreqs: The number of frequencies.
- nhoms: The number of higher order modes. [TODO generalise to pixels/HOMs/whatever]
"""
raise NotImplementedError()
def get_frequency_object(self, frequency, node):
"""Get a :class:`.Frequency` object corresponding to a numerical or symbolic value.
Returns none if nothing has been found.
Parameters
----------
f : number or :class:`.Symbol`
Frequency to search for in this simulation.
Returns
-------
:class:`.Frequency`
The frequency object.
"""
from finesse.symbols import Symbol
if node.type == NodeType.OPTICAL:
frequencies = self.optical_frequencies.frequencies
elif node.type == NodeType.MECHANICAL:
if not self.is_signal_matrix:
return None
frequencies = self.signal_frequencies[node].frequencies
elif node.type == NodeType.ELECTRICAL:
if not self.is_signal_matrix:
return None
frequencies = self.signal_frequencies[node].frequencies
if isinstance(frequency, Symbol):
if frequency.is_changing:
# if it's tunable we want to look for the symbol that is just this
# lasers frequency, as it will be changing
for f in frequencies:
if f.symbol == frequency:
return f
f_value = float(frequency)
# otherwise do some value comparisons
for f in frequencies:
if np.isclose(float(f.f), f_value, atol=1e-15, rtol=1e-15):
return f
return None
cpdef update_frequency_info(self):
self.optical_frequencies.update_frequency_info()
if self.is_signal_matrix:
for i in range(len(self.unique_elec_mech_fcnts)):
(<FrequencyContainer>self.unique_elec_mech_fcnts[i]).update_frequency_info()