Source code for quetzal.engine.engine

import networkx as nx
import numpy as np
import pandas as pd
import syspy.assignment.raw as assignment_raw
from syspy.distribution import distribution
from syspy.routing.frequency import graph as frequency_graph
from syspy.skims import skims
from syspy.spatial import spatial
from tqdm import tqdm


[docs]def od_volume_from_zones(zones, deterrence_matrix=None, power=2, coordinates_unit='degree', intrazonal=False): """ Use this function to build an Origin -> Destination demand matrix. * a deterrence matrix can be provided, if not: * The euclidean distance matrix will be calculated automatically. * The deterrence matrix will be calculated from the distance matrix (elevated to a given power) * A doubly constrained distribution will be performed in the end, it is based on zones['emission'] and zones['attraction'] examples: :: volumes = engine.od_volume_from_zones(zones, power=2) volumes = engine.od_volume_from_zones(zones, friction_matrix) :param zones: zoning GeoDataFrame, indexed by the zones numeric contains a geometry column named 'geometry', productions and attractions :param deterrence_matrix: Default None. The deterrence matrix used to compute the distribution. If not provided, a deterrence matrix will automatically be computed from the euclidean distance elevated to a given power. :param power: the friction curve used in the gravity model is equal to the euclidean distance to the power of this exponent :param intrazonal: (bool) if True, computes intrazonal volumes using a zone characteristic distance. Default False. :return: volumes, a DataFrame with the following columns ['origin', 'destination', 'volume'] """ if deterrence_matrix is None: euclidean = skims.euclidean(zones, coordinates_unit=coordinates_unit, intrazonal=intrazonal) distance = euclidean.set_index( ['origin', 'destination'])['euclidean_distance'].unstack('destination') # If intrazonal volumes are not wanted, the distance is set to infinity if not intrazonal: # friction the impedance matrix distance.replace(0, 1e9, inplace=True) deterrence_matrix = np.power(distance, -power) deterrence_matrix.replace(float('inf'), 1e9, inplace=True) # gravitaire doublement contraint volume_array = distribution.CalcDoublyConstrained( zones['emission'].values, zones['attraction'].values, deterrence_matrix.values ) volumes = pd.DataFrame( volume_array, index=list(zones.index), columns=list(zones.index) ).stack() volumes = volumes.reset_index() volumes.columns = ['origin', 'destination', 'volume'] return volumes
[docs]def ntlegs_from_centroids_and_nodes( centroids, nodes, short_leg_speed=3, long_leg_speed=15, threshold=500, n_neighbors=5, coordinates_unit='degree' ): """ From a given zoning an a given set of nodes, links the nearest nodes to each centroid of the zoning. example: :: centroids = zones.copy() centroids['geometry'] = centroids['geometry'].apply(lambda g: g.centroid) ntlegs = engine.ntlegs_from_centroids_and_nodes( centroids, nodes, ntleg_speed=3, n_neighbors=6 ) :param centroids: a point GeoDataFrame indexed by a numeric index; :param nodes: a point GeoDataFrame indexed by a numeric index; :param ntleg_speed: speed, as a crow flies, along the ntlegs km/h; :param n_neighbors: number of ntleg of each centroid; :param coordinates_unit: degree or meter :return: ntlegs, a line GeoDataFrame containing the ids of the the centroid and the node, the distance and the proximity rank; :rtype : line GeoDataFrame """ ntlegs = spatial.nearest( centroids, nodes, geometry=True, n_neighbors=n_neighbors ).rename( columns={'ix_one': 'centroid', 'ix_many': 'node'} )[['centroid', 'node', 'rank', 'distance', 'geometry']] access = ntlegs.rename(columns={'centroid': 'a', 'node': 'b'}) eggress = ntlegs.rename(columns={'centroid': 'b', 'node': 'a'}) access['direction'] = 'access' eggress['direction'] = 'eggress' ntlegs = pd.concat([access, eggress], ignore_index=True) if coordinates_unit == 'degree': ntlegs['distance'] = skims.distance_from_geometry(ntlegs['geometry']) elif coordinates_unit == 'meter': ntlegs['distance'] = ntlegs['geometry'].apply(lambda x: x.length) else: raise('Invalid coordinates_unit.') ntlegs['speed_factor'] = np.power(ntlegs['distance'] / threshold, 0.5) ntlegs['short_leg_speed'] = short_leg_speed ntlegs['long_leg_speed'] = long_leg_speed ntlegs['speed'] = ntlegs['short_leg_speed'] * ntlegs['speed_factor'] ntlegs['speed'] = np.minimum(ntlegs['speed'], ntlegs['long_leg_speed']) ntlegs['speed'] = np.maximum(ntlegs['speed'], ntlegs['short_leg_speed']) ntlegs['time'] = ntlegs['distance'] / ntlegs['speed'] / 1000 * 3600 return ntlegs
[docs]def multimodal_graph( links, ntlegs, pole_set, footpaths=None, boarding_cost=300, ntlegs_penalty=1e9, ): """ This is a graph builder and a pathfinder wrapper * Builds a public transport frequency graph from links; * Adds access and eggress to the graph (accegg) using the ntlegs; * Search for shortest paths in the time graph; * Returns an Origin->Destination stack matrix with path and duration; example: :: links = engine.graph_links(links) # links is a LineDraft export path_finder_stack = engine.path_and_duration_from_links_and_ntlegs( links, ntlegs ) :param links: link DataFrame, built with graph_links; :param ntlegs: ntlegs DataFrame built; :param boarding_cost: an artificial cost to add the boarding edges of the frequency graph (seconds); :param ntlegs_penalty: (default 1e9) high time penalty in seconds to ensure ntlegs are used only once for access and once for eggress; :return: Origin->Destination stack matrix with path and duration; """ print(boarding_cost) pole_set = pole_set.intersection(set(ntlegs['a'])) links = links.copy() ntlegs = ntlegs.copy() links['index'] = links.index # to be consistent with frequency_graph nx_graph, _ = frequency_graph.graphs_from_links( links, include_edges=[], include_igraph=False, boarding_cost=boarding_cost ) ntlegs.loc[ntlegs['direction'] == 'access', 'time'] += ntlegs_penalty nx_graph.add_weighted_edges_from(ntlegs[['a', 'b', 'time']].values.tolist()) if footpaths is not None: nx_graph.add_weighted_edges_from( footpaths[['a', 'b', 'time']].values.tolist() ) return nx_graph
[docs]def emission_share_pt(shared, origin): try: df = shared.loc[shared['origin'] == origin] return np.average(df['share_pt'], weights=df['volume']) except Exception: return np.nan
[docs]def attraction_share_pt(shared, destination): try: df = shared.loc[shared['destination'] == destination] return np.average(df['share_pt'], weights=df['volume']) except Exception: return np.nan
[docs]def aggregate_shares(shared, zones): """ Aggregates modal shares by zone. Weights the share by the demand; :param shared: straight output from the modal split; :param zones: zoning GeoDataFrame :return: aggregated_shares, the zoning augmented with the modal shares as columns; """ aggregated_shares = pd.DataFrame( [ (emission_share_pt(shared, i), attraction_share_pt(shared, i)) for i in zones.index ], columns=['pt_share_emission', 'pt_share_attraction'] ) aggregated_shares['geometry'] = zones['geometry'] try: aggregated_shares[ ['emission', 'attraction']] = zones[ ['emission', 'attraction']] except Exception: pass return aggregated_shares