Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Asynchronous function invocation support #368

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions cel/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ go_test(
"//common/types/ref:go_default_library",
"//common/types/traits:go_default_library",
"//interpreter/functions:go_default_library",
"//test:go_default_library",
"//test/proto2pb:go_default_library",
"//test/proto3pb:go_default_library",
"@io_bazel_rules_go//proto/wkt:descriptor_go_proto",
Expand Down
98 changes: 98 additions & 0 deletions cel/cel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
package cel

import (
"context"
"fmt"
"io/ioutil"
"log"
"sync"
"testing"
"time"

"github.com/golang/protobuf/proto"
"github.com/google/cel-go/checker/decls"
Expand All @@ -32,6 +34,7 @@ import (
"github.com/google/cel-go/interpreter"
"github.com/google/cel-go/interpreter/functions"
"github.com/google/cel-go/parser"
"github.com/google/cel-go/test"

descpb "github.com/golang/protobuf/protoc-gen-go/descriptor"
proto2pb "github.com/google/cel-go/test/proto2pb"
Expand Down Expand Up @@ -876,3 +879,98 @@ func Test_CustomInterpreterDecorator(t *testing.T) {
t.Errorf("got %v as the last observed constant, wanted 1", lastConst)
}
}

func Test_AsyncExtension(t *testing.T) {
env, err := NewEnv(
Declarations(
decls.NewVar("x", decls.String),
decls.NewFunction("asyncEcho",
decls.NewOverload(
"async_echo_string",
[]*exprpb.Type{decls.String},
decls.String)),
),
)
if err != nil {
t.Fatal(err)
}
funcs := Functions(
&functions.Overload{
Operator: "asyncEcho",
Async: test.FakeRPC(25 * time.Millisecond),
},
&functions.Overload{
Operator: "async_echo_string",
Async: test.FakeRPC(25 * time.Millisecond),
},
)

tests := []struct {
expr string
parseOnly bool
evalOpts EvalOption
out ref.Val
}{
{
expr: `asyncEcho(x)`,
out: types.String("async echo success!"),
},
{
expr: `asyncEcho(x)`,
parseOnly: true,
out: types.String("async echo success!"),
},
{
expr: `asyncEcho(x)`,
evalOpts: OptOptimize,
out: types.String("async echo success!"),
},
{
expr: `asyncEcho(x) == 'async echo success!'`,
evalOpts: OptOptimize | OptTrackState,
out: types.True,
},
{
expr: `asyncEcho(x) == 'async echo success!' || true`,
evalOpts: OptOptimize | OptTrackState,
out: types.True,
},
}
for i, tst := range tests {
tc := tst
t.Run(fmt.Sprintf("%d", i), func(tt *testing.T) {
var ast *Ast
var iss *Issues
if tc.parseOnly {
ast, iss = env.Parse(tc.expr)
} else {
ast, iss = env.Compile(tc.expr)
}
if iss.Err() != nil {
tt.Fatal(iss.Err())
}
opts := []ProgramOption{funcs}
if tc.evalOpts != 0 {
opts = append(opts, EvalOptions(tc.evalOpts))
}
prg, err := env.AsyncProgram(ast, opts...)
if err != nil {
tt.Fatal(err)
}
ctx := context.TODO()
out, det, err := prg.AsyncEval(ctx, map[string]interface{}{
"x": "async echo",
})
if err != nil {
tt.Fatal(err)
}
if out.Equal(tc.out) != types.True {
tt.Errorf("got %v, wanted %v", out, tc.out)
}
if tc.evalOpts&OptTrackState == OptTrackState && det == nil {
tt.Error("details was nil, expected non-nil")
}
})
}

}
19 changes: 9 additions & 10 deletions cel/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,14 +307,13 @@ func (e *Env) ParseSource(src common.Source) (*Ast, *Issues) {

// Program generates an evaluable instance of the Ast within the environment (Env).
func (e *Env) Program(ast *Ast, opts ...ProgramOption) (Program, error) {
optSet := e.progOpts
if len(opts) != 0 {
mergedOpts := []ProgramOption{}
mergedOpts = append(mergedOpts, e.progOpts...)
mergedOpts = append(mergedOpts, opts...)
optSet = mergedOpts
}
return newProgram(e, ast, optSet)
return e.newProgram(ast, opts /* async= */, false)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's odd to have the comma between the comment and the field it documents, here and below.

}

// AsyncProgram generates an evaluable instance of the Ast with support for asynchronous extension
// functions.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's unclear what an "asynchronous extension function" is.

func (e *Env) AsyncProgram(ast *Ast, opts ...ProgramOption) (AsyncProgram, error) {
return e.newProgram(ast, opts /* async= */, true)
}

// SetFeature sets the given feature flag, as enumerated in options.go.
Expand Down Expand Up @@ -427,8 +426,8 @@ func (i *Issues) Err() error {
if i == nil {
return nil
}
if len(i.errs.GetErrors()) > 0 {
return errors.New(i.errs.ToDisplayString())
if len(i.Errors()) > 0 {
return errors.New(i.String())
}
return nil
}
Expand Down