cleanup and filter support
All checks were successful
Create and publish a Docker image / build-and-push-image (push) Successful in 39s

This commit is contained in:
Nicholas Orlowsky 2026-02-21 15:26:48 -05:00
parent 6773e6ae30
commit 3f68335eb4
No known key found for this signature in database
GPG key ID: A9F3BA4C0AA7A70B
62 changed files with 2364 additions and 1901 deletions

View file

@ -0,0 +1,10 @@
use crate::{
session_middleware::{SessionResponder, SessionResponse},
templates::IndexTemplate,
};
use actix_web::{Responder, get};
#[get("/")]
async fn get_index_html(resp: SessionResponse) -> impl Responder {
resp.respond("Home", "SEPTASTIC Home Page", IndexTemplate {})
}

View file

@ -0,0 +1,3 @@
pub mod index;
pub mod route;
pub mod stop;

View file

@ -0,0 +1,137 @@
use crate::{
AppState,
session_middleware::{SessionResponder, SessionResponse},
};
use actix_web::{
HttpResponse, Responder, get,
web::{self, Data},
};
use libseptastic::{route::RouteType, stop_schedule::Trip};
use serde::{Deserialize, Serialize};
use std::{collections::HashSet, sync::Arc};
#[derive(Debug, Deserialize, Clone)]
struct RouteQueryParams {
#[serde(default)] // Optional: handle missing parameters with a default value
stops: Option<String>,
}
#[derive(Serialize, Deserialize)]
struct RouteResponse {
pub route: libseptastic::route::Route,
pub directions: Vec<libseptastic::direction::Direction>,
pub schedule: Vec<Trip>,
}
async fn get_route_info(
route_id: String,
state: Data<Arc<AppState>>,
) -> ::anyhow::Result<RouteResponse> {
let route = state.gtfs_service.get_route(route_id.clone())?;
let mut trips = state.gtfs_service.get_schedule(route_id)?;
let mut seen = HashSet::new();
let directions: Vec<_> = trips
.iter()
.map(|x| x.direction.clone())
.filter(|dir| seen.insert(dir.direction.to_string()))
.collect();
state.trip_tracking_service.annotate_trips(&mut trips).await;
Ok(RouteResponse {
route,
directions,
schedule: trips,
})
}
#[get("/routes")]
async fn get_routes_html(state: Data<Arc<AppState>>, resp: SessionResponse) -> impl Responder {
let all_routes: Vec<libseptastic::route::Route> = state.gtfs_service.get_routes();
let rr_routes = all_routes
.clone()
.into_iter()
.filter(|x| x.route_type == RouteType::RegionalRail)
.collect();
let subway_routes = all_routes
.clone()
.into_iter()
.filter(|x| x.route_type == RouteType::SubwayElevated)
.collect();
let trolley_routes = all_routes
.clone()
.into_iter()
.filter(|x| x.route_type == RouteType::Trolley)
.collect();
let bus_routes = all_routes
.into_iter()
.filter(|x| x.route_type == RouteType::TracklessTrolley || x.route_type == RouteType::Bus)
.collect();
resp.respond(
"Routes",
"All routes",
crate::templates::RoutesTemplate {
rr_routes,
subway_routes,
trolley_routes,
bus_routes,
},
)
}
#[get("/routes.json")]
async fn get_routes_json(state: Data<Arc<AppState>>) -> impl Responder {
let all_routes: Vec<libseptastic::route::Route> = state.gtfs_service.get_routes();
HttpResponse::Ok().json(all_routes)
}
#[get("/route/{route_id}")]
async fn get_route_html(
state: Data<Arc<AppState>>,
info: web::Query<RouteQueryParams>,
path: web::Path<String>,
resp: SessionResponse,
) -> impl Responder {
let mut filters: Option<Vec<String>> = None;
if let Some(stops_v) = info.stops.clone() {
let mut items = Vec::new();
for sid in stops_v.split(",") {
items.push(String::from(sid));
}
filters = Some(items);
}
let route_id = path;
let route_info_r = get_route_info(route_id.clone(), state.clone()).await;
if let Ok(route_info) = route_info_r {
let timetables =
crate::templates::build_timetables(route_info.directions, route_info.schedule);
resp.respond(
format!("Schedules for {}", route_id.clone()).as_str(),
format!("Schedule information for {}", route_id.clone()).as_str(),
crate::templates::RouteTemplate {
route: route_info.route,
timetables,
filter_stops: filters.clone(),
},
)
} else {
HttpResponse::InternalServerError().body("")
}
}
#[get("/route/{route_id}.json")]
async fn get_route_json(state: Data<Arc<AppState>>, path: web::Path<String>) -> impl Responder {
let route_id = path.into_inner();
let route_info_r = get_route_info(route_id, state).await;
if let Ok(route_info) = route_info_r {
HttpResponse::Ok().json(route_info)
} else {
HttpResponse::InternalServerError().body("Error")
}
}

313
web/src/controllers/stop.rs Normal file
View file

@ -0,0 +1,313 @@
use crate::{
AppState,
session_middleware::{SessionResponder, SessionResponse},
templates::TripPerspective,
};
use actix_web::{
HttpResponse, Responder, get, post, web::{self, Data}
};
use askama::Template;
use chrono::{TimeDelta, Timelike};
use chrono_tz::America::New_York;
use libseptastic::{stop::Stop, stop_schedule::{SeatAvailability, Trip, TripTracking}};
use serde::{Deserialize, Serialize};
use serde_qs::actix::QsQuery;
use std::{
collections::{BTreeSet, HashSet},
sync::Arc,
};
async fn get_trip_perspective_for_stop(
state: &Data<Arc<AppState>>,
stop: &libseptastic::stop::Stop,
filter: &StopFilter,
) -> Vec<TripPerspective> {
let routes: Vec<libseptastic::route::Route> = state
.gtfs_service
.get_routes_at_stop(&stop.id)
.iter()
.filter_map(|route| match state.gtfs_service.get_route(route.clone()) {
Ok(route) => Some(route),
Err(_) => None,
})
.collect();
let route_ids: HashSet<String> = routes.iter().map(|route| route.id.clone()).collect();
let mut trips = state
.gtfs_service
.get_all_trips()
.iter()
.filter_map(|trip| {
if route_ids.contains(trip.0) {
Some(trip.1.clone())
} else {
None
}
})
.flatten()
.collect();
state.trip_tracking_service.annotate_trips(&mut trips).await;
let now_utc = chrono::Utc::now();
let now = now_utc.with_timezone(&New_York);
let naive_time = now.time();
let cur_time = i64::from(naive_time.num_seconds_from_midnight());
let mut filtered_trips: Vec<TripPerspective> = trips
.iter()
.filter_map(|trip| {
// poor midnight handling? -- going to offset by 4 hours, assume next 'schedule day'
// starts at 4a. Still may miss some trips. Oh well!
if !trip.calendar_day.is_calendar_active_for_date(
&now.naive_local()
.checked_add_signed(TimeDelta::hours(-4))?
.date(),
) {
return None;
}
let stop_sched: Vec<_> = trip
.schedule
.iter()
.filter(|stop_schedule| {
if stop_schedule.stop.id != stop.id {
return false;
}
match &trip.tracking_data {
libseptastic::stop_schedule::TripTracking::Tracked(live) => {
let actual_arrival_time = stop_schedule.get_arrival_time(&live);
return (actual_arrival_time - cur_time) > -(1 * 60)
&& (actual_arrival_time - cur_time) < (60 * 60);
}
libseptastic::stop_schedule::TripTracking::Untracked => {
return (stop_schedule.arrival_time - cur_time) > -(3 * 60)
&& (stop_schedule.arrival_time - cur_time) < (60 * 60);
}
libseptastic::stop_schedule::TripTracking::Cancelled => {
return false;
}
}
})
.filter_map(|ss| Some(ss.clone()))
.collect();
if stop_sched.len() > 0 && filter.trip_matches(trip) {
Some(TripPerspective {
perspective_stop: stop_sched.first().unwrap().clone(),
trip: trip.clone(),
})
} else {
None
}
})
.collect();
filtered_trips.sort_by_key(|f| match &f.trip.tracking_data {
TripTracking::Tracked(live) => f.perspective_stop.get_arrival_time(&live),
_ => f.perspective_stop.arrival_time,
});
filtered_trips
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct StopFilter {
pub routes: Option<HashSet<String>>,
pub live_tracked: Option<bool>,
pub scheduled: Option<bool>,
pub crowding: Option<HashSet<SeatAvailability>>,
pub unknown_crowding: Option<bool>,
}
impl StopFilter {
pub fn trip_matches(&self, trip: &Trip) -> bool {
let unspecified = self.live_tracked == None && self.scheduled == None;
let unknown_crowding = self.unknown_crowding.unwrap_or(true);
if (Some(false) == self.scheduled || (!unspecified && self.scheduled == None))
&& match trip.tracking_data {
TripTracking::Untracked => true,
_ => false,
}
{
return false;
}
if (Some(false) == self.live_tracked || (!unspecified && self.live_tracked == None))
&& match trip.tracking_data {
TripTracking::Tracked(_) => true,
_ => false,
}
{
return false;
}
if let Some(routes) = &self.routes {
let route_str = format!("{},{}", trip.route.id, trip.direction.direction);
if !routes.contains(&route_str) {
return false;
}
}
if let Some(crowding) = &self.crowding {
if let TripTracking::Tracked(live_trip) = &trip.tracking_data {
if let Some(seat_availability) = &live_trip.seat_availability {
if !crowding.contains(seat_availability) {
return false;
}
} else {
return unknown_crowding;
}
} else {
return unknown_crowding;
}
}
return true;
}
}
#[get("/stops")]
async fn get_stops_html(state: Data<Arc<AppState>>, resp: SessionResponse) -> impl Responder {
let stops = state
.gtfs_service
.get_all_stops()
.iter()
.filter_map(|f| {
if f.1.id.contains("ANNOTATED") {
Some(libseptastic::stop::Stop::clone(f.1))
} else {
None
}
})
.collect();
resp.respond(
"Stops",
"Stops",
crate::templates::StopsTemplate { tc_stops: stops },
)
}
#[derive(Deserialize)]
struct StringSearch {
search: String
}
#[post("/stops/search")]
async fn search_stops_html(state: Data<Arc<AppState>>, params: web::Form<StringSearch>) -> impl Responder {
let results_limit = 25;
let search_str = params.search.to_lowercase();
let stops: Vec<Stop> = state
.gtfs_service
.get_all_stops()
.iter()
.filter_map(|f| {
// Non-ideal
if f.1.name.to_lowercase().contains(&search_str) ||
f.1.id.to_lowercase().contains(&search_str) {
Some(libseptastic::stop::Stop::clone(f.1))
} else {
None
}
})
.collect();
HttpResponse::Ok().body(crate::templates::StopSearchResults {
results: if stops.len() > results_limit {
stops[..results_limit].to_vec()
} else {
stops
}
}.render().unwrap())
}
#[get("/stop/{stop_id}/table")]
async fn get_stop_table_html(
state: Data<Arc<AppState>>,
path: web::Path<String>,
query: QsQuery<StopFilter>,
) -> impl Responder {
let stop_id = path;
if let Some(stop) = state.gtfs_service.get_stop_by_id(&stop_id) {
let filtered_trips = get_trip_perspective_for_stop(&state, &stop, &query).await;
let now_utc = chrono::Utc::now();
let now = now_utc.with_timezone(&New_York);
let naive_time = now.time();
let cur_time = i64::from(naive_time.num_seconds_from_midnight());
let query_str = serde_qs::Config::new()
.array_format(serde_qs::ArrayFormat::Unindexed)
.serialize_string(&query.0.clone())
.unwrap();
HttpResponse::Ok()
.append_header((
"HX-Replace-Url",
format!("/stop/{}?{}", stop_id, &query_str).as_str(),
))
.body(
crate::templates::StopTableTemplate {
trips: filtered_trips,
current_time: cur_time,
query_str,
stop_id: stop_id.to_string(),
}
.render()
.unwrap(),
)
} else {
HttpResponse::InternalServerError().body("Error")
}
}
#[get("/stop/{stop_id}")]
async fn get_stop_html(
state: Data<Arc<AppState>>,
path: web::Path<String>,
query: QsQuery<StopFilter>,
resp: SessionResponse,
) -> impl Responder {
let stop_id = path;
if let Some(stop) = state.gtfs_service.get_stop_by_id(&stop_id) {
let routes: Vec<libseptastic::route::Route> = state
.gtfs_service
.get_routes_at_stop(&stop.id)
.iter()
.filter_map(|route| match state.gtfs_service.get_route(route.clone()) {
Ok(route) => Some(route),
Err(_) => None,
})
.collect();
let filtered_trips = get_trip_perspective_for_stop(&state, &stop, &query).await;
let now_utc = chrono::Utc::now();
let now = now_utc.with_timezone(&New_York);
let naive_time = now.time();
let cur_time = i64::from(naive_time.num_seconds_from_midnight());
resp.respond(
stop.name.as_str(),
"Stop information",
crate::templates::StopTemplate {
stop: stop.clone(),
routes: BTreeSet::from_iter(routes.into_iter()),
trips: filtered_trips,
current_time: cur_time,
filters: Some(query.0.clone()),
query_str: serde_qs::Config::new()
.array_format(serde_qs::ArrayFormat::Unindexed)
.serialize_string(&query.0)
.unwrap(),
},
)
} else {
HttpResponse::InternalServerError().body("Error")
}
}

179
web/src/database.rs Normal file
View file

@ -0,0 +1,179 @@
use std::collections::HashMap;
use sqlx::{Postgres, Transaction};
use libseptastic::{stop_schedule::{Trip, TripTracking, StopSchedule}};
pub async fn get_route_by_id(
id: String,
transaction: &mut Transaction<'_, Postgres>,
) -> ::anyhow::Result<libseptastic::route::Route> {
let row = sqlx::query!(
r#"SELECT
id,
name,
short_name,
color_hex,
route_type as "route_type: libseptastic::route::RouteType"
FROM
septa_routes
WHERE
id = $1
;"#,
id
)
.fetch_one(&mut **transaction)
.await?;
return Ok(libseptastic::route::Route {
name: row.name,
short_name: row.short_name,
color_hex: row.color_hex,
route_type: row.route_type,
id: row.id,
});
}
pub async fn get_all_routes(
transaction: &mut Transaction<'_, Postgres>,
) -> ::anyhow::Result<Vec<libseptastic::route::Route>> {
let rows = sqlx::query!(
r#"SELECT
id,
name,
short_name,
color_hex,
route_type as "route_type: libseptastic::route::RouteType"
FROM
septa_routes
ORDER BY
CASE
WHEN id ~ '^[0-9]+$' THEN CAST(id AS INT)
ELSE NULL
END ASC,
id ASC;
;"#
)
.fetch_all(&mut **transaction)
.await?;
let mut routes = Vec::new();
for row in rows {
routes.push(libseptastic::route::Route {
name: row.name,
short_name: row.short_name,
color_hex: row.color_hex,
route_type: row.route_type,
id: row.id,
});
}
return Ok(routes);
}
pub async fn get_direction_by_route_id(
id: String,
transaction: &mut Transaction<'_, Postgres>,
) -> ::anyhow::Result<Vec<libseptastic::direction::Direction>> {
let rows = sqlx::query!(
r#"SELECT
route_id,
direction_id,
direction as "direction: libseptastic::direction::CardinalDirection",
direction_destination
FROM
septa_directions
WHERE
route_id = $1
;"#,
id
)
.fetch_all(&mut **transaction)
.await?;
let mut res = Vec::new();
for row in rows {
res.push(libseptastic::direction::Direction{
route_id: row.route_id,
direction_id: row.direction_id,
direction: row.direction,
direction_destination: row.direction_destination
});
}
return Ok(res);
}
pub async fn get_schedule_by_route_id(
id: String,
transaction: &mut Transaction<'_, Postgres>,
) -> ::anyhow::Result<Vec<Trip>> {
let schedule_day = chrono::Utc::now().with_timezone(&chrono_tz::America::New_York);
let schedule_day_str = schedule_day.format("%Y%m%d").to_string();
let rows = sqlx::query!(
r#"SELECT
septa_stop_schedules.route_id,
septa_stops.name as stop_name,
trip_id,
septa_stop_schedules.service_id,
septa_stop_schedules.direction_id,
arrival_time,
stop_id,
stop_sequence
FROM
septa_stop_schedules
INNER JOIN septa_stops
ON septa_stops.id = septa_stop_schedules.stop_id
INNER JOIN septa_schedule_days
ON septa_schedule_days.date = $2
AND
septa_schedule_days.service_id = septa_stop_schedules.service_id
WHERE
septa_stop_schedules.route_id = $1
;"#,
id.clone(),
schedule_day_str.clone()
)
.fetch_all(&mut **transaction)
.await?;
let mut sched_groups: HashMap<String, Vec<StopSchedule>> = HashMap::new();
for row in rows {
let arr = match sched_groups.get_mut(&row.trip_id) {
Some(x) => x,
None => {
sched_groups.insert(row.trip_id.clone(), Vec::new());
sched_groups.get_mut(&row.trip_id).unwrap()
}
};
arr.push(StopSchedule {
route_id: row.route_id,
stop_name: row.stop_name,
trip_id: row.trip_id,
service_id: row.service_id,
direction_id: row.direction_id,
arrival_time: row.arrival_time,
stop_id: row.stop_id,
stop_sequence: row.stop_sequence
});
}
let mut res = Vec::new();
for group in sched_groups {
res.push(Trip{
trip_id: group.0,
route_id: group.1[0].route_id.clone(),
schedule: group.1.clone(),
direction_id: group.1[0].direction_id.clone(),
tracking_data: TripTracking::Untracked
});
}
return Ok(res);
}

67
web/src/main.rs Normal file
View file

@ -0,0 +1,67 @@
use actix_web::{App, HttpServer, web::Data};
use dotenv::dotenv;
use env_logger::Env;
use log::*;
use services::gtfs_pull;
use std::{fs::File, io::Read, sync::Arc};
mod controllers;
mod services;
mod session_middleware;
mod templates;
pub struct AppState {
gtfs_service: services::gtfs_pull::GtfsPullService,
trip_tracking_service: services::trip_tracking::TripTrackingService,
}
#[tokio::main]
async fn main() -> ::anyhow::Result<()> {
env_logger::init_from_env(Env::default().default_filter_or("septastic_web=info"));
dotenv().ok();
let version: &str = option_env!("CARGO_PKG_VERSION").expect("Expected package version");
info!(
"Starting the SEPTASTIC Server v{} (commit: {})",
version, "NONE"
);
let mut file = File::open("./config.yaml")?;
let mut file_contents = String::new();
file.read_to_string(&mut file_contents)?;
let config_file = serde_yaml::from_str::<gtfs_pull::Config>(file_contents.as_str())?;
let tt_service = services::trip_tracking::TripTrackingService::new().await;
tt_service.start();
let svc = gtfs_pull::GtfsPullService::new(config_file);
svc.start();
svc.wait_for_ready();
let state = Arc::new(AppState {
gtfs_service: svc,
trip_tracking_service: tt_service,
});
HttpServer::new(move || {
App::new()
.wrap(actix_cors::Cors::permissive())
.app_data(Data::new(state.clone()))
.service(controllers::route::get_route_html)
.service(controllers::route::get_route_json)
.service(controllers::route::get_routes_html)
.service(controllers::route::get_routes_json)
.service(controllers::stop::get_stops_html)
.service(controllers::stop::get_stop_html)
.service(controllers::stop::search_stops_html)
.service(controllers::stop::get_stop_table_html)
.service(controllers::index::get_index_html)
.service(actix_files::Files::new("/assets", "./assets"))
})
.bind(("0.0.0.0", 8080))?
.run()
.await?;
Ok(())
}

225
web/src/routing.rs Normal file
View file

@ -0,0 +1,225 @@
use std::{cmp::Ordering, collections::{BTreeSet, HashMap, HashSet}};
use log::info;
use crate::services;
pub struct RoutingNodePointer {
pub stop_id: String,
pub route_id: String,
pub stop_sequence: u64,
pub direction: u64,
pub dest_dist: f64
}
pub struct RoutingNode {
pub stop_id: String,
pub stop_name: String,
pub next_stops_per_routes: HashMap<String, BTreeSet<RoutingNodePointer>>,
pub visited: bool,
pub scratch: i64,
}
impl Ord for RoutingNodePointer {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
if self.dest_dist > other.dest_dist {
Ordering::Greater
} else {
Ordering::Less
}
}
}
impl PartialOrd for RoutingNodePointer {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
if self.dest_dist > other.dest_dist {
Some(Ordering::Greater)
} else {
Some(Ordering::Less)
}
}
}
impl PartialEq for RoutingNodePointer {
fn eq(&self, other: &Self) -> bool {
self.stop_id == other.stop_id
}
}
impl Eq for RoutingNodePointer {
}
struct TripState {
pub used_lines: HashSet<String>
}
#[derive(Clone)]
pub struct Coordinates {
pub lat: f64,
pub lng: f64,
}
pub type RoutingGraph = HashMap::<String, RoutingNode>;
pub fn haversine_distance(lat1: f64, lon1: f64, lat2: f64, lon2: f64) -> f64 {
let r = 6371.0; // Earth's radius in kilometers
let d_lat = (lat2 - lat1).to_radians();
let d_lon = (lon2 - lon1).to_radians();
let lat1_rad = lat1.to_radians();
let lat2_rad = lat2.to_radians();
let a = (d_lat / 2.0).sin().powi(2)
+ lat1_rad.cos() * lat2_rad.cos() * (d_lon / 2.0).sin().powi(2);
let c = 2.0 * a.sqrt().asin();
r * c
}
pub fn get_stops_near(cds: Coordinates,
all_stops: &HashMap<String, libseptastic::stop::Stop>
) -> HashSet<String> {
let near_thresh_km = 0.45;
let mut stops: HashSet<String> = HashSet::new();
for stop_p in all_stops {
let stop = stop_p.1;
let dist = haversine_distance(cds.lat, cds.lng, stop.lat, stop.lng);
if dist.abs() < near_thresh_km {
stops.insert(stop.id.clone());
}
}
stops
}
pub fn get_stop_as_node() {
}
pub fn construct_graph(
dest: Coordinates,
all_stops: &HashMap<String, libseptastic::stop::Stop>,
gtfs_service: &services::gtfs_pull::GtfsPullService
) -> RoutingGraph {
let mut graph = RoutingGraph::new();
let limited_rts = vec!["44", "65", "27", "38", "124", "125", "1"];
for stop_p in all_stops {
let stop = stop_p.1;
let ras = gtfs_service.get_routes_at_stop(&stop.id);
let cont = {
let mut ret = false;
for l_rt in limited_rts.clone() {
if ras.contains(&String::from(l_rt)) {
ret = true;
break;
}
}
ret
};
if !cont {
continue;
}
graph.insert(stop.id.clone(), RoutingNode {
stop_id: stop.id.clone(),
stop_name: stop.name.clone(),
next_stops_per_routes: {
let routes = gtfs_service.get_routes_at_stop(&stop.id);
let mut other_stops = HashMap::<String, BTreeSet<RoutingNodePointer>>::new();
for route in &routes {
let mut stops = gtfs_service.get_stops_by_route(&route);
stops.remove(&stop.id);
let rnps = {
let mut ret = BTreeSet::new();
for stop in &stops {
let stp = all_stops.get(stop).unwrap();
ret.insert(RoutingNodePointer{
dest_dist: haversine_distance(dest.lat, dest.lng, stp.lat, stp.lng),
stop_id: stop.clone(),
route_id: route.clone(),
stop_sequence: 0,
direction: 0
});
}
ret
};
other_stops.insert(route.clone(), rnps);
}
other_stops
},
visited: false,
scratch: 0
});
}
graph
}
pub fn bfs_rts_int(route_id: &String, origin: &String, graph: &RoutingGraph, dests: &HashSet<String>, mut visited: HashSet<String>, max_legs: u8) -> Option<String> {
if max_legs == 0 {
return None;
}
let mut limited_rts = HashSet::new();
for item in vec!["44", "65", "27", "38", "124", "125", "1"] {
limited_rts.insert(item);
}
if !limited_rts.contains(&route_id.as_str()) {
return None;
}
if let Some(origin_stop) = graph.get(origin) {
if dests.contains(origin) {
return Some(format!("[stop {} via rt {}] --> DEST", origin_stop.stop_name, route_id))
}
if visited.contains(origin) {
return None;
}
visited.insert(origin.clone());
for items in &origin_stop.next_stops_per_routes {
if route_id == items.0 {
continue;
}
for rnp in items.1 {
if let Some(rt) = bfs_rts_int(items.0, &rnp.stop_id, graph, dests, visited.clone(), max_legs - 1) {
return Some(format!("[stop {} via rt {}] >>[XFER]>> {}", origin_stop.stop_name, route_id, rt))
}
}
}
}
None
}
pub fn bfs_rts(origin: &String, graph: &RoutingGraph, dests: &HashSet<String>) -> String {
let mut resp = String::new();
if let Some(origin_stop) = graph.get(origin) {
for items in &origin_stop.next_stops_per_routes {
let route_id = items.0;
for rnp in items.1 {
if let Some(rt) = bfs_rts_int(route_id, &rnp.stop_id, graph, dests, HashSet::new(), 3) {
resp += format!("ORIGIN --> [stop {} via rt {}] --> {}\n", origin_stop.stop_name, route_id, rt).as_str();
}
}
}
}
resp
}

View file

@ -0,0 +1,629 @@
use anyhow::anyhow;
use libseptastic::{stop::Platform, stop_schedule::CalendarDay};
use log::{error, info, warn};
use serde::{Deserialize, Serialize};
use std::{
cmp::Ordering,
collections::{HashMap, HashSet, hash_map::Entry},
env,
io::Cursor,
path::PathBuf,
sync::{Arc, Mutex, MutexGuard},
thread,
time::Duration,
};
use zip::ZipArchive;
macro_rules! make_global_id {
($prefix: expr, $id: expr) => {
format!("{}_{}", $prefix, $id)
};
}
#[derive(Serialize, Deserialize, PartialEq, Debug, Clone)]
pub struct GtfsSource {
pub uri: String,
pub subzip: Option<String>,
pub prefix: String,
}
#[derive(Serialize, Deserialize, PartialEq, Debug, Clone)]
pub struct MultiplatformStopConfig {
pub id: String,
pub name: String,
pub platform_station_ids: Vec<String>,
}
#[derive(Serialize, Deserialize, PartialEq, Debug, Clone)]
pub struct StopRenameRule {
pub pattern: String,
pub replace: String,
}
#[derive(Serialize, Deserialize, PartialEq, Debug, Clone)]
pub struct Annotations {
pub multiplatform_stops: Vec<MultiplatformStopConfig>,
pub parent_stop_blacklist: Vec<String>,
pub stop_rename_rules: Vec<StopRenameRule>
}
#[derive(Serialize, Deserialize, PartialEq, Debug)]
pub struct Config {
pub gtfs_zips: Vec<GtfsSource>,
pub annotations: Annotations,
}
#[derive(Clone)]
pub struct GtfsFile {
pub source: GtfsSource,
}
pub struct TransitData {
pub routes: HashMap<String, Arc<libseptastic::route::Route>>,
pub agencies: HashMap<String, libseptastic::agency::Agency>,
pub trips: HashMap<String, Vec<libseptastic::stop_schedule::Trip>>,
pub stops: HashMap<String, Arc<libseptastic::stop::Stop>>,
pub platforms: HashMap<String, Arc<libseptastic::stop::Platform>>,
pub calendar_days: HashMap<String, Arc<libseptastic::stop_schedule::CalendarDay>>,
pub directions: HashMap<String, Vec<Arc<libseptastic::direction::Direction>>>,
// extended lookup methods
pub route_id_by_stops: HashMap<String, HashSet<String>>,
pub stops_by_route_id: HashMap<String, HashSet<String>>,
pub stops_by_platform_id: HashMap<String, Arc<libseptastic::stop::Stop>>,
}
pub struct GtfsPullServiceState {
pub gtfs_files: Vec<GtfsFile>,
pub tmp_dir: PathBuf,
pub ready: bool,
pub annotations: Annotations,
pub transit_data: TransitData,
}
pub struct GtfsPullService {
state: Arc<Mutex<GtfsPullServiceState>>,
}
impl TransitData {
pub fn new() -> Self {
return TransitData {
routes: HashMap::new(),
agencies: HashMap::new(),
trips: HashMap::new(),
stops: HashMap::new(),
platforms: HashMap::new(),
route_id_by_stops: HashMap::new(),
stops_by_route_id: HashMap::new(),
stops_by_platform_id: HashMap::new(),
calendar_days: HashMap::new(),
directions: HashMap::new(),
};
}
}
impl GtfsPullService {
const UPDATE_SECONDS: u64 = 3600 * 24;
const READYSTATE_CHECK_MILLISECONDS: u64 = 500;
pub fn new(config: Config) -> Self {
Self {
state: Arc::new(Mutex::new(GtfsPullServiceState {
gtfs_files: config
.gtfs_zips
.iter()
.map(|f| GtfsFile { source: f.clone() })
.collect(),
tmp_dir: env::temp_dir(),
annotations: config.annotations.clone(),
ready: false,
transit_data: TransitData::new(),
})),
}
}
pub fn wait_for_ready(&self) {
while !(self.state.lock().unwrap()).ready {
thread::sleep(Duration::from_millis(Self::READYSTATE_CHECK_MILLISECONDS));
}
}
pub fn start(&self) {
let cloned_state = Arc::clone(&self.state);
thread::spawn(move || {
loop {
let recloned_state = Arc::clone(&cloned_state);
let res = Self::update_gtfs_data(recloned_state);
match res {
Err(err) => {
error!("{}", err);
}
_ => {}
}
thread::sleep(Duration::from_secs(Self::UPDATE_SECONDS));
}
});
}
pub fn get_routes(&self) -> Vec<libseptastic::route::Route> {
let l_state = self.state.lock().unwrap();
l_state
.transit_data
.routes
.iter()
.map(|r| libseptastic::route::Route::clone(r.1))
.collect()
}
pub fn get_route(&self, route_id: String) -> anyhow::Result<libseptastic::route::Route> {
let l_state = self.state.lock().unwrap();
if let Some(route) = l_state.transit_data.routes.get(&route_id) {
Ok(libseptastic::route::Route::clone(route))
} else {
Err(anyhow!(""))
}
}
pub fn get_all_routes(&self) -> HashMap<String, libseptastic::route::Route> {
let l_state = self.state.lock().unwrap();
l_state
.transit_data
.routes
.iter()
.map(|r| (r.0.clone(), libseptastic::route::Route::clone(r.1)))
.collect()
}
pub fn get_all_stops(&self) -> HashMap<String, Arc<libseptastic::stop::Stop>> {
let l_state = self.state.lock().unwrap();
l_state.transit_data.stops.clone()
}
pub fn get_all_trips(&self) -> HashMap<String, Vec<libseptastic::stop_schedule::Trip>> {
let l_state = self.state.lock().unwrap();
l_state.transit_data.trips.clone()
}
pub fn get_routes_at_stop(&self, id: &String) -> HashSet<String> {
let l_state = self.state.lock().unwrap();
l_state
.transit_data
.route_id_by_stops
.get(id)
.unwrap_or(&HashSet::new())
.clone()
}
pub fn get_stops_by_route(&self, id: &String) -> HashSet<String> {
let l_state = self.state.lock().unwrap();
l_state
.transit_data
.stops_by_route_id
.get(id)
.unwrap_or(&HashSet::new())
.clone()
}
pub fn get_stop_by_id(&self, id: &String) -> Option<libseptastic::stop::Stop> {
let l_state = self.state.lock().unwrap();
match l_state.transit_data.stops.get(id) {
Some(stop) => Some(libseptastic::stop::Stop::clone(stop)),
None => None,
}
}
pub fn get_schedule(
&self,
route_id: String,
) -> anyhow::Result<Vec<libseptastic::stop_schedule::Trip>> {
let l_state = self.state.lock().unwrap();
if let Some(trips) = l_state.transit_data.trips.get(&route_id) {
Ok(trips.clone())
} else {
Err(anyhow!(""))
}
}
fn postprocess_stops(state: &mut MutexGuard<'_, GtfsPullServiceState>) -> anyhow::Result<()> {
for annotated_stop in state.annotations.multiplatform_stops.clone() {
let global_id = make_global_id!("ANNOTATED", annotated_stop.id.clone());
let stop = Arc::new(libseptastic::stop::Stop {
id: global_id.clone(),
name: annotated_stop.name.clone(),
platforms: libseptastic::stop::StopType::MultiPlatform(
annotated_stop
.platform_station_ids
.iter()
.map(|platform_id| {
info!(
"Folding {} stop into stop {} as platform",
platform_id.clone(),
annotated_stop.id.clone()
);
let platform = match state
.transit_data
.stops
.remove(platform_id)
.unwrap()
.platforms
.clone()
{
libseptastic::stop::StopType::SinglePlatform(plat) => Ok(plat),
_ => Err(anyhow!("")),
}
.unwrap();
state
.transit_data
.stops_by_platform_id
.remove(&platform.id)
.unwrap();
platform
})
.collect(),
),
});
state
.transit_data
.stops
.insert(global_id.clone(), stop.clone());
match &stop.platforms {
libseptastic::stop::StopType::MultiPlatform(platforms) => {
for platform in platforms {
state
.transit_data
.stops_by_platform_id
.insert(platform.id.clone(), stop.clone());
}
Ok(())
}
_ => Err(anyhow!("")),
}?
}
Ok(())
}
fn populate_stops(
state: &mut MutexGuard<'_, GtfsPullServiceState>,
prefix: &String,
gtfs: &gtfs_structures::Gtfs,
) -> anyhow::Result<()> {
let mut map: HashMap<String, Vec<String>>= HashMap::new();
for stop in &gtfs.stops {
let global_id = make_global_id!(prefix, stop.1.id.clone());
let platform = Arc::new(Platform {
id: global_id.clone(),
name: stop.1.name.clone().unwrap(),
lat: stop.1.latitude.unwrap(),
lng: stop.1.longitude.unwrap(),
platform_location: libseptastic::stop::PlatformLocationType::Normal,
});
if let Some(parent) = &stop.1.parent_station {
let parent_global_id = make_global_id!(prefix, parent);
if !state.annotations.parent_stop_blacklist.contains(&parent_global_id) {
map.entry(parent_global_id)
.or_insert(vec![]).push(global_id.clone());
}
}
let stop = Arc::new(libseptastic::stop::Stop {
id: global_id.clone(),
name: stop.1.name.clone().unwrap(),
platforms: libseptastic::stop::StopType::SinglePlatform(platform.clone()),
});
state
.transit_data
.stops
.insert(global_id.clone(), stop.clone());
state
.transit_data
.platforms
.insert(global_id.clone(), platform.clone());
state
.transit_data
.stops_by_platform_id
.insert(global_id.clone(), stop.clone());
}
for pair in &map {
let parent_stop = state.transit_data.stops.get(pair.0).unwrap().clone();
//let child_stop: Vec<libseptastic::stop::Stop> = pair.1.iter().map(|stop_id| {
// state.transit_data.stops.get(stop_id).unwrap().clone()
//}).collect();
state.annotations.multiplatform_stops.push(
MultiplatformStopConfig { id: parent_stop.id.clone(), name: parent_stop.name.clone(), platform_station_ids: pair.1.clone() }
);
}
Ok(())
}
fn populate_routes(
state: &mut MutexGuard<'_, GtfsPullServiceState>,
prefix: &String,
gtfs: &gtfs_structures::Gtfs,
) -> anyhow::Result<()> {
for route in &gtfs.routes {
let global_rt_id = make_global_id!(prefix, route.1.id);
let rt_name = match route.1.long_name.clone() {
Some(x) => x,
_ => String::from("Unknown"),
};
let dirs = match state.transit_data.directions.get(&global_rt_id) {
Some(x) => x
.iter()
.map(|f| libseptastic::direction::Direction::clone(f))
.collect(),
None => {
warn!("Excluding {} because it has no directions", global_rt_id);
continue;
}
};
state.transit_data.routes.insert(
global_rt_id.clone(),
Arc::new(libseptastic::route::Route {
name: rt_name,
directions: dirs,
short_name: match route.1.short_name.clone() {
Some(x) => x,
_ => String::from("unknown"),
},
color_hex: match route.1.color {
Some(x) => x.to_string(),
_ => String::from("unknown"),
},
id: global_rt_id,
route_type: match route.1.route_type {
gtfs_structures::RouteType::Bus => libseptastic::route::RouteType::Bus,
gtfs_structures::RouteType::Rail => {
libseptastic::route::RouteType::RegionalRail
}
gtfs_structures::RouteType::Subway => {
libseptastic::route::RouteType::SubwayElevated
}
gtfs_structures::RouteType::Tramway => {
libseptastic::route::RouteType::Trolley
}
_ => libseptastic::route::RouteType::TracklessTrolley,
},
}),
);
}
Ok(())
}
fn populate_directions(
state: &mut MutexGuard<'_, GtfsPullServiceState>,
prefix: &String,
gtfs: &gtfs_structures::Gtfs,
) -> anyhow::Result<()> {
for trip in &gtfs.trips {
let global_rt_id = make_global_id!(prefix, trip.1.route_id);
let dir = libseptastic::direction::Direction {
direction: match trip.1.direction_id.unwrap() {
gtfs_structures::DirectionType::Outbound => {
libseptastic::direction::CardinalDirection::Outbound
}
gtfs_structures::DirectionType::Inbound => {
libseptastic::direction::CardinalDirection::Inbound
}
},
direction_destination: trip.1.trip_headsign.clone().unwrap(),
};
match state.transit_data.directions.entry(global_rt_id) {
Entry::Vacant(e) => {
e.insert(vec![Arc::new(dir)]);
}
Entry::Occupied(mut e) => {
if e.get()
.iter()
.filter(|x| x.direction == dir.direction)
.count()
== 0
{
e.get_mut().push(Arc::new(dir));
}
}
}
}
for dir in &mut state.transit_data.directions {
dir.1.sort_by(|x, y| {
if x.direction > y.direction {
Ordering::Greater
} else {
Ordering::Less
}
});
}
Ok(())
}
fn populate_trips(
state: &mut MutexGuard<'_, GtfsPullServiceState>,
prefix: &String,
gtfs: &gtfs_structures::Gtfs,
) -> anyhow::Result<()> {
for trip in &gtfs.trips {
let global_rt_id = make_global_id!(prefix, trip.1.route_id);
let sched = trip
.1
.stop_times
.iter()
.map(|s| {
let global_stop_id = make_global_id!(prefix, s.stop.id);
let stop = state
.transit_data
.stops_by_platform_id
.get(&global_stop_id)
.unwrap()
.clone();
let platform = state
.transit_data
.platforms
.get(&global_stop_id)
.unwrap()
.clone();
state
.transit_data
.route_id_by_stops
.entry(stop.id.clone())
.or_insert(HashSet::new())
.insert(global_rt_id.clone());
state
.transit_data
.stops_by_route_id
.entry(global_rt_id.clone())
.or_insert(HashSet::new())
.insert(stop.id.clone());
state
.transit_data
.route_id_by_stops
.entry(platform.id.clone())
.or_insert(HashSet::new())
.insert(global_rt_id.clone());
state
.transit_data
.stops_by_route_id
.entry(global_rt_id.clone())
.or_insert(HashSet::new())
.insert(platform.id.clone());
libseptastic::stop_schedule::StopSchedule {
arrival_time: i64::from(s.arrival_time.unwrap()),
stop_sequence: i64::from(s.stop_sequence),
stop,
platform,
}
})
.collect();
if let Some(calendar_day) = state
.transit_data
.calendar_days
.get(&trip.1.service_id.clone())
{
let trip = libseptastic::stop_schedule::Trip {
trip_id: trip.1.id.clone(),
route: state
.transit_data
.routes
.get(&make_global_id!(prefix, trip.1.route_id))
.unwrap()
.clone(),
direction: libseptastic::direction::Direction {
direction: match trip.1.direction_id.unwrap() {
gtfs_structures::DirectionType::Outbound => {
libseptastic::direction::CardinalDirection::Outbound
}
gtfs_structures::DirectionType::Inbound => {
libseptastic::direction::CardinalDirection::Inbound
}
},
direction_destination: trip.1.trip_headsign.clone().unwrap(),
},
tracking_data: libseptastic::stop_schedule::TripTracking::Untracked,
schedule: sched,
service_id: trip.1.service_id.clone(),
calendar_day: calendar_day.clone(),
};
if let Some(trip_arr) = state.transit_data.trips.get_mut(&global_rt_id) {
trip_arr.push(trip);
} else {
state.transit_data.trips.insert(global_rt_id, vec![trip]);
}
}
}
Ok(())
}
pub fn update_gtfs_data(state: Arc<Mutex<GtfsPullServiceState>>) -> anyhow::Result<()> {
let mut l_state = state.lock().unwrap();
let files = l_state.gtfs_files.clone();
l_state.transit_data = TransitData::new();
let mut gtfses = Vec::new();
for gtfs_file in files.iter() {
let gtfs = if let Some(subzip) = gtfs_file.source.subzip.clone() {
info!(
"Reading GTFS file at {} (subzip {})",
gtfs_file.source.uri, subzip
);
let res = reqwest::blocking::get(gtfs_file.source.uri.clone())?;
let outer_archive = res.bytes()?;
let mut archive = ZipArchive::new(Cursor::new(outer_archive))?;
archive.extract(l_state.tmp_dir.clone())?;
let mut file_path = l_state.tmp_dir.clone();
file_path.push(subzip.clone());
info!("Downloaded, parsing");
gtfs_structures::Gtfs::new(file_path.to_str().unwrap())?
} else {
info!("Reading GTFS file at {}", gtfs_file.source.uri);
gtfs_structures::Gtfs::new(gtfs_file.source.uri.as_str())?
};
gtfses.push((gtfs, gtfs_file.source.prefix.clone()));
}
info!("Data loaded, processing...");
for (gtfs, prefix) in &gtfses {
GtfsPullService::populate_directions(&mut l_state, &prefix, &gtfs)?;
GtfsPullService::populate_routes(&mut l_state, &prefix, &gtfs)?;
GtfsPullService::populate_stops(&mut l_state, &prefix, &gtfs)?;
for calendar in &gtfs.calendar {
l_state.transit_data.calendar_days.insert(
calendar.1.id.clone(),
Arc::new(CalendarDay {
id: calendar.1.id.clone(),
monday: calendar.1.monday,
tuesday: calendar.1.tuesday,
wednesday: calendar.1.wednesday,
thursday: calendar.1.thursday,
friday: calendar.1.friday,
saturday: calendar.1.saturday,
sunday: calendar.1.sunday,
start_date: calendar.1.start_date,
end_date: calendar.1.end_date,
}),
);
}
}
GtfsPullService::postprocess_stops(&mut l_state)?;
for (gtfs, prefix) in &gtfses {
GtfsPullService::populate_trips(&mut l_state, &prefix, &gtfs)?;
}
l_state.ready = true;
info!("Finished initial sync, ready state is true");
Ok(())
}
}

2
web/src/services/mod.rs Normal file
View file

@ -0,0 +1,2 @@
pub mod gtfs_pull;
pub mod trip_tracking;

View file

@ -0,0 +1,266 @@
use chrono::Utc;
use futures::lock::Mutex;
use libseptastic::stop_schedule::{LiveTrip, SeatAvailability, TripTracking};
use log::{error, info};
use serde::de;
use serde::{Deserialize, Deserializer, Serialize};
use serde_json::Value;
use sqlx::{Postgres, QueryBuilder, Transaction};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LiveTripJson {
pub route_id: String,
pub trip_id: String,
pub service_id: Option<String>,
pub trip_headsign: String,
#[serde(deserialize_with = "de_numstro")]
pub direction_id: Option<String>,
#[serde(deserialize_with = "de_numstr")]
pub block_id: String,
pub start_time: Option<String>,
pub end_time: Option<String>,
pub delay: f64,
pub status: String,
pub lat: Option<String>,
pub lon: Option<String>,
#[serde(deserialize_with = "de_numstrflo")]
pub heading: Option<String>,
#[serde(deserialize_with = "de_numstro")]
pub next_stop_id: Option<String>,
pub next_stop_name: Option<String>,
pub next_stop_sequence: Option<i64>,
pub seat_availability: Option<String>,
pub vehicle_id: Option<String>,
pub timestamp: i64,
}
const HOST: &str = "https://www3.septa.org";
struct TripTrackingServiceState {
pub tracking_data: HashMap<String, TripTracking>,
pub database: ::sqlx::postgres::PgPool,
}
pub struct TripTrackingService {
state: Arc<Mutex<TripTrackingServiceState>>,
}
impl TripTrackingService {
const UPDATE_SECONDS: u64 = 75;
pub async fn log_delay(
transaction: &mut Transaction<'_, Postgres>,
tracking_data: &HashMap<String, TripTracking>,
timestamp: i64,
) -> ::anyhow::Result<()> {
let mut query_builder: QueryBuilder<Postgres> = QueryBuilder::new(
"INSERT INTO
live_tracking
(
delay_minutes,
next_stop_id,
timestamp,
snapshot_timestamp,
lat,
lng,
heading,
seat_availability,
vehicle_ids,
trip_id,
route_id
)
VALUES",
);
let mut separated = query_builder.separated(", ");
for trip in tracking_data {
if let TripTracking::Tracked(live_data) = trip.1 {
separated.push("(");
separated.push_bind_unseparated(live_data.delay);
separated.push_bind(live_data.next_stop_id);
separated.push_bind(live_data.timestamp);
separated.push_bind(timestamp);
separated.push_bind(live_data.latitude);
separated.push_bind(live_data.longitude);
separated.push_bind(live_data.heading);
separated.push_bind(match &live_data.seat_availability {
Some(s) => Some(s.to_string()),
None => None,
});
separated.push_bind(live_data.vehicle_ids.clone());
separated.push_bind(live_data.trip_id.clone());
separated.push_bind(live_data.route_id.clone());
separated.push_unseparated(")");
}
}
let query = query_builder.build();
query.execute(&mut **transaction).await?;
Ok(())
}
pub async fn new() -> Self {
let connection_string =
std::env::var("DB_CONNSTR").expect("Expected database connection string");
let pool = ::sqlx::postgres::PgPoolOptions::new()
.max_connections(5)
.connect(&connection_string)
.await
.unwrap();
Self {
state: Arc::new(Mutex::new(TripTrackingServiceState {
tracking_data: HashMap::new(),
database: pool,
})),
}
}
pub fn start(&self) {
info!("Starting live tracking service");
let cloned_state = Arc::clone(&self.state);
tokio::spawn(async move {
loop {
let clonedx_state = Arc::clone(&cloned_state);
let res = Self::update_live_trips(clonedx_state).await;
match res {
Err(err) => {
error!("{:?}", err);
}
_ => {}
}
tokio::time::sleep(Duration::from_secs(Self::UPDATE_SECONDS)).await;
}
});
}
pub async fn annotate_trips(&self, trips: &mut Vec<libseptastic::stop_schedule::Trip>) {
for trip in trips {
trip.tracking_data = match self
.state
.lock()
.await
.tracking_data
.get(&trip.trip_id.clone())
{
Some(x) => x.clone(),
None => TripTracking::Untracked,
};
}
}
async fn update_live_trips(
service: Arc<Mutex<TripTrackingServiceState>>,
) -> anyhow::Result<()> {
let mut new_map: HashMap<String, TripTracking> = HashMap::new();
let live_tracks = reqwest::get(format!("{}/api/v2/trips/", HOST))
.await?
.json::<Vec<LiveTripJson>>()
.await?;
for live_track in live_tracks {
let track: TripTracking = {
if live_track.status == "NO GPS" {
TripTracking::Untracked
} else if live_track.status == "CANCELED" {
TripTracking::Cancelled
} else {
TripTracking::Tracked(LiveTrip {
trip_id: live_track.trip_id.clone(),
route_id: live_track.route_id,
delay: live_track.delay,
seat_availability: SeatAvailability::from_opt_string(
&live_track.seat_availability,
),
heading: match live_track.heading {
Some(hdg) => {
if hdg != "" {
Some(hdg.parse::<f64>()?)
} else {
None
}
}
None => None,
},
latitude: match live_track.lat {
Some(lat) => Some(lat.parse::<f64>()?),
None => None,
},
longitude: match live_track.lon {
Some(lon) => Some(lon.parse::<f64>()?),
None => None,
},
next_stop_id: match live_track.next_stop_id {
Some(x) => match x.parse() {
Ok(y) => Some(y),
Err(_) => None,
},
None => None,
},
timestamp: live_track.timestamp,
vehicle_ids: match live_track.vehicle_id {
Some(x) => x.split(",").map(|f| String::from(f)).collect(),
None => vec![],
},
})
}
};
if let TripTracking::Cancelled = track {}
new_map.insert(live_track.trip_id.clone(), track);
}
let mut svc = service.lock().await;
let mut tx = svc.database.begin().await?;
Self::log_delay(&mut tx, &new_map, Utc::now().timestamp_nanos_opt().unwrap()).await?;
tx.commit().await?;
svc.tracking_data = new_map;
Ok(())
}
}
fn de_numstr<'de, D: Deserializer<'de>>(deserializer: D) -> Result<String, D::Error> {
Ok(match Value::deserialize(deserializer)? {
Value::String(s) => s,
Value::Number(num) => num
.as_i64()
.ok_or(de::Error::custom("Invalid number"))?
.to_string(),
_ => return Err(de::Error::custom("wrong type")),
})
}
fn de_numstro<'de, D: Deserializer<'de>>(deserializer: D) -> Result<Option<String>, D::Error> {
Ok(match Value::deserialize(deserializer)? {
Value::String(s) => Some(s),
Value::Number(num) => Some(
num.as_i64()
.ok_or(de::Error::custom("Invalid number"))?
.to_string(),
),
_ => None,
})
}
fn de_numstrflo<'de, D: Deserializer<'de>>(deserializer: D) -> Result<Option<String>, D::Error> {
Ok(match Value::deserialize(deserializer)? {
Value::String(s) => Some(s),
Value::Number(num) => Some(
num.as_f64()
.ok_or(de::Error::custom("Invalid number"))?
.to_string(),
),
_ => None,
})
}

View file

@ -0,0 +1,69 @@
use actix_web::{FromRequest, HttpRequest, HttpResponse, cookie::Cookie, dev::Payload};
use askama::Template;
use serde::Deserialize;
use std::{pin::Pin, time::Instant};
#[derive(Deserialize)]
struct LocalStateQuery {
pub widescreen: Option<bool>,
}
pub trait SessionResponder<T: Template> {
fn respond(&self, page_title: &str, page_desc: &str, content: T) -> HttpResponse;
}
pub struct SessionResponse {
start_time: Instant,
widescreen: bool,
}
impl<T> SessionResponder<T> for SessionResponse
where
T: Template,
{
fn respond(&self, page_title: &str, page_desc: &str, content: T) -> HttpResponse {
let end_time = Instant::now();
let mut cookie = Cookie::new("widescreen", self.widescreen.to_string());
cookie.set_path("/");
HttpResponse::Ok().cookie(cookie).body(
crate::templates::ContentTemplate {
page_title: Some(page_title.to_string()),
page_desc: Some(page_desc.to_string()),
content,
load_time_ms: Some((end_time - self.start_time).as_nanos()),
widescreen: self.widescreen,
}
.render()
.unwrap(),
)
}
}
impl FromRequest for SessionResponse {
type Error = actix_web::Error;
type Future = Pin<Box<dyn Future<Output = Result<SessionResponse, Self::Error>>>>;
fn from_request(req: &HttpRequest, _: &mut Payload) -> Self::Future {
let start_time = Instant::now();
let mut enable_widescreen = true;
if let Some(widescreen_set) = req.cookie("widescreen") {
enable_widescreen = widescreen_set.value() == "true";
}
let query_params =
actix_web::web::Query::<LocalStateQuery>::from_query(req.query_string()).unwrap();
if let Some(set_widescreen) = query_params.widescreen {
enable_widescreen = set_widescreen;
}
Box::pin(async move {
Ok(SessionResponse {
start_time,
widescreen: enable_widescreen,
})
})
}
}

251
web/src/templates.rs Normal file
View file

@ -0,0 +1,251 @@
use chrono::Timelike;
use chrono_tz::America::New_York;
use libseptastic::stop::Stop;
use libseptastic::stop_schedule::TripTracking::Tracked;
use libseptastic::{
direction::Direction,
stop_schedule::{SeatAvailability, Trip, TripTracking},
};
use serde::Serialize;
use std::{
cmp::Ordering,
collections::{BTreeMap, BTreeSet},
};
use crate::controllers::stop::StopFilter;
#[derive(askama::Template)]
#[template(path = "layout.html")]
pub struct ContentTemplate<T: askama::Template> {
pub content: T,
pub page_title: Option<String>,
pub page_desc: Option<String>,
pub load_time_ms: Option<u128>,
pub widescreen: bool,
}
#[derive(askama::Template)]
#[template(path = "route.html")]
pub struct RouteTemplate {
pub route: libseptastic::route::Route,
pub timetables: Vec<TimetableDirection>,
pub filter_stops: Option<Vec<String>>,
}
#[derive(askama::Template)]
#[template(path = "routes.html")]
pub struct RoutesTemplate {
pub rr_routes: Vec<libseptastic::route::Route>,
pub subway_routes: Vec<libseptastic::route::Route>,
pub trolley_routes: Vec<libseptastic::route::Route>,
pub bus_routes: Vec<libseptastic::route::Route>,
}
#[derive(askama::Template)]
#[template(path = "stops.html")]
pub struct StopsTemplate {
pub tc_stops: Vec<libseptastic::stop::Stop>,
}
#[derive(askama::Template)]
#[template(path = "index.html")]
pub struct IndexTemplate {}
#[derive(Debug, Serialize)]
pub struct TimetableStopRow {
pub stop_id: String,
pub stop_name: String,
pub stop_sequence: i64,
pub times: Vec<Option<i64>>,
}
#[derive(Debug, Serialize)]
pub struct TimetableDirection {
pub direction: Direction,
pub trip_ids: Vec<String>,
pub tracking_data: Vec<TripTracking>,
pub rows: Vec<TimetableStopRow>,
pub next_id: Option<String>,
}
pub struct TripPerspective {
pub trip: libseptastic::stop_schedule::Trip,
pub perspective_stop: libseptastic::stop_schedule::StopSchedule,
}
#[derive(askama::Template)]
#[template(path = "stop.html")]
pub struct StopTemplate {
pub stop: libseptastic::stop::Stop,
pub routes: BTreeSet<libseptastic::route::Route>,
pub trips: Vec<TripPerspective>,
pub current_time: i64,
pub filters: Option<StopFilter>,
pub query_str: String,
}
#[derive(askama::Template)]
#[template(path = "stop_table_impl.html")]
pub struct StopTableTemplate {
pub trips: Vec<TripPerspective>,
pub current_time: i64,
pub query_str: String,
pub stop_id: String,
}
#[derive(askama::Template)]
#[template(path = "stop_search_results.html")]
pub struct StopSearchResults {
pub results: Vec<Stop>
}
pub fn build_timetables(directions: Vec<Direction>, trips: Vec<Trip>) -> Vec<TimetableDirection> {
let mut results = Vec::new();
for direction in directions {
let now_utc = chrono::Utc::now();
let now = now_utc.with_timezone(&New_York);
let naive_time = now.time();
let seconds_since_midnight = naive_time.num_seconds_from_midnight();
let mut next_id: Option<String> = None;
let mut direction_trips: Vec<&Trip> = trips
.iter()
.filter(|trip| trip.direction.direction == direction.direction)
.collect();
direction_trips.sort_by_key(|trip| {
trip.schedule
.iter()
.filter_map(|s| Some(s.arrival_time))
.min()
.unwrap_or(i64::MAX)
});
for trip in direction_trips.clone() {
if let Some(last) = trip.schedule.iter().max_by_key(|x| x.arrival_time) {
if next_id == None && i64::from(seconds_since_midnight) < last.arrival_time {
next_id = Some(last.stop.id.to_string());
}
}
}
let trip_ids: Vec<String> = direction_trips.iter().map(|t| t.trip_id.clone()).collect();
let live_trips: Vec<TripTracking> = direction_trips
.iter()
.map(|t| t.tracking_data.clone())
.collect();
let mut stop_map: BTreeMap<String, (i64, String, Vec<Option<i64>>)> = BTreeMap::new();
for (trip_index, trip) in direction_trips.iter().enumerate() {
for stop in &trip.schedule {
let entry = stop_map.entry(stop.stop.id.clone()).or_insert((
stop.stop_sequence,
stop.stop.name.clone(),
vec![None; direction_trips.len()],
));
// If this stop_id appears in multiple trips with different sequences, keep the lowest
entry.0 = entry.0.max(stop.stop_sequence);
entry.1 = stop.stop.name.clone();
entry.2[trip_index] = Some(stop.arrival_time);
}
}
let mut rows: Vec<TimetableStopRow> = stop_map
.into_iter()
.map(
|(stop_id, (stop_sequence, stop_name, times))| TimetableStopRow {
stop_id,
stop_sequence,
stop_name,
times,
},
)
.collect();
rows.sort_by(|a, b| {
if a.stop_sequence < b.stop_sequence {
Ordering::Less
} else {
Ordering::Greater
}
});
results.push(TimetableDirection {
direction: direction.clone(),
trip_ids,
rows,
tracking_data: live_trips,
next_id,
});
}
results
}
mod filters {
use askama::filter_fn;
#[filter_fn]
pub fn format_load_time(nanos: &u128, _: &dyn askama::Values) -> askama::Result<String> {
if *nanos >= 1000000000 {
return Ok(format!("{}s", (nanos / 1000000000)));
} else if *nanos >= 1000000 {
return Ok(format!("{}ms", nanos / 1000000));
}
if *nanos >= 1000 {
return Ok(format!("{}us", nanos / 1000));
} else {
return Ok(format!("{}ns", nanos));
}
}
#[filter_fn]
pub fn format_time(
seconds_since_midnight: &i64,
_: &dyn askama::Values,
) -> askama::Result<String> {
let total_minutes = seconds_since_midnight / 60;
let (hours, ampm) = {
let hrs = total_minutes / 60;
if hrs > 12 {
(hrs - 12, "PM")
} else if hrs == 12 {
(12, "PM")
} else if hrs > 0 {
(hrs, "AM")
} else {
(12, "AM")
}
};
let minutes = total_minutes % 60;
Ok(format!("{}:{:02} {}", hours, minutes, ampm))
}
#[filter_fn]
pub fn format_time_with_seconds(
seconds_since_midnight: &i64,
_: &dyn askama::Values,
) -> askama::Result<String> {
let total_minutes = seconds_since_midnight / 60;
let (hours, ampm) = {
let hrs = total_minutes / 60;
if hrs > 12 {
(hrs - 12, "PM")
} else if hrs == 12 {
(12, "PM")
} else if hrs > 0 {
(hrs, "AM")
} else {
(12, "AM")
}
};
let minutes = total_minutes % 60;
let seconds = seconds_since_midnight % 60;
Ok(format!("{}:{:02}:{:02} {}", hours, minutes, seconds, ampm))
}
}