Skip to content

Commit

Permalink
lua-wip
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Mar 8, 2024
1 parent 559d82e commit 2dacc7a
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 12 deletions.
61 changes: 49 additions & 12 deletions flow/connectors/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sync"

"github.com/twmb/franz-go/pkg/kgo"
"github.com/yuin/gopher-lua"
"go.temporal.io/sdk/log"

metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata"
Expand All @@ -21,12 +22,6 @@ type KafkaConnector struct {
logger log.Logger
}

// KafkaRecord is the JSON payload built for a single record before it is
// serialized with encoding/json.
type KafkaRecord struct {
	// Action is emitted under the "action" key.
	Action string `json:"action"`
	// Old is emitted under the "old" key; it is nil for inserts.
	Old any `json:"old"`
	// New is emitted under the "new" key; for inserts it holds the
	// record's item values.
	New any `json:"new"`
}

func NewKafkaConnector(
ctx context.Context,
config *protos.KafkaConfig,
Expand Down Expand Up @@ -88,6 +83,23 @@ func (c *KafkaConnector) SyncFlowCleanup(ctx context.Context, jobName string) er
return c.pgMetadata.DropMetadata(ctx, jobName)
}

// LuaUserDataType describes a Lua userdata type whose Go payload is a T.
// Name is the key under which the type's metatable is created and looked up.
type LuaUserDataType[T any] struct {
	Name string
}

// LuaRecord is the userdata type descriptor for model.Record values; its
// metatable name is "peerdb_row".
var LuaRecord = LuaUserDataType[model.Record]{
	Name: "peerdb_row",
}

// RegisterTypes creates the type metatable named by LuaRecord in the given
// Lua state and sets that metatable's "__index" field to LuaRecordGet.
func RegisterTypes(L *lua.LState) {
	recordMeta := L.NewTypeMetatable(LuaRecord.Name)
	L.SetField(recordMeta, "__index", LuaRecordGet)
}

// Construct wraps val in a new Lua userdata value. The userdata takes its
// environment from ls.Env and its metatable from the type metatable that ls
// holds under udt.Name.
func (udt *LuaUserDataType[T]) Construct(ls *lua.LState, val T) *lua.LUserData {
	env := ls.Env
	meta := ls.GetTypeMetatable(udt.Name)
	return &lua.LUserData{Value: val, Env: env, Metatable: meta}
}

func (c *KafkaConnector) SyncRecords(ctx context.Context, req *model.SyncRecordsRequest) (*model.SyncResponse, error) {
err := c.client.BeginTransaction()
if err != nil {
Expand All @@ -106,18 +118,38 @@ func (c *KafkaConnector) SyncRecords(ctx context.Context, req *model.SyncRecords
numRecords := int64(0)
tableNameRowsMapping := make(map[string]uint32)

var fn *lua.LFunction
var state *lua.LState
if req.Script != "" {
state = lua.NewState(lua.Options{SkipOpenLibs: true})
defer state.Close()
state.SetContext(wgCtx)
state.DoString(req.Script)

var ok bool
fn, ok = state.GetGlobal("onRow").(*lua.LFunction)
if !ok {
return nil, fmt.Errorf("Script should define `onRow` function")
}
} else {
return nil, fmt.Errorf("Kafka mirror must have script")
}

for record := range req.Records.GetRecords() {
if err := wgCtx.Err(); err != nil {
return nil, err
}
topic := "peerdb_" + record.GetDestinationTableName()
topic := record.GetDestinationTableName()
switch typedRecord := record.(type) {
case *model.InsertRecord:
insertData := KafkaRecord{
Old: nil,
New: typedRecord.Items.Values,
}

state.Push(KafkaRecord)
state.Call()
lfn(insertData)
insertJSON, err := json.Marshal(insertData)
if err != nil {
return nil, fmt.Errorf("failed to serialize insert data to JSON: %w", err)
Expand Down Expand Up @@ -157,11 +189,9 @@ func (c *KafkaConnector) SyncRecords(ctx context.Context, req *model.SyncRecords
}

numRecords += 1
tableNameRowsMapping[record.GetDestinationTableName()] += 1
tableNameRowsMapping[topic] += 1
}

lastCP := req.Records.GetLastCheckpoint()

// TODO handle
waitChan := make(chan struct{})
go func() {
Expand All @@ -182,10 +212,17 @@ func (c *KafkaConnector) SyncRecords(ctx context.Context, req *model.SyncRecords
return nil, fmt.Errorf("could not commit transaction: %w", err)
}

lastCheckpoint := req.Records.GetLastCheckpoint()
err = c.pgMetadata.FinishBatch(ctx, req.FlowJobName, req.SyncBatchID, lastCheckpoint)
if err != nil {
return nil, err
}

return &model.SyncResponse{
LastSyncedCheckpointID: lastCP,
NumRecordsSynced: numRecords,
CurrentSyncBatchID: req.SyncBatchID,
LastSyncedCheckpointID: lastCheckpoint,
NumRecordsSynced: numRecords,
TableNameRowsMapping: tableNameRowsMapping,
TableSchemaDeltas: req.Records.SchemaDeltas,
}, nil
}
1 change: 1 addition & 0 deletions flow/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ require (
github.com/twpayne/go-geos v0.16.1
github.com/urfave/cli/v3 v3.0.0-alpha9
github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a
github.com/yuin/gopher-lua v1.1.1
go.temporal.io/api v1.26.0
go.temporal.io/sdk v1.25.1
go.uber.org/automaxprocs v1.5.3
Expand Down
2 changes: 2 additions & 0 deletions flow/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,8 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M=
github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw=
github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ=
github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0=
github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0=
Expand Down
2 changes: 2 additions & 0 deletions flow/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ type SyncRecordsRequest struct {
TableMappings []*protos.TableMapping
// Staging path for AVRO files in CDC
StagingPath string
// Lua script source; the Kafka connector requires it and expects it to define an `onRow` function
Script string
}

type NormalizeRecordsRequest struct {
Expand Down

0 comments on commit 2dacc7a

Please sign in to comment.