Source code for connector.queue.worker

# -*- coding: utf-8 -*-
##############################################################################
#
#    Author: Guewen Baconnier
#    Copyright 2013 Camptocamp SA
#
#    This program is free software: you can redistribute it and/or modify
#    it under the terms of the GNU Affero General Public License as
#    published by the Free Software Foundation, either version 3 of the
#    License, or (at your option) any later version.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU Affero General Public License for more details.
#
#    You should have received a copy of the GNU Affero General Public License
#    along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
##############################################################################

import re
import logging
import os
import threading
import time
import traceback
import uuid
from datetime import datetime
from StringIO import StringIO

from psycopg2 import OperationalError, ProgrammingError

import openerp
from openerp.service.model import PG_CONCURRENCY_ERRORS_TO_RETRY
from openerp.service import db
from openerp.tools import config
from .queue import JobsQueue
from ..session import ConnectorSessionHandler
from .job import (OpenERPJobStorage,
                  PENDING,
                  DONE)
from ..exception import (NoSuchJobError,
                         NotReadableJobError,
                         RetryableJobError,
                         FailedJobError,
                         NothingToDoJob)
from ..jobrunner import _channels

_logger = logging.getLogger(__name__)

WAIT_CHECK_WORKER_ALIVE = 30  # seconds
WAIT_WHEN_ONLY_AFTER_JOBS = 10  # seconds
WORKER_TIMEOUT = 5 * 60  # seconds
PG_RETRY = 5  # seconds


[docs]class Worker(threading.Thread): """ Post and retrieve jobs from the queue, execute them""" queue_class = JobsQueue job_storage_class = OpenERPJobStorage def __init__(self, db_name, watcher_): super(Worker, self).__init__() self.queue = self.queue_class() self.db_name = db_name threading.current_thread().dbname = db_name self.uuid = unicode(uuid.uuid4()) self.watcher = watcher_
[docs] def run_job(self, job): """ Execute a job """ def retry_postpone(job, message, seconds=None): with session_hdl.session() as session: job.postpone(result=message, seconds=seconds) job.set_enqueued(self) self.job_storage_class(session).store(job) self.queue.enqueue(job) session_hdl = ConnectorSessionHandler(self.db_name, openerp.SUPERUSER_ID) try: with session_hdl.session() as session: job = self._load_job(session, job.uuid) if job is None: return # if the job has been manually set to DONE or PENDING # before its execution, stop if job.state in (DONE, PENDING): return # the job has been enqueued in this worker but has likely be # modified in the database since its enqueue if job.worker_uuid != self.uuid: # put the job in pending so it can be requeued _logger.error('Job %s was enqueued in worker %s but ' 'was linked to worker %s. Reset to pending.', job.uuid, self.uuid, job.worker_uuid) with session_hdl.session() as session: job.set_pending() self.job_storage_class(session).store(job) return if job.eta and job.eta > datetime.now(): # The queue is sorted by 'eta' date first # so if we dequeued a job expected to be run in # the future, we have no jobs to do right now! self.queue.enqueue(job) # Wait some time just to avoid to loop over # the same 'future' jobs _logger.debug('Wait %s seconds because the delayed ' 'jobs have been reached', WAIT_WHEN_ONLY_AFTER_JOBS) time.sleep(WAIT_WHEN_ONLY_AFTER_JOBS) return with session_hdl.session() as session: job.set_started() self.job_storage_class(session).store(job) _logger.debug('%s started', job) with session_hdl.session() as session: job.perform(session) job.set_done() self.job_storage_class(session).store(job) _logger.debug('%s done', job) except NothingToDoJob as err: if unicode(err): msg = unicode(err) else: msg = None job.cancel(msg) with session_hdl.session() as session: self.job_storage_class(session).store(job) except RetryableJobError as err: # delay the job later, requeue retry_postpone(job, unicode(err), seconds=err.seconds) _logger.debug('%s postponed', job) except OperationalError as err: # Automatically retry the typical transaction serialization errors if err.pgcode not in PG_CONCURRENCY_ERRORS_TO_RETRY: raise retry_postpone(job, unicode(err), seconds=PG_RETRY) _logger.debug('%s OperationalError, postponed', job) except (FailedJobError, Exception): buff = StringIO() traceback.print_exc(file=buff) _logger.error(buff.getvalue()) job.set_failed(exc_info=buff.getvalue()) with session_hdl.session() as session: self.job_storage_class(session).store(job) raise
def _load_job(self, session, job_uuid): """ Reload a job from the backend """ try: job = self.job_storage_class(session).load(job_uuid) except NoSuchJobError: # just skip it job = None except NotReadableJobError: _logger.exception('Could not read job: %s', job_uuid) raise return job
[docs] def run(self): """ Worker's main loop Check if it still exists in the ``watcher``. When it does no longer exist, it break the loop so the thread stops properly. Wait for jobs and execute them sequentially. """ while 1: # check if the worker has to exit (db destroyed, connector # uninstalled) if self.watcher.worker_lost(self): break job = self.queue.dequeue() try: self.run_job(job) except: continue
[docs] def enqueue_job_uuid(self, job_uuid): """ Enqueue a job: It will be executed by the worker as soon as possible (according to the job's priority """ session_hdl = ConnectorSessionHandler(self.db_name, openerp.SUPERUSER_ID) with session_hdl.session() as session: job = self._load_job(session, job_uuid) if job is None: # skip a deleted job return job.set_enqueued(self) self.job_storage_class(session).store(job) # the change of state should be commited before # the enqueue otherwise we may have concurrent updates # if the job is started directly self.queue.enqueue(job) _logger.debug('%s enqueued in %s', job, self)
[docs]class WorkerWatcher(threading.Thread): """ Keep a sight on the workers and signal their aliveness. A `WorkerWatcher` is shared between databases, so only 1 instance is necessary to check the aliveness of the workers for every database. """ def __init__(self): super(WorkerWatcher, self).__init__() self._workers = {} def _new(self, db_name): """ Create a new worker for the database """ if db_name in self._workers: raise Exception('Database %s already has a worker (%s)' % (db_name, self._workers[db_name].uuid)) worker = Worker(db_name, self) self._workers[db_name] = worker worker.daemon = True worker.start() def _delete(self, db_name): """ Delete a worker associated with a database """ if db_name in self._workers: # the worker will exit (it checks ``worker_lost()``) del self._workers[db_name]
[docs] def worker_for_db(self, db_name): return self._workers.get(db_name)
[docs] def worker_lost(self, worker): """ Indicate if a worker is no longer referenced by the watcher. Used by the worker threads to know if they have to exit. """ return worker not in self._workers.itervalues()
[docs] @staticmethod def available_db_names(): """ Returns the databases for the server having the connector module installed. Available means that they can be used by a `Worker`. :return: database names :rtype: list """ if config['db_name']: db_names = config['db_name'].split(',') else: db_names = db.exp_list(True) dbfilter = config['dbfilter'] if dbfilter and '%d' not in dbfilter and '%h' not in dbfilter: db_names = [d for d in db_names if re.match(dbfilter, d)] available_db_names = [] for db_name in db_names: session_hdl = ConnectorSessionHandler(db_name, openerp.SUPERUSER_ID) with session_hdl.session() as session: cr = session.cr try: cr.execute("SELECT 1 FROM ir_module_module " "WHERE name = %s " "AND state = %s", ('connector', 'installed'), log_exceptions=False) except ProgrammingError as err: no_db_error = 'relation "ir_module_module" does not exist' if unicode(err).startswith(no_db_error): _logger.debug('Database %s is not an OpenERP database,' ' connector worker not started', db_name) else: raise else: if cr.fetchone(): available_db_names.append(db_name) return available_db_names
def _update_workers(self): """ Refresh the list of workers according to the available databases and registries. A new database can be available, so we need to create a new `Worker` or a database could have been dropped, so we have to discard the Worker. """ db_names = self.available_db_names() # deleted db or connector uninstalled: remove the workers for db_name in set(self._workers) - set(db_names): self._delete(db_name) for db_name in db_names: if db_name not in self._workers: self._new(db_name)
[docs] def run(self): """ `WorkerWatcher`'s main loop """ while 1: self._update_workers() for db_name, worker in self._workers.items(): self.check_alive(db_name, worker) time.sleep(WAIT_CHECK_WORKER_ALIVE)
[docs] def check_alive(self, db_name, worker): """ Check if the the worker is still alive and notify its aliveness. Check if the other workers are still alive, if they are dead, remove them from the worker's pool. """ session_hdl = ConnectorSessionHandler(db_name, openerp.SUPERUSER_ID) with session_hdl.session() as session: if worker.is_alive(): self._notify_alive(session, worker) session.commit() self._purge_dead_workers(session) session.commit()
def _notify_alive(self, session, worker): _logger.debug('Worker %s is alive on process %s', worker.uuid, os.getpid()) session.env['queue.worker']._notify_alive(worker) def _purge_dead_workers(self, session): session.env['queue.worker']._purge_dead_workers()
watcher = WorkerWatcher()
[docs]def start_service(): """ Start the watcher """ watcher.daemon = True watcher.start()
# We have to launch the Jobs Workers only if: # 0. The alternative connector runner is not enabled (i.e. no ``_channels()``) # 1. OpenERP is used in standalone mode (monoprocess) # 2. Or it is used in multiprocess (with option ``--workers``) # but the current process is a Connector Worker # (launched with the ``openerp-connector-worker`` script). if not _channels(): if (not getattr(openerp, 'multi_process', False) or getattr(openerp, 'worker_connector', False)): start_service()