revert to septa api for rt tracking

This commit is contained in:
Nicholas Orlowsky 2025-11-08 13:12:07 -05:00
parent f407992035
commit a335f14b14
No known key found for this signature in database
GPG key ID: A9F3BA4C0AA7A70B
18 changed files with 535 additions and 355 deletions

View file

@ -1,12 +1,10 @@
use actix_web::{get, web::{self, Data}, HttpRequest, HttpResponse, Responder};
use anyhow::anyhow;
use std::{time::Instant, sync::Arc};
use libseptastic::{route::RouteType, stop_schedule::Trip};
use std::{collections::HashSet, sync::Arc, time::Instant};
use libseptastic::{direction, route::RouteType, stop_schedule::Trip};
use serde::{Serialize, Deserialize};
use askama::Template;
use crate::AppState;
use crate::database;
#[get("/routes")]
async fn get_routes_html(req: HttpRequest, state: Data<Arc<AppState>>) -> impl Responder {
@ -15,8 +13,7 @@ async fn get_routes_html(req: HttpRequest, state: Data<Arc<AppState>>) -> impl R
async move {
let start_time = Instant::now();
let all_routes = database::get_all_routes(&mut statex.database.begin().await?).await?;
let all_routes: Vec<libseptastic::route::Route> = statex.gtfs_service.get_routes();
let rr_routes = all_routes.clone().into_iter().filter(|x| x.route_type == RouteType::RegionalRail).collect();
let subway_routes = all_routes.clone().into_iter().filter(|x| x.route_type == RouteType::SubwayElevated).collect();
let trolley_routes = all_routes.clone().into_iter().filter(|x| x.route_type == RouteType::Trolley).collect();
@ -40,7 +37,7 @@ async fn get_routes_html(req: HttpRequest, state: Data<Arc<AppState>>) -> impl R
#[get("/routes.json")]
async fn get_routes_json(state: Data<Arc<AppState>>) -> impl Responder {
let all_routes = database::get_all_routes(&mut state.database.begin().await.unwrap()).await.unwrap();
let all_routes: Vec<libseptastic::route::Route> = state.gtfs_service.get_routes();
HttpResponse::Ok().json(all_routes)
}
@ -58,11 +55,15 @@ pub struct RouteResponse {
}
async fn get_route_info(route_id: String, state: Data<Arc<AppState>>) -> ::anyhow::Result<RouteResponse> {
let mut tx = state.database.begin().await?;
let route = state.gtfs_service.get_route(route_id.clone())?;
let mut trips = state.gtfs_service.get_schedule(route_id)?;
let route = database::get_route_by_id(route_id.clone(), &mut tx).await?;
let directions = database::get_direction_by_route_id(route_id.clone(), &mut tx).await?;
let mut trips = database::get_schedule_by_route_id(route_id.clone(), &mut tx).await?;
let mut seen = HashSet::new();
let directions: Vec<_> = trips
.iter()
.map(|x| x.direction.clone())
.filter(|dir| seen.insert(dir.direction.to_string()))
.collect();
state.trip_tracking_service.annotate_trips(&mut trips);
@ -127,7 +128,7 @@ async fn api_get_route(state: Data<Arc<AppState>>, path: web::Path<String>) -> i
#[get("/api/route/{route_id}/schedule")]
async fn api_get_schedule(state: Data<Arc<AppState>>, path: web::Path<String>) -> impl Responder {
let route_id = path.into_inner();
let route_r = database::get_schedule_by_route_id(route_id, &mut state.database.begin().await.unwrap()).await;
let route_r: anyhow::Result<i32> = Ok(5);
if let Ok(route) = route_r {
HttpResponse::Ok().json(route)

View file

@ -1,179 +0,0 @@
use std::collections::HashMap;
use sqlx::{Postgres, Transaction};
use libseptastic::{stop_schedule::{Trip, TripTracking, StopSchedule}};
pub async fn get_route_by_id(
id: String,
transaction: &mut Transaction<'_, Postgres>,
) -> ::anyhow::Result<libseptastic::route::Route> {
let row = sqlx::query!(
r#"SELECT
id,
name,
short_name,
color_hex,
route_type as "route_type: libseptastic::route::RouteType"
FROM
septa_routes
WHERE
id = $1
;"#,
id
)
.fetch_one(&mut **transaction)
.await?;
return Ok(libseptastic::route::Route {
name: row.name,
short_name: row.short_name,
color_hex: row.color_hex,
route_type: row.route_type,
id: row.id,
});
}
pub async fn get_all_routes(
transaction: &mut Transaction<'_, Postgres>,
) -> ::anyhow::Result<Vec<libseptastic::route::Route>> {
let rows = sqlx::query!(
r#"SELECT
id,
name,
short_name,
color_hex,
route_type as "route_type: libseptastic::route::RouteType"
FROM
septa_routes
ORDER BY
CASE
WHEN id ~ '^[0-9]+$' THEN CAST(id AS INT)
ELSE NULL
END ASC,
id ASC;
;"#
)
.fetch_all(&mut **transaction)
.await?;
let mut routes = Vec::new();
for row in rows {
routes.push(libseptastic::route::Route {
name: row.name,
short_name: row.short_name,
color_hex: row.color_hex,
route_type: row.route_type,
id: row.id,
});
}
return Ok(routes);
}
pub async fn get_direction_by_route_id(
id: String,
transaction: &mut Transaction<'_, Postgres>,
) -> ::anyhow::Result<Vec<libseptastic::direction::Direction>> {
let rows = sqlx::query!(
r#"SELECT
route_id,
direction_id,
direction as "direction: libseptastic::direction::CardinalDirection",
direction_destination
FROM
septa_directions
WHERE
route_id = $1
;"#,
id
)
.fetch_all(&mut **transaction)
.await?;
let mut res = Vec::new();
for row in rows {
res.push(libseptastic::direction::Direction{
route_id: row.route_id,
direction_id: row.direction_id,
direction: row.direction,
direction_destination: row.direction_destination
});
}
return Ok(res);
}
pub async fn get_schedule_by_route_id(
id: String,
transaction: &mut Transaction<'_, Postgres>,
) -> ::anyhow::Result<Vec<Trip>> {
let schedule_day = chrono::Utc::now().with_timezone(&chrono_tz::America::New_York);
let schedule_day_str = schedule_day.format("%Y%m%d").to_string();
let rows = sqlx::query!(
r#"SELECT
septa_stop_schedules.route_id,
septa_stops.name as stop_name,
trip_id,
septa_stop_schedules.service_id,
septa_stop_schedules.direction_id,
arrival_time,
stop_id,
stop_sequence
FROM
septa_stop_schedules
INNER JOIN septa_stops
ON septa_stops.id = septa_stop_schedules.stop_id
INNER JOIN septa_schedule_days
ON septa_schedule_days.date = $2
AND
septa_schedule_days.service_id = septa_stop_schedules.service_id
WHERE
septa_stop_schedules.route_id = $1 OR septa_stop_schedules.route_id = 'B2'
;"#,
id.clone(),
schedule_day_str.clone()
)
.fetch_all(&mut **transaction)
.await?;
let mut sched_groups: HashMap<String, Vec<StopSchedule>> = HashMap::new();
for row in rows {
let arr = match sched_groups.get_mut(&row.trip_id) {
Some(x) => x,
None => {
sched_groups.insert(row.trip_id.clone(), Vec::new());
sched_groups.get_mut(&row.trip_id).unwrap()
}
};
arr.push(StopSchedule {
route_id: row.route_id,
stop_name: row.stop_name,
trip_id: row.trip_id,
service_id: row.service_id,
direction_id: row.direction_id,
arrival_time: row.arrival_time,
stop_id: row.stop_id,
stop_sequence: row.stop_sequence
});
}
let mut res = Vec::new();
for group in sched_groups {
res.push(Trip{
trip_id: group.0,
route_id: group.1[0].route_id.clone(),
schedule: group.1.clone(),
direction_id: group.1[0].direction_id.clone(),
tracking_data: TripTracking::Untracked
});
}
return Ok(res);
}

View file

@ -3,18 +3,17 @@ use env_logger::Env;
use log::*;
use dotenv::dotenv;
use serde::Deserialize;
use services::trip_tracking::{self};
use services::{gtfs_pull, trip_tracking::{self}};
use templates::ContentTemplate;
use std::{sync::Arc, time::Instant};
use std::{fs::File, io::Read, sync::Arc, time::Instant};
use askama::Template;
mod database;
mod services;
mod controllers;
mod templates;
pub struct AppState {
database: ::sqlx::postgres::PgPool,
gtfs_service: services::gtfs_pull::GtfsPullService,
trip_tracking_service: services::trip_tracking::TripTrackingService
}
@ -79,19 +78,22 @@ async fn main() -> ::anyhow::Result<()> {
let version: &str = option_env!("CARGO_PKG_VERSION").expect("Expected package version");
info!("Starting the SEPTASTIC Server v{} (commit: {})", version, "NONE");
let connection_string =
std::env::var("DB_CONNSTR").expect("Expected database connection string");
let mut file = File::open("../data_loader/config.yaml")?;
let mut file_contents = String::new();
file.read_to_string(&mut file_contents);
let pool = ::sqlx::postgres::PgPoolOptions::new()
.max_connections(5)
.connect(&connection_string)
.await?;
let config_file = serde_yaml::from_str::<gtfs_pull::Config>(file_contents.as_str())?;
let tt_service = trip_tracking::TripTrackingService::new();
let tt_service = services::trip_tracking::TripTrackingService::new();
tt_service.start();
let svc = gtfs_pull::GtfsPullService::new(config_file);
svc.start();
svc.wait_for_ready();
let state = Arc::new(AppState {
database: pool,
gtfs_service: svc,
trip_tracking_service: tt_service
});

View file

@ -1 +1,3 @@
pub mod trip_tracking;
pub mod gtfs_pull;
pub mod gtfs_rt;

View file

@ -97,7 +97,10 @@ impl TripTrackingService {
delay: live_track.delay,
next_stop_id: live_track.next_stop_id,
timestamp: live_track.timestamp,
vehicle_id: live_track.vehicle_id
vehicle_ids: match live_track.vehicle_id {
Some(x) => x.split(",").map(|f| String::from(f)).collect(),
None => vec![]
}
}
)
}

View file

@ -70,7 +70,7 @@ pub fn build_timetables(
let mut direction_trips: Vec<&Trip> = trips
.iter()
.filter(|trip| trip.direction_id == direction.direction_id)
.filter(|trip| trip.direction.direction == direction.direction)
.collect();
direction_trips.sort_by_key(|trip| {
@ -84,7 +84,7 @@ pub fn build_timetables(
for trip in direction_trips.clone() {
if let Some(last) = trip.schedule.last() {
if next_id == None && i64::from(seconds_since_midnight) < last.arrival_time {
next_id = Some(last.trip_id.clone());
next_id = Some(last.stop.id.to_string());
}
}
}
@ -105,12 +105,12 @@ pub fn build_timetables(
for (trip_index, trip) in direction_trips.iter().enumerate() {
for stop in &trip.schedule {
let entry = stop_map
.entry(stop.stop_id)
.or_insert((stop.stop_sequence, stop.stop_name.clone(), vec![None; direction_trips.len()]));
.entry(stop.stop.id)
.or_insert((stop.stop_sequence, stop.stop.name.clone(), vec![None; direction_trips.len()]));
// If this stop_id appears in multiple trips with different sequences, keep the lowest
entry.0 = entry.0.max(stop.stop_sequence);
entry.1 = stop.stop_name.clone();
entry.1 = stop.stop.name.clone();
entry.2[trip_index] = Some(stop.arrival_time);
}
}