diff --git a/api/.envrc b/api/.envrc deleted file mode 100644 index 1d953f4..0000000 --- a/api/.envrc +++ /dev/null @@ -1 +0,0 @@ -use nix diff --git a/api/Cargo.lock b/api/Cargo.lock index b3d8d59..bf07f29 100644 --- a/api/Cargo.lock +++ b/api/Cargo.lock @@ -2531,7 +2531,9 @@ dependencies = [ "base64", "bytes", "encoding_rs", + "futures-channel", "futures-core", + "futures-util", "h2 0.4.11", "http 1.3.1", "http-body", diff --git a/api/Cargo.toml b/api/Cargo.toml index c175318..0225533 100644 --- a/api/Cargo.toml +++ b/api/Cargo.toml @@ -18,5 +18,5 @@ serde = "1.0.219" chrono = "0.4.41" chrono-tz = "0.10.4" actix-cors = "0.7.1" -reqwest = { version = "0.12.22", features = [ "json" ] } +reqwest = { version = "0.12.22", features = [ "json", "blocking" ] } sqlx-cli = "0.8.6" diff --git a/api/assets/style.css b/api/assets/style.css index 7c7eaae..403b61c 100644 --- a/api/assets/style.css +++ b/api/assets/style.css @@ -132,3 +132,7 @@ img { .tscroll td, .tscroll th { } + +details summary > * { + display: inline; +} diff --git a/api/shell.nix b/api/shell.nix deleted file mode 100644 index 5e00bc7..0000000 --- a/api/shell.nix +++ /dev/null @@ -1,8 +0,0 @@ -with import {}; -stdenv.mkDerivation { - name = "env"; - nativeBuildInputs = [ pkg-config ]; - buildInputs = [ - cryptsetup - ]; -} diff --git a/api/src/database.rs b/api/src/database.rs index 829b234..3666f41 100644 --- a/api/src/database.rs +++ b/api/src/database.rs @@ -1,10 +1,10 @@ -use std::{collections::HashMap, hash::Hash}; - -use actix_web::Route; +use std::collections::HashMap; use libseptastic::{direction::CardinalDirection, route::RouteType}; use serde::{Deserialize, Serialize}; use sqlx::{Postgres, Transaction}; +use crate::services::trip_tracking::TripTracking; + pub async fn get_route_by_id( id: String, transaction: &mut Transaction<'_, Postgres>, @@ -36,6 +36,39 @@ pub async fn get_route_by_id( }); } +pub async fn get_all_routes( + transaction: &mut Transaction<'_, Postgres>, +) -> ::anyhow::Result> { + + let rows = sqlx::query!( + r#"SELECT + id, + name, + short_name, + color_hex, + route_type as "route_type: libseptastic::route::RouteType" + FROM + septa_routes + ;"# + ) + .fetch_all(&mut **transaction) + .await?; + + let mut routes = Vec::new(); + + for row in rows { + routes.push(libseptastic::route::Route { + name: row.name, + short_name: row.short_name, + color_hex: row.color_hex, + route_type: row.route_type, + id: row.id, + }); + } + + return Ok(routes); +} + pub async fn get_direction_by_route_id( id: String, transaction: &mut Transaction<'_, Postgres>, @@ -89,9 +122,11 @@ pub struct Trip { pub route_id: String, pub trip_id: String, pub direction_id: i64, + pub tracking_data: TripTracking, pub schedule: Vec } + pub async fn get_schedule_by_route_id( id: String, transaction: &mut Transaction<'_, Postgres>, @@ -133,7 +168,7 @@ pub async fn get_schedule_by_route_id( let mut sched_groups: HashMap> = HashMap::new(); for row in rows { - let mut arr = match sched_groups.get_mut(&row.trip_id) { + let arr = match sched_groups.get_mut(&row.trip_id) { Some(x) => x, None => { sched_groups.insert(row.trip_id.clone(), Vec::new()); @@ -159,7 +194,8 @@ pub async fn get_schedule_by_route_id( trip_id: group.0, route_id: group.1[0].route_id.clone(), schedule: group.1.clone(), - direction_id: group.1[0].direction_id.clone() + direction_id: group.1[0].direction_id.clone(), + tracking_data: TripTracking::Untracked }); } diff --git a/api/src/main.rs b/api/src/main.rs index 086e82f..46f43de 100644 --- a/api/src/main.rs +++ b/api/src/main.rs @@ -2,18 +2,22 @@ use actix_web::{get, web::{self, Data}, App, HttpResponse, HttpServer, Responder use chrono::TimeDelta; use database::{get_direction_by_route_id, get_nta_by_stop_id, get_schedule_by_route_id}; use env_logger::Env; -use libseptastic::{direction::Direction}; -use database::{Trip, StopSchedule}; +use libseptastic::{direction::Direction, route::RouteType}; +use database::{Trip}; use log::*; use dotenv::dotenv; +use services::trip_tracking::{self, TripTracking}; use std::{cmp::Ordering, collections::BTreeMap, sync::Arc}; use askama::Template; -use serde::{Serialize}; +use serde::{Serialize, Deserialize}; +use crate::TripTracking::Tracked; mod database; +mod services; struct AppState { - database: ::sqlx::postgres::PgPool + database: ::sqlx::postgres::PgPool, + trip_tracking_service: services::trip_tracking::TripTrackingService } @@ -30,11 +34,13 @@ pub struct TimetableStopRow { pub times: Vec>, // one per trip, None if trip doesn't stop } + #[derive(Debug, Serialize)] pub struct TimetableDirection { pub direction: Direction, pub trip_ids: Vec, // column headers - pub rows: Vec, // one per unique stop + pub tracking_data: Vec, + pub rows: Vec } pub fn build_timetables( @@ -48,20 +54,26 @@ pub fn build_timetables( .iter() .filter(|trip| trip.direction_id == direction.direction_id) .collect(); -direction_trips.sort_by_key(|trip| { - trip.schedule - .iter() - .filter_map(|s| Some(s.arrival_time)) - .min() - .unwrap_or(i64::MAX) -}); + + direction_trips.sort_by_key(|trip| { + trip.schedule + .iter() + .filter_map(|s| Some(s.arrival_time)) + .min() + .unwrap_or(i64::MAX) + }); let trip_ids: Vec = direction_trips .iter() .map(|t| t.trip_id.clone()) .collect(); - // Map of stop_id -> (stop_sequence, Vec>) + let live_trips: Vec = direction_trips + .iter() + .map(|t| t.tracking_data.clone()) + .collect(); + + let mut stop_map: BTreeMap>)> = BTreeMap::new(); for (trip_index, trip) in direction_trips.iter().enumerate() { @@ -95,11 +107,17 @@ direction_trips.sort_by_key(|trip| { } }); + assert!(trip_ids.len() == live_trips.len()); + for row in &rows { + assert!(row.times.len() == live_trips.len()); + } + results.push(TimetableDirection { - direction: direction.clone(), + direction: direction.clone(), trip_ids, rows, + tracking_data: live_trips }); } @@ -138,12 +156,24 @@ struct ContentTemplate { struct RouteTemplate { route: libseptastic::route::Route, directions: Vec, - timetables: Vec + timetables: Vec, + filter_stops: Option> +} + +#[derive(Serialize, Deserialize)] +struct RouteResponse { + route: libseptastic::route::Route, + directions: Vec, + schedule: Vec } #[derive(askama::Template)] #[template(path = "routes.html")] struct RoutesTemplate { + rr_routes: Vec, + subway_routes: Vec, + trolley_routes: Vec, + bus_routes: Vec } #[derive(askama::Template)] @@ -152,15 +182,37 @@ struct IndexTemplate { } #[get("/routes")] -async fn get_routes() -> impl Responder { +async fn get_routes_html(state: Data>) -> impl Responder { + + let mut all_routes = database::get_all_routes(&mut state.database.begin().await.unwrap()).await.unwrap(); + all_routes.sort_by(|x, y| { + if let Ok(x_p) = x.id.parse::() { + if let Ok(y_p) = y.id.parse::() { + return if y_p > x_p { Ordering::Less } else { Ordering::Greater }; + } + } + + return if y.id > x.id { Ordering::Less } else { Ordering::Greater }; + }); HttpResponse::Ok().body(ContentTemplate { page_title: None, page_desc: None, - content: RoutesTemplate {} + content: RoutesTemplate { + rr_routes: all_routes.clone().into_iter().filter(|x| x.route_type == RouteType::RegionalRail).collect(), + subway_routes: all_routes.clone().into_iter().filter(|x| x.route_type == RouteType::SubwayElevated).collect(), + trolley_routes: all_routes.clone().into_iter().filter(|x| x.route_type == RouteType::Trolley).collect(), + bus_routes: all_routes.into_iter().filter(|x| x.route_type == RouteType::TracklessTrolley || x.route_type == RouteType::Bus).collect(), + } }.render().unwrap()) } +#[get("/routes.json")] +async fn get_routes_json(state: Data>) -> impl Responder { + let all_routes = database::get_all_routes(&mut state.database.begin().await.unwrap()).await.unwrap(); + HttpResponse::Ok().json(all_routes) +} + #[get("/")] async fn get_index() -> impl Responder { @@ -171,20 +223,34 @@ async fn get_index() -> impl Responder { }.render().unwrap()) } +#[derive(Debug, Deserialize)] + pub struct MyQueryParams { + #[serde(default)] // Optional: handle missing parameters with a default value + stops: Option, + } + #[get("/route/{route_id}")] -async fn get_route(state: Data>, path: web::Path<(String)>) -> impl Responder { +async fn get_route(state: Data>, info: web::Query, path: web::Path) -> impl Responder { + let mut fils: Option> = None; + if let Some (stops_v) = info.stops.clone() { + let mut items = Vec::new(); + + for sid in stops_v.split(",") { + items.push(sid.parse::().unwrap()); + } + fils = Some(items); + } let route_id = path.into_inner(); - let route_r = get_route_by_id(route_id.clone(), state.clone()).await; - let directions = get_direction_by_route_id(route_id.clone(), &mut state.database.begin().await.unwrap()).await.unwrap(); - let trips = get_schedule_by_route_id(route_id, &mut state.database.begin().await.unwrap()).await.unwrap(); - if let Ok(route) = route_r { + let route_info_r = get_route_info(route_id, state).await; + if let Ok(route_info) = route_info_r { HttpResponse::Ok().body(ContentTemplate { page_title: None, page_desc: None, content: RouteTemplate { - route, - directions: directions.clone(), - timetables: build_timetables(directions.as_slice(), trips.as_slice()) + route: route_info.route, + directions: route_info.directions.clone(), + timetables: build_timetables(route_info.directions.as_slice(), route_info.schedule.as_slice()), + filter_stops: fils.clone() } }.render().unwrap()) } else { @@ -192,19 +258,35 @@ async fn get_route(state: Data>, path: web::Path<(String)>) -> imp } } -#[get("/api/route/{route_id}")] -async fn api_get_route(state: Data>, path: web::Path<(String)>) -> impl Responder { + +async fn get_route_info(route_id: String, state: Data>) -> ::anyhow::Result { + let route = get_route_by_id(route_id.clone(), state.clone()).await?; + let directions = get_direction_by_route_id(route_id.clone(), &mut state.database.begin().await?).await?; + let mut trips = get_schedule_by_route_id(route_id.clone(), &mut state.database.begin().await?).await?; + + state.trip_tracking_service.annotate_trips(&mut trips); + + Ok(RouteResponse{ + route, + directions, + schedule: trips + }) +} + + +#[get("/route/{route_id}.json")] +async fn api_get_route(state: Data>, path: web::Path) -> impl Responder { let route_id = path.into_inner(); - let route_r = get_route_by_id(route_id, state).await; - if let Ok(route) = route_r { - HttpResponse::Ok().json(route) + let route_info_r = get_route_info(route_id, state).await; + if let Ok(route_info) = route_info_r { + HttpResponse::Ok().json(route_info) } else { HttpResponse::InternalServerError().body("Error") } } #[get("/api/route/{route_id}/schedule")] -async fn api_get_schedule(state: Data>, path: web::Path<(String)>) -> impl Responder { +async fn api_get_schedule(state: Data>, path: web::Path) -> impl Responder { let route_id = path.into_inner(); let route_r = get_schedule_by_route_id(route_id, &mut state.database.begin().await.unwrap()).await; if let Ok(route) = route_r { @@ -215,7 +297,7 @@ async fn api_get_schedule(state: Data>, path: web::Path<(String)>) } #[get("/api/stop/{stop_id}/nta")] -async fn api_get_nta(state: Data>, path: web::Path<(String)>) -> impl Responder { +async fn api_get_nta(state: Data>, path: web::Path) -> impl Responder { let route_id = path.into_inner().split(',') .map(|s| s.parse::()) .collect::, _>>().unwrap(); let route_r = get_nta_by_stop_id(route_id, chrono::Utc::now(), chrono::Utc::now() + TimeDelta::minutes(30), &mut state.database.begin().await.unwrap()).await; @@ -244,10 +326,15 @@ async fn main() -> ::anyhow::Result<()> { .connect(&connection_string) .await?; + let tt_service = trip_tracking::TripTrackingService::new(); + tt_service.start(); + let state = Arc::new(AppState { - database: pool + database: pool, + trip_tracking_service: tt_service }); + HttpServer::new(move || { App::new() .wrap(actix_cors::Cors::permissive()) @@ -256,7 +343,8 @@ async fn main() -> ::anyhow::Result<()> { .service(api_get_schedule) .service(api_get_nta) .service(get_route) - .service(get_routes) + .service(get_routes_json) + .service(get_routes_html) .service(get_index) .service(actix_files::Files::new("/assets", "./assets")) }) diff --git a/api/src/services/mod.rs b/api/src/services/mod.rs new file mode 100644 index 0000000..dc3fd85 --- /dev/null +++ b/api/src/services/mod.rs @@ -0,0 +1 @@ +pub mod trip_tracking; diff --git a/api/src/services/trip_tracking.rs b/api/src/services/trip_tracking.rs new file mode 100644 index 0000000..026b664 --- /dev/null +++ b/api/src/services/trip_tracking.rs @@ -0,0 +1,161 @@ +use serde_json::Value; +use serde::de; +use std::sync::{Arc, Mutex}; +use std::thread; +use std::collections::HashMap; +use std::time::Duration; +use log::{error, info}; +use serde::{Serialize, Deserialize, Deserializer}; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum TripTracking { + Tracked(LiveTrip), + Untracked, + Cancelled +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LiveTrip { + pub delay: f64, + pub next_stop_id: Option, + pub timestamp: i64, + pub vehicle_id: Option +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LiveTripJson { + pub route_id: String, + pub trip_id: String, + pub service_id: Option, + pub trip_headsign: String, + pub direction_id: i64, + #[serde(deserialize_with = "de_numstr")] + pub block_id: String, + pub start_time: Option, + pub end_time: Option, + pub delay: f64, + pub status: String, + pub lat: Option, + pub lon: Option, + #[serde(deserialize_with = "de_numstrflo")] + pub heading: Option, + #[serde(deserialize_with = "de_numstro")] + pub next_stop_id: Option, + pub next_stop_name: Option, + pub next_stop_sequence: Option, + pub seat_availability: Option, + pub vehicle_id: Option, + pub timestamp: i64 +} + +const HOST: &str = "https://www3.septa.org"; + +struct TripTrackingServiceState { + pub tracking_data: HashMap:: +} + +pub struct TripTrackingService { + state: Arc> +} + +impl TripTrackingService { + const UPDATE_SECONDS: u64 = 75; + + pub fn new() -> Self { + Self { + state: Arc::new(Mutex::new(TripTrackingServiceState{ tracking_data: HashMap::new()})) + } + } + + pub fn start(&self) { + let cloned_state = Arc::clone(&self.state); + thread::spawn( move || { + loop { + info!("started"); + + let clonedx_state = Arc::clone(&cloned_state); + let res = Self::update_live_trips(clonedx_state); + + match res { + Err(err) => { + error!("{}", err); + } + _ => {} + } + + thread::sleep(Duration::from_secs(Self::UPDATE_SECONDS)); + } + }); + } + + pub fn annotate_trips(&self, trips: &mut Vec) { + for trip in trips { + trip.tracking_data = match self.state.lock().unwrap().tracking_data.get(&trip.trip_id.clone()){ + Some(x) => x.clone(), + None => TripTracking::Untracked + }; + } + } + + fn update_live_trips(service: Arc>) -> anyhow::Result<()> { + let mut new_map: HashMap = HashMap::new(); + let live_tracks = reqwest::blocking::get(format!("{}/api/v2/trips/", HOST))?.json::>()?; + + for live_track in live_tracks { + let track: TripTracking = { + if live_track.status == "NO GPS" { + TripTracking::Untracked + } else if live_track.status == "CANCELED" { + TripTracking::Cancelled + } else { + TripTracking::Tracked( + LiveTrip { + delay: live_track.delay, + next_stop_id: live_track.next_stop_id, + timestamp: live_track.timestamp, + vehicle_id: live_track.vehicle_id + } + ) + } + }; + + if let TripTracking::Cancelled = track { + } + + new_map.insert( + live_track.trip_id.clone(), + track + ); + } + + info!("populated tracking data with {} entries", new_map.len()); + (service.lock().unwrap()).tracking_data = new_map; + + Ok(()) + } +} + +fn de_numstr<'de, D: Deserializer<'de>>(deserializer: D) -> Result { + Ok(match Value::deserialize(deserializer)? { + Value::String(s) => s, + Value::Number(num) => num.as_i64().ok_or(de::Error::custom("Invalid number"))?.to_string(), + _ => return Err(de::Error::custom("wrong type")) + }) +} + +fn de_numstro<'de, D: Deserializer<'de>>(deserializer: D) -> Result, D::Error> { + Ok(match Value::deserialize(deserializer)? { + Value::String(s) => Some(s), + Value::Number(num) => Some(num.as_i64().ok_or(de::Error::custom("Invalid number"))?.to_string()), + _ => None + }) +} + +fn de_numstrflo<'de, D: Deserializer<'de>>(deserializer: D) -> Result, D::Error> { + Ok(match Value::deserialize(deserializer)? { + Value::String(s) => Some(s), + Value::Number(num) => Some(num.as_f64().ok_or(de::Error::custom("Invalid number"))?.to_string()), + _ => None + }) +} + diff --git a/api/templates/layout.html b/api/templates/layout.html index f4ffa25..d5e98f6 100644 --- a/api/templates/layout.html +++ b/api/templates/layout.html @@ -27,6 +27,10 @@
+
+ This website is not run by SEPTA. Data may be inaccurate. +
+