From 0d2ef75548910b856314f9d22a84c021095dd508 Mon Sep 17 00:00:00 2001 From: Shane Harvey Date: Fri, 11 Nov 2022 14:46:44 -0800 Subject: [PATCH 1/4] PYTHON-3516 Improve test EventListener api --- test/test_auth.py | 8 +- test/test_change_stream.py | 50 +++++------ test/test_collation.py | 38 ++++---- test/test_command_monitoring_legacy.py | 4 +- test/test_cursor.py | 46 +++++----- test/test_data_lake.py | 6 +- test/test_database.py | 2 +- test/test_encryption.py | 44 +++++----- test/test_monitoring.py | 64 +++++++------- test/test_read_concern.py | 29 +++--- test/test_read_preferences.py | 2 +- test/test_read_write_concern_spec.py | 8 +- test/test_retryable_reads.py | 6 +- test/test_retryable_writes.py | 56 ++++++------ test/test_server_selection.py | 2 +- test/test_server_selection_in_window.py | 2 +- test/test_session.py | 112 ++++++++++++------------ test/test_transactions.py | 10 +-- test/test_versioned_api.py | 4 +- test/utils.py | 39 ++++++--- test/utils_spec_runner.py | 8 +- 21 files changed, 281 insertions(+), 259 deletions(-) diff --git a/test/test_auth.py b/test/test_auth.py index 69ed27bda0..9d80f06c00 100644 --- a/test/test_auth.py +++ b/test/test_auth.py @@ -392,7 +392,7 @@ def test_scram_skip_empty_exchange(self): if client_context.version < (4, 4, -1): # Assert we sent the skipEmptyExchange option. - first_event = listener.results["started"][0] + first_event = listener.started_events[0] self.assertEqual(first_event.command_name, "saslStart") self.assertEqual(first_event.command["options"], {"skipEmptyExchange": True}) @@ -449,7 +449,7 @@ def test_scram(self): ) client.testscram.command("dbstats") - self.listener.results.clear() + self.listener.reset() client = rs_or_single_client_noauth( username="both", password="pwd", authSource="testscram", event_listeners=[self.listener] ) @@ -457,9 +457,9 @@ def test_scram(self): if client_context.version.at_least(4, 4, -1): # Speculative authentication in 4.4+ sends saslStart with the # handshake. - self.assertEqual(self.listener.results["started"], []) + self.assertEqual(self.listener.started_events, []) else: - started = self.listener.results["started"][0] + started = self.listener.started_events[0] self.assertEqual(started.command.get("mechanism"), "SCRAM-SHA-256") # Step 3: verify auth failure conditions diff --git a/test/test_change_stream.py b/test/test_change_stream.py index 62d7abee62..b000c3eddb 100644 --- a/test/test_change_stream.py +++ b/test/test_change_stream.py @@ -167,7 +167,7 @@ def test_try_next_runs_one_getmore(self): client = rs_or_single_client(event_listeners=[listener]) # Connect to the cluster. client.admin.command("ping") - listener.results.clear() + listener.reset() # ChangeStreams only read majority committed data so use w:majority. coll = self.watched_collection().with_options(write_concern=WriteConcern("majority")) coll.drop() @@ -177,25 +177,25 @@ def test_try_next_runs_one_getmore(self): self.addCleanup(coll.drop) with self.change_stream_with_client(client, max_await_time_ms=250) as stream: self.assertEqual(listener.started_command_names(), ["aggregate"]) - listener.results.clear() + listener.reset() # Confirm that only a single getMore is run even when no documents # are returned. 
self.assertIsNone(stream.try_next()) self.assertEqual(listener.started_command_names(), ["getMore"]) - listener.results.clear() + listener.reset() self.assertIsNone(stream.try_next()) self.assertEqual(listener.started_command_names(), ["getMore"]) - listener.results.clear() + listener.reset() # Get at least one change before resuming. coll.insert_one({"_id": 2}) wait_until(lambda: stream.try_next() is not None, "get change from try_next") - listener.results.clear() + listener.reset() # Cause the next request to initiate the resume process. self.kill_change_stream_cursor(stream) - listener.results.clear() + listener.reset() # The sequence should be: # - getMore, fail @@ -203,7 +203,7 @@ def test_try_next_runs_one_getmore(self): # - no results, return immediately without another getMore self.assertIsNone(stream.try_next()) self.assertEqual(listener.started_command_names(), ["getMore", "aggregate"]) - listener.results.clear() + listener.reset() # Stream still works after a resume. coll.insert_one({"_id": 3}) @@ -217,7 +217,7 @@ def test_batch_size_is_honored(self): client = rs_or_single_client(event_listeners=[listener]) # Connect to the cluster. client.admin.command("ping") - listener.results.clear() + listener.reset() # ChangeStreams only read majority committed data so use w:majority. coll = self.watched_collection().with_options(write_concern=WriteConcern("majority")) coll.drop() @@ -229,12 +229,12 @@ def test_batch_size_is_honored(self): expected = {"batchSize": 23} with self.change_stream_with_client(client, max_await_time_ms=250, batch_size=23) as stream: # Confirm that batchSize is honored for initial batch. - cmd = listener.results["started"][0].command + cmd = listener.started_events[0].command self.assertEqual(cmd["cursor"], expected) - listener.results.clear() + listener.reset() # Confirm that batchSize is honored by getMores. self.assertIsNone(stream.try_next()) - cmd = listener.results["started"][0].command + cmd = listener.started_events[0].command key = next(iter(expected)) self.assertEqual(expected[key], cmd[key]) @@ -464,7 +464,7 @@ def _get_expected_resume_token_legacy(self, stream, listener, previous_change=No versions that don't support postBatchResumeToken. Assumes the stream has never returned any changes if previous_change is None.""" if previous_change is None: - agg_cmd = listener.results["started"][0] + agg_cmd = listener.started_events[0] stage = agg_cmd.command["pipeline"][0]["$changeStream"] return stage.get("resumeAfter") or stage.get("startAfter") @@ -481,7 +481,7 @@ def _get_expected_resume_token(self, stream, listener, previous_change=None): if token is not None: return token - response = listener.results["succeeded"][-1].reply + response = listener.succeeded_events[-1].reply return response["cursor"]["postBatchResumeToken"] @no_type_check @@ -558,8 +558,8 @@ def test_no_resume_attempt_if_aggregate_command_fails(self): pass # Driver should have attempted aggregate command only once. - self.assertEqual(len(listener.results["started"]), 1) - self.assertEqual(listener.results["started"][0].command_name, "aggregate") + self.assertEqual(len(listener.started_events), 1) + self.assertEqual(listener.started_events[0].command_name, "aggregate") # Prose test no. 5 - REMOVED # Prose test no. 
6 - SKIPPED @@ -603,20 +603,20 @@ def test_start_at_operation_time_caching(self): with self.change_stream_with_client(client) as cs: self.kill_change_stream_cursor(cs) cs.try_next() - cmd = listener.results["started"][-1].command + cmd = listener.started_events[-1].command self.assertIsNotNone(cmd["pipeline"][0]["$changeStream"].get("startAtOperationTime")) # Case 2: change stream started with startAtOperationTime - listener.results.clear() + listener.reset() optime = self.get_start_at_operation_time() with self.change_stream_with_client(client, start_at_operation_time=optime) as cs: self.kill_change_stream_cursor(cs) cs.try_next() - cmd = listener.results["started"][-1].command + cmd = listener.started_events[-1].command self.assertEqual( cmd["pipeline"][0]["$changeStream"].get("startAtOperationTime"), optime, - str([k.command for k in listener.results["started"]]), + str([k.command for k in listener.started_events]), ) # Prose test no. 10 - SKIPPED @@ -631,7 +631,7 @@ def test_resumetoken_empty_batch(self): self.assertIsNone(change_stream.try_next()) resume_token = change_stream.resume_token - response = listener.results["succeeded"][0].reply + response = listener.succeeded_events[0].reply self.assertEqual(resume_token, response["cursor"]["postBatchResumeToken"]) # Prose test no. 11 @@ -643,7 +643,7 @@ def test_resumetoken_exhausted_batch(self): self._populate_and_exhaust_change_stream(change_stream) resume_token = change_stream.resume_token - response = listener.results["succeeded"][-1].reply + response = listener.succeeded_events[-1].reply self.assertEqual(resume_token, response["cursor"]["postBatchResumeToken"]) # Prose test no. 12 @@ -737,7 +737,7 @@ def test_startafter_resume_uses_startafter_after_empty_getMore(self): self.kill_change_stream_cursor(change_stream) change_stream.try_next() # Resume attempt - response = listener.results["started"][-1] + response = listener.started_events[-1] self.assertIsNone(response.command["pipeline"][0]["$changeStream"].get("resumeAfter")) self.assertIsNotNone(response.command["pipeline"][0]["$changeStream"].get("startAfter")) @@ -756,7 +756,7 @@ def test_startafter_resume_uses_resumeafter_after_nonempty_getMore(self): self.kill_change_stream_cursor(change_stream) change_stream.try_next() # Resume attempt - response = listener.results["started"][-1] + response = listener.started_events[-1] self.assertIsNotNone(response.command["pipeline"][0]["$changeStream"].get("resumeAfter")) self.assertIsNone(response.command["pipeline"][0]["$changeStream"].get("startAfter")) @@ -1056,7 +1056,7 @@ def tearDownClass(cls): def setUp(self): super(TestAllLegacyScenarios, self).setUp() - self.listener.results.clear() + self.listener.reset() def setUpCluster(self, scenario_dict): assets = [ @@ -1128,7 +1128,7 @@ def check_event(self, event, expectation_dict): self.assertEqual(getattr(event, key), value) def tearDown(self): - self.listener.results.clear() + self.listener.reset() _TEST_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)), "change_streams") diff --git a/test/test_collation.py b/test/test_collation.py index d8410a9de4..18f8bc78ac 100644 --- a/test/test_collation.py +++ b/test/test_collation.py @@ -113,11 +113,11 @@ def tearDownClass(cls): super(TestCollation, cls).tearDownClass() def tearDown(self): - self.listener.results.clear() + self.listener.reset() super(TestCollation, self).tearDown() def last_command_started(self): - return self.listener.results["started"][-1].command + return self.listener.started_events[-1].command def 
assertCollationInLastCommand(self): self.assertEqual(self.collation.document, self.last_command_started()["collation"]) @@ -129,7 +129,7 @@ def test_create_collection(self): # Test passing collation as a dict as well. self.db.test.drop() - self.listener.results.clear() + self.listener.reset() self.db.create_collection("test", collation=self.collation.document) self.assertCollationInLastCommand() @@ -139,7 +139,7 @@ def test_index_model(self): def test_create_index(self): self.db.test.create_index("foo", collation=self.collation) - ci_cmd = self.listener.results["started"][0].command + ci_cmd = self.listener.started_events[0].command self.assertEqual(self.collation.document, ci_cmd["indexes"][0]["collation"]) def test_aggregate(self): @@ -154,18 +154,18 @@ def test_distinct(self): self.db.test.distinct("foo", collation=self.collation) self.assertCollationInLastCommand() - self.listener.results.clear() + self.listener.reset() self.db.test.find(collation=self.collation).distinct("foo") self.assertCollationInLastCommand() def test_find_command(self): self.db.test.insert_one({"is this thing on?": True}) - self.listener.results.clear() + self.listener.reset() next(self.db.test.find(collation=self.collation)) self.assertCollationInLastCommand() def test_explain_command(self): - self.listener.results.clear() + self.listener.reset() self.db.test.find(collation=self.collation).explain() # The collation should be part of the explained command. self.assertEqual( @@ -174,40 +174,40 @@ def test_explain_command(self): def test_delete(self): self.db.test.delete_one({"foo": 42}, collation=self.collation) - command = self.listener.results["started"][0].command + command = self.listener.started_events[0].command self.assertEqual(self.collation.document, command["deletes"][0]["collation"]) - self.listener.results.clear() + self.listener.reset() self.db.test.delete_many({"foo": 42}, collation=self.collation) - command = self.listener.results["started"][0].command + command = self.listener.started_events[0].command self.assertEqual(self.collation.document, command["deletes"][0]["collation"]) def test_update(self): self.db.test.replace_one({"foo": 42}, {"foo": 43}, collation=self.collation) - command = self.listener.results["started"][0].command + command = self.listener.started_events[0].command self.assertEqual(self.collation.document, command["updates"][0]["collation"]) - self.listener.results.clear() + self.listener.reset() self.db.test.update_one({"foo": 42}, {"$set": {"foo": 43}}, collation=self.collation) - command = self.listener.results["started"][0].command + command = self.listener.started_events[0].command self.assertEqual(self.collation.document, command["updates"][0]["collation"]) - self.listener.results.clear() + self.listener.reset() self.db.test.update_many({"foo": 42}, {"$set": {"foo": 43}}, collation=self.collation) - command = self.listener.results["started"][0].command + command = self.listener.started_events[0].command self.assertEqual(self.collation.document, command["updates"][0]["collation"]) def test_find_and(self): self.db.test.find_one_and_delete({"foo": 42}, collation=self.collation) self.assertCollationInLastCommand() - self.listener.results.clear() + self.listener.reset() self.db.test.find_one_and_update( {"foo": 42}, {"$set": {"foo": 43}}, collation=self.collation ) self.assertCollationInLastCommand() - self.listener.results.clear() + self.listener.reset() self.db.test.find_one_and_replace({"foo": 42}, {"foo": 43}, collation=self.collation) self.assertCollationInLastCommand() @@ 
-229,8 +229,8 @@ def test_bulk_write(self): ] ) - delete_cmd = self.listener.results["started"][0].command - update_cmd = self.listener.results["started"][1].command + delete_cmd = self.listener.started_events[0].command + update_cmd = self.listener.started_events[1].command def check_ops(ops): for op in ops: diff --git a/test/test_command_monitoring_legacy.py b/test/test_command_monitoring_legacy.py index 5d9f2fe3ee..6f5ef8e43f 100644 --- a/test/test_command_monitoring_legacy.py +++ b/test/test_command_monitoring_legacy.py @@ -54,7 +54,7 @@ def tearDownClass(cls): cls.client.close() def tearDown(self): - self.listener.results.clear() + self.listener.reset() def format_actual_results(results): @@ -75,7 +75,7 @@ def run_scenario(self): coll = self.client[dbname][collname] coll.drop() coll.insert_many(scenario_def["data"]) - self.listener.results.clear() + self.listener.reset() name = camel_to_snake(test["operation"]["name"]) if "read_preference" in test["operation"]: coll = coll.with_options( diff --git a/test/test_cursor.py b/test/test_cursor.py index 5b4efcd391..f140ab6da8 100644 --- a/test/test_cursor.py +++ b/test/test_cursor.py @@ -329,7 +329,7 @@ def test_explain_with_read_concern(self): self.addCleanup(client.close) coll = client.pymongo_test.test.with_options(read_concern=ReadConcern(level="local")) self.assertTrue(coll.find().explain()) - started = listener.results["started"] + started = listener.started_events self.assertEqual(len(started), 1) self.assertNotIn("readConcern", started[0].command) @@ -1226,7 +1226,7 @@ def test_getMore_does_not_send_readPreference(self): self.addCleanup(coll.drop) list(coll.find(batch_size=3)) - started = listener.results["started"] + started = listener.started_events self.assertEqual(2, len(started)) self.assertEqual("find", started[0].command_name) if client_context.is_rs or client_context.is_mongos: @@ -1261,13 +1261,13 @@ def test_find_raw_transaction(self): batches = list( client[self.db.name].test.find_raw_batches(session=session).sort("_id") ) - cmd = listener.results["started"][0] + cmd = listener.started_events[0] self.assertEqual(cmd.command_name, "find") self.assertIn("$clusterTime", cmd.command) self.assertEqual(cmd.command["startTransaction"], True) self.assertEqual(cmd.command["txnNumber"], 1) # Ensure we update $clusterTime from the command response. 
- last_cmd = listener.results["succeeded"][-1] + last_cmd = listener.succeeded_events[-1] self.assertEqual( last_cmd.reply["$clusterTime"]["clusterTime"], session.cluster_time["clusterTime"], @@ -1293,8 +1293,8 @@ def test_find_raw_retryable_reads(self): self.assertEqual(1, len(batches)) self.assertEqual(docs, decode_all(batches[0])) - self.assertEqual(len(listener.results["started"]), 2) - for cmd in listener.results["started"]: + self.assertEqual(len(listener.started_events), 2) + for cmd in listener.started_events: self.assertEqual(cmd.command_name, "find") @client_context.require_version_min(5, 0, 0) @@ -1314,7 +1314,7 @@ def test_find_raw_snapshot_reads(self): self.assertEqual(1, len(batches)) self.assertEqual(docs, decode_all(batches[0])) - find_cmd = listener.results["started"][1].command + find_cmd = listener.started_events[1].command self.assertEqual(find_cmd["readConcern"]["level"], "snapshot") self.assertIsNotNone(find_cmd["readConcern"]["atClusterTime"]) @@ -1372,15 +1372,15 @@ def test_monitoring(self): c.drop() c.insert_many([{"_id": i} for i in range(10)]) - listener.results.clear() + listener.reset() cursor = c.find_raw_batches(batch_size=4) # First raw batch of 4 documents. next(cursor) - started = listener.results["started"][0] - succeeded = listener.results["succeeded"][0] - self.assertEqual(0, len(listener.results["failed"])) + started = listener.started_events[0] + succeeded = listener.succeeded_events[0] + self.assertEqual(0, len(listener.failed_events)) self.assertEqual("find", started.command_name) self.assertEqual("pymongo_test", started.database_name) self.assertEqual("find", succeeded.command_name) @@ -1391,7 +1391,7 @@ def test_monitoring(self): self.assertEqual(len(csr["firstBatch"]), 1) self.assertEqual(decode_all(csr["firstBatch"][0]), [{"_id": i} for i in range(0, 4)]) - listener.results.clear() + listener.reset() # Next raw batch of 4 documents. next(cursor) @@ -1442,13 +1442,13 @@ def test_aggregate_raw_transaction(self): [{"$sort": {"_id": 1}}], session=session ) ) - cmd = listener.results["started"][0] + cmd = listener.started_events[0] self.assertEqual(cmd.command_name, "aggregate") self.assertIn("$clusterTime", cmd.command) self.assertEqual(cmd.command["startTransaction"], True) self.assertEqual(cmd.command["txnNumber"], 1) # Ensure we update $clusterTime from the command response. 
- last_cmd = listener.results["succeeded"][-1] + last_cmd = listener.succeeded_events[-1] self.assertEqual( last_cmd.reply["$clusterTime"]["clusterTime"], session.cluster_time["clusterTime"], @@ -1473,8 +1473,8 @@ def test_aggregate_raw_retryable_reads(self): self.assertEqual(1, len(batches)) self.assertEqual(docs, decode_all(batches[0])) - self.assertEqual(len(listener.results["started"]), 3) - cmds = listener.results["started"] + self.assertEqual(len(listener.started_events), 3) + cmds = listener.started_events self.assertEqual(cmds[0].command_name, "aggregate") self.assertEqual(cmds[1].command_name, "aggregate") @@ -1495,7 +1495,7 @@ def test_aggregate_raw_snapshot_reads(self): self.assertEqual(1, len(batches)) self.assertEqual(docs, decode_all(batches[0])) - find_cmd = listener.results["started"][1].command + find_cmd = listener.started_events[1].command self.assertEqual(find_cmd["readConcern"]["level"], "snapshot") self.assertIsNotNone(find_cmd["readConcern"]["atClusterTime"]) @@ -1536,13 +1536,13 @@ def test_monitoring(self): c.drop() c.insert_many([{"_id": i} for i in range(10)]) - listener.results.clear() + listener.reset() cursor = c.aggregate_raw_batches([{"$sort": {"_id": 1}}], batchSize=4) # Start cursor, no initial batch. - started = listener.results["started"][0] - succeeded = listener.results["succeeded"][0] - self.assertEqual(0, len(listener.results["failed"])) + started = listener.started_events[0] + succeeded = listener.succeeded_events[0] + self.assertEqual(0, len(listener.failed_events)) self.assertEqual("aggregate", started.command_name) self.assertEqual("pymongo_test", started.database_name) self.assertEqual("aggregate", succeeded.command_name) @@ -1551,7 +1551,7 @@ def test_monitoring(self): # First batch is empty. self.assertEqual(len(csr["firstBatch"]), 0) - listener.results.clear() + listener.reset() # Batches of 4 documents. n = 0 @@ -1570,7 +1570,7 @@ def test_monitoring(self): self.assertEqual(decode_all(batch), [{"_id": i} for i in range(n, min(n + 4, 10))]) n += 4 - listener.results.clear() + listener.reset() if __name__ == "__main__": diff --git a/test/test_data_lake.py b/test/test_data_lake.py index fbf79994d3..4fa38435a3 100644 --- a/test/test_data_lake.py +++ b/test/test_data_lake.py @@ -62,16 +62,16 @@ def test_1(self): next(cursor) # find command assertions - find_cmd = listener.results["succeeded"][-1] + find_cmd = listener.succeeded_events[-1] self.assertEqual(find_cmd.command_name, "find") cursor_id = find_cmd.reply["cursor"]["id"] cursor_ns = find_cmd.reply["cursor"]["ns"] # killCursors command assertions cursor.close() - started = listener.results["started"][-1] + started = listener.started_events[-1] self.assertEqual(started.command_name, "killCursors") - succeeded = listener.results["succeeded"][-1] + succeeded = listener.succeeded_events[-1] self.assertEqual(succeeded.command_name, "killCursors") self.assertIn(cursor_id, started.command["cursors"]) diff --git a/test/test_database.py b/test/test_database.py index 49387b8bb9..d56b9800e4 100644 --- a/test/test_database.py +++ b/test/test_database.py @@ -207,7 +207,7 @@ def test_list_collection_names_filter(self): results.clear() names = db.list_collection_names(filter=filter) self.assertEqual(names, ["capped"]) - self.assertNotIn("nameOnly", results["started"][0].command) + self.assertNotIn("nameOnly", listener.started_events[0].command) # Should send nameOnly (except on 2.6). 
for filter in (None, {}, {"name": {"$in": ["capped", "non_capped"]}}): diff --git a/test/test_encryption.py b/test/test_encryption.py index 6c54a90f7a..eaee22ebac 100644 --- a/test/test_encryption.py +++ b/test/test_encryption.py @@ -814,7 +814,7 @@ def run_test(self, provider_name): provider_name, master_key=master_key, key_alt_names=["%s_altname" % (provider_name,)] ) self.assertBinaryUUID(datakey_id) - cmd = self.listener.results["started"][-1] + cmd = self.listener.started_events[-1] self.assertEqual("insert", cmd.command_name) self.assertEqual({"w": "majority"}, cmd.command.get("writeConcern")) docs = list(self.vault.find({"_id": datakey_id})) @@ -1489,7 +1489,7 @@ def _test_automatic(self, expectation_extjson, payload): expected_document = json_util.loads(expectation_extjson, json_options=JSON_OPTS) coll.insert_one(payload) - event = insert_listener.results["started"][0] + event = insert_listener.started_events[0] inserted_doc = event.command["documents"][0] for key, value in expected_document.items(): @@ -1622,7 +1622,7 @@ def test_case_1(self): ), ) - cev = self.client_listener.results["started"] + cev = self.client_listener.started_events self.assertEqual(len(cev), 4) self.assertEqual(cev[0].command_name, "listCollections") self.assertEqual(cev[0].database_name, "db") @@ -1643,7 +1643,7 @@ def test_case_2(self): ), ) - cev = self.client_listener.results["started"] + cev = self.client_listener.started_events self.assertEqual(len(cev), 3) self.assertEqual(cev[0].command_name, "listCollections") self.assertEqual(cev[0].database_name, "db") @@ -1652,7 +1652,7 @@ def test_case_2(self): self.assertEqual(cev[2].command_name, "find") self.assertEqual(cev[2].database_name, "db") - cev = self.client_keyvault_listener.results["started"] + cev = self.client_keyvault_listener.started_events self.assertEqual(len(cev), 1) self.assertEqual(cev[0].command_name, "find") self.assertEqual(cev[0].database_name, "keyvault") @@ -1667,7 +1667,7 @@ def test_case_3(self): ), ) - cev = self.client_listener.results["started"] + cev = self.client_listener.started_events self.assertEqual(len(cev), 2) self.assertEqual(cev[0].command_name, "find") self.assertEqual(cev[0].database_name, "db") @@ -1684,12 +1684,12 @@ def test_case_4(self): ), ) - cev = self.client_listener.results["started"] + cev = self.client_listener.started_events self.assertEqual(len(cev), 1) self.assertEqual(cev[0].command_name, "find") self.assertEqual(cev[0].database_name, "db") - cev = self.client_keyvault_listener.results["started"] + cev = self.client_keyvault_listener.started_events self.assertEqual(len(cev), 1) self.assertEqual(cev[0].command_name, "find") self.assertEqual(cev[0].database_name, "keyvault") @@ -1704,7 +1704,7 @@ def test_case_5(self): ), ) - cev = self.client_listener.results["started"] + cev = self.client_listener.started_events self.assertEqual(len(cev), 5) self.assertEqual(cev[0].command_name, "listCollections") self.assertEqual(cev[0].database_name, "db") @@ -1727,7 +1727,7 @@ def test_case_6(self): ), ) - cev = self.client_listener.results["started"] + cev = self.client_listener.started_events self.assertEqual(len(cev), 3) self.assertEqual(cev[0].command_name, "listCollections") self.assertEqual(cev[0].database_name, "db") @@ -1736,7 +1736,7 @@ def test_case_6(self): self.assertEqual(cev[2].command_name, "find") self.assertEqual(cev[2].database_name, "db") - cev = self.client_keyvault_listener.results["started"] + cev = self.client_keyvault_listener.started_events self.assertEqual(len(cev), 1) 
self.assertEqual(cev[0].command_name, "find") self.assertEqual(cev[0].database_name, "keyvault") @@ -1751,7 +1751,7 @@ def test_case_7(self): ), ) - cev = self.client_listener.results["started"] + cev = self.client_listener.started_events self.assertEqual(len(cev), 2) self.assertEqual(cev[0].command_name, "find") self.assertEqual(cev[0].database_name, "db") @@ -1768,12 +1768,12 @@ def test_case_8(self): ), ) - cev = self.client_listener.results["started"] + cev = self.client_listener.started_events self.assertEqual(len(cev), 1) self.assertEqual(cev[0].command_name, "find") self.assertEqual(cev[0].database_name, "db") - cev = self.client_keyvault_listener.results["started"] + cev = self.client_keyvault_listener.started_events self.assertEqual(len(cev), 1) self.assertEqual(cev[0].command_name, "find") self.assertEqual(cev[0].database_name, "keyvault") @@ -1821,8 +1821,8 @@ def test_01_command_error(self): ): with self.assertRaises(OperationFailure): self.encrypted_client.db.decryption_events.aggregate([]) - self.assertEqual(len(self.listener.results["failed"]), 1) - for event in self.listener.results["failed"]: + self.assertEqual(len(self.listener.failed_events), 1) + for event in self.listener.failed_events: self.assertEqual(event.failure["code"], 123) def test_02_network_error(self): @@ -1834,8 +1834,8 @@ def test_02_network_error(self): ): with self.assertRaises(AutoReconnect): self.encrypted_client.db.decryption_events.aggregate([]) - self.assertEqual(len(self.listener.results["failed"]), 1) - self.assertEqual(self.listener.results["failed"][0].command_name, "aggregate") + self.assertEqual(len(self.listener.failed_events), 1) + self.assertEqual(self.listener.failed_events[0].command_name, "aggregate") def test_03_decrypt_error(self): self.encrypted_client.db.decryption_events.insert_one( @@ -1843,8 +1843,8 @@ def test_03_decrypt_error(self): ) with self.assertRaises(EncryptionError): next(self.encrypted_client.db.decryption_events.aggregate([])) - event = self.listener.results["succeeded"][0] - self.assertEqual(len(self.listener.results["failed"]), 0) + event = self.listener.succeeded_events[0] + self.assertEqual(len(self.listener.failed_events), 0) self.assertEqual( event.reply["cursor"]["firstBatch"][0]["encrypted"], self.malformed_cipher_text ) @@ -1852,8 +1852,8 @@ def test_03_decrypt_error(self): def test_04_decrypt_success(self): self.encrypted_client.db.decryption_events.insert_one({"encrypted": self.cipher_text}) next(self.encrypted_client.db.decryption_events.aggregate([])) - event = self.listener.results["succeeded"][0] - self.assertEqual(len(self.listener.results["failed"]), 0) + event = self.listener.succeeded_events[0] + self.assertEqual(len(self.listener.failed_events), 0) self.assertEqual(event.reply["cursor"]["firstBatch"][0]["encrypted"], self.cipher_text) diff --git a/test/test_monitoring.py b/test/test_monitoring.py index 0b8200c019..48e05212d4 100644 --- a/test/test_monitoring.py +++ b/test/test_monitoring.py @@ -49,7 +49,7 @@ def tearDownClass(cls): super(TestCommandMonitoring, cls).tearDownClass() def tearDown(self): - self.listener.results.clear() + self.listener.reset() super(TestCommandMonitoring, self).tearDown() def test_started_simple(self): @@ -117,7 +117,7 @@ def test_find_one(self): def test_find_and_get_more(self): self.client.pymongo_test.test.drop() self.client.pymongo_test.test.insert_many([{} for _ in range(10)]) - self.listener.results.clear() + self.listener.reset() cursor = self.client.pymongo_test.test.find(projection={"_id": False}, batch_size=4) 
for _ in range(4): next(cursor) @@ -147,7 +147,7 @@ def test_find_and_get_more(self): self.assertEqual(csr["ns"], "pymongo_test.test") self.assertEqual(csr["firstBatch"], [{} for _ in range(4)]) - self.listener.results.clear() + self.listener.reset() # Next batch. Exhausting the cursor could cause a getMore # that returns id of 0 and no results. next(cursor) @@ -182,7 +182,7 @@ def test_find_with_explain(self): cmd = SON([("explain", SON([("find", "test"), ("filter", {})]))]) self.client.pymongo_test.test.drop() self.client.pymongo_test.test.insert_one({}) - self.listener.results.clear() + self.listener.reset() coll = self.client.pymongo_test.test # Test that we publish the unwrapped command. if self.client.is_mongos: @@ -212,7 +212,7 @@ def _test_find_options(self, query, expected_cmd): coll.insert_many([{"x": i} for i in range(5)]) # Test that we publish the unwrapped command. - self.listener.results.clear() + self.listener.reset() if self.client.is_mongos: coll = coll.with_options(read_preference=ReadPreference.PRIMARY_PREFERRED) @@ -293,7 +293,7 @@ def test_find_snapshot(self): def test_command_and_get_more(self): self.client.pymongo_test.test.drop() self.client.pymongo_test.test.insert_many([{"x": 1} for _ in range(10)]) - self.listener.results.clear() + self.listener.reset() coll = self.client.pymongo_test.test # Test that we publish the unwrapped command. if self.client.is_mongos: @@ -333,7 +333,7 @@ def test_command_and_get_more(self): } self.assertEqualCommand(expected_cursor, succeeded.reply.get("cursor")) - self.listener.results.clear() + self.listener.reset() next(cursor) try: results = self.listener.results @@ -403,7 +403,7 @@ def test_not_primary_error(self): client = single_client(*address, event_listeners=[self.listener]) # Clear authentication command results from the listener. 
client.admin.command("ping") - self.listener.results.clear() + self.listener.reset() error = None try: client.pymongo_test.test.find_one_and_delete({}) @@ -426,7 +426,7 @@ def test_not_primary_error(self): def test_exhaust(self): self.client.pymongo_test.test.drop() self.client.pymongo_test.test.insert_many([{} for _ in range(11)]) - self.listener.results.clear() + self.listener.reset() cursor = self.client.pymongo_test.test.find( projection={"_id": False}, batch_size=5, cursor_type=CursorType.EXHAUST ) @@ -462,7 +462,7 @@ def test_exhaust(self): } self.assertEqualReply(expected_result, succeeded.reply) - self.listener.results.clear() + self.listener.reset() tuple(cursor) results = self.listener.results self.assertEqual(0, len(results["failed"])) @@ -492,7 +492,7 @@ def test_kill_cursors(self): cursor = self.client.pymongo_test.test.find().batch_size(5) next(cursor) cursor_id = cursor.cursor_id - self.listener.results.clear() + self.listener.reset() cursor.close() time.sleep(2) results = self.listener.results @@ -524,7 +524,7 @@ def test_kill_cursors(self): def test_non_bulk_writes(self): coll = self.client.pymongo_test.test coll.drop() - self.listener.results.clear() + self.listener.reset() # Implied write concern insert_one res = coll.insert_one({"x": 1}) @@ -555,7 +555,7 @@ def test_non_bulk_writes(self): self.assertEqual(1, reply.get("n")) # Unacknowledged insert_one - self.listener.results.clear() + self.listener.reset() coll = coll.with_options(write_concern=WriteConcern(w=0)) res = coll.insert_one({"x": 1}) results = self.listener.results @@ -584,7 +584,7 @@ def test_non_bulk_writes(self): self.assertEqualReply(succeeded.reply, {"ok": 1}) # Explicit write concern insert_one - self.listener.results.clear() + self.listener.reset() coll = coll.with_options(write_concern=WriteConcern(w=1)) res = coll.insert_one({"x": 1}) results = self.listener.results @@ -615,7 +615,7 @@ def test_non_bulk_writes(self): self.assertEqual(1, reply.get("n")) # delete_many - self.listener.results.clear() + self.listener.reset() res = coll.delete_many({"x": 1}) results = self.listener.results started = results["started"][0] @@ -645,7 +645,7 @@ def test_non_bulk_writes(self): self.assertEqual(res.deleted_count, reply.get("n")) # replace_one - self.listener.results.clear() + self.listener.reset() oid = ObjectId() res = coll.replace_one({"_id": oid}, {"_id": oid, "x": 1}, upsert=True) results = self.listener.results @@ -689,7 +689,7 @@ def test_non_bulk_writes(self): self.assertEqual([{"index": 0, "_id": oid}], reply.get("upserted")) # update_one - self.listener.results.clear() + self.listener.reset() res = coll.update_one({"x": 1}, {"$inc": {"x": 1}}) results = self.listener.results started = results["started"][0] @@ -731,7 +731,7 @@ def test_non_bulk_writes(self): self.assertEqual(1, reply.get("n")) # update_many - self.listener.results.clear() + self.listener.reset() res = coll.update_many({"x": 2}, {"$inc": {"x": 1}}) results = self.listener.results started = results["started"][0] @@ -773,7 +773,7 @@ def test_non_bulk_writes(self): self.assertEqual(1, reply.get("n")) # delete_one - self.listener.results.clear() + self.listener.reset() _ = coll.delete_one({"x": 3}) results = self.listener.results started = results["started"][0] @@ -807,7 +807,7 @@ def test_non_bulk_writes(self): # write errors coll.insert_one({"_id": 1}) try: - self.listener.results.clear() + self.listener.reset() coll.insert_one({"_id": 1}) except OperationFailure: pass @@ -848,7 +848,7 @@ def test_insert_many(self): # This always uses the 
bulk API. coll = self.client.pymongo_test.test coll.drop() - self.listener.results.clear() + self.listener.reset() big = "x" * (1024 * 1024 * 4) docs = [{"_id": i, "big": big} for i in range(6)] @@ -889,7 +889,7 @@ def test_insert_many_unacknowledged(self): coll = self.client.pymongo_test.test coll.drop() unack_coll = coll.with_options(write_concern=WriteConcern(w=0)) - self.listener.results.clear() + self.listener.reset() # Force two batches on legacy servers. big = "x" * (1024 * 1024 * 12) @@ -928,7 +928,7 @@ def test_insert_many_unacknowledged(self): def test_bulk_write(self): coll = self.client.pymongo_test.test coll.drop() - self.listener.results.clear() + self.listener.reset() coll.bulk_write( [ @@ -991,7 +991,7 @@ def test_bulk_write(self): @client_context.require_failCommand_fail_point def test_bulk_write_command_network_error(self): coll = self.client.pymongo_test.test - self.listener.results.clear() + self.listener.reset() insert_network_error = { "configureFailPoint": "failCommand", @@ -1004,7 +1004,7 @@ def test_bulk_write_command_network_error(self): with self.fail_point(insert_network_error): with self.assertRaises(AutoReconnect): coll.bulk_write([InsertOne({"_id": 1})]) - failed = self.listener.results["failed"] + failed = self.listener.failed_events self.assertEqual(1, len(failed)) event = failed[0] self.assertEqual(event.command_name, "insert") @@ -1015,7 +1015,7 @@ def test_bulk_write_command_network_error(self): @client_context.require_failCommand_fail_point def test_bulk_write_command_error(self): coll = self.client.pymongo_test.test - self.listener.results.clear() + self.listener.reset() insert_command_error = { "configureFailPoint": "failCommand", @@ -1029,7 +1029,7 @@ def test_bulk_write_command_error(self): with self.fail_point(insert_command_error): with self.assertRaises(NotPrimaryError): coll.bulk_write([InsertOne({"_id": 1})]) - failed = self.listener.results["failed"] + failed = self.listener.failed_events self.assertEqual(1, len(failed)) event = failed[0] self.assertEqual(event.command_name, "insert") @@ -1040,7 +1040,7 @@ def test_bulk_write_command_error(self): def test_write_errors(self): coll = self.client.pymongo_test.test coll.drop() - self.listener.results.clear() + self.listener.reset() try: coll.bulk_write( @@ -1084,7 +1084,7 @@ def test_write_errors(self): def test_first_batch_helper(self): # Regardless of server version and use of helpers._first_batch # this test should still pass. 
- self.listener.results.clear() + self.listener.reset() tuple(self.client.pymongo_test.test.list_indexes()) results = self.listener.results started = results["started"][0] @@ -1105,12 +1105,12 @@ def test_first_batch_helper(self): self.assertTrue("cursor" in succeeded.reply) self.assertTrue("ok" in succeeded.reply) - self.listener.results.clear() + self.listener.reset() def test_sensitive_commands(self): listeners = self.client._event_listeners - self.listener.results.clear() + self.listener.reset() cmd = SON([("getnonce", 1)]) listeners.publish_command_start(cmd, "pymongo_test", 12345, self.client.address) delta = datetime.timedelta(milliseconds=100) @@ -1159,7 +1159,7 @@ def tearDownClass(cls): def setUp(self): super(TestGlobalListener, self).setUp() - self.listener.results.clear() + self.listener.reset() def test_simple(self): self.client.pymongo_test.command("ping") diff --git a/test/test_read_concern.py b/test/test_read_concern.py index d5df682fba..3a1c8f3a54 100644 --- a/test/test_read_concern.py +++ b/test/test_read_concern.py @@ -14,6 +14,11 @@ """Test the read_concern module.""" +import sys +import unittest + +sys.path[0:0] = [""] + from test import IntegrationTest, client_context from test.utils import OvertCommandListener, rs_or_single_client, single_client @@ -41,7 +46,7 @@ def tearDownClass(cls): super(TestReadConcern, cls).tearDownClass() def tearDown(self): - self.listener.results.clear() + self.listener.reset() super(TestReadConcern, self).tearDown() def test_read_concern(self): @@ -74,9 +79,9 @@ def test_find_command(self): # readConcern not sent in command if not specified. coll = self.db.coll tuple(coll.find({"field": "value"})) - self.assertNotIn("readConcern", self.listener.results["started"][0].command) + self.assertNotIn("readConcern", self.listener.started_events[0].command) - self.listener.results.clear() + self.listener.reset() # Explicitly set readConcern to 'local'. coll = self.db.get_collection("coll", read_concern=ReadConcern("local")) @@ -89,23 +94,21 @@ def test_find_command(self): ("readConcern", {"level": "local"}), ] ), - self.listener.results["started"][0].command, + self.listener.started_events[0].command, ) def test_command_cursor(self): # readConcern not sent in command if not specified. coll = self.db.coll tuple(coll.aggregate([{"$match": {"field": "value"}}])) - self.assertNotIn("readConcern", self.listener.results["started"][0].command) + self.assertNotIn("readConcern", self.listener.started_events[0].command) - self.listener.results.clear() + self.listener.reset() # Explicitly set readConcern to 'local'. coll = self.db.get_collection("coll", read_concern=ReadConcern("local")) tuple(coll.aggregate([{"$match": {"field": "value"}}])) - self.assertEqual( - {"level": "local"}, self.listener.results["started"][0].command["readConcern"] - ) + self.assertEqual({"level": "local"}, self.listener.started_events[0].command["readConcern"]) def test_aggregate_out(self): coll = self.db.get_collection("coll", read_concern=ReadConcern("local")) @@ -113,6 +116,10 @@ def test_aggregate_out(self): # Aggregate with $out supports readConcern MongoDB 4.2 onwards. 
if client_context.version >= (4, 1): - self.assertIn("readConcern", self.listener.results["started"][0].command) + self.assertIn("readConcern", self.listener.started_events[0].command) else: - self.assertNotIn("readConcern", self.listener.results["started"][0].command) + self.assertNotIn("readConcern", self.listener.started_events[0].command) + + +if __name__ == "__main__": + unittest.main() diff --git a/test/test_read_preferences.py b/test/test_read_preferences.py index ae2fa8bcee..1362623dff 100644 --- a/test/test_read_preferences.py +++ b/test/test_read_preferences.py @@ -541,7 +541,7 @@ def test_send_hedge(self): coll = client.test.get_collection("test", read_preference=pref) listener.reset() coll.find_one() - started = listener.results["started"] + started = listener.started_events self.assertEqual(len(started), 1, started) cmd = started[0].command if client_context.is_rs or client_context.is_mongos: diff --git a/test/test_read_write_concern_spec.py b/test/test_read_write_concern_spec.py index 4dfc8f068c..5cc4845e32 100644 --- a/test/test_read_write_concern_spec.py +++ b/test/test_read_write_concern_spec.py @@ -85,11 +85,11 @@ def insert_command_default_write_concern(): ] for name, f in ops: - listener.results.clear() + listener.reset() f() - self.assertGreaterEqual(len(listener.results["started"]), 1) - for i, event in enumerate(listener.results["started"]): + self.assertGreaterEqual(len(listener.started_events), 1) + for i, event in enumerate(listener.started_events): self.assertNotIn( "readConcern", event.command, @@ -221,7 +221,7 @@ def test_write_error_details_exposes_errinfo(self): self.assertIsNotNone(ctx.exception.details) assert ctx.exception.details is not None self.assertIsNotNone(ctx.exception.details.get("errInfo")) - for event in listener.results["succeeded"]: + for event in listener.succeeded_events: if event.command_name == "insert": self.assertEqual(event.reply["writeErrors"][0], ctx.exception.details) break diff --git a/test/test_retryable_reads.py b/test/test_retryable_reads.py index 2b8bc17c58..517e1122b0 100644 --- a/test/test_retryable_reads.py +++ b/test/test_retryable_reads.py @@ -208,12 +208,12 @@ def test_pool_paused_error_is_retryable(self): # Connection check out failures are not reflected in command # monitoring because we only publish command events _after_ checking # out a connection. 
- started = cmd_listener.results["started"] + started = cmd_listener.started_events msg = pprint.pformat(cmd_listener.results) self.assertEqual(3, len(started), msg) - succeeded = cmd_listener.results["succeeded"] + succeeded = cmd_listener.succeeded_events self.assertEqual(2, len(succeeded), msg) - failed = cmd_listener.results["failed"] + failed = cmd_listener.failed_events self.assertEqual(1, len(failed), msg) diff --git a/test/test_retryable_writes.py b/test/test_retryable_writes.py index 8d556b90ae..7ca1c9c1ef 100644 --- a/test/test_retryable_writes.py +++ b/test/test_retryable_writes.py @@ -227,9 +227,9 @@ def test_supported_single_statement_no_retry(self): self.addCleanup(client.close) for method, args, kwargs in retryable_single_statement_ops(client.db.retryable_write_test): msg = "%s(*%r, **%r)" % (method.__name__, args, kwargs) - listener.results.clear() + listener.reset() method(*args, **kwargs) - for event in listener.results["started"]: + for event in listener.started_events: self.assertNotIn( "txnNumber", event.command, @@ -240,10 +240,10 @@ def test_supported_single_statement_no_retry(self): def test_supported_single_statement_supported_cluster(self): for method, args, kwargs in retryable_single_statement_ops(self.db.retryable_write_test): msg = "%s(*%r, **%r)" % (method.__name__, args, kwargs) - self.listener.results.clear() + self.listener.reset() method(*args, **kwargs) - commands_started = self.listener.results["started"] - self.assertEqual(len(self.listener.results["succeeded"]), 1, msg) + commands_started = self.listener.started_events + self.assertEqual(len(self.listener.succeeded_events), 1, msg) first_attempt = commands_started[0] self.assertIn( "lsid", @@ -283,10 +283,10 @@ def test_supported_single_statement_unsupported_cluster(self): for method, args, kwargs in retryable_single_statement_ops(self.db.retryable_write_test): msg = "%s(*%r, **%r)" % (method.__name__, args, kwargs) - self.listener.results.clear() + self.listener.reset() method(*args, **kwargs) - for event in self.listener.results["started"]: + for event in self.listener.started_events: self.assertNotIn( "txnNumber", event.command, @@ -301,11 +301,11 @@ def test_unsupported_single_statement(self): coll ) + retryable_single_statement_ops(coll_w0): msg = "%s(*%r, **%r)" % (method.__name__, args, kwargs) - self.listener.results.clear() + self.listener.reset() method(*args, **kwargs) - started_events = self.listener.results["started"] - self.assertEqual(len(self.listener.results["succeeded"]), len(started_events), msg) - self.assertEqual(len(self.listener.results["failed"]), 0, msg) + started_events = self.listener.started_events + self.assertEqual(len(self.listener.succeeded_events), len(started_events), msg) + self.assertEqual(len(self.listener.failed_events), 0, msg) for event in started_events: self.assertNotIn( "txnNumber", @@ -324,10 +324,10 @@ def test_server_selection_timeout_not_retried(self): ) for method, args, kwargs in retryable_single_statement_ops(client.db.retryable_write_test): msg = "%s(*%r, **%r)" % (method.__name__, args, kwargs) - listener.results.clear() + listener.reset() with self.assertRaises(ServerSelectionTimeoutError, msg=msg): method(*args, **kwargs) - self.assertEqual(len(listener.results["started"]), 0, msg) + self.assertEqual(len(listener.started_events), 0, msg) @client_context.require_replica_set @client_context.require_test_commands @@ -353,11 +353,11 @@ def raise_error(*args, **kwargs): for method, args, kwargs in 
retryable_single_statement_ops(client.db.retryable_write_test): msg = "%s(*%r, **%r)" % (method.__name__, args, kwargs) - listener.results.clear() + listener.reset() topology.select_server = mock_select_server with self.assertRaises(ConnectionFailure, msg=msg): method(*args, **kwargs) - self.assertEqual(len(listener.results["started"]), 1, msg) + self.assertEqual(len(listener.started_events), 1, msg) @client_context.require_replica_set @client_context.require_test_commands @@ -366,7 +366,7 @@ def test_batch_splitting(self): large = "s" * 1024 * 1024 * 15 coll = self.db.retryable_write_test coll.delete_many({}) - self.listener.results.clear() + self.listener.reset() bulk_result = coll.bulk_write( [ InsertOne({"_id": 1, "l": large}), @@ -381,7 +381,7 @@ def test_batch_splitting(self): # Each command should fail and be retried. # With OP_MSG 3 inserts are one batch. 2 updates another. # 2 deletes a third. - self.assertEqual(len(self.listener.results["started"]), 6) + self.assertEqual(len(self.listener.started_events), 6) self.assertEqual(coll.find_one(), {"_id": 1, "count": 1}) # Assert the final result expected_result = { @@ -412,7 +412,7 @@ def test_batch_splitting_retry_fails(self): ] ) ) - self.listener.results.clear() + self.listener.reset() with self.client.start_session() as session: initial_txn = session._server_session._transaction_id try: @@ -430,9 +430,9 @@ def test_batch_splitting_retry_fails(self): else: self.fail("bulk_write should have failed") - started = self.listener.results["started"] + started = self.listener.started_events self.assertEqual(len(started), 3) - self.assertEqual(len(self.listener.results["succeeded"]), 1) + self.assertEqual(len(self.listener.succeeded_events), 1) expected_txn = Int64(initial_txn + 1) self.assertEqual(started[0].command["txnNumber"], expected_txn) self.assertEqual(started[0].command["lsid"], session.session_id) @@ -483,9 +483,7 @@ def test_RetryableWriteError_error_label(self): if client_context.version >= Version(4, 4): # In MongoDB 4.4+ we rely on the server returning the error label. - self.assertIn( - "RetryableWriteError", listener.results["succeeded"][-1].reply["errorLabels"] - ) + self.assertIn("RetryableWriteError", listener.succeeded_events[-1].reply["errorLabels"]) @client_context.require_version_min(4, 4) def test_RetryableWriteError_error_label_RawBSONDocument(self): @@ -575,12 +573,12 @@ def test_pool_paused_error_is_retryable(self): # Connection check out failures are not reflected in command # monitoring because we only publish command events _after_ checking # out a connection. - started = cmd_listener.results["started"] + started = cmd_listener.started_events msg = pprint.pformat(cmd_listener.results) self.assertEqual(3, len(started), msg) - succeeded = cmd_listener.results["succeeded"] + succeeded = cmd_listener.succeeded_events self.assertEqual(2, len(succeeded), msg) - failed = cmd_listener.results["failed"] + failed = cmd_listener.failed_events self.assertEqual(1, len(failed), msg) @@ -605,7 +603,7 @@ def raise_connection_err_select_server(*args, **kwargs): raise ConnectionFailure("Connection refused") for method, args, kwargs in retryable_single_statement_ops(client.db.retryable_write_test): - listener.results.clear() + listener.reset() topology.select_server = raise_connection_err_select_server with client.start_session() as session: kwargs = copy.deepcopy(kwargs) @@ -616,8 +614,8 @@ def raise_connection_err_select_server(*args, **kwargs): # Each operation should fail on the first attempt and succeed # on the second. 
method(*args, **kwargs) - self.assertEqual(len(listener.results["started"]), 1, msg) - retry_cmd = listener.results["started"][0].command + self.assertEqual(len(listener.started_events), 1, msg) + retry_cmd = listener.started_events[0].command sent_txn_id = retry_cmd["txnNumber"] final_txn_id = session._server_session.transaction_id self.assertEqual(Int64(initial_txn_id + 1), sent_txn_id, msg) diff --git a/test/test_server_selection.py b/test/test_server_selection.py index c3f3762f9a..8d4ffe5e9b 100644 --- a/test/test_server_selection.py +++ b/test/test_server_selection.py @@ -97,7 +97,7 @@ def all_hosts_started(): coll.find_one({"name": "John Doe"}) # Confirm all find commands are run against appropriate host. - for command in listener.results["started"]: + for command in listener.started_events: if command.command_name == "find": self.assertEqual(command.connection_id[1], expected_port) diff --git a/test/test_server_selection_in_window.py b/test/test_server_selection_in_window.py index cae2d7661b..d076ae77b3 100644 --- a/test/test_server_selection_in_window.py +++ b/test/test_server_selection_in_window.py @@ -115,7 +115,7 @@ def frequencies(self, client, listener, n_finds=10): for thread in threads: self.assertTrue(thread.passed) - events = listener.results["started"] + events = listener.started_events self.assertEqual(len(events), n_finds * N_THREADS) nodes = client.nodes self.assertEqual(len(nodes), 2) diff --git a/test/test_session.py b/test/test_session.py index 386bab295c..25d209ebaf 100644 --- a/test/test_session.py +++ b/test/test_session.py @@ -58,9 +58,9 @@ def failed(self, event): super(SessionTestListener, self).failed(event) def first_command_started(self): - assert len(self.results["started"]) >= 1, "No command-started events" + assert len(self.started_events) >= 1, "No command-started events" - return self.results["started"][0] + return self.started_events[0] def session_ids(client): @@ -103,7 +103,7 @@ def tearDown(self): """All sessions used in the test must be returned to the pool.""" self.client.drop_database("pymongo_test") used_lsids = self.initial_lsids.copy() - for event in self.session_checker_listener.results["started"]: + for event in self.session_checker_listener.started_events: if "lsid" in event.command: used_lsids.add(event.command["lsid"]["id"]) @@ -118,15 +118,15 @@ def _test_ops(self, client, *ops): last_use = s._server_session.last_use start = time.monotonic() self.assertLessEqual(last_use, start) - listener.results.clear() + listener.reset() # In case "f" modifies its inputs. args = copy.copy(args) kw = copy.copy(kw) kw["session"] = s f(*args, **kw) self.assertGreaterEqual(s._server_session.last_use, start) - self.assertGreaterEqual(len(listener.results["started"]), 1) - for event in listener.results["started"]: + self.assertGreaterEqual(len(listener.started_events), 1) + for event in listener.started_events: self.assertTrue( "lsid" in event.command, "%s sent no lsid with %s" % (f.__name__, event.command_name), @@ -157,11 +157,11 @@ def _test_ops(self, client, *ops): # No explicit session. 
for f, args, kw in ops: - listener.results.clear() + listener.reset() f(*args, **kw) - self.assertGreaterEqual(len(listener.results["started"]), 1) + self.assertGreaterEqual(len(listener.started_events), 1) lsids = [] - for event in listener.results["started"]: + for event in listener.started_events: self.assertTrue( "lsid" in event.command, "%s sent no lsid with %s" % (f.__name__, event.command_name), @@ -205,7 +205,7 @@ def test_implicit_sessions_checkout(self): (client.db.list_collections, []), ] threads = [] - listener.results.clear() + listener.reset() def thread_target(op, *args): res = op(*args) @@ -225,7 +225,7 @@ def thread_target(op, *args): self.assertIsNone(thread.exc) client.close() lsid_set.clear() - for i in listener.results["started"]: + for i in listener.started_events: if i.command.get("lsid"): lsid_set.add(i.command.get("lsid")["id"]) if len(lsid_set) == 1: @@ -280,13 +280,13 @@ def test_end_sessions(self): self.assertEqual(len(client._topology._session_pool), _MAX_END_SESSIONS + 1) client.close() self.assertEqual(len(client._topology._session_pool), 0) - end_sessions = [e for e in listener.results["started"] if e.command_name == "endSessions"] + end_sessions = [e for e in listener.started_events if e.command_name == "endSessions"] self.assertEqual(len(end_sessions), 2) # Closing again should not send any commands. - listener.results.clear() + listener.reset() client.close() - self.assertEqual(len(listener.results["started"]), 0) + self.assertEqual(len(listener.started_events), 0) def test_client(self): client = self.client @@ -399,10 +399,10 @@ def test_cursor(self): for name, f in ops: with client.start_session() as s: - listener.results.clear() + listener.reset() f(session=s) - self.assertGreaterEqual(len(listener.results["started"]), 1) - for event in listener.results["started"]: + self.assertGreaterEqual(len(listener.started_events), 1) + for event in listener.started_events: self.assertTrue( "lsid" in event.command, "%s sent no lsid with %s" % (name, event.command_name), @@ -419,7 +419,7 @@ def test_cursor(self): # No explicit session. for name, f in ops: - listener.results.clear() + listener.reset() f(session=None) event0 = listener.first_command_started() self.assertTrue( @@ -428,7 +428,7 @@ def test_cursor(self): lsid = event0.command["lsid"] - for event in listener.results["started"][1:]: + for event in listener.started_events[1:]: self.assertTrue( "lsid" in event.command, "%s sent no lsid with %s" % (name, event.command_name) ) @@ -600,7 +600,7 @@ def test_aggregate_error(self): # 3.6.0 mongos only validates the aggregate pipeline when the # database exists. coll.insert_one({}) - listener.results.clear() + listener.reset() with self.assertRaises(OperationFailure): coll.aggregate([{"$badOperation": {"bar": 1}}]) @@ -687,7 +687,7 @@ def _test_unacknowledged_ops(self, client, *ops): for f, args, kw in ops: with client.start_session() as s: - listener.results.clear() + listener.reset() # In case "f" modifies its inputs. args = copy.copy(args) kw = copy.copy(kw) @@ -698,7 +698,7 @@ def _test_unacknowledged_ops(self, client, *ops): f(*args, **kw) if f.__name__ == "create_collection": # create_collection runs listCollections first. - event = listener.results["started"].pop(0) + event = listener.started_events.pop(0) self.assertEqual("listCollections", event.command_name) self.assertIn( "lsid", @@ -707,19 +707,19 @@ def _test_unacknowledged_ops(self, client, *ops): ) # Should not run any command before raising an error. 
- self.assertFalse(listener.results["started"], "%s sent command" % (f.__name__,)) + self.assertFalse(listener.started_events, "%s sent command" % (f.__name__,)) self.assertTrue(s.has_ended) # Unacknowledged write without a session does not send an lsid. for f, args, kw in ops: - listener.results.clear() + listener.reset() f(*args, **kw) - self.assertGreaterEqual(len(listener.results["started"]), 1) + self.assertGreaterEqual(len(listener.started_events), 1) if f.__name__ == "create_collection": # create_collection runs listCollections first. - event = listener.results["started"].pop(0) + event = listener.started_events.pop(0) self.assertEqual("listCollections", event.command_name) self.assertIn( "lsid", @@ -727,7 +727,7 @@ def _test_unacknowledged_ops(self, client, *ops): "%s sent no lsid with %s" % (f.__name__, event.command_name), ) - for event in listener.results["started"]: + for event in listener.started_events: self.assertNotIn( "lsid", event.command, "%s sent lsid with %s" % (f.__name__, event.command_name) ) @@ -799,26 +799,26 @@ def test_core(self): with self.client.start_session() as sess: self.assertIsNone(sess.cluster_time) self.assertIsNone(sess.operation_time) - self.listener.results.clear() + self.listener.reset() self.client.pymongo_test.test.find_one(session=sess) - started = self.listener.results["started"][0] + started = self.listener.started_events[0] cmd = started.command self.assertIsNone(cmd.get("readConcern")) op_time = sess.operation_time self.assertIsNotNone(op_time) - succeeded = self.listener.results["succeeded"][0] + succeeded = self.listener.succeeded_events[0] reply = succeeded.reply self.assertEqual(op_time, reply.get("operationTime")) # No explicit session self.client.pymongo_test.test.insert_one({}) self.assertEqual(sess.operation_time, op_time) - self.listener.results.clear() + self.listener.reset() try: self.client.pymongo_test.command("doesntexist", session=sess) except: pass - failed = self.listener.results["failed"][0] + failed = self.listener.failed_events[0] failed_op_time = failed.failure.get("operationTime") # Some older builds of MongoDB 3.5 / 3.6 return None for # operationTime when a command fails. 
Make sure we don't @@ -848,14 +848,14 @@ def _test_reads(self, op, exception=None): coll.find_one({}, session=sess) operation_time = sess.operation_time self.assertIsNotNone(operation_time) - self.listener.results.clear() + self.listener.reset() if exception: with self.assertRaises(exception): op(coll, sess) else: op(coll, sess) act = ( - self.listener.results["started"][0] + self.listener.started_events[0] .command.get("readConcern", {}) .get("afterClusterTime") ) @@ -887,10 +887,10 @@ def _test_writes(self, op): op(coll, sess) operation_time = sess.operation_time self.assertIsNotNone(operation_time) - self.listener.results.clear() + self.listener.reset() coll.find_one({}, session=sess) act = ( - self.listener.results["started"][0] + self.listener.started_events[0] .command.get("readConcern", {}) .get("afterClusterTime") ) @@ -938,9 +938,9 @@ def _test_no_read_concern(self, op): coll.find_one({}, session=sess) operation_time = sess.operation_time self.assertIsNotNone(operation_time) - self.listener.results.clear() + self.listener.reset() op(coll, sess) - rc = self.listener.results["started"][0].command.get("readConcern") + rc = self.listener.started_events[0].command.get("readConcern") self.assertIsNone(rc) @client_context.require_no_standalone @@ -1001,19 +1001,19 @@ def test_get_more_does_not_include_read_concern(self): coll.insert_many([{}, {}]) cursor = coll.find({}).batch_size(1) next(cursor) - self.listener.results.clear() + self.listener.reset() list(cursor) - started = self.listener.results["started"][0] + started = self.listener.started_events[0] self.assertEqual(started.command_name, "getMore") self.assertIsNone(started.command.get("readConcern")) def test_session_not_causal(self): with self.client.start_session(causal_consistency=False) as s: self.client.pymongo_test.test.insert_one({}, session=s) - self.listener.results.clear() + self.listener.reset() self.client.pymongo_test.test.find_one({}, session=s) act = ( - self.listener.results["started"][0] + self.listener.started_events[0] .command.get("readConcern", {}) .get("afterClusterTime") ) @@ -1023,10 +1023,10 @@ def test_session_not_causal(self): def test_server_not_causal(self): with self.client.start_session(causal_consistency=True) as s: self.client.pymongo_test.test.insert_one({}, session=s) - self.listener.results.clear() + self.listener.reset() self.client.pymongo_test.test.find_one({}, session=s) act = ( - self.listener.results["started"][0] + self.listener.started_events[0] .command.get("readConcern", {}) .get("afterClusterTime") ) @@ -1038,17 +1038,17 @@ def test_read_concern(self): with self.client.start_session(causal_consistency=True) as s: coll = self.client.pymongo_test.test coll.insert_one({}, session=s) - self.listener.results.clear() + self.listener.reset() coll.find_one({}, session=s) - read_concern = self.listener.results["started"][0].command.get("readConcern") + read_concern = self.listener.started_events[0].command.get("readConcern") self.assertIsNotNone(read_concern) self.assertIsNone(read_concern.get("level")) self.assertIsNotNone(read_concern.get("afterClusterTime")) coll = coll.with_options(read_concern=ReadConcern("majority")) - self.listener.results.clear() + self.listener.reset() coll.find_one({}, session=s) - read_concern = self.listener.results["started"][0].command.get("readConcern") + read_concern = self.listener.started_events[0].command.get("readConcern") self.assertIsNotNone(read_concern) self.assertEqual(read_concern.get("level"), "majority") 
self.assertIsNotNone(read_concern.get("afterClusterTime")) @@ -1056,17 +1056,17 @@ def test_read_concern(self): @client_context.require_no_standalone def test_cluster_time_with_server_support(self): self.client.pymongo_test.test.insert_one({}) - self.listener.results.clear() + self.listener.reset() self.client.pymongo_test.test.find_one({}) - after_cluster_time = self.listener.results["started"][0].command.get("$clusterTime") + after_cluster_time = self.listener.started_events[0].command.get("$clusterTime") self.assertIsNotNone(after_cluster_time) @client_context.require_standalone def test_cluster_time_no_server_support(self): self.client.pymongo_test.test.insert_one({}) - self.listener.results.clear() + self.listener.reset() self.client.pymongo_test.test.find_one({}) - after_cluster_time = self.listener.results["started"][0].command.get("$clusterTime") + after_cluster_time = self.listener.started_events[0].command.get("$clusterTime") self.assertIsNone(after_cluster_time) @@ -1129,22 +1129,22 @@ def insert_and_aggregate(): ] for name, f in ops: - listener.results.clear() + listener.reset() # Call f() twice, insert to advance clusterTime, call f() again. f() f() collection.insert_one({}) f() - self.assertGreaterEqual(len(listener.results["started"]), 1) - for i, event in enumerate(listener.results["started"]): + self.assertGreaterEqual(len(listener.started_events), 1) + for i, event in enumerate(listener.started_events): self.assertTrue( "$clusterTime" in event.command, "%s sent no $clusterTime with %s" % (f.__name__, event.command_name), ) if i > 0: - succeeded = listener.results["succeeded"][i - 1] + succeeded = listener.succeeded_events[i - 1] self.assertTrue( "$clusterTime" in succeeded.reply, "%s received no $clusterTime with %s" diff --git a/test/test_transactions.py b/test/test_transactions.py index 02e691329e..dc58beb930 100644 --- a/test/test_transactions.py +++ b/test/test_transactions.py @@ -343,11 +343,11 @@ def test_transaction_starts_with_batched_write(self): self.assertEqual( ["insert", "insert", "commitTransaction"], listener.started_command_names() ) - first_cmd = listener.results["started"][0].command + first_cmd = listener.started_events[0].command self.assertTrue(first_cmd["startTransaction"]) lsid = first_cmd["lsid"] txn_number = first_cmd["txnNumber"] - for event in listener.results["started"][1:]: + for event in listener.started_events[1:]: self.assertNotIn("startTransaction", event.command) self.assertEqual(lsid, event.command["lsid"]) self.assertEqual(txn_number, event.command["txnNumber"]) @@ -459,7 +459,7 @@ def callback(session): # Create the collection. 
coll.insert_one({}) - listener.results.clear() + listener.reset() with client.start_session() as s: with PatchSessionTimeout(0): with self.assertRaises(OperationFailure): @@ -491,7 +491,7 @@ def callback(session): } ) self.addCleanup(self.set_fail_point, {"configureFailPoint": "failCommand", "mode": "off"}) - listener.results.clear() + listener.reset() with client.start_session() as s: with PatchSessionTimeout(0): @@ -521,7 +521,7 @@ def callback(session): } ) self.addCleanup(self.set_fail_point, {"configureFailPoint": "failCommand", "mode": "off"}) - listener.results.clear() + listener.reset() with client.start_session() as s: with PatchSessionTimeout(0): diff --git a/test/test_versioned_api.py b/test/test_versioned_api.py index a2fd059d21..7dbf2c867d 100644 --- a/test/test_versioned_api.py +++ b/test/test_versioned_api.py @@ -83,7 +83,7 @@ def test_command_options(self): self.addCleanup(coll.delete_many, {}) list(coll.find(batch_size=25)) client.admin.command("ping") - self.assertServerApiInAllCommands(listener.results["started"]) + self.assertServerApiInAllCommands(listener.started_events) @client_context.require_version_min(4, 7) @client_context.require_transactions @@ -100,7 +100,7 @@ def test_command_options_txn(self): coll.insert_many([{} for _ in range(100)], session=s) list(coll.find(batch_size=25, session=s)) client.test.command("find", "test", session=s) - self.assertServerApiInAllCommands(listener.results["started"]) + self.assertServerApiInAllCommands(listener.started_events) if __name__ == "__main__": diff --git a/test/utils.py b/test/utils.py index 6b0876a158..842e9e3a7b 100644 --- a/test/utils.py +++ b/test/utils.py @@ -29,7 +29,7 @@ from collections import abc, defaultdict from functools import partial from test import client_context, db_pwd, db_user -from typing import Any +from typing import Any, List from bson import json_util from bson.objectid import ObjectId @@ -140,26 +140,43 @@ def pool_closed(self, event): self.add_event(event) -class EventListener(monitoring.CommandListener): +class EventListener(BaseListener, monitoring.CommandListener): def __init__(self): + super(EventListener, self).__init__() self.results = defaultdict(list) - def started(self, event): - self.results["started"].append(event) + @property + def started_events(self) -> List[monitoring.CommandStartedEvent]: + return self.results["started"] - def succeeded(self, event): - self.results["succeeded"].append(event) + @property + def succeeded_events(self) -> List[monitoring.CommandSucceededEvent]: + return self.results["succeeded"] - def failed(self, event): - self.results["failed"].append(event) + @property + def failed_events(self) -> List[monitoring.CommandFailedEvent]: + return self.results["failed"] - def started_command_names(self): + def started(self, event: monitoring.CommandStartedEvent) -> None: + self.started_events.append(event) + self.add_event(event) + + def succeeded(self, event: monitoring.CommandSucceededEvent) -> None: + self.succeeded_events.append(event) + self.add_event(event) + + def failed(self, event: monitoring.CommandFailedEvent) -> None: + self.failed_events.append(event) + self.add_event(event) + + def started_command_names(self) -> List[str]: """Return list of command names started.""" - return [event.command_name for event in self.results["started"]] + return [event.command_name for event in self.started_events] - def reset(self): + def reset(self) -> None: """Reset the state of this listener.""" self.results.clear() + super(EventListener, self).reset() class 
TopologyEventListener(monitoring.TopologyListener): diff --git a/test/utils_spec_runner.py b/test/utils_spec_runner.py index f8ad26efe7..8528ecb8c7 100644 --- a/test/utils_spec_runner.py +++ b/test/utils_spec_runner.py @@ -371,16 +371,16 @@ def run_operations(self, sessions, collection, ops, in_with_transaction=False): # TODO: factor with test_command_monitoring.py def check_events(self, test, listener, session_ids): - res = listener.results + events = listener.started_events if not len(test["expectations"]): return # Give a nicer message when there are missing or extra events - cmds = decode_raw([event.command for event in res["started"]]) - self.assertEqual(len(res["started"]), len(test["expectations"]), cmds) + cmds = decode_raw([event.command for event in events]) + self.assertEqual(len(events), len(test["expectations"]), cmds) for i, expectation in enumerate(test["expectations"]): event_type = next(iter(expectation)) - event = res["started"][i] + event = events[i] # The tests substitute 42 for any number other than 0. if event.command_name == "getMore" and event.command["getMore"]: From 5606558e9fe208c2559605037e64a256f007fe40 Mon Sep 17 00:00:00 2001 From: Shane Harvey Date: Fri, 11 Nov 2022 15:02:25 -0800 Subject: [PATCH 2/4] PYTHON-3516 find/replace more usages --- test/test_change_stream.py | 4 +- test/test_collection.py | 16 +- test/test_command_monitoring_legacy.py | 6 +- test/test_comment.py | 2 +- test/test_cursor.py | 86 +++++----- test/test_database.py | 4 +- test/test_monitoring.py | 219 +++++++++++-------------- 7 files changed, 153 insertions(+), 184 deletions(-) diff --git a/test/test_change_stream.py b/test/test_change_stream.py index b000c3eddb..77a70a1a0c 100644 --- a/test/test_change_stream.py +++ b/test/test_change_stream.py @@ -259,8 +259,8 @@ def _test_full_pipeline(self, expected_cs_stage): with self.change_stream_with_client(client, [{"$project": {"foo": 0}}]) as _: pass - self.assertEqual(1, len(results["started"])) - command = results["started"][0] + self.assertEqual(1, len(listener.started_events)) + command = listener.started_events[0] self.assertEqual("aggregate", command.command_name) self.assertEqual( [{"$changeStream": expected_cs_stage}, {"$project": {"foo": 0}}], diff --git a/test/test_collection.py b/test/test_collection.py index e7ac248124..3519833377 100644 --- a/test/test_collection.py +++ b/test/test_collection.py @@ -1991,15 +1991,15 @@ def test_find_one_and_write_concern(self): db.command("ping") results.clear() c_w0.find_one_and_update({"_id": 1}, {"$set": {"foo": "bar"}}) - self.assertEqual({"w": 0}, results["started"][0].command["writeConcern"]) + self.assertEqual({"w": 0}, listener.started_events[0].command["writeConcern"]) results.clear() c_w0.find_one_and_replace({"_id": 1}, {"foo": "bar"}) - self.assertEqual({"w": 0}, results["started"][0].command["writeConcern"]) + self.assertEqual({"w": 0}, listener.started_events[0].command["writeConcern"]) results.clear() c_w0.find_one_and_delete({"_id": 1}) - self.assertEqual({"w": 0}, results["started"][0].command["writeConcern"]) + self.assertEqual({"w": 0}, listener.started_events[0].command["writeConcern"]) results.clear() # Test write concern errors. 
@@ -2017,26 +2017,26 @@ def test_find_one_and_write_concern(self): WriteConcernError, c_wc_error.find_one_and_replace, {"w": 0}, - results["started"][0].command["writeConcern"], + listener.started_events[0].command["writeConcern"], ) self.assertRaises( WriteConcernError, c_wc_error.find_one_and_delete, {"w": 0}, - results["started"][0].command["writeConcern"], + listener.started_events[0].command["writeConcern"], ) results.clear() c_default.find_one_and_update({"_id": 1}, {"$set": {"foo": "bar"}}) - self.assertNotIn("writeConcern", results["started"][0].command) + self.assertNotIn("writeConcern", listener.started_events[0].command) results.clear() c_default.find_one_and_replace({"_id": 1}, {"foo": "bar"}) - self.assertNotIn("writeConcern", results["started"][0].command) + self.assertNotIn("writeConcern", listener.started_events[0].command) results.clear() c_default.find_one_and_delete({"_id": 1}) - self.assertNotIn("writeConcern", results["started"][0].command) + self.assertNotIn("writeConcern", listener.started_events[0].command) results.clear() def test_find_with_nested(self): diff --git a/test/test_command_monitoring_legacy.py b/test/test_command_monitoring_legacy.py index 6f5ef8e43f..be5f4368de 100644 --- a/test/test_command_monitoring_legacy.py +++ b/test/test_command_monitoring_legacy.py @@ -58,9 +58,9 @@ def tearDown(self): def format_actual_results(results): - started = results["started"] - succeeded = results["succeeded"] - failed = results["failed"] + started = listener.started_events + succeeded = listener.succeeded_events + failed = listener.failed_events msg = "\nStarted: %r" % (started[0].command if len(started) else None,) msg += "\nSucceeded: %r" % (succeeded[0].reply if len(succeeded) else None,) msg += "\nFailed: %r" % (failed[0].failure if len(failed) else None,) diff --git a/test/test_comment.py b/test/test_comment.py index c83428fd70..9bd4f5d976 100644 --- a/test/test_comment.py +++ b/test/test_comment.py @@ -77,7 +77,7 @@ def _test_ops( tested = False # For some reason collection.list_indexes creates two commands and the first # one doesn't contain 'comment'. - for i in results["started"]: + for i in listener.started_events: if cc == i.command.get("comment", ""): self.assertEqual(cc, i.command["comment"]) tested = True diff --git a/test/test_cursor.py b/test/test_cursor.py index f140ab6da8..0baa48297e 100644 --- a/test/test_cursor.py +++ b/test/test_cursor.py @@ -223,74 +223,74 @@ def test_max_await_time_ms(self): # Tailable_await defaults. list(coll.find(cursor_type=CursorType.TAILABLE_AWAIT)) # find - self.assertFalse("maxTimeMS" in results["started"][0].command) + self.assertFalse("maxTimeMS" in listener.started_events[0].command) # getMore - self.assertFalse("maxTimeMS" in results["started"][1].command) + self.assertFalse("maxTimeMS" in listener.started_events[1].command) results.clear() # Tailable_await with max_await_time_ms set. 
list(coll.find(cursor_type=CursorType.TAILABLE_AWAIT).max_await_time_ms(99)) # find - self.assertEqual("find", results["started"][0].command_name) - self.assertFalse("maxTimeMS" in results["started"][0].command) + self.assertEqual("find", listener.started_events[0].command_name) + self.assertFalse("maxTimeMS" in listener.started_events[0].command) # getMore - self.assertEqual("getMore", results["started"][1].command_name) - self.assertTrue("maxTimeMS" in results["started"][1].command) - self.assertEqual(99, results["started"][1].command["maxTimeMS"]) + self.assertEqual("getMore", listener.started_events[1].command_name) + self.assertTrue("maxTimeMS" in listener.started_events[1].command) + self.assertEqual(99, listener.started_events[1].command["maxTimeMS"]) results.clear() # Tailable_await with max_time_ms list(coll.find(cursor_type=CursorType.TAILABLE_AWAIT).max_time_ms(99)) # find - self.assertEqual("find", results["started"][0].command_name) - self.assertTrue("maxTimeMS" in results["started"][0].command) - self.assertEqual(99, results["started"][0].command["maxTimeMS"]) + self.assertEqual("find", listener.started_events[0].command_name) + self.assertTrue("maxTimeMS" in listener.started_events[0].command) + self.assertEqual(99, listener.started_events[0].command["maxTimeMS"]) # getMore - self.assertEqual("getMore", results["started"][1].command_name) - self.assertFalse("maxTimeMS" in results["started"][1].command) + self.assertEqual("getMore", listener.started_events[1].command_name) + self.assertFalse("maxTimeMS" in listener.started_events[1].command) results.clear() # Tailable_await with both max_time_ms and max_await_time_ms list(coll.find(cursor_type=CursorType.TAILABLE_AWAIT).max_time_ms(99).max_await_time_ms(99)) # find - self.assertEqual("find", results["started"][0].command_name) - self.assertTrue("maxTimeMS" in results["started"][0].command) - self.assertEqual(99, results["started"][0].command["maxTimeMS"]) + self.assertEqual("find", listener.started_events[0].command_name) + self.assertTrue("maxTimeMS" in listener.started_events[0].command) + self.assertEqual(99, listener.started_events[0].command["maxTimeMS"]) # getMore - self.assertEqual("getMore", results["started"][1].command_name) - self.assertTrue("maxTimeMS" in results["started"][1].command) - self.assertEqual(99, results["started"][1].command["maxTimeMS"]) + self.assertEqual("getMore", listener.started_events[1].command_name) + self.assertTrue("maxTimeMS" in listener.started_events[1].command) + self.assertEqual(99, listener.started_events[1].command["maxTimeMS"]) results.clear() # Non tailable_await with max_await_time_ms list(coll.find(batch_size=1).max_await_time_ms(99)) # find - self.assertEqual("find", results["started"][0].command_name) - self.assertFalse("maxTimeMS" in results["started"][0].command) + self.assertEqual("find", listener.started_events[0].command_name) + self.assertFalse("maxTimeMS" in listener.started_events[0].command) # getMore - self.assertEqual("getMore", results["started"][1].command_name) - self.assertFalse("maxTimeMS" in results["started"][1].command) + self.assertEqual("getMore", listener.started_events[1].command_name) + self.assertFalse("maxTimeMS" in listener.started_events[1].command) results.clear() # Non tailable_await with max_time_ms list(coll.find(batch_size=1).max_time_ms(99)) # find - self.assertEqual("find", results["started"][0].command_name) - self.assertTrue("maxTimeMS" in results["started"][0].command) - self.assertEqual(99, results["started"][0].command["maxTimeMS"]) + 
self.assertEqual("find", listener.started_events[0].command_name) + self.assertTrue("maxTimeMS" in listener.started_events[0].command) + self.assertEqual(99, listener.started_events[0].command["maxTimeMS"]) # getMore - self.assertEqual("getMore", results["started"][1].command_name) - self.assertFalse("maxTimeMS" in results["started"][1].command) + self.assertEqual("getMore", listener.started_events[1].command_name) + self.assertFalse("maxTimeMS" in listener.started_events[1].command) # Non tailable_await with both max_time_ms and max_await_time_ms list(coll.find(batch_size=1).max_time_ms(99).max_await_time_ms(88)) # find - self.assertEqual("find", results["started"][0].command_name) - self.assertTrue("maxTimeMS" in results["started"][0].command) - self.assertEqual(99, results["started"][0].command["maxTimeMS"]) + self.assertEqual("find", listener.started_events[0].command_name) + self.assertTrue("maxTimeMS" in listener.started_events[0].command) + self.assertEqual(99, listener.started_events[0].command["maxTimeMS"]) # getMore - self.assertEqual("getMore", results["started"][1].command_name) - self.assertFalse("maxTimeMS" in results["started"][1].command) + self.assertEqual("getMore", listener.started_events[1].command_name) + self.assertFalse("maxTimeMS" in listener.started_events[1].command) @client_context.require_test_commands @client_context.require_no_mongos @@ -1187,10 +1187,10 @@ def test_close_kills_cursor_synchronously(self): cursor.close() def assertCursorKilled(): - self.assertEqual(1, len(results["started"])) - self.assertEqual("killCursors", results["started"][0].command_name) - self.assertEqual(1, len(results["succeeded"])) - self.assertEqual("killCursors", results["succeeded"][0].command_name) + self.assertEqual(1, len(listener.started_events)) + self.assertEqual("killCursors", listener.started_events[0].command_name) + self.assertEqual(1, len(listener.succeeded_events)) + self.assertEqual("killCursors", listener.succeeded_events[0].command_name) assertCursorKilled() results.clear() @@ -1204,7 +1204,7 @@ def assertCursorKilled(): if cursor.cursor_id: assertCursorKilled() else: - self.assertEqual(0, len(results["started"])) + self.assertEqual(0, len(listener.started_events)) def test_delete_not_initialized(self): # Creating a cursor with invalid arguments will not run __init__ @@ -1397,9 +1397,9 @@ def test_monitoring(self): next(cursor) try: results = listener.results - started = results["started"][0] - succeeded = results["succeeded"][0] - self.assertEqual(0, len(results["failed"])) + started = listener.started_events[0] + succeeded = listener.succeeded_events[0] + self.assertEqual(0, len(listener.failed_events)) self.assertEqual("getMore", started.command_name) self.assertEqual("pymongo_test", started.database_name) self.assertEqual("getMore", succeeded.command_name) @@ -1557,9 +1557,9 @@ def test_monitoring(self): n = 0 for batch in cursor: results = listener.results - started = results["started"][0] - succeeded = results["succeeded"][0] - self.assertEqual(0, len(results["failed"])) + started = listener.started_events[0] + succeeded = listener.succeeded_events[0] + self.assertEqual(0, len(listener.failed_events)) self.assertEqual("getMore", started.command_name) self.assertEqual("pymongo_test", started.database_name) self.assertEqual("getMore", succeeded.command_name) diff --git a/test/test_database.py b/test/test_database.py index d56b9800e4..01e9fb9ad2 100644 --- a/test/test_database.py +++ b/test/test_database.py @@ -215,7 +215,7 @@ def 
test_list_collection_names_filter(self): names = db.list_collection_names(filter=filter) self.assertIn("capped", names) self.assertIn("non_capped", names) - command = results["started"][0].command + command = listener.started_events[0].command self.assertIn("nameOnly", command) self.assertTrue(command["nameOnly"]) @@ -231,7 +231,7 @@ def test_check_exists(self): listener.reset() db.drop_collection("unique") db.create_collection("unique", check_exists=False) - self.assertTrue(len(results["started"]) > 0) + self.assertTrue(len(listener.started_events) > 0) self.assertNotIn("listCollections", listener.started_command_names()) def test_list_collections(self): diff --git a/test/test_monitoring.py b/test/test_monitoring.py index 48e05212d4..ffa535eeed 100644 --- a/test/test_monitoring.py +++ b/test/test_monitoring.py @@ -54,10 +54,9 @@ def tearDown(self): def test_started_simple(self): self.client.pymongo_test.command("ping") - results = self.listener.results - started = results["started"][0] - succeeded = results["succeeded"][0] - self.assertEqual(0, len(results["failed"])) + started = self.listener.started_events[0] + succeeded = self.listener.succeeded_events[0] + self.assertEqual(0, len(self.listener.failed_events)) self.assertTrue(isinstance(succeeded, monitoring.CommandSucceededEvent)) self.assertTrue(isinstance(started, monitoring.CommandStartedEvent)) self.assertEqualCommand(SON([("ping", 1)]), started.command) @@ -68,10 +67,9 @@ def test_started_simple(self): def test_succeeded_simple(self): self.client.pymongo_test.command("ping") - results = self.listener.results - started = results["started"][0] - succeeded = results["succeeded"][0] - self.assertEqual(0, len(results["failed"])) + started = self.listener.started_events[0] + succeeded = self.listener.succeeded_events[0] + self.assertEqual(0, len(self.listener.failed_events)) self.assertTrue(isinstance(started, monitoring.CommandStartedEvent)) self.assertTrue(isinstance(succeeded, monitoring.CommandSucceededEvent)) self.assertEqual("ping", succeeded.command_name) @@ -85,10 +83,9 @@ def test_failed_simple(self): self.client.pymongo_test.command("oops!") except OperationFailure: pass - results = self.listener.results - started = results["started"][0] - failed = results["failed"][0] - self.assertEqual(0, len(results["succeeded"])) + started = self.listener.started_events[0] + failed = self.listener.failed_events[0] + self.assertEqual(0, len(self.listener.succeeded_events)) self.assertTrue(isinstance(started, monitoring.CommandStartedEvent)) self.assertTrue(isinstance(failed, monitoring.CommandFailedEvent)) self.assertEqual("oops!", failed.command_name) @@ -99,10 +96,9 @@ def test_failed_simple(self): def test_find_one(self): self.client.pymongo_test.test.find_one() - results = self.listener.results - started = results["started"][0] - succeeded = results["succeeded"][0] - self.assertEqual(0, len(results["failed"])) + started = self.listener.started_events[0] + succeeded = self.listener.succeeded_events[0] + self.assertEqual(0, len(self.listener.failed_events)) self.assertTrue(isinstance(succeeded, monitoring.CommandSucceededEvent)) self.assertTrue(isinstance(started, monitoring.CommandStartedEvent)) self.assertEqualCommand( @@ -122,10 +118,9 @@ def test_find_and_get_more(self): for _ in range(4): next(cursor) cursor_id = cursor.cursor_id - results = self.listener.results - started = results["started"][0] - succeeded = results["succeeded"][0] - self.assertEqual(0, len(results["failed"])) + started = self.listener.started_events[0] + 
succeeded = self.listener.succeeded_events[0] + self.assertEqual(0, len(self.listener.failed_events)) self.assertTrue(isinstance(started, monitoring.CommandStartedEvent)) self.assertEqualCommand( SON( @@ -152,10 +147,9 @@ def test_find_and_get_more(self): # that returns id of 0 and no results. next(cursor) try: - results = self.listener.results - started = results["started"][0] - succeeded = results["succeeded"][0] - self.assertEqual(0, len(results["failed"])) + started = self.listener.started_events[0] + succeeded = self.listener.succeeded_events[0] + self.assertEqual(0, len(self.listener.failed_events)) self.assertTrue(isinstance(started, monitoring.CommandStartedEvent)) self.assertEqualCommand( SON([("getMore", cursor_id), ("collection", "test"), ("batchSize", 4)]), @@ -188,10 +182,9 @@ def test_find_with_explain(self): if self.client.is_mongos: coll = coll.with_options(read_preference=ReadPreference.PRIMARY_PREFERRED) res = coll.find().explain() - results = self.listener.results - started = results["started"][0] - succeeded = results["succeeded"][0] - self.assertEqual(0, len(results["failed"])) + started = self.listener.started_events[0] + succeeded = self.listener.succeeded_events[0] + self.assertEqual(0, len(self.listener.failed_events)) self.assertTrue(isinstance(started, monitoring.CommandStartedEvent)) self.assertEqualCommand(cmd, started.command) self.assertEqual("explain", started.command_name) @@ -220,10 +213,9 @@ def _test_find_options(self, query, expected_cmd): next(cursor) try: - results = self.listener.results - started = results["started"][0] - succeeded = results["succeeded"][0] - self.assertEqual(0, len(results["failed"])) + started = self.listener.started_events[0] + succeeded = self.listener.succeeded_events[0] + self.assertEqual(0, len(self.listener.failed_events)) self.assertTrue(isinstance(started, monitoring.CommandStartedEvent)) self.assertEqualCommand(expected_cmd, started.command) self.assertEqual("find", started.command_name) @@ -302,10 +294,9 @@ def test_command_and_get_more(self): for _ in range(4): next(cursor) cursor_id = cursor.cursor_id - results = self.listener.results - started = results["started"][0] - succeeded = results["succeeded"][0] - self.assertEqual(0, len(results["failed"])) + started = self.listener.started_events[0] + succeeded = self.listener.succeeded_events[0] + self.assertEqual(0, len(self.listener.failed_events)) self.assertTrue(isinstance(started, monitoring.CommandStartedEvent)) self.assertEqualCommand( SON( @@ -336,10 +327,9 @@ def test_command_and_get_more(self): self.listener.reset() next(cursor) try: - results = self.listener.results - started = results["started"][0] - succeeded = results["succeeded"][0] - self.assertEqual(0, len(results["failed"])) + started = self.listener.started_events[0] + succeeded = self.listener.succeeded_events[0] + self.assertEqual(0, len(self.listener.failed_events)) self.assertTrue(isinstance(started, monitoring.CommandStartedEvent)) self.assertEqualCommand( SON([("getMore", cursor_id), ("collection", "test"), ("batchSize", 4)]), @@ -377,10 +367,9 @@ def test_get_more_failure(self): next(cursor) except Exception: pass - results = self.listener.results - started = results["started"][0] - self.assertEqual(0, len(results["succeeded"])) - failed = results["failed"][0] + started = self.listener.started_events[0] + self.assertEqual(0, len(self.listener.succeeded_events)) + failed = self.listener.failed_events[0] self.assertTrue(isinstance(started, monitoring.CommandStartedEvent)) self.assertEqualCommand( 
SON([("getMore", cursor_id), ("collection", "test")]), started.command @@ -409,10 +398,9 @@ def test_not_primary_error(self): client.pymongo_test.test.find_one_and_delete({}) except NotPrimaryError as exc: error = exc.errors - results = self.listener.results - started = results["started"][0] - failed = results["failed"][0] - self.assertEqual(0, len(results["succeeded"])) + started = self.listener.started_events[0] + failed = self.listener.failed_events[0] + self.assertEqual(0, len(self.listener.succeeded_events)) self.assertTrue(isinstance(started, monitoring.CommandStartedEvent)) self.assertTrue(isinstance(failed, monitoring.CommandFailedEvent)) self.assertEqual("findAndModify", failed.command_name) @@ -432,10 +420,9 @@ def test_exhaust(self): ) next(cursor) cursor_id = cursor.cursor_id - results = self.listener.results - started = results["started"][0] - succeeded = results["succeeded"][0] - self.assertEqual(0, len(results["failed"])) + started = self.listener.started_events[0] + succeeded = self.listener.succeeded_events[0] + self.assertEqual(0, len(self.listener.failed_events)) self.assertTrue(isinstance(started, monitoring.CommandStartedEvent)) self.assertEqualCommand( SON( @@ -464,9 +451,8 @@ def test_exhaust(self): self.listener.reset() tuple(cursor) - results = self.listener.results - self.assertEqual(0, len(results["failed"])) - for event in results["started"]: + self.assertEqual(0, len(self.listener.failed_events)) + for event in self.listener.started_events: self.assertTrue(isinstance(event, monitoring.CommandStartedEvent)) self.assertEqualCommand( SON([("getMore", cursor_id), ("collection", "test"), ("batchSize", 5)]), @@ -476,14 +462,14 @@ def test_exhaust(self): self.assertEqual(cursor.address, event.connection_id) self.assertEqual("pymongo_test", event.database_name) self.assertTrue(isinstance(event.request_id, int)) - for event in results["succeeded"]: + for event in self.listener.succeeded_events: self.assertTrue(isinstance(event, monitoring.CommandSucceededEvent)) self.assertTrue(isinstance(event.duration_micros, int)) self.assertEqual("getMore", event.command_name) self.assertTrue(isinstance(event.request_id, int)) self.assertEqual(cursor.address, event.connection_id) # Last getMore receives a response with cursor id 0. - self.assertEqual(0, results["succeeded"][-1].reply["cursor"]["id"]) + self.assertEqual(0, self.listener.succeeded_events[-1].reply["cursor"]["id"]) def test_kill_cursors(self): with client_knobs(kill_cursor_frequency=0.01): @@ -495,10 +481,9 @@ def test_kill_cursors(self): self.listener.reset() cursor.close() time.sleep(2) - results = self.listener.results - started = results["started"][0] - succeeded = results["succeeded"][0] - self.assertEqual(0, len(results["failed"])) + started = self.listener.started_events[0] + succeeded = self.listener.succeeded_events[0] + self.assertEqual(0, len(self.listener.failed_events)) self.assertTrue(isinstance(started, monitoring.CommandStartedEvent)) # There could be more than one cursor_id here depending on # when the thread last ran. 
@@ -528,10 +513,9 @@ def test_non_bulk_writes(self): # Implied write concern insert_one res = coll.insert_one({"x": 1}) - results = self.listener.results - started = results["started"][0] - succeeded = results["succeeded"][0] - self.assertEqual(0, len(results["failed"])) + started = self.listener.started_events[0] + succeeded = self.listener.succeeded_events[0] + self.assertEqual(0, len(self.listener.failed_events)) self.assertIsInstance(started, monitoring.CommandStartedEvent) expected = SON( [ @@ -558,10 +542,9 @@ def test_non_bulk_writes(self): self.listener.reset() coll = coll.with_options(write_concern=WriteConcern(w=0)) res = coll.insert_one({"x": 1}) - results = self.listener.results - started = results["started"][0] - succeeded = results["succeeded"][0] - self.assertEqual(0, len(results["failed"])) + started = self.listener.started_events[0] + succeeded = self.listener.succeeded_events[0] + self.assertEqual(0, len(self.listener.failed_events)) self.assertIsInstance(started, monitoring.CommandStartedEvent) expected = SON( [ @@ -587,10 +570,9 @@ def test_non_bulk_writes(self): self.listener.reset() coll = coll.with_options(write_concern=WriteConcern(w=1)) res = coll.insert_one({"x": 1}) - results = self.listener.results - started = results["started"][0] - succeeded = results["succeeded"][0] - self.assertEqual(0, len(results["failed"])) + started = self.listener.started_events[0] + succeeded = self.listener.succeeded_events[0] + self.assertEqual(0, len(self.listener.failed_events)) self.assertIsInstance(started, monitoring.CommandStartedEvent) expected = SON( [ @@ -617,10 +599,9 @@ def test_non_bulk_writes(self): # delete_many self.listener.reset() res = coll.delete_many({"x": 1}) - results = self.listener.results - started = results["started"][0] - succeeded = results["succeeded"][0] - self.assertEqual(0, len(results["failed"])) + started = self.listener.started_events[0] + succeeded = self.listener.succeeded_events[0] + self.assertEqual(0, len(self.listener.failed_events)) self.assertIsInstance(started, monitoring.CommandStartedEvent) expected = SON( [ @@ -648,10 +629,9 @@ def test_non_bulk_writes(self): self.listener.reset() oid = ObjectId() res = coll.replace_one({"_id": oid}, {"_id": oid, "x": 1}, upsert=True) - results = self.listener.results - started = results["started"][0] - succeeded = results["succeeded"][0] - self.assertEqual(0, len(results["failed"])) + started = self.listener.started_events[0] + succeeded = self.listener.succeeded_events[0] + self.assertEqual(0, len(self.listener.failed_events)) self.assertIsInstance(started, monitoring.CommandStartedEvent) expected = SON( [ @@ -691,10 +671,9 @@ def test_non_bulk_writes(self): # update_one self.listener.reset() res = coll.update_one({"x": 1}, {"$inc": {"x": 1}}) - results = self.listener.results - started = results["started"][0] - succeeded = results["succeeded"][0] - self.assertEqual(0, len(results["failed"])) + started = self.listener.started_events[0] + succeeded = self.listener.succeeded_events[0] + self.assertEqual(0, len(self.listener.failed_events)) self.assertIsInstance(started, monitoring.CommandStartedEvent) expected = SON( [ @@ -733,10 +712,9 @@ def test_non_bulk_writes(self): # update_many self.listener.reset() res = coll.update_many({"x": 2}, {"$inc": {"x": 1}}) - results = self.listener.results - started = results["started"][0] - succeeded = results["succeeded"][0] - self.assertEqual(0, len(results["failed"])) + started = self.listener.started_events[0] + succeeded = self.listener.succeeded_events[0] + 
self.assertEqual(0, len(self.listener.failed_events)) self.assertIsInstance(started, monitoring.CommandStartedEvent) expected = SON( [ @@ -775,10 +753,9 @@ def test_non_bulk_writes(self): # delete_one self.listener.reset() _ = coll.delete_one({"x": 3}) - results = self.listener.results - started = results["started"][0] - succeeded = results["succeeded"][0] - self.assertEqual(0, len(results["failed"])) + started = self.listener.started_events[0] + succeeded = self.listener.succeeded_events[0] + self.assertEqual(0, len(self.listener.failed_events)) self.assertIsInstance(started, monitoring.CommandStartedEvent) expected = SON( [ @@ -811,10 +788,9 @@ def test_non_bulk_writes(self): coll.insert_one({"_id": 1}) except OperationFailure: pass - results = self.listener.results - started = results["started"][0] - succeeded = results["succeeded"][0] - self.assertEqual(0, len(results["failed"])) + started = self.listener.started_events[0] + succeeded = self.listener.succeeded_events[0] + self.assertEqual(0, len(self.listener.failed_events)) self.assertIsInstance(started, monitoring.CommandStartedEvent) expected = SON( [ @@ -853,10 +829,9 @@ def test_insert_many(self): big = "x" * (1024 * 1024 * 4) docs = [{"_id": i, "big": big} for i in range(6)] coll.insert_many(docs) - results = self.listener.results - started = results["started"] - succeeded = results["succeeded"] - self.assertEqual(0, len(results["failed"])) + started = self.listener.started_events + succeeded = self.listener.succeeded_events + self.assertEqual(0, len(self.listener.failed_events)) documents = [] count = 0 operation_id = started[0].operation_id @@ -895,10 +870,9 @@ def test_insert_many_unacknowledged(self): big = "x" * (1024 * 1024 * 12) docs = [{"_id": i, "big": big} for i in range(6)] unack_coll.insert_many(docs) - results = self.listener.results - started = results["started"] - succeeded = results["succeeded"] - self.assertEqual(0, len(results["failed"])) + started = self.listener.started_events + succeeded = self.listener.succeeded_events + self.assertEqual(0, len(self.listener.failed_events)) documents = [] operation_id = started[0].operation_id self.assertIsInstance(operation_id, int) @@ -937,10 +911,9 @@ def test_bulk_write(self): DeleteOne({"_id": 1}), ] ) - results = self.listener.results - started = results["started"] - succeeded = results["succeeded"] - self.assertEqual(0, len(results["failed"])) + started = self.listener.started_events + succeeded = self.listener.succeeded_events + self.assertEqual(0, len(self.listener.failed_events)) operation_id = started[0].operation_id pairs = list(zip(started, succeeded)) self.assertEqual(3, len(pairs)) @@ -1054,10 +1027,9 @@ def test_write_errors(self): ) except OperationFailure: pass - results = self.listener.results - started = results["started"] - succeeded = results["succeeded"] - self.assertEqual(0, len(results["failed"])) + started = self.listener.started_events + succeeded = self.listener.succeeded_events + self.assertEqual(0, len(self.listener.failed_events)) operation_id = started[0].operation_id pairs = list(zip(started, succeeded)) errors = [] @@ -1086,10 +1058,9 @@ def test_first_batch_helper(self): # this test should still pass. 
self.listener.reset() tuple(self.client.pymongo_test.test.list_indexes()) - results = self.listener.results - started = results["started"][0] - succeeded = results["succeeded"][0] - self.assertEqual(0, len(results["failed"])) + started = self.listener.started_events[0] + succeeded = self.listener.succeeded_events[0] + self.assertEqual(0, len(self.listener.failed_events)) self.assertIsInstance(started, monitoring.CommandStartedEvent) expected = SON([("listIndexes", "test"), ("cursor", {})]) self.assertEqualCommand(expected, started.command) @@ -1117,10 +1088,9 @@ def test_sensitive_commands(self): listeners.publish_command_success( delta, {"nonce": "e474f4561c5eb40b", "ok": 1.0}, "getnonce", 12345, self.client.address ) - results = self.listener.results - started = results["started"][0] - succeeded = results["succeeded"][0] - self.assertEqual(0, len(results["failed"])) + started = self.listener.started_events[0] + succeeded = self.listener.succeeded_events[0] + self.assertEqual(0, len(self.listener.failed_events)) self.assertIsInstance(started, monitoring.CommandStartedEvent) self.assertEqual({}, started.command) self.assertEqual("pymongo_test", started.database_name) @@ -1163,10 +1133,9 @@ def setUp(self): def test_simple(self): self.client.pymongo_test.command("ping") - results = self.listener.results - started = results["started"][0] - succeeded = results["succeeded"][0] - self.assertEqual(0, len(results["failed"])) + started = self.listener.started_events[0] + succeeded = self.listener.succeeded_events[0] + self.assertEqual(0, len(self.listener.failed_events)) self.assertTrue(isinstance(succeeded, monitoring.CommandSucceededEvent)) self.assertTrue(isinstance(started, monitoring.CommandStartedEvent)) self.assertEqualCommand(SON([("ping", 1)]), started.command) From a15ca17cd8622c69d6bd9e9c83da1be2b0d20174 Mon Sep 17 00:00:00 2001 From: Shane Harvey Date: Fri, 11 Nov 2022 15:10:34 -0800 Subject: [PATCH 3/4] PYTHON-3516 More fixes --- test/test_change_stream.py | 1 - test/test_collection.py | 17 ++++++++--------- test/test_command_monitoring_legacy.py | 16 +++++++++------- test/test_comment.py | 5 ++--- test/test_cursor.py | 18 +++++++----------- test/test_database.py | 6 ++---- 6 files changed, 28 insertions(+), 35 deletions(-) diff --git a/test/test_change_stream.py b/test/test_change_stream.py index 77a70a1a0c..2388a6e1f4 100644 --- a/test/test_change_stream.py +++ b/test/test_change_stream.py @@ -255,7 +255,6 @@ def test_start_at_operation_time(self): @no_type_check def _test_full_pipeline(self, expected_cs_stage): client, listener = self.client_with_listener("aggregate") - results = listener.results with self.change_stream_with_client(client, [{"$project": {"foo": 0}}]) as _: pass diff --git a/test/test_collection.py b/test/test_collection.py index 3519833377..49a7017ef3 100644 --- a/test/test_collection.py +++ b/test/test_collection.py @@ -1986,21 +1986,20 @@ def test_find_one_and_write_concern(self): c_w0 = db.get_collection("test", write_concern=WriteConcern(w=0)) # default WriteConcern. c_default = db.get_collection("test", write_concern=WriteConcern()) - results = listener.results # Authenticate the client and throw out auth commands from the listener. 
db.command("ping") - results.clear() + listener.reset() c_w0.find_one_and_update({"_id": 1}, {"$set": {"foo": "bar"}}) self.assertEqual({"w": 0}, listener.started_events[0].command["writeConcern"]) - results.clear() + listener.reset() c_w0.find_one_and_replace({"_id": 1}, {"foo": "bar"}) self.assertEqual({"w": 0}, listener.started_events[0].command["writeConcern"]) - results.clear() + listener.reset() c_w0.find_one_and_delete({"_id": 1}) self.assertEqual({"w": 0}, listener.started_events[0].command["writeConcern"]) - results.clear() + listener.reset() # Test write concern errors. if client_context.is_rs: @@ -2025,19 +2024,19 @@ def test_find_one_and_write_concern(self): {"w": 0}, listener.started_events[0].command["writeConcern"], ) - results.clear() + listener.reset() c_default.find_one_and_update({"_id": 1}, {"$set": {"foo": "bar"}}) self.assertNotIn("writeConcern", listener.started_events[0].command) - results.clear() + listener.reset() c_default.find_one_and_replace({"_id": 1}, {"foo": "bar"}) self.assertNotIn("writeConcern", listener.started_events[0].command) - results.clear() + listener.reset() c_default.find_one_and_delete({"_id": 1}) self.assertNotIn("writeConcern", listener.started_events[0].command) - results.clear() + listener.reset() def test_find_with_nested(self): c = self.db.test diff --git a/test/test_command_monitoring_legacy.py b/test/test_command_monitoring_legacy.py index be5f4368de..25e34ee580 100644 --- a/test/test_command_monitoring_legacy.py +++ b/test/test_command_monitoring_legacy.py @@ -127,11 +127,13 @@ def run_scenario(self): except OperationFailure: pass - res = self.listener.results + started_events = self.listener.started_events + succeeded_events = self.listener.succeeded_events + failed_events = self.listener.failed_events for expectation in test["expectations"]: event_type = next(iter(expectation)) if event_type == "command_started_event": - event = res["started"][0] if len(res["started"]) else None + event = started_events[0] if len(started_events) else None if event is not None: # The tests substitute 42 for any number other than 0. if event.command_name == "getMore" and event.command["getMore"]: @@ -147,7 +149,7 @@ def run_scenario(self): update.setdefault("upsert", False) update.setdefault("multi", False) elif event_type == "command_succeeded_event": - event = res["succeeded"].pop(0) if len(res["succeeded"]) else None + event = succeeded_events.pop(0) if len(succeeded_events) else None if event is not None: reply = event.reply # The tests substitute 42 for any number other than 0, @@ -171,12 +173,12 @@ def run_scenario(self): reply.pop("cursorsKilled") reply["cursorsUnknown"] = [42] # Found succeeded event. Pop related started event. - res["started"].pop(0) + started_events.pop(0) elif event_type == "command_failed_event": - event = res["failed"].pop(0) if len(res["failed"]) else None + event = failed_events.pop(0) if len(failed_events) else None if event is not None: # Found failed event. Pop related started event. 
- res["started"].pop(0) + started_events.pop(0) else: self.fail("Unknown event type") @@ -188,7 +190,7 @@ def run_scenario(self): % ( event_name, expectation[event_type]["command_name"], - format_actual_results(res), + format_actual_results(self.listener.events), ) ) diff --git a/test/test_comment.py b/test/test_comment.py index 9bd4f5d976..85e5470d74 100644 --- a/test/test_comment.py +++ b/test/test_comment.py @@ -43,12 +43,11 @@ class TestComment(IntegrationTest): def _test_ops( self, helpers, already_supported, listener, db=Empty(), coll=Empty() # noqa: B008 ): - results = listener.results for h, args in helpers: c = "testing comment with " + h.__name__ with self.subTest("collection-" + h.__name__ + "-comment"): for cc in [c, {"key": c}, ["any", 1]]: - results.clear() + listener.reset() kwargs = {"comment": cc} if h == coll.rename: _ = db.get_collection("temp_temp_temp").drop() @@ -98,7 +97,7 @@ def _test_ops( h.__doc__, ) - results.clear() + listener.reset() @client_context.require_version_min(4, 7, -1) @client_context.require_replica_set diff --git a/test/test_cursor.py b/test/test_cursor.py index 0baa48297e..96d83fecf1 100644 --- a/test/test_cursor.py +++ b/test/test_cursor.py @@ -218,7 +218,6 @@ def test_max_await_time_ms(self): listener = AllowListEventListener("find", "getMore") coll = rs_or_single_client(event_listeners=[listener])[self.db.name].pymongo_test - results = listener.results # Tailable_await defaults. list(coll.find(cursor_type=CursorType.TAILABLE_AWAIT)) @@ -226,7 +225,7 @@ def test_max_await_time_ms(self): self.assertFalse("maxTimeMS" in listener.started_events[0].command) # getMore self.assertFalse("maxTimeMS" in listener.started_events[1].command) - results.clear() + listener.reset() # Tailable_await with max_await_time_ms set. 
list(coll.find(cursor_type=CursorType.TAILABLE_AWAIT).max_await_time_ms(99)) @@ -237,7 +236,7 @@ def test_max_await_time_ms(self): self.assertEqual("getMore", listener.started_events[1].command_name) self.assertTrue("maxTimeMS" in listener.started_events[1].command) self.assertEqual(99, listener.started_events[1].command["maxTimeMS"]) - results.clear() + listener.reset() # Tailable_await with max_time_ms list(coll.find(cursor_type=CursorType.TAILABLE_AWAIT).max_time_ms(99)) @@ -248,7 +247,7 @@ def test_max_await_time_ms(self): # getMore self.assertEqual("getMore", listener.started_events[1].command_name) self.assertFalse("maxTimeMS" in listener.started_events[1].command) - results.clear() + listener.reset() # Tailable_await with both max_time_ms and max_await_time_ms list(coll.find(cursor_type=CursorType.TAILABLE_AWAIT).max_time_ms(99).max_await_time_ms(99)) @@ -260,7 +259,7 @@ def test_max_await_time_ms(self): self.assertEqual("getMore", listener.started_events[1].command_name) self.assertTrue("maxTimeMS" in listener.started_events[1].command) self.assertEqual(99, listener.started_events[1].command["maxTimeMS"]) - results.clear() + listener.reset() # Non tailable_await with max_await_time_ms list(coll.find(batch_size=1).max_await_time_ms(99)) @@ -270,7 +269,7 @@ def test_max_await_time_ms(self): # getMore self.assertEqual("getMore", listener.started_events[1].command_name) self.assertFalse("maxTimeMS" in listener.started_events[1].command) - results.clear() + listener.reset() # Non tailable_await with max_time_ms list(coll.find(batch_size=1).max_time_ms(99)) @@ -1169,7 +1168,6 @@ def test_close_kills_cursor_synchronously(self): self.client._process_periodic_tasks() listener = AllowListEventListener("killCursors") - results = listener.results client = rs_or_single_client(event_listeners=[listener]) self.addCleanup(client.close) coll = client[self.db.name].test_close_kills_cursors @@ -1178,7 +1176,7 @@ def test_close_kills_cursor_synchronously(self): docs_inserted = 1000 coll.insert_many([{"i": i} for i in range(docs_inserted)]) - results.clear() + listener.reset() # Close a cursor while it's still open on the server. cursor = coll.find().batch_size(10) @@ -1193,7 +1191,7 @@ def assertCursorKilled(): self.assertEqual("killCursors", listener.succeeded_events[0].command_name) assertCursorKilled() - results.clear() + listener.reset() # Close a command cursor while it's still open on the server. cursor = coll.aggregate([], batchSize=10) @@ -1396,7 +1394,6 @@ def test_monitoring(self): # Next raw batch of 4 documents. next(cursor) try: - results = listener.results started = listener.started_events[0] succeeded = listener.succeeded_events[0] self.assertEqual(0, len(listener.failed_events)) @@ -1556,7 +1553,6 @@ def test_monitoring(self): # Batches of 4 documents. n = 0 for batch in cursor: - results = listener.results started = listener.started_events[0] succeeded = listener.succeeded_events[0] self.assertEqual(0, len(listener.failed_events)) diff --git a/test/test_database.py b/test/test_database.py index 01e9fb9ad2..b1b2999df4 100644 --- a/test/test_database.py +++ b/test/test_database.py @@ -193,7 +193,6 @@ def test_list_collection_names(self): def test_list_collection_names_filter(self): listener = OvertCommandListener() - results = listener.results client = rs_or_single_client(event_listeners=[listener]) db = client[self.db.name] db.capped.drop() @@ -204,14 +203,14 @@ def test_list_collection_names_filter(self): filter: Union[None, dict] # Should not send nameOnly. 
for filter in ({"options.capped": True}, {"options.capped": True, "name": "capped"}): - results.clear() + listener.reset() names = db.list_collection_names(filter=filter) self.assertEqual(names, ["capped"]) self.assertNotIn("nameOnly", listener.started_events[0].command) # Should send nameOnly (except on 2.6). for filter in (None, {}, {"name": {"$in": ["capped", "non_capped"]}}): - results.clear() + listener.reset() names = db.list_collection_names(filter=filter) self.assertIn("capped", names) self.assertIn("non_capped", names) @@ -221,7 +220,6 @@ def test_list_collection_names_filter(self): def test_check_exists(self): listener = OvertCommandListener() - results = listener.results client = rs_or_single_client(event_listeners=[listener]) self.addCleanup(client.close) db = client[self.db.name] From fb8b5caabbf1374ea52b3ca86f427f68e741a834 Mon Sep 17 00:00:00 2001 From: Shane Harvey Date: Fri, 11 Nov 2022 15:15:44 -0800 Subject: [PATCH 4/4] PYTHON-3516 fix test/test_command_monitoring_legacy.py --- test/test_command_monitoring_legacy.py | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/test/test_command_monitoring_legacy.py b/test/test_command_monitoring_legacy.py index 25e34ee580..1cc3e15cc9 100644 --- a/test/test_command_monitoring_legacy.py +++ b/test/test_command_monitoring_legacy.py @@ -57,16 +57,6 @@ def tearDown(self): self.listener.reset() -def format_actual_results(results): - started = listener.started_events - succeeded = listener.succeeded_events - failed = listener.failed_events - msg = "\nStarted: %r" % (started[0].command if len(started) else None,) - msg += "\nSucceeded: %r" % (succeeded[0].reply if len(succeeded) else None,) - msg += "\nFailed: %r" % (failed[0].failure if len(failed) else None,) - return msg - - def create_test(scenario_def, test): def run_scenario(self): dbname = scenario_def["database_name"] @@ -186,11 +176,11 @@ def run_scenario(self): event_name = event_type.split("_")[1] self.fail( "Expected %s event for %s command. Actual " - "results:%s" + "results:\n%s" % ( event_name, expectation[event_type]["command_name"], - format_actual_results(self.listener.events), + "\n".join(str(e) for e in self.listener.events), ) )