[Source code analysis] Distributed task queue Celery: child processes in the multi-thread model

0x00 summary

Celery is a simple, flexible and reliable distributed system that processes a large number of messages. It focuses on asynchronous task queues for real-time processing and also supports task scheduling.

In the previous article we introduced the Celery multi-thread model, but we skipped the child-process stage; this article looks at how the child process is started.

We still have to ask a few questions first:

  • What preparations need to be made before starting the child process?
    • How is the command that the child process will run determined?
    • How is the communication channel between the parent and child processes built?
    • How is the parent process's information passed to the child process?
  • The Celery application currently lives in the parent process.
    • How does the child process get hold of the Celery application?
    • How is the Celery application restored in the child?
  • How does the parent process know that the child process is ready, so that it can schedule work for it?
  • How does the child process receive the tasks assigned by the parent process?

To make this easier to follow, the final overall diagram of this article is given up front.

0x01 Previous article review

1.1 The role of the base class

As mentioned earlier, AsynPool's base class Pool(object) sets up the various message-handling functions and creates the child processes.

The code location is: billiard/pool.py

The specific code is as follows, where _create_worker_process creates a child process.

class Pool(object):

    def __init__(self, processes=None, initializer=None, initargs=(),
                 maxtasksperchild=None, timeout=None, soft_timeout=None,
                 lost_worker_timeout=None, max_restarts=None,
                 max_restart_freq=1, on_process_up=None, on_process_down=None,
                 on_timeout_set=None, on_timeout_cancel=None, threads=True,
                 semaphore=None, putlocks=False, allow_restart=False,
                 synack=False, on_process_exit=None, context=None,
                 max_memory_per_child=None, enable_timeouts=False, **kwargs):
        ...
        for i in range(self._processes):
            self._create_worker_process(i)

1.2 Sub-process abstraction

The following code establishes the child process abstraction.

for i in range(self._processes):
    self._create_worker_process(i)

The main work of _create_worker_process is as follows:

  • inq, outq, synq = self.get_process_queues() obtains the abstract objects of the read and write pipes. The pipes were created earlier (by self.create_process_queues()). They are mainly intended for the child process that is about to be forked: the child listens for read events on the pipe abstraction and can also write data to the write pipe (a minimal stdlib sketch of this pattern appears after the code below).

  • w is an instance of self.WorkerProcess; it is essentially an abstract wrapper around the forked child process, used to manage the child conveniently as part of the process pool. w records some metadata of the forked child, such as its pid and the read/write fds of its pipes, and is registered in the main process so the main process can use it for task distribution.

  • The WorkerProcess instance is recorded in self._pool.

  • w.start() contains the actual fork.

The actual fork happens inside w.start().

def _create_worker_process(self, i):
    ...
    w = self.WorkerProcess(self.Worker(
        inq, outq, synq, self._initializer, self._initargs,
        self._maxtasksperchild, sentinel, self._on_process_exit,
        # Need to handle all signals if using the ipc semaphore,
        # to make sure the semaphore is released.
        sigprotection=self.threads,
        wrap_exception=self._wrap_exception,
        max_memory_per_child=self._max_memory_per_child,
        on_ready_counter=on_ready_counter,
    ))
    ...
    w.start()   # here
    return w
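To make the inq/outq idea concrete, here is a minimal sketch of the pattern using the standard library's multiprocessing (not billiard, and with hypothetical names): one pipe per worker, the parent keeps the write end for dispatching, and the child blocks on the read end.

import multiprocessing as mp

def worker_main(inq_reader):
    # Child: block on the read end of the pipe until the parent sends work.
    while True:
        msg = inq_reader.recv()
        if msg is None:          # sentinel: parent asks us to stop
            break
        print('child got:', msg)

if __name__ == '__main__':
    inq_reader, inq_writer = mp.Pipe(duplex=False)
    w = mp.Process(target=worker_main, args=(inq_reader,))
    w.start()                    # the fork/spawn happens here, like w.start() above
    inq_writer.send('a task')
    inq_writer.send(None)
    w.join()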

1.3 Fork process

The specific code of Fork is as follows:

class BaseProcess(object):
    '''
    Process objects represent activity that is run in a separate process

    The class is analagous to `threading.Thread`
    '''

    def run(self):
        '''
        Method to be run in sub-process; can be overridden in sub-class
        '''
        if self._target:
            self._target(*self._args, **self._kwargs)

    def start(self):
        '''
        Start child process
        '''
        assert self._popen is None, 'cannot start a process twice'
        assert self._parent_pid == os.getpid(), \
            'can only start a process object created by current process'
        _cleanup()
        self._popen = self._Popen(self)
        self._sentinel = self._popen.sentinel
        _children.add(self)

The key line is self._popen = self._Popen(self).

The code is located at: billiard/context.py.

The concrete Popen implementation differs depending on the operating system.

The following are the process start classes used on *nix systems.

class ForkProcess(process.BaseProcess):
    _start_method = 'fork'
    @staticmethod
    def _Popen(process_obj):
        from .popen_fork import Popen
        return Popen(process_obj)

class SpawnProcess(process.BaseProcess):
    _start_method = 'spawn'
    @staticmethod
    def _Popen(process_obj):
        from .popen_spawn_posix import Popen
        return Popen(process_obj)

class ForkServerProcess(process.BaseProcess):
    _start_method = 'forkserver'
    @staticmethod
    def _Popen(process_obj):
        from .popen_forkserver import Popen
        return Popen(process_obj)

The following is used on Windows.

class SpawnProcess(process.BaseProcess):
    _start_method = 'spawn'
    @staticmethod
    def _Popen(process_obj):
        from .popen_spawn_win32 import Popen
        return Popen(process_obj)
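For comparison, the standard library exposes the same choice through "start methods". A minimal sketch, assuming Python 3's multiprocessing, of how the selected start method decides which Popen implementation ends up backing Process.start():

import multiprocessing as mp

def hello():
    print('child says hello')

if __name__ == '__main__':
    # 'fork' and 'forkserver' exist only on *nix; 'spawn' is the only
    # option on Windows (and the default on macOS since Python 3.8).
    ctx = mp.get_context('spawn')
    p = ctx.Process(target=hello)
    p.start()
    p.join()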

Let's now look at how the child process is actually started.

0x02 Prepare in advance

Before the child process is started, Celery does a lot of preparation: constructing the command that runs the child process, setting up pipes, transferring parent-process information, and so on.

2.1 Overall preparation process

Debugging is somewhat inconvenient on both Windows and *nix; here we take Windows as the example for analysis.

The previous article used a *nix system, so the child-process abstraction was ForkProcess; this article uses Windows, so it is SpawnProcess instead.

Since we are on Windows, the call goes to:

class SpawnProcess(process.BaseProcess):
    _start_method = 'spawn'
    @staticmethod
    def _Popen(process_obj):
        from .popen_spawn_win32 import Popen
        return Popen(process_obj)

So the following is used:

from .popen_spawn_win32 import Popen

The following code is located in: billiard/popen_spawn_win32.py

The main steps are as follows:

  • First, _winapi.CreatePipe(None, 0) is called to create a pipe and obtain its read and write handles (rhandle, whandle);

  • Next, get_command_line is called to piece together the command that the child process will execute. Note that the pipe_handle passed here is the read handle, and parent_pid is the pid of the parent process; in the child, *nix uses pipe_handle directly while Windows uses pipe_handle together with parent_pid to obtain the read end of the pipe;

  • Then the write end is opened (as to_child), which is important;

  • The Windows API CreateProcess is then called to start the child process;

  • Because the write end has been opened, the key auxiliary information of the parent process is passed to the child process through reduction.dump(prep_data, to_child); the child can later interpret the parent-process information from it;

  • reduction.dump(process_obj, to_child) then passes the process object to the child; here the process object is the SpawnProcess instance;

  • In the parent process, _winapi.CloseHandle(rhandle) closes the parent's copy of the read handle, so the parent and child end up connected through the child's read end of this pipe;

The details are as follows (a simplified POSIX-flavoured sketch of the same flow is given after the code):

class Popen(object):
    '''
    Start a subprocess to run the code of a process object
    '''
    method = 'spawn'
    sentinel = None

    def __init__(self, process_obj):
        os.environ["MULTIPROCESSING_FORKING_DISABLE"] = "1"
        spawn._Django_old_layout_hack__save()
        prep_data = spawn.get_preparation_data(process_obj._name)

        # read end of pipe will be "stolen" by the child process
        # - see spawn_main() in spawn.py.
        rhandle, whandle = _winapi.CreatePipe(None, 0)
        wfd = msvcrt.open_osfhandle(whandle, 0)
        cmd = spawn.get_command_line(parent_pid=os.getpid(),
                                     pipe_handle=rhandle)
        cmd = ' '.join('"%s"' % x for x in cmd)

        with io.open(wfd, 'wb', closefd=True) as to_child:
            # start process
            try:
                hp, ht, pid, tid = CreateProcess(
                    spawn.get_executable(), cmd,
                    None, None, False, 0, None, None, None)
                close_thread_handle(ht)
            except:
                _winapi.CloseHandle(rhandle)
                raise

            # set attributes of self
            self.pid = pid
            self.returncode = None
            self._handle = hp
            self.sentinel = int(hp)

            # send information to child
            context.set_spawning_popen(self)
            try:
                reduction.dump(prep_data, to_child)
                reduction.dump(process_obj, to_child)
            finally:
                context.set_spawning_popen(None)
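Windows-specific details aside, the underlying pattern is "create a pipe, hand the read end to the freshly spawned child, and pickle the parent's data into the write end". Below is a hedged, POSIX-flavoured sketch of that pattern using only the standard library (os.pipe, subprocess, pickle); the names and the inline child program are illustrative, not billiard's code.

import os
import pickle
import subprocess
import sys

# Inline child program: read one pickled object from the inherited fd.
child_src = (
    "import os, pickle, sys; "
    "fd = int(sys.argv[1]); "
    "data = pickle.load(os.fdopen(fd, 'rb')); "
    "print('child received:', data)"
)

if __name__ == '__main__':
    rfd, wfd = os.pipe()
    # The child inherits the read end; the parent writes into the write end.
    proc = subprocess.Popen([sys.executable, '-c', child_src, str(rfd)],
                            pass_fds=(rfd,))
    os.close(rfd)                       # parent no longer needs the read end
    with os.fdopen(wfd, 'wb') as to_child:
        pickle.dump({'parent_pid': os.getpid()}, to_child)
    proc.wait()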

Let's take a look at several important points in this preparation process in detail.

2.2 Get commands

The first important point: get_command_line is called to piece together the command the child process will execute.

The code is located at: billiard/spawn.py.

It simply splices together a command that re-runs Python and calls spawn_main.

def get_command_line(**kwds):
    '''
    Returns prefix of command line used for spawning a child process
    '''
    if getattr(sys, 'frozen', False):
        return ([sys.executable, '--billiard-fork'] +
                ['%s=%r' % item for item in kwds.items()])
    else:
        prog = 'from billiard.spawn import spawn_main; spawn_main(%s)'
        prog %= ', '.join('%s=%r' % item for item in kwds.items())
        opts = util._args_from_interpreter_flags()
        return [_python_exe] + opts + ['-c', prog, '--billiard-fork']

The command line result is similar to:

python -c 'from billiard.spawn import spawn_main; spawn_main(....)' --billiard-fork ...

2.3 Calling the Windows system method

Popen then calls the Windows API to start the child process.

hp, ht, pid, tid = CreateProcess(
    spawn.get_executable(), cmd,
    None, None, False, 0, None, None, None)

Therefore, the logic is as follows:

+------------------------------+
|  SpawnProcess                |
|                              |
|    os.getpid()               |
|    rhandle / whandle         |
|         +                    |
|         |   Popen            |
+---------+--------------------+
          |
          |  get_command_line
          v
python -c 'from billiard.spawn import spawn_main; spawn_main(....)' --billiard-fork ...
          +
          |  CreateProcess
          v
+-----------------+
| windows kernel  |
+-----------------+

2.4 Passing parent process information

Since the write end of the pipe (to_child) has been opened, the parent's key auxiliary information is passed to the child through reduction.dump(prep_data, to_child), and the child can reconstruct the parent-process information from it. The process object dumped next is the SpawnProcess itself.

The work is done by a pickler, as follows:

def dump(obj, file, protocol=None):
    '''Replacement for pickle.dump() using ForkingPickler.'''
    ForkingPickler(file, protocol).dump(obj)

as well as:

if PY3:
    import copyreg

    class ForkingPickler(pickle.Pickler):
        '''Pickler subclass used by multiprocessing.'''
        _extra_reducers = {}
        _copyreg_dispatch_table = copyreg.dispatch_table

        def __init__(self, *args):
            super(ForkingPickler, self).__init__(*args)
            self.dispatch_table = self._copyreg_dispatch_table.copy()
            self.dispatch_table.update(self._extra_reducers)

        @classmethod
        def register(cls, type, reduce):
            '''Register a reduce function for a type.'''
            cls._extra_reducers[type] = reduce

        @classmethod
        def dumps(cls, obj, protocol=None):
            buf = io.BytesIO()
            cls(buf, protocol).dump(obj)
            return buf.getbuffer()

        @classmethod
        def loadbuf(cls, buf, protocol=None):
            return cls.loads(buf.getbuffer())

        loads = pickle.loads

else:
    class ForkingPickler(pickle.Pickler):  # noqa
        '''Pickler subclass used by multiprocessing.'''
        dispatch = pickle.Pickler.dispatch.copy()

        @classmethod
        def register(cls, type, reduce):
            '''Register a reduce function for a type.'''
            def dispatcher(self, obj):
                rv = reduce(obj)
                self.save_reduce(obj=obj, *rv)
            cls.dispatch[type] = dispatcher

        @classmethod
        def dumps(cls, obj, protocol=None):
            buf = io.BytesIO()
            cls(buf, protocol).dump(obj)
            return buf.getvalue()

        @classmethod
        def loadbuf(cls, buf, protocol=None):
            return cls.loads(buf.getvalue())

        @classmethod
        def loads(cls, buf, loads=pickle.loads):
            if isinstance(buf, io.BytesIO):
                buf = buf.getvalue()
            return loads(buf)
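The point of ForkingPickler.register() is to attach extra reduce functions for types that plain pickle cannot (or should not) serialize directly, such as handles and file descriptors. A tiny sketch of the same mechanism using copyreg and a toy type (the names are made up for illustration):

import copyreg
import pickle

class Handle:
    """Stands in for something pickle would normally need help with (e.g. an fd)."""
    def __init__(self, value):
        self.value = value

def reduce_handle(h):
    # Return (callable, args) so pickle can rebuild the object on the other side.
    return (Handle, (h.value,))

copyreg.pickle(Handle, reduce_handle)   # register the reducer globally

data = pickle.dumps(Handle(42))
restored = pickle.loads(data)
print(restored.value)                   # -> 42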

At this point the preparation is complete, and execution moves into the child process.

0x03 child process started

Windows has now been told to start the child process, so the kernel does its work and the child is launched.

3.1 Enter from the command line

Since spawn_main is clearly mentioned in the previous command line results:

python -c 'from billiard.spawn import spawn_main; spawn_main(....)' --billiard-fork ...

So the child process is started from spawn_main.

The code is located at: billiard/spawn.py

def spawn_main(pipe_handle, parent_pid=None, tracker_fd=None):
    '''
    Run code specified by data received over pipe
    '''
    assert is_forking(sys.argv)
    if sys.platform == 'win32':
        import msvcrt
        from .reduction import steal_handle
        new_handle = steal_handle(parent_pid, pipe_handle)
        fd = msvcrt.open_osfhandle(new_handle, os.O_RDONLY)
    else:
        from . import semaphore_tracker
        semaphore_tracker._semaphore_tracker._fd = tracker_fd
        fd = pipe_handle
    exitcode = _main(fd)  # _main will be called here.
    sys.exit(exitcode)

Note: the pipe_handle here is the read handle passed in, and parent_pid is the parent's pid. In the child process, Windows uses parent_pid and pipe_handle together (via steal_handle) to obtain the read end, while *nix uses pipe_handle directly as the fd.

The logic at this time is:

(Figure: parent process on the left, child process on the right. In the parent, SpawnProcess/Popen takes os.getpid() and the rhandle/whandle pipe pair, builds the command line via get_command_line, and starts the child through CreateProcess via the Windows kernel (steps 1 and 2). In the child, spawn_main receives parent_pid and pipe_handle and turns them into the read fd, while the parent feeds it data via reduction.dump(process_obj, to_child) (step 3).)


Therefore, the program makes a call to _main.

3.2 _main reads the key information of the parent process

As mentioned earlier, the parent writes key information into the pipe. The child therefore opens the read end here and reads that information; the process object it reads back is the SpawnProcess itself, so the child can now operate on SpawnProcess.

def _main(fd):
    _Django_old_layout_hack__load()
    with io.open(fd, 'rb', closefd=True) as from_parent:
        process.current_process()._inheriting = True
        try:
            preparation_data = pickle.load(from_parent)
            prepare(preparation_data)
            _setup_logging_in_child_hack()
            self = pickle.load(from_parent)  # read the parent's key information; it is a SpawnProcess
        finally:
            del process.current_process()._inheriting
    return self._bootstrap()

The logic is as follows:

(Figure: as before, plus the child side: in _main, pickle.load(from_parent) yields the SpawnProcess object (step 4), and self._bootstrap() is then called on it (step 5).)


3.3 SpawnProcess start

Now that the child process has the SpawnProcess object, it calls into SpawnProcess's base class.

The code is located at: billiard/process.py.

class BaseProcess(object):
    '''
    Process objects represent activity that is run in a separate process

    The class is analagous to `threading.Thread`
    '''

3.3.1 _bootstrap configures the necessary information

In the base class (billiard/process.py), _bootstrap configures the necessary information, such as stdin and logging, and then calls run.

def _bootstrap(self):
    from . import util, context
    global _current_process, _process_counter, _children

    try:
        # Set stdin and so on
        if self._start_method is not None:
            context._force_start_method(self._start_method)
        _process_counter = itertools.count(1)
        _children = set()
        if sys.stdin is not None:
            try:
                sys.stdin.close()
                sys.stdin = open(os.devnull)
            except (EnvironmentError, OSError, ValueError):
                pass
        old_process = _current_process
        _set_current_process(self)

        # Set up loggers, etc.
        loggerDict = logging.Logger.manager.loggerDict
        logger_names = list(loggerDict.keys())
        logger_names.append(None)  # for root logger
        for name in logger_names:
            if not name or not isinstance(loggerDict[name],
                                          logging.PlaceHolder):
                for handler in logging.getLogger(name).handlers:
                    handler.createLock()
        logging._lock = threading.RLock()

        try:
            util._finalizer_registry.clear()
            util._run_after_forkers()
        finally:
            # delay finalization of the old process object until after
            # _run_after_forkers() is executed
            del old_process
        util.info('child process %s calling self.run()', self.pid)
        try:
            self.run()  # run here
            exitcode = 0
        finally:
            util._exit_function()
    # ... exception handling omitted in this excerpt ...
    return exitcode

3.3.2 Starting the Worker

SpawnProcess continues to call run.

def run(self):
    '''
    Method to be run in sub-process; can be overridden in sub-class
    '''
    if self._target:
        self._target(*self._args, **self._kwargs)

From the previous article, we know:

self._target = {Worker} <celery.concurrency.asynpool.Worker object at 0x7f9ad358b240>

So we arrive at celery.concurrency.asynpool.Worker, which is the child process's work loop.

as follows:

(Figure: as before, plus: _bootstrap() calls run(), which invokes the target, i.e. the celery.concurrency.asynpool.Worker instance in the child process.)


3.4 Worker service

The code is located at: celery/billiard/pool.py

After entering the Worker, we come to __call__; its overall position in the flow is shown below.

(Figure: as before, plus step 6: the Worker's __call__ is entered in the child process.)


The main steps of __call__ are as follows:

  • Use _make_child_methods to set up the methods for receiving tasks and synchronization messages;

  • Use after_fork to restore the application information;

  • Use on_loop_start to send a WORKER_UP message to notify the parent process;

  • Use sys.exit(self.workloop(pid=pid)) to formally enter the work loop;

class Worker(object):

    def __call__(self):
        _exit = sys.exit
        _exitcode = [None]

        def exit(status=None):
            _exitcode[0] = status
            return _exit(status)
        sys.exit = exit

        pid = os.getpid()
        self._make_child_methods()
        self.after_fork()
        self.on_loop_start(pid=pid)  # callback on loop start
        try:
            sys.exit(self.workloop(pid=pid))
        except Exception as exc:
            error('Pool process %r error: %r', self, exc, exc_info=1)
            self._do_exit(pid, _exitcode[0], exc)
        finally:
            self._do_exit(pid, _exitcode[0], None)

We will analyze in detail below.

3.4.1 Setting up the task-receiving and synchronization methods

The child process uses _make_child_methods to set up the methods for waiting on tasks and on synchronization messages:

def _make_child_methods(self, loads=pickle_loads):
    self.wait_for_job = self._make_protected_receive(self.inq)
    self.wait_for_syn = (self._make_protected_receive(self.synq)
                         if self.synq else None)
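The implementation of _make_protected_receive is not shown here; as a rough idea, a "protected receive" can be sketched as a wrapper that polls the connection with a timeout and swallows a broken pipe. The helper below is hypothetical, not billiard's actual code.

def make_protected_receive(conn, timeout=1.0):
    def receive():
        try:
            if conn.poll(timeout):      # wait up to `timeout` seconds for data
                return conn.recv()      # a (type, args) tuple in the real protocol
        except (EOFError, OSError):
            pass                        # parent went away or the pipe was closed
        return None
    return receive

# wait_for_job = make_protected_receive(inq)
# req = wait_for_job()   # either a job tuple or None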

3.4.2 Configure application related information

So we run into a problem: the Celery application lives in the parent process; how does the child process get hold of it?

Although some multi-process mechanisms copy the parent's variables into the child, that is not guaranteed, so there has to be an explicit mechanism for the parent to hand the Celery application to the child.

We therefore need to work out how the parent process configures the Celery application for the child, and how the child process retrieves it. A minimal sketch of the general pattern follows.
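The general pattern can be sketched with the standard library's multiprocessing.Pool: the parent passes initializer and initargs, and every child runs the initializer once at startup to rebuild its own per-process state (the names below are illustrative, not Celery's).

import multiprocessing as mp

_current_app = None            # per-process global, analogous to _tls.current_app

def child_initializer(app_name, hostname):
    global _current_app
    _current_app = f'{app_name} (restored in pid {mp.current_process().pid})'

def do_work(x):
    return f'{_current_app} handled {x}'

if __name__ == '__main__':
    with mp.Pool(processes=2,
                 initializer=child_initializer,
                 initargs=('myTest', 'celery')) as pool:
        print(pool.map(do_work, range(3)))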

3.4.2.1 Application information sources

Back in the parent process, when the process pool is started, class Pool(object) is configured as follows (path: billiard/pool.py).

Note that:

  • Here we return to the parent process for this discussion;
  • The initializer parameter (together with initargs) carries the Celery application itself.

The code is:

class Pool(object):
    '''
    Class which supports an async version of applying functions to arguments.
    '''
    _wrap_exception = True
    Worker = Worker
    Supervisor = Supervisor
    TaskHandler = TaskHandler
    TimeoutHandler = TimeoutHandler
    ResultHandler = ResultHandler
    SoftTimeLimitExceeded = SoftTimeLimitExceeded

    def __init__(self, processes=None, initializer=None, initargs=(),
                 ......
                 **kwargs):
        self._ctx = context or get_context()
        self.synack = synack
        self._setup_queues()
        self._taskqueue = Queue()
        self._cache = {}
        self._state = RUN
        self.timeout = timeout
        self.soft_timeout = soft_timeout
        self._maxtasksperchild = maxtasksperchild
        self._max_memory_per_child = max_memory_per_child
        self._initializer = initializer
        self._initargs = initargs

So the relevant variables of the Pool class are as follows, where Celery myTest is the Celery application itself:

self._initializer = {function} <function process_initializer at 0x7f90c9387488>
self._initargs = {tuple: 2} (<Celery myTest at 0x7f90c8812f98>, 'celery')
self = {AsynPool} <celery.concurrency.asynpool.AsynPool object at 0x7f90c97379b0>

The Worker class is then configured in the parent process as follows; you can see that the initializer is stored:

class Worker(object):

    def __init__(self, inq, outq, synq=None, initializer=None, initargs=(),
                 maxtasks=None, sentinel=None, on_exit=None,
                 sigprotection=True, wrap_exception=True,
                 max_memory_per_child=None, on_ready_counter=None):
        assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0)
        self.initializer = initializer
        self.initargs = initargs
3.4.2.2 Invoke Recovery

As mentioned earlier, after the child process has started, after_fork is called to restore the application:

process_initializer, prefork.py:44
after_fork, pool.py:421
__call__, pool.py:289
run, process.py:114
_bootstrap, process.py:327
_main, spawn.py:210
spawn_main, spawn.py:165
<frame not available>

Looking closer, after_fork restores the application information via self.initializer(*self.initargs).

def after_fork(self):
    if hasattr(self.inq, '_writer'):
        self.inq._writer.close()
    if hasattr(self.outq, '_reader'):
        self.outq._reader.close()
    if self.initializer is not None:
        self.initializer(*self.initargs)

    # Make sure all exiting signals call finally: blocks.
    # This is important for the semaphore to be released.
    reset_signals(full=self.sigprotection)

    # install signal handler for soft timeouts.
    if SIG_SOFT_TIMEOUT is not None:
        signal.signal(SIG_SOFT_TIMEOUT, soft_timeout_sighandler)

    try:
        signal.signal(signal.SIGINT, signal.SIG_IGN)
    except AttributeError:
        pass
3.4.2.3 Restore application information

The specific recovery method is in process_initializer.

The code location is: celery/concurrency/prefork.py

The important call here is app.set_current(), which installs the Celery app passed in as the current app of the child process.

The specific code is:

def process_initializer(app, hostname):
    """Pool child process initializer.

    Initialize the child pool process to ensure the correct
    app instance is used and things like logging works.
    """
    _set_task_join_will_block(True)
    platforms.signals.reset(*WORKER_SIGRESET)
    platforms.signals.ignore(*WORKER_SIGIGNORE)
    platforms.set_mp_process_title('celeryd', hostname=hostname)
    # This is for Windows and other platforms not supporting
    # fork().  Note that init_worker makes sure it's only
    # run once per process.
    app.loader.init_worker()
    app.loader.init_worker_process()

    if os.environ.get('FORKED_BY_MULTIPROCESSING'):
        # pool did execv after fork
        trace.setup_worker_optimizations(app, hostname)
    else:
        app.set_current()  # configured here
        set_default_app(app)
        app.finalize()
        trace._tasks = app._tasks  # enables fast_trace_task optimization.
    # rebuild execution handler for all tasks.
    from celery.app.trace import build_tracer
    for name, task in app.tasks.items():
        task.__trace__ = build_tracer(name, task, app.loader,
                                      hostname, app=app)
    from celery.worker import state as worker_state
    worker_state.reset_state()
    signals.worker_process_init.send(sender=None)
Configuring the Celery app itself

In the child process, the configuration code is located in celery/app/base.py; the TLS-related pieces are visible here.

def set_current(self):
    """Make this the current app for this thread."""
    _set_current_app(self)

def _set_current_app(app):
    _tls.current_app = app

def _get_current_app():
    if default_app is None:
        #: creates the global fallback app instance.
        from celery.app.base import Celery
        set_default_app(Celery(
            'default', fixups=[], set_as_current=False,
            loader=os.environ.get('CELERY_LOADER') or 'default',
        ))
    return _tls.current_app or default_app
TLS

TLS definition is located at: celery/_state.py

It is a variable that is kept separately per process or per thread; exactly how it is isolated depends on the concurrency implementation.

class _TLS(threading.local):
    #: Apps with the :attr:`~celery.app.base.BaseApp.set_as_current` attribute
    #: sets this, so it will always contain the last instantiated app,
    #: and is the default app returned by :func:`app_or_default`.
    current_app = None

_tls = _TLS()
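To see why a threading.local slot is enough to give each worker its "current app", here is a minimal self-contained sketch of the set_current / get_current_app pattern (the class and function names are illustrative, not Celery's exact API surface):

import threading

_tls = threading.local()

class App:
    def __init__(self, name):
        self.name = name
    def set_current(self):
        _tls.current_app = self     # record this app for the current thread

def get_current_app():
    return getattr(_tls, 'current_app', None)

App('myTest').set_current()
print(get_current_app().name)       # -> 'myTest' in this thread

def other_thread():
    print(get_current_app())        # -> None: each thread has its own slot

threading.Thread(target=other_thread).start()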
Subsequent use

Later code can then call get_current_app to fetch the Celery application itself.

This is further wrapped in celery/_state.py; how it is used will be explained below.

if os.environ.get('C_STRICT_APP'):  # pragma: no cover
    def get_current_app():
        """Return the current app."""
        raise RuntimeError('USES CURRENT APP')
elif os.environ.get('C_WARN_APP'):  # pragma: no cover
    def get_current_app():  # noqa
        import traceback
        print('-- USES CURRENT_APP', file=sys.stderr)  # noqa+
        traceback.print_stack(file=sys.stderr)
        return _get_current_app()

3.4.3 Notify the parent process

At the end of child-process startup, on_loop_start sends a WORKER_UP message; as you can see, the interaction goes through the pipe (the out-queue).

In the parent process, ResultHandler.on_process_alive will respond to it.

class Worker(_pool.Worker):
    """Pool worker process."""

    def on_loop_start(self, pid):
        # our version sends a WORKER_UP message when the process is ready
        # to accept work, this will tell the parent that the inqueue fd
        # is writable.
        self.outq.put((WORKER_UP, (pid,)))

When the parent process starts, it registers a handler for this message, so the parent knows the child is ready and can schedule work for it.

class ResultHandler(_pool.ResultHandler):
    """Handles messages from the pool processes."""

    def __init__(self, *args, **kwargs):
        self.fileno_to_outq = kwargs.pop('fileno_to_outq')
        self.on_process_alive = kwargs.pop('on_process_alive')
        super().__init__(*args, **kwargs)
        # add our custom message handler
        self.state_handlers[WORKER_UP] = self.on_process_alive
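Putting the two sides together, the handshake can be sketched with the standard library as follows (the message tag, queue type and handler names are assumptions for illustration): the child announces itself on its out-queue, and the parent dispatches the message to a registered handler.

import multiprocessing as mp

WORKER_UP = 11                      # arbitrary message tag for this sketch

def child(outq):
    outq.put((WORKER_UP, (mp.current_process().pid,)))
    # ... the real worker would now enter its workloop ...

def on_process_alive(pid):
    print(f'parent: worker {pid} is ready, safe to schedule tasks')

if __name__ == '__main__':
    outq = mp.SimpleQueue()
    p = mp.Process(target=child, args=(outq,))
    p.start()
    state_handlers = {WORKER_UP: on_process_alive}
    msg_type, args = outq.get()
    state_handlers[msg_type](*args)
    p.join()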

3.4.4 Formally enter the business logic

The child process uses sys.exit(self.workloop(pid=pid)) to formally enter its work loop.

Code location: billiard/pool.py

As you can see, req = wait_for_job() waits for job information, and the worker then runs the job.

The specific stack is:

workloop, pool.py:351
__call__, pool.py:292
run, process.py:114
_bootstrap, process.py:327
_main, spawn.py:210
spawn_main, spawn.py:165
<frame not available>

The specific code logic is as follows:

def workloop(self, debug=debug, now=monotonic, pid=None):
    pid = pid or os.getpid()
    put = self.outq.put
    inqW_fd = self.inqW_fd
    synqW_fd = self.synqW_fd
    maxtasks = self.maxtasks
    max_memory_per_child = self.max_memory_per_child or 0
    prepare_result = self.prepare_result

    wait_for_job = self.wait_for_job
    _wait_for_syn = self.wait_for_syn

    def wait_for_syn(jid):
        i = 0
        while 1:
            if i > 60:
                error('!!!WAIT FOR ACK TIMEOUT: job:%r fd:%r!!!',
                      jid, self.synq._reader.fileno(), exc_info=1)
            req = _wait_for_syn()
            if req:
                type_, args = req
                if type_ == NACK:
                    return False
                assert type_ == ACK
                return True
            i += 1

    completed = 0
    try:
        while maxtasks is None or (maxtasks and completed < maxtasks):
            req = wait_for_job()
            if req:
                type_, args_ = req
                assert type_ == TASK
                job, i, fun, args, kwargs = args_
                put((ACK, (job, i, now(), pid, synqW_fd)))
                if _wait_for_syn:
                    confirm = wait_for_syn(job)
                    if not confirm:
                        continue  # received NACK
                try:
                    result = (True, prepare_result(fun(*args, **kwargs)))
                except Exception:
                    result = (False, ExceptionInfo())
                try:
                    put((READY, (job, i, result, inqW_fd)))
                except Exception as exc:
                    _, _, tb = sys.exc_info()
                    try:
                        wrapped = MaybeEncodingError(exc, result[1])
                        einfo = ExceptionInfo((
                            MaybeEncodingError, wrapped, tb,
                        ))
                        put((READY, (job, i, (False, einfo), inqW_fd)))
                    finally:
                        del tb
                completed += 1
                if max_memory_per_child > 0:
                    used_kb = mem_rss()
                    if used_kb <= 0:
                        error('worker unable to determine memory usage')
                    if used_kb > 0 and used_kb > max_memory_per_child:
                        warning(MAXMEM_USED_FMT.format(
                            used_kb, max_memory_per_child))
                        return EX_RECYCLE

        if maxtasks:
            return EX_RECYCLE if completed == maxtasks else EX_FAILURE
        return EX_OK
    finally:
        # Before exiting the worker, we want to ensure that all
        # messages produced by the worker have been consumed by the main
        # process. This prevents the worker being terminated prematurely
        # and messages being lost.
        self._ensure_messages_consumed(completed=completed)
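Stripped of synack handling, timeouts, memory limits and error wrapping, the protocol that workloop implements boils down to the following sketch (message tags and helper signatures are simplified assumptions): receive a TASK, acknowledge it, run the function, and send the result back as READY.

def simplified_workloop(wait_for_job, put, pid):
    completed = 0
    while True:
        req = wait_for_job()                       # blocks on the in-queue
        if not req:
            continue
        _type, (job, i, fun, args, kwargs) = req   # a TASK message
        put(('ACK', (job, i, pid)))                # tell the parent we took it
        try:
            result = (True, fun(*args, **kwargs))  # run the actual task
        except Exception as exc:
            result = (False, exc)
        put(('READY', (job, i, result)))           # hand the result back
        completed += 1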

The logic is as follows:

(Figure: as before, plus: inside the child's Worker, the Celery application has been restored and installed as _tls.current_app (step 6, __call__), mirroring the Celery app that lives in the parent process.)


At this point the child process has started; how it runs the tasks handed down from the parent process will be covered in the next article.

0xEE personal information

Thinking about life and technology

WeChat public account: Rossi's Thinking

If you want timely notifications of newly written articles, or want to see the technical material I recommend, please follow the account.
