just add data loader here
Some checks failed
Create and publish a Docker image / build-and-push-image (push) Has been cancelled
Some checks failed
Create and publish a Docker image / build-and-push-image (push) Has been cancelled
This commit is contained in:
parent
04ae29eb27
commit
55df6bdb16
23 changed files with 4097 additions and 1 deletions
84
data_loader/src/septa/direction.rs
Normal file
84
data_loader/src/septa/direction.rs
Normal file
|
|
@ -0,0 +1,84 @@
|
|||
use libseptastic::direction::{CardinalDirection, Direction};
|
||||
use sqlx::{Postgres, Transaction};
|
||||
use crate::traits::{DbObj, FromSeptaJson};
|
||||
use crate::septa_json;
|
||||
|
||||
|
||||
impl DbObj<Direction> for Direction {
|
||||
async fn create_table(tx: &mut Transaction<'_, Postgres>) -> anyhow::Result<()> {
|
||||
sqlx::query("
|
||||
CREATE TABLE IF NOT EXISTS septa_directions (
|
||||
route_id TEXT NOT NULL,
|
||||
direction_id BIGINT NOT NULL,
|
||||
direction septa_direction_type NOT NULL,
|
||||
direction_destination TEXT NOT NULL,
|
||||
|
||||
FOREIGN KEY (route_id) REFERENCES septa_routes(id) ON DELETE CASCADE,
|
||||
PRIMARY KEY (route_id, direction_id)
|
||||
|
||||
);
|
||||
")
|
||||
.execute(&mut **tx)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
async fn insert_many(dirs: Vec<Direction>, tx: &mut Transaction<'_, Postgres>) -> anyhow::Result<()> {
|
||||
let mut route_ids: Vec<String> = Vec::new();
|
||||
let mut direction_ids: Vec<i64> = Vec::new();
|
||||
let mut directions: Vec<CardinalDirection> = Vec::new();
|
||||
let mut direction_destinations: Vec<String> = Vec::new();
|
||||
|
||||
for dir in dirs {
|
||||
route_ids.push(dir.route_id);
|
||||
direction_ids.push(dir.direction_id);
|
||||
directions.push(dir.direction);
|
||||
direction_destinations.push(dir.direction_destination);
|
||||
}
|
||||
|
||||
sqlx::query("
|
||||
INSERT INTO septa_directions (
|
||||
route_id,
|
||||
direction_id,
|
||||
direction,
|
||||
direction_destination
|
||||
)
|
||||
SELECT * FROM UNNEST(
|
||||
$1::text[],
|
||||
$2::bigint[],
|
||||
$3::septa_direction_type[],
|
||||
$4::text[]
|
||||
);
|
||||
")
|
||||
.bind(&route_ids[..])
|
||||
.bind(&direction_ids[..])
|
||||
.bind(&directions[..])
|
||||
.bind(&direction_destinations[..])
|
||||
.execute(&mut **tx)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn insert(&self, tx: &mut Transaction<'_, Postgres>) -> anyhow::Result<()> {
|
||||
Self::insert_many(vec![self.clone()], tx).await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl FromSeptaJson<septa_json::direction::Direction> for Direction {
|
||||
fn from_septa_json(json_dir: septa_json::direction::Direction) -> ::anyhow::Result<Box<Direction>> {
|
||||
Ok(Box::new(Direction { route_id: json_dir.route, direction_id: json_dir.direction.parse::<i64>()?,
|
||||
direction: match json_dir.true_direction.as_str() {
|
||||
"Eastbound" => Ok(libseptastic::direction::CardinalDirection::Eastbound),
|
||||
"Westbound" => Ok(libseptastic::direction::CardinalDirection::Westbound),
|
||||
"Outbound" => Ok(libseptastic::direction::CardinalDirection::Outbound),
|
||||
"Inbound" => Ok(libseptastic::direction::CardinalDirection::Inbound),
|
||||
"Southbound" => Ok(libseptastic::direction::CardinalDirection::Southbound),
|
||||
"Northbound" => Ok(libseptastic::direction::CardinalDirection::Northbound),
|
||||
"LOOP" => Ok(libseptastic::direction::CardinalDirection::Loop),
|
||||
_ => Err(anyhow::anyhow!("Unable to find right direction"))
|
||||
}?
|
||||
, direction_destination: json_dir.direction_destination }))
|
||||
}
|
||||
}
|
||||
7
data_loader/src/septa/mod.rs
Normal file
7
data_loader/src/septa/mod.rs
Normal file
|
|
@ -0,0 +1,7 @@
|
|||
pub mod route;
|
||||
pub mod direction;
|
||||
pub mod stop;
|
||||
pub mod route_stop;
|
||||
pub mod stop_schedule;
|
||||
pub mod schedule_day;
|
||||
pub mod ridership;
|
||||
216
data_loader/src/septa/ridership.rs
Normal file
216
data_loader/src/septa/ridership.rs
Normal file
|
|
@ -0,0 +1,216 @@
|
|||
use std::collections::HashMap;
|
||||
|
||||
use libseptastic::direction::CardinalDirection;
|
||||
use libseptastic::ridership::Ridership;
|
||||
use sqlx::{Postgres, Transaction};
|
||||
use crate::traits::{DbObj, FromSeptaJson, FromSeptaJsonAndStations};
|
||||
use crate::septa_json;
|
||||
|
||||
|
||||
impl DbObj<Ridership> for Ridership {
|
||||
async fn create_table(tx: &mut Transaction<'_, Postgres>) -> anyhow::Result<()> {
|
||||
sqlx::query("
|
||||
CREATE TABLE IF NOT EXISTS septa_ridership (
|
||||
route_id TEXT NOT NULL,
|
||||
stop_id bigint NOT NULL,
|
||||
direction septa_direction_type,
|
||||
exp_ons bigint,
|
||||
exp_offs bigint,
|
||||
ons bigint,
|
||||
offs bigint,
|
||||
year bigint,
|
||||
|
||||
PRIMARY KEY (route_id, stop_id, direction),
|
||||
|
||||
FOREIGN KEY (route_id) REFERENCES septa_routes(id) ON DELETE CASCADE,
|
||||
FOREIGN KEY (stop_id) REFERENCES septa_stops(id) ON DELETE CASCADE
|
||||
);
|
||||
")
|
||||
.execute(&mut **tx)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn insert_many(dirs: Vec<Ridership>, tx: &mut Transaction<'_, Postgres>) -> anyhow::Result<()> {
|
||||
let mut route_ids: Vec<String> = Vec::new();
|
||||
let mut stop_ids: Vec<i64> = Vec::new();
|
||||
let mut directions: Vec<CardinalDirection> = Vec::new();
|
||||
let mut exp_onss: Vec<i64> = Vec::new();
|
||||
let mut exp_offss: Vec<i64> = Vec::new();
|
||||
let mut onss: Vec<i64> = Vec::new();
|
||||
let mut offss: Vec<i64> = Vec::new();
|
||||
let mut years: Vec<i64> = Vec::new();
|
||||
|
||||
for dir in dirs {
|
||||
route_ids.push(dir.route_id);
|
||||
stop_ids.push(dir.stop_id);
|
||||
directions.push(dir.direction);
|
||||
exp_onss.push(dir.exp_ons);
|
||||
exp_offss.push(dir.exp_offs);
|
||||
onss.push(dir.ons);
|
||||
offss.push(dir.offs);
|
||||
years.push(dir.year);
|
||||
}
|
||||
|
||||
sqlx::query("
|
||||
INSERT INTO
|
||||
septa_ridership
|
||||
(
|
||||
route_id,
|
||||
stop_id,
|
||||
direction,
|
||||
exp_ons,
|
||||
exp_offs,
|
||||
ons,
|
||||
offs,
|
||||
year
|
||||
)
|
||||
SELECT * FROM UNNEST(
|
||||
$1::text[],
|
||||
$2::bigint[],
|
||||
$3::septa_direction_type[],
|
||||
$4::bigint[],
|
||||
$5::bigint[],
|
||||
$6::bigint[],
|
||||
$7::bigint[],
|
||||
$8::bigint[]
|
||||
)
|
||||
ON CONFLICT DO NOTHING
|
||||
;
|
||||
")
|
||||
.bind(&route_ids[..])
|
||||
.bind(&stop_ids[..])
|
||||
.bind(&directions[..])
|
||||
.bind(&exp_onss[..])
|
||||
.bind(&exp_offss[..])
|
||||
.bind(&onss[..])
|
||||
.bind(&offss[..])
|
||||
.bind(&years[..])
|
||||
.execute(&mut **tx)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn insert(&self, tx: &mut Transaction<'_, Postgres>) -> anyhow::Result<()> {
|
||||
Self::insert_many(vec![self.clone()], tx).await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl FromSeptaJsonAndStations<septa_json::ridership::Ridership> for Ridership {
|
||||
fn from_septa_json(json_rider: septa_json::ridership::Ridership, stop_map: &HashMap<String, i64>) -> ::anyhow::Result<Box<Ridership>> {
|
||||
Ok(Box::new(Ridership {
|
||||
route_id: match json_rider.route_id.as_str() {
|
||||
// SEPTA Metro
|
||||
"MFL" => "L1",
|
||||
"BSL" => "B1",
|
||||
"MFO" => "L1 OWL",
|
||||
"BSO" => "B1 OWL",
|
||||
"J" => "41",
|
||||
"L" => "51",
|
||||
"G" => "63",
|
||||
"H" => "71",
|
||||
"XH" => "81",
|
||||
"R" => "82",
|
||||
"101" => "D1",
|
||||
"102" => "D2",
|
||||
"15" => "G1",
|
||||
"NHSL" => "M1",
|
||||
"10" => "T1",
|
||||
"34" => "T2",
|
||||
"13" => "T3",
|
||||
"11" => "T4",
|
||||
"36" => "T5",
|
||||
// Regional Rail Remap
|
||||
"Airport" => "AIR",
|
||||
"Chestnut Hill West" => "CHW",
|
||||
"Chestnut Hill East" => "CHE",
|
||||
"Cynwyd" => "CYN",
|
||||
"Fox Chase" => "FOX",
|
||||
"Lansdale/Doylestown" => "LAN",
|
||||
"Media/Wawa" => "MED",
|
||||
"Manyunk/Norristown" => "NOR",
|
||||
"Paoli/Thorndale" => "PAO",
|
||||
"Trenton" => "TRE",
|
||||
"Warminster" => "WAR",
|
||||
"Wilmington/Newark" => "WIL",
|
||||
"West Trenton" => "WTR",
|
||||
any => any
|
||||
}.to_string(),
|
||||
stop_id: {
|
||||
if json_rider.stop_code != "" {
|
||||
Ok(json_rider.stop_code.parse::<i64>()?)
|
||||
} else {
|
||||
if let Some(sid) = stop_map.get(&(match json_rider.stop_name.as_str() {
|
||||
"Churchmans Crossing" => "Churchman's Crossing",
|
||||
"Prospect Park" => "Prospect Park - Moore",
|
||||
"Highland Ave." => "Highland Avenue",
|
||||
"Chester TC" => "Chester Transit Center",
|
||||
"University City" => "Penn Medicine Station",
|
||||
"Wynnefield" => "Wynnefield Avenue",
|
||||
"Temple" => "Temple University",
|
||||
"Fern Rock" => "Fern Rock T C",
|
||||
"Jenkintown" => "Jenkintown Wyncote",
|
||||
"Cornwells Height" => "Cornwells Heights",
|
||||
"Levittown" => "Levittown Station",
|
||||
"9th Street" => "9th Street Lansdale",
|
||||
"Del Val College" => "Delaware Valley College",
|
||||
"Sedwick" => "Sedgwick",
|
||||
"Allen Lane" => "Richard Allen Lane",
|
||||
"Elm Street" => "Norristown - Elm Street",
|
||||
"Norristown" => "Norristown Transit Center",
|
||||
"Neshaminy Falls" => "Neshaminy",
|
||||
"Moylan Rose Valley" => "Moylan-Rose Valley",
|
||||
"Morton" => "Morton-Rutledge",
|
||||
"Easwick" => "Eastwick",
|
||||
"30th Street Station" => "Gray 30th Street",
|
||||
"Holmesburg Jct" => "Holmesburg Junction",
|
||||
any => any
|
||||
}.to_string())) {
|
||||
Ok(sid.clone())
|
||||
} else {
|
||||
Err(::anyhow::anyhow!("Station {} not found", json_rider.stop_name))
|
||||
}
|
||||
}
|
||||
}?,
|
||||
exp_ons: json_rider.exp_ons.parse()?,
|
||||
exp_offs: json_rider.exp_offs.parse()?,
|
||||
ons: json_rider.ons.parse()?,
|
||||
offs: json_rider.offs.parse()?,
|
||||
year: 2024, //FIXME FIXME! Actually parse
|
||||
direction: match json_rider.direction.as_str() {
|
||||
"Eastbound" => Ok(CardinalDirection::Eastbound),
|
||||
"Westbound" => Ok(CardinalDirection::Westbound),
|
||||
"Northbound" => Ok(CardinalDirection::Northbound),
|
||||
"Southbound" => Ok(CardinalDirection::Southbound),
|
||||
"Inbound" => Ok(CardinalDirection::Inbound),
|
||||
"Outbound" => Ok(CardinalDirection::Outbound),
|
||||
"Loop" => Ok(CardinalDirection::Loop),
|
||||
"" => match json_rider.route_id.as_str() {
|
||||
"AIR" => Ok(CardinalDirection::Inbound),
|
||||
"CHE" => Ok(CardinalDirection::Outbound),
|
||||
"CHW" => Ok(CardinalDirection::Inbound),
|
||||
"CYN" => Ok(CardinalDirection::Inbound),
|
||||
"FOX" => Ok(CardinalDirection::Outbound),
|
||||
"LAN" => Ok(CardinalDirection::Outbound),
|
||||
"NOR" => Ok(CardinalDirection::Outbound),
|
||||
"MED" => Ok(CardinalDirection::Inbound),
|
||||
"PAO" => Ok(CardinalDirection::Inbound),
|
||||
"TRE" => Ok(CardinalDirection::Inbound),
|
||||
"WAR" => Ok(CardinalDirection::Outbound),
|
||||
"WTR" => Ok(CardinalDirection::Outbound),
|
||||
"WIL" => Ok(CardinalDirection::Inbound),
|
||||
_ => Err(anyhow::anyhow!("Bad dir value RR route id {}", json_rider.route_id))
|
||||
},
|
||||
_ => Err(
|
||||
::anyhow::anyhow!(
|
||||
"Unknown true direction {} for Route {}",
|
||||
json_rider.direction,
|
||||
json_rider.route_id
|
||||
)
|
||||
)
|
||||
}?}))
|
||||
}
|
||||
}
|
||||
122
data_loader/src/septa/route.rs
Normal file
122
data_loader/src/septa/route.rs
Normal file
|
|
@ -0,0 +1,122 @@
|
|||
use sqlx::{Postgres, Transaction};
|
||||
use crate::traits::{DbObj, FromSeptaJson};
|
||||
use crate::septa_json;
|
||||
use libseptastic::route::{Route, RouteType};
|
||||
|
||||
impl DbObj<Route> for Route {
|
||||
|
||||
async fn create_table(tx: &mut Transaction<'_, Postgres>) -> anyhow::Result<()> {
|
||||
sqlx::query("
|
||||
CREATE TYPE septa_route_type AS ENUM (
|
||||
'trolley',
|
||||
'subway_elevated',
|
||||
'regional_rail',
|
||||
'bus',
|
||||
'trackless_trolley'
|
||||
);
|
||||
")
|
||||
.execute(&mut **tx)
|
||||
.await?;
|
||||
|
||||
sqlx::query("
|
||||
CREATE TYPE septa_direction_type AS ENUM (
|
||||
'northbound',
|
||||
'southbound',
|
||||
'eastbound',
|
||||
'westbound',
|
||||
'inbound',
|
||||
'outbound',
|
||||
'loop'
|
||||
);
|
||||
")
|
||||
.execute(&mut **tx)
|
||||
.await?;
|
||||
|
||||
sqlx::query("
|
||||
CREATE TABLE IF NOT EXISTS septa_routes (
|
||||
id VARCHAR(8) PRIMARY KEY,
|
||||
name VARCHAR(64) NOT NULL,
|
||||
short_name VARCHAR(32) NOT NULL,
|
||||
color_hex VARCHAR(6) NOT NULL,
|
||||
route_type septa_route_type NOT NULL
|
||||
);
|
||||
")
|
||||
.execute(&mut **tx)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn insert_many(routes: Vec<Route>, tx: &mut Transaction<'_, Postgres>) -> anyhow::Result<()> {
|
||||
let mut names = Vec::new();
|
||||
let mut short_names = Vec::new();
|
||||
let mut color_hexes = Vec::new();
|
||||
let mut route_types: Vec<RouteType> = Vec::new();
|
||||
let mut ids = Vec::new();
|
||||
|
||||
for route in routes {
|
||||
ids.push(route.id);
|
||||
names.push(route.name);
|
||||
short_names.push(route.short_name);
|
||||
color_hexes.push(route.color_hex);
|
||||
route_types.push(route.route_type);
|
||||
}
|
||||
|
||||
sqlx::query("
|
||||
INSERT INTO
|
||||
septa_routes
|
||||
(
|
||||
id,
|
||||
name,
|
||||
short_name,
|
||||
color_hex,
|
||||
route_type
|
||||
)
|
||||
SELECT * FROM UNNEST(
|
||||
$1::text[],
|
||||
$2::text[],
|
||||
$3::text[],
|
||||
$4::text[],
|
||||
$5::septa_route_type[]
|
||||
);
|
||||
")
|
||||
.bind(&ids)
|
||||
.bind(&names)
|
||||
.bind(&short_names)
|
||||
.bind(&color_hexes)
|
||||
.bind(&route_types)
|
||||
.execute(&mut **tx)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
async fn insert(&self, tx: &mut Transaction<'_, Postgres>) -> anyhow::Result<()> {
|
||||
Self::insert_many(vec![self.clone()], tx).await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl FromSeptaJson<septa_json::route::RouteType> for RouteType {
|
||||
fn from_septa_json(json_rt: septa_json::route::RouteType) -> ::anyhow::Result<Box<RouteType>> {
|
||||
Ok(Box::new(match json_rt {
|
||||
septa_json::route::RouteType::Trolley => RouteType::Trolley,
|
||||
septa_json::route::RouteType::SubwayElevated => RouteType::SubwayElevated,
|
||||
septa_json::route::RouteType::RegionalRail => RouteType::RegionalRail,
|
||||
septa_json::route::RouteType::Bus => RouteType::Bus,
|
||||
septa_json::route::RouteType::TracklessTrolley => RouteType::TracklessTrolley,
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
impl FromSeptaJson<septa_json::route::Route> for Route {
|
||||
fn from_septa_json(json_route: septa_json::route::Route) -> ::anyhow::Result<Box<Route>> {
|
||||
Ok(Box::new(Route {
|
||||
name: json_route.route_long_name,
|
||||
short_name: json_route.route_short_name,
|
||||
color_hex: json_route.route_color,
|
||||
route_type: *RouteType::from_septa_json(json_route.route_type)?,
|
||||
id: json_route.route_id,
|
||||
// FIXME: Actually get direction
|
||||
}))
|
||||
}
|
||||
}
|
||||
78
data_loader/src/septa/route_stop.rs
Normal file
78
data_loader/src/septa/route_stop.rs
Normal file
|
|
@ -0,0 +1,78 @@
|
|||
use sqlx::{Postgres, Transaction};
|
||||
use crate::traits::{DbObj, FromSeptaJson};
|
||||
use crate::septa_json;
|
||||
|
||||
use libseptastic::route_stop::RouteStop;
|
||||
|
||||
|
||||
impl DbObj<RouteStop> for RouteStop {
|
||||
async fn create_table(tx: &mut Transaction<'_, Postgres>) -> anyhow::Result<()> {
|
||||
sqlx::query("
|
||||
CREATE TABLE IF NOT EXISTS septa_route_stops (
|
||||
route_id TEXT NOT NULL,
|
||||
stop_id BIGINT NOT NULL,
|
||||
direction_id BIGINT NOT NULL,
|
||||
stop_sequence BIGINT NOT NULL,
|
||||
|
||||
PRIMARY KEY (route_id, stop_id, direction_id),
|
||||
|
||||
FOREIGN KEY (route_id) REFERENCES septa_routes(id) ON DELETE CASCADE,
|
||||
FOREIGN KEY (stop_id) REFERENCES septa_stops(id) ON DELETE CASCADE
|
||||
);
|
||||
")
|
||||
.execute(&mut **tx)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
async fn insert_many(rses: Vec<RouteStop>, tx: &mut Transaction<'_, Postgres>) -> anyhow::Result<()> {
|
||||
let mut route_ids: Vec<String> = Vec::new();
|
||||
let mut stop_ids: Vec<i64> = Vec::new();
|
||||
let mut direction_ids: Vec<i64> = Vec::new();
|
||||
let mut stop_sequences: Vec<i64> = Vec::new();
|
||||
|
||||
for rs in rses {
|
||||
route_ids.push(rs.route_id.clone());
|
||||
stop_ids.push(rs.stop_id.clone());
|
||||
direction_ids.push(rs.direction_id.clone());
|
||||
stop_sequences.push(rs.stop_sequence.clone());
|
||||
}
|
||||
|
||||
sqlx::query("
|
||||
INSERT INTO
|
||||
septa_route_stops
|
||||
(
|
||||
route_id,
|
||||
stop_id,
|
||||
direction_id,
|
||||
stop_sequence
|
||||
)
|
||||
SELECT * FROM UNNEST($1::text[], $2::bigint[], $3::bigint[], $4::bigint[])
|
||||
ON CONFLICT DO NOTHING
|
||||
;
|
||||
")
|
||||
.bind(&route_ids[..])
|
||||
.bind(&stop_ids[..])
|
||||
.bind(&direction_ids[..])
|
||||
.bind(&stop_sequences[..])
|
||||
.execute(&mut **tx)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn insert(&self, tx: &mut Transaction<'_, Postgres>) -> anyhow::Result<()> {
|
||||
Self::insert_many(vec![self.clone()], tx).await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl FromSeptaJson<septa_json::route_stop::RouteStop> for RouteStop {
|
||||
fn from_septa_json(json_stops: septa_json::route_stop::RouteStop) -> ::anyhow::Result<Box<RouteStop>> {
|
||||
Ok(Box::new(RouteStop{
|
||||
route_id: json_stops.route_id,
|
||||
stop_id: json_stops.stop_id,
|
||||
direction_id: json_stops.direction_id,
|
||||
stop_sequence: json_stops.stop_sequence
|
||||
}))
|
||||
}
|
||||
}
|
||||
70
data_loader/src/septa/schedule_day.rs
Normal file
70
data_loader/src/septa/schedule_day.rs
Normal file
|
|
@ -0,0 +1,70 @@
|
|||
use sqlx::{Postgres, Transaction};
|
||||
use crate::traits::{DbObj, FromSeptaJson};
|
||||
use crate::septa_json;
|
||||
|
||||
use libseptastic::schedule_day::ScheduleDay;
|
||||
|
||||
impl DbObj<ScheduleDay> for ScheduleDay {
|
||||
async fn create_table(tx: &mut Transaction<'_, Postgres>) -> anyhow::Result<()> {
|
||||
sqlx::query("
|
||||
CREATE TABLE IF NOT EXISTS septa_schedule_days (
|
||||
date TEXT NOT NULL,
|
||||
service_id TEXT NOT NULL,
|
||||
PRIMARY KEY (date, service_id)
|
||||
);
|
||||
")
|
||||
.execute(&mut **tx)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
async fn insert_many(scheds: Vec<ScheduleDay>, tx: &mut Transaction<'_, Postgres>) -> anyhow::Result<()> {
|
||||
let mut dates: Vec<String> = Vec::new();
|
||||
let mut service_ids: Vec<String> = Vec::new();
|
||||
|
||||
for sched in scheds {
|
||||
dates.push(sched.date);
|
||||
service_ids.push(sched.service_id);
|
||||
}
|
||||
|
||||
sqlx::query("
|
||||
|
||||
INSERT INTO septa_schedule_days (
|
||||
date,
|
||||
service_id
|
||||
)
|
||||
SELECT * FROM UNNEST(
|
||||
$1::text[],
|
||||
$2::text[]
|
||||
)
|
||||
ON CONFLICT DO NOTHING
|
||||
;
|
||||
")
|
||||
.bind(&dates[..])
|
||||
.bind(&service_ids[..])
|
||||
.execute(&mut **tx)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn insert(&self, tx: &mut Transaction<'_, Postgres>) -> anyhow::Result<()> {
|
||||
Self::insert_many(vec![self.clone()], tx).await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
impl FromSeptaJson<septa_json::schedule_day::Calendar> for Vec<ScheduleDay> {
|
||||
fn from_septa_json(json_sched: septa_json::schedule_day::Calendar) -> ::anyhow::Result<Box<Vec<ScheduleDay>>> {
|
||||
let mut res: Vec<ScheduleDay> = Vec::new();
|
||||
|
||||
for (day, sched) in json_sched {
|
||||
for svc in sched.service_id {
|
||||
res.push(ScheduleDay { date: day.clone(), service_id: svc });
|
||||
}
|
||||
}
|
||||
|
||||
Ok(Box::new(res))
|
||||
}
|
||||
}
|
||||
108
data_loader/src/septa/stop.rs
Normal file
108
data_loader/src/septa/stop.rs
Normal file
|
|
@ -0,0 +1,108 @@
|
|||
use sqlx::{Postgres, Transaction};
|
||||
use crate::traits::{DbObj, FromSeptaJson};
|
||||
use crate::septa_json;
|
||||
|
||||
use libseptastic::stop::{Stop, StopType};
|
||||
|
||||
impl DbObj<Stop> for Stop {
|
||||
async fn create_table(tx: &mut Transaction<'_, Postgres>) -> anyhow::Result<()> {
|
||||
sqlx::query("
|
||||
CREATE TYPE septa_stop_type AS ENUM (
|
||||
'far_side',
|
||||
'middle_block_near_side',
|
||||
'normal'
|
||||
);
|
||||
")
|
||||
.execute(&mut **tx)
|
||||
.await?;
|
||||
|
||||
sqlx::query("
|
||||
CREATE TABLE IF NOT EXISTS septa_stops (
|
||||
id BIGINT PRIMARY KEY,
|
||||
name VARCHAR(128) NOT NULL,
|
||||
lat DOUBLE PRECISION NOT NULL,
|
||||
lng DOUBLE PRECISION NOT NULL,
|
||||
stop_type septa_stop_type NOT NULL
|
||||
);
|
||||
")
|
||||
.execute(&mut **tx)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
async fn insert_many(stations: Vec<Stop>, tx: &mut Transaction<'_, Postgres>) -> anyhow::Result<()> {
|
||||
let mut ids: Vec<i64> = Vec::new();
|
||||
let mut names: Vec<String> = Vec::new();
|
||||
let mut lats: Vec<f64> = Vec::new();
|
||||
let mut lngs: Vec<f64> = Vec::new();
|
||||
let mut stop_types: Vec<StopType> = Vec::new();
|
||||
|
||||
for station in stations {
|
||||
ids.push(station.id);
|
||||
names.push(station.name);
|
||||
lats.push(station.lat);
|
||||
lngs.push(station.lng);
|
||||
stop_types.push(station.stop_type);
|
||||
}
|
||||
|
||||
sqlx::query("
|
||||
INSERT INTO
|
||||
septa_stops
|
||||
(
|
||||
id,
|
||||
name,
|
||||
lat,
|
||||
lng,
|
||||
stop_type
|
||||
)
|
||||
SELECT * FROM UNNEST(
|
||||
$1::bigint[],
|
||||
$2::text[],
|
||||
$3::double precision[],
|
||||
$4::double precision[],
|
||||
$5::septa_stop_type[]
|
||||
)
|
||||
ON CONFLICT DO NOTHING
|
||||
;
|
||||
")
|
||||
.bind(&ids[..])
|
||||
.bind(&names[..])
|
||||
.bind(&lats[..])
|
||||
.bind(&lngs[..])
|
||||
.bind(&stop_types[..])
|
||||
.execute(&mut **tx)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn insert(&self, tx: &mut Transaction<'_, Postgres>) -> anyhow::Result<()> {
|
||||
Self::insert_many(vec![self.clone()], tx).await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl FromSeptaJson<septa_json::route_stop::RouteStop> for Stop {
|
||||
fn from_septa_json(json_station: septa_json::route_stop::RouteStop) -> ::anyhow::Result<Box<Stop>> {
|
||||
let mut name = json_station.stop_name;
|
||||
let mut stop_type = StopType::Normal;
|
||||
|
||||
if let Some(new_name) = name.strip_suffix("- MNBS") {
|
||||
stop_type = StopType::MiddleBlockNearSide;
|
||||
name = new_name.to_string();
|
||||
}
|
||||
|
||||
if let Some(new_name) = name.strip_suffix("- FS") {
|
||||
stop_type = StopType::FarSide;
|
||||
name = new_name.to_string();
|
||||
}
|
||||
|
||||
Ok(Box::new(Stop {
|
||||
name,
|
||||
id: json_station.stop_id,
|
||||
lat: json_station.stop_lat,
|
||||
lng: json_station.stop_lon,
|
||||
stop_type
|
||||
}))
|
||||
}
|
||||
}
|
||||
118
data_loader/src/septa/stop_schedule.rs
Normal file
118
data_loader/src/septa/stop_schedule.rs
Normal file
|
|
@ -0,0 +1,118 @@
|
|||
use sqlx::{Postgres, Transaction};
|
||||
use crate::traits::{DbObj, FromSeptaJson};
|
||||
use crate::septa_json;
|
||||
|
||||
use libseptastic::stop_schedule::StopSchedule;
|
||||
|
||||
impl DbObj<StopSchedule> for StopSchedule {
|
||||
async fn create_table(tx: &mut Transaction<'_, Postgres>) -> anyhow::Result<()> {
|
||||
sqlx::query("
|
||||
CREATE TABLE IF NOT EXISTS septa_stop_schedules (
|
||||
route_id TEXT NOT NULL,
|
||||
trip_id TEXT NOT NULL,
|
||||
service_id TEXT NOT NULL,
|
||||
direction_id BIGINT NOT NULL,
|
||||
arrival_time BIGINT NOT NULL,
|
||||
stop_id BIGINT NOT NULL,
|
||||
stop_sequence BIGINT NOT NULL,
|
||||
|
||||
PRIMARY KEY (trip_id, stop_id),
|
||||
|
||||
FOREIGN KEY (route_id) REFERENCES septa_routes(id) ON DELETE CASCADE,
|
||||
FOREIGN KEY (stop_id) REFERENCES septa_stops(id) ON DELETE CASCADE
|
||||
);
|
||||
")
|
||||
//FOREIGN KEY (route_id, direction_id) REFERENCES septa_directions(route_id, direction_id) ON DELETE CASCADE,
|
||||
.execute(&mut **tx)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
async fn insert_many(scheds: Vec<StopSchedule>, tx: &mut Transaction<'_, Postgres>) -> anyhow::Result<()> {
|
||||
let mut route_ids: Vec<String> = Vec::new();
|
||||
let mut trip_ids: Vec<String> = Vec::new();
|
||||
let mut service_ids: Vec<String> = Vec::new();
|
||||
let mut direction_ids: Vec<i64> = Vec::new();
|
||||
let mut arrival_times: Vec<i64> = Vec::new();
|
||||
let mut stop_ids: Vec<i64> = Vec::new();
|
||||
let mut stop_sequences: Vec<i64> = Vec::new();
|
||||
|
||||
for sched in scheds {
|
||||
route_ids.push(sched.route_id);
|
||||
trip_ids.push(sched.trip_id);
|
||||
service_ids.push(sched.service_id);
|
||||
direction_ids.push(sched.direction_id);
|
||||
arrival_times.push(sched.arrival_time);
|
||||
stop_ids.push(sched.stop_id);
|
||||
stop_sequences.push(sched.stop_sequence);
|
||||
}
|
||||
|
||||
sqlx::query("
|
||||
|
||||
INSERT INTO septa_stop_schedules (
|
||||
route_id,
|
||||
trip_id,
|
||||
service_id,
|
||||
direction_id,
|
||||
arrival_time,
|
||||
stop_id,
|
||||
stop_sequence
|
||||
)
|
||||
SELECT * FROM UNNEST(
|
||||
$1::text[],
|
||||
$2::text[],
|
||||
$3::text[],
|
||||
$4::bigint[],
|
||||
$5::bigint[],
|
||||
$6::bigint[],
|
||||
$7::bigint[]
|
||||
)
|
||||
ON CONFLICT DO NOTHING
|
||||
;
|
||||
")
|
||||
.bind(&route_ids[..])
|
||||
.bind(&trip_ids[..])
|
||||
.bind(&service_ids[..])
|
||||
.bind(&direction_ids[..])
|
||||
.bind(&arrival_times[..])
|
||||
.bind(&stop_ids[..])
|
||||
.bind(&stop_sequences[..])
|
||||
.execute(&mut **tx)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn insert(&self, tx: &mut Transaction<'_, Postgres>) -> anyhow::Result<()> {
|
||||
Self::insert_many(vec![self.clone()], tx).await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl FromSeptaJson<septa_json::stop_schedule::StopSchedule> for StopSchedule {
|
||||
fn from_septa_json(json_sched: septa_json::stop_schedule::StopSchedule) -> ::anyhow::Result<Box<StopSchedule>> {
|
||||
let time_parts: Vec<&str> = json_sched.arrival_time.split(":").collect();
|
||||
|
||||
let arrival_time: i64 = {
|
||||
let hour: i64 = time_parts[0].parse::<i64>()?;
|
||||
let minute: i64 = time_parts[1].parse::<i64>()?;
|
||||
let second: i64 = time_parts[2].parse::<i64>()?;
|
||||
|
||||
(hour*3600) + (minute * 60) + second
|
||||
};
|
||||
|
||||
Ok(Box::new(StopSchedule {
|
||||
route_id: json_sched.route_id,
|
||||
trip_id: match json_sched.trip_id{
|
||||
septa_json::stop_schedule::TripId::RegionalRail(x) => x,
|
||||
septa_json::stop_schedule::TripId::Other(y) => y.to_string()
|
||||
},
|
||||
service_id: json_sched.service_id,
|
||||
// FIXME: Actually get direction
|
||||
direction_id: json_sched.direction_id,
|
||||
arrival_time,
|
||||
stop_id: json_sched.stop_id,
|
||||
stop_sequence: json_sched.stop_sequence
|
||||
}))
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue