Skip to content

Commit

Permalink
Switch to tokio-0.2.3 and (non-preview) futures
Browse files Browse the repository at this point in the history
Fixes #160
  • Loading branch information
osa1 committed Dec 6, 2019
1 parent c2be34d commit dcb7bc9
Show file tree
Hide file tree
Showing 16 changed files with 325 additions and 404 deletions.
504 changes: 209 additions & 295 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 3 additions & 5 deletions libtiny_client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,10 @@ edition = "2018"
[dependencies]
base64 = "0.6.0"
env_logger = "0.7"
futures-preview = { version = "0.3.0-alpha.19", features = ["async-await"] }
futures-util-preview = "0.3.0-alpha.19"
futures = "0.3.1"
libtiny_logger = { path = "../libtiny_logger" }
libtiny_wire = { path = "../libtiny_wire" }
log = "0.4"
native-tls = "0.2"
tokio = { git = "https://github.com/tokio-rs/tokio.git", features = ["timer"], rev = "227533d" }
tokio-executor = { git = "https://github.com/tokio-rs/tokio.git", rev = "227533d" }
tokio-tls = { git = "https://github.com/tokio-rs/tokio.git", rev = "227533d" }
tokio = { version = "0.2.3", features = ["full"] }
tokio-tls = "0.3.0"
14 changes: 8 additions & 6 deletions libtiny_client/examples/echo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use libtiny_client::{Client, Event, ServerInfo};
use libtiny_wire::{Cmd, Msg, MsgTarget, Pfx};

use futures_util::stream::StreamExt;
use futures::stream::StreamExt;
use std::process::exit;

fn main() {
Expand Down Expand Up @@ -42,11 +42,13 @@ fn main() {

println!("{:?}", server_info);

let mut executor = tokio::runtime::current_thread::Runtime::new().unwrap();

executor.spawn(echo_bot_task(server_info));

executor.run().unwrap();
let mut runtime = tokio::runtime::Builder::new()
.basic_scheduler()
.enable_all()
.build()
.unwrap();
let local = tokio::task::LocalSet::new();
local.block_on(&mut runtime, echo_bot_task(server_info));
}

fn show_usage() {
Expand Down
19 changes: 11 additions & 8 deletions libtiny_client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,8 @@ use state::State;
use stream::{Stream, StreamError};

use futures::future::FutureExt;
use futures::stream::StreamExt;
use futures::stream::{Fuse, StreamExt};
use futures::{pin_mut, select};
use futures_util::stream::Fuse;
use std::net::{SocketAddr, ToSocketAddrs};
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
Expand Down Expand Up @@ -297,7 +296,7 @@ fn connect(server_info: ServerInfo) -> (Client, mpsc::Receiver<Event>) {
let irc_state_clone = irc_state.clone();

let task = main_loop(server_info, irc_state_clone, snd_ev, rcv_cmd);
tokio::runtime::current_thread::spawn(task);
tokio::task::spawn_local(task);

(
Client {
Expand Down Expand Up @@ -436,7 +435,7 @@ async fn main_loop(

// Spawn a task for outgoing messages.
let mut snd_ev_clone = snd_ev.clone();
tokio::runtime::current_thread::spawn(async move {
tokio::task::spawn_local(async move {
while let Some(msg) = rcv_msg.next().await {
if let Err(io_err) = write_half.write_all(msg.as_str().as_bytes()).await {
debug!("IO error when writing: {:?}", io_err);
Expand Down Expand Up @@ -544,7 +543,7 @@ enum TaskResult<A> {
async fn wait_(rcv_cmd: &mut Fuse<mpsc::Receiver<Cmd>>) -> TaskResult<()> {
// Weird code because of a bug in select!?
let delay = async {
tokio::timer::delay_for(Duration::from_secs(60)).await;
tokio::time::delay_for(Duration::from_secs(60)).await;
}
.fuse();
pin_mut!(delay);
Expand Down Expand Up @@ -591,17 +590,21 @@ async fn resolve_addr(
snd_ev: &mut mpsc::Sender<Event>,
) -> TaskResult<std::vec::IntoIter<SocketAddr>> {
let mut addr_iter_task =
tokio_executor::blocking::run(move || (serv_name.as_str(), port).to_socket_addrs()).fuse();
tokio::task::spawn_blocking(move || (serv_name.as_str(), port).to_socket_addrs()).fuse();

loop {
select! {
addr_iter = addr_iter_task => {
match addr_iter {
Err(io_err) => {
Err(join_err) => {
// TODO (osa): Not sure about this
panic!("DNS thread failed: {:?}", join_err);
}
Ok(Err(io_err)) => {
snd_ev.send(Event::IoErr(io_err)).await.unwrap();
return TryAfterDelay;
}
Ok(addr_iter) => {
Ok(Ok(addr_iter)) => {
return Done(addr_iter);
}
}
Expand Down
4 changes: 2 additions & 2 deletions libtiny_client/src/pinger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use futures::FutureExt;
use futures::{pin_mut, select, stream::StreamExt};
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::timer::delay_for;
use tokio::time::delay_for;

pub(crate) struct Pinger {
snd_rst: mpsc::Sender<()>,
Expand Down Expand Up @@ -69,7 +69,7 @@ impl Pinger {
let (snd_ev, rcv_ev) = mpsc::channel(1);
// No need for sending another "reset" when there's already one waiting to be processed
let (snd_rst, rcv_rst) = mpsc::channel(1);
tokio::runtime::current_thread::spawn(pinger_task(rcv_rst, snd_ev));
tokio::task::spawn_local(pinger_task(rcv_rst, snd_ev));
(Pinger { snd_rst }, rcv_ev)
}

Expand Down
5 changes: 2 additions & 3 deletions libtiny_tui/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ edition = "2018"

[dependencies]
env_logger = "0.7"
futures-preview = { version = "0.3.0-alpha.19", features = ["async-await"] }
futures-util-preview = "0.3.0-alpha.19"
futures = "0.3.1"
libtiny_ui = { path = "../libtiny_ui" }
log = "0.4"
notify-rust = "3"
Expand All @@ -16,7 +15,7 @@ tempfile = "3.0.3"
term_input = { path = "../term_input" }
termbox_simple = { path = "../termbox" }
time = "0.1"
tokio = { git = "https://github.com/tokio-rs/tokio.git", features = ["timer", "signal"], rev = "227533d" }
tokio = { version = "0.2.3", features = ["full"] }

[dev-dependencies]
mio = "0.6.9"
Expand Down
20 changes: 12 additions & 8 deletions libtiny_tui/examples/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use libtiny_tui::{Colors, TUI};
use libtiny_ui::*;
use std::fs::File;
use std::io::{BufRead, BufReader};
use tokio::runtime::current_thread::Runtime;

fn main() {
let args = std::env::args().collect::<Vec<_>>();
Expand All @@ -18,15 +17,20 @@ fn main() {
let file_buffered = BufReader::new(file);
let lines = file_buffered.lines().map(Result::unwrap).collect();

let mut executor = Runtime::new().unwrap();
let (tui, _) = TUI::run(Colors::default(), &mut executor);
let mut runtime = tokio::runtime::Builder::new()
.basic_scheduler()
.enable_all()
.build()
.unwrap();
let local = tokio::task::LocalSet::new();
local.block_on(&mut runtime, async move {
let (tui, _) = TUI::run(Colors::default());

tui.new_server_tab("test");
tui.draw();

executor.block_on(bench_task(tui, lines));
tui.new_server_tab("test");
tui.draw();

// executor.run();
bench_task(tui, lines).await;
});
}

async fn bench_task(tui: TUI, lines: Vec<String>) {
Expand Down
10 changes: 5 additions & 5 deletions libtiny_tui/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ pub use crate::tab::TabStyle;
pub use libtiny_ui::*;

use futures::select;
use futures_util::stream::StreamExt;
use futures::stream::StreamExt;
use std::cell::RefCell;
use std::rc::{Rc, Weak};
use term_input::Input;
use time::Tm;
use tokio::runtime::current_thread::Runtime;
use tokio::signal::unix::{signal, SignalKind};
use tokio::sync::mpsc;
use tokio::task::spawn_local;

#[macro_use]
extern crate log;
Expand All @@ -41,7 +41,7 @@ pub struct TUI {
}

impl TUI {
pub fn run(colors: Colors, runtime: &mut Runtime) -> (TUI, mpsc::Receiver<Event>) {
pub fn run(colors: Colors) -> (TUI, mpsc::Receiver<Event>) {
let tui = Rc::new(RefCell::new(tui::TUI::new(colors)));
let inner = Rc::downgrade(&tui);

Expand All @@ -51,10 +51,10 @@ impl TUI {
let (snd_abort, rcv_abort) = mpsc::channel::<()>(1);

// Spawn SIGWINCH handler
runtime.spawn(sigwinch_handler(inner.clone(), rcv_abort));
spawn_local(sigwinch_handler(inner.clone(), rcv_abort));

// Spawn input handler task
runtime.spawn(input_handler(tui, snd_ev, snd_abort));
spawn_local(input_handler(tui, snd_ev, snd_abort));

(TUI { inner }, rcv_ev)
}
Expand Down
4 changes: 2 additions & 2 deletions term_input/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ license = "MIT"
edition = "2018"

[dependencies]
futures-preview = "0.3.0-alpha.18"
futures = "0.3.1"
libc = "0.2"
mio = "0.6"
tokio = { git = "https://github.com/tokio-rs/tokio.git", rev = "227533d" }
tokio = { version = "0.2.3", features = ["full"] }
10 changes: 7 additions & 3 deletions term_input/examples/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,13 @@ fn main() {

/* DO THE BUSINESS HERE */
let mut input = Input::new();
let mut executor = tokio::runtime::current_thread::Runtime::new().unwrap();
executor.spawn(async move {
let mut runtime = tokio::runtime::Builder::new()
.basic_scheduler()
.enable_all()
.build()
.unwrap();
let local = tokio::task::LocalSet::new();
local.block_on(&mut runtime, async move {
while let Some(mb_ev) = input.next().await {
match mb_ev {
Ok(ev) => {
Expand All @@ -51,7 +56,6 @@ fn main() {
}
}
});
executor.run().unwrap();

// restore the old settings
unsafe { libc::tcsetattr(libc::STDIN_FILENO, libc::TCSANOW, &old_term) };
Expand Down
8 changes: 4 additions & 4 deletions term_input/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ use std::collections::VecDeque;
use std::pin::Pin;
use std::task::{Context, Poll};

use tokio::prelude::*;
use futures::stream::Stream;
use tokio::io::PollEvented;

////////////////////////////////////////////////////////////////////////////////////////////////////
// Public types
Expand Down Expand Up @@ -168,16 +169,15 @@ pub struct Input {
/// Used when reading from stdin.
buf: Vec<u8>,

stdin: tokio::net::util::PollEvented<mio::unix::EventedFd<'static>>,
stdin: PollEvented<mio::unix::EventedFd<'static>>,
}

impl Input {
pub fn new() -> Input {
Input {
evs: VecDeque::new(),
buf: Vec::with_capacity(100),
stdin: tokio::net::util::PollEvented::new(mio::unix::EventedFd(&libc::STDIN_FILENO))
.unwrap(),
stdin: PollEvented::new(mio::unix::EventedFd(&libc::STDIN_FILENO)).unwrap(),
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions tiny/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ edition = "2018"
[dependencies]
dirs = "1.0.2"
env_logger = "0.7"
futures-util-preview = "0.3.0-alpha.19"
futures = "0.3.1"
libtiny_client = { path = "../libtiny_client" }
libtiny_logger = { path = "../libtiny_logger" }
libtiny_tui = { path = "../libtiny_tui" }
Expand All @@ -21,4 +21,4 @@ log = "0.4"
serde = { version = "1.0.8", features = ["derive"] }
serde_yaml = "0.7.1"
time = "0.1"
tokio = { git = "https://github.com/tokio-rs/tokio.git", features = ["timer"], rev = "227533d" }
tokio = { version = "0.2.3", features = ["full"] }
2 changes: 1 addition & 1 deletion tiny/src/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ fn connect_(
// Spawn UI task
let ui_clone = libtiny_ui::clone_box(&**ui);
let client_clone = client.clone();
tokio::runtime::current_thread::spawn(crate::conn::task(rcv_ev, ui_clone, client_clone));
tokio::task::spawn_local(crate::conn::task(rcv_ev, ui_clone, client_clone));

clients.push(client);
}
Expand Down
2 changes: 1 addition & 1 deletion tiny/src/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

//! IRC event handling

use futures_util::stream::StreamExt;
use futures::stream::StreamExt;
use libtiny_client::Client;
use libtiny_ui::{MsgTarget, TabStyle, UI};
use libtiny_wire as wire;
Expand Down

0 comments on commit dcb7bc9

Please sign in to comment.