Skip to content

Commit

Permalink
bugfix/unique-temp-filenames (#225)
Browse files Browse the repository at this point in the history
* Add transcription range fields to database and ingestion models, add validator for time duration, add range filter to ffmpeg audio split, update tests

* Add tests for edge case transcription ranges

* Fix end_time=start_time typo, update tests to be more concise, add ffmpeg error logging

* Updated transcription range to video range, updated video handling to host when limiting video to a range, updated mp4 conversion to allow a range, connected mp4 to clip functionality, updated tests and tried to make testing slightly more consistent, added Session ingestion verification to test out

* Update session hash to reflect trimmed video

* Bypass hash task

* Remove unnecessary logging, duration validation comments, elif typo fix in cdp_will_host control structure

* Reverted function parameter doc for split audio

* Improved documentation for video_start_time in ingestion_models

* Use content hash to name videos and prevent collisions across sessions

* Use pathlib functions from 3.8, add type annotations for mock function

* Add return type annotations for mock function

* Lint updates

* Move file renaming to earliest point, return unique names from file conversion functions

* UUID for original file resource copy task so to prevent collisions earlier in the process

* Minor comment change to force CI reprocessing

* Stop renaming resource within task

* Lint

* Flag for adding source suffix in resource copy

* Log file status after copy for debugging

* Logging to debug file copy

* Logging to debug file copy

* Logging to debug file copy

* Logging to debug file copy

* Remove rename
  • Loading branch information
chrisjkhan committed Jan 1, 2023
1 parent b00f378 commit aa8ff15
Show file tree
Hide file tree
Showing 4 changed files with 171 additions and 24 deletions.
14 changes: 11 additions & 3 deletions cdp_backend/pipeline/event_gather_pipeline.py
Expand Up @@ -7,6 +7,7 @@
from operator import attrgetter
from pathlib import Path
from typing import Any, Callable, Dict, List, NamedTuple, Optional, Set, Tuple, Union
from uuid import uuid4

from aiohttp.client_exceptions import ClientResponseError
from fireo.fields.errors import FieldValidationFailed, InvalidFieldType, RequiredField
Expand Down Expand Up @@ -127,8 +128,13 @@ def create_event_gather_flow(
for event in events:
session_processing_results: List[SessionProcessingResult] = []
for session in event.sessions:
# Download video to local copy
resource_copy_filepath = resource_copy_task(uri=session.video_uri)
# Download video to local copy making
# copy unique in case of shared session video
resource_copy_filepath = resource_copy_task(
uri=session.video_uri,
dst=f"{str(uuid4())}_temp",
copy_suffix=True,
)

# Handle video conversion or non-secure resource
# hosting
Expand Down Expand Up @@ -229,7 +235,7 @@ def create_event_gather_flow(


@task(max_retries=3, retry_delay=timedelta(seconds=120))
def resource_copy_task(uri: str) -> str:
def resource_copy_task(uri: str, dst: str = None, copy_suffix: bool = None) -> str:
"""
Copy a file to a temporary location for processing.
Expand All @@ -250,6 +256,8 @@ def resource_copy_task(uri: str) -> str:
"""
return file_utils.resource_copy(
uri=uri,
dst=dst,
copy_suffix=copy_suffix,
overwrite=True,
)

Expand Down
38 changes: 22 additions & 16 deletions cdp_backend/tests/pipeline/test_event_gather_pipeline.py
Expand Up @@ -7,7 +7,7 @@
from pathlib import Path
from typing import List, Optional
from unittest import mock
from unittest.mock import MagicMock
from unittest.mock import MagicMock, patch

import pytest
from prefect import Flow
Expand Down Expand Up @@ -577,6 +577,10 @@ def test_store_event_processing_results(
EXISTING_REMOTE_M3U8_MINIMAL_EVENT.sessions[0].video_uri = EXAMPLE_M3U8_PLAYLIST_URI


def path_rename(self: Path, newPath: Path) -> Path:
return newPath


@mock.patch(f"{PIPELINE_PATH}.fs_functions.upload_file")
@mock.patch(f"{PIPELINE_PATH}.fs_functions.get_open_url_for_gcs_file")
@mock.patch(f"{PIPELINE_PATH}.fs_functions.remove_local_file")
Expand Down Expand Up @@ -633,20 +637,22 @@ def test_convert_video_and_handle_host(
mock_convert_video_to_mp4.return_value = expected_filepath
mock_hash_file_contents.return_value = "abc123"

(
mp4_filepath,
session_video_hosted_url,
session_content_hash,
) = pipeline.convert_video_and_handle_host.run(
video_filepath=video_filepath,
session=session,
credentials_file="fake/credentials.json",
bucket="doesnt://matter",
)
with patch.object(Path, "rename", path_rename):

(
mp4_filepath,
session_video_hosted_url,
session_content_hash,
) = pipeline.convert_video_and_handle_host.run(
video_filepath=video_filepath,
session=session,
credentials_file="fake/credentials.json",
bucket="doesnt://matter",
)

# Make sure mp4 files don't go through conversion
if Path(video_filepath).suffix == ".mp4":
assert not mock_convert_video_to_mp4.called
# Make sure mp4 files don't go through conversion
if Path(video_filepath).suffix == ".mp4":
assert not mock_convert_video_to_mp4.called

assert mp4_filepath == expected_filepath
assert session_video_hosted_url == expected_hosted_video_url
assert mp4_filepath == str(Path(video_filepath).with_suffix(".mp4"))
assert session_video_hosted_url == expected_hosted_video_url
52 changes: 52 additions & 0 deletions cdp_backend/tests/utils/test_file_utils.py
Expand Up @@ -35,6 +35,58 @@
#############################################################################


@pytest.mark.parametrize(
"path, stem, expected_result",
[
(Path("file.ext"), "new", "new.ext"),
],
)
def test_with_stem(path: Path, stem: str, expected_result: str) -> None:
new_path = file_utils.with_stem(path, stem)
assert str(new_path) == expected_result


@pytest.mark.parametrize(
"path, addition, expected_result",
[
(Path("file.ext"), "-new", "file-new.ext"),
],
)
def test_append_to_stem(path: Path, addition: str, expected_result: str) -> None:
new_path = file_utils.append_to_stem(path, addition)
assert str(new_path) == expected_result


@pytest.mark.parametrize(
"path, stem, expected_result",
[
(Path("file.ext"), "new", "new.ext"),
],
)
def test_rename_with_stem(path: Path, stem: str, expected_result: str) -> None:
file = open(path, "w")
file.close()
new_path = file_utils.rename_with_stem(path, stem)
assert str(new_path) == expected_result
assert new_path.exists()
os.remove(new_path)


@pytest.mark.parametrize(
"path, addition, expected_result",
[
(Path("file.ext"), "-new", "file-new.ext"),
],
)
def test_rename_append_to_stem(path: Path, addition: str, expected_result: str) -> None:
file = open(path, "w")
file.close()
new_path = file_utils.rename_append_to_stem(path, addition)
assert str(new_path) == expected_result
assert new_path.exists()
os.remove(new_path)


@pytest.mark.parametrize(
"uri, expected_result",
[
Expand Down
91 changes: 86 additions & 5 deletions cdp_backend/utils/file_utils.py
Expand Up @@ -28,6 +28,82 @@
MAX_THUMBNAIL_WIDTH = 960


def with_stem(path: Path, stem: str) -> Path:
"""
Create a path with a new stem
Parameters
----------
path: Path
The path to alter
stem: str
The string to be the new stem of the path
Returns
-------
path: Path
The new path with the replaced stem
"""
return path.with_name(f"{stem}{path.suffix}")


def append_to_stem(path: Path, addition: str) -> Path:
"""
Rename a file with a string appended to the path stem
Parameters
----------
path: Path
The path to alter
addition: str
The string to be appended to the path stem
Returns
-------
path: Path
The new path with the stem addition
"""
return with_stem(path, f"{path.stem}{addition}")


def rename_with_stem(path: Path, stem: str) -> Path:
"""
Rename a file with a string appended to the path stem
Parameters
----------
path: Path
The path to be renamed
stem: str
The string to become the new stem
Returns
-------
path: Path
The new path of the renamed file
"""
return path.rename(with_stem(path, stem))


def rename_append_to_stem(path: Path, addition: str) -> Path:
"""
Rename a file with a string appended to the path stem
Parameters
----------
path: Path
The path to be renamed
addition: str
The string to be appended to the path stem
Returns
-------
path: Path
The new path of the renamed file
"""
return path.rename(append_to_stem(path, addition))


def get_media_type(uri: str) -> Optional[str]:
"""
Get the IANA media type for the provided URI.
Expand Down Expand Up @@ -69,6 +145,7 @@ def get_media_type(uri: str) -> Optional[str]:
def resource_copy(
uri: str,
dst: Optional[Union[str, Path]] = None,
copy_suffix: Optional[bool] = False,
overwrite: bool = False,
) -> str:
"""
Expand All @@ -90,6 +167,7 @@ def resource_copy(
saved_path: str
The path of where the resource ended up getting copied to.
"""
uri_suffix = Path(uri.split("/")[-1].split("?")[0].split("#")[0]).suffix
if dst is None:
dst = uri.split("/")[-1]

Expand All @@ -103,10 +181,13 @@ def resource_copy(
# Split by the last "/"
dst = dst / uri.split("/")[-1]

if copy_suffix:
dst = dst.with_suffix(uri_suffix)

# Ensure filename is less than 255 chars
# Otherwise this can raise an OSError for too long of a filename
if len(dst.name) > 255:
dst = Path(str(dst)[:255])
dst = with_stem(dst, dst.stem[: (255 - len(dst.suffix))])

# Ensure dest isn't a file
if dst.is_file() and not overwrite:
Expand Down Expand Up @@ -148,6 +229,7 @@ def resource_copy(
# It was added because it's very common for SSL certs to be bad
# See: https://github.com/CouncilDataProject/cdp-scrapers/pull/85
# And: https://github.com/CouncilDataProject/seattle/runs/5957646032

with open(dst, "wb") as open_dst:
open_dst.write(
requests.get(
Expand Down Expand Up @@ -520,7 +602,6 @@ def convert_video_to_mp4(
The end time to trim the video in HH:MM:SS.
output_path: Path
The output path to place the clip at.
Must include a suffix to use for the reformatting.
Returns
-------
Expand Down Expand Up @@ -605,7 +686,7 @@ def clip_and_reformat_video(
video_filepath: Path,
start_time: Optional[str],
end_time: Optional[str],
output_path: Path = Path("clipped.mp4"),
output_path: Path = None,
output_format: str = "mp4",
) -> Path:
"""
Expand All @@ -621,8 +702,6 @@ def clip_and_reformat_video(
The end time of the clip in HH:MM:SS.
output_path: Path
The output path to place the clip at.
Must include a suffix to use for the reformatting.
Default: "clipped.mp4"
output_format: str
The output format.
Default: "mp4"
Expand All @@ -634,6 +713,8 @@ def clip_and_reformat_video(
"""
import ffmpeg

output_path = output_path or append_to_stem(video_filepath, "_clipped")

try:
ffmpeg_stdout, ffmpeg_stderr = (
ffmpeg.input(
Expand Down

0 comments on commit aa8ff15

Please sign in to comment.