В настоящее время я работаю над приложением фляги (Python 3.6) и хотел интегрировать сельдерей, потому что у меня есть несколько длительных фоновых задач.
Изменить: сельдерей 4.1
Интеграция прошла без проблем, задачи с сельдереем выполняются правильно, но я не могу получить доступ к текущему состоянию выполняющейся задачи.
Сельдерей, настройка колбы:
def make_celery(app):
celery = Celery(app.import_name,
backend=app.config["result_backend"],
broker=app.config["broker_url"])
celery.conf.update(app.config)
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask
return celery
app = Flask(__name__)
app.config["broker_url"] = "redis://localhost:6379"
app.config["result_backend"] = "redis://localhost:6379"
app.config["DB"] = "../pyhodl.sqlite"
celery_app = make_celery(app)
Задача сельдерея:
@celery_app.task(bind=True, name="server.tasks.update_trading_pair")
def update_trading_pair(self, exchange, currency_a, currency_b):
print(exchange, currency_a, currency_b)
time.sleep(50)
Вызовите задачу и сохраните значение в словаре:
task_id = update_trading_pair.delay(exchange, currency_a, currency_b)
print("NEW TASK")
print(task_id)
id = exchange_mnemonic + "_" + currency_a + "_" + currency_b
TASK_STATES[id] = task_id
Получить состояние задачи:
result = update_trading_pair.AsyncResult(TASK_STATES[market.__id__()])
print(result.state)
print(result) # works but only prints the task_id
Это была ошибка. Когда я печатаю только объект результата, он просто печатает task_id. И если я пытаюсь получить текущее состояние, возникает следующее исключение:
TypeError: sequence item 1: expected a bytes-like object, AsyncResult found
как насчет print(result.status) — person Martin Pichler schedule 06.03.2018
вызывает ту же ошибку — person Martin Pichler schedule 06.03.2018
Изучите этот вопрос stackoverflow .com / questions / 9034091 / Надеюсь, это поможет. — person Martin Pichler schedule 06.03.2018
Я уже прочитал этот вопрос. Моя проблема в том, что я даже не могу получить доступ к свойствам AsyncResult, потому что возникает ошибка. Я даже не могу вызвать result.ready (), result.get () или любой другой метод. Хотя, когда я проверяю тип результата с помощью type (result), он говорит, что это celery.result.AsyncResult. Ошибка говорит о том же найденном AsyncResult, поэтому исключение даже не имеет смысла. — person Martin Pichler schedule 06.03.2018
ПОЯСНЕНИЕ:
Когда вы называете свою задачу:
ваша переменная
task_idявляется экземпляромAsyncResult, а не строкой.Итак, ваша переменная
TASK_STATES[market.__id__()]также является экземпляромAsyncResult, тогда как она должна быть строкой.И затем вы пытаетесь создать экземпляр объекта
AsyncResultс нимТаким образом, вы создаете экземпляр объекта
AsyncResultс другим объектомAsyncResult, тогда как он должен быть создан со строкой.Возможно, ваше замешательство вызвано вашим
print(task_id), который показывает вам строку, но когда вы это делаете, под капотом вызывается метод__str__объектаAsyncResult, и если вы посмотрите на него в исходном коде здесь,он просто печатает атрибут
idвашегоtask_idобъекта.РЕШЕНИЕ:
Вы можете исправить это, выполнив
TASK_STATES[id] = task_id.idили выполнивСпасибо! Это была точная проблема. Я думал, что где-то читал, что delay () возвращает только id. Загляните в документацию, чтобы решить проблему. docs.celeryproject.org/en/latest/reference/celery. app.task.html — person Martin Pichler; 06.03.2018