diff --git a/.gitignore b/.gitignore index 02f0f6fa..ff9b0a39 100644 --- a/.gitignore +++ b/.gitignore @@ -124,6 +124,8 @@ transcript_model.md test.err test.out tfidf-*.parquet +index/ +index-chunks/ 7490ea6cf56648d60a40dd334e46e5d7de0f31dde0c7ce4d85747896fdd2ab42-* abc123-cdp* *-person-picture diff --git a/cdp_backend/bin/process_cdp_event_index_chunk.py b/cdp_backend/bin/process_cdp_event_index_chunk.py new file mode 100644 index 00000000..0f471146 --- /dev/null +++ b/cdp_backend/bin/process_cdp_event_index_chunk.py @@ -0,0 +1,128 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import argparse +import logging +import sys +import traceback +from pathlib import Path + +from distributed import LocalCluster +from prefect import executors + +from cdp_backend.pipeline import process_event_index_chunk_pipeline as pipeline +from cdp_backend.pipeline.pipeline_config import EventIndexPipelineConfig + +############################################################################### + +logging.basicConfig( + level=logging.INFO, + format="[%(levelname)4s: %(module)s:%(lineno)4s %(asctime)s] %(message)s", +) +log = logging.getLogger(__name__) + +############################################################################### + + +class Args(argparse.Namespace): + def __init__(self) -> None: + self.__parse() + + def __parse(self) -> None: + p = argparse.ArgumentParser( + prog="process_cdp_event_index_chunk", + description=( + "Download a single event index chunk from remote storage, " + "then process it and upload ngrams to a CDP database." + ), + ) + p.add_argument( + "config_file", + type=Path, + help=( + "Path to the pipeline configuration file. " + "See cdp_backend.pipeline.pipeline_config.EventIndexPipelineConfig " + "for more details." + ), + ) + p.add_argument( + "chunk", + type=Path, + help="Filename for the parquet index chunk to process and upload.", + ) + p.add_argument( + "--upload_batch_size", + type=int, + default=500, + help="Number of ngrams to upload to database in a single batch.", + ) + p.add_argument( + "-p", + "--parallel", + action="store_true", + dest="parallel", + help=( + "Boolean option to spin up a local multi-threaded " + "Dask Distributed cluster for event processing." 
+ ), + ) + + p.parse_args(namespace=self) + + +def main() -> None: + try: + args = Args() + with open(args.config_file, "r") as open_resource: + config = EventIndexPipelineConfig.from_json( # type: ignore + open_resource.read() + ) + + # Get flow definition + flow = pipeline.create_event_index_upload_pipeline( + config=config, + index_chunk=args.chunk, + upload_batch_size=args.upload_batch_size, + ) + + # Determine executor + if args.parallel: + # Create local cluster + log.info("Creating LocalCluster") + cluster = LocalCluster(processes=False) + log.info("Created LocalCluster") + + # Set distributed_executor_address + distributed_executor_address = cluster.scheduler_address + + # Log dashboard URI + log.info(f"Dask dashboard available at: {cluster.dashboard_link}") + + # Use dask cluster + state = flow.run( + executor=executors.DaskExecutor(address=distributed_executor_address), + ) + + # Shutdown cluster after run + cluster.close() + + else: + state = flow.run() + + if state.is_failed(): + raise ValueError("Flow run failed.") + + except Exception as e: + log.error("=============================================") + log.error("\n\n" + traceback.format_exc()) + log.error("=============================================") + log.error("\n\n" + str(e) + "\n") + log.error("=============================================") + sys.exit(1) + + +############################################################################### +# Allow caller to directly run this module (usually in development scenarios) + +if __name__ == "__main__": + main() diff --git a/cdp_backend/bin/run_cdp_event_index.py b/cdp_backend/bin/run_cdp_event_index_generation.py similarity index 84% rename from cdp_backend/bin/run_cdp_event_index.py rename to cdp_backend/bin/run_cdp_event_index_generation.py index ff0cb162..79e5cb48 100644 --- a/cdp_backend/bin/run_cdp_event_index.py +++ b/cdp_backend/bin/run_cdp_event_index_generation.py @@ -10,7 +10,7 @@ from distributed import LocalCluster from prefect import executors -from cdp_backend.pipeline import event_index_pipeline as pipeline +from cdp_backend.pipeline import generate_event_index_pipeline as pipeline from cdp_backend.pipeline.pipeline_config import EventIndexPipelineConfig ############################################################################### @@ -30,7 +30,7 @@ def __init__(self) -> None: def __parse(self) -> None: p = argparse.ArgumentParser( - prog="run_cdp_event_index", + prog="run_cdp_event_index_generation", description=( "Index all event (session) transcripts from a CDP infrastructure." ), @@ -52,11 +52,19 @@ def __parse(self) -> None: help="N number of terms to act as a unique entity.", ) p.add_argument( - "-l", - "--store_local", + "--ngrams_per_chunk", + type=int, + default=50_000, + help="Number of ngrams to store in a single chunk file.", + ) + p.add_argument( + "-s", + "--store_remote", action="store_true", + dest="store_remote", help=( - "Should the pipeline store the generated index to a local parquet file." + "Store chunks to remote cloud storage. " + "Required to add a search index." 
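For reference, here is a minimal, self-contained sketch of the executor-selection pattern that `main()` above uses, assuming Prefect 1.x and dask.distributed (both already dependencies of this package); the demo flow and task are placeholders and not part of this diff.

from distributed import LocalCluster
from prefect import Flow, executors, task


@task
def noop() -> str:
    return "ok"


with Flow("executor-demo") as demo_flow:
    noop()

# Threads-only local cluster, mirroring the -p/--parallel branch above
cluster = LocalCluster(processes=False)
state = demo_flow.run(
    executor=executors.DaskExecutor(address=cluster.scheduler_address)
)
cluster.close()
assert state.is_successful()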
), ) p.add_argument( @@ -82,10 +90,11 @@ def main() -> None: ) # Get flow definition - flow = pipeline.create_event_index_pipeline( + flow = pipeline.create_event_index_generation_pipeline( config=config, n_grams=args.n_grams, - store_local=args.store_local, + ngrams_per_chunk=args.ngrams_per_chunk, + store_remote=args.store_remote, ) # Determine executor diff --git a/cdp_backend/file_store/functions.py b/cdp_backend/file_store/functions.py index ce013a5f..0d63413d 100644 --- a/cdp_backend/file_store/functions.py +++ b/cdp_backend/file_store/functions.py @@ -126,6 +126,17 @@ def upload_file( return save_url +def download_file( + credentials_file: str, + bucket: str, + remote_filepath: str, + save_path: str, +) -> str: + fs = initialize_gcs_file_system(credentials_file) + fs.get(f"{bucket}/{remote_filepath}", save_path) + return save_path + + def get_open_url_for_gcs_file(credentials_file: str, uri: str) -> str: """ Simple wrapper around fsspec.FileSystem.url function for creating a connection diff --git a/cdp_backend/pipeline/event_index_pipeline.py b/cdp_backend/pipeline/generate_event_index_pipeline.py similarity index 80% rename from cdp_backend/pipeline/event_index_pipeline.py rename to cdp_backend/pipeline/generate_event_index_pipeline.py index 831a893b..881004cd 100644 --- a/cdp_backend/pipeline/event_index_pipeline.py +++ b/cdp_backend/pipeline/generate_event_index_pipeline.py @@ -3,13 +3,13 @@ import logging import math +import shutil from dataclasses import dataclass from datetime import datetime from pathlib import Path from tempfile import TemporaryDirectory from typing import Dict, List, NamedTuple, Tuple -import fireo import pandas as pd import pytz import rapidfuzz @@ -21,10 +21,13 @@ from ..database import functions as db_functions from ..database import models as db_models +from ..file_store import functions as fs_functions from ..utils import string_utils from .pipeline_config import EventIndexPipelineConfig from .transcript_model import Sentence, Transcript +REMOTE_INDEX_CHUNK_DIR = "index-chunks" + ############################################################################### log = logging.getLogger(__name__) @@ -343,99 +346,53 @@ def compute_tfidf( @task -def store_local_index(n_grams_df: pd.DataFrame, n_grams: int) -> None: - n_grams_df.to_parquet(f"tfidf-{n_grams}.parquet") - - -class AlmostCompleteIndexedEventGram(NamedTuple): - event_id: str - unstemmed_gram: str - stemmed_gram: str - context_span: str - value: float - datetime_weighted_value: float - - -@task -def chunk_n_grams( +def chunk_index( n_grams_df: pd.DataFrame, -) -> List[List[AlmostCompleteIndexedEventGram]]: - """ - Split the large n_grams dataframe into multiple lists of IndexedEventGram models - for batched, mapped, upload. 
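A hedged usage sketch for the new `download_file` helper added above; the credentials path and bucket name are placeholders, and the remote filename follows the chunk naming convention introduced later in this diff.

from cdp_backend.file_store.functions import download_file

local_path = download_file(
    credentials_file="/path/to/service-account.json",  # placeholder
    bucket="example-cdp-bucket",                        # placeholder
    remote_filepath="index-chunks/n_gram-1--index_chunk-0.parquet",
    save_path="n_gram-1--index_chunk-0.parquet",
)
print(local_path)  # local copy of the chunk, ready to be processed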
- """ - # Split single large dataframe into many dataframes - chunk_size = 400 - n_grams_dfs = [ - n_grams_df[i : i + chunk_size] - for i in range(0, n_grams_df.shape[0], chunk_size) - ] - - # Convert each dataframe into a list of indexed event gram - event_gram_chunks: List[List[AlmostCompleteIndexedEventGram]] = [] - for n_gram_df_chunk in n_grams_dfs: - event_gram_chunk: List[AlmostCompleteIndexedEventGram] = [] - for _, row in n_gram_df_chunk.iterrows(): - event_gram_chunk.append( - AlmostCompleteIndexedEventGram( - event_id=row.event_id, - unstemmed_gram=row.unstemmed_gram, - stemmed_gram=row.stemmed_gram, - context_span=row.context_span, - value=row.tfidf, - datetime_weighted_value=row.datetime_weighted_tfidf, - ) - ) - - event_gram_chunks.append(event_gram_chunk) - - return event_gram_chunks - - -@task -def store_n_gram_chunk( - n_gram_chunk: List[AlmostCompleteIndexedEventGram], + n_grams: int, credentials_file: str, + bucket_name: str, + ngrams_per_chunk: int = 50_000, + storage_dir: Path = Path("index/"), + store_remote: bool = False, ) -> None: """ - Write all IndexedEventGrams in a single batch. + Split the large n_grams dataframe into multiple lists of IndexedEventGram models + for batched, mapped, upload. - This isn't about an atomic batch but reducing the total upload time. + Optionally store to cloud firestore. """ - # Init batch - batch = fireo.batch() - - # Trigger upserts for all items - event_lut: Dict[str, db_models.Event] = {} - for almost_complete_ieg in n_gram_chunk: - if almost_complete_ieg.event_id not in event_lut: - event_lut[almost_complete_ieg.event_id] = db_models.Event.collection.get( - f"{db_models.Event.collection_name}/{almost_complete_ieg.event_id}" - ) + # Clean the storage dir + storage_dir = Path(storage_dir) + if storage_dir.exists(): + shutil.rmtree(storage_dir) - # Construct true ieg - ieg = db_models.IndexedEventGram() - ieg.event_ref = event_lut[almost_complete_ieg.event_id] - ieg.unstemmed_gram = almost_complete_ieg.unstemmed_gram - ieg.stemmed_gram = almost_complete_ieg.stemmed_gram - ieg.context_span = almost_complete_ieg.context_span - ieg.value = almost_complete_ieg.value - ieg.datetime_weighted_value = almost_complete_ieg.datetime_weighted_value - - db_functions.upload_db_model( - db_model=ieg, - credentials_file=credentials_file, - batch=batch, - ) + # Create storage dir + storage_dir.mkdir(parents=True) - # Commit - batch.commit() + # Split single large dataframe into many dataframes + for chunk_index, chunk_offset in enumerate( + range(0, n_grams_df.shape[0], ngrams_per_chunk) + ): + n_grams_chunk = n_grams_df[chunk_offset : chunk_offset + ngrams_per_chunk] + save_filename = f"n_gram-{n_grams}--index_chunk-{chunk_index}.parquet" + local_chunk_path = storage_dir / save_filename + n_grams_chunk.to_parquet(local_chunk_path) + + # Optional remote storage + if store_remote: + fs_functions.upload_file( + credentials_file=credentials_file, + bucket=bucket_name, + filepath=str(local_chunk_path), + save_name=f"{REMOTE_INDEX_CHUNK_DIR}/{save_filename}", + ) -def create_event_index_pipeline( +def create_event_index_generation_pipeline( config: EventIndexPipelineConfig, n_grams: int = 1, - store_local: bool = False, + ngrams_per_chunk: int = 50_000, + store_remote: bool = False, ) -> Flow: """ Create the Prefect Flow object to preview, run, or visualize for indexing @@ -447,11 +404,12 @@ def create_event_index_pipeline( Configuration options for the pipeline. n_grams: int N number of terms to act as a unique entity. 
Default: 1 - store_local: bool - Should the generated index be stored locally to disk or uploaded to database. - Storing the local index is useful for testing search result rankings with the - `search_cdp_events` bin script. - Default: False (store to database) + ngrams_per_chunk: int + The number of ngrams to store in a single chunk file. + Default: 50_000 + store_remote: bool + Should the generated index chunks be sent to cloud storage. + Default: False (only store locally) Returns ------- @@ -507,16 +465,15 @@ def create_event_index_pipeline( datetime_weighting_days_decay=config.datetime_weighting_days_decay, ) - # Route to local storage task or remote bulk upload - if store_local: - store_local_index(n_grams_df=scored_n_grams, n_grams=n_grams) - - # Route to remote database storage - else: - chunked_scored_n_grams = chunk_n_grams(scored_n_grams) - store_n_gram_chunk.map( - n_gram_chunk=chunked_scored_n_grams, - credentials_file=unmapped(config.google_credentials_file), - ) + # Create index chunks, store them locally, and optionally upload them to remote storage + chunk_index( + n_grams_df=scored_n_grams, + n_grams=n_grams, + credentials_file=config.google_credentials_file, + bucket_name=config.validated_gcs_bucket_name, + ngrams_per_chunk=ngrams_per_chunk, + storage_dir=config.local_storage_dir, + store_remote=store_remote, + ) return flow diff --git a/cdp_backend/pipeline/pipeline_config.py b/cdp_backend/pipeline/pipeline_config.py index e29fd135..3eebe2ee 100644 --- a/cdp_backend/pipeline/pipeline_config.py +++ b/cdp_backend/pipeline/pipeline_config.py @@ -3,7 +3,8 @@ import json from dataclasses import dataclass, field -from typing import Optional +from pathlib import Path +from typing import Optional, Union from dataclasses_json import dataclass_json from gcsfs import GCSFileSystem @@ -95,6 +96,9 @@ class EventIndexPipelineConfig: The number of days that grams from an event should be labeled as more relevant. Default: 30 (grams from events less than 30 days old will generally be valued higher than their pure relevance score) + local_storage_dir: Union[str, Path] + The local storage directory to store the chunked index prior to upload.
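A hedged sketch of building and running the generation flow defined above with local-only storage, mirroring the renamed bin script; the credentials path and bucket name are placeholders, and a real run requires access to an actual CDP infrastructure.

from cdp_backend.pipeline import generate_event_index_pipeline as pipeline
from cdp_backend.pipeline.pipeline_config import EventIndexPipelineConfig

config = EventIndexPipelineConfig("/path/to/creds.json", "example-cdp-bucket")
flow = pipeline.create_event_index_generation_pipeline(
    config=config,
    n_grams=1,
    ngrams_per_chunk=50_000,
    store_remote=False,  # chunks are only written under config.local_storage_dir
)
state = flow.run()
assert state.is_successful()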
+ Default: "index/" (current working directory / index) """ google_credentials_file: str @@ -105,6 +109,7 @@ class EventIndexPipelineConfig: default=None, ) datetime_weighting_days_decay: int = 30 + local_storage_dir: Union[str, Path] = "index/" @property def validated_gcs_bucket_name(self) -> str: diff --git a/cdp_backend/pipeline/process_event_index_chunk_pipeline.py b/cdp_backend/pipeline/process_event_index_chunk_pipeline.py new file mode 100644 index 00000000..f8ea6d82 --- /dev/null +++ b/cdp_backend/pipeline/process_event_index_chunk_pipeline.py @@ -0,0 +1,164 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +from pathlib import Path +from typing import Dict, List, NamedTuple, Union + +import fireo +import pandas as pd +from prefect import Flow, task, unmapped + +from ..database import functions as db_functions +from ..database import models as db_models +from ..file_store import functions as fs_functions +from .generate_event_index_pipeline import REMOTE_INDEX_CHUNK_DIR +from .pipeline_config import EventIndexPipelineConfig + +############################################################################### + + +class AlmostCompleteIndexedEventGram(NamedTuple): + event_id: str + unstemmed_gram: str + stemmed_gram: str + context_span: str + value: float + datetime_weighted_value: float + + +@task +def pull_chunk( + credentials_file: str, + bucket_name: str, + filename: str, +) -> str: + return fs_functions.download_file( + credentials_file=credentials_file, + bucket=bucket_name, + remote_filepath=filename, + save_path=filename, + ) + + +@task +def chunk_n_grams( + chunk_path: str, + upload_batch_size: int = 500, +) -> List[List[AlmostCompleteIndexedEventGram]]: + """ + Split the large n_grams dataframe into multiple lists of IndexedEventGram models + for batched, mapped, upload. + """ + # Read index chunk + n_grams_df = pd.read_parquet(chunk_path) + + # Split single large dataframe into many dataframes + n_grams_dfs = [ + n_grams_df[i : i + upload_batch_size] + for i in range(0, n_grams_df.shape[0], upload_batch_size) + ] + + # Convert each dataframe into a list of indexed event gram + event_gram_chunks: List[List[AlmostCompleteIndexedEventGram]] = [] + for n_gram_df_chunk in n_grams_dfs: + event_gram_chunk: List[AlmostCompleteIndexedEventGram] = [] + for _, row in n_gram_df_chunk.iterrows(): + event_gram_chunk.append( + AlmostCompleteIndexedEventGram( + event_id=row.event_id, + unstemmed_gram=row.unstemmed_gram, + stemmed_gram=row.stemmed_gram, + context_span=row.context_span, + value=row.tfidf, + datetime_weighted_value=row.datetime_weighted_tfidf, + ) + ) + + event_gram_chunks.append(event_gram_chunk) + + return event_gram_chunks + + +@task +def store_n_gram_chunk( + n_gram_chunk: List[AlmostCompleteIndexedEventGram], + credentials_file: str, +) -> None: + """ + Write all IndexedEventGrams in a single batch. + + This isn't about an atomic batch but reducing the total upload time. 
+ """ + # Init batch + batch = fireo.batch() + + # Trigger upserts for all items + event_lut: Dict[str, db_models.Event] = {} + for almost_complete_ieg in n_gram_chunk: + if almost_complete_ieg.event_id not in event_lut: + event_lut[almost_complete_ieg.event_id] = db_models.Event.collection.get( + f"{db_models.Event.collection_name}/{almost_complete_ieg.event_id}" + ) + + # Construct true ieg + ieg = db_models.IndexedEventGram() + ieg.event_ref = event_lut[almost_complete_ieg.event_id] + ieg.unstemmed_gram = almost_complete_ieg.unstemmed_gram + ieg.stemmed_gram = almost_complete_ieg.stemmed_gram + ieg.context_span = almost_complete_ieg.context_span + ieg.value = almost_complete_ieg.value + ieg.datetime_weighted_value = almost_complete_ieg.datetime_weighted_value + + db_functions.upload_db_model( + db_model=ieg, + credentials_file=credentials_file, + batch=batch, + ) + + # Commit + batch.commit() + + +def create_event_index_upload_pipeline( + config: EventIndexPipelineConfig, + index_chunk: Union[str, Path], + upload_batch_size: int = 500, +) -> Flow: + """ + Create the Prefect Flow object to preview, run, or visualize for uploading a + generated index for all events in the database. + + Parameters + ---------- + config: EventIndexPipelineConfig + Configuration options for the pipeline. + n_grams: int + N number of terms to act as a unique entity. Default: 1 + upload_batch_size: int + Number of ngrams to upload to database in a single batch. + Default: 500 (max) + + Returns + ------- + flow: Flow + The constructed CDP Event Index Pipeline as a Prefect Flow. + """ + with Flow("CDP Event Index Pipeline") as flow: + # Pull the file + index_chunk_local = pull_chunk( + credentials_file=config.google_credentials_file, + bucket_name=config.validated_gcs_bucket_name, + filename=f"{REMOTE_INDEX_CHUNK_DIR}/{index_chunk}", + ) + + # Route to remote database storage + chunked_scored_n_grams = chunk_n_grams( + index_chunk_local, + upload_batch_size=upload_batch_size, + ) + store_n_gram_chunk.map( + n_gram_chunk=chunked_scored_n_grams, + credentials_file=unmapped(config.google_credentials_file), + ) + + return flow diff --git a/cdp_backend/tests/pipeline/test_event_index_pipeline.py b/cdp_backend/tests/pipeline/test_event_index_pipeline.py index 289b8af0..c23ad996 100644 --- a/cdp_backend/tests/pipeline/test_event_index_pipeline.py +++ b/cdp_backend/tests/pipeline/test_event_index_pipeline.py @@ -17,7 +17,7 @@ from cdp_backend.database import functions as db_functions from cdp_backend.database import models as db_models -from cdp_backend.pipeline import event_index_pipeline as pipeline +from cdp_backend.pipeline import generate_event_index_pipeline as pipeline from cdp_backend.pipeline.pipeline_config import EventIndexPipelineConfig from cdp_backend.utils.file_utils import resource_copy @@ -34,18 +34,22 @@ # # great system stdlib :upsidedownface: -PIPELINE_PATH = "cdp_backend.pipeline.event_index_pipeline" +PIPELINE_PATH = "cdp_backend.pipeline.generate_event_index_pipeline" ############################################################################# +@mock.patch("gcsfs.credentials.GoogleCredentials.connect") +@mock.patch(f"{PIPELINE_PATH}.EventIndexPipelineConfig.validated_gcs_bucket_name") @pytest.mark.parametrize("n_grams", [1, 2, 3]) -@pytest.mark.parametrize("store_local", [True, False]) -def test_create_event_index_flow(n_grams: int, store_local: bool) -> None: - flow = pipeline.create_event_index_pipeline( +def test_create_event_index_flow( + mocked_validated_bucket_name: MagicMock, + 
mocked_gcs_connect: MagicMock, + n_grams: int, +) -> None: + flow = pipeline.create_event_index_generation_pipeline( config=EventIndexPipelineConfig("/fake/creds.json", "doesn't-matter"), n_grams=n_grams, - store_local=store_local, ) assert isinstance(flow, Flow) @@ -227,9 +231,11 @@ def test_get_transcripts_per_event( @mock.patch(f"{PIPELINE_PATH}.get_transcripts.run") @mock.patch("gcsfs.credentials.GoogleCredentials.connect") +@mock.patch(f"{PIPELINE_PATH}.EventIndexPipelineConfig.validated_gcs_bucket_name") @mock.patch("gcsfs.GCSFileSystem.get") def test_mocked_pipeline_run( mocked_file_get: MagicMock, + mocked_validated_bucket_name: MagicMock, mocked_gcs_connect: MagicMock, mocked_get_transcript_models: MagicMock, resources_dir: Path, @@ -250,6 +256,8 @@ def test_mocked_pipeline_run( session_three_transcript_one, ] + mocked_validated_bucket_name.return_value = "doesn't-matter" + def copy_test_file(rpath: str, lpath: str) -> None: if "fake_captions.json" in rpath: resource_copy( @@ -270,17 +278,16 @@ def copy_test_file(rpath: str, lpath: str) -> None: mocked_file_get.side_effect = copy_test_file # Run pipeline to local storage - flow = pipeline.create_event_index_pipeline( + flow = pipeline.create_event_index_generation_pipeline( config=EventIndexPipelineConfig("/fake/creds.json", "doesn't-matter"), n_grams=1, - store_local=True, ) state = flow.run() assert state.is_successful() # Compare produced index expected_values = pd.read_parquet(resources_dir / "expected_1_gram_index.parquet") - result_values = pd.read_parquet("tfidf-1.parquet") + result_values = pd.read_parquet("index/n_gram-1--index_chunk-0.parquet") # Sort dataframes and reset indices to ensure consistency expected_values = expected_values.sort_values(by="stemmed_gram").reset_index( @@ -306,4 +313,4 @@ def copy_test_file(rpath: str, lpath: str) -> None: pd._testing.assert_frame_equal(result_values, expected_values) # Cleanup - os.remove("tfidf-1.parquet") + os.remove("index/n_gram-1--index_chunk-0.parquet") diff --git a/setup.py b/setup.py index 65b92464..5254c99e 100644 --- a/setup.py +++ b/setup.py @@ -29,6 +29,7 @@ "nltk~=3.6", "pandas~=1.0", "prefect~=1.2", + "pyarrow~=8.0", "rapidfuzz~=2.0", "spacy~=3.0", "truecase==0.0.14", @@ -46,7 +47,6 @@ "isort>=5.7.0", "mypy>=0.790", "networkx>=2.5", - "pyarrow>=5.0", "pydot>=1.4", "pytest>=5.4.3", "pytest-cov>=2.9.0", @@ -129,7 +129,14 @@ "cdp_backend.bin.create_cdp_event_gather_flow_viz:main" ), "run_cdp_event_gather=cdp_backend.bin.run_cdp_event_gather:main", - "run_cdp_event_index=cdp_backend.bin.run_cdp_event_index:main", + ( + "run_cdp_event_index_generation=" + "cdp_backend.bin.run_cdp_event_index_generation:main" + ), + ( + "process_cdp_event_index_chunk=" + "cdp_backend.bin.process_cdp_event_index_chunk:main" + ), "search_cdp_events=cdp_backend.bin.search_cdp_events:main", "process_special_event=cdp_backend.bin.process_special_event:main", (
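One side note on the dependency shuffle above: pandas delegates parquet IO to an engine, which is why pyarrow moves from the dev extras into the core install requirements now that the index pipelines read and write parquet chunks at runtime. A minimal sketch with toy, placeholder data:

import pandas as pd

df = pd.DataFrame({"stemmed_gram": ["counc"], "tfidf": [0.42]})  # toy data
df.to_parquet("example-index-chunk.parquet")  # needs pyarrow (or fastparquet)
round_tripped = pd.read_parquet("example-index-chunk.parquet")
pd.testing.assert_frame_equal(round_tripped, df)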