add files and tracking
All checks were successful
Create and publish a Docker image / build-and-push-image (push) Successful in 5m33s
All checks were successful
Create and publish a Docker image / build-and-push-image (push) Successful in 5m33s
This commit is contained in:
parent
cc301dd1b8
commit
2d8f131b91
12 changed files with 176 additions and 3928 deletions
|
|
@ -65,7 +65,7 @@ async fn get_route_info(route_id: String, state: Data<Arc<AppState>>) -> ::anyho
|
|||
.filter(|dir| seen.insert(dir.direction.to_string()))
|
||||
.collect();
|
||||
|
||||
state.trip_tracking_service.annotate_trips(&mut trips);
|
||||
state.trip_tracking_service.annotate_trips(&mut trips).await;
|
||||
|
||||
Ok(RouteResponse{
|
||||
route,
|
||||
|
|
|
|||
|
|
@ -70,9 +70,9 @@ async fn get_index(req: HttpRequest) -> impl Responder {
|
|||
}).await
|
||||
}
|
||||
|
||||
#[actix_web::main]
|
||||
#[tokio::main]
|
||||
async fn main() -> ::anyhow::Result<()> {
|
||||
env_logger::init_from_env(Env::default().default_filter_or("septastic_api=info"));
|
||||
env_logger::init_from_env(Env::default().default_filter_or("septastic_api=info"));
|
||||
dotenv().ok();
|
||||
|
||||
let version: &str = option_env!("CARGO_PKG_VERSION").expect("Expected package version");
|
||||
|
|
@ -84,14 +84,13 @@ async fn main() -> ::anyhow::Result<()> {
|
|||
|
||||
let config_file = serde_yaml::from_str::<gtfs_pull::Config>(file_contents.as_str())?;
|
||||
|
||||
let tt_service = services::trip_tracking::TripTrackingService::new();
|
||||
let tt_service = services::trip_tracking::TripTrackingService::new().await;
|
||||
tt_service.start();
|
||||
|
||||
let svc = gtfs_pull::GtfsPullService::new(config_file);
|
||||
svc.start();
|
||||
svc.wait_for_ready();
|
||||
|
||||
|
||||
let state = Arc::new(AppState {
|
||||
gtfs_service: svc,
|
||||
trip_tracking_service: tt_service
|
||||
|
|
|
|||
|
|
@ -1,10 +1,13 @@
|
|||
use chrono::Utc;
|
||||
use serde_json::Value;
|
||||
use serde::de;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use sqlx::{Execute, Postgres, QueryBuilder, Transaction};
|
||||
use std::sync::{Arc};
|
||||
use futures::lock::Mutex;
|
||||
use std::thread;
|
||||
use std::collections::HashMap;
|
||||
use std::time::Duration;
|
||||
use log::error;
|
||||
use log::{error, info};
|
||||
use serde::{Serialize, Deserialize, Deserializer};
|
||||
use libseptastic::stop_schedule::{LiveTrip, TripTracking};
|
||||
|
||||
|
|
@ -37,7 +40,8 @@ pub struct LiveTripJson {
|
|||
const HOST: &str = "https://www3.septa.org";
|
||||
|
||||
struct TripTrackingServiceState {
|
||||
pub tracking_data: HashMap::<String, TripTracking>
|
||||
pub tracking_data: HashMap::<String, TripTracking>,
|
||||
pub database: ::sqlx::postgres::PgPool
|
||||
}
|
||||
|
||||
pub struct TripTrackingService {
|
||||
|
|
@ -47,18 +51,78 @@ pub struct TripTrackingService {
|
|||
impl TripTrackingService {
|
||||
const UPDATE_SECONDS: u64 = 75;
|
||||
|
||||
pub fn new() -> Self {
|
||||
pub async fn log_delay(
|
||||
transaction: &mut Transaction<'_, Postgres>,
|
||||
tracking_data: &HashMap::<String, TripTracking>,
|
||||
timestamp: i64
|
||||
) -> ::anyhow::Result<()> {
|
||||
|
||||
let mut query_builder: QueryBuilder<Postgres> = QueryBuilder::new(
|
||||
"INSERT INTO
|
||||
live_tracking
|
||||
(
|
||||
delay_minutes,
|
||||
next_stop_id,
|
||||
timestamp,
|
||||
snapshot_timestamp,
|
||||
lat,
|
||||
lng,
|
||||
heading,
|
||||
seat_availability,
|
||||
vehicle_ids,
|
||||
trip_id,
|
||||
route_id
|
||||
)
|
||||
VALUES"
|
||||
);
|
||||
|
||||
let mut separated = query_builder.separated(", ");
|
||||
for trip in tracking_data {
|
||||
if let TripTracking::Tracked(live_data) = trip.1 {
|
||||
separated.push("(");
|
||||
separated.push_bind_unseparated(live_data.delay);
|
||||
separated.push_bind(live_data.next_stop_id);
|
||||
separated.push_bind(live_data.timestamp);
|
||||
separated.push_bind(timestamp);
|
||||
separated.push_bind(live_data.latitude);
|
||||
separated.push_bind(live_data.longitude);
|
||||
separated.push_bind(live_data.heading);
|
||||
separated.push_bind(live_data.seat_availability.clone());
|
||||
separated.push_bind(live_data.vehicle_ids.clone());
|
||||
separated.push_bind(live_data.trip_id.clone());
|
||||
separated.push_bind(live_data.route_id.clone());
|
||||
separated.push_unseparated(")");
|
||||
}
|
||||
}
|
||||
|
||||
let query = query_builder.build();
|
||||
//info!("{}", query.sql());
|
||||
query.execute(&mut **transaction).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn new() -> Self {
|
||||
let connection_string =
|
||||
std::env::var("DB_CONNSTR").expect("Expected database connection string");
|
||||
|
||||
let pool = ::sqlx::postgres::PgPoolOptions::new()
|
||||
.max_connections(5)
|
||||
.connect(&connection_string)
|
||||
.await.unwrap();
|
||||
|
||||
Self {
|
||||
state: Arc::new(Mutex::new(TripTrackingServiceState{ tracking_data: HashMap::new()}))
|
||||
state: Arc::new(Mutex::new(TripTrackingServiceState{ tracking_data: HashMap::new(), database: pool}))
|
||||
}
|
||||
}
|
||||
|
||||
pub fn start(&self) {
|
||||
info!("Starting live service");
|
||||
let cloned_state = Arc::clone(&self.state);
|
||||
thread::spawn( move || {
|
||||
tokio::spawn( async move {
|
||||
loop {
|
||||
let clonedx_state = Arc::clone(&cloned_state);
|
||||
let res = Self::update_live_trips(clonedx_state);
|
||||
let res = Self::update_live_trips(clonedx_state).await;
|
||||
|
||||
match res {
|
||||
Err(err) => {
|
||||
|
|
@ -67,23 +131,23 @@ impl TripTrackingService {
|
|||
_ => {}
|
||||
}
|
||||
|
||||
thread::sleep(Duration::from_secs(Self::UPDATE_SECONDS));
|
||||
tokio::time::sleep(Duration::from_secs(Self::UPDATE_SECONDS)).await;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
pub fn annotate_trips(&self, trips: &mut Vec<libseptastic::stop_schedule::Trip>) {
|
||||
pub async fn annotate_trips(&self, trips: &mut Vec<libseptastic::stop_schedule::Trip>) {
|
||||
for trip in trips {
|
||||
trip.tracking_data = match self.state.lock().unwrap().tracking_data.get(&trip.trip_id.clone()){
|
||||
trip.tracking_data = match self.state.lock().await.tracking_data.get(&trip.trip_id.clone()){
|
||||
Some(x) => x.clone(),
|
||||
None => TripTracking::Untracked
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
fn update_live_trips(service: Arc<Mutex<TripTrackingServiceState>>) -> anyhow::Result<()> {
|
||||
async fn update_live_trips(service: Arc<Mutex<TripTrackingServiceState>>) -> anyhow::Result<()> {
|
||||
let mut new_map: HashMap<String, TripTracking> = HashMap::new();
|
||||
let live_tracks = reqwest::blocking::get(format!("{}/api/v2/trips/", HOST))?.json::<Vec<LiveTripJson>>()?;
|
||||
let live_tracks = reqwest::get(format!("{}/api/v2/trips/", HOST)).await?.json::<Vec<LiveTripJson>>().await?;
|
||||
|
||||
for live_track in live_tracks {
|
||||
let track: TripTracking = {
|
||||
|
|
@ -94,9 +158,22 @@ impl TripTrackingService {
|
|||
} else {
|
||||
TripTracking::Tracked(
|
||||
LiveTrip {
|
||||
trip_id: live_track.trip_id,
|
||||
trip_id: live_track.trip_id.clone(),
|
||||
route_id: live_track.route_id,
|
||||
delay: live_track.delay,
|
||||
seat_availability: live_track.seat_availability,
|
||||
heading: match live_track.heading {
|
||||
Some(hdg) => if hdg != "" { Some(hdg.parse::<f64>()?)} else {None},
|
||||
None => None
|
||||
},
|
||||
latitude: match live_track.lat {
|
||||
Some(lat) => Some(lat.parse::<f64>()?),
|
||||
None => None
|
||||
},
|
||||
longitude: match live_track.lon {
|
||||
Some(lon) => Some(lon.parse::<f64>()?),
|
||||
None => None
|
||||
},
|
||||
next_stop_id: match live_track.next_stop_id {
|
||||
Some(x) => match x.parse() {
|
||||
Ok(y) => Some(y),
|
||||
|
|
@ -123,7 +200,16 @@ impl TripTrackingService {
|
|||
);
|
||||
}
|
||||
|
||||
(service.lock().unwrap()).tracking_data = new_map;
|
||||
info!("Logged live data");
|
||||
let mut svc = service.lock().await;
|
||||
|
||||
info!("Logged live data");
|
||||
|
||||
let mut tx = svc.database.begin().await?;
|
||||
Self::log_delay(&mut tx, &new_map, Utc::now().timestamp_nanos_opt().unwrap()).await?;
|
||||
tx.commit().await?;
|
||||
|
||||
svc.tracking_data = new_map;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue