ํฐ์คํ ๋ฆฌ ๋ทฐ
โจ multiprocessing์ด ํ์ํ ์ด์
๋ง์ ๊ฐ๋ฐ์๋ค์ด ํ๋ ์ผ ์ค ํ๋๋ ๊ธฐ๋ค๋ฆฌ๊ธฐ์ผ ๊ฒ์ด๋ค. ์ต๊ทผ ํ์ฌ์ ์ทจ์ ์ ํ๊ณ ๋์ ์ ์ผ ๋จผ์ ๋งก์ ์ผ์ ๋๊ฐ์ ๋ฐ์ดํฐ๋ฒ ์ด์ค์ ์กด์ฌํ๋ sequence ์ ๋ณด๋ค์ ๊ฐ์ง๊ณ similarity๋ฅผ ๊ตฌํด relation ๋ฐ์ดํฐ๋ฒ ์ด์ค๋ฅผ ๋ง๋๋ ๊ฒ์ธ๋ฐ ๊ฑฐ์ ์ผ์ฃผ์ผ ๋๊ฒ ์ฝ์ง์ ํ์๋ค. (ํผ๋จ์ ์ฐ์..๐๐ปโ๏ธ) ๊ฐ ๋ฐ์ดํฐํ๋ ์์ sequence๋ค์ cross mappingํ์ฌ ๊ณ์ฐํด์ผํ๊ธฐ ๋๋ฌธ์ ๋๋ต์ ์ผ๋ก ์ฝ๋๋ฅผ ์ง๋ฉด ์๋์ ๊ฐ๋ค(sequence similarity ๊ณ์ฐ์ ์๋ตํ๊ณ sequence๋ฅผ ๋จ์ํ ๋ํ๋ ๊ณ์ฐ ์ฝ๋์)
def calculate(first_df, second_df):
results = []
for first_index, first_seq in first_df.values:
for second_index, second_seq in second_df.values:
results.append(first_seq+second_seq)
return results
๋ฐ์ดํฐ๊ฐ ์ ์ผ๋ฉด ๋๋ ค๋ ์ค๋ ๊ฑธ๋ฆด ์ผ์ด ์์ง๋ง ๋๊ฐ์ ๊ฒฝ์ฐ๋..3000 row์ ๋ฐ์ดํฐ๋ฒ ์ด์ค์ 8000 row์ ๋ฐ์ดํฐ๋ฒ ์ด์ค๋ผ ๋๋ต ๊ณ์ฐํ์ ๋ ์ฝ 14์๊ฐ ์ค์ ๋ก๋ ๋ ์ค๋ ๊ฑธ๋ฆด ๊ฒ์ด๋ค. ๊ฐ๋ฐ์์๊ฒ ์ด๋ฐ ๋นํจ์จ์ ์ธ ์ฝ๋๋ ํจ์จํ์ ์๊ตฌ๋ฅผ ๋ถ๋ฌ์ผ์ผํฌ ๊ฒ์ด๋ค. ๊ทธ๋์ ์ฌ์ฉํ๊ฒ multiprocessing module์ด๋ค.
โจ multiprocessing ์ ์์์ผ ํ ๊ฒ
๐ก CPU๊ฐ ๋ญ์ง?
multiprocessing์ ์ฌ์ฉํ๊ธฐ ์ ์ ๊ธฐ๋ณธ์ ์ผ๋ก ์์์ผํ ๊ฒ๋ค์ด ์๋ค. ๋ ์ด ๋ถ๋ถ์ ๋ชฐ๋ผ์ ์์ฒญ ์ฝ์งํ๋ค๐ ๋ค๋ฅธ ์ธ์ด๋ ์ ๋ชจ๋ฅด๊ฒ ์ง๋ง python์ 1๊ฐ์ cpu ์์์ ์ฝ๋๊ฐ ์คํ๋๋ค. cpu ๋ง์ด ๋ค์ด๋ดค์๊ฑฐ๋ค. python์์ ์๋์ ๊ฐ์ด ์ฝ๋๋ฅผ ์ ๋ ฅํ๋ฉด cpu ๊ฐฏ์๊ฐ ๋์จ๋ค. cpu๋ ์ปดํจํฐ์ ๋จธ๋ฆฌ๋ผ๊ณ ์๊ฐํ๋ฉด ๋๋๋ฐ ์ฐ๋ฆฌ๊ฐ ์์์ ๊ณ์ฐํ ๋ ๋จธ๋ฆฌ๋ฅผ ์ฌ์ฉํ๋ ๊ฒ์ฒ๋ผ ์ปดํจํฐ๊ฐ ์ฐ์ฐํ ๋ cpu๋ฅผ ์ฌ์ฉํ๋ค. ์์ฒ๋ผ for๋ฌธ์ for๋ฌธ์ ๋๋ฆฌ๋ ๊ฒฝ์ฐ๋ cpu๊ฐ ์ด์ฌํ ์ผํ๋ ๊ฒฝ์ฐ์ด๋ค. multiprocessing์์ pool์ ์ฌ์ฉํ๋ฉด ๋๋ค.
๐ก process๋ ๋ญ๋ฐ?
multiprocessing documentation์ ๋ณด๋ฉด multiprocessing์๋ process์ pool์ด ์๋ค. pool์ ๋ด๊ฐ ์์์ ๋งํ ๊ฒ์ฒ๋ผ cpu๋ฅผ ์ฌ๋ฌ๊ฐ ๋๋ฆฌ๊ธฐ ์ํด ์ฐ๋ ๊ฒ์ด๊ณ (cpu bound๋ฅผ ์ํ ๊ฒ์ด๋ผ ํ ์ ์๋ค.) process๋ I/O bound๋ฅผ ์ํ ๊ฒ์ด๋ค. I/O bound๊ฐ ๋ญ๋ฐ? I/O bound๋ input/output bound์ธ๋ฐ ์๋ฅผ ๋ค๋ฉด ์ด๋ฐ๊ฑฐ๋ค. ๋ด๊ฐ ์ฝ๋ฉ์ ํ๊ธฐ ์ํด pycharm์ ์ผ๋๋๋ค. ๊ทธ๋ฆฌ๊ณ ์์ ๊ตฌ๊ธ๋ง์ ์ํด ํฌ๋กฌ์ ํจ๋ค. ๊ทธ๋ฆฌ๊ณ ์ค๋ ๋ฐฐ์ด ๊ฒ์ ์ ์ด๋๊ธฐ ์ํด ๋ ธ์ ์ ์ผ๋๊ณ ์ค๊ฐ์ค๊ฐ ์ด๋ ๊ธ์ ์ฐ๊ธฐ ์ํด velog๋ฅผ ์ผ๋๋๋ค. ๋๋ ์ด์ ์ฝ๋ฉ์ ์์ํ ๊ฒ์ด๋ค. ๋จผ์ ์ฝ๋ฉ์ ํ๋ค. ๊ทธ๋ฌ๋ค ๋ชจ๋ฅด๋๊ฒ ์๊ฒผ๋ค. ๊ทธ๋ฌ๋ฉด ๋ฐ๋ก ๊ตฌ๊ธ๋ง์ ํ๋ค. ๊ทธ๋ฌ๋ค ์์๋์ผ๋ฉด ๋ค์ ์ฝ๋ฉ ๊ทธ๋ฌ๋ค ์ง์์ ํฐ๋ํ์ผ๋ฉด ๋ ธ์ ์ ๋ฐ๋ก ์ ๋ฆฌ ์ด๋ ๊ฒ ํ๋์ ๋ชธ๊ณผ ๋จธ๋ฆฌ๋ก ์ฌ๋ฌ๊ฐ๋ฅผ ์กฐ๊ธ์ฉ ์๋ผ์ ๋์์ ์คํํ๋ ๊ฒ์ I/O bound๋ผ๊ณ ํ๋ค.
import multiprocessing
multiprocessing.cpu_count() #๋ 10๊ฐ
cpu bound๋ ํ์ฌ์์ 10๊ฐ์ ์์ ์ ์งํํ๊ฒ ๋์๋ค๋ฉด ์ง์ 2๋ช ํํ 5๊ฐ์ฉ ๋๋ ์ ์ผ์ ์ํจ๋ค. ๊ทธ๋ฌ๋ฉด ๋์์ ๊ฐ์ ์์ ์ ์งํํ๊ฒ ๋ ๊ฑฐ๊ณ ๋ง์ง๋ง์ ๋ณํฉํ๋ฉด ๋๋ค. ์ด ๊ฒ์ cpu bound๋ผ๊ณ ํ๋ค. ๊ทธ๋์ ๊ฒฐ๊ตญ ๋ for๋ฌธ์ for๋ฌธ์ ๋๋ ค์ ์ฐ์ฐ์ ํด์ผํ๊ธฐ ๋๋ฌธ์ pool์ ์ฌ์ฉํ ๊ฒ์ด๋ค.
โจ multiprocessing ์ฌ์ฉ๋ฒ
์ฐ์ ์ ์ผ ๋จผ์ ํด์ผํ๋ ๊ฒ์ ๊ฐ cpu์ ๋๋ ์ค ์ ์๊ฒ ๋ฐ์ดํฐ๋ฅผ ๋น์ทํ ์์ผ๋ก ์ชผ๊ฐ๋ ๊ฒ์ด๋ค. ๋ด๊ฐ ์ฌ์ฉํ cpu์ ๊ฐฏ์๋ฅผ 10๊ฐ๋ผ๊ณ ํ๋ค๋ฉด ์๋์ ๊ฐ์ด ๋ฐ์ดํฐ๋ฅผ ์ชผ๊ฐค ์ ์๋ค. ์ฐธ๊ณ ๋ก ์ฌ์ฉํ cpu๋ฅผ multiprocessing์์ worker๋ผ๊ณ ๋ถ๋ฅธ๋ค. second_df๋ฅผ ์ถ๋ ฅํด๋ณด๋ฉด 10๊ฐ์ ๋ฆฌ์คํธ๋ก ๋์ด์๊ณ ๋ฆฌ์คํธ ์์๋ ๋ฐ์ดํฐํ๋ ์์ด ๋ค์ด์๋ค. 10๊ฐ์ ๋ฆฌ์คํธ ํ๋์ฉ 10๊ฐ์ worker๋ค์ด ์ผํ๊ฒ ๋๋ค.
import numpy as np
max_worker = 10
second_df = np.array_split(second_df, max_worker)
์ด ํ์ pool์ ์คํํด๋ณด์. ํ๋์ฉ ์ค๋ช ํ์๋ฉด ๋จผ์ Pool์ ์ด์ด map์ ์คํํ๋๋ฐ ์ฌ๊ธฐ์ ๊ถ๊ธํ ์ ํ๊ฐ์ง partial์ด ๋ญ์ง? partial์ ๊ณ ์ ์์ผ์ฃผ๋ ๊ฒ์ด๋ค. ๊ฒฐ๊ตญ ๋ for๋ฌธ์ for๋ฌธ์ ๋๋ ค์ผํ๋ค. ๊ทธ๋ฌ๋ฉด map์๋ ์ด๋ฐ ์๋ค์ด ๋ค์ด๊ฐ์ผ ํ๋ค. first_df์ ๊ฐฏ์๊ฐ 3000, second_df์ ๊ฐฏ์๊ฐ 8000์ด๋ผํ ๋ (first_seq1, second_seq1), (first_seq1, second_seq2), (first_seq1, second_seq3), …, (first_seq3000, second_seq7999), (first_seq3000, second_seq8000) ์ด๋ ๊ฒ cross mapping์ผ๋ก ๋ค์ด๊ฐ์ผํ๋ค. ์ด ๋ถ๋ถ์ ๋ ํจ์จํํ๋ ๊ฒ์ด partial์ด๋ค. first_df๋ ๊ณ ์ ํด์ฃผ๋ฉด second_df๊ฐ ๋ฐ๋ณตํ๋ ๋งํผ ์์์ ๋ฐ๋ณต์์ผ์ค๋ค. ์ผ๋ฐ์ ์ธ mapํจ์์ ๋น์ทํ๊ฒ ๋ค์ด๊ฐ๋ฉด ๋๋ค. ๊ทธ๋ฌ๊ณ ๊ฐ worker๋ค์ด ์ด์ฌํ ๊ณ์ฐํ ๊ฒ์ด ์ชผ๊ฐ์ ธ์ results์ ์ ์ฅ๋๋ค. ๊ทธ๋์ chain.from_iterable๋ก shape์ ์ค์ฌ์ค๋ค.
with Pool(processes=max_worker) as pool:
results = pool.map(partial(calculate, first_df), second_df)
df = pd.DataFrame(list(chain.from_iterable(results))
๋ชจ๋ ์ฝ๋๊ฐ ์์ฑ๋์๋ค. ์ฝ๋๋ฅผ ํฉ์ณ๋ณด๋ฉด ์ด๋ ๊ฒ ๋๋ค.
import numpy as np
import multiprocessing
from functools import partial
from itertools import chain
max_worker = 10
def calculate(first_df, second_df):
results = []
for first_index, first_seq in first_df.values:
for second_index, second_seq in second_df.values:
results.append(first_seq+second_seq)
return results
second_df = np.array_split(second_df, max_worker)
with Pool(processes=max_worker) as pool:
results = pool.map(partial(calculate, first_df), second_df)
df = pd.DataFrame(list(chain.from_iterable(results))
์ถ๊ฐ๋ก tqdm๋ ์ฌ์ฉํ ์ ์๋ค. ๋ด ์ฝ๋์์ tqdm(first_df.values)๋ก chopํ๋ฉด ๋๋ค.
โจ ์ฃผ์ํ ์
multiprocessing์ ์ฌ์ฉํ ๋ ๊ผญ ์๋์ ์ฝ๋์ ํจ๊ป ์จ์ผํ๋ค๊ณ ์ ํ์๋ค. ์จ์ผํ๋ ์ด์ ๋ module์ importํ ๋ ๋ฏธ๋ฆฌ ์คํ๋๋ ๊ฒ์ ๋ฐฉ์งํ๊ธฐ ์ํด์์ด๋ค. ์์ธํ ๋ด์ฉ์ ๊ตฌ๊ธ๋งํ๋ฉด ๋ง์ด ๋์จ๋ค.
if __name__ == '__main__'
์ฐธ๊ณ ์๋ฃ
'๐ป Dev' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
๐ฏ [Git] merge๋์ง ์์ ์์ค์ฝ๋ pullํ๊ธฐ (0) | 2024.04.17 |
---|---|
๐ฃ๏ธ vscode์์ jupyter pythonpath ์ค์ ํ๊ธฐ (0) | 2024.04.17 |
โ๏ธ jupyter notebook autoreloadํ๊ธฐ (0) | 2024.04.08 |
๐งผ python ํด๋ฆฐ์ฝ๋๋ฅผ ์ํ ๋ ธ๋ ฅ (0) | 2024.04.03 |
๐ tmux ์ฌ์ฉ๋ฒ (0) | 2024.04.03 |
- ์ฑ ๋ฆฌ๋ทฐ
- python
- ๋ฒ ์ด์ฆ ์ ๋ฆฌ
- vscode
- tmux
- GIT
- ๋จธ์ ๋ฌ๋ ์ด๋ก
- Generative Model
- linux
- ๊ฐ๋ฐ์
- ๋ ํ๊ฐ
- ํ๊ณ
- Multiprocessing
- Computer Vision
- ๊ธ๋
์ผ | ์ | ํ | ์ | ๋ชฉ | ๊ธ | ํ |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | ||
6 | 7 | 8 | 9 | 10 | 11 | 12 |
13 | 14 | 15 | 16 | 17 | 18 | 19 |
20 | 21 | 22 | 23 | 24 | 25 | 26 |
27 | 28 | 29 | 30 | 31 |
- Total
- Today
- Yesterday