diff --git a/data_loader/Cargo.lock b/data_loader/Cargo.lock index 51ba98b..62f9412 100644 --- a/data_loader/Cargo.lock +++ b/data_loader/Cargo.lock @@ -287,6 +287,7 @@ dependencies = [ "serde-map-to-array", "serde_json", "serde_repr", + "serde_yaml", "sqlx", "thiserror", "tokio", @@ -1668,6 +1669,19 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_yaml" +version = "0.9.34+deprecated" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a8b1a1a2ebf674015cc02edccce75287f1a0130d394307b36743c2f5d504b47" +dependencies = [ + "indexmap", + "itoa", + "ryu", + "serde", + "unsafe-libyaml", +] + [[package]] name = "sha1" version = "0.10.6" @@ -2272,6 +2286,12 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e70f2a8b45122e719eb623c01822704c4e0907e7e426a05927e1a1cfff5b75d0" +[[package]] +name = "unsafe-libyaml" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "673aac59facbab8a9007c7f6108d11f63b603f7cabff99fabf650fea5c32b861" + [[package]] name = "untrusted" version = "0.9.0" diff --git a/data_loader/Cargo.toml b/data_loader/Cargo.toml index 00ecaa0..4b39f8c 100644 --- a/data_loader/Cargo.toml +++ b/data_loader/Cargo.toml @@ -17,3 +17,4 @@ libseptastic = { path = "../libseptastic/" } env_logger = "0.11.8" log = "0.4.27" reqwest = { version = "0.12.22", features = [ "json", "blocking" ] } +serde_yaml = "0.9.34" diff --git a/data_loader/config.yaml b/data_loader/config.yaml new file mode 100644 index 0000000..9b680f7 --- /dev/null +++ b/data_loader/config.yaml @@ -0,0 +1,4 @@ +gtfs_zips: + - "https://www3.septa.org/developer/gtfs_public.zip" + - "https://www.njtransit.com/rail_data.zip" + - "https://www.njtransit.com/bus_data.zip" diff --git a/data_loader/src/fetchers/mod.rs b/data_loader/src/fetchers/mod.rs deleted file mode 100644 index 7edc9ae..0000000 --- a/data_loader/src/fetchers/mod.rs +++ /dev/null @@ -1,12 +0,0 @@ -use libseptastic::{direction::Direction, route::Route, route_stop::RouteStop, schedule_day::ScheduleDay, stop::Stop, stop_schedule::StopSchedule}; - -pub mod septa; - -pub struct AuthorityInfo { - pub routes: Vec, - pub stops: Vec, - pub route_stops: Vec, - pub stop_schedules: Vec, - pub schedule_days: Vec, - pub directions: Vec -} diff --git a/data_loader/src/fetchers/septa.rs b/data_loader/src/fetchers/septa.rs deleted file mode 100644 index 8a40774..0000000 --- a/data_loader/src/fetchers/septa.rs +++ /dev/null @@ -1,258 +0,0 @@ -use dotenv::dotenv; -use libseptastic::stop::Stop; -use crate::septa::route_stop; -use crate::septa_json::direction::Direction; -use crate::septa_json::route::Route; -use crate::septa_json::route_stop::RouteStop; -use crate::septa_json::schedule_day::{Calendar, ScheduleDay}; -use crate::septa_json::stop_schedule::StopSchedule; -use sqlx::postgres::PgPoolOptions; -use std::collections::{HashMap, HashSet, VecDeque}; -use std::sync::{Arc, Mutex}; -use std::{fs, thread}; -use std::path::Path; -use std::time::SystemTime; -use env_logger::{Builder, Env}; -use log::{error, info, warn}; -use crate::traits::*; - -use super::AuthorityInfo; - -async fn convert_routes(json_data: Vec) -> ::anyhow::Result> { - let mut parsed_data: Vec = Vec::new(); - - for json_single in json_data { - parsed_data.push(*libseptastic::route::Route::from_septa_json(json_single)?); - } - - Ok(parsed_data) -} - -async fn convert_stops(json_data: Vec) -> ::anyhow::Result> { - let mut parsed_data: Vec = Vec::new(); - - for json_single in json_data { - parsed_data.push(*libseptastic::stop::Stop::from_septa_json(json_single)?); - } - - Ok(parsed_data) -} - -async fn convert_route_stops(json_data: Vec) -> ::anyhow::Result> { - let mut parsed_data: Vec = Vec::new(); - - for json_single in json_data { - parsed_data.push(*libseptastic::route_stop::RouteStop::from_septa_json(json_single)?); - } - - Ok(parsed_data) -} - -async fn convert_stop_schedules(json_data:Vec) -> ::anyhow::Result> { - let mut parsed_data: Vec = Vec::new(); - - for json_single in json_data { - parsed_data.push(*libseptastic::stop_schedule::StopSchedule::from_septa_json(json_single)?); - } - - - Ok(parsed_data) -} - - -async fn convert_schedule_days(json_data: crate::septa_json::schedule_day::Calendar) -> ::anyhow::Result> { - let mut parsed_data: Vec = Vec::new(); - - parsed_data.append(&mut *Vec::::from_septa_json(json_data)?); - - Ok(parsed_data) -} - -async fn convert_directions(json_data: Vec) -> ::anyhow::Result> { - let mut parsed_data: Vec = Vec::new(); - - for json_single in json_data { - parsed_data.push(*libseptastic::direction::Direction::from_septa_json(json_single)?); - } - - Ok(parsed_data) -} - - -pub struct SeptaFetcher{ -} - -impl SeptaFetcher { - const HOST: &str = "https://flat-api.septa.org"; - - async fn fetch_version() -> ::anyhow::Result { - let unix_seconds = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)?.as_secs(); - let mut version = reqwest::get(format!("{}/version.txt?t={}", Self::HOST, unix_seconds)).await?.text().await?; - version = version.trim().to_string(); - Ok(version) - } - - async fn fetch_routes(version: &String) -> ::anyhow::Result> { - Ok(reqwest::get(format!("{}/routes.json?v={}", Self::HOST, version)).await?.json::>().await?) - } - - async fn fetch_route_stops(routes: &mut Vec, version: &String) -> ::anyhow::Result> { - let mut all_route_stops: Vec = Vec::new(); - - let mut i = 0; - while i < routes.len() { - let route = &routes[i]; - if let Ok(route_stops_json) = reqwest::get(format!("{}/stops/{}/stops.json?v={}", Self::HOST, route.route_id, version)).await { - if let Ok(mut route_stops) = route_stops_json.json::>().await { - all_route_stops.append(&mut route_stops); - i += 1; - continue; - } - } - - warn!("Encountered failure downloading route stops for Route {}, removing", route.route_id); - routes.swap_remove(i); - } - - if routes.len() == 0 { - error!("Unable to download SEPTA routes"); - return Err(anyhow::anyhow!("Unable to successfully download any SEPTA routes")); - } - - Ok(all_route_stops) - } - - async fn fetch_directions(version: &String) -> ::anyhow::Result> { - Ok(reqwest::get(format!("{}/directions.json?v={}", Self::HOST, version)).await?.json::>().await?) - } - - async fn fetch_calendar(version: &String) -> ::anyhow::Result { - Ok(reqwest::get(format!("{}/calendar.json?v={}", Self::HOST, version)).await?.json::().await?) - } - - - async fn fetch_schedules(stops: &Vec, version: &String) -> ::anyhow::Result> { - const NUM_THREADS: usize = 10; - - let mut threads = Vec::with_capacity(NUM_THREADS); - let mut stop_ids: HashSet = HashSet::new(); - let results = Arc::new(Mutex::new(Vec::new())); - let mut worker_queues: Vec> = Vec::with_capacity(NUM_THREADS); - - for _ in 0..NUM_THREADS { - worker_queues.push(VecDeque::new()); - } - - for stop in stops { - stop_ids.insert(stop.stop_id); - } - - let mut cur_thread_no = 0; - for stop_id in stop_ids { - worker_queues[cur_thread_no].push_back((stop_id, 0)); - cur_thread_no = (cur_thread_no + 1) % NUM_THREADS; - } - - - for thread_no in 0..NUM_THREADS { - let mut worker_queue = worker_queues[thread_no].clone(); - let local_result = results.clone(); - let local_version = version.clone(); - - threads.push(thread::spawn(move || { - let mut stop_schedules: Vec = Vec::new(); - - while let Some(stop_id) = worker_queue.pop_front() { - let uri = format!("{}/schedules/stop_schedule/{}.json?v={}", Self::HOST, stop_id.0, local_version); - - match (||{reqwest::blocking::get(uri)?.json::>()})() { - Ok(mut element) => (stop_schedules.append(&mut element)), - Err(_) => { - if stop_id.1 < 5 { - error!("Error downloading stop schedule for {} (try {}). Retrying.", stop_id.0, stop_id.1); - worker_queue.push_back((stop_id.0, stop_id.1 + 1)); - } else { - error!("Error downloading stop schedule for {} (try {}). Giving up.", stop_id.0, stop_id.1); - } - } - } - } - - if let Ok(mut res) = local_result.lock() { - res.append(&mut stop_schedules); - } - })); - } - - for thread in threads { - thread.join(); - } - - Ok(results.lock().unwrap().clone()) - } - - pub async fn fetch_septa_data() -> ::anyhow::Result { - let version = Self::fetch_version().await?; - info!("Got SEPTA schedule version {}", version); - - let mut routes = Self::fetch_routes(&version).await?; - info!("Discovered {} SEPTA routes", routes.len()); - - routes = routes.into_iter().filter(|x| x.release_name == "20250928").collect(); - - let route_stops = Self::fetch_route_stops(&mut routes, &version).await?; - info!("Stop data for {} stops on {} routes successfully downloaded", route_stops.len(), routes.len()); - - let directions = Self::fetch_directions(&version).await?; - info!("Directions data successfully downloaded for {} route directions", directions.len()); - - let mut calendar = Self::fetch_calendar(&version).await?; - info!("Calendar data successfully downloaded for {} days", calendar.len()); - for entry in calendar.clone() { - if entry.1.release_name != "20250928" { - calendar.remove(&entry.0.clone()); - } - } - - let mut schedule_stops = Self::fetch_schedules(&route_stops, &version).await?; - info!("Schedule data downloaded for {} scheduled stops", schedule_stops.len()); - schedule_stops = schedule_stops.into_iter().filter(|x| x.release_name == "20250928").collect(); - - let mut stop_map: HashMap = HashMap::new(); - let mut stop_set: HashSet = HashSet::new(); - - for stop in route_stops.iter() { - stop_map.insert(stop.stop_name.clone(), stop.stop_id); - stop_set.insert(stop.stop_id); - } - - - let mut route_map: HashSet = HashSet::new(); - - for route in routes.iter() { - if route_map.contains(&route.route_id) { - error!("Duplicate route found for {}", route.route_id.clone()); - } - route_map.insert(route.route_id.clone()); - } - - let filtered_directions: Vec<_> = directions.iter() - .filter(|direction| { - let keep = route_map.contains(&direction.route); - if !keep { - warn!( - "Removing route '{}' from direction data (This data has old and new wayfinding values)", - direction.route - ); - } - keep - }) - .cloned() - .collect(); - - info!("Data is valid"); - - - Ok(AuthorityInfo { routes: convert_routes(routes).await?, stops: convert_stops(route_stops.clone()).await?, route_stops: convert_route_stops(route_stops).await?, stop_schedules: convert_stop_schedules(schedule_stops).await?, schedule_days: convert_schedule_days(calendar).await?, directions: convert_directions(filtered_directions).await? }) - } -} diff --git a/data_loader/src/main.rs b/data_loader/src/main.rs index e51f72b..8c684ab 100644 --- a/data_loader/src/main.rs +++ b/data_loader/src/main.rs @@ -1,27 +1,11 @@ +use std::{clone, env, fs::File, io::{Read, Write}, path::{Path, PathBuf}, sync::{Arc, Mutex}, thread, time::Duration}; + use dotenv::dotenv; -use libseptastic::stop::Stop; -use septa::route_stop; -use septa_json::direction::Direction; -use septa_json::route::Route; -use septa_json::route_stop::RouteStop; -use septa_json::schedule_day::{Calendar, ScheduleDay}; -use septa_json::stop_schedule::StopSchedule; +use serde::{Deserialize, Serialize}; use sqlx::postgres::PgPoolOptions; -use std::collections::{HashMap, HashSet}; -use std::sync::{Arc, Mutex}; -use std::{fs, thread}; -use std::path::Path; -use std::time::SystemTime; use env_logger::{Builder, Env}; use log::{error, info, warn}; -pub mod traits; -use traits::*; - -pub mod septa_json; -pub mod septa; -pub mod fetchers; - #[tokio::main] async fn main() -> ::anyhow::Result<()> { @@ -29,46 +13,86 @@ async fn main() -> ::anyhow::Result<()> { let env = Env::new().filter_or("RUST_LOG", "data_loader=info"); Builder::from_env(env).init(); + + let mut file = File::open("config.yaml")?; + let mut file_contents = String::new(); + file.read_to_string(&mut file_contents); + + let config_file = serde_yaml::from_str::(file_contents.as_str()); + let database_url = std::env::var("DATABASE_URL").expect("Database URL"); let pool = PgPoolOptions::new() .max_connections(5) .connect(&database_url) .await?; - - let septa_data = fetchers::septa::SeptaFetcher::fetch_septa_data().await?; - + let mut tx = pool.begin().await?; - libseptastic::route::Route::create_table(&mut tx).await?; - libseptastic::direction::Direction::create_table(&mut tx).await?; - libseptastic::stop::Stop::create_table(&mut tx).await?; - libseptastic::route_stop::RouteStop::create_table(&mut tx).await?; - libseptastic::stop_schedule::StopSchedule::create_table(&mut tx).await?; - libseptastic::schedule_day::ScheduleDay::create_table(&mut tx).await?; - - - - info!("Inserting Route Data"); - libseptastic::route::Route::insert_many(septa_data.routes, &mut tx).await?; - - info!("Inserting Direction Data"); - libseptastic::direction::Direction::insert_many(septa_data.directions, &mut tx).await?; - - info!("Inserting Stop Data"); - libseptastic::stop::Stop::insert_many(septa_data.stops, &mut tx).await?; - - info!("Inserting Route-Stop Data"); - libseptastic::route_stop::RouteStop::insert_many(septa_data.route_stops, &mut tx).await?; - - info!("Inserting Stop Schedule Data"); - libseptastic::stop_schedule::StopSchedule::insert_many(septa_data.stop_schedules, &mut tx).await?; - - info!("Inserting Schedule Day Data"); - libseptastic::schedule_day::ScheduleDay::insert_many(septa_data.schedule_days, &mut tx).await?; - - tx.commit().await?; pool.close().await; Ok(()) } + +#[derive(Serialize, Deserialize, PartialEq, Debug)] +struct Config { + gtfs_zips: Vec +} + +struct GtfsFile { + pub url: String, + pub hash: Option +} + +struct GtfsPullServiceState { + pub gtfs_files: Vec, + pub tmp_dir: PathBuf +} + +pub struct GtfsPullService { + state: Arc> +} + +impl GtfsPullService { + const UPDATE_SECONDS: u64 = 3600*24; + + pub fn new(config: Config) -> Self { + Self { + state: Arc::new(Mutex::new( + GtfsPullServiceState { + gtfs_files: config.gtfs_zips.iter().map(|f| { GtfsFile { url: f.clone(), hash: None} }).collect(), + tmp_dir: env::temp_dir() + } + )) + } + } + + pub fn start(&self) { + let cloned_state = Arc::clone(&self.state); + thread::spawn(move || { + loop { + let recloned_state = Arc::clone(&cloned_state); + let res = Self::update_gtfs_data(recloned_state); + + match res { + Err(err) => { + error!("{}", err); + } + _ => {} + } + + thread::sleep(Duration::from_secs(Self::UPDATE_SECONDS)); + } + }); + } + + pub fn update_gtfs_data(state: Arc>) -> anyhow::Result<()> { + let l_state = state.lock().unwrap(); + + for gtfs_file in l_state.gtfs_files.iter() { + let resp = reqwest::blocking::get(gtfs_file.url.clone())?; + } + + Ok(()) + } +} diff --git a/data_loader/src/septa/direction.rs b/data_loader/src/septa/direction.rs deleted file mode 100644 index 515e775..0000000 --- a/data_loader/src/septa/direction.rs +++ /dev/null @@ -1,85 +0,0 @@ -use libseptastic::direction::{CardinalDirection, Direction}; -use sqlx::{Postgres, Transaction}; -use crate::traits::{DbObj, FromSeptaJson}; -use crate::septa_json; - - -impl DbObj for Direction { - async fn create_table(tx: &mut Transaction<'_, Postgres>) -> anyhow::Result<()> { - sqlx::query(" - CREATE TABLE IF NOT EXISTS septa_directions ( - route_id TEXT NOT NULL, - direction_id BIGINT NOT NULL, - direction septa_direction_type NOT NULL, - direction_destination TEXT NOT NULL, - - FOREIGN KEY (route_id) REFERENCES septa_routes(id) ON DELETE CASCADE, - PRIMARY KEY (route_id, direction_id) - - ); - ") - .execute(&mut **tx) - .await?; - - Ok(()) - } - async fn insert_many(dirs: Vec, tx: &mut Transaction<'_, Postgres>) -> anyhow::Result<()> { - let mut route_ids: Vec = Vec::new(); - let mut direction_ids: Vec = Vec::new(); - let mut directions: Vec = Vec::new(); - let mut direction_destinations: Vec = Vec::new(); - - for dir in dirs { - route_ids.push(dir.route_id); - direction_ids.push(dir.direction_id); - directions.push(dir.direction); - direction_destinations.push(dir.direction_destination); - } - - sqlx::query(" - INSERT INTO septa_directions ( - route_id, - direction_id, - direction, - direction_destination - ) - SELECT * FROM UNNEST( - $1::text[], - $2::bigint[], - $3::septa_direction_type[], - $4::text[] - ); - ") - .bind(&route_ids[..]) - .bind(&direction_ids[..]) - .bind(&directions[..]) - .bind(&direction_destinations[..]) - .execute(&mut **tx) - .await?; - - Ok(()) - } - - async fn insert(&self, tx: &mut Transaction<'_, Postgres>) -> anyhow::Result<()> { - Self::insert_many(vec![self.clone()], tx).await?; - Ok(()) - } -} - -impl FromSeptaJson for Direction { - fn from_septa_json(json_dir: septa_json::direction::Direction) -> ::anyhow::Result> { - Ok(Box::new(Direction { route_id: json_dir.route, direction_id: json_dir.direction.parse::()?, - direction: match json_dir.true_direction.as_str().clone() { - "Eastbound" => Ok(libseptastic::direction::CardinalDirection::Eastbound), - "Westbound" => Ok(libseptastic::direction::CardinalDirection::Westbound), - "Outbound" => Ok(libseptastic::direction::CardinalDirection::Outbound), - "Inbound" => Ok(libseptastic::direction::CardinalDirection::Inbound), - "Southbound" => Ok(libseptastic::direction::CardinalDirection::Southbound), - "Northbound" => Ok(libseptastic::direction::CardinalDirection::Northbound), - "LOOP" => Ok(libseptastic::direction::CardinalDirection::Loop), - "Loop" => Ok(libseptastic::direction::CardinalDirection::Loop), - _ => Err(anyhow::anyhow!(format!("Unable to find right direction {}", json_dir.true_direction))) - }? - , direction_destination: json_dir.direction_destination })) - } -} diff --git a/data_loader/src/septa/mod.rs b/data_loader/src/septa/mod.rs deleted file mode 100644 index 07ad21e..0000000 --- a/data_loader/src/septa/mod.rs +++ /dev/null @@ -1,7 +0,0 @@ -pub mod route; -pub mod direction; -pub mod stop; -pub mod route_stop; -pub mod stop_schedule; -pub mod schedule_day; -pub mod ridership; diff --git a/data_loader/src/septa/ridership.rs b/data_loader/src/septa/ridership.rs deleted file mode 100644 index 4049608..0000000 --- a/data_loader/src/septa/ridership.rs +++ /dev/null @@ -1,216 +0,0 @@ -use std::collections::HashMap; - -use libseptastic::direction::CardinalDirection; -use libseptastic::ridership::Ridership; -use sqlx::{Postgres, Transaction}; -use crate::traits::{DbObj, FromSeptaJson, FromSeptaJsonAndStations}; -use crate::septa_json; - - -impl DbObj for Ridership { - async fn create_table(tx: &mut Transaction<'_, Postgres>) -> anyhow::Result<()> { - sqlx::query(" - CREATE TABLE IF NOT EXISTS septa_ridership ( - route_id TEXT NOT NULL, - stop_id bigint NOT NULL, - direction septa_direction_type, - exp_ons bigint, - exp_offs bigint, - ons bigint, - offs bigint, - year bigint, - - PRIMARY KEY (route_id, stop_id, direction), - - FOREIGN KEY (route_id) REFERENCES septa_routes(id) ON DELETE CASCADE, - FOREIGN KEY (stop_id) REFERENCES septa_stops(id) ON DELETE CASCADE - ); - ") - .execute(&mut **tx) - .await?; - - Ok(()) - } - - async fn insert_many(dirs: Vec, tx: &mut Transaction<'_, Postgres>) -> anyhow::Result<()> { - let mut route_ids: Vec = Vec::new(); - let mut stop_ids: Vec = Vec::new(); - let mut directions: Vec = Vec::new(); - let mut exp_onss: Vec = Vec::new(); - let mut exp_offss: Vec = Vec::new(); - let mut onss: Vec = Vec::new(); - let mut offss: Vec = Vec::new(); - let mut years: Vec = Vec::new(); - - for dir in dirs { - route_ids.push(dir.route_id); - stop_ids.push(dir.stop_id); - directions.push(dir.direction); - exp_onss.push(dir.exp_ons); - exp_offss.push(dir.exp_offs); - onss.push(dir.ons); - offss.push(dir.offs); - years.push(dir.year); - } - - sqlx::query(" - INSERT INTO - septa_ridership - ( - route_id, - stop_id, - direction, - exp_ons, - exp_offs, - ons, - offs, - year - ) - SELECT * FROM UNNEST( - $1::text[], - $2::bigint[], - $3::septa_direction_type[], - $4::bigint[], - $5::bigint[], - $6::bigint[], - $7::bigint[], - $8::bigint[] - ) - ON CONFLICT DO NOTHING - ; - ") - .bind(&route_ids[..]) - .bind(&stop_ids[..]) - .bind(&directions[..]) - .bind(&exp_onss[..]) - .bind(&exp_offss[..]) - .bind(&onss[..]) - .bind(&offss[..]) - .bind(&years[..]) - .execute(&mut **tx) - .await?; - - Ok(()) - } - - async fn insert(&self, tx: &mut Transaction<'_, Postgres>) -> anyhow::Result<()> { - Self::insert_many(vec![self.clone()], tx).await?; - Ok(()) - } -} - -impl FromSeptaJsonAndStations for Ridership { - fn from_septa_json(json_rider: septa_json::ridership::Ridership, stop_map: &HashMap) -> ::anyhow::Result> { - Ok(Box::new(Ridership { - route_id: match json_rider.route_id.as_str() { - // SEPTA Metro - "MFL" => "L1", - "BSL" => "B1", - "MFO" => "L1 OWL", - "BSO" => "B1 OWL", - "J" => "41", - "L" => "51", - "G" => "63", - "H" => "71", - "XH" => "81", - "R" => "82", - "101" => "D1", - "102" => "D2", - "15" => "G1", - "NHSL" => "M1", - "10" => "T1", - "34" => "T2", - "13" => "T3", - "11" => "T4", - "36" => "T5", - // Regional Rail Remap - "Airport" => "AIR", - "Chestnut Hill West" => "CHW", - "Chestnut Hill East" => "CHE", - "Cynwyd" => "CYN", - "Fox Chase" => "FOX", - "Lansdale/Doylestown" => "LAN", - "Media/Wawa" => "MED", - "Manyunk/Norristown" => "NOR", - "Paoli/Thorndale" => "PAO", - "Trenton" => "TRE", - "Warminster" => "WAR", - "Wilmington/Newark" => "WIL", - "West Trenton" => "WTR", - any => any - }.to_string(), - stop_id: { - if json_rider.stop_code != "" { - Ok(json_rider.stop_code.parse::()?) - } else { - if let Some(sid) = stop_map.get(&(match json_rider.stop_name.as_str() { - "Churchmans Crossing" => "Churchman's Crossing", - "Prospect Park" => "Prospect Park - Moore", - "Highland Ave." => "Highland Avenue", - "Chester TC" => "Chester Transit Center", - "University City" => "Penn Medicine Station", - "Wynnefield" => "Wynnefield Avenue", - "Temple" => "Temple University", - "Fern Rock" => "Fern Rock T C", - "Jenkintown" => "Jenkintown Wyncote", - "Cornwells Height" => "Cornwells Heights", - "Levittown" => "Levittown Station", - "9th Street" => "9th Street Lansdale", - "Del Val College" => "Delaware Valley College", - "Sedwick" => "Sedgwick", - "Allen Lane" => "Richard Allen Lane", - "Elm Street" => "Norristown - Elm Street", - "Norristown" => "Norristown Transit Center", - "Neshaminy Falls" => "Neshaminy", - "Moylan Rose Valley" => "Moylan-Rose Valley", - "Morton" => "Morton-Rutledge", - "Easwick" => "Eastwick", - "30th Street Station" => "Gray 30th Street", - "Holmesburg Jct" => "Holmesburg Junction", - any => any - }.to_string())) { - Ok(sid.clone()) - } else { - Err(::anyhow::anyhow!("Station {} not found", json_rider.stop_name)) - } - } - }?, - exp_ons: json_rider.exp_ons.parse()?, - exp_offs: json_rider.exp_offs.parse()?, - ons: json_rider.ons.parse()?, - offs: json_rider.offs.parse()?, - year: 2024, //FIXME FIXME! Actually parse - direction: match json_rider.direction.as_str() { - "Eastbound" => Ok(CardinalDirection::Eastbound), - "Westbound" => Ok(CardinalDirection::Westbound), - "Northbound" => Ok(CardinalDirection::Northbound), - "Southbound" => Ok(CardinalDirection::Southbound), - "Inbound" => Ok(CardinalDirection::Inbound), - "Outbound" => Ok(CardinalDirection::Outbound), - "Loop" => Ok(CardinalDirection::Loop), - "" => match json_rider.route_id.as_str() { - "AIR" => Ok(CardinalDirection::Inbound), - "CHE" => Ok(CardinalDirection::Outbound), - "CHW" => Ok(CardinalDirection::Inbound), - "CYN" => Ok(CardinalDirection::Inbound), - "FOX" => Ok(CardinalDirection::Outbound), - "LAN" => Ok(CardinalDirection::Outbound), - "NOR" => Ok(CardinalDirection::Outbound), - "MED" => Ok(CardinalDirection::Inbound), - "PAO" => Ok(CardinalDirection::Inbound), - "TRE" => Ok(CardinalDirection::Inbound), - "WAR" => Ok(CardinalDirection::Outbound), - "WTR" => Ok(CardinalDirection::Outbound), - "WIL" => Ok(CardinalDirection::Inbound), - _ => Err(anyhow::anyhow!("Bad dir value RR route id {}", json_rider.route_id)) - }, - _ => Err( - ::anyhow::anyhow!( - "Unknown true direction {} for Route {}", - json_rider.direction, - json_rider.route_id - ) - ) - }?})) - } -} diff --git a/data_loader/src/septa/route.rs b/data_loader/src/septa/route.rs deleted file mode 100644 index d5ee633..0000000 --- a/data_loader/src/septa/route.rs +++ /dev/null @@ -1,187 +0,0 @@ -use sqlx::{Postgres, Transaction}; -use crate::traits::{DbObj, FromSeptaJson}; -use crate::septa_json; -use libseptastic::route::{Route, RouteType, InterlinedRoute}; - -impl DbObj for Route { - - async fn create_table(tx: &mut Transaction<'_, Postgres>) -> anyhow::Result<()> { - sqlx::query(" - CREATE TYPE septa_route_type AS ENUM ( - 'trolley', - 'subway_elevated', - 'regional_rail', - 'bus', - 'trackless_trolley' - ); - ") - .execute(&mut **tx) - .await?; - - sqlx::query(" - CREATE TYPE septa_direction_type AS ENUM ( - 'northbound', - 'southbound', - 'eastbound', - 'westbound', - 'inbound', - 'outbound', - 'loop' - ); - ") - .execute(&mut **tx) - .await?; - - sqlx::query(" - CREATE TABLE IF NOT EXISTS septa_routes ( - id VARCHAR(8) PRIMARY KEY, - name VARCHAR(64) NOT NULL, - short_name VARCHAR(32) NOT NULL, - color_hex VARCHAR(6) NOT NULL, - route_type septa_route_type NOT NULL - ); - ") - .execute(&mut **tx) - .await?; - - sqlx::query(" - CREATE TABLE IF NOT EXISTS interlined_routes ( - id VARCHAR(8) PRIMARY KEY, - name VARCHAR(64) NOT NULL - ); - ") - .execute(&mut **tx) - .await?; - - sqlx::query(" - CREATE TABLE IF NOT EXISTS interlined_routes_subroutes ( - interline_id VARCHAR(8), - route_id VARCHAR(8), - FOREIGN KEY (interline_id) REFERENCES interlined_routes(id), - FOREIGN KEY (route_id) REFERENCES septa_routes(id) - ); - ") - .execute(&mut **tx) - .await?; - - Ok(()) - } - -async fn insert_many(routes: Vec, tx: &mut Transaction<'_, Postgres>) -> anyhow::Result<()> { - let mut names = Vec::new(); - let mut short_names = Vec::new(); - let mut color_hexes = Vec::new(); - let mut route_types: Vec = Vec::new(); - let mut ids = Vec::new(); - - for route in routes { - ids.push(route.id); - names.push(route.name); - short_names.push(route.short_name); - color_hexes.push(route.color_hex); - route_types.push(route.route_type); - } - - sqlx::query(" - INSERT INTO - septa_routes - ( - id, - name, - short_name, - color_hex, - route_type - ) - SELECT * FROM UNNEST( - $1::text[], - $2::text[], - $3::text[], - $4::text[], - $5::septa_route_type[] - ); - ") - .bind(&ids) - .bind(&names) - .bind(&short_names) - .bind(&color_hexes) - .bind(&route_types) - .execute(&mut **tx) - .await?; - - let ir: InterlinedRoute = InterlinedRoute { - interline_id: String::from("B"), - interline_name: String::from("Broad Street Line"), - interlined_routes: vec![String::from("B1"), String::from("B2"), String::from("B3")] - }; - - sqlx::query(" - INSERT INTO - interlined_routes - ( - id, - name - ) - VALUES - ( - $1, - $2 - ); - ") - .bind(&ir.interline_id) - .bind(&ir.interline_name) - .execute(&mut **tx) - .await?; - - for route in ir.interlined_routes { - sqlx::query(" - INSERT INTO - interlined_routes_subroutes - ( - interline_id, - route_id - ) - VALUES - ( - $1, - $2 - ); - ") - .bind(&ir.interline_id) - .bind(&route) - .execute(&mut **tx) - .await?; - } - - Ok(()) -} - async fn insert(&self, tx: &mut Transaction<'_, Postgres>) -> anyhow::Result<()> { - Self::insert_many(vec![self.clone()], tx).await?; - - Ok(()) - } -} - -impl FromSeptaJson for RouteType { - fn from_septa_json(json_rt: septa_json::route::RouteType) -> ::anyhow::Result> { - Ok(Box::new(match json_rt { - septa_json::route::RouteType::Trolley => RouteType::Trolley, - septa_json::route::RouteType::SubwayElevated => RouteType::SubwayElevated, - septa_json::route::RouteType::RegionalRail => RouteType::RegionalRail, - septa_json::route::RouteType::Bus => RouteType::Bus, - septa_json::route::RouteType::TracklessTrolley => RouteType::TracklessTrolley, - })) - } -} - -impl FromSeptaJson for Route { - fn from_septa_json(json_route: septa_json::route::Route) -> ::anyhow::Result> { - Ok(Box::new(Route { - name: json_route.route_long_name, - short_name: json_route.route_short_name, - color_hex: json_route.route_color, - route_type: *RouteType::from_septa_json(json_route.route_type)?, - id: json_route.route_id, - // FIXME: Actually get direction - })) - } -} diff --git a/data_loader/src/septa/route_stop.rs b/data_loader/src/septa/route_stop.rs deleted file mode 100644 index a626346..0000000 --- a/data_loader/src/septa/route_stop.rs +++ /dev/null @@ -1,78 +0,0 @@ -use sqlx::{Postgres, Transaction}; -use crate::traits::{DbObj, FromSeptaJson}; -use crate::septa_json; - -use libseptastic::route_stop::RouteStop; - - -impl DbObj for RouteStop { - async fn create_table(tx: &mut Transaction<'_, Postgres>) -> anyhow::Result<()> { - sqlx::query(" - CREATE TABLE IF NOT EXISTS septa_route_stops ( - route_id TEXT NOT NULL, - stop_id BIGINT NOT NULL, - direction_id BIGINT NOT NULL, - stop_sequence BIGINT NOT NULL, - - PRIMARY KEY (route_id, stop_id, direction_id), - - FOREIGN KEY (route_id) REFERENCES septa_routes(id) ON DELETE CASCADE, - FOREIGN KEY (stop_id) REFERENCES septa_stops(id) ON DELETE CASCADE - ); - ") - .execute(&mut **tx) - .await?; - - Ok(()) - } - async fn insert_many(rses: Vec, tx: &mut Transaction<'_, Postgres>) -> anyhow::Result<()> { - let mut route_ids: Vec = Vec::new(); - let mut stop_ids: Vec = Vec::new(); - let mut direction_ids: Vec = Vec::new(); - let mut stop_sequences: Vec = Vec::new(); - - for rs in rses { - route_ids.push(rs.route_id.clone()); - stop_ids.push(rs.stop_id.clone()); - direction_ids.push(rs.direction_id.clone()); - stop_sequences.push(rs.stop_sequence.clone()); - } - - sqlx::query(" - INSERT INTO - septa_route_stops - ( - route_id, - stop_id, - direction_id, - stop_sequence - ) - SELECT * FROM UNNEST($1::text[], $2::bigint[], $3::bigint[], $4::bigint[]) - ON CONFLICT DO NOTHING - ; - ") - .bind(&route_ids[..]) - .bind(&stop_ids[..]) - .bind(&direction_ids[..]) - .bind(&stop_sequences[..]) - .execute(&mut **tx) - .await?; - Ok(()) - } - - async fn insert(&self, tx: &mut Transaction<'_, Postgres>) -> anyhow::Result<()> { - Self::insert_many(vec![self.clone()], tx).await?; - Ok(()) - } -} - -impl FromSeptaJson for RouteStop { - fn from_septa_json(json_stops: septa_json::route_stop::RouteStop) -> ::anyhow::Result> { - Ok(Box::new(RouteStop{ - route_id: json_stops.route_id, - stop_id: json_stops.stop_id, - direction_id: json_stops.direction_id, - stop_sequence: json_stops.stop_sequence - })) - } -} diff --git a/data_loader/src/septa/schedule_day.rs b/data_loader/src/septa/schedule_day.rs deleted file mode 100644 index 6c1b816..0000000 --- a/data_loader/src/septa/schedule_day.rs +++ /dev/null @@ -1,70 +0,0 @@ -use sqlx::{Postgres, Transaction}; -use crate::traits::{DbObj, FromSeptaJson}; -use crate::septa_json; - -use libseptastic::schedule_day::ScheduleDay; - -impl DbObj for ScheduleDay { - async fn create_table(tx: &mut Transaction<'_, Postgres>) -> anyhow::Result<()> { - sqlx::query(" - CREATE TABLE IF NOT EXISTS septa_schedule_days ( - date TEXT NOT NULL, - service_id TEXT NOT NULL, - PRIMARY KEY (date, service_id) - ); - ") - .execute(&mut **tx) - .await?; - - Ok(()) - } - async fn insert_many(scheds: Vec, tx: &mut Transaction<'_, Postgres>) -> anyhow::Result<()> { - let mut dates: Vec = Vec::new(); - let mut service_ids: Vec = Vec::new(); - - for sched in scheds { - dates.push(sched.date); - service_ids.push(sched.service_id); - } - - sqlx::query(" - - INSERT INTO septa_schedule_days ( - date, - service_id - ) - SELECT * FROM UNNEST( - $1::text[], - $2::text[] - ) - ON CONFLICT DO NOTHING - ; - ") - .bind(&dates[..]) - .bind(&service_ids[..]) - .execute(&mut **tx) - .await?; - - Ok(()) - } - - async fn insert(&self, tx: &mut Transaction<'_, Postgres>) -> anyhow::Result<()> { - Self::insert_many(vec![self.clone()], tx).await?; - Ok(()) - } -} - - -impl FromSeptaJson for Vec { - fn from_septa_json(json_sched: septa_json::schedule_day::Calendar) -> ::anyhow::Result>> { - let mut res: Vec = Vec::new(); - - for (day, sched) in json_sched { - for svc in sched.service_id { - res.push(ScheduleDay { date: day.clone(), service_id: svc }); - } - } - - Ok(Box::new(res)) - } -} diff --git a/data_loader/src/septa/stop.rs b/data_loader/src/septa/stop.rs deleted file mode 100644 index d01f53c..0000000 --- a/data_loader/src/septa/stop.rs +++ /dev/null @@ -1,108 +0,0 @@ -use sqlx::{Postgres, Transaction}; -use crate::traits::{DbObj, FromSeptaJson}; -use crate::septa_json; - -use libseptastic::stop::{Stop, StopType}; - -impl DbObj for Stop { - async fn create_table(tx: &mut Transaction<'_, Postgres>) -> anyhow::Result<()> { - sqlx::query(" - CREATE TYPE septa_stop_type AS ENUM ( - 'far_side', - 'middle_block_near_side', - 'normal' - ); - ") - .execute(&mut **tx) - .await?; - - sqlx::query(" - CREATE TABLE IF NOT EXISTS septa_stops ( - id BIGINT PRIMARY KEY, - name VARCHAR(128) NOT NULL, - lat DOUBLE PRECISION NOT NULL, - lng DOUBLE PRECISION NOT NULL, - stop_type septa_stop_type NOT NULL - ); - ") - .execute(&mut **tx) - .await?; - - Ok(()) - } - async fn insert_many(stations: Vec, tx: &mut Transaction<'_, Postgres>) -> anyhow::Result<()> { - let mut ids: Vec = Vec::new(); - let mut names: Vec = Vec::new(); - let mut lats: Vec = Vec::new(); - let mut lngs: Vec = Vec::new(); - let mut stop_types: Vec = Vec::new(); - - for station in stations { - ids.push(station.id); - names.push(station.name); - lats.push(station.lat); - lngs.push(station.lng); - stop_types.push(station.stop_type); - } - - sqlx::query(" - INSERT INTO - septa_stops - ( - id, - name, - lat, - lng, - stop_type - ) - SELECT * FROM UNNEST( - $1::bigint[], - $2::text[], - $3::double precision[], - $4::double precision[], - $5::septa_stop_type[] - ) - ON CONFLICT DO NOTHING - ; - ") - .bind(&ids[..]) - .bind(&names[..]) - .bind(&lats[..]) - .bind(&lngs[..]) - .bind(&stop_types[..]) - .execute(&mut **tx) - .await?; - - Ok(()) - } - - async fn insert(&self, tx: &mut Transaction<'_, Postgres>) -> anyhow::Result<()> { - Self::insert_many(vec![self.clone()], tx).await?; - Ok(()) - } -} - -impl FromSeptaJson for Stop { - fn from_septa_json(json_station: septa_json::route_stop::RouteStop) -> ::anyhow::Result> { - let mut name = json_station.stop_name; - let mut stop_type = StopType::Normal; - - if let Some(new_name) = name.strip_suffix("- MNBS") { - stop_type = StopType::MiddleBlockNearSide; - name = new_name.to_string(); - } - - if let Some(new_name) = name.strip_suffix("- FS") { - stop_type = StopType::FarSide; - name = new_name.to_string(); - } - - Ok(Box::new(Stop { - name, - id: json_station.stop_id, - lat: json_station.stop_lat, - lng: json_station.stop_lon, - stop_type - })) - } -} diff --git a/data_loader/src/septa/stop_schedule.rs b/data_loader/src/septa/stop_schedule.rs deleted file mode 100644 index ef7a1e6..0000000 --- a/data_loader/src/septa/stop_schedule.rs +++ /dev/null @@ -1,127 +0,0 @@ -use sqlx::{Postgres, Transaction}; -use crate::traits::{DbObj, FromSeptaJson}; -use crate::septa_json; - -use libseptastic::stop_schedule::StopSchedule; - -impl DbObj for StopSchedule { - async fn create_table(tx: &mut Transaction<'_, Postgres>) -> anyhow::Result<()> { - sqlx::query(" - CREATE TABLE IF NOT EXISTS septa_stop_schedules ( - route_id TEXT NOT NULL, - trip_id TEXT NOT NULL, - service_id TEXT NOT NULL, - direction_id BIGINT NOT NULL, - arrival_time BIGINT NOT NULL, - stop_id BIGINT NOT NULL, - stop_sequence BIGINT NOT NULL, - - PRIMARY KEY (trip_id, stop_id), - - FOREIGN KEY (route_id) REFERENCES septa_routes(id) ON DELETE CASCADE, - FOREIGN KEY (stop_id) REFERENCES septa_stops(id) ON DELETE CASCADE - ); - ") - //FOREIGN KEY (route_id, direction_id) REFERENCES septa_directions(route_id, direction_id) ON DELETE CASCADE, - .execute(&mut **tx) - .await?; - - sqlx::query(" - CREATE INDEX septa_stop_schedule_trip_id_idx ON septa_stop_schedules (trip_id); - ").execute(&mut **tx).await?; - - sqlx::query(" - CREATE INDEX septa_stop_schedule_service_id_idx ON septa_stop_schedules (service_id); - ").execute(&mut **tx).await?; - - Ok(()) - } - async fn insert_many(scheds: Vec, tx: &mut Transaction<'_, Postgres>) -> anyhow::Result<()> { - let mut route_ids: Vec = Vec::new(); - let mut trip_ids: Vec = Vec::new(); - let mut service_ids: Vec = Vec::new(); - let mut direction_ids: Vec = Vec::new(); - let mut arrival_times: Vec = Vec::new(); - let mut stop_ids: Vec = Vec::new(); - let mut stop_sequences: Vec = Vec::new(); - - for sched in scheds { - route_ids.push(sched.route_id); - trip_ids.push(sched.trip_id); - service_ids.push(sched.service_id); - direction_ids.push(sched.direction_id); - arrival_times.push(sched.arrival_time); - stop_ids.push(sched.stop_id); - stop_sequences.push(sched.stop_sequence); - } - - sqlx::query(" - - INSERT INTO septa_stop_schedules ( - route_id, - trip_id, - service_id, - direction_id, - arrival_time, - stop_id, - stop_sequence - ) - SELECT * FROM UNNEST( - $1::text[], - $2::text[], - $3::text[], - $4::bigint[], - $5::bigint[], - $6::bigint[], - $7::bigint[] - ) - ON CONFLICT DO NOTHING - ; - ") - .bind(&route_ids[..]) - .bind(&trip_ids[..]) - .bind(&service_ids[..]) - .bind(&direction_ids[..]) - .bind(&arrival_times[..]) - .bind(&stop_ids[..]) - .bind(&stop_sequences[..]) - .execute(&mut **tx) - .await?; - - Ok(()) - } - - async fn insert(&self, tx: &mut Transaction<'_, Postgres>) -> anyhow::Result<()> { - Self::insert_many(vec![self.clone()], tx).await?; - Ok(()) - } -} - -impl FromSeptaJson for StopSchedule { - fn from_septa_json(json_sched: septa_json::stop_schedule::StopSchedule) -> ::anyhow::Result> { - let time_parts: Vec<&str> = json_sched.arrival_time.split(":").collect(); - - let arrival_time: i64 = { - let hour: i64 = time_parts[0].parse::()?; - let minute: i64 = time_parts[1].parse::()?; - let second: i64 = time_parts[2].parse::()?; - - (hour*3600) + (minute * 60) + second - }; - - Ok(Box::new(StopSchedule { - route_id: json_sched.route_id, - trip_id: match json_sched.trip_id{ - septa_json::stop_schedule::TripId::RegionalRail(x) => x, - septa_json::stop_schedule::TripId::Other(y) => y.to_string() - }, - stop_name: String::from(""), - service_id: json_sched.service_id, - // FIXME: Actually get direction - direction_id: json_sched.direction_id, - arrival_time, - stop_id: json_sched.stop_id, - stop_sequence: json_sched.stop_sequence - })) - } -} diff --git a/data_loader/src/septa_json/direction.rs b/data_loader/src/septa_json/direction.rs deleted file mode 100644 index ce15bc6..0000000 --- a/data_loader/src/septa_json/direction.rs +++ /dev/null @@ -1,19 +0,0 @@ -use crate::traits::*; -use serde::Deserialize; - - -#[derive(Debug, Clone, Deserialize)] -pub struct Direction { - #[serde(rename="Route")] - pub route: String, - #[serde(rename="DirectionDescription")] - pub direction_destination: String, - #[serde(rename="Direction")] - pub direction: String, - #[serde(rename="TrueDirection")] - pub true_direction: String, - #[serde(rename="Mode")] - pub mode: String -} - -impl SeptaJson for Direction {} diff --git a/data_loader/src/septa_json/mod.rs b/data_loader/src/septa_json/mod.rs deleted file mode 100644 index d28e3e1..0000000 --- a/data_loader/src/septa_json/mod.rs +++ /dev/null @@ -1,6 +0,0 @@ -pub mod route; -pub mod route_stop; -pub mod stop_schedule; -pub mod schedule_day; -pub mod direction; -pub mod ridership; diff --git a/data_loader/src/septa_json/ridership.rs b/data_loader/src/septa_json/ridership.rs deleted file mode 100644 index bc9dd33..0000000 --- a/data_loader/src/septa_json/ridership.rs +++ /dev/null @@ -1,37 +0,0 @@ -use crate::traits::*; -use serde::Deserialize; - -#[derive(Debug, Clone, Deserialize)] -pub struct Ridership { - #[serde(rename = "Stop ID")] - pub stop_id: Option, - - #[serde(rename = "Stop Code")] - pub stop_code: String, - - #[serde(rename = "Stop")] - pub stop_name: String, - - #[serde(rename = "Route")] - pub route_id: String, - - #[serde(rename = "Direction")] - pub direction: String, - - #[serde(rename = "Ons")] - pub ons: String, - - #[serde(rename = "Offs")] - pub offs: String, - - #[serde(rename = "Exp_Ons")] - pub exp_ons: String, - - #[serde(rename = "Exp_Offs")] - pub exp_offs: String, - - #[serde(rename = "Year")] - pub year: String -} - -impl SeptaJson for Ridership {} diff --git a/data_loader/src/septa_json/route.rs b/data_loader/src/septa_json/route.rs deleted file mode 100644 index 93e6967..0000000 --- a/data_loader/src/septa_json/route.rs +++ /dev/null @@ -1,42 +0,0 @@ -use crate::traits::*; -use serde_repr::*; -use serde::Deserialize; - -#[derive(Serialize_repr, Deserialize_repr, PartialEq, Debug)] -#[repr(u8)] -pub enum RouteType { - Trolley= 0, - SubwayElevated = 1, - RegionalRail = 2, - Bus = 3, - TracklessTrolley = 11 -} - - -#[derive(Debug, Deserialize)] -pub struct Document { - #[serde(rename = "type")] - pub doc_type: String, - pub title: String, - pub url: String, - pub link_label: Option, -} - -#[derive(Debug, Deserialize)] -pub struct Route { - pub release_name: String, - pub route_id: String, - pub route_short_name: String, - pub route_long_name: String, - pub route_type: RouteType, - pub route_color: String, - pub route_text_color: String, - pub route_frequency_text: String, - pub is_frequent_bus: bool, - pub route_color_dark: String, - pub route_color_text_dark: String, - pub documents: Vec -} - -impl SeptaJson for Route {} -impl SeptaJson for RouteType {} diff --git a/data_loader/src/septa_json/route_stop.rs b/data_loader/src/septa_json/route_stop.rs deleted file mode 100644 index 2eca64d..0000000 --- a/data_loader/src/septa_json/route_stop.rs +++ /dev/null @@ -1,31 +0,0 @@ -use crate::traits::*; -use serde::{self, Deserialize, Deserializer}; - -pub fn de_string_or_int<'de, D>(deserializer: D) -> Result -where - D: Deserializer<'de>, -{ - use serde::de::Error; - use serde_json::Value; - - match Value::deserialize(deserializer)? { - Value::Number(n) => n.as_i64().ok_or_else(|| Error::custom("Invalid number")), - Value::String(s) => s.parse::().map_err(|_| Error::custom("Invalid string number")), - _ => Err(Error::custom("Expected a string or number")), - } -} - -#[derive(Debug, Clone, Deserialize)] -pub struct RouteStop { - pub route_id: String, - pub direction_id: i64, - #[serde(deserialize_with = "de_string_or_int")] - pub stop_id: i64, - pub stop_name: String, - pub stop_sequence: i64, - pub stop_lat: f64, - pub stop_lon: f64, -} - -impl SeptaJson for RouteStop {} -impl SeptaJson for Vec {} diff --git a/data_loader/src/septa_json/schedule_day.rs b/data_loader/src/septa_json/schedule_day.rs deleted file mode 100644 index ad9886a..0000000 --- a/data_loader/src/septa_json/schedule_day.rs +++ /dev/null @@ -1,18 +0,0 @@ -use std::collections::{BTreeMap, HashMap, HashSet}; - -use crate::traits::*; -use serde::{Deserialize, Serialize}; - - -#[derive(Default, Debug, Serialize, Deserialize, Clone)] -pub struct ScheduleDay { - pub release_name: String, - pub special: bool, - pub service_id: Vec, - pub service_added: Vec, - pub service_removed: Vec -} - -pub type Calendar = BTreeMap; - -impl SeptaJson for Calendar {} diff --git a/data_loader/src/septa_json/stop_schedule.rs b/data_loader/src/septa_json/stop_schedule.rs deleted file mode 100644 index 65a98bf..0000000 --- a/data_loader/src/septa_json/stop_schedule.rs +++ /dev/null @@ -1,27 +0,0 @@ -use crate::traits::*; -use serde::{Deserialize, Serialize}; - -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(untagged)] -pub enum TripId { - RegionalRail(String), - Other(i64) -} - -#[derive(Debug, Clone, Deserialize)] -pub struct StopSchedule { - pub route_id: String, - pub block_id: i64, - pub release_name: String, - pub trip_id: TripId, - pub service_id: String, - pub trip_headsign: Option, - pub direction_id: i64, - pub arrival_time: String, - pub stop_id: i64, - pub stop_name: String, - pub stop_sequence: i64, - pub last_stop_id: i64 -} - -impl SeptaJson for StopSchedule {} diff --git a/data_loader/src/traits.rs b/data_loader/src/traits.rs deleted file mode 100644 index 20eb2cc..0000000 --- a/data_loader/src/traits.rs +++ /dev/null @@ -1,23 +0,0 @@ -use std::collections::HashMap; - -use sqlx::{Postgres, Transaction}; - - - -pub trait SeptaJson {} - -pub trait FromSeptaJson { - fn from_septa_json(septa_json: T) -> ::anyhow::Result>; -} - -pub trait FromSeptaJsonAndStations { - fn from_septa_json(septa_json: T, stop_map: &HashMap) -> ::anyhow::Result>; -} - -pub trait DbObj { - fn create_table(tx: &mut Transaction<'_, Postgres>) -> impl std::future::Future< Output = ::anyhow::Result<()>> + Send; - fn insert(&self, tx: &mut Transaction<'_, Postgres>) -> impl std::future::Future< Output = ::anyhow::Result<()>> + Send; - fn insert_many(s: Vec, tx: &mut Transaction<'_, Postgres>) -> impl std::future::Future< Output = ::anyhow::Result<()>> + Send; -} - -pub trait Fetchable {}