DSEGOG-316 Switch between parallel and sequential ingestion for Echo ingest script #120
base: DSEGOG-305-reingest-data
Conversation
…to sequential ingestion
- From 6.0, `mongosh` replaced `mongo`
- This fixes a failure in one of the export tests that occurred when the ingestion script was run in parallel (meaning files weren't ingested in chronological order)
- This is useful to prevent a race condition when the S3 bucket is empty and the script might ingest the channel manifest file before dropping that database collection
Codecov Report: All modified and coverable lines are covered by tests ✅

@@             Coverage Diff             @@
##   DSEGOG-305-reingest-data   #120   +/-   ##
=========================================================
  Coverage     94.99%   94.99%
=========================================================
  Files            50       50
  Lines          2837     2838     +1
  Branches        297      297
=========================================================
+ Hits           2695     2696     +1
  Misses          105      105
  Partials         37       37
=========================================================

☔ View full report in Codecov by Sentry.
…ay through - This will be useful for ingestion onto the dev server
Force-pushed from cabc074 to 315bae2
Making this PR ready to review as I've finished the ingestion and no further changes were made.
DSEGOG-319 Expose Header
One comment on the possible use of Enums in Pydantic; the rest are less requested changes and more due to me not being confident I've understood the ingest code. It might be that things work as they are or there's a reason why it needs to be like this (in which case maybe a comment or two might help me or someone else understand this).
    ingest_mode: str
    file_to_restart_ingestion: Optional[str]

    @field_validator("ingest_mode")
    @classmethod
    def check_ingest_mode(cls, v: str) -> str:
        if v not in ["parallel", "sequential"]:
            sys.exit(
                f"ingest_mode must contain 'parallel' or 'sequential', found: '{v}'",
            )
        else:
            return v
Could we use Enum here to avoid needing to write a custom validator (I would suggest StrEnum but I think that's only Python 3.11+)? There's an example in the Pydantic docs: https://docs.pydantic.dev/2.7/api/standard_library_types/#enum
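A minimal sketch of what this suggestion could look like: subclassing `str` and `Enum` keeps plain string config values working, while Pydantic rejects anything that isn't a member value. The `ScriptOptions` model name here is a hypothetical stand-in for the script's real config model.

```python
from enum import Enum

from pydantic import BaseModel, ValidationError


class IngestMode(str, Enum):
    # str mixin means existing "parallel"/"sequential" config strings
    # are accepted and compare equal to plain strings
    parallel = "parallel"
    sequential = "sequential"


class ScriptOptions(BaseModel):
    # Hypothetical stand-in for the script's options model
    ingest_mode: IngestMode


# Valid values are coerced to the Enum member
opts = ScriptOptions(ingest_mode="sequential")

# Invalid values raise ValidationError instead of needing sys.exit()
try:
    ScriptOptions(ingest_mode="bogus")
except ValidationError:
    pass
```

This removes the custom `@field_validator` entirely, and the error message Pydantic produces lists the permitted values.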
@@ -24,6 +39,8 @@ def main():
        ssh.drop_database_collections(collection_names)
    else:
        local_commands.drop_database_collections(collection_names)
    # Give the drops a chance to complete before adding data
    sleep(5)
I was wondering if we could do a while loop and actually check the state of the DB to only break when the collections are dropped, but seeing as this is "just" an ingest script a hardcoded wait is probably fine as is. Might be something to consider if this ever becomes a problem again (e.g. if 5 seconds turns out to be too short then maybe we could do something more rigorous rather than just bumping to 10).
    if og_api.process:
        t = threading.Thread(
            target=APIStarter.clear_buffers,
            args=(og_api.process,),
        )
        t.start()
I don't think I know what we're doing here. clear_buffers seems to be for printing the stdout/err from og_api.process to the terminal, but if we reach this else block then we didn't actually do anything with og_api.process? Since that's only used on lines 99 and 101 (i.e. in the if block).

I'm not familiar with thread pools in Python so maybe I'm misreading it and this does make sense to someone who knows what they're looking at (in which case maybe a code comment to help novices like me?)
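For context, this is a sketch of the general pattern `clear_buffers` appears to follow: a background thread that continuously drains a child process's stdout so the pipe buffer can't fill up and block the child. All names here are illustrative, not the script's actual implementation:

```python
import subprocess
import sys
import threading


def drain_output(process):
    # Read the child's output line by line until it closes stdout,
    # echoing each line to our own terminal
    for line in process.stdout:
        print(line.rstrip())


# A trivial child process standing in for the API under test
proc = subprocess.Popen(
    [sys.executable, "-c", "print('api started')"],
    stdout=subprocess.PIPE,
    stderr=subprocess.STDOUT,
    text=True,
)

# daemon=True means the drain thread won't keep the interpreter alive
t = threading.Thread(target=drain_output, args=(proc,), daemon=True)
t.start()

proc.wait()
t.join()
```

Without a drain like this, a chatty child process can deadlock once the OS pipe buffer fills, which may be why the thread is started even when the output isn't otherwise used.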
@@ -64,13 +81,34 @@ def main():
    hdf_page_iterator = echo.paginate_hdf_data()
    total_ingestion_start_time = time()

    last_successful_file = Config.config.script_options.file_to_restart_ingestion
    ingestion_restarted = False if last_successful_file else True
It's really minor but the variable name here confuses me a bit, i.e. ingestion_restarted is True if I didn't provide a file_to_restart_ingestion. I would expect the opposite, that giving a file to restart from means that I have "restarted ingestion".

After reading the other functionality, I can see that this is because when no file is set we can start ingesting straight away, but if a file is set we need to find it before we can (re-)start ingesting. In which case, could we name this something like start_ingestion or ingestion_started? Obviously it's personal preference but that would have helped me understand this code better I think.
    if ingestion_restarted:
        if Config.config.script_options.ingest_mode == "sequential":
            for name in object_names:
If last_successful_file is set then we skip the page, which makes sense. But we iterate over all the object_names - should we skip these as well until we find last_successful_file? Is there a guarantee that last_successful_file is the first entry in the page?

Maybe something like:

object_names = object_names[object_names.index(last_successful_file):]

between lines 92 and 93?
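A quick self-contained illustration of the suggested slice, with made-up file names. Note that `list.index` raises ValueError if the file isn't on this page, which the real script would need to handle (or it could fall back to ingesting the whole page):

```python
# Hypothetical page of object names, in chronological order
object_names = ["2023-01.h5", "2023-02.h5", "2023-03.h5"]
last_successful_file = "2023-02.h5"

# Resume from the last successfully ingested file onwards,
# skipping everything before it on this page
object_names = object_names[object_names.index(last_successful_file):]
# object_names is now ['2023-02.h5', '2023-03.h5']
```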
This PR adds some config that switches between sequential and parallel ingestion. As such, I've added the parallel ingestion back in that existed in previous versions of this script. The intention is to use sequential for GitHub Actions and use parallel on large cloud VMs for ingestion of the simulated data on the dev server.
There are a couple of other changes I've made:
- Replaced `mongo` with `mongosh` - from 6.0, the `mongo` command disappeared, so using `mongosh` is a solution to support more MongoDB versions (at least 5.0-7.0, if not more)
- Added a default sort (`_id` ASC) on data export when no order is provided in the request. When I tested parallel ingestion on GitHub Actions, one of the export tests failed because the nature of parallel ingestion means files aren't always ingested in chronological order, so the order of export could change a bit.
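The mode switch described above can be sketched like this. The `ingest` body, worker count, and function names are placeholders rather than the script's real implementation; the point is only the shape of the sequential/parallel branch:

```python
from concurrent.futures import ThreadPoolExecutor


def ingest(name):
    # Placeholder for ingesting a single HDF file from the S3 bucket
    return f"ingested {name}"


def run_ingestion(object_names, ingest_mode="sequential", workers=8):
    if ingest_mode == "sequential":
        # Preserves chronological order - suitable for GitHub Actions,
        # where the export tests depend on ingestion order
        return [ingest(name) for name in object_names]
    # Parallel mode trades strict ordering for throughput, intended
    # for ingesting simulated data on large cloud VMs
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # pool.map returns results in submission order even though
        # the ingests themselves run concurrently
        return list(pool.map(ingest, object_names))
```

Results come back in submission order either way here; with parallel mode it's the *side effects* (the order files land in the database) that are no longer chronological, which is what the default `_id` sort on export compensates for.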