Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds ability to unregister workflows #650

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
44 changes: 44 additions & 0 deletions internal/internal_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,33 @@ func (th *hostEnvImpl) RegisterWorkflowWithOptions(
return nil
}

func (th *hostEnvImpl) UnRegisterWorkflow(wf interface{}) error {
fnType := reflect.TypeOf(wf)

var registrationName string
var functioName string
if wfName, ok := wf.(string); ok {
// we suppose the workflow name is the alias here
registrationName = wfName
} else {
if err := validateFnFormat(fnType, true); err != nil {
return err
}
functioName = getFunctionName(wf)
if alias, ok := th.getWorkflowAlias(functioName); ok {
registrationName = alias
} else {
registrationName = functioName
}
}

th.delWorkflowFn(registrationName)
th.delWorkflowAliasByAlias(registrationName)

return nil
}


func (th *hostEnvImpl) RegisterActivity(af interface{}) error {
return th.RegisterActivityWithOptions(af, RegisterActivityOptions{})
}
Expand Down Expand Up @@ -498,6 +525,16 @@ func (th *hostEnvImpl) addWorkflowAlias(fnName string, alias string) {
th.workflowAliasMap[fnName] = alias
}

func (th *hostEnvImpl) delWorkflowAliasByAlias(alias string) {
th.Lock()
defer th.Unlock()
for k, v := range th.workflowAliasMap {
if v == alias {
delete(th.workflowAliasMap, k)
}
}
}

func (th *hostEnvImpl) getWorkflowAlias(fnName string) (string, bool) {
th.Lock()
defer th.Unlock()
Expand All @@ -511,6 +548,13 @@ func (th *hostEnvImpl) addWorkflowFn(fnName string, wf interface{}) {
th.workflowFuncMap[fnName] = wf
}

func (th *hostEnvImpl) delWorkflowFn(fnName string) {
th.Lock()
defer th.Unlock()
delete(th.workflowFuncMap, fnName)
}


func (th *hostEnvImpl) getWorkflowFn(fnName string) (interface{}, bool) {
th.Lock()
defer th.Unlock()
Expand Down
60 changes: 60 additions & 0 deletions internal/internal_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,66 @@ func (s *internalWorkerTestSuite) TestRecordActivityHeartbeatByID() {
require.NotNil(s.T(), heartbeatRequest)
}

func (s *internalWorkerTestSuite) TestWorkflowUnRegistration() {

wf1 := func(ctx Context) (string, error) { return "wf1", nil }
wf2 := func(ctx Context) (string, error) { return "wf2", nil }

s.T().Run("Should allow re-registering an unregistered workflow", func(t *testing.T) {
RegisterWorkflow(wf1)
require.Panics(s.T(), func() {
RegisterWorkflow(wf1)
}, "should not allow to register the same workflow twice")
UnRegisterWorkflow(wf1)
require.NotPanics(s.T(), func() {
RegisterWorkflow(wf1)
}, "should allow to register the workflow once it has been unregistered")
UnRegisterWorkflow(wf1)
})

s.T().Run("Should not mix-up two different workflows registration/unregistration", func(t *testing.T) {
RegisterWorkflow(wf1)
RegisterWorkflow(wf2)
UnRegisterWorkflow(wf2)
require.Panics(s.T(), func() {
RegisterWorkflow(wf1)
}, "un-registration of wf2 should not affect the registration of wf1")
UnRegisterWorkflow(wf1)
})

s.T().Run("Should allow un-registration of workflow registered with alias", func(t *testing.T) {
RegisterWorkflowWithOptions(wf2, RegisterWorkflowOptions{Name: "alias"})
UnRegisterWorkflow(wf2)
require.NotPanics(s.T(), func() {
RegisterWorkflowWithOptions(wf2, RegisterWorkflowOptions{Name: "alias"})
}, "workflow should not be registered after its un-registration")
UnRegisterWorkflow(wf2)
})

s.T().Run("Should allow un-registration by name of workflow registered with alias", func(t *testing.T) {
RegisterWorkflowWithOptions(wf2, RegisterWorkflowOptions{Name: "alias"})
UnRegisterWorkflow("alias")
require.NotPanics(s.T(), func() {
RegisterWorkflowWithOptions(wf2, RegisterWorkflowOptions{Name: "alias"})
}, "workflow should not be registered after its un-registration by alias")
UnRegisterWorkflow(wf2)
})

s.T().Run("Should allow selective un-registration by alias name of workflow registered twice with different aliases", func(t *testing.T) {
RegisterWorkflowWithOptions(wf2, RegisterWorkflowOptions{Name: "alias1"})
RegisterWorkflowWithOptions(wf2, RegisterWorkflowOptions{Name: "alias2"})
UnRegisterWorkflow("alias1")
require.NotPanics(s.T(), func() {
RegisterWorkflowWithOptions(wf2, RegisterWorkflowOptions{Name: "alias1"})
}, "workflow should not be registered after its un-registration by alias1")
require.Panics(s.T(), func() {
RegisterWorkflowWithOptions(wf2, RegisterWorkflowOptions{Name: "alias2"})
}, "workflow alias2 still be registered after the un-registration of alias1")
UnRegisterWorkflow("alias1")
UnRegisterWorkflow("alias2")
})
}

type activitiesCallingOptionsWorkflow struct {
t *testing.T
}
Expand Down
13 changes: 13 additions & 0 deletions internal/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,19 @@ func RegisterWorkflow(workflowFunc interface{}) {
RegisterWorkflowWithOptions(workflowFunc, RegisterWorkflowOptions{})
}

// UnRegisterWorkflow - un-registers a workflow from the framework.
// The supplied workflow can be a workflow function or a string.
// In case a string is passed, that should be the alias of the workflow.
// In case a function is passed, all the aliases referring to the workflow (if any) will be unregistered as well.
// This method calls panic if workflowFunc doesn't comply with the expected format.
func UnRegisterWorkflow(wf interface{}) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you add comment in which situation this function is needed. Otherwise it creates false impression that it should be called on shutdown. My understanding that it is needed only for test scenarios, correct?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi Maxim,
you are right, this function is only for testing. If you have a better idea on how to expose this functionality I can move it around, otherwise I will just add a comment.
Please note that it would be nice if cadencefx unregisters all workflows OnStop, see T2502463

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. So unregistering is not really what you want. You want ability to register workflows not globally, but to a specific Worker.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe #563 is what you want.

Copy link
Contributor Author

@algobardo algobardo Feb 25, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think #563 is more than I want. There are two problems that I want to solve, and this diff is only the first step to solve them, possibly the wrong one.

  • Whenever there are two tests in the same package that attempt to do fxtest.New(Module).Start() only the first can succeed, since the second one will fail to start due to the attempt to register the same workflows twice. The same also applies for activities registration. Cadencefx should unregister all workflows and activities OnStop. (This is T2502463)

  • When testing a workflow that calls a child workflow we need to:

  1. Register the child workflow (mandatory)
  2. Mock it with OnWorkflow (optional)
    In the case we only want to do 1. and run the actual workflow, the child workflow will necessarily use a mock of its downstream dependencies. The ability to unregister and re-register workflows should enable us to register the same workflow with different mocks of its dependencies in different tests.
    The same applies for activities as well.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought that mocks are linked to a specific instance of a TestWorkflowEnvironment and are not related to registrations.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't register workflows nor activities in init(), and I don't fully understand how that is supposed to work within an fx service. Our workflows and our activities are methods of types that are "provided" (read instantiated) by fx https://github.com/uber-go/fx/blob/master/app.go#L89, and as such they are not available at init time. Such types have their own dependencies that are other types instantiated by fx.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that #563 also mentions the same pattern:

func New(repositories, gateways, etc) activity { .  <---- fx constructor and dependencies
    act := &activity{...}
    // do I activity.Register(act.run) here?  <------  registration not in init

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. You kind of fight against the original Cadence design that workflows and activities are functions, not methods.
The original design was that fx service instantiates a background activity context which activity uses to get all the external dependencies from. And registration is global as all it does is to map a function name to a specific function implementation. And it is overridable through a TestWorkflowEnvironment for unit testing.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that some users are OK calling activities by string name instead of function pointer and want to use method pointers to register them. But until #563 is implemented I would recommend to stick to the supported way of using background context to pass dependencies to activities.

thImpl := getHostEnvironment()
err := thImpl.UnRegisterWorkflow(wf)
if err != nil {
panic(err)
}
}

// RegisterWorkflowWithOptions registers the workflow function with options
// The user can use options to provide an external name for the workflow or leave it empty if no
// external name is required. This can be used as
Expand Down