Skip to content

Commit

Permalink
samples: Add snippets and samples for Count query (#383)
Browse files Browse the repository at this point in the history
* Add samples for Count query

* Remove unused variable.

* Add count query samples with limit

* Fix the stale read test.

* Raise ValueError instead of general Exception
  • Loading branch information
Mariatta committed Dec 9, 2022
1 parent a7399d4 commit 604ed70
Show file tree
Hide file tree
Showing 2 changed files with 210 additions and 4 deletions.
164 changes: 162 additions & 2 deletions samples/snippets/snippets.py
Expand Up @@ -14,6 +14,7 @@
import argparse
from datetime import datetime, timedelta, timezone
from pprint import pprint
import time

from google.cloud import datastore # noqa: I100

Expand Down Expand Up @@ -63,7 +64,7 @@ def query_with_readtime(client):
read_time = datetime.now(timezone.utc) - timedelta(seconds=15)

# Fetch an entity with read_time
task_key = client.key('Task', 'sampletask')
task_key = client.key("Task", "sampletask")
entity = client.get(task_key, read_time=read_time)

# Query Task entities with read_time
Expand All @@ -77,11 +78,170 @@ def query_with_readtime(client):
return results


def count_query_in_transaction(client):
    """Count John's tasks inside a transaction and enforce a task cap.

    Writes two ``Task`` entities owned by "john", then opens a transaction
    and runs a count aggregation (alias ``tasks_count``) over ``Task``
    entities whose ``owner`` equals "john". When the counted value is below
    two, a third task is queued on the transaction; otherwise the function
    prints a message and raises ``ValueError``, leaving the ``with`` block
    through the exception.

    Args:
        client: Datastore client used to build keys, write entities and run
            the queries. If it carries an ``entities_to_delete`` list (the
            accompanying tests use one for cleanup), the entities created
            here are appended to it before the error is raised.

    Raises:
        ValueError: if the count aggregation reports two or more tasks.
    """
    # [START datastore_count_in_transaction]
    task1 = datastore.Entity(client.key("Task", "task1"))
    task2 = datastore.Entity(client.key("Task", "task2"))

    task1["owner"] = "john"
    task2["owner"] = "john"

    tasks = [task1, task2]
    client.put_multi(tasks)

    with client.transaction() as transaction:

        tasks_of_john = client.query(kind="Task")
        tasks_of_john.add_filter("owner", "=", "john")
        total_tasks_query = client.aggregation_query(tasks_of_john)

        query_result = total_tasks_query.count(alias="tasks_count").fetch()
        for task_result in query_result:
            # Only one aggregation was requested, so it is the first item.
            tasks_count = task_result[0]
            if tasks_count.value < 2:
                task3 = datastore.Entity(client.key("Task", "task3"))
                task3["owner"] = "john"
                transaction.put(task3)
                tasks.append(task3)
            else:
                print(f"Found existing {tasks_count.value} tasks, rolling back")
                # NOTE(review): ``entities_to_delete`` looks like a cleanup
                # list supplied by the tests rather than a standard client
                # attribute. Guard the access so a plain client surfaces the
                # intended ValueError instead of an AttributeError.
                cleanup_list = getattr(client, "entities_to_delete", None)
                if cleanup_list is not None:
                    cleanup_list.extend(tasks)
                raise ValueError("User 'John' cannot have more than 2 tasks")
    # [END datastore_count_in_transaction]


def count_query_on_kind(client):
    """Write two ``Task`` entities and print the count for the ``Task`` kind.

    Args:
        client: Datastore client used to build keys, write the entities and
            run the count aggregation.

    Returns:
        The list holding the two entities that were written.
    """
    # [START datastore_count_on_kind]
    tasks = [
        datastore.Entity(client.key("Task", name)) for name in ("task1", "task2")
    ]
    client.put_multi(tasks)

    # count() is called without an alias here.
    kind_query = client.query(kind="Task")
    counted = client.aggregation_query(kind_query).count()

    for aggregation_results in counted.fetch():
        for aggregation in aggregation_results:
            print(f"Total tasks (accessible from default alias) is {aggregation.value}")
    # [END datastore_count_on_kind]
    return tasks


def count_query_with_limit(client):
    """Write three ``Task`` entities and print a count fetched with a limit.

    The count aggregation over the ``Task`` kind is fetched with ``limit=2``
    and every returned aggregation value is printed.

    Args:
        client: Datastore client used to build keys, write the entities and
            run the count aggregation.

    Returns:
        The list holding the three entities that were written.
    """
    # [START datastore_count_with_limit]
    tasks = [
        datastore.Entity(client.key("Task", name))
        for name in ("task1", "task2", "task3")
    ]
    client.put_multi(tasks)

    kind_query = client.query(kind="Task")
    counted = client.aggregation_query(kind_query).count()

    for aggregation_results in counted.fetch(limit=2):
        for aggregation in aggregation_results:
            print(f"We have at least {aggregation.value} tasks")
    # [END datastore_count_with_limit]
    return tasks


def count_query_property_filter(client):
    """Print separate counts of done and not-done ``Task`` entities.

    Writes three tasks (two with ``done=True``, one with ``done=False``),
    builds one filtered query per ``done`` value, wraps each in a count
    aggregation with its own alias, and prints the values whose alias
    matches.

    Args:
        client: Datastore client used to build keys, write the entities and
            run both count aggregations.

    Returns:
        The list holding the three entities that were written.
    """
    # [START datastore_count_with_property_filter]
    tasks = []
    for name, is_done in (("task1", True), ("task2", False), ("task3", True)):
        task = datastore.Entity(client.key("Task", name))
        task["done"] = is_done
        tasks.append(task)
    client.put_multi(tasks)

    completed_tasks = client.query(kind="Task").add_filter("done", "=", True)
    remaining_tasks = client.query(kind="Task").add_filter("done", "=", False)

    completed_counter = client.aggregation_query(query=completed_tasks)
    completed_counter = completed_counter.count(alias="total_completed_count")
    remaining_counter = client.aggregation_query(query=remaining_tasks)
    remaining_counter = remaining_counter.count(alias="total_remaining_count")

    for aggregation_results in completed_counter.fetch():
        for aggregation_result in aggregation_results:
            if aggregation_result.alias != "total_completed_count":
                continue
            print(f"Total completed tasks count is {aggregation_result.value}")

    for aggregation_results in remaining_counter.fetch():
        for aggregation_result in aggregation_results:
            if aggregation_result.alias != "total_remaining_count":
                continue
            print(f"Total remaining tasks count is {aggregation_result.value}")
    # [END datastore_count_with_property_filter]
    return tasks


def count_query_with_stale_read(client):
    """Print the current ``Task`` count and the count at an earlier time.

    Deletes every existing ``Task`` entity, writes two tasks, waits, records
    a timestamp, waits again, writes a third task, and then runs the same
    count aggregation twice: once normally and once with
    ``read_time`` set to the recorded timestamp.

    Args:
        client: Datastore client used for deletes, writes and queries.

    Returns:
        A list of the three entities written by this function.
    """
    # Start from an empty ``Task`` kind so the printed counts are predictable.
    existing_tasks = list(client.query(kind="Task").fetch())
    client.delete_multi(existing_tasks)

    # [START datastore_count_query_with_stale_read]
    task1 = datastore.Entity(client.key("Task", "task1"))
    task2 = datastore.Entity(client.key("Task", "task2"))

    # Saving two tasks
    task1["done"] = True
    task2["done"] = False
    client.put_multi([task1, task2])
    time.sleep(10)

    # We have two tasks in the database at this time.
    past_timestamp = datetime.now(timezone.utc)
    time.sleep(10)

    # Saving third task
    task3 = datastore.Entity(client.key("Task", "task3"))
    task3["done"] = False
    client.put(task3)

    all_tasks = client.query(kind="Task")
    latest_counter = client.aggregation_query(query=all_tasks)
    latest_counter = latest_counter.count(alias="all_tasks_count")

    # Executing aggregation query
    for aggregation_results in latest_counter.fetch():
        for aggregation_result in aggregation_results:
            print(f"Latest tasks count is {aggregation_result.value}")

    # Executing aggregation query with past timestamp
    stale_counter = client.aggregation_query(query=all_tasks)
    stale_counter = stale_counter.count(alias="tasks_in_past")
    for aggregation_results in stale_counter.fetch(read_time=past_timestamp):
        for aggregation_result in aggregation_results:
            print(f"Stale tasks count is {aggregation_result.value}")
    # [END datastore_count_query_with_stale_read]
    return [task1, task2, task3]


def main(project_id):
client = datastore.Client(project_id)

for name, function in globals().items():
if name in ("main", "_preamble", "defaultdict", "datetime", "timezone", "timedelta") or not callable(function):
if name in (
"main",
"_preamble",
"defaultdict",
"datetime",
"timezone",
"timedelta",
) or not callable(function):
continue

print(name)
Expand Down
50 changes: 48 additions & 2 deletions samples/snippets/snippets_test.py
Expand Up @@ -15,8 +15,6 @@

import backoff
from google.cloud import datastore


import pytest

import snippets
Expand Down Expand Up @@ -72,3 +70,51 @@ def test_query_with_readtime(self, client):
tasks = snippets.query_with_readtime(client)
client.entities_to_delete.extend(tasks)
assert tasks is not None

@backoff.on_exception(backoff.expo, AssertionError, max_time=240)
def test_count_query_in_transaction(self, client):
    """The transaction sample must raise ValueError with the cap message."""
    expected_message = "User 'John' cannot have more than 2 tasks"
    with pytest.raises(ValueError) as raised:
        snippets.count_query_in_transaction(client)
    # Checked after the ``with`` block: the raised error ends that block.
    assert expected_message in str(raised.value)

@backoff.on_exception(backoff.expo, AssertionError, max_time=240)
def test_count_query_on_kind(self, capsys, client):
    """The kind-count sample prints a total of 2 and nothing on stderr."""
    created = snippets.count_query_on_kind(client)
    streams = capsys.readouterr()

    expected_line = "Total tasks (accessible from default alias) is 2"
    assert streams.out.strip() == expected_line
    assert streams.err == ""

    # Hand the created entities to the client's cleanup list.
    client.entities_to_delete.extend(created)

@backoff.on_exception(backoff.expo, AssertionError, max_time=240)
def test_count_query_with_limit(self, capsys, client):
    """The limited-count sample prints a count of 2 and nothing on stderr."""
    created = snippets.count_query_with_limit(client)
    streams = capsys.readouterr()

    expected_line = "We have at least 2 tasks"
    assert streams.out.strip() == expected_line
    assert streams.err == ""

    # Hand the created entities to the client's cleanup list.
    client.entities_to_delete.extend(created)

@backoff.on_exception(backoff.expo, AssertionError, max_time=240)
def test_count_query_property_filter(self, capsys, client):
    """The filter sample reports 2 completed tasks and 1 remaining task."""
    created = snippets.count_query_property_filter(client)
    streams = capsys.readouterr()
    printed = streams.out

    assert "Total completed tasks count is 2" in printed
    assert "Total remaining tasks count is 1" in printed
    assert streams.err == ""

    # Hand the created entities to the client's cleanup list.
    client.entities_to_delete.extend(created)

@backoff.on_exception(backoff.expo, AssertionError, max_time=240)
def test_count_query_with_stale_read(self, capsys, client):
    """The stale-read sample reports 3 tasks now and 2 at the past timestamp."""
    created = snippets.count_query_with_stale_read(client)
    streams = capsys.readouterr()
    printed = streams.out

    assert "Latest tasks count is 3" in printed
    assert "Stale tasks count is 2" in printed
    assert streams.err == ""

    # Hand the created entities to the client's cleanup list.
    client.entities_to_delete.extend(created)

0 comments on commit 604ed70

Please sign in to comment.