Celery: Büyük İşlemlerin Asenkron İşletilmesi (Django)

Son yazımda arkadaşımın akademik çalışmasından bahsetmiştim. Bu yazının içeriği de Python’da Django Framework üzerinde Celery kullanarak, büyük işlemlerin arka planda asenkron olarak işletilmesi üzerine olacak.

Asenkron işlemlere neden ihtiyaç duyduğumu belirtmek için projemden kısaca bahsedeyim. Projenin amacı özetle; sembolik regresyon problemlerinin, kullanıcı dostu bir arayüzle evrimsel hesaplama algoritmaları kullanılarak web uygulaması üzerinden çözülebilmesini sağlamak. Amaçtan da anlaşıldığı gibi bir web uygulamasında saatler sürecek işlemlerin gerçekleştirilmesi gerekiyor. Bu işlemlerin, web uygulamasındaki gezintiyi etkilememesi için işlemlerin arkaplanda asenkron olarak çalıştırılması gerekiyor.

Web uygulamasının implementasyonunda Django altyapısını ve asenkron işlemlerin çalıştırılmasında ise bu yazının baş kahramanı olan Celery‘i kullandım.

Celery

Kısaca Celery, bir veya birden çok işin arkaplanda, asenkron olarak çalıştırılmasını sağlayan Python ile yazılmış bir pakettir. Tabii bu tanım, benim işime yarayan tanım, daha detaylı tanım ve yetenekleri için projenin sitesine bakılabilir.

yine çok detaya girmeden çalışma mekanizmasından bahsedeyim. Django ile başlatılan bir işlem “Broker” adı verilen bir aracıyla “Worker” ‘lara gönderilir. “Worker”, işlemi işletir ve işlemin çıktısını veritabanına kaydeder. Aşağıdaki görselde bahsettiğim birimlerin işleyişi görülebilir.

Django – Broker – Workers – Database İlişkisi
Broker; RabbitMQ

Tercih edilebilecek birden çok message broker bulunuyordu ben RabbitMQ ‘yu tercih ettim. O yüzden öncelikle işletim sistemime (archlinux) aşağıdaki komutla RabbitMQ kurdum.

pacman -S rabbitmq

Ardından Celery ‘nin dokümantasyonunda yer alan “Setting Up RabbitMQ” başlığındaki yönergeleri izleyip, Celery ile ilişkilendirmek üzere aşağıdaki formata uygun bir link oluşturdum [1]. Bu linki settings.py dosyasına aşağıdaki gibi ekledim.

#settings.py
BROKER_URL = 'amqp://rabbitcboz:***@localhost:5672/rabbithost'

Celery’nin RabbitMQ servisini kullanabilmesi için rabbitmq servisinin başlatılması gerekiyor;

sudo systemctl start rabbitmq

Broker kurulumundan sonra Celery kurulumu için proje web sitesinden “Introduction to Celery” başlıklı sayfa incelenebilir [3]. Kısaca, kurulum için gerekli komut:

pip install Celery

Django ile Entegrasyon

Django ile kolay entegrasyonu işlerimi çok rahatlattı diyebilirim. Bazı yerlerde takıldığım oldu onlardan da yazının ilerleyen kısımlarında bahsediyor olacağım.

Celery projesinin web sitesinde yer alan “First steps with Django” başlıklı sayfadaki yönergeleri takip ederek entegrasyonu kolayca gerçekleştirdim [2]. Kaynakta her şey açık ve net, o yüzden detaylı buraya yazmak Türkçe çeviriden başka bir şey olmayacağı için sadece kısa cümlelerle neler yaptığımı not etmek istiyorum.

Kaynakta bahsedilen dizin yapısına uygun olarak celery.py dosyasını oluşturup __init__.py dosyasına gerekli satırları ekledim. Django uygulama dizinine işlemlerin tanımlanacağı tasks.py adlı bir dosya oluşturdum ve çalışması uzun sürecek olan fonksiyonları shared_task olarak tanımladım. Buraya kadar her şey gayet kaynakta açık ve implementasyonu kolay o yüzden detaylı yazmıyorum.

Örnek üzerinden devam edebilme adına tasks.py içeriğini buraya alayım;

# tasks.py
from __future__ import absolute_import, unicode_literals
from celery import shared_task
from api.models import Run

@shared_task
def add(self, x, y):
    run = Run(...)
    return x + y

Buraya kadar verdiğim kaynak iyi idare etti. Bu adımlarla birlikte Celery’nin Django’ya entegrasyonu tamamlandı. RabbitMQ kuruldu. Bir tane toplama işlemi yapan asenkron çalıştırılmak üzere çağrılmaya hazır bir fonksiyon da var o halde geriye sadece bu fonksiyonu çağırmak ve çıktısını kaydetmek kalıyor.

İşlem Çağırma

Ben, butona basıldığında çalıştırma ihtiyacı duyduğum için views.py içinden toplama fonksiyonunu çağırdım. Çağırmak için gerekli satırlar; [3]

# views.py
from tasks import add
add.delay(4, 4)

Fonksiyonların çalıştırılabilmesi için en az bir worker çalıştırılması gerekir bunun için de proje ana dizinindeyken, aşağıdaki komut çalıştırılır[4];

celery -A proje-adi worker -l info

Bu haliyle fonksiyon çağrıldığı zaman çalışacaktır.

İşlem Çıktısının Alınması

Heralde en zorlandığım kısım diyebilirim. Önce saf Celery kullanarak yapmaya çalıştım. Veritabanı olarak PostgreSQL kullanıyorum projede ve işlem çıktılarının veritabanına kaydedilmesini istiyordum. Uzun süre uğraştım ta ki django-celery-results eklentisini farkedene kadar oysaki yukarıda verdiğim kaynakların birinin son kısmında yer alıyormuş [2]. Kaynağın işime yarayan kısmını okuduğumu zannedip sayfayı kapattığımdan böyle bir eklentinin varlığından çok sonradan haberdar oldum.

Bu eklentiyi kurdum ve başka bir şey yapmadım o her şeyi arkaplanda hallediyor diyebilirim. TaskResult adında bir model oluşturuyor. Kurulumda migrate işleminden sonra django_celery_results_taskresult adında veritabanında bir tablo oluşturuyor. Çıktılar bu tabloda tutuluyor. Bu çıktılarla ilgili bir modele sahip olduğum için istediğim diğer modellerin herhangi biriyle ilişki de kurabiliyorum.

Kurulum da detaylı biçimde kaynakta [5] yer aldığından yazıyı uzatmamak adına değinmiyorum. Kurulum yerine örnek üzerinden kullanımından biraz bahsedeyim. Her bir işlem çalıştığında veritabanına kayıt işleminin gerçekleştirilmesi için fonksiyonların dönüş değerinin olması gerekir. Yukarıdaki toplama işleminde olduğu gibi. return x + y satırında iki sayının toplama işleminin sonucu veritabanına otomatik olarak kaydedilecektir. Bunun dışında başka bir modelle ilişkilendirmek için yapılması gerekenler var. Bunlardan bahsedeyim;

Ben projemde her bir çalıştırmanın bir işlemi ve bir çıktısı olacağı için one-to-one ilişki modelini kullandım. İlişkilendireceğim modelin adı Run, diğeri yukarıda bahsettiğim TaskResult. Önce Run modelinde task adında bir alan tanımladım ve TaskResult ile ilişkiyi aşağıdaki ek satırlarla kurdum.

# models.py
from django_celery_results.models import TaskResult
class Run(models.Model):
    ...
    task = models.ForeignKey(TaskResult, on_delete=models.DO_NOTHING)

Ardından tasks.py ‘yi aşağıdaki gibi güncelledim.

# tasks.py
from __future__ import absolute_import, unicode_literals
from celery import shared_task
from api.models import Run
from django_celery_results.models import TaskResult

@shared_task
def add(self, x, y):
    celery_task = TaskResult.objects.get(task_id=self.request.id)
    run = Run(...,
              task = celery_task)
    run.save()
    return x + y

Jinja ‘dan bir işlemin çıktısına  aşağıdaki şekilde erişilebilir;

<td> {% if run.task.status == 'SUCCESS' %} {{run.task.result.average_cost|scientific}} {% endif %} </td>

TaskResult modeli ile ilgili detaylı bilgiye [6]’dan erişilebilir.

Sonuç

Aşağıdaki gif ‘te görüldüğü gibi Run butonuna tıkladığımda çaplı bir işlem başlatılıyor ve uygulamada herhangi bir bekletme, aksama vb. problemler görülmüyor. İşlem arkaplana atılıyor ve kullanıcıya işlemin başladığına dair bilgi gösteriliyor.

Not: Başlatılan bir işlemin sonlandırılmasını da ilerleyen günlerde bu yazıyı güncelleyerek ekleyeceğim.

Kaynaklar

1. Setting Up RabbitMQ, http://docs.celeryproject.org/en/latest/getting-started/brokers/rabbitmq.html#id4

2. First steps with Django, http://docs.celeryproject.org/en/latest/django/first-steps-with-django.html

3. First steps with Celery, http://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html

4. Workers Guide, http://docs.celeryproject.org/en/latest/userguide/workers.html

5. django-celery-results Using the Django ORM/Cache as a result backend, http://docs.celeryproject.org/en/latest/django/first-steps-with-django.html#django-celery-results-using-the-django-orm-cache-as-a-result-backend

6. https://django-celery-results.readthedocs.io/en/latest/reference/django_celery_results.models.html

Bir cevap yazın

E-posta hesabınız yayımlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir