import numpy as np
import pandas as pd
from quetzal.engine import connectivity, engine, gps_tracks
from quetzal.engine.add_network import NetworkCaster
from quetzal.engine.add_network_mapmatching import NetworkCaster_MapMaptching
from quetzal.model import cubemodel, model
from syspy.renumber import renumber
from syspy.skims import skims
from tqdm import tqdm
import networkx as nx
import warnings
def read_hdf(filepath):
    """Load a PreparationModel from an HDF database file and return it."""
    return PreparationModel(hdf_database=filepath)
def read_json(folder, **kwargs):
    """Load a PreparationModel from a JSON folder export and return it.

    Extra keyword arguments are forwarded to PreparationModel.read_json.
    """
    loaded = PreparationModel()
    loaded.read_json(folder, **kwargs)
    return loaded
# Re-exported helpers from the base model module:
# track_args is the decorator used below to track/log method calls,
# log is the shared logging helper.
track_args = model.track_args
log = model.log
class PreparationModel(model.Model, cubemodel.cubeModel):
    """Model preparation helpers.

    This class contains functions that can be applied to models
    for the preparation of:
        - connectors (zone to transit, zone to road)
        - pedestrian footpaths
    """
    # NOTE: the description above used to sit as a bare string *after*
    # super().__init__() inside __init__, where it was a dead statement and
    # never attached as documentation; it is now the class docstring.

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
[docs] @track_args
def preparation_ntlegs(
self,
short_leg_speed=2,
long_leg_speed=10,
threshold=1000,
n_ntlegs=5,
max_ntleg_length=5000,
zone_to_transit=True,
zone_to_road=False,
prefix=False
):
"""Builds the centroids and the non-transit links/legs (ntlegs), also known as connectors.
Pameters short_leg_speed and long_leg_speed allow to model diferent types of access to the network (PT/private):
for short connectors, the short_leg_speed is used - it represents a walk speed. For long connectors,
which will occur for large zones at the edge of the study area, we may want to consider that the access
to the network is made by car/taxi, and hence at a larger speed, the long_leg_speed. Function integrates a
curve to smoothly go from short_leg_speed to long_leg_speed (can be understood as probability to access by foot or car).
Requires
----------
self.nodes
self.zones
Parameters
----------
short_leg_speed : int, optional, default 2
Speed of the short legs, in km/h
long_leg_speed : int, optional, default 10
Speed of the short legs, in km/h
threshold : int, optional, default 1000
Threshold for the definition of the short and long legs
n_ntlegs : int, optional, default 5
Number of ntlegs to create per zone (and per type)
max_ntleg_length : int, optional, default 5000
maximal length of the ntlegs, in m
zone_to_transit : bool, optional, default True
True to create links between zones and transit stops (nodes)
zone_to_road : bool, optional, default False
True to create links between zones and road_nodes, and between road_nodes and nodes
prefix : bool, optional, default False
If True, add prefixes to the index of the ntlegs
ztt_ (zone_to_transit), ztr_ (zone_to_road), rtt_ (road_to_transit)
Builds
----------
self.centroids
self.zone_to_transit
self.zone_to_road
self.road_to_transit
Examples
--------
::
sm.step_ntlegs(
n_ntlegs=5,
walk_speed=2,
short_leg_speed=3,
long_leg_speed=15,
threshold=500,
max_ntleg_length=5000
)
"""
self.centroids = self.zones.copy()
self.centroids['geometry'] = self.centroids['geometry'].apply(
lambda g: g.centroid)
length = max_ntleg_length
if zone_to_transit:
self.integrity_test_collision(sets=('nodes', 'zones'))
ntlegs = engine.ntlegs_from_centroids_and_nodes(
self.centroids,
self.nodes,
short_leg_speed=short_leg_speed,
long_leg_speed=long_leg_speed,
threshold=threshold,
n_neighbors=n_ntlegs,
coordinates_unit=self.coordinates_unit
)
ntlegs['walk_time'] = ntlegs['time']
self.zone_to_transit = ntlegs.loc[ntlegs['distance'] < length].copy()
if prefix:
self.zone_to_transit.index = 'ztt_' + pd.Series(self.zone_to_transit.index).astype(str)
else:
warnings.warn(("zone_to_transit indexes does not have prefixes. This may cause collisions."
"Consider using the option prefix=True. Prefixes will be added by default in"
"a future update"), FutureWarning)
if zone_to_road:
self.integrity_test_collision(sets=('road_nodes', 'zones'))
ntlegs = engine.ntlegs_from_centroids_and_nodes(
self.centroids,
self.road_nodes,
short_leg_speed=short_leg_speed,
long_leg_speed=long_leg_speed,
threshold=threshold,
n_neighbors=n_ntlegs,
coordinates_unit=self.coordinates_unit
)
ntlegs['walk_time'] = ntlegs['time']
self.zone_to_road = ntlegs.loc[ntlegs['distance'] < length].copy()
if prefix:
self.zone_to_road.index = 'ztr_' + pd.Series(self.zone_to_road.index).astype(str)
else:
warnings.warn(("zone_to_road indexes does not have prefixes. This may cause collisions."
"Consider using the option prefix=True. Prefixes will be added by default in"
"a future update"), FutureWarning)
ntlegs = engine.ntlegs_from_centroids_and_nodes(
self.nodes,
self.road_nodes,
short_leg_speed=short_leg_speed,
long_leg_speed=long_leg_speed,
threshold=threshold,
n_neighbors=n_ntlegs,
coordinates_unit=self.coordinates_unit
)
ntlegs['walk_time'] = ntlegs['time']
self.road_to_transit = ntlegs.loc[ntlegs['distance'] < length].copy()
if prefix:
self.road_to_transit.index = 'rtt_' + pd.Series(self.road_to_transit.index).astype(str)
else:
warnings.warn(("road_to_transit indexes does not have prefixes. This may cause collisions."
"Consider using the option prefix=True. Prefixes will be added by default in"
"a future update"), FutureWarning)
    def preparation_drop_redundant_zone_to_transit(self):
        """Keeps only relevant zone_to_transit connectors: verifies if the nodes
        linked to the zones exist in self.links, and drops the redundant ones.

        For each zone, a connector is kept only while it reaches at least one
        trip not already reached by previously kept connectors of that zone
        (a greedy set cover over the trip sets).

        Requires
        ----------
        self.links
        self.zone_to_transit
        self.zones

        Builds
        ----------
        self.zone_to_transit :
            update zone_to_transit dataframe
        """
        # Sort so that, within each zone, the fastest connectors are examined first.
        self.zone_to_transit.sort_values('time', inplace=True)
        # For each stop node (link origin 'a'): the set of trip_ids it serves.
        trips = self.links.groupby('a')['trip_id'].agg(set)
        df = self.zone_to_transit
        keep = []
        # access
        zones = set(df.loc[df['direction'] == 'access']['a'])
        for zone in tqdm(set(zones)):
            ztt = self.zone_to_transit.loc[self.zone_to_transit['a'] == zone]
            if len(ztt):
                ztt = ztt[['b']].reset_index()
                ztt['trips'] = [trips.get(n, set()) for n in ztt['b']]
                n = 1
                t = set()
                while len(ztt) > 0:
                    # Remove trips already covered by connectors kept so far,
                    # then keep the first (fastest) connector still covering
                    # something; drop candidates left with nothing to add.
                    # NOTE(review): unlike the egress loop below, this loop
                    # does not sort by 'n_trips' before picking iloc[0] —
                    # confirm the access/egress asymmetry is intentional.
                    ztt['trips'] = [trips - t for trips in ztt['trips']]
                    ztt['n_trips'] = [len(trips) for trips in ztt['trips']]
                    index, t, n = ztt.iloc[0][['index', 'trips', 'n_trips']].values
                    ztt = ztt.loc[ztt['n_trips'] > 0]
                    keep.append(index)
        # egress
        zones = set(df.loc[df['direction'] != 'access']['b'])
        for zone in tqdm(set(zones)):
            ztt = self.zone_to_transit.loc[self.zone_to_transit['b'] == zone]
            if len(ztt):
                ztt = ztt[['a']].reset_index()
                ztt['trips'] = [trips.get(n, set()) for n in ztt['a']]
                n = 1
                t = set()
                while n > 0:
                    ztt['trips'] = [trips - t for trips in ztt['trips']]
                    ztt['n_trips'] = [len(trips) for trips in ztt['trips']]
                    # Greedy: keep the connector covering the most uncovered trips.
                    ztt.sort_values('n_trips', ascending=False, inplace=True)
                    index, t, n = ztt.iloc[0][['index', 'trips', 'n_trips']].values
                    ztt = ztt.loc[ztt['n_trips'] > 0]
                    keep.append(index)
        self.zone_to_transit = self.zone_to_transit.loc[list(set(keep))]
[docs] def preparation_drop_redundant_zone_to_road(self, access_time='time', log=False):
"""Reduce number of zone_to_road connectors to optimize computation by performing
a shortest path algorithm in the graph made of road_links and zone_to_road.
Requires
----------
self.road_links
self.zone_to_road
Parameters
----------
access_time : str, optional, default 'time'
Time column in zone_to_road
log : bool, optional, default False
If true, returns the old and new numbers of zone_to_road
Builds
----------
self.zone_to_road :
update zone_to_road dataframe
"""
a = len(self.zone_to_road)
g = nx.DiGraph()
g.add_weighted_edges_from(self.road_links[['a','b', 'walk_time']].values)
g.add_weighted_edges_from(self.zone_to_road[['a','b', access_time]].values)
self.zone_to_road['dijkstra_time'] = [
nx.dijkstra_path_length(g, a, b) for a, b in self.zone_to_road[['a', 'b']].values
]
self.zone_to_road = self.zone_to_road.loc[self.zone_to_road[access_time] <= self.zone_to_road['dijkstra_time']]
self.zone_to_road.drop('dijkstra_time', axis=1, inplace=True)
if log:
print('reduced number of zone_to_road from', a, 'to', len(self.zone_to_road))
[docs] def preparation_drop_redundant_road_to_transit(self, access_time='time', log=False):
"""Reduce number of road_to_transit connectors to optimize computation by performing
a shortest path algorithm in the graph made of road_links and road_to_transit.
Requires
----------
self.road_links
self.road_to_transit
Parameters
----------
access_time : str, optional, default 'time'
Time column in road_to_transit
log : bool, optional, default False
If true, returns the old and new numbers of road_to_transit
Builds
----------
self.road_to_transit :
update road_to_transit dataframe
"""
a = len(self.road_to_transit)
g = nx.DiGraph()
g.add_weighted_edges_from(self.road_links[['a','b', 'walk_time']].values)
g.add_weighted_edges_from(self.road_to_transit[['a','b', access_time]].values)
self.road_to_transit['dijkstra_time'] = [
nx.dijkstra_path_length(g, a, b) for a, b in self.road_to_transit[['a', 'b']].values
]
self.road_to_transit = self.road_to_transit.loc[self.road_to_transit[access_time] <= self.road_to_transit['dijkstra_time']]
self.road_to_transit.drop('dijkstra_time', axis=1, inplace=True)
if log:
print('reduced number of road_to_transit from', a, 'to', len(self.road_to_transit))
[docs] @track_args
def preparation_cast_network(
self,
nearest_method='nodes',
weight='length',
penalty_factor=1,
speed=3,
replace_nodes=False,
dumb_cast=False,
**nc_kwargs
):
"""Finds a path for the transport lines in the actual road network,
to know on which roads a bus line is going, because the public transport links are defined
between two stops, without road information.
It will evaluate the best combination of nodes in the road network between the two stops.
The evaluation is done with the distance.
The results will be found as a list of road_links for each public transport link.
If the transport network has modes on dedicated infrastructure (train, metro...), create two submodels
and use a dumbcast on the dedicated infrastructure modes.
Requires
----------
self.nodes
self.links
self.road_nodes
self.road_links
Parameters
----------
nearest_method : ['nodes'|'links'], optional, default 'nodes'
Options are:
'nodes' --(default) looks for the actual nearest node in road_nodes.
'links' --looks for the nearest link to a stop in road_links and links the stop to its end_node (b)
weight : str, optional, default 'length'
Column of road_links containing road_links length
penalty_factor : int, optional, default 1
Multiplicative penality of weight
speed : int, optional, default 3
Walk speed
replace_nodes : bool, optional, default False
If True replaces nodes by road_nodes. If False, model will use road_to_transit ntlegs
dumb_cast : bool, optional, default False
If True, the network is casted on himself (cast links on links and not on road_links).
It will still return the closest road_node for the stops.
nodes_checkpoints :
mandatory transit nodes
Builds
----------
self.links :
add columns road_a, road_b, road_node_list, road_link_list, road_length
"""
if dumb_cast:
nc = NetworkCaster(
self.nodes,
self.links,
self.road_nodes
)
nc.dumb_cast()
else:
try:
self.road_links[weight] + 1
except TypeError:
raise TypeError(str(weight) + ' should be an int or a float')
nc = NetworkCaster(
self.nodes,
self.links,
self.road_nodes,
self.road_links,
weight=weight
)
nc.build(
nearest_method=nearest_method,
penalty_factor=penalty_factor,
coordinates_unit=self.coordinates_unit,
geometry=True,
**nc_kwargs
)
self.networkcaster = nc
self.links = nc.links
if not dumb_cast:
self.networkcaster_neighbors = nc.neighbors.reset_index(drop=True)
self.networkcaster_road_access = nc.road_access.reset_index(drop=True)
if replace_nodes:
self.links[['a', 'b']] = self.links[['road_a', 'road_b']]
self.links = self.links.loc[self.links['a'] != self.links['b']]
self.nodes = self.road_nodes.loc[list(self.link_nodeset())].copy()
self.road_to_transit = None
# if we do not replace the nodes by the road_nodes,
# we have to provide road to transit legs...
elif not dumb_cast:
rc = nc.road_access['geometry'].reset_index()
if self.coordinates_unit == 'degree':
rc['length'] = skims.distance_from_geometry(rc['geometry'])
elif self.coordinates_unit == 'meter':
rc['length'] = rc['geometry'].apply(lambda x: x.length)
else:
raise('Invalid coordinates_unit.')
rc['time'] = (rc['length'] / 1000 / speed * 3600)
to_road = rc.rename(columns={'node': 'a', 'road_node': 'b'})
from_road = rc.rename(columns={'node': 'b', 'road_node': 'a'})
to_road['direction'] = 'to_road'
from_road['direction'] = 'from_road'
concatenated = pd.concat([from_road, to_road])
self.road_to_transit = concatenated.reindex().reset_index(drop=True)
[docs] @track_args
def preparation_map_matching(self,
routing=True,
n_neighbors_centroid=100,
n_neighbors=10,
distance_max=5000,
by='trip_id',
sequence = 'link_sequence'):
"""Mapmatch each trip_id in self.links to the road_network (self.road_links)
Parameters
----------
routing : bool, optional
_description_, by default True
n_neighbors_centroid : int, optional
_description_, by default 100
n_neighbors : int, optional
_description_, by default 10
distance_max : int, optional
_description_, by default 5000
by : str, optional
_description_, by default 'trip_id'
sequence : str, optional
_description_, by default 'link_sequence'
Builds
----------
"""
ncm = NetworkCaster_MapMaptching(self.nodes, self.road_links, self.links, by, sequence)
matched_links, links_mat, unmatched_trip = ncm.Multi_Mapmatching(routing=routing,
n_neighbors_centroid=n_neighbors_centroid,
n_neighbors=n_neighbors,
distance_max=distance_max,
by=by)
matched_links['road_id_a'] = matched_links['road_id_a'].apply(lambda x: ncm.links_index_dict.get(x))
matched_links['road_id_b'] = matched_links['road_id_b'].apply(lambda x: ncm.links_index_dict.get(x))
road_a_dict = matched_links['road_id_a'].to_dict()
road_b_dict = matched_links['road_id_b'].to_dict()
length_dict = matched_links['length'].to_dict()
self.links['road_a'] = self.links.index.map(road_a_dict.get)
self.links['road_b'] = self.links.index.map(road_b_dict.get)
self.links['length'] = self.links.index.map(length_dict.get)
self.links = self.links.merge(links_mat[['road_node_list', 'road_link_list']], left_index=True, right_index=True, how='left')
[docs] @track_args
def preparation_logit(
self,
mode=1,
pt_mode=1,
pt_path=1,
segments=[],
time=-1,
price=-1,
transfers=-1,
time_shift=None,
route_types=None
):
"""Builds the necessary tables to perform analysis_mode_utility and step_logit.
They contain the parameters of the nested logit.
For the neste logit we should have 1 >= mode >= pt_mode >= pt_path > 0.
If the three of them are equal to 1 the nested logit will be equivalent to a flat logit.
Does not require specific attributes in self.
Parameters
----------
mode : int, optional, default 1
phi parameter used in the logit choice between modes
pt_mode : int, optional, default 1
phi parameter used in the logit choice between pt modes
pt_path : int, optional, default 1
phi parameter used in the logit choice between pt paths
segments : list, optional, default []
Demand segments we want to use in the logit
time : int, optional, default -1
number of utility units by seconds
price : int, optional, default -1
number of utility units by currency unit
transfers : int, optional, default -1
number of utility units by transfer
time_shift : int, optional, default None
Used with timetable (time expanded) models. Number of utility units by time_shift
Builds
----------
self.mode_utility :
Modal constants, per mode and per segment
self.mode_nests :
Structure of the nested logit per segment
self.logit_scales :
Scales of the nested logit per segment (parameters phi)
self.utility_values
Utility values of time, price, transfers and time_shift per segment
"""
# TODO : move to preparation
# utility values
if time_shift is None:
self.utility_values = pd.DataFrame(
{'root': pd.Series({'time': time, 'price': price, 'ntransfers': transfers, 'mode_utility': 1})}
)
else:
self.utility_values = pd.DataFrame(
{'root': pd.Series({'time': time, 'price': price, 'ntransfers': transfers, 'mode_utility': 1, 'time_shift': time_shift})}
)
self.utility_values.index.name = 'value'
self.utility_values.columns.name = 'segment'
if route_types is None:
link_rt = set(self.links['route_type'].unique())
route_types = link_rt.union({'car', 'walk', 'root'})
# mode_utility
self.mode_utility = pd.DataFrame(
{'root': pd.Series({rt: 0 for rt in route_types})}
)
self.mode_utility.index.name = 'route_type'
self.mode_utility.columns.name = 'segment'
# mode nests
self.mode_nests = pd.DataFrame(
{'root': pd.Series({rt: 'pt' for rt in route_types})}
)
self.mode_nests.loc['pt', 'root'] = 'root'
self.mode_nests.loc[['car', 'walk'], 'root'] = 'root'
self.mode_nests.loc[['root'], 'root'] = np.nan
self.mode_nests.index.name = 'route_type'
self.mode_nests.columns.name = 'segment'
# logit_scales
self.logit_scales = self.mode_nests.copy()
self.logit_scales['root'] = pt_path
self.logit_scales.loc[['car', 'walk'], 'root'] = 0
self.logit_scales.loc[['pt'], 'root'] = pt_mode
self.logit_scales.loc[['root'], 'root'] = mode
for segment in segments:
for df in (self.mode_utility, self.mode_nests, self.logit_scales, self.utility_values):
df[segment] = df['root']
[docs] @track_args
def preparation_clusterize_zones(
self, max_zones=500, cluster_column=None,
is_od_stack=False, **kwargs
):
"""Clusterize zones to optimize computation time.
Requires
----------
self.zones
self.volumes
Parameters
----------
max_zones : int, optional, default 500
_description_, by
cluster_column : string, optional, default None
cluster column in self.zones if clusters are already defined
is_od_stack : bool, optional, default False
If True, requires table od_stack
Builds
----------
self.zones
self.volumes
self.cluster_series
"""
zones = self.zones
zones['geometry'] = zones['geometry'].apply(lambda g: g.buffer(1e-9))
self.micro_zones = zones.copy()
self.micro_volumes = self.volumes.copy()
if is_od_stack:
self.micro_od_stack = self.od_stack.copy()
self.zones, self.volumes, self.cluster_series, self.od_stack = renumber.renumber_quetzal(
self.micro_zones,
self.micro_volumes,
self.micro_od_stack,
max_zones,
cluster_column,
**kwargs
)
else:
self.zones, self.volumes, self.cluster_series = renumber.renumber(
self.micro_zones,
self.micro_volumes,
max_zones,
cluster_column,
**kwargs
)
[docs] @track_args
def preparation_clusterize_nodes(self, n_clusters=None, adaptive_clustering=False, **kwargs):
"""Create nodes clusters to optimize computation time.
It will agregate nodes based on their relative distance to build "stop areas"
Requires
----------
self.nodes
Parameters
----------
n_clusters : int, optional, default None
Number of nodes clusters
adaptive_clustering : bool, optional, default False
If True, will define itself the number of clusters.
If False n_clusters must be defined
Builds
----------
self.links :
contain recomputed links with the clusterized nodes
self.nodes :
contain the clusterized nodes
self.disaggregated_nodes :
contain the former nodes
"""
self.disaggregated_nodes = self.nodes.copy()
if not adaptive_clustering:
assert n_clusters is not None, 'n_clusters must be defined if adaptive_clustering is False'
if len(self.nodes) <= n_clusters:
return
if adaptive_clustering:
if 'clustering_zones' not in self.__dict__.keys():
self.clustering_zones = self.zones.copy()
self.nodes = connectivity.adaptive_clustering(self.nodes, self.clustering_zones, **kwargs)
self.links, self.nodes, self.node_clusters, self.node_parenthood = connectivity.node_clustering(
self.links, self.nodes, n_clusters, group_id='adaptive_cluster_id'
)
else:
self.links, self.nodes, self.node_clusters, self.node_parenthood = connectivity.node_clustering(
self.links, self.nodes, n_clusters, **kwargs
)
self.node_parenthood = self.node_parenthood[['cluster', 'geometry']]
self.node_clusters['geometry'] = self.node_clusters[
'geometry'
].apply(lambda g: g.buffer(1e-9))
[docs] def preparation_map_tracks(
self,
agg={'speed': lambda s: s.mean() * 3.6},
buffer=50,
smoothing_span=100,
*args, **kwargs
):
"""Grand mystère
Requires
----------
self.nodes
Parameters
----------
agg : dict, optional
_description_, by default {'speed': lambda s: s.mean() * 3.6}
buffer : int, optional
_description_, by default 50
smoothing_span : int, optional
_description_, by default 100
Builds
----------
"""
# agg = ['mean', 'min', 'max', 'std', list] for extensive description of speeds
to_concat = []
iterator = tqdm(self.track_points['trip_id'].unique())
for trip_id in iterator:
iterator.desc = str(trip_id)
points = self.track_points.loc[self.track_points['trip_id'] == trip_id]
times = gps_tracks.get_times(
points,
road_links=self.road_links,
buffer=buffer,
road_nodes=self.road_nodes,
smoothing_span=smoothing_span
)
times['trip_id'] = trip_id
to_concat.append(times)
# INDEX
self.road_links.drop(['index'], axis=1, errors='ignore', inplace=True)
indexed = self.road_links.reset_index().set_index(['a', 'b'])['index'].to_dict()
concatenated = pd.concat(to_concat)
concatenated['road_link'] = concatenated.apply(lambda r: indexed[(r['a'], r['b'])], axis=1)
aggregated = concatenated.groupby(['road_link']).agg(agg)
for c in aggregated.columns:
self.road_links[c] = aggregated[c]
self.track_links = concatenated.drop(['a', 'b'], axis=1)