summaryrefslogtreecommitdiff
path: root/src/pipe.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/pipe.rs')
-rw-r--r--src/pipe.rs157
1 files changed, 157 insertions, 0 deletions
diff --git a/src/pipe.rs b/src/pipe.rs
new file mode 100644
index 0000000..a140ffd
--- /dev/null
+++ b/src/pipe.rs
@@ -0,0 +1,157 @@
+use std::fmt::Debug;
+use std::io::{Read, Write};
+use std::num::NonZeroU8;
+use std::str::Utf8Error;
+use std::sync::{Arc, OnceLock};
+
+use happylock::ThreadKey;
+use uuid::Uuid;
+
+use crate::processes::process_id;
+
+pub trait VirtualFile: Read + Write + Debug + Send + Sync {}
+impl<T: Read + Write + Debug + Send + Sync> VirtualFile for T {}
+
+#[derive(Debug)]
+pub enum MessageField {
+ File(Box<dyn VirtualFile>),
+ Bytes(Box<[u8]>),
+ Empty,
+}
+
+#[derive(Debug)]
+struct ReturnSpace {
+ message: MessageField,
+ error_code: Option<NonZeroU8>,
+}
+
+#[derive(Debug, Clone)]
+pub struct SharedReturnSpace(Arc<OnceLock<ReturnSpace>>);
+
+#[derive(Debug)]
+pub struct Message {
+ pub sending_program: Uuid,
+ pub fields: Box<[MessageField]>,
+ pub return_space: SharedReturnSpace,
+}
+
+#[derive(Debug)]
+pub struct MessageResponse {
+ return_space: SharedReturnSpace,
+}
+
+#[derive(Debug)]
+pub struct MessageError<'a> {
+ error_code: u8,
+ message: &'a MessageField,
+}
+
+impl Message {
+ pub fn new(key: &mut ThreadKey, fields: impl Into<Box<[MessageField]>>) -> Self {
+ Self {
+ sending_program: process_id(key),
+ fields: fields.into(),
+ return_space: SharedReturnSpace(Arc::new(OnceLock::new())),
+ }
+ }
+
+ pub fn fields(&self) -> &[MessageField] {
+ &self.fields
+ }
+
+ pub fn sending_program(&self) -> Uuid {
+ self.sending_program
+ }
+
+ pub fn respond_ok(self, value: MessageField) {
+ self.return_space.respond_ok(value)
+ }
+
+ pub fn respond_err(self, error_code: NonZeroU8, error_message: MessageField) {
+ self.return_space.respond_err(error_code, error_message);
+ }
+}
+
+impl From<&[u8]> for MessageField {
+ fn from(value: &[u8]) -> Self {
+ Self::Bytes(value.into())
+ }
+}
+
+impl From<&str> for MessageField {
+ fn from(value: &str) -> Self {
+ Self::Bytes(value.as_bytes().into())
+ }
+}
+
+impl MessageField {
+ pub fn string(&self) -> Option<Result<&str, Utf8Error>> {
+ if let Self::Bytes(bytes) = self {
+ Some(str::from_utf8(bytes))
+ } else {
+ None
+ }
+ }
+}
+
+impl ReturnSpace {
+ fn as_result<'a>(&'a self) -> Result<&'a MessageField, MessageError<'a>> {
+ match self.error_code {
+ None => Ok(&self.message),
+ Some(error_code) => Err(MessageError {
+ error_code: error_code.get(),
+ message: &self.message,
+ }),
+ }
+ }
+}
+
+impl SharedReturnSpace {
+ fn get(&self) -> Option<&ReturnSpace> {
+ self.0.get()
+ }
+
+ fn wait(&self) -> &ReturnSpace {
+ self.0.wait()
+ }
+
+ pub fn respond_ok(self, value: MessageField) {
+ debug_assert!(
+ self.0
+ .set(ReturnSpace {
+ message: value,
+ error_code: None
+ })
+ .is_ok(),
+ "it should not be possible to call this function twice"
+ );
+ }
+
+ pub fn respond_err(self, error_code: NonZeroU8, error_message: MessageField) {
+ debug_assert!(
+ self.0
+ .set(ReturnSpace {
+ message: error_message,
+ error_code: Some(error_code)
+ })
+ .is_ok(),
+ "it should not be possible to call this function twice"
+ );
+ }
+}
+
+impl MessageResponse {
+ pub fn from_message(message: &Message) -> Self {
+ Self {
+ return_space: message.return_space.clone(),
+ }
+ }
+
+ pub fn poll<'a>(&'a self) -> Option<Result<&'a MessageField, MessageError<'a>>> {
+ self.return_space.get().map(ReturnSpace::as_result)
+ }
+
+ pub fn wait<'a>(&'a self) -> Result<&'a MessageField, MessageError<'a>> {
+ self.return_space.wait().as_result()
+ }
+}