# Copyright (c) 2015-2016 ACSONE SA/NV (<http://acsone.eu>)
# Copyright 2015-2016 Camptocamp SA
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html)
import logging
from functools import total_ordering
from heapq import heappop, heappush
from weakref import WeakValueDictionary
from ..exception import ChannelNotFound
from ..job import DONE, ENQUEUED, FAILED, PENDING, STARTED
NOT_DONE = (PENDING, ENQUEUED, STARTED, FAILED)
_logger = logging.getLogger(__name__)
[docs]class PriorityQueue(object):
"""A priority queue that supports removing arbitrary objects.
Adding an object already in the queue is a no op.
Popping an empty queue returns None.
>>> q = PriorityQueue()
>>> q.add(2)
>>> q.add(3)
>>> q.add(3)
>>> q.add(1)
>>> q[0]
1
>>> len(q)
3
>>> q.pop()
1
>>> q.remove(2)
>>> len(q)
1
>>> q[0]
3
>>> q.pop()
3
>>> q.pop()
>>> q.add(2)
>>> q.remove(2)
>>> q.add(2)
>>> q.pop()
2
"""
def __init__(self):
self._heap = []
self._known = set() # all objects in the heap (including removed)
self._removed = set() # all objects that have been removed
def __len__(self):
return len(self._known) - len(self._removed)
def __getitem__(self, i):
if i != 0:
raise IndexError()
while True:
if not self._heap:
raise IndexError()
o = self._heap[0]
if o in self._removed:
o2 = heappop(self._heap)
assert o2 == o
self._removed.remove(o)
self._known.remove(o)
else:
return o
def __contains__(self, o):
return o in self._known and o not in self._removed
[docs] def add(self, o):
if o is None:
raise ValueError()
if o in self._removed:
self._removed.remove(o)
if o in self._known:
return
self._known.add(o)
heappush(self._heap, o)
[docs] def remove(self, o):
if o is None:
raise ValueError()
if o not in self._known:
return
if o not in self._removed:
self._removed.add(o)
[docs] def pop(self):
while True:
try:
o = heappop(self._heap)
except IndexError:
# queue is empty
return None
self._known.remove(o)
if o in self._removed:
self._removed.remove(o)
else:
return o
[docs]class SafeSet(set):
"""A set that does not raise KeyError when removing non-existent items.
>>> s = SafeSet()
>>> s.remove(1)
>>> len(s)
0
>>> s.remove(1)
"""
[docs] def remove(self, o):
# pylint: disable=missing-return,except-pass
try:
super(SafeSet, self).remove(o)
except KeyError:
pass
[docs]@total_ordering
class ChannelJob(object):
"""A channel job is attached to a channel and holds the properties of a
job that are necessary to prioritise them.
Channel jobs are comparable according to the following rules:
* jobs with an eta come before all other jobs
* then jobs with a smaller eta come first
* then jobs with a smaller priority come first
* then jobs with a smaller creation time come first
* then jobs with a smaller sequence come first
Here are some examples.
j1 comes before j2 because it has an earlier date_created
>>> j1 = ChannelJob(None, None, 1,
... seq=0, date_created=1, priority=9, eta=None)
>>> j1
<ChannelJob 1>
>>> j2 = ChannelJob(None, None, 2,
... seq=0, date_created=2, priority=9, eta=None)
>>> j1 < j2
True
j3 comes first because it has lower priority,
despite having a creation date after j1 and j2
>>> j3 = ChannelJob(None, None, 3,
... seq=0, date_created=3, priority=2, eta=None)
>>> j3 < j1
True
j4 and j5 comes even before j3, because they have an eta
>>> j4 = ChannelJob(None, None, 4,
... seq=0, date_created=4, priority=9, eta=9)
>>> j5 = ChannelJob(None, None, 5,
... seq=0, date_created=5, priority=9, eta=9)
>>> j4 < j5 < j3
True
j6 has same date_created and priority as j5 but a smaller eta
>>> j6 = ChannelJob(None, None, 6,
... seq=0, date_created=5, priority=9, eta=2)
>>> j6 < j4 < j5
True
Here is the complete suite:
>>> j6 < j4 < j5 < j3 < j1 < j2
True
j0 has the same properties as j1 but they are not considered
equal as they are different instances
>>> j0 = ChannelJob(None, None, 1,
... seq=0, date_created=1, priority=9, eta=None)
>>> j0 == j1
False
>>> j0 == j0
True
Comparison excluding eta:
>>> j1.sorting_key_ignoring_eta() < j2.sorting_key_ignoring_eta()
True
"""
def __init__(self, db_name, channel, uuid, seq, date_created, priority, eta):
self.db_name = db_name
self.channel = channel
self.uuid = uuid
self.seq = seq
self.date_created = date_created
self.priority = priority
self.eta = eta
def __repr__(self):
return "<ChannelJob %s>" % self.uuid
def __eq__(self, other):
return id(self) == id(other)
def __hash__(self):
return id(self)
[docs] def sorting_key(self):
return self.eta, self.priority, self.date_created, self.seq
[docs] def sorting_key_ignoring_eta(self):
return self.priority, self.date_created, self.seq
def __lt__(self, other):
if self.eta and not other.eta:
return True
elif not self.eta and other.eta:
return False
return self.sorting_key() < other.sorting_key()
[docs]class ChannelQueue(object):
"""A channel queue is a priority queue for jobs.
Jobs with an eta are set aside until their eta is past due, at
which point they start competing normally with other jobs.
>>> q = ChannelQueue()
>>> j1 = ChannelJob(None, None, 1,
... seq=0, date_created=1, priority=1, eta=10)
>>> j2 = ChannelJob(None, None, 2,
... seq=0, date_created=2, priority=1, eta=None)
>>> j3 = ChannelJob(None, None, 3,
... seq=0, date_created=3, priority=1, eta=None)
>>> q.add(j1)
>>> q.add(j2)
>>> q.add(j3)
Wakeup time is the eta of job 1.
>>> q.get_wakeup_time()
10
We have not reached the eta of job 1, so we get job 2.
>>> q.pop(now=1)
<ChannelJob 2>
Wakeup time is still the eta of job 1, and we get job 1 when we are past
it's eta.
>>> q.get_wakeup_time()
10
>>> q.pop(now=11)
<ChannelJob 1>
Now there is no wakeup time anymore, because no job have an eta.
>>> q.get_wakeup_time()
0
>>> q.pop(now=12)
<ChannelJob 3>
>>> q.get_wakeup_time()
0
>>> q.pop(now=13)
Observe that job with past eta still run after jobs with higher priority.
>>> j4 = ChannelJob(None, None, 4,
... seq=0, date_created=4, priority=10, eta=20)
>>> j5 = ChannelJob(None, None, 5,
... seq=0, date_created=5, priority=1, eta=None)
>>> q.add(j4)
>>> q.add(j5)
>>> q.get_wakeup_time()
20
>>> q.pop(21)
<ChannelJob 5>
>>> q.get_wakeup_time()
0
>>> q.pop(22)
<ChannelJob 4>
Test a sequential queue.
>>> sq = ChannelQueue(sequential=True)
>>> j6 = ChannelJob(None, None, 6,
... seq=0, date_created=6, priority=1, eta=None)
>>> j7 = ChannelJob(None, None, 7,
... seq=0, date_created=7, priority=1, eta=20)
>>> j8 = ChannelJob(None, None, 8,
... seq=0, date_created=8, priority=1, eta=None)
>>> sq.add(j6)
>>> sq.add(j7)
>>> sq.add(j8)
>>> sq.pop(10)
<ChannelJob 6>
>>> sq.pop(15)
>>> sq.pop(20)
<ChannelJob 7>
>>> sq.pop(30)
<ChannelJob 8>
"""
def __init__(self, sequential=False):
self._queue = PriorityQueue()
self._eta_queue = PriorityQueue()
self.sequential = sequential
def __len__(self):
return len(self._eta_queue) + len(self._queue)
def __contains__(self, o):
return o in self._eta_queue or o in self._queue
[docs] def add(self, job):
if job.eta:
self._eta_queue.add(job)
else:
self._queue.add(job)
[docs] def remove(self, job):
self._eta_queue.remove(job)
self._queue.remove(job)
[docs] def pop(self, now):
while self._eta_queue and self._eta_queue[0].eta <= now:
eta_job = self._eta_queue.pop()
eta_job.eta = None
self._queue.add(eta_job)
if self.sequential and self._eta_queue and self._queue:
eta_job = self._eta_queue[0]
job = self._queue[0]
if eta_job.sorting_key_ignoring_eta() < job.sorting_key_ignoring_eta():
# eta ignored, the job with eta has higher priority
# than the job without eta; since it's a sequential
# queue we wait until eta
return None
return self._queue.pop()
[docs] def get_wakeup_time(self, wakeup_time=0):
if self._eta_queue:
if not wakeup_time:
wakeup_time = self._eta_queue[0].eta
else:
wakeup_time = min(wakeup_time, self._eta_queue[0].eta)
return wakeup_time
[docs]class Channel(object):
"""A channel for jobs, with a maximum capacity.
When jobs are created by queue_job modules, they may be associated
to a job channel. Jobs with no channel are inserted into the root channel.
Job channels are joined in a hierarchy down to the root channel.
When a job channel has available capacity, jobs are dequeued, marked
as running in the channel and are inserted into the queue of the
parent channel where they wait for available capacity and so on.
Job channels can be visualized as water channels with a given flow
limit (= capacity). Channels are joined together in a downstream channel
and the flow limit of the downstream channel limits upstream channels.::
---------------------+
|
|
Ch. A C:4,Q:12,R:4 +-----------------------
---------------------+ Ch. root C:5,Q:0,R:4
|
---------------------+
Ch. B C:1,Q:0,R:0
---------------------+-----------------------
The above diagram illustrates two channels joining in the root channel.
The root channel has a capacity of 5, and 4 running jobs coming from
Channel A. Channel A has a capacity of 4, all in use (passed down to the
root channel), and 12 jobs enqueued. Channel B has a capacity of 1,
none in use. This means that whenever a new job comes in channel B,
there will be available room for it to run in the root channel.
Note that from the point of view of a channel, 'running' means enqueued
in the downstream channel. Only jobs marked running in the root channel
are actually sent to Odoo for execution.
Should a downstream channel have less capacity than its upstream channels,
jobs going downstream will be enqueued in the downstream channel,
and compete normally according to their properties (priority, etc).
Using this technique, it is possible to enforce sequence in a channel
with a capacity of 1. It is also possible to dedicate a channel with a
limited capacity for application-autocreated subchannels
without risking to overflow the system.
"""
def __init__(self, name, parent, capacity=None, sequential=False, throttle=0):
self.name = name
self.parent = parent
if self.parent:
self.parent.children[name] = self
self.children = {}
self._queue = ChannelQueue()
self._running = SafeSet()
self._failed = SafeSet()
self._pause_until = 0 # utc seconds since the epoch
self.capacity = capacity
self.throttle = throttle # seconds
self.sequential = sequential
@property
def sequential(self):
return self._queue.sequential
@sequential.setter
def sequential(self, val):
self._queue.sequential = val
@property
def fullname(self):
"""The full name of the channel, in dot separated notation."""
if self.parent:
return self.parent.fullname + "." + self.name
else:
return self.name
[docs] def get_subchannel_by_name(self, subchannel_name):
return self.children.get(subchannel_name)
def __str__(self):
capacity = "∞" if self.capacity is None else str(self.capacity)
return "%s(C:%s,Q:%d,R:%d,F:%d)" % (
self.fullname,
capacity,
len(self._queue),
len(self._running),
len(self._failed),
)
[docs] def remove(self, job):
"""Remove a job from the channel."""
self._queue.remove(job)
self._running.remove(job)
self._failed.remove(job)
if self.parent:
self.parent.remove(job)
[docs] def set_done(self, job):
"""Mark a job as done.
This removes it from the channel queue.
"""
self.remove(job)
_logger.debug("job %s marked done in channel %s", job.uuid, self)
[docs] def set_pending(self, job):
"""Mark a job as pending.
This puts the job in the channel queue and remove it
from parent channels queues.
"""
if job not in self._queue:
self._queue.add(job)
self._running.remove(job)
self._failed.remove(job)
if self.parent:
self.parent.remove(job)
_logger.debug("job %s marked pending in channel %s", job.uuid, self)
[docs] def set_running(self, job):
"""Mark a job as running.
This also marks the job as running in parent channels.
"""
if job not in self._running:
self._queue.remove(job)
self._running.add(job)
self._failed.remove(job)
if self.parent:
self.parent.set_running(job)
_logger.debug("job %s marked running in channel %s", job.uuid, self)
[docs] def set_failed(self, job):
"""Mark the job as failed. """
if job not in self._failed:
self._queue.remove(job)
self._running.remove(job)
self._failed.add(job)
if self.parent:
self.parent.remove(job)
_logger.debug("job %s marked failed in channel %s", job.uuid, self)
[docs] def has_capacity(self):
if self.sequential and self._failed:
# a sequential queue blocks on failed jobs
return False
if not self.capacity:
# unlimited capacity
return True
return len(self._running) < self.capacity
[docs] def get_jobs_to_run(self, now):
"""Get jobs that are ready to run in channel.
This works by enqueuing jobs that are ready to run in children
channels, then yielding jobs from the channel queue until
``capacity`` jobs are marked running in the channel.
If the ``throttle`` option is set on the channel, then it yields
no job until at least throttle seconds have elapsed since the previous
yield.
:param now: the current datetime in seconds
:return: iterator of
:class:`odoo.addons.queue_job.jobrunner.ChannelJob`
"""
# enqueue jobs of children channels
for child in self.children.values():
for job in child.get_jobs_to_run(now):
self._queue.add(job)
# is this channel paused?
if self.throttle and self._pause_until:
if now < self._pause_until:
if self.has_capacity():
_logger.debug(
"channel %s paused until %s because "
"of throttle delay between jobs",
self,
self._pause_until,
)
return
else:
# unpause, this is important to avoid perpetual wakeup
# while the channel is at full capacity
self._pause_until = 0
_logger.debug("channel %s unpaused at %s", self, now)
# yield jobs that are ready to run, while we have capacity
while self.has_capacity():
job = self._queue.pop(now)
if not job:
return
self._running.add(job)
_logger.debug("job %s marked running in channel %s", job.uuid, self)
yield job
if self.throttle:
self._pause_until = now + self.throttle
_logger.debug("pausing channel %s until %s", self, self._pause_until)
return
[docs] def get_wakeup_time(self, wakeup_time=0):
if not self.has_capacity():
# this channel is full, do not request timed wakeup, as
# a notification will wakeup the runner when a job finishes
return wakeup_time
if self._pause_until:
# this channel is paused, request wakeup at the end of the pause
if not wakeup_time:
wakeup_time = self._pause_until
else:
wakeup_time = min(wakeup_time, self._pause_until)
# since this channel is paused, no need to look at the
# wakeup time of children nor eta jobs, as such jobs would not
# run anyway because they would end up in this paused channel
return wakeup_time
wakeup_time = self._queue.get_wakeup_time(wakeup_time)
for child in self.children.values():
wakeup_time = child.get_wakeup_time(wakeup_time)
return wakeup_time
[docs]def split_strip(s, sep, maxsplit=-1):
"""Split string and strip each component.
>>> split_strip("foo: bar baz\\n: fred:", ":")
['foo', 'bar baz', 'fred', '']
"""
return [x.strip() for x in s.split(sep, maxsplit)]
[docs]class ChannelManager(object):
"""High level interface for channels
This class handles:
* configuration of channels
* high level api to create and remove jobs (notify, remove_job, remove_db)
* get jobs to run
Here is how the runner will use it.
Let's create a channel manager and configure it.
>>> from pprint import pprint as pp
>>> cm = ChannelManager()
>>> cm.simple_configure('root:4,A:4,B:1')
>>> db = 'db'
Add a few jobs in channel A with priority 10
>>> cm.notify(db, 'A', 'A1', 1, 0, 10, None, 'pending')
>>> cm.notify(db, 'A', 'A2', 2, 0, 10, None, 'pending')
>>> cm.notify(db, 'A', 'A3', 3, 0, 10, None, 'pending')
>>> cm.notify(db, 'A', 'A4', 4, 0, 10, None, 'pending')
>>> cm.notify(db, 'A', 'A5', 5, 0, 10, None, 'pending')
>>> cm.notify(db, 'A', 'A6', 6, 0, 10, None, 'pending')
Add a few jobs in channel B with priority 5
>>> cm.notify(db, 'B', 'B1', 1, 0, 5, None, 'pending')
>>> cm.notify(db, 'B', 'B2', 2, 0, 5, None, 'pending')
We must now run one job from queue B which has a capacity of 1
and 3 jobs from queue A so the root channel capacity of 4 is filled.
>>> pp(list(cm.get_jobs_to_run(now=100)))
[<ChannelJob B1>, <ChannelJob A1>, <ChannelJob A2>, <ChannelJob A3>]
Job A2 is done. Next job to run is A5, even if we have
higher priority job in channel B, because channel B has a capacity of 1.
>>> cm.notify(db, 'A', 'A2', 2, 0, 10, None, 'done')
>>> pp(list(cm.get_jobs_to_run(now=100)))
[<ChannelJob A4>]
Job B1 is done. Next job to run is B2 because it has higher priority.
>>> cm.notify(db, 'B', 'B1', 1, 0, 5, None, 'done')
>>> pp(list(cm.get_jobs_to_run(now=100)))
[<ChannelJob B2>]
Let's say A1 is done and A6 gets a higher priority. A6 will run next.
>>> cm.notify(db, 'A', 'A1', 1, 0, 10, None, 'done')
>>> cm.notify(db, 'A', 'A6', 6, 0, 5, None, 'pending')
>>> pp(list(cm.get_jobs_to_run(now=100)))
[<ChannelJob A6>]
Let's test the throttling mechanism. Configure a 2 seconds delay
on channel A, end enqueue two jobs.
>>> cm = ChannelManager()
>>> cm.simple_configure('root:4,A:4:throttle=2')
>>> cm.notify(db, 'A', 'A1', 1, 0, 10, None, 'pending')
>>> cm.notify(db, 'A', 'A2', 2, 0, 10, None, 'pending')
We have only one job to run, because of the throttle.
>>> pp(list(cm.get_jobs_to_run(now=100)))
[<ChannelJob A1>]
>>> cm.get_wakeup_time()
102
We have no job to run, because of the throttle.
>>> pp(list(cm.get_jobs_to_run(now=101)))
[]
>>> cm.get_wakeup_time()
102
2 seconds later, we can run the other job (even though the first one
is still running, because we have enough capacity).
>>> pp(list(cm.get_jobs_to_run(now=102)))
[<ChannelJob A2>]
>>> cm.get_wakeup_time()
104
Let's test throttling in combination with a queue reaching full capacity.
>>> cm = ChannelManager()
>>> cm.simple_configure('root:4,T:2:throttle=2')
>>> cm.notify(db, 'T', 'T1', 1, 0, 10, None, 'pending')
>>> cm.notify(db, 'T', 'T2', 2, 0, 10, None, 'pending')
>>> cm.notify(db, 'T', 'T3', 3, 0, 10, None, 'pending')
>>> pp(list(cm.get_jobs_to_run(now=100)))
[<ChannelJob T1>]
>>> pp(list(cm.get_jobs_to_run(now=102)))
[<ChannelJob T2>]
Channel is now full, so no job to run even though throttling
delay is over.
>>> pp(list(cm.get_jobs_to_run(now=103)))
[]
>>> cm.get_wakeup_time() # no wakeup time, since queue is full
0
>>> pp(list(cm.get_jobs_to_run(now=104)))
[]
>>> cm.get_wakeup_time() # queue is still full
0
>>> cm.notify(db, 'T', 'T1', 1, 0, 10, None, 'done')
>>> pp(list(cm.get_jobs_to_run(now=105)))
[<ChannelJob T3>]
>>> cm.get_wakeup_time() # queue is full
0
>>> cm.notify(db, 'T', 'T2', 1, 0, 10, None, 'done')
>>> cm.get_wakeup_time()
107
Test wakeup time behaviour in presence of eta.
>>> cm = ChannelManager()
>>> cm.simple_configure('root:4,E:1')
>>> cm.notify(db, 'E', 'E1', 1, 0, 10, None, 'pending')
>>> cm.notify(db, 'E', 'E2', 2, 0, 10, None, 'pending')
>>> cm.notify(db, 'E', 'E3', 3, 0, 10, None, 'pending')
>>> pp(list(cm.get_jobs_to_run(now=100)))
[<ChannelJob E1>]
>>> pp(list(cm.get_jobs_to_run(now=101)))
[]
>>> cm.notify(db, 'E', 'E1', 1, 0, 10, 105, 'pending')
>>> cm.get_wakeup_time() # wakeup at eta
105
>>> pp(list(cm.get_jobs_to_run(now=102))) # but there is capacity
[<ChannelJob E2>]
>>> pp(list(cm.get_jobs_to_run(now=106))) # no capacity anymore
[]
>>> cm.get_wakeup_time() # no timed wakeup because no capacity
0
>>> cm.notify(db, 'E', 'E2', 1, 0, 10, None, 'done')
>>> cm.get_wakeup_time()
105
>>> pp(list(cm.get_jobs_to_run(now=107))) # no capacity anymore
[<ChannelJob E1>]
>>> cm.get_wakeup_time()
0
Test wakeup time behaviour in a sequential queue.
>>> cm = ChannelManager()
>>> cm.simple_configure('root:4,S:1:sequential')
>>> cm.notify(db, 'S', 'S1', 1, 0, 10, None, 'pending')
>>> cm.notify(db, 'S', 'S2', 2, 0, 10, None, 'pending')
>>> cm.notify(db, 'S', 'S3', 3, 0, 10, None, 'pending')
>>> pp(list(cm.get_jobs_to_run(now=100)))
[<ChannelJob S1>]
>>> cm.notify(db, 'S', 'S1', 1, 0, 10, None, 'failed')
>>> pp(list(cm.get_jobs_to_run(now=101)))
[]
>>> cm.notify(db, 'S', 'S2', 2, 0, 10, 105, 'pending')
>>> pp(list(cm.get_jobs_to_run(now=102)))
[]
No wakeup time because due to eta, because the sequential queue
is waiting for a failed job.
>>> cm.get_wakeup_time()
0
>>> cm.notify(db, 'S', 'S1', 1, 0, 10, None, 'pending')
>>> cm.get_wakeup_time()
105
>>> pp(list(cm.get_jobs_to_run(now=102)))
[<ChannelJob S1>]
>>> pp(list(cm.get_jobs_to_run(now=103)))
[]
>>> cm.notify(db, 'S', 'S1', 1, 0, 10, None, 'done')
At this stage, we have S2 with an eta of 105 and since the
queue is sequential, we wait for it.
>>> pp(list(cm.get_jobs_to_run(now=103)))
[]
>>> pp(list(cm.get_jobs_to_run(now=105)))
[<ChannelJob S2>]
>>> cm.notify(db, 'S', 'S2', 2, 0, 10, 105, 'done')
>>> pp(list(cm.get_jobs_to_run(now=105)))
[<ChannelJob S3>]
>>> cm.notify(db, 'S', 'S3', 3, 0, 10, None, 'done')
>>> pp(list(cm.get_jobs_to_run(now=105)))
[]
"""
def __init__(self):
self._jobs_by_uuid = WeakValueDictionary()
self._root_channel = Channel(name="root", parent=None, capacity=1)
self._channels_by_name = WeakValueDictionary(root=self._root_channel)
[docs] @classmethod
def parse_simple_config(cls, config_string):
"""Parse a simple channels configuration string.
The general form is as follow:
channel(.subchannel)*(:capacity(:key(=value)?)*)? [, ...]
If capacity is absent, it defaults to 1.
If a key is present without value, it gets True as value.
When declaring subchannels, the root channel may be omitted
(ie sub:4 is the same as root.sub:4).
Returns a list of channel configuration dictionaries.
>>> from pprint import pprint as pp
>>> pp(ChannelManager.parse_simple_config('root:4'))
[{'capacity': 4, 'name': 'root'}]
>>> pp(ChannelManager.parse_simple_config('root:4,root.sub:2'))
[{'capacity': 4, 'name': 'root'}, {'capacity': 2, 'name': 'root.sub'}]
>>> pp(ChannelManager.parse_simple_config('root:4,root.sub:2:'
... 'sequential:k=v'))
[{'capacity': 4, 'name': 'root'},
{'capacity': 2, 'k': 'v', 'name': 'root.sub', 'sequential': True}]
>>> pp(ChannelManager.parse_simple_config('root'))
[{'capacity': 1, 'name': 'root'}]
>>> pp(ChannelManager.parse_simple_config('sub:2'))
[{'capacity': 2, 'name': 'sub'}]
It ignores whitespace around values, and drops empty entries which
would be generated by trailing commas, or commented lines on the Odoo
config file.
>>> pp(ChannelManager.parse_simple_config('''
... root : 4,
... ,
... foo bar:1: k=va lue,
... '''))
[{'capacity': 4, 'name': 'root'},
{'capacity': 1, 'k': 'va lue', 'name': 'foo bar'}]
It's also possible to replace commas with line breaks, which is more
readable if the channel configuration comes from the odoo config file.
>>> pp(ChannelManager.parse_simple_config('''
... root : 4
... foo bar:1: k=va lue
... baz
... '''))
[{'capacity': 4, 'name': 'root'},
{'capacity': 1, 'k': 'va lue', 'name': 'foo bar'},
{'capacity': 1, 'name': 'baz'}]
"""
res = []
config_string = config_string.replace("\n", ",")
for channel_config_string in split_strip(config_string, ","):
if not channel_config_string:
# ignore empty entries (commented lines, trailing commas)
continue
config = {}
config_items = split_strip(channel_config_string, ":")
name = config_items[0]
if not name:
raise ValueError(
"Invalid channel config %s: missing channel name" % config_string
)
config["name"] = name
if len(config_items) > 1:
capacity = config_items[1]
try:
config["capacity"] = int(capacity)
except Exception:
raise ValueError(
"Invalid channel config %s: "
"invalid capacity %s" % (config_string, capacity)
)
for config_item in config_items[2:]:
kv = split_strip(config_item, "=")
if len(kv) == 1:
k, v = kv[0], True
elif len(kv) == 2:
k, v = kv
else:
raise ValueError(
"Invalid channel config %s: "
"incorrect config item %s" % (config_string, config_item)
)
if k in config:
raise ValueError(
"Invalid channel config %s: "
"duplicate key %s" % (config_string, k)
)
config[k] = v
else:
config["capacity"] = 1
res.append(config)
return res
[docs] def get_channel_from_config(self, config):
"""Return a Channel object from a parsed configuration.
If the channel does not exist it is created.
The configuration is applied on the channel before returning it.
If some of the parent channels are missing when creating a subchannel,
the parent channels are auto created with an infinite capacity
(except for the root channel, which defaults to a capacity of 1
when not configured explicity).
"""
channel = self.get_channel_by_name(config["name"], autocreate=True)
channel.configure(config)
_logger.info("Configured channel: %s", channel)
return channel
[docs] def get_channel_by_name(self, channel_name, autocreate=False):
"""Return a Channel object by its name.
If it does not exist and autocreate is True, it is created
with a default configuration and inserted in the Channels structure.
If autocreate is False and the channel does not exist, an exception
is raised.
>>> cm = ChannelManager()
>>> c = cm.get_channel_by_name('root', autocreate=False)
>>> c.name
'root'
>>> c.fullname
'root'
>>> c = cm.get_channel_by_name('root.sub', autocreate=True)
>>> c.name
'sub'
>>> c.fullname
'root.sub'
>>> c = cm.get_channel_by_name('sub', autocreate=True)
>>> c.name
'sub'
>>> c.fullname
'root.sub'
>>> c = cm.get_channel_by_name('autosub.sub', autocreate=True)
>>> c.name
'sub'
>>> c.fullname
'root.autosub.sub'
>>> c = cm.get_channel_by_name(None)
>>> c.fullname
'root'
>>> c = cm.get_channel_by_name('root.sub')
>>> c.fullname
'root.sub'
>>> c = cm.get_channel_by_name('sub')
>>> c.fullname
'root.sub'
"""
if not channel_name or channel_name == self._root_channel.name:
return self._root_channel
if not channel_name.startswith(self._root_channel.name + "."):
channel_name = self._root_channel.name + "." + channel_name
if channel_name in self._channels_by_name:
return self._channels_by_name[channel_name]
if not autocreate:
raise ChannelNotFound("Channel %s not found" % channel_name)
parent = self._root_channel
for subchannel_name in channel_name.split(".")[1:]:
subchannel = parent.get_subchannel_by_name(subchannel_name)
if not subchannel:
subchannel = Channel(subchannel_name, parent, capacity=None)
self._channels_by_name[subchannel.fullname] = subchannel
parent = subchannel
return parent
[docs] def notify(
self, db_name, channel_name, uuid, seq, date_created, priority, eta, state
):
try:
channel = self.get_channel_by_name(channel_name)
except ChannelNotFound:
_logger.warning(
"unknown channel %s, using root channel for job %s", channel_name, uuid
)
channel = self._root_channel
job = self._jobs_by_uuid.get(uuid)
if job:
# db_name is invariant
assert job.db_name == db_name
# date_created is invariant
assert job.date_created == date_created
# if one of the job properties that influence
# scheduling order has changed, we remove the job
# from the queues and create a new job object
if (
seq != job.seq
or priority != job.priority
or eta != job.eta
or channel != job.channel
):
_logger.debug("job %s properties changed, rescheduling it", uuid)
self.remove_job(uuid)
job = None
if not job:
job = ChannelJob(db_name, channel, uuid, seq, date_created, priority, eta)
self._jobs_by_uuid[uuid] = job
# state transitions
if not state or state == DONE:
job.channel.set_done(job)
elif state == PENDING:
job.channel.set_pending(job)
elif state in (ENQUEUED, STARTED):
job.channel.set_running(job)
elif state == FAILED:
job.channel.set_failed(job)
else:
_logger.error("unexpected state %s for job %s", state, job)
[docs] def remove_job(self, uuid):
job = self._jobs_by_uuid.get(uuid)
if job:
job.channel.remove(job)
del self._jobs_by_uuid[job.uuid]
[docs] def remove_db(self, db_name):
for job in list(self._jobs_by_uuid.values()):
if job.db_name == db_name:
job.channel.remove(job)
del self._jobs_by_uuid[job.uuid]
[docs] def get_jobs_to_run(self, now):
return self._root_channel.get_jobs_to_run(now)
[docs] def get_wakeup_time(self):
return self._root_channel.get_wakeup_time()