diff --git a/google/cloud/datastore/client.py b/google/cloud/datastore/client.py index 03829ce0..212ba1d4 100644 --- a/google/cloud/datastore/client.py +++ b/google/cloud/datastore/client.py @@ -123,6 +123,7 @@ def _extended_lookup( transaction_id=None, retry=None, timeout=None, + read_time=None, ): """Repeat lookup until all keys found (unless stop requested). @@ -157,7 +158,7 @@ def _extended_lookup( :type transaction_id: str :param transaction_id: If passed, make the request in the scope of the given transaction. Incompatible with - ``eventual==True``. + ``eventual==True`` or ``read_time``. :type retry: :class:`google.api_core.retry.Retry` :param retry: @@ -170,6 +171,12 @@ def _extended_lookup( Note that if ``retry`` is specified, the timeout applies to each individual attempt. + :type read_time: datetime + :param read_time: + (Optional) Read time to use for read consistency. Incompatible with + ``eventual==True`` or ``transaction_id``. + This feature is in private preview. + :rtype: list of :class:`.entity_pb2.Entity` :returns: The requested entities. :raises: :class:`ValueError` if missing / deferred are not null or @@ -186,7 +193,7 @@ def _extended_lookup( results = [] loop_num = 0 - read_options = helpers.get_read_options(eventual, transaction_id) + read_options = helpers.get_read_options(eventual, transaction_id, read_time) while loop_num < _MAX_LOOPS: # loop against possible deferred. loop_num += 1 lookup_response = datastore_api.lookup( @@ -401,6 +408,7 @@ def get( eventual=False, retry=None, timeout=None, + read_time=None, ): """Retrieve an entity from a single key (if it exists). @@ -430,7 +438,8 @@ def get( :type eventual: bool :param eventual: (Optional) Defaults to strongly consistent (False). Setting True will use eventual consistency, but cannot - be used inside a transaction or will raise ValueError. + be used inside a transaction or with read_time, or will + raise ValueError. :type retry: :class:`google.api_core.retry.Retry` :param retry: @@ -443,10 +452,16 @@ def get( Note that if ``retry`` is specified, the timeout applies to each individual attempt. + :type read_time: datetime + :param read_time: Read the entity from the specified time (may be null). + Cannot be used with eventual consistency or inside a + transaction, or will raise ValueError. This feature is in private preview. + :rtype: :class:`google.cloud.datastore.entity.Entity` or ``NoneType`` :returns: The requested entity if it exists. - :raises: :class:`ValueError` if eventual is True and in a transaction. + :raises: :class:`ValueError` if more than one of ``eventual==True``, + ``transaction``, and ``read_time`` is specified. """ entities = self.get_multi( keys=[key], @@ -456,6 +471,7 @@ def get( eventual=eventual, retry=retry, timeout=timeout, + read_time=read_time, ) if entities: return entities[0] @@ -469,6 +485,7 @@ def get_multi( eventual=False, retry=None, timeout=None, + read_time=None, ): """Retrieve entities, along with their attributes. @@ -506,11 +523,15 @@ def get_multi( Note that if ``retry`` is specified, the timeout applies to each individual attempt. + :type read_time: datetime + :param read_time: (Optional) Read time to use for read consistency. This feature is in private preview. + :rtype: list of :class:`google.cloud.datastore.entity.Entity` :returns: The requested entities. :raises: :class:`ValueError` if one or more of ``keys`` has a project - which does not match our project. - :raises: :class:`ValueError` if eventual is True and in a transaction. + which does not match our project; or if more than one of + ``eventual==True``, ``transaction``, and ``read_time`` is + specified. """ if not keys: return [] @@ -533,6 +554,7 @@ def get_multi( transaction_id=transaction and transaction.id, retry=retry, timeout=timeout, + read_time=read_time, ) if missing is not None: diff --git a/google/cloud/datastore/helpers.py b/google/cloud/datastore/helpers.py index f976070e..123f356e 100644 --- a/google/cloud/datastore/helpers.py +++ b/google/cloud/datastore/helpers.py @@ -29,6 +29,7 @@ from google.cloud.datastore_v1.types import entity as entity_pb2 from google.cloud.datastore.entity import Entity from google.cloud.datastore.key import Key +from google.protobuf import timestamp_pb2 def _get_meaning(value_pb, is_list=False): @@ -230,7 +231,7 @@ def entity_to_protobuf(entity): return entity_pb -def get_read_options(eventual, transaction_id): +def get_read_options(eventual, transaction_id, read_time=None): """Validate rules for read options, and assign to the request. Helper method for ``lookup()`` and ``run_query``. @@ -242,21 +243,34 @@ def get_read_options(eventual, transaction_id): :type transaction_id: bytes :param transaction_id: A transaction identifier (may be null). + :type read_time: datetime + :param read_time: Read data from the specified time (may be null). This feature is in private preview. + :rtype: :class:`.datastore_pb2.ReadOptions` :returns: The read options corresponding to the inputs. - :raises: :class:`ValueError` if ``eventual`` is ``True`` and the - ``transaction_id`` is not ``None``. + :raises: :class:`ValueError` if more than one of ``eventual==True``, + ``transaction``, and ``read_time`` is specified. """ if transaction_id is None: if eventual: - return datastore_pb2.ReadOptions( - read_consistency=datastore_pb2.ReadOptions.ReadConsistency.EVENTUAL - ) + if read_time is not None: + raise ValueError("eventual must be False when read_time is specified") + else: + return datastore_pb2.ReadOptions( + read_consistency=datastore_pb2.ReadOptions.ReadConsistency.EVENTUAL + ) else: - return datastore_pb2.ReadOptions() + if read_time is None: + return datastore_pb2.ReadOptions() + else: + read_time_pb = timestamp_pb2.Timestamp() + read_time_pb.FromDatetime(read_time) + return datastore_pb2.ReadOptions(read_time=read_time_pb) else: if eventual: raise ValueError("eventual must be False when in a transaction") + elif read_time is not None: + raise ValueError("transaction and read_time are mutual exclusive") else: return datastore_pb2.ReadOptions(transaction=transaction_id) diff --git a/google/cloud/datastore/query.py b/google/cloud/datastore/query.py index 57c19205..5907f3c1 100644 --- a/google/cloud/datastore/query.py +++ b/google/cloud/datastore/query.py @@ -357,6 +357,7 @@ def fetch( eventual=False, retry=None, timeout=None, + read_time=None, ): """Execute the Query; return an iterator for the matching entities. @@ -412,7 +413,8 @@ def fetch( :param eventual: (Optional) Defaults to strongly consistent (False). Setting True will use eventual consistency, but cannot be used inside a transaction or - will raise ValueError. + with read_time, otherwise will raise + ValueError. :type retry: :class:`google.api_core.retry.Retry` :param retry: @@ -425,6 +427,11 @@ def fetch( Note that if ``retry`` is specified, the timeout applies to each individual attempt. + :type read_time: datetime + :param read_time: + (Optional) use read_time read consistency, cannot be used inside a + transaction or with eventual consistency, or will raise ValueError. + :rtype: :class:`Iterator` :returns: The iterator for the query. """ @@ -441,6 +448,7 @@ def fetch( eventual=eventual, retry=retry, timeout=timeout, + read_time=read_time, ) @@ -473,7 +481,7 @@ class Iterator(page_iterator.Iterator): :param eventual: (Optional) Defaults to strongly consistent (False). Setting True will use eventual consistency, but cannot be used inside a transaction or - will raise ValueError. + with read_time, otherwise will raise ValueError. :type retry: :class:`google.api_core.retry.Retry` :param retry: @@ -485,6 +493,11 @@ class Iterator(page_iterator.Iterator): Time, in seconds, to wait for the request to complete. Note that if ``retry`` is specified, the timeout applies to each individual attempt. + + :type read_time: datetime + :param read_time: (Optional) Runs the query with read time consistency. + Cannot be used with eventual consistency or inside a + transaction, otherwise will raise ValueError. This feature is in private preview. """ next_page_token = None @@ -500,6 +513,7 @@ def __init__( eventual=False, retry=None, timeout=None, + read_time=None, ): super(Iterator, self).__init__( client=client, @@ -513,6 +527,7 @@ def __init__( self._eventual = eventual self._retry = retry self._timeout = timeout + self._read_time = read_time # The attributes below will change over the life of the iterator. self._more_results = True self._skipped_results = 0 @@ -593,7 +608,9 @@ def _next_page(self): transaction_id = None else: transaction_id = transaction.id - read_options = helpers.get_read_options(self._eventual, transaction_id) + read_options = helpers.get_read_options( + self._eventual, transaction_id, self._read_time + ) partition_id = entity_pb2.PartitionId( project_id=self._query.project, namespace_id=self._query.namespace diff --git a/google/cloud/datastore/transaction.py b/google/cloud/datastore/transaction.py index 5da64198..dc18e64d 100644 --- a/google/cloud/datastore/transaction.py +++ b/google/cloud/datastore/transaction.py @@ -16,6 +16,7 @@ from google.cloud.datastore.batch import Batch from google.cloud.datastore_v1.types import TransactionOptions +from google.protobuf import timestamp_pb2 def _make_retry_timeout_kwargs(retry, timeout): @@ -141,18 +142,35 @@ class Transaction(Batch): :type read_only: bool :param read_only: indicates the transaction is read only. + + :type read_time: datetime + :param read_time: (Optional) Time at which the transaction reads entities. + Only allowed when ``read_only=True``. This feature is in private preview. + + :raises: :class:`ValueError` if read_time is specified when + ``read_only=False``. """ _status = None - def __init__(self, client, read_only=False): + def __init__(self, client, read_only=False, read_time=None): super(Transaction, self).__init__(client) self._id = None if read_only: - options = TransactionOptions(read_only=TransactionOptions.ReadOnly()) + if read_time is not None: + read_time_pb = timestamp_pb2.Timestamp() + read_time_pb.FromDatetime(read_time) + options = TransactionOptions( + read_only=TransactionOptions.ReadOnly(read_time=read_time_pb) + ) + else: + options = TransactionOptions(read_only=TransactionOptions.ReadOnly()) else: - options = TransactionOptions() + if read_time is not None: + raise ValueError("read_time is only allowed in read only transaction.") + else: + options = TransactionOptions() self._options = options diff --git a/tests/system/test_read_consistency.py b/tests/system/test_read_consistency.py new file mode 100644 index 00000000..d65b9356 --- /dev/null +++ b/tests/system/test_read_consistency.py @@ -0,0 +1,112 @@ +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import time + +from datetime import datetime + +from google.cloud import datastore + + +def _parent_key(datastore_client): + return datastore_client.key("Blog", "PizzaMan") + + +def _put_entity(datastore_client, entity_id): + key = datastore_client.key( + "read_time_test", entity_id, parent=_parent_key(datastore_client) + ) + entity = datastore.Entity(key=key) + entity["field"] = "old_value" + datastore_client.put(entity) + return entity + + +def test_get_w_read_time(datastore_client, entities_to_delete): + entity = _put_entity(datastore_client, 1) + + entities_to_delete.append(entity) + + # Add some sleep to accommodate server & client clock discrepancy. + time.sleep(1) + read_time = datetime.now() + time.sleep(1) + + entity["field"] = "new_value" + datastore_client.put(entity) + + # Get without read_time. + retrieved_entity = datastore_client.get(entity.key) + assert retrieved_entity["field"] == "new_value" + + # Directly specify read_time in get request. + retrieved_entity_from_read_time = datastore_client.get( + entity.key, read_time=read_time + ) + assert retrieved_entity_from_read_time["field"] == "old_value" + + # Use read_time in a read_only transaction. + with datastore_client.transaction(read_only=True, read_time=read_time): + retrieved_entity_from_xact = datastore_client.get(entity.key) + assert retrieved_entity_from_xact["field"] == "old_value" + + +def test_query_w_read_time(datastore_client, entities_to_delete): + entity0 = _put_entity(datastore_client, 1) + entity1 = _put_entity(datastore_client, 2) + entity2 = _put_entity(datastore_client, 3) + + entities_to_delete.append(entity0) + entities_to_delete.append(entity1) + entities_to_delete.append(entity2) + + # Add some sleep to accommodate server & client clock discrepancy. + time.sleep(1) + read_time = datetime.now() + time.sleep(1) + + entity2["field"] = "new_value" + datastore_client.put(entity2) + + query = datastore_client.query( + kind="read_time_test", ancestor=_parent_key(datastore_client) + ) + query = query.add_filter("field", "=", "old_value") + + # Query without read_time. + iterator = query.fetch() + page = next(iterator.pages) + query_results = list(page) + assert len(query_results) == 2 + assert query_results[0].key == entity0.key + assert query_results[1].key == entity1.key + + # Directly specify read_time in query. + iterator_read_time = query.fetch(read_time=read_time) + page_read_time = next(iterator_read_time.pages) + query_results_read_time = list(page_read_time) + assert len(query_results_read_time) == 3 + assert query_results_read_time[0].key == entity0.key + assert query_results_read_time[1].key == entity1.key + assert query_results_read_time[2].key == entity2.key + + # Run the query in a read_only transacxtion with read_time. + with datastore_client.transaction(read_only=True, read_time=read_time): + iterator_from_xact = query.fetch() + page_from_xact = next(iterator_from_xact.pages) + query_results_from_xact = list(page_from_xact) + assert len(query_results_from_xact) == 3 + assert query_results_from_xact[0].key == entity0.key + assert query_results_from_xact[1].key == entity1.key + assert query_results_from_xact[2].key == entity2.key diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index 51cddb6a..2a15677a 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -367,6 +367,7 @@ def test_client_get_miss(): eventual=False, retry=None, timeout=None, + read_time=None, ) @@ -389,6 +390,7 @@ def test_client_get_hit(): eventual=False, retry=None, timeout=None, + read_time=None, ) @@ -658,6 +660,50 @@ def test_client_get_multi_hit_w_transaction(): ) +def test_client_get_multi_hit_w_read_time(): + from datetime import datetime + + from google.cloud.datastore.key import Key + from google.cloud.datastore_v1.types import datastore as datastore_pb2 + from google.protobuf.timestamp_pb2 import Timestamp + + read_time = datetime.utcfromtimestamp(1641058200.123456) + read_time_pb = Timestamp(seconds=1641058200, nanos=123456000) + kind = "Kind" + id_ = 1234 + path = [{"kind": kind, "id": id_}] + + # Make a found entity pb to be returned from mock backend. + entity_pb = _make_entity_pb(PROJECT, kind, id_, "foo", "Foo") + + # Make a connection to return the entity pb. + creds = _make_credentials() + client = _make_client(credentials=creds) + lookup_response = _make_lookup_response(results=[entity_pb]) + ds_api = _make_datastore_api(lookup_response=lookup_response) + client._datastore_api_internal = ds_api + + key = Key(kind, id_, project=PROJECT) + (result,) = client.get_multi([key], read_time=read_time) + new_key = result.key + + # Check the returned value is as expected. + assert new_key is not key + assert new_key.project == PROJECT + assert new_key.path == path + assert list(result) == ["foo"] + assert result["foo"] == "Foo" + + read_options = datastore_pb2.ReadOptions(read_time=read_time_pb) + ds_api.lookup.assert_called_once_with( + request={ + "project_id": PROJECT, + "keys": [key.to_protobuf()], + "read_options": read_options, + } + ) + + def test_client_get_multi_hit_multiple_keys_same_project(): from google.cloud.datastore_v1.types import datastore as datastore_pb2 from google.cloud.datastore.key import Key diff --git a/tests/unit/test_helpers.py b/tests/unit/test_helpers.py index a8477f2d..cf626ee3 100644 --- a/tests/unit/test_helpers.py +++ b/tests/unit/test_helpers.py @@ -495,43 +495,84 @@ def test_w_nothing_in_pb(): key_from_protobuf(pb) -def test__get_read_options_w_eventual_w_txn(): +def test__get_read_options_w_eventual_w_txn_wo_read_time(): from google.cloud.datastore.helpers import get_read_options with pytest.raises(ValueError): - get_read_options(True, b"123") + get_read_options(True, b"123", None) -def test__get_read_options_w_eventual_wo_txn(): - from google.cloud.datastore_v1.types import datastore as datastore_pb2 +def test__get_read_options_w_eventual_wo_txn_wo_read_time(): from google.cloud.datastore.helpers import get_read_options + from google.cloud.datastore_v1.types import datastore as datastore_pb2 - read_options = get_read_options(True, None) + read_options = get_read_options(True, None, None) expected = datastore_pb2.ReadOptions( read_consistency=datastore_pb2.ReadOptions.ReadConsistency.EVENTUAL ) assert read_options == expected -def test__get_read_options_w_default_w_txn(): - from google.cloud.datastore_v1.types import datastore as datastore_pb2 +def test__get_read_options_w_evntual_w_txn_w_read_time(): + from datetime import datetime + + from google.cloud.datastore.helpers import get_read_options + + with pytest.raises(ValueError): + get_read_options(True, b"123", datetime(2022, 1, 1, 17, 30, 0, 123456)) + + +def test__get_read_options_w_evntual_wo_txn_w_read_time(): + from datetime import datetime + + from google.cloud.datastore.helpers import get_read_options + + with pytest.raises(ValueError): + get_read_options(True, None, datetime(2022, 1, 1, 17, 30, 0, 123456)) + + +def test__get_read_options_w_default_w_txn_wo_read_time(): from google.cloud.datastore.helpers import get_read_options + from google.cloud.datastore_v1.types import datastore as datastore_pb2 txn_id = b"123abc-easy-as" - read_options = get_read_options(False, txn_id) + read_options = get_read_options(False, txn_id, None) expected = datastore_pb2.ReadOptions(transaction=txn_id) assert read_options == expected -def test__get_read_options_w_default_wo_txn(): - from google.cloud.datastore_v1.types import datastore as datastore_pb2 +def test__get_read_options_w_default_wo_txn_wo_read_time(): from google.cloud.datastore.helpers import get_read_options + from google.cloud.datastore_v1.types import datastore as datastore_pb2 - read_options = get_read_options(False, None) + read_options = get_read_options(False, None, None) expected = datastore_pb2.ReadOptions() assert read_options == expected +def test__get_read_options_w_default_w_txn_w_read_time(): + from datetime import datetime + + from google.cloud.datastore.helpers import get_read_options + + with pytest.raises(ValueError): + get_read_options(False, b"123", datetime(2022, 1, 1, 17, 30, 0, 123456)) + + +def test__get_read_options_w_default_wo_txn_w_read_time(): + from datetime import datetime + + from google.cloud.datastore.helpers import get_read_options + from google.cloud.datastore_v1.types import datastore as datastore_pb2 + from google.protobuf.timestamp_pb2 import Timestamp + + read_time = datetime.utcfromtimestamp(1641058200.123456) + read_time_pb = Timestamp(seconds=1641058200, nanos=123456000) + read_options = get_read_options(False, None, read_time) + expected = datastore_pb2.ReadOptions(read_time=read_time_pb) + assert read_options == expected + + def test__pb_attr_value_w_datetime_naive(): import calendar import datetime diff --git a/tests/unit/test_query.py b/tests/unit/test_query.py index 1f250f46..b473a8c7 100644 --- a/tests/unit/test_query.py +++ b/tests/unit/test_query.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import datetime import mock import pytest @@ -555,12 +556,13 @@ def test_iterator__process_query_results_bad_enum(): iterator._process_query_results(response_pb) -def _next_page_helper(txn_id=None, retry=None, timeout=None): +def _next_page_helper(txn_id=None, retry=None, timeout=None, read_time=None): from google.api_core import page_iterator + from google.cloud.datastore.query import Query from google.cloud.datastore_v1.types import datastore as datastore_pb2 from google.cloud.datastore_v1.types import entity as entity_pb2 from google.cloud.datastore_v1.types import query as query_pb2 - from google.cloud.datastore.query import Query + from google.protobuf.timestamp_pb2 import Timestamp more_enum = query_pb2.QueryResultBatch.MoreResultsType.NOT_FINISHED result = _make_query_response([], b"", more_enum, 0) @@ -581,7 +583,11 @@ def _next_page_helper(txn_id=None, retry=None, timeout=None): if timeout is not None: kwargs["timeout"] = timeout - iterator = _make_iterator(query, client, **kwargs) + it_kwargs = kwargs.copy() + if read_time is not None: + it_kwargs["read_time"] = read_time + + iterator = _make_iterator(query, client, **it_kwargs) page = iterator._next_page() @@ -589,10 +595,14 @@ def _next_page_helper(txn_id=None, retry=None, timeout=None): assert page._parent is iterator partition_id = entity_pb2.PartitionId(project_id=project) - if txn_id is None: - read_options = datastore_pb2.ReadOptions() - else: + if txn_id is not None: read_options = datastore_pb2.ReadOptions(transaction=txn_id) + elif read_time is not None: + read_time_pb = Timestamp() + read_time_pb.FromDatetime(read_time) + read_options = datastore_pb2.ReadOptions(read_time=read_time_pb) + else: + read_options = datastore_pb2.ReadOptions() empty_query = query_pb2.Query() ds_api.run_query.assert_called_once_with( request={ @@ -622,6 +632,11 @@ def test_iterator__next_page_in_transaction(): _next_page_helper(txn_id) +def test_iterator__next_page_w_read_time(): + read_time = datetime.datetime.utcfromtimestamp(1641058200.123456) + _next_page_helper(read_time=read_time) + + def test_iterator__next_page_no_more(): from google.cloud.datastore.query import Query diff --git a/tests/unit/test_transaction.py b/tests/unit/test_transaction.py index 3e78a6a3..178bb4f1 100644 --- a/tests/unit/test_transaction.py +++ b/tests/unit/test_transaction.py @@ -44,6 +44,34 @@ def test_transaction_constructor_read_only(): assert xact._options == options +def test_transaction_constructor_w_read_time(): + from datetime import datetime + + project = "PROJECT" + id_ = 850302 + read_time = datetime.utcfromtimestamp(1641058200.123456) + ds_api = _make_datastore_api(xact=id_) + client = _Client(project, datastore_api=ds_api) + options = _make_options(read_only=True, read_time=read_time) + + xact = _make_transaction(client, read_only=True, read_time=read_time) + + assert xact._options == options + + +def test_transaction_constructor_read_write_w_read_time(): + from datetime import datetime + + project = "PROJECT" + id_ = 850302 + read_time = datetime.utcfromtimestamp(1641058200.123456) + ds_api = _make_datastore_api(xact=id_) + client = _Client(project, datastore_api=ds_api) + + with pytest.raises(ValueError): + _make_transaction(client, read_only=False, read_time=read_time) + + def test_transaction_current(): from google.cloud.datastore_v1.types import datastore as datastore_pb2 @@ -128,6 +156,24 @@ def test_transaction_begin_w_readonly(): ds_api.begin_transaction.assert_called_once_with(request=expected_request) +def test_transaction_begin_w_read_time(): + from datetime import datetime + + project = "PROJECT" + id_ = 889 + read_time = datetime.utcfromtimestamp(1641058200.123456) + ds_api = _make_datastore_api(xact_id=id_) + client = _Client(project, datastore_api=ds_api) + xact = _make_transaction(client, read_only=True, read_time=read_time) + + xact.begin() + + assert xact.id == id_ + + expected_request = _make_begin_request(project, read_only=True, read_time=read_time) + ds_api.begin_transaction.assert_called_once_with(request=expected_request) + + def test_transaction_begin_w_retry_w_timeout(): project = "PROJECT" id_ = 889 @@ -413,13 +459,20 @@ def __exit__(self, *args): self._client._pop_batch() -def _make_options(read_only=False, previous_transaction=None): +def _make_options(read_only=False, previous_transaction=None, read_time=None): from google.cloud.datastore_v1.types import TransactionOptions + from google.protobuf.timestamp_pb2 import Timestamp kw = {} if read_only: - kw["read_only"] = TransactionOptions.ReadOnly() + read_only_kw = {} + if read_time is not None: + read_time_pb = Timestamp() + read_time_pb.FromDatetime(read_time) + read_only_kw["read_time"] = read_time_pb + + kw["read_only"] = TransactionOptions.ReadOnly(**read_only_kw) return TransactionOptions(**kw) @@ -430,8 +483,8 @@ def _make_transaction(client, **kw): return Transaction(client, **kw) -def _make_begin_request(project, read_only=False): - expected_options = _make_options(read_only=read_only) +def _make_begin_request(project, read_only=False, read_time=None): + expected_options = _make_options(read_only=read_only, read_time=read_time) return { "project_id": project, "transaction_options": expected_options,