"""Objects for connecting and registering connections between components."""
import enum
import logging
import weakref
from collections import OrderedDict
from copy import deepcopy
from .. import components
from ..env import warn
from ..exceptions import ComponentNotConnected, FinesseException
from ..freeze import Freezable
from ..utilities import check_name, is_iterable
from ..symbols import Constant
# Module-level logger, named after this module via ``__name__``.
LOGGER = logging.getLogger(__name__)
@enum.unique
class NodeType(enum.Enum):
    """Enum describing the physical connection type of a :class:`.Node`."""

    OPTICAL = 0
    ELECTRICAL = 1
    MECHANICAL = 2
@enum.unique
class NodeDirection(enum.Enum):
    """Enum describing the direction that information at a :class:`.Node` flows.

    This is largely a description to help understand how external information
    flows in and out of a component. Inside a component all nodes will
    couple to one another in more complex ways.

    Input nodes are those going into a component, whereas output nodes
    describe those leaving. For example incident and reflected light fields.

    Bidirectional nodes take information in either direction. For example, for
    a mechanical degree of freedom, external forces can be applied to it, or
    its motion can be coupled to some external system.
    """

    INPUT = 0
    OUTPUT = 1
    BIDIRECTIONAL = 2
class Port(Freezable):
    """A collection of all the nodes at a specific point/surface of a component.

    Parameters
    ----------
    name : str
        Name of the newly created port.
    component : Sub-class of :class:`.Connector`
        The component that this port belongs to. Only a weak reference to it
        is stored.
    node_type : :class:`.NodeType`
        Physical node type.
    """

    def __init__(self, name, component, node_type):
        self._unfreeze()
        self.__component = weakref.ref(component)
        self.__name = check_name(name)
        self.__full_name = f"{component.name}.{name}"
        self.__type = node_type
        self.__nodes = OrderedDict()
        self.__enabled = True
        self._freeze()

    def __deepcopy__(self, memo):
        """Deep-copy the port, re-pointing the component weak reference at the
        copied component found in `memo`."""
        new = object.__new__(type(self))
        memo[id(self)] = new
        new.__dict__.update(deepcopy(self.__dict__, memo))
        # Manually update the weakrefs to be correct
        new.__component = weakref.ref(memo[id(self.component)])
        return new

    def _add_node(self, name, direction=None, node=None, **kwargs):
        """Adds a new node to this port. Once called the new node can be accessed
        with:

            obj.port.name

        Parameters
        ----------
        name : str
            Name of the new node.
        direction : :class:`.NodeDirection`, optional
            Direction passed to the node that gets created. Must be given when
            a node is created on an optical port.
        node : :class:`.Node`, optional
            An already existing node to register at this port instead of
            creating a new one.
        **kwargs
            Extra keyword arguments forwarded to the constructor of the node
            that gets created. Not allowed when `node` is given.

        Returns
        -------
        Node object added
        """
        if name in self.__nodes:
            raise Exception("Node already added to this component")

        if node is None:
            if self.__type == NodeType.OPTICAL:
                if direction is None:
                    raise Exception(
                        "Node direction must be specified for optical nodes"
                    )
                node = OpticalNode(name, self, direction, **kwargs)
            elif self.__type in (NodeType.MECHANICAL, NodeType.ELECTRICAL):
                node = SignalNode(name, self, direction, self.__type, **kwargs)
            else:
                raise Exception("Unexpected node type")
        elif len(kwargs) != 0:
            raise Exception(f"Cannot use kwargs when node ({node!r}) is not None")

        if node is None:
            raise RuntimeError("Node unexpectedly None")

        # The type check is only enforced when an optical type is involved on
        # either side.
        if node.type == NodeType.OPTICAL or self.type == NodeType.OPTICAL:
            if node.type != self.type:
                raise Exception(
                    f"Node ({node.type}) and port type ({self.type}) must be the same ({node!r}, {self!r})"
                )

        self.__nodes[name] = node
        # Update the element's node dict so it doesn't have to keep checking
        # its ports about what nodes it has
        self.__component()._Connector__nodes[node.full_name] = node
        self._unfreeze()
        assert not hasattr(self, name)
        setattr(self, name, node)
        self._freeze()
        return node

    def _replace_node(self, name, new_node):
        """Replaces a node at this port with a new one.

        This can be used to change connections between elements but care must
        be taken. This does not update any states of the elements beyond simply
        changing the node. This should only be used when an element is borrowing
        a node from another element, and you want to change which node it is
        borrowing.

        Parameters
        ----------
        name : str
            Name of the node at this port to replace.
        new_node : :class:`.Node`
            Node to store under `name` from now on.

        Raises
        ------
        FinesseException
            If this port has no node called `name`, or if the type of
            `new_node` differs from the type of this port.
        """
        # Validate before looking the old node up, otherwise an unknown name
        # surfaces as a bare KeyError rather than the intended exception.
        if name not in self.__nodes:
            raise FinesseException(f"{self!r} does not have a node called `{name}`")
        if new_node.type != self.type:
            raise FinesseException("Node and port type must be the same")

        old_node = self.__nodes[name]
        self.__nodes[name] = new_node
        # Update the element's node dict so it doesn't have to keep checking
        # its ports about what nodes it has
        component_nodes = self.__component()._Connector__nodes
        del component_nodes[old_node.full_name]
        component_nodes[new_node.full_name] = new_node
        self._unfreeze()
        setattr(self, name, new_node)
        self._freeze()

    def __repr__(self):
        return f"❮Port {self.component.name}.{self.name} Type={self.__type} @ {hex(id(self))}❯"

    @property
    def full_name(self):
        """
        :getter: Returns a full name of the port: {component name}.{port name}
        """
        return self.__full_name

    @property
    def enabled(self):
        """:getter: Returns the enabled flag of the port (read-only)."""
        return self.__enabled

    @property
    def type(self):
        """:class:`.NodeType` of the port object.

        :getter: Returns the node-type of the port (read-only).
        """
        return self.__type

    @property
    def name(self):
        """Name of the port object.

        :getter: Returns the name of the port (read-only).
        """
        return self.__name

    @property
    def _model(self):
        return self.component._model

    @property
    def component(self):
        """The component which has ownership of this port.

        :getter: Returns the component that this port belongs to (read-only).
        """
        return self.__component()

    @property
    def is_connected(self):
        """Flag indicating whether the port is attached to another component.

        :getter: Returns true if this port is attached to another component (read-only).
        """
        return any(len(node.connections) > 0 for node in self.nodes)

    @property
    def attached_to(self):
        """Components that this port is attached to. Optical ports are only ever
        connected to :class:`Space` elements. Ports containing signal nodes can
        have multiple connections and return a set.

        :getter: Returns the component this port is attached to, or returns None
                 if no such connected component exists. Signal ports return a set
                 of components attached (read-only).
        """
        if self.type != NodeType.OPTICAL:
            return {
                connection for node in self.nodes for connection in node.connections
            }

        spaces = [node.space for node in self.nodes]
        if not spaces:
            # A port without any nodes cannot be attached to anything.
            return None

        first = spaces[0]
        if all(first == other for other in spaces):
            return first

        raise Exception(
            "Nodes are somehow connected to different "
            "spaces which should not happen"
        )

    @property
    def refractive_index(self):
        """If the port is an optical port, this will return a symbolic value for
        the refractive index at this port. The refractive index is set by the
        `Space` element that is attached to it; a constant of 1 is returned when
        nothing is attached.

        Returns
        -------
        nr : Symbol
            Symbolic value for refractive index

        Raises
        ------
        Exception
            If the port is not an optical port.
        """
        if self.type != NodeType.OPTICAL:
            raise Exception("Port type is not optical, cannot get refractive index")

        # Look the attached space up once rather than once per use.
        space = self.attached_to
        if space:
            return space.nr.ref
        return Constant(1)

    @property
    def space(self):
        """Space that the port is attached to. Equivalent to :attr:`Port.attached_to`.

        :getter: Returns the space that this port is attached to (read-only).
        """
        if self.type != NodeType.OPTICAL:
            raise Exception("Port type is not optical, cannot retrieve attached space.")
        return self.attached_to

    @property
    def nodes(self):
        """Nodes associated with the port.

        :getter: Returns a tuple of the associated nodes at this port (read-only).
        """
        return tuple(self.__nodes.values())

    def node(self, name):
        """Get a node at this port by its name."""
        return self.__nodes[name]

    def get_unique_node(self, predicate: callable):
        """Returns the unique node at this port that satisfies the provided
        predicate. If no node, or more than one node, satisfies the predicate
        then a RuntimeError is raised.

        Parameters
        ----------
        predicate : callable(components.Node) -> bool
            A callable that accepts a Node and returns a boolean value

        Examples
        --------
        Selecting a unique output node:

            port.get_unique_node(lambda node: not node.is_input)
        """
        matches = [node for node in self.nodes if predicate(node)]
        if len(matches) != 1:
            raise RuntimeError(
                f"Port {repr(self)} does not have a single node that satisfies the predicate"
            )
        return matches[0]
class Node:
    """Represents a specific connection at a component.

    Mathematically a node represents a single equation in the interferometer
    matrix.

    A node can only be owned by a single component instance - with weak
    references stored by the connected components.

    Parameters
    ----------
    name : str
        Name of newly created node.
    port : :class:`.Port`
        The port that this node belongs to. Only a weak reference to it is
        stored.
    node_type : :class:`.NodeType`
        Physical node type.
    direction : :class:`.NodeDirection`
        Direction of the node.
    """

    def __init__(self, name, port, node_type, direction):
        self.__port = weakref.ref(port)
        self.__name = check_name(name)
        self.__type = node_type
        self.__direction = direction
        self.__full_name = f"{port.component.name}.{port.name}.{name}"
        self.__port_name = f"{port.name}.{name}"
        self.__tag_name = None
        self.__connection = None
        self.used_in_detector_output = []
        self.has_signal_injection = False

    def __deepcopy__(self, memo):
        """Deep-copy the node, re-pointing the port weak reference at the copied
        port.

        If the copied port is not in `memo` yet, a callable placeholder is
        stored instead and also appended to the ``after_deepcopy`` list of the
        copied model, so the reference is fixed once the memo has been filled.
        """
        new = object.__new__(type(self))
        memo[id(self)] = new
        new.__dict__.update(deepcopy(self.__dict__, memo))
        id_port = id(self.port)

        # Manually update the weakrefs to be correct
        if id_port in memo:
            new.__port = weakref.ref(memo[id_port])
        else:
            # The copied port does not exist yet, so the reference has to be
            # updated later on. The placeholder below peeks back at the memo
            # once it has been filled and swaps itself for a real weakref.
            def update_later():
                copied_port = memo[id_port]
                new.__port = weakref.ref(copied_port)
                # The `port` property calls whatever is stored in `__port`, so
                # hand back the port itself; returning nothing would make the
                # first access through this placeholder yield None.
                return copied_port

            new.__port = update_later  # just in case something calls
            # this weakref in the meantime
            memo[id(self._model)].after_deepcopy.append(update_later)
        return new

    def __repr__(self):
        return f"❮{self.__class__.__name__} {self.full_name} @ {hex(id(self))}❯"

    @property
    def is_input(self):
        """Flag indicating whether this node is an input to the associated component.

        :getter: Returns `True` if the field at this node
                 goes into `self.component` (read-only).
        """
        return self.direction == NodeDirection.INPUT

    @property
    def full_name(self):
        """Full name.

        :getter: Returns a full name of the node: {component name}.{port name}.{node name}
        """
        return self.__full_name

    @property
    def port_name(self):
        """Port name.

        :getter: Returns a shortened name of the node: {port name}.{node name}
        """
        return self.__port_name

    @property
    def type(self):
        """:class:`.NodeType` of the node object.

        :getter: Returns the node-type of the node (read-only).
        """
        return self.__type

    @property
    def direction(self):
        """:class:`.NodeDirection` of this node. This is largely a description to
        help understand how external information flows in and out of a
        component. Inside a component all nodes will couple to one another in
        more complex ways.

        Input nodes are those going into a component, whereas output nodes
        describe those leaving. For example incident and reflected light fields.

        Bidirectional nodes take information in either direction. For example,
        for a mechanical degree of freedom, external forces can be applied to
        it, or its motion can be coupled to some external system.

        :getter: Returns the directionality of the node (read-only).
        """
        return self.__direction

    @property
    def port(self):
        """:class:`.Port` this node is attached to.

        :getter: Returns the port of this node (read-only).
        """
        return self.__port()

    @property
    def name(self):
        """Name of the node object.

        :getter: Returns the name of the node (read-only).
        """
        return self.__name

    @property
    def tag(self):
        """Tagged name of the node object.

        :getter: Returns the tagged (user-defined) name of the node (read-only).
        """
        return self.__tag_name

    def _set_tag(self, tag):
        self.__tag_name = tag

    @property
    def _model(self):
        return self.component._model

    @property
    def component(self):
        """The component which has ownership of this node.

        :getter: Returns the component that this node belongs to (read-only).
        """
        return self.port.component

    def is_neighbour(self, node):
        """Checks if `node` is connected by an edge to this node.

        Parameters
        ----------
        node : :class:`.Node`
            Node with which to check connection.

        Returns
        -------
        flag : bool
            True if `node` is connected to this node, False otherwise.
        """
        try:
            network = self._model.network
            # Only ask for the neighbours once this node is known to be in the
            # network; the lookup is pointless (and may fail) otherwise.
            return network.has_node(self.full_name) and (
                node.full_name in network.neighbors(self.full_name)
            )
        except ComponentNotConnected:
            # Not associated with a model yet: check the connections registered
            # on the component directly.
            return (
                self.full_name,
                node.full_name,
            ) in self.component._registered_connections.values()

    @property
    def connections(self):
        """:getter: Returns a collection of :class:`.Space`, :class:`.Wire`,
        or :class:`DegreeOfFreedom` objects attached to this node (read-only).
        An empty tuple is returned when :class:`.ComponentNotConnected` is
        raised while looking the connections up.
        """
        try:
            network = self._model.network
            if self.direction == NodeDirection.INPUT:
                edges = network.in_edges(self.full_name)
            elif self.direction == NodeDirection.OUTPUT:
                edges = network.out_edges(self.full_name)
            else:
                edges = network.edges(self.full_name)

            connecting_types = (
                components.Space,
                components.Wire,
                components.DegreeOfFreedom,
            )
            objects = []
            for edge in edges:
                # The edge data stores its owner as a callable reference.
                owner = network.get_edge_data(*edge)["owner"]()
                if isinstance(owner, connecting_types):
                    objects.append(owner)
            return objects
        except ComponentNotConnected:
            pass
        return tuple()
class SignalNode(Node):
    """Represents a specific small signal degree of freedom. A signal is some
    small AC oscillation in some property, such as longitudinal motion, voltage,
    laser amplitude, etc.

    Parameters
    ----------
    name : str
        Name of the signal node.
    port : :class:`.Port`
        The port that this node belongs to.
    direction : :class:`.NodeDirection`
        Direction of the node.
    node_type : :class:`.NodeType`
        Physical node type.
    """

    def __init__(self, name, port, direction, node_type):
        super().__init__(name, port, node_type, direction)
        self.__frequencies = None
        self.__num_frequencies = 1

    @property
    def num_frequencies(self):
        """:getter: Returns the number of frequencies at this node (read-only);
        1 until :attr:`frequencies` is set explicitly.
        """
        return self.__num_frequencies

    @property
    def frequencies(self):
        """Frequencies at this node.

        :getter: Returns the frequencies that were set on this node, or a
                 1-tuple holding a reference to the model's ``fsig.f`` if none
                 have been set.
        :setter: Sets the frequencies and updates :attr:`num_frequencies` to
                 their count. Setting `None` restores the default state.
        """
        if self.__frequencies is None:
            return (self.component._model.fsig.f.ref,)
        return self.__frequencies

    @frequencies.setter
    def frequencies(self, value):
        if value is None:
            # Back to the state established in __init__.
            self.__frequencies = None
            self.__num_frequencies = 1
            return
        # Take the length first so a value without a length cannot leave the
        # two fields out of step with each other.
        count = len(value)
        self.__frequencies = value
        self.__num_frequencies = count
class OpticalNode(Node):
    """Represents a specific optical port connection at a component.

    OpticalNodes also have additional physical properties such as the beam
    parameter (of type :class:`.BeamParam`) at the node's position within the
    interferometer.

    Parameters
    ----------
    name : str
        Name of the optical node.
    port : :class:`.Port`
        The port that this node belongs to.
    direction : :class:`.NodeDirection`
        Direction of the node, i.e. whether the field at this node is going
        into or out of the component.
    """

    def __init__(self, name, port, direction):
        super().__init__(name, port, NodeType.OPTICAL, direction)
        self.__space = None

    def __deepcopy__(self, memo):
        """Deep-copy the node, additionally re-pointing the cached space weak
        reference at the copied space.

        If the copied space is not in `memo` yet, a callable placeholder is
        stored instead and also appended to the ``after_deepcopy`` list of the
        copied model, so the reference is fixed once the memo has been filled.
        """
        new = super().__deepcopy__(memo)

        # Manually update the weakrefs to be correct
        if self.__space is not None:
            space = self.__space()
            if space is None:
                # The cached reference is dead, so there is nothing to map
                # through the memo. Clear the cache; the `space` property then
                # looks the space up again on its next access.
                new.__space = None
            else:
                id_space = id(space)
                if id_space in memo:
                    new.__space = weakref.ref(memo[id_space])
                else:
                    # The copied space does not exist yet, so the reference has
                    # to be updated later on. The placeholder below peeks back
                    # at the memo once it has been filled and swaps itself for
                    # a real weakref.
                    def update_later():
                        copied_space = memo[id_space]
                        new.__space = weakref.ref(copied_space)
                        # The `space` property calls whatever is stored in
                        # `__space`, so hand back the space itself rather than
                        # None.
                        return copied_space

                    new.__space = update_later  # just in case something calls
                    # this weakref in the meantime
                    memo[id(self._model)].after_deepcopy.append(update_later)
        return new

    @staticmethod
    def get_opposite_direction(node):
        """Returns the opposite direction of a node from either a Node object or
        a full string name qualifier for a node, `component.port.direction`,
        e.g. `l1.p1.o`.

        Parameters
        ----------
        node : [str | :class:`.Node`]
            Node to invert

        Returns
        -------
        str or :class:`.Node`
            For a string, the same name with the trailing `.i`/`.o` swapped;
            otherwise the `opposite` attribute of `node`.

        Raises
        ------
        ValueError
            If a string is given that ends in neither `.i` nor `.o`.
        """
        if not isinstance(node, str):
            return node.opposite

        if node.endswith(".o"):
            return node.removesuffix(".o") + ".i"
        if node.endswith(".i"):
            return node.removesuffix(".i") + ".o"
        raise ValueError(
            f"`{node}` string name was not in the form `component.port.i` or `component.port.o`"
        )

    @property
    def opposite(self):
        """The opposite direction node.

        :getter: Returns the opposite direction node to this one.
        """
        return getattr(self.port, "o" if self.is_input else "i")

    @property
    def q(self):
        """Beam parameter value at this node.

        :getter: Returns a tuple of the beam parameters at this node in the
                 tangential and sagittal planes, ``(qx, qy)``.

        :setter: Sets the beam parameter at this node. If the argument
                 provided is iterable then its first two entries are used for
                 the tangential and sagittal plane, respectively; otherwise the
                 same value is used for both planes.
        """
        return self.qx, self.qy

    @q.setter
    def q(self, value):
        if is_iterable(value):  # both qx and qy specified
            self.qx = value[0]
            self.qy = value[1]
            return

        # only one q specified
        if self in self._model.gausses:
            self._model.update_gauss(self, qx=value, qy=value)
        else:
            node_name = self.full_name.replace(".", "_")
            name = f"g{node_name}"
            from .gauss import Gauss  # avoids circular import

            self._model.add(Gauss(name, self, qx=value, qy=value))

    @property
    def qx(self):
        """Beam parameter value in the tangential plane.

        :getter: Returns the beam parameter at the node in the
                 tangential plane. Taken from the last stored beam trace of the
                 model, or (with a warning) from the Gauss object registered
                 for this node if no trace has been stored yet.

        :setter: Sets the beam parameter at the node in the
                 tangential plane.
        """
        trace = self._model.last_trace
        if trace is None:
            gauss = self._model.gausses.get(self)
            # TODO (sjr) Also check in model.cavities for this node as source
            if gauss is None:
                raise RuntimeError(
                    f"No stored beam trace yet performed on model and {self.full_name} "
                    "does not have correspond to any Gauss or Cavity object - unable to "
                    "access any beam parameter at this node."
                )
            warn(
                "No beam trace solution yet stored in the model. Returning "
                "qx based on Gauss object entry."
            )
            return gauss.qx

        qx, _ = trace.get(self, (None, None))
        if qx is None:
            raise RuntimeError(
                f"Bug encountered! Could not find entry for {self.full_name} "
                "in last stored beam trace solution of the model."
            )
        return qx

    @qx.setter
    def qx(self, value):
        if self in self._model.gausses:
            self._model.update_gauss(self, qx=value)
        else:
            node_name = self.full_name.replace(".", "_")
            name = f"g{node_name}"
            from .gauss import Gauss  # avoids circular import

            # No Gauss exists for this node yet, so one is created with the
            # value passed as `q`.
            self._model.add(Gauss(name, self, q=value))

    @property
    def qy(self):
        """Beam parameter value in the sagittal plane.

        :getter: Returns the beam parameter at the node in the
                 sagittal plane. Taken from the last stored beam trace of the
                 model, or (with a warning) from the Gauss object registered
                 for this node if no trace has been stored yet.

        :setter: Sets the beam parameter at the node in the
                 sagittal plane.
        """
        trace = self._model.last_trace
        if trace is None:
            gauss = self._model.gausses.get(self)
            # TODO (sjr) Also check in model.cavities for this node as source
            if gauss is None:
                raise RuntimeError(
                    f"No stored beam trace yet performed on model and {self.full_name} "
                    "does not have correspond to any Gauss or Cavity object - unable to "
                    "access any beam parameter at this node."
                )
            warn(
                "No beam trace solution yet stored in the model. Returning "
                "qy based on Gauss object entry."
            )
            return gauss.qy

        _, qy = trace.get(self, (None, None))
        if qy is None:
            raise RuntimeError(
                f"Bug encountered! Could not find entry for {self.full_name} "
                "in last stored beam trace solution of the model."
            )
        return qy

    @qy.setter
    def qy(self, value):
        if self in self._model.gausses:
            self._model.update_gauss(self, qy=value)
        else:
            node_name = self.full_name.replace(".", "_")
            name = f"g{node_name}"
            from .gauss import Gauss  # avoids circular import

            # No Gauss exists for this node yet, so one is created with the
            # value passed as `q`.
            self._model.add(Gauss(name, self, q=value))

    @property
    def space(self):
        """A reference to the :class:`.Space` object attached to this node.

        The space is looked up among the edges of this node in the model
        network and then cached as a weak reference.

        :getter: Returns a reference to the :class:`.Space`
                 object attached to the node (read-only), or None if there is
                 no such space.
        """
        if self.__space is not None:
            return self.__space()

        try:
            network = self._model.network
            if self.is_input:
                edges = network.in_edges(self.full_name)
            else:
                edges = network.out_edges(self.full_name)

            for edge in edges:
                # The edge data stores its owner as a callable reference.
                owner = network.get_edge_data(*edge)["owner"]()
                if isinstance(owner, components.Space):
                    self.__space = weakref.ref(owner)
                    return owner
        except ComponentNotConnected:
            pass
        return None