
Added StartupToken to identify a process at startup #19014

Merged: 38 commits merged into ray-project:master from fix_ray_init on Oct 15, 2021

Conversation

@czgdp1807 (Contributor) commented Sep 30, 2021

Why are these changes needed?

The changes in this PR introduce a StartupToken to uniquely identify a worker process at startup. This avoids the PID conflicts that arise on Windows, where wrapper processes mean the PID observed by the worker pool can differ from the PID of the actual worker. For more details, please refer to #18951 (comment).

After this PR, ray.init() completes without hanging in virtual and conda environments.
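
At a high level, the fix works like the following sketch (illustrative only; StartupToken, startup_token_, and the --startup-token flag appear in the diffs reviewed below, while the class and struct shown here are simplified stand-ins for the real WorkerPool code):

#include <cstdint>
#include <string>
#include <unordered_map>
#include <vector>

typedef int64_t StartupToken;

// Simplified stand-in for the bookkeeping struct kept per starting process.
struct StartingWorkerProcessInfo {
  int num_starting_workers = 0;
};

class WorkerPoolSketch {
 public:
  StartupToken StartWorkerProcess(std::vector<std::string> *worker_command_args) {
    // The token is handed to the worker on its command line, so the worker can
    // report it back when it registers, even if its PID differs from the PID
    // the pool observed (e.g. because of a wrapper process on Windows).
    worker_command_args->push_back("--startup-token=" +
                                   std::to_string(startup_token_));
    StartupToken token = startup_token_;
    starting_worker_processes_[token] = StartingWorkerProcessInfo{1};
    startup_token_ += 1;  // incremented once assigned, so every process is unique
    return token;
  }

 private:
  StartupToken startup_token_ = 0;
  // Starting processes are tracked by startup token rather than by PID.
  std::unordered_map<StartupToken, StartingWorkerProcessInfo> starting_worker_processes_;
};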

Related issue number

Closes #18951
Closes #18952
#15970

Checks

  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

cc: @dharhas @mattip

@czgdp1807 (Contributor Author)

This branch builds successfully and behaves as desired with ray.init() on my Windows system. However, it is failing on GitHub Actions. I will test it locally tomorrow and update here. Meanwhile, if you have any comments/feedback, please let me know. Thanks.

@simon-mo marked this pull request as ready for review on September 30, 2021 22:54
@simon-mo (Contributor)

Oops, sorry, I misclicked; please convert back to a draft PR if that was not intended.

@@ -186,6 +187,7 @@ struct CoreWorkerOptions {
int runtime_env_hash;
/// The PID of the process for setup worker runtime env.
pid_t worker_shim_pid;
StartupToken startup_token;
Contributor:

Please add a comment above describing what this does :)

Contributor Author:

I will add documentation comments at all the necessary places once I complete testing. There are some internal API changes, so the tests need to be updated as well. Thanks.

@@ -27,6 +27,12 @@
#include "ray/rpc/worker/core_worker_client.h"
#include "ray/util/process.h"

#ifdef _WIN32
Contributor:

we should get rid of this typedef. Why not make it int64_t everywhere?

@@ -370,7 +371,11 @@ Process WorkerPool::StartWorkerProcess(
<< " worker(s) with pid " << proc.GetId();
MonitorStartingWorkerProcess(proc, language, worker_type);
state.starting_worker_processes.emplace(
proc, StartingWorkerProcessInfo{workers_to_start, workers_to_start, worker_type});
startup_token_,
std::make_pair(
Contributor:

Can you put startup_token into the StartingWorkerProcessInfo? Then we don't need an additional level of indirection :)

Contributor Author:

The core of the fix is using startup_token as the key. I can additionally add it to StartingWorkerProcessInfo and make the necessary changes elsewhere.

Contributor:

Ah sorry, I meant put proc into StartingWorkerProcessInfo and get rid of the std::make_pair (startup_token should be the key, yes). It mainly seems odd to use both std::make_pair here and also have the struct. This will simplify the code elsewhere because then you don't need the additional .first and .second calls.
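
In other words (a sketch of the reviewer's suggestion; WorkerType and Process are local stand-ins for the Ray types, and field names beyond those visible in the diff are assumed):

#include <cstdint>
#include <unordered_map>

typedef int64_t StartupToken;
enum class WorkerType { WORKER, IO_WORKER };  // stand-in for rpc::WorkerType
struct Process { int64_t id = -1; };          // stand-in for ray::Process

// Fold the Process into the per-process info so the map value is a single
// struct, keyed by the startup token; no std::make_pair, no .first/.second.
struct StartingWorkerProcessInfo {
  int num_starting_workers;
  int num_workers_to_register;  // assumed field name
  WorkerType worker_type;
  Process proc;
};

std::unordered_map<StartupToken, StartingWorkerProcessInfo> starting_worker_processes;

The later diff in this thread, StartingWorkerProcessInfo{workers_to_start, workers_to_start, worker_type, proc} keyed by startup_token_, shows the form the PR eventually adopted.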

@@ -38,6 +39,12 @@ namespace ray {

namespace raylet {

#ifdef _WIN32
Contributor:

This shouldn't be needed since it is already in src/ray/raylet/worker.h. Just make sure it is in the right namespace there.

@@ -29,6 +29,12 @@
#include "src/ray/protobuf/common.pb.h"
#include "src/ray/protobuf/gcs.pb.h"

#ifdef _WIN32
Contributor:

Actually, maybe put it in ray/util/process.h, which should be included by everybody who needs it :)
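
Putting the two suggestions together, the end state would be a single alias defined once in ray/util/process.h, with the per-file #ifdef blocks deleted (a sketch of the suggestion, not the exact final code):

// ray/util/process.h (sketch)
#include <cstdint>

namespace ray {

// One definition, same underlying type on every platform; the files that need
// StartupToken already include this header.
typedef int64_t StartupToken;

}  // namespace ray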

@pcmoritz (Contributor) commented Oct 1, 2021

Thanks a lot for fixing this @czgdp1807 :)

Can you shepherd this through the process (another round of reviews and making sure tests are fixed, etc.), @wuisawesome?

@czgdp1807 (Contributor Author) commented Oct 3, 2021

The following tests fail on my branch:

FAIL: //:gcs_server_rpc_test 
FAIL: //:core_worker_test 
FAIL: //:event_test 
FAIL: //:gcs_server_test

The above tests fail on master as well, with similar logs, so I think these failures are unrelated to my changes. I will add the documentation now. Some Python tests failing on this branch seem to be flaky; they end with RedisConnectionError. This also appears to be unrelated, because if the API changes had broken anything, the errors surfaced from Python would have been syntax/semantic errors.

I have added documentation at the necessary places in the diff. This PR is ready from my side. Please let me know if anything else needs to be done. cc: @pcmoritz @wuisawesome @dharhas

@wuisawesome (Contributor):

@czgdp1807 it looks like the build is failing on all non-Windows platforms right now. Can you fix the initialization order?

Use --sandbox_debug to see verbose messages from the sandbox

In file included from src/ray/raylet/worker_pool.cc:15:
bazel-out/k8-opt/bin/_virtual_includes/raylet_lib/ray/raylet/worker_pool.h: In constructor ‘ray::raylet::WorkerPool::WorkerPool(instrumented_io_context&, ray::NodeID, std::string, int, int, int, int, int, const std::vector<int>&, std::shared_ptr<ray::gcs::GcsClient>, const WorkerCommandMap&, std::function<void()>, int, std::function<double()>)’:
bazel-out/k8-opt/bin/_virtual_includes/raylet_lib/ray/raylet/worker_pool.h:608:21: error: ‘ray::raylet::WorkerPool::node_address_’ will be initialized after [-Werror=reorder]
  608 |   const std::string node_address_;
      |                     ^~~~~~~~~~~~~
bazel-out/k8-opt/bin/_virtual_includes/raylet_lib/ray/raylet/worker_pool.h:141:16: error:   ‘ray::StartupToken ray::raylet::WorkerPool::startup_token_’ [-Werror=reorder]
  141 |   StartupToken startup_token_;
      |                ^~~~~~~~~~~~~~
src/ray/raylet/worker_pool.cc:60:1: error:   when initialized here [-Werror=reorder]
   60 | WorkerPool::WorkerPool(instrumented_io_context &io_service, const NodeID node_id,
      | ^~~~~~~~~~
cc1plus: all warnings being treated as errors
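
The -Werror=reorder error occurs because C++ always initializes data members in declaration order, and GCC warns (here, errors) when the constructor's member-initializer list is written in a different order: startup_token_ is declared early in WorkerPool (header line 141), but its initializer was apparently placed after node_address_ (header line 608). The fix is to make the initializer-list order match the declaration order, as in this stripped-down illustration (not the real WorkerPool constructor):

#include <cstdint>
#include <string>
#include <utility>

typedef int64_t StartupToken;

class WorkerPoolSketch {
 public:
  // Members are initialized in the order they are declared below
  // (startup_token_ first, then node_address_), so the initializer list
  // must use the same order or GCC emits -Wreorder.
  WorkerPoolSketch(std::string node_address, StartupToken startup_token)
      : startup_token_(startup_token), node_address_(std::move(node_address)) {}

  // A mismatched version such as
  //   : node_address_(std::move(node_address)), startup_token_(startup_token)
  // triggers exactly the error shown above.

 private:
  StartupToken startup_token_;
  const std::string node_address_;
};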

@@ -186,6 +187,9 @@ struct CoreWorkerOptions {
int runtime_env_hash;
/// The PID of the process for setup worker runtime env.
pid_t worker_shim_pid;
/// The startup token of the process assigned to it
/// during startup via command line arguments.
Contributor:

Suggested change
/// during startup via command line arguments.
/// during startup via command line arguments. This is needed because the actual core worker process may not have the same pid as the process the worker pool starts (due to shim processes).

Contributor Author:

Should I change this everywhere, or would doing it only once here suffice?

@czgdp1807 (Contributor Author):

I have made the change, @wuisawesome. Thanks for pointing it out.

@scv119 self-assigned this on Oct 14, 2021
@@ -132,6 +133,12 @@ class Worker;
/// The WorkerPool is responsible for managing a pool of Workers. Each Worker
/// is a container for a unit of work.
class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface {
protected:
Contributor:

Any reason this should not be private? Also, as a general guideline, data members should be declared after the function declarations.

Contributor Author (@czgdp1807) commented Oct 14, 2021:

We need this value in WorkerPoolMock (which inherits from WorkerPool) in worker_pool_test.cc. The other way to expose this member is via a protected function in WorkerPool that WorkerPoolMock can call. I do not want to expose startup_token via a public method, and making the data member protected seemed simpler than adding a protected accessor and using it everywhere else. Let me know if you have other ideas.

Contributor:

sounds reasonable.

/// Global startup token variable. Incremented once assigned
/// to a worker process and is added to
/// state.starting_worker_processes.
StartupToken startup_token_;
Contributor:

nit: worker_startup_token_counter_;
startup_token_ is a bit confusing.

@@ -559,9 +575,10 @@ Status WorkerPool::RegisterWorker(const std::shared_ptr<WorkerInterface> &worker
void WorkerPool::OnWorkerStarted(const std::shared_ptr<WorkerInterface> &worker) {
auto &state = GetStateForLanguage(worker->GetLanguage());
const auto &shim_process = worker->GetShimProcess();
const StartupToken &worker_startup_token = worker->GetStartupToken();
Contributor:

Doesn't need to be a reference.
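
That is, since StartupToken is a plain 64-bit integer, it can simply be copied (a sketch; GetStartupToken is a stand-in here for the worker->GetStartupToken() call in the surrounding diff):

#include <cstdint>

typedef int64_t StartupToken;

StartupToken GetStartupToken() { return 0; }  // stand-in for worker->GetStartupToken()

void Example() {
  // No need for: const StartupToken &worker_startup_token = GetStartupToken();
  // Copying an int64_t is at least as cheap as binding a reference.
  StartupToken worker_startup_token = GetStartupToken();
  (void)worker_startup_token;
}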

@@ -473,7 +484,8 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface {
/// A map from the pids of this shim processes to the extra information of
Contributor:

This comment needs to be updated.

@@ -516,6 +525,7 @@ Status WorkerPool::RegisterWorker(const std::shared_ptr<WorkerInterface> &worker
}
auto process = Process::FromPid(pid);
worker->SetProcess(process);
worker->SetStartupToken(worker_startup_token);
Contributor:

If we can move

  worker->SetProcess(process);
  worker->SetStartupToken(worker_startup_token);

into the worker's constructor and make them const member variables, we could get rid of a whole category of errors.
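
A sketch of that idea (hypothetical constructor; the real Worker takes many more arguments, and the diff above shows the setter approach the PR actually uses):

#include <cstdint>
#include <utility>

typedef int64_t StartupToken;
struct Process { int64_t id = -1; };  // stand-in for ray::Process

class WorkerSketch {
 public:
  // Both values are fixed at construction time, so they can never be set
  // twice or forgotten, which removes the need for RAY_CHECK-style guards.
  WorkerSketch(Process proc, StartupToken startup_token)
      : proc_(std::move(proc)), startup_token_(startup_token) {}

  const Process &GetProcess() const { return proc_; }
  StartupToken GetStartupToken() const { return startup_token_; }

 private:
  const Process proc_;
  const StartupToken startup_token_;
};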

Contributor (@scv119) left a comment:

Thanks a lot for helping out on the Windows issues!

/// This is needed because the actual core worker process
/// may not have the same pid as the process the worker pool
/// starts (due to shim processes).
StartupToken startup_token{-1};
Contributor:

Is there any special meaning for -1? Maybe worth documenting if that's the case.

Contributor Author:

Startup tokens usually start from 0, so it is set to -1 so that the first process created gets 0 as its startup token.

proc, StartingWorkerProcessInfo{workers_to_start, workers_to_start, worker_type});
startup_token_,
StartingWorkerProcessInfo{workers_to_start, workers_to_start, worker_type, proc});
startup_token_ += 1;
Contributor:

IMO this is a bit fragile... maybe add a helper function called get_next_worker_startup_token.
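
For example (a sketch only; get_next_worker_startup_token is the reviewer's suggested name, written here in the surrounding code's naming style, and worker_startup_token_counter_ follows the earlier renaming nit):

#include <cstdint>

typedef int64_t StartupToken;

class WorkerPoolSketch {
 protected:
  // Single place that hands out tokens and advances the counter, so call
  // sites never do `startup_token_ += 1` by hand.
  StartupToken GetNextWorkerStartupToken() { return worker_startup_token_counter_++; }

 private:
  StartupToken worker_startup_token_counter_ = 0;
};

The call site in StartWorkerProcess would then take one token per process and use it both for the --startup-token argument and as the key into starting_worker_processes.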

@@ -360,6 +366,12 @@ Process WorkerPool::StartWorkerProcess(
env.insert({"SPT_NOENV", "1"});
}

if (language == Language::PYTHON) {
worker_command_args.push_back("--startup-token=" + std::to_string(startup_token_));
Contributor:

A bit sad that we have these slight differences between the different languages.

Contributor (@scv119) left a comment:

Overall looks good to me. @wuisawesome, can you take another look before we merge this? :)

void Worker::SetProcess(Process proc) {
RAY_CHECK(proc_.IsNull()); // this procedure should not be called multiple times
proc_ = std::move(proc);
}

void Worker::SetStartupToken(StartupToken startup_token) {
Contributor:

Is this only used for tests? We could make it private and use FRIEND_TEST/friend to access it.

Contributor Author:

Ah, right. This can be protected since it isn't required to be exposed publicly. What do you say?

Contributor:

sgtm :)
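
For reference, the two options discussed look roughly like this (FRIEND_TEST is googletest's macro from gtest/gtest_prod.h; the class and test names are hypothetical):

#include <cstdint>
#include "gtest/gtest_prod.h"

typedef int64_t StartupToken;

class WorkerSketch {
 protected:
  // Option taken in this thread: protected, so a test mock that derives from
  // the class can call it, while it stays out of the public API.
  void SetStartupToken(StartupToken token) { startup_token_ = token; }

 private:
  // Alternative the reviewer mentioned: keep it private and befriend one test.
  FRIEND_TEST(WorkerPoolTest, HandleWorkerRegistration);  // hypothetical test name

  StartupToken startup_token_ = -1;
};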

@pcmoritz merged commit d226cbf into ray-project:master on Oct 15, 2021
@czgdp1807 deleted the fix_ray_init branch on October 15, 2021 22:15
@davidberenstein1957 commented Oct 25, 2021

@pcmoritz
This still seems to result in some process ID issues for the agent_manager after deploying the Ray cluster locally within a poetry environment.
Example:

import ray
import os
from ray import serve

ray.init(num_cpus=os.cpu_count(), namespace=namespace, include_dashboard=False)
serve.start(detached=False)
Error:
(pid=None) [2021-10-25 12:41:57,798 E 5088 10652] agent_manager.cc:90: [ Event 19616925b8e76a75abc202154fcbf045c675 {"ip":"10.110.71.210","node_id":"84a2e03c8733f7ad48b0aa4e7a84911cd937d785fa7c829175cfd08d","pid":1148} ] Agent process with pid 3464 has not registered, restart it.

@czgdp1807 (Contributor Author):

Hi @davidberenstein1957. I have never seen this error before. I can try reproducing it on my system and investigate whether it's somehow related to the changes we made here. AFAIK, the changes here only deal with worker processes, not agent processes. To reproduce the error I will do two things: i) check on a master build, and ii) check a version from before this PR, to see whether this is a ripple effect of these changes. In any case, I am happy to investigate it further. Let me know, @pcmoritz @davidberenstein1957.

@davidberenstein1957 commented Oct 25, 2021

@czgdp1807
For the entire reproduction, see the following environment and script.
ENVIRONMENT

[tool.poetry]
name = "pandora.nlp.gpt"
version = "0.1.0"
description = ""
authors = ["David Berenstein"]

[tool.poetry.dependencies]
python = "3.8.2"
fastapi = "0.62.0"
spacy = "2.3.2"
spacy-langdetect = "0.1.2"
torch = "1.8.1"
fast-bert = "^1.9.8"
uvicorn = "^0.13.4"
pytz = "2020.5"
scikit-learn = "^0.24.2"
transformers = "^4.9.2"

[tool.poetry.dev-dependencies]
flake8 = "^3.9.2"
black = {version = "^21.5b1", allow-prereleases = true}
autopep8 = "^1.5.7"

[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"

SCRIPT:

from fastapi import FastAPI
from ray import serve
import ray
import os
from starlette.middleware.cors import CORSMiddleware
from transformers import pipeline

app = FastAPI(
    title="GPT",
    description="GPT API - includes classification endpoints.",
    version="1.0.0",
    docs_url="/",
)

ray.init(num_cpus=os.cpu_count()-1, namespace=namespace, include_dashboard=False)
serve.start(detached=detached)

origins = [
    "http://localhost",
    "http://localhost:88",
]

@serve.deployment(num_replicas=1, max_concurrent_queries=100, ray_actor_options={'num_cpus': 0.25, 'num_gpus': 0, 'memory':1024*1024*1000})
class GPT2:
        def __init__(self):
            self.nlp_model = pipeline("text-generation", model="gpt2")
        async def __call__(self, request):
            return self.nlp_model(request.data, max_length=50)

GPT2..options(
    name='gpt2_test'
).deploy()

app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

@main_router.post("/predict", tags=["Model"])
async def predict(query:str):
    handle = serve.get_deployment(name).get_handle(sync=False)
    return ray.get(await handle.remote(**kwargs))

RUN APP:

uvicorn main:app --port 88

@davidberenstein1957 commented Oct 26, 2021

I get a comparable message when enabling GPU without a full 'ray[default]' installation, even though I disabled dashboard usage. Same code as above, but with GPU enabled at initialization:

ray.init(num_cpus=os.cpu_count()-1, num_gpus=1, namespace=namespace, include_dashboard=False)
  @serve.deployment(num_replicas=1, max_concurrent_queries=100, 
    ray_actor_options={'num_cpus': 0.25, 'num_gpus': 1, 'memory':1024*1024*1000})
(pid=None) C:\Users\david\Documents\programming\Pandora.NLP.Classification\.venv\lib\site-packages
\ray\dashboard\modules\reporter\reporter_agent.py:37: 
    UserWarning: `gpustat` package is not installed. GPU monitoring is not available. 
    To have full functionality of the dashboard please install `pip install ray[default]`.)

@mattip (Contributor) commented Oct 26, 2021

I reformatted your error message to make the error clearer. You should install gpustat >= 1.0.0b1 for Windows.


@pcmoritz (Contributor) commented Nov 12, 2021

@davidberenstein1957 Thanks for reporting! I just went through your script. There were a few typos, but after fixing them (see the attached script below), the problem on my machine was #19948, and after running pip install aiohttp==3.7 the error message was gone. We have a final fix for this in #20261, which will be part of the next release!

In the case that only ray and not ray[default] is installed, you will still get the UserWarning but that's expected :)

Glad to know the issue is unrelated to this PR though (which makes sense, since the PID of the agent is actually handled correctly and doesn't suffer from the issue fixed in this PR: it calls os.getpid() directly in the relevant process instead of trying to get the PID of the child).

For anybody curious how the aiohttp issue could cause this: basically, ray._private.utils.check_dashboard_dependencies_installed() returns False even if ray[default] is installed, because aiohttp.signals is not found.

Please let me know if the problem persists for you even after pinning aiohttp!

from fastapi import FastAPI
from ray import serve
import ray
import os
from starlette.middleware.cors import CORSMiddleware
from transformers import pipeline
 
app = FastAPI(
    title="GPT",
    description="GPT API - includes classification endpoints.",
    version="1.0.0",
    docs_url="/",
)
 
ray.init(num_cpus=os.cpu_count()-1, namespace="test", include_dashboard=False)
serve.start(detached=False)
 
origins = [
    "http://localhost",
    "http://localhost:88",
]
 
@serve.deployment(num_replicas=1, max_concurrent_queries=100, ray_actor_options={'num_cpus': 0.25, 'num_gpus': 0, 'memory':1024*1024*1000})
class GPT2:
        def __init__(self):
            self.nlp_model = pipeline("text-generation", model="gpt2")
        async def __call__(self, request):
            return self.nlp_model(request, max_length=50)
 
GPT2.options(
    name='gpt2_test'
).deploy()
 
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
 
@app.post("/predict", tags=["Model"])
async def predict(query:str):
    handle = serve.get_deployment("gpt2_test").get_handle(sync=False)
    return ray.get(await handle.remote(query))

pcmoritz pushed a commit that referenced this pull request on Dec 13, 2021:

PR #19014 introduced the idea of a StartupToken to uniquely identify a worker via a counter. This PR:
- Returns the Process and the StartupToken from StartWorkerProcess (previously only Process was returned).
- Changes the starting_workers_to_tasks map to index via the StartupToken, which seems to fix the Windows failures.
- Unskips the Windows tests in test_basic_2.py.
It seems that once a fix for PR #18167 goes in, the starting_workers_to_tasks map will be removed, which should remove the need for the changes to StartWorkerProcess made in this PR.
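
Sketched, the return-value change described above looks something like this (simplified; the real StartWorkerProcess takes many more parameters):

#include <cstdint>
#include <utility>

typedef int64_t StartupToken;
struct Process { int64_t id = -1; };  // stand-in for ray::Process

class WorkerPoolSketch {
 public:
  // Previously only the Process was returned; returning the token as well lets
  // callers key maps such as starting_workers_to_tasks by StartupToken instead
  // of by Process/PID.
  std::pair<Process, StartupToken> StartWorkerProcess(/* ... */) {
    Process proc;  // actual process launch elided
    return {proc, worker_startup_token_counter_++};
  }

 private:
  StartupToken worker_startup_token_counter_ = 0;
};
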
@czgdp1807 restored the fix_ray_init branch on February 24, 2022 09:54
@czgdp1807 deleted the fix_ray_init branch on May 7, 2022 06:30