Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

samples: Add snippets and samples for Count query #383

Merged
merged 8 commits into from Dec 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
164 changes: 162 additions & 2 deletions samples/snippets/snippets.py
Expand Up @@ -14,6 +14,7 @@
import argparse
from datetime import datetime, timedelta, timezone
from pprint import pprint
import time

from google.cloud import datastore # noqa: I100

Expand Down Expand Up @@ -63,7 +64,7 @@ def query_with_readtime(client):
read_time = datetime.now(timezone.utc) - timedelta(seconds=15)

# Fetch an entity with read_time
task_key = client.key('Task', 'sampletask')
task_key = client.key("Task", "sampletask")
entity = client.get(task_key, read_time=read_time)

# Query Task entities with read_time
Expand All @@ -77,11 +78,170 @@ def query_with_readtime(client):
return results


def count_query_in_transaction(client):
    """Run a count aggregation inside a transaction to cap a user's tasks.

    Seeds two ``Task`` entities owned by ``"john"``, then transactionally
    counts John's tasks: a third task is written only while the count is
    below two; otherwise the transaction is rolled back by raising.

    Raises:
        ValueError: if John already has two or more tasks.
    """
    # [START datastore_count_in_transaction]
    seeded_tasks = []
    for key_name in ("task1", "task2"):
        entity = datastore.Entity(client.key("Task", key_name))
        entity["owner"] = "john"
        seeded_tasks.append(entity)

    client.put_multi(seeded_tasks)

    with client.transaction() as transaction:

        john_tasks = client.query(kind="Task")
        john_tasks.add_filter("owner", "=", "john")
        count_query = client.aggregation_query(john_tasks).count(alias="tasks_count")

        for aggregation_row in count_query.fetch():
            tasks_count = aggregation_row[0]
            if tasks_count.value < 2:
                extra_task = datastore.Entity(client.key("Task", "task3"))
                extra_task["owner"] = "john"
                transaction.put(extra_task)
                seeded_tasks.append(extra_task)
            else:
                print(f"Found existing {tasks_count.value} tasks, rolling back")
                client.entities_to_delete.extend(seeded_tasks)
                # Raising aborts the transaction, rolling back the write above.
                raise ValueError("User 'John' cannot have more than 2 tasks")
    # [END datastore_count_in_transaction]


def count_query_on_kind(client):
    """Count every ``Task`` entity via the aggregation's default alias.

    Returns:
        The list of seeded ``Task`` entities so the caller can clean up.
    """
    # [START datastore_count_on_kind]
    tasks = [
        datastore.Entity(client.key("Task", key_name))
        for key_name in ("task1", "task2")
    ]
    client.put_multi(tasks)

    count_query = client.aggregation_query(client.query(kind="Task")).count()
    for result_row in count_query.fetch():
        for aggregation in result_row:
            print(f"Total tasks (accessible from default alias) is {aggregation.value}")
    # [END datastore_count_on_kind]
    return tasks


def count_query_with_limit(client):
    """Count ``Task`` entities with the underlying query capped at two.

    Returns:
        The list of seeded ``Task`` entities so the caller can clean up.
    """
    # [START datastore_count_with_limit]
    tasks = [
        datastore.Entity(client.key("Task", key_name))
        for key_name in ("task1", "task2", "task3")
    ]
    client.put_multi(tasks)

    count_query = client.aggregation_query(client.query(kind="Task")).count()
    # With limit=2 the aggregation considers at most two matching entities.
    for result_row in count_query.fetch(limit=2):
        for aggregation in result_row:
            print(f"We have at least {aggregation.value} tasks")
    # [END datastore_count_with_limit]
    return tasks


def count_query_property_filter(client):
    """Run two count aggregations over property-filtered queries.

    Seeds three ``Task`` entities (two done, one not) and prints a
    separate count of completed and remaining tasks, each under its
    own aggregation alias.

    Returns:
        The list of seeded ``Task`` entities so the caller can clean up.
    """
    # [START datastore_count_with_property_filter]
    done_by_key = {"task1": True, "task2": False, "task3": True}
    tasks = []
    for key_name, done in done_by_key.items():
        entity = datastore.Entity(client.key("Task", key_name))
        entity["done"] = done
        tasks.append(entity)

    client.put_multi(tasks)
    completed_tasks = client.query(kind="Task").add_filter("done", "=", True)
    remaining_tasks = client.query(kind="Task").add_filter("done", "=", False)

    completed_tasks_query = client.aggregation_query(query=completed_tasks).count(
        alias="total_completed_count"
    )
    remaining_tasks_query = client.aggregation_query(query=remaining_tasks).count(
        alias="total_remaining_count"
    )

    for result_row in completed_tasks_query.fetch():
        for aggregation in result_row:
            if aggregation.alias == "total_completed_count":
                print(f"Total completed tasks count is {aggregation.value}")

    for result_row in remaining_tasks_query.fetch():
        for aggregation in result_row:
            if aggregation.alias == "total_remaining_count":
                print(f"Total remaining tasks count is {aggregation.value}")
    # [END datastore_count_with_property_filter]
    return tasks


def count_query_with_stale_read(client):
    """Compare a live count with one executed at a past read timestamp.

    Starts from an empty ``Task`` kind, writes two tasks, records a
    timestamp, writes a third, then runs the same count aggregation
    twice: once live (expects 3) and once with ``read_time`` set to the
    recorded timestamp (expects 2).

    Returns:
        The three seeded ``Task`` entities so the caller can clean up.
    """
    leftover = list(client.query(kind="Task").fetch())
    client.delete_multi(leftover)  # ensure the database is empty before starting

    # [START datastore_count_query_with_stale_read]
    task1 = datastore.Entity(client.key("Task", "task1"))
    task2 = datastore.Entity(client.key("Task", "task2"))

    # Saving two tasks
    task1["done"] = True
    task2["done"] = False
    client.put_multi([task1, task2])
    # The sleeps let the writes settle so the timestamp cleanly separates
    # the first two tasks from the third.
    time.sleep(10)

    past_timestamp = datetime.now(
        timezone.utc
    )  # we have two tasks in database at this time.
    time.sleep(10)

    # Saving third task
    task3 = datastore.Entity(client.key("Task", "task3"))
    task3["done"] = False
    client.put(task3)

    all_tasks = client.query(kind="Task")

    # Executing aggregation query
    live_count_query = client.aggregation_query(
        query=all_tasks,
    ).count(alias="all_tasks_count")
    for result_row in live_count_query.fetch():
        for aggregation in result_row:
            print(f"Latest tasks count is {aggregation.value}")

    # Executing aggregation query with past timestamp
    stale_count_query = client.aggregation_query(query=all_tasks).count(
        alias="tasks_in_past"
    )
    for result_row in stale_count_query.fetch(read_time=past_timestamp):
        for aggregation in result_row:
            print(f"Stale tasks count is {aggregation.value}")
    # [END datastore_count_query_with_stale_read]
    return [task1, task2, task3]


def main(project_id):
client = datastore.Client(project_id)

for name, function in globals().items():
if name in ("main", "_preamble", "defaultdict", "datetime", "timezone", "timedelta") or not callable(function):
if name in (
"main",
"_preamble",
"defaultdict",
"datetime",
"timezone",
"timedelta",
) or not callable(function):
continue

print(name)
Expand Down
50 changes: 48 additions & 2 deletions samples/snippets/snippets_test.py
Expand Up @@ -15,8 +15,6 @@

import backoff
from google.cloud import datastore


import pytest

import snippets
Expand Down Expand Up @@ -72,3 +70,51 @@ def test_query_with_readtime(self, client):
tasks = snippets.query_with_readtime(client)
client.entities_to_delete.extend(tasks)
assert tasks is not None

@backoff.on_exception(backoff.expo, AssertionError, max_time=240)
def test_count_query_in_transaction(self, client):
    """The snippet must refuse a third task by raising ValueError."""
    with pytest.raises(ValueError) as excinfo:
        snippets.count_query_in_transaction(client)
    raised_message = str(excinfo.value)
    assert "User 'John' cannot have more than 2 tasks" in raised_message

@backoff.on_exception(backoff.expo, AssertionError, max_time=240)
def test_count_query_on_kind(self, capsys, client):
    """The default-alias count over the seeded kind should be two."""
    tasks = snippets.count_query_on_kind(client)
    out, err = capsys.readouterr()
    assert out.strip() == "Total tasks (accessible from default alias) is 2"
    assert err == ""

    client.entities_to_delete.extend(tasks)

@backoff.on_exception(backoff.expo, AssertionError, max_time=240)
def test_count_query_with_limit(self, capsys, client):
    """The limited aggregation should report exactly two tasks."""
    tasks = snippets.count_query_with_limit(client)
    out, err = capsys.readouterr()
    assert out.strip() == "We have at least 2 tasks"
    assert err == ""

    client.entities_to_delete.extend(tasks)

@backoff.on_exception(backoff.expo, AssertionError, max_time=240)
def test_count_query_property_filter(self, capsys, client):
    """Both filtered counts must be reported on stdout."""
    tasks = snippets.count_query_property_filter(client)
    out, err = capsys.readouterr()

    for expected_line in (
        "Total completed tasks count is 2",
        "Total remaining tasks count is 1",
    ):
        assert expected_line in out
    assert err == ""

    client.entities_to_delete.extend(tasks)

@backoff.on_exception(backoff.expo, AssertionError, max_time=240)
def test_count_query_with_stale_read(self, capsys, client):
    """The live count sees three tasks; the stale read sees only two."""
    tasks = snippets.count_query_with_stale_read(client)
    out, err = capsys.readouterr()

    for expected_line in (
        "Latest tasks count is 3",
        "Stale tasks count is 2",
    ):
        assert expected_line in out
    assert err == ""

    client.entities_to_delete.extend(tasks)