试着简化了Celery官方教程以便更容易地进行测试

简要
概述
总结
概括

https://docs.celeryproject.org/en/stable/django/first-steps-with-django.html

以上のチュートリアルに情報を補足させた記事です。

ソースコードもGithubに置いておきます。

Django和Celery的集成

    シンプルなDockerコンテナを用意します。
FROM python:3.8

RUN apt-get update
RUN apt-get install -y vim less tmux
RUN pip install --upgrade pip
EXPOSE 8000
version: '3'
services:
  python:
    build: .
    container_name: 'python3'
    working_dir: '/usr/local/src'
    tty: true
    ports:
      - "8000:8000"
    volumes:
      - .:/usr/local/src
  redis:
    image: "redis:latest"
    ports:
      - "6379:6379"
    volumes:
      - "./data/redis:/data"
    Djangoプロジェクトを用意します。ブローカーはタスクのIDがuuidだったりしてsqlite3ではきつい可能性があります。
$ pip install django==3.2
$ pip install Redis psycopg2
$ django-admin startproject proj .
    proj/celery.py
import os

from celery import Celery


# 'celery' プログラムのためのデフォルトの Django 設定モジュールを設定します。
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

app = Celery('proj')

# settings.pyにCelory関連の設定を書くときにCELERY_*という接頭辞をつけることができる
app.config_from_object('django.conf:settings', namespace='CELERY')

# Djangoに登録された全ての タスクモジュールをロードします。
app.autodiscover_tasks()

# bind=Trueは現在のタスクインスタンスを簡単に取得でいる
# このタスク自体はリクエスト情報を出力する
@app.task(bind=True)
def debug_task(self):
    print(f'Request: {self.request!r}')
    proj/__init__.py
# django起動時にアプリが読み込まれ
# @shared_taskデコレータを利用できるようになる
from .celery import app as celery_app

__all__ = ('celery_app',)

创建Django应用程序

$ python manage.py startapp demoapp
    demoapp/models.py
from django.db import models

class Widget(models.Model):
    name = models.CharField(max_length=140)
    proj/settings.py
INSTALLED_APPS = [
	...
	
    'demoapp.apps.DemoappConfig'
]
    demoapp/tasks.py
from demoapp.models import Widget

from celery import shared_task

# チュートリアルから2つモデルが絡んでいるタスクを持ってきました。
# @shared_taskデコレータをつけると具体的なCeleryインスタンスがなくてもタスクを定義できる
@shared_task
def count_widgets():
    return Widget.objects.count()

@shared_task
def rename_widget(widget_id, name):
    w = Widget.objects.get(id=widget_id)
    w.name = name
    w.save()

结果后端

    django-celery-results拡張機能でDjangoo ORM、DjangoCacheフレームワークを選択。
$ pip install django-celery-results
    proj/settings.py
INSTALLED_APPS = (
	...
	'django_celery_results',
)

CELERY_RESULT_BACKEND = 'django-db'

# DjangoCacheにするには
CELERY_RESULT_BACKEND = 'django-cache'とする
CELERY_CACHE_BACKEND = 'default'
CACHES = {
    'default': {
        'BACKEND': 'django.core.cache.backends.db.DatabaseCache',
        'LOCATION': 'my_cache_table',
    }
}
    Celeryのデータベーステーブルを作ります。さらに追加したWidgetモデルもデータベースに反映させます
$ python manage.py migrate django_celery_results
$ python manage.py makemigrations
$ python manage.py maigrate demoapp

工作进程的启动

$ celery -A proj worker -l INFO
$ pyython manage.py migrate
$ python manage.py createsuperuser
$ python manage.py runserver 0.0.0.0:8000
    • adminに行くとTask Results、Group resultsというテーブルが作られています。

 

    • あとはViewからタスクを実行するとよさそうか

 

    チュートリアルはここで終了しているがViewとテンプレートも定義してみます。

创建页面

    今回CreateViewの実装は割愛するのでshellでWidgetデータを2つほど作っておきます。
$ python manage.py shell
>>> from demoapp.models import Widget
>>> w = Widget(name="WidgetA")
>>> w.save()

>>> w = Widget(name="WidgetB")
>>> w.save()
    proj/urls.py
urlpatterns = [
	...
	path('demoapp/', inclcue('demoapp.urls'))
]
    demoapp/urls.py
appname="demoapp"
urlpatterns = [
    path('widget_count/', WidgetListView.as_view()),    
    path('widget_rename/<int:pk>/', WidgetRenameView.as_view()),
    path('tasks/<str:task_id>/', WidgetTaskView.as_view())
]
    deomapp/templates/demoapp/rename.html
<html>
<body>
	<form method="post">
	<p>Widgetの名前変更</p>
	{{ form.as_table }}
	{% csrf_token %}
	<button type="submit">送信</button>
	</form>
</body>
</html>
    demoapp/forms.py
class WidgetForm(forms.ModelForm):
    class Meta:
        model = Widget
        fields = ['name']
    demoapp/views.py
from django.views import View
from django.shortcuts import render
from django.http import HttpResponse
from celery.result import AsyncResult

from demoapp.models import Widget
from demoapp.forms import WidgetForm
from proj.tasks import count_widgets, rename_widget


class WidgetListView(View):
    """
    Widgetの総数を計算するタスクを呼び出す
    """
    def get(self, request, *args, **kwargs):
        # タスクの呼び出し
        result = count_widgets.delay()
        response = HttpResponse()
        response.write('<a href="/demoapp/tasks/{}/">タスクの結果</a>'.format(result.task_id))
        return response


class WidgetRenameView(View):
    """
    Widgetをリネームするタスクを呼び出す
    """
    def get(self, request, pk, *args, **kwargs):
        instance = Widget.objects.get(pk=pk)
        form = WidgetForm(instance=instance)
        return render(request, 'demoapp/rename.html', {'form': form})

    def post(self, request, pk, *args, **kwargs):
        name = request.POST['name']
        # タスクの呼び出し
        result = rename_widget.delay(pk, name)
        response = HttpResponse()
        response.write('<a href="/demoapp/tasks/{}">タスクの結果</a>'.format(result.task_id))
        return response


class WidgetTaskView(View):
    """
    タスクIDからタスクの状態を追跡する
    """
    def get(self, request, task_id, *args, **kwargs):
        task = AsyncResult(task_id)
        return HttpResponse(
                '<ul>' +
                f'<li>id = {task.id} </li>' +
                f'<li>status = {task.status} </li>' +
                f'<li>result = {task.result} </li>'+
                '</ul>'
        )

我最初以为可以通过django-celery-results.models.TaskResults模型从数据库中获取结果,但是似乎在任务完成之前会出现DoesNotExist的错误。
因此,需要通过celery.result.AsyncResult来获取任务的状态。

如果你想获得关于任务的更多信息,你可以在状态为PENDNG的时候从AsyncResult获取数据,直到状态变为SUCCESS,然后可以使用相同的task_id来使用django-celery-results的TaskResult。

    また、django-celery-resultsを使っているので、管理画面からタスクの状態と結果を見ることができます。(/admin/django_celery_results/taskresult/)

请参照以下内容在中文中进行本地化转述,只需提供一种选择:

    • https://houdoukyokucho.com/2022/01/18/post-3785/

 

    https://github.com/celery/django-celery-results/blob/master/django_celery_results/models.py
广告
将在 10 秒后关闭
bannerAds