import math
import os
import warnings
import geopandas as gpd
import pandas as pd
from shapely.geometry import Polygon
[docs]
def aggregate_to_grid(gdf: gpd.GeoDataFrame, cell_size: float) -> gpd.GeoDataFrame:
"""
Creates a grid of square cells covering the extent of the input GeoDataFrame,
and keeps cells that contain at least one feature from the source GeoDataFrame.
Parameters
----------
gdf : gpd.GeoDataFrame
The input GeoDataFrame representing the spatial extent and features.
cell_size : float
The size of each square cell in the grid in meters.
Returns
-------
gpd.GeoDataFrame
The resulting grid GeoDataFrame, with cells containing at least
one feature from the source GeoDataFrame, and a 'id' for each cell.
"""
utm_crs = gdf.estimate_utm_crs()
gdf_utm = gdf.to_crs(utm_crs)
minx, miny, maxx, maxy = gdf_utm.total_bounds
nx = math.ceil((maxx - minx) / cell_size)
ny = math.ceil((maxy - miny) / cell_size)
grid_cells = []
grid_indices = []
index = 0 # Initialize a counter for the grid index
for i in range(nx):
for j in range(ny):
x1 = minx + i * cell_size
y1 = miny + j * cell_size
x2 = x1 + cell_size
y2 = y1 + cell_size
cell = Polygon([(x1, y1), (x2, y1), (x2, y2), (x1, y2)])
grid_cells.append(cell)
grid_indices.append(f"grid_{index}") # Add the current index to the list
index += 1 # Increment the index for the next cell
# Create the initial grid GeoDataFrame
grid = gpd.GeoDataFrame({'id': grid_indices, 'geometry': grid_cells}, crs=utm_crs)
# Perform a spatial join between the grid and the original GeoDataFrame
filtered_grid = gpd.sjoin(grid, gdf_utm[['geometry']], how='inner')
# Drop duplicates to ensure each cell is unique, keeping only 'geometry' and 'grid_index'
filtered_grid = filtered_grid[['geometry', 'id']].drop_duplicates(subset=['id'])
filtered_grid.reset_index(drop=True, inplace=True)
return filtered_grid
[docs]
def create_centroids_dataframe(polygon_gdf: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
"""
Creates a GeoDataFrame with the centroids of polygons from the given GeoDataFrame.
Parameters
----------
polygon_gdf : gpd.GeoDataFrame
GeoDataFrame containing polygons.
Returns:
-------
gpd.GeoDataFrame
GeoDataFrame with Point geometries of the centroids.
"""
# Create id column if it doesn't exist
# ID is required for the explicit origin_id column in the final output
if "id" not in polygon_gdf.columns:
polygon_gdf["id"] = polygon_gdf.index
# Create a GeoDataFrame with these centroids
# and include the 'origin_id' from the parent polygon
centroids_gdf = gpd.GeoDataFrame(
polygon_gdf[["id"]].copy(),
geometry=polygon_gdf.to_crs("EPSG:4087").geometry.centroid,
crs="EPSG:4087",
)
centroids_gdf.rename(columns={"id": "origin_id"}, inplace=True)
return centroids_gdf
[docs]
def validate_feed(gtfs_path: str) -> bool:
"""
Validates the GTFS feed located at the specified path.
Parameters
----------
gtfs_path : str
Path to the GTFS dataset directory.
Returns
-------
bool
True if the GTFS feed is valid, False otherwise.
"""
if not os.path.isdir(gtfs_path):
warnings.warn("Invalid GTFS path.")
return False
# List of required GTFS files
required_files = [
"agency.txt", "stops.txt", "routes.txt",
"trips.txt", "stop_times.txt", "calendar.txt"
]
# Check for the existence of required GTFS files
for file in required_files:
if not os.path.isfile(os.path.join(gtfs_path, file)):
warnings.warn(f"Missing required file: {file}")
return False
try:
# Load GTFS files
agency_df = pd.read_csv(os.path.join(gtfs_path, "agency.txt"))
stops_df = pd.read_csv(os.path.join(gtfs_path, "stops.txt"))
routes_df = pd.read_csv(os.path.join(gtfs_path, "routes.txt"))
trips_df = pd.read_csv(os.path.join(gtfs_path, "trips.txt"))
stop_times_df = pd.read_csv(os.path.join(gtfs_path, "stop_times.txt"), low_memory=False)
calendar_df = pd.read_csv(os.path.join(gtfs_path, "calendar.txt"))
critical_errors = False
# Validate agency.txt
if agency_df.empty or 'agency_id' not in agency_df.columns:
print("agency.txt is invalid or missing required 'agency_id' column.")
# Validate stops.txt
if stops_df.empty or 'stop_id' not in stops_df.columns:
print("stops.txt is invalid or missing required 'stop_id' column.")
critical_errors = True
# Validate routes.txt
if routes_df.empty or 'route_id' not in routes_df.columns or 'route_id' not in routes_df.columns:
print("routes.txt is invalid or missing required columns (agency_id, route_id).")
critical_errors = True
if not set(routes_df['agency_id']).issubset(set(agency_df['agency_id'])):
print("Mismatch in agency IDs between routes and agency files.")
critical_errors = True
# Validate trips.txt
if trips_df.empty or 'trip_id' not in trips_df.columns or 'route_id' not in trips_df.columns:
print("trips.txt is invalid or missing required columns.")
critical_errors = True
if not set(trips_df['route_id']).issubset(set(routes_df['route_id'])):
print("Mismatch in route IDs between trips and routes files.")
critical_errors = True
# Validate stop_times.txt
if stop_times_df.empty or 'trip_id' not in stop_times_df.columns or 'stop_id' not in stop_times_df.columns:
print("stop_times.txt is invalid or missing required columns.")
critical_errors = True
if not set(stop_times_df['trip_id']).issubset(set(trips_df['trip_id'])):
print("Mismatch in trip IDs between stop_times and trips files.")
critical_errors = True
if not set(stop_times_df['stop_id']).issubset(set(stops_df['stop_id'])):
print("Mismatch in stop IDs between stop_times and stops files.")
critical_errors = True
# Validate calendar.txt
if calendar_df.empty:
print("calendar.txt is invalid or empty.")
critical_errors = True
# Validate stop_times.txt for blank times and format of times
if 'departure_time' not in stop_times_df.columns or 'arrival_time' not in stop_times_df.columns:
print("stop_times.txt is missing required time columns.")
critical_errors = True
# Check for blank times
if stop_times_df['departure_time'].isnull().any() or stop_times_df['arrival_time'].isnull().any():
print("Blank departure or arrival times found in stop_times.txt.")
# Validate time format (HH:MM:SS)
time_format_regex = r'^(\d{2}):([0-5]\d):([0-5]\d)$' # check for HH:MM:SS format
invalid_departure_times = stop_times_df[~stop_times_df['departure_time'].str.match(time_format_regex)]
invalid_arrival_times = stop_times_df[~stop_times_df['arrival_time'].str.match(time_format_regex)]
if not invalid_departure_times.empty or not invalid_arrival_times.empty:
print("Invalid time format found in departure or arrival times in stop_times.txt.")
print(f"Invalid departure times: {invalid_departure_times['departure_time'].values}")
print(f"Invalid arrival times: {invalid_arrival_times['arrival_time'].values}")
# Additional format and consistency checks= will be added
except Exception as e:
print(f"Error during validation: {e}")
return False
if critical_errors:
print("GTFS feed contains critical errors.")
return False
else:
print("GTFS feed is valid.")
return True
[docs]
def _unpack_path_vertices(path):
"""
This function separates pedestrian segments
of given path into list of lists
"""
pedestrian_path = []
current_sublist = []
# Transit verteces are always float or string (idk why lol)
# while pedestrian verteces (osmid) are integers
for vertex in path:
if isinstance(vertex, int):
current_sublist.append(vertex)
# if vertex is not an integer, it means that it is the end of the current pedestrian segment
# if current_sublist is not empty, push it to the pedestrian_path list
elif current_sublist:
pedestrian_path.append(current_sublist)
current_sublist = []
if current_sublist:
pedestrian_path.append(current_sublist)
return pedestrian_path
[docs]
def _calculate_pedestrian_time(pedestrian_path, graph):
"""
Calculate total impedance (travel time) for pedestrian paths by summing up the edge weights.
"""
impedance = 0
for subpath in pedestrian_path:
for i in range(len(subpath) - 1):
start_node = subpath[i]
end_node = subpath[i+1]
impedance += graph[start_node][end_node]['weight']
return impedance
[docs]
def _reconstruct_path(target, predecessors):
"""
Reconstruct path from predecessors dictionary
"""
path = []
current_node = target
while current_node is not None:
path.insert(0, current_node)
current_node = predecessors.get(current_node)
return path
[docs]
def separate_travel_times(graph, predecessors: dict, travel_times: dict, source) -> pd.DataFrame:
"""
Separate the travel times into transit time and pedestrian time for each node in the graph.
It calculates the pedestrian time by reconstructing the path from the source node
to each destination node and then estimating the time spent walking.
Parameters
----------
graph : networkx.DiGraph
The graph representing the transit network.
predecessors : dict
A dictionary containing the predecessors of each node in the graph.
travel_times : dict
A dictionary containing the travel times for each node in the graph.
source : hashable
The source node from which to calculate the travel times.
Returns
-------
pandas.DataFrame
A DataFrame containing the transit time and pedestrian time for each node.
"""
results = []
for node in graph.nodes(data=True):
if node[0] != source:
path = _reconstruct_path(node[0], predecessors)
pedestrian_path = _unpack_path_vertices(path)
pedestrian_time = _calculate_pedestrian_time(pedestrian_path, graph)
transit_time = travel_times[node[0]] - pedestrian_time
results.append(
{
"node": node[0],
"transit_time": transit_time,
"pedestrian_time": pedestrian_time,
}
)
results = pd.DataFrame(results)
return results