ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [파이썬] Ray로 병렬처리하기 (feat. Multiprocessing)
    개발 2022. 11. 7. 09:51

    Multiprocessing

    - 프로세스 스포닝(Process Spawning)을 지원하여 자원 내에서 사용 가능한 다중 프로세서를 활용 가능하게 함

    더보기

    프로세스 스포닝 (Process Spawning) : 부모 프로세스가 운영 체제에 요청하여 자식 프로세스를 생성하는 과정

    - 모듈 내 정의된 Pool, Process 함수 이용

    Pool

    - 실행시키고자 하는 함수를 process에 분배하여 병렬처리

    # 로컬에서 활용 가능한 CPU 및 프로세스 확인
    import multiprocessing as mp
    print(mp.cpu_count())
    
    from multiprocessing import Pool
    
    # basic
    def process_P():
      num_cores = 4 # job을 할당받을 Process의 수
      pool = Pool(num_cores)
      pool.map(func, myList)
    
    # ex. 데이터프레임 전처리
    def parallel_dataframe(df, func, num_cores):
        df_split = np.array_split(df, num_cores)
        pool = Pool(num_cores)
        df = pd.concat(pool.map(func, df_split))
        # 작업 완료 후에도 메모리를 계속적으로 잡아먹는 것을 방지하기 위해 close, join 활용
        pool.close()
        pool.join()
        return df

    Process

    - 하나의 프로세스에 하나의 함수를 할당하여 실행

    from multiprocessing import Process
    
    def main():
        procs = []  
      
        for num in range(1,10):
            proc = Process(target=work_func, args=(num,)) # target=인자에 작업을 할당하고, args=(agr1,)에 인자를 할당하여 프로세스 객체를 생성
            procs.append(proc)
            proc.start()
            
        for proc in procs:
            proc.join() # 프로세스 종료


    https://jonsyou.tistory.com/27
    https://yganalyst.github.io/data_handling/memo_17_parallel/

    Multiprocessing의 단점

    - 큰 데이터를 다른 프로세스에 전달할 때 pickle을 사용해 직렬화(Serialize)한 뒤 전달

    더보기
    • 직렬화 : 파이썬 객체를 일련의 바이트들로 변환하는 것
    • pickle : 파이썬에서 사용하는 딕셔너리, 리스트, 클래스 등의 자료형을 변환없이 그대로 파일에 저장하고 불러올 때 사용하는 모듈 및 그러한 자료 형식

    - 프로세스 개수만큼 데이터 복사본을 만들어야 하므로 큰 메모리를 사용
    - 역직렬화를 통해 데이터를 받아야 하므로 굉장히 큰 오버헤드가 발생

    더보기
    • 오버헤드
      • 프로그램의 실행흐름에서 나타나는 현상중 하나
      • ex. 프로그램의 실행흐름 도중에 동떨어진 위치의 코드를 실행시켜야 할 때 , 추가적으로 시간,메모리,자원이 사용되는 현상
      • 즉, 특정 기능을 수행하는데 드는 간접적인 시간, 메모리 등 자원

     

    - 프로세스에 데이터를 전달하는 행위가 많아짐에 따라 오히려 수행 시간에 악영향
    - multiprocessing 라이브러리를 사용하기 위해 기존에 작성한 코드를 수정해야 함 -> Ray의 경우 기존 함수에 데코레이터(Decorator)를 추가하고, 함수를 호출할 때 remote() 메서드를 이용하는 것 말고는 큰 차이가 없음


    Ray가 빠른 이유

    Ray는 multiprocessing 에서 발생하는 직렬화 오버헤드를 해결하기 위해 Apache Arrow를 사용합니다. Apache Arrow는 행(Row) 기반이 아닌 컬럼 기반의 인메모리 포맷으로 Zero-Copy 직렬화를 수행합니다. 또한 직렬화된 데이터를 인메모리 객체 저장소 (In-Memory Object Store)인 Plasma를 이용해 직렬화된 데이터를 빠르게 공유합니다.

    Ray 코드 예시

    # ----- ray -----
    import ray
    print(ray.__version__)
    
    # ray 실행하기
    ray.init(num_cpus=16, ignore_reinit_error=True) # num_cpus : CPU 개수
    
    # Ray Task(Remote Function) 정의
    # 기존 함수에 데코레이터(Decorator)를 추가하고, 함수를 호출할 때 remote() 메서드를 이용하는 것 말고는 큰 차이가 없음
    @ray.remote
    def mul(x):          # Task
        return x * 10
    
    # 큰 데이터를 반복적으로 사용하게 된다면  ray.put()을 사용해 메모리 사용을 줄일 수 있음
    # ray.put()은 데이터를 공유 메모리에 저장하여 복사본을 만들지 않고 모든 프로세스에서 접근할 수 있음
    arr = ray.put(arr)   # Object
    print(arr)
    
    # remote()를 사용해 Task 호출
    # ObjectRef가 반환됨
    arr = mul.remote(arr)
    print(arr)
    
    # Task를 실행해서 값을 python object로 반환
    result = ray.get(arr)
    
    # Ray의 사용이 끝났다면 프로세스를 종료
    ray.shutdown()

    Ray 구성 요소

    Task

    - @ray.remote라는 데코레이터로 감싼 함수 → stateless worker
    - 호출하는 곳과 다른 프로세스에서 실행되는 함수 또는 클래스
    - Remote Function이라고도 부르며, 호출할 경우 비동기적(asynchronously)으로 실행됨

    더보기
    • Synchronous programming (동기 프로그래밍) : 주어진 task들을 순차적으로 실행하는 프로그래밍 방식. 하나의 작업이 수행중이면 그 다음 작업은 수행 완료까지 대기
    • Asynchronous programming (비동기 프로그래밍) : task들을 병렬적으로 수행하는 프로그래밍 방식. 먼저 실행된 작업이 완료되지 않아도 다음 작업이 대기하지 않고 수행

    - remote() 메서드를 써서 호출 가능하며 Future 객체(ObjectRef)를 반환
    - ray.get(ObjectRef) 를 하여 Task를 실행하고 값을 반환받을 수 있음

    Actor

    - @ray.remote라는 데코레이터로 감싼 파이썬 클래스 인스턴스 → stateful worker
    - @ray.remote를 통해서 Actor Class로 만들 수 있고, 이 클래스의 메서드 호출은 Stateful Task가 됨

    Task와 Actor

    - Task 와 Actor 모두 worker로 ray 에서는 remote 함수를 통해 실행 가능
    - remote 함수는 파이썬 객체 대신 future 를 리턴하며 이 값은 다른 remote 함수의 인자로 사용할 수 있음
    - @ray.remote로 감싼 함수(Task)는 stateless하고, @ray.remote로 감싼 클래스(Actor)의 Task는 stateful

    더보기
    • 무상태 (Stateless) 
      • 클라이언트와 서버 관계에서 서버가 클라이언트의 상태를 보존하지 않음
      • 이전의 클라이언트의 요청(상태)을 유지하지 않는 서버
      • 장점 : 서버의 확장성이 높기 때문에 대량의 트래픽 발생 시에도 대처가 수월
      • 단점 : 클라이언트의 요청에 상대적으로 Stateful 보다 더 많은 데이터가 소모
    • 상태 유지 (Stateful)
      • 클라이언트와 서버 관계에서 서버가 클라이언트의 상태를 보존함
      • 서버는 사용자의 이전 요청을 모두 기억하며 진행
      • 장점 : 기존의 서버가 혼잡해져서 새로운 서버를 가져다 놓아도 기존의 비즈니스 로직을 그대로 구현하고 있다면 이전의 사용자 요청이 어떤지에 관계없이 계속 일을 처리할 수 있음
      • 단점 : 클라이언트가 하고자하는 최종 목적을 위해 지나는 과정마다 점점 전달해야하는 내용이 많아짐
    • https://irostub.github.io/web/stateful-stateless/

     

    Object

    - Task를 통해서 반환되거나 ray.put()을 통해 생성되는 값
    - 데이터의 크기가 큰 경우 ray.put()을 통해 Object로 만들어 Ray에서 빠르게 사용할 수 있음
    - 그러나 Spark의 RDD와 같이 immutable함
    - ray.put()은 데이터를 공유 메모리에 저장하여 모든 프로세스에 접근할 수 있으며 복사본을 만들지 않으므로 메모리를 아낄 수 있음

    Driver

    - 프로그램의 메인 루트
    - ray.init()을 호출하면 실행

    Job

    - 동일한 드라이버에서 발생한 Task, Actor, Object의 컬렉션

    Ray 함수

    ray.init()

    - ray 컨텍스트(context) 초기화를 진행

    @ray.remote

    - 병렬처리를 위한 ray만의 파이썬 데코레이터(decorator)
    - 기본적으로 함수에 붙여서 사용하며 클래스(class)에 붙여서 사용하면 액터(actor)라고 부름
    - ray.remote()로 사용할 수도 있음

    .remote()

    - @ray.remote 데코레이터를 붙였다면 언제든지 호출 할수 있음
    - remote()로 호출한 함수에 인자(argument)를 던져줄때 .remote()에 던져줌

    ray.put()

    - Ray는 in-memory object storage 를 가지고 있는데, 이 메모리를 shared memory 로 사용할 수 있기 때문에 큰 객체를 shared memory에 업로드 해두면 worker나 actor에서 빠르게 접근할 수 있음
    - 일반적인 경우 -> large_image가 num_cpu 만큼 프로세스에 할당되기 때문에 오버헤드가 발생

    large_image = np.zeros((3000, 3000))
    for i in range(num_cpus):
    	result_ref = f.remote(large_image, filter[i])


    - ray.put()을 이용해서 ray를 object store에 업로드한 후 referece id 정보를 task에 넘겨줄 수 있음 -> task는 shared memory 에 저장된 image 를 이용해서 computational graph 를 만들게 됨

    # ray.put() 을 사용한 data id 공유
    large_image = np.zeros((3000, 3000))
    image_ref = ray.put(large_image)
    for i in range(num_cpus):
    	result_ref = f.remote(image_ref, filter[i])

    ray.get()

    - referece id 값에 해당하는 값을 반환
    - 이 함수가 호출될 때 비로소 분산처리가 시작되며 연속적으로(sequential) ray.get()을 지정했다면 선 ray.get() 작업이 끝날때 까지 뒤
    업들은 대기
    - 즉, 모든 worker가 작업을 종료할때 까지 기다려야함

    ray.wait()

    - Task 의 작업시간이 차이가 나는 경우 ray.wait()를 활용
    - remote()로 지정된 오브젝트들 중에서 준비가 된 referece id를 반환
    - 전체 실행시간을 줄일 수 있을 뿐 아니라 작업이 끝나지 않는 예외 상황에도 유연하게 대처할 수 있음
    - (위) ray.get()과 (아래) ray.wait()의 실행 방식 비교


    기타 참고 글
    https://velog.io/@otzslayer/Ray%EB%A5%BC-%EC%9D%B4%EC%9A%A9%ED%95%B4-Python-%EB%B3%91%EB%A0%AC-%EC%B2%98%EB%A6%AC-%EC%89%BD%EA%B2%8C-%ED%95%98%EA%B8%B0
    https://zzsza.github.io/mlops/2021/01/03/python-ray/
    https://medium.com/naver-shopping-dev/ray-%EB%A1%9C-pytorch-model-inference-%ED%95%98%EA%B8%B0-77ce11304604
    https://titania7777.tistory.com/15
    https://github.com/ray-project/ray
    https://medium.com/riiid-teamblog-kr/ray-%ED%99%95%EC%9E%A5-%EA%B0%80%EB%8A%A5%ED%95%9C-%EA%B3%A0%EC%84%B1%EB%8A%A5-%EB%B6%84%EC%82%B0-%EB%B3%91%EB%A0%AC-machine-learning-%ED%94%84%EB%A0%88%EC%9E%84%EC%9B%8C%ED%81%AC-f17f9c9cbef3

    기타 파이썬 병렬 처리 모듈

    swifter
    https://kibua20.tistory.com/202
    modin
    https://zephyrus1111.tistory.com/170
    dask
    https://devtimes.com/python-dask/
    gil
    https://velog.io/@ceaseless/python-GIL%EA%B3%BC-%EB%B3%91%EB%A0%AC%ED%94%84%EB%A1%9C%EA%B7%B8%EB%9E%98%EB%B0%8D

Designed by Tistory.