Skip to content

Commit

Permalink
Add Charset functionality to SSE reply.
Browse files Browse the repository at this point in the history
  • Loading branch information
kouta-kun committed Mar 25, 2024
1 parent 7b07043 commit d38f7ed
Showing 1 changed file with 91 additions and 2 deletions.
93 changes: 91 additions & 2 deletions src/filters/sse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,12 +315,98 @@ where
S: TryStream<Ok = Event> + Send + 'static,
S::Error: StdError + Send + Sync + 'static,
{
SseReply { event_stream }
SseReply { event_stream, charset: None }
}

/// Server-sent events reply (with explicit charset)
///
/// This function converts stream of server events into a `Reply` with:
///
/// - Status of `200 OK`
/// - Header `content-type: text/event-stream; charset=*charset*`
/// - Header `cache-control: no-cache`.
///
/// # Example
///
/// ```
///
/// use std::time::Duration;
/// use futures_util::Stream;
/// use futures_util::stream::iter;
/// use std::convert::Infallible;
/// use warp::{Filter, sse::Event};
/// use serde_derive::Serialize;
///
/// #[derive(Serialize)]
/// struct Msg {
/// from: u32,
/// text: String,
/// }
///
/// fn event_stream() -> impl Stream<Item = Result<Event, Infallible>> {
/// iter(vec![
/// // Unnamed event with data only
/// Ok(Event::default().data("payload")),
/// // Named event with ID and retry timeout
/// Ok(
/// Event::default().data("other message\nwith next line")
/// .event("chat")
/// .id(1.to_string())
/// .retry(Duration::from_millis(15000))
/// ),
/// // Event with JSON data
/// Ok(
/// Event::default().id(2.to_string())
/// .json_data(Msg {
/// from: 2,
/// text: "hello".into(),
/// }).unwrap(),
/// )
/// ])
/// }
///
/// async {
/// let app = warp::path("sse").and(warp::get()).map(|| {
/// warp::sse::reply_with_charset(event_stream(), "utf-8".to_string())
/// });
///
/// let res = warp::test::request()
/// .method("GET")
/// .header("Connection", "Keep-Alive")
/// .path("/sse")
/// .reply(&app)
/// .await
/// .into_body();
///
/// assert_eq!(
/// res,
/// r#"data:payload
///
/// event:chat
/// data:other message
/// data:with next line
/// id:1
/// retry:15000
///
/// data:{"from":2,"text":"hello"}
/// id:2
///
/// "#
/// );
/// };
/// ```
pub fn reply_with_charset<S>(event_stream: S, charset: String) -> impl Reply
where
S: TryStream<Ok = Event> + Send + 'static,
S::Error: StdError + Send + Sync + 'static,
{
SseReply { event_stream, charset: Some(charset) }
}

#[allow(missing_debug_implementations)]
struct SseReply<S> {
event_stream: S,
charset: Option<String>
}

impl<S> Reply for SseReply<S>
Expand All @@ -343,7 +429,10 @@ where
let mut res = Response::new(Body::wrap_stream(body_stream));
// Set appropriate content type
res.headers_mut()
.insert(CONTENT_TYPE, HeaderValue::from_static("text/event-stream"));
.insert(CONTENT_TYPE, match self.charset {
None => HeaderValue::from_static("text/event-stream"),
Some(charset) => HeaderValue::from_str(format!("text/event-stream; charset={}", charset).as_str()).expect("Invalid header value"),
});
// Disable response body caching
res.headers_mut()
.insert(CACHE_CONTROL, HeaderValue::from_static("no-cache"));
Expand Down

0 comments on commit d38f7ed

Please sign in to comment.