many changes
Some checks failed
Create and publish a Docker image / build-and-push-image (push) Has been cancelled
Some checks failed
Create and publish a Docker image / build-and-push-image (push) Has been cancelled
This commit is contained in:
parent
4777f46a38
commit
be177af6cd
25 changed files with 2059 additions and 47 deletions
298
api/src/database.rs
Normal file
298
api/src/database.rs
Normal file
|
|
@ -0,0 +1,298 @@
|
|||
use std::{collections::HashMap, hash::Hash};
|
||||
|
||||
use actix_web::Route;
|
||||
use libseptastic::{direction::CardinalDirection, route::RouteType};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use sqlx::{Postgres, Transaction};
|
||||
|
||||
pub async fn get_route_by_id(
|
||||
id: String,
|
||||
transaction: &mut Transaction<'_, Postgres>,
|
||||
) -> ::anyhow::Result<libseptastic::route::Route> {
|
||||
|
||||
let row = sqlx::query!(
|
||||
r#"SELECT
|
||||
id,
|
||||
name,
|
||||
short_name,
|
||||
color_hex,
|
||||
route_type as "route_type: libseptastic::route::RouteType"
|
||||
FROM
|
||||
septa_routes
|
||||
WHERE
|
||||
id = $1
|
||||
;"#,
|
||||
id
|
||||
)
|
||||
.fetch_one(&mut **transaction)
|
||||
.await?;
|
||||
|
||||
return Ok(libseptastic::route::Route {
|
||||
name: row.name,
|
||||
short_name: row.short_name,
|
||||
color_hex: row.color_hex,
|
||||
route_type: row.route_type,
|
||||
id: row.id,
|
||||
});
|
||||
}
|
||||
|
||||
pub async fn get_direction_by_route_id(
|
||||
id: String,
|
||||
transaction: &mut Transaction<'_, Postgres>,
|
||||
) -> ::anyhow::Result<Vec<libseptastic::direction::Direction>> {
|
||||
|
||||
let rows = sqlx::query!(
|
||||
r#"SELECT
|
||||
route_id,
|
||||
direction_id,
|
||||
direction as "direction: libseptastic::direction::CardinalDirection",
|
||||
direction_destination
|
||||
FROM
|
||||
septa_directions
|
||||
WHERE
|
||||
route_id = $1
|
||||
;"#,
|
||||
id
|
||||
)
|
||||
.fetch_all(&mut **transaction)
|
||||
.await?;
|
||||
|
||||
let mut res = Vec::new();
|
||||
|
||||
for row in rows {
|
||||
res.push(libseptastic::direction::Direction{
|
||||
route_id: row.route_id,
|
||||
direction_id: row.direction_id,
|
||||
direction: row.direction,
|
||||
direction_destination: row.direction_destination
|
||||
});
|
||||
}
|
||||
|
||||
return Ok(res);
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct StopSchedule {
|
||||
pub route_id: String,
|
||||
pub stop_name: String,
|
||||
pub trip_id: String,
|
||||
pub service_id: String,
|
||||
pub direction_id: i64,
|
||||
pub arrival_time: i64,
|
||||
pub stop_id: i64,
|
||||
pub stop_sequence: i64
|
||||
}
|
||||
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct Trip {
|
||||
pub route_id: String,
|
||||
pub trip_id: String,
|
||||
pub direction_id: i64,
|
||||
pub schedule: Vec<StopSchedule>
|
||||
}
|
||||
|
||||
pub async fn get_schedule_by_route_id(
|
||||
id: String,
|
||||
transaction: &mut Transaction<'_, Postgres>,
|
||||
) -> ::anyhow::Result<Vec<Trip>> {
|
||||
|
||||
let rows = sqlx::query!(
|
||||
r#"SELECT
|
||||
septa_stop_schedules.route_id,
|
||||
septa_stops.name as stop_name,
|
||||
trip_id,
|
||||
service_id,
|
||||
septa_stop_schedules.direction_id,
|
||||
septa_directions.direction as "direction: libseptastic::direction::CardinalDirection",
|
||||
arrival_time,
|
||||
stop_id,
|
||||
stop_sequence
|
||||
FROM
|
||||
septa_stop_schedules
|
||||
INNER JOIN septa_directions
|
||||
ON
|
||||
septa_directions.direction_id = septa_stop_schedules.direction_id
|
||||
AND
|
||||
septa_directions.route_id = septa_stop_schedules.route_id
|
||||
INNER JOIN septa_stops
|
||||
ON septa_stops.id = septa_stop_schedules.stop_id
|
||||
WHERE
|
||||
septa_stop_schedules.route_id = $1
|
||||
AND
|
||||
service_id IN (SELECT service_id FROM septa_schedule_days WHERE date = '20250707')
|
||||
;"#,
|
||||
id
|
||||
)
|
||||
.fetch_all(&mut **transaction)
|
||||
.await?;
|
||||
|
||||
let mut sched_groups: HashMap<String, Vec<StopSchedule>> = HashMap::new();
|
||||
for row in rows {
|
||||
let mut arr = match sched_groups.get_mut(&row.trip_id) {
|
||||
Some(x) => x,
|
||||
None => {
|
||||
sched_groups.insert(row.trip_id.clone(), Vec::new());
|
||||
sched_groups.get_mut(&row.trip_id).unwrap()
|
||||
}
|
||||
};
|
||||
|
||||
arr.push(StopSchedule {
|
||||
route_id: row.route_id,
|
||||
stop_name: row.stop_name,
|
||||
trip_id: row.trip_id,
|
||||
service_id: row.service_id,
|
||||
direction_id: row.direction_id,
|
||||
arrival_time: row.arrival_time,
|
||||
stop_id: row.stop_id,
|
||||
stop_sequence: row.stop_sequence
|
||||
});
|
||||
}
|
||||
let mut res = Vec::new();
|
||||
|
||||
for group in sched_groups {
|
||||
res.push(Trip{
|
||||
trip_id: group.0,
|
||||
route_id: group.1[0].route_id.clone(),
|
||||
schedule: group.1.clone(),
|
||||
direction_id: group.1[0].direction_id.clone()
|
||||
});
|
||||
}
|
||||
|
||||
return Ok(res);
|
||||
}
|
||||
|
||||
#[derive(Serialize,Deserialize,Clone)]
|
||||
pub struct NTALive {
|
||||
delay: i64,
|
||||
cancelled: bool,
|
||||
next_stop: Option<String>
|
||||
}
|
||||
|
||||
#[derive(Serialize,Deserialize)]
|
||||
pub struct LiveData {
|
||||
route_id: String,
|
||||
service_id: String,
|
||||
trip_id: String,
|
||||
trip_headsign: String,
|
||||
direction_id: i64,
|
||||
block_id: String,
|
||||
start_time: String,
|
||||
end_time: String,
|
||||
delay: i64,
|
||||
status: String,
|
||||
lat: Option<String>,
|
||||
lon: Option<String>,
|
||||
heading: Option<String>,
|
||||
next_stop_id: Option<i64>,
|
||||
next_stop_name: Option<String>,
|
||||
next_stop_sequence: Option<i64>,
|
||||
seat_availability: String,
|
||||
vehicle_id: String,
|
||||
timestamp: i64
|
||||
}
|
||||
|
||||
#[derive(Serialize,Deserialize)]
|
||||
pub struct NTAEntry {
|
||||
route_id: String,
|
||||
route_type: RouteType,
|
||||
route_name: String,
|
||||
color_hex: String,
|
||||
trip_id: String,
|
||||
arrival_time: i64,
|
||||
direction: CardinalDirection,
|
||||
direction_destination: String,
|
||||
live: Option<NTALive>
|
||||
}
|
||||
|
||||
#[derive(Serialize,Deserialize)]
|
||||
pub struct NTAResult {
|
||||
station_name: String,
|
||||
arrivals: Vec<NTAEntry>
|
||||
}
|
||||
|
||||
pub async fn get_nta_by_stop_id(
|
||||
ids: Vec<i64>,
|
||||
start_time: chrono::DateTime<chrono::Utc>,
|
||||
end_time: chrono::DateTime<chrono::Utc>,
|
||||
transaction: &mut Transaction<'_, Postgres>,
|
||||
) -> ::anyhow::Result<NTAResult> {
|
||||
let local_start = start_time.with_timezone(&chrono_tz::America::New_York);
|
||||
let local_end = end_time.with_timezone(&chrono_tz::America::New_York);
|
||||
let local_midnight = chrono::Utc::now().with_timezone(&chrono_tz::America::New_York).date().and_hms(0,0,0);
|
||||
let start_secs = local_start.signed_duration_since(local_midnight).num_seconds();
|
||||
let end_secs = local_end.signed_duration_since(local_midnight).num_seconds();
|
||||
|
||||
let name_row = sqlx::query!("SELECT name FROM septa_stops WHERE id = $1", ids[0]).fetch_one(&mut **transaction).await?;
|
||||
|
||||
let stop_name = name_row.name;
|
||||
|
||||
let rows: Vec<(String, RouteType, String, String, i64, CardinalDirection, String, String,)> = sqlx::query_as(
|
||||
r#"SELECT
|
||||
septa_stop_schedules.route_id,
|
||||
route_type as "route_type: libseptastic::route::RouteType",
|
||||
septa_routes.color_hex,
|
||||
trip_id,
|
||||
arrival_time,
|
||||
septa_directions.direction as "direction: libseptastic::direction::CardinalDirection",
|
||||
septa_directions.direction_destination,
|
||||
septa_routes.name
|
||||
FROM
|
||||
septa_stop_schedules
|
||||
INNER JOIN septa_directions
|
||||
ON
|
||||
septa_directions.direction_id = septa_stop_schedules.direction_id
|
||||
AND
|
||||
septa_directions.route_id = septa_stop_schedules.route_id
|
||||
INNER JOIN septa_stops
|
||||
ON septa_stops.id = septa_stop_schedules.stop_id
|
||||
INNER JOIN septa_routes
|
||||
ON septa_routes.id = septa_stop_schedules.route_id
|
||||
WHERE
|
||||
(septa_stops.id = $1 OR septa_stops.id = $2)
|
||||
AND
|
||||
service_id IN (SELECT service_id FROM septa_schedule_days WHERE date = '20250707')
|
||||
AND
|
||||
septa_stop_schedules.arrival_time > $3
|
||||
AND
|
||||
septa_stop_schedules.arrival_time < $4
|
||||
ORDER BY arrival_time
|
||||
;"#)
|
||||
.bind(&ids[0])
|
||||
.bind(&ids.get(1).unwrap_or(&0))
|
||||
.bind(&start_secs)
|
||||
.bind(&end_secs)
|
||||
.fetch_all(&mut **transaction)
|
||||
.await?;
|
||||
|
||||
let mut ntas: Vec<NTAEntry> = Vec::new();
|
||||
let mut live_map: HashMap<String, NTALive> = HashMap::new();
|
||||
|
||||
let lives: Vec<LiveData> = reqwest::get("https://www3.septa.org/api/v2/trips/?route_id=AIR,CHW,LAN,NOR,TRE,WIL,WAR,MED,PAO,FOX,WTR,CYN").await?.json().await?;
|
||||
|
||||
for live in lives {
|
||||
live_map.insert(live.route_id, NTALive { delay: live.delay, cancelled: live.status == "CANCELLED", next_stop: live.next_stop_name });
|
||||
}
|
||||
|
||||
for row in rows {
|
||||
ntas.push(NTAEntry {
|
||||
route_id: row.0.clone(),
|
||||
route_type: row.1,
|
||||
color_hex: row.2,
|
||||
trip_id: row.3,
|
||||
arrival_time: row.4,
|
||||
direction: row.5,
|
||||
direction_destination: row.6,
|
||||
route_name: row.7,
|
||||
live: match live_map.get(&row.0) {
|
||||
Some(x) => Some(x.clone()),
|
||||
None => None
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
return Ok(NTAResult{
|
||||
station_name: stop_name,
|
||||
arrivals: ntas
|
||||
});
|
||||
}
|
||||
244
api/src/main.rs
244
api/src/main.rs
|
|
@ -1,12 +1,229 @@
|
|||
use actix_web::{get, App, HttpResponse, HttpServer, Responder};
|
||||
use actix_web::{get, web::{self, Data}, App, HttpResponse, HttpServer, Responder};
|
||||
use chrono::TimeDelta;
|
||||
use database::{get_direction_by_route_id, get_nta_by_stop_id, get_schedule_by_route_id};
|
||||
use env_logger::Env;
|
||||
use libseptastic::{direction::Direction};
|
||||
use database::{Trip, StopSchedule};
|
||||
use log::*;
|
||||
use dotenv::dotenv;
|
||||
use serde_json::json;
|
||||
use std::{cmp::Ordering, collections::BTreeMap, sync::Arc};
|
||||
use askama::Template;
|
||||
use serde::{Serialize};
|
||||
|
||||
mod database;
|
||||
|
||||
struct AppState {
|
||||
database: ::sqlx::postgres::PgPool
|
||||
}
|
||||
|
||||
|
||||
async fn get_route_by_id(id: String, state: Data<Arc<AppState>>) -> ::anyhow::Result<libseptastic::route::Route> {
|
||||
Ok(database::get_route_by_id(id, &mut state.database.begin().await?).await?)
|
||||
}
|
||||
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct TimetableStopRow {
|
||||
pub stop_id: i64,
|
||||
pub stop_name: String,
|
||||
pub stop_sequence: i64,
|
||||
pub times: Vec<Option<i64>>, // one per trip, None if trip doesn't stop
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct TimetableDirection {
|
||||
pub direction: Direction,
|
||||
pub trip_ids: Vec<String>, // column headers
|
||||
pub rows: Vec<TimetableStopRow>, // one per unique stop
|
||||
}
|
||||
|
||||
pub fn build_timetables(
|
||||
directions: &[Direction],
|
||||
trips: &[Trip],
|
||||
) -> Vec<TimetableDirection> {
|
||||
let mut results = Vec::new();
|
||||
|
||||
for direction in directions {
|
||||
let mut direction_trips: Vec<&Trip> = trips
|
||||
.iter()
|
||||
.filter(|trip| trip.direction_id == direction.direction_id)
|
||||
.collect();
|
||||
direction_trips.sort_by_key(|trip| {
|
||||
trip.schedule
|
||||
.iter()
|
||||
.filter_map(|s| Some(s.arrival_time))
|
||||
.min()
|
||||
.unwrap_or(i64::MAX)
|
||||
});
|
||||
|
||||
let trip_ids: Vec<String> = direction_trips
|
||||
.iter()
|
||||
.map(|t| t.trip_id.clone())
|
||||
.collect();
|
||||
|
||||
// Map of stop_id -> (stop_sequence, Vec<Option<arrival_time>>)
|
||||
let mut stop_map: BTreeMap<i64, (i64, String, Vec<Option<i64>>)> = BTreeMap::new();
|
||||
|
||||
for (trip_index, trip) in direction_trips.iter().enumerate() {
|
||||
for stop in &trip.schedule {
|
||||
let entry = stop_map
|
||||
.entry(stop.stop_id)
|
||||
.or_insert((stop.stop_sequence, stop.stop_name.clone(), vec![None; direction_trips.len()]));
|
||||
|
||||
// If this stop_id appears in multiple trips with different sequences, keep the lowest
|
||||
entry.0 = entry.0.max(stop.stop_sequence);
|
||||
entry.1 = stop.stop_name.clone();
|
||||
entry.2[trip_index] = Some(stop.arrival_time);
|
||||
}
|
||||
}
|
||||
|
||||
let mut rows: Vec<TimetableStopRow> = stop_map
|
||||
.into_iter()
|
||||
.map(|(stop_id, (stop_sequence, stop_name, times))| TimetableStopRow {
|
||||
stop_id,
|
||||
stop_sequence,
|
||||
stop_name,
|
||||
times,
|
||||
})
|
||||
.collect();
|
||||
|
||||
rows.sort_by(| a, b| {
|
||||
if a.stop_sequence < b.stop_sequence {
|
||||
Ordering::Less
|
||||
} else {
|
||||
Ordering::Greater
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
results.push(TimetableDirection {
|
||||
direction: direction.clone(),
|
||||
trip_ids,
|
||||
rows,
|
||||
});
|
||||
}
|
||||
|
||||
results
|
||||
}
|
||||
|
||||
mod filters {
|
||||
pub fn format_time(
|
||||
seconds: &i64,
|
||||
_: &dyn askama::Values,
|
||||
) -> askama::Result<String> {
|
||||
let total_minutes = seconds / 60;
|
||||
let (hours, ampm) = {
|
||||
let hrs = total_minutes / 60;
|
||||
if hrs > 12 {
|
||||
(hrs - 12, "PM")
|
||||
} else {
|
||||
(hrs, "AM")
|
||||
}
|
||||
};
|
||||
let minutes = total_minutes % 60;
|
||||
Ok(format!("{}:{:02} {}", hours, minutes, ampm))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(askama::Template)]
|
||||
#[template(path = "layout.html")]
|
||||
struct ContentTemplate<T: askama::Template> {
|
||||
content: T,
|
||||
page_title: Option<String>,
|
||||
page_desc: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(askama::Template)]
|
||||
#[template(path = "route.html")]
|
||||
struct RouteTemplate {
|
||||
route: libseptastic::route::Route,
|
||||
directions: Vec<libseptastic::direction::Direction>,
|
||||
timetables: Vec<TimetableDirection>
|
||||
}
|
||||
|
||||
#[derive(askama::Template)]
|
||||
#[template(path = "routes.html")]
|
||||
struct RoutesTemplate {
|
||||
}
|
||||
|
||||
#[derive(askama::Template)]
|
||||
#[template(path = "index.html")]
|
||||
struct IndexTemplate {
|
||||
}
|
||||
|
||||
#[get("/routes")]
|
||||
async fn get_routes() -> impl Responder {
|
||||
|
||||
HttpResponse::Ok().body(ContentTemplate {
|
||||
page_title: None,
|
||||
page_desc: None,
|
||||
content: RoutesTemplate {}
|
||||
}.render().unwrap())
|
||||
}
|
||||
|
||||
#[get("/")]
|
||||
async fn hello() -> impl Responder {
|
||||
HttpResponse::Ok().json("{}")
|
||||
async fn get_index() -> impl Responder {
|
||||
|
||||
HttpResponse::Ok().body(ContentTemplate {
|
||||
page_title: None,
|
||||
page_desc: None,
|
||||
content: IndexTemplate {}
|
||||
}.render().unwrap())
|
||||
}
|
||||
|
||||
#[get("/route/{route_id}")]
|
||||
async fn get_route(state: Data<Arc<AppState>>, path: web::Path<(String)>) -> impl Responder {
|
||||
let route_id = path.into_inner();
|
||||
let route_r = get_route_by_id(route_id.clone(), state.clone()).await;
|
||||
let directions = get_direction_by_route_id(route_id.clone(), &mut state.database.begin().await.unwrap()).await.unwrap();
|
||||
let trips = get_schedule_by_route_id(route_id, &mut state.database.begin().await.unwrap()).await.unwrap();
|
||||
if let Ok(route) = route_r {
|
||||
HttpResponse::Ok().body(ContentTemplate {
|
||||
page_title: None,
|
||||
page_desc: None,
|
||||
content: RouteTemplate {
|
||||
route,
|
||||
directions: directions.clone(),
|
||||
timetables: build_timetables(directions.as_slice(), trips.as_slice())
|
||||
}
|
||||
}.render().unwrap())
|
||||
} else {
|
||||
HttpResponse::InternalServerError().body("Error")
|
||||
}
|
||||
}
|
||||
|
||||
#[get("/api/route/{route_id}")]
|
||||
async fn api_get_route(state: Data<Arc<AppState>>, path: web::Path<(String)>) -> impl Responder {
|
||||
let route_id = path.into_inner();
|
||||
let route_r = get_route_by_id(route_id, state).await;
|
||||
if let Ok(route) = route_r {
|
||||
HttpResponse::Ok().json(route)
|
||||
} else {
|
||||
HttpResponse::InternalServerError().body("Error")
|
||||
}
|
||||
}
|
||||
|
||||
#[get("/api/route/{route_id}/schedule")]
|
||||
async fn api_get_schedule(state: Data<Arc<AppState>>, path: web::Path<(String)>) -> impl Responder {
|
||||
let route_id = path.into_inner();
|
||||
let route_r = get_schedule_by_route_id(route_id, &mut state.database.begin().await.unwrap()).await;
|
||||
if let Ok(route) = route_r {
|
||||
HttpResponse::Ok().json(route)
|
||||
} else {
|
||||
HttpResponse::InternalServerError().body("Error")
|
||||
}
|
||||
}
|
||||
|
||||
#[get("/api/stop/{stop_id}/nta")]
|
||||
async fn api_get_nta(state: Data<Arc<AppState>>, path: web::Path<(String)>) -> impl Responder {
|
||||
let route_id = path.into_inner().split(',') .map(|s| s.parse::<i64>())
|
||||
.collect::<Result<Vec<i64>, _>>().unwrap();
|
||||
let route_r = get_nta_by_stop_id(route_id, chrono::Utc::now(), chrono::Utc::now() + TimeDelta::minutes(30), &mut state.database.begin().await.unwrap()).await;
|
||||
if let Ok(route) = route_r {
|
||||
HttpResponse::Ok().json(route)
|
||||
} else {
|
||||
HttpResponse::InternalServerError().body(format!("Error {:?}", route_r.err()))
|
||||
}
|
||||
}
|
||||
|
||||
#[actix_web::main]
|
||||
|
|
@ -27,12 +244,23 @@ async fn main() -> ::anyhow::Result<()> {
|
|||
.connect(&connection_string)
|
||||
.await?;
|
||||
|
||||
let mut transaction = pool.begin().await?;
|
||||
HttpServer::new(|| {
|
||||
let state = Arc::new(AppState {
|
||||
database: pool
|
||||
});
|
||||
|
||||
HttpServer::new(move || {
|
||||
App::new()
|
||||
.service(hello)
|
||||
.wrap(actix_cors::Cors::permissive())
|
||||
.app_data(Data::new(state.clone()))
|
||||
.service(api_get_route)
|
||||
.service(api_get_schedule)
|
||||
.service(api_get_nta)
|
||||
.service(get_route)
|
||||
.service(get_routes)
|
||||
.service(get_index)
|
||||
.service(actix_files::Files::new("/assets", "./assets"))
|
||||
})
|
||||
.bind(("127.0.0.1", 8080))?
|
||||
.bind(("0.0.0.0", 8080))?
|
||||
.run()
|
||||
.await?;
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue