Channels

This is the API documentation for the job channels and the scheduling mechanisms of the job runner.

These classes are not intended for use by module developers.

class connector.jobrunner.channels.Channel(name, parent, capacity=None, sequential=False)[source]

Bases: object

A channel for jobs, with a maximum capacity.

When jobs are created by connector modules, they may be associated with a job channel. Jobs with no channel are inserted into the root channel.

Job channels are joined in a hierarchy down to the root channel. When a job channel has available capacity, jobs are dequeued, marked as running in the channel, and inserted into the queue of the parent channel, where they wait for available capacity, and so on down to the root.

Job channels can be visualized as water channels with a given flow limit (= capacity). Channels join together into a downstream channel, and the flow limit of the downstream channel limits the upstream channels:

---------------------+
                     |
                     |
 Ch. A C:4,Q:12,R:4  +-----------------------

---------------------+  Ch. root C:5,Q:0,R:4
                     |
---------------------+
 Ch. B C:1,Q:0,R:0
---------------------+-----------------------

The above diagram illustrates two channels joining in the root channel. The root channel has a capacity of 5, and 4 running jobs coming from Channel A. Channel A has a capacity of 4, all in use (passed down to the root channel), and 12 jobs enqueued. Channel B has a capacity of 1, none in use. This means that whenever a new job comes in channel B, there will be available room for it to run in the root channel.

Note that from the point of view of a channel, ‘running’ means enqueued in the downstream channel. Only jobs marked running in the root channel are actually sent to Odoo for execution.

Should a downstream channel have less capacity than its upstream channels, jobs going downstream will be enqueued in the downstream channel, and compete normally according to their properties (priority, etc).

Using this technique, it is possible to enforce sequence in a channel with a capacity of 1. It is also possible to dedicate a channel with a limited capacity to application-autocreated subchannels without risking overflowing the system.
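The flow described above can be approximated with a small toy model. This is only a sketch under simplifying assumptions: ToyChannel and its attributes are hypothetical names, the queue is plain FIFO (no priority or eta), and it is not the implementation of connector.jobrunner.channels.Channel. It reproduces the numbers from the diagram.

```python
class ToyChannel:
    """Hypothetical, simplified channel: a FIFO queue plus a running set."""

    def __init__(self, name, parent=None, capacity=None):
        self.name = name
        self.parent = parent
        self.capacity = capacity  # None is treated as unlimited here
        self.children = []
        self.queue = []           # jobs waiting in this channel
        self.running = set()      # jobs already passed downstream
        if parent is not None:
            parent.children.append(self)

    def has_capacity(self):
        return self.capacity is None or len(self.running) < self.capacity

    def get_jobs_to_run(self):
        # First, let each child release its ready jobs into our queue.
        for child in self.children:
            for job in child.get_jobs_to_run():
                self.queue.append(job)
        # Then release jobs downstream while capacity remains.
        while self.queue and self.has_capacity():
            job = self.queue.pop(0)
            self.running.add(job)
            yield job


root = ToyChannel("root", capacity=5)
a = ToyChannel("A", parent=root, capacity=4)
b = ToyChannel("B", parent=root, capacity=1)

# 16 jobs arrive in channel A: 4 flow down to root, 12 stay queued.
a.queue.extend("A%d" % i for i in range(1, 17))
first_batch = list(root.get_jobs_to_run())

# A new job in channel B still finds room in the root channel.
b.queue.append("B1")
second_batch = list(root.get_jobs_to_run())
```

After the first call, channel A holds 4 running and 12 queued jobs and the root channel has 4 running jobs, which matches the C/Q/R values in the diagram. The second call releases only B1, because channel A is already at capacity.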

configure(config)[source]

Configure a channel from a dictionary.

Supported keys are:

  • capacity
  • sequential
fullname

The full name of the channel, in dot separated notation.

get_jobs_to_run(now)[source]

Get jobs that are ready to run in channel.

This works by enqueuing jobs that are ready to run in children channels, then yielding jobs from the channel queue until capacity jobs are marked running in the channel.

Parameters: now – the current datetime, using a type that is comparable to the jobs' eta attribute
Returns: iterator of connector.jobrunner.ChannelJob
get_subchannel_by_name(subchannel_name)[source]
remove(job)[source]

Remove a job from the channel.

set_done(job)[source]

Mark a job as done.

This removes it from the channel queue.

set_failed(job)[source]

Mark the job as failed.

set_pending(job)[source]

Mark a job as pending.

This puts the job in the channel queue and removes it from the parent channels' queues.

set_running(job)[source]

Mark a job as running.

This also marks the job as running in parent channels.

class connector.jobrunner.channels.ChannelJob(db_name, channel, uuid, seq, date_created, priority, eta)[source]

Bases: object

A channel job is attached to a channel and holds the properties of a job that are necessary to prioritise them.

Channel jobs are comparable according to the following rules:
  • jobs with an eta come before all other jobs
  • then jobs with a smaller eta come first
  • then jobs with a smaller priority come first
  • then jobs with a smaller creation time come first
  • then jobs with a smaller sequence come first

Here are some examples.

j1 comes before j2 because it has a smaller date_created

>>> j1 = ChannelJob(None, None, 1,
...                 seq=0, date_created=1, priority=9, eta=None)
>>> j1
<ChannelJob 1>
>>> j2 = ChannelJob(None, None, 2,
...                 seq=0, date_created=2, priority=9, eta=None)
>>> j1 < j2
True

j3 comes first because it has a smaller priority value, despite having a creation date after j1 and j2

>>> j3 = ChannelJob(None, None, 3,
...                 seq=0, date_created=3, priority=2, eta=None)
>>> j3 < j1
True

j4 and j5 come even before j3, because they have an eta

>>> j4 = ChannelJob(None, None, 4,
...                 seq=0, date_created=4, priority=9, eta=9)
>>> j5 = ChannelJob(None, None, 5,
...                 seq=0, date_created=5, priority=9, eta=9)
>>> j4 < j5 < j3
True

j6 has same date_created and priority as j5 but a smaller eta

>>> j6 = ChannelJob(None, None, 6,
...                 seq=0, date_created=5, priority=9, eta=2)
>>> j6 < j4 < j5
True

Here is the complete ordering:

>>> j6 < j4 < j5 < j3 < j1 < j2
True

j0 has the same properties as j1 but they are not considered equal as they are different instances

>>> j0 = ChannelJob(None, None, 1,
...                 seq=0, date_created=1, priority=9, eta=None)
>>> j0 == j1
False
>>> j0 == j0
True
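The ordering rules listed above can be expressed as a single sort key. The following is a sketch only: ToyJob and sort_key are hypothetical names, not part of connector.jobrunner.channels, and the sketch does not model the identity-based equality shown in the last example.

```python
from collections import namedtuple

# Hypothetical stand-in holding the same ordering fields as ChannelJob.
ToyJob = namedtuple("ToyJob", "uuid seq date_created priority eta")


def sort_key(job):
    """Build a tuple that sorts jobs according to the documented rules."""
    return (
        job.eta is None,                        # jobs with an eta come first
        job.eta if job.eta is not None else 0,  # then smaller eta
        job.priority,                           # then smaller priority
        job.date_created,                       # then older creation time
        job.seq,                                # then smaller sequence
    )


jobs = [
    ToyJob(1, seq=0, date_created=1, priority=9, eta=None),
    ToyJob(2, seq=0, date_created=2, priority=9, eta=None),
    ToyJob(3, seq=0, date_created=3, priority=2, eta=None),
    ToyJob(4, seq=0, date_created=4, priority=9, eta=9),
    ToyJob(5, seq=0, date_created=5, priority=9, eta=9),
    ToyJob(6, seq=0, date_created=5, priority=9, eta=2),
]
ordered = [job.uuid for job in sorted(jobs, key=sort_key)]
```

With the same field values as j1 to j6 above, ordered is [6, 4, 5, 3, 1, 2], the same sequence as the doctest.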
class connector.jobrunner.channels.ChannelManager[source]

Bases: object

High-level interface for channels.

This class handles:

  • configuration of channels
  • the high-level API to create and remove jobs (notify, remove_job, remove_db)
  • getting the jobs to run

Here is how the runner will use it.

Let’s create a channel manager and configure it.

>>> from pprint import pprint as pp
>>> cm = ChannelManager()
>>> cm.simple_configure('root:4,A:4,B:1')
>>> db = 'db'

Add a few jobs in channel A with priority 10

>>> cm.notify(db, 'A', 'A1', 1, 0, 10, None, 'pending')
>>> cm.notify(db, 'A', 'A2', 2, 0, 10, None, 'pending')
>>> cm.notify(db, 'A', 'A3', 3, 0, 10, None, 'pending')
>>> cm.notify(db, 'A', 'A4', 4, 0, 10, None, 'pending')
>>> cm.notify(db, 'A', 'A5', 5, 0, 10, None, 'pending')
>>> cm.notify(db, 'A', 'A6', 6, 0, 10, None, 'pending')

Add a few jobs in channel B with priority 5

>>> cm.notify(db, 'B', 'B1', 1, 0, 5, None, 'pending')
>>> cm.notify(db, 'B', 'B2', 2, 0, 5, None, 'pending')

We must now run one job from queue B which has a capacity of 1 and 3 jobs from queue A so the root channel capacity of 4 is filled.

>>> pp(list(cm.get_jobs_to_run(now=100)))
[<ChannelJob B1>, <ChannelJob A1>, <ChannelJob A2>, <ChannelJob A3>]

Job A2 is done. The next job to run is A4, even though a higher-priority job (B2) is waiting in channel B, because channel B has a capacity of 1 and B1 is still running.

>>> cm.notify(db, 'A', 'A2', 2, 0, 10, None, 'done')
>>> pp(list(cm.get_jobs_to_run(now=100)))
[<ChannelJob A4>]

Job B1 is done. Next job to run is B2 because it has higher priority.

>>> cm.notify(db, 'B', 'B1', 1, 0, 5, None, 'done')
>>> pp(list(cm.get_jobs_to_run(now=100)))
[<ChannelJob B2>]

Let’s say A1 is done and A6 gets a higher priority. A6 will run next.

>>> cm.notify(db, 'A', 'A1', 1, 0, 10, None, 'done')
>>> cm.notify(db, 'A', 'A6', 6, 0, 5, None, 'pending')
>>> pp(list(cm.get_jobs_to_run(now=100)))
[<ChannelJob A6>]
get_channel_by_name(channel_name, autocreate=False)[source]

Return a Channel object by its name.

If it does not exist and autocreate is True, it is created with a default configuration and inserted in the Channels structure. If autocreate is False and the channel does not exist, an exception is raised.

>>> cm = ChannelManager()
>>> c = cm.get_channel_by_name('root', autocreate=False)
>>> c.name
'root'
>>> c.fullname
'root'
>>> c = cm.get_channel_by_name('root.sub', autocreate=True)
>>> c.name
'sub'
>>> c.fullname
'root.sub'
>>> c = cm.get_channel_by_name('sub', autocreate=True)
>>> c.name
'sub'
>>> c.fullname
'root.sub'
>>> c = cm.get_channel_by_name('autosub.sub', autocreate=True)
>>> c.name
'sub'
>>> c.fullname
'root.autosub.sub'
>>> c = cm.get_channel_by_name(None)
>>> c.fullname
'root'
>>> c = cm.get_channel_by_name('root.sub')
>>> c.fullname
'root.sub'
>>> c = cm.get_channel_by_name('sub')
>>> c.fullname
'root.sub'
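The name resolution visible in these examples (a missing name means the root channel, and the root prefix may be omitted) can be sketched as a small helper. normalize_channel_name is a hypothetical function written for illustration; the library may resolve names differently internally.

```python
def normalize_channel_name(channel_name, root_name="root"):
    """Return a dot-separated full name that always starts at the root."""
    if not channel_name:
        # None or an empty name designates the root channel.
        return root_name
    parts = channel_name.split(".")
    if parts[0] != root_name:
        # The root prefix was omitted, so add it back.
        parts.insert(0, root_name)
    return ".".join(parts)


full_names = [
    normalize_channel_name(None),
    normalize_channel_name("sub"),
    normalize_channel_name("root.sub"),
    normalize_channel_name("autosub.sub"),
]
```

The resulting full names match the fullname values shown in the examples above.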
get_channel_from_config(config)[source]

Return a Channel object from a parsed configuration.

If the channel does not exist, it is created. The configuration is applied to the channel before returning it. If some of the parent channels are missing when creating a subchannel, the parent channels are auto-created with an infinite capacity (except for the root channel, which defaults to a capacity of 1 when not configured explicitly).

get_jobs_to_run(now)[source]
notify(db_name, channel_name, uuid, seq, date_created, priority, eta, state)[source]
classmethod parse_simple_config(config_string)[source]

Parse a simple channels configuration string.

The general form is as follows: channel(.subchannel)*(:capacity(:key(=value)?)*)? [, …]

If capacity is absent, it defaults to 1. If a key is present without a value, it gets True as its value. When declaring subchannels, the root channel may be omitted (i.e. sub:4 is the same as root.sub:4).

Returns a list of channel configuration dictionaries.

>>> from pprint import pprint as pp
>>> pp(ChannelManager.parse_simple_config('root:4'))
[{'capacity': 4, 'name': 'root'}]
>>> pp(ChannelManager.parse_simple_config('root:4,root.sub:2'))
[{'capacity': 4, 'name': 'root'}, {'capacity': 2, 'name': 'root.sub'}]
>>> pp(ChannelManager.parse_simple_config('root:4,root.sub:2:'
...                                       'sequential:k=v'))
[{'capacity': 4, 'name': 'root'},
 {'capacity': 2, 'k': 'v', 'name': 'root.sub', 'sequential': True}]
>>> pp(ChannelManager.parse_simple_config('root'))
[{'capacity': 1, 'name': 'root'}]
>>> pp(ChannelManager.parse_simple_config('sub:2'))
[{'capacity': 2, 'name': 'sub'}]

It ignores whitespace around values and drops empty entries, such as those generated by trailing commas or by commented lines in the Odoo config file.

>>> pp(ChannelManager.parse_simple_config('''
...     root : 4,
...     ,
...     foo bar:1: k=va lue,
... '''))
[{'capacity': 4, 'name': 'root'},
 {'capacity': 1, 'k': 'va lue', 'name': 'foo bar'}]

It’s also possible to replace commas with line breaks, which is more readable if you’re taking the channel configuration from a ConfigParser file.

>>> pp(ChannelManager.parse_simple_config('''
...     root : 4
...     foo bar:1: k=va lue
...     baz
... '''))
[{'capacity': 4, 'name': 'root'},
 {'capacity': 1, 'k': 'va lue', 'name': 'foo bar'},
 {'capacity': 1, 'name': 'baz'}]
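To make the grammar concrete, here is a minimal re-implementation sketch that reproduces the outputs shown above. parse_simple_config_sketch is a hypothetical name; it skips the validation and error reporting a real parser would need, and it is not the code behind ChannelManager.parse_simple_config.

```python
def parse_simple_config_sketch(config_string):
    """Parse 'name(:capacity(:key(=value)?)*)?' entries into dictionaries."""
    result = []
    # Commas and line breaks both separate entries.
    for entry in config_string.replace("\n", ",").split(","):
        entry = entry.strip()
        if not entry:
            continue  # drop empty entries (trailing commas, blank lines)
        parts = [part.strip() for part in entry.split(":")]
        config = {"name": parts[0], "capacity": 1}  # capacity defaults to 1
        if len(parts) > 1 and parts[1]:
            config["capacity"] = int(parts[1])
        for item in parts[2:]:
            if not item:
                continue
            key, sep, value = item.partition("=")
            # A key without '=value' gets True as its value.
            config[key.strip()] = value.strip() if sep else True
        result.append(config)
    return result


parsed = parse_simple_config_sketch("""
    root : 4
    foo bar:1: k=va lue
    baz
""")
```

Here parsed holds the same three dictionaries as the last example above.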
remove_db(db_name)[source]
remove_job(uuid)[source]
simple_configure(config_string)[source]

Configure the channel manager from a simple configuration string.

>>> cm = ChannelManager()
>>> c = cm.get_channel_by_name('root')
>>> c.capacity
1
>>> cm.simple_configure('root:4,autosub.sub:2')
>>> cm.get_channel_by_name('root').capacity
4
>>> cm.get_channel_by_name('root.autosub').capacity
>>> cm.get_channel_by_name('root.autosub.sub').capacity
2
>>> cm.get_channel_by_name('autosub.sub').capacity
2
static split_strip(s, sep, maxsplit=-1)[source]

Split string and strip each component.

>>> ChannelManager.split_strip("foo: bar baz\n: fred:", ":")
['foo', 'bar baz', 'fred', '']
class connector.jobrunner.channels.ChannelQueue[source]

Bases: object

A channel queue is a priority queue for jobs that returns jobs with a past ETA first.

>>> q = ChannelQueue()
>>> j1 = ChannelJob(None, None, 1,
...                 seq=0, date_created=1, priority=1, eta=10)
>>> j2 = ChannelJob(None, None, 2,
...                 seq=0, date_created=2, priority=1, eta=None)
>>> j3 = ChannelJob(None, None, 3,
...                 seq=0, date_created=3, priority=1, eta=None)
>>> q.add(j1)
>>> q.add(j2)
>>> q.add(j3)
>>> q.pop(now=1)
<ChannelJob 2>
>>> q.pop(now=11)
<ChannelJob 1>
>>> q.pop(now=12)
<ChannelJob 3>
add(job)[source]
pop(now)[source]
remove(job)[source]
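One way to obtain the behaviour shown above is to keep two heaps, one for jobs with an eta and one for jobs without. The sketch below is illustrative only: EtaAwareQueue is a hypothetical class, jobs are reduced to plain identifiers, treating an eta equal to now as already due is an assumption, and so is returning None from an empty queue.

```python
import heapq
import itertools


class EtaAwareQueue:
    """Hypothetical queue that serves jobs whose eta has passed first."""

    def __init__(self):
        self._ready = []    # heap of jobs without an eta
        self._delayed = []  # heap of jobs with an eta, ordered by eta
        self._counter = itertools.count()  # tie-breaker for equal keys

    def add(self, job_id, priority, date_created, eta=None):
        tiebreak = next(self._counter)
        if eta is None:
            heapq.heappush(
                self._ready, (priority, date_created, tiebreak, job_id))
        else:
            heapq.heappush(
                self._delayed,
                (eta, priority, date_created, tiebreak, job_id))

    def pop(self, now):
        # A delayed job whose eta is due takes precedence (assumption: <=).
        if self._delayed and self._delayed[0][0] <= now:
            return heapq.heappop(self._delayed)[-1]
        if self._ready:
            return heapq.heappop(self._ready)[-1]
        return None


q = EtaAwareQueue()
q.add(1, priority=1, date_created=1, eta=10)
q.add(2, priority=1, date_created=2)
q.add(3, priority=1, date_created=3)
popped = [q.pop(now=1), q.pop(now=11), q.pop(now=12), q.pop(now=13)]
```

With the same jobs as in the doctest, the pops return 2, then 1, then 3: job 1 is held back until its eta of 10 has passed.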
class connector.jobrunner.channels.PriorityQueue[source]

Bases: object

A priority queue that supports removing arbitrary objects.

Adding an object already in the queue is a no op. Popping an empty queue returns None.

>>> q = PriorityQueue()
>>> q.add(2)
>>> q.add(3)
>>> q.add(3)
>>> q.add(1)
>>> q[0]
1
>>> len(q)
3
>>> q.pop()
1
>>> q.remove(2)
>>> len(q)
1
>>> q[0]
3
>>> q.pop()
3
>>> q.pop()
>>> q.add(2)
>>> q.remove(2)
>>> q.add(2)
>>> q.pop()
2
add(o)[source]
pop()[source]
remove(o)[source]
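Removal of arbitrary objects from a heap is commonly done with lazy deletion: removed objects are only marked, then skipped when they reach the top. The sketch below shows that technique with the standard heapq module. LazyPriorityQueue is a hypothetical name and this is not necessarily how PriorityQueue is implemented; it omits the q[0] indexing shown above.

```python
import heapq


class LazyPriorityQueue:
    """Hypothetical priority queue with lazy deletion."""

    def __init__(self):
        self._heap = []
        self._known = set()    # objects physically present in the heap
        self._removed = set()  # objects present but logically deleted

    def __len__(self):
        return len(self._known) - len(self._removed)

    def add(self, o):
        if o in self._known:
            # Already in the heap: revive it if it was removed,
            # otherwise adding it again is a no-op.
            self._removed.discard(o)
        else:
            heapq.heappush(self._heap, o)
            self._known.add(o)

    def remove(self, o):
        if o in self._known:
            self._removed.add(o)  # mark only; skipped later in pop()

    def pop(self):
        while self._heap:
            o = heapq.heappop(self._heap)
            self._known.discard(o)
            if o in self._removed:
                self._removed.discard(o)
                continue  # skip logically deleted objects
            return o
        return None  # empty queue


pq = LazyPriorityQueue()
for value in (2, 3, 3, 1):
    pq.add(value)
size_after_adds = len(pq)
first = pq.pop()
pq.remove(2)
size_after_remove = len(pq)
second = pq.pop()
third = pq.pop()
pq.add(2)
pq.remove(2)
pq.add(2)
fourth = pq.pop()
```

The sequence mirrors the doctest above: the duplicate 3 is ignored, removing 2 leaves a single element, popping the empty queue gives None, and re-adding 2 after a removal makes it available again.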
class connector.jobrunner.channels.SafeSet[source]

Bases: set

A set that does not raise KeyError when removing non-existent items.

>>> s = SafeSet()
>>> s.remove(1)
>>> len(s)
0
>>> s.remove(1)
remove(o)[source]

Remove an element from the set if it is a member.

If the element is not a member, do nothing; no KeyError is raised.
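The behaviour described in the SafeSet class summary can be approximated by delegating remove to set.discard, which never raises for missing elements. QuietSet is a hypothetical name used for this sketch; the library's own class may be written differently.

```python
class QuietSet(set):
    """Hypothetical set whose remove() ignores missing elements."""

    def remove(self, o):
        # set.discard() is a no-op when the element is absent.
        self.discard(o)


s = QuietSet([1])
s.remove(1)
s.remove(1)  # second removal does not raise KeyError
```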