programing

python의 멀티프로세싱 풀에서 키보드 인터럽트

megabox 2023. 6. 18. 12:30
반응형

python의 멀티프로세싱 풀에서 키보드 인터럽트

키보드 처리 방법파이썬의 멀티프로세싱 풀로 이벤트를 중단하시겠습니까?다음은 간단한 예입니다.

from multiprocessing import Pool
from time import sleep
from sys import exit

def slowly_square(i):
    sleep(1)
    return i*i

def go():
    pool = Pool(8)
    try:
        results = pool.map(slowly_square, range(40))
    except KeyboardInterrupt:
        # **** THIS PART NEVER EXECUTES. ****
        pool.terminate()
        print "You cancelled the program!"
        sys.exit(1)
    print "\nFinally, here are the results: ", results

if __name__ == "__main__":
    go()

때, 위의코를때실할행드,,KeyboardInterrupt누르면 올라갑니다.^C하지만 그 과정은 그 시점에서 중단될 뿐이고 외부적으로 죽여야 합니다.

나는 누를 수 있기를 원합니다.^C모든 프로세스가 정상적으로 종료되도록 합니다.

이것은 파이썬 버그입니다.스레드에서 조건을 대기할 때.조건 .wait(), 키보드인터럽트는 전송되지 않습니다.재생성:

import threading
cond = threading.Condition(threading.Lock())
cond.acquire()
cond.wait(None)
print "done"

키보드wait()이 반환될 때까지 인터럽트 예외는 전달되지 않으며, 다시는 반환되지 않으므로 인터럽트는 발생하지 않습니다.키보드인터럽트는 거의 확실히 상태 대기를 중단해야 합니다.

시간 초과가 지정된 경우에는 이러한 현상이 발생하지 않습니다. cond.wait(1)이 인터럽트를 즉시 수신합니다.따라서 해결 방법은 시간 초과를 지정하는 것입니다.이 작업을 수행하려면 대체

    results = pool.map(slowly_square, range(40))

와 함께

    results = pool.map_async(slowly_square, range(40)).get(9999999)

또는 그와 유사합니다.

제가 최근에 발견한 바로는, 가장 좋은 해결책은 SIGINT를 완전히 무시하도록 worker process를 설정하고, 모든 cleanup code를 parent process로 제한하는 것입니다.이렇게 하면 유휴 작업자 프로세스와 사용 중인 작업자 프로세스 모두에서 문제가 해결되고 하위 프로세스에서 오류 처리 코드가 필요하지 않습니다.

import signal

...

def init_worker():
    signal.signal(signal.SIGINT, signal.SIG_IGN)

...

def main()
    pool = multiprocessing.Pool(size, init_worker)

    ...

    except KeyboardInterrupt:
        pool.terminate()
        pool.join()

설명 및 전체 예제 코드는 각각 http://noswap.com/blog/python-multiprocessing-keyboardinterrupt/ 및 http://github.com/jreese/multiprocessing-keyboardinterrupt 에서 확인할 수 있습니다.

이유로 .Exception수업은 정상적으로 처리됩니다.해결책으로, 당신은 당신의 것을 다시 제기할 수 있습니다.KeyboardInterrupt한 사람으로서Exception인스턴스:

from multiprocessing import Pool
import time

class KeyboardInterruptError(Exception): pass

def f(x):
    try:
        time.sleep(x)
        return x
    except KeyboardInterrupt:
        raise KeyboardInterruptError()

def main():
    p = Pool(processes=4)
    try:
        print 'starting the pool map'
        print p.map(f, range(10))
        p.close()
        print 'pool map complete'
    except KeyboardInterrupt:
        print 'got ^C while pool mapping, terminating the pool'
        p.terminate()
        print 'pool is terminated'
    except Exception, e:
        print 'got exception: %r, terminating the pool' % (e,)
        p.terminate()
        print 'pool is terminated'
    finally:
        print 'joining pool processes'
        p.join()
        print 'join complete'
    print 'the end'

if __name__ == '__main__':
    main()

일반적으로 다음과 같은 출력이 표시됩니다.

staring the pool map
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
pool map complete
joining pool processes
join complete
the end

그래서 치면은.^C다음을 얻을 수 있습니다.

staring the pool map
got ^C while pool mapping, terminating the pool
pool is terminated
joining pool processes
join complete
the end

투표된 답변은 핵심 문제를 다루지 않고 유사한 부작용을 초래합니다.

Library의 인 Jesse Ctrl+C를 사용할 Ctrl+합니다.multiprocessing.Pool오래된 블로그 게시물에서.

import signal
from multiprocessing import Pool


def initializer():
    """Ignore CTRL+C in the worker process."""
    signal.signal(signal.SIGINT, signal.SIG_IGN)


pool = Pool(initializer=initializer)

try:
    pool.map(perform_download, dowloads)
except KeyboardInterrupt:
    pool.terminate()
    pool.join()

제출된 모든 작업이 완료될 때까지 차단하는 등의 방법을 실행하는 경우 이러한 답변 중 많은 수가 이전 버전의 Windows Python(3.8.5 실행 중)에서 작동하지 않는 것 같습니다.다음은 저의 해결책입니다.

  1. 으로 하세요.signal.signal(signal.SIGINT, signal.SIG_IGN)기본 프로세스에서 Ctrl-C를 완전히 무시합니다.
  2. 각 프로세서를 초기화할 풀 이니셜라이저를 사용하여 처리 풀을 초기화합니다. 글로벌 변수ctrl_c_entered됩니다.False 전화에로 했습니다.signal.signal(signal.SIGINT, signal.SIG_IGN)Ctrl-C를 처음에는 무시하도록 실행됩니다.이 통화의 반환 값이 저장됩니다. 이것은 원래의 기본 핸들러이며, 다시 설정하면 다음을 처리할 수 있습니다.KyboardInterrupt예외 사항
  3. 식가장,handle_ctrl_c입력 중인 Ctrl-C에서 즉시 종료해야 하는 다중 처리 함수 및 메서드를 장식하는 데 사용할 수 있습니다.이 장식가는 세계적인 것을 확인하기 위해 테스트할 것입니다.ctrl_c_entered " , "약플워를실다행", "드래는하것것반조입니환", "할차/대귀에신않찮고지키실행▁a▁flag하/것다▁is드를워▁return니입▁and는▁willmethod▁set▁thenot▁so▁if▁,/▁and▁bother▁instead▁even.KeyboardInterrupt예외 인스턴스입니다. 않은 경우 " " " " " " " " " " " " " " " " " " " " " " " " " " " " " " " " " " " " " " " " " " " " " " " " " " " " " " " " " " " "KeyboardInterrupt설정되고 장식된 기능/기능이 호출됩니다.한 경우 글로벌 Ctrl-C를을 .ctrl_c_entered됩니다.True a 리고a.KeyboardInterrupt예외 인스턴스가 반환됩니다.어쨌든, 반환하기 전에 장식자는 SIG_IGN 핸들러를 다시 설정합니다.

기본적으로 제출된 모든 작업은 시작할 수 있지만 반환 값이 다음과 같이 즉시 종료됩니다.KeyBoardInterruptCtrl-C를 입력하면 예외가 발생합니다.주 프로세스는 Ctrl-C가 입력되었는지 여부를 탐지하기 위해 이러한 반환 값의 존재 여부를 테스트할 수 있습니다.

from multiprocessing import Pool
import signal
from time import sleep
from functools import wraps

def handle_ctrl_c(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        global ctrl_c_entered
        if not ctrl_c_entered:
            signal.signal(signal.SIGINT, default_sigint_handler) # the default
            try:
                return func(*args, **kwargs)
            except KeyboardInterrupt:
                ctrl_c_entered = True
                return KeyboardInterrupt()
            finally:
                signal.signal(signal.SIGINT, pool_ctrl_c_handler)
        else:
            return KeyboardInterrupt()
    return wrapper

@handle_ctrl_c
def slowly_square(i):
    sleep(1)
    return i*i

def pool_ctrl_c_handler(*args, **kwargs):
    global ctrl_c_entered
    ctrl_c_entered = True

def init_pool():
    # set global variable for each process in the pool:
    global ctrl_c_entered
    global default_sigint_handler
    ctrl_c_entered = False
    default_sigint_handler = signal.signal(signal.SIGINT, pool_ctrl_c_handler)

def main():
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    pool = Pool(initializer=init_pool)
    results = pool.map(slowly_square, range(10))
    if any(map(lambda x: isinstance(x, KeyboardInterrupt), results)):
        print('Ctrl-C was entered.')
    print(results)
    pool.close()
    pool.join()

if __name__ == '__main__':
    main()

인쇄:

Ctrl-C was entered.
[0, 1, 4, 9, 16, 25, 36, 49, KeyboardInterrupt(), KeyboardInterrupt()]

일반적으로 이 간단한 구조는 - on Pool에 C적용됩니다.

def signal_handle(_signal, frame):
    print "Stopping the Jobs."

signal.signal(signal.SIGINT, signal_handle)

몇 개의 유사한 게시물에 언급된 바와 같이:

Try-except 없이 Python에서 키보드 인터럽트

멀티프로세싱이 번거로우면서도 예외를 만드는 두 가지 문제가 있는 것 같습니다.(글렌이 한) 첫번글째것사한것다입니다는야한언급해용은당이신이렌을 해야 한다는 것입니다.map_async 으로웃아 대신 시간 map즉, 전체 목록 처리를 완료하지 않는 것이 좋습니다.는두번째(Andrey 가언는)한멀다상이음싱다것않에서 입니다.Exception (계속):SystemExit 이 두 다음은이두모두다솔루다루니입션는가지를▁).다.

import sys
import functools
import traceback
import multiprocessing

def _poolFunctionWrapper(function, arg):
    """Run function under the pool

    Wrapper around function to catch exceptions that don't inherit from
    Exception (which aren't caught by multiprocessing, so that you end
    up hitting the timeout).
    """
    try:
        return function(arg)
    except:
        cls, exc, tb = sys.exc_info()
        if issubclass(cls, Exception):
            raise # No worries
        # Need to wrap the exception with something multiprocessing will recognise
        import traceback
        print "Unhandled exception %s (%s):\n%s" % (cls.__name__, exc, traceback.format_exc())
        raise Exception("Unhandled exception: %s (%s)" % (cls.__name__, exc))

def _runPool(pool, timeout, function, iterable):
    """Run the pool

    Wrapper around pool.map_async, to handle timeout.  This is required so as to
    trigger an immediate interrupt on the KeyboardInterrupt (Ctrl-C); see
    http://stackoverflow.com/questions/1408356/keyboard-interrupts-with-pythons-multiprocessing-pool

    Further wraps the function in _poolFunctionWrapper to catch exceptions
    that don't inherit from Exception.
    """
    return pool.map_async(functools.partial(_poolFunctionWrapper, function), iterable).get(timeout)

def myMap(function, iterable, numProcesses=1, timeout=9999):
    """Run the function on the iterable, optionally with multiprocessing"""
    if numProcesses > 1:
        pool = multiprocessing.Pool(processes=numProcesses, maxtasksperchild=1)
        mapFunc = functools.partial(_runPool, pool, timeout)
    else:
        pool = None
        mapFunc = map
    results = mapFunc(function, iterable)
    if pool is not None:
        pool.close()
        pool.join()
    return results

저는 파이썬의 신입입니다.저는 답을 찾기 위해 모든 곳을 찾고 있었고 이것과 다른 블로그와 유튜브 비디오 몇 개를 우연히 발견했습니다.나는 위의 작성자 코드를 복사하여 윈도우 7 64비트의 파이썬 2.7.13에 복사하려고 했습니다.그것은 제가 이루고 싶은 것에 가깝습니다.

저는 C 제어를 무시하고 부모 프로세스를 종료하도록 자녀 프로세스를 만들었습니다.자식 프로세스를 우회하는 것이 저에게는 이 문제를 피할 수 있는 것 같습니다.

#!/usr/bin/python

from multiprocessing import Pool
from time import sleep
from sys import exit


def slowly_square(i):
    try:
        print "<slowly_square> Sleeping and later running a square calculation..."
        sleep(1)
        return i * i
    except KeyboardInterrupt:
        print "<child processor> Don't care if you say CtrlC"
        pass


def go():
    pool = Pool(8)

    try:
        results = pool.map(slowly_square, range(40))
    except KeyboardInterrupt:
        pool.terminate()
        pool.close()
        print "You cancelled the program!"
        exit(1)
    print "Finally, here are the results", results


if __name__ == '__main__':
    go()

으로 시작하는 .pool.terminate()절대 실행되지 않을 것 같습니다.

현재로서는 multiprocessing.pool 기능을 사용하지 않고 자신만의 풀 기능을 롤하는 것이 가장 좋은 해결책이라는 것을 알게 되었습니다.apply_async의 오류를 보여주는 예시와 풀 기능을 함께 사용하지 않는 방법을 보여주는 예시를 제공했습니다.

http://www.bryceboe.com/2010/08/26/python-multiprocessing-and-keyboardinterrupt/

다음과 같이 Pool 개체의 apply_async 메서드를 사용할 수 있습니다.

import multiprocessing
import time
from datetime import datetime


def test_func(x):
    time.sleep(2)
    return x**2


def apply_multiprocessing(input_list, input_function):
    pool_size = 5
    pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=10)

    try:
        jobs = {}
        for value in input_list:
            jobs[value] = pool.apply_async(input_function, [value])

        results = {}
        for value, result in jobs.items():
            try:
                results[value] = result.get()
            except KeyboardInterrupt:
                print "Interrupted by user"
                pool.terminate()
                break
            except Exception as e:
                results[value] = e
        return results
    except Exception:
        raise
    finally:
        pool.close()
        pool.join()


if __name__ == "__main__":
    iterations = range(100)
    t0 = datetime.now()
    results1 = apply_multiprocessing(iterations, test_func)
    t1 = datetime.now()
    print results1
    print "Multi: {}".format(t1 - t0)

    t2 = datetime.now()
    results2 = {i: test_func(i) for i in iterations}
    t3 = datetime.now()
    print results2
    print "Non-multi: {}".format(t3 - t2)

출력:

100
Multiprocessing run time: 0:00:41.131000
100
Non-multiprocessing run time: 0:03:20.688000

이 방법의 장점은 중단 전에 처리된 결과가 결과 사전에 반환된다는 것입니다.

>>> apply_multiprocessing(range(100), test_func)
Interrupted by user
{0: 0, 1: 1, 2: 4, 3: 9, 4: 16, 5: 25}

이상하게도 당신이 처리해야 할 것처럼 보입니다.KeyboardInterrupt 될 줄 ...나는 이것이 기록된 대로 작동할 것이라고 예상했을 것입니다.바꿔 보다slowly_square대상:

def slowly_square(i):
    try:
        sleep(1)
        return i * i
    except KeyboardInterrupt:
        print 'You EVIL bastard!'
        return 0

그것은 당신이 예상한 대로 작동할 것입니다.

언급URL : https://stackoverflow.com/questions/1408356/keyboard-interrupts-with-pythons-multiprocessing-pool

반응형