
🏃‍♀️ [python] Leaving work on time with ray

이유 YIYU 2024. 5. 2. 15:10

In an earlier post I used multiprocessing for parallel processing, but these days I mostly use ray. This post sums up why I switched from multiprocessing to ray and how to use ray.

🙋🏻‍♀️ What is multiprocessing?

multiprocessing makes use of multiple processors through process spawning. "Spawning" literally means laying eggs; here it refers to a parent process creating child processes. Parallel work goes through a Pool object, and you pick among methods such as map, imap, map_async, and imap_unordered depending on whether you want a blocking or asynchronous call and whether you want the results as a list or as an iterator.
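To make the difference between those Pool methods concrete, here is a minimal sketch. The function names `square` and `run_pool_demo` are made up for illustration and are not from the original post.

```python
from multiprocessing import Pool


def square(x):
    """Toy task; defined at module top level so worker processes can import it."""
    return x * x


def run_pool_demo(values, processes=2):
    """Run `square` over `values` with four of the Pool methods."""
    with Pool(processes=processes) as pool:
        # map: blocks until every result is ready and preserves input order.
        mapped = pool.map(square, values)
        # imap: lazy iterator that still yields results in input order.
        lazy = list(pool.imap(square, values))
        # imap_unordered: yields results as workers finish, so order is not
        # guaranteed; sorted here only to make the result comparable.
        unordered = sorted(pool.imap_unordered(square, values))
        # map_async: returns an AsyncResult immediately; get() blocks.
        async_result = pool.map_async(square, values).get(timeout=30)
    return mapped, lazy, unordered, async_result


if __name__ == "__main__":
    print(run_pool_demo([1, 2, 3, 4]))
```

The `if __name__ == "__main__":` guard matters: child processes may re-import the main module, and the guard keeps them from starting pools of their own.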

When the multiprocessing library passes objects between processes it uses pickle, so only picklable objects can be used. pickle serializes a Python object into a byte stream and then deserializes that stream to rebuild the object. With this approach every process has to make its own copy of the data, which means allocating a lot of memory and paying the deserialization overhead. According to the pickle module documentation, the following types can be pickled:

  • None, True, and False
  • Integers, floating-point numbers, complex numbers
  • Strings, bytes, and bytearrays
  • Tuples, lists, sets, and dictionaries containing only picklable objects
  • Functions defined at the top level of a module
  • Built-in functions defined at the top level of a module
  • Classes that are defined at the top level of a module
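As a quick check of the list above, the sketch below round-trips a dictionary through pickle and probes whether a few objects can be pickled at all. The helper `is_picklable` and the function `top_level_add` are hypothetical names introduced only for this example.

```python
import pickle


def top_level_add(a, b):
    """A module-level function: pickle stores it by its qualified name."""
    return a + b


def is_picklable(obj):
    """Return True if pickle.dumps(obj) succeeds, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


# A container holding only picklable objects survives a round trip.
payload = {"ids": [1, 2, 3], "scale": 0.5, "tag": None}
restored = pickle.loads(pickle.dumps(payload))

if __name__ == "__main__":
    print("round trip equal:", restored == payload)
    print("top-level function:", is_picklable(top_level_add))
    print("lambda:", is_picklable(lambda x: x + 1))
```

A lambda has no importable name, so the standard pickle module refuses it. That is the usual reason a Pool call fails on functions or objects that are not defined at the top level of a module.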

🤷🏻‍♀️ So what makes ray different?

ray is an open-source unified framework for parallel processing. It can be used not only for data preprocessing but also for distributed training, hyperparameter tuning, and model serving. Unlike the multiprocessing library, it lets you parallelize existing code with only small changes, and it is less prone to the memory blow-up that comes from copying data into every process. ray keeps serialization overhead low with zero-copy serialization on top of a shared-memory object store that grew out of Apache Arrow. Zero-copy means a process reads the data directly from shared memory instead of copying it, which avoids copy operations and is useful for large-scale data processing. In short, ray lets you build systems at a high level that are easy to distribute and run in parallel. It also helps meet the following requirements of distributed and parallel systems in a powerful yet simple way.

  • The same code should run on many machines.
  • It should be possible to build stateful microservices and actors that can communicate with each other.
  • Machine failures and system failures should be handled gracefully.
  • Large objects and numerical data should be handled efficiently.

🤔 Trying out ray

Let's install ray and try a simple example.

pip install ray

First, start a ray cluster with ray.init(). Its arguments let you turn on the dashboard and set the number of CPU cores to use. Next, put the ray.remote decorator on the function you want to run in parallel. A decorated function is called through its remote() method, which returns an ObjectRef right away: a future-like reference to a result that will live in shared memory. ray.put() stores data in shared memory so that processes can read it without each making their own copy, and it also returns an ObjectRef. ray.get(ObjectRef) returns the actual value.

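%%time
# %%time is a Jupyter cell magic; remove it when running this as a plain script.
import numpy as np
import ray

ray.init()  # start a local ray cluster
arr = np.random.random(100000000)

@ray.remote
def mul(x):
    # Runs in a ray worker process; x is the array read from the object store.
    return x * x

arr_ref = ray.put(arr)  # store the array once in the shared object store
result = ray.get(mul.remote(arr_ref))  # block until the task finishes

# CPU times: user 2.99 s, sys: 5.24 s, total: 8.22 s
# Wall time: 12 s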

When you are done with ray, call ray.shutdown() to stop its processes.

ray.shutdown()

🤖 Checking that ray is running properly with the dashboard

First, ray has to be installed with the default extra, as shown below.

pip install "ray[default]"

If you pass the parameters below to ray.init, the dashboard is served on 0.0.0.0:8265 (on the same machine, open http://localhost:8265 in a browser).

ray.init(ignore_reinit_error=True, dashboard_host="0.0.0.0", dashboard_port=8265, include_dashboard=True)

The Cluster view shows CPU usage and memory, and the Jobs view shows the tasks.

For a quick check you can also use the CLI: running ray status shows the CPU cores and memory currently in use.

ray status
======== Autoscaler status: 2023-04-14 15:08:54.505294 ========
Node status
---------------------------------------------------------------
Healthy:
 1 node_a97857866db864797421c265df4c57eb57b2c4025798091347b49a9c
Pending:
 (no pending nodes)
Recent failures:
 (no failures)

Resources
---------------------------------------------------------------
Usage:
 16.0/16.0 CPU
 0.00/4.369 GiB memory
 0.00/2.000 GiB object_store_memory

Demands:
 {'CPU': 1.0}: 65115+ pending tasks/actors

✍️ Wrapping up

That covers the differences between ray and multiprocessing and the basics of using ray. When I was using multiprocessing I once ran into a library that would not work with process spawning and never quite understood why; writing this post finally made it click. Another annoyance with multiprocessing was that I could not tell how far along my tasks were, whereas ray comes with a good-looking dashboard, so tracking progress should be much easier. The documentation also presents ray as a tool for training machine learning models, which I would like to dig into next.

🧐 References