Skip to content

Commit

Permalink
Merge pull request #487 from teloxide/update_teloxide_core_to_master
Browse files Browse the repository at this point in the history
Update `teloxide-core` to the latest git version
  • Loading branch information
Hirrolot committed Jan 12, 2022
2 parents 9d062f6 + 85ef148 commit c12fae2
Show file tree
Hide file tree
Showing 11 changed files with 54 additions and 79 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Expand Up @@ -55,8 +55,8 @@ full = [
]

[dependencies]
teloxide-core = { version = "0.3.3", default-features = false }
#teloxide-core = { git = "https://github.com/teloxide/teloxide-core.git", rev = "...", default-features = false }
#teloxide-core = { version = "0.3.3", default-features = false }
teloxide-core = { git = "https://github.com/teloxide/teloxide-core.git", rev = "3ccb8f0", default-features = false }
teloxide-macros = { version = "0.4", optional = true }

serde_json = "1.0"
Expand Down
18 changes: 4 additions & 14 deletions examples/admin_bot/src/main.rs
@@ -1,6 +1,6 @@
use std::{error::Error, str::FromStr};

use chrono::{DateTime, Duration, NaiveDateTime, Utc};
use chrono::Duration;
use teloxide::{prelude::*, types::{ChatPermissions, Me}, utils::command::BotCommand};

// Derive BotCommand to parse text with a command into this enumeration.
Expand Down Expand Up @@ -71,14 +71,9 @@ async fn mute_user(cx: &Cx, time: Duration) -> Result<(), Box<dyn Error + Send +
.restrict_chat_member(
cx.update.chat_id(),
msg1.from().expect("Must be MessageKind::Common").id,
ChatPermissions::default(),
)
.until_date(
DateTime::<Utc>::from_utc(
NaiveDateTime::from_timestamp(cx.update.date as i64, 0),
Utc,
) + time,
ChatPermissions::empty(),
)
.until_date(cx.update.date + time)
.await?;
}
None => {
Expand Down Expand Up @@ -114,12 +109,7 @@ async fn ban_user(cx: &Cx, time: Duration) -> Result<(), Box<dyn Error + Send +
cx.update.chat_id(),
message.from().expect("Must be MessageKind::Common").id,
)
.until_date(
DateTime::<Utc>::from_utc(
NaiveDateTime::from_timestamp(cx.update.date as i64, 0),
Utc,
) + time,
)
.until_date(cx.update.date + time)
.await?;
}
None => {
Expand Down
6 changes: 2 additions & 4 deletions examples/heroku_ping_pong_bot/src/main.rs
Expand Up @@ -39,10 +39,8 @@ pub async fn webhook(bot: AutoSend<Bot>) -> impl update_listeners::UpdateListene
let server = warp::post()
.and(warp::path(path))
.and(warp::body::json())
.map(move |json: serde_json::Value| {
if let Ok(update) = Update::try_parse(&json) {
tx.send(Ok(update)).expect("Cannot send an incoming update from the webhook")
}
.map(move |update: Update| {
tx.send(Ok(update)).expect("Cannot send an incoming update from the webhook");

StatusCode::OK
})
Expand Down
4 changes: 2 additions & 2 deletions examples/inline_bot/src/main.rs
Expand Up @@ -44,8 +44,8 @@ async fn run() {
))),
)
.description("DuckDuckGo Search")
.thumb_url("https://duckduckgo.com/assets/logo_header.v108.png")
.url("https://duckduckgo.com/about"); // Note: This is the url that will open if they click the thumbnail
.thumb_url("https://duckduckgo.com/assets/logo_header.v108.png".parse().unwrap())
.url("https://duckduckgo.com/about".parse().unwrap()); // Note: This is the url that will open if they click the thumbnail

let results = vec![
InlineQueryResult::Article(google_search),
Expand Down
6 changes: 2 additions & 4 deletions examples/ngrok_ping_pong_bot/src/main.rs
Expand Up @@ -33,10 +33,8 @@ pub async fn webhook(bot: AutoSend<Bot>) -> impl update_listeners::UpdateListene

let server = warp::post()
.and(warp::body::json())
.map(move |json: serde_json::Value| {
if let Ok(update) = Update::try_parse(&json) {
tx.send(Ok(update)).expect("Cannot send an incoming update from the webhook")
}
.map(move |update: Update| {
tx.send(Ok(update)).expect("Cannot send an incoming update from the webhook");

StatusCode::OK
})
Expand Down
4 changes: 2 additions & 2 deletions examples/test_examples.sh 100644 → 100755
@@ -1,9 +1,9 @@
##!/bin/sh
#!/bin/sh

for example in */; do
echo Testing $example...
cd $example
cargo check &
cargo check
cd ..
done

Expand Down
23 changes: 20 additions & 3 deletions src/dispatching/dispatcher.rs
Expand Up @@ -20,8 +20,9 @@ use futures::{stream::FuturesUnordered, Future, StreamExt};
use teloxide_core::{
requests::Requester,
types::{
AllowedUpdate, CallbackQuery, ChatMemberUpdated, ChosenInlineResult, InlineQuery, Message,
Poll, PollAnswer, PreCheckoutQuery, ShippingQuery, Update, UpdateKind,
AllowedUpdate, CallbackQuery, ChatJoinRequest, ChatMemberUpdated, ChosenInlineResult,
InlineQuery, Message, Poll, PollAnswer, PreCheckoutQuery, ShippingQuery, Update,
UpdateKind,
},
};
use tokio::{
Expand Down Expand Up @@ -52,6 +53,7 @@ pub struct Dispatcher<R> {
poll_answers_queue: Tx<R, PollAnswer>,
my_chat_members_queue: Tx<R, ChatMemberUpdated>,
chat_members_queue: Tx<R, ChatMemberUpdated>,
chat_join_requests_queue: Tx<R, ChatJoinRequest>,

running_handlers: FuturesUnordered<JoinHandle<()>>,

Expand Down Expand Up @@ -81,6 +83,7 @@ where
poll_answers_queue: None,
my_chat_members_queue: None,
chat_members_queue: None,
chat_join_requests_queue: None,
running_handlers: FuturesUnordered::new(),
state: <_>::default(),
shutdown_notify_back: <_>::default(),
Expand Down Expand Up @@ -263,7 +266,7 @@ where
pub async fn dispatch(&mut self)
where
R: Requester + Clone,
<R as Requester>::GetUpdatesFaultTolerant: Send,
<R as Requester>::GetUpdates: Send,
{
let listener = update_listeners::polling_default(self.requester.clone()).await;
let error_handler =
Expand Down Expand Up @@ -446,6 +449,20 @@ where
chat_member_updated,
"UpdateKind::MyChatMember",
),
UpdateKind::ChatJoinRequest(chat_join_request) => send(
&self.requester,
&self.chat_join_requests_queue,
chat_join_request,
"UpdateKind::ChatJoinRequest",
),
UpdateKind::Error(err) => {
log::error!(
"Cannot parse an update.\nError: {:?}\n\
This is a bug in teloxide-core, please open an issue here: \
https://github.com/teloxide/teloxide-core/issues.",
err,
);
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/dispatching/repls/commands_repl.rs
Expand Up @@ -31,7 +31,7 @@ where
HandlerE: Debug + Send,
N: Into<String> + Send + 'static,
R: Requester + Send + Clone + 'static,
<R as Requester>::GetUpdatesFaultTolerant: Send,
<R as Requester>::GetUpdates: Send,
{
let cloned_requester = requester.clone();

Expand Down
2 changes: 1 addition & 1 deletion src/dispatching/repls/dialogues_repl.rs
Expand Up @@ -29,7 +29,7 @@ where
D: Clone + Default + Send + 'static,
Fut: Future<Output = DialogueStage<D>> + Send + 'static,
R: Requester + Send + Clone + 'static,
<R as Requester>::GetUpdatesFaultTolerant: Send,
<R as Requester>::GetUpdates: Send,
{
let cloned_requester = requester.clone();

Expand Down
2 changes: 1 addition & 1 deletion src/dispatching/repls/repl.rs
Expand Up @@ -28,7 +28,7 @@ where
Result<(), E>: OnError<E>,
E: Debug + Send,
R: Requester + Send + Clone + 'static,
<R as Requester>::GetUpdatesFaultTolerant: Send,
<R as Requester>::GetUpdates: Send,
{
let cloned_requester = requester.clone();
repl_with_listener(
Expand Down
62 changes: 17 additions & 45 deletions src/dispatching/update_listeners/polling.rs
Expand Up @@ -10,9 +10,9 @@ use crate::{
stop_token::{AsyncStopFlag, AsyncStopToken},
update_listeners::{stateful_listener::StatefulListener, UpdateListener},
},
payloads::GetUpdates,
payloads::{GetUpdates, GetUpdatesSetters as _},
requests::{HasPayload, Request, Requester},
types::{AllowedUpdate, SemiparsedVec, Update},
types::{AllowedUpdate, Update},
};

/// Returns a long polling update listener with `timeout` of 10 seconds.
Expand All @@ -25,7 +25,7 @@ use crate::{
pub async fn polling_default<R>(requester: R) -> impl UpdateListener<R::Err>
where
R: Requester + Send + 'static,
<R as Requester>::GetUpdatesFaultTolerant: Send,
<R as Requester>::GetUpdates: Send,
{
delete_webhook_if_setup(&requester).await;
polling(requester, Some(Duration::from_secs(10)), None, None)
Expand All @@ -51,7 +51,7 @@ pub fn polling<R>(
) -> impl UpdateListener<R::Err>
where
R: Requester + Send + 'static,
<R as Requester>::GetUpdatesFaultTolerant: Send,
<R as Requester>::GetUpdates: Send,
{
struct State<B: Requester> {
bot: B,
Expand All @@ -66,69 +66,41 @@ where
fn stream<B>(st: &mut State<B>) -> impl Stream<Item = Result<Update, B::Err>> + Send + '_
where
B: Requester + Send,
<B as Requester>::GetUpdatesFaultTolerant: Send,
<B as Requester>::GetUpdates: Send,
{
stream::unfold(st, move |state| async move {
let State { timeout, limit, allowed_updates, bot, offset, flag, .. } = &mut *state;

if flag.is_stopped() {
let mut req = bot.get_updates_fault_tolerant();

req.payload_mut().0 = GetUpdates {
offset: Some(*offset),
timeout: Some(0),
limit: Some(1),
allowed_updates: allowed_updates.take(),
};
let mut req = bot.get_updates().offset(*offset).timeout(0).limit(1);
req.payload_mut().allowed_updates = allowed_updates.take();

return match req.send().await {
Ok(_) => None,
Err(err) => Some((Either::Left(stream::once(ready(Err(err)))), state)),
};
}

let mut req = bot.get_updates_fault_tolerant();
req.payload_mut().0 = GetUpdates {
let mut req = bot.get_updates();
*req.payload_mut() = GetUpdates {
offset: Some(*offset),
timeout: *timeout,
limit: *limit,
allowed_updates: allowed_updates.take(),
};

let updates = match req.send().await {
Err(err) => return Some((Either::Left(stream::once(ready(Err(err)))), state)),
Ok(SemiparsedVec(updates)) => {
match req.send().await {
Ok(updates) => {
// Set offset to the last update's id + 1
if let Some(upd) = updates.last() {
let id: i32 = match upd {
Ok(ok) => ok.id,
Err((value, _)) => value["update_id"]
.as_i64()
.expect("The 'update_id' field must always exist in Update")
.try_into()
.expect("update_id must be i32"),
};

*offset = id + 1;
}

for update in &updates {
if let Err((value, e)) = update {
log::error!(
"Cannot parse an update.\nError: {:?}\nValue: {}\n\
This is a bug in teloxide-core, please open an issue here: \
https://github.com/teloxide/teloxide-core/issues.",
e,
value
);
}
*offset = upd.id + 1;
}

updates.into_iter().filter_map(Result::ok).map(Ok)
let updates = updates.into_iter().map(Ok);
Some((Either::Right(stream::iter(updates)), state))
}
};

Some((Either::Right(stream::iter(updates)), state))
Err(err) => Some((Either::Left(stream::once(ready(Err(err)))), state)),
}
})
.flatten()
}
Expand Down Expand Up @@ -170,7 +142,7 @@ where
}
};

let is_webhook_setup = !webhook_info.url.is_empty();
let is_webhook_setup = webhook_info.url.is_some();

if is_webhook_setup {
if let Err(e) = requester.delete_webhook().send().await {
Expand Down

0 comments on commit c12fae2

Please sign in to comment.