remote-block-device-backup/remote-block-device-backup-.../src/lib.rs

591 lines
21 KiB
Rust

#[macro_use]
extern crate derive_new;
mod aes128_xts128_stream;
mod keys;
mod simple_communicator;
use aes128_xts128_stream::Aes128Xts128Stream;
use hex_literal::hex;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use sha3::Digest;
use sha3::Sha3_256;
use simple_communicator::SimpleCommunicator;
use std::cmp::{max, min};
use std::fs;
use std::io;
use std::io::prelude::*;
use std::net::TcpStream;
use std::os::unix::fs::FileTypeExt;
use std::path::PathBuf;
use std::time;
use std::time::Duration;
/// Two-byte marker sent by `server_exchange_magic` and expected by
/// `client_exchange_magic`.
pub static MAGIC_SERVER: [u8; 2] = hex!("b949");
/// Two-byte marker sent by `client_exchange_magic` and expected by
/// `server_exchange_magic`.
pub static MAGIC_CLIENT: [u8; 2] = hex!("f8e8");
/// Backup parameters that `handle_client` persists as `definitions.json`
/// inside the target's folder and compares against on later runs.
#[derive(new, Serialize, Deserialize, Clone, PartialEq, PartialOrd, Eq, Ord, Debug)]
pub struct BlockBackupDefinition {
/// Target name; `handle_client` uses it as the folder name and sends it to the peer.
pub target: String,
/// Size in bytes of one individually stored block.
pub block_size: u32,
/// Size in bytes of one check range; `handle_server` requires it to be a
/// multiple of `block_size`.
pub check_size: u32,
}
/// One block recorded in a snapshot: an address range plus the hash that
/// identifies its contents in the `hashes` store.
#[derive(new, Serialize, Deserialize, Clone, Copy, PartialEq, PartialOrd, Eq, Ord, Debug)]
pub struct BlockBackupSnapshotBlock {
/// Byte address where the block starts.
pub addr_base: u64,
/// Byte address where the block ends; `handle_server` computes it as
/// `addr_base + block_size`, i.e. one past the last byte.
pub addr_last: u64,
/// SHA3-256 hash of the block contents as announced by the peer.
pub hash: [u8; 32],
}
/// A snapshot of a target as written by `handle_client` to `current.json`
/// (while in progress) and to `snaps/<start_millis>.json` (when finished).
#[derive(new, Serialize, Deserialize, Clone, Debug)]
pub struct BlockBackupSnapshot {
/// Session start time in milliseconds since the UNIX epoch.
pub start_millis: u128,
/// Size of the target in bytes as reported by the peer.
pub disk_size: u64,
/// Blocks that make up the snapshot, in the order they were recorded.
pub blocks: Vec<BlockBackupSnapshotBlock>,
}
/// Runs the client side of one backup session over `stream`.
///
/// The client announces `target`, `block_size` and `check_size` to the peer,
/// then receives a sequence of hashed ranges. For each range it answers
/// whether it needs the data, and stores any block it receives under
/// `path/target/hashes/<hash segments>/block.img`.
///
/// Files used inside `path/target`:
/// - `definitions.json`: the persisted `BlockBackupDefinition`;
/// - `current.json`: the in-progress snapshot, removed at the end;
/// - `snaps/<start time, 20 digits>.json`: finished snapshots;
/// - `corrupt.bin`: the last received blob whose hash did not match.
///
/// If `definitions.json` already holds different sizes, those stored sizes
/// override the `block_size` / `check_size` arguments.
///
/// # Panics
///
/// Panics on any I/O or (de)serialisation failure, on a magic number
/// mismatch, when the folder belongs to another target, and when received
/// data has the wrong length or hash.
pub fn handle_client(
stream: &mut TcpStream,
path: PathBuf,
target: String,
block_size: u32,
check_size: u32,
) {
// Session start in milliseconds since the UNIX epoch; also names the snapshot file.
let time_start = time::SystemTime::now()
.duration_since(time::SystemTime::UNIX_EPOCH)
.unwrap()
.as_millis();
let target_path = path.join(&target);
let current_path = target_path.join("current.json");
let corrupt_path = target_path.join("corrupt.bin");
let hashes_path = target_path.join("hashes");
let snaps_path = target_path.join("snaps");
let defs_path = target_path.join("definitions.json");
let snap_path = snaps_path.join(&format!("{:020}.json", time_start));
// Prepare the on-disk layout and reconcile the stored definition.
{
// Drop the corrupt blob left over from an earlier session, if any.
if corrupt_path.is_file() {
fs::remove_file(&corrupt_path).unwrap();
}
if !path.is_dir() {
fs::create_dir_all(&path).unwrap();
}
if !target_path.is_dir() {
fs::create_dir_all(&target_path).unwrap();
}
if !hashes_path.is_dir() {
fs::create_dir_all(&hashes_path).unwrap();
}
if !snaps_path.is_dir() {
fs::create_dir_all(&snaps_path).unwrap();
}
if defs_path.is_file() {
let bbd: BlockBackupDefinition =
serde_json::from_str(&fs::read_to_string(defs_path).unwrap()).unwrap();
if bbd.target != target {
panic!(
"Specified target {:?} is a folder that was meant to be used for {:?}",
target, bbd.target
);
}
// The stored sizes win: restart the whole function with them.
if bbd.block_size != block_size || bbd.check_size != check_size {
return handle_client(stream, path, target, bbd.block_size, bbd.check_size);
}
} else {
// First use of this folder: persist the definition.
fs::write(
defs_path,
serde_json::to_string_pretty(&BlockBackupDefinition::new(
target.clone(),
block_size,
check_size,
))
.unwrap(),
)
.unwrap();
}
}
// Configure socket
{
stream
.set_write_timeout(Some(Duration::from_secs(30)))
.unwrap();
stream
.set_read_timeout(Some(Duration::from_secs(30)))
.unwrap();
stream.set_nodelay(true).unwrap();
}
// Exchange magic bytes
client_exchange_magic(stream);
// Diffie Hellman then AES-XTS
let mut encrypted = {
let curve25519repo = keys::Curve25519Repository::default();
let shared_secret = plain_net_diffie_hellman(stream, &curve25519repo);
Aes128Xts128Stream::from((stream, shared_secret.to_bytes()))
};
// Re-exchange magic bytes for ensuring connection sanity
client_exchange_magic(&mut encrypted);
// Announce what to back up and with which sizes; the peer answers with the target size.
encrypted.send_string_small(&target).unwrap();
encrypted.send_u32(block_size).unwrap();
encrypted.send_u32(check_size).unwrap();
let target_size = encrypted.recv_u64().unwrap();
// Start the in-progress snapshot and persist it right away as `current.json`.
let mut snapshot_current = BlockBackupSnapshot::new(time_start, target_size, vec![]);
fs::write(
&current_path,
serde_json::to_string_pretty(&snapshot_current).unwrap(),
)
.unwrap();
// Load the latest finished snapshot, if any: take every entry name in `snaps`
// up to its first '.', sort the names, and read `<last name>.json`.
let snapshot_lastest_opt: Option<BlockBackupSnapshot> = snaps_path
.read_dir()
.unwrap()
.filter_map(|x| x.ok())
.map(|x| {
x.file_name()
.to_string_lossy()
.split('.')
.next()
.unwrap()
.to_string()
})
.sorted()
.collect::<Vec<String>>()
.last()
.and_then(|x| Some(format!("{}.json", x)))
.and_then(|x| Some(fs::read_to_string(snaps_path.join(x)).unwrap()))
.and_then(|x| Some(serde_json::from_str(&x).unwrap()));
// Re-exchange magic bytes for ensuring connection sanity
client_exchange_magic(&mut encrypted);
// One iteration per message from the peer: a kind flag, an address range and a hash.
loop {
let is_block = encrypted.recv_bool().unwrap();
let addr_base = encrypted.recv_u64().unwrap();
let addr_last = encrypted.recv_u64().unwrap();
// Both addresses set to u64::MAX mark the end of the session.
if addr_base == u64::MAX && addr_last == u64::MAX {
break;
}
print!("Recv");
print!("Hashed{}", if is_block { "Block" } else { "Check" });
print!(
"(addr_base={:#016X}, addr_last={:#016X}, ",
addr_base, addr_last
);
io::stdout().flush().unwrap();
// Try to rebuild the hash of this range from the latest snapshot. This only
// succeeds when the snapshot's blocks inside the range start and end exactly
// at its bounds, are contiguous, and all exist in the `hashes` store.
let (known_hash, known_blocks): (Option<[u8; 32]>, Option<Vec<BlockBackupSnapshotBlock>>) = {
if let Some(snapshot_lastest) = &snapshot_lastest_opt {
let blocks: Vec<BlockBackupSnapshotBlock> = snapshot_lastest
.blocks
.iter()
.filter(|x| x.addr_base >= addr_base)
.filter(|x| x.addr_last <= addr_last)
.sorted()
.dedup()
.copied()
.collect();
if blocks.len() > 0
&& blocks.first().unwrap().addr_base == addr_base
&& blocks.last().unwrap().addr_last == addr_last
{
// Each block must end where the next one begins.
let chained = (0..(blocks.len() - 1))
.into_iter()
.all(|i| blocks[i].addr_last == blocks[i + 1].addr_base);
let all_present = blocks.iter().all(|x| {
make_hash_segments_tree(&hashes_path, &x.hash, 3)
.join("block.img")
.is_file()
});
if chained && all_present {
// Hash the stored block files in address order as one stream.
let mut hasher = Sha3_256::new();
for block in blocks.iter() {
let v = fs::read(
make_hash_segments_tree(&hashes_path, &block.hash, 3)
.join("block.img"),
)
.unwrap();
hasher.update(v);
}
let mut a = [0u8; 32];
let v = hasher.finalize().to_vec();
if v.len() != 32 {
panic!("SHA3-256 returned {} bytes, expected 32", v.len());
}
a.copy_from_slice(&v);
(Some(a), Some(blocks))
} else {
(None, None)
}
} else {
(None, None)
}
} else {
(None, None)
}
};
let server_hash = encrypted.recv_32bytes().unwrap();
if Some(server_hash) == known_hash {
// Range unchanged since the latest snapshot: decline the data and reuse its blocks.
encrypted.send_bool(false).unwrap();
snapshot_current.blocks.append(&mut known_blocks.unwrap());
print!("stored=true)");
io::stdout().flush().unwrap();
} else {
let hash_path = make_hash_segments_tree(&hashes_path, &server_hash, 3);
if is_block && hash_path.join("block.img").is_file() {
// A block with this hash is already in the store: record it, decline the data.
encrypted.send_bool(false).unwrap();
snapshot_current.blocks.push(BlockBackupSnapshotBlock::new(
addr_base,
addr_last,
server_hash,
));
print!("stored=true)");
io::stdout().flush().unwrap();
} else {
// Ask for more. For a check range nothing else arrives in this message
// (`handle_server` follows up with one message per block); for a block
// the data follows immediately.
encrypted.send_bool(true).unwrap();
print!("stored=false");
io::stdout().flush().unwrap();
if !is_block {
print!(")");
} else {
print!(", data_len=");
io::stdout().flush().unwrap();
let blob = encrypted.recv_bytes_large().unwrap();
if blob.len() != block_size as usize {
panic!(
"Data size corruption error detected: expected {} does not match {}",
block_size,
blob.len(),
)
}
// Verify the payload against the announced hash; keep a copy of a
// mismatching blob in `corrupt.bin` before panicking.
let local_hash = sha3256handy(&blob);
if local_hash != server_hash {
fs::write(corrupt_path, blob).unwrap();
panic!(
"Data corruption error detected: expected {} does not match {}",
base16::encode_upper(&server_hash),
base16::encode_upper(&local_hash),
)
}
print!("{:#016X}", blob.len());
io::stdout().flush().unwrap();
snapshot_current.blocks.push(BlockBackupSnapshotBlock::new(
addr_base,
addr_last,
server_hash,
));
if !hash_path.exists() {
fs::create_dir_all(&hash_path).unwrap();
}
// Store the block, then refresh `current.json` with the new block list.
fs::write(hash_path.join("block.img"), blob).unwrap();
fs::write(
&current_path,
serde_json::to_string_pretty(&snapshot_current).unwrap(),
)
.unwrap();
print!(")");
io::stdout().flush().unwrap();
}
}
}
println!();
client_exchange_magic(&mut encrypted);
}
client_exchange_magic(&mut encrypted);
let snapshot_current_serialized = serde_json::to_string_pretty(&snapshot_current).unwrap();
// Write a new snapshot file only when the block list differs from the latest
// snapshot (or when there was no earlier snapshot).
if snapshot_lastest_opt.and_then(|x| Some(x.blocks)) != Some(snapshot_current.blocks) {
fs::write(&snap_path, snapshot_current_serialized).unwrap();
}
fs::remove_file(current_path).unwrap();
encrypted.shutdown(std::net::Shutdown::Both).unwrap();
}
/// Runs the server side of one backup session over `stream`.
///
/// After the handshake the peer names a block device (resolved under `/dev`)
/// and supplies a block size and a check size. The device is then walked in
/// check-sized ranges: the hash of each range is sent, and only when the peer
/// answers that it needs more are the range's individual blocks hashed and,
/// on request, uploaded. A message whose two addresses are `u64::MAX` ends
/// the session.
///
/// # Panics
///
/// Panics on any I/O failure, on a magic number mismatch, when the named
/// device is not a block device, and when the sizes are invalid: the block
/// size must be non-zero, both sizes must be multiples of 4096, and the check
/// size must be a multiple of (and not smaller than) the block size.
pub fn handle_server(stream: &mut TcpStream) {
    // Configure socket
    stream
        .set_write_timeout(Some(Duration::from_secs(30)))
        .unwrap();
    stream
        .set_read_timeout(Some(Duration::from_secs(30)))
        .unwrap();
    stream.set_nodelay(true).unwrap();
    // Exchange magic bytes
    server_exchange_magic(stream);
    // Key agreement on the plain socket, then switch to the encrypted stream.
    let mut encrypted = {
        let curve25519repo = keys::Curve25519Repository::default();
        let shared_secret = plain_net_diffie_hellman(stream, &curve25519repo);
        Aes128Xts128Stream::from((stream, shared_secret.to_bytes()))
    };
    // Re-exchange magic bytes for ensuring connection sanity
    server_exchange_magic(&mut encrypted);

    // NOTE(review): `target_name` comes from the peer and is joined onto
    // "/dev" unchecked; an absolute name or one containing ".." would resolve
    // outside "/dev". Only the block-device check below constrains it.
    let target_name = encrypted.recv_string_small().unwrap();
    let target = PathBuf::from("/dev").join(&target_name);
    let is_block_device = target
        .metadata()
        .map(|meta| meta.file_type().is_block_device())
        .unwrap_or(false);
    if !is_block_device {
        panic!("Block device not found: {:?}", target);
    }

    let block_size = encrypted.recv_u32().unwrap();
    let check_size = encrypted.recv_u32().unwrap();
    // Reject a zero block size explicitly; otherwise the modulo below would
    // abort with an unhelpful division-by-zero panic.
    if block_size == 0 {
        panic!("Block size must not be zero");
    }
    if check_size < block_size {
        panic!("{} must not be less than {}", check_size, block_size);
    }
    if (check_size % 4096) != 0 {
        panic!("{} must be multiple of {}", check_size, 4096);
    }
    if (block_size % 4096) != 0 {
        panic!("{} must be multiple of {}", block_size, 4096);
    }
    if (check_size % block_size) != 0 {
        panic!("{} must be multiple of {}", check_size, block_size);
    }
    let blocks_per_check = check_size / block_size;
    let check_len = u64::from(check_size);
    let block_len = u64::from(block_size);

    // The sysfs `size` value is multiplied by 512 to obtain bytes.
    let sector_count = fs::read_to_string(
        PathBuf::from("/sys/class/block")
            .join(&target_name)
            .join("size"),
    )
    .unwrap()
    .trim()
    .parse::<u64>()
    .unwrap();
    let target_size = sector_count * 512;
    encrypted.send_u64(target_size).unwrap();
    // Number of check ranges, rounding up so a partial final range is included.
    let check_chunk_count = target_size / check_len + u64::from(target_size % check_len != 0);
    // Re-exchange magic bytes for ensuring connection sanity
    server_exchange_magic(&mut encrypted);

    for check_chunk_index in 0..check_chunk_count {
        let check_base = check_chunk_index * check_len;
        let check_last = (check_chunk_index + 1) * check_len;
        encrypted.send_bool(false).unwrap();
        encrypted.send_u64(check_base).unwrap();
        encrypted.send_u64(check_last).unwrap();
        print!("Send");
        print!("HashedCheck");
        print!(
            "(addr_base={:#016X}, addr_last={:#016X}, ",
            check_base, check_last
        );
        io::stdout().flush().unwrap();

        let check_hash = sha3256handy(&read_disk(&target, check_base, check_len));
        encrypted.send_32bytes(&check_hash).unwrap();
        let needs_individual_blocks = encrypted.recv_bool().unwrap();
        println!("stored={:#})", !needs_individual_blocks);
        server_exchange_magic(&mut encrypted);
        if !needs_individual_blocks {
            continue;
        }

        // The peer could not account for the whole range: offer it block by block.
        for block_chunk_index in 0..u64::from(blocks_per_check) {
            let block_base = block_chunk_index * block_len + check_base;
            let block_last = (block_chunk_index + 1) * block_len + check_base;
            encrypted.send_bool(true).unwrap();
            encrypted.send_u64(block_base).unwrap();
            encrypted.send_u64(block_last).unwrap();
            print!("Send");
            print!("HashedBlock");
            print!(
                "(addr_base={:#016X}, addr_last={:#016X}, ",
                block_base, block_last
            );
            io::stdout().flush().unwrap();

            let block_chunk = read_disk(&target, block_base, block_len);
            let block_hash = sha3256handy(&block_chunk);
            encrypted.send_32bytes(&block_hash).unwrap();
            let needs_upload = encrypted.recv_bool().unwrap();
            print!("stored={:#}", !needs_upload);
            io::stdout().flush().unwrap();
            if needs_upload {
                print!(", data_len=");
                io::stdout().flush().unwrap();
                print!("{:#016X}", block_chunk.len());
                io::stdout().flush().unwrap();
                encrypted.send_bytes_large(&block_chunk).unwrap();
            }
            println!(")");
            server_exchange_magic(&mut encrypted);
        }
    }

    // End-of-session marker: both addresses set to u64::MAX.
    encrypted.send_bool(false).unwrap();
    encrypted.send_u64(u64::MAX).unwrap();
    encrypted.send_u64(u64::MAX).unwrap();
    server_exchange_magic(&mut encrypted);
    encrypted.shutdown(std::net::Shutdown::Both).unwrap();
}
// pub trait BlockExchangeProtocolState {
// fn assert_compatible_roles(&mut self);
// }
// impl dyn BlockExchangeProtocolState {
// pub fn run(&mut self) {
// self.assert_compatible_roles();
// }
// }
// pub struct BlockExchangeProtocolInitialStateKnowledge<S>
// where
// S: io::Read + io::Write,
// {
// pub stream: Option<TcpStream>,
// pub encrypted: Option<S>,
// pub path: PathBuf,
// pub target: String,
// pub block_size: u32,
// pub check_size: u32,
// }
// pub struct BlockExchangeProtocolStateClient<S>
// where
// S: io::Read + io::Write,
// {
// pub state: BlockExchangeProtocolInitialStateKnowledge<S>,
// }
// pub struct BlockExchangeProtocolStateServer<S>
// where
// S: io::Read + io::Write,
// {
// pub state: BlockExchangeProtocolInitialStateKnowledge<S>,
// }
// impl<S> BlockExchangeProtocolState for BlockExchangeProtocolStateClient<S>
// where
// S: io::Read + io::Write,
// {
// fn assert_compatible_roles(&mut self) {
// match &mut self.state.encrypted {
// Some(e) => client_exchange_magic(e),
// None => client_exchange_magic(&mut (self.state.stream.as_ref().unwrap())),
// };
// }
// }
// impl<S> BlockExchangeProtocolState for BlockExchangeProtocolStateServer<S>
// where
// S: io::Read + io::Write,
// {
// fn assert_compatible_roles(&mut self) {
// match &mut self.state.encrypted {
// Some(e) => client_exchange_magic(e),
// None => client_exchange_magic(&mut self.state.stream.as_ref().unwrap()),
// };
// }
// }
fn make_hash_segments_tree(path: &PathBuf, hash: &[u8], nibbles: u8) -> PathBuf {
let encoded: Vec<String> = base16::encode_upper(hash)
.chars()
.chunks(max(nibbles as usize, 1))
.into_iter()
.map(|x| x.collect::<String>())
.collect();
let mut new_path = path.to_owned();
for enc in encoded {
new_path = new_path.join(enc);
}
new_path
}
/// Reads `read_length` bytes from `target`, starting at byte `start_addr`.
///
/// The returned vector always has exactly `read_length` bytes. If the file
/// ends before that many bytes are available, the remainder stays zero.
///
/// # Panics
///
/// Panics if the file cannot be opened, seeked or read.
fn read_disk(target: &PathBuf, start_addr: u64, read_length: u64) -> Vec<u8> {
    let mut buffer: Vec<u8> = vec![0u8; read_length as usize];
    let mut disk = fs::File::open(target).unwrap();
    disk.seek(io::SeekFrom::Start(start_addr)).unwrap();
    // A single `read` may legitimately return fewer bytes than requested, so
    // keep reading until the buffer is full or end-of-file is reached.
    let mut filled = 0;
    while filled < buffer.len() {
        match disk.read(&mut buffer[filled..]) {
            Ok(0) => break,
            Ok(count) => filled += count,
            Err(err) if err.kind() == io::ErrorKind::Interrupted => continue,
            Err(err) => panic!("Failed to read {:?}: {}", target, err),
        }
    }
    buffer
}
/// Sends `state_no` to the peer, reads the peer's state number back and
/// panics if the two differ; `message`, when given, is appended to the panic
/// text.
fn peer_exchange_state<S>(sock: &mut S, state_no: u32, message: Option<&str>)
where
    S: io::Read + io::Write,
{
    sock.send_u32(state_no).unwrap();
    sock.flush().unwrap();
    let remote_state = sock.recv_u32().unwrap();
    if remote_state == state_no {
        return;
    }
    panic!(
        "Local state is {} and remote state is {}: {}",
        state_no,
        remote_state,
        message.unwrap_or("")
    );
}
/// Sends `MAGIC_CLIENT` and expects `MAGIC_SERVER` in return.
///
/// # Panics
///
/// Panics on an I/O failure or when the received bytes are not
/// `MAGIC_SERVER`.
fn client_exchange_magic<S>(sock: &mut S)
where
    S: io::Read + io::Write,
{
    sock.send_2bytes(&MAGIC_CLIENT).unwrap();
    // Flush before blocking on the reply (as `peer_exchange_state` does), so
    // the peer is never left waiting for bytes still held in a write buffer.
    sock.flush().unwrap();
    if sock.recv_2bytes().unwrap() != MAGIC_SERVER {
        panic!("Magic number mismatch");
    }
}
/// Sends `MAGIC_SERVER` and expects `MAGIC_CLIENT` in return.
///
/// # Panics
///
/// Panics on an I/O failure or when the received bytes are not
/// `MAGIC_CLIENT`.
fn server_exchange_magic<S>(sock: &mut S)
where
    S: io::Read + io::Write,
{
    sock.send_2bytes(&MAGIC_SERVER).unwrap();
    // Flush before blocking on the reply (as `peer_exchange_state` does), so
    // the peer is never left waiting for bytes still held in a write buffer.
    sock.flush().unwrap();
    if sock.recv_2bytes().unwrap() != MAGIC_CLIENT {
        panic!("Magic number mismatch");
    }
}
/// Returns the CRC-32 checksum of `v` as computed by `crc32fast`.
fn crc32handy(v: &[u8]) -> u32 {
    crc32fast::hash(v)
}
/// Returns the SHA3-256 digest of `v` as a fixed 32-byte array.
///
/// Panics if the hasher yields anything other than 32 bytes.
fn sha3256handy(v: &[u8]) -> [u8; 32] {
    let digest = Sha3_256::digest(v);
    let bytes: &[u8] = &digest;
    if bytes.len() != 32 {
        panic!("SHA3-256 returned {} bytes, expected 32", bytes.len());
    }
    let mut out = [0u8; 32];
    out.copy_from_slice(bytes);
    out
}
/// Performs the key agreement over the still-unencrypted `stream`.
///
/// Each side sends a 32-bit identifier of its key and uses the identifier it
/// receives to look up the peer's public key in `repo`; the shared secret is
/// then derived from the local private key and that public key.
///
/// Panics on I/O failure, if the local private key cannot be loaded, or when
/// `repo` has no authorized key for the received identifier.
fn plain_net_diffie_hellman(
    stream: &mut TcpStream,
    repo: &keys::Curve25519Repository,
) -> x25519_dalek::SharedSecret {
    let own_private = repo.dh_private_load_generating().unwrap();
    // NOTE(review): the identifier sent here is a CRC32 over the *private* key
    // bytes, in clear text. Confirm against `keys` whether that is intended;
    // an identifier derived from the public key would reveal nothing about
    // the secret.
    let own_id = crc32handy(&own_private.to_bytes());
    stream.send_u32(own_id).unwrap();

    let peer_id = stream.recv_u32().unwrap();
    let peer_public = match repo.dh_public_authorized(peer_id) {
        Some(key) => key,
        None => {
            let peer_name = stream
                .peer_addr()
                .ok()
                .map(|addr| format!("{}", addr))
                .unwrap_or_else(|| "UNKNOWN".to_string());
            panic!(
                "Unknown key: {} from {:?}",
                base16::encode_upper(&peer_id.to_be_bytes()),
                peer_name
            )
        }
    };
    own_private.diffie_hellman(&peer_public)
}