diff --git a/Cargo.toml b/Cargo.toml index fc9c0a52c..24f295da7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -55,8 +55,8 @@ full = [ ] [dependencies] -teloxide-core = { version = "0.3.3", default-features = false } -#teloxide-core = { git = "https://github.com/teloxide/teloxide-core.git", rev = "...", default-features = false } +#teloxide-core = { version = "0.3.3", default-features = false } +teloxide-core = { git = "https://github.com/teloxide/teloxide-core.git", rev = "3ccb8f0", default-features = false } teloxide-macros = { version = "0.4", optional = true } serde_json = "1.0" diff --git a/examples/admin_bot/src/main.rs b/examples/admin_bot/src/main.rs index 904c93694..7693fa867 100644 --- a/examples/admin_bot/src/main.rs +++ b/examples/admin_bot/src/main.rs @@ -1,6 +1,6 @@ use std::{error::Error, str::FromStr}; -use chrono::{DateTime, Duration, NaiveDateTime, Utc}; +use chrono::Duration; use teloxide::{prelude::*, types::{ChatPermissions, Me}, utils::command::BotCommand}; // Derive BotCommand to parse text with a command into this enumeration. @@ -71,14 +71,9 @@ async fn mute_user(cx: &Cx, time: Duration) -> Result<(), Box::from_utc( - NaiveDateTime::from_timestamp(cx.update.date as i64, 0), - Utc, - ) + time, + ChatPermissions::empty(), ) + .until_date(cx.update.date + time) .await?; } None => { @@ -114,12 +109,7 @@ async fn ban_user(cx: &Cx, time: Duration) -> Result<(), Box::from_utc( - NaiveDateTime::from_timestamp(cx.update.date as i64, 0), - Utc, - ) + time, - ) + .until_date(cx.update.date + time) .await?; } None => { diff --git a/examples/heroku_ping_pong_bot/src/main.rs b/examples/heroku_ping_pong_bot/src/main.rs index 653fc9f89..e74d1789d 100644 --- a/examples/heroku_ping_pong_bot/src/main.rs +++ b/examples/heroku_ping_pong_bot/src/main.rs @@ -39,10 +39,8 @@ pub async fn webhook(bot: AutoSend) -> impl update_listeners::UpdateListene let server = warp::post() .and(warp::path(path)) .and(warp::body::json()) - .map(move |json: serde_json::Value| { - if let Ok(update) = Update::try_parse(&json) { - tx.send(Ok(update)).expect("Cannot send an incoming update from the webhook") - } + .map(move |update: Update| { + tx.send(Ok(update)).expect("Cannot send an incoming update from the webhook"); StatusCode::OK }) diff --git a/examples/inline_bot/src/main.rs b/examples/inline_bot/src/main.rs index d7c10399d..578da9cf8 100644 --- a/examples/inline_bot/src/main.rs +++ b/examples/inline_bot/src/main.rs @@ -44,8 +44,8 @@ async fn run() { ))), ) .description("DuckDuckGo Search") - .thumb_url("https://duckduckgo.com/assets/logo_header.v108.png") - .url("https://duckduckgo.com/about"); // Note: This is the url that will open if they click the thumbnail + .thumb_url("https://duckduckgo.com/assets/logo_header.v108.png".parse().unwrap()) + .url("https://duckduckgo.com/about".parse().unwrap()); // Note: This is the url that will open if they click the thumbnail let results = vec![ InlineQueryResult::Article(google_search), diff --git a/examples/ngrok_ping_pong_bot/src/main.rs b/examples/ngrok_ping_pong_bot/src/main.rs index c748e0c84..88a0bbf61 100644 --- a/examples/ngrok_ping_pong_bot/src/main.rs +++ b/examples/ngrok_ping_pong_bot/src/main.rs @@ -33,10 +33,8 @@ pub async fn webhook(bot: AutoSend) -> impl update_listeners::UpdateListene let server = warp::post() .and(warp::body::json()) - .map(move |json: serde_json::Value| { - if let Ok(update) = Update::try_parse(&json) { - tx.send(Ok(update)).expect("Cannot send an incoming update from the webhook") - } + .map(move |update: Update| { + tx.send(Ok(update)).expect("Cannot send an incoming update from the webhook"); StatusCode::OK }) diff --git a/examples/test_examples.sh b/examples/test_examples.sh old mode 100644 new mode 100755 index 4b1969ecd..d41d1f71e --- a/examples/test_examples.sh +++ b/examples/test_examples.sh @@ -1,9 +1,9 @@ -##!/bin/sh +#!/bin/sh for example in */; do echo Testing $example... cd $example - cargo check & + cargo check cd .. done diff --git a/src/dispatching/dispatcher.rs b/src/dispatching/dispatcher.rs index f486c46aa..dc1b1b001 100644 --- a/src/dispatching/dispatcher.rs +++ b/src/dispatching/dispatcher.rs @@ -20,8 +20,9 @@ use futures::{stream::FuturesUnordered, Future, StreamExt}; use teloxide_core::{ requests::Requester, types::{ - AllowedUpdate, CallbackQuery, ChatMemberUpdated, ChosenInlineResult, InlineQuery, Message, - Poll, PollAnswer, PreCheckoutQuery, ShippingQuery, Update, UpdateKind, + AllowedUpdate, CallbackQuery, ChatJoinRequest, ChatMemberUpdated, ChosenInlineResult, + InlineQuery, Message, Poll, PollAnswer, PreCheckoutQuery, ShippingQuery, Update, + UpdateKind, }, }; use tokio::{ @@ -52,6 +53,7 @@ pub struct Dispatcher { poll_answers_queue: Tx, my_chat_members_queue: Tx, chat_members_queue: Tx, + chat_join_requests_queue: Tx, running_handlers: FuturesUnordered>, @@ -81,6 +83,7 @@ where poll_answers_queue: None, my_chat_members_queue: None, chat_members_queue: None, + chat_join_requests_queue: None, running_handlers: FuturesUnordered::new(), state: <_>::default(), shutdown_notify_back: <_>::default(), @@ -263,7 +266,7 @@ where pub async fn dispatch(&mut self) where R: Requester + Clone, - ::GetUpdatesFaultTolerant: Send, + ::GetUpdates: Send, { let listener = update_listeners::polling_default(self.requester.clone()).await; let error_handler = @@ -446,6 +449,20 @@ where chat_member_updated, "UpdateKind::MyChatMember", ), + UpdateKind::ChatJoinRequest(chat_join_request) => send( + &self.requester, + &self.chat_join_requests_queue, + chat_join_request, + "UpdateKind::ChatJoinRequest", + ), + UpdateKind::Error(err) => { + log::error!( + "Cannot parse an update.\nError: {:?}\n\ + This is a bug in teloxide-core, please open an issue here: \ + https://github.com/teloxide/teloxide-core/issues.", + err, + ); + } } } } diff --git a/src/dispatching/repls/commands_repl.rs b/src/dispatching/repls/commands_repl.rs index c1799d58a..09fc1c4a3 100644 --- a/src/dispatching/repls/commands_repl.rs +++ b/src/dispatching/repls/commands_repl.rs @@ -31,7 +31,7 @@ where HandlerE: Debug + Send, N: Into + Send + 'static, R: Requester + Send + Clone + 'static, - ::GetUpdatesFaultTolerant: Send, + ::GetUpdates: Send, { let cloned_requester = requester.clone(); diff --git a/src/dispatching/repls/dialogues_repl.rs b/src/dispatching/repls/dialogues_repl.rs index 345c6bcb6..f0819183c 100644 --- a/src/dispatching/repls/dialogues_repl.rs +++ b/src/dispatching/repls/dialogues_repl.rs @@ -29,7 +29,7 @@ where D: Clone + Default + Send + 'static, Fut: Future> + Send + 'static, R: Requester + Send + Clone + 'static, - ::GetUpdatesFaultTolerant: Send, + ::GetUpdates: Send, { let cloned_requester = requester.clone(); diff --git a/src/dispatching/repls/repl.rs b/src/dispatching/repls/repl.rs index 635a030c1..b8ea3a3e9 100644 --- a/src/dispatching/repls/repl.rs +++ b/src/dispatching/repls/repl.rs @@ -28,7 +28,7 @@ where Result<(), E>: OnError, E: Debug + Send, R: Requester + Send + Clone + 'static, - ::GetUpdatesFaultTolerant: Send, + ::GetUpdates: Send, { let cloned_requester = requester.clone(); repl_with_listener( diff --git a/src/dispatching/update_listeners/polling.rs b/src/dispatching/update_listeners/polling.rs index f7050bff5..b8ca9f90a 100644 --- a/src/dispatching/update_listeners/polling.rs +++ b/src/dispatching/update_listeners/polling.rs @@ -10,9 +10,9 @@ use crate::{ stop_token::{AsyncStopFlag, AsyncStopToken}, update_listeners::{stateful_listener::StatefulListener, UpdateListener}, }, - payloads::GetUpdates, + payloads::{GetUpdates, GetUpdatesSetters as _}, requests::{HasPayload, Request, Requester}, - types::{AllowedUpdate, SemiparsedVec, Update}, + types::{AllowedUpdate, Update}, }; /// Returns a long polling update listener with `timeout` of 10 seconds. @@ -25,7 +25,7 @@ use crate::{ pub async fn polling_default(requester: R) -> impl UpdateListener where R: Requester + Send + 'static, - ::GetUpdatesFaultTolerant: Send, + ::GetUpdates: Send, { delete_webhook_if_setup(&requester).await; polling(requester, Some(Duration::from_secs(10)), None, None) @@ -51,7 +51,7 @@ pub fn polling( ) -> impl UpdateListener where R: Requester + Send + 'static, - ::GetUpdatesFaultTolerant: Send, + ::GetUpdates: Send, { struct State { bot: B, @@ -66,20 +66,14 @@ where fn stream(st: &mut State) -> impl Stream> + Send + '_ where B: Requester + Send, - ::GetUpdatesFaultTolerant: Send, + ::GetUpdates: Send, { stream::unfold(st, move |state| async move { let State { timeout, limit, allowed_updates, bot, offset, flag, .. } = &mut *state; if flag.is_stopped() { - let mut req = bot.get_updates_fault_tolerant(); - - req.payload_mut().0 = GetUpdates { - offset: Some(*offset), - timeout: Some(0), - limit: Some(1), - allowed_updates: allowed_updates.take(), - }; + let mut req = bot.get_updates().offset(*offset).timeout(0).limit(1); + req.payload_mut().allowed_updates = allowed_updates.take(); return match req.send().await { Ok(_) => None, @@ -87,48 +81,26 @@ where }; } - let mut req = bot.get_updates_fault_tolerant(); - req.payload_mut().0 = GetUpdates { + let mut req = bot.get_updates(); + *req.payload_mut() = GetUpdates { offset: Some(*offset), timeout: *timeout, limit: *limit, allowed_updates: allowed_updates.take(), }; - let updates = match req.send().await { - Err(err) => return Some((Either::Left(stream::once(ready(Err(err)))), state)), - Ok(SemiparsedVec(updates)) => { + match req.send().await { + Ok(updates) => { // Set offset to the last update's id + 1 if let Some(upd) = updates.last() { - let id: i32 = match upd { - Ok(ok) => ok.id, - Err((value, _)) => value["update_id"] - .as_i64() - .expect("The 'update_id' field must always exist in Update") - .try_into() - .expect("update_id must be i32"), - }; - - *offset = id + 1; - } - - for update in &updates { - if let Err((value, e)) = update { - log::error!( - "Cannot parse an update.\nError: {:?}\nValue: {}\n\ - This is a bug in teloxide-core, please open an issue here: \ - https://github.com/teloxide/teloxide-core/issues.", - e, - value - ); - } + *offset = upd.id + 1; } - updates.into_iter().filter_map(Result::ok).map(Ok) + let updates = updates.into_iter().map(Ok); + Some((Either::Right(stream::iter(updates)), state)) } - }; - - Some((Either::Right(stream::iter(updates)), state)) + Err(err) => Some((Either::Left(stream::once(ready(Err(err)))), state)), + } }) .flatten() } @@ -170,7 +142,7 @@ where } }; - let is_webhook_setup = !webhook_info.url.is_empty(); + let is_webhook_setup = webhook_info.url.is_some(); if is_webhook_setup { if let Err(e) = requester.delete_webhook().send().await {