Chcę wdrożyć rozwiązanie, w którym wszystkie zadania z ETA są zapisywane w mojej bazie danych, zamiast być planowane na brokerze. Robię to z powodu ograniczeń pamięciowych moich pracowników i charakteru obciążenia pracą. Seler planuje zadania ETA w pamięci pracowników.

Chcę to osiągnąć za pomocą czegoś takiego jak poniżej

@before_task_publish.connect
def handle_tasks_with_eta(body, headers, **kwargs):
    # If the task is getting scheduled for future
    if headers.get("eta"):
        # Store the task in database with ETA, args, kwargs and sender
        # Prevent the task from getting scheduled <------ THIS IS WHERE I NEED HELP
    else:
        pass

Zgłoszenie niestandardowego wyjątku nie pomogło. Jest łapany na - https:// github.com/celery/celery/blob/bef4c1642586b89ed86ef61b5824cd7cfbd9aa55/celery/utils/dispatch/signal.py#L289

Z góry dziękuję.

0
Abhishek 16 grudzień 2019, 15:36
Czy zastąpienie metody apply_async nie byłoby bardziej odpowiednie do tego rodzaju pracy?
 – 
JRajan
16 grudzień 2019, 15:44
- Masz na myśli małpę łatącą Task.apply_async i sprawdzającą eta w kwargs?
 – 
Abhishek
16 grudzień 2019, 16:05
1
Tak. Coś na tych liniach. Albo załataj metodę Task, albo utwórz klasę CustomTask, która dziedziczy po klasie Task selera i załataj ją do modułu selera.
 – 
JRajan
16 grudzień 2019, 16:08
- Tak, rozważam jako prawdopodobne rozwiązanie. Tylko nie jestem pewien, czy łatanie wewnętrznych interfejsów API, takich jak apply_async, jest dobrym pomysłem. Ale tak, to jest jedno z rozwiązań, na które patrzę.
 – 
Abhishek
16 grudzień 2019, 16:12
Chcę zrobić to samo. Gdzie skończyłeś?
 – 
chrislondon
5 czerwiec 2020, 21:16

3 odpowiedzi

Myślę, że łatwiej byłoby podejść do problemu nieco inaczej. Pracowałem nad podobnym problemem i zdecydowałem się na dwie kolejki, jedną do segregowania zadań, a drugą do ich wykonywania. Następnie użyłem jednego pracownika z opcją -c1 do obsługi kolejki segregacyjnej, a drugiego do wykonania prawdziwej pracy.

Jeśli potrzebujesz mieć możliwość prześledzenia wyników z powrotem do żądania, możesz określić task_id podczas wykonywania zadania segregacji.

@app.task
def triage(args, kwargs, eta=None, task_id=None):
    if eta:
        # store in database
    else:
        other_task.apply_async(args=args, kwargs=kwargs, task_id=task_id)

@app.task
def other_task(a1, a2, kw1=None):
    # do stuff
2
Jonah 17 grudzień 2019, 09:54
1
To zdecydowanie ciekawe podejście.
 – 
Abhishek
17 grudzień 2019, 10:28

Do mojej implementacji użyłem haka before_task_publish, który zrobiłeś w pierwotnym pytaniu. Unieważniłem zadanie, jeśli zapiszę je w bazie danych. W ten sposób mogę użyć natywnej funkcji apply_async.

from datetime import datetime, timedelta

from celery.signals import before_task_publish
from django.utils import timezone


@before_task_publish.connect
def task_sent_handler(sender=None, headers=None, body=None, **kwargs):
    info = headers if "task" in headers else body
    if headers.get("eta"):
        eta = datetime.strptime(headers["eta"], '%Y-%m-%dT%H:%M:%S.%f%z')
        if eta - timezone.now() > timedelta(minutes=10):
            # Any tasks that are delayed over 10 minutes will be stored in the database until
            # it's time for them to run.
            from .models import DelayTask
            DelayTask.objects.create(
                task=headers["task"],
                eta=eta,
                argsrepr=headers["argsrepr"],
                kwargsrepr=headers["kwargsrepr"],
            )

            from app.celery import app
            app.control.revoke(info["id"])

Następnie mam PeriodicTask, który uruchamia się co 10 minut i wysyła wszystkie zadania, które muszą zostać uruchomione w ciągu następnych 10 minut.

@shared_task
def reschedule_tasks():
    tasks = DelayTask.objects.filter(eta__lte=timezone.now() + timedelta(minutes=10))

    for task in tasks:
        # Grab all the tasks that are going to run in the next 10 minutes and send them to celery.
        task.send()
        task.delete()

Oto mój model DelayTask

import uuid
from ast import literal_eval

from celery.execute import send_task
from django.db import models

from app.shared.models import TimestampMixin


class DelayTask(TimestampMixin):
    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    task = models.CharField(max_length=255)
    argsrepr = models.TextField()
    kwargsrepr = models.TextField()
    eta = models.DateTimeField(db_index=True)

    def send(self):
        send_task(
            self.task,
            args=literal_eval(self.argsrepr),
            kwargs=literal_eval(self.kwargsrepr),
            eta=self.eta,
        )
2
chrislondon 8 czerwiec 2020, 22:18
To jest naprawdę fajne. :)
 – 
Abhishek
9 czerwiec 2020, 06:23

Skończyło się na stworzeniu wewnętrznej implementacji do planowania zadań w trwałym magazynie danych. Stworzyłem klasę, która implementowałaby metody create_or_update_schedule i send_task_to_queue. Moja implementacja TaskScheduleHelper wyglądała mniej więcej tak, jak poniżej.

from celery.app.task import Task
class TaskScheduleHelper(object):

    @staticmethod
    def create_or_update_schedule(*args, **kwargs):
        """
        Create or update an ETA schedule in database in an idempotent way based on key property of TaskSchedule model.
        One should not be calling this method directly, but should use celery APIs to call this method. See Example
        below. eta and key are required keyword arguments.

        :param args: Arguments populated by @task decorator of the form <tuple>(task handler, arguments for the task
            handler)
        :param kwargs: Keyword arguments which must contain key and eta arguments. Any additional keyword
            arguments are passed as keyword arguments to the task handler
        :return: Not Applicable
        """

        # Ensure that required keyword arguments are not None
        required_kwargs = ['eta', 'key']
        for kwarg in required_kwargs:
            if kwargs.get(kwarg) is None:
                raise ValueError(f'Expected {kwarg} but got None')

        # Ensure that first argument from args list should be of type Task
        if not isinstance(args[0], Task):
            raise ValueError(f'Expected first argument to be of type Task, got {type(args[0])} instead.')

        eta = kwargs.pop('eta')
        key = kwargs.pop('key')
        task_name = args[0].name
        try:
            task_args = args[1]
        except IndexError:
            task_args = None

        # Insert or update task schedule entry in the database
        TaskSchedule.objects.update_or_create(key=key, defaults={
            'eta': eta,
            'status': TaskScheduleStatus.SCHEDULED,
            'data': {
                'handler': task_name,
                'argsrepr': task_args,
                'kwargsrepr': kwargs
            }
        })
    @staticmethod
    def send_task_to_queue(task_schedule_object):
        """
        Takes a TaskSchedule object and queues in celery for execution immediately
        :param task_schedule_object: object of type TaskSchedule which contains handler and args,
        :return: Not Applicable
        """

        # ETA of task_schedule_object should be less than current time
        assert (task_schedule_object.eta <= timezone.now())

        # status of task_schedule_object should be SCHEDULED
        assert (task_schedule_object.status == TaskScheduleStatus.SCHEDULED)

        # Fetch the task by name from app.tasks
        try:
            _task = app.tasks[task_schedule_object.handler]
        except KeyError:
            # Create an error text which can be logged using logger.error and also stored
            # in TaskSchedule object's data field for further debugging and reporting
            error_text = f'{task_schedule_object.handler} not found in app.tasks'
            logger.error("send_task_to_queue:handler_not_found", handler=task_schedule_object.handler)

            # Update the TaskSchedule object with status = error and data with execution time
            # and error_text
            TaskSchedule.objects.filter(
                key=task_schedule_object.key
            ).update(
                status=TaskScheduleStatus.ERROR,
                data=RawSQL('''jsonb_set(jsonb_set(data, '{error}', %s)::jsonb,
                            '{execution_time}', %s)''', (json.dumps(error_text), json.dumps(timezone.now().__str__())))
            )
            return

        async_task_result = _task.apply_async(task_schedule_object.argsrepr, task_schedule_object.kwargsrepr)

        # Update the task schedule with task ID so that we can query logs for this
        # particular task's execution
        # TODO (Abhishek): Set the status to executed only when CELERY_TASK_ACKS_LATE is disabled
        TaskSchedule.objects.filter(
            key=task_schedule_object.key
        ).update(
            status=TaskScheduleStatus.EXECUTED,
            data=RawSQL(
                '''jsonb_set(jsonb_set(data, '{task_id}', %s)::jsonb, '{execution_time}', %s)''',
                (json.dumps(async_task_result.task_id), json.dumps(timezone.now().__str__()))
            )
        )
        logger.info(f'send_task_to_queue:successfully_executed', task_key=task_schedule_object.key,
                    task_id=async_task_result.task_id)

        return async_task_result

Następnie dodałem create_or_update_schedule w celery.app.task, abym mógł go wywołać metodą zadania.

from celery.app.task import Task
Task.po_schedule = TaskScheduleHelper.create_or_update_schedule

Teraz mogę zaplanować zadanie, dzwoniąc pod numer po_schedule. W bazie danych zostanie utworzony obiekt odpowiadający unikalnemu kluczowi tego zadania a_unique_key.

@app.task
def divide(dividend, divisor):
    return dividend/divisor

divide.po_schedule(
    (20, 5),
    key='a_unique_key',
    eta=timezone.now() + timedelta(hours=1)
)

Mam zadanie cron, które okresowo odpytuje tabelę bazy danych TaskSchedule dla eta i wywołuje metodę send_task_to_queue TaskScheduleHelper.

task_schedule_objects = TaskSchedule.objects.filter(status=TaskScheduleStatus.SCHEDULED,
                                                            eta__lte=timezone.now())
for task_schedule_object in task_schedule_objects:
    TaskScheduleHelper.send_task_to_queue(task_schedule_object)
1
Abhishek 7 czerwiec 2020, 18:28