Source code for finesse.graph.operator_graph

cimport numpy as np
import numpy as np
from cpython.ref cimport PyObject
from libc.stdlib cimport free


cdef expression_node_to_tuple_form(ExpressionNode* node, dict value_map):
    """
    Converts an ExpressionNode to a tuple representation. This is a representation
    of the expression tree with nested tuples. The first element of the tuple is
    the operator and the remaining elements are the operands, see Examples section
    below for more details.

    Parameters
    ----------
    node : ExpressionNode*
        The ExpressionNode to convert.
    value_map : dict
        A dictionary mapping values to names. If provided, the values in the
        ExpressionNode are replaced with their corresponding mapping.

    Returns
    -------
    Tuple
        The tuple representation of the ExpressionNode. See Example section

    Example
    -------
    >>> one = create_value_node(1)
    >>> expression_node_to_tuple_form(mul, {})
    (1,)
    >>> add = create_op_node(ADD, 3)
    >>> add_operand(add, create_value_node(1))
    >>> add_operand(add, create_value_node(2))
    >>> add_operand(add, create_value_node(3))
    >>> // add -> 1+2+3
    >>> mul = create_op_node(MUL, 2)
    >>> add_operand(mul, create_value_node(4))
    >>> add_operand(mul, add)
    >>> // mul -> 4*(1+2+3)
    >>> expression_node_to_tuple_form(mul, {})
    ('*', 4, ('+', 1, 2, 3))

    >>> expression_node_to_tuple_form(mul, value_map={1:'a', 2:'b', 3:'c', 4:'d'})
    ('*', 'd', ('+', 'a', 'b', 'c'))
    """
    cdef int i
    if node.type == VALUE:
        if value_map is not None:
            return value_map[node.data.value]
        else:
            return node.data.value
    elif node.type in (ADD, MUL):
        if node.type == ADD:
            operator = "+"
        elif node.type == MUL:
            operator = "*"
        else:
            raise ValueError("Invalid operator type")
        operands = tuple(expression_node_to_tuple_form(node.data.op.operands[i], value_map) for i in range(node.data.op.num_operands))
        return tuple((operator, *operands))


[docs]cdef class PyObjectPointer: cdef PyObject* ptr
[docs]cdef class OperatorGraph: """A class representing a graph of linear operators. This class provides methods for creating, manipulating, and analyzing a graph of linear operators. The graph is represented using an adjacency list data structure, where each node represents a state and each edge represents a linear operator connecting two states.
Parameters ---------- num_nodes : int
The number of nodes in the graph. Attributes ---------- graph : Graph The underlying graph data structure. operator_indices : dict A dictionary mapping operator names to their corresponding indices. indices_operator_names : dict A dictionary mapping indices to their corresponding operator names. Methods ------- _validate_node(node) Validates if a given node index is within the valid range. number_of_nodes() Returns the number of nodes in the graph. number_of_edges() Returns the number of edges in the graph. edges() Returns an iterator over all edges in the graph. output_edges(from_node) Returns an iterator over the outgoing edges at a particular node. input_edges(node) Returns an iterator over the input edges at a particular node. print_graph() Prints the graph. add_edge(name, from_node, to_node) Adds an edge to the graph. remove_edge(from_node, to_node) Removes an edge from the graph. get_edge_operator_expression(from_node, to_node) Returns a list presentation of the operator expression associated with an edge. number_of_self_loops() Returns the number of self-loops in the graph. nodes_with_self_loops() Returns the nodes in the graph that have self-loops. self_loop_edges() Returns the edges in the graph that are self-loops. has_self_loop(node) Checks if a node has a self-loop. in_degree(node) Returns the in-degree of a node. out_degree(node) Returns the out-degree of a node. fix_node(node, state) Fixes a node to a specific state. next_reducible_node() Returns a node index that can be reduced evaluation_nodes(ignore_self_loops) Returns the nodes that have both inputs and outputs in the graph. sum_rule(node) Applies the sum reduction rule to a node. split_rule(node) Applies the split reduction rule to a node. fan_rule(node) Applies the fan reduction rule to a node. series_rule(node) Applies the series reduction rule to a node. """ def __init__(self, num_nodes): self.graph = create_graph(num_nodes) self.operator_indices = {} self.indices_operator_names = {} def _validate_node(self, node): if node < 0 or node >= self.graph.num_nodes: raise Exception(f"Node {node} must be between 0 and {self.graph.num_nodes-1}") @property def number_of_nodes(self): return int(self.graph.num_nodes) @property def number_of_edges(self): return int(self.graph.num_edges) def edges(self): """An iterator over all edges""" for i in range(self.number_of_nodes): for edge in self.output_edges(i): yield edge def output_edges(self, from_node): """An iterator over the outgoing edges at a particular node """ self._validate_node(from_node) cdef int ifrom_node = int(from_node) cdef Edge* current = self.graph.output_edges[ifrom_node] edges = [] while (current != NULL): edges.append((current.from_node, current.to_node)) current = current.next return edges def input_edges(self, node): """An iterator over the input edges at a particular node """ self._validate_node(node) cdef int ifrom_node = int(node) cdef ListItem* current = self.graph.input_edges[ifrom_node] cdef Edge *edge = NULL edges = [] while (current != NULL): if current.ptr == NULL: raise Exception("No edge atttached") edge = <Edge*>current.ptr edges.append((edge.from_node, edge.to_node)) current = current.next return edges def print_graph(self): print_graph(self.graph) def add_edge(self, name : str, from_node : int, to_node : int): """Add an edge to the graph. This method adds an edge from `from_node` to `to_node` in the graph. The edge is associated with a linear operator named `name`. If the edge already exists in the graph, or if an error occurs while adding the edge, an exception is raised. The added linear operator is given a monotonically increasing ID value, starting from 0. See `.operator_indices` for a mapping of operator names to operator IDs and `.indices_operator_names` for a mapping of operator IDs to operator names. Parameters ---------- name : str The name of the linear operator associated with the edge. from_node : int The index of the node where the edge starts. to_node : int The index of the node where the edge ends. Raises ------ Exception If the edge already exists in the graph (error code -4), or if an error occurs while adding the edge (any positive error code). """ if name in self.operator_indices: raise KeyError(f"Operator {name} already exists in the graph") op_id = self.operator_indices[name] = len(self.operator_indices) self.indices_operator_names[op_id] = name self._validate_node(from_node) self._validate_node(to_node) cdef LinearOperator *op = create_linear_operator(op_id) if op == NULL: raise MemoryError(f"Error creating linear operator {name}") cdef int error = add_edge( self.graph, from_node, to_node, op, ) if error == 1: destroy_linear_operator(op) elif error == -4: raise Exception(f"A connection {from_node}->{to_node} already exists in the graph") elif error > 0: raise Exception(f"Error code {error} adding edge {from_node}->{to_node}") def remove_edge(self, from_node, to_node): """Remove an edge from the graph. This method removes an edge from `from_node` to `to_node` in the graph. If the edge does not exist in the graph, or if an error occurs while removing the edge, an exception is raised. Linear operator names and IDs associated with this edge are not removed from the graph when an edge is removed. See `.operator_indices` for a mapping of operator names to operator IDs and `.indices_operator_names` for a mapping of operator IDs to operator names. Parameters ---------- from_node : int The index of the node where the edge starts. to_node : int The index of the node where the edge ends. Raises ------ Exception If the edge does not exist in the graph, or if an error occurs while removing the edge. """ self._validate_node(from_node) self._validate_node(to_node) error = remove_edge(self.graph, from_node, to_node) if error != 0: raise Exception(f"Removing connection {from_node}->{to_node} had error {error}") def get_edge_operator_expression(self, from_node, to_node): """ Retrieves the operator expression associated with an edge between two nodes. Parameters ---------- from_node : Node The source node of the edge. to_node : Node The target node of the edge. Returns ------- list A list representation of the operator expression. Example ------- For an operator expression `4*(1+2+3)` this will return a list respresentation of `('*', 4, ('+', 1, 2, 3))`. See `expression_node_to_tuple_form` for more details. Raises ------ Exception If the connection between the nodes does not exist in the graph. Exception If there is no linear operator at the edge. Exception If there is no expression at the edge. """ self._validate_node(from_node) self._validate_node(to_node) cdef Edge* edge = NULL if not edge_exists(self.graph, from_node, to_node, &edge): raise Exception(f"Connection {from_node}->{to_node} does not exist in the graph") else: if edge.operator == NULL: raise Exception("No linear operator at this edge") if edge.operator.expr == NULL: raise Exception("No expression at this edge") expr = expression_node_to_tuple_form(edge.operator.expr, self.indices_operator_names) if type(expr) is not tuple: return (expr,) else: return expr @property def number_of_self_loops(self): """Get the number of self-loops in the graph. This property returns the total number of self-loops in the graph. A node is considered to have a self-loop if the corresponding entry in the `self_loop` array of the graph is greater than 0. The total number of self-loops is the sum of these entries. Returns ------- int The total number of self-loops in the graph. """ return np.sum(<int[:self.graph.num_nodes]>self.graph.self_loop) @property def nodes_with_self_loops(self): """Get the nodes in the graph that have self-loops. This property returns a tuple of indices of the nodes in the graph that have self-loops. A node is considered to have a self-loop if the corresponding entry in the `self_loop` array of the graph is greater than 0. Returns ------- tuple of int The indices of the nodes in the graph that have self-loops. """ return tuple(i for i, n in enumerate(<int[:self.graph.num_nodes]>self.graph.self_loop) if n > 0) @property def self_loop_edges(self): """ Get the edges in the graph that are self-loops. This property returns a tuple of tuples, where each inner tuple represents an edge that is a self-loop. Each inner tuple contains two identical elements, which are the index of the node that has a self-loop. Returns ------- tuple of tuple of int The edges in the graph that are self-loops. Each edge is represented as a tuple of two identical integers, which are the index of the node that has a self-loop. """ return tuple((i, i) for i, n in enumerate(<int[:self.graph.num_nodes]>self.graph.self_loop) if n > 0) def has_self_loop(self, node): """Check if a node has a self-loop. This method checks if the specified node has a self-loop in the graph. Parameters ---------- node : int The index of the node to check. Returns ------- int 1 if the node has a self-loop, 0 otherwise. """ self._validate_node(node) return int(self.graph.self_loop[int(node)]) def in_degree(self, node): """Get the in-degree of a node. This method returns the in-degree of the specified node in the graph. The in-degree of a node is the number of edges coming into the node. Parameters ---------- node : int The index of the node to check. Returns ------- int The in-degree of the node. """ self._validate_node(node) return int(self.graph.input_degree[int(node)]) def out_degree(self, node): """Get the out-degree of a node. This method returns the out-degree of the specified node in the graph. The out-degree of a node is the number of edges going out of the node. Parameters ---------- node : int The index of the node to check. Returns ------- int The out-degree of the node. """ self._validate_node(node) return int(self.graph.output_degree[int(node)]) def fix_node(self, int node, bint state): """Fix a node to a specific state. When True the node will not be reduced/removed from the graph. This method fixes the specified node to the given state in the graph. If the node index is not valid, an exception is raised. Parameters ---------- node : int The index of the node to fix. state : bint The state to which the node should be fixed. This should be a boolean-like value (0 or 1). Raises ------ Exception If the node index is not valid (i.e., if it is less than 0 or greater than the number of nodes in the graph). """ self._validate_node(node) if node < 0 or node > self.graph.num_nodes: raise Exception(f"Node should be between 0 and {self.graph.num_nodes}") else: self.graph.fixed_node[node] = state def next_reducible_node(self, keep=None): """ Keeps returning a node index if there is one that can be reduced. Returns None once there are none left """ for i in range(self.number_of_nodes): if self.has_self_loop(i) or (keep is not None and i in keep): continue if self.in_degree(i) == 1 and self.out_degree(i) == 1: return i elif self.in_degree(i) >= 1 and self.out_degree(i) == 1: return i elif self.in_degree(i) == 1 and self.out_degree(i) >= 1: return i elif self.in_degree(i) >= 1 and self.out_degree(i) >= 1: return i return None def evaluation_nodes(self, ignore_self_loops=False): """Nodes that have both inputs and outputs in the graph. These nodes must be included in the M matrix to solve a Ma=b type system. Parameters ---------- ignore_self_loops : bool, optional When true any self loops at a node are not included in the determination if this node is a source Returns ------- nodes : Tuple node integer values """ if ignore_self_loops: return tuple([ _ for _ in range(self.number_of_nodes) if self.out_degree(_)-self.has_self_loop(_) > 0 and self.in_degree(_)-self.has_self_loop(_) > 0 ]) else: return tuple([ _ for _ in range(self.number_of_nodes) if self.out_degree(_) > 0 and self.in_degree(_) > 0 ]) def source_nodes(self, ignore_self_loops=False): """Nodes which have no input edges Parameters ---------- ignore_self_loops : bool, optional When true any self loops at a node are not included in the determination if this node is a source Returns ------- nodes : Tuple node integer values """ if ignore_self_loops: return tuple([ _ for _ in range(self.number_of_nodes) if self.out_degree(_)-self.has_self_loop(_) > 0 and self.in_degree(_)-self.has_self_loop(_) == 0 ]) else: return tuple([ _ for _ in range(self.number_of_nodes) if self.out_degree(_) > 0 and self.in_degree(_) == 0 ]) def sink_nodes(self, ignore_self_loops=False): """Nodes which have no output edges Parameters ---------- ignore_self_loops : bool, optional When true any self loops at a node are not included in the determination if this node is a sink Returns ------- nodes : Tuple node integer values """ if not ignore_self_loops: return tuple([ _ for _ in range(self.number_of_nodes) if self.out_degree(_)-self.has_self_loop(_) == 0 and self.in_degree(_)-self.has_self_loop(_) > 0 ]) else: return tuple([ _ for _ in range(self.number_of_nodes) if self.out_degree(_) == 0 and self.in_degree(_) > 0 ]) def isolated_nodes(self): """Nodes which have no input or output edges Returns ------- nodes : Tuple node integer values """ return tuple( [ _ for _ in range(self.number_of_nodes) if self.in_degree(_) == 0 and self.out_degree(_) == 0 ] ) def to_networkx(self, ignore_nodes=None): """Convert the graph to a NetworkX MultiDiGraph. This method converts the current graph to a NetworkX MultiDiGraph. If `ignore_nodes` is provided, any edges involving nodes in `ignore_nodes` are not included in the resulting MultiDiGraph. Parameters ---------- ignore_nodes : iterable of int, optional An iterable of node indices to ignore. If provided, any edges involving these nodes are not included in the resulting MultiDiGraph. By default, no nodes are ignored. Returns ------- networkx.MultiDiGraph The NetworkX MultiDiGraph representation of the current graph. """ import networkx as nx G = nx.MultiDiGraph() for n in range(self.number_of_nodes): for i, o, in self.output_edges(n): if ignore_nodes is not None and (i in ignore_nodes or o in ignore_nodes): continue else: G.add_edge(i, o) return G def plot( self, graphviz_prog="neato", pos=None, node_labels=None, edge_labels=False, ignore_nodes=None, alpha_nodes=None, alpha="44", dpi=200, svg=False ): """ Plot the operator graph using Graphviz. Parameters ---------- graphviz_prog : str, optional The Graphviz program to use for layout. Default is 'neato', see PyGraphviz for mode layouts. pos : dict, optional A dictionary mapping node names to positions. Default is None. node_labels : dict, optional Replace node indices with text labels, default is off. edge_labels : bool, optional Label edges with operator expressions, default is False. ignore_nodes : list, optional A list of nodes indices to ignore. Default is an empty list. alpha_nodes : list, optional A list of node indices to apply alpha transparency to. Default is an empty list. alpha : str, optional The doubled hexadecial alpha value for transparency. Default is '44', opqaue is 'FF', fully transparent is '00' dpi : int, optional Specifies the expected number of pixels per inch on a display device. Default is '200' svg : bool, optional Whether to output the graph as SVG. Default is False. Returns ------- dict A dictionary mapping node names to positions. """ if alpha_nodes is None: alpha_nodes = [] from networkx.drawing.nx_agraph import to_agraph from IPython.display import Image, display, SVG from finesse.env import is_interactive from PIL import Image as PIL_Image import tempfile import webbrowser G = self.to_networkx(ignore_nodes=ignore_nodes) G.graph["splines"]="true" G.graph["overlap"]="false" G.graph["node"]={"color":"black", "style":"filled", "fillcolor":"#CCCCCC",} G.graph["dpi"]=dpi for n in G.nodes: if pos is not None and n in pos: G.nodes[n]["pos"] = pos[n] if n in alpha_nodes: G.nodes[n]["fillcolor"] = "#CCCCCC"+alpha G.nodes[n]["color"] = "#000000"+alpha G.nodes[n]["fontcolor"] = "#000000"+alpha if node_labels is not None: G.nodes[n]["label"] = node_labels[n] for a, b, c in G.edges: G[a][b][c]["penwidth"]=2 if edge_labels: G[a][b][c]["label"] = str(self.get_edge_operator_expression(a, b)) if a in alpha_nodes or b in alpha_nodes: G[a][b][c]["color"] = "#000000"+alpha else: G[a][b][c]["color"] = "#000000" for n, n in self.self_loop_edges: if n in alpha_nodes: G[n][n][0]["color"]="#FF0000"+alpha else: G[n][n][0]["color"]="#FF0000" A = to_agraph(G) if pos is None: A.layout(graphviz_prog) else: A.layout() img_fmt = "svg" if svg else "png" if is_interactive(): ipy_func = SVG if svg else Image display(ipy_func(A.draw(format=img_fmt))) else: # Need to keep the file, since the python process exits after launching # the browser (when opening svg files) with tempfile.NamedTemporaryFile(suffix="." + img_fmt, delete=False) as f: A.draw(format=img_fmt, path=f.name) if svg: webbrowser.open_new(f"file://{f.name}") else: PIL_Image.open(f.name).show() positions = {n:n.attr["pos"] for n in A.nodes()} return positions def find_forkless_paths(self): """Finds all paths between nodes which do not have any forks in. Forks are defined here as a node with an input or output degree == 1 - except for the first and last node. Only paths with more than one edge are returned. Returns ------- list Each element will be a list of nodes defining the edges of the forkless paths """ def traverse(self, start, visited, path, final_paths): if visited[start]: return path[:-1] visited[start] = True in_degree = self.in_degree(start) out_degree = self.out_degree(start) if out_degree == 1 and in_degree <= 1: # A single output so keep following it i, o = tuple(self.output_edges(start))[0] path.append(i) return traverse(self, o, visited, path, final_paths) elif out_degree > 1 or in_degree > 1: path.append(start) if len(path) > 2: final_paths.append(path) # Save this path for i, o in self.output_edges(start): # then start the process for each branch path = [i] traverse(self, o, visited, path, final_paths) else: path.append(start) if len(path) > 2: final_paths.append(path) return path visited = np.zeros(self.number_of_nodes, dtype=bool) final_paths = [] for source in self.source_nodes() + self.evaluation_nodes(): path = [] traverse(self, source, visited, path, final_paths) if not all(visited): raise Exception(f"Missed some nodes: {visited}") return final_paths def series_rule(self, node): """Apply the series reduction rule to a node in the graph. Parameters ---------- node : int The index of the node. Returns ------- Tuple[Tuple[int, int, int]] A tuple of tuples representing the edges that have been reduced. Each tuple contains the indices of the start, mid, and end nodes. Raises ------ Exception If the graph is null or if an error occurs while applying the rule. """ cdef int error = 0 cdef int start, end if self.graph == NULL: raise Exception("Graph is null") error = apply_series_rule(self.graph, node, &start, &end) if error < 0: ex = Exception(f"Error code {error} applying series rule to {node}") ex.error_code = error raise ex return ((start, node, end), ) def split_rule(self, node): """Apply the split reduction rule to a node in the graph. Parameters ---------- node : int The index of the node. Returns ------- Tuple[Tuple[int, int, int]] A tuple of tuples representing the edges that have been reduced. Each tuple contains the indices of the start, mid, and end nodes. Raises ------ Exception If the graph is null or if an error occurs while applying the rule. """ cdef int error = 0, size, start cdef int* end = NULL try: if self.graph == NULL: raise Exception("Graph is null") error = apply_split_rule(self.graph, node, &start, &size, &end) if error < 0: ex = Exception(f"Error code {error} applying split rule to {node}") ex.error_code = error raise ex reductions = [] for i in range(size): reductions.append((start, node, end[i])) return tuple(reductions) finally: if end != NULL: free(end) def sum_rule(self, node): """Apply the sum reduction rule to a node in the graph. Parameters ---------- node : int The index of the node. Returns ------- Tuple[Tuple[int, int, int]] A tuple of tuples representing the edges that have been reduced. Each tuple contains the indices of the start, mid, and end nodes. Raises ------ Exception If the graph is null or if an error occurs while applying the rule. """ cdef int error = 0, size, end cdef int* start = NULL try: if self.graph == NULL: raise Exception("Graph is null") error = apply_sum_rule(self.graph, node, &size, &start, &end) if error < 0: ex = Exception(f"Error code {error} applying sum rule to {node}") ex.error_code = error raise ex reductions = [] for i in range(size): reductions.append((start[i], node, end)) return tuple(reductions) finally: if start != NULL: free(start) def fan_rule(self, node): """Apply the fan reduction rule to a node in the graph. Parameters ---------- node : int The index of the node. Returns ------- Tuple[Tuple[int, int, int]] A tuple of tuples representing the edges that have been reduced. Each tuple contains the indices of the start, mid, and end nodes. Raises ------ Exception If the graph is null or if an error occurs while applying the rule. """ cdef int error = 0, size = 0 cdef int* start = NULL cdef int* end = NULL cdef int i = 0 try: if self.graph == NULL: raise Exception("Graph is null") error = apply_fan_rule(self.graph, node, &size, &start, &end) if error < 0: ex = Exception(f"Error code {error} applying fan rule to {node}") ex.error_code = error raise ex if start == NULL: raise MemoryError("Start is NULL") if end == NULL: raise MemoryError("End is NULL") reductions = [] for i in range(size): reductions.append((start[i], node, end[i])) return tuple(reductions) finally: if start != NULL: free(start) if end != NULL: free(end) def reduce(self, keep=None, reductions=None): """ Reduces the graph to as few evaluations nodes as possible. Evaluation nodes are those that must be solved for when solving a system such as Ma = b. All other nodes are then either sinks or sources. Sinks do not need to be included in M as evaluation and source nodes solutions can be simply propagated to them. Parameters ---------- keep : [list|tuple], optional nodes that should not be reduced reductions : list, optional Optional list that will be filled with the reductions applied Returns ------- Number of reductions applied """ cdef int total_reductions = 0 if keep is None: keep = [] for node in keep: self._validate_node(node) node = self.next_reducible_node(keep) while node is not None: if self.in_degree(node) == 1 and self.out_degree(node) == 1: applications = self.series_rule(node) elif self.in_degree(node) == 1 and self.out_degree(node) > 1: applications = self.split_rule(node) elif self.in_degree(node) > 1 and self.out_degree(node) == 1: applications = self.sum_rule(node) elif self.in_degree(node) > 1 and self.out_degree(node) > 1: applications = self.fan_rule(node) else: raise Exception(f"Node {node} is not reducible but marked as it should be") total_reductions += len(applications) if reductions is not None: reductions.extend(applications) node = self.next_reducible_node(keep) return total_reductions def __dealloc__(self): destroy_graph(self.graph)