use chrono::Utc; use serde_json::Value; use serde::de; use sqlx::{Postgres, QueryBuilder, Transaction}; use std::sync::{Arc}; use futures::lock::Mutex; use std::collections::HashMap; use std::time::Duration; use log::{error, info}; use serde::{Serialize, Deserialize, Deserializer}; use libseptastic::stop_schedule::{LiveTrip, TripTracking}; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct LiveTripJson { pub route_id: String, pub trip_id: String, pub service_id: Option, pub trip_headsign: String, #[serde(deserialize_with = "de_numstro")] pub direction_id: Option, #[serde(deserialize_with = "de_numstr")] pub block_id: String, pub start_time: Option, pub end_time: Option, pub delay: f64, pub status: String, pub lat: Option, pub lon: Option, #[serde(deserialize_with = "de_numstrflo")] pub heading: Option, #[serde(deserialize_with = "de_numstro")] pub next_stop_id: Option, pub next_stop_name: Option, pub next_stop_sequence: Option, pub seat_availability: Option, pub vehicle_id: Option, pub timestamp: i64 } const HOST: &str = "https://www3.septa.org"; struct TripTrackingServiceState { pub tracking_data: HashMap::, pub database: ::sqlx::postgres::PgPool } pub struct TripTrackingService { state: Arc> } impl TripTrackingService { const UPDATE_SECONDS: u64 = 75; pub async fn log_delay( transaction: &mut Transaction<'_, Postgres>, tracking_data: &HashMap::, timestamp: i64 ) -> ::anyhow::Result<()> { let mut query_builder: QueryBuilder = QueryBuilder::new( "INSERT INTO live_tracking ( delay_minutes, next_stop_id, timestamp, snapshot_timestamp, lat, lng, heading, seat_availability, vehicle_ids, trip_id, route_id ) VALUES" ); let mut separated = query_builder.separated(", "); for trip in tracking_data { if let TripTracking::Tracked(live_data) = trip.1 { separated.push("("); separated.push_bind_unseparated(live_data.delay); separated.push_bind(live_data.next_stop_id); separated.push_bind(live_data.timestamp); separated.push_bind(timestamp); separated.push_bind(live_data.latitude); separated.push_bind(live_data.longitude); separated.push_bind(live_data.heading); separated.push_bind(live_data.seat_availability.clone()); separated.push_bind(live_data.vehicle_ids.clone()); separated.push_bind(live_data.trip_id.clone()); separated.push_bind(live_data.route_id.clone()); separated.push_unseparated(")"); } } let query = query_builder.build(); query.execute(&mut **transaction).await?; Ok(()) } pub async fn new() -> Self { let connection_string = std::env::var("DB_CONNSTR").expect("Expected database connection string"); let pool = ::sqlx::postgres::PgPoolOptions::new() .max_connections(5) .connect(&connection_string) .await.unwrap(); Self { state: Arc::new(Mutex::new(TripTrackingServiceState{ tracking_data: HashMap::new(), database: pool})) } } pub fn start(&self) { info!("Starting live tracking service"); let cloned_state = Arc::clone(&self.state); tokio::spawn( async move { loop { let clonedx_state = Arc::clone(&cloned_state); let res = Self::update_live_trips(clonedx_state).await; match res { Err(err) => { error!("{:?}", err); } _ => {} } tokio::time::sleep(Duration::from_secs(Self::UPDATE_SECONDS)).await; } }); } pub async fn annotate_trips(&self, trips: &mut Vec) { for trip in trips { trip.tracking_data = match self.state.lock().await.tracking_data.get(&trip.trip_id.clone()){ Some(x) => x.clone(), None => TripTracking::Untracked }; } } async fn update_live_trips(service: Arc>) -> anyhow::Result<()> { let mut new_map: HashMap = HashMap::new(); let live_tracks = reqwest::get(format!("{}/api/v2/trips/", HOST)).await?.json::>().await?; for live_track in live_tracks { let track: TripTracking = { if live_track.status == "NO GPS" { TripTracking::Untracked } else if live_track.status == "CANCELED" { TripTracking::Cancelled } else { TripTracking::Tracked( LiveTrip { trip_id: live_track.trip_id.clone(), route_id: live_track.route_id, delay: live_track.delay, seat_availability: live_track.seat_availability, heading: match live_track.heading { Some(hdg) => if hdg != "" { Some(hdg.parse::()?)} else {None}, None => None }, latitude: match live_track.lat { Some(lat) => Some(lat.parse::()?), None => None }, longitude: match live_track.lon { Some(lon) => Some(lon.parse::()?), None => None }, next_stop_id: match live_track.next_stop_id { Some(x) => match x.parse() { Ok(y) => Some(y), Err(_) => None }, None => None }, timestamp: live_track.timestamp, vehicle_ids: match live_track.vehicle_id { Some(x) => x.split(",").map(|f| String::from(f)).collect(), None => vec![] } } ) } }; if let TripTracking::Cancelled = track { } new_map.insert( live_track.trip_id.clone(), track ); } let mut svc = service.lock().await; let mut tx = svc.database.begin().await?; Self::log_delay(&mut tx, &new_map, Utc::now().timestamp_nanos_opt().unwrap()).await?; tx.commit().await?; svc.tracking_data = new_map; Ok(()) } } fn de_numstr<'de, D: Deserializer<'de>>(deserializer: D) -> Result { Ok(match Value::deserialize(deserializer)? { Value::String(s) => s, Value::Number(num) => num.as_i64().ok_or(de::Error::custom("Invalid number"))?.to_string(), _ => return Err(de::Error::custom("wrong type")) }) } fn de_numstro<'de, D: Deserializer<'de>>(deserializer: D) -> Result, D::Error> { Ok(match Value::deserialize(deserializer)? { Value::String(s) => Some(s), Value::Number(num) => Some(num.as_i64().ok_or(de::Error::custom("Invalid number"))?.to_string()), _ => None }) } fn de_numstrflo<'de, D: Deserializer<'de>>(deserializer: D) -> Result, D::Error> { Ok(match Value::deserialize(deserializer)? { Value::String(s) => Some(s), Value::Number(num) => Some(num.as_f64().ok_or(de::Error::custom("Invalid number"))?.to_string()), _ => None }) }