Commit: feature/reduce-and-fan-ngram-index (#189)
* Always upload 500 grams at a time

* Ignore all files in index/

* Reorg index generation from upload

* Rename bin script

* Fix tests and lint

* Add workflow and bin for upload, cleanup generate

* Lint and format

* Move pyarrow for test to pipeline deps

* Fix filenaming in pipeline and tests

* Start on store remote work

* Upgrade pyarrow version

* Add download functionality and finish processing index chunks

* gitignore, and lint and format

* Mock indexing tests

* Parametrize chunk and batch sizes
Jackson Maxfield Brown committed Jun 15, 2022
1 parent c8b6b57 commit e100f8c
Showing 9 changed files with 408 additions and 118 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -124,6 +124,8 @@ transcript_model.md
test.err
test.out
tfidf-*.parquet
index/
index-chunks/
7490ea6cf56648d60a40dd334e46e5d7de0f31dde0c7ce4d85747896fdd2ab42-*
abc123-cdp*
*-person-picture
128 changes: 128 additions & 0 deletions cdp_backend/bin/process_cdp_event_index_chunk.py
@@ -0,0 +1,128 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import argparse
import logging
import sys
import traceback
from pathlib import Path

from distributed import LocalCluster
from prefect import executors

from cdp_backend.pipeline import process_event_index_chunk_pipeline as pipeline
from cdp_backend.pipeline.pipeline_config import EventIndexPipelineConfig

###############################################################################

logging.basicConfig(
    level=logging.INFO,
    format="[%(levelname)4s: %(module)s:%(lineno)4s %(asctime)s] %(message)s",
)
log = logging.getLogger(__name__)

###############################################################################


class Args(argparse.Namespace):
    def __init__(self) -> None:
        self.__parse()

    def __parse(self) -> None:
        p = argparse.ArgumentParser(
            prog="process_cdp_event_index_chunk",
            description=(
                "Download a single event index chunk from remote storage, "
                "then process it and upload ngrams to a CDP database."
            ),
        )
        p.add_argument(
            "config_file",
            type=Path,
            help=(
                "Path to the pipeline configuration file. "
                "See cdp_backend.pipeline.pipeline_config.EventIndexPipelineConfig "
                "for more details."
            ),
        )
        p.add_argument(
            "chunk",
            type=Path,
            help="Filename for the parquet index chunk to process and upload.",
        )
        p.add_argument(
            "--upload_batch_size",
            type=int,
            default=500,
            help="Number of ngrams to upload to database in a single batch.",
        )
        p.add_argument(
            "-p",
            "--parallel",
            action="store_true",
            dest="parallel",
            help=(
                "Boolean option to spin up a local multi-threaded "
                "Dask Distributed cluster for event processing."
            ),
        )

        p.parse_args(namespace=self)


def main() -> None:
    try:
        args = Args()
        with open(args.config_file, "r") as open_resource:
            config = EventIndexPipelineConfig.from_json(  # type: ignore
                open_resource.read()
            )

        # Get flow definition
        flow = pipeline.create_event_index_upload_pipeline(
            config=config,
            index_chunk=args.chunk,
            upload_batch_size=args.upload_batch_size,
        )

        # Determine executor
        if args.parallel:
            # Create local cluster
            log.info("Creating LocalCluster")
            cluster = LocalCluster(processes=False)
            log.info("Created LocalCluster")

            # Set distributed_executor_address
            distributed_executor_address = cluster.scheduler_address

            # Log dashboard URI
            log.info(f"Dask dashboard available at: {cluster.dashboard_link}")

            # Use dask cluster
            state = flow.run(
                executor=executors.DaskExecutor(address=distributed_executor_address),
            )

            # Shutdown cluster after run
            cluster.close()

        else:
            state = flow.run()

        if state.is_failed():
            raise ValueError("Flow run failed.")

    except Exception as e:
        log.error("=============================================")
        log.error("\n\n" + traceback.format_exc())
        log.error("=============================================")
        log.error("\n\n" + str(e) + "\n")
        log.error("=============================================")
        sys.exit(1)


###############################################################################
# Allow caller to directly run this module (usually in development scenarios)

if __name__ == "__main__":
    main()
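
For orientation, here is a minimal sketch of driving the same chunk-processing flow from Python instead of the CLI wrapper above. The config filename and chunk path are placeholders, not values from this repository; the calls mirror the script's own use of the pipeline module.

from pathlib import Path

from cdp_backend.pipeline import process_event_index_chunk_pipeline as pipeline
from cdp_backend.pipeline.pipeline_config import EventIndexPipelineConfig

# Load the deployment's pipeline config (placeholder filename)
with open("event-index-config.json", "r") as open_resource:
    config = EventIndexPipelineConfig.from_json(open_resource.read())

# Build and run the upload flow for one previously generated chunk
flow = pipeline.create_event_index_upload_pipeline(
    config=config,
    index_chunk=Path("index-chunks/0.parquet"),  # placeholder chunk filename
    upload_batch_size=500,  # same default as the CLI flag above
)
state = flow.run()
if state.is_failed():
    raise ValueError("Flow run failed.")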
@@ -10,7 +10,7 @@
from distributed import LocalCluster
from prefect import executors

from cdp_backend.pipeline import event_index_pipeline as pipeline
from cdp_backend.pipeline import generate_event_index_pipeline as pipeline
from cdp_backend.pipeline.pipeline_config import EventIndexPipelineConfig

###############################################################################
@@ -30,7 +30,7 @@ def __init__(self) -> None:

    def __parse(self) -> None:
        p = argparse.ArgumentParser(
            prog="run_cdp_event_index",
            prog="run_cdp_event_index_generation",
            description=(
                "Index all event (session) transcripts from a CDP infrastructure."
            ),
@@ -52,11 +52,19 @@ def __parse(self) -> None:
help="N number of terms to act as a unique entity.",
)
p.add_argument(
"-l",
"--store_local",
"--ngrams_per_chunk",
type=int,
default=50_000,
help="Number of ngrams to store in a single chunk file.",
)
p.add_argument(
"-s",
"--store_remote",
action="store_true",
dest="store_remote",
help=(
"Should the pipeline store the generated index to a local parquet file."
"Store chunks to remote cloud storage. "
"Required to add a search index."
),
)
p.add_argument(
@@ -82,10 +90,11 @@ def main() -> None:
            )

        # Get flow definition
        flow = pipeline.create_event_index_pipeline(
        flow = pipeline.create_event_index_generation_pipeline(
            config=config,
            n_grams=args.n_grams,
            store_local=args.store_local,
            ngrams_per_chunk=args.ngrams_per_chunk,
            store_remote=args.store_remote,
        )

        # Determine executor
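
A companion sketch for the renamed generation entry point, reusing the config object from the previous example. The n_grams value is illustrative, and the keyword names follow the diff above.

from cdp_backend.pipeline import generate_event_index_pipeline

# Build the index-generation flow with the new chunking / remote-storage options
flow = generate_event_index_pipeline.create_event_index_generation_pipeline(
    config=config,
    n_grams=1,  # illustrative; "N number of terms to act as a unique entity"
    ngrams_per_chunk=50_000,  # matches the new --ngrams_per_chunk default
    store_remote=True,  # per the help text, required to add a search index
)
state = flow.run()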
11 changes: 11 additions & 0 deletions cdp_backend/file_store/functions.py
@@ -126,6 +126,17 @@ def upload_file(
    return save_url


def download_file(
    credentials_file: str,
    bucket: str,
    remote_filepath: str,
    save_path: str,
) -> str:
    fs = initialize_gcs_file_system(credentials_file)
    fs.get(f"{bucket}/{remote_filepath}", save_path)
    return save_path


def get_open_url_for_gcs_file(credentials_file: str, uri: str) -> str:
"""
Simple wrapper around fsspec.FileSystem.url function for creating a connection
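
Finally, a small usage sketch for the new download_file helper; the credentials file, bucket, and remote path below are placeholders.

from cdp_backend.file_store.functions import download_file

# Fetch one remote index chunk into the local working directory
local_chunk = download_file(
    credentials_file="cdp-credentials.json",  # placeholder service account key
    bucket="example-cdp-instance.appspot.com",  # placeholder GCS bucket
    remote_filepath="index/0.parquet",  # placeholder chunk path
    save_path="0.parquet",
)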
