-
Notifications
You must be signed in to change notification settings - Fork 3.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Issue #5395][broker] Implement AutoTopicCreation by namespace override #6471
Merged
sijie
merged 22 commits into
apache:master
from
klevy-toast:feature/autocreate-by-namespace
Mar 26, 2020
Merged
Changes from all commits
Commits
Show all changes
22 commits
Select commit
Hold shift + click to select a range
9e6ee25
Add allowAutoTopicCreation option to policies and CLI
klevy-toast f60eeae
Auto-create by namespace MVP
klevy-toast e960426
Refactor to allow partitioned topics, added tests
klevy-toast bb50b3b
Merge branch 'master' into feature/autocreate-by-namespace
klevy-toast 0a92e59
Cleanup commented code, imports
klevy-toast 8bd458f
Tweak override validation
klevy-toast 49f443c
More cleanup, documentation, tests
klevy-toast 0752ccf
Add default partition type
klevy-toast ee3a3dc
Add license to new pulsar-common files
klevy-toast 454b8c0
Remove null check that could cause silent failure
klevy-toast 4f31397
Switch back to synchronous policy retreival
klevy-toast d5aa45f
Merge branch 'master' into feature/autocreate-by-namespace
klevy-toast bee6179
Merge branch 'master' into feature/autocreate-by-namespace
klevy-toast 423562b
Make API async, renaming
klevy-toast 163811a
Add clause to resume async response on exceptions
klevy-toast 5897a8d
Merge branch 'master' into feature/autocreate-by-namespace
klevy-toast 9957a3b
Remove override from method names
klevy-toast 84f4e4a
Merge branch 'master' into feature/autocreate-by-namespace
klevy-toast 102ccfe
Fix typo
klevy-toast bafdb70
Merge branch 'master' into feature/autocreate-by-namespace
klevy-toast 221ddab
Merge branch 'master' into feature/autocreate-by-namespace
sijie f237b22
Fix pulsar-client-admin style
klevy-toast File filter
Filter by extension
Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -69,6 +69,7 @@ | |
import org.apache.pulsar.common.naming.NamespaceName; | ||
import org.apache.pulsar.common.naming.TopicDomain; | ||
import org.apache.pulsar.common.naming.TopicName; | ||
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride; | ||
import org.apache.pulsar.common.policies.data.AuthAction; | ||
import org.apache.pulsar.common.policies.data.BacklogQuota; | ||
import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType; | ||
|
@@ -553,6 +554,105 @@ protected void internalSetNamespaceMessageTTL(int messageTTL) { | |
} | ||
} | ||
|
||
protected void internalSetAutoTopicCreation(AsyncResponse asyncResponse, AutoTopicCreationOverride autoTopicCreationOverride) { | ||
validateAdminAccessForTenant(namespaceName.getTenant()); | ||
validatePoliciesReadOnlyAccess(); | ||
|
||
if (!AutoTopicCreationOverride.isValidOverride(autoTopicCreationOverride)) { | ||
throw new RestException(Status.PRECONDITION_FAILED, "Invalid configuration for autoTopicCreationOverride"); | ||
} | ||
|
||
// Force to read the data s.t. the watch to the cache content is setup. | ||
policiesCache().getWithStatAsync(path(POLICIES, namespaceName.toString())).thenApply( | ||
policies -> { | ||
if (policies.isPresent()) { | ||
Entry<Policies, Stat> policiesNode = policies.get(); | ||
policiesNode.getKey().autoTopicCreationOverride = autoTopicCreationOverride; | ||
try { | ||
// Write back the new policies into zookeeper | ||
globalZk().setData(path(POLICIES, namespaceName.toString()), | ||
jsonMapper().writeValueAsBytes(policiesNode.getKey()), policiesNode.getValue().getVersion()); | ||
policiesCache().invalidate(path(POLICIES, namespaceName.toString())); | ||
asyncResponse.resume(Response.noContent().build()); | ||
log.info("[{}] Successfully {} on namespace {}", clientAppId(), | ||
autoTopicCreationOverride.allowAutoTopicCreation ? "enabled" : "disabled", namespaceName); | ||
return null; | ||
} catch (KeeperException.NoNodeException e) { | ||
log.error("[{}] Failed to modify autoTopicCreation status for namespace {}: does not exist", clientAppId(), | ||
namespaceName); | ||
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace does not exist")); | ||
return null; | ||
} catch (KeeperException.BadVersionException e) { | ||
log.error( | ||
"[{}] Failed to modify autoTopicCreation status on namespace {} expected policy node version={} : concurrent modification", | ||
clientAppId(), namespaceName, policiesNode.getValue().getVersion()); | ||
|
||
asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification")); | ||
return null; | ||
} catch (Exception e) { | ||
log.error("[{}] Failed to modify autoTopicCreation status on namespace {}", clientAppId(), namespaceName, e); | ||
asyncResponse.resume(new RestException(e)); | ||
return null; | ||
} | ||
} else { | ||
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist")); | ||
return null; | ||
} | ||
} | ||
).exceptionally(e -> { | ||
log.error("[{}] Failed to modify autoTopicCreation status on namespace {}", clientAppId(), namespaceName, e); | ||
asyncResponse.resume(new RestException(e)); | ||
return null; | ||
}); | ||
} | ||
|
||
protected void internalRemoveAutoTopicCreation(AsyncResponse asyncResponse) { | ||
validateAdminAccessForTenant(namespaceName.getTenant()); | ||
validatePoliciesReadOnlyAccess(); | ||
Comment on lines
+610
to
+611
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same as above comment. |
||
|
||
// Force to read the data s.t. the watch to the cache content is setup. | ||
policiesCache().getWithStatAsync(path(POLICIES, namespaceName.toString())).thenApply( | ||
policies -> { | ||
if (policies.isPresent()) { | ||
Entry<Policies, Stat> policiesNode = policies.get(); | ||
policiesNode.getKey().autoTopicCreationOverride = null; | ||
try { | ||
// Write back the new policies into zookeeper | ||
globalZk().setData(path(POLICIES, namespaceName.toString()), | ||
jsonMapper().writeValueAsBytes(policiesNode.getKey()), policiesNode.getValue().getVersion()); | ||
policiesCache().invalidate(path(POLICIES, namespaceName.toString())); | ||
asyncResponse.resume(Response.noContent().build()); | ||
log.info("[{}] Successfully removed override on namespace {}", clientAppId(), namespaceName); | ||
return null; | ||
} catch (KeeperException.NoNodeException e) { | ||
log.error("[{}] Failed to modify autoTopicCreation status for namespace {}: does not exist", clientAppId(), | ||
namespaceName); | ||
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace does not exist")); | ||
return null; | ||
} catch (KeeperException.BadVersionException e) { | ||
log.error( | ||
"[{}] Failed to modify autoTopicCreation status on namespace {} expected policy node version={} : concurrent modification", | ||
clientAppId(), namespaceName, policiesNode.getValue().getVersion()); | ||
|
||
asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification")); | ||
return null; | ||
} catch (Exception e) { | ||
log.error("[{}] Failed to modify autoTopicCreation status on namespace {}", clientAppId(), namespaceName, e); | ||
asyncResponse.resume(new RestException(e)); | ||
return null; | ||
} | ||
} else { | ||
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist")); | ||
return null; | ||
} | ||
} | ||
).exceptionally(e -> { | ||
log.error("[{}] Failed to modify autoTopicCreation status on namespace {}", clientAppId(), namespaceName, e); | ||
asyncResponse.resume(new RestException(e)); | ||
return null; | ||
}); | ||
} | ||
|
||
protected void internalModifyDeduplication(boolean enableDeduplication) { | ||
validateAdminAccessForTenant(namespaceName.getTenant()); | ||
validatePoliciesReadOnlyAccess(); | ||
|
@@ -573,17 +673,17 @@ protected void internalModifyDeduplication(boolean enableDeduplication) { | |
log.info("[{}] Successfully {} on namespace {}", clientAppId(), | ||
enableDeduplication ? "enabled" : "disabled", namespaceName); | ||
} catch (KeeperException.NoNodeException e) { | ||
log.warn("[{}] Failed to modify deplication status for namespace {}: does not exist", clientAppId(), | ||
log.warn("[{}] Failed to modify deduplication status for namespace {}: does not exist", clientAppId(), | ||
namespaceName); | ||
throw new RestException(Status.NOT_FOUND, "Namespace does not exist"); | ||
} catch (KeeperException.BadVersionException e) { | ||
log.warn( | ||
"[{}] Failed to modify deplication status on namespace {} expected policy node version={} : concurrent modification", | ||
"[{}] Failed to modify deduplication status on namespace {} expected policy node version={} : concurrent modification", | ||
clientAppId(), namespaceName, policiesNode.getValue().getVersion()); | ||
|
||
throw new RestException(Status.CONFLICT, "Concurrent modification"); | ||
} catch (Exception e) { | ||
log.error("[{}] Failed to modify deplication status on namespace {}", clientAppId(), namespaceName, e); | ||
log.error("[{}] Failed to modify deduplication status on namespace {}", clientAppId(), namespaceName, e); | ||
throw new RestException(e); | ||
} | ||
} | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We'd better complete the
asyncResponse
when there are RunTimeException occurs. Otherwise, the client just can get a500
response but can't get any error messages.