
Distributed task queue for Python backed by Redis, a minimal Celery.

Project description

WakaQ

Distributed background task queue for Python backed by Redis, a super minimal Celery.

Features

  • Queue priority
  • Delayed tasks (run tasks after a timedelta eta)
  • Scheduled periodic tasks
  • Broadcast a task to all workers
  • Task soft and hard timeout limits
  • Optionally retry tasks on soft timeout
  • Combat memory leaks with max_mem_percent or max_tasks_per_worker
  • Super minimal

Want more features like rate limiting, task deduplication, etc? Too bad, feature PRs are not accepted. Maximal features belong in your app’s worker tasks.
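As an illustration of keeping such features inside your own tasks, here is a minimal sketch of task deduplication. It is not part of WakaQ. It assumes a client object whose `set(key, value, nx=True, ex=ttl)` behaves like redis-py's `Redis.set`; `FakeRedis`, `run_once`, and the `dedup:` key prefix are hypothetical names used only so the example runs without a Redis server.

```python
import time


class FakeRedis:
    """In-memory stand-in exposing only the subset of redis-py's ``set`` used below."""

    def __init__(self):
        self._expiry = {}  # key -> monotonic deadline after which the key is free

    def set(self, key, value, nx=False, ex=None):
        now = time.monotonic()
        deadline = self._expiry.get(key)
        if nx and deadline is not None and deadline > now:
            return None  # key still held; redis-py also returns None here
        self._expiry[key] = now + (ex if ex is not None else float("inf"))
        return True


def run_once(client, dedup_key, fn, *args, ttl=60, **kwargs):
    """Run fn unless another call claimed dedup_key within the last ttl seconds."""
    # SET with NX and EX is atomic in Redis, so only one caller wins the claim.
    if not client.set(f"dedup:{dedup_key}", "1", nx=True, ex=ttl):
        return False  # duplicate, skipped
    fn(*args, **kwargs)
    return True


if __name__ == "__main__":
    client = FakeRedis()
    seen = []
    print(run_once(client, "report:42", seen.append, "first"))   # claims the key
    print(run_once(client, "report:42", seen.append, "second"))  # skipped as duplicate
    print(seen)
```

Inside a real task body you would pass your own Redis client instead of `FakeRedis` and choose a key that identifies the unit of work.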

Installing

pip install wakaq

Using

import logging
from datetime import timedelta
from wakaq import WakaQ, Queue, CronTask


wakaq = WakaQ(

    # List your queues and their priorities.
    # Queues can be defined as Queue instances, tuples, or just a str.
    queues=[
        (0, 'a-high-priority-queue'),
        (1, 'a-medium-priority-queue'),
        (2, 'a-low-priority-queue'),
        'default-lowest-priority-queue',
        Queue('another-queue', priority=3, default_max_retries=5),
    ],

    # Number of worker processes. Must be an int or str which evaluates to an
    # int. The variable "cores" is replaced with the number of processors on
    # the current machine.
    concurrency="cores*4",

    # Raise SoftTimeout in a task if it runs longer than 30 seconds.
    soft_timeout=30,  # seconds

    # SIGKILL a task if it runs longer than 1 minute.
    hard_timeout=timedelta(minutes=1),

    # If a task hits its soft timeout, retry it up to 3 times. Max retries comes
    # first from the task decorator if set, next from the Queue's
    # default_max_retries, and lastly from the option below. If no
    # default_max_retries is found, the task is not retried on a soft timeout.
    default_max_retries=3,

    # Combat memory leaks by reloading the worker using the most RAM when
    # total machine RAM usage reaches 98% or more.
    max_mem_percent=98,

    # Combat memory leaks by reloading a worker after it's processed 5000 tasks.
    max_tasks_per_worker=5000,

    # Schedule two tasks: mytask runs every minute and again every 5 minutes,
    # anothertask runs once every ten minutes.
    # Scheduled tasks can be passed as CronTask instances or tuples.
    schedules=[

        # Runs mytask on the queue with priority 1.
        CronTask('* * * * *', 'mytask', queue='a-medium-priority-queue', args=[2, 2], kwargs={}),

        # Runs mytask once every 5 minutes.
        ('*/5 * * * *', 'mytask', [1, 1], {}),

        # Runs anothertask on the default lowest priority queue.
        ('*/10 * * * *', 'anothertask'),
    ],
)


@wakaq.task(queue='a-medium-priority-queue', max_retries=7)
def mytask(x, y):
    print(x + y)


@wakaq.task
def anothertask():
    print("hello world")


@wakaq.wrap_tasks_with
def custom_task_decorator(fn):
    def inner(*args, **kwargs):
        # do something before each task runs
        fn(*args, **kwargs)
        # do something after each task runs
    return inner


if __name__ == '__main__':
    # add 1 plus 1 on a worker somewhere, overriding the task's queue from medium to high
    mytask.delay(1, 1, queue='a-high-priority-queue')
    # print "hello world" on a worker somewhere, running on the default lowest priority queue
    anothertask.delay()
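The retry precedence described in the comments above (task decorator first, then the Queue's default_max_retries, then the app-level default_max_retries) can be sketched as a small lookup. This is an illustration of the documented order only, not WakaQ's internal code; `resolve_max_retries` is a hypothetical helper, and treating "nothing set" as 0 retries is an assumption.

```python
def resolve_max_retries(task_max_retries=None, queue_default=None, app_default=None):
    """Return the first retry limit that is set: task, then queue, then app.

    Returns 0 when none is set, meaning no retry on soft timeout (assumed).
    """
    for value in (task_max_retries, queue_default, app_default):
        if value is not None:
            return value
    return 0


if __name__ == "__main__":
    # mytask: the decorator sets max_retries=7, which wins over the app default.
    print(resolve_max_retries(7, None, 3))
    # A task without its own limit on 'another-queue' (default_max_retries=5).
    print(resolve_max_retries(None, 5, 3))
    # anothertask on the default queue: falls through to the app default.
    print(resolve_max_retries(None, None, 3))
```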

Deploying

Optimizing

When running in production, make sure to increase the maximum number of open file descriptors allowed for your Redis server process, since every client connection consumes one.

Running as a Daemon

Here’s an example systemd config to run wakaq-worker as a daemon:

[Unit]
Description=WakaQ Worker Service

[Service]
WorkingDirectory=/opt/yourapp
ExecStart=/opt/yourapp/venv/bin/python /opt/yourapp/venv/bin/wakaq-worker --app=yourapp.wakaq
RemainAfterExit=no
Restart=always
RestartSec=30s
KillSignal=SIGQUIT
LimitNOFILE=99999

[Install]
WantedBy=multi-user.target

Create a file at /etc/systemd/system/wakaqworker.service with the above contents, then run:

systemctl daemon-reload && systemctl enable wakaqworker
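To check that a limit such as LimitNOFILE from the unit file above applies to a running process, that process can report its own open-file limits. The sketch below uses only Python's standard `resource` module (Unix only); `open_file_limits` and `has_enough_descriptors` are hypothetical helper names, and where you call them (for example at worker startup) is up to your app.

```python
import resource


def open_file_limits():
    """Return the (soft, hard) open-file limits of the current process."""
    return resource.getrlimit(resource.RLIMIT_NOFILE)


def has_enough_descriptors(required):
    """True if the soft limit allows at least `required` open descriptors."""
    soft, _hard = open_file_limits()
    return soft == resource.RLIM_INFINITY or soft >= required


if __name__ == "__main__":
    soft, hard = open_file_limits()
    print(f"open files: soft={soft} hard={hard}")
```

This reports the limit of the process that runs it, so it says nothing about the Redis server process itself, whose limit has to be checked on the Redis host.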

Download files


Source Distribution

WakaQ-1.0.0.tar.gz (14.9 kB)

Uploaded Source

File details

Details for the file WakaQ-1.0.0.tar.gz.

File metadata

  • Download URL: WakaQ-1.0.0.tar.gz
  • Upload date:
  • Size: 14.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.1 CPython/3.9.12

File hashes

Hashes for WakaQ-1.0.0.tar.gz
Algorithm    Hash digest
SHA256       e226c984d1245bae8eec18fdd68fdfcd9c96c353827f6b7beba01088881fa9ff
MD5          891056c60cb53bf4c34d167944ddff6a
BLAKE2b-256  6b590c709f842f5839d8cdd96a1a1e54ebe1a5dc6f11f0e8e4ca6c97f5928f1c
