Skip to content

Commit

Permalink
Data Ingestion Improvement/Cleanup/Bug Fix - Part 2 (#307)
Browse files Browse the repository at this point in the history
Code ready for review, evaluations/testing is still in progress

### Description
- This is **part 2** of the data ingestion improvements. The goal is to
analyze and investigate any potential issues with the data ingestion
process, remove noisy data such as badly formatted content or gibberish
text, find bugs, and enforce standards on the process to have
predictable cost estimation/reduction.

### Technical Changes
- Renamed the original `split.py` file to `chunking_utils.py` (multiple
files changed due to import naming)
- During the above change, found 2 DAGs somehow were NOT changed.
Provider docs and SDK docs's incremental ingest DAGs somehow never had a
chunking/splitting task (even though the bulk ingest does). I added
chunking to these 2 DAGs
- Airflow Docs extraction process has been improved(see code for
details)
- Provider Docs extraction process has been improved (see code for
details)
- Astro SDK Docs extraction process has been improved (see code for
details) -> Astro SDK docs are not currently being ingested, but fixing
code since they exist.
- Provider docs used to ingest all past versions leading to many
duplicate docs in the DB. It is now changes to only ingest the `stable`
version of the docs.

### Evaluations
- Answer quality generally improved with some add on of more concise
answering and linking hyperlinks when the model knows the answer is not
exhaustive.
- Please attached for a quick list of results
- 

[data_ingest_comparison_part_2.csv](https://github.com/astronomer/ask-astro/files/14484682/data_ingest_comparison_part_2.csv)

[data_ingest_results_part_2.csv](https://github.com/astronomer/ask-astro/files/14484683/data_ingest_results_part_2.csv)
- Bulk ingest also runs with no issues
<img width="668" alt="image"
src="https://github.com/astronomer/ask-astro/assets/26350341/635c1ba0-b6b0-4fcd-81ed-8c337f7b3f78">

### Related Issues
closes #221
closes #258 
closes #295 (Reranker has been addressed in GCP environment variables,
embedding model change completed in a different PR)
closes #285 (This PR prevents empty docs from being ingested)
  • Loading branch information
davidgxue committed Mar 4, 2024
1 parent d8b0e1d commit df80ddc
Show file tree
Hide file tree
Showing 19 changed files with 139 additions and 56 deletions.
2 changes: 1 addition & 1 deletion airflow/Dockerfile
Original file line number Diff line number Diff line change
@@ -1 +1 @@
FROM quay.io/astronomer/astro-runtime:10.0.0
FROM quay.io/astronomer/astro-runtime:10.1.0
4 changes: 2 additions & 2 deletions airflow/dags/ingestion/ask-astro-forum-load.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ def get_astro_forum_content():
),
)
def ask_astro_load_astro_forum():
from include.tasks import split
from include.tasks import chunking_utils

split_docs = task(split.split_html).expand(dfs=[get_astro_forum_content()])
split_docs = task(chunking_utils.split_html).expand(dfs=[get_astro_forum_content()])

_import_data = WeaviateDocumentIngestOperator.partial(
class_name=WEAVIATE_CLASS,
Expand Down
4 changes: 2 additions & 2 deletions airflow/dags/ingestion/ask-astro-load-airflow-docs.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ def ask_astro_load_airflow_docs():
data from a point-in-time data capture. By using the upsert logic of the weaviate_import decorator
any existing documents that have been updated will be removed and re-added.
"""
from include.tasks import split
from include.tasks import chunking_utils
from include.tasks.extract import airflow_docs

extracted_airflow_docs = task(split.split_html).expand(
extracted_airflow_docs = task(chunking_utils.split_html).expand(
dfs=[airflow_docs.extract_airflow_docs(docs_base_url=airflow_docs_base_url)]
)

Expand Down
4 changes: 2 additions & 2 deletions airflow/dags/ingestion/ask-astro-load-astro-cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ def ask_astro_load_astro_cli_docs():
data from a point-in-time data capture. By using the upsert logic of the weaviate_import decorator
any existing documents that have been updated will be removed and re-added.
"""
from include.tasks import split
from include.tasks import chunking_utils
from include.tasks.extract import astro_cli_docs

extract_astro_cli_docs = task(astro_cli_docs.extract_astro_cli_docs)()
split_md_docs = task(split.split_html).expand(dfs=[extract_astro_cli_docs])
split_md_docs = task(chunking_utils.split_html).expand(dfs=[extract_astro_cli_docs])

_import_data = WeaviateDocumentIngestOperator.partial(
class_name=WEAVIATE_CLASS,
Expand Down
6 changes: 5 additions & 1 deletion airflow/dags/ingestion/ask-astro-load-astro-sdk.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ def get_astro_sdk_content():
),
)
def ask_astro_load_astro_sdk():
from include.tasks import chunking_utils

split_docs = task(chunking_utils.split_html).expand(dfs=[get_astro_sdk_content()])

_import_data = WeaviateDocumentIngestOperator.partial(
class_name=WEAVIATE_CLASS,
existing="replace",
Expand All @@ -45,7 +49,7 @@ def ask_astro_load_astro_sdk():
verbose=True,
conn_id=_WEAVIATE_CONN_ID,
task_id="WeaviateDocumentIngestOperator",
).expand(input_data=[get_astro_sdk_content()])
).expand(input_data=[split_docs])


ask_astro_load_astro_sdk()
4 changes: 2 additions & 2 deletions airflow/dags/ingestion/ask-astro-load-astronomer-docs.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ def ask_astro_load_astronomer_docs():
"""
This DAG performs incremental load for any new docs in astronomer docs.
"""
from include.tasks import split
from include.tasks import chunking_utils
from include.tasks.extract.astro_docs import extract_astro_docs

astro_docs = task(extract_astro_docs)()

split_html_docs = task(split.split_html).expand(dfs=[astro_docs])
split_html_docs = task(chunking_utils.split_html).expand(dfs=[astro_docs])

_import_data = WeaviateDocumentIngestOperator.partial(
class_name=WEAVIATE_CLASS,
Expand Down
6 changes: 5 additions & 1 deletion airflow/dags/ingestion/ask-astro-load-astronomer-provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ def ask_astro_load_astronomer_providers():
any existing documents that have been updated will be removed and re-added.
"""

from include.tasks import chunking_utils

split_docs = task(chunking_utils.split_html).expand(dfs=[get_provider_content()])

_import_data = WeaviateDocumentIngestOperator.partial(
class_name=WEAVIATE_CLASS,
existing="replace",
Expand All @@ -51,7 +55,7 @@ def ask_astro_load_astronomer_providers():
verbose=True,
conn_id=_WEAVIATE_CONN_ID,
task_id="WeaviateDocumentIngestOperator",
).expand(input_data=[get_provider_content()])
).expand(input_data=[split_docs])


ask_astro_load_astronomer_providers()
4 changes: 2 additions & 2 deletions airflow/dags/ingestion/ask-astro-load-blogs.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@ def ask_astro_load_blogs():
data from a point-in-time data capture. By using the upsert logic of the weaviate_import decorator
any existing documents that have been updated will be removed and re-added.
"""
from include.tasks import split
from include.tasks import chunking_utils
from include.tasks.extract import blogs

blogs_docs = task(blogs.extract_astro_blogs)(blog_cutoff_date=blog_cutoff_date)

split_md_docs = task(split.split_markdown).expand(dfs=[blogs_docs])
split_md_docs = task(chunking_utils.split_markdown).expand(dfs=[blogs_docs])

_import_data = WeaviateDocumentIngestOperator.partial(
class_name=WEAVIATE_CLASS,
Expand Down
4 changes: 2 additions & 2 deletions airflow/dags/ingestion/ask-astro-load-cosmos-docs.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ def ask_astro_load_cosmos_docs():
any existing documents that have been updated will be removed and re-added.
"""

from include.tasks import split
from include.tasks import chunking_utils

split_docs = task(split.split_html).expand(dfs=[extract_cosmos_docs()])
split_docs = task(chunking_utils.split_html).expand(dfs=[extract_cosmos_docs()])

_import_data = WeaviateDocumentIngestOperator.partial(
class_name=WEAVIATE_CLASS,
Expand Down
4 changes: 2 additions & 2 deletions airflow/dags/ingestion/ask-astro-load-github.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def ask_astro_load_github():
data from a point-in-time data capture. By using the upsert logic of the weaviate_import decorator
any existing documents that have been updated will be removed and re-added.
"""
from include.tasks import split
from include.tasks import chunking_utils
from include.tasks.extract import github

md_docs = (
Expand All @@ -58,7 +58,7 @@ def ask_astro_load_github():
.expand(repo_base=issues_docs_sources)
)

split_md_docs = task(split.split_markdown).expand(dfs=[md_docs, issues_docs])
split_md_docs = task(chunking_utils.split_markdown).expand(dfs=[md_docs, issues_docs])

_import_data = WeaviateDocumentIngestOperator.partial(
class_name=WEAVIATE_CLASS,
Expand Down
6 changes: 3 additions & 3 deletions airflow/dags/ingestion/ask-astro-load-registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,16 @@ def ask_astro_load_registry():
data from a point-in-time data capture. By using the upsert logic of the weaviate_import decorator
any existing documents that have been updated will be removed and re-added.
"""
from include.tasks import split
from include.tasks import chunking_utils
from include.tasks.extract import registry

registry_cells_docs = task(registry.extract_astro_registry_cell_types)()

registry_dags_docs = task(registry.extract_astro_registry_dags)()

split_md_docs = task(split.split_markdown).expand(dfs=[registry_cells_docs])
split_md_docs = task(chunking_utils.split_markdown).expand(dfs=[registry_cells_docs])

split_code_docs = task(split.split_python).expand(dfs=[registry_dags_docs])
split_code_docs = task(chunking_utils.split_python).expand(dfs=[registry_dags_docs])

_import_data = WeaviateDocumentIngestOperator.partial(
class_name=WEAVIATE_CLASS,
Expand Down
4 changes: 2 additions & 2 deletions airflow/dags/ingestion/ask-astro-load-slack.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ def ask_astro_load_slack():
DAG should run nightly to capture threads between archive periods. By using the upsert logic of the
weaviate_import decorator any existing documents that have been updated will be removed and re-added.
"""
from include.tasks import split
from include.tasks import chunking_utils
from include.tasks.extract import slack

slack_docs = task(slack.extract_slack).expand(source=slack_channel_sources)

split_md_docs = task(split.split_markdown).expand(dfs=[slack_docs])
split_md_docs = task(chunking_utils.split_markdown).expand(dfs=[slack_docs])

_import_data = WeaviateDocumentIngestOperator.partial(
class_name=WEAVIATE_CLASS,
Expand Down
4 changes: 2 additions & 2 deletions airflow/dags/ingestion/ask-astro-load-stackoverflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def ask_astro_load_stackoverflow():
data from a point-in-time data capture. By using the upsert logic of the weaviate_import decorator
any existing documents that have been updated will be removed and re-added.
"""
from include.tasks import split
from include.tasks import chunking_utils
from include.tasks.extract import stack_overflow

stack_overflow_docs = (
Expand All @@ -47,7 +47,7 @@ def ask_astro_load_stackoverflow():
.expand(tag=stackoverflow_tags)
)

split_md_docs = task(split.split_markdown).expand(dfs=[stack_overflow_docs])
split_md_docs = task(chunking_utils.split_markdown).expand(dfs=[stack_overflow_docs])

_import_data = WeaviateDocumentIngestOperator.partial(
class_name=WEAVIATE_CLASS,
Expand Down
8 changes: 4 additions & 4 deletions airflow/dags/ingestion/ask-astro-load.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def ask_astro_load_bulk():
"""

from include.tasks import split
from include.tasks import chunking_utils

@task
def get_schema_and_process(schema_file: str) -> list:
Expand Down Expand Up @@ -432,11 +432,11 @@ def import_baseline(

python_code_tasks = [registry_dags_docs]

split_md_docs = task(split.split_markdown).expand(dfs=markdown_tasks)
split_md_docs = task(chunking_utils.split_markdown).expand(dfs=markdown_tasks)

split_code_docs = task(split.split_python).expand(dfs=python_code_tasks)
split_code_docs = task(chunking_utils.split_python).expand(dfs=python_code_tasks)

split_html_docs = task(split.split_html).expand(dfs=html_tasks)
split_html_docs = task(chunking_utils.split_html).expand(dfs=html_tasks)

_import_data = WeaviateDocumentIngestOperator.partial(
class_name=WEAVIATE_CLASS,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,28 @@
from __future__ import annotations

import logging

import pandas as pd
import tiktoken
from langchain.schema import Document
from langchain.text_splitter import (
Language,
RecursiveCharacterTextSplitter,
)
from langchain_community.document_transformers import Html2TextTransformer

logger = logging.getLogger("airflow.task")

TARGET_CHUNK_SIZE = 2500


def enforce_max_token_len(text: str) -> str:
encoding = tiktoken.get_encoding("cl100k_base")
encoded_text = encoding.encode(text)
if len(encoded_text) > 8191:
logger.info("Token length of string exceeds the max content length of the tokenizer. Truncating...")
return encoding.decode(encoded_text[:8191])
return text


def split_markdown(dfs: list[pd.DataFrame]) -> pd.DataFrame:
"""
Expand All @@ -27,11 +42,25 @@ def split_markdown(dfs: list[pd.DataFrame]) -> pd.DataFrame:

df = pd.concat(dfs, axis=0, ignore_index=True)

splitter = RecursiveCharacterTextSplitter(chunk_size=4000, chunk_overlap=200, separators=["\n\n", "\n", " ", ""])
# directly from the langchain splitter library
separators = ["\n\n", "\n", " ", ""]
splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
# cl100k_base is used for text ada 002 and later embedding models
encoding_name="cl100k_base",
chunk_size=TARGET_CHUNK_SIZE,
chunk_overlap=200,
separators=separators,
is_separator_regex=True,
)

df["doc_chunks"] = df["content"].apply(lambda x: splitter.split_documents([Document(page_content=x)]))
df = df.explode("doc_chunks", ignore_index=True)
df["content"] = df["doc_chunks"].apply(lambda x: x.page_content)

# Remove blank doc chunks
df = df[~df["content"].apply(lambda x: x.isspace() or x == "")]

df["content"] = df["content"].apply(enforce_max_token_len)
df.drop(["doc_chunks"], inplace=True, axis=1)
df.reset_index(inplace=True, drop=True)

Expand All @@ -56,15 +85,36 @@ def split_python(dfs: list[pd.DataFrame]) -> pd.DataFrame:

df = pd.concat(dfs, axis=0, ignore_index=True)

splitter = RecursiveCharacterTextSplitter.from_language(
language=Language.PYTHON,
# chunk_size=50,
chunk_overlap=0,
# directly from the langchain splitter library
python_separators = [
# First, try to split along class definitions
"\nclass ",
"\ndef ",
"\n\tdef ",
# Now split by the normal type of lines
"\n\n",
"\n",
" ",
"",
]

splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
# cl100k_base is used for text ada 002 and later embedding models
encoding_name="cl100k_base",
chunk_size=TARGET_CHUNK_SIZE,
chunk_overlap=200,
separators=python_separators,
is_separator_regex=True,
)

df["doc_chunks"] = df["content"].apply(lambda x: splitter.split_documents([Document(page_content=x)]))
df = df.explode("doc_chunks", ignore_index=True)
df["content"] = df["doc_chunks"].apply(lambda x: x.page_content)

# Remove blank doc chunks
df = df[~df["content"].apply(lambda x: x.isspace() or x == "")]

df["content"] = df["content"].apply(enforce_max_token_len)
df.drop(["doc_chunks"], inplace=True, axis=1)
df.reset_index(inplace=True, drop=True)

Expand Down Expand Up @@ -93,9 +143,10 @@ def split_html(dfs: list[pd.DataFrame]) -> pd.DataFrame:
splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
# cl100k_base is used for text ada 002 and later embedding models
encoding_name="cl100k_base",
chunk_size=4000,
chunk_size=TARGET_CHUNK_SIZE,
chunk_overlap=200,
separators=separators,
is_separator_regex=True,
)

# Split by chunking first
Expand All @@ -110,17 +161,8 @@ def split_html(dfs: list[pd.DataFrame]) -> pd.DataFrame:
# Remove blank doc chunks
df = df[~df["content"].apply(lambda x: x.isspace() or x == "")]

df["content"] = df["content"].apply(enforce_max_token_len)
df.drop(["doc_chunks"], inplace=True, axis=1)
df.reset_index(inplace=True, drop=True)

return df


def split_list(urls: list[str], chunk_size: int = 0) -> list[list[str]]:
"""
split the list of string into chunk of list of string
param urls: URL list we want to chunk
param chunk_size: Max size of chunked list
"""
return [urls[i : i + chunk_size] for i in range(0, len(urls), chunk_size)]
5 changes: 2 additions & 3 deletions airflow/include/tasks/extract/airflow_docs.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@
import urllib.parse

import pandas as pd
import requests
from bs4 import BeautifulSoup
from weaviate.util import generate_uuid5

from airflow.decorators import task
from include.tasks.extract.utils.html_utils import get_internal_links
from include.tasks.extract.utils.html_utils import fetch_page_content, get_internal_links


@task
Expand Down Expand Up @@ -46,7 +45,7 @@ def extract_airflow_docs(docs_base_url: str) -> list[pd.DataFrame]:

df = pd.DataFrame(docs_links, columns=["docLink"])

df["html_content"] = df["docLink"].apply(lambda x: requests.get(x).content)
df["html_content"] = df["docLink"].apply(fetch_page_content)

df["content"] = df["html_content"].apply(
lambda x: str(BeautifulSoup(x, "html.parser").find(class_="body", role="main"))
Expand Down

0 comments on commit df80ddc

Please sign in to comment.