Skip to content Skip to sidebar Skip to footer

Python Daemonic Processes Are Not Allowed to Have Children

Python Process Pool non-daemonic?


Mungkinkah membuat Kolam python yang non-daemonic? Saya ingin sebuah pool dapat memanggil fungsi yang memiliki pool lain di dalamnya.

Saya menginginkan ini karena proses deamon tidak dapat membuat proses. Secara khusus, ini akan menyebabkan kesalahan:

                AssertionError: daemonic processes are                  not                  allowed to have children                              

Misalnya, pertimbangkan skenario di mana function_amemiliki kumpulan yang berjalan function_byang memiliki kumpulan yang berjalan function_c. Rantai fungsi ini akan gagal, karena function_bdijalankan dalam proses daemon, dan proses daemon tidak dapat membuat proses.



Jawaban:


The multiprocessing.pool.Poolkelas menciptakan proses pekerja di perusahaan __init__metode, membuat mereka kejam dan mulai mereka, dan tidak mungkin untuk kembali mengatur mereka daemonatribut Falsesebelum mereka mulai (dan setelah itu tidak diperbolehkan lagi). Tapi Anda bisa membuat sub-kelas Anda sendiri multiprocesing.pool.Pool( multiprocessing.Poolhanya fungsi pembungkus) dan mengganti multiprocessing.Processsub-kelas Anda sendiri , yang selalu non-daemonik, untuk digunakan untuk proses pekerja.

Berikut contoh lengkap tentang cara melakukan ini. Bagian yang penting adalah dua kelas NoDaemonProcessdan MyPooldi bagian atas dan untuk memanggil pool.close()dan pool.join()di MyPoolinstance Anda di bagian akhir.

                                  #!/usr/bin/env python                  # -*- coding: UTF-8 -*-                  import                  multiprocessing                  # We must import this explicitly, it is not imported by the top-level                  # multiprocessing module.                  import                  multiprocessing.pool                  import                  time                  from                  random                  import                  randint                                      class                    NoDaemonProcess(multiprocessing.Process):                  # make 'daemon' attribute always return False                                      def                    _get_daemon(self):                  return                  False                                      def                    _set_daemon(self, value):                  pass                  daemon = property(_get_daemon, _set_daemon)                  # We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool                  # because the latter is only a wrapper function, not a proper class.                                      class                    MyPool(multiprocessing.pool.Pool):                  Process = NoDaemonProcess                                      def                    sleepawhile(t):                  print("Sleeping %i seconds..."                  % t)     time.sleep(t)                  return                  t                                      def                    work(num_procs):                  print("Creating %i (daemon) workers and jobs in child."                  % num_procs)     pool = multiprocessing.Pool(num_procs)      result = pool.map(sleepawhile,         [randint(1,                  5)                  for                  x                  in                  range(num_procs)])                  # The following is not really needed, since the (daemon) workers of the                  # child's pool are killed when the child is terminated, but it's good                  # practice to cleanup after ourselves anyway.                  pool.close()     pool.join()                  return                  result                                      def                    test():                  print("Creating 5 (non-daemon) workers and jobs in main process.")     pool = MyPool(5)      result = pool.map(work, [randint(1,                  5)                  for                  x                  in                  range(5)])      pool.close()     pool.join()     print(result)                  if                  __name__ ==                  '__main__':     test()                              






Saya memiliki kebutuhan untuk menggunakan kumpulan non-daemonik dengan Python 3.7 dan akhirnya mengadaptasi kode yang diposting dalam jawaban yang diterima. Di bawah ini ada cuplikan yang membuat kumpulan non-daemonik:

                                  import                  multiprocessing.pool                                      class                    NoDaemonProcess(multiprocessing.Process):                                      @property                                      def                    daemon(self):                  return                  False                                      @daemon.setter                                      def                    daemon(self, value):                  pass                                      class                    NoDaemonContext(type(multiprocessing.get_context())):                  Process = NoDaemonProcess                  # We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool                  # because the latter is only a wrapper function, not a proper class.                                      class                    NestablePool(multiprocessing.pool.Pool):                                      def                    __init__(self, *args, **kwargs):                  kwargs['context'] = NoDaemonContext()         super(NestablePool, self).__init__(*args, **kwargs)                              

Karena implementasi saat multiprocessingini telah difaktorisasi ulang secara ekstensif untuk didasarkan pada konteks, kita perlu menyediakan NoDaemonContextkelas yang memiliki NoDaemonProcessatribut as kita . NestablePoolkemudian akan menggunakan konteks itu, bukan yang default.

Karena itu, saya harus memperingatkan bahwa setidaknya ada dua peringatan untuk pendekatan ini:

  1. Ini masih bergantung pada detail implementasi multiprocessingpaket, dan karenanya dapat rusak kapan saja.
  2. Ada alasan yang valid mengapa multiprocessingsangat sulit menggunakan proses non-daemonik, banyak di antaranya dijelaskan di sini . Yang paling menarik menurut saya adalah:

Adapun mengizinkan utas anak untuk menelurkan anak-anaknya sendiri menggunakan subproses menjalankan risiko menciptakan pasukan kecil 'cucu' zombie jika utas orang tua atau anak berakhir sebelum subproses selesai dan kembali.





The multiprocessing modul memiliki antarmuka yang bagus untuk menggunakan kolam dengan proses atau benang. Bergantung pada kasus penggunaan Anda saat ini, Anda mungkin mempertimbangkan multiprocessing.pool.ThreadPooluntuk menggunakan untuk Pool luar Anda, yang akan menghasilkan utas (yang memungkinkan untuk menelurkan proses dari dalam) sebagai lawan dari proses.

Mungkin dibatasi oleh GIL, tetapi dalam kasus khusus saya (saya menguji keduanya) , waktu startup untuk proses dari luar Poolseperti yang dibuat di sini jauh melebihi solusinya ThreadPool.


Ini benar-benar mudah untuk pertukaran Processesuntuk Threads. Baca lebih lanjut tentang cara menggunakan ThreadPoolsolusi di sini atau di sini .




Pada beberapa versi Python menggantikan standar Renang adat dapat meningkatkan error: AssertionError: group argument must be None for now.

Di sini saya menemukan solusi yang dapat membantu:

                                                class                  NoDaemonProcess(multiprocessing.Process):                # make 'daemon' attribute always return False                                  @property                                  def                  daemon(self):                return                False                                  @daemon.setter                                  def                  daemon(self, val):                pass                                  class                  NoDaemonProcessPool(multiprocessing.pool.Pool):                                  def                  Process(self, *args, **kwds):                proc = super(NoDaemonProcessPool, self).Process(*args, **kwds)         proc.__class__ = NoDaemonProcess                return                proc                          

concurrent.futures.ProcessPoolExecutor tidak memiliki batasan ini. Itu dapat memiliki kumpulan proses bersarang tanpa masalah sama sekali:

                                  from                  concurrent.futures                  import                  ProcessPoolExecutor                  as                  Pool                  from                  itertools                  import                  repeat                  from                  multiprocessing                  import                  current_process                  import                  time                                      def                    pid():                  return                  current_process().pid                                      def                    _square(i):                  # Runs in inner_pool                  square = i **                  2                  time.sleep(i /                  10)     print(f'{pid()=}                    {i=}                    {square=}')                  return                  square                                      def                    _sum_squares(i, j):                  # Runs in outer_pool                  with                  Pool(max_workers=2)                  as                  inner_pool:         squares = inner_pool.map(_square, (i, j))     sum_squares = sum(squares)     time.sleep(sum_squares **                  .5)     print(f'{pid()=},                    {i=},                    {j=}                    {sum_squares=}')                  return                  sum_squares                                      def                    main():                  with                  Pool(max_workers=3)                  as                  outer_pool:                  for                  sum_squares                  in                  outer_pool.map(_sum_squares, range(5), repeat(3)):             print(f'{pid()=}                    {sum_squares=}')                  if                  __name__ ==                  "__main__":     main()                              

Kode demonstrasi di atas telah diuji dengan Python 3.8.

Batasannya ProcessPoolExecutor, bagaimanapun, adalah tidak adanya maxtasksperchild. Jika Anda membutuhkan ini, pertimbangkan jawaban dari Massimiliano sebagai gantinya.

Kredit: jawaban oleh jfs




Masalah yang saya temui adalah mencoba mengimpor global antar modul, menyebabkan baris ProcessPool () dievaluasi beberapa kali.

globals.py

                              from                processing                import                Manager, Lock                from                pathos.multiprocessing                import                ProcessPool                from                pathos.threading                import                ThreadPool                                  class                  SingletonMeta(type):                                  def                  __new__(cls, name, bases, dict):                dict['__deepcopy__'] = dict['__copy__'] =                lambda                self, *args: self                return                super(SingletonMeta, cls).__new__(cls, name, bases, dict)                                  def                  __init__(cls, name, bases, dict):                super(SingletonMeta, cls).__init__(name, bases, dict)         cls.instance =                None                                  def                  __call__(cls,*args,**kw):                if                cls.instance                is                None:             cls.instance = super(SingletonMeta, cls).__call__(*args, **kw)                return                cls.instance                                  def                  __deepcopy__(self, item):                return                item.__class__.instance                                  class                  Globals(object):                __metaclass__ = SingletonMeta                """          This class is a workaround to the bug: AssertionError: daemonic processes are not allowed to have children           The root cause is that importing this file from different modules causes this file to be reevalutated each time,      thus ProcessPool() gets reexecuted inside that child thread, thus causing the daemonic processes bug         """                                  def                  __init__(self):                print                "%s::__init__()"                % (self.__class__.__name__)         self.shared_manager      = Manager()         self.shared_process_pool = ProcessPool()         self.shared_thread_pool  = ThreadPool()         self.shared_lock         = Lock()                #                  BUG:                  Windows: global name 'lock' is not defined | doesn't affect cygwin                          

Kemudian impor dengan aman dari tempat lain di kode Anda

                              from                globals                import                Globals Globals().shared_manager       Globals().shared_process_pool Globals().shared_thread_pool   Globals().shared_lock                          

Saya telah menulis kelas pembungkus yang lebih luas di sekitar pathos.multiprocessingsini:

  • https://github.com/JamesMcGuigan/python2-timeseries-datapipeline/blob/master/src/util/MultiProcessing.py

Sebagai catatan tambahan, jika kasus penggunaan Anda hanya memerlukan peta multiproses asinkron sebagai pengoptimalan kinerja, maka joblib akan mengelola semua kumpulan proses Anda di belakang layar dan memungkinkan sintaks yang sangat sederhana ini:

              squares = Parallel(-1)( delayed(lambda                num: num**2)(x)                for                x                in                range(100) )                          
  • https://joblib.readthedocs.io/

Saya telah melihat orang-orang berurusan dengan masalah ini dengan menggunakan celerygarpu yang multiprocessingdisebut billiard (ekstensi kolam multiprosesing), yang memungkinkan proses daemonik untuk menelurkan anak-anak. Panduannya adalah dengan mengganti multiprocessingmodul dengan:

                              import                billiard                as                multiprocessing                          

Ini memberikan solusi untuk kesalahan yang tampaknya positif palsu. Seperti juga dicatat oleh James , ini bisa terjadi pada impor yang tidak disengaja dari proses daemonik.

Misalnya, jika Anda memiliki kode sederhana berikut, WORKER_POOLsecara tidak sengaja dapat diimpor dari pekerja, yang menyebabkan kesalahan.

                              import                multiprocessing  WORKER_POOL = multiprocessing.Pool()                          

Pendekatan sederhana namun dapat diandalkan untuk solusi adalah:

                              import                multiprocessing                import                multiprocessing.pool                                  class                  MyClass:                                  @property                                  def                  worker_pool(self) -> multiprocessing.pool.Pool:                # Ref: https://stackoverflow.com/a/63984747/                try:                return                self._worker_pool                # type: ignore                except                AttributeError:                # pylint: disable=protected-access                self.__class__._worker_pool = multiprocessing.Pool()                # type: ignore                return                self.__class__._worker_pool                # type: ignore                # pylint: enable=protected-access                          

Dalam solusi di atas, MyClass.worker_pooldapat digunakan tanpa kesalahan. Jika menurut Anda pendekatan ini dapat diperbaiki, beri tahu saya.

Python Daemonic Processes Are Not Allowed to Have Children

Source: https://qastack.id/programming/6974695/python-process-pool-non-daemonic

Post a Comment for "Python Daemonic Processes Are Not Allowed to Have Children"