Setup daemon tracking.

Track original path before resolution for later git copying and
messaging.
pull/3/head
Joshua Potter 2021-12-30 16:53:29 -05:00
parent 4fa6654423
commit 7db0656e89
6 changed files with 268 additions and 126 deletions

View File

@ -10,7 +10,7 @@ use std::io::Write;
pub fn write_config(mut pending: PathConfig) -> config::Result<()> {
println!(
"Generating config at {}...\n",
Success.paint(pending.0.display().to_string())
Success.paint(pending.0.unresolved().display().to_string())
);
print!(
@ -45,9 +45,9 @@ pub fn write_config(mut pending: PathConfig) -> config::Result<()> {
pub fn list_packages(config: PathConfig) {
println!(
"Listing packages in {}...\n",
Success.paint(config.0.display().to_string())
Success.paint(config.0.unresolved().display().to_string())
);
// TODO(jrpotter): Alphabetize the output list.
// Alphabetical ordered ensured by B-tree implementation.
for (k, _) in config.1.packages {
println!("{}", k);
}

View File

@ -1,7 +1,6 @@
use super::path;
use super::path::{NormalPathBuf, Normalize};
use super::path::ResPathBuf;
use serde_derive::{Deserialize, Serialize};
use std::collections::HashMap;
use std::collections::BTreeMap;
use std::io::Write;
use std::path::PathBuf;
use std::{error, fmt, fs, io};
@ -61,7 +60,7 @@ pub struct Package {
#[derive(Debug, Deserialize, Serialize)]
pub struct Config {
pub remote: Remote,
pub packages: HashMap<String, Package>,
pub packages: BTreeMap<String, Package>,
}
impl Config {
@ -71,10 +70,10 @@ impl Config {
}
#[derive(Debug)]
pub struct PathConfig(pub NormalPathBuf, pub Config);
pub struct PathConfig(pub ResPathBuf, pub Config);
impl PathConfig {
pub fn new(path: &NormalPathBuf, config: Option<Config>) -> Self {
pub fn new(path: &ResPathBuf, config: Option<Config>) -> Self {
PathConfig(
path.clone(),
config.unwrap_or(Config {
@ -82,7 +81,7 @@ impl PathConfig {
owner: "example-user".to_owned(),
name: "home-config".to_owned(),
},
packages: HashMap::new(),
packages: BTreeMap::new(),
}),
)
}
@ -111,7 +110,7 @@ pub fn default_paths() -> Vec<PathBuf> {
DEFAULT_PATHS.iter().map(|s| PathBuf::from(s)).collect()
}
pub fn load(candidates: &Vec<NormalPathBuf>) -> Result<PathConfig> {
pub fn load(candidates: &Vec<ResPathBuf>) -> Result<PathConfig> {
// When trying our paths, the only acceptable error is a `NotFound` file.
// Anything else should be surfaced to the end user.
for candidate in candidates {
@ -126,3 +125,9 @@ pub fn load(candidates: &Vec<NormalPathBuf>) -> Result<PathConfig> {
}
Err(Error::MissingConfig)
}
pub fn reload(config: &PathConfig) -> Result<PathConfig> {
// TODO(jrpotter): Let's add a proper logging solution.
println!("Configuration reloaded.");
load(&vec![config.0.clone()])
}

View File

@ -1,99 +1,210 @@
use super::config;
use super::config::PathConfig;
use super::path;
use super::path::{NormalPathBuf, Normalize};
use notify::{RecommendedWatcher, Watcher};
use super::path::ResPathBuf;
use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher};
use std::collections::HashSet;
use std::error::Error;
use std::path::PathBuf;
use std::sync::mpsc::channel;
use std::sync::mpsc::{Receiver, Sender, TryRecvError};
use std::thread;
use std::time::Duration;
// TODO(jrpotter): Add logging.
// TODO(jrpotter): Add pid file to only allow one daemon at a time.
// Used for both polling and debouncing file system events.
const WATCH_FREQUENCY: Duration = Duration::from_secs(5);
// ========================================
// State
// Polling
// ========================================
struct WatchState {
// Paths that we were not able to watch properly but could potentially do so
// in the future. These include paths that did not exist at the time of
// canonicalization or did not have environment variables defined that may
// be defined later on.
pending: HashSet<PathBuf>,
// Paths that we are currently watching.
watching: HashSet<NormalPathBuf>,
// Paths that are not valid and will never become valid. These may include
// paths that include prefixes or refer to directories that could never be
// reached (e.g. parent of root).
invalid: HashSet<PathBuf>,
enum PollEvent {
Pending(PathBuf),
Clear,
}
impl WatchState {
pub fn new(config: &PathConfig) -> notify::Result<Self> {
let mut pending: HashSet<PathBuf> = HashSet::new();
let mut watching: HashSet<NormalPathBuf> = HashSet::new();
watching.insert(config.0.clone());
// We try and resolve our configuration again here. We want to
// specifically track any new configs that may pop up with higher
// priority.
for path in config::default_paths() {
match path::normalize(&path)? {
// TODO(jrpotter): Check if the path can be canonicalized.
Normalize::Done(p) => watching.insert(p),
Normalize::Pending => pending.insert(path),
};
fn resolve_pending(tx: &Sender<DebouncedEvent>, pending: &HashSet<PathBuf>) -> Vec<PathBuf> {
let mut to_remove = vec![];
for path in pending {
match path::resolve(&path) {
Ok(Some(resolved)) => {
to_remove.push(path.clone());
tx.send(DebouncedEvent::Create(resolved.into()))
.expect("File watcher channel closed.");
}
Ok(None) => (),
Err(e) => {
to_remove.push(path.clone());
eprintln!(
"Encountered unexpected error {} when processing path {}",
e,
path.display()
)
}
}
}
to_remove
}
fn poll_pending(tx: Sender<DebouncedEvent>, rx: Receiver<PollEvent>) {
let mut pending = HashSet::new();
loop {
match rx.try_recv() {
Ok(PollEvent::Pending(path)) => {
pending.insert(path);
}
Ok(PollEvent::Clear) => pending.clear(),
Err(TryRecvError::Empty) => {
resolve_pending(&tx, &pending).iter().for_each(|r| {
pending.remove(r);
});
thread::sleep(WATCH_FREQUENCY);
}
Err(TryRecvError::Disconnected) => panic!("Polling channel closed."),
}
}
}
// ========================================
// File Watcher
// ========================================
struct WatchState<'a> {
poll_tx: Sender<PollEvent>,
watcher: &'a mut RecommendedWatcher,
watching: HashSet<ResPathBuf>,
}
impl<'a> WatchState<'a> {
pub fn new(
poll_tx: Sender<PollEvent>,
watcher: &'a mut RecommendedWatcher,
) -> notify::Result<Self> {
Ok(WatchState {
pending,
watching,
invalid: HashSet::new(),
poll_tx,
watcher,
watching: HashSet::new(),
})
}
fn send_poll(&self, event: PollEvent) {
self.poll_tx.send(event).expect("Polling channel closed.");
}
fn watch(&mut self, path: ResPathBuf) {
match self.watcher.watch(&path, RecursiveMode::NonRecursive) {
Ok(()) => {
self.watching.insert(path);
}
Err(e) => {
eprintln!(
"Encountered unexpected error {} when watching path {}",
e,
path.unresolved().display()
);
}
}
}
/// Reads in the new path config, updating all watched and pending files
/// according to the packages in the specified config.
pub fn update(&mut self, config: &PathConfig) {
self.send_poll(PollEvent::Clear);
for path in &self.watching {
match self.watcher.unwatch(&path) {
Ok(()) => (),
Err(e) => {
eprintln!(
"Encountered unexpected error {} when unwatching path {}",
e,
path.unresolved().display()
);
}
}
}
self.watching.clear();
for (_, package) in &config.1.packages {
for path in &package.configs {
match path::resolve(&path) {
Ok(None) => self.send_poll(PollEvent::Pending(path.clone())),
Ok(Some(n)) => self.watch(n),
Err(_) => (),
}
}
}
}
}
// ========================================
// Daemon
// ========================================
fn reload_config(config: &PathConfig) -> notify::Result<WatchState> {
let state = WatchState::new(config)?;
Ok(state)
}
pub fn launch(config: PathConfig) -> notify::Result<()> {
let (tx, rx) = channel();
// Create a "debounced" watcher. Events will not trigger until after the
// specified duration has passed with no additional changes.
let mut watcher: RecommendedWatcher = Watcher::new(tx, Duration::from_secs(2))?;
// for (_, package) in &config.1.packages {
// for path in &package.configs {
// match normalize_path(&path) {
// // `notify-rs` is not able to handle files that do not exist and
// // are then created. This is handled internally by the library
// // via the `fs::canonicalize` which fails on missing paths. So
// // track which paths end up missing and apply polling on them.
// Ok(normalized) => match watcher.watch(&normalized, RecursiveMode::NonRecursive) {
// Ok(_) => {
// tracked_paths.insert(normalized);
// }
// Err(notify::Error::PathNotFound) => {
// missing_paths.insert(normalized);
// }
// Err(e) => return Err(e),
// },
// // TODO(jrpotter): Retry even in cases where environment
// // variables are not defined.
// Err(e) => eprintln!("{}", e),
// };
// }
// }
// This is a simple loop, but you may want to use more complex logic here,
// for example to handle I/O.
pub fn launch(mut config: PathConfig) -> Result<(), Box<dyn Error>> {
let (poll_tx, poll_rx) = channel();
let (watch_tx, watch_rx) = channel();
let watch_tx1 = watch_tx.clone();
// `notify-rs` internally uses `fs::canonicalize` on each path we try to
// watch, but this fails if no file exists at the given path. In these
// cases, we rely on a basic polling strategy to check if the files ever
// come into existence.
thread::spawn(move || poll_pending(watch_tx, poll_rx));
// Track our original config file separately from the other files that may
// be defined in the config. We want to make sure we're always alerted on
// changes to it for hot reloading purposes, and not worry that our wrapper
// will ever clear it from its watch state.
let mut watcher: RecommendedWatcher = Watcher::new(watch_tx1, WATCH_FREQUENCY)?;
watcher.watch(&config.0, RecursiveMode::NonRecursive)?;
let mut state = WatchState::new(poll_tx, &mut watcher)?;
state.update(&config);
loop {
match rx.recv() {
Ok(event) => println!("{:?}", event),
Err(e) => println!("watch error: {:?}", e),
// Received paths should always be the fully resolved ones so safe to
// compare against our current config path.
match watch_rx.recv() {
Ok(DebouncedEvent::NoticeWrite(_)) => {
// Intentionally ignore in favor of stronger signals.
}
Ok(DebouncedEvent::NoticeRemove(_)) => {
// Intentionally ignore in favor of stronger signals.
}
Ok(DebouncedEvent::Create(p)) => {
if config.0 == p {
config = config::reload(&config)?;
state.update(&config);
}
println!("Create {}", p.display());
}
Ok(DebouncedEvent::Write(p)) => {
if config.0 == p {
config = config::reload(&config)?;
state.update(&config);
}
println!("Write {}", p.display());
}
// Do not try reloading our primary config in any of the following
// cases since it may lead to undesired behavior. If our config has
// e.g. been removed, let's just keep using what we have in memory
// in the chance it may be added back.
Ok(DebouncedEvent::Chmod(p)) => {
println!("Chmod {}", p.display());
}
Ok(DebouncedEvent::Remove(p)) => {
println!("Remove {}", p.display());
}
Ok(DebouncedEvent::Rename(src, dst)) => {
println!("Rename {} {}", src.display(), dst.display())
}
Ok(DebouncedEvent::Rescan) => {
println!("Rescanning");
}
Ok(DebouncedEvent::Error(e, _maybe_path)) => {
println!("Error {}", e);
}
Err(e) => {
println!("watch error: {:?}", e);
}
}
}
}

View File

@ -4,7 +4,7 @@ pub mod daemon;
pub mod path;
use config::PathConfig;
use path::NormalPathBuf;
use path::ResPathBuf;
use std::error::Error;
use std::io;
@ -18,7 +18,7 @@ pub fn run_daemon(config: PathConfig) -> Result<(), Box<dyn Error>> {
Ok(())
}
pub fn run_init(candidates: Vec<NormalPathBuf>) -> Result<(), config::Error> {
pub fn run_init(candidates: Vec<ResPathBuf>) -> Result<(), config::Error> {
debug_assert!(!candidates.is_empty(), "Empty candidates found in `init`.");
if candidates.is_empty() {
return Err(config::Error::FileError(io::Error::new(

View File

@ -1,5 +1,5 @@
use clap::{App, AppSettings, Arg};
use homesync::path::{NormalPathBuf, Normalize};
use homesync::path::ResPathBuf;
use std::error::Error;
use std::io;
use std::path::PathBuf;
@ -54,18 +54,18 @@ fn dispatch(matches: clap::ArgMatches) -> Result<(), Box<dyn Error>> {
}
}
fn find_candidates(matches: &clap::ArgMatches) -> Result<Vec<NormalPathBuf>, Box<dyn Error>> {
fn find_candidates(matches: &clap::ArgMatches) -> Result<Vec<ResPathBuf>, io::Error> {
let candidates = match matches.value_of("config") {
Some(config_match) => vec![PathBuf::from(config_match)],
None => homesync::config::default_paths(),
};
let mut normals = vec![];
let mut resolved = vec![];
for candidate in candidates {
if let Ok(Normalize::Done(n)) = homesync::path::normalize(&candidate) {
normals.push(n);
if let Ok(Some(r)) = homesync::path::resolve(&candidate) {
resolved.push(r);
}
}
if normals.is_empty() {
if resolved.is_empty() {
if let Some(config_match) = matches.value_of("config") {
Err(io::Error::new(
io::ErrorKind::NotFound,
@ -79,6 +79,6 @@ fn find_candidates(matches: &clap::ArgMatches) -> Result<Vec<NormalPathBuf>, Box
))?
}
} else {
Ok(normals)
Ok(resolved)
}
}

View File

@ -9,45 +9,67 @@ use std::path::{Component, Path, PathBuf};
// Path
// ========================================
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct NormalPathBuf(PathBuf);
#[derive(Clone, Debug)]
pub struct ResPathBuf {
inner: PathBuf,
unresolved: PathBuf,
}
impl NormalPathBuf {
impl ResPathBuf {
pub fn display(&self) -> std::path::Display {
self.0.display()
self.inner.display()
}
pub fn unresolved(&self) -> &PathBuf {
return &self.unresolved;
}
}
impl AsRef<Path> for NormalPathBuf {
impl PartialEq<ResPathBuf> for ResPathBuf {
fn eq(&self, other: &Self) -> bool {
self.inner == other.inner
}
}
impl PartialEq<PathBuf> for ResPathBuf {
fn eq(&self, other: &PathBuf) -> bool {
self.inner == *other
}
}
impl Eq for ResPathBuf {}
impl From<ResPathBuf> for PathBuf {
fn from(path: ResPathBuf) -> PathBuf {
path.inner
}
}
impl AsRef<Path> for ResPathBuf {
fn as_ref(&self) -> &Path {
&self.0
&self.inner
}
}
impl AsRef<PathBuf> for NormalPathBuf {
impl AsRef<PathBuf> for ResPathBuf {
fn as_ref(&self) -> &PathBuf {
&self.0
&self.inner
}
}
impl Hash for NormalPathBuf {
impl Hash for ResPathBuf {
fn hash<H: Hasher>(&self, h: &mut H) {
for component in self.0.components() {
for component in self.inner.components() {
component.hash(h);
}
}
}
pub enum Normalize {
Done(NormalPathBuf), // An instance of a fully resolved path.
Pending, // An instance of a path that cannot yet be normalized.
}
// Find environment variables found within the argument and expand them if
// possible.
//
// Returns `None` in the case an environment variable present within the
// argument is not defined.
/// Find environment variables found within the argument and expand them if
/// possible.
///
/// Returns `None` in the case an environment variable present within the
/// argument is not defined.
fn expand_env(s: &OsStr) -> Option<OsString> {
let re = Regex::new(r"\$(?P<env>[[:alnum:]]+)").unwrap();
let lossy = s.to_string_lossy();
@ -59,17 +81,14 @@ fn expand_env(s: &OsStr) -> Option<OsString> {
Some(path.into())
}
// Normalizes the provided path, returning a new instance.
//
// There currently does not exist a method that yields some canonical path for
// files that do not exist (at least in the parts of the standard library I've
// looked in). We create a consistent view of every path so as to avoid watching
// the same path multiple times, which would duplicate messages on changes.
//
// Note this does not actually prevent the issue fully. We could have two paths
// that refer to the same real path - normalization would not catch this.
pub fn normalize(path: &Path) -> io::Result<Normalize> {
let mut pb = env::current_dir()?;
/// Attempt to resolve the provided path, returning a fully resolved path
/// instance.
///
/// If the provided file does not exist but could potentially exist in the
/// future (e.g. for paths with environment variables defined), this will
/// return a `None` instead of an error.
pub fn resolve(path: &Path) -> io::Result<Option<ResPathBuf>> {
let mut expanded = env::current_dir()?;
for comp in path.components() {
match comp {
Component::Prefix(_) => {
@ -79,12 +98,12 @@ pub fn normalize(path: &Path) -> io::Result<Normalize> {
))
}
Component::RootDir => {
pb.clear();
pb.push(Component::RootDir)
expanded.clear();
expanded.push(Component::RootDir)
}
Component::CurDir => (), // Make no changes.
Component::ParentDir => {
if !pb.pop() {
if !expanded.pop() {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"Cannot take parent of root.",
@ -92,12 +111,19 @@ pub fn normalize(path: &Path) -> io::Result<Normalize> {
}
}
Component::Normal(c) => match expand_env(c) {
Some(c) => pb.push(Component::Normal(&c)),
Some(c) => expanded.push(Component::Normal(&c)),
// The environment variable isn't defined yet but might be in
// the future.
None => return Ok(Normalize::Pending),
None => return Ok(None),
},
}
}
Ok(Normalize::Done(NormalPathBuf(pb)))
match expanded.canonicalize() {
Ok(resolved) => Ok(Some(ResPathBuf {
inner: resolved,
unresolved: path.to_path_buf(),
})),
Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(None),
Err(e) => Err(e),
}
}