Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support global-job and replicated-job modes in Docker Swarm #3016

Merged
merged 3 commits into from Aug 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 4 additions & 4 deletions docker/models/images.py
Expand Up @@ -224,10 +224,10 @@ def build(self, **kwargs):
Build an image and return it. Similar to the ``docker build``
command. Either ``path`` or ``fileobj`` must be set.

If you already have a tar file for the Docker build context (including a
Dockerfile), pass a readable file-like object to ``fileobj``
and also pass ``custom_context=True``. If the stream is also compressed,
set ``encoding`` to the correct value (e.g ``gzip``).
If you already have a tar file for the Docker build context (including
a Dockerfile), pass a readable file-like object to ``fileobj``
and also pass ``custom_context=True``. If the stream is also
compressed, set ``encoding`` to the correct value (e.g ``gzip``).

If you want to get the raw output of the build, use the
:py:meth:`~docker.api.build.BuildApiMixin.build` method in the
Expand Down
85 changes: 65 additions & 20 deletions docker/types/services.py
Expand Up @@ -29,6 +29,7 @@ class TaskTemplate(dict):
force_update (int): A counter that triggers an update even if no
relevant parameters have been changed.
"""

def __init__(self, container_spec, resources=None, restart_policy=None,
placement=None, log_driver=None, networks=None,
force_update=None):
Expand Down Expand Up @@ -115,6 +116,7 @@ class ContainerSpec(dict):
cap_drop (:py:class:`list`): A list of kernel capabilities to drop from
the default set for the container.
"""

def __init__(self, image, command=None, args=None, hostname=None, env=None,
workdir=None, user=None, labels=None, mounts=None,
stop_grace_period=None, secrets=None, tty=None, groups=None,
Expand Down Expand Up @@ -231,6 +233,7 @@ class Mount(dict):
tmpfs_size (int or string): The size for the tmpfs mount in bytes.
tmpfs_mode (int): The permission mode for the tmpfs mount.
"""

def __init__(self, target, source, type='volume', read_only=False,
consistency=None, propagation=None, no_copy=False,
labels=None, driver_config=None, tmpfs_size=None,
Expand Down Expand Up @@ -331,6 +334,7 @@ class Resources(dict):
``{ resource_name: resource_value }``. Alternatively, a list of
of resource specifications as defined by the Engine API.
"""

def __init__(self, cpu_limit=None, mem_limit=None, cpu_reservation=None,
mem_reservation=None, generic_resources=None):
limits = {}
Expand Down Expand Up @@ -401,6 +405,7 @@ class UpdateConfig(dict):
order (string): Specifies the order of operations when rolling out an
updated task. Either ``start-first`` or ``stop-first`` are accepted.
"""

def __init__(self, parallelism=0, delay=None, failure_action='continue',
monitor=None, max_failure_ratio=None, order=None):
self['Parallelism'] = parallelism
Expand Down Expand Up @@ -512,6 +517,7 @@ class DriverConfig(dict):
name (string): Name of the driver to use.
options (dict): Driver-specific options. Default: ``None``.
"""

def __init__(self, name, options=None):
self['Name'] = name
if options:
Expand All @@ -533,6 +539,7 @@ class EndpointSpec(dict):
is ``(target_port [, protocol [, publish_mode]])``.
Ports can only be provided if the ``vip`` resolution mode is used.
"""

def __init__(self, mode=None, ports=None):
if ports:
self['Ports'] = convert_service_ports(ports)
Expand Down Expand Up @@ -575,37 +582,70 @@ def convert_service_ports(ports):

class ServiceMode(dict):
"""
Indicate whether a service should be deployed as a replicated or global
service, and associated parameters
Indicate whether a service or a job should be deployed as a replicated
or global service, and associated parameters

Args:
mode (string): Can be either ``replicated`` or ``global``
mode (string): Can be either ``replicated``, ``global``,
``replicated-job`` or ``global-job``
replicas (int): Number of replicas. For replicated services only.
concurrency (int): Number of concurrent jobs. For replicated job
services only.
"""
def __init__(self, mode, replicas=None):
if mode not in ('replicated', 'global'):
raise errors.InvalidArgument(
'mode must be either "replicated" or "global"'
)
if mode != 'replicated' and replicas is not None:

def __init__(self, mode, replicas=None, concurrency=None):
replicated_modes = ('replicated', 'replicated-job')
supported_modes = replicated_modes + ('global', 'global-job')

if mode not in supported_modes:
raise errors.InvalidArgument(
'replicas can only be used for replicated mode'
'mode must be either "replicated", "global", "replicated-job"'
' or "global-job"'
)
self[mode] = {}

if mode not in replicated_modes:
if replicas is not None:
raise errors.InvalidArgument(
'replicas can only be used for "replicated" or'
' "replicated-job" mode'
)

if concurrency is not None:
raise errors.InvalidArgument(
'concurrency can only be used for "replicated-job" mode'
)

service_mode = self._convert_mode(mode)
self.mode = service_mode
self[service_mode] = {}

if replicas is not None:
self[mode]['Replicas'] = replicas
if mode == 'replicated':
self[service_mode]['Replicas'] = replicas

@property
def mode(self):
if 'global' in self:
return 'global'
return 'replicated'
if mode == 'replicated-job':
self[service_mode]['MaxConcurrent'] = concurrency or 1
self[service_mode]['TotalCompletions'] = replicas

@staticmethod
def _convert_mode(original_mode):
if original_mode == 'global-job':
return 'GlobalJob'

if original_mode == 'replicated-job':
return 'ReplicatedJob'

return original_mode

@property
def replicas(self):
if self.mode != 'replicated':
return None
return self['replicated'].get('Replicas')
if 'replicated' in self:
return self['replicated'].get('Replicas')

if 'ReplicatedJob' in self:
return self['ReplicatedJob'].get('TotalCompletions')

return None


class SecretReference(dict):
Expand Down Expand Up @@ -679,6 +719,7 @@ class Placement(dict):
platforms (:py:class:`list` of tuple): A list of platforms
expressed as ``(arch, os)`` tuples
"""

def __init__(self, constraints=None, preferences=None, platforms=None,
maxreplicas=None):
if constraints is not None:
Expand Down Expand Up @@ -711,6 +752,7 @@ class PlacementPreference(dict):
the scheduler will try to spread tasks evenly over groups of
nodes identified by this label.
"""

def __init__(self, strategy, descriptor):
if strategy != 'spread':
raise errors.InvalidArgument(
Expand All @@ -732,6 +774,7 @@ class DNSConfig(dict):
options (:py:class:`list`): A list of internal resolver variables
to be modified (e.g., ``debug``, ``ndots:3``, etc.).
"""

def __init__(self, nameservers=None, search=None, options=None):
self['Nameservers'] = nameservers
self['Search'] = search
Expand Down Expand Up @@ -762,6 +805,7 @@ class Privileges(dict):
selinux_type (string): SELinux type label
selinux_level (string): SELinux level label
"""

def __init__(self, credentialspec_file=None, credentialspec_registry=None,
selinux_disable=None, selinux_user=None, selinux_role=None,
selinux_type=None, selinux_level=None):
Expand Down Expand Up @@ -804,6 +848,7 @@ class NetworkAttachmentConfig(dict):
options (:py:class:`dict`): Driver attachment options for the
network target.
"""

def __init__(self, target, aliases=None, options=None):
self['Target'] = target
self['Aliases'] = aliases
Expand Down
2 changes: 1 addition & 1 deletion tests/helpers.py
Expand Up @@ -143,4 +143,4 @@ def ctrl_with(char):
if re.match('[a-z]', char):
return chr(ord(char) - ord('a') + 1).encode('ascii')
else:
raise(Exception('char must be [a-z]'))
raise Exception('char must be [a-z]')
33 changes: 33 additions & 0 deletions tests/integration/api_service_test.py
Expand Up @@ -626,6 +626,39 @@ def test_create_service_replicated_mode(self):
assert 'Replicated' in svc_info['Spec']['Mode']
assert svc_info['Spec']['Mode']['Replicated'] == {'Replicas': 5}

@requires_api_version('1.41')
def test_create_service_global_job_mode(self):
container_spec = docker.types.ContainerSpec(
TEST_IMG, ['echo', 'hello']
)
task_tmpl = docker.types.TaskTemplate(container_spec)
name = self.get_service_name()
svc_id = self.client.create_service(
task_tmpl, name=name, mode='global-job'
)
svc_info = self.client.inspect_service(svc_id)
assert 'Mode' in svc_info['Spec']
assert 'GlobalJob' in svc_info['Spec']['Mode']

@requires_api_version('1.41')
def test_create_service_replicated_job_mode(self):
container_spec = docker.types.ContainerSpec(
TEST_IMG, ['echo', 'hello']
)
task_tmpl = docker.types.TaskTemplate(container_spec)
name = self.get_service_name()
svc_id = self.client.create_service(
task_tmpl, name=name,
mode=docker.types.ServiceMode('replicated-job', 5)
)
svc_info = self.client.inspect_service(svc_id)
assert 'Mode' in svc_info['Spec']
assert 'ReplicatedJob' in svc_info['Spec']['Mode']
assert svc_info['Spec']['Mode']['ReplicatedJob'] == {
'MaxConcurrent': 1,
'TotalCompletions': 5
}

@requires_api_version('1.25')
def test_update_service_force_update(self):
container_spec = docker.types.ContainerSpec(
Expand Down
16 changes: 16 additions & 0 deletions tests/unit/dockertypes_test.py
Expand Up @@ -325,10 +325,26 @@ def test_global_simple(self):
assert mode.mode == 'global'
assert mode.replicas is None

def test_replicated_job_simple(self):
mode = ServiceMode('replicated-job')
assert mode == {'ReplicatedJob': {}}
assert mode.mode == 'ReplicatedJob'
assert mode.replicas is None

def test_global_job_simple(self):
mode = ServiceMode('global-job')
assert mode == {'GlobalJob': {}}
assert mode.mode == 'GlobalJob'
assert mode.replicas is None

def test_global_replicas_error(self):
with pytest.raises(InvalidArgument):
ServiceMode('global', 21)

def test_global_job_replicas_simple(self):
with pytest.raises(InvalidArgument):
ServiceMode('global-job', 21)

def test_replicated_replicas(self):
mode = ServiceMode('replicated', 21)
assert mode == {'replicated': {'Replicas': 21}}
Expand Down