Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add read_time support for get and query #334

Merged
merged 9 commits into from Jul 15, 2022
34 changes: 28 additions & 6 deletions google/cloud/datastore/client.py
Expand Up @@ -123,6 +123,7 @@ def _extended_lookup(
transaction_id=None,
retry=None,
timeout=None,
read_time=None,
):
"""Repeat lookup until all keys found (unless stop requested).

Expand Down Expand Up @@ -157,7 +158,7 @@ def _extended_lookup(
:type transaction_id: str
:param transaction_id: If passed, make the request in the scope of
the given transaction. Incompatible with
``eventual==True``.
``eventual==True`` or ``read_time``.

:type retry: :class:`google.api_core.retry.Retry`
:param retry:
Expand All @@ -170,6 +171,12 @@ def _extended_lookup(
Note that if ``retry`` is specified, the timeout applies
to each individual attempt.

:type read_time: datetime
:param read_time:
(Optional) Read time to use for read consistency. Incompatible with
``eventual==True`` or ``transaction_id``.
yixiaoshen marked this conversation as resolved.
Show resolved Hide resolved
This feature is in private preview.

:rtype: list of :class:`.entity_pb2.Entity`
:returns: The requested entities.
:raises: :class:`ValueError` if missing / deferred are not null or
Expand All @@ -186,7 +193,7 @@ def _extended_lookup(
results = []

loop_num = 0
read_options = helpers.get_read_options(eventual, transaction_id)
read_options = helpers.get_read_options(eventual, transaction_id, read_time)
while loop_num < _MAX_LOOPS: # loop against possible deferred.
loop_num += 1
lookup_response = datastore_api.lookup(
Expand Down Expand Up @@ -401,6 +408,7 @@ def get(
eventual=False,
retry=None,
timeout=None,
read_time=None,
):
"""Retrieve an entity from a single key (if it exists).

Expand Down Expand Up @@ -430,7 +438,8 @@ def get(
:type eventual: bool
:param eventual: (Optional) Defaults to strongly consistent (False).
Setting True will use eventual consistency, but cannot
be used inside a transaction or will raise ValueError.
be used inside a transaction or with read_time, or will
raise ValueError.

:type retry: :class:`google.api_core.retry.Retry`
:param retry:
Expand All @@ -443,10 +452,16 @@ def get(
Note that if ``retry`` is specified, the timeout applies
to each individual attempt.

:type read_time: datetime
:param read_time: Read the entity from the specified time (may be null).
Cannot be used with eventual consistency or inside a
transaction, or will raise ValueError. This feature is in private preview.

:rtype: :class:`google.cloud.datastore.entity.Entity` or ``NoneType``
:returns: The requested entity if it exists.

:raises: :class:`ValueError` if eventual is True and in a transaction.
:raises: :class:`ValueError` if more than one of ``eventual==True``,
``transaction``, and ``read_time`` is specified.
"""
entities = self.get_multi(
keys=[key],
Expand All @@ -456,6 +471,7 @@ def get(
eventual=eventual,
retry=retry,
timeout=timeout,
read_time=read_time,
)
if entities:
return entities[0]
Expand All @@ -469,6 +485,7 @@ def get_multi(
eventual=False,
retry=None,
timeout=None,
read_time=None,
):
"""Retrieve entities, along with their attributes.

Expand Down Expand Up @@ -506,11 +523,15 @@ def get_multi(
Note that if ``retry`` is specified, the timeout applies
to each individual attempt.

:type read_time: datetime
:param read_time: (Optional) Read time to use for read consistency. This feature is in private preview.

:rtype: list of :class:`google.cloud.datastore.entity.Entity`
:returns: The requested entities.
:raises: :class:`ValueError` if one or more of ``keys`` has a project
which does not match our project.
:raises: :class:`ValueError` if eventual is True and in a transaction.
which does not match our project; or if more than one of
``eventual==True``, ``transaction``, and ``read_time`` is
specified.
"""
if not keys:
return []
Expand All @@ -533,6 +554,7 @@ def get_multi(
transaction_id=transaction and transaction.id,
retry=retry,
timeout=timeout,
read_time=read_time,
)

if missing is not None:
Expand Down
28 changes: 21 additions & 7 deletions google/cloud/datastore/helpers.py
Expand Up @@ -29,6 +29,7 @@
from google.cloud.datastore_v1.types import entity as entity_pb2
from google.cloud.datastore.entity import Entity
from google.cloud.datastore.key import Key
from google.protobuf import timestamp_pb2


def _get_meaning(value_pb, is_list=False):
Expand Down Expand Up @@ -230,7 +231,7 @@ def entity_to_protobuf(entity):
return entity_pb


def get_read_options(eventual, transaction_id):
def get_read_options(eventual, transaction_id, read_time=None):
"""Validate rules for read options, and assign to the request.

Helper method for ``lookup()`` and ``run_query``.
Expand All @@ -242,21 +243,34 @@ def get_read_options(eventual, transaction_id):
:type transaction_id: bytes
:param transaction_id: A transaction identifier (may be null).

:type read_time: datetime
:param read_time: Read data from the specified time (may be null). This feature is in private preview.

:rtype: :class:`.datastore_pb2.ReadOptions`
:returns: The read options corresponding to the inputs.
:raises: :class:`ValueError` if ``eventual`` is ``True`` and the
``transaction_id`` is not ``None``.
:raises: :class:`ValueError` if more than one of ``eventual==True``,
``transaction``, and ``read_time`` is specified.
"""
if transaction_id is None:
if eventual:
return datastore_pb2.ReadOptions(
read_consistency=datastore_pb2.ReadOptions.ReadConsistency.EVENTUAL
)
if read_time is not None:
raise ValueError("eventual must be False when read_time is specified")
else:
return datastore_pb2.ReadOptions(
read_consistency=datastore_pb2.ReadOptions.ReadConsistency.EVENTUAL
)
else:
return datastore_pb2.ReadOptions()
if read_time is None:
return datastore_pb2.ReadOptions()
else:
read_time_pb = timestamp_pb2.Timestamp()
read_time_pb.FromDatetime(read_time)
return datastore_pb2.ReadOptions(read_time=read_time_pb)
else:
if eventual:
raise ValueError("eventual must be False when in a transaction")
elif read_time is not None:
raise ValueError("transaction and read_time are mutual exclusive")
else:
return datastore_pb2.ReadOptions(transaction=transaction_id)

Expand Down
23 changes: 20 additions & 3 deletions google/cloud/datastore/query.py
Expand Up @@ -357,6 +357,7 @@ def fetch(
eventual=False,
retry=None,
timeout=None,
read_time=None,
):
"""Execute the Query; return an iterator for the matching entities.

Expand Down Expand Up @@ -412,7 +413,8 @@ def fetch(
:param eventual: (Optional) Defaults to strongly consistent (False).
Setting True will use eventual consistency,
but cannot be used inside a transaction or
will raise ValueError.
with read_time, otherwise will raise
ValueError.

:type retry: :class:`google.api_core.retry.Retry`
:param retry:
Expand All @@ -425,6 +427,11 @@ def fetch(
Note that if ``retry`` is specified, the timeout applies
to each individual attempt.

:type read_time: datetime
:param read_time:
(Optional) use read_time read consistency, cannot be used inside a
transaction or with eventual consistency, or will raise ValueError.

:rtype: :class:`Iterator`
:returns: The iterator for the query.
"""
Expand All @@ -441,6 +448,7 @@ def fetch(
eventual=eventual,
retry=retry,
timeout=timeout,
read_time=read_time,
)


Expand Down Expand Up @@ -473,7 +481,7 @@ class Iterator(page_iterator.Iterator):
:param eventual: (Optional) Defaults to strongly consistent (False).
Setting True will use eventual consistency,
but cannot be used inside a transaction or
will raise ValueError.
with read_time, otherwise will raise ValueError.

:type retry: :class:`google.api_core.retry.Retry`
:param retry:
Expand All @@ -485,6 +493,11 @@ class Iterator(page_iterator.Iterator):
Time, in seconds, to wait for the request to complete.
Note that if ``retry`` is specified, the timeout applies
to each individual attempt.

:type read_time: datetime
:param read_time: (Optional) Runs the query with read time consistency.
Cannot be used with eventual consistency or inside a
transaction, otherwise will raise ValueError. This feature is in private preview.
"""

next_page_token = None
Expand All @@ -500,6 +513,7 @@ def __init__(
eventual=False,
retry=None,
timeout=None,
read_time=None,
):
super(Iterator, self).__init__(
client=client,
Expand All @@ -513,6 +527,7 @@ def __init__(
self._eventual = eventual
self._retry = retry
self._timeout = timeout
self._read_time = read_time
# The attributes below will change over the life of the iterator.
self._more_results = True
self._skipped_results = 0
Expand Down Expand Up @@ -593,7 +608,9 @@ def _next_page(self):
transaction_id = None
else:
transaction_id = transaction.id
read_options = helpers.get_read_options(self._eventual, transaction_id)
read_options = helpers.get_read_options(
self._eventual, transaction_id, self._read_time
)

partition_id = entity_pb2.PartitionId(
project_id=self._query.project, namespace_id=self._query.namespace
Expand Down
24 changes: 21 additions & 3 deletions google/cloud/datastore/transaction.py
Expand Up @@ -16,6 +16,7 @@

from google.cloud.datastore.batch import Batch
from google.cloud.datastore_v1.types import TransactionOptions
from google.protobuf import timestamp_pb2


def _make_retry_timeout_kwargs(retry, timeout):
Expand Down Expand Up @@ -141,18 +142,35 @@ class Transaction(Batch):

:type read_only: bool
:param read_only: indicates the transaction is read only.

:type read_time: datetime
:param read_time: (Optional) Time at which the transaction reads entities.
Only allowed when ``read_only=True``. This feature is in private preview.

:raises: :class:`ValueError` if read_time is specified when
``read_only=False``.
"""

_status = None

def __init__(self, client, read_only=False):
def __init__(self, client, read_only=False, read_time=None):
super(Transaction, self).__init__(client)
self._id = None

if read_only:
options = TransactionOptions(read_only=TransactionOptions.ReadOnly())
if read_time is not None:
read_time_pb = timestamp_pb2.Timestamp()
read_time_pb.FromDatetime(read_time)
options = TransactionOptions(
read_only=TransactionOptions.ReadOnly(read_time=read_time_pb)
)
else:
options = TransactionOptions(read_only=TransactionOptions.ReadOnly())
else:
options = TransactionOptions()
if read_time is not None:
raise ValueError("read_time is only allowed in read only transaction.")
else:
options = TransactionOptions()

self._options = options

Expand Down