Skip to content

Commit

Permalink
Flip naming convention for ParallelAction consts and improve them
Browse files Browse the repository at this point in the history
It's actually more natural to name these actions after initiating side - when we're writing to the output, we should use action related to the place where we're writing, and when we're reading the message, action should point to the callee.

See: #7777 (comment)
  • Loading branch information
Wirone committed May 7, 2024
1 parent 56337a6 commit 5b4c33b
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 28 deletions.
12 changes: 6 additions & 6 deletions src/Console/Command/WorkerCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,11 @@ function (ConnectionInterface $connection) use ($loop, $runner, $identifier): vo
$in = new Decoder($connection, true, 512, $jsonInvalidUtf8Ignore);

// [REACT] Initialise connection with the parallelisation operator
$out->write(['action' => ParallelAction::RUNNER_HELLO, 'identifier' => $identifier]);
$out->write(['action' => ParallelAction::WORKER_HELLO, 'identifier' => $identifier]);

$handleError = static function (\Throwable $error) use ($out): void {
$out->write([
'action' => ParallelAction::RUNNER_ERROR_REPORT,
'action' => ParallelAction::WORKER_ERROR_REPORT,
'message' => $error->getMessage(),
'file' => $error->getFile(),
'line' => $error->getLine(),
Expand All @@ -138,13 +138,13 @@ function (ConnectionInterface $connection) use ($loop, $runner, $identifier): vo
$action = $json['action'] ?? null;

// Parallelisation operator does not have more to do, let's close the connection
if (ParallelAction::WORKER_THANK_YOU === $action) {
if (ParallelAction::RUNNER_THANK_YOU === $action) {
$loop->stop();

return;
}

if (ParallelAction::WORKER_RUN !== $action) {
if (ParallelAction::RUNNER_REQUEST_ANALYSIS !== $action) {
// At this point we only expect analysis requests, if any other action happen, we need to fix the code.
throw new \LogicException(sprintf('Unexpected action ParallelAction::%s.', $action));
}
Expand All @@ -167,7 +167,7 @@ function (ConnectionInterface $connection) use ($loop, $runner, $identifier): vo
}

$out->write([
'action' => ParallelAction::RUNNER_RESULT,
'action' => ParallelAction::WORKER_RESULT,
'file' => $absolutePath,
'fileHash' => $this->events[0]->getFileHash(),
'status' => $this->events[0]->getStatus(),
Expand All @@ -177,7 +177,7 @@ function (ConnectionInterface $connection) use ($loop, $runner, $identifier): vo
}

// Request another file chunk (if available, the parallelisation operator will request new "run" action)
$out->write(['action' => ParallelAction::RUNNER_GET_FILE_CHUNK]);
$out->write(['action' => ParallelAction::WORKER_GET_FILE_CHUNK]);
});
},
static function (\Throwable $error) use ($errorOutput): void {
Expand Down
16 changes: 8 additions & 8 deletions src/Runner/Parallel/ParallelAction.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@
*/
final class ParallelAction
{
// Actions handled by the runner
public const RUNNER_ERROR_REPORT = 'errorReport';
public const RUNNER_HELLO = 'hello';
public const RUNNER_RESULT = 'result';
public const RUNNER_GET_FILE_CHUNK = 'getFileChunk';
// Actions executed by the runner (main process)
public const RUNNER_REQUEST_ANALYSIS = 'requestAnalysis';
public const RUNNER_THANK_YOU = 'thankYou';

// Actions handled by the worker
public const WORKER_RUN = 'run';
public const WORKER_THANK_YOU = 'thankYou';
// Actions executed by the worker
public const WORKER_ERROR_REPORT = 'errorReport';
public const WORKER_GET_FILE_CHUNK = 'getFileChunk';
public const WORKER_HELLO = 'hello';
public const WORKER_RESULT = 'result';

private function __construct() {}
}
16 changes: 8 additions & 8 deletions src/Runner/Runner.php
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ private function fixParallel(): array

// [REACT] Bind connection when worker's process requests "hello" action (enables 2-way communication)
$decoder->on('data', static function (array $data) use ($processPool, $getFileChunk, $decoder, $encoder): void {
if (ParallelAction::RUNNER_HELLO !== $data['action']) {
if (ParallelAction::WORKER_HELLO !== $data['action']) {
return;
}

Expand All @@ -209,13 +209,13 @@ private function fixParallel(): array
$fileChunk = $getFileChunk();

if (0 === \count($fileChunk)) {
$process->request(['action' => ParallelAction::WORKER_THANK_YOU]);
$process->request(['action' => ParallelAction::RUNNER_THANK_YOU]);
$processPool->endProcessIfKnown($identifier);

return;
}

$process->request(['action' => ParallelAction::WORKER_RUN, 'files' => $fileChunk]);
$process->request(['action' => ParallelAction::RUNNER_REQUEST_ANALYSIS, 'files' => $fileChunk]);
});
});

Expand Down Expand Up @@ -243,7 +243,7 @@ private function fixParallel(): array
// [REACT] Handle workers' responses (multiple actions possible)
function (array $workerResponse) use ($processPool, $process, $identifier, $getFileChunk, &$changed): void {
// File analysis result (we want close-to-realtime progress with frequent cache savings)
if (ParallelAction::RUNNER_RESULT === $workerResponse['action']) {
if (ParallelAction::WORKER_RESULT === $workerResponse['action']) {
$fileAbsolutePath = $workerResponse['file'];
$fileRelativePath = $this->directory->getRelativePathTo($fileAbsolutePath);

Expand Down Expand Up @@ -274,23 +274,23 @@ function (array $workerResponse) use ($processPool, $process, $identifier, $getF
return;
}

if (ParallelAction::RUNNER_GET_FILE_CHUNK === $workerResponse['action']) {
if (ParallelAction::WORKER_GET_FILE_CHUNK === $workerResponse['action']) {
// Request another chunk of files, if still available
$fileChunk = $getFileChunk();

if (0 === \count($fileChunk)) {
$process->request(['action' => ParallelAction::WORKER_THANK_YOU]);
$process->request(['action' => ParallelAction::RUNNER_THANK_YOU]);
$processPool->endProcessIfKnown($identifier);

return;
}

$process->request(['action' => ParallelAction::WORKER_RUN, 'files' => $fileChunk]);
$process->request(['action' => ParallelAction::RUNNER_REQUEST_ANALYSIS, 'files' => $fileChunk]);

return;
}

if (ParallelAction::RUNNER_ERROR_REPORT === $workerResponse['action']) {
if (ParallelAction::WORKER_ERROR_REPORT === $workerResponse['action']) {
$this->errorsManager->reportWorkerError(new WorkerError(
$workerResponse['message'],
$workerResponse['file'],
Expand Down
12 changes: 6 additions & 6 deletions tests/Console/Command/WorkerCommandTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -127,16 +127,16 @@ static function (array $data) use ($encoder, &$workerScope): void {
$workerScope['messages'][] = $data;
$ds = \DIRECTORY_SEPARATOR;

if (ParallelAction::RUNNER_HELLO === $data['action']) {
$encoder->write(['action' => ParallelAction::WORKER_RUN, 'files' => [
if (ParallelAction::WORKER_HELLO === $data['action']) {
$encoder->write(['action' => ParallelAction::RUNNER_REQUEST_ANALYSIS, 'files' => [
realpath(__DIR__.$ds.'..'.$ds.'..').$ds.'Fixtures'.$ds.'FixerTest'.$ds.'fix'.$ds.'somefile.php',
]]);

return;
}

if (3 === \count($workerScope['messages'])) {
$encoder->write(['action' => ParallelAction::WORKER_THANK_YOU]);
$encoder->write(['action' => ParallelAction::RUNNER_THANK_YOU]);
}
}
);
Expand All @@ -152,10 +152,10 @@ static function (array $data) use ($encoder, &$workerScope): void {

self::assertSame(Command::SUCCESS, $process->getExitCode());
self::assertCount(3, $workerScope['messages']);
self::assertSame(ParallelAction::RUNNER_HELLO, $workerScope['messages'][0]['action']);
self::assertSame(ParallelAction::RUNNER_RESULT, $workerScope['messages'][1]['action']);
self::assertSame(ParallelAction::WORKER_HELLO, $workerScope['messages'][0]['action']);
self::assertSame(ParallelAction::WORKER_RESULT, $workerScope['messages'][1]['action']);
self::assertSame(FixerFileProcessedEvent::STATUS_FIXED, $workerScope['messages'][1]['status']);
self::assertSame(ParallelAction::RUNNER_GET_FILE_CHUNK, $workerScope['messages'][2]['action']);
self::assertSame(ParallelAction::WORKER_GET_FILE_CHUNK, $workerScope['messages'][2]['action']);

$server->close();
}
Expand Down

0 comments on commit 5b4c33b

Please sign in to comment.