From f8565971a76f8a6b787ab3ff5dfea8a49544e9f3 Mon Sep 17 00:00:00 2001 From: JacksonMaxfield Date: Wed, 2 Mar 2022 16:33:50 -0800 Subject: [PATCH 01/15] Always upload 500 grams at a time --- cdp_backend/pipeline/event_index_pipeline.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cdp_backend/pipeline/event_index_pipeline.py b/cdp_backend/pipeline/event_index_pipeline.py index 831a893b..d126c5b8 100644 --- a/cdp_backend/pipeline/event_index_pipeline.py +++ b/cdp_backend/pipeline/event_index_pipeline.py @@ -365,7 +365,7 @@ def chunk_n_grams( for batched, mapped, upload. """ # Split single large dataframe into many dataframes - chunk_size = 400 + chunk_size = 500 n_grams_dfs = [ n_grams_df[i : i + chunk_size] for i in range(0, n_grams_df.shape[0], chunk_size) From ac3a4e1cf27292052b41f9455bb4e705dfc61562 Mon Sep 17 00:00:00 2001 From: JacksonMaxfield Date: Thu, 3 Mar 2022 16:06:21 -0800 Subject: [PATCH 02/15] Ignore all files in index/ --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 02f0f6fa..469c26df 100644 --- a/.gitignore +++ b/.gitignore @@ -124,6 +124,7 @@ transcript_model.md test.err test.out tfidf-*.parquet +index/ 7490ea6cf56648d60a40dd334e46e5d7de0f31dde0c7ce4d85747896fdd2ab42-* abc123-cdp* *-person-picture From 539f706b32a7f898010cc15b25c4fcf5c9dcb055 Mon Sep 17 00:00:00 2001 From: JacksonMaxfield Date: Thu, 3 Mar 2022 16:06:42 -0800 Subject: [PATCH 03/15] Reorg index generation from upload --- ...x.py => run_cdp_event_index_generation.py} | 13 +- ...ne.py => generate_event_index_pipeline.py} | 122 ++++------------ cdp_backend/pipeline/pipeline_config.py | 7 +- .../pipeline/upload_event_index_pipeline.py | 134 ++++++++++++++++++ .../pipeline/test_event_index_pipeline.py | 8 +- 5 files changed, 170 insertions(+), 114 deletions(-) rename cdp_backend/bin/{run_cdp_event_index.py => run_cdp_event_index_generation.py} (89%) rename cdp_backend/pipeline/{event_index_pipeline.py => generate_event_index_pipeline.py} (80%) create mode 100644 cdp_backend/pipeline/upload_event_index_pipeline.py diff --git a/cdp_backend/bin/run_cdp_event_index.py b/cdp_backend/bin/run_cdp_event_index_generation.py similarity index 89% rename from cdp_backend/bin/run_cdp_event_index.py rename to cdp_backend/bin/run_cdp_event_index_generation.py index 0cda1334..710cd927 100644 --- a/cdp_backend/bin/run_cdp_event_index.py +++ b/cdp_backend/bin/run_cdp_event_index_generation.py @@ -10,7 +10,7 @@ from distributed import LocalCluster from prefect import executors -from cdp_backend.pipeline import event_index_pipeline as pipeline +from cdp_backend.pipeline import generate_event_index_pipeline as pipeline from cdp_backend.pipeline.pipeline_config import EventIndexPipelineConfig ############################################################################### @@ -51,14 +51,6 @@ def __parse(self) -> None: default=1, help="N number of terms to act as a unique entity.", ) - p.add_argument( - "-l", - "--store_local", - action="store_true", - help=( - "Should the pipeline store the generated index to a local parquet file." - ), - ) p.add_argument( "-p", "--parallel", @@ -82,10 +74,9 @@ def main() -> None: ) # Get flow definition - flow = pipeline.create_event_index_pipeline( + flow = pipeline.create_event_index_generation_pipeline( config=config, n_grams=args.n_grams, - store_local=args.store_local, ) # Determine executor diff --git a/cdp_backend/pipeline/event_index_pipeline.py b/cdp_backend/pipeline/generate_event_index_pipeline.py similarity index 80% rename from cdp_backend/pipeline/event_index_pipeline.py rename to cdp_backend/pipeline/generate_event_index_pipeline.py index d126c5b8..6764cc32 100644 --- a/cdp_backend/pipeline/event_index_pipeline.py +++ b/cdp_backend/pipeline/generate_event_index_pipeline.py @@ -3,13 +3,13 @@ import logging import math +import shutil from dataclasses import dataclass from datetime import datetime from pathlib import Path from tempfile import TemporaryDirectory from typing import Dict, List, NamedTuple, Tuple -import fireo import pandas as pd import pytz import rapidfuzz @@ -60,7 +60,7 @@ def get_transcripts(credentials_file: str) -> List[db_models.Transcript]: return db_functions.get_all_of_collection( db_model=db_models.Transcript, credentials_file=credentials_file, - ) + )[:20] @task @@ -343,99 +343,35 @@ def compute_tfidf( @task -def store_local_index(n_grams_df: pd.DataFrame, n_grams: int) -> None: - n_grams_df.to_parquet(f"tfidf-{n_grams}.parquet") - - -class AlmostCompleteIndexedEventGram(NamedTuple): - event_id: str - unstemmed_gram: str - stemmed_gram: str - context_span: str - value: float - datetime_weighted_value: float - - -@task -def chunk_n_grams( +def chunk_index( n_grams_df: pd.DataFrame, -) -> List[List[AlmostCompleteIndexedEventGram]]: + n_grams: int, + storage_dir: Path = Path("index/"), +) -> None: """ Split the large n_grams dataframe into multiple lists of IndexedEventGram models for batched, mapped, upload. """ - # Split single large dataframe into many dataframes - chunk_size = 500 - n_grams_dfs = [ - n_grams_df[i : i + chunk_size] - for i in range(0, n_grams_df.shape[0], chunk_size) - ] - - # Convert each dataframe into a list of indexed event gram - event_gram_chunks: List[List[AlmostCompleteIndexedEventGram]] = [] - for n_gram_df_chunk in n_grams_dfs: - event_gram_chunk: List[AlmostCompleteIndexedEventGram] = [] - for _, row in n_gram_df_chunk.iterrows(): - event_gram_chunk.append( - AlmostCompleteIndexedEventGram( - event_id=row.event_id, - unstemmed_gram=row.unstemmed_gram, - stemmed_gram=row.stemmed_gram, - context_span=row.context_span, - value=row.tfidf, - datetime_weighted_value=row.datetime_weighted_tfidf, - ) - ) - - event_gram_chunks.append(event_gram_chunk) + # Clean the storage dir + storage_dir = Path(storage_dir) + if storage_dir.exists(): + shutil.rmtree(storage_dir) - return event_gram_chunks + # Create storage dir + storage_dir.mkdir(parents=True) - -@task -def store_n_gram_chunk( - n_gram_chunk: List[AlmostCompleteIndexedEventGram], - credentials_file: str, -) -> None: - """ - Write all IndexedEventGrams in a single batch. - - This isn't about an atomic batch but reducing the total upload time. - """ - # Init batch - batch = fireo.batch() - - # Trigger upserts for all items - event_lut: Dict[str, db_models.Event] = {} - for almost_complete_ieg in n_gram_chunk: - if almost_complete_ieg.event_id not in event_lut: - event_lut[almost_complete_ieg.event_id] = db_models.Event.collection.get( - f"{db_models.Event.collection_name}/{almost_complete_ieg.event_id}" - ) - - # Construct true ieg - ieg = db_models.IndexedEventGram() - ieg.event_ref = event_lut[almost_complete_ieg.event_id] - ieg.unstemmed_gram = almost_complete_ieg.unstemmed_gram - ieg.stemmed_gram = almost_complete_ieg.stemmed_gram - ieg.context_span = almost_complete_ieg.context_span - ieg.value = almost_complete_ieg.value - ieg.datetime_weighted_value = almost_complete_ieg.datetime_weighted_value - - db_functions.upload_db_model( - db_model=ieg, - credentials_file=credentials_file, - batch=batch, + # Split single large dataframe into many dataframes + chunk_size = 100_000 + for i in range(0, n_grams_df.shape[0], chunk_size): + n_grams_chunk = n_grams_df[i : i + chunk_size] + n_grams_chunk.to_parquet( + storage_dir / f"n_gram-{n_grams}-index_chunk_{i}.parquet" ) - # Commit - batch.commit() - -def create_event_index_pipeline( +def create_event_index_generation_pipeline( config: EventIndexPipelineConfig, n_grams: int = 1, - store_local: bool = False, ) -> Flow: """ Create the Prefect Flow object to preview, run, or visualize for indexing @@ -447,11 +383,6 @@ def create_event_index_pipeline( Configuration options for the pipeline. n_grams: int N number of terms to act as a unique entity. Default: 1 - store_local: bool - Should the generated index be stored locally to disk or uploaded to database. - Storing the local index is useful for testing search result rankings with the - `search_cdp_events` bin script. - Default: False (store to database) Returns ------- @@ -507,16 +438,11 @@ def create_event_index_pipeline( datetime_weighting_days_decay=config.datetime_weighting_days_decay, ) - # Route to local storage task or remote bulk upload - if store_local: - store_local_index(n_grams_df=scored_n_grams, n_grams=n_grams) - # Route to remote database storage - else: - chunked_scored_n_grams = chunk_n_grams(scored_n_grams) - store_n_gram_chunk.map( - n_gram_chunk=chunked_scored_n_grams, - credentials_file=unmapped(config.google_credentials_file), - ) + chunk_index( + n_grams_df=scored_n_grams, + n_grams=n_grams, + storage_dir=config.local_storage_dir, + ) return flow diff --git a/cdp_backend/pipeline/pipeline_config.py b/cdp_backend/pipeline/pipeline_config.py index e29fd135..3eebe2ee 100644 --- a/cdp_backend/pipeline/pipeline_config.py +++ b/cdp_backend/pipeline/pipeline_config.py @@ -3,7 +3,8 @@ import json from dataclasses import dataclass, field -from typing import Optional +from pathlib import Path +from typing import Optional, Union from dataclasses_json import dataclass_json from gcsfs import GCSFileSystem @@ -95,6 +96,9 @@ class EventIndexPipelineConfig: The number of days that grams from an event should be labeled as more relevant. Default: 30 (grams from events less than 30 days old will generally be valued higher than their pure relevance score) + local_storage_dir: Union[str, Path] + The local storage directory to store the chunked index prior to upload. + Default: "index/" (current working directory / index) """ google_credentials_file: str @@ -105,6 +109,7 @@ class EventIndexPipelineConfig: default=None, ) datetime_weighting_days_decay: int = 30 + local_storage_dir: Union[str, Path] = "index/" @property def validated_gcs_bucket_name(self) -> str: diff --git a/cdp_backend/pipeline/upload_event_index_pipeline.py b/cdp_backend/pipeline/upload_event_index_pipeline.py new file mode 100644 index 00000000..f92e3a77 --- /dev/null +++ b/cdp_backend/pipeline/upload_event_index_pipeline.py @@ -0,0 +1,134 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +from pathlib import Path +from typing import Dict, List, NamedTuple, Union + +import fireo +import pandas as pd +from prefect import Flow, task, unmapped + +from ..database import functions as db_functions +from ..database import models as db_models +from .pipeline_config import EventIndexPipelineConfig + +############################################################################### + + +class AlmostCompleteIndexedEventGram(NamedTuple): + event_id: str + unstemmed_gram: str + stemmed_gram: str + context_span: str + value: float + datetime_weighted_value: float + + +@task +def chunk_n_grams( + n_grams_df: pd.DataFrame, +) -> List[List[AlmostCompleteIndexedEventGram]]: + """ + Split the large n_grams dataframe into multiple lists of IndexedEventGram models + for batched, mapped, upload. + """ + # Split single large dataframe into many dataframes + chunk_size = 500 + n_grams_dfs = [ + n_grams_df[i : i + chunk_size] + for i in range(0, n_grams_df.shape[0], chunk_size) + ] + + # Convert each dataframe into a list of indexed event gram + event_gram_chunks: List[List[AlmostCompleteIndexedEventGram]] = [] + for n_gram_df_chunk in n_grams_dfs: + event_gram_chunk: List[AlmostCompleteIndexedEventGram] = [] + for _, row in n_gram_df_chunk.iterrows(): + event_gram_chunk.append( + AlmostCompleteIndexedEventGram( + event_id=row.event_id, + unstemmed_gram=row.unstemmed_gram, + stemmed_gram=row.stemmed_gram, + context_span=row.context_span, + value=row.tfidf, + datetime_weighted_value=row.datetime_weighted_tfidf, + ) + ) + + event_gram_chunks.append(event_gram_chunk) + + return event_gram_chunks + + +@task +def store_n_gram_chunk( + n_gram_chunk: List[AlmostCompleteIndexedEventGram], + credentials_file: str, +) -> None: + """ + Write all IndexedEventGrams in a single batch. + + This isn't about an atomic batch but reducing the total upload time. + """ + # Init batch + batch = fireo.batch() + + # Trigger upserts for all items + event_lut: Dict[str, db_models.Event] = {} + for almost_complete_ieg in n_gram_chunk: + if almost_complete_ieg.event_id not in event_lut: + event_lut[almost_complete_ieg.event_id] = db_models.Event.collection.get( + f"{db_models.Event.collection_name}/{almost_complete_ieg.event_id}" + ) + + # Construct true ieg + ieg = db_models.IndexedEventGram() + ieg.event_ref = event_lut[almost_complete_ieg.event_id] + ieg.unstemmed_gram = almost_complete_ieg.unstemmed_gram + ieg.stemmed_gram = almost_complete_ieg.stemmed_gram + ieg.context_span = almost_complete_ieg.context_span + ieg.value = almost_complete_ieg.value + ieg.datetime_weighted_value = almost_complete_ieg.datetime_weighted_value + + db_functions.upload_db_model( + db_model=ieg, + credentials_file=credentials_file, + batch=batch, + ) + + # Commit + batch.commit() + + +def create_event_index_upload_pipeline( + config: EventIndexPipelineConfig, + index_chunk: Union[str, Path], +) -> Flow: + """ + Create the Prefect Flow object to preview, run, or visualize for uploading a + generated index for all events in the database. + + Parameters + ---------- + config: EventIndexPipelineConfig + Configuration options for the pipeline. + n_grams: int + N number of terms to act as a unique entity. Default: 1 + + Returns + ------- + flow: Flow + The constructed CDP Event Index Pipeline as a Prefect Flow. + """ + # Read index chunk + index_chunk = pd.read_parquet(index_chunk) + + with Flow("CDP Event Index Pipeline") as flow: + # Route to remote database storage + chunked_scored_n_grams = chunk_n_grams(index_chunk) + store_n_gram_chunk.map( + n_gram_chunk=chunked_scored_n_grams, + credentials_file=unmapped(config.google_credentials_file), + ) + + return flow diff --git a/cdp_backend/tests/pipeline/test_event_index_pipeline.py b/cdp_backend/tests/pipeline/test_event_index_pipeline.py index 289b8af0..5ac5a58c 100644 --- a/cdp_backend/tests/pipeline/test_event_index_pipeline.py +++ b/cdp_backend/tests/pipeline/test_event_index_pipeline.py @@ -17,7 +17,7 @@ from cdp_backend.database import functions as db_functions from cdp_backend.database import models as db_models -from cdp_backend.pipeline import event_index_pipeline as pipeline +from cdp_backend.pipeline import generate_event_index_pipeline as pipeline from cdp_backend.pipeline.pipeline_config import EventIndexPipelineConfig from cdp_backend.utils.file_utils import resource_copy @@ -34,7 +34,7 @@ # # great system stdlib :upsidedownface: -PIPELINE_PATH = "cdp_backend.pipeline.event_index_pipeline" +PIPELINE_PATH = "cdp_backend.pipeline.generate_event_index_pipeline" ############################################################################# @@ -42,7 +42,7 @@ @pytest.mark.parametrize("n_grams", [1, 2, 3]) @pytest.mark.parametrize("store_local", [True, False]) def test_create_event_index_flow(n_grams: int, store_local: bool) -> None: - flow = pipeline.create_event_index_pipeline( + flow = pipeline.create_event_index_generation_pipeline( config=EventIndexPipelineConfig("/fake/creds.json", "doesn't-matter"), n_grams=n_grams, store_local=store_local, @@ -270,7 +270,7 @@ def copy_test_file(rpath: str, lpath: str) -> None: mocked_file_get.side_effect = copy_test_file # Run pipeline to local storage - flow = pipeline.create_event_index_pipeline( + flow = pipeline.create_event_index_generation_pipeline( config=EventIndexPipelineConfig("/fake/creds.json", "doesn't-matter"), n_grams=1, store_local=True, From a5bb8722175b9877131d8f0488475cb94444a107 Mon Sep 17 00:00:00 2001 From: JacksonMaxfield Date: Thu, 3 Mar 2022 16:06:49 -0800 Subject: [PATCH 04/15] Rename bin script --- setup.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 8fe959f2..a0f14be2 100644 --- a/setup.py +++ b/setup.py @@ -127,7 +127,10 @@ "cdp_backend.bin.create_cdp_event_gather_flow_viz:main" ), "run_cdp_event_gather=cdp_backend.bin.run_cdp_event_gather:main", - "run_cdp_event_index=cdp_backend.bin.run_cdp_event_index:main", + ( + "run_cdp_event_index_generation=" + "cdp_backend.bin.run_cdp_event_index_generation:main" + ), "search_cdp_events=cdp_backend.bin.search_cdp_events:main", "process_special_event=cdp_backend.bin.process_special_event:main", ( From 600b237d73485d4a4c043c1a7a5a5017a856125b Mon Sep 17 00:00:00 2001 From: JacksonMaxfield Date: Thu, 10 Mar 2022 13:37:19 -0800 Subject: [PATCH 05/15] Fix tests and lint --- cdp_backend/tests/pipeline/test_event_index_pipeline.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/cdp_backend/tests/pipeline/test_event_index_pipeline.py b/cdp_backend/tests/pipeline/test_event_index_pipeline.py index 5ac5a58c..6a418096 100644 --- a/cdp_backend/tests/pipeline/test_event_index_pipeline.py +++ b/cdp_backend/tests/pipeline/test_event_index_pipeline.py @@ -40,12 +40,10 @@ @pytest.mark.parametrize("n_grams", [1, 2, 3]) -@pytest.mark.parametrize("store_local", [True, False]) -def test_create_event_index_flow(n_grams: int, store_local: bool) -> None: +def test_create_event_index_flow(n_grams: int) -> None: flow = pipeline.create_event_index_generation_pipeline( config=EventIndexPipelineConfig("/fake/creds.json", "doesn't-matter"), n_grams=n_grams, - store_local=store_local, ) assert isinstance(flow, Flow) @@ -273,14 +271,13 @@ def copy_test_file(rpath: str, lpath: str) -> None: flow = pipeline.create_event_index_generation_pipeline( config=EventIndexPipelineConfig("/fake/creds.json", "doesn't-matter"), n_grams=1, - store_local=True, ) state = flow.run() assert state.is_successful() # Compare produced index expected_values = pd.read_parquet(resources_dir / "expected_1_gram_index.parquet") - result_values = pd.read_parquet("tfidf-1.parquet") + result_values = pd.read_parquet("index/n_gram-1-index_chunk_0.parquet") # Sort dataframes and reset indices to ensure consistency expected_values = expected_values.sort_values(by="stemmed_gram").reset_index( @@ -306,4 +303,4 @@ def copy_test_file(rpath: str, lpath: str) -> None: pd._testing.assert_frame_equal(result_values, expected_values) # Cleanup - os.remove("tfidf-1.parquet") + os.remove("index/n_gram-1-index_chunk_0.parquet") From 837fce332a37a22c4181a8367eb9111314c64101 Mon Sep 17 00:00:00 2001 From: JacksonMaxfield Date: Sun, 12 Jun 2022 08:28:47 -0700 Subject: [PATCH 06/15] Add workflow and bin for upload, cleanup generate --- .../bin/upload_cdp_event_index_chunk.py | 120 ++++++++++++++++++ .../pipeline/generate_event_index_pipeline.py | 12 +- setup.py | 4 + 3 files changed, 131 insertions(+), 5 deletions(-) create mode 100644 cdp_backend/bin/upload_cdp_event_index_chunk.py diff --git a/cdp_backend/bin/upload_cdp_event_index_chunk.py b/cdp_backend/bin/upload_cdp_event_index_chunk.py new file mode 100644 index 00000000..4cd7fd57 --- /dev/null +++ b/cdp_backend/bin/upload_cdp_event_index_chunk.py @@ -0,0 +1,120 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import argparse +import logging +import sys +import traceback +from pathlib import Path + +from distributed import LocalCluster +from prefect import executors + +from cdp_backend.pipeline import upload_event_index_pipeline as pipeline +from cdp_backend.pipeline.pipeline_config import EventIndexPipelineConfig + +############################################################################### + +logging.basicConfig( + level=logging.INFO, + format="[%(levelname)4s: %(module)s:%(lineno)4s %(asctime)s] %(message)s", +) +log = logging.getLogger(__name__) + +############################################################################### + + +class Args(argparse.Namespace): + def __init__(self) -> None: + self.__parse() + + def __parse(self) -> None: + p = argparse.ArgumentParser( + prog="upload_cdp_event_index_chunk", + description=( + "Upload a single index chunk to a CDP database." + ), + ) + p.add_argument( + "config_file", + type=Path, + help=( + "Path to the pipeline configuration file. " + "See cdp_backend.pipeline.pipeline_config.EventIndexPipelineConfig " + "for more details." + ), + ) + p.add_argument( + "chunk", + type=Path, + help="Filepath to the parquet index chunk to upload.", + ) + p.add_argument( + "-p", + "--parallel", + action="store_true", + dest="parallel", + help=( + "Boolean option to spin up a local multi-threaded " + "Dask Distributed cluster for event processing." + ), + ) + + p.parse_args(namespace=self) + + +def main() -> None: + try: + args = Args() + with open(args.config_file, "r") as open_resource: + config = EventIndexPipelineConfig.from_json( # type: ignore + open_resource.read() + ) + + # Get flow definition + flow = pipeline.create_event_index_upload_pipeline( + config=config, + index_chunk=args.chunk, + ) + + # Determine executor + if args.parallel: + # Create local cluster + log.info("Creating LocalCluster") + cluster = LocalCluster(processes=False) + log.info("Created LocalCluster") + + # Set distributed_executor_address + distributed_executor_address = cluster.scheduler_address + + # Log dashboard URI + log.info(f"Dask dashboard available at: {cluster.dashboard_link}") + + # Use dask cluster + state = flow.run( + executor=executors.DaskExecutor(address=distributed_executor_address), + ) + + # Shutdown cluster after run + cluster.close() + + else: + state = flow.run() + + if state.is_failed(): + raise ValueError("Flow run failed.") + + except Exception as e: + log.error("=============================================") + log.error("\n\n" + traceback.format_exc()) + log.error("=============================================") + log.error("\n\n" + str(e) + "\n") + log.error("=============================================") + sys.exit(1) + + +############################################################################### +# Allow caller to directly run this module (usually in development scenarios) + +if __name__ == "__main__": + main() diff --git a/cdp_backend/pipeline/generate_event_index_pipeline.py b/cdp_backend/pipeline/generate_event_index_pipeline.py index 6764cc32..0e111d92 100644 --- a/cdp_backend/pipeline/generate_event_index_pipeline.py +++ b/cdp_backend/pipeline/generate_event_index_pipeline.py @@ -60,7 +60,7 @@ def get_transcripts(credentials_file: str) -> List[db_models.Transcript]: return db_functions.get_all_of_collection( db_model=db_models.Transcript, credentials_file=credentials_file, - )[:20] + ) @task @@ -361,11 +361,13 @@ def chunk_index( storage_dir.mkdir(parents=True) # Split single large dataframe into many dataframes - chunk_size = 100_000 - for i in range(0, n_grams_df.shape[0], chunk_size): - n_grams_chunk = n_grams_df[i : i + chunk_size] + chunk_size = 50_000 + for chunk_index, chunk_offset in enumerate( + range(0, n_grams_df.shape[0], chunk_size) + ): + n_grams_chunk = n_grams_df[chunk_offset : chunk_offset + chunk_size] n_grams_chunk.to_parquet( - storage_dir / f"n_gram-{n_grams}-index_chunk_{i}.parquet" + storage_dir / f"n_gram-{n_grams}-index_chunk-{chunk_index}.parquet" ) diff --git a/setup.py b/setup.py index 88404573..61144108 100644 --- a/setup.py +++ b/setup.py @@ -133,6 +133,10 @@ "run_cdp_event_index_generation=" "cdp_backend.bin.run_cdp_event_index_generation:main" ), + ( + "upload_cdp_event_index_chunk=" + "cdp_backend.bin.upload_cdp_event_index_chunk:main" + ), "search_cdp_events=cdp_backend.bin.search_cdp_events:main", "process_special_event=cdp_backend.bin.process_special_event:main", ( From da80b166fe83115705fb69ff8548c515282f8313 Mon Sep 17 00:00:00 2001 From: JacksonMaxfield Date: Sun, 12 Jun 2022 08:35:00 -0700 Subject: [PATCH 07/15] Lint and format --- cdp_backend/bin/upload_cdp_event_index_chunk.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/cdp_backend/bin/upload_cdp_event_index_chunk.py b/cdp_backend/bin/upload_cdp_event_index_chunk.py index 4cd7fd57..c0b9d9c8 100644 --- a/cdp_backend/bin/upload_cdp_event_index_chunk.py +++ b/cdp_backend/bin/upload_cdp_event_index_chunk.py @@ -31,9 +31,7 @@ def __init__(self) -> None: def __parse(self) -> None: p = argparse.ArgumentParser( prog="upload_cdp_event_index_chunk", - description=( - "Upload a single index chunk to a CDP database." - ), + description=("Upload a single index chunk to a CDP database."), ) p.add_argument( "config_file", From 0407c49405b470ac146d5e639983c0dcedff260d Mon Sep 17 00:00:00 2001 From: JacksonMaxfield Date: Sun, 12 Jun 2022 08:43:07 -0700 Subject: [PATCH 08/15] Move pyarrow for test to pipeline deps --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 61144108..e9c81881 100644 --- a/setup.py +++ b/setup.py @@ -29,6 +29,7 @@ "nltk~=3.6", "pandas~=1.0", "prefect~=1.2", + "pyarrow~=5.0", "rapidfuzz~=2.0", "spacy~=3.0", "truecase==0.0.14", @@ -46,7 +47,6 @@ "isort>=5.7.0", "mypy>=0.790", "networkx>=2.5", - "pyarrow>=5.0", "pydot>=1.4", "pytest>=5.4.3", "pytest-cov>=2.9.0", From fde67fe2f4cbf11e276817f43c7bb0e8623c93f5 Mon Sep 17 00:00:00 2001 From: JacksonMaxfield Date: Sun, 12 Jun 2022 09:40:13 -0700 Subject: [PATCH 09/15] Fix filenaming in pipeline and tests --- cdp_backend/pipeline/generate_event_index_pipeline.py | 2 +- cdp_backend/tests/pipeline/test_event_index_pipeline.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/cdp_backend/pipeline/generate_event_index_pipeline.py b/cdp_backend/pipeline/generate_event_index_pipeline.py index 0e111d92..7257b93a 100644 --- a/cdp_backend/pipeline/generate_event_index_pipeline.py +++ b/cdp_backend/pipeline/generate_event_index_pipeline.py @@ -367,7 +367,7 @@ def chunk_index( ): n_grams_chunk = n_grams_df[chunk_offset : chunk_offset + chunk_size] n_grams_chunk.to_parquet( - storage_dir / f"n_gram-{n_grams}-index_chunk-{chunk_index}.parquet" + storage_dir / f"n_gram-{n_grams}--index_chunk-{chunk_index}.parquet" ) diff --git a/cdp_backend/tests/pipeline/test_event_index_pipeline.py b/cdp_backend/tests/pipeline/test_event_index_pipeline.py index 6a418096..0539c695 100644 --- a/cdp_backend/tests/pipeline/test_event_index_pipeline.py +++ b/cdp_backend/tests/pipeline/test_event_index_pipeline.py @@ -277,7 +277,7 @@ def copy_test_file(rpath: str, lpath: str) -> None: # Compare produced index expected_values = pd.read_parquet(resources_dir / "expected_1_gram_index.parquet") - result_values = pd.read_parquet("index/n_gram-1-index_chunk_0.parquet") + result_values = pd.read_parquet("index/n_gram-1--index_chunk-0.parquet") # Sort dataframes and reset indices to ensure consistency expected_values = expected_values.sort_values(by="stemmed_gram").reset_index( @@ -303,4 +303,4 @@ def copy_test_file(rpath: str, lpath: str) -> None: pd._testing.assert_frame_equal(result_values, expected_values) # Cleanup - os.remove("index/n_gram-1-index_chunk_0.parquet") + os.remove("index/n_gram-1--index_chunk-0.parquet") From ad8fbc4f045036fa7aa2c7d9d7dc15ead9c3286b Mon Sep 17 00:00:00 2001 From: JacksonMaxfield Date: Mon, 13 Jun 2022 11:34:36 -0700 Subject: [PATCH 10/15] Start on store remote work --- .../pipeline/generate_event_index_pipeline.py | 30 ++++++++++++++++--- 1 file changed, 26 insertions(+), 4 deletions(-) diff --git a/cdp_backend/pipeline/generate_event_index_pipeline.py b/cdp_backend/pipeline/generate_event_index_pipeline.py index 7257b93a..331c70f7 100644 --- a/cdp_backend/pipeline/generate_event_index_pipeline.py +++ b/cdp_backend/pipeline/generate_event_index_pipeline.py @@ -24,6 +24,7 @@ from ..utils import string_utils from .pipeline_config import EventIndexPipelineConfig from .transcript_model import Sentence, Transcript +from ..file_store import functions as fs_functions ############################################################################### @@ -346,11 +347,16 @@ def compute_tfidf( def chunk_index( n_grams_df: pd.DataFrame, n_grams: int, + credentials_file: str, + bucket_name: str, storage_dir: Path = Path("index/"), + store_remote: bool = False, ) -> None: """ Split the large n_grams dataframe into multiple lists of IndexedEventGram models for batched, mapped, upload. + + Optionally store to cloud firestore. """ # Clean the storage dir storage_dir = Path(storage_dir) @@ -366,14 +372,24 @@ def chunk_index( range(0, n_grams_df.shape[0], chunk_size) ): n_grams_chunk = n_grams_df[chunk_offset : chunk_offset + chunk_size] - n_grams_chunk.to_parquet( - storage_dir / f"n_gram-{n_grams}--index_chunk-{chunk_index}.parquet" - ) + save_filename = f"n_gram-{n_grams}--index_chunk-{chunk_index}.parquet" + local_chunk_path = storage_dir / save_filename + n_grams_chunk.to_parquet(local_chunk_path) + + # Optional remote storage + if store_remote: + fs_functions.upload_file( + credentials_file=credentials_file, + bucket=bucket_name, + filepath=local_chunk_path, + save_name=f"index-chunks/{save_filename}" + ) def create_event_index_generation_pipeline( config: EventIndexPipelineConfig, n_grams: int = 1, + store_remote: bool = False, ) -> Flow: """ Create the Prefect Flow object to preview, run, or visualize for indexing @@ -385,6 +401,9 @@ def create_event_index_generation_pipeline( Configuration options for the pipeline. n_grams: int N number of terms to act as a unique entity. Default: 1 + store_remote: bool + Should the generated index chunks be sent to cloud storage. + Default: False (only store locally) Returns ------- @@ -440,11 +459,14 @@ def create_event_index_generation_pipeline( datetime_weighting_days_decay=config.datetime_weighting_days_decay, ) - # Route to remote database storage + # Create index chunks and store local and optional remote chunk_index( n_grams_df=scored_n_grams, n_grams=n_grams, + credentials_file=config.google_credentials_file, + bucket_name=config.validated_gcs_bucket_name, storage_dir=config.local_storage_dir, + store_remote=store_remote, ) return flow From 824f41f1774648a47f798c947cc08d2edd2ddb48 Mon Sep 17 00:00:00 2001 From: JacksonMaxfield Date: Mon, 13 Jun 2022 15:25:24 -0700 Subject: [PATCH 11/15] Upgrade pyarrow version --- setup.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/setup.py b/setup.py index e9c81881..5254c99e 100644 --- a/setup.py +++ b/setup.py @@ -29,7 +29,7 @@ "nltk~=3.6", "pandas~=1.0", "prefect~=1.2", - "pyarrow~=5.0", + "pyarrow~=8.0", "rapidfuzz~=2.0", "spacy~=3.0", "truecase==0.0.14", @@ -134,8 +134,8 @@ "cdp_backend.bin.run_cdp_event_index_generation:main" ), ( - "upload_cdp_event_index_chunk=" - "cdp_backend.bin.upload_cdp_event_index_chunk:main" + "process_cdp_event_index_chunk=" + "cdp_backend.bin.process_cdp_event_index_chunk:main" ), "search_cdp_events=cdp_backend.bin.search_cdp_events:main", "process_special_event=cdp_backend.bin.process_special_event:main", From a5d8a77093b471cb2e043ab3f6092b44d955fb2f Mon Sep 17 00:00:00 2001 From: JacksonMaxfield Date: Mon, 13 Jun 2022 15:26:02 -0700 Subject: [PATCH 12/15] Add download functionality and finish processing index chunks --- ...nk.py => process_cdp_event_index_chunk.py} | 11 ++++--- .../bin/run_cdp_event_index_generation.py | 13 +++++++- cdp_backend/file_store/functions.py | 13 ++++++++ .../pipeline/generate_event_index_pipeline.py | 16 +++++---- ... => process_event_index_chunk_pipeline.py} | 33 ++++++++++++++++--- 5 files changed, 69 insertions(+), 17 deletions(-) rename cdp_backend/bin/{upload_cdp_event_index_chunk.py => process_cdp_event_index_chunk.py} (89%) rename cdp_backend/pipeline/{upload_event_index_pipeline.py => process_event_index_chunk_pipeline.py} (83%) diff --git a/cdp_backend/bin/upload_cdp_event_index_chunk.py b/cdp_backend/bin/process_cdp_event_index_chunk.py similarity index 89% rename from cdp_backend/bin/upload_cdp_event_index_chunk.py rename to cdp_backend/bin/process_cdp_event_index_chunk.py index c0b9d9c8..c10f2969 100644 --- a/cdp_backend/bin/upload_cdp_event_index_chunk.py +++ b/cdp_backend/bin/process_cdp_event_index_chunk.py @@ -10,7 +10,7 @@ from distributed import LocalCluster from prefect import executors -from cdp_backend.pipeline import upload_event_index_pipeline as pipeline +from cdp_backend.pipeline import process_event_index_chunk_pipeline as pipeline from cdp_backend.pipeline.pipeline_config import EventIndexPipelineConfig ############################################################################### @@ -30,8 +30,11 @@ def __init__(self) -> None: def __parse(self) -> None: p = argparse.ArgumentParser( - prog="upload_cdp_event_index_chunk", - description=("Upload a single index chunk to a CDP database."), + prog="process_cdp_event_index_chunk", + description=( + "Download a single event index chunk from remote storage, " + "then process it and upload ngrams to a CDP database." + ), ) p.add_argument( "config_file", @@ -45,7 +48,7 @@ def __parse(self) -> None: p.add_argument( "chunk", type=Path, - help="Filepath to the parquet index chunk to upload.", + help="Filename for the parquet index chunk to process and upload.", ) p.add_argument( "-p", diff --git a/cdp_backend/bin/run_cdp_event_index_generation.py b/cdp_backend/bin/run_cdp_event_index_generation.py index 034e5114..9e3c376d 100644 --- a/cdp_backend/bin/run_cdp_event_index_generation.py +++ b/cdp_backend/bin/run_cdp_event_index_generation.py @@ -30,7 +30,7 @@ def __init__(self) -> None: def __parse(self) -> None: p = argparse.ArgumentParser( - prog="run_cdp_event_index", + prog="run_cdp_event_index_generation", description=( "Index all event (session) transcripts from a CDP infrastructure." ), @@ -51,6 +51,16 @@ def __parse(self) -> None: default=1, help="N number of terms to act as a unique entity.", ) + p.add_argument( + "-s", + "--store_remote", + action="store_true", + dest="store_remote", + help=( + "Store chunks to remote cloud storage. " + "Required to add a search index." + ), + ) p.add_argument( "-p", "--parallel", @@ -77,6 +87,7 @@ def main() -> None: flow = pipeline.create_event_index_generation_pipeline( config=config, n_grams=args.n_grams, + store_remote=args.store_remote, ) # Determine executor diff --git a/cdp_backend/file_store/functions.py b/cdp_backend/file_store/functions.py index ce013a5f..c051d8ec 100644 --- a/cdp_backend/file_store/functions.py +++ b/cdp_backend/file_store/functions.py @@ -126,6 +126,19 @@ def upload_file( return save_url +def download_file( + credentials_file: str, + bucket: str, + remote_filepath: str, + save_path: str, +) -> Path: + fs = initialize_gcs_file_system(credentials_file) + print(remote_filepath) + print(save_path) + fs.get(f"{bucket}/{remote_filepath}", save_path) + return save_path + + def get_open_url_for_gcs_file(credentials_file: str, uri: str) -> str: """ Simple wrapper around fsspec.FileSystem.url function for creating a connection diff --git a/cdp_backend/pipeline/generate_event_index_pipeline.py b/cdp_backend/pipeline/generate_event_index_pipeline.py index 331c70f7..df9dd675 100644 --- a/cdp_backend/pipeline/generate_event_index_pipeline.py +++ b/cdp_backend/pipeline/generate_event_index_pipeline.py @@ -21,10 +21,12 @@ from ..database import functions as db_functions from ..database import models as db_models +from ..file_store import functions as fs_functions from ..utils import string_utils from .pipeline_config import EventIndexPipelineConfig from .transcript_model import Sentence, Transcript -from ..file_store import functions as fs_functions + +REMOTE_INDEX_CHUNK_DIR = "index-chunks" ############################################################################### @@ -378,12 +380,12 @@ def chunk_index( # Optional remote storage if store_remote: - fs_functions.upload_file( - credentials_file=credentials_file, - bucket=bucket_name, - filepath=local_chunk_path, - save_name=f"index-chunks/{save_filename}" - ) + fs_functions.upload_file( + credentials_file=credentials_file, + bucket=bucket_name, + filepath=local_chunk_path, + save_name=f"{REMOTE_INDEX_CHUNK_DIR}/{save_filename}", + ) def create_event_index_generation_pipeline( diff --git a/cdp_backend/pipeline/upload_event_index_pipeline.py b/cdp_backend/pipeline/process_event_index_chunk_pipeline.py similarity index 83% rename from cdp_backend/pipeline/upload_event_index_pipeline.py rename to cdp_backend/pipeline/process_event_index_chunk_pipeline.py index f92e3a77..8127e53a 100644 --- a/cdp_backend/pipeline/upload_event_index_pipeline.py +++ b/cdp_backend/pipeline/process_event_index_chunk_pipeline.py @@ -10,6 +10,8 @@ from ..database import functions as db_functions from ..database import models as db_models +from ..file_store import functions as fs_functions +from .generate_event_index_pipeline import REMOTE_INDEX_CHUNK_DIR from .pipeline_config import EventIndexPipelineConfig ############################################################################### @@ -24,14 +26,31 @@ class AlmostCompleteIndexedEventGram(NamedTuple): datetime_weighted_value: float +@task +def pull_chunk( + credentials_file: str, + bucket_name: str, + filename: str, +) -> str: + return fs_functions.download_file( + credentials_file=credentials_file, + bucket=bucket_name, + remote_filepath=filename, + save_path=filename, + ) + + @task def chunk_n_grams( - n_grams_df: pd.DataFrame, + chunk_path: str, ) -> List[List[AlmostCompleteIndexedEventGram]]: """ Split the large n_grams dataframe into multiple lists of IndexedEventGram models for batched, mapped, upload. """ + # Read index chunk + n_grams_df = pd.read_parquet(chunk_path) + # Split single large dataframe into many dataframes chunk_size = 500 n_grams_dfs = [ @@ -120,12 +139,16 @@ def create_event_index_upload_pipeline( flow: Flow The constructed CDP Event Index Pipeline as a Prefect Flow. """ - # Read index chunk - index_chunk = pd.read_parquet(index_chunk) - with Flow("CDP Event Index Pipeline") as flow: + # Pull the file + index_chunk_local = pull_chunk( + credentials_file=config.google_credentials_file, + bucket_name=config.validated_gcs_bucket_name, + filename=f"{REMOTE_INDEX_CHUNK_DIR}/{index_chunk}", + ) + # Route to remote database storage - chunked_scored_n_grams = chunk_n_grams(index_chunk) + chunked_scored_n_grams = chunk_n_grams(index_chunk_local) store_n_gram_chunk.map( n_gram_chunk=chunked_scored_n_grams, credentials_file=unmapped(config.google_credentials_file), From 652dea131524eb4f49977e195a37c86c42f8b844 Mon Sep 17 00:00:00 2001 From: JacksonMaxfield Date: Mon, 13 Jun 2022 15:28:20 -0700 Subject: [PATCH 13/15] gitignore, and lint and format --- .gitignore | 1 + cdp_backend/file_store/functions.py | 4 +--- cdp_backend/pipeline/generate_event_index_pipeline.py | 2 +- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/.gitignore b/.gitignore index 469c26df..ff9b0a39 100644 --- a/.gitignore +++ b/.gitignore @@ -125,6 +125,7 @@ test.err test.out tfidf-*.parquet index/ +index-chunks/ 7490ea6cf56648d60a40dd334e46e5d7de0f31dde0c7ce4d85747896fdd2ab42-* abc123-cdp* *-person-picture diff --git a/cdp_backend/file_store/functions.py b/cdp_backend/file_store/functions.py index c051d8ec..0d63413d 100644 --- a/cdp_backend/file_store/functions.py +++ b/cdp_backend/file_store/functions.py @@ -131,10 +131,8 @@ def download_file( bucket: str, remote_filepath: str, save_path: str, -) -> Path: +) -> str: fs = initialize_gcs_file_system(credentials_file) - print(remote_filepath) - print(save_path) fs.get(f"{bucket}/{remote_filepath}", save_path) return save_path diff --git a/cdp_backend/pipeline/generate_event_index_pipeline.py b/cdp_backend/pipeline/generate_event_index_pipeline.py index df9dd675..0591defb 100644 --- a/cdp_backend/pipeline/generate_event_index_pipeline.py +++ b/cdp_backend/pipeline/generate_event_index_pipeline.py @@ -383,7 +383,7 @@ def chunk_index( fs_functions.upload_file( credentials_file=credentials_file, bucket=bucket_name, - filepath=local_chunk_path, + filepath=str(local_chunk_path), save_name=f"{REMOTE_INDEX_CHUNK_DIR}/{save_filename}", ) From 148b5c744a22276dd15c7474346d816b617ea56a Mon Sep 17 00:00:00 2001 From: JacksonMaxfield Date: Mon, 13 Jun 2022 16:02:48 -0700 Subject: [PATCH 14/15] Mock indexing tests --- .../tests/pipeline/test_event_index_pipeline.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/cdp_backend/tests/pipeline/test_event_index_pipeline.py b/cdp_backend/tests/pipeline/test_event_index_pipeline.py index 0539c695..c23ad996 100644 --- a/cdp_backend/tests/pipeline/test_event_index_pipeline.py +++ b/cdp_backend/tests/pipeline/test_event_index_pipeline.py @@ -39,8 +39,14 @@ ############################################################################# +@mock.patch("gcsfs.credentials.GoogleCredentials.connect") +@mock.patch(f"{PIPELINE_PATH}.EventIndexPipelineConfig.validated_gcs_bucket_name") @pytest.mark.parametrize("n_grams", [1, 2, 3]) -def test_create_event_index_flow(n_grams: int) -> None: +def test_create_event_index_flow( + mocked_validated_bucket_name: MagicMock, + mocked_gcs_connect: MagicMock, + n_grams: int, +) -> None: flow = pipeline.create_event_index_generation_pipeline( config=EventIndexPipelineConfig("/fake/creds.json", "doesn't-matter"), n_grams=n_grams, @@ -225,9 +231,11 @@ def test_get_transcripts_per_event( @mock.patch(f"{PIPELINE_PATH}.get_transcripts.run") @mock.patch("gcsfs.credentials.GoogleCredentials.connect") +@mock.patch(f"{PIPELINE_PATH}.EventIndexPipelineConfig.validated_gcs_bucket_name") @mock.patch("gcsfs.GCSFileSystem.get") def test_mocked_pipeline_run( mocked_file_get: MagicMock, + mocked_validated_bucket_name: MagicMock, mocked_gcs_connect: MagicMock, mocked_get_transcript_models: MagicMock, resources_dir: Path, @@ -248,6 +256,8 @@ def test_mocked_pipeline_run( session_three_transcript_one, ] + mocked_validated_bucket_name.return_value = "doesn't-matter" + def copy_test_file(rpath: str, lpath: str) -> None: if "fake_captions.json" in rpath: resource_copy( From 3f8c9bf48c789f15a15c51e253dd4b17438aab1f Mon Sep 17 00:00:00 2001 From: JacksonMaxfield Date: Wed, 15 Jun 2022 10:08:51 -0700 Subject: [PATCH 15/15] Parametrize chunk and batch sizes --- cdp_backend/bin/process_cdp_event_index_chunk.py | 7 +++++++ cdp_backend/bin/run_cdp_event_index_generation.py | 7 +++++++ .../pipeline/generate_event_index_pipeline.py | 11 ++++++++--- .../process_event_index_chunk_pipeline.py | 15 +++++++++++---- 4 files changed, 33 insertions(+), 7 deletions(-) diff --git a/cdp_backend/bin/process_cdp_event_index_chunk.py b/cdp_backend/bin/process_cdp_event_index_chunk.py index c10f2969..0f471146 100644 --- a/cdp_backend/bin/process_cdp_event_index_chunk.py +++ b/cdp_backend/bin/process_cdp_event_index_chunk.py @@ -50,6 +50,12 @@ def __parse(self) -> None: type=Path, help="Filename for the parquet index chunk to process and upload.", ) + p.add_argument( + "--upload_batch_size", + type=int, + default=500, + help="Number of ngrams to upload to database in a single batch.", + ) p.add_argument( "-p", "--parallel", @@ -76,6 +82,7 @@ def main() -> None: flow = pipeline.create_event_index_upload_pipeline( config=config, index_chunk=args.chunk, + upload_batch_size=args.upload_batch_size, ) # Determine executor diff --git a/cdp_backend/bin/run_cdp_event_index_generation.py b/cdp_backend/bin/run_cdp_event_index_generation.py index 9e3c376d..79e5cb48 100644 --- a/cdp_backend/bin/run_cdp_event_index_generation.py +++ b/cdp_backend/bin/run_cdp_event_index_generation.py @@ -51,6 +51,12 @@ def __parse(self) -> None: default=1, help="N number of terms to act as a unique entity.", ) + p.add_argument( + "--ngrams_per_chunk", + type=int, + default=50_000, + help="Number of ngrams to store in a single chunk file.", + ) p.add_argument( "-s", "--store_remote", @@ -87,6 +93,7 @@ def main() -> None: flow = pipeline.create_event_index_generation_pipeline( config=config, n_grams=args.n_grams, + ngrams_per_chunk=args.ngrams_per_chunk, store_remote=args.store_remote, ) diff --git a/cdp_backend/pipeline/generate_event_index_pipeline.py b/cdp_backend/pipeline/generate_event_index_pipeline.py index 0591defb..881004cd 100644 --- a/cdp_backend/pipeline/generate_event_index_pipeline.py +++ b/cdp_backend/pipeline/generate_event_index_pipeline.py @@ -351,6 +351,7 @@ def chunk_index( n_grams: int, credentials_file: str, bucket_name: str, + ngrams_per_chunk: int = 50_000, storage_dir: Path = Path("index/"), store_remote: bool = False, ) -> None: @@ -369,11 +370,10 @@ def chunk_index( storage_dir.mkdir(parents=True) # Split single large dataframe into many dataframes - chunk_size = 50_000 for chunk_index, chunk_offset in enumerate( - range(0, n_grams_df.shape[0], chunk_size) + range(0, n_grams_df.shape[0], ngrams_per_chunk) ): - n_grams_chunk = n_grams_df[chunk_offset : chunk_offset + chunk_size] + n_grams_chunk = n_grams_df[chunk_offset : chunk_offset + ngrams_per_chunk] save_filename = f"n_gram-{n_grams}--index_chunk-{chunk_index}.parquet" local_chunk_path = storage_dir / save_filename n_grams_chunk.to_parquet(local_chunk_path) @@ -391,6 +391,7 @@ def chunk_index( def create_event_index_generation_pipeline( config: EventIndexPipelineConfig, n_grams: int = 1, + ngrams_per_chunk: int = 50_000, store_remote: bool = False, ) -> Flow: """ @@ -403,6 +404,9 @@ def create_event_index_generation_pipeline( Configuration options for the pipeline. n_grams: int N number of terms to act as a unique entity. Default: 1 + ngrams_per_chunks: int + The number of ngrams to store in a single chunk file. + Default: 50_000 store_remote: bool Should the generated index chunks be sent to cloud storage. Default: False (only store locally) @@ -467,6 +471,7 @@ def create_event_index_generation_pipeline( n_grams=n_grams, credentials_file=config.google_credentials_file, bucket_name=config.validated_gcs_bucket_name, + ngrams_per_chunk=ngrams_per_chunk, storage_dir=config.local_storage_dir, store_remote=store_remote, ) diff --git a/cdp_backend/pipeline/process_event_index_chunk_pipeline.py b/cdp_backend/pipeline/process_event_index_chunk_pipeline.py index 8127e53a..f8ea6d82 100644 --- a/cdp_backend/pipeline/process_event_index_chunk_pipeline.py +++ b/cdp_backend/pipeline/process_event_index_chunk_pipeline.py @@ -43,6 +43,7 @@ def pull_chunk( @task def chunk_n_grams( chunk_path: str, + upload_batch_size: int = 500, ) -> List[List[AlmostCompleteIndexedEventGram]]: """ Split the large n_grams dataframe into multiple lists of IndexedEventGram models @@ -52,10 +53,9 @@ def chunk_n_grams( n_grams_df = pd.read_parquet(chunk_path) # Split single large dataframe into many dataframes - chunk_size = 500 n_grams_dfs = [ - n_grams_df[i : i + chunk_size] - for i in range(0, n_grams_df.shape[0], chunk_size) + n_grams_df[i : i + upload_batch_size] + for i in range(0, n_grams_df.shape[0], upload_batch_size) ] # Convert each dataframe into a list of indexed event gram @@ -122,6 +122,7 @@ def store_n_gram_chunk( def create_event_index_upload_pipeline( config: EventIndexPipelineConfig, index_chunk: Union[str, Path], + upload_batch_size: int = 500, ) -> Flow: """ Create the Prefect Flow object to preview, run, or visualize for uploading a @@ -133,6 +134,9 @@ def create_event_index_upload_pipeline( Configuration options for the pipeline. n_grams: int N number of terms to act as a unique entity. Default: 1 + upload_batch_size: int + Number of ngrams to upload to database in a single batch. + Default: 500 (max) Returns ------- @@ -148,7 +152,10 @@ def create_event_index_upload_pipeline( ) # Route to remote database storage - chunked_scored_n_grams = chunk_n_grams(index_chunk_local) + chunked_scored_n_grams = chunk_n_grams( + index_chunk_local, + upload_batch_size=upload_batch_size, + ) store_n_gram_chunk.map( n_gram_chunk=chunked_scored_n_grams, credentials_file=unmapped(config.google_credentials_file),