# Source code for nxtransit.routers

"""Main routing algorithms for time-dependent graphs."""

import bisect
from heapq import heappop, heappush
from typing import Dict, List, Tuple, Optional

from networkx import DiGraph
from .functions import _reconstruct_path


[docs] def _calculate_delay( graph, from_node, to_node, current_time, wheelchair=False ) -> Tuple[float, Optional[str]]: """ Calculates the delay and route for a given graph, from_node, to_node, and current_time. Used in the time-dependent Dijkstra algorithm. """ edge = graph[from_node][to_node] schedules = edge.get("sorted_schedules") if schedules: departure_times = edge["departure_times"] idx = bisect.bisect_left(departure_times, current_time) if idx < len(schedules): next_departure, next_arrival, route, wheelchair_acc = schedules[idx] if not wheelchair or wheelchair_acc == 1: delay = (next_departure - current_time) + ( next_arrival - next_departure ) return delay, route return float("inf"), None else: return edge.get("weight", float("inf")), None
def time_dependent_dijkstra(
    graph: DiGraph,
    source: str,
    target: str,
    start_time: float,
    track_used_routes: bool = False,
    wheelchair: bool = False,
) -> Tuple[List[str], float, float, Optional[set]]:
    """
    Find the earliest-arrival path between two nodes of a time-dependent graph.

    Parameters
    ----------
    graph : networkx.DiGraph
        The graph to search for the shortest path.
    source
        The starting node.
    target
        The target node.
    start_time : float
        The starting time.
    track_used_routes : bool, optional
        Accepted for backward compatibility. It is currently not read by
        the function: the set of used routes is always built and returned.
    wheelchair : bool, optional
        If set to True, the algorithm will only use wheelchair accessible
        routes.

    Returns
    -------
    tuple
        A tuple containing the following elements:

        - list: The shortest path from the source to the target node.
        - float: The arrival time at the target node.
        - float: The travel time from the source to the target node.
        - set: The routes used along the path (``None`` is included for
          edges that are traversed without a schedule).

        If no path is found, ``([], inf, -inf, set())`` is returned.

    Raises
    ------
    ValueError
        If ``source`` or ``target`` is not a node of ``graph``.

    Examples
    --------
    >>> G = nt.feed_to_graph(feed)
    >>> path, arrival_time, travel_time, used_routes = nt.time_dependent_dijkstra(
    ...     G, "A", "B", 86400
    ... )

    Implementation
    --------------
    This function uses a priority queue to explore the graph with an almost
    classic Dijkstra's algorithm. The main difference is that the delay
    between two nodes is calculated based on the ``current time`` and the
    sorted schedules of the edge. The function also keeps track of the routes
    used in the path.

    >>> G = nx.DiGraph()
    >>> G.add_edge("A", "B")
    >>> G.edges["A", "B"]["sorted_schedules"] = [
    ...     (10, 20, "route_1", None),
    ...     (30, 40, "route_2", None),
    ...     (50, 60, "route_3", None),
    ... ]
    >>> G.edges["A", "B"]["departure_times"] = [10, 30, 50]

    So the edge from 'A' to 'B' has three schedules: route_1 departs at 10 and
    arrives at 20, route_2 departs at 30 and arrives at 40, and route_3
    departs at 50 and arrives at 60.

    Internal function ``_calculate_delay`` will return the delay and the route
    for the next departure from 'A' to 'B' at a given time, i.e. if the
    current time is 25, the next departure is at 30, so the waiting time is 5
    and the total delay until arrival is 5 + (40 - 30) = 15.

    References
    ----------
    .. [1] Gerth Stølting Brodal, Riko Jacob:
       Time-dependent Networks as Models to Achieve Fast Exact Time-table
       Queries. Electronic Notes in Theoretical Computer Science, 92:3-15,
       2004. https://doi.org/10.1016/j.entcs.2003.12.019
    .. [2] Bradfield: Shortest Path with Dijkstra's Algorithm.
       Practical Algorithms and Data Structures.
       https://bradfieldcs.com/algos/graphs/dijkstras-algorithm/
    """
    # Abort immediately if the source or target node does not exist in the graph
    if source not in graph or target not in graph:
        raise ValueError("The source or target node does not exist in the graph.")

    inf = float("inf")

    # Best known arrival time and predecessor for every node
    arrival_times = {node: inf for node in graph.nodes}
    predecessors = {node: None for node in graph.nodes}
    arrival_times[source] = start_time

    queue = [(start_time, source)]
    visited = set()
    # Route used to reach each node over its current best incoming edge
    routes = {}

    while queue:
        # Extract the node with the smallest arrival time from the queue
        current_time, u = heappop(queue)

        # The target has been settled, nothing left to do
        if u == target:
            break

        # Outdated queue entry: the node was already settled with a better time
        if u in visited and current_time > arrival_times[u]:
            continue

        visited.add(u)

        for v in graph.neighbors(u):
            # Settled nodes are never relaxed again
            if v in visited:
                continue

            delay, route = _calculate_delay(
                graph, u, v, current_time, wheelchair=wheelchair
            )
            # The edge cannot be used at this time
            if delay == inf:
                continue

            new_arrival_time = current_time + delay
            if new_arrival_time < arrival_times[v]:
                arrival_times[v] = new_arrival_time
                routes[v] = route
                predecessors[v] = u
                heappush(queue, (new_arrival_time, v))

    travel_time = arrival_times[target] - start_time
    path = _reconstruct_path(target=target, predecessors=predecessors)

    # A path that does not start at the source means the target was not reached
    if path[0] != source:
        return [], float("inf"), -float("inf"), set()

    # Every node after the first was reached over the route stored for it
    used_routes = {routes[v] for v in path[1:]}
    return path, arrival_times[target], travel_time, used_routes
def single_source_time_dependent_dijkstra(
    graph: DiGraph,
    source: str,
    start_time: float,
) -> Tuple[Dict[str, float], Dict[str, str], Dict[str, float]]:
    """
    Compute earliest arrivals from one source to every node of a
    time-dependent graph.

    Parameters
    ----------
    graph : nx.DiGraph
        The time-dependent graph.
    source : Node
        The source node of the graph.
    start_time : float
        The starting time in seconds since midnight.

    Returns
    -------
    tuple
        Three dictionaries:

        - arrival_times: earliest arrival time found for every node of the
          graph (``inf`` for nodes that were never reached, ``start_time``
          for the source).
        - predecessors: predecessor of every node on its best path
          (``None`` if it was never reached).
        - travel_times: travel time from the source; holds only the nodes
          whose arrival time was improved at least once during the search.

    Raises
    ------
    ValueError
        If ``source`` is not in the graph or ``start_time`` is not an
        ``int`` or ``float``.

    See Also
    --------
    nxtransit.routers.time_dependent_dijkstra : Point to point routing
        algorithm for time-dependent graphs.
    """
    if source not in graph:
        raise ValueError(f"The source node {source} does not exist in the graph.")
    if not isinstance(start_time, (int, float)):
        raise ValueError("The start time must be a number.")

    unreached = float("inf")
    arrival_times = dict.fromkeys(graph.nodes, unreached)
    predecessors = dict.fromkeys(graph.nodes)
    arrival_times[source] = start_time
    travel_times = {}

    frontier = []
    heappush(frontier, (start_time, source))

    # NOTE(review): there is no visited set and no stale-entry check here, so
    # every popped entry has its outgoing edges evaluated.
    while frontier:
        now, node = heappop(frontier)

        for successor in graph.neighbors(node):
            delay, _route = _calculate_delay(graph, node, successor, now)
            reached_at = now + delay

            # Strict improvement only; an infinite delay never passes this test
            if reached_at < arrival_times[successor]:
                arrival_times[successor] = reached_at
                predecessors[successor] = node
                travel_times[successor] = reached_at - start_time
                heappush(frontier, (reached_at, successor))

    return arrival_times, predecessors, travel_times