Skip to content

Commit

Permalink
Don't require an Encoder type while constructing FramedWrite so we ca…
Browse files Browse the repository at this point in the history
…n be generic over the lifetime
  • Loading branch information
w4 committed Feb 1, 2021
1 parent cd3a036 commit 8799836
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 15 deletions.
3 changes: 2 additions & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
# CHANGES

## Unreleased - 2021-xx-xx

* Changed the signature of `actix::io::FramedWrite` to allow `FramedWrite::write()`'s `item`
lifetime to be the lifetime of the function rather than the struct. [#462]

## 0.11.0-beta.1 - 2021-01-01
### Added
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ derive = ["actix_derive"]
mailbox_assert = []

[dependencies]
actix-rt = "2.0.0-beta.1"
actix-rt = "=2.0.0-beta.2"
actix_derive = { version = "0.5", optional = true }
actix-macros = { version = "0.1.3", features = ["actix-reexport"] }

Expand Down
4 changes: 2 additions & 2 deletions examples/chat/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{io, net, thread};
use actix::prelude::*;
use tokio::io::WriteHalf;
use tokio::net::TcpStream;
use tokio_util::codec::FramedRead;
use tokio_util::codec::{Encoder, FramedRead};

mod codec;

Expand Down Expand Up @@ -45,9 +45,9 @@ async fn main() {

struct ChatClient {
framed: actix::io::FramedWrite<
codec::ChatRequest,
WriteHalf<TcpStream>,
codec::ClientChatCodec,
<codec::ClientChatCodec as Encoder<codec::ChatRequest>>::Error,
>,
}

Expand Down
13 changes: 11 additions & 2 deletions examples/chat/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use actix::prelude::*;

use tokio::io::WriteHalf;
use tokio::net::TcpStream;
use tokio_util::codec::Encoder;

use crate::codec::{ChatCodec, ChatRequest, ChatResponse};
use crate::server::{self, ChatServer};
Expand All @@ -29,7 +30,11 @@ pub struct ChatSession {
/// joined room
room: String,
/// Framed wrapper
framed: actix::io::FramedWrite<ChatResponse, WriteHalf<TcpStream>, ChatCodec>,
framed: actix::io::FramedWrite<
WriteHalf<TcpStream>,
ChatCodec,
<ChatCodec as Encoder<ChatResponse>>::Error,
>,
}

impl Actor for ChatSession {
Expand Down Expand Up @@ -125,7 +130,11 @@ impl Handler<Message> for ChatSession {
impl ChatSession {
pub fn new(
addr: Addr<ChatServer>,
framed: actix::io::FramedWrite<ChatResponse, WriteHalf<TcpStream>, ChatCodec>,
framed: actix::io::FramedWrite<
WriteHalf<TcpStream>,
ChatCodec,
<ChatCodec as Encoder<ChatResponse>>::Error,
>,
) -> ChatSession {
ChatSession {
addr,
Expand Down
31 changes: 22 additions & 9 deletions src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,17 +306,17 @@ where

/// A wrapper for the `AsyncWrite` and `Encoder` types. The AsyncWrite will be flushed when this
/// struct is dropped.
pub struct FramedWrite<I, T: AsyncWrite + Unpin, U: Encoder<I>> {
pub struct FramedWrite<T: AsyncWrite + Unpin, U, E: From<io::Error> = std::io::Error> {
enc: U,
inner: UnsafeWriter<T, U::Error>,
inner: UnsafeWriter<T, E>,
}

impl<I, T: AsyncWrite + Unpin, U: Encoder<I>> FramedWrite<I, T, U> {
impl<T: AsyncWrite + Unpin, U, E: From<io::Error>> FramedWrite<T, U, E> {
pub fn new<A, C>(io: T, enc: U, ctx: &mut C) -> Self
where
A: Actor<Context = C> + WriteHandler<U::Error>,
A: Actor<Context = C> + WriteHandler<E>,
C: AsyncContext<A>,
U::Error: 'static,
E: 'static,
T: Unpin + 'static,
{
let inner = UnsafeWriter(
Expand All @@ -343,9 +343,9 @@ impl<I, T: AsyncWrite + Unpin, U: Encoder<I>> FramedWrite<I, T, U> {

pub fn from_buffer<A, C>(io: T, enc: U, buffer: BytesMut, ctx: &mut C) -> Self
where
A: Actor<Context = C> + WriteHandler<U::Error>,
A: Actor<Context = C> + WriteHandler<E>,
C: AsyncContext<A>,
U::Error: 'static,
E: 'static,
T: Unpin + 'static,
{
let inner = UnsafeWriter(
Expand Down Expand Up @@ -390,7 +390,10 @@ impl<I, T: AsyncWrite + Unpin, U: Encoder<I>> FramedWrite<I, T, U> {
}

/// Writes an item to the sink.
pub fn write(&mut self, item: I) {
pub fn write<I>(&mut self, item: I)
where
U: Encoder<I, Error = E>,
{
let mut inner = self.inner.0.borrow_mut();
let _ = self.enc.encode(item, &mut inner.buffer).map_err(|e| {
inner.error = Some(e);
Expand All @@ -404,9 +407,19 @@ impl<I, T: AsyncWrite + Unpin, U: Encoder<I>> FramedWrite<I, T, U> {
pub fn handle(&self) -> SpawnHandle {
self.inner.0.borrow().handle
}

/// Returns a reference to the underlying encoder.
pub fn encoder(&self) -> &U {
&self.enc
}

/// Returns a mutable reference to the underlying encoder.
pub fn encoder_mut(&mut self) -> &mut U {
&mut self.enc
}
}

impl<I, T: AsyncWrite + Unpin, U: Encoder<I>> Drop for FramedWrite<I, T, U> {
impl<T: AsyncWrite + Unpin, U, E: From<io::Error>> Drop for FramedWrite<T, U, E> {
fn drop(&mut self) {
// Attempts to write any remaining bytes to the stream and flush it
let mut async_writer = self.inner.1.borrow_mut();
Expand Down

0 comments on commit 8799836

Please sign in to comment.