From 610e575043bfc75feafcce5bddaf7e1a436e5d02 Mon Sep 17 00:00:00 2001 From: Mica White Date: Sun, 7 Dec 2025 14:23:22 -0500 Subject: First commit --- src/pipe.rs | 157 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 157 insertions(+) create mode 100644 src/pipe.rs (limited to 'src/pipe.rs') diff --git a/src/pipe.rs b/src/pipe.rs new file mode 100644 index 0000000..a140ffd --- /dev/null +++ b/src/pipe.rs @@ -0,0 +1,157 @@ +use std::fmt::Debug; +use std::io::{Read, Write}; +use std::num::NonZeroU8; +use std::str::Utf8Error; +use std::sync::{Arc, OnceLock}; + +use happylock::ThreadKey; +use uuid::Uuid; + +use crate::processes::process_id; + +pub trait VirtualFile: Read + Write + Debug + Send + Sync {} +impl VirtualFile for T {} + +#[derive(Debug)] +pub enum MessageField { + File(Box), + Bytes(Box<[u8]>), + Empty, +} + +#[derive(Debug)] +struct ReturnSpace { + message: MessageField, + error_code: Option, +} + +#[derive(Debug, Clone)] +pub struct SharedReturnSpace(Arc>); + +#[derive(Debug)] +pub struct Message { + pub sending_program: Uuid, + pub fields: Box<[MessageField]>, + pub return_space: SharedReturnSpace, +} + +#[derive(Debug)] +pub struct MessageResponse { + return_space: SharedReturnSpace, +} + +#[derive(Debug)] +pub struct MessageError<'a> { + error_code: u8, + message: &'a MessageField, +} + +impl Message { + pub fn new(key: &mut ThreadKey, fields: impl Into>) -> Self { + Self { + sending_program: process_id(key), + fields: fields.into(), + return_space: SharedReturnSpace(Arc::new(OnceLock::new())), + } + } + + pub fn fields(&self) -> &[MessageField] { + &self.fields + } + + pub fn sending_program(&self) -> Uuid { + self.sending_program + } + + pub fn respond_ok(self, value: MessageField) { + self.return_space.respond_ok(value) + } + + pub fn respond_err(self, error_code: NonZeroU8, error_message: MessageField) { + self.return_space.respond_err(error_code, error_message); + } +} + +impl From<&[u8]> for MessageField { + fn from(value: &[u8]) -> Self { + Self::Bytes(value.into()) + } +} + +impl From<&str> for MessageField { + fn from(value: &str) -> Self { + Self::Bytes(value.as_bytes().into()) + } +} + +impl MessageField { + pub fn string(&self) -> Option> { + if let Self::Bytes(bytes) = self { + Some(str::from_utf8(bytes)) + } else { + None + } + } +} + +impl ReturnSpace { + fn as_result<'a>(&'a self) -> Result<&'a MessageField, MessageError<'a>> { + match self.error_code { + None => Ok(&self.message), + Some(error_code) => Err(MessageError { + error_code: error_code.get(), + message: &self.message, + }), + } + } +} + +impl SharedReturnSpace { + fn get(&self) -> Option<&ReturnSpace> { + self.0.get() + } + + fn wait(&self) -> &ReturnSpace { + self.0.wait() + } + + pub fn respond_ok(self, value: MessageField) { + debug_assert!( + self.0 + .set(ReturnSpace { + message: value, + error_code: None + }) + .is_ok(), + "it should not be possible to call this function twice" + ); + } + + pub fn respond_err(self, error_code: NonZeroU8, error_message: MessageField) { + debug_assert!( + self.0 + .set(ReturnSpace { + message: error_message, + error_code: Some(error_code) + }) + .is_ok(), + "it should not be possible to call this function twice" + ); + } +} + +impl MessageResponse { + pub fn from_message(message: &Message) -> Self { + Self { + return_space: message.return_space.clone(), + } + } + + pub fn poll<'a>(&'a self) -> Option>> { + self.return_space.get().map(ReturnSpace::as_result) + } + + pub fn wait<'a>(&'a self) -> Result<&'a MessageField, MessageError<'a>> { + self.return_space.wait().as_result() + } +} -- cgit v1.2.3