"""Kat model compiler.
This takes parsed productions, figures out the dependencies and builds the model in the
correct order.
Sean Leavey <sean.leavey@ligo.org>
"""
from __future__ import annotations
import logging
from collections import defaultdict, ChainMap
from collections.abc import Iterable
from io import StringIO
import re
from typing import Optional
from functools import singledispatchmethod
from networkx import (
selfloop_edges,
lexicographical_topological_sort,
simple_cycles,
descendants,
NetworkXUnfeasible,
)
import numpy as np
from ..warnings import KeywordUsedWarning
from .. import Model
from ..env import warn
from ..symbols import Resolving
from ..utilities import option_list, ngettext
from finesse.utilities.text import get_close_matches
from ..utilities.functools import flagdispatchmethod
from ..exceptions import (
FinesseException,
IllegalSelfReferencing,
ModelAttributeError,
ModelParameterDefaultValueError,
ModelParameterSelfReferenceError,
ContextualTypeError,
ContextualValueError,
)
from .graph import ROOT_NODE_NAME, KatGraph, KatNodeType, KatEdgeType
from .containers import (
KatToken,
KatScript,
KatElement,
KatFunction,
KatKwarg,
KatGroupedExpression,
KatExpression,
KatArray,
KatNumericalArray,
)
from .parser import KatParser
from .exceptions import KatScriptError, KatSyntaxError, KatMissingAfterDirective
from .util import duplicates, merge_attributes
from finesse.script.adapter import ArgumentType
LOGGER = logging.getLogger(__name__)
def _production(node, graph):
from .generator import KatUnfiller
unfiller = KatUnfiller()
return unfiller.production(node, graph)
def _user_arg_productions_by_name(name, statement_node, adapter, graph):
"""Get arguments with name `name` specified by the user corresponding to the `graph`
node `statement_node`.
This searches the arguments specified by the user, so it can therefore be used to
find invalid or duplicate arguments. Any positional arguments specified by the user
are still matched by looking up the setter signature via `adapter`.
Yields
------
:class:`.TokenContainer`
Production matching `name`.
"""
for node in graph.sorted_dependent_argument_nodes(statement_node):
try:
argument = graph.argument(node, adapter)
except TypeError:
# This is an additional, unsupported positional argument.
continue
if argument.name == name:
yield _production(node, graph)
def _get_unknown_token_message(
token_type: str,
token_name: str,
options: Iterable[str],
prefix: Optional[str] = None,
) -> str:
"""Utility function to build an error message for an unknown token, possibly
including suggestions for corrections.
Parameters
----------
token_type : str
e.g. 'function', 'element' or 'command'
token_name : str
name of the unrecognized token
options : Iterable[str]
Iterable of options that are relevant in the current context
prefix: Optional[str]
String to be prefixed to the 'did you mean message'.
Returns
-------
str
Formatted error message, possibly including suggestions
"""
if prefix is None:
msg = f"unknown {token_type} '{token_name}'"
else:
msg = str(prefix)
if suggestions := get_close_matches(token_name, options):
suggestions_text = option_list(suggestions, quotechar="'", sort=True)
msg += f"\n\nDid you mean {suggestions_text}?"
return msg
[docs]class KatCompiler:
"""Kat model compiler."""
[docs] def __init__(self, spec=None):
if spec is None:
from .spec import KATSPEC as spec
self.spec = spec
self.graph = None
self.build_graph = None
self._parser = None
self._parameter_dependencies = None
self._build_order = None
self._element_script_order = None
@property
def script(self):
return self._parser.script
[docs] def compile(self, string, **kwargs):
"""Compile the contents of `string`.
Parameters
----------
string : :class:`str`
The string to compile kat script from.
Other Parameters
----------------
kwargs
Keyword arguments supported by :meth:`.compile_file`.
Returns
-------
:class:`.Model`
The model compiled from reading `string`.
"""
return self.compile_file(StringIO(string), **kwargs)
[docs] def compile_file(self, fobj, model=None, resolve=True, build=True):
"""Compile the contents of the specified file.
Parameters
----------
fobj : :class:`io.FileIO`
The file object to compile kat script from. This should be opened in text
mode.
model : :class:`.Model`, optional
An existing model to compile the contents of `fobj` into. If not specified,
a new, empty model is created.
resolve : :class:`bool`, optional
Resolve the parsed script. If False, the parsed contents is added to graph
but no sanity checks are performed and nothing is returned. Defaults to
True.
build : :class:`bool`, optional
Build the parsed script. If False, no Finesse objects will be added to the
model and nothing will be returned. Defaults to True.
Returns
-------
:class:`.Model` or None
If `resolve` and `build` are True, the model compiled from reading `fobj`;
None otherwise.
Raises
------
:class:`.KatSyntaxError`
If a syntax error is present in the contents of `fobj`.
"""
# Reset state.
self._parser = KatParser()
self.graph = KatGraph()
self.build_graph = None
self._parameter_dependencies = []
self._element_script_order = {}
try:
script = self._parser.parse_file(fobj)
except KatSyntaxError as e:
# There was a syntax error. Try to improve the error message using the spec.
self._reraise_parser_error_in_spec_context(e)
# Build the parse graph.
self._fill(script, ROOT_NODE_NAME)
# At this stage there shouldn't be any dependencies between branches.
assert self.graph.is_tree()
if resolve:
self._resolve()
if build:
model = self._build(ROOT_NODE_NAME, model)
# Merge the compiled syntax graph into the model.
if model.syntax_graph is None:
model.syntax_graph = self.graph
else:
LOGGER.debug("merging compiled syntax into existing syntax graph")
model.syntax_graph.merge(self.graph)
# Sort the model elements back into script order.
def script_order_sort(item):
_, obj = item
# Ensure the script's elements appear after any existing elements in
# the model, but sort script elements in line order. We rely on
# :meth:`.Model.sort_elements` being stable to retain the order of
# existing elements.
try:
return 1, self._element_script_order[obj]
except KeyError:
# This object wasn't compiled as part of this call; leave it
# in its current position.
return (0,)
model.sort_elements(key=script_order_sort)
return model
@property
def _directive_functions(self):
return ChainMap(self.spec.commands, self.spec.analyses)
@property
def _expression_functions(self):
return ChainMap(self.spec.expression_functions, self.spec.unary_operators)
@property
def _available_functions(self):
return ChainMap(self._directive_functions, self._expression_functions)
def _fill(self, value, path, **attributes):
attributes = merge_attributes(
attributes, self._item_node_attributes(value, path)
)
self.graph.add_node(path, **attributes)
if hasattr(value, "arguments"):
# Assemble arguments.
for order, argument in enumerate(value.arguments):
argument_path = self.graph.item_node_name(order, path)
self._fill(argument, argument_path)
self.graph.add_edge(
argument_path, path, type=KatEdgeType.ARGUMENT, order=order
)
return attributes
@singledispatchmethod
def _item_node_attributes(self, item, path):
"""Get the attributes for `item` to be added to the corresponding node."""
raise NotImplementedError(
f"don't know how to handle an item with type '{item.__class__.__name__}'"
)
@_item_node_attributes.register(KatScript)
def _(self, script, path):
return {"type": KatNodeType.ROOT, "extra_tokens": script.extra}
@_item_node_attributes.register(KatToken)
def _(self, token, path):
attributes = {"token": token}
if token.type == "NAME":
# This could be a keyword, constant or parameter reference.
name = token.value
if name in self.spec.keywords:
token_type = KatNodeType.KEYWORD
elif name in self.spec.constants:
token_type = KatNodeType.CONSTANT
else:
# Assume this is a parameter reference like `l1.P`.
self._parameter_dependencies.append(path)
token_type = KatNodeType.REFERENCE
elif token.type == "FIXME":
# User has attempted to parse a script containing a __FIXME__ value.
raise KatSyntaxError(
"an invalid value (probably denoting something that cannot be "
"represented in KatScript) needs manually fixed",
self.script,
token,
)
else:
# This is some other token like a number or string.
token_type = KatNodeType.VALUE
attributes["type"] = token_type
return attributes
@_item_node_attributes.register(KatKwarg)
def _(self, kwarg, path):
return self._fill(
kwarg.value, path, key_token=kwarg.key, equals_token=kwarg.equals
)
@_item_node_attributes.register(KatElement)
def _(self, element, path):
directive_token = element.directive
element_type = directive_token.value
if element_type not in self.spec.elements:
if element_type in self.spec.commands or element_type in self.spec.analyses:
# Syntax error was on an command or analysis definition.
raise KatIncorrectFormError(directive_token, self.script, self.spec)
else:
msg = _get_unknown_token_message(
"element", directive_token.raw_value, self.spec.elements.keys()
)
raise KatSyntaxError(
msg,
self.script,
directive_token,
)
return {
"token": element.directive,
"name_token": element.name,
"type": KatNodeType.ELEMENT,
"extra_tokens": element.extra,
}
@_item_node_attributes.register(KatFunction)
def _(self, function, path):
directive_token = function.directive
if directive_token.value not in self._available_functions:
msg = _get_unknown_token_message(
"function",
directive_token.raw_value,
self.spec.function_directives.keys(),
)
raise KatSyntaxError(
msg,
self.script,
directive_token,
)
return {
"token": directive_token,
"type": KatNodeType.FUNCTION,
"extra_tokens": function.extra,
}
@_item_node_attributes.register(KatGroupedExpression)
def _(self, group, path):
return {
"type": KatNodeType.GROUPED_EXPRESSION,
"extra_tokens": group.extra,
}
@_item_node_attributes.register(KatExpression)
def _(self, expression, path):
operator_token = expression.operator
if operator_token.value not in self.spec.binary_operators:
raise KatSyntaxError(
f"unknown expression operator '{operator_token.raw_value}'",
self.script,
expression.operator,
)
return {"type": KatNodeType.EXPRESSION, "token": operator_token}
@_item_node_attributes.register(KatArray)
def _(self, array, path):
return {"type": KatNodeType.ARRAY, "extra_tokens": array.extra}
@_item_node_attributes.register(KatNumericalArray)
def _(self, array, path):
return {"type": KatNodeType.NUMERICAL_ARRAY, "extra_tokens": array.extra}
def _resolve(self):
"""Resolve references and perform sanity checks on the graph.
Dependencies due to parameters (e.g. `m1.p1.o`) and parameter references (e.g.
`&l1.P`) are created by drawing edges to the relevant nodes, transforming the
parse tree into a graph.
This additionally checks that:
- element names are valid and unique,
- keyword arguments aren't defined in duplicate, and
- no cycles exist due to dependencies (i.e. if a reference depends on the
referencing component or its ancestors in any way).
The `order` attributes of the root edges are overwritten using a topological
sort to ensure the resulting graph is built in a way that most types of
dependency can be resolved.
"""
# Various types of node that we need to validate.
elements = defaultdict(list)
singular_directives = defaultdict(list)
analyses = []
root_directive_nodes = self.graph.dependent_argument_nodes(
ROOT_NODE_NAME, data=True
)
for directive_node, data in root_directive_nodes:
node_type = data["type"]
token = data["token"]
directive = token.value
if node_type in KatNodeType.DIRECTIVE_NODES:
if node_type in KatNodeType.ELEMENT:
name_token = data["name_token"]
name = name_token.value
if name in self.spec.constants:
# Disallow.
raise KatScriptError(
f"constant '{name}' cannot be used as an element name",
self.script,
[[name_token]],
)
elif name in self.spec.keywords:
warn(
f"element name {repr(name)} (line {name_token.lineno}) is "
f"also the name of a keyword, which may lead to confusion",
KeywordUsedWarning,
)
adapter = self._element_adapter(directive)
elements[name].append(directive_node)
elif node_type in KatNodeType.FUNCTION:
adapter = self._function_adapter(directive)
if directive in self.spec.analyses:
analyses.append(token)
if adapter.singular:
singular_directives[directive].append(directive_node)
# Check for duplicate keyword arguments.
dupekvs = duplicates(
[
self.graph.nodes[node]["key_token"]
for node in self.graph.dependent_argument_nodes(directive_node)
if "key_token" in self.graph.nodes[node]
],
key=lambda token: token.raw_value,
)
if dupekvs:
keys, dupetokens = zip(*dupekvs)
keylist = option_list([f"'{key}'" for key in keys], final_sep="and")
msg = ngettext(
len(keys),
f"duplicate arguments with key {keylist}",
f"duplicate arguments with keys {keylist}",
sub=False,
)
error_tokens = [
[token] for tokens in dupetokens for token in tokens
]
raise KatDirectiveError(
msg,
directive,
self.script,
error_tokens,
self.spec,
add_syntax_hint=True,
)
for directive, nodes in singular_directives.items():
if len(nodes) > 1:
raise KatScriptError(
f"there can only be one '{directive}' directive",
self.script,
[[self.graph.nodes[node]["token"]] for node in nodes],
)
for name, nodes in elements.items():
if len(nodes) > 1:
raise KatScriptError(
f"multiple elements with name '{name}'",
self.script,
[[self.graph.nodes[node]["name_token"]] for node in nodes],
)
# Check that there isn't more than one root analysis.
if len(analyses) > 1:
raise KatScriptError(
"duplicate analysis trees (combine with 'series' or 'parallel')",
self.script,
[[token] for token in analyses],
)
# Create dependencies for parameters and parameter references.
for source_param_path in self._parameter_dependencies:
source_token = self.graph.nodes[source_param_path]["token"]
target = source_token.value
# FIXME: remove once '&' is invalid syntax for references.
if source_token.type == "NAME" and target.startswith("&"):
target = target[1:]
# Targets may not exist in the current script; they may already be in the
# model prior to parsing.
try:
target_directive_path = self.graph.param_target_element_path(
target, ROOT_NODE_NAME
)
except ValueError:
LOGGER.debug(
f"parameter {repr(source_token.raw_value)} targetted by "
f"{repr(source_param_path)} does not exist in the script - ignoring"
)
continue
# Add the dependency.
self.graph.add_edge(
target_directive_path,
source_param_path,
type=KatEdgeType.DEPENDENCY,
)
# Create graph with just the root directives (use a copy because it may be
# modified during compilation).
self.build_graph = self.graph.directive_graph(ROOT_NODE_NAME)
def node_build_order(source_graph):
"""Compute node topological order, or throw a cycle error."""
# Remove self-references, since these will be dealt with by the compiler.
graph = source_graph.copy()
for edge in selfloop_edges(source_graph):
graph.remove_edge(*edge)
try:
# Compute topological order, resolving ambiguities in favour of lower
# line numbers.
# Convert to list so we know immediately if there are cycles.
return list(
lexicographical_topological_sort(
graph, key=lambda node: graph.nodes[node]["token"].lineno
)
)
except NetworkXUnfeasible:
# The graph contains at least one cyclic dependency. Work out the
# cause(s) by finding cycles using the full graph.
cyclic_param_nodes = set()
for cycle_nodes in simple_cycles(self.graph):
for cycle_node in cycle_nodes:
data = self.graph.nodes[cycle_node]
if data["type"] not in KatNodeType.DEPENDENT_NODES:
# Ignore anything in the cycle that isn't a reference node
# (e.g. directives).
continue
cyclic_param_nodes.add(cycle_node)
# Grab and sort the cyclic tokens in line order.
cyclic_parameter_tokens = sorted(
[self.graph.nodes[node]["token"] for node in cyclic_param_nodes],
key=lambda tok: tok.lineno,
)
raise KatCycleError(self.script, cyclic_parameter_tokens)
# Split the nodes into groups depending on whether they descend from a node with
# the "last" flag.
first_build_nodes = set(self.build_graph.nodes)
second_build_nodes = set()
for node in self.build_graph.nodes():
adapter = self._directive_adapter(self.graph.nodes[node]["token"].value)
try:
last = adapter.factory.last
except AttributeError:
last = False
if not last or node in second_build_nodes:
# Keep the directive in the first set.
continue
# All nodes that descend from (depend on) specified node.
children = descendants(self.build_graph, node)
# Move node and descendants from first to second build step.
first_build_nodes.remove(node)
first_build_nodes.difference_update(children)
second_build_nodes.add(node)
second_build_nodes.update(children)
first_build_order = node_build_order(
self.build_graph.subgraph(first_build_nodes)
)
second_build_order = node_build_order(
self.build_graph.subgraph(second_build_nodes)
)
self._build_order = first_build_order + second_build_order
LOGGER.debug(f"Build order: {self._build_order}")
def _build(self, node, model, kwarg_as_dict=True):
"""Build the branch at `node` into a model item.
`kwarg_as_dict` sets whether keyword arguments should be returned as
:class:`.dict` instead of just values.
"""
data = self.graph.nodes[node]
nodetype = data["type"]
value = self._do_build(nodetype, node, data, model)
# Create a dictionary for keyword arguments, if requested.
if kwarg_as_dict and "key_token" in data:
value = {data["key_token"].value: value}
LOGGER.debug(f"compiled {nodetype} {node} to {type(value)}")
return value
@flagdispatchmethod
def _do_build(self, nodetype, node, data, model):
raise NotImplementedError(
f"don't know how to compile parameter {repr(nodetype)}"
)
@_do_build.register(KatNodeType.COMPILER_TERMINAL_NODES)
def _(self, node, data, model):
# Just use the value.
return data["token"].value
@_do_build.register(KatNodeType.CONSTANT)
def _(self, node, data, model):
# Look up the symbol corresponding to the constant.
return self.spec.constants[data["token"].value]
@_do_build.register(KatNodeType.REFERENCE)
def _(self, node, data, model):
source_directive_path = self.graph.branch_base(node, ROOT_NODE_NAME)
if self.graph.has_edge(source_directive_path, node):
# This is a reference to another parameter in the same element. Throw an
# exception that can get caught by the argument compiler. Doing it this
# way ensures expressions with nested self-references (like `1-&m1.T` in
# `m m1 1-&m1.T 0`) get set as resolving.
LOGGER.debug(f"{node} is a self-reference")
raise KatParameterSelfReferenceException()
# Copy existing model parameter by reference.
target = data["token"].value
if target.startswith("&"):
warn(
"Parameter references should no longer start with '&'; support for "
"this syntax will be removed in a future release.",
FutureWarning,
)
target = target[1:]
# Reraise Finesse errors from incorrect targets as KatScript errors.
try:
value = model.get(target)
except ModelAttributeError as e:
raise KatParameterBuildError(e, self.script, node, self.graph) from e
# Attempt to get reference to the parameter.
try:
value = value.ref
except (AttributeError, FinesseException):
# This is not a model parameter. We get an AttributeError for e.g. ports
# and nodes, and FinesseException for model elements.
LOGGER.debug(f"parameter {node} targets {target} by value")
else:
LOGGER.debug(f"parameter {node} targets {target} by reference")
return value
@_do_build.register(KatNodeType.EXPRESSION)
def _(self, node, data, model):
operator = self.spec.binary_operators[data["token"].value]
arguments = self._built_arguments(node, model)
# Arguments are already in order.
assert len(arguments) == 2
lhs, rhs = arguments
value = operator(lhs, rhs)
# Eagerly evaluate the expression if its arguments don't depend on anything
# else.
if self.graph.is_independent(node):
try:
value = value.eval()
except AttributeError:
pass # Ignore if no eval method present
LOGGER.debug(
f"eagerly evaluated dependencyless expression '{node}' to a "
f"{type(value)}"
)
return value
@_do_build.register(KatNodeType.GROUPED_EXPRESSION)
def _(self, node, data, model):
arguments = self._built_arguments(node, model)
assert len(arguments) == 1
return arguments[0]
@_do_build.register(KatNodeType.ARRAY)
def _(self, node, data, model):
return self._built_arguments(node, model)
@_do_build.register(KatNodeType.NUMERICAL_ARRAY)
def _(self, node, data, model):
return np.array(self._built_arguments(node, model))
@_do_build.register(KatNodeType.ELEMENT)
def _(self, node, data, model):
# needs change here?
args, kwargs = self._built_directive_params(node, model)
adapter = self._element_adapter(data["token"].value)
# Add name to arguments.
name = data["name_token"].value
args = [name, *args]
try:
return adapter.factory(*args, **kwargs)
except Exception as e:
raise KatDirectiveBuildError(
e,
data["token"].value,
self.script,
node,
adapter,
self.graph,
self.spec,
) from e
@_do_build.register(KatNodeType.FUNCTION)
def _(self, node, data, model):
function_name = data["token"].value
if function_name in self._expression_functions:
operator = self._expression_functions[function_name]
args, kwargs = self._built_directive_params(node, model)
try:
value = operator(*args, **kwargs)
except TypeError as e:
# Replace 'lambda' in the function init error with the function
# name.
args = list(e.args)
args[0] = re.sub(
r"(\<lambda\>(\d+)?|\w+)\(\)",
f"'{function_name}'",
args[0],
)
raise TypeError(*args)
# Eagerly evaluate the function if its arguments don't depend on
# anything else.
if self.graph.is_independent(node):
try:
value = value.eval()
except AttributeError:
pass # Ignore if no eval method present
LOGGER.debug(
f"eagerly evaluated dependencyless function '{node}' to a "
f"{type(value)}"
)
else:
# A directive function.
adapter = self._function_adapter(data["token"].value)
args, kwargs = self._built_directive_params(node, model)
if adapter.factory is not None:
try:
value = adapter.factory(*args, **kwargs)
except Exception as e:
raise KatDirectiveBuildError(
e,
data["token"].value,
self.script,
node,
adapter,
self.graph,
self.spec,
) from e
else:
LOGGER.debug(f"function {node} has arguments {args}, {kwargs}")
value = args, kwargs
return value
@_do_build.register(KatNodeType.ROOT)
def _(self, node, data, model):
if model is None:
LOGGER.debug("creating new model")
model = Model()
else:
LOGGER.debug(f"compiling into existing model {repr(model)}")
# Use the computed build order, not the argument order.
for argument_node in self._build_order:
item = self._build(argument_node, model)
directive_token = self.graph.nodes[argument_node]["token"]
directive = directive_token.value
adapter = self._directive_adapter(directive)
if self.graph.nodes[argument_node]["type"] is KatNodeType.ELEMENT:
# Remember the item's original order for when we sort the elements.
self._element_script_order[item] = directive_token.lineno
LOGGER.debug(f"applying {repr(item)} ({argument_node}) to model")
try:
adapter.setter(model, item)
except Exception as e:
raise KatDirectiveBuildError(
e,
directive,
self.script,
argument_node,
adapter,
self.graph,
self.spec,
) from e
# Get all nodes connected to this argument node by a DEPENDENCY edge,
# where the node is a subbranch of the argument node.
# Convert to a list because we'll iterate multiple times.
self_references = list(
self.graph.filter_dependent_nodes(
argument_node,
key=lambda refnode, _: self.graph.branch_base(
refnode, ROOT_NODE_NAME
)
== argument_node,
)
)
if self_references:
# Delete graph dependencies so that references resolve when we build
# again.
for target in self_references:
LOGGER.debug(f"breaking dependency {argument_node} -> {target}")
self.graph.remove_edge(argument_node, target)
for self_ref_path in self_references:
self_ref_param_path = self.graph.branch_base(
self_ref_path, argument_node
)
# Figure out the signature argument and use it to grab the
# corresponding element's parameter.
reference_argument = self.graph.argument(
self_ref_param_path, adapter
)
LOGGER.debug(
f"resolving self-referencing {reference_argument.name}"
)
value = self._build(self_ref_param_path, model, kwarg_as_dict=False)
LOGGER.debug(
f"setting {reference_argument.name} to {repr(value)} (type "
f"{type(value)})"
)
try:
adapter.setter.update_parameter(item, reference_argument, value)
except Exception as e:
raise KatParameterBuildError(
e, self.script, self_ref_param_path, self.graph
) from e
LOGGER.debug(f"finished resolving self-refs for {repr(item)}")
# Remove dependency edges from the graph. They are no longer needed now that
# we've built everything, and if left they can interfere merging of syntax
# graphs in the model.
self.graph.remove_edges_from(
[
(u, v)
for u, v, edgetype in self.graph.edges(data="type")
if edgetype == KatEdgeType.DEPENDENCY
]
)
return model
def _built_arguments(self, node, model):
"""Get built dependent arguments of the current node."""
return [
self._build(argument_node, model)
for argument_node in self.graph.sorted_dependent_argument_nodes(node)
]
def _built_directive_params(self, node, model):
"""Get built dependent arguments of the current node in Python signature form.
Self-referencing parameters are caught here too, and set to :class:`.Resolving`;
these are fully resolved at the end of compilation.
"""
args = []
kwargs = {}
for argument_node in self.graph.sorted_dependent_argument_nodes(node):
try:
item = self._build(argument_node, model)
except KatParameterSelfReferenceException:
item = Resolving()
# Detect kwargs.
if key_token := self.graph.nodes[argument_node].get("key_token"):
item = {key_token.value: item}
if isinstance(item, dict):
kwargs.update(item)
else:
args.append(item)
return args, kwargs
def _element_adapter(self, element):
return self.spec.elements[element]
def _function_adapter(self, function):
if command := self.spec.commands.get(function):
return command
if analysis := self.spec.analyses.get(function):
return analysis
raise KeyError(f"could not find function corresponding to '{function}'")
def _directive_adapter(self, directive):
try:
return self._element_adapter(directive)
except KeyError:
try:
return self._function_adapter(directive)
except KeyError:
raise KeyError(
f"could not find directive corresponding to '{directive}'"
)
def _reraise_parser_error_in_spec_context(self, error):
"""Try to improve parser errors using the :class:`.KatSpec`.
The parser is unaware of the available elements and functions etc. in the kat
language spec, because this is only used during compilation. In some cases the
error messages can be improved with knowledge of the spec, so this method
identifies such cases.
"""
if isinstance(error, KatMissingAfterDirective):
# Use the spec to figure out whether the user might have missed a
# parenthesis or a name.
if error.directive.value in self.spec.directives:
raise KatIncorrectFormError(error.directive, self.script, self.spec)
else:
# Unrecognised directive type. Since that's the earlier error, throw
# that.
raise KatScriptError(
f"unknown element or function '{error.directive.raw_value}'",
self.script,
[[error.directive]],
)
# Nothing matched, so just reraise.
raise error
[docs]class KatCycleError(KatScriptError):
"""Cyclic parameters."""
def __init__(self, script, cyclic_nodes):
# Convert list of error tokens to error item lists.
items = [[item] for item in cyclic_nodes]
super().__init__("cyclic parameters", script, items)
[docs]class KatDirectiveError(KatScriptError):
"""Error compiling a directive (either resolving or building)."""
def __init__(
self, error, directive, script, error_items, spec, add_syntax_hint=False
):
syntax = None
if add_syntax_hint:
adapter = spec.directives[directive]
syntax = adapter.documenter.syntax(spec, adapter)
super().__init__(error, script, error_items, syntax=syntax)
[docs]class KatDirectiveBuildError(KatDirectiveError):
"""Error during the building of a directive (i.e. the call to the Finesse Python
object)."""
def __init__(self, error, directive, script, statement_node, adapter, graph, spec):
if error:
# Rewrite the error message to provide additional context to the user about
# what went wrong.
error, error_items, add_syntax_hint = self._rewrite(
error, directive, script, statement_node, adapter, graph, spec
)
else:
# Mark the whole line as the cause of the error.
error_items = [[_production(statement_node, graph)]]
add_syntax_hint = False
super().__init__(
error, directive, script, error_items, spec, add_syntax_hint=add_syntax_hint
)
@singledispatchmethod
def _rewrite(self, error, directive, script, statement_node, adapter, graph, spec):
# This performs the default rewrite behaviour when no matching type is found in
# the dispatch.
# By default, mark the whole line as the cause of the error.
error_items = [[_production(statement_node, graph)]]
return str(error), error_items, False
# see #642
@_rewrite.register(IllegalSelfReferencing)
def _(self, error, directive, script, statement_node, adapter, graph, spec):
name = graph.nodes[statement_node]["name_token"].raw_value
error_items = []
for node in graph.nodes:
if node == statement_node:
continue
node = graph.nodes[node]
if node["type"] == KatNodeType.REFERENCE and node[
"token"
].raw_value.startswith(f"{name}."):
error_items.append([node["token"]])
return str(error).strip(), error_items, False
@_rewrite.register(TypeError)
def _(self, error, directive, script, statement_node, adapter, graph, spec):
# Replace the called Finesse Python API function, if present, with the directive
# name. Replaces whatever appears before the first "()" in the error.
msg = re.sub(r".+\(\)", f"'{directive}'", str(error))
# Match specific error messages.
if matches := re.search(r"unexpected keyword argument '(\w+)'", msg):
def keyfunc(_, nodedata):
try:
return nodedata["key_token"].value == matches.group(1)
except KeyError:
return False
kwarg_node = next(graph.filter_argument_nodes(statement_node, keyfunc))
# Mark the matching kwarg key in the error.
item = _production(kwarg_node, graph)
error_items = [[item.key]]
# do not produce suggestions for single-character keyword arguments
# since the suggestions will always contain all other single-character
# options (which is not very helpful)
if len(item.key.raw_value) > 1:
possible_kwargs = [
k
for k, v in adapter.setter.arguments().items()
if v.kind in [ArgumentType.ANY, ArgumentType.KEYWORD_ONLY]
]
msg = _get_unknown_token_message(
token_type="",
token_name=item.key.raw_value,
options=possible_kwargs,
prefix=msg,
)
elif matches := re.search(
r"missing (\d+) required positional arguments?:", msg
):
params = list(adapter.setter.positional_args(keyword_defaults=False))
nmissing = int(matches.group(1))
imissingstart = len(params) - nmissing
assert nmissing > 0
assert imissingstart >= 0
description = ngettext(
nmissing,
"{n} required positional argument",
"{n} required positional arguments",
)
missing = option_list(
[repr(arg) for arg in params[imissingstart:]], final_sep="and"
)
msg = f"'{directive}' missing {description}: {missing}"
# Add a marker at the end of the arguments.
item = _production(statement_node, graph)
error_items = [[item.missing_argument_meta_token()]]
elif matches := re.search(
r"takes (\d+) positional arguments but (\d+) were given", msg
):
# Fix the number of arguments.
params = adapter.setter.positional_args()
ncorrection = int(matches.group(1)) - len(params)
assert ncorrection >= 0
nnew = [int(n) - ncorrection for n in matches.groups()]
nmax = nnew[0]
arg = ngettext(nnew[0], "argument", "arguments", sub=False)
msg = (
f"'{directive}' takes {nnew[0]} positional {arg} but {nnew[1]} "
f"were given"
)
# Mark the extra arguments.
arguments = graph.sorted_dependent_argument_nodes(statement_node)
error_items = [[_production(node, graph)] for node in arguments[nmax:]]
elif matches := re.search(
r"takes from (\d+) to (\d+) positional arguments but (\d+) were given",
msg,
):
# Fix the number of arguments.
params = adapter.setter.positional_args()
ncorrection = int(matches.group(2)) - len(params)
assert ncorrection >= 0
nnew = [int(n) - ncorrection for n in matches.groups()]
nmax = nnew[1]
msg = (
f"'{directive}' takes from {nnew[0]} to {nnew[1]} positional "
f"arguments but {nnew[2]} were given"
)
# Mark the extra arguments.
arguments = graph.sorted_dependent_argument_nodes(statement_node)
error_items = [[_production(node, graph)] for node in arguments[nmax:]]
elif matches := re.search(r"got multiple values for argument '(\w+)'", msg):
# Mark the duplicate arguments. We can't use the signature here because by
# definition this error happens when the signature is not followed. Instead
# we use the graph to see what was actually parsed.
error_items = []
items = _user_arg_productions_by_name(
matches.group(1), statement_node, adapter, graph
)
for item in items:
if hasattr(item, "key"):
# This is a kwarg - mark the value.
item = item.value
error_items.append([item])
else:
# By default, mark the whole line as the cause of the error.
error_items = [[_production(statement_node, graph)]]
return msg, error_items, True
@_rewrite.register(ContextualTypeError)
def _(self, error, directive, script, statement_node, adapter, graph, spec):
arg_item = next(
_user_arg_productions_by_name(error.param, statement_node, adapter, graph)
)
error_items = [[arg_item]]
# Substitute known types with descriptive equivalents (shield the user from
# Finesse internals).
allowedstr = option_list(
[spec.type_descriptor(t, t.__name__) for t in error.allowed_types]
)
gotstr = spec.type_descriptor(type(error.value), type(error.value).__name__)
msg = (
f"invalid type for '{directive}' argument '{error.param}': expected "
f"{allowedstr}, got {gotstr}"
)
return msg, error_items, True
@_rewrite.register(ContextualValueError)
def _(self, error, directive, script, statement_node, adapter, graph, spec):
data = graph.nodes[statement_node]
error_items = []
for param, value in error.params.items():
try:
item = next(
_user_arg_productions_by_name(param, statement_node, adapter, graph)
)
except StopIteration:
if value == ContextualValueError.empty:
# The value was empty because it wasn't specified.
continue
# Names are arguments to Python but aren't to the parser.
if param == "name" and "name_token" in data:
# This is an element and the issue was with the name.
item = data["name_token"]
else:
# Something went wrong...
raise RuntimeError(
f"'{directive}' build signalled an error with argument "
f"'{param}' but no matching argument of graph node "
f"{statement_node} was found"
)
if hasattr(item, "key"):
# Mark the value when it's a kwarg.
item = item.value
error_items.append([item])
if not error_items:
if any(
[value == ContextualValueError.empty for value in error.params.values()]
):
# There are missing parameters. Add a marker at the end of the
# arguments.
statement = _production(statement_node, graph)
error_items = [[statement.missing_argument_meta_token()]]
else:
# By default, mark the whole line as the cause of the error.
error_items = [[_production(statement_node, graph)]]
problem = ngettext(
len(error.params),
f"invalid value for '{directive}' argument",
f"invalid values for '{directive}' arguments",
sub=False,
)
extra = f": {error.extra_info}" if error.extra_info else ""
args = option_list(error.params, final_sep="and", quotechar="'")
msg = f"{problem} {args}{extra}"
return msg, error_items, True
[docs]class KatParameterBuildError(KatScriptError):
"""Error during the building of a directive parameter."""
def __init__(self, error, script, param_node, graph):
data = graph.nodes[param_node]
value_token = data["token"]
show_items = [value_token]
# Add the key if it exists.
try:
show_items.append(data["key_token"])
except KeyError:
pass
msg = str(error).strip()
if isinstance(error, ModelAttributeError):
# Mark only the value.
show_items = [value_token]
elif isinstance(error, ModelParameterDefaultValueError):
# A referenced component doesn't have a default model parameter.
target = data["token"].value
hint = f"{target}.[some parameter]"
msg = f"{msg} (hint: try {repr(hint)})"
# Mark only the value.
show_items = [value_token]
elif isinstance(error, ModelParameterSelfReferenceError):
# Grab the production.
item = _production(param_node, graph)
# Mark the key and the value.
show_items = item.sorted_tokens
super().__init__(msg, script, [show_items])
[docs]class KatParameterSelfReferenceException(Exception):
"""Indication that a parameter contains a self-reference that must be resolved
later."""