Revision 10 as of 2021-08-01 00:04:54

Celery

Celery is an asynchronous task queue/job queue based on distributed message passing.

http://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html

Slackware 14 installation

Use either installer:

  • easy_install celery
  • pip install celery

Check installation:

$ python
Python 2.7.3 (default, Jul  3 2012, 19:58:39)
[GCC 4.7.1] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import celery
>>> celery.__version__
'3.1.7'
>>>
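
The interactive check can also be scripted. The following is a minimal sketch, not part of Celery: the helper name `installed_version` is hypothetical, and the fallback string "unknown" is an assumption for modules that do not define `__version__`.

```python
import importlib


def installed_version(module_name):
    """Return the module's __version__ string, or None if it cannot be imported.

    Hypothetical helper for illustration; it is not part of Celery.
    """
    try:
        module = importlib.import_module(module_name)
    except ImportError:
        return None
    # Not every module defines __version__, so fall back to a placeholder.
    return getattr(module, "__version__", "unknown")


if __name__ == "__main__":
    version = installed_version("celery")
    if version is None:
        print("celery is not installed")
    else:
        print("celery version: %s" % version)
```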

Test app with Redis

Run redis:

$ redis-server &
$ redis-cli
redis 127.0.0.1:6379> quit
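
Before starting a worker it can help to confirm from Python that something is listening on the Redis port. This is a rough sketch for Python 3 using only the standard library; `port_open` is a hypothetical helper, and it only tests TCP reachability, not that the listener actually speaks the Redis protocol.

```python
import socket


def port_open(host, port, timeout=1.0):
    """Return True if a TCP connection to (host, port) succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and unreachable hosts.
        return False


if __name__ == "__main__":
    # 127.0.0.1:6379 is the address shown in the redis-cli prompt.
    print("redis reachable:", port_open("127.0.0.1", 6379))
```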

App:

# celeryTest.py
from celery import Celery

# The local Redis server is used as the message broker and as the result backend.
app = Celery('tasks', broker='redis://localhost')
app.conf.update(CELERY_RESULT_BACKEND="redis://")

@app.task
def add(x, y):
    return x + y

Run worker:

$ celery -A celeryTest worker --loglevel=info

Run task

$ python
Python 2.7.3 (default, Jul  3 2012, 19:58:39)
[GCC 4.7.1] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> from celeryTest import add
>>> xx=add.delay(3,3)
>>> xx.ready()
True
>>> print(xx.result)
6
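
In the session above `xx.ready()` already returns True, but right after `delay()` the worker may not have finished yet. One way to handle that is to poll with a deadline. The sketch below assumes Python 3 (for `time.monotonic`); `wait_until` is a hypothetical helper, not a Celery function.

```python
import time


def wait_until(predicate, timeout=10.0, interval=0.1):
    """Poll predicate() until it returns True or timeout seconds elapse.

    Returns True if the predicate became true, False on timeout.
    """
    deadline = time.monotonic() + timeout
    while True:
        if predicate():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)
```

With the task from the session, `wait_until(xx.ready)` would return True once the result is stored. Celery's own blocking alternative is `xx.get(timeout=10)`.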

Docker compose example

app.py

import time
import redis
from flask import Flask, jsonify
from celery import Celery

celery = Celery()
celery.config_from_object('celeryconfig')

"""
docker-compose build
docker-compose run
docker exec -it celerytest_web_1 sh
docker-compose build
docker-compose restart

docker-compose stop
docker system prune -a
docker-compose up
docker exec -it celerytest_web_1 sh
pip freeze
"""

app = Flask(__name__)
cache = redis.Redis(host='redis', port=6379)

def get_hit_count():
    # Retry up to 5 times on connection errors before giving up.
    retries = 5
    while True:
        try:
            return cache.incr('hits')
        except redis.exceptions.ConnectionError as exc:
            if retries == 0:
                raise exc
            retries -= 1
            time.sleep(0.5)

@app.route('/')
def hello():
    count = get_hit_count()
    return 'Hello World!?= I have been seen {} times.\n'.format(count)

@app.route('/add/<int:op1>/<int:op2>')
def add(op1=None, op2=None):
    """
    http://localhost:5000/add/2/8
    """
    # Send the task by name and block until the worker returns the result.
    result = celery.send_task('celery_worker.add', (op1, op2))
    return jsonify(add_res=result.get())
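
The retry loop in `get_hit_count` can be factored into a reusable function. The following is only a sketch of that idea; `retry_call` and its parameters are hypothetical names, with defaults copied from the loop in app.py.

```python
import time


def retry_call(func, exceptions, retries=5, delay=0.5):
    """Call func(); if it raises one of `exceptions`, retry up to `retries` more times.

    The last exception is re-raised once the retries are used up.
    """
    while True:
        try:
            return func()
        except exceptions:
            if retries == 0:
                raise
            retries -= 1
            time.sleep(delay)
```

In app.py this could be used as `retry_call(lambda: cache.incr('hits'), redis.exceptions.ConnectionError)`.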

Dockerfile-celery

FROM python:3.7-alpine
WORKDIR /code
RUN apk add --no-cache gcc musl-dev linux-headers
COPY requirements.txt requirements.txt
RUN pip install -r requirements.txt
EXPOSE 5000
COPY . .
CMD ["celery","-A","celery_worker","worker","--loglevel=info"]

Dockerfile

FROM python:3.7-alpine
WORKDIR /code
ENV FLASK_APP=app.py
ENV FLASK_RUN_HOST=0.0.0.0
RUN apk add --no-cache gcc musl-dev linux-headers
COPY requirements.txt requirements.txt
RUN pip install -r requirements.txt
EXPOSE 5000
COPY . .
CMD ["flask", "run"]

docker-compose.yml

version: "3.3"
services:
  web:
    build: .
    ports:
      - "5000:5000"
    volumes:
      - .:/code
    environment:
      FLASK_ENV: development
  redis:
    image: "redis:alpine"
  celery:
    build:
      context: .
      dockerfile: Dockerfile-celery
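
Once the services are up, the `/add` route can be exercised from the host through the published port 5000. This is a small sketch of such a client for Python 3, using only the standard library; the function names are hypothetical, and the `add_res` key is the one returned by `jsonify` in app.py.

```python
import json
from urllib.request import urlopen


def add_url(base_url, op1, op2):
    """Build the URL for the /add/<op1>/<op2> route defined in app.py."""
    return "%s/add/%d/%d" % (base_url.rstrip("/"), op1, op2)


def parse_add_response(body):
    """Extract the add_res field from the JSON body returned by the route."""
    return json.loads(body.decode("utf-8"))["add_res"]


def call_add(base_url, op1, op2, timeout=10):
    """Request the sum from the web service and return it."""
    with urlopen(add_url(base_url, op1, op2), timeout=timeout) as response:
        return parse_add_response(response.read())
```

With the stack running, `call_add("http://localhost:5000", 2, 8)` requests the same URL as the one in the docstring of the `add` route.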

celery_client.py

from celery import Celery

def callback(taskid):
    print("callback taskid %s" % (taskid))
    print("callback ready %d task id %s" % (result.get(), result.task_id))

celery = Celery()
celery.config_from_object('celeryconfig')
result = celery.send_task('celery_worker.add', (2, 2))

result.on_ready.then(callback)
print(result.get())

celery_worker.py

from celery import Celery

app = Celery('tasks', broker='redis://redis')
app.conf.update(CELERY_RESULT_BACKEND="redis://redis")

@app.task
def add(x, y):
    return x + y

requirements.txt

flask
redis
celery

celeryconfig.py

broker_url = 'redis://redis'
result_backend = 'redis://redis'

task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Lisbon'
enable_utc = True
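
The host part of `redis://redis` is the name of the `redis` service in docker-compose.yml, which Compose makes resolvable from the other containers. The sketch below shows how such a URL breaks down. `broker_endpoint` is a hypothetical helper, and falling back to port 6379 when none is given is an assumption based on the Redis default used in app.py.

```python
from urllib.parse import urlparse

DEFAULT_REDIS_PORT = 6379  # assumed default, the same port app.py passes to redis.Redis


def broker_endpoint(url):
    """Split a broker URL into (scheme, host, port).

    The port falls back to DEFAULT_REDIS_PORT when the URL does not name one.
    """
    parts = urlparse(url)
    return parts.scheme, parts.hostname, parts.port or DEFAULT_REDIS_PORT
```

For the value in celeryconfig.py, `broker_endpoint('redis://redis')` gives the scheme `redis`, the host `redis` and port 6379.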