// reddit-image-wall-rs/src/sync/mod.rs
mod model;
pub use self::model::*;
use super::subprograms::*;
use ansi_term::Colour;
use itertools::Itertools;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::mpsc;
use threadpool::ThreadPool;
pub static SUBPROGRAM: Subprogram = Subprogram {
    name: SUBPROGRAM_NAME,
    wrapper,
};
pub static SUBPROGRAM_NAME: &str = "sync";
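
/// Parsed arguments for the `sync` subprogram; built by `wrapper` below from
/// the raw key/value argument map.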
struct SyncCallable {
    max_workers: usize,
    ignore_statistical_leverager: bool,
    undersampled_check_time: usize,
    tolerable_error_log_size: i32,
}
impl SubprogramWithArguments for SyncCallable {
    fn call(&self) -> Result<(), String> {
        main(
            self.max_workers,
            self.ignore_statistical_leverager,
            self.undersampled_check_time,
            self.tolerable_error_log_size,
        )
    }
}
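
/// Parses the subprogram arguments, applying defaults (`max_workers = 8`,
/// `tolerable_error_log_size = 5`, `undersampled_check_time = 3 * 3600` s).
/// A negative `undersampled_check_time` disables the statistical post-rate
/// filter entirely.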
fn wrapper<'a>(arg: &HashMap<String, String>) -> WrapperFnRet<'a> {
    let mw = parse_from::<usize>(arg, &"max_workers".to_string(), &"8".to_string())?;
    let tls = parse_from::<i32>(
        arg,
        &"tolerable_error_log_size".to_string(),
        &"5".to_string(),
    )?;
    let uct = parse_from::<i32>(
        arg,
        &"undersampled_check_time".to_string(),
        &format!("{}", 3 * 3600),
    )?;
    Ok(Box::new(SyncCallable {
        max_workers: mw,
        undersampled_check_time: if uct < 0 { 0usize } else { uct as usize },
        tolerable_error_log_size: tls,
        ignore_statistical_leverager: uct < 0,
    }))
}
static SUBREDDIT_FLAGS: [(&str, &str); 4] = [
    ("no_download", "nodownload.flag"),
    ("no_sfw", "nosfw.flag"),
    ("no_nsfw", "nonsfw.flag"),
    ("wallpaper", "wallpaper.flag"),
];
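
/// Entry point of a sync run: repairs on-disk consistency, filters the
/// subreddit list through the error-log and post-rate heuristics, fans the
/// survivors out to a thread pool of `syncer_thread`s, then merges the
/// collected results and saves them.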
fn main(
    max_workers: usize,
    ignore_statistical_leverager: bool,
    undersampled_check_time: usize,
    tolerable_error_log_size: i32,
) -> Result<(), String> {
    println!("[sync] {}", Colour::Green.paint("Started"));
    let workers_to_use = usize::min(1 << 16, usize::max(1, max_workers));
    let (subreddits, mut subreddits_info, mut subreddits_posts, subreddits_timestamps) =
        fix_inner_consistency();
    println!(
        "[sync] {}",
        Colour::Cyan
            .bold()
            .paint("Syncing link lists with remote servers...")
    );
    let mut filtered_subreddits: Vec<String> = vec![];
    for subreddit in subreddits {
        if subreddit_error_within_tolerance(&subreddit, tolerable_error_log_size) {
            if ignore_statistical_leverager
                || statistically_expects_post(
                    &subreddit,
                    subreddits_posts.get(&subreddit).unwrap(),
                    *subreddits_timestamps.get(&subreddit).unwrap(),
                    undersampled_check_time,
                    false,
                )
            {
                filtered_subreddits.push(subreddit);
            } else {
                println!(
                    "[sync] {}",
                    Colour::Purple.paint(format!("Skipped on statistical guess: {:?}", subreddit))
                );
            }
        } else {
            println!(
                "[sync] {}",
                Colour::Purple.paint(format!("Skipped due to too many errors: {:?}", subreddit))
            );
        }
    }
    println!(
        "[sync] {}",
        Colour::Purple.bold().paint(format!(
            "Will check {} subreddits",
            filtered_subreddits.len()
        ))
    );
    let (tx, rx): (
        mpsc::Sender<Option<(String, Option<SubredditInfoStored>, SubredditPostsStore)>>,
        mpsc::Receiver<Option<(String, Option<SubredditInfoStored>, SubredditPostsStore)>>,
    ) = mpsc::channel();
    let thread_pool = ThreadPool::new(workers_to_use);
    for subreddit in filtered_subreddits {
        let thread_tx = tx.clone();
        let subreddit_posts: SubredditPostsStore =
            subreddits_posts.get(&subreddit).unwrap().clone();
        let subreddit_info: Option<SubredditInfoStored> =
            subreddits_info.get(&subreddit).cloned();
        let subreddit_copied: String = subreddit.clone();
        thread_pool.execute(move || {
            syncer_thread(
                &subreddit_copied,
                &thread_tx,
                &subreddit_posts,
                &subreddit_info,
            )
        });
    }
    thread_pool.join();
    tx.send(None).unwrap();
    for msg in rx {
        if let Some((subreddit, subreddit_info_opt, subreddit_posts)) = msg {
            subreddits_posts.insert(subreddit.clone(), subreddit_posts);
            if let Some(subreddit_info) = subreddit_info_opt {
                subreddits_info.insert(subreddit, subreddit_info);
            }
        } else {
            break;
        }
    }
    println!(
        "[sync] {}",
        Colour::Cyan.bold().paint("Consolidating added data...")
    );
    save_final_result(subreddits_posts, subreddits_info);
    // fix_inner_consistency();
    std::fs::remove_file(get_db_dir(None).join("sync_unfinished.flag")).unwrap_or(());
    println!("[sync] {}", Colour::Green.bold().paint("Done"));
    Ok(())
}
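
/// Worker body: pages through a subreddit's gateway listing, merges any new
/// posts into the stored set, persists `meta.json`/`subreddit.json` for the
/// subreddit, and sends the merged result back over `result_tx`.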
fn syncer_thread(
    subreddit_folder_name: &str,
    result_tx: &mpsc::Sender<Option<(String, Option<SubredditInfoStored>, SubredditPostsStore)>>,
    subreddit_posts: &SubredditPostsStore,
    subreddit_info: &Option<SubredditInfoStored>,
) {
    let subreddit_folder = get_db_dir(Some("r")).join(subreddit_folder_name);
    let subreddit = subreddit_folder_name.to_lowercase();
    // if subreddit != "yiff" {
    //     return;
    // }
    println!(
        "[sync][syncer_thread] {}",
        Colour::Green.paint(format!("Spawned {:?}", subreddit))
    );
    let client = reqwest::blocking::ClientBuilder::new()
        .user_agent(FIREFOX_USER_AGENT)
        .cookie_store(true)
        .gzip(true)
        // .brotli(true)
        .referer(false)
        .redirect(reqwest::redirect::Policy::limited(50))
        .build()
        .unwrap();
    let mut stored_posts: SubredditPostsStore = subreddit_posts.clone();
    let mut stored_info_option: Option<SubredditInfoStored> = None;
    let mut next_page_link_optional = Some(build_gateway_link(&subreddit, None, None));
    let mut page_count = 0usize;
    while let Some(next_page_link) = next_page_link_optional {
        page_count += 1;
        next_page_link_optional = None;
        if let Ok(response) = client.get(&next_page_link).send() {
            // println!("{} {}", next_page_link, page_count);
            // println!("{:?}", response);
            // 404 => Banned subreddit
            // 403 => Private subreddit
            let response_status: u16 = response.status().as_u16();
            if response_status != 200 {
                println!(
                    "[sync][syncer_thread] {}",
                    Colour::Red.bold().paint(format!(
                        "FAILED on page {:>2} of {:?} on HTTP code {}",
                        page_count, subreddit, response_status
                    ))
                );
                subreddit_error_append(
                    subreddit_folder_name,
                    &format!("HTTP code {}", response_status),
                );
            } else {
                println!(
                    "[sync][syncer_thread] {}",
                    Colour::Yellow.paint(format!(
                        "Loaded page {:>2} of {:?}",
                        page_count, subreddit
                    ))
                );
                if let Ok(response_text) = response.text() {
                    match serde_json::from_str::<SubredditInfoFromGateway>(&response_text) {
                        Err(e) => {
                            // Keep both the parse error and the raw response
                            // around for post-mortem inspection.
                            let err_dir = get_db_dir(Some("r_err"));
                            std::fs::write(
                                err_dir.join(format!("{}.err", subreddit_folder_name)),
                                format!("{:?}", e),
                            )
                            .unwrap();
                            std::fs::write(
                                err_dir.join(format!("{}.rsp", subreddit_folder_name)),
                                &response_text,
                            )
                            .unwrap();
                            subreddit_error_append(subreddit_folder_name, &format!("{:?}", e));
                            // println!("{}", response_text);
                            // println!("{:?}", e);
                        }
                        Ok(response_info) => {
                            // Only accept subreddit ids that come with both
                            // "about" info and post flair in the payload.
                            let subreddit_ids: Vec<String> = response_info
                                .subreddits
                                .keys()
                                .filter(|x| response_info.subredditAboutInfo.contains_key(*x))
                                .filter(|x| response_info.postFlair.contains_key(*x))
                                .cloned()
                                .collect();
                            if !subreddit_ids.is_empty() {
                                let subreddit_id: String = subreddit_ids[0].to_string();
                                stored_info_option = Some(
                                    response_info
                                        .to_stored(subreddit_id.clone(), subreddit.clone()),
                                );
                                let page_post_ids: Vec<String> =
                                    response_info.postIds.clone().unwrap_or_default();
                                let page_posts: Vec<&SubredditPostFromGateway> = page_post_ids
                                    .iter()
                                    .map(|x| response_info.posts.get(x).unwrap())
                                    .collect();
                                let mut page_had_something_new = false;
                                for page_post in page_posts {
                                    if (page_post.source.is_some()
                                        || page_post
                                            .media
                                            .clone()
                                            .and_then(|x| x.content)
                                            .is_some())
                                        && page_post.domain.is_some()
                                        && page_post.id.len() < 20
                                    {
                                        let storable_post = page_post.storable();
                                        if !stored_posts.links.contains(&storable_post) {
                                            stored_posts.date_first = stored_posts
                                                .date_first
                                                .min(storable_post.timestamp);
                                            stored_posts.date_last = stored_posts
                                                .date_last
                                                .max(storable_post.timestamp);
                                            stored_posts.links.insert(0, storable_post);
                                            page_had_something_new = true;
                                        }
                                    }
                                }
                                if response_info.token.is_none() || !page_had_something_new {
                                    continue;
                                }
                                next_page_link_optional = Some(build_gateway_link(
                                    &subreddit,
                                    Some(&response_info.token.unwrap()),
                                    Some(response_info.dist),
                                ));
                            }
                            subreddit_error_empty(subreddit_folder_name);
                        }
                    }
                }
            }
        }
    }
    if let Some(stored_info) = &stored_info_option {
        std::fs::write(
            subreddit_folder.join("meta.json"),
            serde_json::to_string_pretty(&stored_info).unwrap(),
        )
        .unwrap();
    }
    // Newest first; timestamps are integral, so a total order is available.
    stored_posts
        .links
        .sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
    stored_posts.links.retain(|x| x.datakey.len() < 20);
    std::fs::write(
        subreddit_folder.join("subreddit.json"),
        serde_json::to_string_pretty(&stored_posts).unwrap(),
    )
    .unwrap();
    std::fs::write(get_db_dir(None).join("sync_unfinished.flag"), []).unwrap();
    println!(
        "[sync][syncer_thread] {}",
        Colour::Green
            .bold()
            .paint(format!("Joining {:?}...", subreddit))
    );
    result_tx
        .send(Some((
            subreddit_folder_name.to_string(),
            stored_info_option.or_else(|| subreddit_info.clone()),
            stored_posts,
        )))
        .unwrap();
}
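
/// Fixed query parameters sent with every gateway request; `allow_over18=1`
/// keeps NSFW listings included and `sort=new` keeps pages in
/// reverse-chronological order so the pager can stop once a page contains
/// nothing new.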
const GATEWAY_LINK_ARGS: [&str; 6] = [
    "redditWebClient=web2x",
    "app=web2x-client-production",
    "allow_over18=1",
    "layout=card",
    "include=identity",
    "sort=new",
];
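
/// Builds a gateway listing URL for `subreddit`, appending `after`/`dist`
/// pagination parameters when present, e.g.
/// `https://gateway.reddit.com/desktopapi/v1/subreddits/pics?redditWebClient=web2x&...&sort=new&after=t3_abcdef&dist=25`
/// (the `pics` name and cursor values here are illustrative only).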
fn build_gateway_link(subreddit: &str, after: Option<&str>, dist: Option<i64>) -> String {
    let mut gateway_args: Vec<String> =
        GATEWAY_LINK_ARGS.iter().map(|x| String::from(*x)).collect();
    if let Some(x) = after {
        gateway_args.push(format!("after={}", x));
    }
    if let Some(x) = dist {
        if x > 0 {
            gateway_args.push(format!("dist={}", x));
        }
    }
    format!(
        "https://gateway.reddit.com/desktopapi/v1/subreddits/{}?{}",
        subreddit,
        gateway_args.join("&")
    )
}
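
/// Reconciles the aggregate databases (`r.json`, `ri.json`, `rf.json`) with
/// the per-subreddit `meta.json`/`subreddit.json` files after an interrupted
/// run (signalled by `sync_unfinished.flag`), preferring whichever side still
/// parses. Returns the subreddit folder names together with the info, posts,
/// and file-modification-timestamp maps.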
fn fix_inner_consistency() -> (
    Vec<String>,
    HashMap<String, SubredditInfoStored>,
    HashMap<String, SubredditPostsStore>,
    HashMap<String, f64>,
) {
    let db_dir = get_db_dir(Some("r"));
    let subreddits_posts_path = get_db_dir(None).join("r.json");
    let subreddits_info_path = get_db_dir(None).join("ri.json");
    let subreddits_flags_path = get_db_dir(None).join("rf.json");
    let mut subreddit_folder_paths_mut: Vec<PathBuf> = std::fs::read_dir(db_dir.clone())
        .unwrap()
        .filter(|p| p.as_ref().unwrap().file_type().unwrap().is_dir())
        .map(|f| f.unwrap().path())
        .collect();
    subreddit_folder_paths_mut.sort();
    let subreddit_folder_paths = subreddit_folder_paths_mut;
    let subreddit_folders: Vec<&str> = subreddit_folder_paths
        .iter()
        .map(|p| p.file_name().unwrap().to_str().unwrap())
        .collect();
    // Fall back to the mtime of the aggregate r.json (and then to 0.0) when a
    // subreddit has no subreddit.json of its own yet.
    let subreddits_timestamps: HashMap<String, f64> = subreddit_folder_paths
        .iter()
        .map(|p| {
            (
                String::from(p.file_name().unwrap().to_str().unwrap()),
                p.join("subreddit.json")
                    .metadata()
                    .or_else(|_: std::io::Error| subreddits_posts_path.metadata())
                    .map(|x: std::fs::Metadata| {
                        x.modified()
                            .unwrap()
                            .duration_since(std::time::UNIX_EPOCH)
                            .unwrap()
                            .as_secs_f64()
                    })
                    .unwrap_or(0.0),
            )
        })
        .collect();
    println!(
        "[sync][fix_inner_consistency] {}",
        Colour::Cyan.paint(">> Reading databases (0/3)")
    );
    let mut subreddits_posts: HashMap<String, SubredditPostsStore> = serde_json::from_str(
        &std::fs::read_to_string(&subreddits_posts_path).unwrap_or_else(|_| String::from("{}")),
    )
    .unwrap_or_default();
    println!(
        "[sync][fix_inner_consistency] {}",
        Colour::Cyan.paint(">> Reading databases (1/3)")
    );
    let mut subreddits_info: HashMap<String, SubredditInfoStored> = serde_json::from_str(
        &std::fs::read_to_string(&subreddits_info_path).unwrap_or_else(|_| String::from("{}")),
    )
    .unwrap_or_default();
    println!(
        "[sync][fix_inner_consistency] {}",
        Colour::Cyan.paint(">> Reading databases (2/3)")
    );
    let mut subreddits_flags: HashMap<String, HashMap<String, bool>> = serde_json::from_str(
        &std::fs::read_to_string(&subreddits_flags_path).unwrap_or_else(|_| String::from("{}")),
    )
    .unwrap_or_default();
    println!(
        "[sync][fix_inner_consistency] {}",
        Colour::Cyan.bold().paint(">> Reading databases (3/3)")
    );
    if get_db_dir(None).join("sync_unfinished.flag").exists() {
        for (subreddit_seq, subreddit_folder) in subreddit_folders.iter().enumerate() {
            println!(
                "[sync][fix_inner_consistency] {}",
                Colour::Cyan.paint(format!(
                    ">> Preprocessing {:0>3}/{:0>3}: {}",
                    subreddit_seq + 1,
                    subreddit_folders.len(),
                    subreddit_folder
                ))
            );
            let subreddit_dir = db_dir.clone().join(&subreddit_folder);
            let subreddit = subreddit_folder.to_lowercase();
            let subreddit_flags: HashMap<String, bool> = SUBREDDIT_FLAGS
                .iter()
                .map(|f| (String::from(f.0), subreddit_dir.join(f.1).exists()))
                .collect();
            subreddits_flags.insert(subreddit.clone(), subreddit_flags);
            let subreddit_info_opt_old: Option<&SubredditInfoStored> =
                subreddits_info.get(&subreddit);
            let subreddit_posts_opt_old: Option<&SubredditPostsStore> =
                subreddits_posts.get(&subreddit);
            let sr_mfp = subreddit_dir.join("meta.json");
            let sr_sfp = subreddit_dir.join("subreddit.json");
            // `serde::export` is a private serde module (removed in serde
            // 1.0.119); spell the result type out instead.
            let subreddit_info_opt: Result<SubredditInfoStored, serde_json::Error> =
                serde_json::from_str(
                    &std::fs::read_to_string(&sr_mfp).unwrap_or_else(|_| String::from("")),
                );
            let subreddit_posts_opt: Result<SubredditPostsStore, serde_json::Error> =
                serde_json::from_str(
                    &std::fs::read_to_string(&sr_sfp).unwrap_or_else(|_| String::from("")),
                );
            // If the fresh per-subreddit file parses, it wins; otherwise
            // restore it from the last known-good aggregate entry.
            match subreddit_posts_opt {
                Ok(subreddit_posts) => {
                    subreddits_posts.insert(subreddit.clone(), subreddit_posts);
                }
                Err(_) => {
                    if let Some(subreddit_posts_old) = subreddit_posts_opt_old {
                        std::fs::write(
                            &sr_sfp,
                            serde_json::to_string_pretty(subreddit_posts_old).unwrap(),
                        )
                        .unwrap();
                    }
                }
            }
            match subreddit_info_opt {
                Ok(subreddit_info) => {
                    subreddits_info.insert(subreddit.clone(), subreddit_info);
                }
                Err(_) => {
                    if let Some(subreddit_info_old) = subreddit_info_opt_old {
                        std::fs::write(
                            &sr_mfp,
                            serde_json::to_string_pretty(subreddit_info_old).unwrap(),
                        )
                        .unwrap();
                    }
                }
            }
        }
        println!(
            "[sync][fix_inner_consistency] {}",
            Colour::Green.paint("Writing databases known to be structurally consistent (0/3)")
        );
        std::fs::write(
            &subreddits_flags_path,
            serde_json::to_string_pretty(&subreddits_flags).unwrap(),
        )
        .unwrap();
        println!(
            "[sync][fix_inner_consistency] {}",
            Colour::Green.paint("Writing databases known to be structurally consistent (1/3)")
        );
        std::fs::write(
            &subreddits_info_path,
            serde_json::to_string_pretty(&subreddits_info).unwrap(),
        )
        .unwrap();
        println!(
            "[sync][fix_inner_consistency] {}",
            Colour::Green.paint("Writing databases known to be structurally consistent (2/3)")
        );
        std::fs::write(
            &subreddits_posts_path,
            serde_json::to_string_pretty(&subreddits_posts).unwrap(),
        )
        .unwrap();
        println!(
            "[sync][fix_inner_consistency] {}",
            Colour::Green.paint("Writing databases known to be structurally consistent (3/3)")
        );
    }
    (
        subreddit_folders
            .iter()
            .copied()
            .map(|i| i.to_string())
            .collect(),
        subreddits_info,
        subreddits_posts,
        subreddits_timestamps,
    )
}
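
/// Appends a timestamped error entry to the subreddit's `errors.json` log.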
fn subreddit_error_append(subreddit_folder_name: &str, error: &str) {
    let subreddit_folder = get_db_dir(Some("r")).join(subreddit_folder_name);
    let errors_path = subreddit_folder.join("errors.json");
    let mut errors: Vec<LoggedStringifiedError> = vec![];
    if errors_path.exists() {
        errors = serde_json::from_str::<Vec<LoggedStringifiedError>>(
            &std::fs::read_to_string(&errors_path).unwrap(),
        )
        .unwrap();
    }
    errors.push(LoggedStringifiedError::new(
        error.to_string(),
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_secs_f64(),
    ));
    std::fs::write(errors_path, serde_json::to_string_pretty(&errors).unwrap()).unwrap();
}
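
/// Removes the subreddit's `errors.json` log, if present.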
fn subreddit_error_empty(subreddit_folder_name: &str) {
    let subreddit_folder = get_db_dir(Some("r")).join(subreddit_folder_name);
    let errors_path = subreddit_folder.join("errors.json");
    if errors_path.exists() {
        std::fs::remove_file(errors_path).unwrap();
    }
}
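
/// Returns `true` while the subreddit's error log holds at most
/// `tolerable_error_log_size` entries; a negative limit disables the check.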
fn subreddit_error_within_tolerance(subreddit: &str, tolerable_error_log_size: i32) -> bool {
    if tolerable_error_log_size < 0 {
        return true;
    }
    let subreddit_folder = get_db_dir(Some("r")).join(subreddit);
    let errors_path = subreddit_folder.join("errors.json");
    let mut errors: Vec<LoggedStringifiedError> = vec![];
    if errors_path.exists() {
        errors = serde_json::from_str::<Vec<LoggedStringifiedError>>(
            &std::fs::read_to_string(&errors_path).unwrap(),
        )
        .unwrap();
    }
    errors.len() as i64 <= tolerable_error_log_size as i64
}
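
/// Heuristic guess at whether a subreddit should have a new post by now:
/// averages the gaps between the newest ~21 stored post timestamps and
/// compares that to the time elapsed since `subreddit.json` last changed.
/// Once the file is stale by more than five average gaps, the average is
/// recomputed with the current time mixed in, which effectively backs off
/// the polling rate for long-inactive subreddits.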
fn statistically_expects_post(
    subreddit: &str,
    stored_posts: &SubredditPostsStore,
    latest_file_change: f64,
    undersampled_check_time: usize,
    use_current_time_with_average: bool,
) -> bool {
    let subreddit_path = get_db_dir(Some("r")).join(subreddit).join("subreddit.json");
    if !subreddit_path.exists() {
        return true;
    }
    let current_time = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap();
    let mut recent_timestamps: Vec<i64> = stored_posts
        .links
        .iter()
        .map(|x| x.timestamp)
        .sorted()
        .rev()
        .take(21)
        .collect();
    // if subreddit == "furry" {
    //     println!("{:?} {:?} {:?}", subreddit, current_time, recent_timestamps);
    // }
    if recent_timestamps.len() < 2 {
        return recent_timestamps.is_empty()
            || (recent_timestamps[0] - latest_file_change as i64) > undersampled_check_time as i64;
    }
    if use_current_time_with_average {
        recent_timestamps.insert(0, current_time.as_secs() as i64);
    }
    let recent_timediffs: Vec<i64> = (0..(recent_timestamps.len() - 1))
        .map(|i| recent_timestamps[i] - recent_timestamps[i + 1])
        .filter(|v| *v > 0)
        .collect();
    if recent_timediffs.is_empty() {
        return true;
    }
    let recent_timediffs_avg: f64 =
        (recent_timediffs.iter().sum::<i64>() as f64) / (recent_timediffs.len() as f64);
    if !use_current_time_with_average
        && current_time.as_secs_f64() >= latest_file_change + recent_timediffs_avg * 5.0
    {
        statistically_expects_post(
            subreddit,
            stored_posts,
            latest_file_change,
            undersampled_check_time,
            true,
        )
    } else {
        current_time.as_secs_f64() >= latest_file_change + recent_timediffs_avg
    }
}
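
/// Persists the merged info and posts maps back to `ri.json` and `r.json`.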
fn save_final_result(
    subreddits_posts: HashMap<String, SubredditPostsStore>,
    subreddits_info: HashMap<String, SubredditInfoStored>,
) {
    let subreddits_posts_path = get_db_dir(None).join("r.json");
    let subreddits_info_path = get_db_dir(None).join("ri.json");
    println!(
        "[sync][save_final_result] {}",
        Colour::Green.paint("Writing databases known to be structurally consistent (0/2)")
    );
    std::fs::write(
        &subreddits_info_path,
        serde_json::to_string_pretty(&subreddits_info).unwrap(),
    )
    .unwrap();
    println!(
        "[sync][save_final_result] {}",
        Colour::Green.paint("Writing databases known to be structurally consistent (1/2)")
    );
    std::fs::write(
        &subreddits_posts_path,
        serde_json::to_string_pretty(&subreddits_posts).unwrap(),
    )
    .unwrap();
    println!(
        "[sync][save_final_result] {}",
        Colour::Green.paint("Writing databases known to be structurally consistent (2/2)")
    );
}