// GTFS ingestion service.
//
// Loads the GTFS feeds listed in `Config`, converts them into `libseptastic`
// types, and exposes clone-returning accessors over a shared
// `Mutex`-protected state that a background thread (`GtfsPullService::start`)
// rebuilds every `UPDATE_SECONDS`.
//
// NOTE(review): in this copy of the file, generic arguments appear stripped
// (e.g. `Option,`, `HashMap>`, `Arc>`) and `&gtfs` appears as `>fs`. This
// looks like an HTML-escaping artifact; confirm against version control
// before building.
// NOTE(review): `hash::Hash` appears unused in this file.
use std::{collections::{HashMap, HashSet}, env, hash::Hash, io::Cursor, path::PathBuf, sync::{Arc, Mutex, MutexGuard}, thread, time::Duration};
use anyhow::anyhow;
use libseptastic::{stop::Platform, stop_schedule::CalendarDay};
use log::{info, error};
use serde::{Deserialize, Serialize};
use zip::ZipArchive;

/// Builds a feed-namespaced ("global") identifier of the form `"{prefix}_{id}"`.
/// Every route and stop ID stored in `TransitData` goes through this macro.
macro_rules! make_global_id {
    ($prefix: expr, $id: expr) => (format!("{}_{}", $prefix, $id))
}

/// One GTFS feed to ingest, as read from the service configuration.
#[derive(Serialize, Deserialize, PartialEq, Debug,Clone)]
struct GtfsSource {
    /// Feed location. When `subzip` is set, this is fetched with
    /// `reqwest::blocking::get` as an outer zip. Otherwise it is passed as-is
    /// to `gtfs_structures::Gtfs::new`.
    pub uri: String,
    /// Relative path of the actual GTFS archive inside the outer zip at `uri`.
    /// The outer zip is extracted into the state's `tmp_dir` first.
    pub subzip: Option,
    /// Prefix applied via `make_global_id!` to this feed's route and stop IDs.
    /// Trip IDs and service IDs are NOT prefixed.
    pub prefix: String
}

/// Configuration for a synthetic stop that groups several feed stops as platforms.
#[derive(Serialize, Deserialize, PartialEq, Debug,Clone)]
struct MultiplatformStopConfig {
    /// Local ID. The merged stop is stored under `make_global_id!("ANNOTATED", id)`.
    pub id: String,
    /// Display name of the merged stop.
    pub name: String,
    /// Global (already prefixed) stop IDs to fold into this stop as platforms.
    /// They are looked up directly in `TransitData::stops`. Each one must
    /// exist there as a single-platform stop when `postprocess_stops` runs.
    pub platform_station_ids: Vec
}

/// Extra configuration data applied on top of the raw GTFS feeds.
#[derive(Serialize, Deserialize, PartialEq, Debug,Clone)]
struct Annotations {
    /// Stops to merge into multi-platform stops (see `postprocess_stops`).
    pub multiplatform_stops: Vec
}

/// Top-level service configuration (serde-deserializable).
#[derive(Serialize, Deserialize, PartialEq, Debug)]
pub struct Config {
    /// Feeds to load on every refresh.
    pub gtfs_zips: Vec,
    /// Post-processing applied after all feeds are loaded.
    pub annotations: Annotations
}

/// Runtime bookkeeping for one configured feed.
#[derive(Clone)]
struct GtfsFile {
    pub source: GtfsSource,
    /// NOTE(review): initialised to `None` in `GtfsPullService::new` and never
    /// read or updated anywhere in this file.
    pub hash: Option
}

/// In-memory index of all loaded transit data.
/// Keys are global IDs from `make_global_id!` unless noted otherwise.
struct TransitData {
    /// Routes by global route ID.
    pub routes: HashMap>,
    /// NOTE(review): never populated in this file.
    pub agencies: HashMap,
    /// Trips grouped by global route ID.
    pub trips: HashMap>,
    /// Stops by global stop ID, including synthetic `ANNOTATED_*` stops.
    /// Stops folded into an annotated stop are removed from this map.
    pub stops: HashMap>,
    /// Every feed stop's platform by global stop ID. Entries are kept even
    /// after the owning stop is folded into an annotated stop.
    pub platforms: HashMap>,
    /// Calendars keyed by the feed's raw `service_id` (NOT prefixed).
    pub calendar_days: HashMap>,
    // extended lookup methods
    /// Global route IDs serving a stop. Keyed by both parent stop ID and platform ID.
    pub route_id_by_stops: HashMap>,
    /// Stop IDs and platform IDs served by each global route ID.
    pub stops_by_route_id: HashMap>,
    /// Owning stop for each platform ID. Folded platforms point at their annotated stop.
    pub stops_by_platform_id: HashMap>
}

/// Mutable state shared (behind a `Mutex`) between the refresh thread and readers.
struct GtfsPullServiceState {
    /// Feeds to (re)load on every refresh.
    pub gtfs_files: Vec,
    /// Directory outer zips are extracted into (`env::temp_dir()`).
    pub tmp_dir: PathBuf,
    /// Set to `true` at the end of the first successful refresh.
    /// It is never reset to `false`.
    pub ready: bool,
    /// Annotations from the configuration, applied on every refresh.
    pub annotations: Annotations,
    /// Current index. Rebuilt from scratch by `update_gtfs_data`.
    pub transit_data: TransitData
}

/// Handle to the GTFS service.
/// Every accessor locks the shared state and returns clones.
pub struct GtfsPullService {
    state: Arc>
}

impl TransitData {
    /// Creates an empty index.
    pub fn new() -> Self {
        return TransitData {
            routes: HashMap::new(),
            agencies: HashMap::new(),
            trips: HashMap::new(),
            stops: HashMap::new(),
            platforms: HashMap::new(),
            route_id_by_stops: HashMap::new(),
            stops_by_route_id: HashMap::new(),
            stops_by_platform_id: HashMap::new(),
            calendar_days: HashMap::new()
        }
    }
}

impl GtfsPullService {
    /// Delay between background refreshes: 3600 * 24 seconds (one day).
    const UPDATE_SECONDS: u64 = 3600*24;
    /// Poll interval used by `wait_for_ready`.
    const READYSTATE_CHECK_MILLISECONDS: u64 = 500;

    /// Builds the service from `config` with empty data and `ready == false`.
    /// Nothing is loaded until `start` (or `update_gtfs_data`) runs.
    pub fn new(config: Config) -> Self {
        Self {
            state: Arc::new(Mutex::new(
                GtfsPullServiceState {
                    gtfs_files: config.gtfs_zips.iter().map(|f| { GtfsFile { source: f.clone(), hash: None} }).collect(),
                    tmp_dir: env::temp_dir(),
                    annotations: config.annotations.clone(),
                    ready: false,
                    transit_data: TransitData::new()
                }
            ))
        }
    }

    /// Blocks the calling thread until the first successful refresh finishes.
    /// It polls `ready` every `READYSTATE_CHECK_MILLISECONDS`.
    ///
    /// # Panics
    /// Panics if the state mutex is poisoned, i.e. if a refresh panicked
    /// while holding it.
    ///
    /// NOTE(review): if the first refresh returns `Err`, this keeps polling
    /// until a later refresh (next attempt is `UPDATE_SECONDS` later) succeeds.
    pub fn wait_for_ready(&self) {
        while !(self.state.lock().unwrap()).ready {
            thread::sleep(
                Duration::from_millis(Self::READYSTATE_CHECK_MILLISECONDS)
            );
        }
    }

    /// Spawns a background thread that refreshes the data immediately and then
    /// every `UPDATE_SECONDS`. Refresh errors are logged, not propagated.
    ///
    /// NOTE(review): the `JoinHandle` is discarded. A panic inside
    /// `update_gtfs_data` ends this thread, so no further refreshes happen.
    pub fn start(&self) {
        let cloned_state = Arc::clone(&self.state);
        thread::spawn(move || {
            loop {
                let recloned_state = Arc::clone(&cloned_state);
                let res = Self::update_gtfs_data(recloned_state);
                match res {
                    Err(err) => {
                        error!("{}", err);
                    }
                    _ => {}
                }
                thread::sleep(Duration::from_secs(Self::UPDATE_SECONDS));
            }
        });
    }

    /// Returns a clone of every loaded route, in `HashMap` iteration order.
    pub fn get_routes(&self) -> Vec {
        let l_state = self.state.lock().unwrap();
        l_state.transit_data.routes.iter().map(|r| libseptastic::route::Route::clone(r.1)).collect()
    }

    /// Returns a clone of the route with global ID `route_id`.
    ///
    /// # Errors
    /// Fails if the route is unknown.
    /// NOTE(review): the error message is empty, which makes logs unhelpful.
    pub fn get_route(&self, route_id: String) -> anyhow::Result {
        let l_state = self.state.lock().unwrap();
        if let Some(route) = l_state.transit_data.routes.get(&route_id) {
            Ok(libseptastic::route::Route::clone(route))
        } else {
            Err(anyhow!(""))
        }
    }

    /// Returns clones of all routes keyed by global route ID.
    pub fn get_all_routes(&self) -> HashMap {
        let l_state = self.state.lock().unwrap();
        l_state.transit_data.routes.iter().map(|r| (r.0.clone(), libseptastic::route::Route::clone(r.1))).collect()
    }

    /// Returns a clone of the stop map (global stop ID -> stop).
    pub fn get_all_stops(&self) -> HashMap> {
        let l_state = self.state.lock().unwrap();
        l_state.transit_data.stops.clone()
    }

    /// Returns a clone of all trips grouped by global route ID.
    pub fn get_all_trips(&self) -> HashMap> {
        let l_state = self.state.lock().unwrap();
        l_state.transit_data.trips.clone()
    }

    /// Returns the global route IDs serving stop or platform `id`.
    /// Returns an empty set if `id` is unknown.
    pub fn get_routes_at_stop(&self, id: &String) -> HashSet {
        let l_state = self.state.lock().unwrap();
        l_state.transit_data.route_id_by_stops.get(id).unwrap_or(&HashSet::new()).clone()
    }

    /// Returns the stop IDs and platform IDs served by route `id`.
    /// Returns an empty set if `id` is unknown.
    pub fn get_stops_by_route(&self, id: &String) -> HashSet {
        let l_state = self.state.lock().unwrap();
        l_state.transit_data.stops_by_route_id.get(id).unwrap_or(&HashSet::new()).clone()
    }

    /// Returns a clone of the stop with global ID `id`, if present.
    /// IDs that were folded into an annotated stop return `None`.
    pub fn get_stop_by_id(&self, id: &String) -> Option {
        let l_state = self.state.lock().unwrap();
        match l_state.transit_data.stops.get(id) {
            Some(stop) => Some(libseptastic::stop::Stop::clone(stop)),
            None => None
        }
    }

    /// Returns a clone of every stored trip on global route `route_id`.
    ///
    /// # Errors
    /// Fails (with an empty message) if no trips are stored for the route.
    /// This includes known routes whose trips were all dropped because their
    /// calendar was missing.
    pub fn get_schedule(&self, route_id: String) -> anyhow::Result> {
        let l_state = self.state.lock().unwrap();
        if let Some(trips) = l_state.transit_data.trips.get(&route_id) {
            Ok(trips.clone())
        } else {
            Err(anyhow!(""))
        }
    }

    /// Replaces the stops listed in `annotations.multiplatform_stops` with
    /// synthetic multi-platform stops whose ID is `ANNOTATED_<id>`.
    ///
    /// Each listed stop is handled as follows:
    /// - it is removed from `stops` and `stops_by_platform_id`;
    /// - its single platform becomes one of the new stop's platforms;
    /// - every such platform ID is then re-pointed at the new stop in
    ///   `stops_by_platform_id`.
    ///
    /// `platforms` is left untouched.
    ///
    /// # Panics
    /// Panics if a listed ID is not currently in `stops`. This happens if it
    /// is missing, un-prefixed, or already folded by an earlier annotation.
    /// It also panics if the stop is not single-platform.
    /// NOTE(review): this runs while the state mutex is held, so such a panic
    /// poisons the mutex for every reader. Returning an error would be safer.
    fn postprocess_stops(state: &mut MutexGuard<'_, GtfsPullServiceState>) -> anyhow::Result<()> {
        for annotated_stop in state.annotations.multiplatform_stops.clone() {
            let global_id = make_global_id!("ANNOTATED", annotated_stop.id.clone());
            let stop = Arc::new(libseptastic::stop::Stop {
                id: global_id.clone(),
                name: annotated_stop.name.clone(),
                platforms: libseptastic::stop::StopType::MultiPlatform(annotated_stop.platform_station_ids.iter().map(|platform_id| {
                    info!("Folding {} stop into stop {} as platform", platform_id.clone(), annotated_stop.id.clone());
                    // Take the feed stop out of `stops` and keep only its platform.
                    let platform = match state.transit_data.stops.remove(platform_id).unwrap().platforms.clone() {
                        libseptastic::stop::StopType::SinglePlatform(plat) => Ok(plat),
                        _ => Err(anyhow!(""))
                    }.unwrap();
                    // populate_stops uses the same global ID for a stop and its
                    // platform, so this removes the old stop's self-mapping.
                    state.transit_data.stops_by_platform_id.remove(&platform.id).unwrap();
                    platform
                }).collect())
            });
            state.transit_data.stops.insert(global_id.clone(), stop.clone());
            // Point each folded platform back at the new parent stop. The `_` arm
            // cannot be reached: `stop` was just built as `MultiPlatform`.
            match &stop.platforms {
                libseptastic::stop::StopType::MultiPlatform(platforms) => {
                    for platform in platforms {
                        state.transit_data.stops_by_platform_id.insert(platform.id.clone(), stop.clone());
                    }
                    Ok(())
                },
                _ => Err(anyhow!(""))
            }?
        }
        Ok(())
    }

    /// Adds every stop in `gtfs` as a single-platform stop.
    ///
    /// The stop ID and its platform ID are both `make_global_id!(prefix, stop_id)`.
    /// The stop is registered in `stops`, `platforms` and `stops_by_platform_id`.
    /// Every platform gets `PlatformLocationType::Normal`. Never returns `Err`.
    ///
    /// # Panics
    /// Panics if a stop has no name, latitude or longitude.
    /// NOTE(review): GTFS `location_type`/`parent_station` are not consulted,
    /// so every stops record becomes its own stop.
    fn populate_stops(state: &mut MutexGuard<'_, GtfsPullServiceState>, prefix: &String, gtfs: >fs_structures::Gtfs) -> anyhow::Result<()> {
        for stop in >fs.stops {
            let global_id = make_global_id!(prefix, stop.1.id.clone());
            let platform = Arc::new(Platform {
                id : global_id.clone(),
                name: stop.1.name.clone().unwrap(),
                lat: stop.1.latitude.unwrap(),
                lng: stop.1.longitude.unwrap(),
                platform_location: libseptastic::stop::PlatformLocationType::Normal
            });
            let stop = Arc::new(libseptastic::stop::Stop {
                id: global_id.clone(),
                name: stop.1.name.clone().unwrap(),
                platforms: libseptastic::stop::StopType::SinglePlatform(platform.clone())
            });
            state.transit_data.stops.insert(global_id.clone(), stop.clone());
            state.transit_data.platforms.insert(global_id.clone(), platform.clone());
            state.transit_data.stops_by_platform_id.insert(global_id.clone(), stop.clone());
        }
        Ok(())
    }

    /// Converts every route in `gtfs` into a `libseptastic` route keyed by
    /// `make_global_id!(prefix, route_id)`. Never returns `Err`.
    ///
    /// Fallbacks for missing fields:
    /// - missing long name -> "Unknown";
    /// - missing short name or color -> "unknown".
    ///
    /// Route types map as Bus -> Bus, Rail -> RegionalRail,
    /// Subway -> SubwayElevated, Tramway -> Trolley.
    /// NOTE(review): every other GTFS route type is silently labelled
    /// TracklessTrolley.
    fn populate_routes(state: &mut MutexGuard<'_, GtfsPullServiceState>, prefix: &String, gtfs: >fs_structures::Gtfs) -> anyhow::Result<()> {
        for route in >fs.routes {
            let global_rt_id = make_global_id!(prefix, route.1.id);
            let rt_name = match route.1.long_name.clone() {
                Some(x) => x,
                _ => String::from("Unknown")
            };
            state.transit_data.routes.insert(global_rt_id.clone(), Arc::new(libseptastic::route::Route{
                name: rt_name,
                short_name: match route.1.short_name.clone() {
                    Some(x) => x,
                    _ => String::from("unknown")
                },
                color_hex: match route.1.color{
                    Some(x) => x.to_string(),
                    _ => String::from("unknown")
                },
                id: global_rt_id,
                route_type: match route.1.route_type {
                    gtfs_structures::RouteType::Bus => libseptastic::route::RouteType::Bus,
                    gtfs_structures::RouteType::Rail => libseptastic::route::RouteType::RegionalRail,
                    gtfs_structures::RouteType::Subway => libseptastic::route::RouteType::SubwayElevated,
                    gtfs_structures::RouteType::Tramway => libseptastic::route::RouteType::Trolley,
                    _ => libseptastic::route::RouteType::TracklessTrolley
                }
            }));
        }
        Ok(())
    }

    /// Converts every trip in `gtfs` into a `libseptastic` trip appended to
    /// `trips[global route ID]`, and fills the route <-> stop indices.
    ///
    /// Each stop time is resolved through `stops_by_platform_id` and `platforms`.
    /// Platforms folded by `postprocess_stops` therefore resolve to their
    /// annotated parent stop. Both the parent stop ID and the platform ID are
    /// recorded in `route_id_by_stops` and `stops_by_route_id`.
    ///
    /// Trips whose `service_id` is missing from `calendar_days` are dropped.
    /// The route/stop indices are still updated for their stops.
    /// Trip IDs are stored un-prefixed. Never returns `Err`.
    ///
    /// # Panics
    /// Panics in these cases:
    /// - a stop time references an unknown stop;
    /// - a stop time has no arrival time;
    /// - a kept trip has no `direction_id` or `trip_headsign`;
    /// - a kept trip references an unknown route.
    ///
    /// NOTE(review): `calendar_days` is keyed by the raw, un-prefixed
    /// `service_id`, so feeds sharing a service ID use each other's calendars.
    fn populate_trips(state: &mut MutexGuard<'_, GtfsPullServiceState>, prefix: &String, gtfs: >fs_structures::Gtfs) -> anyhow::Result<()> {
        for trip in >fs.trips {
            let global_rt_id = make_global_id!(prefix, trip.1.route_id);
            // Building the schedule also updates the route <-> stop indices,
            // even if the trip is dropped below for lack of a calendar.
            let sched = trip.1.stop_times.iter().map(|s| {
                let global_stop_id = make_global_id!(prefix, s.stop.id);
                let stop = state.transit_data.stops_by_platform_id.get(&global_stop_id).unwrap().clone();
                let platform = state.transit_data.platforms.get(&global_stop_id).unwrap().clone();
                state.transit_data.route_id_by_stops.entry(stop.id.clone()).or_insert(HashSet::new()).insert(global_rt_id.clone());
                state.transit_data.stops_by_route_id.entry(global_rt_id.clone()).or_insert(HashSet::new()).insert(stop.id.clone());
                state.transit_data.route_id_by_stops.entry(platform.id.clone()).or_insert(HashSet::new()).insert(global_rt_id.clone());
                state.transit_data.stops_by_route_id.entry(global_rt_id.clone()).or_insert(HashSet::new()).insert(platform.id.clone());
                libseptastic::stop_schedule::StopSchedule{
                    arrival_time: i64::from(s.arrival_time.unwrap()),
                    stop_sequence: i64::from(s.stop_sequence),
                    stop,
                    platform
                }
            }).collect();
            if let Some(calendar_day) = state.transit_data.calendar_days.get(&trip.1.service_id.clone()) {
                let trip = libseptastic::stop_schedule::Trip{
                    trip_id: trip.1.id.clone(),
                    // Same key as `global_rt_id`.
                    route: state.transit_data.routes.get(&make_global_id!(prefix, trip.1.route_id)).unwrap().clone(),
                    direction: libseptastic::direction::Direction {
                        direction: match trip.1.direction_id.unwrap() {
                            gtfs_structures::DirectionType::Outbound => libseptastic::direction::CardinalDirection::Outbound,
                            gtfs_structures::DirectionType::Inbound => libseptastic::direction::CardinalDirection::Inbound
                        },
                        direction_destination: trip.1.trip_headsign.clone().unwrap()
                    },
                    tracking_data: libseptastic::stop_schedule::TripTracking::Untracked,
                    schedule: sched,
                    service_id: trip.1.service_id.clone(),
                    calendar_day: calendar_day.clone()
                };
                if let Some(trip_arr) = state.transit_data.trips.get_mut(&global_rt_id) {
                    trip_arr.push(trip);
                } else {
                    state.transit_data.trips.insert(global_rt_id, vec![trip]);
                }
            }
        }
        Ok(())
    }

    /// Reloads every configured feed, rebuilds `transit_data` from scratch,
    /// and then sets `ready`.
    ///
    /// Steps:
    /// 1. Fetch and parse every feed.
    /// 2. Per feed, add routes, stops and calendars.
    /// 3. Fold annotated multi-platform stops.
    /// 4. Add trips last, because they need the folded stops and the calendars.
    ///
    /// # Errors
    /// Returns the first download, unzip or GTFS-parse error, or any error
    /// from the helpers above.
    ///
    /// # Panics
    /// Panics if the mutex is poisoned or the temp path is not UTF-8.
    /// Also panics on any of the helper panics documented above.
    ///
    /// NOTE(review): the state mutex is held for the whole call, including the
    /// HTTP download and parsing, so every reader blocks until it finishes.
    /// `transit_data` is also cleared before anything is fetched, so an error
    /// part-way through leaves empty or partial data behind (with `ready`
    /// unchanged). Building a new `TransitData` without the lock and swapping
    /// it in at the end would avoid both problems.
    /// NOTE(review): outer zips are extracted into the shared temp dir and
    /// never cleaned up here.
    /// NOTE(review): the final log line says "initial sync" on every refresh.
    pub fn update_gtfs_data(state: Arc>) -> anyhow::Result<()> {
        let mut l_state = state.lock().unwrap();
        let files = l_state.gtfs_files.clone();
        l_state.transit_data = TransitData::new();
        let mut gtfses = Vec::new();
        // Phase 1: fetch and parse every feed, remembering each feed's prefix.
        for gtfs_file in files.iter() {
            let gtfs = if let Some(subzip) = gtfs_file.source.subzip.clone() {
                info!("Reading GTFS file at {} (subzip {})", gtfs_file.source.uri, subzip);
                let res = reqwest::blocking::get(gtfs_file.source.uri.clone())?;
                let outer_archive = res.bytes()?;
                let mut archive = ZipArchive::new(Cursor::new(outer_archive))?;
                archive.extract(l_state.tmp_dir.clone())?;
                let mut file_path = l_state.tmp_dir.clone();
                file_path.push(subzip.clone());
                info!("Downloaded, parsing");
                gtfs_structures::Gtfs::new(file_path.to_str().unwrap())?
            } else {
                info!("Reading GTFS file at {}", gtfs_file.source.uri);
                gtfs_structures::Gtfs::new(gtfs_file.source.uri.as_str())?
            };
            gtfses.push((gtfs, gtfs_file.source.prefix.clone()));
        }
        info!("Data loaded, processing...");
        // Phase 2: routes, stops and calendars for every feed.
        for (gtfs, prefix) in >fses {
            GtfsPullService::populate_routes(&mut l_state, &prefix, >fs)?;
            GtfsPullService::populate_stops(&mut l_state, &prefix, >fs)?;
            // Calendar IDs are inserted un-prefixed; a later feed overwrites an
            // earlier feed's calendar with the same ID.
            for calendar in >fs.calendar {
                l_state.transit_data.calendar_days.insert(calendar.1.id.clone(), Arc::new(CalendarDay{
                    id: calendar.1.id.clone(),
                    monday: calendar.1.monday,
                    tuesday: calendar.1.tuesday,
                    wednesday: calendar.1.wednesday,
                    thursday: calendar.1.thursday,
                    friday: calendar.1.friday,
                    saturday: calendar.1.saturday,
                    sunday: calendar.1.sunday,
                    start_date: calendar.1.start_date,
                    end_date: calendar.1.end_date
                }));
            }
        }
        // Phase 3: fold annotated stops. This must run before trips are
        // attached so stop times resolve to the merged stops.
        GtfsPullService::postprocess_stops(&mut l_state)?;
        // Phase 4: trips, which need the stop indices and calendars built above.
        for (gtfs, prefix) in >fses {
            GtfsPullService::populate_trips(&mut l_state, &prefix, >fs)?;
        }
        l_state.ready = true;
        info!("Finished initial sync, ready state is true");
        Ok(())
    }
}