From 7db0656e891087219952db5dc5cb565a6794daaf Mon Sep 17 00:00:00 2001 From: Joshua Potter Date: Thu, 30 Dec 2021 16:53:29 -0500 Subject: [PATCH] Setup daemon tracking. Track original path before resolution for later git copying and messaging. --- src/cli.rs | 6 +- src/config.rs | 21 +++-- src/daemon.rs | 249 ++++++++++++++++++++++++++++++++++++-------------- src/lib.rs | 4 +- src/main.rs | 14 +-- src/path.rs | 100 ++++++++++++-------- 6 files changed, 268 insertions(+), 126 deletions(-) diff --git a/src/cli.rs b/src/cli.rs index ad0941a..6f15453 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -10,7 +10,7 @@ use std::io::Write; pub fn write_config(mut pending: PathConfig) -> config::Result<()> { println!( "Generating config at {}...\n", - Success.paint(pending.0.display().to_string()) + Success.paint(pending.0.unresolved().display().to_string()) ); print!( @@ -45,9 +45,9 @@ pub fn write_config(mut pending: PathConfig) -> config::Result<()> { pub fn list_packages(config: PathConfig) { println!( "Listing packages in {}...\n", - Success.paint(config.0.display().to_string()) + Success.paint(config.0.unresolved().display().to_string()) ); - // TODO(jrpotter): Alphabetize the output list. + // Alphabetical ordered ensured by B-tree implementation. for (k, _) in config.1.packages { println!("• {}", k); } diff --git a/src/config.rs b/src/config.rs index c2377d4..f48714d 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,7 +1,6 @@ -use super::path; -use super::path::{NormalPathBuf, Normalize}; +use super::path::ResPathBuf; use serde_derive::{Deserialize, Serialize}; -use std::collections::HashMap; +use std::collections::BTreeMap; use std::io::Write; use std::path::PathBuf; use std::{error, fmt, fs, io}; @@ -61,7 +60,7 @@ pub struct Package { #[derive(Debug, Deserialize, Serialize)] pub struct Config { pub remote: Remote, - pub packages: HashMap, + pub packages: BTreeMap, } impl Config { @@ -71,10 +70,10 @@ impl Config { } #[derive(Debug)] -pub struct PathConfig(pub NormalPathBuf, pub Config); +pub struct PathConfig(pub ResPathBuf, pub Config); impl PathConfig { - pub fn new(path: &NormalPathBuf, config: Option) -> Self { + pub fn new(path: &ResPathBuf, config: Option) -> Self { PathConfig( path.clone(), config.unwrap_or(Config { @@ -82,7 +81,7 @@ impl PathConfig { owner: "example-user".to_owned(), name: "home-config".to_owned(), }, - packages: HashMap::new(), + packages: BTreeMap::new(), }), ) } @@ -111,7 +110,7 @@ pub fn default_paths() -> Vec { DEFAULT_PATHS.iter().map(|s| PathBuf::from(s)).collect() } -pub fn load(candidates: &Vec) -> Result { +pub fn load(candidates: &Vec) -> Result { // When trying our paths, the only acceptable error is a `NotFound` file. // Anything else should be surfaced to the end user. for candidate in candidates { @@ -126,3 +125,9 @@ pub fn load(candidates: &Vec) -> Result { } Err(Error::MissingConfig) } + +pub fn reload(config: &PathConfig) -> Result { + // TODO(jrpotter): Let's add a proper logging solution. + println!("Configuration reloaded."); + load(&vec![config.0.clone()]) +} diff --git a/src/daemon.rs b/src/daemon.rs index 6bedb11..d058d33 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -1,99 +1,210 @@ use super::config; use super::config::PathConfig; use super::path; -use super::path::{NormalPathBuf, Normalize}; -use notify::{RecommendedWatcher, Watcher}; +use super::path::ResPathBuf; +use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher}; use std::collections::HashSet; +use std::error::Error; use std::path::PathBuf; use std::sync::mpsc::channel; +use std::sync::mpsc::{Receiver, Sender, TryRecvError}; +use std::thread; use std::time::Duration; // TODO(jrpotter): Add logging. // TODO(jrpotter): Add pid file to only allow one daemon at a time. +// Used for both polling and debouncing file system events. +const WATCH_FREQUENCY: Duration = Duration::from_secs(5); + // ======================================== -// State +// Polling // ======================================== -struct WatchState { - // Paths that we were not able to watch properly but could potentially do so - // in the future. These include paths that did not exist at the time of - // canonicalization or did not have environment variables defined that may - // be defined later on. - pending: HashSet, - // Paths that we are currently watching. - watching: HashSet, - // Paths that are not valid and will never become valid. These may include - // paths that include prefixes or refer to directories that could never be - // reached (e.g. parent of root). - invalid: HashSet, +enum PollEvent { + Pending(PathBuf), + Clear, } -impl WatchState { - pub fn new(config: &PathConfig) -> notify::Result { - let mut pending: HashSet = HashSet::new(); - let mut watching: HashSet = HashSet::new(); - watching.insert(config.0.clone()); - // We try and resolve our configuration again here. We want to - // specifically track any new configs that may pop up with higher - // priority. - for path in config::default_paths() { - match path::normalize(&path)? { - // TODO(jrpotter): Check if the path can be canonicalized. - Normalize::Done(p) => watching.insert(p), - Normalize::Pending => pending.insert(path), - }; +fn resolve_pending(tx: &Sender, pending: &HashSet) -> Vec { + let mut to_remove = vec![]; + for path in pending { + match path::resolve(&path) { + Ok(Some(resolved)) => { + to_remove.push(path.clone()); + tx.send(DebouncedEvent::Create(resolved.into())) + .expect("File watcher channel closed."); + } + Ok(None) => (), + Err(e) => { + to_remove.push(path.clone()); + eprintln!( + "Encountered unexpected error {} when processing path {}", + e, + path.display() + ) + } } + } + to_remove +} + +fn poll_pending(tx: Sender, rx: Receiver) { + let mut pending = HashSet::new(); + loop { + match rx.try_recv() { + Ok(PollEvent::Pending(path)) => { + pending.insert(path); + } + Ok(PollEvent::Clear) => pending.clear(), + Err(TryRecvError::Empty) => { + resolve_pending(&tx, &pending).iter().for_each(|r| { + pending.remove(r); + }); + thread::sleep(WATCH_FREQUENCY); + } + Err(TryRecvError::Disconnected) => panic!("Polling channel closed."), + } + } +} + +// ======================================== +// File Watcher +// ======================================== + +struct WatchState<'a> { + poll_tx: Sender, + watcher: &'a mut RecommendedWatcher, + watching: HashSet, +} + +impl<'a> WatchState<'a> { + pub fn new( + poll_tx: Sender, + watcher: &'a mut RecommendedWatcher, + ) -> notify::Result { Ok(WatchState { - pending, - watching, - invalid: HashSet::new(), + poll_tx, + watcher, + watching: HashSet::new(), }) } + + fn send_poll(&self, event: PollEvent) { + self.poll_tx.send(event).expect("Polling channel closed."); + } + + fn watch(&mut self, path: ResPathBuf) { + match self.watcher.watch(&path, RecursiveMode::NonRecursive) { + Ok(()) => { + self.watching.insert(path); + } + Err(e) => { + eprintln!( + "Encountered unexpected error {} when watching path {}", + e, + path.unresolved().display() + ); + } + } + } + + /// Reads in the new path config, updating all watched and pending files + /// according to the packages in the specified config. + pub fn update(&mut self, config: &PathConfig) { + self.send_poll(PollEvent::Clear); + for path in &self.watching { + match self.watcher.unwatch(&path) { + Ok(()) => (), + Err(e) => { + eprintln!( + "Encountered unexpected error {} when unwatching path {}", + e, + path.unresolved().display() + ); + } + } + } + self.watching.clear(); + for (_, package) in &config.1.packages { + for path in &package.configs { + match path::resolve(&path) { + Ok(None) => self.send_poll(PollEvent::Pending(path.clone())), + Ok(Some(n)) => self.watch(n), + Err(_) => (), + } + } + } + } } // ======================================== // Daemon // ======================================== -fn reload_config(config: &PathConfig) -> notify::Result { - let state = WatchState::new(config)?; - Ok(state) -} - -pub fn launch(config: PathConfig) -> notify::Result<()> { - let (tx, rx) = channel(); - // Create a "debounced" watcher. Events will not trigger until after the - // specified duration has passed with no additional changes. - let mut watcher: RecommendedWatcher = Watcher::new(tx, Duration::from_secs(2))?; - // for (_, package) in &config.1.packages { - // for path in &package.configs { - // match normalize_path(&path) { - // // `notify-rs` is not able to handle files that do not exist and - // // are then created. This is handled internally by the library - // // via the `fs::canonicalize` which fails on missing paths. So - // // track which paths end up missing and apply polling on them. - // Ok(normalized) => match watcher.watch(&normalized, RecursiveMode::NonRecursive) { - // Ok(_) => { - // tracked_paths.insert(normalized); - // } - // Err(notify::Error::PathNotFound) => { - // missing_paths.insert(normalized); - // } - // Err(e) => return Err(e), - // }, - // // TODO(jrpotter): Retry even in cases where environment - // // variables are not defined. - // Err(e) => eprintln!("{}", e), - // }; - // } - // } - // This is a simple loop, but you may want to use more complex logic here, - // for example to handle I/O. +pub fn launch(mut config: PathConfig) -> Result<(), Box> { + let (poll_tx, poll_rx) = channel(); + let (watch_tx, watch_rx) = channel(); + let watch_tx1 = watch_tx.clone(); + // `notify-rs` internally uses `fs::canonicalize` on each path we try to + // watch, but this fails if no file exists at the given path. In these + // cases, we rely on a basic polling strategy to check if the files ever + // come into existence. + thread::spawn(move || poll_pending(watch_tx, poll_rx)); + // Track our original config file separately from the other files that may + // be defined in the config. We want to make sure we're always alerted on + // changes to it for hot reloading purposes, and not worry that our wrapper + // will ever clear it from its watch state. + let mut watcher: RecommendedWatcher = Watcher::new(watch_tx1, WATCH_FREQUENCY)?; + watcher.watch(&config.0, RecursiveMode::NonRecursive)?; + let mut state = WatchState::new(poll_tx, &mut watcher)?; + state.update(&config); loop { - match rx.recv() { - Ok(event) => println!("{:?}", event), - Err(e) => println!("watch error: {:?}", e), + // Received paths should always be the fully resolved ones so safe to + // compare against our current config path. + match watch_rx.recv() { + Ok(DebouncedEvent::NoticeWrite(_)) => { + // Intentionally ignore in favor of stronger signals. + } + Ok(DebouncedEvent::NoticeRemove(_)) => { + // Intentionally ignore in favor of stronger signals. + } + Ok(DebouncedEvent::Create(p)) => { + if config.0 == p { + config = config::reload(&config)?; + state.update(&config); + } + println!("Create {}", p.display()); + } + Ok(DebouncedEvent::Write(p)) => { + if config.0 == p { + config = config::reload(&config)?; + state.update(&config); + } + println!("Write {}", p.display()); + } + // Do not try reloading our primary config in any of the following + // cases since it may lead to undesired behavior. If our config has + // e.g. been removed, let's just keep using what we have in memory + // in the chance it may be added back. + Ok(DebouncedEvent::Chmod(p)) => { + println!("Chmod {}", p.display()); + } + Ok(DebouncedEvent::Remove(p)) => { + println!("Remove {}", p.display()); + } + Ok(DebouncedEvent::Rename(src, dst)) => { + println!("Rename {} {}", src.display(), dst.display()) + } + Ok(DebouncedEvent::Rescan) => { + println!("Rescanning"); + } + Ok(DebouncedEvent::Error(e, _maybe_path)) => { + println!("Error {}", e); + } + Err(e) => { + println!("watch error: {:?}", e); + } } } } diff --git a/src/lib.rs b/src/lib.rs index 03fe9a3..0fefb53 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,7 +4,7 @@ pub mod daemon; pub mod path; use config::PathConfig; -use path::NormalPathBuf; +use path::ResPathBuf; use std::error::Error; use std::io; @@ -18,7 +18,7 @@ pub fn run_daemon(config: PathConfig) -> Result<(), Box> { Ok(()) } -pub fn run_init(candidates: Vec) -> Result<(), config::Error> { +pub fn run_init(candidates: Vec) -> Result<(), config::Error> { debug_assert!(!candidates.is_empty(), "Empty candidates found in `init`."); if candidates.is_empty() { return Err(config::Error::FileError(io::Error::new( diff --git a/src/main.rs b/src/main.rs index 606b919..d0dbe7c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,5 @@ use clap::{App, AppSettings, Arg}; -use homesync::path::{NormalPathBuf, Normalize}; +use homesync::path::ResPathBuf; use std::error::Error; use std::io; use std::path::PathBuf; @@ -54,18 +54,18 @@ fn dispatch(matches: clap::ArgMatches) -> Result<(), Box> { } } -fn find_candidates(matches: &clap::ArgMatches) -> Result, Box> { +fn find_candidates(matches: &clap::ArgMatches) -> Result, io::Error> { let candidates = match matches.value_of("config") { Some(config_match) => vec![PathBuf::from(config_match)], None => homesync::config::default_paths(), }; - let mut normals = vec![]; + let mut resolved = vec![]; for candidate in candidates { - if let Ok(Normalize::Done(n)) = homesync::path::normalize(&candidate) { - normals.push(n); + if let Ok(Some(r)) = homesync::path::resolve(&candidate) { + resolved.push(r); } } - if normals.is_empty() { + if resolved.is_empty() { if let Some(config_match) = matches.value_of("config") { Err(io::Error::new( io::ErrorKind::NotFound, @@ -79,6 +79,6 @@ fn find_candidates(matches: &clap::ArgMatches) -> Result, Box ))? } } else { - Ok(normals) + Ok(resolved) } } diff --git a/src/path.rs b/src/path.rs index 440ba23..f1720b9 100644 --- a/src/path.rs +++ b/src/path.rs @@ -9,45 +9,67 @@ use std::path::{Component, Path, PathBuf}; // Path // ======================================== -#[derive(Clone, Debug, Eq, PartialEq)] -pub struct NormalPathBuf(PathBuf); +#[derive(Clone, Debug)] +pub struct ResPathBuf { + inner: PathBuf, + unresolved: PathBuf, +} -impl NormalPathBuf { +impl ResPathBuf { pub fn display(&self) -> std::path::Display { - self.0.display() + self.inner.display() + } + + pub fn unresolved(&self) -> &PathBuf { + return &self.unresolved; } } -impl AsRef for NormalPathBuf { +impl PartialEq for ResPathBuf { + fn eq(&self, other: &Self) -> bool { + self.inner == other.inner + } +} + +impl PartialEq for ResPathBuf { + fn eq(&self, other: &PathBuf) -> bool { + self.inner == *other + } +} + +impl Eq for ResPathBuf {} + +impl From for PathBuf { + fn from(path: ResPathBuf) -> PathBuf { + path.inner + } +} + +impl AsRef for ResPathBuf { fn as_ref(&self) -> &Path { - &self.0 + &self.inner } } -impl AsRef for NormalPathBuf { +impl AsRef for ResPathBuf { fn as_ref(&self) -> &PathBuf { - &self.0 + &self.inner } } -impl Hash for NormalPathBuf { +impl Hash for ResPathBuf { fn hash(&self, h: &mut H) { - for component in self.0.components() { + for component in self.inner.components() { component.hash(h); } } } -pub enum Normalize { - Done(NormalPathBuf), // An instance of a fully resolved path. - Pending, // An instance of a path that cannot yet be normalized. -} - -// Find environment variables found within the argument and expand them if -// possible. -// -// Returns `None` in the case an environment variable present within the -// argument is not defined. +/// Find environment variables found within the argument and expand them if +/// possible. +/// +/// Returns `None` in the case an environment variable present within the +/// argument is not defined. fn expand_env(s: &OsStr) -> Option { let re = Regex::new(r"\$(?P[[:alnum:]]+)").unwrap(); let lossy = s.to_string_lossy(); @@ -59,17 +81,14 @@ fn expand_env(s: &OsStr) -> Option { Some(path.into()) } -// Normalizes the provided path, returning a new instance. -// -// There currently does not exist a method that yields some canonical path for -// files that do not exist (at least in the parts of the standard library I've -// looked in). We create a consistent view of every path so as to avoid watching -// the same path multiple times, which would duplicate messages on changes. -// -// Note this does not actually prevent the issue fully. We could have two paths -// that refer to the same real path - normalization would not catch this. -pub fn normalize(path: &Path) -> io::Result { - let mut pb = env::current_dir()?; +/// Attempt to resolve the provided path, returning a fully resolved path +/// instance. +/// +/// If the provided file does not exist but could potentially exist in the +/// future (e.g. for paths with environment variables defined), this will +/// return a `None` instead of an error. +pub fn resolve(path: &Path) -> io::Result> { + let mut expanded = env::current_dir()?; for comp in path.components() { match comp { Component::Prefix(_) => { @@ -79,12 +98,12 @@ pub fn normalize(path: &Path) -> io::Result { )) } Component::RootDir => { - pb.clear(); - pb.push(Component::RootDir) + expanded.clear(); + expanded.push(Component::RootDir) } Component::CurDir => (), // Make no changes. Component::ParentDir => { - if !pb.pop() { + if !expanded.pop() { return Err(io::Error::new( io::ErrorKind::InvalidInput, "Cannot take parent of root.", @@ -92,12 +111,19 @@ pub fn normalize(path: &Path) -> io::Result { } } Component::Normal(c) => match expand_env(c) { - Some(c) => pb.push(Component::Normal(&c)), + Some(c) => expanded.push(Component::Normal(&c)), // The environment variable isn't defined yet but might be in // the future. - None => return Ok(Normalize::Pending), + None => return Ok(None), }, } } - Ok(Normalize::Done(NormalPathBuf(pb))) + match expanded.canonicalize() { + Ok(resolved) => Ok(Some(ResPathBuf { + inner: resolved, + unresolved: path.to_path_buf(), + })), + Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(None), + Err(e) => Err(e), + } }