feature/reduce-and-fan-ngram-index #189

Merged · merged 18 commits · Jun 15, 2022
2 changes: 2 additions & 0 deletions .gitignore
@@ -124,6 +124,8 @@ transcript_model.md
 test.err
 test.out
 tfidf-*.parquet
+index/
+index-chunks/
 7490ea6cf56648d60a40dd334e46e5d7de0f31dde0c7ce4d85747896fdd2ab42-*
 abc123-cdp*
 *-person-picture
128 changes: 128 additions & 0 deletions cdp_backend/bin/process_cdp_event_index_chunk.py
@@ -0,0 +1,128 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import argparse
import logging
import sys
import traceback
from pathlib import Path

from distributed import LocalCluster
from prefect import executors

from cdp_backend.pipeline import process_event_index_chunk_pipeline as pipeline
from cdp_backend.pipeline.pipeline_config import EventIndexPipelineConfig

###############################################################################

logging.basicConfig(
    level=logging.INFO,
    format="[%(levelname)4s: %(module)s:%(lineno)4s %(asctime)s] %(message)s",
)
log = logging.getLogger(__name__)

###############################################################################


class Args(argparse.Namespace):
    def __init__(self) -> None:
        self.__parse()

    def __parse(self) -> None:
        p = argparse.ArgumentParser(
            prog="process_cdp_event_index_chunk",
            description=(
                "Download a single event index chunk from remote storage, "
                "then process it and upload ngrams to a CDP database."
            ),
        )
        p.add_argument(
            "config_file",
            type=Path,
            help=(
                "Path to the pipeline configuration file. "
                "See cdp_backend.pipeline.pipeline_config.EventIndexPipelineConfig "
                "for more details."
            ),
        )
        p.add_argument(
            "chunk",
            type=Path,
            help="Filename for the parquet index chunk to process and upload.",
        )
        p.add_argument(
            "--upload_batch_size",
            type=int,
            default=500,
            help="Number of ngrams to upload to database in a single batch.",
        )
        p.add_argument(
            "-p",
            "--parallel",
            action="store_true",
            dest="parallel",
            help=(
                "Boolean option to spin up a local multi-threaded "
                "Dask Distributed cluster for event processing."
            ),
        )

        p.parse_args(namespace=self)


def main() -> None:
    try:
        args = Args()
        with open(args.config_file, "r") as open_resource:
            config = EventIndexPipelineConfig.from_json(  # type: ignore
                open_resource.read()
            )

            # Get flow definition
            flow = pipeline.create_event_index_upload_pipeline(
                config=config,
                index_chunk=args.chunk,
                upload_batch_size=args.upload_batch_size,
            )

            # Determine executor
            if args.parallel:
                # Create local cluster
                log.info("Creating LocalCluster")
                cluster = LocalCluster(processes=False)
                log.info("Created LocalCluster")

                # Set distributed_executor_address
                distributed_executor_address = cluster.scheduler_address

                # Log dashboard URI
                log.info(f"Dask dashboard available at: {cluster.dashboard_link}")

                # Use dask cluster
                state = flow.run(
                    executor=executors.DaskExecutor(
                        address=distributed_executor_address
                    ),
                )

                # Shutdown cluster after run
                cluster.close()

            else:
                state = flow.run()

            if state.is_failed():
                raise ValueError("Flow run failed.")

    except Exception as e:
        log.error("=============================================")
        log.error("\n\n" + traceback.format_exc())
        log.error("=============================================")
        log.error("\n\n" + str(e) + "\n")
        log.error("=============================================")
        sys.exit(1)


###############################################################################
# Allow caller to directly run this module (usually in development scenarios)

if __name__ == "__main__":
main()
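For orientation, this new script is the "fan" half of the PR's reduce-and-fan design: the generation pipeline (modified in the next hunks) writes parquet index chunks, and this CLI is invoked once per chunk. Below is a minimal driver sketch; the config filename, the local index-chunks/ directory (suggested by the new .gitignore entries), and the availability of the prog name as a console script are all assumptions, not part of this PR.

# Hypothetical driver: invoke the chunk-upload CLI once per generated chunk.
# Filenames and the console-script name below are illustrative assumptions.
import subprocess
from pathlib import Path

CONFIG_FILE = "event-index-config.json"  # an EventIndexPipelineConfig JSON

for chunk in sorted(Path("index-chunks").glob("*.parquet")):
    # Each chunk is processed and uploaded independently, so this loop
    # could equally be fanned out across many machines.
    subprocess.run(
        ["process_cdp_event_index_chunk", CONFIG_FILE, chunk.name, "--parallel"],
        check=True,
    )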
@@ -10,7 +10,7 @@
 from distributed import LocalCluster
 from prefect import executors

-from cdp_backend.pipeline import event_index_pipeline as pipeline
+from cdp_backend.pipeline import generate_event_index_pipeline as pipeline
 from cdp_backend.pipeline.pipeline_config import EventIndexPipelineConfig

 ###############################################################################
@@ -30,7 +30,7 @@ def __init__(self) -> None:

     def __parse(self) -> None:
         p = argparse.ArgumentParser(
-            prog="run_cdp_event_index",
+            prog="run_cdp_event_index_generation",
             description=(
                 "Index all event (session) transcripts from a CDP infrastructure."
             ),
@@ -52,11 +52,19 @@ def __parse(self) -> None:
help="N number of terms to act as a unique entity.",
)
p.add_argument(
"-l",
"--store_local",
"--ngrams_per_chunk",
type=int,
default=50_000,
help="Number of ngrams to store in a single chunk file.",
)
p.add_argument(
"-s",
"--store_remote",
action="store_true",
dest="store_remote",
help=(
"Should the pipeline store the generated index to a local parquet file."
"Store chunks to remote cloud storage. "
"Required to add a search index."
),
)
p.add_argument(
@@ -82,10 +90,11 @@ def main() -> None:
             )

             # Get flow definition
-            flow = pipeline.create_event_index_pipeline(
+            flow = pipeline.create_event_index_generation_pipeline(
                 config=config,
                 n_grams=args.n_grams,
-                store_local=args.store_local,
+                ngrams_per_chunk=args.ngrams_per_chunk,
+                store_remote=args.store_remote,
             )

             # Determine executor
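For reference, a sketch of driving the renamed generation pipeline from Python rather than through the CLI. The call signature is taken verbatim from the hunk above; the config filename and the n_grams value are illustrative placeholders.

from cdp_backend.pipeline import generate_event_index_pipeline as pipeline
from cdp_backend.pipeline.pipeline_config import EventIndexPipelineConfig

# Illustrative config path; the real file is whatever config_file points at.
with open("event-index-config.json", "r") as open_resource:
    config = EventIndexPipelineConfig.from_json(open_resource.read())  # type: ignore

flow = pipeline.create_event_index_generation_pipeline(
    config=config,
    n_grams=1,  # illustrative; the CLI forwards args.n_grams
    ngrams_per_chunk=50_000,  # the CLI default above
    store_remote=True,  # per the help text, required to add a search index
)
state = flow.run()
if state.is_failed():
    raise ValueError("Flow run failed.")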
11 changes: 11 additions & 0 deletions cdp_backend/file_store/functions.py
@@ -126,6 +126,17 @@ def upload_file(
    return save_url


def download_file(
    credentials_file: str,
    bucket: str,
    remote_filepath: str,
    save_path: str,
) -> str:
    """
    Download a file from a GCS bucket to a local save path and return
    the save path.
    """
    fs = initialize_gcs_file_system(credentials_file)
    fs.get(f"{bucket}/{remote_filepath}", save_path)
    return save_path


def get_open_url_for_gcs_file(credentials_file: str, uri: str) -> str:
"""
Simple wrapper around fsspec.FileSystem.url function for creating a connection
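A short usage sketch for the new download_file helper; the credentials path, bucket, and file names below are placeholders, not values from this PR.

from cdp_backend.file_store.functions import download_file

# All values below are illustrative placeholders.
local_path = download_file(
    credentials_file="gcs-credentials.json",
    bucket="example-cdp-bucket",
    remote_filepath="index-chunks/chunk-0.parquet",
    save_path="chunk-0.parquet",
)
print(f"Index chunk saved to: {local_path}")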