
Build metadata endpoint and directory walker #760

Open · wants to merge 29 commits into base: main

Conversation

@hanyuli1995 hanyuli1995 commented May 6, 2024

Summary

Today we only support uploading metadata to the KV store as a single key-value pair, key -> conf. We want to add a general metadata endpoint class to support more potential use cases.

This PR adds two general classes, MetadataEndPoint and MetadataDirWalker.

MetadataEndPoint:

case class MetadataEndPoint[Conf <: TBase[_, _]: Manifest: ClassTag](
    extractFn: (String, Conf) => (String, String),
    name: String
)

Defined with an extract function and an endpoint name. The extract function extracts the key-value pair from the Conf (which could be a Join/GroupBy/StagingQuery) and the file path (a string). The name is used as the dataset name when we send the data to the KV store.
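To make the shape concrete, here is a minimal, self-contained sketch of the pattern. SimpleConf and SimpleEndPoint are hypothetical stand-ins: the real class is parameterized on a Thrift TBase conf with Manifest/ClassTag evidence, which is omitted here.

```scala
// Hypothetical stand-in for the real Thrift conf types (Join/GroupBy/StagingQuery).
case class SimpleConf(name: String, team: String)

// extractFn maps (file path, parsed conf) to one (key, value) pair;
// `name` becomes the dataset name when writing to the KV store.
case class SimpleEndPoint(
    extractFn: (String, SimpleConf) => (String, String),
    name: String
)

object EndPointSketch {
  // key -> conf JSON, keyed by the conf's path under the conf root
  val confByKey: SimpleEndPoint = SimpleEndPoint(
    extractFn = (path, conf) => (path, s"""{"name":"${conf.name}"}"""),
    name = "CHRONON_METADATA"
  )
}
```

The real extract functions would serialize the full Thrift conf rather than a one-field JSON string.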

MetadataDirWalker:

class MetadataDirWalker(dirPath: String, metadataEndPointNames: List[String])

Walks the directory, iterating over all the config files, and generates key-value metadata based on the metadata endpoints provided.
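A hedged sketch of the walking logic, assuming plain files on the local filesystem and a caller-supplied extract function (the real class parses Thrift confs and dispatches to the named endpoints; all names here are illustrative):

```scala
import java.io.File
import scala.io.Source

// Hypothetical simplification of MetadataDirWalker: recursively collect every
// regular file under dirPath and emit (key, value) pairs per endpoint name.
class DirWalkerSketch(dirPath: String, metadataEndPointNames: List[String]) {

  private def listFiles(dir: File): List[File] = {
    val entries = Option(dir.listFiles()).map(_.toList).getOrElse(Nil)
    entries.flatMap { f => if (f.isDirectory) listFiles(f) else List(f) }
  }

  // extract: (filePath, fileContent) => (key, value); a stand-in for the real
  // per-endpoint extractFn that parses the Thrift conf.
  def run(extract: (String, String) => (String, String)): Map[String, List[(String, String)]] =
    metadataEndPointNames.map { endPoint =>
      val pairs = listFiles(new File(dirPath)).map { f =>
        val source = Source.fromFile(f)
        val content = try source.mkString finally source.close()
        extract(f.getPath, content)
      }
      endPoint -> pairs
    }.toMap
}
```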

The PR adds two metadata endpoints, CHRONON_METADATA and CHRONON_METADATA_BY_TEAM:

CHRONON_METADATA:
key -> conf JSON as a string, e.g. joins/team/team.example_join.v1 -> {...}

CHRONON_METADATA_BY_TEAM:
type/team -> list of keys as a string, e.g. joins/team -> a, b, c
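Assuming the key layout above, the per-team index can be derived from the per-conf keys with a simple group-by on the `type/team` prefix; a sketch (the helper name is illustrative, not the PR's actual code):

```scala
// Hypothetical derivation of the CHRONON_METADATA_BY_TEAM entries from the
// CHRONON_METADATA keys: group each "type/team/name" key under "type/team".
object TeamIndexSketch {
  def byTeam(confKeys: List[String]): Map[String, List[String]] =
    confKeys.groupBy(k => k.split("/").take(2).mkString("/"))
}
```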

Why / Goal

Test Plan

  • Added Unit Tests
  • Covered by existing CI
  • Integration tested

Checklist

  • Documentation update

Reviewers

@hanyuli1995 changed the title from [WIP]Build metadata endpoint general class to [WIP]Build metadata endpoint on May 6, 2024
@@ -732,8 +732,16 @@ object Driver {
}

def run(args: Args): Unit = {
val putRequest = args.metaDataStore.putConf(args.confPath())
val res = Await.result(putRequest, 1.hour)
val acceptedEndPoints = List("ZIPLINE_METADATA", "ZIPLINE_METADATA_BY_TEAM")
hanyuli1995 (Collaborator, Author):

We probably can make this a passed-in config, WDYT? @nikhilsimha

Contributor:
agreed - we could take this from conf

else v.mkString("\n").getBytes()
PutRequest(keyBytes = kBytes,
valueBytes = vBytes,
dataset = dataset,
hanyuli1995 (Collaborator, Author):

I am using the end point name as dataset here.

}

// derive a feature team key from config, e.g: joins/team
def confPathToTeamKey: Option[String] = {
Contributor:

So we should make it so that we only parse the file once and run a bunch of extractors for the conf type.

So we need to maintain a list of (key, value) extractors for groupBys, joins and stagingQueries.

We simply parse once, run all the extractors per file, print it to console and write to metastore.
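The parse-once, fan-out pattern the reviewer describes could look like this (names and types are illustrative, not the actual PR code):

```scala
// One parse per file, then every registered extractor runs on the parsed conf.
object FanOutSketch {
  case class Extractor(dataset: String, fn: (String, String) => (String, String))

  def processFile(path: String,
                  raw: String,
                  extractors: List[Extractor]): List[(String, (String, String))] = {
    val parsed = raw // stand-in for the single Thrift/JSON parse step
    extractors.map(e => e.dataset -> e.fn(path, parsed))
  }
}
```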

nikhilsimha (Contributor) left a comment:

Added a high level comment

hanyuli1995 and others added 5 commits May 9, 2024 09:46
Co-authored-by: Nikhil <r.nikhilsimha@gmail.com>
Signed-off-by: hanyuli1995 <31667422+hanyuli1995@users.noreply.github.com>
@hanyuli1995 changed the title from [WIP]Build metadata endpoint to Build metadata endpoint and directory walker on May 13, 2024
Comment on lines 77 to 78
* ZIPLINE_METADATA_BY_TEAM -> (team -> List("join1", "join2")),
* ZIPLINE_METADATA -> (teams/joins/join1 -> config1)
Contributor:

Suggested change
* ZIPLINE_METADATA_BY_TEAM -> (team -> List("join1", "join2")),
* ZIPLINE_METADATA -> (teams/joins/join1 -> config1)
* CHRONON_METADATA_BY_TEAM -> (team -> List("join1", "join2")),
* CHRONON_METADATA -> (teams/joins/join1 -> config1)

Comment on lines 17 to 18
val ConfByKeyEndPointName = "ZIPLINE_METADATA"
val NameByTeamEndPointName = "ZIPLINE_METADATA_BY_TEAM"
Contributor:

Suggested change
val ConfByKeyEndPointName = "ZIPLINE_METADATA"
val NameByTeamEndPointName = "ZIPLINE_METADATA_BY_TEAM"
val ConfByKeyEndPointName = "CHRONON_METADATA"
val NameByTeamEndPointName = "CHRONON_METADATA_BY_TEAM"

@@ -732,8 +732,14 @@ object Driver {
}

def run(args: Args): Unit = {
val putRequest = args.metaDataStore.putConf(args.confPath())
val res = Await.result(putRequest, 1.hour)
val acceptedEndPoints = List("ZIPLINE_METADATA")
Contributor:

Let's not use the old term zipline as much as we can.
Could you make these configurable, or read them from a config file or from command-line params?

Suggested change
val acceptedEndPoints = List("ZIPLINE_METADATA")
val acceptedEndPoints = List("CHRONON_METADATA")

@@ -39,7 +38,7 @@ case class DataMetrics(series: Seq[(Long, SortedMap[String, Any])])
class MetadataStore(kvStore: KVStore, val dataset: String = ChrononMetadataKey, timeoutMillis: Long) {
@transient implicit lazy val logger = LoggerFactory.getLogger(getClass)
private var partitionSpec = PartitionSpec(format = "yyyy-MM-dd", spanMillis = WindowUtils.Day.millis)
private val CONF_BATCH_SIZE = 50
private val CONF_BATCH_SIZE = 100
Contributor:

we could make this configurable.
We had some put request batch oversize error from our KV store before. That's why we set it to 50 for the safer side.

hanyuli1995 (Collaborator, Author):

Yep, it is a configurable parameter in the put function: https://github.com/airbnb/chronon/blob/19664365ace064476d49b6830c980612fa2627d5/online/src/main/scala/ai/chronon/online/MetadataStore.scala#L154c I increased this to 100 because 50 doesn't work well for groupBys (there are about 1k groupBys and 200 joins).
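For context, the batching in question amounts to splitting the put requests into fixed-size groups before issuing them, so the batch size trades throughput against the KV store's per-request payload limit. A minimal sketch, assuming a batch size of 100 as in this PR:

```scala
// Illustrative only: chunk N put requests into batches of at most batchSize
// so a single multi-put stays under the KV store's payload limit.
object BatchSketch {
  def batch[T](requests: Seq[T], batchSize: Int): Seq[Seq[T]] =
    requests.grouped(batchSize).toSeq
}
```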

Comment on lines 165 to 166
else if (v.length == 1) v.head.getBytes()
else v.mkString("\n").getBytes()
Contributor:

why do we want this? If it is already bytes, what is the use of \n?

hanyuli1995 (Collaborator, Author):

@better365 After the change the value could be a list of strings. I'm not sure what the value in the KV store would be if we applied getBytes to a list instead of a string, so I convert the list to a string first and then apply getBytes.

Contributor:

I see. What will the list of configs be used for? It would be easier to code it out here.

hanyuli1995 (Collaborator, Author):

@better365 I have updated the dataset name from zipline to chronon now. For the Airbnb use case we also need to change the code in mussel, otherwise the job will still fail. I will raise another PR in treehouse once this PR gets stamped.

better365 (Contributor) left a comment:

Stamped. Please fix the failed UT.

val vBytes =
if (v.isEmpty) Array.emptyByteArray
else if (v.length == 1) v.head.getBytes()
else v.mkString(",").getBytes()
Contributor:

this will break if there are commas in a single member string of the seq.

ChatGPT gave me this - I haven't tested it myself.

import java.util.Base64
import java.nio.charset.StandardCharsets

object StringArrayConverter {
  // Convert an array of strings to a byte array, Base64-encoding each element
  // first so that commas inside an element cannot collide with the ","
  // separator (the Base64 alphabet never contains a comma).
  def stringsToBytes(strings: Array[String]): Array[Byte] = {
    val base64EncodedStrings = strings.map(s => Base64.getEncoder.encodeToString(s.getBytes(StandardCharsets.UTF_8)))
    base64EncodedStrings.mkString(",").getBytes(StandardCharsets.UTF_8)
  }

  // Convert a byte array back to an array of strings by decoding Base64.
  // Guard the empty case: "".split(",") yields Array(""), not Array().
  def bytesToStrings(bytes: Array[Byte]): Array[String] = {
    if (bytes.isEmpty) Array.empty[String]
    else {
      val encodedString = new String(bytes, StandardCharsets.UTF_8)
      encodedString.split(",").map(s => new String(Base64.getDecoder.decode(s), StandardCharsets.UTF_8))
    }
  }
}

// Testing the methods
val originalStrings = Array("a", "b,c,d", "e.f.g", "h|i|j")
val bytes = StringArrayConverter.stringsToBytes(originalStrings)
val decodedStrings = StringArrayConverter.bytesToStrings(bytes)

println("Original Strings: " + originalStrings.mkString("[", ", ", "]"))
println("Decoded Strings: " + decodedStrings.mkString("[", ", ", "]"))

@@ -74,7 +74,10 @@ class FetcherTest extends TestCase {
val singleFileMetadataStore = new MetadataStore(inMemoryKvStore, singleFileDataSet, timeoutMillis = 10000)
inMemoryKvStore.create(singleFileDataSet)
// set the working directory to /chronon instead of $MODULE_DIR in configuration if Intellij fails testing
val singleFilePut = singleFileMetadataStore.putConf(confResource.getPath)
val acceptedEndPoints = List(MetadataEndPoint.ConfByKeyEndPointName)
Contributor:

I think we should break it out into its own test. There is a metadataexportertest.scala file

hanyuli1995 (Collaborator, Author):

Hi @nikhilsimha, I need to update the tests here because the original putConf method has been removed from the MetadataStore file.
