add gtfs file
Some checks failed
Create and publish a Docker image / build-and-push-image (push) Failing after 5m23s
Some checks failed
Create and publish a Docker image / build-and-push-image (push) Failing after 5m23s
This commit is contained in:
parent
786f32e7e3
commit
0b2859a9cb
2 changed files with 315 additions and 0 deletions
239
api/src/services/gtfs_pull.rs
Normal file
239
api/src/services/gtfs_pull.rs
Normal file
|
|
@ -0,0 +1,239 @@
|
||||||
|
use std::{collections::HashMap, env, hash::Hash, io::Cursor, path::PathBuf, sync::{Arc, Mutex}, thread, time::Duration};
|
||||||
|
|
||||||
|
use anyhow::anyhow;
|
||||||
|
use gtfs_structures::Trip;
|
||||||
|
use libseptastic::agency;
|
||||||
|
use log::{info, error};
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use zip::ZipArchive;
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, PartialEq, Debug,Clone)]
|
||||||
|
struct GtfsSource {
|
||||||
|
pub uri: String,
|
||||||
|
pub subzip: Option<String>
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, PartialEq, Debug)]
|
||||||
|
pub struct Config {
|
||||||
|
pub gtfs_zips: Vec<GtfsSource>
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
struct GtfsFile {
|
||||||
|
pub source: GtfsSource,
|
||||||
|
pub hash: Option<String>
|
||||||
|
}
|
||||||
|
|
||||||
|
struct TransitData {
|
||||||
|
pub routes: HashMap<String, libseptastic::route::Route>,
|
||||||
|
pub agencies: HashMap<String, libseptastic::agency::Agency>,
|
||||||
|
pub trips: HashMap<String, Vec<libseptastic::stop_schedule::Trip>>
|
||||||
|
}
|
||||||
|
|
||||||
|
struct GtfsPullServiceState {
|
||||||
|
pub gtfs_files: Vec<GtfsFile>,
|
||||||
|
pub tmp_dir: PathBuf,
|
||||||
|
pub ready: bool,
|
||||||
|
pub transit_data: TransitData
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct GtfsPullService {
|
||||||
|
state: Arc<Mutex<GtfsPullServiceState>>
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TransitData {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
return TransitData { routes: HashMap::new(), agencies: HashMap::new(), trips: HashMap::new() }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl GtfsPullService {
|
||||||
|
const UPDATE_SECONDS: u64 = 3600*24;
|
||||||
|
|
||||||
|
pub fn new(config: Config) -> Self {
|
||||||
|
Self {
|
||||||
|
state: Arc::new(Mutex::new(
|
||||||
|
GtfsPullServiceState {
|
||||||
|
gtfs_files: config.gtfs_zips.iter().map(|f| { GtfsFile { source: f.clone(), hash: None} }).collect(),
|
||||||
|
tmp_dir: env::temp_dir(),
|
||||||
|
ready: false,
|
||||||
|
transit_data: TransitData::new()
|
||||||
|
}
|
||||||
|
))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn wait_for_ready(&self) {
|
||||||
|
while !(self.state.lock().unwrap()).ready {
|
||||||
|
thread::sleep(Duration::from_millis(500));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn start(&self) {
|
||||||
|
let cloned_state = Arc::clone(&self.state);
|
||||||
|
thread::spawn(move || {
|
||||||
|
loop {
|
||||||
|
let recloned_state = Arc::clone(&cloned_state);
|
||||||
|
let res = Self::update_gtfs_data(recloned_state);
|
||||||
|
|
||||||
|
match res {
|
||||||
|
Err(err) => {
|
||||||
|
error!("{}", err);
|
||||||
|
}
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
|
||||||
|
thread::sleep(Duration::from_secs(Self::UPDATE_SECONDS));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_routes(&self) -> Vec<libseptastic::route::Route> {
|
||||||
|
let l_state = self.state.lock().unwrap();
|
||||||
|
l_state.transit_data.routes.iter().map(|r| r.1.clone()).collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_route(&self, route_id: String) -> anyhow::Result<libseptastic::route::Route> {
|
||||||
|
let l_state = self.state.lock().unwrap();
|
||||||
|
if let Some(route) = l_state.transit_data.routes.get(&route_id) {
|
||||||
|
Ok(route.clone())
|
||||||
|
} else {
|
||||||
|
Err(anyhow!(""))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_schedule(&self, route_id: String) -> anyhow::Result<Vec<libseptastic::stop_schedule::Trip>> {
|
||||||
|
let l_state = self.state.lock().unwrap();
|
||||||
|
if let Some(trips) = l_state.transit_data.trips.get(&route_id) {
|
||||||
|
Ok(trips.clone())
|
||||||
|
} else {
|
||||||
|
Err(anyhow!(""))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn update_gtfs_data(state: Arc<Mutex<GtfsPullServiceState>>) -> anyhow::Result<()> {
|
||||||
|
let mut l_state = state.lock().unwrap();
|
||||||
|
let files = l_state.gtfs_files.clone();
|
||||||
|
|
||||||
|
for gtfs_file in files.iter() {
|
||||||
|
let gtfs = if let Some(subzip) = gtfs_file.source.subzip.clone() {
|
||||||
|
info!("Reading GTFS file at {} (subzip {})", gtfs_file.source.uri, subzip);
|
||||||
|
let res = reqwest::blocking::get(gtfs_file.source.uri.clone())?;
|
||||||
|
let outer_archive = res.bytes()?;
|
||||||
|
let mut archive = ZipArchive::new(Cursor::new(outer_archive))?;
|
||||||
|
archive.extract(l_state.tmp_dir.clone())?;
|
||||||
|
|
||||||
|
let mut file_path = l_state.tmp_dir.clone();
|
||||||
|
file_path.push(subzip.clone());
|
||||||
|
|
||||||
|
gtfs_structures::Gtfs::new(file_path.to_str().unwrap())?
|
||||||
|
} else {
|
||||||
|
info!("Reading GTFS file at {}", gtfs_file.source.uri);
|
||||||
|
gtfs_structures::Gtfs::new(gtfs_file.source.uri.as_str())?
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut hack_agency = None;
|
||||||
|
|
||||||
|
for agency in >fs.agencies {
|
||||||
|
if let Some(a_id) = &agency.id {
|
||||||
|
l_state.transit_data.agencies.insert(a_id.clone(), libseptastic::agency::Agency{
|
||||||
|
id: a_id.clone(),
|
||||||
|
name: agency.name.clone()
|
||||||
|
});
|
||||||
|
hack_agency = Some(libseptastic::agency::Agency{
|
||||||
|
id: a_id.clone(),
|
||||||
|
name: agency.name.clone()
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for route in >fs.routes {
|
||||||
|
let agency = route.1.agency_id.as_ref()
|
||||||
|
.and_then(|agency_id| l_state.transit_data.agencies.get(agency_id))
|
||||||
|
.map(|agency| agency.clone());
|
||||||
|
|
||||||
|
let global_rt_id = match &agency {
|
||||||
|
Some(a) => format!("{}_{}", a.id, route.0.clone()),
|
||||||
|
None => format!("{}", route.0.clone())
|
||||||
|
};
|
||||||
|
|
||||||
|
let rt_name = match route.1.long_name.clone() {
|
||||||
|
Some(x) => x,
|
||||||
|
_ => match route.1.short_name.clone() {
|
||||||
|
Some(y) => match agency {
|
||||||
|
Some(z) => format!("{} {}", z.name, y),
|
||||||
|
None => y
|
||||||
|
},
|
||||||
|
None => String::from("unknown")
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
l_state.transit_data.routes.insert(global_rt_id.clone(), libseptastic::route::Route{
|
||||||
|
name: rt_name,
|
||||||
|
short_name: match route.1.short_name.clone() {
|
||||||
|
Some(x) => x,
|
||||||
|
_ => String::from("unknown")
|
||||||
|
},
|
||||||
|
color_hex: match route.1.color{
|
||||||
|
Some(x) => x.to_string(),
|
||||||
|
_ => String::from("unknown")
|
||||||
|
},
|
||||||
|
id: global_rt_id,
|
||||||
|
route_type: match route.1.route_type {
|
||||||
|
gtfs_structures::RouteType::Bus => libseptastic::route::RouteType::Bus,
|
||||||
|
gtfs_structures::RouteType::Rail => libseptastic::route::RouteType::RegionalRail,
|
||||||
|
gtfs_structures::RouteType::Subway => libseptastic::route::RouteType::SubwayElevated,
|
||||||
|
gtfs_structures::RouteType::Tramway => libseptastic::route::RouteType::Trolley,
|
||||||
|
_ => libseptastic::route::RouteType::TracklessTrolley
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
for trip in >fs.trips {
|
||||||
|
let global_rt_id = match &hack_agency {
|
||||||
|
Some(a) => format!("{}_{}", a.id, trip.1.route_id.clone()),
|
||||||
|
None => format!("{}", trip.1.route_id.clone())
|
||||||
|
};
|
||||||
|
let sched = trip.1.stop_times.iter().map(|s| libseptastic::stop_schedule::StopSchedule{
|
||||||
|
arrival_time: i64::from(s.arrival_time.unwrap()),
|
||||||
|
stop_sequence: i64::from(s.stop_sequence),
|
||||||
|
stop: libseptastic::stop::Stop {
|
||||||
|
name: s.stop.name.clone().unwrap(),
|
||||||
|
lat: s.stop.latitude.unwrap(),
|
||||||
|
lng: s.stop.longitude.unwrap(),
|
||||||
|
id: s.stop.id.parse().unwrap(),
|
||||||
|
stop_type: libseptastic::stop::StopType::Normal
|
||||||
|
}
|
||||||
|
}).collect();
|
||||||
|
|
||||||
|
let trip = libseptastic::stop_schedule::Trip{
|
||||||
|
trip_id: trip.1.id.clone(),
|
||||||
|
direction: libseptastic::direction::Direction {
|
||||||
|
direction: match trip.1.direction_id.unwrap() {
|
||||||
|
gtfs_structures::DirectionType::Outbound => libseptastic::direction::CardinalDirection::Outbound,
|
||||||
|
gtfs_structures::DirectionType::Inbound => libseptastic::direction::CardinalDirection::Inbound
|
||||||
|
},
|
||||||
|
direction_destination: trip.1.trip_headsign.clone().unwrap()
|
||||||
|
},
|
||||||
|
tracking_data: libseptastic::stop_schedule::TripTracking::Untracked,
|
||||||
|
schedule: sched,
|
||||||
|
service_id: trip.1.service_id.clone()
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Some(trip_arr) = l_state.transit_data.trips.get_mut(&global_rt_id) {
|
||||||
|
trip_arr.push(trip);
|
||||||
|
} else {
|
||||||
|
l_state.transit_data.trips.insert(global_rt_id, vec![trip]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
info!("Added {} routes", gtfs.routes.len());
|
||||||
|
}
|
||||||
|
|
||||||
|
l_state.ready = true;
|
||||||
|
info!("Finished initial sync, ready state is true");
|
||||||
|
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
76
api/src/services/gtfs_rt.rs
Normal file
76
api/src/services/gtfs_rt.rs
Normal file
|
|
@ -0,0 +1,76 @@
|
||||||
|
use serde_json::Value;
|
||||||
|
use serde::de;
|
||||||
|
use std::sync::{Arc, Mutex};
|
||||||
|
use std::thread;
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use std::time::Duration;
|
||||||
|
use log::error;
|
||||||
|
use serde::{Serialize, Deserialize, Deserializer};
|
||||||
|
use libseptastic::stop_schedule::{LiveTrip, TripTracking};
|
||||||
|
use prost::Message;
|
||||||
|
|
||||||
|
struct TripTrackingServiceState {
|
||||||
|
pub tracking_data: HashMap::<String, TripTracking>
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
struct GtfsRtFile {
|
||||||
|
pub uri: String,
|
||||||
|
pub hash: Option<String>
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct TripTrackingService {
|
||||||
|
state: Arc<Mutex<TripTrackingServiceState>>,
|
||||||
|
//pub gtfs_files: Vec<GtfsRtFile>
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TripTrackingService {
|
||||||
|
const UPDATE_SECONDS: u64 = 30;
|
||||||
|
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self {
|
||||||
|
state: Arc::new(Mutex::new(TripTrackingServiceState{ tracking_data: HashMap::new()}))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn start(&self) {
|
||||||
|
let cloned_state = Arc::clone(&self.state);
|
||||||
|
thread::spawn( move || {
|
||||||
|
loop {
|
||||||
|
let clonedx_state = Arc::clone(&cloned_state);
|
||||||
|
let res = Self::update_live_trips(clonedx_state);
|
||||||
|
|
||||||
|
match res {
|
||||||
|
Err(err) => {
|
||||||
|
error!("{}", err);
|
||||||
|
}
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
|
||||||
|
thread::sleep(Duration::from_secs(Self::UPDATE_SECONDS));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn annotate_trips(&self, trips: &mut Vec<libseptastic::stop_schedule::Trip>) {
|
||||||
|
for trip in trips {
|
||||||
|
trip.tracking_data = match self.state.lock().unwrap().tracking_data.get(&trip.trip_id.clone()){
|
||||||
|
Some(x) => x.clone(),
|
||||||
|
None => TripTracking::Untracked
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn update_live_trips(service: Arc<Mutex<TripTrackingServiceState>>) -> anyhow::Result<()> {
|
||||||
|
|
||||||
|
let url = "https://api-endpoint.mta.info/Dataservice/mtagtfsfeeds/nyct%2Fgtfs-l";
|
||||||
|
let response = reqwest::blocking::get(url).unwrap();
|
||||||
|
let bytes = response.bytes().unwrap();
|
||||||
|
let data: Result<gtfs_realtime::FeedMessage, prost::DecodeError> = prost::Message::decode(bytes.as_ref());
|
||||||
|
let data = data.unwrap();
|
||||||
|
|
||||||
|
println!("{:#?}", data);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
Add table
Add a link
Reference in a new issue