Source code for code_index.mcp_server.services.graph_analyzer_service
from __future__ import annotations
from code_index.analyzer import SimpleAnalyzer
from code_index.analyzer.models import (
CallGraph,
Direction,
FindPathsResult,
GraphConstructOptions,
PathReturnMode,
)
from code_index.models import PureDefinition
from .code_index_service import CodeIndexService
[docs]
class GraphAnalyzerService:
"""A service for analyzing the call graph of a repository."""
_instance: GraphAnalyzerService | None = None
[docs]
@classmethod
def get_instance(cls) -> GraphAnalyzerService:
"""Get the singleton instance of GraphAnalyzerService."""
if cls._instance is None:
cls._instance = GraphAnalyzerService()
return cls._instance
def __init__(self):
self._analyzer = SimpleAnalyzer()
self._graph_cache: CallGraph | None = None
[docs]
def _get_graph(self) -> CallGraph:
"""Get the call graph, using a cache if available."""
if self._graph_cache is not None:
return self._graph_cache
index = CodeIndexService.get_instance().index
options = GraphConstructOptions(direction=Direction.FORWARD, compute_scc=True)
graph = self._analyzer.get_call_graph(index, options)
self._graph_cache = graph
return graph
[docs]
def clear_cache(self) -> None:
"""Clear the graph cache. Call this when the index is rebuilt."""
self._graph_cache = None
[docs]
def get_call_graph_overview(self):
"""
Generate a general view/stat of the call graph.
"""
from code_index.mcp_server.models import GraphOverviewResponse, SCCDetail, SCCOverview
graph = self._get_graph()
scc_details = [
SCCDetail(
scc_id=i,
size=len(scc),
nodes=[graph.nodes[node_idx] for node_idx in scc[:5]], # show first 5 nodes
)
for i, scc in enumerate(graph.sccs)
]
# Build adjacency list for determining starting nodes
adj: dict[int, list[int]] = {}
for edge in graph.edges:
if edge.src not in adj:
adj[edge.src] = []
adj[edge.src].append(edge.dst)
entrypoints = self._analyzer._determine_starting_nodes(graph, Direction.FORWARD, adj)
endpoints = self._analyzer._determine_starting_nodes(graph, Direction.BACKWARD, adj)
return GraphOverviewResponse(
stats=graph.stats,
scc_overview=SCCOverview(count=len(graph.sccs), details=scc_details),
entrypoints=[graph.nodes[i] for i in entrypoints[:10]],
endpoints=[graph.nodes[i] for i in endpoints[:10]],
)
[docs]
def get_subgraph(self, roots: list[PureDefinition], depth: int) -> CallGraph:
"""
Generate a subgraph of the call-graph from specific definition(s).
"""
graph = self._get_graph()
root_indices = [i for i, node in enumerate(graph.nodes) if node in roots]
return self._analyzer.get_subgraph(graph, roots=root_indices, depth=depth)
[docs]
def find_paths(
self,
src: PureDefinition,
dst: PureDefinition,
k: int = 3,
mode: PathReturnMode = PathReturnMode.HYBRID,
) -> FindPathsResult:
"""
Find paths between two definitions.
"""
graph = self._get_graph()
try:
src_idx = graph.nodes.index(src)
dst_idx = graph.nodes.index(dst)
except ValueError as e:
raise ValueError(f"Source or destination node not found in graph: {e}") from e
return self._analyzer.find_paths(graph, src_idx, dst_idx, k=k, return_mode=mode)
[docs]
def get_topological_order(
self, direction: Direction = Direction.BACKWARD
) -> list[PureDefinition]:
"""
Iterate the definitions topologically.
BACKWARD (default) goes from deepest dependencies up to entrypoints.
"""
graph = self._get_graph()
return list(self._analyzer.bfs_traverse_graph(graph, direction=direction))