Skip to content

Commit

Permalink
Allow deletion of async searches with the manage privilege (#67965)
Browse files Browse the repository at this point in the history
This change allows users that do not initiated an async search to delete it
if they have the cluster manage and manage-security privilege.
It is equivalent to the cancellation of tasks through the task manager (same privilege required)
and will allow users with the right permissions to cancel/delete async searches if they know
the async execution id.
  • Loading branch information
jimczi committed Feb 8, 2021
1 parent c9bbc84 commit ed6de0c
Show file tree
Hide file tree
Showing 10 changed files with 187 additions and 88 deletions.
3 changes: 2 additions & 1 deletion x-pack/plugin/async-search/qa/security/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,6 @@ testClusters.all {
user username: "test-admin", password: 'x-pack-test-password', role: "test-admin"
user username: "user1", password: 'x-pack-test-password', role: "user1"
user username: "user2", password: 'x-pack-test-password', role: "user2"
user username: "user_dls", password: 'x-pack-test-password', role: "user_dls"
user username: "user-dls", password: 'x-pack-test-password', role: "user-dls"
user username: "user-manage", password: 'x-pack-test-password', role: "user-manage"
}
6 changes: 5 additions & 1 deletion x-pack/plugin/async-search/qa/security/roles.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ user2:
- create_index
- indices:admin/refresh

user_dls:
user-dls:
cluster:
- cluster:monitor/main
indices:
Expand All @@ -55,3 +55,7 @@ user_dls:
]
}
}
user-manage:
cluster:
- manage
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ public void indexDocuments() throws IOException {
}

public void testWithDlsAndFls() throws Exception {
Response submitResp = submitAsyncSearch("*", "*", TimeValue.timeValueSeconds(10), "user_dls");
Response submitResp = submitAsyncSearch("*", "*", TimeValue.timeValueSeconds(10), "user-dls");
assertOK(submitResp);
SearchHit[] hits = getSearchHits(extractResponseId(submitResp), "user_dls");
SearchHit[] hits = getSearchHits(extractResponseId(submitResp), "user-dls");
assertThat(hits, arrayContainingInAnyOrder(
new CustomMatcher<SearchHit>("\"index\" doc 1 matcher") {
@Override
Expand Down Expand Up @@ -115,7 +115,7 @@ public void testWithUsers() throws Exception {
}

private void testCase(String user, String other) throws Exception {
for (String indexName : new String[] {"index", "index-" + user}) {
for (String indexName : new String[] {"index", "index-" + user}) {
Response submitResp = submitAsyncSearch(indexName, "foo:bar", TimeValue.timeValueSeconds(10), user);
assertOK(submitResp);
String id = extractResponseId(submitResp);
Expand All @@ -126,20 +126,32 @@ private void testCase(String user, String other) throws Exception {
ResponseException exc = expectThrows(ResponseException.class, () -> getAsyncSearch(id, other));
assertThat(exc.getResponse().getStatusLine().getStatusCode(), equalTo(404));

// user-manage cannot access the result
exc = expectThrows(ResponseException.class, () -> getAsyncSearch(id, "user-manage"));
assertThat(exc.getResponse().getStatusLine().getStatusCode(), equalTo(404));

// other cannot delete the result
exc = expectThrows(ResponseException.class, () -> deleteAsyncSearch(id, other));
assertThat(exc.getResponse().getStatusLine().getStatusCode(), equalTo(404));

// other and user cannot access the result from direct get calls
AsyncExecutionId searchId = AsyncExecutionId.decode(id);
for (String runAs : new String[] {user, other}) {
exc = expectThrows(ResponseException.class, () -> get(ASYNC_RESULTS_INDEX, searchId.getDocId(), runAs));
assertThat(exc.getResponse().getStatusLine().getStatusCode(), equalTo(403));
assertThat(exc.getMessage(), containsString("unauthorized"));
}
AsyncExecutionId searchId = AsyncExecutionId.decode(id);
for (String runAs : new String[] {user, other}) {
exc = expectThrows(ResponseException.class, () -> get(ASYNC_RESULTS_INDEX, searchId.getDocId(), runAs));
assertThat(exc.getResponse().getStatusLine().getStatusCode(), equalTo(403));
assertThat(exc.getMessage(), containsString("unauthorized"));
}

Response delResp = deleteAsyncSearch(id, user);
assertOK(delResp);

// check that user with 'manage' privileges can delete an async
// search submitted by a different user
Response newResp = submitAsyncSearch(indexName, "foo:bar", TimeValue.timeValueSeconds(10), user);
assertOK(newResp);
String newId = extractResponseId(newResp);
delResp = deleteAsyncSearch(newId, "user-manage");
assertOK(delResp);
}
ResponseException exc = expectThrows(ResponseException.class,
() -> submitAsyncSearch("index-" + other, "*", TimeValue.timeValueSeconds(10), user));
Expand Down Expand Up @@ -254,9 +266,9 @@ public void testWithDLSPointInTime() throws Exception {
assertOK(userResp);
assertThat(getSearchHits(extractResponseId(userResp), "user1"), arrayWithSize(3));

Response dlsResp = submitAsyncSearchWithPIT(pitId, "*", TimeValue.timeValueSeconds(10), "user_dls");
Response dlsResp = submitAsyncSearchWithPIT(pitId, "*", TimeValue.timeValueSeconds(10), "user-dls");
assertOK(dlsResp);
assertThat(getSearchHits(extractResponseId(dlsResp), "user_dls"), arrayContainingInAnyOrder(
assertThat(getSearchHits(extractResponseId(dlsResp), "user-dls"), arrayContainingInAnyOrder(
new CustomMatcher<SearchHit>("\"index\" doc 1 matcher") {
@Override
public boolean matches(Object actual) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ public AsyncSearchTask createTask(long id, String type, String action, TaskId pa
Supplier<InternalAggregation.ReduceContext> aggReduceContextSupplier =
() -> requestToAggReduceContextBuilder.apply(request.getSearchRequest());
return new AsyncSearchTask(id, type, action, parentTaskId, this::buildDescription, keepAlive,
originHeaders, taskHeaders, searchId, store.getClient(), nodeClient.threadPool(), aggReduceContextSupplier);
originHeaders, taskHeaders, searchId, store.getClientWithOrigin(), nodeClient.threadPool(), aggReduceContextSupplier);
}
};
searchRequest.setParentTask(new TaskId(nodeClient.getLocalNodeId(), submitTask.getId()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ private void getSearchResponseFromTask(AsyncExecutionId searchId,
long expirationTimeMillis,
ActionListener<Response> listener) {
try {
final Task task = store.getTask(taskManager, searchId, asyncTaskClass);
final Task task = store.getTaskAndCheckAuthentication(taskManager, searchId, asyncTaskClass);
if (task == null) {
getSearchResponseFromIndex(searchId, request, nowInMillis, listener);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
Expand Down Expand Up @@ -119,6 +120,7 @@ static XContentBuilder mappings() throws IOException {
private final String index;
private final ClusterService clusterService;
private final Client client;
private final Client clientWithOrigin;
private final SecurityContext securityContext;
private final NamedWriteableRegistry registry;
private final Writeable.Reader<R> reader;
Expand All @@ -134,13 +136,21 @@ public AsyncTaskIndexService(String index,
this.index = index;
this.clusterService = clusterService;
this.securityContext = new SecurityContext(clusterService.getSettings(), threadContext);
this.client = new OriginSettingClient(client, origin);
this.client = client;
this.clientWithOrigin = new OriginSettingClient(client, origin);
this.registry = registry;
this.reader = reader;
}

/**
* Returns the internal client with origin.
* Returns the internal client wrapped with the async user origin.
*/
public Client getClientWithOrigin() {
return clientWithOrigin;
}

/**
* Returns the internal client.
*/
public Client getClient() {
return client;
Expand All @@ -152,7 +162,7 @@ public Client getClient() {
void createIndexIfNecessary(ActionListener<Void> listener) {
if (clusterService.state().routingTable().hasIndex(index) == false) {
try {
client.admin().indices().prepareCreate(index)
clientWithOrigin.admin().indices().prepareCreate(index)
.setSettings(settings())
.addMapping(SINGLE_MAPPING_NAME, mappings())
.execute(ActionListener.wrap(
Expand All @@ -174,6 +184,13 @@ void createIndexIfNecessary(ActionListener<Void> listener) {
}
}

/**
* Returns the authentication information, or null if the current context has no authentication info.
**/
public Authentication getAuthentication() {
return securityContext.getAuthentication();
}

/**
* Stores the initial response with the original headers of the authenticated user
* and the expected expiration time.
Expand All @@ -190,7 +207,7 @@ public void createResponse(String docId,
.create(true)
.id(docId)
.source(source, XContentType.JSON);
createIndexIfNecessary(ActionListener.wrap(v -> client.index(indexRequest, listener), listener::onFailure));
createIndexIfNecessary(ActionListener.wrap(v -> clientWithOrigin.index(indexRequest, listener), listener::onFailure));
}

/**
Expand All @@ -211,7 +228,7 @@ public void updateResponse(String docId,
.retryOnConflict(5);
// updates create the index automatically if it doesn't exist so we force the creation
// preemptively.
createIndexIfNecessary(ActionListener.wrap(v -> client.update(request, listener), listener::onFailure));
createIndexIfNecessary(ActionListener.wrap(v -> clientWithOrigin.update(request, listener), listener::onFailure));
} catch(Exception e) {
listener.onFailure(e);
}
Expand All @@ -230,7 +247,7 @@ public void extendExpirationTime(String docId, long expirationTimeMillis, Action
.retryOnConflict(5);
// updates create the index automatically if it doesn't exist so we force the creation
// preemptively.
createIndexIfNecessary(ActionListener.wrap(v -> client.update(request, listener), listener::onFailure));
createIndexIfNecessary(ActionListener.wrap(v -> clientWithOrigin.update(request, listener), listener::onFailure));
}

/**
Expand All @@ -242,7 +259,7 @@ public void deleteResponse(AsyncExecutionId asyncExecutionId,
DeleteRequest request = new DeleteRequest(index).id(asyncExecutionId.getDocId());
// deletes create the index automatically if it doesn't exist so we force the creation
// preemptively.
createIndexIfNecessary(ActionListener.wrap(v -> client.delete(request, listener), listener::onFailure));
createIndexIfNecessary(ActionListener.wrap(v -> clientWithOrigin.delete(request, listener), listener::onFailure));
} catch(Exception e) {
listener.onFailure(e);
}
Expand All @@ -251,11 +268,10 @@ public void deleteResponse(AsyncExecutionId asyncExecutionId,
/**
* Returns the {@link AsyncTask} if the provided <code>asyncTaskId</code>
* is registered in the task manager, <code>null</code> otherwise.
*
* This method throws a {@link ResourceNotFoundException} if the authenticated user
* is not the creator of the original task.
*/
public <T extends AsyncTask> T getTask(TaskManager taskManager, AsyncExecutionId asyncExecutionId, Class<T> tClass) throws IOException {
public <T extends AsyncTask> T getTask(TaskManager taskManager,
AsyncExecutionId asyncExecutionId,
Class<T> tClass) throws IOException {
Task task = taskManager.getTask(asyncExecutionId.getTaskId().getId());
if (tClass.isInstance(task) == false) {
return null;
Expand All @@ -264,7 +280,24 @@ public <T extends AsyncTask> T getTask(TaskManager taskManager, AsyncExecutionId
if (asyncTask.getExecutionId().equals(asyncExecutionId) == false) {
return null;
}
return asyncTask;
}


/**
* Returns the {@link AsyncTask} if the provided <code>asyncTaskId</code>
* is registered in the task manager, <code>null</code> otherwise.
*
* This method throws a {@link ResourceNotFoundException} if the authenticated user
* is not the creator of the original task.
*/
public <T extends AsyncTask> T getTaskAndCheckAuthentication(TaskManager taskManager,
AsyncExecutionId asyncExecutionId,
Class<T> tClass) throws IOException {
T asyncTask = getTask(taskManager, asyncExecutionId, tClass);
if (asyncTask == null) {
return null;
}
// Check authentication for the user
final Authentication auth = securityContext.getAuthentication();
if (ensureAuthenticatedUserIsSame(asyncTask.getOriginHeaders(), auth) == false) {
Expand All @@ -274,13 +307,12 @@ public <T extends AsyncTask> T getTask(TaskManager taskManager, AsyncExecutionId
}

private void getEncodedResponse(AsyncExecutionId asyncExecutionId,
boolean restoreResponseHeaders,
ActionListener<Tuple<String, Long>> listener) {
final Authentication current = securityContext.getAuthentication();
boolean restoreResponseHeaders,
ActionListener<Tuple<String, Long>> listener) {
GetRequest internalGet = new GetRequest(index)
.preference(asyncExecutionId.getEncoded())
.id(asyncExecutionId.getDocId());
client.get(internalGet, ActionListener.wrap(
clientWithOrigin.get(internalGet, ActionListener.wrap(
get -> {
if (get.isExists() == false) {
listener.onFailure(new ResourceNotFoundException(asyncExecutionId.getEncoded()));
Expand All @@ -290,7 +322,7 @@ private void getEncodedResponse(AsyncExecutionId asyncExecutionId,
// check the authentication of the current user against the user that initiated the async task
@SuppressWarnings("unchecked")
Map<String, String> headers = (Map<String, String>) get.getSource().get(HEADERS_FIELD);
if (ensureAuthenticatedUserIsSame(headers, current) == false) {
if (ensureAuthenticatedUserIsSame(headers, securityContext.getAuthentication()) == false) {
listener.onFailure(new ResourceNotFoundException(asyncExecutionId.getEncoded()));
return;
}
Expand Down Expand Up @@ -335,13 +367,13 @@ public void getResponse(AsyncExecutionId asyncExecutionId,
* @param statusProducer – a producer of the status from the stored async search response and expirationTime
* @param listener – listener to report result to
*/
public void getStatusResponse(
AsyncExecutionId asyncExecutionId,
BiFunction<R, Long, AsyncStatusResponse> statusProducer, ActionListener<AsyncStatusResponse> listener) {
public void getStatusResponse(AsyncExecutionId asyncExecutionId,
BiFunction<R, Long, AsyncStatusResponse> statusProducer,
ActionListener<AsyncStatusResponse> listener) {
GetRequest internalGet = new GetRequest(index)
.preference(asyncExecutionId.getEncoded())
.id(asyncExecutionId.getDocId());
client.get(internalGet, ActionListener.wrap(
clientWithOrigin.get(internalGet, ActionListener.wrap(
get -> {
if (get.isExists() == false) {
listener.onFailure(new ResourceNotFoundException(asyncExecutionId.getEncoded()));
Expand All @@ -360,17 +392,32 @@ public void getStatusResponse(
}

/**
* Ensures that the current user can read the specified response without actually reading it
*/
public void authorizeResponse(AsyncExecutionId asyncExecutionId,
boolean restoreResponseHeaders,
ActionListener<R> listener) {
getEncodedResponse(asyncExecutionId, restoreResponseHeaders, ActionListener.wrap(
(t) -> listener.onResponse(null),
listener::onFailure
));
}
* Checks if the current user's authentication matches the original authentication stored
* in the async search index.
**/
void ensureAuthenticatedUserCanDeleteFromIndex(AsyncExecutionId executionId, ActionListener<Void> listener) {
GetRequest internalGet = new GetRequest(index)
.preference(executionId.getEncoded())
.id(executionId.getDocId())
.fetchSourceContext(new FetchSourceContext(true, new String[] { HEADERS_FIELD }, new String[] {}));

clientWithOrigin.get(internalGet, ActionListener.wrap(
get -> {
if (get.isExists() == false) {
listener.onFailure(new ResourceNotFoundException(executionId.getEncoded()));
return;
}
// Check authentication for the user
@SuppressWarnings("unchecked")
Map<String, String> headers = (Map<String, String>) get.getSource().get(HEADERS_FIELD);
if (ensureAuthenticatedUserIsSame(headers, securityContext.getAuthentication())) {
listener.onResponse(null);
} else {
listener.onFailure(new ResourceNotFoundException(executionId.getEncoded()));
}
},
exc -> listener.onFailure(new ResourceNotFoundException(executionId.getEncoded()))));
}

/**
* Extracts the authentication from the original headers and checks that it matches
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ synchronized void executeNextCleanup() {
long nowInMillis = System.currentTimeMillis();
DeleteByQueryRequest toDelete = new DeleteByQueryRequest(index)
.setQuery(QueryBuilders.rangeQuery(EXPIRATION_TIME_FIELD).lte(nowInMillis));
indexService.getClient()
indexService.getClientWithOrigin()
.execute(DeleteByQueryAction.INSTANCE, toDelete, ActionListener.wrap(this::scheduleNextCleanup));
}
}
Expand Down

0 comments on commit ed6de0c

Please sign in to comment.