-
[파이썬] Ray로 병렬처리하기 (feat. Multiprocessing)개발 2022. 11. 7. 09:51
Multiprocessing
- 프로세스 스포닝(Process Spawning)을 지원하여 자원 내에서 사용 가능한 다중 프로세서를 활용 가능하게 함
더보기프로세스 스포닝 (Process Spawning) : 부모 프로세스가 운영 체제에 요청하여 자식 프로세스를 생성하는 과정
- 모듈 내 정의된 Pool, Process 함수 이용
Pool
- 실행시키고자 하는 함수를 process에 분배하여 병렬처리
# 로컬에서 활용 가능한 CPU 및 프로세스 확인 import multiprocessing as mp print(mp.cpu_count()) from multiprocessing import Pool # basic def process_P(): num_cores = 4 # job을 할당받을 Process의 수 pool = Pool(num_cores) pool.map(func, myList) # ex. 데이터프레임 전처리 def parallel_dataframe(df, func, num_cores): df_split = np.array_split(df, num_cores) pool = Pool(num_cores) df = pd.concat(pool.map(func, df_split)) # 작업 완료 후에도 메모리를 계속적으로 잡아먹는 것을 방지하기 위해 close, join 활용 pool.close() pool.join() return df
Process
- 하나의 프로세스에 하나의 함수를 할당하여 실행
from multiprocessing import Process def main(): procs = [] for num in range(1,10): proc = Process(target=work_func, args=(num,)) # target=인자에 작업을 할당하고, args=(agr1,)에 인자를 할당하여 프로세스 객체를 생성 procs.append(proc) proc.start() for proc in procs: proc.join() # 프로세스 종료
https://jonsyou.tistory.com/27
https://yganalyst.github.io/data_handling/memo_17_parallel/
Multiprocessing의 단점
- 큰 데이터를 다른 프로세스에 전달할 때 pickle을 사용해 직렬화(Serialize)한 뒤 전달
더보기- 직렬화 : 파이썬 객체를 일련의 바이트들로 변환하는 것
- pickle : 파이썬에서 사용하는 딕셔너리, 리스트, 클래스 등의 자료형을 변환없이 그대로 파일에 저장하고 불러올 때 사용하는 모듈 및 그러한 자료 형식
- 프로세스 개수만큼 데이터 복사본을 만들어야 하므로 큰 메모리를 사용
- 역직렬화를 통해 데이터를 받아야 하므로 굉장히 큰 오버헤드가 발생더보기- 오버헤드
- 프로그램의 실행흐름에서 나타나는 현상중 하나
- ex. 프로그램의 실행흐름 도중에 동떨어진 위치의 코드를 실행시켜야 할 때 , 추가적으로 시간,메모리,자원이 사용되는 현상
- 즉, 특정 기능을 수행하는데 드는 간접적인 시간, 메모리 등 자원
- 프로세스에 데이터를 전달하는 행위가 많아짐에 따라 오히려 수행 시간에 악영향
- multiprocessing 라이브러리를 사용하기 위해 기존에 작성한 코드를 수정해야 함 -> Ray의 경우 기존 함수에 데코레이터(Decorator)를 추가하고, 함수를 호출할 때 remote() 메서드를 이용하는 것 말고는 큰 차이가 없음
Ray가 빠른 이유
Ray는 multiprocessing 에서 발생하는 직렬화 오버헤드를 해결하기 위해 Apache Arrow를 사용합니다. Apache Arrow는 행(Row) 기반이 아닌 컬럼 기반의 인메모리 포맷으로 Zero-Copy 직렬화를 수행합니다. 또한 직렬화된 데이터를 인메모리 객체 저장소 (In-Memory Object Store)인 Plasma를 이용해 직렬화된 데이터를 빠르게 공유합니다.
Ray 코드 예시
# ----- ray ----- import ray print(ray.__version__) # ray 실행하기 ray.init(num_cpus=16, ignore_reinit_error=True) # num_cpus : CPU 개수 # Ray Task(Remote Function) 정의 # 기존 함수에 데코레이터(Decorator)를 추가하고, 함수를 호출할 때 remote() 메서드를 이용하는 것 말고는 큰 차이가 없음 @ray.remote def mul(x): # Task return x * 10 # 큰 데이터를 반복적으로 사용하게 된다면 ray.put()을 사용해 메모리 사용을 줄일 수 있음 # ray.put()은 데이터를 공유 메모리에 저장하여 복사본을 만들지 않고 모든 프로세스에서 접근할 수 있음 arr = ray.put(arr) # Object print(arr) # remote()를 사용해 Task 호출 # ObjectRef가 반환됨 arr = mul.remote(arr) print(arr) # Task를 실행해서 값을 python object로 반환 result = ray.get(arr) # Ray의 사용이 끝났다면 프로세스를 종료 ray.shutdown()
Ray 구성 요소
Task
- @ray.remote라는 데코레이터로 감싼 함수 → stateless worker
- 호출하는 곳과 다른 프로세스에서 실행되는 함수 또는 클래스
- Remote Function이라고도 부르며, 호출할 경우 비동기적(asynchronously)으로 실행됨더보기- Synchronous programming (동기 프로그래밍) : 주어진 task들을 순차적으로 실행하는 프로그래밍 방식. 하나의 작업이 수행중이면 그 다음 작업은 수행 완료까지 대기
- Asynchronous programming (비동기 프로그래밍) : task들을 병렬적으로 수행하는 프로그래밍 방식. 먼저 실행된 작업이 완료되지 않아도 다음 작업이 대기하지 않고 수행
- remote() 메서드를 써서 호출 가능하며 Future 객체(ObjectRef)를 반환
- ray.get(ObjectRef) 를 하여 Task를 실행하고 값을 반환받을 수 있음
Actor
- @ray.remote라는 데코레이터로 감싼 파이썬 클래스 인스턴스 → stateful worker
- @ray.remote를 통해서 Actor Class로 만들 수 있고, 이 클래스의 메서드 호출은 Stateful Task가 됨
Task와 Actor
- Task 와 Actor 모두 worker로 ray 에서는 remote 함수를 통해 실행 가능
- remote 함수는 파이썬 객체 대신 future 를 리턴하며 이 값은 다른 remote 함수의 인자로 사용할 수 있음
- @ray.remote로 감싼 함수(Task)는 stateless하고, @ray.remote로 감싼 클래스(Actor)의 Task는 stateful함더보기- 무상태 (Stateless)
- 클라이언트와 서버 관계에서 서버가 클라이언트의 상태를 보존하지 않음
- 이전의 클라이언트의 요청(상태)을 유지하지 않는 서버
- 장점 : 서버의 확장성이 높기 때문에 대량의 트래픽 발생 시에도 대처가 수월
- 단점 : 클라이언트의 요청에 상대적으로 Stateful 보다 더 많은 데이터가 소모
- 상태 유지 (Stateful)
- 클라이언트와 서버 관계에서 서버가 클라이언트의 상태를 보존함
- 서버는 사용자의 이전 요청을 모두 기억하며 진행
- 장점 : 기존의 서버가 혼잡해져서 새로운 서버를 가져다 놓아도 기존의 비즈니스 로직을 그대로 구현하고 있다면 이전의 사용자 요청이 어떤지에 관계없이 계속 일을 처리할 수 있음
- 단점 : 클라이언트가 하고자하는 최종 목적을 위해 지나는 과정마다 점점 전달해야하는 내용이 많아짐
- https://irostub.github.io/web/stateful-stateless/
Object
- Task를 통해서 반환되거나 ray.put()을 통해 생성되는 값
- 데이터의 크기가 큰 경우 ray.put()을 통해 Object로 만들어 Ray에서 빠르게 사용할 수 있음
- 그러나 Spark의 RDD와 같이 immutable함
- ray.put()은 데이터를 공유 메모리에 저장하여 모든 프로세스에 접근할 수 있으며 복사본을 만들지 않으므로 메모리를 아낄 수 있음
Driver
- 프로그램의 메인 루트
- ray.init()을 호출하면 실행
Job
- 동일한 드라이버에서 발생한 Task, Actor, Object의 컬렉션
Ray 함수
ray.init()
- ray 컨텍스트(context) 초기화를 진행
@ray.remote
- 병렬처리를 위한 ray만의 파이썬 데코레이터(decorator)
- 기본적으로 함수에 붙여서 사용하며 클래스(class)에 붙여서 사용하면 액터(actor)라고 부름
- ray.remote()로 사용할 수도 있음
.remote()
- @ray.remote 데코레이터를 붙였다면 언제든지 호출 할수 있음
- remote()로 호출한 함수에 인자(argument)를 던져줄때 .remote()에 던져줌
ray.put()
- Ray는 in-memory object storage 를 가지고 있는데, 이 메모리를 shared memory 로 사용할 수 있기 때문에 큰 객체를 shared memory에 업로드 해두면 worker나 actor에서 빠르게 접근할 수 있음
- 일반적인 경우 -> large_image가 num_cpu 만큼 프로세스에 할당되기 때문에 오버헤드가 발생large_image = np.zeros((3000, 3000)) for i in range(num_cpus): result_ref = f.remote(large_image, filter[i])
- ray.put()을 이용해서 ray를 object store에 업로드한 후 referece id 정보를 task에 넘겨줄 수 있음 -> task는 shared memory 에 저장된 image 를 이용해서 computational graph 를 만들게 됨# ray.put() 을 사용한 data id 공유 large_image = np.zeros((3000, 3000)) image_ref = ray.put(large_image) for i in range(num_cpus): result_ref = f.remote(image_ref, filter[i])
ray.get()
- referece id 값에 해당하는 값을 반환
- 이 함수가 호출될 때 비로소 분산처리가 시작되며 연속적으로(sequential) ray.get()을 지정했다면 선 ray.get() 작업이 끝날때 까지 뒤
업들은 대기
- 즉, 모든 worker가 작업을 종료할때 까지 기다려야함
ray.wait()
- Task 의 작업시간이 차이가 나는 경우 ray.wait()를 활용
- remote()로 지정된 오브젝트들 중에서 준비가 된 referece id를 반환
- 전체 실행시간을 줄일 수 있을 뿐 아니라 작업이 끝나지 않는 예외 상황에도 유연하게 대처할 수 있음
- (위) ray.get()과 (아래) ray.wait()의 실행 방식 비교
기타 참고 글
https://velog.io/@otzslayer/Ray%EB%A5%BC-%EC%9D%B4%EC%9A%A9%ED%95%B4-Python-%EB%B3%91%EB%A0%AC-%EC%B2%98%EB%A6%AC-%EC%89%BD%EA%B2%8C-%ED%95%98%EA%B8%B0
https://zzsza.github.io/mlops/2021/01/03/python-ray/
https://medium.com/naver-shopping-dev/ray-%EB%A1%9C-pytorch-model-inference-%ED%95%98%EA%B8%B0-77ce11304604
https://titania7777.tistory.com/15
https://github.com/ray-project/ray
https://medium.com/riiid-teamblog-kr/ray-%ED%99%95%EC%9E%A5-%EA%B0%80%EB%8A%A5%ED%95%9C-%EA%B3%A0%EC%84%B1%EB%8A%A5-%EB%B6%84%EC%82%B0-%EB%B3%91%EB%A0%AC-machine-learning-%ED%94%84%EB%A0%88%EC%9E%84%EC%9B%8C%ED%81%AC-f17f9c9cbef3
기타 파이썬 병렬 처리 모듈
swifter
https://kibua20.tistory.com/202
modin
https://zephyrus1111.tistory.com/170
dask
https://devtimes.com/python-dask/
gil
https://velog.io/@ceaseless/python-GIL%EA%B3%BC-%EB%B3%91%EB%A0%AC%ED%94%84%EB%A1%9C%EA%B7%B8%EB%9E%98%EB%B0%8D
'개발' 카테고리의 다른 글
[파이썬] 주피터 노트북에서 argparse 사용하기 (0) 2022.12.09 [파이썬] dotenv로 환경변수 관리하기 (0) 2022.11.27 [파이썬] git repo로 패키지 올려서 pip install하기 (0) 2022.11.04 [리눅스] xshell 사용법 (0) 2022.11.04 [에러] 정리 (1) 2022.11.04