diff --git a/samples/snippets/snippets.py b/samples/snippets/snippets.py index b37020c0..78e72bca 100644 --- a/samples/snippets/snippets.py +++ b/samples/snippets/snippets.py @@ -14,6 +14,7 @@ import argparse from datetime import datetime, timedelta, timezone from pprint import pprint +import time from google.cloud import datastore # noqa: I100 @@ -63,7 +64,7 @@ def query_with_readtime(client): read_time = datetime.now(timezone.utc) - timedelta(seconds=15) # Fetch an entity with read_time - task_key = client.key('Task', 'sampletask') + task_key = client.key("Task", "sampletask") entity = client.get(task_key, read_time=read_time) # Query Task entities with read_time @@ -77,11 +78,170 @@ def query_with_readtime(client): return results +def count_query_in_transaction(client): + # [START datastore_count_in_transaction] + task1 = datastore.Entity(client.key("Task", "task1")) + task2 = datastore.Entity(client.key("Task", "task2")) + + task1["owner"] = "john" + task2["owner"] = "john" + + tasks = [task1, task2] + client.put_multi(tasks) + + with client.transaction() as transaction: + + tasks_of_john = client.query(kind="Task") + tasks_of_john.add_filter("owner", "=", "john") + total_tasks_query = client.aggregation_query(tasks_of_john) + + query_result = total_tasks_query.count(alias="tasks_count").fetch() + for task_result in query_result: + tasks_count = task_result[0] + if tasks_count.value < 2: + task3 = datastore.Entity(client.key("Task", "task3")) + task3["owner"] = "john" + transaction.put(task3) + tasks.append(task3) + else: + print(f"Found existing {tasks_count.value} tasks, rolling back") + client.entities_to_delete.extend(tasks) + raise ValueError("User 'John' cannot have more than 2 tasks") + # [END datastore_count_in_transaction] + + +def count_query_on_kind(client): + # [START datastore_count_on_kind] + task1 = datastore.Entity(client.key("Task", "task1")) + task2 = datastore.Entity(client.key("Task", "task2")) + + tasks = [task1, task2] + client.put_multi(tasks) + all_tasks_query = client.query(kind="Task") + all_tasks_count_query = client.aggregation_query(all_tasks_query).count() + query_result = all_tasks_count_query.fetch() + for aggregation_results in query_result: + for aggregation in aggregation_results: + print(f"Total tasks (accessible from default alias) is {aggregation.value}") + # [END datastore_count_on_kind] + return tasks + + +def count_query_with_limit(client): + # [START datastore_count_with_limit] + task1 = datastore.Entity(client.key("Task", "task1")) + task2 = datastore.Entity(client.key("Task", "task2")) + task3 = datastore.Entity(client.key("Task", "task3")) + + tasks = [task1, task2, task3] + client.put_multi(tasks) + all_tasks_query = client.query(kind="Task") + all_tasks_count_query = client.aggregation_query(all_tasks_query).count() + query_result = all_tasks_count_query.fetch(limit=2) + for aggregation_results in query_result: + for aggregation in aggregation_results: + print(f"We have at least {aggregation.value} tasks") + # [END datastore_count_with_limit] + return tasks + + +def count_query_property_filter(client): + # [START datastore_count_with_property_filter] + task1 = datastore.Entity(client.key("Task", "task1")) + task2 = datastore.Entity(client.key("Task", "task2")) + task3 = datastore.Entity(client.key("Task", "task3")) + + task1["done"] = True + task2["done"] = False + task3["done"] = True + + tasks = [task1, task2, task3] + client.put_multi(tasks) + completed_tasks = client.query(kind="Task").add_filter("done", "=", True) + remaining_tasks = client.query(kind="Task").add_filter("done", "=", False) + + completed_tasks_query = client.aggregation_query(query=completed_tasks).count( + alias="total_completed_count" + ) + remaining_tasks_query = client.aggregation_query(query=remaining_tasks).count( + alias="total_remaining_count" + ) + + completed_query_result = completed_tasks_query.fetch() + for aggregation_results in completed_query_result: + for aggregation_result in aggregation_results: + if aggregation_result.alias == "total_completed_count": + print(f"Total completed tasks count is {aggregation_result.value}") + + remaining_query_result = remaining_tasks_query.fetch() + for aggregation_results in remaining_query_result: + for aggregation_result in aggregation_results: + if aggregation_result.alias == "total_remaining_count": + print(f"Total remaining tasks count is {aggregation_result.value}") + # [END datastore_count_with_property_filter] + return tasks + + +def count_query_with_stale_read(client): + + tasks = [task for task in client.query(kind="Task").fetch()] + client.delete_multi(tasks) # ensure the database is empty before starting + + # [START datastore_count_query_with_stale_read] + task1 = datastore.Entity(client.key("Task", "task1")) + task2 = datastore.Entity(client.key("Task", "task2")) + + # Saving two tasks + task1["done"] = True + task2["done"] = False + client.put_multi([task1, task2]) + time.sleep(10) + + past_timestamp = datetime.now( + timezone.utc + ) # we have two tasks in database at this time. + time.sleep(10) + + # Saving third task + task3 = datastore.Entity(client.key("Task", "task3")) + task3["done"] = False + client.put(task3) + + all_tasks = client.query(kind="Task") + all_tasks_count = client.aggregation_query( + query=all_tasks, + ).count(alias="all_tasks_count") + + # Executing aggregation query + query_result = all_tasks_count.fetch() + for aggregation_results in query_result: + for aggregation_result in aggregation_results: + print(f"Latest tasks count is {aggregation_result.value}") + + # Executing aggregation query with past timestamp + tasks_in_past = client.aggregation_query(query=all_tasks).count( + alias="tasks_in_past" + ) + tasks_in_the_past_query_result = tasks_in_past.fetch(read_time=past_timestamp) + for aggregation_results in tasks_in_the_past_query_result: + for aggregation_result in aggregation_results: + print(f"Stale tasks count is {aggregation_result.value}") + # [END datastore_count_query_with_stale_read] + return [task1, task2, task3] + + def main(project_id): client = datastore.Client(project_id) for name, function in globals().items(): - if name in ("main", "_preamble", "defaultdict", "datetime", "timezone", "timedelta") or not callable(function): + if name in ( + "main", + "_preamble", + "defaultdict", + "datetime", + "timezone", + "timedelta", + ) or not callable(function): continue print(name) diff --git a/samples/snippets/snippets_test.py b/samples/snippets/snippets_test.py index 58e75a59..18bc701e 100644 --- a/samples/snippets/snippets_test.py +++ b/samples/snippets/snippets_test.py @@ -15,8 +15,6 @@ import backoff from google.cloud import datastore - - import pytest import snippets @@ -72,3 +70,51 @@ def test_query_with_readtime(self, client): tasks = snippets.query_with_readtime(client) client.entities_to_delete.extend(tasks) assert tasks is not None + + @backoff.on_exception(backoff.expo, AssertionError, max_time=240) + def test_count_query_in_transaction(self, client): + with pytest.raises(ValueError) as excinfo: + snippets.count_query_in_transaction(client) + assert "User 'John' cannot have more than 2 tasks" in str(excinfo.value) + + @backoff.on_exception(backoff.expo, AssertionError, max_time=240) + def test_count_query_on_kind(self, capsys, client): + tasks = snippets.count_query_on_kind(client) + captured = capsys.readouterr() + assert ( + captured.out.strip() == "Total tasks (accessible from default alias) is 2" + ) + assert captured.err == "" + + client.entities_to_delete.extend(tasks) + + @backoff.on_exception(backoff.expo, AssertionError, max_time=240) + def test_count_query_with_limit(self, capsys, client): + tasks = snippets.count_query_with_limit(client) + captured = capsys.readouterr() + assert captured.out.strip() == "We have at least 2 tasks" + assert captured.err == "" + + client.entities_to_delete.extend(tasks) + + @backoff.on_exception(backoff.expo, AssertionError, max_time=240) + def test_count_query_property_filter(self, capsys, client): + tasks = snippets.count_query_property_filter(client) + captured = capsys.readouterr() + + assert "Total completed tasks count is 2" in captured.out + assert "Total remaining tasks count is 1" in captured.out + assert captured.err == "" + + client.entities_to_delete.extend(tasks) + + @backoff.on_exception(backoff.expo, AssertionError, max_time=240) + def test_count_query_with_stale_read(self, capsys, client): + tasks = snippets.count_query_with_stale_read(client) + captured = capsys.readouterr() + + assert "Latest tasks count is 3" in captured.out + assert "Stale tasks count is 2" in captured.out + assert captured.err == "" + + client.entities_to_delete.extend(tasks)