From 0b2859a9cbce31ff34e600f2aeebbc285c0d7d5d Mon Sep 17 00:00:00 2001
From: Nicholas Orlowsky
Date: Sat, 8 Nov 2025 13:19:24 -0500
Subject: [PATCH] add gtfs file

---
 api/src/services/gtfs_pull.rs | 317 ++++++++++++++++++++++++++++++++++
 api/src/services/gtfs_rt.rs   |  87 ++++++++++
 2 files changed, 404 insertions(+)
 create mode 100644 api/src/services/gtfs_pull.rs
 create mode 100644 api/src/services/gtfs_rt.rs

diff --git a/api/src/services/gtfs_pull.rs b/api/src/services/gtfs_pull.rs
new file mode 100644
index 0000000..47e1794
--- /dev/null
+++ b/api/src/services/gtfs_pull.rs
@@ -0,0 +1,317 @@
+use std::{collections::HashMap, env, hash::Hash, io::Cursor, path::PathBuf, sync::{Arc, Mutex}, thread, time::Duration};
+
+use anyhow::anyhow;
+use gtfs_structures::Trip;
+use libseptastic::agency;
+use log::{info, error};
+use serde::{Deserialize, Serialize};
+use zip::ZipArchive;
+
+/// One configured GTFS feed location.
+#[derive(Serialize, Deserialize, PartialEq, Debug, Clone)]
+struct GtfsSource {
+    /// Where the feed is read from; passed to `reqwest` or `gtfs_structures::Gtfs::new`.
+    pub uri: String,
+    /// When set, `uri` is an outer zip and this is the path of the GTFS zip inside it.
+    pub subzip: Option<String>
+}
+
+/// Configuration consumed by `GtfsPullService::new`.
+#[derive(Serialize, Deserialize, PartialEq, Debug)]
+pub struct Config {
+    /// Feeds to load on every sync.
+    pub gtfs_zips: Vec<GtfsSource>
+}
+
+/// A configured feed plus per-feed bookkeeping.
+#[derive(Clone)]
+struct GtfsFile {
+    pub source: GtfsSource,
+    // NOTE(review): only ever set to `None` in this file; the element type is assumed to be
+    // `String` -- confirm against the original source.
+    pub hash: Option<String>
+}
+
+/// Everything extracted from the configured feeds.
+struct TransitData {
+    /// Routes keyed by `<agency id>_<route id>`, or by the bare route id when the route's
+    /// agency is unknown.
+    pub routes: HashMap<String, libseptastic::route::Route>,
+    /// Agencies keyed by their GTFS agency id.
+    pub agencies: HashMap<String, libseptastic::agency::Agency>,
+    /// Trips grouped under the key of the route they run on.
+    pub trips: HashMap<String, Vec<libseptastic::stop_schedule::Trip>>
+}
+
+/// State shared between the service handle and its background thread.
+struct GtfsPullServiceState {
+    pub gtfs_files: Vec<GtfsFile>,
+    pub tmp_dir: PathBuf,
+    /// Set once the first sync has completed successfully.
+    pub ready: bool,
+    pub transit_data: TransitData
+}
+
+/// Periodically loads the configured GTFS feeds and serves routes and schedules from memory.
+pub struct GtfsPullService {
+    state: Arc<Mutex<GtfsPullServiceState>>
+}
+
+impl TransitData {
+    /// Creates an empty data set.
+    pub fn new() -> Self {
+        TransitData { routes: HashMap::new(), agencies: HashMap::new(), trips: HashMap::new() }
+    }
+}
+
+impl GtfsPullService {
+    /// Seconds between two successful syncs.
+    const UPDATE_SECONDS: u64 = 3600 * 24;
+    /// Seconds to wait before retrying after a failed sync, so that a transient error does
+    /// not leave `wait_for_ready` blocked for a whole day.
+    const RETRY_SECONDS: u64 = 300;
+
+    /// Creates a service for the feeds in `config`. No data is loaded until `start` is called.
+    pub fn new(config: Config) -> Self {
+        let gtfs_files = config.gtfs_zips
+            .into_iter()
+            .map(|source| GtfsFile { source, hash: None })
+            .collect();
+
+        Self {
+            state: Arc::new(Mutex::new(GtfsPullServiceState {
+                gtfs_files,
+                tmp_dir: env::temp_dir(),
+                ready: false,
+                transit_data: TransitData::new()
+            }))
+        }
+    }
+
+    /// Blocks the calling thread, polling every 500 ms, until the first sync has succeeded.
+    pub fn wait_for_ready(&self) {
+        while !self.state.lock().unwrap().ready {
+            thread::sleep(Duration::from_millis(500));
+        }
+    }
+
+    /// Spawns the background thread that syncs immediately and then again after every
+    /// `UPDATE_SECONDS` (or `RETRY_SECONDS` when a sync failed).
+    pub fn start(&self) {
+        let state = Arc::clone(&self.state);
+        thread::spawn(move || loop {
+            let pause = match Self::update_gtfs_data(Arc::clone(&state)) {
+                Ok(()) => Self::UPDATE_SECONDS,
+                Err(err) => {
+                    error!("{:#}", err);
+                    Self::RETRY_SECONDS
+                }
+            };
+            thread::sleep(Duration::from_secs(pause));
+        });
+    }
+
+    /// Returns a copy of every known route, in unspecified order.
+    pub fn get_routes(&self) -> Vec<libseptastic::route::Route> {
+        let l_state = self.state.lock().unwrap();
+        l_state.transit_data.routes.values().cloned().collect()
+    }
+
+    /// Returns the route stored under `route_id`.
+    ///
+    /// # Errors
+    /// Fails when no route with that id has been loaded.
+    pub fn get_route(&self, route_id: String) -> anyhow::Result<libseptastic::route::Route> {
+        let l_state = self.state.lock().unwrap();
+        l_state.transit_data.routes.get(&route_id)
+            .cloned()
+            .ok_or_else(|| anyhow!("no route with id {}", route_id))
+    }
+
+    /// Returns every trip stored for the route `route_id`.
+    ///
+    /// # Errors
+    /// Fails when no trips are stored under that id.
+    pub fn get_schedule(&self, route_id: String) -> anyhow::Result<Vec<libseptastic::stop_schedule::Trip>> {
+        let l_state = self.state.lock().unwrap();
+        l_state.transit_data.trips.get(&route_id)
+            .cloned()
+            .ok_or_else(|| anyhow!("no schedule for route id {}", route_id))
+    }
+
+    /// Loads every configured feed and replaces the served data with the result.
+    ///
+    /// The data set is built without holding the state lock and swapped in at the end, so
+    /// readers are not blocked during the download and a failed sync leaves the previous
+    /// data untouched. Building from scratch also keeps repeated syncs from appending the
+    /// same trips again.
+    ///
+    /// # Errors
+    /// Fails when any feed cannot be downloaded, extracted or parsed.
+    pub fn update_gtfs_data(state: Arc<Mutex<GtfsPullServiceState>>) -> anyhow::Result<()> {
+        let (files, tmp_dir) = {
+            let l_state = state.lock().unwrap();
+            (l_state.gtfs_files.clone(), l_state.tmp_dir.clone())
+        };
+
+        let mut transit_data = TransitData::new();
+        for gtfs_file in &files {
+            let gtfs = Self::load_feed(&gtfs_file.source, &tmp_dir)?;
+            Self::merge_feed(&mut transit_data, &gtfs);
+            info!("Added {} routes", gtfs.routes.len());
+        }
+
+        let mut l_state = state.lock().unwrap();
+        l_state.transit_data = transit_data;
+        if !l_state.ready {
+            l_state.ready = true;
+            info!("Finished initial sync, ready state is true");
+        }
+
+        Ok(())
+    }
+
+    /// Parses the feed described by `source`, first downloading and unpacking the outer
+    /// archive into `tmp_dir` when the feed is nested inside another zip.
+    fn load_feed(source: &GtfsSource, tmp_dir: &std::path::Path) -> anyhow::Result<gtfs_structures::Gtfs> {
+        let gtfs = match &source.subzip {
+            Some(subzip) => {
+                info!("Reading GTFS file at {} (subzip {})", source.uri, subzip);
+                // Reject HTTP error responses here rather than as a confusing zip error.
+                let res = reqwest::blocking::get(source.uri.as_str())?.error_for_status()?;
+                let mut archive = ZipArchive::new(Cursor::new(res.bytes()?))?;
+                archive.extract(tmp_dir)?;
+
+                let file_path = tmp_dir.join(subzip);
+                let file_path = file_path.to_str()
+                    .ok_or_else(|| anyhow!("path of subzip {} is not valid UTF-8", subzip))?;
+                gtfs_structures::Gtfs::new(file_path)?
+            }
+            None => {
+                info!("Reading GTFS file at {}", source.uri);
+                gtfs_structures::Gtfs::new(source.uri.as_str())?
+            }
+        };
+
+        Ok(gtfs)
+    }
+
+    /// Adds one parsed feed to `transit_data`; trips that fail to convert are skipped and counted.
+    fn merge_feed(transit_data: &mut TransitData, gtfs: &gtfs_structures::Gtfs) {
+        for agency in &gtfs.agencies {
+            if let Some(a_id) = &agency.id {
+                transit_data.agencies.insert(a_id.clone(), libseptastic::agency::Agency {
+                    id: a_id.clone(),
+                    name: agency.name.clone()
+                });
+            }
+        }
+
+        // Raw GTFS route id -> key used in `transit_data.routes`, so that trips are filed
+        // under exactly the key of their own route.
+        let mut global_route_ids: HashMap<&str, String> = HashMap::new();
+
+        for (route_id, route) in &gtfs.routes {
+            let agency = route.agency_id.as_ref()
+                .and_then(|agency_id| transit_data.agencies.get(agency_id))
+                .cloned();
+
+            let global_rt_id = match &agency {
+                Some(a) => format!("{}_{}", a.id, route_id),
+                None => route_id.clone()
+            };
+
+            let rt_name = match (&route.long_name, &route.short_name) {
+                (Some(long_name), _) => long_name.clone(),
+                (None, Some(short_name)) => match &agency {
+                    Some(a) => format!("{} {}", a.name, short_name),
+                    None => short_name.clone()
+                },
+                (None, None) => String::from("unknown")
+            };
+
+            global_route_ids.insert(route_id.as_str(), global_rt_id.clone());
+            transit_data.routes.insert(global_rt_id.clone(), libseptastic::route::Route {
+                name: rt_name,
+                short_name: route.short_name.clone().unwrap_or_else(|| String::from("unknown")),
+                color_hex: route.color.map(|c| c.to_string()).unwrap_or_else(|| String::from("unknown")),
+                id: global_rt_id,
+                // NOTE(review): every GTFS route type without an arm of its own is reported as
+                // a trackless trolley -- confirm that this fallback is intended.
+                route_type: match route.route_type {
+                    gtfs_structures::RouteType::Bus => libseptastic::route::RouteType::Bus,
+                    gtfs_structures::RouteType::Rail => libseptastic::route::RouteType::RegionalRail,
+                    gtfs_structures::RouteType::Subway => libseptastic::route::RouteType::SubwayElevated,
+                    gtfs_structures::RouteType::Tramway => libseptastic::route::RouteType::Trolley,
+                    _ => libseptastic::route::RouteType::TracklessTrolley
+                }
+            });
+        }
+
+        let mut skipped = 0usize;
+        for (trip_id, trip) in &gtfs.trips {
+            let global_rt_id = global_route_ids.get(trip.route_id.as_str())
+                .cloned()
+                .unwrap_or_else(|| trip.route_id.clone());
+
+            match Self::convert_trip(trip) {
+                Ok(converted) => transit_data.trips.entry(global_rt_id).or_default().push(converted),
+                Err(err) => {
+                    skipped += 1;
+                    log::debug!("Skipping trip {}: {:#}", trip_id, err);
+                }
+            }
+        }
+
+        if skipped > 0 {
+            log::warn!("Skipped {} trips with missing or malformed fields", skipped);
+        }
+    }
+
+    /// Converts one GTFS trip into the `libseptastic` representation.
+    ///
+    /// # Errors
+    /// Fails when a field the conversion needs is absent or a stop id cannot be parsed.
+    /// Returning an error instead of unwrapping keeps a bad row from panicking the sync thread.
+    fn convert_trip(trip: &gtfs_structures::Trip) -> anyhow::Result<libseptastic::stop_schedule::Trip> {
+        let schedule = trip.stop_times.iter()
+            .map(|s| -> anyhow::Result<libseptastic::stop_schedule::StopSchedule> {
+                Ok(libseptastic::stop_schedule::StopSchedule {
+                    arrival_time: i64::from(s.arrival_time
+                        .ok_or_else(|| anyhow!("stop time {} has no arrival time", s.stop_sequence))?),
+                    stop_sequence: i64::from(s.stop_sequence),
+                    stop: libseptastic::stop::Stop {
+                        name: s.stop.name.clone()
+                            .ok_or_else(|| anyhow!("stop {} has no name", s.stop.id))?,
+                        lat: s.stop.latitude
+                            .ok_or_else(|| anyhow!("stop {} has no latitude", s.stop.id))?,
+                        lng: s.stop.longitude
+                            .ok_or_else(|| anyhow!("stop {} has no longitude", s.stop.id))?,
+                        id: s.stop.id.parse()
+                            .map_err(|_| anyhow!("stop id {} could not be parsed", s.stop.id))?,
+                        stop_type: libseptastic::stop::StopType::Normal
+                    }
+                })
+            })
+            .collect::<anyhow::Result<Vec<_>>>()?;
+
+        let direction = match trip.direction_id
+            .ok_or_else(|| anyhow!("trip {} has no direction_id", trip.id))?
+        {
+            gtfs_structures::DirectionType::Outbound => libseptastic::direction::CardinalDirection::Outbound,
+            gtfs_structures::DirectionType::Inbound => libseptastic::direction::CardinalDirection::Inbound
+        };
+
+        Ok(libseptastic::stop_schedule::Trip {
+            trip_id: trip.id.clone(),
+            direction: libseptastic::direction::Direction {
+                direction,
+                direction_destination: trip.trip_headsign.clone()
+                    .ok_or_else(|| anyhow!("trip {} has no headsign", trip.id))?
+            },
+            tracking_data: libseptastic::stop_schedule::TripTracking::Untracked,
+            schedule,
+            service_id: trip.service_id.clone()
+        })
+    }
+}
diff --git a/api/src/services/gtfs_rt.rs b/api/src/services/gtfs_rt.rs
new file mode 100644
index 0000000..2a9add9
--- /dev/null
+++ b/api/src/services/gtfs_rt.rs
@@ -0,0 +1,87 @@
+use serde_json::Value;
+use serde::de;
+use std::sync::{Arc, Mutex};
+use std::thread;
+use std::collections::HashMap;
+use std::time::Duration;
+use log::error;
+use serde::{Serialize, Deserialize, Deserializer};
+use libseptastic::stop_schedule::{LiveTrip, TripTracking};
+use prost::Message;
+
+/// State shared between the service handle and its background thread.
+struct TripTrackingServiceState {
+    /// Live tracking information looked up by trip id.
+    pub tracking_data: HashMap<String, TripTracking>
+}
+
+/// A GTFS-realtime feed location plus per-feed bookkeeping.
+#[derive(Clone)]
+struct GtfsRtFile {
+    pub uri: String,
+    // NOTE(review): not used anywhere in this file; the element type is assumed to be
+    // `String` -- confirm against the original source.
+    pub hash: Option<String>
+}
+
+/// Polls a GTFS-realtime feed in the background and annotates trips with tracking data.
+pub struct TripTrackingService {
+    state: Arc<Mutex<TripTrackingServiceState>>,
+    //pub gtfs_files: Vec<GtfsRtFile>
+}
+
+impl TripTrackingService {
+    /// Seconds between two polls of the realtime feed.
+    const UPDATE_SECONDS: u64 = 30;
+    /// Realtime feed that is polled.
+    const FEED_URL: &'static str = "https://api-endpoint.mta.info/Dataservice/mtagtfsfeeds/nyct%2Fgtfs-l";
+
+    /// Creates a service with no tracking data.
+    pub fn new() -> Self {
+        Self {
+            state: Arc::new(Mutex::new(TripTrackingServiceState { tracking_data: HashMap::new() }))
+        }
+    }
+
+    /// Spawns the background thread that polls the feed every `UPDATE_SECONDS`.
+    pub fn start(&self) {
+        let state = Arc::clone(&self.state);
+        thread::spawn(move || loop {
+            if let Err(err) = Self::update_live_trips(Arc::clone(&state)) {
+                error!("{:#}", err);
+            }
+            thread::sleep(Duration::from_secs(Self::UPDATE_SECONDS));
+        });
+    }
+
+    /// Overwrites `tracking_data` on every trip with the stored entry for its trip id, or
+    /// with `TripTracking::Untracked` when there is none.
+    pub fn annotate_trips(&self, trips: &mut Vec<libseptastic::stop_schedule::Trip>) {
+        // Take the lock once for the whole batch instead of once per trip.
+        let state = self.state.lock().unwrap();
+        for trip in trips.iter_mut() {
+            trip.tracking_data = state.tracking_data.get(&trip.trip_id)
+                .cloned()
+                .unwrap_or(TripTracking::Untracked);
+        }
+    }
+
+    /// Downloads and decodes the realtime feed.
+    ///
+    /// The decoded message is only logged for now; nothing is written to the shared state.
+    ///
+    /// # Errors
+    /// Fails when the request fails, the server answers with an error status, or the body
+    /// is not a valid feed message.
+    fn update_live_trips(_service: Arc<Mutex<TripTrackingServiceState>>) -> anyhow::Result<()> {
+        let response = reqwest::blocking::get(Self::FEED_URL)?.error_for_status()?;
+        let bytes = response.bytes()?;
+        // NOTE(review): the message type was lost from the source; `gtfs_realtime::FeedMessage`
+        // is assumed -- confirm against the crate's dependencies.
+        let data = gtfs_realtime::FeedMessage::decode(bytes.as_ref())?;
+
+        log::debug!("{:#?}", data);
+
+        Ok(())
+    }
+}