Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add computation state cache, config loaders, and status pages #31133

Merged
merged 7 commits into from
May 14, 2024

Conversation

m-trieu
Copy link
Contributor

@m-trieu m-trieu commented Apr 30, 2024

Add computation state cache and config loaders that load the computation state cache
move status pages out of StreamingDataflowWorker file

R: @scwhittle

Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

Copy link
Contributor

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

Copy link
Contributor

@scwhittle scwhittle left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some initial comments, didn't get through everything

sampler,
metricTrackingWindmillServer::refreshActiveWork,
executorSupplier.apply("RefreshWork"));

WorkerStatusPages workerStatusPages =
WorkerStatusPages.create(DEFAULT_STATUS_PORT, memoryMonitor, () -> true);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can this last parameter be made optional via overload? or at least add a comment here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done. overloaded

WorkerStatusPages.create(DEFAULT_STATUS_PORT, memoryMonitor, () -> true);
this.statusPages =
windmillServiceEnabled
? StreamingWorkerStatusPages.forStreamingEngine(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would a builder pattern be better than the separate forStreamingEngine and forAppliance factory methods?

There seem to be a lot of common things.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

WindmillServerStub windmillServer = createWindmillServerStub(options, windmillStreamFactory);
ComputationConfig.Fetcher configFetcher =
options.isEnableStreamingEngine()
? StreamingEngineConfigFetcher.forTesting(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

forTesting isn't a good name if this is in the regular path

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed this was supposed to just use the create method

ComputationConfig.Fetcher configFetcher =
options.isEnableStreamingEngine()
? StreamingEngineConfigFetcher.forTesting(
true,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rm param if hard-coded

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is used in a different test

.setWindmillServiceEndpoints(ImmutableSet.of());
}

public static StreamingPipelineConfig forAppliance(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we just get rid of this and have the appliance config use builder() and set these?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

maxWorkItemCommitBytes))
: new StreamingApplianceConfigFetcher(
windmillServer,
config -> consumeUserStepToStateFamilyName(config, stateNameMap),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems like the consuming the config method could be the same for both SE and appliance, where unset fields are just ignored

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

ComputationStateCache computationStateCache =
ComputationStateCache.create(
configFetcher, workExecutor, windmillStateCache::forComputation);
if (windmillServer instanceof GrpcWindmillServer) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this cast and lazy initialization is kind of gross

one idea could be to remove the windmillserver from the appliance config fetcher, it could just construct it's own channel and sync stub directly.
and then have a separate start method on the configfetcher taking the function to consume responses. Then you can make sure to call that after everything is initialized.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sgtm agreed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think the issue would be if it is using the WindmillServerBase implementation of WindmillServerStub, which uses JNI

i will try to find a way around this

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i actually realized that appliance does not refresh heartbeats which is what that consumer of heartbeat responses is for.

we only call refresh active work on get data stream if streaming engine is enabled.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done
i changed the way this was created, still not super clean but much better than before.

I wonder if we can make GrpcWindmillServer not implement WindmillServerStub. Not sure it currently makes sense to group together Appliance and Engine client implementations and leads to wonky situtations like above.

@m-trieu
Copy link
Contributor Author

m-trieu commented May 1, 2024

failing test is unrelated and passes on local runs

.collect(Collectors.toList()),
workExecutor,
stateCache::forComputation);
computationStateCacheRef.set(computationStateCache);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about a VisibleForTesting annotated method to access the ComputationStateCache instead? Seems convoluted to pass in atomic ref to assign to.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

60,
TimeUnit.SECONDS);
scheduledExecutors.add(statusPageTimer);
}
workCommitter.start();
workerStatusReporter.start();
activeWorkRefresher.start();
}

public void startStatusPages() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can this be inlined into start() and removed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

today we don't start them for tests (same across batch and streaming harnesses)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a comment that this may be omitted for lighterweight testing?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added

Collection<LatencyAttribution> getWorkStreamLatencies) ->
computationStateCache
.get(computation)
.ifPresent(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what happened before on missing computation? In either case it seems like we should throw exception or log error as otherwise we're just dropping the work item silently which will be confusing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added more logging in the ComputationStateCache

  public Optional<ComputationState> get(String computationId) {
    try {
      return Optional.ofNullable(computationCache.get(computationId));
    } catch (ExecutionException | ComputationStateNotFoundException e) {
      if (e.getCause() instanceof ComputationStateNotFoundException) {
        LOG.error(
            "Trying to fetch unknown computation={}, known computations are {}.",
            computationId,
            getAllComputationIds());
      } else {
        LOG.warn("Error occurred fetching computation for computationId={}", computationId, e);
      }
    }

/** Fetches computation config from Streaming Appliance. */
@Internal
@ThreadSafe
public final class StreamingApplianceConfigFetcher implements ComputationConfig.Fetcher {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be StreamingApplianceComputationConfigFetcher? ditto with SE

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Consumer<StreamingPipelineConfig> onPipelineConfig,
Function<MapTask, MapTask> fixMapTaskMultiOutputInfoFn) {
this.windmillServer = windmillServer;
this.onPipelineConfig = onPipelineConfig;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems odd to have this listener hidden in the fetcher

Maybe instead the listener should be on whatever is driving the fetching. I think that woudl be the cache.

Additionally I think that the FIX_MULTI_OUTPUT_INFOS_ON_PAR_DO_INSTRUCTIONS could move into the cache where fetches are being performed. As is it is injected into all teh fetchers and applied to the testing generated configs. That would let you just do it once and possibly coudl remove from StreamingDataflowWorker to the cache

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done and moved

changed it to be a member instead of static since we need the static global id generator from StreamingDataflowWorker.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@m-trieu m-trieu force-pushed the mt-computation-state-cache branch from 3581abd to f30767c Compare May 3, 2024 17:38
@m-trieu m-trieu force-pushed the mt-computation-state-cache branch from f30767c to f7f2481 Compare May 4, 2024 05:52
60,
TimeUnit.SECONDS);
scheduledExecutors.add(statusPageTimer);
}
workCommitter.start();
workerStatusReporter.start();
activeWorkRefresher.start();
}

public void startStatusPages() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a comment that this may be omitted for lighterweight testing?

!computationState.getTransformUserNameToStateFamily().isEmpty()
? computationState.getTransformUserNameToStateFamily()
: stateNameMap,
// !computationState.getTransformUserNameToStateFamily().isEmpty()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rm comments

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

ConcurrentMap<String, String> stateNameMap) {
Function<MapTask, MapTask> fixMultiOutputInfosOnParDoInstructions =
new FixMultiOutputInfosOnParDoInstructions(idGenerator);
return new ComputationStateCache(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this instead just use create and then poke in to add initial state name map values?
via
cache = create(...);
cache.stateNameMap.addAll(stateNameMap);

It's a fair bit of setup to duplicate and it coudl drift meaning we're not testing the main code-path.

Or you could further reduce duplication of the parameters the create/forTesting methods and just expose a visiblefortesting method for the statenamemap and the current caller of this method could use that directly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}

/** Returns a read-only view of all computations. */
public ImmutableList<ComputationState> getAllComputations() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be getAllPresentComputations?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

"computationId is empty. Cannot fetch computation config without a computationId.");

GetConfigResponse response =
windmillServer.getConfig(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of taking in WindmillServer can you take in a functional interface matching this method?

That will help show that the full class isn't used and could simplify testing. It could help in the future break up windmillServer if we get rid of the jni class keeping it all tied together ATM.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


default void stop() {}

Optional<ComputationConfig> getConfig(String computationId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's call this fetchConfig, it makes it clearer that it is likely an rpc (and matches class name)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

private Optional<StreamingEnginePipelineConfig> getComputationConfigInternal(
String computationId) {
Optional<StreamingEnginePipelineConfig> streamingConfig = getConfigInternal(computationId);
streamingConfig.ifPresent(onStreamingConfig);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we remove this listening from the computation config fetching? It seems like it should just be the global config from the background thread that runs onStreamingConfig.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

LOG.info("Initial global configuration received, harness is now ready");
}

private Optional<StreamingEnginePipelineConfig> getComputationConfigInternal(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should just return the computationconfig not full StreamingEnginePipelineConfig. I think that StreamingEnginePipelineConfig can have the computation config removed from it, as we don't care about listening to computations as they are fetched.

That will then make it clearer that we only want the listener to trigger on the periodic background fetching, not the computation fetching driven by the cache.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


public abstract Map<String, String> userStepToStateFamilyNameMap();

public abstract Optional<StreamingComputationConfig> computationConfig();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see other comment, I think this class should just be for the global config, not related to a computation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

return getConfigInternal(null);
}

private Optional<StreamingEnginePipelineConfig> getConfigInternal(@Nullable String computation) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably need to change this to some template if we're returnign different types for global or per-computation config

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Copy link
Contributor

github-actions bot commented May 7, 2024

Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @riteshghorse added as fallback since no labels match configuration

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

Copy link
Contributor

@scwhittle scwhittle left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking pretty good, mostly just some test comments

@@ -54,6 +56,7 @@ public final class ComputationStateCache implements StatusDataProvider {

private static final Logger LOG = LoggerFactory.getLogger(ComputationStateCache.class);

private final ConcurrentMap<String, String> globalUsernameToStateFamilyNameMap;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: global has connotations and makes it sound like static global, how about pipelineUserNameToStateFamilyNameMap

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}
}

private static Optional<StreamingEnginePipelineConfig> createPipelineConfig(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't think this needs to return optional

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

localRetryTimeoutMs);
}

private StreamingDataflowWorker makeWorker(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there are 6 makeWorkers, it seems some autovalue builder in this test would make it more readable and less duplication of the default for various params.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done used a builder with some test parameters

private ComputationStateCache computationStateCache;

private static Work createWork(long workToken, long cacheToken) {

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: rm blank line

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

asyncStartConfigLoader.start();
numExpectedRefreshes.await();
asyncStartConfigLoader.join();
assertThat(receivedPipelineConfig)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems safer to stop the fetcher's background thread before accessing received config (or use a concurrent set)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

when(mockDataflowServiceClient.getGlobalStreamingConfigWorkItem())
.thenReturn(Optional.of(firstConfig))
.thenReturn(Optional.of(secondConfig))
.thenReturn(Optional.of(thirdConfig));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will there be a flaky error if a 4th fetch comes in before the background thread stops? shoudl some sort of default response be setup?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@m-trieu m-trieu force-pushed the mt-computation-state-cache branch from b670812 to 099081c Compare May 9, 2024 20:24
@m-trieu m-trieu force-pushed the mt-computation-state-cache branch from 099081c to f32844b Compare May 9, 2024 20:55
Copy link
Contributor

@scwhittle scwhittle left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just final couple nits. I'm also running some dataflow specific tests to verify

defaultWorkerParams()
.setInstructions(instructions)
.publishCounters()
.setOptions(createTestingPipelineOptions())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can setOptions(createTestingPipelineOptions()) since it's default

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

@@ -3203,12 +3145,21 @@ public void testExceptionInvalidatesCache() throws Exception {
WindowingStrategy.globalDefault()),
makeSinkInstruction(StringUtf8Coder.of(), 1, GlobalWindow.Coder.INSTANCE));

defaultWorkerParams()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rm, not used

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

StreamingDataflowWorker worker = makeWorker(instructions, options, true /* publishCounters */);
StreamingDataflowWorker worker =
makeWorker(
defaultWorkerParams()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does passing in "--activeWorkRefreshPeriodMillis=100" let you get rid of options?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

StreamingDataflowWorker worker = makeWorker(instructions, options, true /* publishCounters */);
StreamingDataflowWorker worker =
makeWorker(
defaultWorkerParams()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto, here and below

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

computationState.activateWork(shardedKey, work1);
computationState.activateWork(shardedKey, work2);
computationState.activateWork(shardedKey, work3);

// Activate 3 Work(s) for computationId2
Optional<ComputationState> maybeComputationState2 = computationStateCache.get(computationId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: how about just activating work1 on computationState and work2, work3 on comptuationState2. Then we're not deviating from normal usage of work (shouldn't be activated on multiple/different comptuatoins) in case it causes issues down the line.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@scwhittle scwhittle merged commit 31e81ff into apache:master May 14, 2024
17 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants