Source code for pyvcad_attribute_resolver.solver

from collections import deque


class ResolverError(Exception):
    """Base error for the attribute resolver."""
class NoPathError(ResolverError):
    """No conversion path found between the available and target attributes."""
class AmbiguousPathError(ResolverError):
    """Multiple equally-valid conversion paths found.

    Arguments:
        message: Human-readable description of the ambiguity.
        paths: Optional list of the tied candidate paths. Stored on the
            instance as ``paths``; a falsy value is replaced by an empty
            list.
    """

    def __init__(self, message, paths=None):
        super().__init__(message)
        # Expose the tied candidates so callers can inspect them.
        self.paths = paths or []
def _collapse_reorderings(paths):
    """Merge paths that apply the same conversions in a different order.

    For every distinct set of conversion names, keep only the path whose
    name sequence sorts first, so the survivor does not depend on the order
    in which the search happened to discover the permutations.
    """
    canonical = {}
    for path in paths:
        names = tuple(entry.name for entry in path)
        key = frozenset(names)
        kept = canonical.get(key)
        if kept is None or names < kept[0]:
            canonical[key] = (names, path)
    return [path for _names, path in canonical.values()]


def solve(graph, available_attributes, target_attribute,
          allowed_conversions=None, tags=None, max_depth=10):
    """Find a conversion path from any available attribute to the target.

    Uses BFS to find shortest-length paths through the conversion graph.
    Paths that consist of the same conversions applied in a different order
    are treated as one path. When multiple distinct shortest paths exist,
    the one with the highest total priority is selected. If priorities are
    tied the call raises ``AmbiguousPathError`` so the user can
    disambiguate.

    Arguments:
        graph: ``ConversionGraph`` instance. Only ``graph.neighbors(attr)``
            is used; each returned entry must expose ``name``, ``tags``,
            ``required_inputs``, ``target`` and ``priority``.
        available_attributes: Attribute names present on the design. Any
            iterable is accepted; it is consumed exactly once.
        target_attribute: The desired output attribute.
        allowed_conversions: Optional list of ``ConversionEntry`` names to
            restrict the search. An empty or ``None`` value means no
            restriction.
        tags: Optional list of tags; edges must match *at least one*. An
            empty or ``None`` value means no restriction.
        max_depth: Maximum conversion chain length (cycle guard).

    Returns:
        A list of ``ConversionEntry`` forming the conversion path.
        An empty list if the target is already available.

    Raises:
        NoPathError: No conversion path exists.
        AmbiguousPathError: Multiple shortest paths with equal priority.
    """
    # Materialise once: the input is needed for the membership test, the
    # initial BFS state and the error message, and a one-shot iterator
    # would be exhausted after the first of those.
    available = list(available_attributes)

    if target_attribute in available:
        return []

    allowed_set = set(allowed_conversions) if allowed_conversions else None
    tags_set = set(tags) if tags else None

    # BFS state: each queue element is (available_attribute_set, path_so_far).
    # Conversions only add new attributes, so there are no cycles as long as
    # we refuse to apply edges whose target is already present in the state.
    queue = deque([(frozenset(available), [])])
    all_paths = []        # paths that reach the target
    shortest_len = None   # length of the first path found

    while queue:
        current_state, path = queue.popleft()

        # Depth guard
        if len(path) >= max_depth:
            continue

        # If we already found a shortest path and extending the current
        # path could only produce a longer one, stop exploring it
        # (BFS guarantees level-order).
        if shortest_len is not None and len(path) >= shortest_len:
            continue

        # Gather every edge leaving any attribute in the state, keyed by
        # name so an edge reachable from several attributes is tried once.
        candidate_edges = {}
        for current_attribute in current_state:
            for entry in graph.neighbors(current_attribute):
                candidate_edges[entry.name] = entry

        for entry in candidate_edges.values():
            if allowed_set is not None and entry.name not in allowed_set:
                continue
            if tags_set is not None and not tags_set.intersection(entry.tags):
                continue
            if not set(entry.required_inputs).issubset(current_state):
                continue
            if entry.target in current_state:
                continue

            new_path = path + [entry]

            if entry.target == target_attribute:
                if shortest_len is None:
                    shortest_len = len(new_path)
                if len(new_path) == shortest_len:
                    all_paths.append(new_path)
                continue

            queue.append((current_state | {entry.target}, new_path))

    if not all_paths:
        raise NoPathError(
            f"No conversion path found to '{target_attribute}' from "
            f"available attributes {available}"
        )

    # The search enumerates every ordering of independent conversions.
    # Those orderings are the same choice of conversions (and have the same
    # total priority), and neither ``allowed_conversions`` nor ``tags`` can
    # tell them apart, so they must not be reported as an ambiguity.
    all_paths = _collapse_reorderings(all_paths)

    if len(all_paths) == 1:
        return all_paths[0]

    # Disambiguate by total priority (higher is better)
    scored = [(sum(e.priority for e in p), p) for p in all_paths]
    scored.sort(key=lambda x: x[0], reverse=True)

    if scored[0][0] > scored[1][0]:
        return scored[0][1]

    # Still tied: report ambiguity
    tied_paths = [p for s, p in scored if s == scored[0][0]]
    descriptions = [" -> ".join(e.name for e in p) for p in tied_paths]
    raise AmbiguousPathError(
        f"Ambiguous conversion to '{target_attribute}': "
        f"{len(tied_paths)} paths with equal priority: "
        + "; ".join(descriptions),
        paths=tied_paths,
    )