mostly works for fetching gtfs zips

This commit is contained in:
Nicholas Orlowsky 2025-10-08 07:27:15 -04:00
parent 167cffc868
commit f407992035
No known key found for this signature in database
GPG key ID: A9F3BA4C0AA7A70B
4 changed files with 679 additions and 42 deletions

668
data_loader/Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -18,3 +18,5 @@ env_logger = "0.11.8"
log = "0.4.27" log = "0.4.27"
reqwest = { version = "0.12.22", features = [ "json", "blocking" ] } reqwest = { version = "0.12.22", features = [ "json", "blocking" ] }
serde_yaml = "0.9.34" serde_yaml = "0.9.34"
gtfs-structures = "0.45.1"
zip = "5.1.1"

View file

@ -1,4 +1,7 @@
gtfs_zips: gtfs_zips:
- "https://www3.septa.org/developer/gtfs_public.zip" - uri: "https://www3.septa.org/developer/gtfs_public.zip"
- "https://www.njtransit.com/rail_data.zip" subzip: "google_rail.zip"
- "https://www.njtransit.com/bus_data.zip" - uri: "https://www3.septa.org/developer/gtfs_public.zip"
subzip: "google_bus.zip"
- uri: "https://www.njtransit.com/rail_data.zip"
- uri: "https://www.njtransit.com/bus_data.zip"

View file

@ -1,10 +1,11 @@
use std::{clone, env, fs::File, io::{Read, Write}, path::{Path, PathBuf}, sync::{Arc, Mutex}, thread, time::Duration}; use std::{clone, env, fs::{self, File}, io::{Cursor, Read, Write}, path::{Path, PathBuf}, sync::{Arc, Mutex}, thread, time::Duration};
use dotenv::dotenv; use dotenv::dotenv;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use sqlx::postgres::PgPoolOptions; use sqlx::postgres::PgPoolOptions;
use env_logger::{Builder, Env}; use env_logger::{Builder, Env};
use log::{error, info, warn}; use log::{error, info, warn};
use zip::ZipArchive;
#[tokio::main] #[tokio::main]
@ -18,7 +19,14 @@ async fn main() -> ::anyhow::Result<()> {
let mut file_contents = String::new(); let mut file_contents = String::new();
file.read_to_string(&mut file_contents); file.read_to_string(&mut file_contents);
let config_file = serde_yaml::from_str::<Config>(file_contents.as_str()); let config_file = serde_yaml::from_str::<Config>(file_contents.as_str())?;
let svc = GtfsPullService::new(config_file);
svc.start();
loop{
thread::sleep(Duration::from_secs(120));
}
let database_url = std::env::var("DATABASE_URL").expect("Database URL"); let database_url = std::env::var("DATABASE_URL").expect("Database URL");
let pool = PgPoolOptions::new() let pool = PgPoolOptions::new()
@ -34,13 +42,19 @@ async fn main() -> ::anyhow::Result<()> {
Ok(()) Ok(())
} }
#[derive(Serialize, Deserialize, PartialEq, Debug,Clone)]
struct GtfsSource {
pub uri: String,
pub subzip: Option<String>
}
#[derive(Serialize, Deserialize, PartialEq, Debug)] #[derive(Serialize, Deserialize, PartialEq, Debug)]
struct Config { struct Config {
gtfs_zips: Vec<String> pub gtfs_zips: Vec<GtfsSource>
} }
struct GtfsFile { struct GtfsFile {
pub url: String, pub source: GtfsSource,
pub hash: Option<String> pub hash: Option<String>
} }
@ -60,7 +74,7 @@ impl GtfsPullService {
Self { Self {
state: Arc::new(Mutex::new( state: Arc::new(Mutex::new(
GtfsPullServiceState { GtfsPullServiceState {
gtfs_files: config.gtfs_zips.iter().map(|f| { GtfsFile { url: f.clone(), hash: None} }).collect(), gtfs_files: config.gtfs_zips.iter().map(|f| { GtfsFile { source: f.clone(), hash: None} }).collect(),
tmp_dir: env::temp_dir() tmp_dir: env::temp_dir()
} }
)) ))
@ -90,9 +104,25 @@ impl GtfsPullService {
let l_state = state.lock().unwrap(); let l_state = state.lock().unwrap();
for gtfs_file in l_state.gtfs_files.iter() { for gtfs_file in l_state.gtfs_files.iter() {
let resp = reqwest::blocking::get(gtfs_file.url.clone())?; let gtfs = if let Some(subzip) = gtfs_file.source.subzip.clone() {
info!("Reading GTFS file at {} (subzip {})", gtfs_file.source.uri, subzip);
let res = reqwest::blocking::get(gtfs_file.source.uri.clone())?;
let outer_archive = res.bytes()?;
let mut archive = ZipArchive::new(Cursor::new(outer_archive))?;
archive.extract(l_state.tmp_dir.clone())?;
let mut file_path = l_state.tmp_dir.clone();
file_path.push(subzip.clone());
gtfs_structures::Gtfs::new(file_path.to_str().unwrap())?
} else {
info!("Reading GTFS file at {}", gtfs_file.source.uri);
gtfs_structures::Gtfs::new(gtfs_file.source.uri.as_str())?
};
} }
Ok(()) Ok(())
} }
} }