diff --git a/Cargo.lock b/Cargo.lock index 93d5dec792..9c97ad8262 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -248,6 +248,20 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a" +[[package]] +name = "backoff" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b62ddb9cb1ec0a098ad4bbf9344d0713fa193ae1a80af55febcff2627b6a00c1" +dependencies = [ + "futures-core", + "getrandom", + "instant", + "pin-project-lite", + "rand", + "tokio", +] + [[package]] name = "base64" version = "0.13.0" @@ -2316,6 +2330,7 @@ version = "0.5.12" dependencies = [ "anyhow", "async-trait", + "backoff", "chrono", "clap 3.1.0", "console", diff --git a/sqlx-cli/Cargo.toml b/sqlx-cli/Cargo.toml index e308072b55..3c0cad925f 100644 --- a/sqlx-cli/Cargo.toml +++ b/sqlx-cli/Cargo.toml @@ -47,6 +47,8 @@ openssl = { version = "0.10.38", optional = true } # workaround for https://github.com/rust-lang/rust/issues/29497 remove_dir_all = "0.7.0" +backoff = { version = "0.4.0", features = ["futures", "tokio"] } + [features] default = ["postgres", "sqlite", "mysql", "native-tls"] rustls = ["sqlx/runtime-tokio-rustls"] diff --git a/sqlx-cli/src/database.rs b/sqlx-cli/src/database.rs index 7521b1fb68..9044e28b7b 100644 --- a/sqlx-cli/src/database.rs +++ b/sqlx-cli/src/database.rs @@ -1,43 +1,58 @@ use crate::migrate; +use crate::opt::ConnectOpts; use console::style; use promptly::{prompt, ReadlineError}; use sqlx::any::Any; use sqlx::migrate::MigrateDatabase; -pub async fn create(uri: &str) -> anyhow::Result<()> { - if !Any::database_exists(uri).await? { - Any::create_database(uri).await?; +pub async fn create(connect_opts: &ConnectOpts) -> anyhow::Result<()> { + // NOTE: only retry the idempotent action. + // We're assuming that if this succeeds, then any following operations should also succeed. + let exists = crate::retry_connect_errors(connect_opts, Any::database_exists).await?; + + if !exists { + Any::create_database(&connect_opts.database_url).await?; } Ok(()) } -pub async fn drop(uri: &str, confirm: bool) -> anyhow::Result<()> { - if confirm && !ask_to_continue(uri) { +pub async fn drop(connect_opts: &ConnectOpts, confirm: bool) -> anyhow::Result<()> { + if confirm && !ask_to_continue(connect_opts) { return Ok(()); } - if Any::database_exists(uri).await? { - Any::drop_database(uri).await?; + // NOTE: only retry the idempotent action. + // We're assuming that if this succeeds, then any following operations should also succeed. + let exists = crate::retry_connect_errors(connect_opts, Any::database_exists).await?; + + if exists { + Any::drop_database(&connect_opts.database_url).await?; } Ok(()) } -pub async fn reset(migration_source: &str, uri: &str, confirm: bool) -> anyhow::Result<()> { - drop(uri, confirm).await?; - setup(migration_source, uri).await +pub async fn reset( + migration_source: &str, + connect_opts: &ConnectOpts, + confirm: bool, +) -> anyhow::Result<()> { + drop(connect_opts, confirm).await?; + setup(migration_source, connect_opts).await } -pub async fn setup(migration_source: &str, uri: &str) -> anyhow::Result<()> { - create(uri).await?; - migrate::run(migration_source, uri, false, false).await +pub async fn setup(migration_source: &str, connect_opts: &ConnectOpts) -> anyhow::Result<()> { + create(connect_opts).await?; + migrate::run(migration_source, connect_opts, false, false).await } -fn ask_to_continue(uri: &str) -> bool { +fn ask_to_continue(connect_opts: &ConnectOpts) -> bool { loop { - let r: Result = - prompt(format!("Drop database at {}? (y/n)", style(uri).cyan())); + let r: Result = prompt(format!( + "Drop database at {}? (y/n)", + style(&connect_opts.database_url).cyan() + )); match r { Ok(response) => { if response == "n" || response == "N" { diff --git a/sqlx-cli/src/lib.rs b/sqlx-cli/src/lib.rs index b877d450da..59afdc267f 100644 --- a/sqlx-cli/src/lib.rs +++ b/sqlx-cli/src/lib.rs @@ -1,6 +1,10 @@ use anyhow::Result; +use futures::{Future, TryFutureExt}; +use sqlx::{AnyConnection, Connection}; +use std::io; +use std::time::Duration; -use crate::opt::{Command, DatabaseCommand, MigrateCommand}; +use crate::opt::{Command, ConnectOpts, DatabaseCommand, MigrateCommand}; mod database; // mod migration; @@ -23,11 +27,11 @@ pub async fn run(opt: Opt) -> Result<()> { source, dry_run, ignore_missing, - database_url, + connect_opts, } => { migrate::run( source.resolve(&migrate.source), - &database_url, + &connect_opts, dry_run, *ignore_missing, ) @@ -37,11 +41,11 @@ pub async fn run(opt: Opt) -> Result<()> { source, dry_run, ignore_missing, - database_url, + connect_opts, } => { migrate::revert( source.resolve(&migrate.source), - &database_url, + &connect_opts, dry_run, *ignore_missing, ) @@ -49,44 +53,88 @@ pub async fn run(opt: Opt) -> Result<()> { } MigrateCommand::Info { source, - database_url, - } => migrate::info(source.resolve(&migrate.source), &database_url).await?, + connect_opts, + } => migrate::info(source.resolve(&migrate.source), &connect_opts).await?, MigrateCommand::BuildScript { source, force } => { migrate::build_script(source.resolve(&migrate.source), force)? } }, Command::Database(database) => match database.command { - DatabaseCommand::Create { database_url } => database::create(&database_url).await?, + DatabaseCommand::Create { connect_opts } => database::create(&connect_opts).await?, DatabaseCommand::Drop { confirmation, - database_url, - } => database::drop(&database_url, !confirmation).await?, + connect_opts, + } => database::drop(&connect_opts, !confirmation.yes).await?, DatabaseCommand::Reset { confirmation, source, - database_url, - } => database::reset(&source, &database_url, !confirmation).await?, + connect_opts, + } => database::reset(&source, &connect_opts, !confirmation.yes).await?, DatabaseCommand::Setup { source, - database_url, - } => database::setup(&source, &database_url).await?, + connect_opts, + } => database::setup(&source, &connect_opts).await?, }, Command::Prepare { check: false, merged, args, - database_url, - } => prepare::run(&database_url, merged, args)?, + connect_opts, + } => prepare::run(&connect_opts, merged, args).await?, Command::Prepare { check: true, merged, args, - database_url, - } => prepare::check(&database_url, merged, args)?, + connect_opts, + } => prepare::check(&connect_opts, merged, args).await?, }; Ok(()) } + +/// Attempt to connect to the database server, retrying up to `ops.connect_timeout`. +async fn connect(opts: &ConnectOpts) -> sqlx::Result { + retry_connect_errors(opts, AnyConnection::connect).await +} + +/// Attempt an operation that may return errors like `ConnectionRefused`, +/// retrying up until `ops.connect_timeout`. +/// +/// The closure is passed `&ops.database_url` for easy composition. +async fn retry_connect_errors<'a, F, Fut, T>( + opts: &'a ConnectOpts, + mut connect: F, +) -> sqlx::Result +where + F: FnMut(&'a str) -> Fut, + Fut: Future> + 'a, +{ + backoff::future::retry( + backoff::ExponentialBackoffBuilder::new() + .with_max_elapsed_time(Some(Duration::from_secs(opts.connect_timeout))) + .build(), + || { + // Using `From`/`Into`/`?` to map an error to `backend::Error` + // by default treats it as transient (retryable). + connect(&opts.database_url).map_err(|e| -> backoff::Error { + match e { + sqlx::Error::Io(ref ioe) => match ioe.kind() { + io::ErrorKind::ConnectionRefused + | io::ErrorKind::ConnectionReset + | io::ErrorKind::ConnectionAborted => { + return e.into(); + } + _ => (), + }, + _ => (), + } + + backoff::Error::permanent(e) + }) + }, + ) + .await +} diff --git a/sqlx-cli/src/migrate.rs b/sqlx-cli/src/migrate.rs index f51e68f2d2..b33fa6006e 100644 --- a/sqlx-cli/src/migrate.rs +++ b/sqlx-cli/src/migrate.rs @@ -1,8 +1,8 @@ +use crate::opt::ConnectOpts; use anyhow::{bail, Context}; use chrono::Utc; use console::style; use sqlx::migrate::{AppliedMigration, Migrate, MigrateError, MigrationType, Migrator}; -use sqlx::{AnyConnection, Connection}; use std::borrow::Cow; use std::collections::{HashMap, HashSet}; use std::fmt::Write; @@ -116,9 +116,9 @@ fn short_checksum(checksum: &[u8]) -> String { s } -pub async fn info(migration_source: &str, uri: &str) -> anyhow::Result<()> { +pub async fn info(migration_source: &str, connect_opts: &ConnectOpts) -> anyhow::Result<()> { let migrator = Migrator::new(Path::new(migration_source)).await?; - let mut conn = AnyConnection::connect(uri).await?; + let mut conn = crate::connect(&connect_opts).await?; conn.ensure_migrations_table().await?; @@ -190,12 +190,12 @@ fn validate_applied_migrations( pub async fn run( migration_source: &str, - uri: &str, + connect_opts: &ConnectOpts, dry_run: bool, ignore_missing: bool, ) -> anyhow::Result<()> { let migrator = Migrator::new(Path::new(migration_source)).await?; - let mut conn = AnyConnection::connect(uri).await?; + let mut conn = crate::connect(connect_opts).await?; conn.ensure_migrations_table().await?; @@ -249,12 +249,12 @@ pub async fn run( pub async fn revert( migration_source: &str, - uri: &str, + connect_opts: &ConnectOpts, dry_run: bool, ignore_missing: bool, ) -> anyhow::Result<()> { let migrator = Migrator::new(Path::new(migration_source)).await?; - let mut conn = AnyConnection::connect(uri).await?; + let mut conn = crate::connect(&connect_opts).await?; conn.ensure_migrations_table().await?; diff --git a/sqlx-cli/src/opt.rs b/sqlx-cli/src/opt.rs index c7ac046fc1..e70dbf119b 100644 --- a/sqlx-cli/src/opt.rs +++ b/sqlx-cli/src/opt.rs @@ -38,7 +38,7 @@ pub enum Command { args: Vec, #[clap(flatten)] - database_url: DatabaseUrl, + connect_opts: ConnectOpts, }, #[clap(alias = "mig")] @@ -57,7 +57,7 @@ pub enum DatabaseCommand { /// Creates the database specified in your DATABASE_URL. Create { #[clap(flatten)] - database_url: DatabaseUrl, + connect_opts: ConnectOpts, }, /// Drops the database specified in your DATABASE_URL. @@ -66,7 +66,7 @@ pub enum DatabaseCommand { confirmation: Confirmation, #[clap(flatten)] - database_url: DatabaseUrl, + connect_opts: ConnectOpts, }, /// Drops the database specified in your DATABASE_URL, re-creates it, and runs any pending migrations. @@ -78,7 +78,7 @@ pub enum DatabaseCommand { source: Source, #[clap(flatten)] - database_url: DatabaseUrl, + connect_opts: ConnectOpts, }, /// Creates the database specified in your DATABASE_URL and runs any pending migrations. @@ -87,7 +87,7 @@ pub enum DatabaseCommand { source: Source, #[clap(flatten)] - database_url: DatabaseUrl, + connect_opts: ConnectOpts, }, } @@ -132,7 +132,7 @@ pub enum MigrateCommand { ignore_missing: IgnoreMissing, #[clap(flatten)] - database_url: DatabaseUrl, + connect_opts: ConnectOpts, }, /// Revert the latest migration with a down file. @@ -148,7 +148,7 @@ pub enum MigrateCommand { ignore_missing: IgnoreMissing, #[clap(flatten)] - database_url: DatabaseUrl, + connect_opts: ConnectOpts, }, /// List all available migrations. @@ -157,7 +157,7 @@ pub enum MigrateCommand { source: SourceOverride, #[clap(flatten)] - database_url: DatabaseUrl, + connect_opts: ConnectOpts, }, /// Generate a `build.rs` to trigger recompilation when a new migration is added. @@ -212,43 +212,24 @@ impl SourceOverride { /// Argument for the database URL. #[derive(Args, Debug)] -pub struct DatabaseUrl { +pub struct ConnectOpts { /// Location of the DB, by default will be read from the DATABASE_URL env var #[clap(long, short = 'D', env)] - database_url: String, -} - -impl Deref for DatabaseUrl { - type Target = String; + pub database_url: String, - fn deref(&self) -> &Self::Target { - &self.database_url - } + /// The maximum time, in seconds, to try connecting to the database server before + /// returning an error. + #[clap(long, default_value = "10")] + pub connect_timeout: u64, } -/// Argument for automatic confirmantion. +/// Argument for automatic confirmation. #[derive(Args, Copy, Clone, Debug)] pub struct Confirmation { /// Automatic confirmation. Without this option, you will be prompted before dropping /// your database. #[clap(short)] - yes: bool, -} - -impl Deref for Confirmation { - type Target = bool; - - fn deref(&self) -> &Self::Target { - &self.yes - } -} - -impl Not for Confirmation { - type Output = bool; - - fn not(self) -> Self::Output { - !self.yes - } + pub yes: bool, } /// Argument for ignoring applied migrations that were not resolved. diff --git a/sqlx-cli/src/prepare.rs b/sqlx-cli/src/prepare.rs index ca2e22bc18..343ad7baf0 100644 --- a/sqlx-cli/src/prepare.rs +++ b/sqlx-cli/src/prepare.rs @@ -1,8 +1,10 @@ +use crate::opt::ConnectOpts; use anyhow::{bail, Context}; use console::style; use remove_dir_all::remove_dir_all; use serde::Deserialize; use sqlx::any::{AnyConnectOptions, AnyKind}; +use sqlx::Connection; use std::collections::BTreeMap; use std::fs::File; use std::io::{BufReader, BufWriter}; @@ -22,7 +24,16 @@ struct DataFile { data: QueryData, } -pub fn run(url: &str, merge: bool, cargo_args: Vec) -> anyhow::Result<()> { +pub async fn run( + connect_opts: &ConnectOpts, + merge: bool, + cargo_args: Vec, +) -> anyhow::Result<()> { + // Ensure the database server is available. + crate::connect(connect_opts).await?.close().await?; + + let url = &connect_opts.database_url; + let db_kind = get_db_kind(url)?; let data = run_prepare_step(url, merge, cargo_args)?; @@ -52,7 +63,16 @@ pub fn run(url: &str, merge: bool, cargo_args: Vec) -> anyhow::Result<() Ok(()) } -pub fn check(url: &str, merge: bool, cargo_args: Vec) -> anyhow::Result<()> { +pub async fn check( + connect_opts: &ConnectOpts, + merge: bool, + cargo_args: Vec, +) -> anyhow::Result<()> { + // Ensure the database server is available. + crate::connect(connect_opts).await?.close().await?; + + let url = &connect_opts.database_url; + let db_kind = get_db_kind(url)?; let data = run_prepare_step(url, merge, cargo_args)?;