Skip to content

Commit

Permalink
feat(cli): add --connect-timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
abonander committed Jun 3, 2022
1 parent 20d61f4 commit 2488046
Show file tree
Hide file tree
Showing 7 changed files with 157 additions and 78 deletions.
15 changes: 15 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions sqlx-cli/Cargo.toml
Expand Up @@ -47,6 +47,8 @@ openssl = { version = "0.10.38", optional = true }
# workaround for https://github.com/rust-lang/rust/issues/29497
remove_dir_all = "0.7.0"

backoff = { version = "0.4.0", features = ["futures", "tokio"] }

[features]
default = ["postgres", "sqlite", "mysql", "native-tls"]
rustls = ["sqlx/runtime-tokio-rustls"]
Expand Down
47 changes: 31 additions & 16 deletions sqlx-cli/src/database.rs
@@ -1,43 +1,58 @@
use crate::migrate;
use crate::opt::ConnectOpts;
use console::style;
use promptly::{prompt, ReadlineError};
use sqlx::any::Any;
use sqlx::migrate::MigrateDatabase;

pub async fn create(uri: &str) -> anyhow::Result<()> {
if !Any::database_exists(uri).await? {
Any::create_database(uri).await?;
pub async fn create(connect_opts: &ConnectOpts) -> anyhow::Result<()> {
// NOTE: only retry the idempotent action.
// We're assuming that if this succeeds, then any following operations should also succeed.
let exists = crate::retry_connect_errors(connect_opts, Any::database_exists).await?;

if !exists {
Any::create_database(&connect_opts.database_url).await?;
}

Ok(())
}

pub async fn drop(uri: &str, confirm: bool) -> anyhow::Result<()> {
if confirm && !ask_to_continue(uri) {
pub async fn drop(connect_opts: &ConnectOpts, confirm: bool) -> anyhow::Result<()> {
if confirm && !ask_to_continue(connect_opts) {
return Ok(());
}

if Any::database_exists(uri).await? {
Any::drop_database(uri).await?;
// NOTE: only retry the idempotent action.
// We're assuming that if this succeeds, then any following operations should also succeed.
let exists = crate::retry_connect_errors(connect_opts, Any::database_exists).await?;

if exists {
Any::drop_database(&connect_opts.database_url).await?;
}

Ok(())
}

pub async fn reset(migration_source: &str, uri: &str, confirm: bool) -> anyhow::Result<()> {
drop(uri, confirm).await?;
setup(migration_source, uri).await
pub async fn reset(
migration_source: &str,
connect_opts: &ConnectOpts,
confirm: bool,
) -> anyhow::Result<()> {
drop(connect_opts, confirm).await?;
setup(migration_source, connect_opts).await
}

pub async fn setup(migration_source: &str, uri: &str) -> anyhow::Result<()> {
create(uri).await?;
migrate::run(migration_source, uri, false, false).await
pub async fn setup(migration_source: &str, connect_opts: &ConnectOpts) -> anyhow::Result<()> {
create(connect_opts).await?;
migrate::run(migration_source, connect_opts, false, false).await
}

fn ask_to_continue(uri: &str) -> bool {
fn ask_to_continue(connect_opts: &ConnectOpts) -> bool {
loop {
let r: Result<String, ReadlineError> =
prompt(format!("Drop database at {}? (y/n)", style(uri).cyan()));
let r: Result<String, ReadlineError> = prompt(format!(
"Drop database at {}? (y/n)",
style(&connect_opts.database_url).cyan()
));
match r {
Ok(response) => {
if response == "n" || response == "N" {
Expand Down
82 changes: 64 additions & 18 deletions sqlx-cli/src/lib.rs
@@ -1,6 +1,10 @@
use anyhow::Result;
use futures::{Future, TryFutureExt};
use sqlx::{AnyConnection, Connection};
use std::io;
use std::time::Duration;

use crate::opt::{Command, DatabaseCommand, MigrateCommand};
use crate::opt::{Command, ConnectOpts, DatabaseCommand, MigrateCommand};

mod database;
// mod migration;
Expand All @@ -23,11 +27,11 @@ pub async fn run(opt: Opt) -> Result<()> {
source,
dry_run,
ignore_missing,
database_url,
connect_opts,
} => {
migrate::run(
source.resolve(&migrate.source),
&database_url,
&connect_opts,
dry_run,
*ignore_missing,
)
Expand All @@ -37,56 +41,98 @@ pub async fn run(opt: Opt) -> Result<()> {
source,
dry_run,
ignore_missing,
database_url,
connect_opts,
} => {
migrate::revert(
source.resolve(&migrate.source),
&database_url,
&connect_opts,
dry_run,
*ignore_missing,
)
.await?
}
MigrateCommand::Info {
source,
database_url,
} => migrate::info(source.resolve(&migrate.source), &database_url).await?,
connect_opts,
} => migrate::info(source.resolve(&migrate.source), &connect_opts).await?,
MigrateCommand::BuildScript { source, force } => {
migrate::build_script(source.resolve(&migrate.source), force)?
}
},

Command::Database(database) => match database.command {
DatabaseCommand::Create { database_url } => database::create(&database_url).await?,
DatabaseCommand::Create { connect_opts } => database::create(&connect_opts).await?,
DatabaseCommand::Drop {
confirmation,
database_url,
} => database::drop(&database_url, !confirmation).await?,
connect_opts,
} => database::drop(&connect_opts, !confirmation.yes).await?,
DatabaseCommand::Reset {
confirmation,
source,
database_url,
} => database::reset(&source, &database_url, !confirmation).await?,
connect_opts,
} => database::reset(&source, &connect_opts, !confirmation.yes).await?,
DatabaseCommand::Setup {
source,
database_url,
} => database::setup(&source, &database_url).await?,
connect_opts,
} => database::setup(&source, &connect_opts).await?,
},

Command::Prepare {
check: false,
merged,
args,
database_url,
} => prepare::run(&database_url, merged, args)?,
connect_opts,
} => prepare::run(&connect_opts, merged, args).await?,

Command::Prepare {
check: true,
merged,
args,
database_url,
} => prepare::check(&database_url, merged, args)?,
connect_opts,
} => prepare::check(&connect_opts, merged, args).await?,
};

Ok(())
}

/// Attempt to connect to the database server, retrying up to `ops.connect_timeout`.
async fn connect(opts: &ConnectOpts) -> sqlx::Result<AnyConnection> {
retry_connect_errors(opts, AnyConnection::connect).await
}

/// Attempt an operation that may return errors like `ConnectionRefused`,
/// retrying up until `ops.connect_timeout`.
///
/// The closure is passed `&ops.database_url` for easy composition.
async fn retry_connect_errors<'a, F, Fut, T>(
opts: &'a ConnectOpts,
mut connect: F,
) -> sqlx::Result<T>
where
F: FnMut(&'a str) -> Fut,
Fut: Future<Output = sqlx::Result<T>> + 'a,
{
backoff::future::retry(
backoff::ExponentialBackoffBuilder::new()
.with_max_elapsed_time(Some(Duration::from_secs(opts.connect_timeout)))
.build(),
|| {
connect(&opts.database_url).map_err(|e| -> backoff::Error<sqlx::Error> {
match e {
sqlx::Error::Io(ref ioe) => match ioe.kind() {
io::ErrorKind::ConnectionRefused
| io::ErrorKind::ConnectionReset
| io::ErrorKind::ConnectionAborted => {
return backoff::Error::transient(e);
}
_ => (),
},
_ => (),
}

backoff::Error::permanent(e)
})
},
)
.await
}
14 changes: 7 additions & 7 deletions sqlx-cli/src/migrate.rs
@@ -1,8 +1,8 @@
use crate::opt::ConnectOpts;
use anyhow::{bail, Context};
use chrono::Utc;
use console::style;
use sqlx::migrate::{AppliedMigration, Migrate, MigrateError, MigrationType, Migrator};
use sqlx::{AnyConnection, Connection};
use std::borrow::Cow;
use std::collections::{HashMap, HashSet};
use std::fmt::Write;
Expand Down Expand Up @@ -116,9 +116,9 @@ fn short_checksum(checksum: &[u8]) -> String {
s
}

pub async fn info(migration_source: &str, uri: &str) -> anyhow::Result<()> {
pub async fn info(migration_source: &str, connect_opts: &ConnectOpts) -> anyhow::Result<()> {
let migrator = Migrator::new(Path::new(migration_source)).await?;
let mut conn = AnyConnection::connect(uri).await?;
let mut conn = crate::connect(&connect_opts).await?;

conn.ensure_migrations_table().await?;

Expand Down Expand Up @@ -190,12 +190,12 @@ fn validate_applied_migrations(

pub async fn run(
migration_source: &str,
uri: &str,
connect_opts: &ConnectOpts,
dry_run: bool,
ignore_missing: bool,
) -> anyhow::Result<()> {
let migrator = Migrator::new(Path::new(migration_source)).await?;
let mut conn = AnyConnection::connect(uri).await?;
let mut conn = crate::connect(connect_opts).await?;

conn.ensure_migrations_table().await?;

Expand Down Expand Up @@ -249,12 +249,12 @@ pub async fn run(

pub async fn revert(
migration_source: &str,
uri: &str,
connect_opts: &ConnectOpts,
dry_run: bool,
ignore_missing: bool,
) -> anyhow::Result<()> {
let migrator = Migrator::new(Path::new(migration_source)).await?;
let mut conn = AnyConnection::connect(uri).await?;
let mut conn = crate::connect(&connect_opts).await?;

conn.ensure_migrations_table().await?;

Expand Down

0 comments on commit 2488046

Please sign in to comment.