arku¶
Job queues and RPC in python with asyncio and redis.
arku, pronounced ar·kyu, was conceived as a simple, modern and performant successor to rq.
Why use arku?
- non-blocking
arku is built using python 3's asyncio, allowing non-blocking job enqueuing and execution. Multiple jobs (potentially hundreds) can be run simultaneously using a pool of asyncio Tasks.
- powerful features
Deferred execution, easy retrying of jobs, and pessimistic execution (see below) mean arku is great for critical jobs that must be completed.
- fast
Asyncio and no forking make arku around 7x faster than rq for short jobs with no io. With io that might increase to around 40x faster. (TODO)
- elegant
I'm a long-time contributor to and user of rq; arku is designed to be simpler, clearer and more powerful.
- small
and easy to reason with - currently arku is only about 700 lines; that won't change significantly.
Redesigned to be less elegant?¶
The approach used in arku v0.16 of enqueueing jobs by name rather than "just calling a function" and knowing it will be called on the worker (as used in arku <= v0.15, rq, celery et al.) might seem less elegant, but it's for good reason.
This approach means your frontend (calling the worker) doesn't need access to the worker code, meaning better code separation and possibly smaller images etc.
Usage¶
Warning
Jobs may be called more than once!
arku v0.16 has what I’m calling “pessimistic execution”: jobs aren’t removed from the queue until they’ve either succeeded or failed. If the worker shuts down, the job will be cancelled immediately and will remain in the queue to be run again when the worker starts up again (or run by another worker which is still running).
(This differs from other similar libraries like arku <= v0.15, rq, celery et al., where jobs generally don't get rerun when a worker shuts down. This in turn requires complex logic to try and let jobs finish before shutting down (I wrote the HerokuWorker for rq); however, this never really works unless either all jobs take less than 6 seconds or your worker never shuts down when a job is running (impossible).)
All arku jobs should therefore be designed to cope with being called repeatedly if they’re cancelled, eg. use database transactions, idempotency keys or redis to mark when an API request or similar has succeeded to avoid making it twice.
In summary: sometimes exactly once can be hard or impossible, arku favours multiple times over zero times.
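For example, here is a minimal sketch of guarding a side effect with a redis marker; it assumes the job context exposes the worker's redis connection as ctx['session']-style state under ctx['redis'], and send_invoice_email() is a hypothetical helper:
async def send_invoice(ctx, invoice_id):
    redis = ctx['redis']
    marker = f'invoice-sent:{invoice_id}'
    if await redis.exists(marker):
        # a previous, cancelled run already completed the API call
        return 'already sent'
    await send_invoice_email(invoice_id)  # hypothetical side effect
    # record success so a rerun after cancellation won't repeat it
    await redis.set(marker, '1')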
Simple Usage¶
import asyncio
from aiohttp import ClientSession
from arku import create_pool
from arku.connections import RedisSettings

async def download_content(ctx, url):
    session: ClientSession = ctx['session']
    async with session.get(url) as response:
        content = await response.text()
        print(f'{url}: {content:.80}...')
    return len(content)

async def startup(ctx):
    ctx['session'] = ClientSession()

async def shutdown(ctx):
    await ctx['session'].close()

async def main():
    redis = await create_pool(RedisSettings())
    for url in ('https://facebook.com', 'https://microsoft.com', 'https://github.com'):
        await redis.enqueue_job('download_content', url)

# WorkerSettings defines the settings to use when creating the worker,
# it's used by the arku cli
class WorkerSettings:
    functions = [download_content]
    on_startup = startup
    on_shutdown = shutdown

if __name__ == '__main__':
    asyncio.run(main())
(This script is complete, it should run “as is” both to enqueue jobs and run them)
To enqueue the jobs, simply run the script:
python demo.py
To execute the jobs, either after running demo.py or before/during:
arku demo.WorkerSettings
Append --burst to stop the worker once all jobs have finished. See arku.worker.Worker for more available properties of WorkerSettings.
You can also watch for changes and reload the worker when the source changes:
arku demo.WorkerSettings --watch path/to/src
This requires watchgod to be installed (pip install watchgod).
For details on the arku CLI:
arku --help
Startup & Shutdown coroutines¶
The on_startup and on_shutdown coroutines are provided as a convenient way to run logic as the worker starts and finishes, see arku.worker.Worker.
For example, in the above example session is created once when the worker starts up and is then used in subsequent jobs.
Deferring Jobs¶
By default, when a job is enqueued it will run as soon as possible (provided a worker is running). However, you can schedule jobs to run in the future, either by a given duration (_defer_by) or at a particular time (_defer_until), see arku.connections.ArkuRedis.enqueue_job().
import asyncio
from datetime import datetime, timedelta
from arku import create_pool
from arku.connections import RedisSettings

async def the_task(ctx):
    print('this is the task, delay since enqueueing:', datetime.now() - ctx['enqueue_time'])

async def main():
    redis = await create_pool(RedisSettings())

    # deferred by 10 seconds
    await redis.enqueue_job('the_task', _defer_by=10)

    # deferred by 1 minute
    await redis.enqueue_job('the_task', _defer_by=timedelta(minutes=1))

    # deferred until jan 28th 2032, you'll be waiting a long time for this...
    await redis.enqueue_job('the_task', _defer_until=datetime(2032, 1, 28))

class WorkerSettings:
    functions = [the_task]

if __name__ == '__main__':
    asyncio.run(main())
Job Uniqueness¶
Sometimes you want a job to only be run once at a time (eg. a backup) or once for a given parameter (eg. generating invoices for a particular company).
arku supports this via custom job ids, see arku.connections.ArkuRedis.enqueue_job(). It guarantees that a job with a particular ID cannot be enqueued again until its execution has finished.
import asyncio
from arku import create_pool
from arku.connections import RedisSettings

async def the_task(ctx):
    print('running the task with id', ctx['job_id'])

async def main():
    redis = await create_pool(RedisSettings())

    # no id, random id will be generated
    job1 = await redis.enqueue_job('the_task')
    print(job1)
    """
    > <arku job 99edfef86ccf4145b2f64ee160fa3297>
    """

    # random id again, again the job will be enqueued and a job will be returned
    job2 = await redis.enqueue_job('the_task')
    print(job2)
    """
    > <arku job 7d2163c056e54b62a4d8404921094f05>
    """

    # custom job id, job will be enqueued
    job3 = await redis.enqueue_job('the_task', _job_id='foobar')
    print(job3)
    """
    > <arku job foobar>
    """

    # same custom job id, job will not be enqueued and enqueue_job will return None
    job4 = await redis.enqueue_job('the_task', _job_id='foobar')
    print(job4)
    """
    > None
    """

class WorkerSettings:
    functions = [the_task]

if __name__ == '__main__':
    asyncio.run(main())
The check of job_id uniqueness in the queue is performed using a redis transaction, so you can be certain jobs with the same id won't be enqueued twice (or overwritten) even if they're enqueued at exactly the same time.
Job Results¶
You can access job information, status and job results using the arku.jobs.Job instance returned from arku.connections.ArkuRedis.enqueue_job().
import asyncio
from arku import create_pool
from arku.connections import RedisSettings
# requires `pip install devtools`, used for pretty printing of job info
from devtools import debug

async def the_task(ctx):
    print('running the task')
    return 42

async def main():
    redis = await create_pool(RedisSettings())

    job = await redis.enqueue_job('the_task')

    # get the job's id
    print(job.job_id)
    """
    > 68362958a244465b9be909db4b7b5ab4 (or whatever)
    """

    # get information about the job, will include results if the job has finished, but
    # doesn't await the job's result
    debug(await job.info())
    """
    > docs/examples/job_results.py:23 main
    JobDef(
        function='the_task',
        args=(),
        kwargs={},
        job_try=None,
        enqueue_time=datetime.datetime(2019, 4, 23, 13, 58, 56, 781000),
        score=1556027936781
    ) (JobDef)
    """

    # get the Job's status
    print(await job.status())
    """
    > JobStatus.queued
    """

    # poll redis for the job result, if the job raised an exception,
    # it will be raised here
    # (You'll need the worker running at the same time to get a result here)
    print(await job.result(timeout=5))
    """
    > 42
    """

class WorkerSettings:
    functions = [the_task]

if __name__ == '__main__':
    asyncio.run(main())
Retrying jobs and cancellation¶
As described above, when an arku worker shuts down, any ongoing jobs are cancelled immediately (via vanilla task.cancel(), so a CancelledError will be raised). You can see this by running a slow job (eg. add await asyncio.sleep(5)) and hitting Ctrl+C once it's started.
You'll get something like this:
➤ arku slow_job.WorkerSettings
12:42:38: Starting worker for 1 functions: the_task
12:42:38: redis_version=4.0.9 mem_usage=904.50K clients_connected=4 db_keys=3
12:42:38: 10.23s → c3dd4acc171541b9ac10b1d791750cde:the_task() delayed=10.23s
^C12:42:40: shutdown on SIGINT ◆ 0 jobs complete ◆ 0 failed ◆ 0 retries ◆ 1 ongoing to cancel
12:42:40: 1.16s ↻ c3dd4acc171541b9ac10b1d791750cde:the_task cancelled, will be run again
➤ arku slow_job.WorkerSettings
12:42:50: Starting worker for 1 functions: the_task
12:42:50: redis_version=4.0.9 mem_usage=904.61K clients_connected=4 db_keys=4
12:42:50: 21.78s → c3dd4acc171541b9ac10b1d791750cde:the_task() try=2 delayed=21.78s
12:42:55: 5.00s ← c3dd4acc171541b9ac10b1d791750cde:the_task ●
^C12:42:57: shutdown on SIGINT ◆ 1 jobs complete ◆ 0 failed ◆ 0 retries ◆ 0 ongoing to cancel
You can also retry jobs by raising the arku.worker.Retry exception from within a job, optionally with a duration to defer rerunning the job by:
import asyncio
from aiohttp import ClientSession
from arku import create_pool, Retry
from arku.connections import RedisSettings

async def download_content(ctx, url):
    session: ClientSession = ctx['session']
    async with session.get(url) as response:
        if response.status != 200:
            # retry the job with increasing back-off
            # delays will be 5s, 10s, 15s, 20s
            # after max_tries (default 5) the job will permanently fail
            raise Retry(defer=ctx['job_try'] * 5)
        content = await response.text()
    return len(content)

async def startup(ctx):
    ctx['session'] = ClientSession()

async def shutdown(ctx):
    await ctx['session'].close()

async def main():
    redis = await create_pool(RedisSettings())
    await redis.enqueue_job('download_content', 'https://httpbin.org/status/503')

class WorkerSettings:
    functions = [download_content]
    on_startup = startup
    on_shutdown = shutdown

if __name__ == '__main__':
    asyncio.run(main())
To abort a job, call arku.jobs.Job.abort(). (Note: for the arku.jobs.Job.abort() method to have any effect, you need to set allow_abort_jobs to True on the worker; this is for performance reasons. allow_abort_jobs=True may become the default in future.)
arku.jobs.Job.abort() will abort a job if it's already running, or prevent it being run if it's currently in the queue.
import asyncio
from arku import create_pool
from arku.connections import RedisSettings

async def do_stuff(ctx):
    print('doing stuff...')
    await asyncio.sleep(10)
    return 'stuff done'

async def main():
    redis = await create_pool(RedisSettings())
    job = await redis.enqueue_job('do_stuff')
    await asyncio.sleep(1)
    await job.abort()

class WorkerSettings:
    functions = [do_stuff]
    allow_abort_jobs = True

if __name__ == '__main__':
    asyncio.run(main())
Health checks¶
arku will automatically record some info about its current state in redis every health_check_interval seconds. That key/value will expire after health_check_interval + 1 seconds, so you can be sure that if the variable exists, arku is alive and kicking (technically, you can be sure it was alive and kicking health_check_interval seconds ago).
You can run a health check with the CLI (assuming you’re using the first example above):
arku --check demo.WorkerSettings
The command will output the value of the health check if found, then exit 0 if the key was found and 1 if it was not.
A health check value takes the following form:
Mar-01 17:41:22 j_complete=0 j_failed=0 j_retried=0 j_ongoing=0 queued=0
Where the items have the following meaning:
- j_complete the number of jobs completed
- j_failed the number of jobs which have failed, eg. raised an exception
- j_ongoing the number of jobs currently being performed
- j_retried the number of job retries run
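As a sketch (not part of the example output above), the interval and key can be tuned via the health_check_interval and health_check_key worker settings documented in the reference below; the key name here is made up:
class WorkerSettings:
    functions = [download_content]
    # write the health info every 60 seconds rather than the default
    health_check_interval = 60
    # store it under a custom redis key (this name is made up)
    health_check_key = 'arku:demo:health-check'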
Cron Jobs¶
Functions can be scheduled to be run periodically at specific times. See arku.cron.cron()
.
from arku import cron

async def run_regularly(ctx):
    print('run foo job at 9.12am, 12.12pm and 6.12pm')

class WorkerSettings:
    cron_jobs = [
        cron(run_regularly, hour={9, 12, 18}, minute=12)
    ]
Usage roughly shadows cron, except None is equivalent to * in crontab. As per the example, sets can be used to run at multiples of the given unit.
Note that second defaults to 0 so you don't inadvertently run jobs every second, and microsecond defaults to 123456 so you don't inadvertently run jobs every microsecond, and so that arku avoids enqueuing jobs at the top of a second, when the world is generally slightly busier.
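As a further sketch based on the cron() signature in the reference below (nightly_backup is a made-up job; weekday accepts 0 - 6 or mon - sun):
from arku import cron

async def nightly_backup(ctx):
    print('backing up...')

class WorkerSettings:
    cron_jobs = [
        # unset fields behave like '*' in crontab, so this runs at
        # 02:30:00 every Monday to Friday (weekdays 0 - 4)
        cron(nightly_backup, weekday={0, 1, 2, 3, 4}, hour=2, minute=30),
    ]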
Synchronous Jobs¶
Functions that can block the loop for extended periods should be run in an executor like concurrent.futures.ThreadPoolExecutor or concurrent.futures.ProcessPoolExecutor using loop.run_in_executor as shown below.
import time
import functools
import asyncio
from concurrent import futures

def sync_task(t):
    return time.sleep(t)

async def the_task(ctx, t):
    blocking = functools.partial(sync_task, t)
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(ctx['pool'], blocking)

async def startup(ctx):
    ctx['pool'] = futures.ProcessPoolExecutor()

class WorkerSettings:
    functions = [the_task]
    on_startup = startup
Custom job serializers¶
By default, arku will use the built-in pickle module to serialize and deserialize jobs. If you wish to use an alternative serialization method, you can do so by specifying it when creating the connection pool and the worker settings. A serializer function takes a Python object and returns a binary representation encoded in a bytes object. A deserializer function, on the other hand, creates Python objects out of a bytes sequence.
Warning
It is essential that the serialization functions used by arku.connections.create_pool() and arku.worker.Worker are the same, otherwise jobs created by the former cannot be executed by the latter. This also applies when you update your serialization functions: you need to ensure that your new functions are backward compatible with the old jobs, or that there are no jobs with the older serialization scheme in the queue.
Here is an example with MsgPack, an efficient binary serialization format that may enable significant memory improvements over pickle:
import asyncio
import msgpack  # installable with "pip install msgpack"
from arku import create_pool
from arku.connections import RedisSettings

async def the_task(ctx):
    return 42

async def main():
    redis = await create_pool(
        RedisSettings(),
        job_serializer=msgpack.packb,
        job_deserializer=lambda b: msgpack.unpackb(b, raw=False),
    )
    await redis.enqueue_job('the_task')

class WorkerSettings:
    functions = [the_task]
    job_serializer = msgpack.packb
    # refer to MsgPack's documentation as to why raw=False is required
    job_deserializer = lambda b: msgpack.unpackb(b, raw=False)

if __name__ == '__main__':
    asyncio.run(main())
Reference¶
class arku.connections.RedisSettings(host: Union[str, List[Tuple[str, int]]] = 'localhost', port: int = 6379, database: int = 0, password: Optional[str] = None, ssl: Union[bool, None, arku.connections.SSLContext] = None, conn_timeout: int = 1, conn_retries: int = 5, conn_retry_delay: int = 1, sentinel: bool = False, sentinel_master: str = 'mymaster')
No-op class used to hold redis connection settings.
Used by arku.connections.create_pool() and arku.worker.Worker.
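For instance, a sketch populating a few of the fields above (the host, port and password values are made up):
from arku import create_pool
from arku.connections import RedisSettings

settings = RedisSettings(
    host='redis.example.com',
    port=6380,
    password='my-secret',
    conn_retries=10,  # retry the initial connection up to 10 times
)

async def connect():
    return await create_pool(settings)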
class arku.connections.ArkuRedis(pool_or_conn: Optional[aioredis.connection.ConnectionPool] = None, job_serializer: Optional[Callable[[Dict[str, Any]], bytes]] = None, job_deserializer: Optional[Callable[[bytes], Dict[str, Any]]] = None, default_queue_name: str = 'arku:queue', **kwargs)
Thin subclass of aioredis.Redis which adds arku.connections.enqueue_job().
Parameters:
- redis_settings – an instance of arku.connections.RedisSettings.
- job_serializer – a function that serializes Python objects to bytes, defaults to pickle.dumps
- job_deserializer – a function that deserializes bytes into Python objects, defaults to pickle.loads
- default_queue_name – the default queue name to use, defaults to arku:queue.
- kwargs – keyword arguments directly passed to aioredis.Redis.
enqueue_job(function: str, *args, _job_id: Optional[str] = None, _queue_name: Optional[str] = None, _defer_until: Optional[datetime.datetime] = None, _defer_by: Union[None, int, float, datetime.timedelta] = None, _expires: Union[None, int, float, datetime.timedelta] = None, _job_try: Optional[int] = None, **kwargs) → Optional[arku.jobs.Job]
Enqueue a job.
Parameters:
- function – Name of the function to call
- args – args to pass to the function
- _job_id – ID of the job, can be used to enforce job uniqueness
- _queue_name – queue of the job, can be used to create job in different queue
- _defer_until – datetime at which to run the job
- _defer_by – duration to wait before running the job
- _expires – if the job still hasn't started after this duration, do not run it
- _job_try – useful when re-enqueueing jobs within a job
- kwargs – any keyword arguments to pass to the function
Returns: arku.jobs.Job instance or None if a job with this ID already exists
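For example, a short sketch using _expires and _queue_name per the signature above; 'send_report' and the queue name are made up:
from datetime import timedelta

from arku import create_pool
from arku.connections import RedisSettings

async def enqueue_report():
    redis = await create_pool(RedisSettings())
    # give up on the job if it hasn't started within an hour, and enqueue it on a
    # non-default queue (a worker must be started with the same queue_name)
    await redis.enqueue_job('send_report', _expires=timedelta(hours=1), _queue_name='reports')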
arku.connections.create_pool(settings_: arku.connections.RedisSettings = None, *, retry: int = 0, job_serializer: Optional[Callable[[Dict[str, Any]], bytes]] = None, job_deserializer: Optional[Callable[[bytes], Dict[str, Any]]] = None, default_queue_name: str = 'arku:queue') → arku.connections.ArkuRedis
Create a new redis pool, retrying up to conn_retries times if the connection fails.
Similar to aioredis.create_redis_pool except it returns an arku.connections.ArkuRedis instance, thus allowing job enqueuing.
arku.worker.func(coroutine: Union[str, arku.worker.Function, WorkerCoroutine], *, name: Optional[str] = None, keep_result: Optional[SecondsTimedelta] = None, timeout: Optional[SecondsTimedelta] = None, keep_result_forever: Optional[bool] = None, max_tries: Optional[int] = None) → arku.worker.Function
Wrapper for a job function which lets you configure more settings.
Parameters:
- coroutine – coroutine function to call, can be a string to import
- name – name for function, if None, coroutine.__qualname__ is used
- keep_result – duration to keep the result for, if 0 the result is not kept
- keep_result_forever – whether to keep results forever, if None use Worker default, wins over keep_result
- timeout – maximum time the job should take
- max_tries – maximum number of tries allowed for the function, use 1 to prevent retrying
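As an illustrative sketch of the parameters above (flaky_task is a stand-in job function):
from arku.worker import func

async def flaky_task(ctx):
    ...

class WorkerSettings:
    functions = [
        # override the default name, cap run time at 30 seconds and allow
        # at most one retry after the first failure
        func(flaky_task, name='flaky', timeout=30, max_tries=2),
    ]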
exception arku.worker.Retry(defer: Optional[SecondsTimedelta] = None)
Special exception to retry the job (if max_retries hasn't been reached).
Parameters:
- defer – duration to wait before rerunning the job
class arku.worker.Worker(functions: Sequence[Union[arku.worker.Function, WorkerCoroutine]] = (), *, queue_name: Optional[str] = 'arku:queue', cron_jobs: Optional[Sequence[arku.cron.CronJob]] = None, redis_settings: arku.connections.RedisSettings = None, redis_pool: arku.connections.ArkuRedis = None, burst: bool = False, on_startup: Optional[StartupShutdown] = None, on_shutdown: Optional[StartupShutdown] = None, handle_signals: bool = True, max_jobs: int = 10, job_timeout: SecondsTimedelta = 300, keep_result: SecondsTimedelta = 3600, keep_result_forever: bool = False, poll_delay: SecondsTimedelta = 0.5, queue_read_limit: Optional[int] = None, max_tries: int = 5, health_check_interval: SecondsTimedelta = 3600, health_check_key: Optional[str] = None, ctx: Optional[Dict[Any, Any]] = None, retry_jobs: bool = True, allow_abort_jobs: bool = False, max_burst_jobs: int = -1, job_serializer: Optional[Callable[[Dict[str, Any]], bytes]] = None, job_deserializer: Optional[Callable[[bytes], Dict[str, Any]]] = None)
Main class for running jobs.
Parameters:
- functions – list of functions to register, can either be raw coroutine functions or the result of arku.worker.func()
- queue_name – queue name to get jobs from
- cron_jobs – list of cron jobs to run, use arku.cron.cron() to create them
- redis_settings – settings for creating a redis connection
- redis_pool – existing redis pool, generally None
- burst – whether to stop the worker once all jobs have been run
- on_startup – coroutine function to run at startup
- on_shutdown – coroutine function to run at shutdown
- handle_signals – default true, register signal handlers, set to false when running inside other async framework
- max_jobs – maximum number of jobs to run at a time
- job_timeout – default job timeout (max run time)
- keep_result – default duration to keep job results for
- keep_result_forever – whether to keep results forever
- poll_delay – duration between polling the queue for new jobs
- queue_read_limit – the maximum number of jobs to pull from the queue each time it's polled; by default it equals max_jobs
- max_tries – default maximum number of times to retry a job
- health_check_interval – how often to set the health check key
- health_check_key – redis key under which health check is set
- ctx – dictionary to hold extra user defined state
- retry_jobs – whether to retry jobs on Retry or CancelledError or not
- allow_abort_jobs – whether to abort jobs on a call to arku.jobs.Job.abort()
- max_burst_jobs – the maximum number of jobs to process in burst mode (disabled with negative values)
- job_serializer – a function that serializes Python objects to bytes, defaults to pickle.dumps
- job_deserializer – a function that deserializes bytes into Python objects, defaults to pickle.loads
async_run() → None
Asynchronously run the worker, does not close connections. Useful when testing.
run_check(retry_jobs: Optional[bool] = None, max_burst_jobs: Optional[int] = None) → int
Run arku.worker.Worker.async_run(), check for failed jobs and raise arku.worker.FailedJobs if any jobs have failed.
Returns: number of completed jobs
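A sketch of using run_check in a test, assuming a redis server on the default local settings and that run_check is awaited in the same way as async_run:
import asyncio

from arku.worker import Worker

async def the_task(ctx):
    return 42

async def run_worker_check():
    # burst=True stops the worker once the queue is empty; run_check raises
    # arku.worker.FailedJobs if any job failed, else returns the completed count
    worker = Worker(functions=[the_task], burst=True)
    completed = await worker.run_check()
    print(completed)

if __name__ == '__main__':
    asyncio.run(run_worker_check())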
arku.cron.cron(coroutine: Union[str, arku.typing.WorkerCoroutine], *, name: Optional[str] = None, month: Union[None, Set[int], int] = None, day: Union[None, Set[int], int] = None, weekday: Union[None, Set[int], int, Literal['mon', 'tues', 'wed', 'thurs', 'fri', 'sat', 'sun']] = None, hour: Union[None, Set[int], int] = None, minute: Union[None, Set[int], int] = None, second: Union[None, Set[int], int] = 0, microsecond: int = 123456, run_at_startup: bool = False, unique: bool = True, timeout: Union[int, float, datetime.timedelta, None] = None, keep_result: Optional[float] = 0, keep_result_forever: Optional[bool] = False, max_tries: Optional[int] = 1) → arku.cron.CronJob
Create a cron job, i.e. a job to be executed at specific times.
Workers will enqueue this job at or just after the set times. If unique is true (the default), the job will only be run once even if multiple workers are running.
Parameters:
- coroutine – coroutine function to run
- name – name of the job, if None, the name of the coroutine is used
- month – month(s) to run the job on, 1 - 12
- day – day(s) to run the job on, 1 - 31
- weekday – week day(s) to run the job on, 0 - 6 or mon - sun
- hour – hour(s) to run the job on, 0 - 23
- minute – minute(s) to run the job on, 0 - 59
- second – second(s) to run the job on, 0 - 59
- microsecond – microsecond(s) to run the job on, defaults to 123456 as the world is busier at the top of a second, 0 - 1e6
- run_at_startup – whether to run as worker starts
- unique – whether the job should only be executed once at each time
- timeout – job timeout
- keep_result – how long to keep the result for
- keep_result_forever – whether to keep results forever
- max_tries – maximum number of tries for the job
class arku.jobs.JobStatus
Enum of job statuses.
- deferred = 'deferred': job is in the queue, time it should be run not yet reached
- queued = 'queued': job is in the queue, time it should run has been reached
- in_progress = 'in_progress': job is in progress
- complete = 'complete': job is complete, result is available
- not_found = 'not_found': job not found in any way
class arku.jobs.Job(job_id: str, redis: aioredis.client.Redis, _queue_name: str = 'arku:queue', _deserializer: Optional[Callable[[bytes], Dict[str, Any]]] = None)
Holds data and a reference to a job.
result(timeout: Optional[float] = None, *, poll_delay: float = 0.5, pole_delay: float = None) → Any
Get the result of the job, including waiting if it's not yet available. If the job raised an exception, it will be raised here.
Parameters:
- timeout – maximum time to wait for the job result before raising TimeoutError, will wait forever on None
- poll_delay – how often to poll redis for the job result
- pole_delay – deprecated, use poll_delay instead
info() → Optional[arku.jobs.JobDef]
All information on a job, including its result if it's available, does not wait for the result.
abort(*, timeout: Optional[float] = None, poll_delay: float = 0.5) → bool
Abort the job.
Parameters:
- timeout – maximum time to wait for the job result before raising TimeoutError, will wait forever on None
- poll_delay – how often to poll redis for the job result
Returns: True if the job aborted properly, False otherwise
History¶
v0.22 (2021-09-02)¶
fix package importing in example, #261, thanks @cdpath
restrict aioredis to <2.0.0 (soon we'll support aioredis>=2.0.0), #258, thanks @PaxPrz
auto setting version on release, 759fe03
v0.21 (2021-07-06)¶
CI improvements #243
fix log_redis_info #255
v0.20 (2021-04-26)¶
Added queue_name attribute to JobResult, #198
set job_deserializer, job_serializer and default_queue_name on worker pools to better support nested jobs, #203, #215 and #218
Allow job results to be kept indefinitely, #205
refactor cron jobs to prevent duplicate jobs, #200
correctly handle CancelledError in python 3.8+, #213
allow jobs to be aborted, #212
deprecate pole_delay and use correct spelling poll_delay, #242
docs improvements, #207 and #232
v0.19.1 (2020-10-26)¶
fix timestamp issue in _defer_until without timezone offset, #182
add option to disable signal handler registration from running inside other frameworks, #183
add default_queue_name to create_redis_pool and ArqRedis, #191
Worker can retrieve the queue_name from the connection pool, if present
fix potential race condition when starting jobs, #194
support python 3.9 and pydantic 1.7, #214
support python 3.9 and pydantic 1.7, #214
v0.19.0 (2020-04-24)¶
Python 3.8 support, #178
fix concurrency with multiple workers, #180
full mypy coverage, #181
v0.18.4 (2019-12-19)¶
Add py.typed file to tell mypy the package has type hints, #163
Added ssl option to RedisSettings, #165
v0.18.3 (2019-11-13)¶
Include queue_name for the job object returned in response to enqueue_job, #160
v0.18.2 (2019-11-01)¶
Fix cron scheduling on a specific queue, by @dmvass and @Tinche
v0.18.1 (2019-10-28)¶
add support for Redis Sentinel fix #132
fix Worker.abort_job invalid expire time error, by @dmvass
v0.18 (2019-08-30)¶
fix usage of max_burst_jobs, improve coverage, fix #152
stop lots of WatchVariableError errors in log, #153
v0.17.1 (2019-08-21)¶
deal better with failed job deserialization, #149 by @samuelcolvin
fix run_check(max_burst_jobs=...) when a job fails, #150 by @samuelcolvin
v0.17 (2019-08-11)¶
add worker.queue_read_limit, fix #141, by @rubik
custom serializers, eg. to use msgpack rather than pickle, #143 by @rubik
add ArqRedis.queued_jobs utility method for getting queued jobs while testing, fix #145 by @samuelcolvin
v0.16.1 (2019-08-02)¶
prevent duplicate job_id when job result exists, fix #137
add "don't retry mode" via worker.retry_jobs = False, fix #139
add worker.max_burst_jobs
v0.16 (2019-07-30)¶
improved error when a job is aborted (eg. function not found)
v0.16.0b3 (2019-05-14)¶
fix semaphore on worker with many expired jobs
v0.16.0b2 (2019-05-14)¶
add support for different queues, #127 thanks @tsutsarin
v0.16.0b1 (2019-04-23)¶
use dicts for pickling not tuples, better handling of pickling errors, #123
v0.16.0a5 (2019-04-22)¶
use pipeline in enqueue_job
catch any error when pickling job result
add support for python 3.6
v0.16.0a4 (2019-03-15)¶
add Worker.run_check, fix #115
v0.16.0a3 (2019-03-12)¶
fix Worker with custom redis settings
v0.16.0a2 (2019-03-06)¶
add job_try argument to enqueue_job, #113
adding --watch mode to the worker (requires watchgod), #114
allow ctx when creating Worker
add all_job_results to ArqRedis
fix python path when starting worker
v0.16.0a1 (2019-03-05)¶
Breaking Change: COMPLETE REWRITE!!! see docs for details, #110
v0.15.0 (2018-11-15)¶
update dependencies
reconfigure Job, return a job instance when enqueuing tasks #93
tweaks to docs #106
v0.14.0 (2018-05-28)¶
package updates, particularly compatibility for msgpack 0.5.6
v0.13.0 (2017-11-27)¶
Breaking Change: integration with aioredis >= 1.0, basic usage hasn’t changed but look at aioredis’s migration docs for changes in redis API #76
v0.12.0 (2017-11-16)¶
better signal handling, support uvloop #73
drain pending tasks and drain task cancellation #74
add aiohttp and docker demo /demo #75
v0.11.0 (2017-08-25)¶
extract create_pool_lenient from RedisMixin
improve redis connection traceback
v0.10.4 (2017-08-22)¶
RedisSettings repr method
add create_connection_timeout to connection pool
v0.10.3 (2017-08-19)¶
fix bug with RedisMixin.get_redis_pool creating multiple queues
tweak drain logs
v0.10.2 (2017-08-17)¶
only save job on task in drain if re-enqueuing
add semaphore timeout to drains
add key count to log_redis_info
v0.10.1 (2017-08-16)¶
correct format of log_redis_info
v0.10.0 (2017-08-16)¶
log redis version when starting worker, fix #64
log “connection success” when connecting to redis after connection failures, fix #67
add job ids, for now they’re just used in logging, fix #53
v0.9.0 (2017-06-23)¶
allow set encoding in msgpack for jobs #49
cron tasks allowing scheduling of functions in the future #50
Breaking change: switch to_unix_ms to just return the timestamp int, add to_unix_ms_tz to return tz offset too
v0.8.1 (2017-06-05)¶
uprev setup requires
correct setup arguments
v0.8.0 (2017-06-05)¶
add async-timeout dependency
use async-timeout around shadow_factory
change logger name for control process log messages
use Semaphore rather than asyncio.wait(...return_when=asyncio.FIRST_COMPLETED) for improved performance
improve log display
add timeout and retry logic to RedisMixin.create_redis_pool
v0.7.0 (2017-06-01)¶
implementing reusable Drain which takes tasks from a redis list and allows them to be executed asynchronously.
Drain uses python 3.6 async yield, therefore python 3.5 is no longer supported.
prevent repeated identical health check log messages
v0.6.1 (2017-05-06)¶
mypy at last passing, #30
adding trove classifiers, #29
v0.6.0 (2017-04-14)¶
add StopJob exception for cleanly ending jobs, #21
add flushdb to MockRedis, #23
allow configurable length job logging via log_curtail on Worker, #28
v0.5.2 (2017-02-25)¶
add shadow_kwargs method to BaseWorker to make customising actors easier.
v0.5.1 (2017-02-25)¶
reimplement worker reuse as it turned out to be useful in tests.
v0.5.0 (2017-02-20)¶
use gather rather than wait for startup and shutdown so exceptions propagate.
add --check option to confirm arq worker is running.
v0.4.1 (2017-02-11)¶
fix issue with Concurrent class binding with multiple actor instances.
v0.4.0 (2017-02-10)¶
improving naming of log handlers and formatters
upgrade numerous packages, nothing significant
add startup and shutdown methods to actors
switch @concurrent to return a Concurrent instance so the direct method is accessible via <func>.direct
v0.3.2 (2017-01-24)¶
improved solution for preventing new jobs starting when the worker is about to stop
switch SIGRTMIN > SIGUSR1 to work with mac
v0.3.1 (2017-01-20)¶
fix main process signal handling so the worker shuts down when just the main process receives a signal
re-enqueue un-started jobs popped from the queue if the worker is about to exit
v0.3.0 (2017-01-19)¶
rename settings class to RedisSettings and simplify significantly
v0.2.0 (2016-12-09)¶
add concurrency_enabled argument to aid in testing
fix conflict with unittest.mock
v0.1.0 (2016-12-06)¶
prevent logs disabling other logs
v0.0.6 (2016-08-14)¶
first proper release