Skip to content

Commit

Permalink
[wip] schema definitions for CWL Workflow steps + CWL SubWorkflow and…
Browse files Browse the repository at this point in the history
… Multiple Input requirements
  • Loading branch information
fmigneault committed May 13, 2024
1 parent 173b7aa commit d4eadd2
Show file tree
Hide file tree
Showing 2 changed files with 188 additions and 8 deletions.
12 changes: 10 additions & 2 deletions weaver/processes/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,14 @@ class OpenSearchField(Constants):
CWL_RequirementInplaceUpdateType = Literal["InplaceUpdateRequirement"]
CWL_RequirementLoadListingType = Literal["LoadListingRequirement"]
CWL_RequirementMPIType = Literal["MPIRequirement"]
CWL_RequirementMultipleInputFeatureType = Literal["MultipleInputFeatureRequirement"]
CWL_RequirementNetworkAccessType = Literal["NetworkAccess"]
CWL_RequirementProcessGeneratorType = Literal["ProcessGenerator"]
CWL_RequirementResourceType = Literal["ResourceRequirement"]
CWL_RequirementScatterFeatureType = Literal["ScatterFeatureRequirement"]
CWL_RequirementStepInputExpressionType = Literal["StepInputExpressionRequirement"]
CWL_RequirementSecretsType = Literal["cwltool:Secrets"]
CWL_RequirementStepInputExpressionType = Literal["StepInputExpressionRequirement"]
CWL_RequirementSubworkflowFeatureType = Literal["SubworkflowFeatureRequirement"]
CWL_RequirementToolTimeLimitType = Literal["ToolTimeLimit"]
CWL_RequirementWorkReuseType = Literal["WorkReuse"]

Expand Down Expand Up @@ -167,12 +169,14 @@ class OpenSearchField(Constants):
CWL_REQUIREMENT_INPLACE_UPDATE = get_args(CWL_RequirementInplaceUpdateType)[0]
CWL_REQUIREMENT_LOAD_LISTING = get_args(CWL_RequirementLoadListingType)[0]
CWL_REQUIREMENT_MPI = get_args(CWL_RequirementMPIType)[0] # no implication yet
CWL_REQUIREMENT_MULTIPLE_INPUT = get_args(CWL_RequirementMultipleInputFeatureType)[0]
CWL_REQUIREMENT_NETWORK_ACCESS = get_args(CWL_RequirementNetworkAccessType)[0]
CWL_REQUIREMENT_PROCESS_GENERATOR = get_args(CWL_RequirementProcessGeneratorType)[0]
CWL_REQUIREMENT_RESOURCE = get_args(CWL_RequirementResourceType)[0]
CWL_REQUIREMENT_SCATTER = get_args(CWL_RequirementScatterFeatureType)[0]
CWL_REQUIREMENT_STEP_INPUT_EXPRESSION = get_args(CWL_RequirementStepInputExpressionType)[0]
CWL_REQUIREMENT_SECRETS = get_args(CWL_RequirementSecretsType)[0]
CWL_REQUIREMENT_SUBWORKFLOW = get_args(CWL_RequirementSubworkflowFeatureType)[0]
CWL_REQUIREMENT_TIME_LIMIT = get_args(CWL_RequirementToolTimeLimitType)[0]
# default is to reuse, employed to explicitly disable
CWL_REQUIREMENT_WORK_REUSE = get_args(CWL_RequirementWorkReuseType)[0]
Expand All @@ -185,12 +189,14 @@ class OpenSearchField(Constants):
CWL_REQUIREMENT_INLINE_JAVASCRIPT,
CWL_REQUIREMENT_LOAD_LISTING,
# CWL_REQUIREMENT_MPI, # no implication yet
CWL_REQUIREMENT_MULTIPLE_INPUT,
CWL_REQUIREMENT_NETWORK_ACCESS,
# CWL_REQUIREMENT_PROCESS_GENERATOR, # explicitly unsupported, works against Weaver's behavior
CWL_REQUIREMENT_RESOURCE, # FIXME: perform pre-check on job submit? (https://github.com/crim-ca/weaver/issues/138)
CWL_REQUIREMENT_SCATTER,
CWL_REQUIREMENT_STEP_INPUT_EXPRESSION,
CWL_REQUIREMENT_SECRETS, # note: only allowed in 'hints' because of 'cwltool:' namespace
CWL_REQUIREMENT_SUBWORKFLOW,
CWL_REQUIREMENT_TIME_LIMIT,
CWL_REQUIREMENT_WORK_REUSE, # allow it, but makes sense only for Workflow steps if cwltool handles it by itself
])
Expand Down Expand Up @@ -297,11 +303,13 @@ class JobInputsOutputsSchema(Constants):
CWL_RequirementInplaceUpdateType,
CWL_RequirementLoadListingType,
CWL_RequirementMPIType,
CWL_RequirementMultipleInputFeatureType,
CWL_RequirementNetworkAccessType,
CWL_RequirementResourceType,
CWL_RequirementScatterFeatureType,
CWL_RequirementStepInputExpressionType,
CWL_RequirementSecretsType,
CWL_RequirementStepInputExpressionType,
CWL_RequirementSubworkflowFeatureType,
CWL_RequirementToolTimeLimitType,
CWL_RequirementWorkReuseType,
]
Expand Down
184 changes: 178 additions & 6 deletions weaver/wps_restapi/swagger_definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,12 @@
CWL_REQUIREMENT_INLINE_JAVASCRIPT,
CWL_REQUIREMENT_INPLACE_UPDATE,
CWL_REQUIREMENT_LOAD_LISTING,
CWL_REQUIREMENT_MULTIPLE_INPUT,
CWL_REQUIREMENT_NETWORK_ACCESS,
CWL_REQUIREMENT_RESOURCE,
CWL_REQUIREMENT_SCATTER,
CWL_REQUIREMENT_SECRETS,
CWL_REQUIREMENT_SUBWORKFLOW,
CWL_REQUIREMENT_STEP_INPUT_EXPRESSION,
CWL_REQUIREMENT_TIME_LIMIT,
CWL_REQUIREMENT_WORK_REUSE,
Expand Down Expand Up @@ -4448,6 +4450,21 @@ class IdentifierArray(ExtendedSequenceSchema):
item = AnyIdentifier()


class MultipleInputRequirementSpecification(StrictMappingSchema):
description = inspect.cleandoc(f"""
Indicates that a Workflow Step Input must support multiple 'source' simultaneously
(see also: {CWL_WORKFLOW_URL}#WorkflowStepInput).
""")


class MultipleInputRequirementMap(ExtendedMappingSchema):
req = MultipleInputRequirementSpecification(name=CWL_REQUIREMENT_MULTIPLE_INPUT)


class MultipleInputRequirementClass(MultipleInputRequirementSpecification):
_class = RequirementClass(example=CWL_REQUIREMENT_MULTIPLE_INPUT, validator=OneOf([CWL_REQUIREMENT_MULTIPLE_INPUT]))


class ScatterIdentifiersSchema(OneOfKeywordSchema):
title = "Scatter"
description = inspect.cleandoc("""
Expand Down Expand Up @@ -4541,6 +4558,18 @@ class StepInputExpressionRequirementClass(StepInputExpressionSpecification):
)


class SubworkflowRequirementSpecification(StrictMappingSchema):
description = "Indicates that a Workflow employs another Workflow as one of its steps."


class SubworkflowRequirementMap(ExtendedMappingSchema):
req = SubworkflowRequirementSpecification(name=CWL_REQUIREMENT_SUBWORKFLOW)


class SubworkflowRequirementClass(SubworkflowRequirementSpecification):
_class = RequirementClass(example=CWL_REQUIREMENT_SUBWORKFLOW, validator=OneOf([CWL_REQUIREMENT_SUBWORKFLOW]))


class TimeLimitValue(OneOfKeywordSchema):
_one_of = [
ExtendedSchemaNode(Float(), validator=Range(min=0.0)),
Expand Down Expand Up @@ -4697,10 +4726,12 @@ class CWLRequirementsMap(AnyOfKeywordSchema):
InlineJavascriptRequirementMap(missing=drop),
InplaceUpdateRequirementMap(missing=drop),
LoadListingRequirementMap(missing=drop),
MultipleInputRequirementMap(missing=drop),
NetworkAccessRequirementMap(missing=drop),
ResourceRequirementMap(missing=drop),
ScatterFeatureRequirementMap(missing=drop),
StepInputExpressionRequirementMap(missing=drop),
SubworkflowRequirementMap(missing=drop),
ToolTimeLimitRequirementMap(missing=drop),
WorkReuseRequirementMap(missing=drop),
UnknownRequirementMap(missing=drop), # allows anything, must be last
Expand All @@ -4718,10 +4749,12 @@ class CWLRequirementsItem(OneOfKeywordSchema):
InlineJavascriptRequirementClass(missing=drop),
InplaceUpdateRequirementClass(missing=drop),
LoadListingRequirementClass(missing=drop),
MultipleInputRequirementClass(missing=drop),
NetworkAccessRequirementClass(missing=drop),
ResourceRequirementClass(missing=drop),
ScatterFeatureRequirementClass(missing=drop),
StepInputExpressionRequirementClass(missing=drop),
SubworkflowRequirementClass(missing=drop),
ToolTimeLimitRequirementClass(missing=drop),
WorkReuseRequirementClass(missing=drop),
UnknownRequirementClass(missing=drop), # allows anything, must be last
Expand Down Expand Up @@ -5039,7 +5072,150 @@ class CWLScatterMethod(ExtendedSchemaNode):
validator = OneOf(["dotproduct", "nested_crossproduct", "flat_crossproduct"])


class CWLApp(PermissiveMappingSchema):
class CWLScatterDefinition(PermissiveMappingSchema):
scatter = CWLScatter(missing=drop, description=(
"One or more input identifier of an application step within a Workflow were an array-based input to that "
"Workflow should be scattered across multiple instances of the step application."
))
scatterMethod = CWLScatterMethod(missing=drop)


class CWLWorkflowStepRunDefinition(AnyOfKeywordSchema):
# note:
# Nested definitions are possible in pure CWL, but Weaver will support only references to another process.
# To support references, URL are allowed as well.
_any_of = [
AnyIdentifier(
title="CWLWorkflowStepRunID",
description="Reference to local process ID, with or without '.cwl' extension."
),
ProcessURL(),
]


class CWLWorkflowStepInputSourceValue(ExtendedSchemaNode):
description = "Specifies the workflow parameter that will provide input to the underlying step parameter."
schema_type = String()


class CWLWorkflowStepInputSourceList(ExtendedSequenceSchema):
source = CWLWorkflowStepInputSourceValue()
validator = Length(min=1)


class CWLWorkflowStepInputSource(OneOfKeywordSchema):
_one_of = [
CWLWorkflowStepInputSourceValue(),
CWLWorkflowStepInputSourceList(),
]


class LinkMergeMethod(ExtendedSchemaNode):
schema_type = String
title = "LinkMergeMethod"
validator = OneOf(["merge_nested", "merge_flattened"])


class PickValueMethod(ExtendedSchemaNode):
schema_type = String
title = "PickValueMethod"
validator = OneOf(["first_non_null", "the_only_non_null", "all_non_null"])


class CWLWorkflowStepInputObject(PermissiveSequenceSchema):
id = AnyIdentifier(description="Identifier of the step input.", missing=drop)
source = CWLWorkflowStepInputSource(missing=drop)
linkMerge = LinkMergeMethod(missing=drop)
pickValue = PickValueMethod(missing=drop)
loadListing = LoadListingEnum(missing=drop)
valueFrom = CWLExpression(missing=drop)


class CWLWorkflowStepInputItem(CWLWorkflowStepInputObject):
id = AnyIdentifier(description="Identifier of the step input.")


class CWLWorkflowStepInput(OneOfKeywordSchema):
_one_of = [
CWLWorkflowStepInputSourceValue(),
CWLWorkflowStepInputObject()
]


class CWLWorkflowStepInputMap(ExtendedMappingSchema):
step_in = CWLWorkflowStepInput(variable="{step-in}", title="CWLWorkflowStepInput")


class CWLWorkflowStepInputList(ExtendedSequenceSchema):
step_in = CWLWorkflowStepInputItem()
validator = Length(min=1)


class CWLWorkflowStepInputDefinition(OneOfKeywordSchema):
description = "Defines the input parameters of the workflow step."
_one_of = [
CWLWorkflowStepInputMap(),
CWLWorkflowStepInputList(),
]


class CWLWorkflowStepOutputID(ExtendedSchemaNode):
schema_type = String()
description = "Identifier of the tool's output to use as the step output."


class CWLWorkflowStepOutputObject(PermissiveMappingSchema):
id = CWLWorkflowStepOutputID()


class CWLWorkflowStepOutputItem(OneOfKeywordSchema):
_one_of = [
CWLWorkflowStepOutputID(),
CWLWorkflowStepOutputObject(),
]


class CWLWorkflowStepOutputList(ExtendedSequenceSchema):
out = CWLWorkflowStepOutputItem()
validator = Length(min=1)


class CWLWorkflowStepOutputDefinition(OneOfKeywordSchema):
description = "Defines the parameters representing the output of the process."
_one_of = [
CWLWorkflowStepOutputList(), # only list allowed
]


class CWLWorkflowStepObject(PermissiveMappingSchema, CWLScatterDefinition):
id = AnyIdentifier(description="Identifier of the step.", missing=drop)
_in = CWLWorkflowStepInputDefinition()
out = CWLWorkflowStepOutputDefinition()
run = CWLWorkflowStepRunDefinition()
requirements = CWLRequirements(missing=drop)
hints = CWLHints(missing=drop)


class CWLWorkflowStepItem(CWLWorkflowStepObject):
id = AnyIdentifier(description="Identifier of the step.")


class CWLWorkflowStepsMap(ExtendedMappingSchema):
step_id = CWLWorkflowStepObject(variable="{step-id}", title="CWLWorkflowStep")


class CWLWorkflowStepsList(ExtendedSequenceSchema):
step = CWLWorkflowStepItem()


class CWLWorkflowStepsDefinition(OneOfKeywordSchema):
_one_of = [
CWLWorkflowStepsMap(),
CWLWorkflowStepsList(),
]


class CWLApp(PermissiveMappingSchema, CWLScatterDefinition):
_class = CWLClass()
id = CWLIdentifier(missing=drop) # can be omitted only if within a process deployment that also includes it
intent = CWLIntent(missing=drop)
Expand All @@ -5051,11 +5227,7 @@ class CWLApp(PermissiveMappingSchema):
arguments = CWLArguments(description="Base arguments passed to the command.", missing=drop)
inputs = CWLInputsDefinition(description="All inputs available to the Application Package.")
outputs = CWLOutputsDefinition(description="All outputs produced by the Application Package.")
scatter = CWLScatter(missing=drop, description=(
"One or more input identifier of an application step within a Workflow were an array-based input to that "
"Workflow should be scattered across multiple instances of the step application."
))
scatterMethod = CWLScatterMethod(missing=drop)
steps = CWLWorkflowStepsDefinition(missing=drop)


class CWL(CWLBase, CWLApp):
Expand Down

0 comments on commit d4eadd2

Please sign in to comment.