Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

PYTHON-3516 Improve test EventListener api #1114

Merged
merged 4 commits on Nov 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 4 additions & 4 deletions test/test_auth.py
Expand Up @@ -392,7 +392,7 @@ def test_scram_skip_empty_exchange(self):

if client_context.version < (4, 4, -1):
# Assert we sent the skipEmptyExchange option.
first_event = listener.results["started"][0]
first_event = listener.started_events[0]
self.assertEqual(first_event.command_name, "saslStart")
self.assertEqual(first_event.command["options"], {"skipEmptyExchange": True})

Expand Down Expand Up @@ -449,17 +449,17 @@ def test_scram(self):
)
client.testscram.command("dbstats")

self.listener.results.clear()
self.listener.reset()
client = rs_or_single_client_noauth(
username="both", password="pwd", authSource="testscram", event_listeners=[self.listener]
)
client.testscram.command("dbstats")
if client_context.version.at_least(4, 4, -1):
# Speculative authentication in 4.4+ sends saslStart with the
# handshake.
self.assertEqual(self.listener.results["started"], [])
self.assertEqual(self.listener.started_events, [])
else:
started = self.listener.results["started"][0]
started = self.listener.started_events[0]
self.assertEqual(started.command.get("mechanism"), "SCRAM-SHA-256")

# Step 3: verify auth failure conditions
Expand Down
55 changes: 27 additions & 28 deletions test/test_change_stream.py
Expand Up @@ -167,7 +167,7 @@ def test_try_next_runs_one_getmore(self):
client = rs_or_single_client(event_listeners=[listener])
# Connect to the cluster.
client.admin.command("ping")
listener.results.clear()
listener.reset()
# ChangeStreams only read majority committed data so use w:majority.
coll = self.watched_collection().with_options(write_concern=WriteConcern("majority"))
coll.drop()
Expand All @@ -177,33 +177,33 @@ def test_try_next_runs_one_getmore(self):
self.addCleanup(coll.drop)
with self.change_stream_with_client(client, max_await_time_ms=250) as stream:
self.assertEqual(listener.started_command_names(), ["aggregate"])
listener.results.clear()
listener.reset()

# Confirm that only a single getMore is run even when no documents
# are returned.
self.assertIsNone(stream.try_next())
self.assertEqual(listener.started_command_names(), ["getMore"])
listener.results.clear()
listener.reset()
self.assertIsNone(stream.try_next())
self.assertEqual(listener.started_command_names(), ["getMore"])
listener.results.clear()
listener.reset()

# Get at least one change before resuming.
coll.insert_one({"_id": 2})
wait_until(lambda: stream.try_next() is not None, "get change from try_next")
listener.results.clear()
listener.reset()

# Cause the next request to initiate the resume process.
self.kill_change_stream_cursor(stream)
listener.results.clear()
listener.reset()

# The sequence should be:
# - getMore, fail
# - resume with aggregate command
# - no results, return immediately without another getMore
self.assertIsNone(stream.try_next())
self.assertEqual(listener.started_command_names(), ["getMore", "aggregate"])
listener.results.clear()
listener.reset()

# Stream still works after a resume.
coll.insert_one({"_id": 3})
Expand All @@ -217,7 +217,7 @@ def test_batch_size_is_honored(self):
client = rs_or_single_client(event_listeners=[listener])
# Connect to the cluster.
client.admin.command("ping")
listener.results.clear()
listener.reset()
# ChangeStreams only read majority committed data so use w:majority.
coll = self.watched_collection().with_options(write_concern=WriteConcern("majority"))
coll.drop()
Expand All @@ -229,12 +229,12 @@ def test_batch_size_is_honored(self):
expected = {"batchSize": 23}
with self.change_stream_with_client(client, max_await_time_ms=250, batch_size=23) as stream:
# Confirm that batchSize is honored for initial batch.
cmd = listener.results["started"][0].command
cmd = listener.started_events[0].command
self.assertEqual(cmd["cursor"], expected)
listener.results.clear()
listener.reset()
# Confirm that batchSize is honored by getMores.
self.assertIsNone(stream.try_next())
cmd = listener.results["started"][0].command
cmd = listener.started_events[0].command
key = next(iter(expected))
self.assertEqual(expected[key], cmd[key])

Expand All @@ -255,12 +255,11 @@ def test_start_at_operation_time(self):
@no_type_check
def _test_full_pipeline(self, expected_cs_stage):
client, listener = self.client_with_listener("aggregate")
results = listener.results
with self.change_stream_with_client(client, [{"$project": {"foo": 0}}]) as _:
pass

self.assertEqual(1, len(results["started"]))
command = results["started"][0]
self.assertEqual(1, len(listener.started_events))
command = listener.started_events[0]
self.assertEqual("aggregate", command.command_name)
self.assertEqual(
[{"$changeStream": expected_cs_stage}, {"$project": {"foo": 0}}],
Expand Down Expand Up @@ -464,7 +463,7 @@ def _get_expected_resume_token_legacy(self, stream, listener, previous_change=No
versions that don't support postBatchResumeToken. Assumes the stream
has never returned any changes if previous_change is None."""
if previous_change is None:
agg_cmd = listener.results["started"][0]
agg_cmd = listener.started_events[0]
stage = agg_cmd.command["pipeline"][0]["$changeStream"]
return stage.get("resumeAfter") or stage.get("startAfter")

Expand All @@ -481,7 +480,7 @@ def _get_expected_resume_token(self, stream, listener, previous_change=None):
if token is not None:
return token

response = listener.results["succeeded"][-1].reply
response = listener.succeeded_events[-1].reply
return response["cursor"]["postBatchResumeToken"]

@no_type_check
Expand Down Expand Up @@ -558,8 +557,8 @@ def test_no_resume_attempt_if_aggregate_command_fails(self):
pass

# Driver should have attempted aggregate command only once.
self.assertEqual(len(listener.results["started"]), 1)
self.assertEqual(listener.results["started"][0].command_name, "aggregate")
self.assertEqual(len(listener.started_events), 1)
self.assertEqual(listener.started_events[0].command_name, "aggregate")

# Prose test no. 5 - REMOVED
# Prose test no. 6 - SKIPPED
Expand Down Expand Up @@ -603,20 +602,20 @@ def test_start_at_operation_time_caching(self):
with self.change_stream_with_client(client) as cs:
self.kill_change_stream_cursor(cs)
cs.try_next()
cmd = listener.results["started"][-1].command
cmd = listener.started_events[-1].command
self.assertIsNotNone(cmd["pipeline"][0]["$changeStream"].get("startAtOperationTime"))

# Case 2: change stream started with startAtOperationTime
listener.results.clear()
listener.reset()
optime = self.get_start_at_operation_time()
with self.change_stream_with_client(client, start_at_operation_time=optime) as cs:
self.kill_change_stream_cursor(cs)
cs.try_next()
cmd = listener.results["started"][-1].command
cmd = listener.started_events[-1].command
self.assertEqual(
cmd["pipeline"][0]["$changeStream"].get("startAtOperationTime"),
optime,
str([k.command for k in listener.results["started"]]),
str([k.command for k in listener.started_events]),
)

# Prose test no. 10 - SKIPPED
Expand All @@ -631,7 +630,7 @@ def test_resumetoken_empty_batch(self):
self.assertIsNone(change_stream.try_next())
resume_token = change_stream.resume_token

response = listener.results["succeeded"][0].reply
response = listener.succeeded_events[0].reply
self.assertEqual(resume_token, response["cursor"]["postBatchResumeToken"])

# Prose test no. 11
Expand All @@ -643,7 +642,7 @@ def test_resumetoken_exhausted_batch(self):
self._populate_and_exhaust_change_stream(change_stream)
resume_token = change_stream.resume_token

response = listener.results["succeeded"][-1].reply
response = listener.succeeded_events[-1].reply
self.assertEqual(resume_token, response["cursor"]["postBatchResumeToken"])

# Prose test no. 12
Expand Down Expand Up @@ -737,7 +736,7 @@ def test_startafter_resume_uses_startafter_after_empty_getMore(self):
self.kill_change_stream_cursor(change_stream)
change_stream.try_next() # Resume attempt

response = listener.results["started"][-1]
response = listener.started_events[-1]
self.assertIsNone(response.command["pipeline"][0]["$changeStream"].get("resumeAfter"))
self.assertIsNotNone(response.command["pipeline"][0]["$changeStream"].get("startAfter"))

Expand All @@ -756,7 +755,7 @@ def test_startafter_resume_uses_resumeafter_after_nonempty_getMore(self):
self.kill_change_stream_cursor(change_stream)
change_stream.try_next() # Resume attempt

response = listener.results["started"][-1]
response = listener.started_events[-1]
self.assertIsNotNone(response.command["pipeline"][0]["$changeStream"].get("resumeAfter"))
self.assertIsNone(response.command["pipeline"][0]["$changeStream"].get("startAfter"))

Expand Down Expand Up @@ -1056,7 +1055,7 @@ def tearDownClass(cls):

def setUp(self):
super(TestAllLegacyScenarios, self).setUp()
self.listener.results.clear()
self.listener.reset()

def setUpCluster(self, scenario_dict):
assets = [
Expand Down Expand Up @@ -1128,7 +1127,7 @@ def check_event(self, event, expectation_dict):
self.assertEqual(getattr(event, key), value)

def tearDown(self):
self.listener.results.clear()
self.listener.reset()


_TEST_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)), "change_streams")
Expand Down
38 changes: 19 additions & 19 deletions test/test_collation.py
Expand Up @@ -113,11 +113,11 @@ def tearDownClass(cls):
super(TestCollation, cls).tearDownClass()

def tearDown(self):
self.listener.results.clear()
self.listener.reset()
super(TestCollation, self).tearDown()

def last_command_started(self):
return self.listener.results["started"][-1].command
return self.listener.started_events[-1].command

def assertCollationInLastCommand(self):
self.assertEqual(self.collation.document, self.last_command_started()["collation"])
Expand All @@ -129,7 +129,7 @@ def test_create_collection(self):

# Test passing collation as a dict as well.
self.db.test.drop()
self.listener.results.clear()
self.listener.reset()
self.db.create_collection("test", collation=self.collation.document)
self.assertCollationInLastCommand()

Expand All @@ -139,7 +139,7 @@ def test_index_model(self):

def test_create_index(self):
self.db.test.create_index("foo", collation=self.collation)
ci_cmd = self.listener.results["started"][0].command
ci_cmd = self.listener.started_events[0].command
self.assertEqual(self.collation.document, ci_cmd["indexes"][0]["collation"])

def test_aggregate(self):
Expand All @@ -154,18 +154,18 @@ def test_distinct(self):
self.db.test.distinct("foo", collation=self.collation)
self.assertCollationInLastCommand()

self.listener.results.clear()
self.listener.reset()
self.db.test.find(collation=self.collation).distinct("foo")
self.assertCollationInLastCommand()

def test_find_command(self):
self.db.test.insert_one({"is this thing on?": True})
self.listener.results.clear()
self.listener.reset()
next(self.db.test.find(collation=self.collation))
self.assertCollationInLastCommand()

def test_explain_command(self):
self.listener.results.clear()
self.listener.reset()
self.db.test.find(collation=self.collation).explain()
# The collation should be part of the explained command.
self.assertEqual(
Expand All @@ -174,40 +174,40 @@ def test_explain_command(self):

def test_delete(self):
self.db.test.delete_one({"foo": 42}, collation=self.collation)
command = self.listener.results["started"][0].command
command = self.listener.started_events[0].command
self.assertEqual(self.collation.document, command["deletes"][0]["collation"])

self.listener.results.clear()
self.listener.reset()
self.db.test.delete_many({"foo": 42}, collation=self.collation)
command = self.listener.results["started"][0].command
command = self.listener.started_events[0].command
self.assertEqual(self.collation.document, command["deletes"][0]["collation"])

def test_update(self):
self.db.test.replace_one({"foo": 42}, {"foo": 43}, collation=self.collation)
command = self.listener.results["started"][0].command
command = self.listener.started_events[0].command
self.assertEqual(self.collation.document, command["updates"][0]["collation"])

self.listener.results.clear()
self.listener.reset()
self.db.test.update_one({"foo": 42}, {"$set": {"foo": 43}}, collation=self.collation)
command = self.listener.results["started"][0].command
command = self.listener.started_events[0].command
self.assertEqual(self.collation.document, command["updates"][0]["collation"])

self.listener.results.clear()
self.listener.reset()
self.db.test.update_many({"foo": 42}, {"$set": {"foo": 43}}, collation=self.collation)
command = self.listener.results["started"][0].command
command = self.listener.started_events[0].command
self.assertEqual(self.collation.document, command["updates"][0]["collation"])

def test_find_and(self):
self.db.test.find_one_and_delete({"foo": 42}, collation=self.collation)
self.assertCollationInLastCommand()

self.listener.results.clear()
self.listener.reset()
self.db.test.find_one_and_update(
{"foo": 42}, {"$set": {"foo": 43}}, collation=self.collation
)
self.assertCollationInLastCommand()

self.listener.results.clear()
self.listener.reset()
self.db.test.find_one_and_replace({"foo": 42}, {"foo": 43}, collation=self.collation)
self.assertCollationInLastCommand()

Expand All @@ -229,8 +229,8 @@ def test_bulk_write(self):
]
)

delete_cmd = self.listener.results["started"][0].command
update_cmd = self.listener.results["started"][1].command
delete_cmd = self.listener.started_events[0].command
update_cmd = self.listener.started_events[1].command

def check_ops(ops):
for op in ops:
Expand Down