Channels

This is the API documentation for the job channels and the scheduling mechanisms of the job runner.

These classes are not intended for use by module developers.

class odoo.addons.queue_job.jobrunner.channels.PriorityQueue[source]

Bases: object

A priority queue that supports removing arbitrary objects.

Adding an object already in the queue is a no op. Popping an empty queue returns None.

>>> q = PriorityQueue()
>>> q.add(2)
>>> q.add(3)
>>> q.add(3)
>>> q.add(1)
>>> q[0]
1
>>> len(q)
3
>>> q.pop()
1
>>> q.remove(2)
>>> len(q)
1
>>> q[0]
3
>>> q.pop()
3
>>> q.pop()
>>> q.add(2)
>>> q.remove(2)
>>> q.add(2)
>>> q.pop()
2
add(o)[source]
remove(o)[source]
pop()[source]
class odoo.addons.queue_job.jobrunner.channels.SafeSet[source]

Bases: set

A set that does not raise KeyError when removing non-existent items.

>>> s = SafeSet()
>>> s.remove(1)
>>> len(s)
0
>>> s.remove(1)
remove(o)[source]
class odoo.addons.queue_job.jobrunner.channels.ChannelJob(db_name, channel, uuid, seq, date_created, priority, eta)[source]

Bases: object

A channel job is attached to a channel and holds the properties of a job that are necessary to prioritise them.

Channel jobs are comparable according to the following rules:
  • jobs with an eta come before all other jobs
  • then jobs with a smaller eta come first
  • then jobs with a smaller priority come first
  • then jobs with a smaller creation time come first
  • then jobs with a smaller sequence come first

Here are some examples.

j1 comes before j2 before it has a smaller date_created

>>> j1 = ChannelJob(None, None, 1,
...                 seq=0, date_created=1, priority=9, eta=None)
>>> j1
<ChannelJob 1>
>>> j2 = ChannelJob(None, None, 2,
...                 seq=0, date_created=2, priority=9, eta=None)
>>> j1 < j2
True

j3 comes first because it has lower priority, despite having a creation date after j1 and j2

>>> j3 = ChannelJob(None, None, 3,
...                 seq=0, date_created=3, priority=2, eta=None)
>>> j3 < j1
True

j4 and j5 comes even before j3, because they have an eta

>>> j4 = ChannelJob(None, None, 4,
...                 seq=0, date_created=4, priority=9, eta=9)
>>> j5 = ChannelJob(None, None, 5,
...                 seq=0, date_created=5, priority=9, eta=9)
>>> j4 < j5 < j3
True

j6 has same date_created and priority as j5 but a smaller eta

>>> j6 = ChannelJob(None, None, 6,
...                 seq=0, date_created=5, priority=9, eta=2)
>>> j6 < j4 < j5
True

Here is the complete suite:

>>> j6 < j4 < j5 < j3 < j1 < j2
True

j0 has the same properties as j1 but they are not considered equal as they are different instances

>>> j0 = ChannelJob(None, None, 1,
...                 seq=0, date_created=1, priority=9, eta=None)
>>> j0 == j1
False
>>> j0 == j0
True

Comparison excluding eta:

>>> j1.cmp_no_eta(j2)
-1
cmp_no_eta(other)[source]
class odoo.addons.queue_job.jobrunner.channels.ChannelQueue(sequential=False)[source]

Bases: object

A channel queue is a priority queue for jobs.

Jobs with an eta are set aside until their eta is past due, at which point they start competing normally with other jobs.

>>> q = ChannelQueue()
>>> j1 = ChannelJob(None, None, 1,
...                 seq=0, date_created=1, priority=1, eta=10)
>>> j2 = ChannelJob(None, None, 2,
...                 seq=0, date_created=2, priority=1, eta=None)
>>> j3 = ChannelJob(None, None, 3,
...                 seq=0, date_created=3, priority=1, eta=None)
>>> q.add(j1)
>>> q.add(j2)
>>> q.add(j3)

Wakeup time is the eta of job 1.

>>> q.get_wakeup_time()
10

We have not reached the eta of job 1, so we get job 2.

>>> q.pop(now=1)
<ChannelJob 2>

Wakeup time is still the eta of job 1, and we get job 1 when we are past it’s eta.

>>> q.get_wakeup_time()
10
>>> q.pop(now=11)
<ChannelJob 1>

Now there is no wakeup time anymore, because no job have an eta.

>>> q.get_wakeup_time()
0
>>> q.pop(now=12)
<ChannelJob 3>
>>> q.get_wakeup_time()
0
>>> q.pop(now=13)

Observe that job with past eta still run after jobs with higher priority.

>>> j4 = ChannelJob(None, None, 4,
...                 seq=0, date_created=4, priority=10, eta=20)
>>> j5 = ChannelJob(None, None, 5,
...                 seq=0, date_created=5, priority=1, eta=None)
>>> q.add(j4)
>>> q.add(j5)
>>> q.get_wakeup_time()
20
>>> q.pop(21)
<ChannelJob 5>
>>> q.get_wakeup_time()
0
>>> q.pop(22)
<ChannelJob 4>

Test a sequential queue.

>>> sq = ChannelQueue(sequential=True)
>>> j6 = ChannelJob(None, None, 6,
...                 seq=0, date_created=6, priority=1, eta=None)
>>> j7 = ChannelJob(None, None, 7,
...                 seq=0, date_created=7, priority=1, eta=20)
>>> j8 = ChannelJob(None, None, 8,
...                 seq=0, date_created=8, priority=1, eta=None)
>>> sq.add(j6)
>>> sq.add(j7)
>>> sq.add(j8)
>>> sq.pop(10)
<ChannelJob 6>
>>> sq.pop(15)
>>> sq.pop(20)
<ChannelJob 7>
>>> sq.pop(30)
<ChannelJob 8>
add(job)[source]
remove(job)[source]
pop(now)[source]
get_wakeup_time(wakeup_time=0)[source]
class odoo.addons.queue_job.jobrunner.channels.Channel(name, parent, capacity=None, sequential=False, throttle=0)[source]

Bases: object

A channel for jobs, with a maximum capacity.

When jobs are created by queue_job modules, they may be associated to a job channel. Jobs with no channel are inserted into the root channel.

Job channels are joined in a hierarchy down to the root channel. When a job channel has available capacity, jobs are dequeued, marked as running in the channel and are inserted into the queue of the parent channel where they wait for available capacity and so on.

Job channels can be visualized as water channels with a given flow limit (= capacity). Channels are joined together in a downstream channel and the flow limit of the downstream channel limits upstream channels.:

---------------------+
                     |
                     |
 Ch. A C:4,Q:12,R:4  +-----------------------

---------------------+  Ch. root C:5,Q:0,R:4
                     |
---------------------+
 Ch. B C:1,Q:0,R:0
---------------------+-----------------------

The above diagram illustrates two channels joining in the root channel. The root channel has a capacity of 5, and 4 running jobs coming from Channel A. Channel A has a capacity of 4, all in use (passed down to the root channel), and 12 jobs enqueued. Channel B has a capacity of 1, none in use. This means that whenever a new job comes in channel B, there will be available room for it to run in the root channel.

Note that from the point of view of a channel, ‘running’ means enqueued in the downstream channel. Only jobs marked running in the root channel are actually sent to Odoo for execution.

Should a downstream channel have less capacity than its upstream channels, jobs going downstream will be enqueued in the downstream channel, and compete normally according to their properties (priority, etc).

Using this technique, it is possible to enforce sequence in a channel with a capacity of 1. It is also possible to dedicate a channel with a limited capacity for application-autocreated subchannels without risking to overflow the system.

sequential
configure(config)[source]

Configure a channel from a dictionary.

Supported keys are:

  • capacity
  • sequential
  • throttle
fullname

The full name of the channel, in dot separated notation.

get_subchannel_by_name(subchannel_name)[source]
remove(job)[source]

Remove a job from the channel.

set_done(job)[source]

Mark a job as done.

This removes it from the channel queue.

set_pending(job)[source]

Mark a job as pending.

This puts the job in the channel queue and remove it from parent channels queues.

set_running(job)[source]

Mark a job as running.

This also marks the job as running in parent channels.

set_failed(job)[source]

Mark the job as failed.

has_capacity()[source]
get_jobs_to_run(now)[source]

Get jobs that are ready to run in channel.

This works by enqueuing jobs that are ready to run in children channels, then yielding jobs from the channel queue until capacity jobs are marked running in the channel.

If the throttle option is set on the channel, then it yields no job until at least throttle seconds have elapsed since the previous yield.

Parameters:now – the current datetime in seconds
Returns:iterator of odoo.addons.queue_job.jobrunner.ChannelJob
get_wakeup_time(wakeup_time=0)[source]
odoo.addons.queue_job.jobrunner.channels.split_strip(s, sep, maxsplit=-1)[source]

Split string and strip each component.

>>> split_strip("foo: bar baz\n: fred:", ":")
['foo', 'bar baz', 'fred', '']
class odoo.addons.queue_job.jobrunner.channels.ChannelManager[source]

Bases: object

High level interface for channels

This class handles:

  • configuration of channels
  • high level api to create and remove jobs (notify, remove_job, remove_db)
  • get jobs to run

Here is how the runner will use it.

Let’s create a channel manager and configure it.

>>> from pprint import pprint as pp
>>> cm = ChannelManager()
>>> cm.simple_configure('root:4,A:4,B:1')
>>> db = 'db'

Add a few jobs in channel A with priority 10

>>> cm.notify(db, 'A', 'A1', 1, 0, 10, None, 'pending')
>>> cm.notify(db, 'A', 'A2', 2, 0, 10, None, 'pending')
>>> cm.notify(db, 'A', 'A3', 3, 0, 10, None, 'pending')
>>> cm.notify(db, 'A', 'A4', 4, 0, 10, None, 'pending')
>>> cm.notify(db, 'A', 'A5', 5, 0, 10, None, 'pending')
>>> cm.notify(db, 'A', 'A6', 6, 0, 10, None, 'pending')

Add a few jobs in channel B with priority 5

>>> cm.notify(db, 'B', 'B1', 1, 0, 5, None, 'pending')
>>> cm.notify(db, 'B', 'B2', 2, 0, 5, None, 'pending')

We must now run one job from queue B which has a capacity of 1 and 3 jobs from queue A so the root channel capacity of 4 is filled.

>>> pp(list(cm.get_jobs_to_run(now=100)))
[<ChannelJob B1>, <ChannelJob A1>, <ChannelJob A2>, <ChannelJob A3>]

Job A2 is done. Next job to run is A5, even if we have higher priority job in channel B, because channel B has a capacity of 1.

>>> cm.notify(db, 'A', 'A2', 2, 0, 10, None, 'done')
>>> pp(list(cm.get_jobs_to_run(now=100)))
[<ChannelJob A4>]

Job B1 is done. Next job to run is B2 because it has higher priority.

>>> cm.notify(db, 'B', 'B1', 1, 0, 5, None, 'done')
>>> pp(list(cm.get_jobs_to_run(now=100)))
[<ChannelJob B2>]

Let’s say A1 is done and A6 gets a higher priority. A6 will run next.

>>> cm.notify(db, 'A', 'A1', 1, 0, 10, None, 'done')
>>> cm.notify(db, 'A', 'A6', 6, 0, 5, None, 'pending')
>>> pp(list(cm.get_jobs_to_run(now=100)))
[<ChannelJob A6>]

Let’s test the throttling mechanism. Configure a 2 seconds delay on channel A, end enqueue two jobs.

>>> cm = ChannelManager()
>>> cm.simple_configure('root:4,A:4:throttle=2')
>>> cm.notify(db, 'A', 'A1', 1, 0, 10, None, 'pending')
>>> cm.notify(db, 'A', 'A2', 2, 0, 10, None, 'pending')

We have only one job to run, because of the throttle.

>>> pp(list(cm.get_jobs_to_run(now=100)))
[<ChannelJob A1>]
>>> cm.get_wakeup_time()
102

We have no job to run, because of the throttle.

>>> pp(list(cm.get_jobs_to_run(now=101)))
[]
>>> cm.get_wakeup_time()
102

2 seconds later, we can run the other job (even though the first one is still running, because we have enough capacity).

>>> pp(list(cm.get_jobs_to_run(now=102)))
[<ChannelJob A2>]
>>> cm.get_wakeup_time()
104

Let’s test throttling in combination with a queue reaching full capacity.

>>> cm = ChannelManager()
>>> cm.simple_configure('root:4,T:2:throttle=2')
>>> cm.notify(db, 'T', 'T1', 1, 0, 10, None, 'pending')
>>> cm.notify(db, 'T', 'T2', 2, 0, 10, None, 'pending')
>>> cm.notify(db, 'T', 'T3', 3, 0, 10, None, 'pending')
>>> pp(list(cm.get_jobs_to_run(now=100)))
[<ChannelJob T1>]
>>> pp(list(cm.get_jobs_to_run(now=102)))
[<ChannelJob T2>]

Channel is now full, so no job to run even though throttling delay is over.

>>> pp(list(cm.get_jobs_to_run(now=103)))
[]
>>> cm.get_wakeup_time()  # no wakeup time, since queue is full
0
>>> pp(list(cm.get_jobs_to_run(now=104)))
[]
>>> cm.get_wakeup_time()  # queue is still full
0
>>> cm.notify(db, 'T', 'T1', 1, 0, 10, None, 'done')
>>> pp(list(cm.get_jobs_to_run(now=105)))
[<ChannelJob T3>]
>>> cm.get_wakeup_time()  # queue is full
0
>>> cm.notify(db, 'T', 'T2', 1, 0, 10, None, 'done')
>>> cm.get_wakeup_time()
107

Test wakeup time behaviour in presence of eta.

>>> cm = ChannelManager()
>>> cm.simple_configure('root:4,E:1')
>>> cm.notify(db, 'E', 'E1', 1, 0, 10, None, 'pending')
>>> cm.notify(db, 'E', 'E2', 2, 0, 10, None, 'pending')
>>> cm.notify(db, 'E', 'E3', 3, 0, 10, None, 'pending')
>>> pp(list(cm.get_jobs_to_run(now=100)))
[<ChannelJob E1>]
>>> pp(list(cm.get_jobs_to_run(now=101)))
[]
>>> cm.notify(db, 'E', 'E1', 1, 0, 10, 105, 'pending')
>>> cm.get_wakeup_time()  # wakeup at eta
105
>>> pp(list(cm.get_jobs_to_run(now=102)))  # but there is capacity
[<ChannelJob E2>]
>>> pp(list(cm.get_jobs_to_run(now=106)))  # no capacity anymore
[]
>>> cm.get_wakeup_time()  # no timed wakeup because no capacity
0
>>> cm.notify(db, 'E', 'E2', 1, 0, 10, None, 'done')
>>> cm.get_wakeup_time()
105
>>> pp(list(cm.get_jobs_to_run(now=107)))  # no capacity anymore
[<ChannelJob E1>]
>>> cm.get_wakeup_time()
0

Test wakeup time behaviour in a sequential queue.

>>> cm = ChannelManager()
>>> cm.simple_configure('root:4,S:1:sequential')
>>> cm.notify(db, 'S', 'S1', 1, 0, 10, None, 'pending')
>>> cm.notify(db, 'S', 'S2', 2, 0, 10, None, 'pending')
>>> cm.notify(db, 'S', 'S3', 3, 0, 10, None, 'pending')
>>> pp(list(cm.get_jobs_to_run(now=100)))
[<ChannelJob S1>]
>>> cm.notify(db, 'S', 'S1', 1, 0, 10, None, 'failed')
>>> pp(list(cm.get_jobs_to_run(now=101)))
[]
>>> cm.notify(db, 'S', 'S2', 2, 0, 10, 105, 'pending')
>>> pp(list(cm.get_jobs_to_run(now=102)))
[]

No wakeup time because due to eta, because the sequential queue is waiting for a failed job.

>>> cm.get_wakeup_time()
0
>>> cm.notify(db, 'S', 'S1', 1, 0, 10, None, 'pending')
>>> cm.get_wakeup_time()
105
>>> pp(list(cm.get_jobs_to_run(now=102)))
[<ChannelJob S1>]
>>> pp(list(cm.get_jobs_to_run(now=103)))
[]
>>> cm.notify(db, 'S', 'S1', 1, 0, 10, None, 'done')

At this stage, we have S2 with an eta of 105 and since the queue is sequential, we wait for it.

>>> pp(list(cm.get_jobs_to_run(now=103)))
[]
>>> pp(list(cm.get_jobs_to_run(now=105)))
[<ChannelJob S2>]
>>> cm.notify(db, 'S', 'S2', 2, 0, 10, 105, 'done')
>>> pp(list(cm.get_jobs_to_run(now=105)))
[<ChannelJob S3>]
>>> cm.notify(db, 'S', 'S3', 3, 0, 10, None, 'done')
>>> pp(list(cm.get_jobs_to_run(now=105)))
[]
classmethod parse_simple_config(config_string)[source]

Parse a simple channels configuration string.

The general form is as follow: channel(.subchannel)*(:capacity(:key(=value)?)*)? [, …]

If capacity is absent, it defaults to 1. If a key is present without value, it gets True as value. When declaring subchannels, the root channel may be omitted (ie sub:4 is the same as root.sub:4).

Returns a list of channel configuration dictionaries.

>>> from pprint import pprint as pp
>>> pp(ChannelManager.parse_simple_config('root:4'))
[{'capacity': 4, 'name': 'root'}]
>>> pp(ChannelManager.parse_simple_config('root:4,root.sub:2'))
[{'capacity': 4, 'name': 'root'}, {'capacity': 2, 'name': 'root.sub'}]
>>> pp(ChannelManager.parse_simple_config('root:4,root.sub:2:'
...                                       'sequential:k=v'))
[{'capacity': 4, 'name': 'root'},
 {'capacity': 2, 'k': 'v', 'name': 'root.sub', 'sequential': True}]
>>> pp(ChannelManager.parse_simple_config('root'))
[{'capacity': 1, 'name': 'root'}]
>>> pp(ChannelManager.parse_simple_config('sub:2'))
[{'capacity': 2, 'name': 'sub'}]

It ignores whitespace around values, and drops empty entries which would be generated by trailing commas, or commented lines on the Odoo config file.

>>> pp(ChannelManager.parse_simple_config('''
...     root : 4,
...     ,
...     foo bar:1: k=va lue,
... '''))
[{'capacity': 4, 'name': 'root'},
 {'capacity': 1, 'k': 'va lue', 'name': 'foo bar'}]

It’s also possible to replace commas with line breaks, which is more readable if the channel configuration comes from the odoo config file.

>>> pp(ChannelManager.parse_simple_config('''
...     root : 4
...     foo bar:1: k=va lue
...     baz
... '''))
[{'capacity': 4, 'name': 'root'},
 {'capacity': 1, 'k': 'va lue', 'name': 'foo bar'},
 {'capacity': 1, 'name': 'baz'}]
simple_configure(config_string)[source]

Configure the channel manager from a simple configuration string

>>> cm = ChannelManager()
>>> c = cm.get_channel_by_name('root')
>>> c.capacity
1
>>> cm.simple_configure('root:4,autosub.sub:2,seq:1:sequential')
>>> cm.get_channel_by_name('root').capacity
4
>>> cm.get_channel_by_name('root').sequential
False
>>> cm.get_channel_by_name('root.autosub').capacity
>>> cm.get_channel_by_name('root.autosub.sub').capacity
2
>>> cm.get_channel_by_name('root.autosub.sub').sequential
False
>>> cm.get_channel_by_name('autosub.sub').capacity
2
>>> cm.get_channel_by_name('seq').capacity
1
>>> cm.get_channel_by_name('seq').sequential
True
get_channel_from_config(config)[source]

Return a Channel object from a parsed configuration.

If the channel does not exist it is created. The configuration is applied on the channel before returning it. If some of the parent channels are missing when creating a subchannel, the parent channels are auto created with an infinite capacity (except for the root channel, which defaults to a capacity of 1 when not configured explicity).

get_channel_by_name(channel_name, autocreate=False)[source]

Return a Channel object by its name.

If it does not exist and autocreate is True, it is created with a default configuration and inserted in the Channels structure. If autocreate is False and the channel does not exist, an exception is raised.

>>> cm = ChannelManager()
>>> c = cm.get_channel_by_name('root', autocreate=False)
>>> c.name
'root'
>>> c.fullname
'root'
>>> c = cm.get_channel_by_name('root.sub', autocreate=True)
>>> c.name
'sub'
>>> c.fullname
'root.sub'
>>> c = cm.get_channel_by_name('sub', autocreate=True)
>>> c.name
'sub'
>>> c.fullname
'root.sub'
>>> c = cm.get_channel_by_name('autosub.sub', autocreate=True)
>>> c.name
'sub'
>>> c.fullname
'root.autosub.sub'
>>> c = cm.get_channel_by_name(None)
>>> c.fullname
'root'
>>> c = cm.get_channel_by_name('root.sub')
>>> c.fullname
'root.sub'
>>> c = cm.get_channel_by_name('sub')
>>> c.fullname
'root.sub'
notify(db_name, channel_name, uuid, seq, date_created, priority, eta, state)[source]
remove_job(uuid)[source]
remove_db(db_name)[source]
get_jobs_to_run(now)[source]
get_wakeup_time()[source]