live tracking and filter
Some checks failed
Create and publish a Docker image / build-and-push-image (push) Has been cancelled
Some checks failed
Create and publish a Docker image / build-and-push-image (push) Has been cancelled
This commit is contained in:
parent
534c36b0f7
commit
f5e0a31bb7
16 changed files with 414 additions and 115 deletions
161
api/src/services/trip_tracking.rs
Normal file
161
api/src/services/trip_tracking.rs
Normal file
|
|
@ -0,0 +1,161 @@
|
|||
use serde_json::Value;
|
||||
use serde::de;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::thread;
|
||||
use std::collections::HashMap;
|
||||
use std::time::Duration;
|
||||
use log::{error, info};
|
||||
use serde::{Serialize, Deserialize, Deserializer};
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub enum TripTracking {
|
||||
Tracked(LiveTrip),
|
||||
Untracked,
|
||||
Cancelled
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct LiveTrip {
|
||||
pub delay: f64,
|
||||
pub next_stop_id: Option<String>,
|
||||
pub timestamp: i64,
|
||||
pub vehicle_id: Option<String>
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct LiveTripJson {
|
||||
pub route_id: String,
|
||||
pub trip_id: String,
|
||||
pub service_id: Option<String>,
|
||||
pub trip_headsign: String,
|
||||
pub direction_id: i64,
|
||||
#[serde(deserialize_with = "de_numstr")]
|
||||
pub block_id: String,
|
||||
pub start_time: Option<String>,
|
||||
pub end_time: Option<String>,
|
||||
pub delay: f64,
|
||||
pub status: String,
|
||||
pub lat: Option<String>,
|
||||
pub lon: Option<String>,
|
||||
#[serde(deserialize_with = "de_numstrflo")]
|
||||
pub heading: Option<String>,
|
||||
#[serde(deserialize_with = "de_numstro")]
|
||||
pub next_stop_id: Option<String>,
|
||||
pub next_stop_name: Option<String>,
|
||||
pub next_stop_sequence: Option<i64>,
|
||||
pub seat_availability: Option<String>,
|
||||
pub vehicle_id: Option<String>,
|
||||
pub timestamp: i64
|
||||
}
|
||||
|
||||
const HOST: &str = "https://www3.septa.org";
|
||||
|
||||
struct TripTrackingServiceState {
|
||||
pub tracking_data: HashMap::<String, TripTracking>
|
||||
}
|
||||
|
||||
pub struct TripTrackingService {
|
||||
state: Arc<Mutex<TripTrackingServiceState>>
|
||||
}
|
||||
|
||||
impl TripTrackingService {
|
||||
const UPDATE_SECONDS: u64 = 75;
|
||||
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
state: Arc::new(Mutex::new(TripTrackingServiceState{ tracking_data: HashMap::new()}))
|
||||
}
|
||||
}
|
||||
|
||||
pub fn start(&self) {
|
||||
let cloned_state = Arc::clone(&self.state);
|
||||
thread::spawn( move || {
|
||||
loop {
|
||||
info!("started");
|
||||
|
||||
let clonedx_state = Arc::clone(&cloned_state);
|
||||
let res = Self::update_live_trips(clonedx_state);
|
||||
|
||||
match res {
|
||||
Err(err) => {
|
||||
error!("{}", err);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
||||
thread::sleep(Duration::from_secs(Self::UPDATE_SECONDS));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
pub fn annotate_trips(&self, trips: &mut Vec<crate::database::Trip>) {
|
||||
for trip in trips {
|
||||
trip.tracking_data = match self.state.lock().unwrap().tracking_data.get(&trip.trip_id.clone()){
|
||||
Some(x) => x.clone(),
|
||||
None => TripTracking::Untracked
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
fn update_live_trips(service: Arc<Mutex<TripTrackingServiceState>>) -> anyhow::Result<()> {
|
||||
let mut new_map: HashMap<String, TripTracking> = HashMap::new();
|
||||
let live_tracks = reqwest::blocking::get(format!("{}/api/v2/trips/", HOST))?.json::<Vec<LiveTripJson>>()?;
|
||||
|
||||
for live_track in live_tracks {
|
||||
let track: TripTracking = {
|
||||
if live_track.status == "NO GPS" {
|
||||
TripTracking::Untracked
|
||||
} else if live_track.status == "CANCELED" {
|
||||
TripTracking::Cancelled
|
||||
} else {
|
||||
TripTracking::Tracked(
|
||||
LiveTrip {
|
||||
delay: live_track.delay,
|
||||
next_stop_id: live_track.next_stop_id,
|
||||
timestamp: live_track.timestamp,
|
||||
vehicle_id: live_track.vehicle_id
|
||||
}
|
||||
)
|
||||
}
|
||||
};
|
||||
|
||||
if let TripTracking::Cancelled = track {
|
||||
}
|
||||
|
||||
new_map.insert(
|
||||
live_track.trip_id.clone(),
|
||||
track
|
||||
);
|
||||
}
|
||||
|
||||
info!("populated tracking data with {} entries", new_map.len());
|
||||
(service.lock().unwrap()).tracking_data = new_map;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
fn de_numstr<'de, D: Deserializer<'de>>(deserializer: D) -> Result<String, D::Error> {
|
||||
Ok(match Value::deserialize(deserializer)? {
|
||||
Value::String(s) => s,
|
||||
Value::Number(num) => num.as_i64().ok_or(de::Error::custom("Invalid number"))?.to_string(),
|
||||
_ => return Err(de::Error::custom("wrong type"))
|
||||
})
|
||||
}
|
||||
|
||||
fn de_numstro<'de, D: Deserializer<'de>>(deserializer: D) -> Result<Option<String>, D::Error> {
|
||||
Ok(match Value::deserialize(deserializer)? {
|
||||
Value::String(s) => Some(s),
|
||||
Value::Number(num) => Some(num.as_i64().ok_or(de::Error::custom("Invalid number"))?.to_string()),
|
||||
_ => None
|
||||
})
|
||||
}
|
||||
|
||||
fn de_numstrflo<'de, D: Deserializer<'de>>(deserializer: D) -> Result<Option<String>, D::Error> {
|
||||
Ok(match Value::deserialize(deserializer)? {
|
||||
Value::String(s) => Some(s),
|
||||
Value::Number(num) => Some(num.as_f64().ok_or(de::Error::custom("Invalid number"))?.to_string()),
|
||||
_ => None
|
||||
})
|
||||
}
|
||||
|
||||
Loading…
Add table
Add a link
Reference in a new issue