arku

Job queues and RPC in python with asyncio and redis.

arku, pronounced as ar·kyu, was conceived as a simple, modern and performant successor to rq.

Why use arku?

non-blocking

arku is built using python 3’s asyncio allowing non-blocking job enqueuing and execution. Multiple jobs (potentially hundreds) can be run simultaneously using a pool of asyncio Tasks.

powerful features

Deferred execution, easy retrying of jobs, and pessimistic execution (see below) means arku is great for critical jobs that must be completed.

fast

Asyncio and no forking make arku around 7x faster than rq for short jobs with no io. With io that might increase to around 40x faster. (TODO)

elegant

I’m a long-time contributor to and user of rq; arku is designed to be simpler, clearer and more powerful.

small

and easy to reason with: currently arku is only about 700 lines of code, and that won’t change significantly.

Install

Just:

python -m pip install arku

Redesigned to be less elegant?

The approach used in arku v0.16 of enqueueing jobs by name rather than “just calling a function” and knowing it will be called on the worker (as used in arku <= v0.15, rq, celery et al.) might seem less elegant, but it’s for good reason.

This approach means your frontend (calling the worker) doesn’t need access to the worker code, meaning better code separation and possibly smaller images etc.
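
For example, here’s a minimal sketch of the enqueuing side; the send_welcome_email job name and the signup handler are hypothetical, and only the worker’s codebase needs to define the actual function:

import asyncio

from arku import create_pool
from arku.connections import RedisSettings


async def signup_handler(user_id: int):
    # the enqueuing side only needs arku and the job's name as a string,
    # not an import of the worker code that implements it
    redis = await create_pool(RedisSettings())
    await redis.enqueue_job('send_welcome_email', user_id)


if __name__ == '__main__':
    asyncio.run(signup_handler(123))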

Usage

Warning

Jobs may be called more than once!

arku v0.16 has what I’m calling “pessimistic execution”: jobs aren’t removed from the queue until they’ve either succeeded or failed. If the worker shuts down, the job will be cancelled immediately and will remain in the queue to be run again when the worker starts up again (or run by another worker which is still running).

(This differs from other similar libraries like arku <= v0.15, rq, celery et al., where jobs generally don’t get rerun when a worker shuts down. That in turn requires complex logic to try to let jobs finish before shutting down (I wrote the HerokuWorker for rq), which never really works unless either all jobs take less than 6 seconds or your worker never shuts down while a job is running (impossible).)

All arku jobs should therefore be designed to cope with being called repeatedly if they’re cancelled, eg. use database transactions, idempotency keys or redis to mark when an API request or similar has succeeded to avoid making it twice.
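
For example, here’s a hedged sketch of one way to make a job safe to run twice: record a completion marker in redis with SETNX so a rerun after cancellation skips the side effect. The charge_customer job, the key layout and the payment call are illustrative, not part of arku:

from arku import create_pool
from arku.connections import RedisSettings


async def startup(ctx):
    ctx['redis_conn'] = await create_pool(RedisSettings())


async def charge_customer(ctx, order_id: str):
    marker = f'charged:{order_id}'
    # SETNX only sets the key if it doesn't already exist, so a rerun
    # of the same order_id skips the payment call below
    first_time = await ctx['redis_conn'].setnx(marker, 1)
    if not first_time:
        return 'already charged'
    # ... call the payment API here, at most once per order_id ...
    return 'charged'


class WorkerSettings:
    functions = [charge_customer]
    on_startup = startup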

In summary: sometimes exactly once can be hard or impossible, arku favours multiple times over zero times.

Simple Usage

import asyncio

from aiohttp import ClientSession
from arku import create_pool
from arku.connections import RedisSettings


async def download_content(ctx, url):
    session: ClientSession = ctx['session']
    async with session.get(url) as response:
        content = await response.text()
        print(f'{url}: {content:.80}...')
    return len(content)


async def startup(ctx):
    ctx['session'] = ClientSession()


async def shutdown(ctx):
    await ctx['session'].close()


async def main():
    redis = await create_pool(RedisSettings())
    for url in ('https://facebook.com', 'https://microsoft.com', 'https://github.com'):
        await redis.enqueue_job('download_content', url)


# WorkerSettings defines the settings to use when creating the worker,
# it's used by the arku cli
class WorkerSettings:
    functions = [download_content]
    on_startup = startup
    on_shutdown = shutdown


if __name__ == '__main__':
    asyncio.run(main())

(This script is complete, it should run “as is” both to enqueue jobs and run them)

To enqueue the jobs, simply run the script:

python demo.py

To execute the jobs, either after running demo.py or before/during:

arku demo.WorkerSettings

Append --burst to stop the worker once all jobs have finished. See arku.worker.Worker for more available properties of WorkerSettings.
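
As a sketch, WorkerSettings can carry any of the settings documented on arku.worker.Worker; the values below are illustrative and assume the demo.py module from above:

from arku.connections import RedisSettings

from demo import download_content, shutdown, startup


class WorkerSettings:
    functions = [download_content]
    on_startup = startup
    on_shutdown = shutdown
    redis_settings = RedisSettings(host='localhost', port=6379)
    max_jobs = 50                  # run at most 50 jobs concurrently
    job_timeout = 120              # seconds a job may run before timing out
    max_tries = 3                  # retry a failing job at most 3 times
    health_check_interval = 60     # write the health check key every 60 seconds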

You can also watch for changes and reload the worker when the source changes:

arku demo.WorkerSettings --watch path/to/src

This requires watchgod to be installed (pip install watchgod).

For details on the arku CLI:

arku --help

Startup & Shutdown coroutines

The on_startup and on_shutdown coroutines are provided as a convenient way to run logic as the worker starts and finishes, see arku.worker.Worker.

For example, in the example above the session is created once when the worker starts up and is then reused by subsequent jobs.

Deferring Jobs

By default, when a job is enqueued it will run as soon as possible (provided a worker is running). However, you can schedule jobs to run in the future, either after a given duration (_defer_by) or at a particular time (_defer_until), see arku.connections.ArkuRedis.enqueue_job().

import asyncio
from datetime import datetime, timedelta

from arku import create_pool
from arku.connections import RedisSettings


async def the_task(ctx):
    print('this is the task, delay since enqueueing:', datetime.now() - ctx['enqueue_time'])


async def main():
    redis = await create_pool(RedisSettings())

    # deferred by 10 seconds
    await redis.enqueue_job('the_task', _defer_by=10)

    # deferred by 1 minute
    await redis.enqueue_job('the_task', _defer_by=timedelta(minutes=1))

    # deferred until jan 28th 2032, you'll be waiting a long time for this...
    await redis.enqueue_job('the_task', _defer_until=datetime(2032, 1, 28))


class WorkerSettings:
    functions = [the_task]


if __name__ == '__main__':
    asyncio.run(main())

Job Uniqueness

Sometimes you want a job to only be run once at a time (eg. a backup) or once for a given parameter (eg. generating invoices for a particular company).

arku supports this via custom job ids, see arku.connections.ArkuRedis.enqueue_job(). It guarantees that a job with a particular ID cannot be enqueued again until its execution has finished.

import asyncio

from arku import create_pool
from arku.connections import RedisSettings


async def the_task(ctx):
    print('running the task with id', ctx['job_id'])


async def main():
    redis = await create_pool(RedisSettings())

    # no id, random id will be generated
    job1 = await redis.enqueue_job('the_task')
    print(job1)
    """
    >  <arku job 99edfef86ccf4145b2f64ee160fa3297>
    """

    # random id again, again the job will be enqueued and a job will be returned
    job2 = await redis.enqueue_job('the_task')
    print(job2)
    """
    >  <arku job 7d2163c056e54b62a4d8404921094f05>
    """

    # custom job id, job will be enqueued
    job3 = await redis.enqueue_job('the_task', _job_id='foobar')
    print(job3)
    """
    >  <arku job foobar>
    """

    # same custom job id, job will not be enqueued and enqueue_job will return None
    job4 = await redis.enqueue_job('the_task', _job_id='foobar')
    print(job4)
    """
    >  None
    """


class WorkerSettings:
    functions = [the_task]


if __name__ == '__main__':
    asyncio.run(main())

The check of job_id uniqueness in the queue is performed using a redis transaction so you can be certain jobs with the same id won’t be enqueued twice (or overwritten) even if they’re enqueued at exactly the same time.
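
A common pattern, sketched below, is to derive _job_id from the job’s arguments so that, for example, only one invoice job per company can be queued at a time (generate_invoice and the id format are illustrative):

import asyncio

from arku import create_pool
from arku.connections import RedisSettings


async def generate_invoice(ctx, company_id: int):
    print(f'generating invoice for company {company_id}')


async def main():
    redis = await create_pool(RedisSettings())
    company_id = 42
    job_id = f'invoice-{company_id}'
    first = await redis.enqueue_job('generate_invoice', company_id, _job_id=job_id)
    # the second call returns None: a job with this id is already enqueued
    second = await redis.enqueue_job('generate_invoice', company_id, _job_id=job_id)
    print(first, second)


class WorkerSettings:
    functions = [generate_invoice]


if __name__ == '__main__':
    asyncio.run(main())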

Job Results

You can access job information, status and job results using the arku.jobs.Job instance returned from arku.connections.ArkuRedis.enqueue_job().

import asyncio

from arku import create_pool
from arku.connections import RedisSettings
# requires `pip install devtools`, used for pretty printing of job info
from devtools import debug


async def the_task(ctx):
    print('running the task')
    return 42


async def main():
    redis = await create_pool(RedisSettings())

    job = await redis.enqueue_job('the_task')

    # get the job's id
    print(job.job_id)
    """
    >  68362958a244465b9be909db4b7b5ab4 (or whatever)
    """

    # get information about the job, will include results if the job has finished, but
    # doesn't await the job's result
    debug(await job.info())
    """
    >   docs/examples/job_results.py:23 main
    JobDef(
        function='the_task',
        args=(),
        kwargs={},
        job_try=None,
        enqueue_time=datetime.datetime(2019, 4, 23, 13, 58, 56, 781000),
        score=1556027936781
    ) (JobDef)
    """

    # get the Job's status
    print(await job.status())
    """
    >  JobStatus.queued
    """

    # poll redis for the job result, if the job raised an exception,
    # it will be raised here
    # (You'll need the worker running at the same time to get a result here)
    print(await job.result(timeout=5))
    """
    >  42
    """


class WorkerSettings:
    functions = [the_task]


if __name__ == '__main__':
    asyncio.run(main())

Retrying jobs and cancellation

As described above, when an arku worker shuts down, any ongoing jobs are cancelled immediately (via vanilla task.cancel(), so a CancelledError will be raised). You can see this by running a slow job (eg. add await asyncio.sleep(5)) and hitting Ctrl+C once it’s started.

You’ll get something like this:

➤  arku slow_job.WorkerSettings
12:42:38: Starting worker for 1 functions: the_task
12:42:38: redis_version=4.0.9 mem_usage=904.50K clients_connected=4 db_keys=3
12:42:38:  10.23s → c3dd4acc171541b9ac10b1d791750cde:the_task() delayed=10.23s
^C12:42:40: shutdown on SIGINT ◆ 0 jobs complete ◆ 0 failed ◆ 0 retries ◆ 1 ongoing to cancel
12:42:40:   1.16s ↻ c3dd4acc171541b9ac10b1d791750cde:the_task cancelled, will be run again



➤  arku slow_job.WorkerSettings
12:42:50: Starting worker for 1 functions: the_task
12:42:50: redis_version=4.0.9 mem_usage=904.61K clients_connected=4 db_keys=4
12:42:50:  21.78s → c3dd4acc171541b9ac10b1d791750cde:the_task() try=2 delayed=21.78s
12:42:55:   5.00s ← c3dd4acc171541b9ac10b1d791750cde:the_task ●
^C12:42:57: shutdown on SIGINT ◆ 1 jobs complete ◆ 0 failed ◆ 0 retries ◆ 0 ongoing to cancel

You can also retry jobs by raising the arku.worker.Retry exception from within a job, optionally with a duration to defer rerunning the job by:

import asyncio

from aiohttp import ClientSession
from arku import create_pool, Retry
from arku.connections import RedisSettings


async def download_content(ctx, url):
    session: ClientSession = ctx['session']
    async with session.get(url) as response:
        if response.status != 200:
            # retry the job with increasing back-off
            # delays will be 5s, 10s, 15s, 20s
            # after max_tries (default 5) the job will permanently fail
            raise Retry(defer=ctx['job_try'] * 5)
        content = await response.text()
    return len(content)


async def startup(ctx):
    ctx['session'] = ClientSession()


async def shutdown(ctx):
    await ctx['session'].close()


async def main():
    redis = await create_pool(RedisSettings())
    await redis.enqueue_job('download_content', 'https://httpbin.org/status/503')


class WorkerSettings:
    functions = [download_content]
    on_startup = startup
    on_shutdown = shutdown


if __name__ == '__main__':
    asyncio.run(main())

To abort a job, call arku.jobs.Job.abort(). (Note: for the arku.jobs.Job.abort() method to have any effect, you need to set allow_abort_jobs to True on the worker; this is for performance reasons. allow_abort_jobs=True may become the default in future.)

arku.jobs.Job.abort() will abort a job if it’s already running, or prevent it from being run if it’s still in the queue.

import asyncio

from arku import create_pool
from arku.connections import RedisSettings


async def do_stuff(ctx):
    print('doing stuff...')
    await asyncio.sleep(10)
    return 'stuff done'


async def main():
    redis = await create_pool(RedisSettings())
    job = await redis.enqueue_job('do_stuff')
    await asyncio.sleep(1)
    await job.abort()


class WorkerSettings:
    functions = [do_stuff]
    allow_abort_jobs = True


if __name__ == '__main__':
    asyncio.run(main())

Health checks

arku will automatically record some info about its current state in redis every health_check_interval seconds. That key/value will expire after health_check_interval + 1 seconds, so if the key exists you can be sure arku is alive and kicking (technically, you can be sure it was alive and kicking health_check_interval seconds ago).

You can run a health check with the CLI (assuming you’re using the first example above):

arku --check demo.WorkerSettings

The command will output the value of the health check if found; then exit 0 if the key was found and 1 if it was not.

A health check value takes the following form:

Mar-01 17:41:22 j_complete=0 j_failed=0 j_retried=0 j_ongoing=0 queued=0

Where the items have the following meaning:

  • j_complete the number of jobs completed

  • j_failed the number of jobs which have failed eg. raised an exception

  • j_ongoing the number of jobs currently being performed

  • j_retried the number of job retries run
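
Both the interval and the key are worker settings (see arku.worker.Worker), so you can customise them and read the key yourself. Here’s a hedged sketch; the key name and the check_health helper are illustrative:

from arku import create_pool
from arku.connections import RedisSettings


async def the_task(ctx):
    return 42


class WorkerSettings:
    functions = [the_task]
    health_check_interval = 60               # record health info every 60 seconds
    health_check_key = 'my-app:worker-health'


async def check_health() -> bool:
    redis = await create_pool(RedisSettings())
    # the key expires shortly after health_check_interval, so a missing key
    # means the worker hasn't reported recently
    return await redis.get('my-app:worker-health') is not None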

Cron Jobs

Functions can be scheduled to be run periodically at specific times. See arku.cron.cron().

from arku import cron


async def run_regularly(ctx):
    print('run foo job at 9.12am, 12.12pm and 6.12pm')


class WorkerSettings:
    cron_jobs = [
        cron(run_regularly, hour={9, 12, 18}, minute=12)
    ]

Usage roughly shadows cron except that None is equivalent to * in crontab. As in the example, sets can be used to run the job at multiple values of the given unit.

Note that second defaults to 0 so you don’t inadvertently run jobs every second. microsecond defaults to 123456 so you don’t inadvertently run jobs every microsecond, and so arku avoids enqueuing jobs at the top of a second, when the world is generally slightly busier.
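
If you do want sub-minute scheduling, pass second explicitly; a minimal sketch (the job itself is illustrative):

from arku import cron


async def run_twice_a_minute(ctx):
    print('runs at second 0 and second 30 of every minute')


class WorkerSettings:
    cron_jobs = [
        cron(run_twice_a_minute, second={0, 30})
    ]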

Synchronous Jobs

Functions that can block the loop for extended periods should be run in an executor like concurrent.futures.ThreadPoolExecutor or concurrent.futures.ProcessPoolExecutor using loop.run_in_executor as shown below.

import time
import functools
import asyncio
from concurrent import futures


def sync_task(t):
    return time.sleep(t)


async def the_task(ctx, t):
    blocking = functools.partial(sync_task, t)
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(ctx['pool'], blocking)


async def startup(ctx):
    ctx['pool'] = futures.ProcessPoolExecutor()


class WorkerSettings:
    functions = [the_task]
    on_startup = startup

Custom job serializers

By default, arku will use the built-in pickle module to serialize and deserialize jobs. If you wish to use alternative serialization methods, you can do so by specifying them when creating the connection pool and the worker settings. A serializer function takes a Python object and returns a binary representation encoded in a bytes object. A deserializer function, on the other hand, creates Python objects out of a bytes sequence.

Warning

It is essential that the serialization functions used by arku.connections.create_pool() and arku.worker.Worker are the same, otherwise jobs created by the former cannot be executed by the latter. This also applies when you update your serialization functions: you need to ensure that your new functions are backward compatible with the old jobs, or that there are no jobs with the older serialization scheme in the queue.

Here is an example with MsgPack, an efficient binary serialization format that may enable significant memory improvements over pickle:

import asyncio

import msgpack  # installable with "pip install msgpack"

from arku import create_pool
from arku.connections import RedisSettings


async def the_task(ctx):
    return 42


async def main():
    redis = await create_pool(
        RedisSettings(),
        job_serializer=msgpack.packb,
        job_deserializer=lambda b: msgpack.unpackb(b, raw=False),
    )
    await redis.enqueue_job('the_task')


class WorkerSettings:
    functions = [the_task]
    job_serializer = msgpack.packb
    # refer to MsgPack's documentation as to why raw=False is required
    job_deserializer = lambda b: msgpack.unpackb(b, raw=False)


if __name__ == '__main__':
    asyncio.run(main())

Reference

class arku.connections.SSLContext[source]

Required to avoid problems with

class arku.connections.RedisSettings(host: Union[str, List[Tuple[str, int]]] = 'localhost', port: int = 6379, database: int = 0, password: Optional[str] = None, ssl: Union[bool, None, arku.connections.SSLContext] = None, conn_timeout: int = 1, conn_retries: int = 5, conn_retry_delay: int = 1, sentinel: bool = False, sentinel_master: str = 'mymaster')[source]

No-op class used to hold redis connection settings.

Used by arku.connections.create_pool() and arku.worker.Worker.

class arku.connections.ArkuRedis(pool_or_conn: Optional[aioredis.connection.ConnectionPool] = None, job_serializer: Optional[Callable[[Dict[str, Any]], bytes]] = None, job_deserializer: Optional[Callable[[bytes], Dict[str, Any]]] = None, default_queue_name: str = 'arku:queue', **kwargs)[source]

Thin subclass of aioredis.Redis which adds arku.connections.ArkuRedis.enqueue_job().

Parameters
  • redis_settings – an instance of arku.connections.RedisSettings.

  • job_serializer – a function that serializes Python objects to bytes, defaults to pickle.dumps

  • job_deserializer – a function that deserializes bytes into Python objects, defaults to pickle.loads

  • default_queue_name – the default queue name to use, defaults to arku:queue.

  • kwargs – keyword arguments directly passed to aioredis.Redis.

all_job_results() → List[arku.jobs.JobResult][source]

Get results for all jobs in redis.
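
A minimal sketch of use, assuming a local redis:

import asyncio

from arku import create_pool
from arku.connections import RedisSettings


async def main():
    redis = await create_pool(RedisSettings())
    # print the stored result of every job
    for job_result in await redis.all_job_results():
        print(job_result)


if __name__ == '__main__':
    asyncio.run(main())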

enqueue_job(function: str, *args, _job_id: Optional[str] = None, _queue_name: Optional[str] = None, _defer_until: Optional[datetime.datetime] = None, _defer_by: Union[None, int, float, datetime.timedelta] = None, _expires: Union[None, int, float, datetime.timedelta] = None, _job_try: Optional[int] = None, **kwargs) → Optional[arku.jobs.Job][source]

Enqueue a job.

Parameters
  • function – Name of the function to call

  • args – args to pass to the function

  • _job_id – ID of the job, can be used to enforce job uniqueness

  • _queue_name – queue of the job, can be used to create job in different queue

  • _defer_until – datetime at which to run the job

  • _defer_by – duration to wait before running the job

  • _expires – if the job still hasn’t started after this duration, do not run it

  • _job_try – useful when re-enqueueing jobs within a job

  • kwargs – any keyword arguments to pass to the function

Returns

arku.jobs.Job instance or None if a job with this ID already exists

queued_jobs(*, queue_name: str = 'arku:queue') → List[arku.jobs.JobDef][source]

Get information about queued jobs, mostly useful when testing.

arku.connections.create_pool(settings_: arku.connections.RedisSettings = None, *, retry: int = 0, job_serializer: Optional[Callable[[Dict[str, Any]], bytes]] = None, job_deserializer: Optional[Callable[[bytes], Dict[str, Any]]] = None, default_queue_name: str = 'arku:queue') → arku.connections.ArkuRedis[source]

Create a new redis pool, retrying up to conn_retries times if the connection fails.

Similar to aioredis.create_redis_pool except it returns an arku.connections.ArkuRedis instance, thus allowing job enqueuing.

arku.worker.func(coroutine: Union[str, arku.worker.Function, WorkerCoroutine], *, name: Optional[str] = None, keep_result: Optional[SecondsTimedelta] = None, timeout: Optional[SecondsTimedelta] = None, keep_result_forever: Optional[bool] = None, max_tries: Optional[int] = None) → arku.worker.Function[source]

Wrapper for a job function which lets you configure more settings.

Parameters
  • coroutine – coroutine function to call, can be a string to import

  • name – name for function, if None, coroutine.__qualname__ is used

  • keep_result – duration to keep the result for, if 0 the result is not kept

  • keep_result_forever – whether to keep results forever, if None use Worker default, wins over keep_result

  • timeout – maximum time the job should take

  • max_tries – maximum number of tries allowed for the function, use 1 to prevent retrying
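
A minimal sketch of use in WorkerSettings; the fetch coroutine is illustrative:

from arku.worker import func


async def fetch(ctx, url):
    ...


class WorkerSettings:
    functions = [
        # register with a custom name, a 30 second timeout and at most 2 tries
        func(fetch, name='fetch', timeout=30, max_tries=2),
    ]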

exception arku.worker.Retry(defer: Optional[SecondsTimedelta] = None)[source]

Special exception to retry the job (if max_tries hasn’t been reached).

Parameters

defer – duration to wait before rerunning the job

class arku.worker.Worker(functions: Sequence[Union[arku.worker.Function, WorkerCoroutine]] = (), *, queue_name: Optional[str] = 'arku:queue', cron_jobs: Optional[Sequence[arku.cron.CronJob]] = None, redis_settings: arku.connections.RedisSettings = None, redis_pool: arku.connections.ArkuRedis = None, burst: bool = False, on_startup: Optional[StartupShutdown] = None, on_shutdown: Optional[StartupShutdown] = None, handle_signals: bool = True, max_jobs: int = 10, job_timeout: SecondsTimedelta = 300, keep_result: SecondsTimedelta = 3600, keep_result_forever: bool = False, poll_delay: SecondsTimedelta = 0.5, queue_read_limit: Optional[int] = None, max_tries: int = 5, health_check_interval: SecondsTimedelta = 3600, health_check_key: Optional[str] = None, ctx: Optional[Dict[Any, Any]] = None, retry_jobs: bool = True, allow_abort_jobs: bool = False, max_burst_jobs: int = -1, job_serializer: Optional[Callable[[Dict[str, Any]], bytes]] = None, job_deserializer: Optional[Callable[[bytes], Dict[str, Any]]] = None)[source]

Main class for running jobs.

Parameters
  • functions – list of functions to register, can either be raw coroutine functions or the result of arku.worker.func().

  • queue_name – queue name to get jobs from

  • cron_jobs – list of cron jobs to run, use arku.cron.cron() to create them

  • redis_settings – settings for creating a redis connection

  • redis_pool – existing redis pool, generally None

  • burst – whether to stop the worker once all jobs have been run

  • on_startup – coroutine function to run at startup

  • on_shutdown – coroutine function to run at shutdown

  • handle_signals – default true, register signal handlers; set to false when running inside another async framework

  • max_jobs – maximum number of jobs to run at a time

  • job_timeout – default job timeout (max run time)

  • keep_result – default duration to keep job results for

  • keep_result_forever – whether to keep results forever

  • poll_delay – duration between polling the queue for new jobs

  • queue_read_limit – the maximum number of jobs to pull from the queue each time it’s polled; by default it equals max_jobs

  • max_tries – default maximum number of times to retry a job

  • health_check_interval – how often to set the health check key

  • health_check_key – redis key under which health check is set

  • ctx – dictionary to hold extra user defined state

  • retry_jobs – whether to retry jobs on Retry or CancelledError or not

  • allow_abort_jobs – whether to abort jobs on a call to arku.jobs.Job.abort()

  • max_burst_jobs – the maximum number of jobs to process in burst mode (disabled with negative values)

  • job_serializer – a function that serializes Python objects to bytes, defaults to pickle.dumps

  • job_deserializer – a function that deserializes bytes into Python objects, defaults to pickle.loads

run() → None[source]

Sync function to run the worker, finally closes worker connections.

async_run() → None[source]

Asynchronously run the worker, does not close connections. Useful when testing.

run_check(retry_jobs: Optional[bool] = None, max_burst_jobs: Optional[int] = None) → int[source]

Run arku.worker.Worker.async_run(), check for failed jobs and raise arku.worker.FailedJobs if any jobs have failed.

Returns

number of completed jobs
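
A hedged sketch of using run_check() in a test, assuming a local redis; the worker is created directly rather than via the CLI:

import asyncio

from arku import create_pool
from arku.connections import RedisSettings
from arku.worker import Worker


async def the_task(ctx):
    return 42


async def main():
    redis = await create_pool(RedisSettings())
    await redis.enqueue_job('the_task')
    worker = Worker(
        functions=[the_task],
        redis_settings=RedisSettings(),
        burst=True,            # stop once the queue is empty
        handle_signals=False,  # don't install signal handlers when embedding the worker
    )
    # runs the worker, then raises arku.worker.FailedJobs if any job failed
    completed = await worker.run_check()
    print(completed)


if __name__ == '__main__':
    asyncio.run(main())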

start_jobs(job_ids: List[str]) → None[source]

For each job id, get the job definition, check it’s not running and start it in a task

arku.cron.cron(coroutine: Union[str, arku.typing.WorkerCoroutine], *, name: Optional[str] = None, month: Union[None, Set[int], int] = None, day: Union[None, Set[int], int] = None, weekday: Union[None, Set[int], int, Literal[mon, tues, wed, thurs, fri, sat, sun]] = None, hour: Union[None, Set[int], int] = None, minute: Union[None, Set[int], int] = None, second: Union[None, Set[int], int] = 0, microsecond: int = 123456, run_at_startup: bool = False, unique: bool = True, timeout: Union[int, float, datetime.timedelta, None] = None, keep_result: Optional[float] = 0, keep_result_forever: Optional[bool] = False, max_tries: Optional[int] = 1) → arku.cron.CronJob[source]

Create a cron job, i.e. a job to be executed at specific times.

Workers will enqueue this job at or just after the set times. If unique is true (the default) the job will only be run once even if multiple workers are running.

Parameters
  • coroutine – coroutine function to run

  • name – name of the job, if None, the name of the coroutine is used

  • month – month(s) to run the job on, 1 - 12

  • day – day(s) to run the job on, 1 - 31

  • weekday – week day(s) to run the job on, 0 - 6 or mon - sun

  • hour – hour(s) to run the job on, 0 - 23

  • minute – minute(s) to run the job on, 0 - 59

  • second – second(s) to run the job on, 0 - 59

  • microsecond – microsecond(s) to run the job on, defaults to 123456 as the world is busier at the top of a second, 0 - 1e6

  • run_at_startup – whether to run as worker starts

  • unique – whether the job should only be executed once at each scheduled time

  • timeout – job timeout

  • keep_result – how long to keep the result for

  • keep_result_forever – whether to keep results forever

  • max_tries – maximum number of tries for the job

class arku.jobs.JobStatus[source]

Enum of job statuses.

deferred = 'deferred'

job is in the queue, time it should be run not yet reached

queued = 'queued'

job is in the queue, time it should run has been reached

in_progress = 'in_progress'

job is in progress

complete = 'complete'

job is complete, result is available

not_found = 'not_found'

job not found in any way

class arku.jobs.Job(job_id: str, redis: aioredis.client.Redis, _queue_name: str = 'arku:queue', _deserializer: Optional[Callable[[bytes], Dict[str, Any]]] = None)[source]

Holds a reference to a job.

result(timeout: Optional[float] = None, *, poll_delay: float = 0.5, pole_delay: float = None) → Any[source]

Get the result of the job, including waiting if it’s not yet available. If the job raised an exception, it will be raised here.

Parameters
  • timeout – maximum time to wait for the job result before raising TimeoutError; will wait forever on None

  • poll_delay – how often to poll redis for the job result

  • pole_delay – deprecated, use poll_delay instead

info() → Optional[arku.jobs.JobDef][source]

All information on a job, including its result if it’s available, does not wait for the result.

abort(*, timeout: Optional[float] = None, poll_delay: float = 0.5) → bool[source]

Abort the job.

Parameters
  • timeout – maximum time to wait for the job result before raising TimeoutError, will wait forever on None

  • poll_delay – how often to poll redis for the job result

Returns

True if the job aborted properly, False otherwise

result_info() → Optional[arku.jobs.JobResult][source]

Information about the job result if available, does not wait for the result. Does not raise an exception even if the job raised one.
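
A minimal sketch, assuming job is the arku.jobs.Job returned by enqueue_job():

async def show_outcome(job):
    info = await job.result_info()
    if info is None:
        print('no result stored yet')
    else:
        # a JobResult describing the stored outcome; unlike result(), this
        # does not re-raise an exception raised by the job
        print(info)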

status() → arku.jobs.JobStatus[source]

Status of the job.

History

v0.22 (2021-09-02)

  • fix package importing in example, #261, thanks @cdpath

  • restrict aioredis to <2.0.0 (soon we’ll support aioredis>=2.0.0), #258, thanks @PaxPrz

  • auto setting version on release, 759fe03

v0.21 (2021-07-06)

  • CI improvements #243

  • fix log_redis_info #255

v0.20 (2021-04-26)

  • Added queue_name attribute to JobResult, #198

  • set job_deserializer, job_serializer and default_queue_name on worker pools to better support nested jobs, #203, #215 and #218

  • allow job results to be kept indefinitely, #205

  • refactor cron jobs to prevent duplicate jobs, #200

  • correctly handle CancelledError in python 3.8+, #213

  • allow jobs to be aborted, #212

  • deprecate pole_delay and use correct spelling poll_delay, #242

  • docs improvements, #207 and #232

v0.19.1 (2020-10-26)

  • fix timestamp issue in _defer_until without timezone offset, #182

  • add option to disable signal handler registration from running inside other frameworks, #183

  • add default_queue_name to create_redis_pool and ArqRedis, #191

  • Worker can retrieve the queue_name from the connection pool, if present

  • fix potential race condition when starting jobs, #194

  • support python 3.9 and pydantic 1.7, #214

v0.19.0 (2020-04-24)

  • Python 3.8 support, #178

  • fix concurrency with multiple workers, #180

  • full mypy coverage, #181

v0.18.4 (2019-12-19)

  • Add py.typed file to tell mypy the package has type hints, #163

  • Added ssl option to RedisSettings, #165

v0.18.3 (2019-11-13)

  • Include queue_name for the job object returned in response to enqueue_job, #160

v0.18.2 (2019-11-01)

  • Fix cron scheduling on a specific queue, by @dmvass and @Tinche

v0.18.1 (2019-10-28)

  • add support for Redis Sentinel fix #132

  • fix Worker.abort_job invalid expire time error, by @dmvass

v0.18 (2019-08-30)

  • fix usage of max_burst_jobs, improve coverage fix #152

  • stop lots of WatchVariableError errors in log, #153

v0.17.1 (2019-08-21)

  • deal better with failed job deserialization, #149 by @samuelcolvin

  • fix run_check(max_burst_jobs=...) when a job fails, #150 by @samuelcolvin

v0.17 (2019-08-11)

  • add worker.queue_read_limit, fix #141, by @rubik

  • custom serializers, eg. to use msgpack rather than pickle, #143 by @rubik

  • add ArqRedis.queued_jobs utility method for getting queued jobs while testing, fix #145 by @samuelcolvin

v0.16.1 (2019-08-02)

  • prevent duplicate job_id when job result exists, fix #137

  • add “don’t retry mode” via worker.retry_jobs = False, fix #139

  • add worker.max_burst_jobs

v0.16 (2019-07-30)

  • improved error when a job is aborted (eg. function not found)

v0.16.0b3 (2019-05-14)

  • fix semaphore on worker with many expired jobs

v0.16.0b2 (2019-05-14)

  • add support for different queues, #127 thanks @tsutsarin

v0.16.0b1 (2019-04-23)

  • use dicts for pickling not tuples, better handling of pickling errors, #123

v0.16.0a5 (2019-04-22)

  • use pipeline in enqueue_job

  • catch any error when pickling job result

  • add support for python 3.6

v0.16.0a4 (2019-03-15)

  • add Worker.run_check, fix #115

v0.16.0a3 (2019-03-12)

  • fix Worker with custom redis settings

v0.16.0a2 (2019-03-06)

  • add job_try argument to enqueue_job, #113

  • adding --watch mode to the worker (requires watchgod), #114

  • allow ctx when creating Worker

  • add all_job_results to ArqRedis

  • fix python path when starting worker

v0.16.0a1 (2019-03-05)

  • Breaking Change: COMPLETE REWRITE!!! see docs for details, #110

v0.15.0 (2018-11-15)

  • update dependencies

  • reconfigure Job, return a job instance when enqueuing tasks #93

  • tweaks to docs #106

v0.14.0 (2018-05-28)

  • package updates, particularly compatibility for msgpack 0.5.6

v0.13.0 (2017-11-27)

  • Breaking Change: integration with aioredis >= 1.0, basic usage hasn’t changed but look at aioredis’s migration docs for changes in redis API #76

v0.12.0 (2017-11-16)

  • better signal handling, support uvloop #73

  • drain pending tasks and drain task cancellation #74

  • add aiohttp and docker demo /demo #75

v0.11.0 (2017-08-25)

  • extract create_pool_lenient from RedisMixin

  • improve redis connection traceback

v0.10.4 (2017-08-22)

  • RedisSettings repr method

  • add create_connection_timeout to connection pool

v0.10.3 (2017-08-19)

  • fix bug with RedisMixin.get_redis_pool creating multiple queues

  • tweak drain logs

v0.10.2 (2017-08-17)

  • only save job on task in drain if re-enqueuing

  • add semaphore timeout to drains

  • add key count to log_redis_info

v0.10.1 (2017-08-16)

  • correct format of log_redis_info

v0.10.0 (2017-08-16)

  • log redis version when starting worker, fix #64

  • log “connection success” when connecting to redis after connection failures, fix #67

  • add job ids, for now they’re just used in logging, fix #53

v0.9.0 (2017-06-23)

  • allow set encoding in msgpack for jobs #49

  • cron tasks allowing scheduling of functions in the future #50

  • Breaking change: switch to_unix_ms to just return the timestamp int, add to_unix_ms_tz to return tz offset too

v0.8.1 (2017-06-05)

  • uprev setup requires

  • correct setup arguments

v0.8.0 (2017-06-05)

  • add async-timeout dependency

  • use async-timeout around shadow_factory

  • change logger name for control process log messages

  • use Semaphore rather than asyncio.wait(...return_when=asyncio.FIRST_COMPLETED) for improved performance

  • improve log display

  • add timeout and retry logic to RedisMixin.create_redis_pool

v0.7.0 (2017-06-01)

  • implement reusable Drain which takes tasks from a redis list and allows them to be executed asynchronously.

  • Drain uses python 3.6 async yield, therefore python 3.5 is no longer supported.

  • prevent repeated identical health check log messages

v0.6.1 (2017-05-06)

  • mypy at last passing, #30

  • adding trove classifiers, #29

v0.6.0 (2017-04-14)

  • add StopJob exception for cleanly ending jobs, #21

  • add flushdb to MockRedis, #23

  • allow configurable length job logging via log_curtail on Worker, #28

v0.5.2 (2017-02-25)

  • add shadow_kwargs method to BaseWorker to make customising actors easier.

v0.5.1 (2017-02-25)

  • reimplement worker reuse as it turned out to be useful in tests.

v0.5.0 (2017-02-20)

  • use gather rather than wait for startup and shutdown so exceptions propagate.

  • add --check option to confirm arq worker is running.

v0.4.1 (2017-02-11)

  • fix issue with Concurrent class binding with multiple actor instances.

v0.4.0 (2017-02-10)

  • improving naming of log handlers and formatters

  • upgrade numerous packages, nothing significant

  • add startup and shutdown methods to actors

  • switch @concurrent to return a Concurrent instance so the direct method is accessible via <func>.direct

v0.3.2 (2017-01-24)

  • improved solution for preventing new jobs starting when the worker is about to stop

  • switch SIGRTMIN > SIGUSR1 to work with mac

v0.3.1 (2017-01-20)

  • fix main process signal handling so the worker shuts down when just the main process receives a signal

  • re-enqueue un-started jobs popped from the queue if the worker is about to exit

v0.3.0 (2017-01-19)

  • rename settings class to RedisSettings and simplify significantly

v0.2.0 (2016-12-09)

  • add concurrency_enabled argument to aid in testing

  • fix conflict with unittest.mock

v0.1.0 (2016-12-06)

  • prevent logs disabling other logs

v0.0.6 (2016-08-14)

  • first proper release