# Source code for quetzal.model.transportmodel

import networkx as nx
import numpy as np
import pandas as pd
from quetzal.analysis import analysis
from quetzal.engine import engine, nested_logit
from quetzal.engine.park_and_ride_pathfinder import ParkRidePathFinder
from quetzal.engine.pathfinder import PublicPathFinder
from quetzal.engine.road_pathfinder import RoadPathFinder
from quetzal.model import model, optimalmodel, parkridemodel
from syspy.assignment import raw as raw_assignment
from syspy.assignment.raw import fast_assign as assign
from syspy.skims import skims
from tqdm import tqdm


def read_hdf(filepath):
    """Load a TransportModel previously saved in HDF format.

    Parameters
    ----------
    filepath : string
        path of the HDF file to read

    Returns
    -------
    TransportModel
        model instance populated from the file
    """
    instance = TransportModel()
    instance.read_hdf(filepath)
    return instance
def read_json(folder, **kwargs):
    """Load a TransportModel previously saved in JSON format.

    Parameters
    ----------
    folder : string
        folder containing the JSON files to read
    kwargs :
        forwarded to ``TransportModel.read_json``

    Returns
    -------
    TransportModel
        model instance populated from the folder
    """
    instance = TransportModel()
    instance.read_json(folder, **kwargs)
    return instance
# Convenience re-exports from the base model module: `track_args` is the
# decorator that records step arguments, `log` is the model logger helper.
track_args = model.track_args
log = model.log
[docs]class TransportModel(optimalmodel.OptimalModel, parkridemodel.ParkRideModel):
    @track_args
    def step_distribution(
        self,
        segmented=False,
        deterrence_matrix=None,
        **od_volume_from_zones_kwargs
    ):
        """Perform trip distribution with a doubly constrained algorithm.

        Flows are distributed with an impedance/deterrence matrix inversely
        proportional to the accessibility between two zones.

        Requires
        --------
        self.zones

        Parameters
        ----------
        segmented : bool, optional, default False
            if True, all parameters must be given per segment in a dict
            {segment: param}, and one volume column is built per segment
        deterrence_matrix : unstacked dataframe, optional, default None
            an OD unstacked dataframe representing the disincentive to travel
            as distance/time/cost increases
        od_volume_from_zones_kwargs :
            if the friction matrix is not provided, it will be automatically
            computed using a gravity distribution with:
            - power: int, the gravity exponent
            - intrazonal: bool, if False set the intrazonal distance to 0,
              else compute a characteristic distance

        Builds
        ------
        self.volumes
        """
        if segmented:
            # one distribution per segment, merged into a single volumes table
            self.volumes = pd.DataFrame(columns=['origin', 'destination'])
            kwargs = od_volume_from_zones_kwargs.copy()
            # normalize per-segment parameters: missing entries become empty
            # dicts so that .get(segment, default) works uniformly below
            if 'deterrence_matrix' not in kwargs.keys():
                kwargs['deterrence_matrix'] = deterrence_matrix if deterrence_matrix is not None else {}
            if 'power' not in kwargs.keys():
                kwargs['power'] = {}
            if 'intrazonal' not in kwargs.keys():
                kwargs['intrazonal'] = {}
            for segment in self.segments:
                print(segment)
                cols = ['geometry', (segment, 'emission'), (segment, 'attraction')]
                if 'area' in self.zones:
                    cols += ['area']
                # zones restricted to this segment, with flat column names
                segment_zones = self.zones[cols].rename(
                    columns={
                        (segment, 'emission'): 'emission',
                        (segment, 'attraction'): 'attraction'
                    }
                )
                segment_volumes = engine.od_volume_from_zones(
                    segment_zones,
                    deterrence_matrix=kwargs['deterrence_matrix'].get(segment, None),
                    coordinates_unit=self.coordinates_unit,
                    power=kwargs['power'].get(segment, 2),
                    intrazonal=kwargs['intrazonal'].get(segment, False)
                )
                segment_volumes.rename(columns={'volume': segment}, inplace=True)
                # outer merge keeps OD pairs present for only some segments
                self.volumes = self.volumes.merge(
                    segment_volumes,
                    on=['origin', 'destination'],
                    how='outer'
                )
            # total volume over all segments
            self.volumes['all'] = self.volumes[self.segments].T.sum()
        else:
            self.volumes = engine.od_volume_from_zones(
                self.zones, deterrence_matrix,
                coordinates_unit=self.coordinates_unit,
                **od_volume_from_zones_kwargs
            )
[docs] @track_args def step_road_pathfinder(self,method='bfw', maxiters=1, *args, **kwargs): """Performs road assignment with or without capacity constraint, depending on the method used Requires ---------- self.road_links self.zone_to_road self.volumes Parameters ---------- method : ['bfw'|'fw'|'msa'|'aon'], optional Which method to use for pathfinder. Options are: 'bfw' --(default) 'fw' -- 'msa' -- 'aon' -- all or nothing : shortest path pathfinder maxiters : integer, optional, default 10 Maxiters=1 will perform 'shortest path' pathfinder tolerance : float, optional, default 0.01 stop condition for RelGap, in percent volume_column : string, optional, default 'volume_car' column of self.volumes to use for volume ntleg_penalty : float, optional, default 1e9 ntleg penality for access time access_time : string, optional, default 'time' column for time in zone_to_road for access time od_set : dict, optional, default None set of od to use - may be used to reduce computation time for example, the od_set is the set of od for which there is a volume in self.volumes num_cores : integer, optional, default 1 for parallelization log : log data on each iteration (default False) vdf : dict, optional dict of function for the jam time : {'default_bpr': default_bpr,'limited_bpr':limited_bpr, 'free_flow': free_flow} beta : list, optional, default None give constant value for BFW betas. ex: [0.7,0.2,0.1]. Builds ---------- self.car_los : create tables of car levels of services self.road_links : add columns flow (volume on the road links) and jam_time (time of the link with congestion) for each OD pair results of pathfinder with/without capacity restriction """ roadpathfinder = RoadPathFinder(self) method = method.lower() if 'all_or_nothing' in kwargs: kwargs.pop('all_or_nothing', None) method = 'aon' print(" 'all_or_nothing'=True is deprecated. 
use method = 'aon' instead") if method in ['msa','fw','bfw','aon']: roadpathfinder.msa(method=method, maxiters=maxiters, *args, **kwargs) self.car_los = roadpathfinder.car_los if method != 'aon': # do not overwrite road_links if its all-or-nothing self.road_links = roadpathfinder.road_links self.relgap = roadpathfinder.relgap else: print(method,' not supported. use msa, fw, bfw or aon') self.car_los['origin'] == self.car_los['destination']
[docs] @track_args def step_pr_pathfinder( self, force=False, path_analysis=True, **kwargs ): """Park and Ride pathfinder algorithm : shortest path algorithm on the graph built from links (public transport routes) and road_links considered as "public transport access links" (with car speed). Requires ---------- self.zones self.links self.footpaths self.zone_to_road self.zone_to_transit self.transit_to_zone self.road_to_transit self.road_links Parameters ---------- force : bool, optional, default False If True, will NOT perform integrity_test_collision on the 'nodes', 'links', 'zones','road_nodes', 'road_links'. path_analysis : bool, optional, default True Performs paths analysis, adds columns 'all_walk' and 'ntransfers' to the output pt_los od_set : dict, optional, default None set of od to use - may be used to reduce computation time (default None) for example, the od_set is the set of od for which there is a volume in self.volumes cutoff : default np.inf description Builds ---------- self.pr_los """ if not force: sets = ['nodes', 'links', 'zones', 'road_nodes', 'road_links'] self.integrity_test_collision(sets) self.links = engine.graph_links(self.links) parkridepathfinder = ParkRidePathFinder(self) parkridepathfinder.find_best_path(**kwargs) self.pr_los = parkridepathfinder.paths if path_analysis: analysis_nodes = pd.concat([self.nodes, self.road_nodes]) analysis_links = pd.concat([self.links, self.road_links]) self.pr_los = analysis.path_analysis_od_matrix( od_matrix=self.pr_los, links=self.links, nodes=analysis_nodes, centroids=self.centroids, ) # analyse non vérifiée, prise directement depuis pt_los
    @track_args
    def step_pt_pathfinder(
        self,
        broken_routes=True,
        broken_modes=True,
        route_column='route_id',
        mode_column='route_type',
        boarding_time=None,
        speedup=False,
        walk_on_road=False,
        # keep_graph=False,
        keep_pathfinder=False,
        force=False,
        path_analysis=True,
        **kwargs
    ):
        """Perform the public transport pathfinder.

        Uses:
        - an all-or-nothing Dijkstra algorithm if broken_routes=False AND
          broken_modes=False
        - the pruning algorithm if broken_routes=True OR/AND broken_modes=True

        For the optimal strategy pathfinder, use step_pt_pathfinder of the
        class OptimalModel. For the connection scan pathfinder (timetables),
        use step_pt_pathfinder of the class ConnectionScanModel.

        Requires
        --------
        self.zones
        self.links
        self.footpaths
        self.zone_to_road
        self.zone_to_transit

        Parameters
        ----------
        broken_routes : bool, optional, default True
            If True, performs the route breaker of the pruning algorithm
            with the different routes found in route_column.
        broken_modes : bool, optional, default True
            If True, performs the mode breaker of the pruning algorithm
            with the different modes found in mode_column.
        route_column : str, optional, default 'route_id'
            column of self.links containing the route identifiers
            (pruning algorithm)
        mode_column : str, optional, default 'route_type'
            column of self.links containing the mode identifiers
            (pruning algorithm)
        boarding_time : float, optional, default None
            additional boarding time
        speedup : bool, optional, default False
            speeds up the computation
        walk_on_road : bool, optional, default False
            If True, considers using the road network and zone_to_road for
            pedestrian paths. Warning: it will only compare those paths with
            the paths using pedestrian links (footpaths, zone_to_transit).
            Force the use of the road network for pedestrian paths by NOT
            defining footpaths and zone_to_transit.
        keep_pathfinder : bool, optional, default False
            If True, keeps all computation steps of the pathfinder. Used to
            perform advanced route and mode breaking (without the need to
            create a submodel for the route breaker, for example).
        force : bool, optional, default False
            If True, will NOT perform integrity_test_collision on the
            'nodes', 'links', 'zones', 'road_nodes', 'road_links'.
        path_analysis : bool, optional, default True
            Performs path analysis, adds columns 'all_walk' and 'ntransfers'
            to the output pt_los.

        Builds
        ------
        self.pt_los
        """
        sets = ['nodes', 'links', 'zones']
        if walk_on_road:
            # road network is part of the search graph in this mode
            sets += ['road_nodes', 'road_links']
        if not force:
            self.integrity_test_collision(sets)
        self.links = engine.graph_links(self.links)
        publicpathfinder = PublicPathFinder(self, walk_on_road=walk_on_road)
        publicpathfinder.find_best_paths(
            broken_routes=broken_routes,
            broken_modes=broken_modes,
            route_column=route_column,
            mode_column=mode_column,
            speedup=speedup,
            boarding_time=boarding_time,
            **kwargs
        )
        # if keep_graph:
        #     self.nx_graph = publicpathfinder.nx_graph
        if keep_pathfinder:
            # keep intermediate pathfinder state for advanced analyses
            self.publicpathfinder = publicpathfinder
        self.pt_los = publicpathfinder.paths
        analysis_nodes = pd.concat([self.nodes, self.road_nodes]) if walk_on_road else self.nodes
        if path_analysis:
            self.pt_los = analysis.path_analysis_od_matrix(
                od_matrix=self.pt_los,
                links=self.links,
                nodes=analysis_nodes,
                centroids=self.centroids,
            )
    @track_args
    def step_modal_split(self, build_od_stack=True, **modal_split_kwargs):
        """Perform a simple modal split between Public Transport and Car.

        Use only for simple models, with few details: only based on duration
        and modal penalties; does not include price. Based on modes demand
        and levels of service, it returns the volume by mode. For modal
        split, prefer the use of step_logit.

        * Utility(car) = alpha_car * 'duration_car' + beta_car
        * Utility(pt) = 'duration_pt'

        Requires
        --------
        self.volumes :
            all-mode origin->destination demand matrix
        self.los :
            levels of service, an OD stack matrix with 'duration_pt'
            and 'duration_car'

        Parameters
        ----------
        build_od_stack : bool, optional, default True
            if True, also builds self.od_stack
        time_scale : float
            time scale of the logistic regression that compares utilities;
            defines selectiveness; 1/(utility value of time, in seconds)
        alpha_car : float
            multiplicative penalty on 'duration_car' for 'utility_car'
        beta_car : float
            additive penalty on 'duration_car' for 'utility_car'

        Builds
        ------
        self.od_stack
        self.shared

        Examples
        --------
        ::

            los = pd.merge(
                car_los, pt_los,
                on=['origin', 'destination'],
                suffixes=['_car', '_pt']
            )
            sm.step_modal_split(time_scale=1/1800, alpha_car=2, beta_car=600)
        """
        shared = engine.modal_split_from_volumes_and_los(
            self.volumes,
            self.los,
            **modal_split_kwargs
        )
        # shared['distance_car'] = shared['distance']
        if build_od_stack:
            self.od_stack = analysis.volume_analysis_od_matrix(shared)
        self.shared = shared
[docs] def compute_los_volume(self, time_expanded=False, keep_segments=True): """Compute volumes in the level of services table from volumes and probabilities computed in the step_logit. Requires ---------- self.los : concatenation of levels of services of the different modes (usually pt_los and car_los) self.volumes Parameters ---------- time_expanded : bool, optional, default False Use True for models using timetables and ConnectionScan models keep_segments : bool, optional, default True True to use model segments - compute volumes per path per segment Builds ---------- self.los : add volume column """ los = self.los if not time_expanded else self.te_los segments = self.segments probabilities = [(segment, 'probability') for segment in segments] shared_cols = list(set(self.volumes.columns).intersection(set(los.columns))) on = [col for col in shared_cols if col in ['origin', 'destination', 'wished_departure_time']] left = los[on + probabilities] left['index'] = left.index df = pd.merge(left, self.volumes, on=on).set_index('index') df = df.reindex(los.index) values = df[probabilities].values * df[segments].values i = 0 for segment in segments: los[segment] = values.T[i] i += 1 los['volume'] = np.nansum(values, axis=1) if time_expanded: los_volumes = self.te_los.groupby('path_id')[['volume'] + segments].sum() path_id_list = list(self.los['path_id']) volume_values = los_volumes.reindex(path_id_list).fillna(0).values for c in los_volumes.columns: self.los[c] = np.nan # create_columns self.los.loc[:, los_volumes.columns] = volume_values
    def step_assignment(
        self,
        road=False,
        boardings=False,
        boarding_links=False,
        alightings=False,
        alighting_links=False,
        transfers=False,
        segmented=False,
        time_expanded=False,
        compute_los_volume=True
    ):
        """Perform assignment: compute the volumes on the links of the public
        transport network, and the boardings and alightings on the nodes of
        the PT network.

        Requires
        --------
        self.los :
            concatenation of the levels of service of the different modes
            (usually pt_los and car_los)
        self.volumes

        Parameters
        ----------
        road : bool, optional, default False
            Assign car volume. If road_link_list exists (column of self.links
            computed with preparation_cast_network), also adds public
            transport volume on road_links.
        boardings : bool, optional, default False
            If True, compute boardings to add to the nodes dataframe.
        boarding_links : bool, optional, default False
            If True, compute boardings to add to the links dataframe.
        alightings : bool, optional, default False
            If True, compute alightings to add to the nodes dataframe.
        alighting_links : bool, optional, default False
            If True, compute alightings to add to the links dataframe.
        transfers : bool, optional, default False
            If True, compute number of transfers to add to the nodes dataframe.
        segmented : bool, optional, default False
            If True, use model segments - compute volumes per segment.
        time_expanded : bool, optional, default False
            Use True for models using timetables and ConnectionScan models.
        compute_los_volume : bool, optional, default True
            True to add volume columns to the los dataframe first.

        Builds
        ------
        self.los : adds volume columns if compute_los_volume=True
        self.links : adds volumes, and boardings/alightings depending on flags
        self.nodes : adds boardings/alightings/transfers depending on flags
        self.road_links : adds volumes on road_links if road=True
        """
        if compute_los_volume:
            self.compute_los_volume(time_expanded=time_expanded)
        los = self.los.copy()
        column = 'link_path'
        # keep only paths with a link sequence and a positive volume
        l = los.dropna(subset=[column])
        l = l.loc[l['volume'] > 0]
        self.links['volume'] = assign(l['volume'], l[column])
        if road:
            # NOTE(review): los concatenates PT and car paths; link_path items
            # that match road_links indices carry the car volumes - confirm
            self.road_links[('volume', 'car')] = assign(l['volume'], l[column])
            if 'road_link_list' in self.links.columns:
                # cast PT link volumes onto the underlying road links
                to_assign = self.links.dropna(subset=['volume', 'road_link_list'])
                self.road_links[('volume', 'pt')] = assign(
                    to_assign['volume'],
                    to_assign['road_link_list']
                )
        if boardings and not boarding_links:
            print('to assign boardings on links pass boarding_links=True')
        if boarding_links:
            column = 'boarding_links'
            l = los.dropna(subset=[column])
            self.links['boardings'] = assign(l['volume'], l[column])
        if boardings:
            column = 'boardings'
            l = los.dropna(subset=[column])
            self.nodes['boardings'] = assign(l['volume'], l[column])
        if alighting_links:
            column = 'alighting_links'
            l = los.dropna(subset=[column])
            self.links['alightings'] = assign(l['volume'], l[column])
        if alightings:
            column = 'alightings'
            l = los.dropna(subset=[column])
            self.nodes['alightings'] = assign(l['volume'], l[column])
        if transfers:
            column = 'transfers'
            l = los.dropna(subset=[column])
            self.nodes['transfers'] = assign(l['volume'], l[column])
        if segmented:
            # delegate per-segment assignment on the same aggregated los
            self.segmented_assigment(
                road=road,
                boardings=boardings,
                alightings=alightings,
                transfers=transfers,
                aggregated_los=los
            )
    def segmented_assigment(
        self,
        road=False,
        boardings=False,
        alightings=False,
        transfers=False,
        aggregated_los=None
    ):
        """Perform assignment per segment: compute the volumes on the links
        of the public transport network, and the boardings and alightings on
        the nodes of the PT network, keeping the memory of segments in the
        volumes.

        (Note: the method name keeps its historical spelling for backward
        compatibility with existing callers.)

        Requires
        --------
        self.los :
            concatenation of the levels of service of the different modes
            (usually pt_los and car_los)
        self.volumes

        Parameters
        ----------
        road : bool, optional, default False
            Add public transport volume on road_links. Requires
            road_link_list - column of self.links computed with
            preparation_cast_network.
        boardings : bool, optional, default False
            If True, compute boardings to add to the links and nodes.
        alightings : bool, optional, default False
            If True, compute alightings to add to the links and nodes.
        transfers : bool, optional, default False
            If True, compute number of transfers to add to the nodes.
        aggregated_los : dataframe, optional, default None
            aggregated los table to assign from; defaults to self.los

        Builds
        ------
        self.links : adds per-segment volumes, boardings, alightings
        self.nodes : adds per-segment boardings, alightings, transfers
        self.road_links : adds per-segment volumes if road=True
        """
        los = aggregated_los if aggregated_los is not None else self.los
        for segment in self.segments:
            column = 'link_path'
            l = los.dropna(subset=[column])
            self.links[segment] = assign(l[segment], l[column])
            if road:
                self.road_links[(segment, 'car')] = assign(l[segment], l[column])
                # cast PT link volumes onto the underlying road links
                self.road_links[(segment, 'pt')] = assign(
                    self.links[segment],
                    self.links['road_link_list']
                )
            if boardings:
                column = 'boarding_links'
                l = los.dropna(subset=[column])
                self.links[(segment, 'boardings')] = assign(l[segment], l[column])
                column = 'boardings'
                l = los.dropna(subset=[column])
                self.nodes[(segment, 'boardings')] = assign(l[segment], l[column])
            if alightings:
                column = 'alighting_links'
                l = los.dropna(subset=[column])
                self.links[(segment, 'alightings')] = assign(l[segment], l[column])
                column = 'alightings'
                l = los.dropna(subset=[column])
                self.nodes[(segment, 'alightings')] = assign(l[segment], l[column])
            if transfers:
                column = 'transfers'
                l = los.dropna(subset=[column])
                self.nodes[(segment, 'transfers')] = assign(l[segment], l[column])
    @track_args
    def step_pt_assignment(
        self,
        volume_column=None,
        on_road_links=False,
        split_by=None,
        **kwargs
    ):
        """Perform assignment of the indicated volume column: compute the
        volumes on the links of the public transport network, and the
        boardings and alightings on the nodes of the PT network.

        This function is older and slower, and will soon be deprecated; if
        possible prefer step_assignment. Compared to step_assignment:
        - uses pt_los (and not los)
        - can specify the volume column
        - can track path categories

        Requires
        --------
        self.links
        self.nodes
        self.pt_los :
            requires computed path probabilities per segment; they can be
            computed with analysis_mode_utility + step_logit
        self.road_links
        self.volumes

        Parameters
        ----------
        volume_column : string, optional, default None
            volume column of self.volumes to assign. If None, all columns
            will be assigned (delegates to segmented_pt_assignment).
        on_road_links : bool, optional, default False
            if True, performs PT assignment on road_links as well
        split_by : string, optional, default None
            path category to be tracked in the assignment.
            Must be a column of self.pt_los.

        Builds
        ------
        self.loaded_links
        self.loaded_nodes
        self.road_links : adds a public transport load column

        Examples
        --------
        ::

            sm.step_pt_assignment(
                volume_column=None,
                on_road_links=False,
                split_by='route_type',
                boardings=True,
                alightings=True,
                transfers=True
            )
        """
        if volume_column is None:
            # assign every demand segment, then return
            self.segmented_pt_assignment(
                on_road_links=on_road_links,
                split_by=split_by,
                **kwargs
            )
            return
        # When split_by is not None, this call could be replaced by a sum, provided
        # prior dumb definition of loaded_links and loaded_nodes
        self.loaded_links, self.loaded_nodes = engine.loaded_links_and_nodes(
            self.links,
            self.nodes,
            volumes=self.volumes,
            path_finder_stack=self.pt_los,
            volume_column=volume_column,
            **kwargs
        )
        # Rename flat result columns into (indicator, volume_column) tuples
        self.loaded_links.rename(columns={volume_column: ('load', volume_column)}, inplace=True)
        self.loaded_nodes.rename(columns={volume_column: ('load', volume_column)}, inplace=True)
        for col in list(set(['boardings', 'alightings', 'transfers']).intersection(kwargs.keys())):
            self.loaded_links.rename(columns={col: (col, volume_column)}, inplace=True)
            self.loaded_nodes.rename(columns={col: (col, volume_column)}, inplace=True)
        # Group assignment: one extra pass per category value of split_by
        if split_by is not None:
            groups = self.pt_los[split_by].unique()
            for group in groups:
                # TODO remove rows with empty link_path
                group_pt_los = self.pt_los.loc[self.pt_los[split_by] == group]
                group_loaded_links, group_loaded_nodes = engine.loaded_links_and_nodes(
                    self.links,
                    self.nodes,
                    volumes=self.volumes,
                    path_finder_stack=group_pt_los,
                    volume_column=volume_column,
                    **kwargs
                )
                # Append results columns keyed by (indicator, volume, group)
                self.loaded_links[('load', volume_column, group)] = group_loaded_links[volume_column]
                self.loaded_nodes[('load', volume_column, group)] = group_loaded_nodes[volume_column]
                for col in list(set(['boardings', 'alightings', 'transfers']).intersection(kwargs.keys())):
                    self.loaded_links[(col, volume_column, group)] = group_loaded_links[col]
                    self.loaded_nodes[(col, volume_column, group)] = group_loaded_nodes[col]
        # Assignment on road_links
        if on_road_links:
            if 'road_link_path' not in self.pt_los.columns:
                # create road_link_path column from networkcasted links if not already defined
                self._analysis_road_link_path()
            merged = pd.merge(self.pt_los, self.volumes, on=['origin', 'destination'])
            # weight demand by the path probability of this volume column
            merged['to_assign'] = merged[(volume_column, 'probability')] * merged[volume_column].fillna(0)
            if split_by is not None:
                def assign_group(g):
                    # assign one category's paths to road links
                    x = g.reset_index()
                    result = raw_assignment.assign(x['to_assign'], x['road_link_path'])
                    return result
                group_assigned = merged.groupby(split_by).apply(assign_group)
                assigned = group_assigned.unstack().T.loc['volume'].fillna(0)
                # Add empty groups so every category gets a column
                for empty in list(set(groups).difference(set(assigned.columns))):
                    assigned[empty] = 0
                self.road_links[[(volume_column, col) for col in groups]] = assigned[[col for col in groups]]
                self.road_links[volume_column] = assigned.T.sum()
            else:  # no groups
                assigned = raw_assignment.assign(merged['to_assign'], merged['road_link_path'])
                self.road_links[volume_column] = assigned['volume']
            # todo remove 'load' from analysis module:
            self.road_links['load'] = self.road_links[volume_column]
[docs] def segmented_pt_assignment(self, split_by=None, on_road_links=False, *args, **kwargs): """Performs pt assignment for all demand segments. Function used in step_pt_assignment function, refer to this function for other args. Requires ---------- self.links self.nodes self.pt_los : requires computed path probabilities in pt_los for each segment self.road_links self.volumes Parameters ---------- on_road_links : bool, optional, default False if True, performs pt assignment on road_links as well split_by : string, optional, default None path categories to be tracked in the assignment. Must be a column of self.pt_los Builds ---------- self.loaded_links self.loaded_nodes self.road_links : add public transport load column """ segments = self.segments iterator = tqdm(segments) for segment in iterator: iterator.desc = str(segment) # Assign demand segment self.step_pt_assignment( volume_column=segment, path_pivot_column=(segment, 'probability'), split_by=split_by, on_road_links=on_road_links, **kwargs ) # Update links and nodes to keep results as loaded links and nodes # are erased at each call of step_pt_assignment self.links = self.loaded_links self.nodes = self.loaded_nodes # Group assignment results: sum over demand segments try: groups = self.pt_los[split_by].unique() except KeyError: groups = [] cols = ['load'] # Add boardings, alightings and transfers if processed cols += list(set(['boardings', 'alightings', 'transfers']).intersection(kwargs.keys())) for col in cols: for g in groups: columns = [tuple([col, s, g]) for s in segments] name = tuple([col, g]) self.loaded_links[name] = self.loaded_links[columns].T.sum() self.loaded_links.drop(columns, 1, inplace=True) self.loaded_nodes[name] = self.loaded_nodes[columns].T.sum() self.loaded_nodes.drop(columns, 1, inplace=True) columns = [tuple([col, s]) for s in segments] self.loaded_links[col] = self.loaded_links[columns].T.sum() self.loaded_links.drop(columns, 1, inplace=True) self.loaded_nodes[col] = 
self.loaded_nodes[columns].T.sum() self.loaded_nodes.drop(columns, 1, inplace=True) if on_road_links: for group in groups: self.road_links[('all', group)] = self.road_links[[(s, group) for s in segments]].T.sum() self.road_links.drop([(s, group) for s in segments], 1, inplace=True) self.road_links['load'] = self.road_links[[s for s in segments]].T.sum() self.road_links.drop([s for s in segments], 1, inplace=True)
    def step_car_assignment(self, volume_column=None):
        """Perform car assignment of the indicated volume column: compute
        the volumes on the road_links of the private transport network.

        This function is older and slower, and will soon be deprecated;
        prefer step_assignment and step_road_pathfinder. Compared to
        step_assignment:
        - uses car_los (and not los)
        - can specify the volume column

        Requires
        --------
        self.road_links
        self.road_nodes
        self.car_los :
            requires computed path probabilities per segment; they can be
            computed with analysis_mode_utility + step_logit
        self.volumes

        Parameters
        ----------
        volume_column : string, optional, default None
            volume column of self.volumes to assign. If None, all columns
            will be assigned (delegates to segmented_car_assignment).

        Builds
        ------
        self.road_links : adds a (volume_column, 'car') column
        """
        if volume_column is None:
            self.segmented_car_assignment()
        else:
            merged = pd.merge(self.car_los, self.volumes, on=['origin', 'destination'])
            # weight demand by the path probability of this volume column
            merged['to_assign'] = merged[(volume_column, 'probability')] * merged[volume_column].fillna(0)
            assigned = raw_assignment.assign(merged['to_assign'], merged['link_path']).fillna(0)
            self.road_links[(volume_column, 'car')] = assigned
    def segmented_car_assignment(self):
        """Perform car assignment for all demand segments.

        Used by step_car_assignment; refer to that function for details.

        Requires
        --------
        self.road_links
        self.road_nodes
        self.car_los :
            requires computed path probabilities per segment
        self.volumes

        Builds
        ------
        self.road_links : adds one (segment, 'car') column per segment and
            an ('all', 'car') total column
        """
        segments = self.segments
        iterator = tqdm(segments)
        for segment in iterator:
            iterator.desc = str(segment)
            merged = pd.merge(self.car_los, self.volumes, on=['origin', 'destination'])
            # weight demand by the path probability of this segment
            merged['to_assign'] = merged[(segment, 'probability')] * merged[segment].fillna(0)
            assigned = raw_assignment.assign(merged['to_assign'], merged['link_path']).fillna(0)
            self.road_links[(segment, 'car')] = assigned
        # total car volume over all segments
        columns = [(segment, 'car') for segment in self.segments]
        self.road_links[('all', 'car')] = self.road_links[columns].T.sum()

        # TODO Merge conflict: TO CHECK WITH ACCRA
        # self.road_links.drop(columns, 1, inplace=True)
        # if not 'load' in self.road_links.columns:
        #     self.road_links['load'] = 0
        # self.road_links['load'] += self.road_links[('all','car')]

    # TODO move all utility features to another object / file
[docs] def analysis_mode_utility(self, how='min', segment=None, segments=None, time_expanded=False): """Compute utilities per mode per segment based on logit parameters. Before applying this function : -- the function preparation_logit allows to create the stepmodel attributes utility_values, logit_scales, mode_utility and mode_nests required in the analysis_mode_utility function -- the functions analysis_pt_route_type and analysis_car_route_type may also be needed to builds 'route_type' in pt_los (resp in car_los) based on 'route_types' -- concatenate pt_los and car_los The function analysis_mode_utility computes the utility based on the chosen variables of the utility functions. The coefficients of the variable in the utility fonctions must be found in the utility_values model attribute and in the columns of the model los (default : price, time, ntransfers). They can be defined by segment. The modal constant can also be defined per segment and must be found in the table mode_utility. Requires ---------- self.mode_utility self.los self.utility_values self.segments Parameters ---------- how : ['main'|'min'|'max'|'mean'|'sum'], optional, default 'min' What type of agregation for mode constant. Options are: 'main' --(default) the utility (constant) of the 'route_type' is used 'min' -- minimum of the utilities (constant) of the modes in route_types 'max' -- maximum of the utilities (constant) of the modes in route_types 'mean' -- mean of the utilities (constant) of the modes in route_types 'sum' -- sum of the utilities (constant) of the modes in route_types segment : string, optional, default None Unique segment for which to compute utility. 
If None, will perform for all segments of self.segments segments : list, optional, default None List of segments of the model for which to compute utility time_expanded : bool, optional, default False Use True for models using timetables and ConnectionScan models Builds ---------- self.los : add columns (segment, 'utility') - value of the utility per segment """ if segment is None: for segment in tqdm(self.segments): self.analysis_mode_utility(how=how, segment=segment, time_expanded=time_expanded) return if time_expanded: logit_los = self.te_los else: logit_los = self.los mode_utility = self.mode_utility[segment].to_dict() if how == 'main': # the utility of the 'route_type' is used logit_los['mode_utility'] = logit_los['route_type'].apply(mode_utility.get) else : # how = 'min', 'max', 'mean', 'sum' # route type utilities rtu = { rt: get_combined_mode_utility( rt, how=how, mode_utility=mode_utility ) for rt in logit_los['route_types'].unique() } logit_los['mode_utility'] = logit_los['route_types'].map(rtu.get) utility_values = self.utility_values[segment].to_dict() u = 0 for key, value in utility_values.items(): u += value * logit_los[key] logit_los[(segment, 'utility')] = u logit_los[(segment, 'utility')] = logit_los[(segment, 'utility')]
    def analysis_utility(self, segment='root', time_expanded=False, how='min'):
        """DEPRECATED - use analysis_mode_utility instead.

        Computes (segment, 'utility') columns in self.los (or self.te_los if
        time_expanded) as the linear combination of utility_values
        coefficients and los columns, cast to float.
        """
        if segment is None:
            # recurse once per segment (delegates to the replacement method)
            for segment in self.segments:
                print(segment)
                self.analysis_mode_utility(how=how, segment=segment, time_expanded=time_expanded)
            return
        if time_expanded:
            los = self.te_los
        else:
            los = self.los
        utility_values = self.utility_values[segment].to_dict()
        # linear utility: sum of coefficient * los column
        u = 0
        for key, value in utility_values.items():
            u += value * los[key]
        los[(segment, 'utility')] = u
        los[(segment, 'utility')] = los[(segment, 'utility')].astype(float)
[docs] def initialize_logit(self): """Not necessary creates tables od_probabilities and od_utilities """ zones = list(self.zones.index) od = pd.DataFrame(index=pd.MultiIndex.from_product([zones, zones])) self.od_probabilities = od.copy() self.od_utilities = od.copy()
    def step_logit(
        self,
        time_expanded=False,
        decimals=None,
        n_paths_max=None,
        nchunks=10,
        workers=1,
        keep_od_tables=True
    ):
        """Performs the nested logit: compute the probabilities per segment
        of the paths in self.los after having computed the utilities with
        function analysis_mode_utility.
        prob(mode k) = exp(utility mode k) / sum(utilities, all modes).
        Parametrize the nested logit with model attributes mode_nests and
        logit_scales created with function preparation_logit.

        Requires
        ----------
        self.mode_nests
        self.logit_scales
        self.los (or self.te_los when time_expanded=True)

        Parameters
        ----------
        time_expanded : bool, optional, default False
            Use True for models using timetables and ConnectionScan models
        decimals : float, optional, default None
            Minimum probability (avoid very small volumes)
        n_paths_max : int, optional, default None
            Maximum number of paths to keep per OD (avoid very small volumes)
        nchunks : int, optional, default 10
            Parameter to speed up computation time (division of calculation)
        workers : int, optional, default 1
            Parameter to speed up computation time (division of calculation)
        keep_od_tables : bool, optional, default True
            If True, the OD-level utility and probability tables returned by
            the engine are kept in self.utilities / self.probabilities

        Builds
        ----------
        self.los
            Add columns (segment, 'probability') of probabilities per segment
        self.probabilities
        self.utilities
        """
        # ---- concatenate the paths of all segments into one long table ----
        od_cols = ['origin', 'destination']
        if time_expanded:
            # in time-expanded models an OD is also identified by its
            # wished departure time
            od_cols.append('wished_departure_time')
        to_concat = []
        for segment in self.segments:
            keep_columns = od_cols + ['route_type', (segment, 'utility')]
            if time_expanded:
                paths = self.te_los[keep_columns]
            else:
                paths = self.los[keep_columns]
            # each segment contributes the same columns, with its own utility
            paths.rename(columns={(segment, 'utility'): 'utility'}, inplace=True)
            paths = paths.dropna(subset=['utility'])
            paths['segment'] = segment
            to_concat.append(paths)
        segmented_paths = pd.concat(to_concat)
        try:
            # ---- fast path: all the segments can be processed together ----
            # assert all logit scales are the same and pick one
            logit_scales = self.logit_scales.T.drop_duplicates().T
            assert len(logit_scales.columns) == 1
            logit_scales.columns = ['root']
            nls = logit_scales['root'].to_dict()
            # assert all mode_nests are the same and pick one
            mode_nests = self.mode_nests.T.drop_duplicates().T
            assert len(mode_nests.columns) == 1
            mode_nests.columns = ['root']
            # nest -> list of route_types belonging to that nest
            nests = mode_nests.reset_index().groupby('root')['route_type'].agg(
                lambda s: list(s)).to_dict()
            p, mu, mp = nested_logit.nested_logit_from_paths(
                segmented_paths, od_cols,
                mode_nests=nests,
                phi=nls,
                verbose=False,
                decimals=decimals,
                n_paths_max=n_paths_max,
                nchunks=nchunks,
                workers=workers,
                return_od_tables=keep_od_tables
            )
        except AssertionError:
            # ---- fallback: parameters differ between segments, so run the
            # nested logit once per segment and concatenate the results ----
            p_list = []
            mu_list = []
            mp_list = []
            for segment in self.segments:
                mode_nests = self.mode_nests.reset_index().groupby(segment)['route_type'].agg(
                    lambda s: list(s)
                ).to_dict()
                nls = self.logit_scales[segment].to_dict()
                paths = segmented_paths.loc[segmented_paths['segment'] == segment]
                p, mu, mp = nested_logit.nested_logit_from_paths(
                    paths,
                    mode_nests=mode_nests,
                    phi=nls,
                    od_cols=od_cols,
                    decimals=decimals,
                    n_paths_max=n_paths_max,
                )
                p_list.append(p)
                mu_list.append(mu)
                mp_list.append(mp)
            p = pd.concat(p_list)
            mu = pd.concat(mu_list, ignore_index=True)
            mp = pd.concat(mp_list, ignore_index=True)
        # re-index the path probabilities by (segment, original los index)
        # so each segment's probabilities can be aligned back on the los
        p.reset_index(inplace=True)
        p.set_index(['segment', 'index'], inplace=True)
        for segment in self.segments:
            if time_expanded:
                self.te_los[(segment, 'probability')] = p.loc[segment]['probability']
            else:
                # alignment by index would silently duplicate rows otherwise
                assert self.los.index.is_unique, "los must have unique index"
                self.los[(segment, 'probability')] = p.loc[segment]['probability']
        self.probabilities = mp
        self.utilities = mu
[docs]def get_combined_mode_utility(route_types, mode_utility, how='min'): """Agregate modal constants Parameters ---------- route_types : list List of the route_types in the path mode_utility : dataframe Modal constants how : ['main'|'min'|'max'|'mean'|'sum'], optional, default 'min' What type of agregation for mode constant. Options are: 'main' --(default) the utility (constant) of the 'route_type' is used 'min' -- minimum of the utilities (constant) of the modes in route_types 'max' -- maximum of the utilities (constant) of the modes in route_types 'mean' -- mean of the utilities (constant) of the modes in route_types 'sum' -- sum of the utilities (constant) of the modes in route_types """ utilities = [mode_utility[mode] for mode in route_types] if not len(utilities): return 0 if how == 'min': # worse mode return min(utilities) elif how == 'max': # best mode return max(utilities) elif how == 'sum': return sum(utilities) elif how == 'mean': return sum(utilities) / len(utilities)