Queue

Job

class connector.queue.job.Job(func=None, model_name=None, args=None, kwargs=None, priority=None, eta=None, job_uuid=None, max_retries=None, description=None)[source]

Bases: object

A Job is a task to execute.

uuid

Id (UUID) of the job.

worker_uuid

When the job is enqueued, UUID of the worker.

state

State of the job; can be pending, enqueued, started, done or failed. The start state is pending and the final state is done.

retry

The current try. It starts at 0 and increases by 1 each time the job is executed.

max_retries

The maximum number of retries allowed before the job is considered as failed.
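
The interplay between retry and max_retries can be sketched as follows. This is a minimal illustration, not the library's code: register_failed_attempt and MaxRetriesReached are hypothetical names, and treating a max_retries of 0 (or None) as unlimited follows the description of max_retries given for delay().

```python
class MaxRetriesReached(Exception):
    """Hypothetical error raised once the retry budget is spent."""


def register_failed_attempt(retry, max_retries):
    """Count one more try and decide whether the job may be retried.

    retry       -- tries made so far (starts at 0)
    max_retries -- allowed tries; 0 or None is treated as unlimited
    Returns the new retry counter, or raises MaxRetriesReached.
    """
    retry += 1  # each execution increases the counter by 1
    if max_retries and retry >= max_retries:
        raise MaxRetriesReached('Max. retries (%d) reached' % max_retries)
    return retry
```

With max_retries set to 5, the first four failed attempts return an increased counter and the fifth raises, at which point a caller would mark the job as failed.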

func_name

Name of the function (in the form module.function_name).

args

Arguments passed to the function when executed.

kwargs

Keyword arguments passed to the function when executed.

func_string

Full string representing the function to be executed, i.e. module.function(args, kwargs).

description

Human description of the job.

func

The python function itself.

model_name

OpenERP model on which the job will run.

priority

Priority of the job, 0 being the highest priority.

date_created

Date and time when the job was created.

date_enqueued

Date and time when the job was enqueued.

date_started

Date and time when the job was started.

date_done

Date and time when the job was done.

result

A description of the result (for humans).

exc_info

Exception information (traceback) when the job failed.

user_id

ID of the OpenERP user who created the job.

eta

Estimated Time of Arrival of the job. It will not be executed before this date/time.

canceled

True if the job has been canceled.

cancel(msg=None)[source]
description
eta
func
func_string
perform(session)[source]

Execute the job.

The job is executed with the user which has initiated it.

Parameters:session (ConnectorSession) – session to execute the job
postpone(result=None, seconds=None)[source]

Set the estimated time of arrival (eta) to n seconds from now. Used when a retryable exception requires the job to be retried later.

related_action(session)[source]
set_done(result=None)[source]
set_enqueued(worker)[source]
set_failed(exc_info=None)[source]
set_pending(result=None, reset_retry=True)[source]
set_started()[source]
uuid

Job ID; this is a UUID.

class connector.queue.job.JobStorage[source]

Bases: object

Interface for the storage of jobs

exists(job_uuid)[source]

Returns whether a job still exists in the storage.

load(job_uuid)[source]

Read the job’s data from the storage

store(job_)[source]

Store a job

class connector.queue.job.OpenERPJobStorage(session)[source]

Bases: connector.queue.job.JobStorage

Store a job on OpenERP

db_record(job_)[source]
db_record_from_uuid(job_uuid)[source]
enqueue(func, model_name=None, args=None, kwargs=None, priority=None, eta=None, max_retries=None, description=None)[source]

Create a Job and enqueue it in the queue. Return the job uuid.

This expects the arguments specific to the job to be already extracted from the ones to pass to the job function.

enqueue_resolve_args(func, *args, **kwargs)[source]

Create a Job and enqueue it in the queue. Return the job uuid.

exists(job_uuid)[source]

Returns whether a job still exists in the storage.

load(job_uuid)[source]

Read a job from the Database

store(job_)[source]

Store the Job

connector.queue.job.job(func=None, default_channel='root', retry_pattern=None)[source]

Decorator for jobs.

Optional arguments:

Parameters:
  • default_channel – the channel to which the job will be assigned. This channel is set at the installation of the module and can be manually changed later using the views.
  • retry_pattern (dict(retry_count,retry_eta_seconds)) – The retry pattern to use for postponing a job. If a job is postponed and there is no eta specified, the eta will be determined from the dict in retry_pattern. When no retry pattern is provided, jobs will be retried after RETRY_INTERVAL seconds.

Add a delay attribute on the decorated function.

When delay is called, the function is transformed to a job and stored in the OpenERP queue.job model. The arguments and keyword arguments given in delay will be the arguments used by the decorated function when it is executed.
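
The idea of a delay attribute can be illustrated with a toy decorator. This is only a sketch under stated assumptions: toy_job, STORED_JOBS and run_stored_jobs are hypothetical names, an in-memory list stands in for the queue.job model, and the session is passed through like any other argument.

```python
import uuid

# Stand-in for the queue.job model: a plain in-memory list (assumption).
STORED_JOBS = []


def toy_job(func):
    """Hypothetical decorator mimicking the documented delay attribute."""

    def delay(*args, **kwargs):
        # Record the call instead of running it, and return an identifier.
        job_uuid = str(uuid.uuid4())
        STORED_JOBS.append({'uuid': job_uuid, 'func': func,
                            'args': args, 'kwargs': kwargs})
        return job_uuid

    func.delay = delay  # the function itself stays directly callable
    return func


@toy_job
def export_one_thing(session, model_name, one_thing):
    return 'exported %s from %s' % (one_thing, model_name)


def run_stored_jobs():
    """Execute the recorded calls in order and collect their results."""
    results = []
    while STORED_JOBS:
        stored = STORED_JOBS.pop(0)
        results.append(stored['func'](*stored['args'], **stored['kwargs']))
    return results
```

Calling export_one_thing(...) runs the function immediately, whereas export_one_thing.delay(...) only records the call; nothing runs until run_stored_jobs() plays the role of a worker.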

retry_pattern is a dict where keys are the count of retries and the values are the delay to postpone a job.

The delay() function of a job takes the following arguments:

session
Current ConnectorSession
model_name
name of the model on which the job has something to do
*args and **kwargs

Arguments and keyword arguments which will be given to the called function once the job is executed. They should be pickle-able.

The following keyword arguments are special and reserved:

  • priority: priority of the job; the smaller the number, the higher the priority. Default is 10.
  • max_retries: maximum number of retries before giving up and setting the job state to ‘failed’. A value of 0 means infinite retries. Default is 5.
  • eta: the job can be executed only after this datetime (or now + timedelta if a timedelta or integer is given).
  • description: a human-readable description of the job, intended to distinguish job instances. Default is func.__doc__ or ‘Function %s’ % func.__name__.
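
The three accepted forms of eta can be normalized to a single datetime as sketched below. resolve_eta is a hypothetical helper, not part of the connector API; reading an integer as a number of seconds is an assumption consistent with the eta=60*60*5 example, which is described as a delay of 5 hours.

```python
from datetime import datetime, timedelta


def resolve_eta(eta, now=None):
    """Return the earliest execution datetime for a job, or None.

    eta may be None, a datetime, a timedelta, or an integer number of
    seconds (assumption); relative values are added to `now`.
    """
    now = now or datetime.now()
    if eta is None:
        return None  # no constraint: run as soon as possible
    if isinstance(eta, datetime):
        return eta
    if isinstance(eta, timedelta):
        return now + eta
    if isinstance(eta, int) and not isinstance(eta, bool):
        return now + timedelta(seconds=eta)
    raise TypeError('unsupported eta: %r' % (eta,))
```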

Example:

# Import paths assume the usual openerp.addons namespace.
from openerp.addons.connector.queue.job import job
from openerp.addons.connector.exception import RetryableJobError

@job
def export_one_thing(session, model_name, one_thing):
    # work
    # export one_thing
    pass

export_one_thing(session, 'a.model', the_thing_to_export)
# => normal and synchronous function call

export_one_thing.delay(session, 'a.model', the_thing_to_export)
# => the job will be executed as soon as possible

export_one_thing.delay(session, 'a.model', the_thing_to_export,
                       priority=30, eta=60*60*5)
# => the job will be executed with a low priority and not before a
# delay of 5 hours from now

@job(default_channel='root.subchannel')
def export_one_thing(session, model_name, one_thing):
    # work
    # export one_thing
    pass

@job(retry_pattern={1: 10 * 60,
                    5: 20 * 60,
                    10: 30 * 60,
                    15: 12 * 60 * 60})
def retryable_example(session):
    # 5 first retries postponed 10 minutes later
    # retries 5 to 10 postponed 20 minutes later
    # retries 10 to 15 postponed 30 minutes later
    # all subsequent retries postponed 12 hours later
    raise RetryableJobError('Must be retried later')

retryable_example.delay(session)
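
One way a retry_pattern dict could be turned into a postponement delay is sketched below. retry_delay is a hypothetical helper; the value of RETRY_INTERVAL and the handling of boundary counts (a retry count equal to a key uses that key's delay) are assumptions, not something this page specifies.

```python
RETRY_INTERVAL = 10 * 60  # assumed default delay, in seconds


def retry_delay(retry, retry_pattern=None, default=RETRY_INTERVAL):
    """Return the number of seconds to postpone a job.

    retry         -- current retry count of the job
    retry_pattern -- dict {retry_count: seconds}, or None
    The delay of the greatest key that is <= retry applies; when no key
    qualifies, or no pattern is given, the default interval is used.
    """
    if not retry_pattern:
        return default
    seconds = default
    for retry_count, postpone_seconds in sorted(retry_pattern.items()):
        if retry >= retry_count:
            seconds = postpone_seconds
        else:
            break  # keys are sorted, so later ones cannot match
    return seconds
```

With the pattern used in retryable_example, this sketch yields 10 minutes for retry 1, 20 minutes from retry 5, 30 minutes from retry 10 and 12 hours from retry 15 onwards.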

See also: related_action(), which attaches a related action to a job.

connector.queue.job.related_action(action=<function <lambda>>, **kwargs)[source]

Attach a Related Action to a job.

A Related Action will appear as a button on the OpenERP view. The button executes the action; usually it opens the form view of the record related to the job.

The action must be a callable that responds to arguments:

session, job, **kwargs

Example usage:

# Import paths assume the usual openerp namespace.
from openerp.tools.translate import _
from openerp.addons.connector.queue.job import job, related_action

def related_action_partner(session, job):
    model = job.args[0]
    partner_id = job.args[1]
    # if partner_id is a binding ID, resolve the real record ID here
    action = {
        'name': _("Partner"),
        'type': 'ir.actions.act_window',
        'res_model': model,
        'view_type': 'form',
        'view_mode': 'form',
        'res_id': partner_id,
    }
    return action

@job
@related_action(action=related_action_partner)
def export_partner(session, model_name, partner_id):
    # ...
    pass
The kwargs are transmitted to the action:

def related_action_product(session, job, extra_arg=1):
    assert extra_arg == 2
    model = job.args[0]
    product_id = job.args[1]

@job
@related_action(action=related_action_product, extra_arg=2)
def export_product(session, model_name, product_id):
    # ...
    pass

connector.queue.job.whitelist_unpickle_global(fn_or_class)[source]

Allow a function or class to be used in jobs

By default, the only types allowed to be used in job arguments are:

  • the builtins: str/unicode, int/long, float, bool, tuple, list, dict, None
  • the pre-registered: datetime.datetime, datetime.timedelta

If you need to use an argument in a job which is not in this whitelist, you can add it by using:

whitelist_unpickle_global(fn_or_class_to_register)
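
The whitelisting idea can be illustrated with the standard technique of overriding pickle.Unpickler.find_class. This is a sketch of that general technique, not the connector's implementation: allow_global, RestrictedUnpickler and restricted_loads are hypothetical names, and the code is written for Python 3.

```python
import fractions
import io
import pickle

# Globals that may be resolved while unpickling; built-in containers and
# scalars never go through find_class, so they need no entry here.
_ALLOWED_GLOBALS = {('datetime', 'datetime'), ('datetime', 'timedelta')}


def allow_global(fn_or_class):
    """Register a function or class as safe to unpickle."""
    _ALLOWED_GLOBALS.add((fn_or_class.__module__, fn_or_class.__name__))


class RestrictedUnpickler(pickle.Unpickler):
    def find_class(self, module, name):
        # Called for every global referenced by the pickle stream.
        if (module, name) not in _ALLOWED_GLOBALS:
            raise pickle.UnpicklingError(
                '%s.%s is not whitelisted' % (module, name))
        return super(RestrictedUnpickler, self).find_class(module, name)


def restricted_loads(data):
    """Like pickle.loads, but only whitelisted globals are accepted."""
    return RestrictedUnpickler(io.BytesIO(data)).load()


# Usage: a Fraction argument is rejected until its class is registered.
blob = pickle.dumps(fractions.Fraction(1, 3))
try:
    restricted_loads(blob)
    rejected = False
except pickle.UnpicklingError:
    rejected = True
allow_global(fractions.Fraction)
restored = restricted_loads(blob)
```

Rejecting unknown globals at load time means that a job argument of an unexpected type fails loudly instead of importing and instantiating arbitrary code.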

Worker

class connector.queue.worker.Worker(db_name, watcher_)[source]

Bases: threading.Thread

Post jobs to the queue, retrieve them and execute them.

enqueue_job_uuid(job_uuid)[source]

Enqueue a job:

It will be executed by the worker as soon as possible (according to the job’s priority).

job_storage_class

alias of connector.queue.job.OpenERPJobStorage

queue_class

alias of connector.queue.queue.JobsQueue

run()[source]

Worker’s main loop

On each iteration, check whether the worker still exists in the watcher. When it no longer exists, break the loop so that the thread stops properly.

Wait for jobs and execute them sequentially.
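
The loop described above can be sketched with a toy watcher and worker. ToyWatcher and ToyWorker are hypothetical classes written for Python 3; the polling timeout and the in-memory queue are assumptions, and jobs are plain callables here.

```python
import queue
import threading


class ToyWatcher(object):
    """Hypothetical watcher: remembers which workers are referenced."""

    def __init__(self):
        self._workers = set()
        self._lock = threading.Lock()

    def register(self, worker):
        with self._lock:
            self._workers.add(worker)

    def forget(self, worker):
        with self._lock:
            self._workers.discard(worker)

    def worker_lost(self, worker):
        with self._lock:
            return worker not in self._workers


class ToyWorker(threading.Thread):
    def __init__(self, watcher):
        super(ToyWorker, self).__init__()
        self.daemon = True
        self.watcher = watcher
        self.jobs = queue.Queue()
        self.results = []

    def run(self):
        # Leave the loop as soon as the watcher no longer references us.
        while not self.watcher.worker_lost(self):
            try:
                job = self.jobs.get(timeout=0.05)  # wait briefly for a job
            except queue.Empty:
                continue
            try:
                self.results.append(job())  # jobs run one after another
            finally:
                self.jobs.task_done()
```

Polling with a short timeout lets the thread notice that it has been dropped even when no job arrives.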

run_job(job)[source]

Execute a job

class connector.queue.worker.WorkerWatcher[source]

Bases: threading.Thread

Keep watch over the workers and signal that they are alive.

A WorkerWatcher is shared between databases, so only 1 instance is necessary to check the aliveness of the workers for every database.

static available_db_names()[source]

Returns the databases for the server having the connector module installed.

Available means that they can be used by a Worker.

Returns:database names
Return type:list
check_alive(db_name, worker)[source]

Check whether the worker is still alive and notify its aliveness. Also check whether the other workers are still alive; dead workers are removed from the worker pool.

run()[source]

WorkerWatcher’s main loop

worker_for_db(db_name)[source]
worker_lost(worker)[source]

Indicate if a worker is no longer referenced by the watcher.

Used by the worker threads to know if they have to exit.

connector.queue.worker.start_service()[source]

Start the watcher

Queue

class connector.queue.queue.JobsQueue[source]

Bases: object

Holds the jobs planned for execution in memory.

The jobs are sorted: the higher the priority, the earlier a job is dequeued.
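
A priority-ordered in-memory queue can be sketched with heapq. ToyJobsQueue is a hypothetical stand-in, not the JobsQueue implementation; it assumes, as stated for the priority attribute, that a smaller number means a higher priority, and it keeps insertion order among jobs of equal priority.

```python
import heapq
import itertools


class ToyJobsQueue(object):
    """Hypothetical in-memory queue ordered by job priority."""

    def __init__(self):
        self._heap = []
        # Tie-breaker so that equal priorities keep their insertion order
        # and the jobs themselves are never compared.
        self._counter = itertools.count()

    def enqueue(self, priority, job):
        heapq.heappush(self._heap, (priority, next(self._counter), job))

    def dequeue(self):
        """Remove and return the job with the smallest priority number."""
        _priority, _order, job = heapq.heappop(self._heap)
        return job

    def __len__(self):
        return len(self._heap)
```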

dequeue()[source]

Take the first job according to its priority and return it

enqueue(job)[source]

Models

class connector.queue.model.JobChannel(pool, cr)[source]

Bases: openerp.models.Model

complete_name

Basic string field, can be length-limited, usually displayed as a single-line string in clients

Parameters:
  • size (int) – the maximum size of values stored for that field
  • translate (bool) – whether the values of this field can be translated
job_function_ids

One2many field; the value of such a field is the recordset of all the records in comodel_name such that the field inverse_name is equal to the current record.

Parameters:
  • comodel_name – name of the target model (string)
  • inverse_name – name of the inverse Many2one field in comodel_name (string)
  • domain – an optional domain to set on candidate values on the client side (domain or string)
  • context – an optional context to use on the client side when handling that field (dictionary)
  • auto_join – whether JOINs are generated upon search through that field (boolean, by default False)
  • limit – optional limit to use upon read (integer)

The attributes comodel_name and inverse_name are mandatory except in the case of related fields or field extensions.

name

Basic string field, can be length-limited, usually displayed as a single-line string in clients

Parameters:
  • size (int) – the maximum size of values stored for that field
  • translate (bool) – whether the values of this field can be translated
name_get() → [(id, name), ...][source]

Returns a textual representation for the records in self. By default this is the value of the display_name field.

Returns:list of pairs (id, text_repr), one for each record
Return type:list(tuple)
parent_id

The value of such a field is a recordset of size 0 (no record) or 1 (a single record).

Parameters:
  • comodel_name – name of the target model (string)
  • domain – an optional domain to set on candidate values on the client side (domain or string)
  • context – an optional context to use on the client side when handling that field (dictionary)
  • ondelete – what to do when the referred record is deleted; possible values are: 'set null', 'restrict', 'cascade'
  • auto_join – whether JOINs are generated upon search through that field (boolean, by default False)
  • delegate – set it to True to make fields of the target model accessible from the current model (corresponds to _inherits)

The attribute comodel_name is mandatory except in the case of related fields or field extensions.

parent_required(*args, **kwargs)[source]
write(*args, **kwargs)[source]
class connector.queue.model.JobFunction(pool, cr)[source]

Bases: openerp.models.Model

channel

Basic string field, can be length-limited, usually displayed as a single-line string in clients

Parameters:
  • size (int) – the maximum size of values stored for that field
  • translate (bool) – whether the values of this field can be translated
channel_id

The value of such a field is a recordset of size 0 (no record) or 1 (a single record).

Parameters:
  • comodel_name – name of the target model (string)
  • domain – an optional domain to set on candidate values on the client side (domain or string)
  • context – an optional context to use on the client side when handling that field (dictionary)
  • ondelete – what to do when the referred record is deleted; possible values are: 'set null', 'restrict', 'cascade'
  • auto_join – whether JOINs are generated upon search through that field (boolean, by default False)
  • delegate – set it to True to make fields of the target model accessible from the current model (corresponds to _inherits)

The attribute comodel_name is mandatory except in the case of related fields or field extensions.

name

Basic string field, can be length-limited, usually displayed as a single-line string in clients

Parameters:
  • size (int) – the maximum size of values stored for that field
  • translate (bool) – whether the values of this field can be translated
class connector.queue.model.QueueJob(pool, cr)[source]

Bases: openerp.models.Model

Job status and result

active
autovacuum(*args, **kwargs)[source]

Delete all jobs (active or not) that were done more than _removal_interval days ago.

Called from a cron.

button_done(*args, **kwargs)[source]
channel

Basic string field, can be length-limited, usually displayed as a single-line string in clients

Parameters:
  • size (int) – the maximum size of values stored for that field
  • translate (bool) – whether the values of this field can be translated
company_id

The value of such a field is a recordset of size 0 (no record) or 1 (a single record).

Parameters:
  • comodel_name – name of the target model (string)
  • domain – an optional domain to set on candidate values on the client side (domain or string)
  • context – an optional context to use on the client side when handling that field (dictionary)
  • ondelete – what to do when the referred record is deleted; possible values are: 'set null', 'restrict', 'cascade'
  • auto_join – whether JOINs are generated upon search through that field (boolean, by default False)
  • delegate – set it to True to make fields of the target model accessible from the current model (corresponds to _inherits)

The attribute comodel_name is mandatory except in the case of related fields or field extensions.

date_created
date_done
date_enqueued
date_started
eta
exc_info

Very similar to Char but used for longer contents, does not have a size and usually displayed as a multiline text box.

Parameters:translate – whether the value of this field can be translated
func
func_name

Basic string field, can be length-limited, usually displayed as a single-line string in clients

Parameters:
  • size (int) – the maximum size of values stored for that field
  • translate (bool) – whether the values of this field can be translated
func_string

Basic string field, can be length-limited, usually displayed as a single-line string in clients

Parameters:
  • size (int) – the maximum size of values stored for that field
  • translate (bool) – whether the values of this field can be translated
job_function_id

The value of such a field is a recordset of size 0 (no record) or 1 (a single record).

Parameters:
  • comodel_name – name of the target model (string)
  • domain – an optional domain to set on candidate values on the client side (domain or string)
  • context – an optional context to use on the client side when handling that field (dictionary)
  • ondelete – what to do when the referred record is deleted; possible values are: 'set null', 'restrict', 'cascade'
  • auto_join – whether JOINs are generated upon search through that field (boolean, by default False)
  • delegate – set it to True to make fields of the target model accessible from the current model (corresponds to _inherits)

The attribute comodel_name is mandatory except in the case of related fields or field extensions.

max_retries
model_name

Basic string field, can be length-limited, usually displayed as a single-line string in clients

Parameters:
  • size (int) – the maximum size of values stored for that field
  • translate (bool) – whether the values of this field can be translated
name

Basic string field, can be length-limited, usually displayed as a single-line string in clients

Parameters:
  • size (int) – the maximum size of values stored for that field
  • translate (bool) – whether the values of this field can be translated

Open the related action associated to the job

priority
requeue(*args, **kwargs)[source]
result

Very similar to Char but used for longer contents, does not have a size and usually displayed as a multiline text box.

Parameters:translate – whether the value of this field can be translated
retry
state
Parameters:
  • selection – specifies the possible values for this field. It is given as either a list of pairs (value, string), or a model method, or a method name.
  • selection_add – provides an extension of the selection in the case of an overridden field. It is a list of pairs (value, string).

The attribute selection is mandatory except in the case of related fields or field extensions.

user_id

The value of such a field is a recordset of size 0 (no record) or 1 (a single record).

Parameters:
  • comodel_name – name of the target model (string)
  • domain – an optional domain to set on candidate values on the client side (domain or string)
  • context – an optional context to use on the client side when handling that field (dictionary)
  • ondelete – what to do when the referred record is deleted; possible values are: 'set null', 'restrict', 'cascade'
  • auto_join – whether JOINs are generated upon search through that field (boolean, by default False)
  • delegate – set it to True to make fields of the target model accessible from the current model (corresponds to _inherits)

The attribute comodel_name is mandatory except in the case of related fields or field extensions.

uuid

Basic string field, can be length-limited, usually displayed as a single-line string in clients

Parameters:
  • size (int) – the maximum size of values stored for that field
  • translate (bool) – whether the values of this field can be translated
worker_id

The value of such a field is a recordset of size 0 (no record) or 1 (a single record).

Parameters:
  • comodel_name – name of the target model (string)
  • domain – an optional domain to set on candidate values on the client side (domain or string)
  • context – an optional context to use on the client side when handling that field (dictionary)
  • ondelete – what to do when the referred record is deleted; possible values are: 'set null', 'restrict', 'cascade'
  • auto_join – whether JOINs are generated upon search through that field (boolean, by default False)
  • delegate – set it to True to make fields of the target model accessible from the current model (corresponds to _inherits)

The attribute comodel_name is mandatory except in the case of related fields or field extensions.

write(*args, **kwargs)[source]
class connector.queue.model.QueueWorker(pool, cr)[source]

Bases: openerp.models.Model

Worker

assign_jobs(*args, **kwargs)[source]

Assign n jobs to the worker of the current process

n is max_jobs or unlimited if max_jobs is None

Parameters:max_jobs (int) – maximal limit of jobs to assign on a worker
assign_then_enqueue(*args, **kwargs)[source]

Assign all the jobs not already assigned to a worker. Then enqueue all the jobs having a worker but not enqueued.

Each operation is atomic.

Warning

This method commits the transaction (cr.commit() is called), so always call it in your own transaction, not in OpenERP’s main transaction.

Parameters:max_jobs (int) – maximal limit of jobs to assign on a worker
date_alive
date_start
enqueue_jobs(*args, **kwargs)[source]

Enqueue all the jobs assigned to the worker of the current process

job_ids

One2many field; the value of such a field is the recordset of all the records in comodel_name such that the field inverse_name is equal to the current record.

Parameters:
  • comodel_name – name of the target model (string)
  • inverse_name – name of the inverse Many2one field in comodel_name (string)
  • domain – an optional domain to set on candidate values on the client side (domain or string)
  • context – an optional context to use on the client side when handling that field (dictionary)
  • auto_join – whether JOINs are generated upon search through that field (boolean, by default False)
  • limit – optional limit to use upon read (integer)

The attributes comodel_name and inverse_name are mandatory except in the case of related fields or field extensions.

pid

Basic string field, can be length-limited, usually displayed as a single-line string in clients

Parameters:
  • size (int) – the maximum size of values stored for that field
  • translate (bool) – whether the values of this field can be translated
uuid

Basic string field, can be length-limited, usually displayed as a single-line string in clients

Parameters:
  • size (int) – the maximum size of values stored for that field
  • translate (bool) – whether the values of this field can be translated
worker_timeout = 300
class connector.queue.model.RequeueJob(pool, cr)[source]

Bases: openerp.models.TransientModel

job_ids

Many2many field; the value of such a field is the recordset.

Parameters:comodel_name – name of the target model (string)

The attribute comodel_name is mandatory except in the case of related fields or field extensions.

Parameters:
  • relation – optional name of the table that stores the relation in the database (string)
  • column1 – optional name of the column referring to “these” records in the table relation (string)
  • column2 – optional name of the column referring to “those” records in the table relation (string)

The attributes relation, column1 and column2 are optional. If not given, names are automatically generated from model names, provided model_name and comodel_name are different!

Parameters:
  • domain – an optional domain to set on candidate values on the client side (domain or string)
  • context – an optional context to use on the client side when handling that field (dictionary)
  • limit – optional limit to use upon read (integer)
requeue(*args, **kwargs)[source]