-
-
Notifications
You must be signed in to change notification settings - Fork 19
/
tasks.py
114 lines (94 loc) · 3.64 KB
/
tasks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
from __future__ import annotations
import importlib
from django.apps import apps
from django.core.files.storage import Storage
from django.db import transaction
from PIL import Image
from pictures import conf
from pictures.models import PictureFieldFile
def _process_picture(field_file: PictureFieldFile) -> None:
# field_file.file may already be closed and can't be reopened.
# Therefore, we always open it from storage.
with field_file.storage.open(field_file.name) as fs:
with Image.open(fs) as img:
for ratio, sources in field_file.aspect_ratios.items():
for file_type, srcset in sources.items():
for width, picture in srcset.items():
picture.save(img)
process_picture = _process_picture
def construct_storage(
storage_cls: str, storage_args: tuple, storage_kwargs: dict
) -> Storage:
storage_module, storage_class = storage_cls.rsplit(".", 1)
storage_cls = getattr(importlib.import_module(storage_module), storage_class)
return storage_cls(*storage_args, **storage_kwargs)
def process_picture_async(
app_name: str, model_name: str, field_name: str, file_name: str, storage_construct
) -> None:
model = apps.get_model(f"{app_name}.{model_name}")
field = model._meta.get_field(field_name)
storage = construct_storage(*storage_construct)
with storage.open(file_name) as file:
with Image.open(file) as img:
for ratio, sources in PictureFieldFile.get_picture_files(
file_name=file_name,
img_width=img.width,
img_height=img.height,
storage=storage,
field=field,
).items():
for file_type, srcset in sources.items():
for width, picture in srcset.items():
picture.save(img)
try:
import dramatiq
except ImportError:
pass
else:
@dramatiq.actor(queue_name=conf.get_settings().QUEUE_NAME)
def process_picture_with_dramatiq(
app_name, model_name, field_name, file_name, storage_construct
) -> None:
process_picture_async(
app_name, model_name, field_name, file_name, storage_construct
)
def process_picture(field_file: PictureFieldFile) -> None: # noqa: F811
opts = field_file.instance._meta
transaction.on_commit(
lambda: process_picture_with_dramatiq.send(
app_name=opts.app_label,
model_name=opts.model_name,
field_name=field_file.field.name,
file_name=field_file.name,
storage_construct=field_file.storage.deconstruct(),
)
)
try:
from celery import shared_task
except ImportError:
pass
else:
@shared_task(
ignore_results=True,
retry_backoff=True,
)
def process_picture_with_celery(
app_name, model_name, field_name, file_name, storage_construct
) -> None:
process_picture_async(
app_name, model_name, field_name, file_name, storage_construct
)
def process_picture(field_file: PictureFieldFile) -> None: # noqa: F811
opts = field_file.instance._meta
transaction.on_commit(
lambda: process_picture_with_celery.apply_async(
kwargs=dict(
app_name=opts.app_label,
model_name=opts.model_name,
field_name=field_file.field.name,
file_name=field_file.name,
storage_construct=field_file.storage.deconstruct(),
),
queue=conf.get_settings().QUEUE_NAME,
)
)