Dask dependency management plugin for coffea-casa #219

Open · 1 of 3 tasks
oshadura opened this issue Nov 8, 2021 · 12 comments

@oshadura (Member) commented Nov 8, 2021

Related issues:

@oshadura oshadura added this to the 2022-02-01 milestone Nov 8, 2021
@oshadura oshadura added this to To Do in Coffea-casa development timeline via automation Nov 12, 2021
@bbockelm

Current status:

  • Andrew started looking at the nanny plugins.
  • Main issue: Accessing logs for the nanny process.
  • Issue: Getting the nanny to talk back to the scheduler. Need to figure this out: 24b94cb
  • Suggestion: switch development entirely to a laptop. Run a pure Dask setup using the same version of Dask as in the image (see the sketch below). Don't bother with coffea-casa until there's a prototype plugin.
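
A minimal local prototyping sketch along those lines (plain dask.distributed usage, nothing coffea-casa-specific; the exact cluster arguments are illustrative):

# Pure-Dask local setup for iterating on nanny plugins, as suggested above.
# processes=True gives each worker its own Nanny process, which is what the
# plugin work here needs.
from dask.distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=2, threads_per_worker=1, processes=True)
client = Client(cluster)

# Confirm that the laptop environment matches the dask/distributed versions in the image.
print(client.get_versions(check=True))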

Note on the plugin itself. It needs to:

  • Ship file-based dependencies (files/directories)
  • Ensure those dependencies are in the right PYTHONPATH.
  • Potentially do a conda install given a requirements.txt.
  • Handle versioning. Ensure the version of all of the above is known to the Dask scheduler. Nannies shouldn't allow a worker to start unless their "version" matches the scheduler's.
    • Example: compute a checksum over all the input files and requirements.txt (see the sketch below).
    • This goes toward race-condition-free dependency management.
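
For the checksum idea, a minimal sketch (the helper name and layout are hypothetical, not part of any existing plugin):

# Hash requirements.txt plus every shipped file/directory into a single "version"
# string that the scheduler and nannies can compare before a worker is allowed to start.
import hashlib
import os

def dependency_version(paths):
    digest = hashlib.sha256()
    for path in sorted(paths):
        if os.path.isdir(path):
            for root, dirs, files in os.walk(path):
                dirs.sort()  # keep traversal order deterministic
                for name in sorted(files):
                    full = os.path.join(root, name)
                    digest.update(full.encode())
                    with open(full, "rb") as f:
                        digest.update(f.read())
        else:
            digest.update(path.encode())
            with open(path, "rb") as f:
                digest.update(f.read())
    return digest.hexdigest()

# e.g. version = dependency_version(["requirements.txt", "my_analysis/"])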

@jthiltges (Contributor) commented Nov 12, 2021

These are the changes I'd made to add a --nanny-port argument to the worker.
jthiltges/distributed@main...jthiltges:nannyport

@nsmith- (Member) commented Nov 17, 2021

Another direction to consider here is to leverage the new ability of cloudpickle 2.0 to serialize modules: cloudpipe/cloudpickle#417
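
A minimal sketch of that cloudpickle 2.0 feature (the module name here is hypothetical): register a local module so it is serialized by value and shipped along with the tasks, instead of being assumed to exist on the workers.

import cloudpickle
import my_local_analysis  # hypothetical local module, not installed in the worker image

# Ship the module's code inside every pickle instead of pickling it by reference.
cloudpickle.register_pickle_by_value(my_local_analysis)

# Functions submitted through the dask Client can now use my_local_analysis
# on workers that don't have the module installed.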

@oshadura (Member, Author) commented Dec 16, 2021

@Andrew42 check the README in https://github.com/oshadura/dask-custom-docker to get an idea of how to do local development until I manage to deploy coffea-casa in Minikube or similar.

@Andrew42 (Contributor)

@oshadura and I have been working on setting up a k8s+docker+helm local deployment of dask in an effort to mirror the coffea-casa production environment. After a lot of debugging and adjustments to the helm chart setup, I think we've finally got the workers to start up with a nanny process at a host+port that is correctly broadcast back to the scheduler, and which also allows the nanny process to communicate back and forth with the user's dask Client.

So now we're able to use NannyPlugin-type classes such as the UploadDirectory plugin, and we see it correctly sending the packaged directory over to the worker nodes. The immediate next step on my side is to figure out how to have the plugin register some sort of versioning info with the scheduler, so that it can identify when a local copy of the packaged code differs from the one deployed on the workers and then automatically replace the out-of-date code on the workers.

@Andrew42 (Contributor)

Update: I've made a new plugin called DistributedEnvironmentPlugin, which takes a pip-installable directory, zips it up, and installs it on the worker machine. The plugin can also ship individual files alongside the main package. As far as versioning is concerned, I'm not sure it is actually needed: from my local tests it looks like the most up-to-date files get sent over each time the plugin gets registered with the client.

There was an issue where the worker would continue to use a cached version of the code even when the updated files were sent over. My workaround was to use restart=True for the NannyPlugin, which stops and restarts the worker instance on the machine and clears the old cached code. This introduced a problem: the NannyPlugin.setup() method is executed before the new worker instance gets created, which meant I couldn't have the plugin unpack the 'additional files' into the worker's working directory. Instead I placed these files in the nanny's working directory, which is where the main pip-installable directory is located anyway. To make the extra files available to the worker process I had to add the nanny's working directory to sys.path. A rough sketch of this approach is below.
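
For reference, a sketch of the shape of this approach (hypothetical names; not the actual DistributedEnvironmentPlugin code):

import os
import subprocess
import sys
import zipfile
from io import BytesIO

from distributed.diagnostics.plugin import NannyPlugin

class ShipPackagePlugin(NannyPlugin):  # hypothetical name
    # Restart the worker after setup so it doesn't keep a cached copy of old code.
    restart = True

    def __init__(self, zipped_package: bytes, package_name: str):
        self.zipped_package = zipped_package
        self.package_name = package_name

    def setup(self, nanny):
        # setup() runs before the new worker exists, so unpack into the nanny's
        # working directory (where the pip-installable package lives anyway).
        target = os.path.join(nanny.local_directory, self.package_name)
        with zipfile.ZipFile(BytesIO(self.zipped_package)) as zf:
            zf.extractall(target)
        # Workaround described above: put the nanny working directory on sys.path
        # so the extra files can be found by the worker process.
        if nanny.local_directory not in sys.path:
            sys.path.insert(0, nanny.local_directory)
        # Install the shipped package into the worker environment.
        subprocess.check_call([sys.executable, "-m", "pip", "install", "--no-deps", target])

# Usage (hypothetical): client.register_worker_plugin(ShipPackagePlugin(data, "my_pkg"), nanny=True)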

Another issue that came up was a ModuleNotFoundError, caused by dask not locating the module where the plugin was defined. If I defined the plugin entirely in the Jupyter notebook, everything worked fine: dask knows to pickle the class and make it available on the worker nodes. But when the class is imported from a module, this doesn't happen, and the above error results. The initial attempt to resolve this was to insert the plugin into the distributed.diagnostics.plugin module via a patch applied when the Docker image is built. This worked well for the scheduler and worker images, but was problematic for the Jupyter image. After a recommendation from Nick Smith, it was decided that we could just package the plugin with coffea-casa directly and accomplish the same thing.

At this point, most of the code is in place and should be ready to be tested, unless anyone has some additional or outstanding concerns that they would like to have addressed first.

Next Steps:

  1. Open a PR to merge the plugin into coffea-casa
  2. Add some CI/CD testing functionality for the plugin
  3. Get it working with a full CMS analysis, e.g. topcoffea or some other available repo

If there are any other steps that I should include, please let me know!

@oshadura (Member, Author) commented Apr 1, 2022

It will be tested after PR #265 is merged.

@7quantumphysics

Hello,
I am doing a ttbar analysis on coffea-casa. I have some systematic correction files that need to be called by my processor. In order for the workers to find them, I tried using:

ImportFiles = '/home/cms-jovyan/TTbarAllHadUproot/CorrectionFiles'

if __name__ == "__main__":
    cluster = CoffeaCasaCluster(
        job_extra = {
            'docker_image': 'coffeateam/coffea-casa-analysis:latest',
            'transfer_input_files': ImportFiles
        }
    )

but I have no success, as I continue to get an error that the file was not found:

FileNotFoundError: [Errno 2] No such file or directory: 'TTbarAllHadUproot/CorrectionFiles/SFs/bquark/subjet_btagging.json.gz'
I have checked that the file is there, and my processor can be run with an iterative or futures executor without error.

What am I doing wrong?

FYI, I am not writing this code in a notebook but rather as a .py script, in case that changes the course of action I should take.

@Andrew42 (Contributor) commented Jul 6, 2022

So I think it is going to depend on how exactly your code runs, namely the part that runs on the worker.

In any case, I agree that it doesn't look like transfer_input_files actually moves anything to the worker:

import os
import dask
import pathlib
from dask.distributed import Client
from coffea_casa import CoffeaCasaCluster

directory = "tmp_test"

if not os.path.exists(directory):
    os.makedirs(directory)

with open(pathlib.Path(directory) / "foo.py","w") as f:
    f.write("x = 123")
with open(pathlib.Path(directory) / "bar.py","w") as f:
    f.write("from foo import x\n")
    f.write("print(x)")

cluster = CoffeaCasaCluster(job_extra={'docker_image': "coffeateam/coffea-casa-analysis:latest",'transfer_input_files': "tmp_test"})
host = os.getenv("HOST_IP")
client = Client(f"tls://{host}:8786")

print(client.run(os.listdir))
# >>> ['.bash_logout', '.bashrc', '.profile', 'dask-worker-space', '.conda', '.condor', '.local', 'work']
print(client.run(os.listdir,"dask-worker-space"))
# >>> ['global.lock', 'purge.lock', 'worker-p3calcjl', 'worker-p3calcjl.dirlock']

Now if all you want to do is get a directory onto the worker so that your code can access some static data content, you can try using the dask UploadDirectory plugin:

from distributed.diagnostics.plugin import UploadDirectory
client.register_worker_plugin(UploadDirectory(directory,restart=True,update_path=True),nanny=True)

print(client.run(os.listdir,"dask-worker-space"))
# >>> ['global.lock', 'purge.lock', 'tmp_test', 'worker-p3calcjl', 'worker-p3calcjl.dirlock']
print(client.run(os.listdir,"dask-worker-space/tmp_test"))
# >>> ['bar.py', 'foo.py']

This should show that your directory exists on the worker machine.

@7quantumphysics commented Jul 7, 2022

I am trying UploadDirectory now, but I am still encountering the error. Perhaps you can help me debug what I'm doing wrong?

For some context, my processor found at TTbarAllHadUproot/TTbarResProcessor.py is directly dependent on correction files. The first corrections that I am using are TTbarAllHadUproot/CorrectionFiles/SFs/bquark/subjet_btagging.json.gz.

Here is how I defined my client and checked that my directory was uploaded:

        cluster = CoffeaCasaCluster(
            job_extra = {
                'docker_image': 'coffeateam/coffea-casa-analysis:latest',
                'transfer_input_files': 'TTbarAllHadUproot/CorrectionFiles/SFs/bquark/'
            }
        )

        client = Client(cluster)

        client.register_worker_plugin(UploadDirectory('TTbarAllHadUproot',restart=True,update_path=True),nanny=True)
        print()
        print(client.run(os.listdir,"dask-worker-space")) #TTbarAllHadUproot Directory should be here
        print(client.run(os.listdir,"dask-worker-space/TTbarAllHadUproot/CorrectionFiles/SFs/bquark/")) #List correction files

After running the job, I am given some information about the cluster, followed by my print statements before preprocessing my dataset.

/opt/conda/lib/python3.8/site-packages/dask_jobqueue/core.py:20: FutureWarning: tmpfile is deprecated and will be removed in a future release. Please use dask.utils.tmpfile instead.
  from distributed.utils import tmpfile
{'job_extra': {'universe': 'docker', 'docker_image': 'coffeateam/coffea-casa-analysis:latest', 'container_service_names': 'dask', 'dask_container_port': 8786, 'transfer_input_files': 'TTbarAllHadUproot/CorrectionFiles/SFs/bquark/', 'encrypt_input_files': '/etc/cmsaf-secrets/ca.pem, /etc/cmsaf-secrets/hostcert.pem, /etc/cmsaf-secrets/xcache_token', 'transfer_output_files': '', 'when_to_transfer_output': 'ON_EXIT', 'should_transfer_files': 'YES', 'Stream_Output': 'False', 'Stream_Error': 'False', '+DaskSchedulerAddress': '"tls://ac-2emalik-2ewilliams-40cern-2ech.dask.coffea.casa:8786"'}, 'protocol': 'tls://', 'security': Security(require_encryption=True, tls_ca_file=Local (/etc/cmsaf-secrets/ca.pem), tls_client_cert=Local (/etc/cmsaf-secrets/hostcert.pem), tls_client_key=Local (/etc/cmsaf-secrets/hostcert.pem), tls_min_version=771, tls_scheduler_cert=Local (/etc/cmsaf-secrets/hostcert.pem), tls_scheduler_key=Local (/etc/cmsaf-secrets/hostcert.pem), tls_worker_cert=Local (/etc/cmsaf-secrets/hostcert.pem), tls_worker_key=Local (/etc/cmsaf-secrets/hostcert.pem)), 'log_directory': 'logs', 'silence_logs': 'DEBUG', 'scheduler_options': {'port': 8786, 'dashboard_address': '8787', 'protocol': 'tls', 'external_address': 'tls://ac-2emalik-2ewilliams-40cern-2ech.dask.coffea.casa:8786'}}
{'tls://ac-2emalik-2ewilliams-40cern-2ech.dask-worker.coffea.casa:8788': ['global.lock', 'purge.lock', 'TTbarAllHadUproot', 'worker-9rdsy2pv', 'worker-9rdsy2pv.dirlock']}
{'tls://ac-2emalik-2ewilliams-40cern-2ech.dask-worker.coffea.casa:8788': ['subjet_deepCSV_106XUL16postVFP_v1.csv', 'DeepCSV_106XUL17SF_V2.csv', 'subjet_btagging.json.gz']}

I can see that my directory, and in turn all of the relevant paths, have been uploaded. However, after preprocessing, the job still fails giving me the following error:

Traceback (most recent call last):
  File "/opt/conda/lib/python3.8/site-packages/coffea/processor/executor.py", line 1542, in _work_function
    out = processor_instance.process(events)
  File "/home/cms-jovyan/dask-worker-space/TTbarAllHadUproot/TTbarResProcessor.py", line 765, in process
    btag_sf = correctionlib.CorrectionSet.from_file(SF_filename)
  File "/opt/conda/lib/python3.8/site-packages/correctionlib/highlevel.py", line 186, in from_file
    return cls(open_auto(filename))
  File "/opt/conda/lib/python3.8/site-packages/correctionlib/highlevel.py", line 19, in open_auto
    with gzip.open(filename, "rt") as gzfile:
  File "/opt/conda/lib/python3.8/gzip.py", line 58, in open
    binary_file = GzipFile(filename, gz_mode, compresslevel)
  File "/opt/conda/lib/python3.8/gzip.py", line 173, in __init__
    fileobj = self.myfileobj = builtins.open(filename, mode or 'rb')
FileNotFoundError: [Errno 2] No such file or directory: 'TTbarAllHadUproot/CorrectionFiles/SFs/bquark/subjet_btagging.json.gz'

BTW, it appears that my client is starting, stopping, and starting again, according to more logs that are output to the terminal after the client information shown above. The messages are listed below:

2022-07-07 09:05:58,473 - distributed.scheduler - INFO - Clear task state
2022-07-07 09:05:58,475 - distributed.scheduler - INFO -   Scheduler at: tls://192.168.106.21:8786
2022-07-07 09:05:58,476 - distributed.scheduler - INFO -   dashboard at:                     :8787
2022-07-07 09:05:58,572 - distributed.scheduler - INFO - Receive client connection: Client-0419d0d1-fdd4-11ec-8105-92df6efa0dda
2022-07-07 09:05:58,573 - distributed.core - INFO - Starting established connection
2022-07-07 09:06:02,137 - distributed.scheduler - INFO - Register worker <WorkerState 'tls://ac-2emalik-2ewilliams-40cern-2ech.dask-worker.coffea.casa:8788', name: kubernetes-worker-9e845390-113b-4412-adf0-53bd0407b4f6, status: undefined, memory: 0, processing: 0>
2022-07-07 09:06:02,139 - distributed.scheduler - INFO - Starting worker compute stream, tls://ac-2emalik-2ewilliams-40cern-2ech.dask-worker.coffea.casa:8788
2022-07-07 09:06:02,140 - distributed.core - INFO - Starting established connection
2022-07-07 09:06:04,953 - distributed.scheduler - INFO - Remove worker <WorkerState 'tls://ac-2emalik-2ewilliams-40cern-2ech.dask-worker.coffea.casa:8788', name: kubernetes-worker-9e845390-113b-4412-adf0-53bd0407b4f6, status: running, memory: 0, processing: 0>
2022-07-07 09:06:04,953 - distributed.core - INFO - Removing comms to tls://ac-2emalik-2ewilliams-40cern-2ech.dask-worker.coffea.casa:8788
2022-07-07 09:06:04,954 - distributed.scheduler - INFO - Lost all workers
2022-07-07 09:06:07,465 - distributed.scheduler - INFO - Register worker <WorkerState 'tls://ac-2emalik-2ewilliams-40cern-2ech.dask-worker.coffea.casa:8788', name: kubernetes-worker-9e845390-113b-4412-adf0-53bd0407b4f6, status: undefined, memory: 0, processing: 0>
2022-07-07 09:06:07,467 - distributed.scheduler - INFO - Starting worker compute stream, tls://ac-2emalik-2ewilliams-40cern-2ech.dask-worker.coffea.casa:8788
2022-07-07 09:06:07,467 - distributed.core - INFO - Starting established connection

Why does my memory show up as zero (memory: 0), and why does it start, stop and start? Is this because of restart=True in UploadDirectory, and if so, what is the significance of the restart?

@Andrew42 (Contributor) commented Jul 7, 2022

Is it possible your issue is just a relative path difference? UploadDirectory (which is technically a nanny plugin) uploads the inputs to dask-worker-space, but when your code gets executed on the remote worker, I believe it runs from one directory up. So when your code looks for TTbarAllHadUproot/CorrectionFiles/SFs/bquark/subjet_btagging.json.gz, it can't find it b/c that directory is under dask-worker-space.
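
As a hedged sketch of one way to work around that (resolve_input is a hypothetical helper, not something coffea-casa provides): inside the processor, anchor file lookups to the worker's dask-worker-space directory when running under dask, and fall back to the plain relative path for the iterative/futures executors.

import os

def resolve_input(relpath):
    # When running inside a dask worker, files uploaded by UploadDirectory live
    # under dask-worker-space, one level above the worker's own directory.
    try:
        from distributed import get_worker
        base = os.path.dirname(get_worker().local_directory)  # .../dask-worker-space
        candidate = os.path.join(base, relpath)
        if os.path.exists(candidate):
            return candidate
    except (ImportError, ValueError):
        pass  # not running inside a dask worker
    return relpath

# e.g. SF_filename = resolve_input("TTbarAllHadUproot/CorrectionFiles/SFs/bquark/subjet_btagging.json.gz")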

I'm not sure why the memory shows up as zero; maybe @oshadura can comment on this? You're also starting up your own CoffeaCasaCluster as part of your runtime script, but typically the way things are supposed to be set up is that a scheduler is started automatically for you in the background. You connect to it via the dask Client API, and the scheduler then goes out and requests workers to do whatever jobs need doing.

The restart=True bit might not be needed in your case, since you're starting up your cluster as part of your script. Typically, when the scheduler runs independently of your script, re-running the script and re-registering the plugin does nothing: the scheduler sees that the plugin was already registered with the workers from the previous run, so it won't re-upload the content. In your case this might not be a big deal, since you probably aren't changing your SF files on a regular basis, but if you did make a change it wouldn't get propagated to the workers that were already connected, and, to make things even more confusing, the new changes would show up in any new workers that connect to your job. So the restart=True option forces your workers (or rather the supervising nanny process) to start with a clean slate.

Suffice it to say that all this headache with remote file placement is a dask issue, and one we're actively trying to get ironed out, as it presents a lot of potential problems for analyses that weren't designed with it in mind (as you seem to be finding out... sorry!).

@7quantumphysics commented Jul 8, 2022

YES! After uploading the directory, the code does run from one directory up, so everything lives under dask-worker-space. All I had to do was specify the file paths with dask-worker-space preceding the location. My processor can now use the correction files.

You're also starting up your own CoffeaCasaCluster as part of your runtime script, but typically the way things are supposed to be set up is that a scheduler is started automatically for you in the background. You connect to it via the dask Client API, and the scheduler then goes out and requests workers to do whatever jobs need doing.

OK. I see now that when I use UploadDirectory specifically, there's no need to also specify that directory in the cluster manually via transfer_input_files. I went back to using the automatic scheduler.

Thanks!
