В настоящее время я работаю над приложением фляги (Python 3.6) и хотел интегрировать сельдерей, потому что у меня есть несколько длительных фоновых задач.
Изменить: сельдерей 4.1
Интеграция прошла без проблем, задачи с сельдереем выполняются правильно, но я не могу получить доступ к текущему состоянию выполняющейся задачи.
Сельдерей, настройка колбы:
def make_celery(app):
celery = Celery(app.import_name,
backend=app.config["result_backend"],
broker=app.config["broker_url"])
celery.conf.update(app.config)
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask
return celery
app = Flask(__name__)
app.config["broker_url"] = "redis://localhost:6379"
app.config["result_backend"] = "redis://localhost:6379"
app.config["DB"] = "../pyhodl.sqlite"
celery_app = make_celery(app)
Задача сельдерея:
@celery_app.task(bind=True, name="server.tasks.update_trading_pair")
def update_trading_pair(self, exchange, currency_a, currency_b):
print(exchange, currency_a, currency_b)
time.sleep(50)
Вызовите задачу и сохраните значение в словаре:
task_id = update_trading_pair.delay(exchange, currency_a, currency_b)
print("NEW TASK")
print(task_id)
id = exchange_mnemonic + "_" + currency_a + "_" + currency_b
TASK_STATES[id] = task_id
Получить состояние задачи:
result = update_trading_pair.AsyncResult(TASK_STATES[market.__id__()])
print(result.state)
print(result) # works but only prints the task_id
Это была ошибка. Когда я печатаю только объект результата, он просто печатает task_id. И если я пытаюсь получить текущее состояние, возникает следующее исключение:
TypeError: sequence item 1: expected a bytes-like object, AsyncResult found
как насчет print(result.status)
— person Martin Pichler schedule 06.03.2018
вызывает ту же ошибку — person Martin Pichler schedule 06.03.2018
Изучите этот вопрос stackoverflow .com / questions / 9034091 / Надеюсь, это поможет. — person Martin Pichler schedule 06.03.2018
Я уже прочитал этот вопрос. Моя проблема в том, что я даже не могу получить доступ к свойствам AsyncResult, потому что возникает ошибка. Я даже не могу вызвать result.ready (), result.get () или любой другой метод. Хотя, когда я проверяю тип результата с помощью type (result), он говорит, что это celery.result.AsyncResult. Ошибка говорит о том же найденном AsyncResult, поэтому исключение даже не имеет смысла. — person Martin Pichler schedule 06.03.2018
ПОЯСНЕНИЕ:
Когда вы называете свою задачу:
ваша переменная
task_id
является экземпляромAsyncResult
, а не строкой.Итак, ваша переменная
TASK_STATES[market.__id__()]
также является экземпляромAsyncResult
, тогда как она должна быть строкой.И затем вы пытаетесь создать экземпляр объекта
AsyncResult
с нимТаким образом, вы создаете экземпляр объекта
AsyncResult
с другим объектомAsyncResult
, тогда как он должен быть создан со строкой.Возможно, ваше замешательство вызвано вашим
print(task_id)
, который показывает вам строку, но когда вы это делаете, под капотом вызывается метод__str__
объектаAsyncResult
, и если вы посмотрите на него в исходном коде здесь,он просто печатает атрибут
id
вашегоtask_id
объекта.РЕШЕНИЕ:
Вы можете исправить это, выполнив
TASK_STATES[id] = task_id.id
или выполнивСпасибо! Это была точная проблема. Я думал, что где-то читал, что delay () возвращает только id. Загляните в документацию, чтобы решить проблему. docs.celeryproject.org/en/latest/reference/celery. app.task.html — person Martin Pichler; 06.03.2018