Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 0 additions & 92 deletions crates/earl-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,95 +120,3 @@ pub trait StreamingProtocolExecutor {
sender: tokio::sync::mpsc::Sender<StreamChunk>,
) -> impl Future<Output = anyhow::Result<StreamMeta>> + Send;
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn stream_chunk_can_be_created() {
let chunk = StreamChunk {
data: b"hello".to_vec(),
content_type: Some("application/json".to_string()),
};
assert_eq!(chunk.data, b"hello");
assert_eq!(chunk.content_type.as_deref(), Some("application/json"));
}

#[test]
fn stream_meta_can_be_created() {
let meta = StreamMeta {
status: 200,
url: "https://example.com".to_string(),
};
assert_eq!(meta.status, 200);
}
}

#[cfg(test)]
mod streaming_tests {
use super::*;
use serde_json::Map;
use std::time::Duration;
use tokio::sync::mpsc;

struct MockStreamExecutor;

impl StreamingProtocolExecutor for MockStreamExecutor {
type PreparedData = String;

async fn execute_stream(
&mut self,
_data: &String,
_context: &ExecutionContext,
sender: mpsc::Sender<StreamChunk>,
) -> anyhow::Result<StreamMeta> {
sender
.send(StreamChunk {
data: b"chunk1".to_vec(),
content_type: None,
})
.await
.unwrap();
Ok(StreamMeta {
status: 200,
url: "https://example.com".to_string(),
})
}
}

#[tokio::test]
async fn mock_streaming_executor_sends_chunks() {
let (tx, mut rx) = mpsc::channel(16);
let mut executor = MockStreamExecutor;
let context = ExecutionContext {
key: "test".to_string(),
mode: CommandMode::Read,
allow_rules: vec![],
transport: ResolvedTransport {
timeout: Duration::from_secs(30),
follow_redirects: true,
max_redirect_hops: 10,
retry_max_attempts: 0,
retry_backoff: Duration::from_millis(100),
retry_on_status: vec![],
compression: false,
tls_min_version: None,
proxy_url: None,
max_response_bytes: 10_000_000,
},
result_template: ResultTemplate::default(),
args: Map::new(),
redactor: Redactor::new(vec![]),
};

let meta = executor
.execute_stream(&"test".to_string(), &context, tx)
.await
.unwrap();

assert_eq!(meta.status, 200);
let chunk = rx.recv().await.unwrap();
assert_eq!(chunk.data, b"chunk1");
}
}
19 changes: 0 additions & 19 deletions crates/earl-protocol-bash/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,22 +37,3 @@ pub struct BashSandboxTemplate {
pub max_memory_bytes: Option<u64>,
pub max_cpu_time_ms: Option<u64>,
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn bash_operation_defaults_stream_false() {
let json = r#"{"bash":{"script":"echo hello"}}"#;
let op: BashOperationTemplate = serde_json::from_str(json).unwrap();
assert!(!op.stream);
}

#[test]
fn bash_operation_accepts_stream_true() {
let json = r#"{"stream":true,"bash":{"script":"echo hello"}}"#;
let op: BashOperationTemplate = serde_json::from_str(json).unwrap();
assert!(op.stream);
}
}
19 changes: 0 additions & 19 deletions crates/earl-protocol-grpc/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,22 +29,3 @@ pub struct GrpcTemplate {
pub body: Option<Value>,
pub descriptor_set_file: Option<String>,
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn grpc_operation_defaults_stream_false() {
let json = r#"{"url":"https://example.com","grpc":{"service":"test.Svc","method":"Call"}}"#;
let op: GrpcOperationTemplate = serde_json::from_str(json).unwrap();
assert!(!op.stream);
}

#[test]
fn grpc_operation_accepts_stream_true() {
let json = r#"{"url":"https://example.com","stream":true,"grpc":{"service":"test.Svc","method":"Call"}}"#;
let op: GrpcOperationTemplate = serde_json::from_str(json).unwrap();
assert!(op.stream);
}
}
34 changes: 0 additions & 34 deletions crates/earl-protocol-http/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,37 +54,3 @@ pub struct GraphqlTemplate {
#[rkyv(with = AsJson)]
pub variables: Option<Value>,
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn http_operation_defaults_stream_false() {
let json = r#"{"method":"GET","url":"https://example.com"}"#;
let op: HttpOperationTemplate = serde_json::from_str(json).unwrap();
assert!(!op.stream);
}

#[test]
fn http_operation_accepts_stream_true() {
let json = r#"{"method":"GET","url":"https://example.com","stream":true}"#;
let op: HttpOperationTemplate = serde_json::from_str(json).unwrap();
assert!(op.stream);
}

#[test]
fn graphql_operation_defaults_stream_false() {
let json = r#"{"url":"https://example.com","graphql":{"query":"{ users { id } }"}}"#;
let op: GraphqlOperationTemplate = serde_json::from_str(json).unwrap();
assert!(!op.stream);
}

#[test]
fn graphql_operation_accepts_stream_true() {
let json =
r#"{"url":"https://example.com","stream":true,"graphql":{"query":"{ users { id } }"}}"#;
let op: GraphqlOperationTemplate = serde_json::from_str(json).unwrap();
assert!(op.stream);
}
}
45 changes: 27 additions & 18 deletions crates/earl-protocol-http/src/sse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,77 +116,75 @@ mod tests {
use super::*;

#[test]
fn parses_simple_data_event() {
fn single_data_line_returned_as_event_data() {
let input = "data: hello world\n\n";
let events = SseParser::new().feed(input);
assert_eq!(events.len(), 1);
assert_eq!(events[0].data, "hello world");
}

#[test]
fn parses_multiline_data_event() {
fn multiple_data_lines_joined_with_newline() {
let input = "data: line1\ndata: line2\n\n";
let events = SseParser::new().feed(input);
assert_eq!(events.len(), 1);
assert_eq!(events[0].data, "line1\nline2");
}

#[test]
fn parses_event_with_type() {
fn event_field_sets_event_type() {
let input = "event: update\ndata: {\"key\":\"value\"}\n\n";
let events = SseParser::new().feed(input);
assert_eq!(events.len(), 1);
assert_eq!(events[0].event_type.as_deref(), Some("update"));
assert_eq!(events[0].data, "{\"key\":\"value\"}");
}

#[test]
fn skips_comments() {
fn comment_lines_excluded_from_event_data() {
let input = ": this is a comment\ndata: actual data\n\n";
let events = SseParser::new().feed(input);
assert_eq!(events.len(), 1);
assert_eq!(events[0].data, "actual data");
}

#[test]
fn handles_multiple_events() {
fn multiple_complete_events_all_returned() {
let input = "data: event1\n\ndata: event2\n\n";
let events = SseParser::new().feed(input);
assert_eq!(events.len(), 2);
}

#[test]
fn parses_event_with_id() {
fn id_field_sets_event_id() {
let input = "id: 42\ndata: payload\n\n";
let events = SseParser::new().feed(input);
assert_eq!(events.len(), 1);
assert_eq!(events[0].id.as_deref(), Some("42"));
assert_eq!(events[0].data, "payload");
}

#[test]
fn handles_no_space_after_colon() {
fn no_space_after_colon_data_is_parsed() {
let input = "data:no-space\n\n";
let events = SseParser::new().feed(input);
assert_eq!(events.len(), 1);
assert_eq!(events[0].data, "no-space");
}

#[test]
fn ignores_block_without_data() {
fn block_without_data_field_produces_no_event() {
let input = "event: ping\n\n";
let events = SseParser::new().feed(input);
assert!(events.is_empty());
}

#[test]
fn handles_empty_input() {
fn empty_input_returns_no_events() {
let events = SseParser::new().feed("");
assert!(events.is_empty());
}

#[test]
fn event_split_across_chunks() {
fn event_split_across_chunks_buffered_until_complete() {
let mut parser = SseParser::new();

// First chunk contains the beginning of the event but no blank-line terminator.
Expand All @@ -200,16 +198,23 @@ mod tests {
}

#[test]
fn handles_crlf_line_endings() {
fn crlf_line_endings_parse_event_type() {
let input = "event: update\r\ndata: payload\r\n\r\n";
let events = SseParser::new().feed(input);
assert_eq!(events.len(), 1);
assert_eq!(events[0].event_type.as_deref(), Some("update"));
}

#[test]
fn crlf_line_endings_parse_data() {
let input = "event: update\r\ndata: payload\r\n\r\n";
let events = SseParser::new().feed(input);
assert_eq!(events.len(), 1);
assert_eq!(events[0].data, "payload");
}

#[test]
fn flush_trailing_event() {
fn trailing_data_without_terminator_emitted_on_flush() {
let mut parser = SseParser::new();

// Feed an event that is NOT terminated by a blank line.
Expand All @@ -222,15 +227,19 @@ mod tests {
}

#[test]
fn multiple_feed_calls() {
fn complete_event_before_partial_in_same_feed_emitted_immediately() {
let mut parser = SseParser::new();

// First feed: one complete event and start of another.
let events = parser.feed("data: first\n\ndata: sec");
assert_eq!(events.len(), 1);
assert_eq!(events[0].data, "first");
}

// Second feed: finish the second event and deliver a third.
#[test]
fn subsequent_feed_completes_partial_and_returns_additional_events() {
let mut parser = SseParser::new();
// Setup: buffer a partial event.
parser.feed("data: first\n\ndata: sec");
// Second feed completes the partial and delivers a further event.
let events = parser.feed("ond\n\ndata: third\n\n");
assert_eq!(events.len(), 2);
assert_eq!(events[0].data, "second");
Expand Down
Loading
Loading