[ML-12132] Fix persistence of Spark models on passthrough-enabled environments #3443
Conversation
…tifact ACL-protected DBFS locations Signed-off-by: Sid Murching <sid.murching@databricks.com>
mlflow/spark.py
Outdated
_HadoopFileSystem.copy_to_local_file(tmp_path, sparkml_data_path, remove_src=True)
# If spark DFS is DBFS and we're running on a Databricks cluster, copy to local FS
# via the FUSE mount
is_saving_to_dbfs = is_valid_dbfs_uri(tmp_path) or posixpath.abspath(tmp_path) == tmp_path
Why do we need the abspath check?
Good Q, it's because persisting a spark model to a path like "/my-directory" actually saves to "dbfs:/my-directory".
I'll add this explanation/clarifying comment to the code, lmk if it makes sense:
# If we're running on a Databricks cluster, we can detect whether the tempdir we're saving our
# Spark model to is on DBFS, by checking if it either starts with dbfs:/
# e.g. "dbfs:/my-directory" or is an absolute path like "/my-directory" (in which case Spark
# saves to the default distributed FS => DBFS). If we're using a DBFS
# tempdir, copy to local FS via the FUSE mount. Otherwise, use HDFS APIs to copy the model to
# the local FS.
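To make the two cases concrete, here's a small illustrative check (the tmp_path values are made up; it assumes is_valid_dbfs_uri is importable from mlflow/utils/uri.py, as in the diff below):

```python
import posixpath
from mlflow.utils.uri import is_valid_dbfs_uri  # assumed import path, per the diff below

# Illustrative tmp_path values: a DBFS URI, an absolute path (resolved against the
# default distributed FS, i.e. DBFS on Databricks), and a relative local path.
for tmp_path in ["dbfs:/my-directory", "/my-directory", "relative/dir"]:
    is_saving_to_dbfs = is_valid_dbfs_uri(tmp_path) or posixpath.abspath(tmp_path) == tmp_path
    print(tmp_path, "->", is_saving_to_dbfs)
# Expected: True (DBFS URI), True (absolute path => default DFS => DBFS), False (relative path)
```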
mlflow/utils/uri.py
Outdated
def dbfs_hdfs_uri_to_fuse_path(dbfs_uri):
    # Convert posixpaths (e.g. "/tmp/mlflow") to DBFS URIs by adding "dbfs:/" as a prefix
Can you add a docstring to this method so that it's more obvious that this comment just applies to the immediate if statement below and not to the whole method?
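Something like the following could work; this is a hypothetical sketch of the docstring and body (the FUSE translation shown is assumed, not copied from the PR, and is_valid_dbfs_uri is taken to be defined in the same mlflow/utils/uri.py module):

```python
import posixpath

def dbfs_hdfs_uri_to_fuse_path(dbfs_uri):
    """Convert a DBFS URI, or an absolute path resolved against DBFS, to its local FUSE path.

    Illustrative examples:
        "dbfs:/tmp/mlflow" -> "/dbfs/tmp/mlflow"
        "/tmp/mlflow"      -> "/dbfs/tmp/mlflow"

    The comment on the `if` below applies only to that normalization step, not to the
    whole function.
    """
    # Convert posixpaths (e.g. "/tmp/mlflow") to DBFS URIs by adding "dbfs:/" as a prefix
    if not is_valid_dbfs_uri(dbfs_uri) and dbfs_uri == posixpath.abspath(dbfs_uri):
        dbfs_uri = "dbfs:" + dbfs_uri
    # Strip the "dbfs:" scheme and prepend the local FUSE mount point "/dbfs"
    return "/dbfs" + dbfs_uri[len("dbfs:"):]
```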
def dbfs_hdfs_uri_to_fuse_path(dbfs_uri):
    # Convert posixpaths (e.g. "/tmp/mlflow") to DBFS URIs by adding "dbfs:/" as a prefix
    if not is_valid_dbfs_uri(dbfs_uri) and dbfs_uri == posixpath.abspath(dbfs_uri):
if dbfs_uri is a posixpath abspath there's no way it can be a dbfs:/ uri right?
is_saving_to_dbfs = is_valid_dbfs_uri(tmp_path) or posixpath.abspath(tmp_path) == tmp_path
if is_saving_to_dbfs and databricks_utils.is_in_cluster():
    tmp_path_fuse = dbfs_hdfs_uri_to_fuse_path(tmp_path)
    shutil.move(src=tmp_path_fuse, dst=sparkml_data_path)
When does the file get written to tmp_path_fuse so we can move it? Are we assuming that the FUSE mount has been synced with HDFS? What if the sync hasn't finished yet?
In this save_model method, the goal is to save the SparkML model to a local filesystem path.
The flow here is:
- Save the SparkML model to a temporary DBFS location (dbfs:/tmp/my-model)
- Move that persisted Spark model to the local filesystem using FUSE and a file move (move /dbfs/tmp/my-model to the desired local path /my/desired/local/path); see the sketch below

We leverage FUSE to copy from DBFS -> local as there's no allowlisted JVM-side HDFS API for copying from dbfs:/... to the local filesystem.
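A condensed sketch of that flow (the helper name and the temp path are illustrative, not the PR's exact code; dbfs_hdfs_uri_to_fuse_path is the utility from the diff above):

```python
import shutil

def _save_sparkml_model_via_fuse(spark_model, sparkml_data_path, tmp_path="dbfs:/tmp/my-model"):
    # 1. Persist the SparkML model to a temporary DBFS location; Spark writes to the
    #    cluster's default distributed filesystem (DBFS on Databricks).
    spark_model.save(tmp_path)
    # 2. Move the persisted model to the desired local path through the /dbfs FUSE mount,
    #    avoiding the JVM-side HDFS copy APIs that aren't allowlisted on passthrough clusters.
    tmp_path_fuse = dbfs_hdfs_uri_to_fuse_path(tmp_path)  # e.g. "/dbfs/tmp/my-model"
    shutil.move(src=tmp_path_fuse, dst=sparkml_data_path)
```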
The FUSE access has the same consistency properties as standard HDFS-style access to the same DBFS path. That is, writing to dbfs:/tmp/my-model and reading /dbfs/tmp/my-model is equivalent, from a blob-storage eventual-consistency perspective, to writing to dbfs:/tmp/my-model and subsequently reading dbfs:/tmp/my-model.
fuse_dfs_tmpdir = dbfs_hdfs_uri_to_fuse_path(dfs_tmpdir)
os.mkdir(fuse_dfs_tmpdir)
# Workaround for inability to use shutil.copytree with DBFS FUSE due to permission-denied
# errors on passthrough-enabled clusters when attempting to copy permission bits for directories
Can we factor out this copytree functionality into a more generic helper function?
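One possible shape for such a helper (hypothetical name and signature): it copies the directory structure and file contents only, deliberately skipping the permission-bit copying that makes shutil.copytree fail on passthrough-enabled FUSE mounts:

```python
import os
import shutil

def copy_tree_without_permissions(src_dir, dst_dir):
    """Recursively copy src_dir into dst_dir, copying file contents but not permission bits."""
    for dirpath, _dirnames, filenames in os.walk(src_dir):
        relative_dir = os.path.relpath(dirpath, src_dir)
        target_dir = dst_dir if relative_dir == "." else os.path.join(dst_dir, relative_dir)
        os.makedirs(target_dir, exist_ok=True)
        for filename in filenames:
            # shutil.copyfile copies contents only (no chmod), unlike shutil.copy2 / copytree
            shutil.copyfile(os.path.join(dirpath, filename), os.path.join(target_dir, filename))
```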
mlflow/spark.py
Outdated
relative_dir_path = os.path.relpath(os.path.join(dirpath, dirname), src_dir)
# Compute corresponding FUSE path of each local directory and create an equivalent
# FUSE directory
fuse_dir_path = os.path.join(dst_dir, relative_dir_path)
This is no longer specific to fuse, it's just "absolute_dir_path" right?
👍 yep, that's right! I also added a unit test for this method because it's somewhat complex, even though it's an internal one used only in the mlflow.spark module.
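For illustration, a test along these lines could cover the copy helper using pytest's tmp_path fixture (hypothetical test; the helper name matches the sketch suggested earlier in this thread, not necessarily the PR's actual internal function):

```python
def test_copy_tree_without_permissions(tmp_path):
    # Build a small source tree with a nested directory and two files
    src = tmp_path / "src"
    (src / "nested").mkdir(parents=True)
    (src / "a.txt").write_text("a")
    (src / "nested" / "b.txt").write_text("b")

    dst = tmp_path / "dst"
    dst.mkdir()
    copy_tree_without_permissions(str(src), str(dst))

    # The directory structure and file contents should be reproduced at the destination
    assert (dst / "a.txt").read_text() == "a"
    assert (dst / "nested" / "b.txt").read_text() == "b"
```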
try:
-   spark_model.save(os.path.join(model_dir, _SPARK_MODEL_PATH_SUB))
+   spark_model.save(posixpath.join(model_dir, _SPARK_MODEL_PATH_SUB))
Note: this should be a posixpath join since HDFS URIs are posixpath-style (e.g. we don't want to include backslashes \ here on platforms like Windows).
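For example (illustrative values), the Windows path implementation would introduce a backslash while posixpath keeps forward slashes:

```python
import ntpath      # the Windows implementation behind os.path on Windows
import posixpath

print(ntpath.join("dbfs:/tmp/model", "sparkml"))     # dbfs:/tmp/model\sparkml  (invalid in an HDFS URI)
print(posixpath.join("dbfs:/tmp/model", "sparkml"))  # dbfs:/tmp/model/sparkml
```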
Nice refactoring! Can you update the PR description to clarify that this is changing dbfs access from HDFS to FUSE for all clusters (not just the passthrough ones that motivated this)?
…ironments (mlflow#3443) Fix persistence of Spark models using mlflow.spark.log/load_model on passthrough-enabled Databricks clusters Signed-off-by: harupy <17039389+harupy@users.noreply.github.com>
What changes are proposed in this pull request?
Fixes the mlflow.spark.log_model and mlflow.spark.load_model APIs on passthrough-enabled environments against ACL'd artifact locations.

Previously, we had the following behavior:
- mlflow.spark.save_model: saves the spark model to a temporary directory on Spark's default distributed FS (e.g. DBFS on Databricks), then copies the persisted spark model directory to the desired local path using HDFS APIs called via Py4j. The latter step fails in passthrough environments as HDFS APIs aren't allowlisted
- mlflow.spark.log_model: attempts to write directly to the destination directory, which fails for ACL'd artifact locations. Then, falls back to calling save_model & log_artifacts, which fails as described above
- mlflow.spark.load_model: attempts to load the model directly from the artifact location via Spark. If this fails, downloads model files locally, uploads them to Spark's distributed FS via HDFS APIs (fails due to lack of allowlisting), and then loads the Spark model from the distributed FS

This PR makes the following changes when running on Databricks, for both passthrough-enabled & non-passthrough envs:
- mlflow.spark.save_model: copy model files locally using DBFS FUSE instead of denylisted HDFS APIs. This also fixes log_model
- mlflow.spark.load_model: upload locally-downloaded model files to Spark's distributed FS via DBFS FUSE instead of denylisted HDFS APIs

How is this patch tested?
Manually
Release Notes
Is this a user-facing change?
(Details in 1-2 sentences. You can just refer to another PR with a description if this PR is part of a larger change.)
What component(s), interfaces, languages, and integrations does this PR affect?
Components
- area/artifacts: Artifact stores and artifact logging
- area/build: Build and test infrastructure for MLflow
- area/docs: MLflow documentation pages
- area/examples: Example code
- area/model-registry: Model Registry service, APIs, and the fluent client calls for Model Registry
- area/models: MLmodel format, model serialization/deserialization, flavors
- area/projects: MLproject format, project running backends
- area/scoring: Local serving, model deployment tools, spark UDFs
- area/server-infra: MLflow server, JavaScript dev server
- area/tracking: Tracking Service, tracking client APIs, autologging

Interface
- area/uiux: Front-end, user experience, JavaScript, plotting
- area/docker: Docker use across MLflow's components, such as MLflow Projects and MLflow Models
- area/sqlalchemy: Use of SQLAlchemy in the Tracking Service or Model Registry
- area/windows: Windows support

Language
- language/r: R APIs and clients
- language/java: Java APIs and clients
- language/new: Proposals for new client languages

Integrations
- integrations/azure: Azure and Azure ML integrations
- integrations/sagemaker: SageMaker integrations
- integrations/databricks: Databricks integrations

How should the PR be classified in the release notes? Choose one:
- rn/breaking-change - The PR will be mentioned in the "Breaking Changes" section
- rn/none - No description will be included. The PR will be mentioned only by the PR number in the "Small Bugfixes and Documentation Updates" section
- rn/feature - A new user-facing feature worth mentioning in the release notes
- rn/bug-fix - A user-facing bug fix worth mentioning in the release notes
- rn/documentation - A user-facing documentation change worth mentioning in the release notes