# Source code for odoo.addons.queue_job.jobrunner.channels

# Copyright (c) 2015-2016 ACSONE SA/NV (<http://acsone.eu>)
# Copyright 2015-2016 Camptocamp SA
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html)
import logging
from functools import total_ordering
from heapq import heappop, heappush
from weakref import WeakValueDictionary

from ..exception import ChannelNotFound


# Module-level logger, named after this module's import path.
_logger = logging.getLogger(__name__)

class PriorityQueue(object):
    """A priority queue that supports removing arbitrary objects.

    Adding an object already in the queue is a no op.
    Popping an empty queue returns None.

    Removal is lazy: a removed object stays in the underlying heap and is
    only recorded in ``_removed``; it is physically discarded the next time
    it reaches the top of the heap (in ``pop`` or ``q[0]``).

    >>> q = PriorityQueue()
    >>> q.add(2)
    >>> q.add(3)
    >>> q.add(3)
    >>> q.add(1)
    >>> q[0]
    1
    >>> len(q)
    3
    >>> q.pop()
    1
    >>> q.remove(2)
    >>> len(q)
    1
    >>> q[0]
    3
    >>> q.pop()
    3
    >>> q.pop()
    >>> q.add(2)
    >>> q.remove(2)
    >>> q.add(2)
    >>> q.pop()
    2
    """

    def __init__(self):
        self._heap = []
        self._known = set()  # all objects in the heap (including removed)
        self._removed = set()  # all objects that have been removed

    def __len__(self):
        """Return the number of live (not removed) objects."""
        return len(self._known) - len(self._removed)

    def __getitem__(self, i):
        """Return the smallest live object without popping it.

        Only index 0 is supported.

        :raises IndexError: if ``i`` is not 0 or the queue is empty
        """
        if i != 0:
            raise IndexError()
        while True:
            if not self._heap:
                raise IndexError()
            o = self._heap[0]
            if o not in self._removed:
                return o
            # the top of the heap was lazily removed: discard it for real
            # and look at the next candidate
            o2 = heappop(self._heap)
            assert o2 == o
            self._removed.remove(o)
            self._known.remove(o)

    def __contains__(self, o):
        return o in self._known and o not in self._removed

    def add(self, o):
        """Add ``o`` to the queue; a no op if it is already queued.

        :raises ValueError: if ``o`` is None (None is what ``pop`` returns
            for an empty queue)
        """
        if o is None:
            raise ValueError()
        if o in self._removed:
            # the object is still physically in the heap: resurrect it
            self._removed.remove(o)
        if o in self._known:
            return
        self._known.add(o)
        heappush(self._heap, o)

    def remove(self, o):
        """Mark ``o`` as removed; a no op if it is not in the queue.

        :raises ValueError: if ``o`` is None
        """
        if o is None:
            raise ValueError()
        if o not in self._known:
            return
        # set.add is idempotent, so removing twice is harmless
        self._removed.add(o)

    def pop(self):
        """Remove and return the smallest live object, or None if empty."""
        while True:
            try:
                o = heappop(self._heap)
            except IndexError:
                # queue is empty
                return None
            self._known.remove(o)
            if o in self._removed:
                # lazily removed object: drop it and try the next one
                self._removed.remove(o)
            else:
                return o
class SafeSet(set):
    """A set that does not raise KeyError when removing non-existent items.

    >>> s = SafeSet()
    >>> s.remove(1)
    >>> len(s)
    0
    >>> s.remove(1)
    """

    def remove(self, o):
        """Remove ``o`` if present; silently do nothing otherwise."""
        # set.discard is the built-in "remove without KeyError"
        self.discard(o)
@total_ordering
class ChannelJob(object):
    """A channel job is attached to a channel and holds the properties of a
    job that are necessary to prioritise them.

    Channel jobs are comparable according to the following rules:
        * jobs with an eta come before all other jobs
        * then jobs with a smaller eta come first
        * then jobs with a smaller priority come first
        * then jobs with a smaller creation time come first
        * then jobs with a smaller sequence come first

    Here are some examples.

    j1 comes before j2 because it has an earlier date_created

    >>> j1 = ChannelJob(None, None, 1,
    ...                 seq=0, date_created=1, priority=9, eta=None)
    >>> j1
    <ChannelJob 1>
    >>> j2 = ChannelJob(None, None, 2,
    ...                 seq=0, date_created=2, priority=9, eta=None)
    >>> j1 < j2
    True

    j3 comes first because it has lower priority,
    despite having a creation date after j1 and j2

    >>> j3 = ChannelJob(None, None, 3,
    ...                 seq=0, date_created=3, priority=2, eta=None)
    >>> j3 < j1
    True

    j4 and j5 comes even before j3, because they have an eta

    >>> j4 = ChannelJob(None, None, 4,
    ...                 seq=0, date_created=4, priority=9, eta=9)
    >>> j5 = ChannelJob(None, None, 5,
    ...                 seq=0, date_created=5, priority=9, eta=9)
    >>> j4 < j5 < j3
    True

    j6 has same date_created and priority as j5 but a smaller eta

    >>> j6 = ChannelJob(None, None, 6,
    ...                 seq=0, date_created=5, priority=9, eta=2)
    >>> j6 < j4 < j5
    True

    Here is the complete suite:

    >>> j6 < j4 < j5 < j3 < j1 < j2
    True

    j0 has the same properties as j1 but they are not considered
    equal as they are different instances

    >>> j0 = ChannelJob(None, None, 1,
    ...                 seq=0, date_created=1, priority=9, eta=None)
    >>> j0 == j1
    False
    >>> j0 == j0
    True

    Comparison excluding eta:

    >>> j1.sorting_key_ignoring_eta() < j2.sorting_key_ignoring_eta()
    True

    """

    def __init__(self, db_name, channel, uuid, seq, date_created, priority, eta):
        # db_name and channel are two distinct attributes; both must be
        # stored under their own name
        self.db_name = db_name
        self.channel = channel
        self.uuid = uuid
        self.seq = seq
        self.date_created = date_created
        self.priority = priority
        self.eta = eta

    def __repr__(self):
        return "<ChannelJob %s>" % self.uuid

    def __eq__(self, other):
        # identity comparison: two jobs with the same properties are
        # still different jobs
        return id(self) == id(other)

    def __hash__(self):
        return id(self)

    def sorting_key(self):
        """Return the tuple used to order jobs, eta first."""
        return self.eta, self.priority, self.date_created, self.seq

    def sorting_key_ignoring_eta(self):
        """Return the ordering tuple without the eta component."""
        return self.priority, self.date_created, self.seq

    def __lt__(self, other):
        # a job with an eta always sorts before a job without one
        if self.eta and not other.eta:
            return True
        if not self.eta and other.eta:
            return False
        return self.sorting_key() < other.sorting_key()
class ChannelQueue(object):
    """A channel queue is a priority queue for jobs.

    Jobs with an eta are set aside until their eta is past due, at
    which point they start competing normally with other jobs.

    >>> q = ChannelQueue()
    >>> j1 = ChannelJob(None, None, 1,
    ...                 seq=0, date_created=1, priority=1, eta=10)
    >>> j2 = ChannelJob(None, None, 2,
    ...                 seq=0, date_created=2, priority=1, eta=None)
    >>> j3 = ChannelJob(None, None, 3,
    ...                 seq=0, date_created=3, priority=1, eta=None)
    >>> q.add(j1)
    >>> q.add(j2)
    >>> q.add(j3)

    Wakeup time is the eta of job 1.

    >>> q.get_wakeup_time()
    10

    We have not reached the eta of job 1, so we get job 2.

    >>> q.pop(now=1)
    <ChannelJob 2>

    Wakeup time is still the eta of job 1, and we get job 1 when we are past
    its eta.

    >>> q.get_wakeup_time()
    10
    >>> q.pop(now=11)
    <ChannelJob 1>

    Now there is no wakeup time anymore, because no job has an eta.

    >>> q.get_wakeup_time()
    0
    >>> q.pop(now=12)
    <ChannelJob 3>
    >>> q.get_wakeup_time()
    0
    >>> q.pop(now=13)

    Observe that job with past eta still run after jobs with higher priority.

    >>> j4 = ChannelJob(None, None, 4,
    ...                 seq=0, date_created=4, priority=10, eta=20)
    >>> j5 = ChannelJob(None, None, 5,
    ...                 seq=0, date_created=5, priority=1, eta=None)
    >>> q.add(j4)
    >>> q.add(j5)
    >>> q.get_wakeup_time()
    20
    >>> q.pop(21)
    <ChannelJob 5>
    >>> q.get_wakeup_time()
    0
    >>> q.pop(22)
    <ChannelJob 4>

    Test a sequential queue.

    >>> sq = ChannelQueue(sequential=True)
    >>> j6 = ChannelJob(None, None, 6,
    ...                 seq=0, date_created=6, priority=1, eta=None)
    >>> j7 = ChannelJob(None, None, 7,
    ...                 seq=0, date_created=7, priority=1, eta=20)
    >>> j8 = ChannelJob(None, None, 8,
    ...                 seq=0, date_created=8, priority=1, eta=None)
    >>> sq.add(j6)
    >>> sq.add(j7)
    >>> sq.add(j8)
    >>> sq.pop(10)
    <ChannelJob 6>
    >>> sq.pop(15)
    >>> sq.pop(20)
    <ChannelJob 7>
    >>> sq.pop(30)
    <ChannelJob 8>
    """

    def __init__(self, sequential=False):
        self._queue = PriorityQueue()  # jobs that may run now
        self._eta_queue = PriorityQueue()  # jobs waiting for their eta
        self.sequential = sequential

    def __len__(self):
        return len(self._eta_queue) + len(self._queue)

    def __contains__(self, o):
        return o in self._eta_queue or o in self._queue

    def add(self, job):
        """Queue ``job``, setting it aside if it has an eta."""
        if job.eta:
            self._eta_queue.add(job)
        else:
            self._queue.add(job)

    def remove(self, job):
        """Remove ``job`` from whichever internal queue holds it."""
        self._eta_queue.remove(job)
        self._queue.remove(job)

    def pop(self, now):
        """Return the next job allowed to run at time ``now``, or None.

        :param now: current time, on the same scale as the jobs' eta
        """
        # move every job whose eta is due to the regular queue; its eta is
        # cleared so that it competes normally with the other jobs
        while self._eta_queue and self._eta_queue[0].eta <= now:
            eta_job = self._eta_queue.pop()
            eta_job.eta = None
            self._queue.add(eta_job)
        if self.sequential and self._eta_queue and self._queue:
            eta_job = self._eta_queue[0]
            job = self._queue[0]
            if eta_job.sorting_key_ignoring_eta() < job.sorting_key_ignoring_eta():
                # eta ignored, the job with eta has higher priority
                # than the job without eta; since it's a sequential
                # queue we wait until eta
                return None
        return self._queue.pop()

    def get_wakeup_time(self, wakeup_time=0):
        """Return the earliest of ``wakeup_time`` and the next pending eta.

        A ``wakeup_time`` of 0 means "no wakeup requested yet"; it is
        returned unchanged when no job with an eta is queued.
        """
        if self._eta_queue:
            if not wakeup_time:
                wakeup_time = self._eta_queue[0].eta
            else:
                wakeup_time = min(wakeup_time, self._eta_queue[0].eta)
        return wakeup_time
class Channel(object):
    """A channel for jobs, with a maximum capacity.

    When jobs are created by queue_job modules, they may be associated
    to a job channel. Jobs with no channel are inserted into the root channel.

    Job channels are joined in a hierarchy down to the root channel.
    When a job channel has available capacity, jobs are dequeued, marked
    as running in the channel and are inserted into the queue of the
    parent channel where they wait for available capacity and so on.

    Job channels can be visualized as water channels with a given flow
    limit (= capacity). Channels are joined together in a downstream channel
    and the flow limit of the downstream channel limits upstream channels.::

        ---------------------+
                             |
                             |
         Ch. A C:4,Q:12,R:4  +-----------------------

        ---------------------+  Ch. root C:5,Q:0,R:4
                             |
        ---------------------+
         Ch. B C:1,Q:0,R:0
        ---------------------+-----------------------

    The above diagram illustrates two channels joining in the root channel.
    The root channel has a capacity of 5, and 4 running jobs coming from
    Channel A. Channel A has a capacity of 4, all in use (passed down to the
    root channel), and 12 jobs enqueued. Channel B has a capacity of 1,
    none in use. This means that whenever a new job comes in channel B,
    there will be available room for it to run in the root channel.

    Note that from the point of view of a channel, 'running' means enqueued
    in the downstream channel. Only jobs marked running in the root channel
    are actually sent to Odoo for execution.

    Should a downstream channel have less capacity than its upstream channels,
    jobs going downstream will be enqueued in the downstream channel,
    and compete normally according to their properties (priority, etc).

    Using this technique, it is possible to enforce sequence in a channel
    with a capacity of 1. It is also possible to dedicate a channel with a
    limited capacity for application-autocreated subchannels
    without risking to overflow the system.
    """

    def __init__(self, name, parent, capacity=None, sequential=False, throttle=0):
        self.name = name
        self.parent = parent
        if self.parent:
            # register this channel under its short name in the parent
            self.parent.children[name] = self
        self.children = {}
        self._queue = ChannelQueue()
        self._running = SafeSet()
        self._failed = SafeSet()
        self._pause_until = 0  # utc seconds since the epoch
        self.capacity = capacity
        self.throttle = throttle  # seconds
        # must come after self._queue is set: the property setter
        # stores the flag on the queue
        self.sequential = sequential

    @property
    def sequential(self):
        return self._queue.sequential

    @sequential.setter
    def sequential(self, val):
        self._queue.sequential = val

    def configure(self, config):
        """Configure a channel from a dictionary.

        Supported keys are:

        * capacity
        * sequential
        * throttle

        :raises ValueError: if the channel is sequential and its capacity
            is not 1
        """
        assert self.fullname.endswith(config["name"])
        self.capacity = config.get("capacity", None)
        self.sequential = bool(config.get("sequential", False))
        self.throttle = int(config.get("throttle", 0))
        if self.sequential and self.capacity != 1:
            raise ValueError("A sequential channel must have a capacity of 1")

    @property
    def fullname(self):
        """The full name of the channel, in dot separated notation."""
        if self.parent:
            return self.parent.fullname + "." + self.name
        else:
            return self.name

    def get_subchannel_by_name(self, subchannel_name):
        """Return the direct child channel with that name, or None."""
        return self.children.get(subchannel_name)

    def __str__(self):
        capacity = "∞" if self.capacity is None else str(self.capacity)
        return "%s(C:%s,Q:%d,R:%d,F:%d)" % (
            self.fullname,
            capacity,
            len(self._queue),
            len(self._running),
            len(self._failed),
        )

    def remove(self, job):
        """Remove a job from the channel."""
        self._queue.remove(job)
        self._running.remove(job)
        self._failed.remove(job)
        if self.parent:
            self.parent.remove(job)

    def set_done(self, job):
        """Mark a job as done.

        This removes it from the channel queue.
        """
        self.remove(job)
        _logger.debug("job %s marked done in channel %s", job.uuid, self)

    def set_pending(self, job):
        """Mark a job as pending.

        This puts the job in the channel queue and remove it
        from parent channels queues.
        """
        if job not in self._queue:
            self._queue.add(job)
            self._running.remove(job)
            self._failed.remove(job)
            if self.parent:
                self.parent.remove(job)
            _logger.debug("job %s marked pending in channel %s", job.uuid, self)

    def set_running(self, job):
        """Mark a job as running.

        This also marks the job as running in parent channels.
        """
        if job not in self._running:
            self._queue.remove(job)
            self._running.add(job)
            self._failed.remove(job)
            if self.parent:
                self.parent.set_running(job)
            _logger.debug("job %s marked running in channel %s", job.uuid, self)

    def set_failed(self, job):
        """Mark the job as failed."""
        if job not in self._failed:
            self._queue.remove(job)
            self._running.remove(job)
            self._failed.add(job)
            if self.parent:
                self.parent.remove(job)
            _logger.debug("job %s marked failed in channel %s", job.uuid, self)

    def has_capacity(self):
        """Tell whether the channel can mark one more job as running."""
        if self.sequential and self._failed:
            # a sequential queue blocks on failed jobs
            return False
        if not self.capacity:
            # unlimited capacity
            return True
        return len(self._running) < self.capacity

    def get_jobs_to_run(self, now):
        """Get jobs that are ready to run in channel.

        This works by enqueuing jobs that are ready to run in children
        channels, then yielding jobs from the channel queue until
        ``capacity`` jobs are marked running in the channel.

        If the ``throttle`` option is set on the channel, then it yields
        no job until at least throttle seconds have elapsed since the previous
        yield.

        :param now: the current datetime in seconds

        :return: iterator of
                 :class:`odoo.addons.queue_job.jobrunner.ChannelJob`
        """
        # enqueue jobs of children channels
        for child in self.children.values():
            for job in child.get_jobs_to_run(now):
                self._queue.add(job)
        # is this channel paused?
        if self.throttle and self._pause_until:
            if now < self._pause_until:
                if self.has_capacity():
                    _logger.debug(
                        "channel %s paused until %s because "
                        "of throttle delay between jobs",
                        self,
                        self._pause_until,
                    )
                return
            else:
                # unpause, this is important to avoid perpetual wakeup
                # while the channel is at full capacity
                self._pause_until = 0
                _logger.debug("channel %s unpaused at %s", self, now)
        # yield jobs that are ready to run, while we have capacity
        while self.has_capacity():
            job = self._queue.pop(now)
            if not job:
                return
            self._running.add(job)
            _logger.debug("job %s marked running in channel %s", job.uuid, self)
            yield job
            if self.throttle:
                # a throttled channel releases a single job, then pauses
                self._pause_until = now + self.throttle
                _logger.debug("pausing channel %s until %s", self, self._pause_until)
                return

    def get_wakeup_time(self, wakeup_time=0):
        """Return the earliest time at which the channel may have work.

        :param wakeup_time: wakeup time requested so far (0 means none)
        :return: the smallest non-zero wakeup time among ``wakeup_time``,
            the end of the throttle pause, the queued etas and the
            children's wakeup times; 0 if no timed wakeup is needed
        """
        if not self.has_capacity():
            # this channel is full, do not request timed wakeup, as
            # a notification will wakeup the runner when a job finishes
            return wakeup_time
        if self._pause_until:
            # this channel is paused, request wakeup at the end of the pause
            if not wakeup_time:
                wakeup_time = self._pause_until
            else:
                wakeup_time = min(wakeup_time, self._pause_until)
            # since this channel is paused, no need to look at the
            # wakeup time of children nor eta jobs, as such jobs would not
            # run anyway because they would end up in this paused channel
            return wakeup_time
        wakeup_time = self._queue.get_wakeup_time(wakeup_time)
        for child in self.children.values():
            wakeup_time = child.get_wakeup_time(wakeup_time)
        return wakeup_time
def split_strip(s, sep, maxsplit=-1):
    """Split string and strip each component.

    :param s: the string to split
    :param sep: the separator, passed to ``str.split``
    :param maxsplit: maximum number of splits, passed to ``str.split``
        (-1, the default, means no limit)
    :return: list of the components, each stripped of surrounding
        whitespace; empty components are kept

    >>> split_strip("foo: bar baz\\n: fred:", ":")
    ['foo', 'bar baz', 'fred', '']
    """
    return [x.strip() for x in s.split(sep, maxsplit)]
class ChannelManager(object):
    """High level interface for channels

    This class handles:

    * configuration of channels
    * high level api to create and remove jobs (notify, remove_job, remove_db)
    * get jobs to run

    Here is how the runner will use it.

    Let's create a channel manager and configure it.

    >>> from pprint import pprint as pp
    >>> cm = ChannelManager()
    >>> cm.simple_configure('root:4,A:4,B:1')
    >>> db = 'db'

    Add a few jobs in channel A with priority 10

    >>> cm.notify(db, 'A', 'A1', 1, 0, 10, None, 'pending')
    >>> cm.notify(db, 'A', 'A2', 2, 0, 10, None, 'pending')
    >>> cm.notify(db, 'A', 'A3', 3, 0, 10, None, 'pending')
    >>> cm.notify(db, 'A', 'A4', 4, 0, 10, None, 'pending')
    >>> cm.notify(db, 'A', 'A5', 5, 0, 10, None, 'pending')
    >>> cm.notify(db, 'A', 'A6', 6, 0, 10, None, 'pending')

    Add a few jobs in channel B with priority 5

    >>> cm.notify(db, 'B', 'B1', 1, 0, 5, None, 'pending')
    >>> cm.notify(db, 'B', 'B2', 2, 0, 5, None, 'pending')

    We must now run one job from queue B which has a capacity of 1
    and 3 jobs from queue A so the root channel capacity of 4 is filled.

    >>> pp(list(cm.get_jobs_to_run(now=100)))
    [<ChannelJob B1>, <ChannelJob A1>, <ChannelJob A2>, <ChannelJob A3>]

    Job A2 is done. Next job to run is A4, even if we have
    higher priority job in channel B, because channel B
    has a capacity of 1.

    >>> cm.notify(db, 'A', 'A2', 2, 0, 10, None, 'done')
    >>> pp(list(cm.get_jobs_to_run(now=100)))
    [<ChannelJob A4>]

    Job B1 is done. Next job to run is B2 because it has higher priority.

    >>> cm.notify(db, 'B', 'B1', 1, 0, 5, None, 'done')
    >>> pp(list(cm.get_jobs_to_run(now=100)))
    [<ChannelJob B2>]

    Let's say A1 is done and A6 gets a higher priority. A6 will run next.

    >>> cm.notify(db, 'A', 'A1', 1, 0, 10, None, 'done')
    >>> cm.notify(db, 'A', 'A6', 6, 0, 5, None, 'pending')
    >>> pp(list(cm.get_jobs_to_run(now=100)))
    [<ChannelJob A6>]

    Let's test the throttling mechanism. Configure a 2 seconds delay
    on channel A, and enqueue two jobs.

    >>> cm = ChannelManager()
    >>> cm.simple_configure('root:4,A:4:throttle=2')
    >>> cm.notify(db, 'A', 'A1', 1, 0, 10, None, 'pending')
    >>> cm.notify(db, 'A', 'A2', 2, 0, 10, None, 'pending')

    We have only one job to run, because of the throttle.

    >>> pp(list(cm.get_jobs_to_run(now=100)))
    [<ChannelJob A1>]
    >>> cm.get_wakeup_time()
    102

    We have no job to run, because of the throttle.

    >>> pp(list(cm.get_jobs_to_run(now=101)))
    []
    >>> cm.get_wakeup_time()
    102

    2 seconds later, we can run the other job (even though the first one
    is still running, because we have enough capacity).

    >>> pp(list(cm.get_jobs_to_run(now=102)))
    [<ChannelJob A2>]
    >>> cm.get_wakeup_time()
    104

    Let's test throttling in combination with a queue reaching full capacity.

    >>> cm = ChannelManager()
    >>> cm.simple_configure('root:4,T:2:throttle=2')
    >>> cm.notify(db, 'T', 'T1', 1, 0, 10, None, 'pending')
    >>> cm.notify(db, 'T', 'T2', 2, 0, 10, None, 'pending')
    >>> cm.notify(db, 'T', 'T3', 3, 0, 10, None, 'pending')

    >>> pp(list(cm.get_jobs_to_run(now=100)))
    [<ChannelJob T1>]
    >>> pp(list(cm.get_jobs_to_run(now=102)))
    [<ChannelJob T2>]

    Channel is now full, so no job to run even though throttling
    delay is over.

    >>> pp(list(cm.get_jobs_to_run(now=103)))
    []
    >>> cm.get_wakeup_time()  # no wakeup time, since queue is full
    0
    >>> pp(list(cm.get_jobs_to_run(now=104)))
    []
    >>> cm.get_wakeup_time()  # queue is still full
    0

    >>> cm.notify(db, 'T', 'T1', 1, 0, 10, None, 'done')
    >>> pp(list(cm.get_jobs_to_run(now=105)))
    [<ChannelJob T3>]
    >>> cm.get_wakeup_time()  # queue is full
    0
    >>> cm.notify(db, 'T', 'T2', 1, 0, 10, None, 'done')
    >>> cm.get_wakeup_time()
    107

    Test wakeup time behaviour in presence of eta.

    >>> cm = ChannelManager()
    >>> cm.simple_configure('root:4,E:1')
    >>> cm.notify(db, 'E', 'E1', 1, 0, 10, None, 'pending')
    >>> cm.notify(db, 'E', 'E2', 2, 0, 10, None, 'pending')
    >>> cm.notify(db, 'E', 'E3', 3, 0, 10, None, 'pending')

    >>> pp(list(cm.get_jobs_to_run(now=100)))
    [<ChannelJob E1>]
    >>> pp(list(cm.get_jobs_to_run(now=101)))
    []
    >>> cm.notify(db, 'E', 'E1', 1, 0, 10, 105, 'pending')
    >>> cm.get_wakeup_time()  # wakeup at eta
    105
    >>> pp(list(cm.get_jobs_to_run(now=102)))  # but there is capacity
    [<ChannelJob E2>]
    >>> pp(list(cm.get_jobs_to_run(now=106)))  # no capacity anymore
    []
    >>> cm.get_wakeup_time()  # no timed wakeup because no capacity
    0
    >>> cm.notify(db, 'E', 'E2', 1, 0, 10, None, 'done')
    >>> cm.get_wakeup_time()
    105
    >>> pp(list(cm.get_jobs_to_run(now=107)))  # no capacity anymore
    [<ChannelJob E1>]
    >>> cm.get_wakeup_time()
    0

    Test wakeup time behaviour in a sequential queue.

    >>> cm = ChannelManager()
    >>> cm.simple_configure('root:4,S:1:sequential')
    >>> cm.notify(db, 'S', 'S1', 1, 0, 10, None, 'pending')
    >>> cm.notify(db, 'S', 'S2', 2, 0, 10, None, 'pending')
    >>> cm.notify(db, 'S', 'S3', 3, 0, 10, None, 'pending')

    >>> pp(list(cm.get_jobs_to_run(now=100)))
    [<ChannelJob S1>]
    >>> cm.notify(db, 'S', 'S1', 1, 0, 10, None, 'failed')
    >>> pp(list(cm.get_jobs_to_run(now=101)))
    []
    >>> cm.notify(db, 'S', 'S2', 2, 0, 10, 105, 'pending')
    >>> pp(list(cm.get_jobs_to_run(now=102)))
    []

    No wakeup time because due to eta, because the sequential queue
    is waiting for a failed job.

    >>> cm.get_wakeup_time()
    0
    >>> cm.notify(db, 'S', 'S1', 1, 0, 10, None, 'pending')
    >>> cm.get_wakeup_time()
    105
    >>> pp(list(cm.get_jobs_to_run(now=102)))
    [<ChannelJob S1>]
    >>> pp(list(cm.get_jobs_to_run(now=103)))
    []
    >>> cm.notify(db, 'S', 'S1', 1, 0, 10, None, 'done')

    At this stage, we have S2 with an eta of 105 and since the
    queue is sequential, we wait for it.

    >>> pp(list(cm.get_jobs_to_run(now=103)))
    []
    >>> pp(list(cm.get_jobs_to_run(now=105)))
    [<ChannelJob S2>]
    >>> cm.notify(db, 'S', 'S2', 2, 0, 10, 105, 'done')
    >>> pp(list(cm.get_jobs_to_run(now=105)))
    [<ChannelJob S3>]
    >>> cm.notify(db, 'S', 'S3', 3, 0, 10, None, 'done')
    >>> pp(list(cm.get_jobs_to_run(now=105)))
    []
    """

    def __init__(self):
        self._jobs_by_uuid = WeakValueDictionary()
        self._root_channel = Channel(name="root", parent=None, capacity=1)
        self._channels_by_name = WeakValueDictionary(root=self._root_channel)

    @classmethod
    def parse_simple_config(cls, config_string):
        """Parse a simple channels configuration string.

        The general form is as follow:
        channel(.subchannel)*(:capacity(:key(=value)?)*)? [, ...]

        If capacity is absent, it defaults to 1.
        If a key is present without value, it gets True as value.
        When declaring subchannels, the root channel may be omitted
        (ie sub:4 is the same as root.sub:4).

        Returns a list of channel configuration dictionaries.

        :raises ValueError: on a missing channel name, a non-integer
            capacity, a malformed ``key=value`` item or a duplicate key

        >>> from pprint import pprint as pp
        >>> pp(ChannelManager.parse_simple_config('root:4'))
        [{'capacity': 4, 'name': 'root'}]
        >>> pp(ChannelManager.parse_simple_config('root:4,root.sub:2'))
        [{'capacity': 4, 'name': 'root'}, {'capacity': 2, 'name': 'root.sub'}]
        >>> pp(ChannelManager.parse_simple_config('root:4,root.sub:2:'
        ...                                       'sequential:k=v'))
        [{'capacity': 4, 'name': 'root'},
         {'capacity': 2, 'k': 'v', 'name': 'root.sub', 'sequential': True}]
        >>> pp(ChannelManager.parse_simple_config('root'))
        [{'capacity': 1, 'name': 'root'}]
        >>> pp(ChannelManager.parse_simple_config('sub:2'))
        [{'capacity': 2, 'name': 'sub'}]

        It ignores whitespace around values, and drops empty entries which
        would be generated by trailing commas, or commented lines on the Odoo
        config file.

        >>> pp(ChannelManager.parse_simple_config('''
        ...     root : 4,
        ...     ,
        ...     foo bar:1: k=va lue,
        ... '''))
        [{'capacity': 4, 'name': 'root'},
         {'capacity': 1, 'k': 'va lue', 'name': 'foo bar'}]

        It's also possible to replace commas with line breaks, which is more
        readable if the channel configuration comes from the odoo config file.

        >>> pp(ChannelManager.parse_simple_config('''
        ...     root : 4
        ...     foo bar:1: k=va lue
        ...     baz
        ... '''))
        [{'capacity': 4, 'name': 'root'},
         {'capacity': 1, 'k': 'va lue', 'name': 'foo bar'},
         {'capacity': 1, 'name': 'baz'}]
        """
        res = []
        # line breaks are accepted as an alternative to commas
        config_string = config_string.replace("\n", ",")
        for channel_config_string in split_strip(config_string, ","):
            if not channel_config_string:
                # ignore empty entries (commented lines, trailing commas)
                continue
            config = {}
            config_items = split_strip(channel_config_string, ":")
            name = config_items[0]
            if not name:
                raise ValueError(
                    "Invalid channel config %s: missing channel name" % config_string
                )
            config["name"] = name
            if len(config_items) > 1:
                capacity = config_items[1]
                try:
                    config["capacity"] = int(capacity)
                except ValueError:
                    # capacity is always a str here, so int() can only
                    # fail with ValueError
                    raise ValueError(
                        "Invalid channel config %s: "
                        "invalid capacity %s" % (config_string, capacity)
                    )
                for config_item in config_items[2:]:
                    kv = split_strip(config_item, "=")
                    if len(kv) == 1:
                        # a bare key is a boolean flag
                        k, v = kv[0], True
                    elif len(kv) == 2:
                        k, v = kv
                    else:
                        raise ValueError(
                            "Invalid channel config %s: "
                            "incorrect config item %s" % (config_string, config_item)
                        )
                    if k in config:
                        raise ValueError(
                            "Invalid channel config %s: "
                            "duplicate key %s" % (config_string, k)
                        )
                    config[k] = v
            else:
                config["capacity"] = 1
            res.append(config)
        return res

    def simple_configure(self, config_string):
        """Configure the channel manager from a simple configuration string

        >>> cm = ChannelManager()
        >>> c = cm.get_channel_by_name('root')
        >>> c.capacity
        1
        >>> cm.simple_configure('root:4,autosub.sub:2,seq:1:sequential')
        >>> cm.get_channel_by_name('root').capacity
        4
        >>> cm.get_channel_by_name('root').sequential
        False
        >>> cm.get_channel_by_name('root.autosub').capacity
        >>> cm.get_channel_by_name('root.autosub.sub').capacity
        2
        >>> cm.get_channel_by_name('root.autosub.sub').sequential
        False
        >>> cm.get_channel_by_name('autosub.sub').capacity
        2
        >>> cm.get_channel_by_name('seq').capacity
        1
        >>> cm.get_channel_by_name('seq').sequential
        True
        """
        for config in ChannelManager.parse_simple_config(config_string):
            self.get_channel_from_config(config)

    def get_channel_from_config(self, config):
        """Return a Channel object from a parsed configuration.

        If the channel does not exist it is created.
        The configuration is applied on the channel before returning it.
        If some of the parent channels are missing when creating a subchannel,
        the parent channels are auto created with an infinite capacity
        (except for the root channel, which defaults to a capacity of 1
        when not configured explicitly).
        """
        channel = self.get_channel_by_name(config["name"], autocreate=True)
        channel.configure(config)
        _logger.info("Configured channel: %s", channel)
        return channel

    def get_channel_by_name(self, channel_name, autocreate=False):
        """Return a Channel object by its name.

        If it does not exist and autocreate is True, it is created
        with a default configuration and inserted in the Channels structure.
        If autocreate is False and the channel does not exist, an exception
        is raised.

        >>> cm = ChannelManager()
        >>> c = cm.get_channel_by_name('root', autocreate=False)
        >>> c.name
        'root'
        >>> c.fullname
        'root'
        >>> c = cm.get_channel_by_name('root.sub', autocreate=True)
        >>> c.name
        'sub'
        >>> c.fullname
        'root.sub'
        >>> c = cm.get_channel_by_name('sub', autocreate=True)
        >>> c.name
        'sub'
        >>> c.fullname
        'root.sub'
        >>> c = cm.get_channel_by_name('autosub.sub', autocreate=True)
        >>> c.name
        'sub'
        >>> c.fullname
        'root.autosub.sub'
        >>> c = cm.get_channel_by_name(None)
        >>> c.fullname
        'root'
        >>> c = cm.get_channel_by_name('root.sub')
        >>> c.fullname
        'root.sub'
        >>> c = cm.get_channel_by_name('sub')
        >>> c.fullname
        'root.sub'
        """
        root_name = self._root_channel.name
        if not channel_name or channel_name == root_name:
            return self._root_channel
        if not channel_name.startswith(root_name + "."):
            # the root channel may be omitted: "sub" means "root.sub"
            channel_name = root_name + "." + channel_name
        if channel_name in self._channels_by_name:
            return self._channels_by_name[channel_name]
        if not autocreate:
            raise ChannelNotFound("Channel %s not found" % channel_name)
        # walk down from the root, creating the missing channels on the way
        parent = self._root_channel
        for subchannel_name in channel_name.split(".")[1:]:
            subchannel = parent.get_subchannel_by_name(subchannel_name)
            if not subchannel:
                subchannel = Channel(subchannel_name, parent, capacity=None)
                self._channels_by_name[subchannel.fullname] = subchannel
            parent = subchannel
        return parent

    def notify(
        self, db_name, channel_name, uuid, seq, date_created, priority, eta, state
    ):
        """Record the current properties and state of a job.

        The job is created if its uuid is unknown, re-created if one of
        the properties that influence scheduling changed, then moved to
        the channel state matching ``state``. An unknown channel name
        falls back to the root channel.
        """
        try:
            channel = self.get_channel_by_name(channel_name)
        except ChannelNotFound:
            _logger.warning(
                "unknown channel %s, using root channel for job %s", channel_name, uuid
            )
            channel = self._root_channel
        job = self._jobs_by_uuid.get(uuid)
        if job:
            # db_name is invariant
            assert job.db_name == db_name
            # date_created is invariant
            assert job.date_created == date_created
            # if one of the job properties that influence
            # scheduling order has changed, we remove the job
            # from the queues and create a new job object
            if (
                seq != job.seq
                or priority != job.priority
                or eta != job.eta
                or channel != job.channel
            ):
                _logger.debug("job %s properties changed, rescheduling it", uuid)
                self.remove_job(uuid)
                job = None
        if not job:
            job = ChannelJob(db_name, channel, uuid, seq, date_created, priority, eta)
            self._jobs_by_uuid[uuid] = job
        # state transitions
        # NOTE(review): DONE, PENDING, ENQUEUED, STARTED and FAILED are not
        # imported in the visible import block -- presumably they come from
        # a sibling job module; confirm the import exists.
        if not state or state == DONE:
            job.channel.set_done(job)
        elif state == PENDING:
            job.channel.set_pending(job)
        elif state in (ENQUEUED, STARTED):
            job.channel.set_running(job)
        elif state == FAILED:
            job.channel.set_failed(job)
        else:
            _logger.error("unexpected state %s for job %s", state, job)

    def remove_job(self, uuid):
        """Forget the job with that uuid and remove it from its channels."""
        job = self._jobs_by_uuid.get(uuid)
        if job:
            # the channel queues hold strong references to the job, so it
            # must be removed from them, not only from the uuid index
            job.channel.remove(job)
            del self._jobs_by_uuid[job.uuid]

    def remove_db(self, db_name):
        """Forget all the jobs belonging to database ``db_name``."""
        # iterate over a copy: entries are deleted while looping
        for job in list(self._jobs_by_uuid.values()):
            if job.db_name == db_name:
                job.channel.remove(job)
                del self._jobs_by_uuid[job.uuid]

    def get_jobs_to_run(self, now):
        """Return an iterator of the jobs the root channel can run now."""
        return self._root_channel.get_jobs_to_run(now)

    def get_wakeup_time(self):
        """Return the wakeup time requested by the root channel (0 if none)."""
        return self._root_channel.get_wakeup_time()