進程池(process pool)是一種管理進程的技術(shù),通過預(yù)先創(chuàng)建一定數(shù)量的進程來減少頻繁創(chuàng)建和銷毀進程的開銷。這種方法特別適用于需要并發(fā)執(zhí)行大量任務(wù)的場景,尤其是在處理cpu密集型任務(wù)時。
上圖展示了進程池模型,其中父進程(master進程)通過創(chuàng)建多個子進程(worker或slaver進程)并通過管道連接,向各個子進程派發(fā)任務(wù)。
進程池的主要作用包括:
- 提高性能:通過預(yù)先創(chuàng)建進程,減少了頻繁創(chuàng)建和銷毀進程的開銷,提升整體執(zhí)行效率。
- 減少系統(tǒng)資源消耗:避免了頻繁創(chuàng)建和銷毀進程導(dǎo)致的資源浪費,進程池通過復(fù)用已有進程,節(jié)省了資源。
- 提升任務(wù)響應(yīng)速度:由于進程池中的進程是預(yù)先創(chuàng)建的,可以快速分配空閑進程處理任務(wù)。
- 更好的資源管理:進程池可以限制系統(tǒng)中的最大并發(fā)進程數(shù),保護系統(tǒng)資源,避免過載。
- 并行處理:對于CPU密集型任務(wù),進程池通過并行化處理多個任務(wù),顯著提升處理效率。
進程池適用于大規(guī)模并發(fā)任務(wù)的處理,如Web爬蟲、數(shù)據(jù)處理和大規(guī)模計算等場景。
代碼模擬進程池管道信息
首先,我們通過控制創(chuàng)建子進程的數(shù)量來實現(xiàn)進程池。可以通過main函數(shù)的參數(shù)控制子進程的創(chuàng)建個數(shù)。
enum{ OK = 0, UsageError, PipeError, ForkError, }; void Usage(string proc){ cout << "Usage: " << proc << " <number_of_processes>" << endl; cout << "Example: " << proc << " 4" << endl; exit(UsageError); } // main函數(shù)的第一個參數(shù)是數(shù)組元素個數(shù),第二個元素是創(chuàng)建子進程個數(shù) int main(int argc, char *argv[]){ if(argc != 2){ Usage(argv[0]); return UsageError; } int num = atoi(argv[1]); // 接下來創(chuàng)建管道和子進程 }
我們封裝一個類來管理進程池:
using work_t = function<void>; class ProcessPool{ public: ProcessPool(int n, work_t w) : num(n), work(w) {} ~ProcessPool() {} int InitProcesspool(); void DisPatchTasks(); void CleanProcessPool(); private: vector<channel> channels; // 管道 int num; // 進程的個數(shù) work_t work; // 工作類型(void()) };
Channel類用于管理管道:
class Channel{ public: Channel(int wfd, pid_t who) : _wfd(wfd), _who(who) { _name = "Channel-" + to_string(wfd) + '-' + to_string(who); } string Name() { return _name; } void Send(int cmd) { write(_wfd, &cmd, sizeof(cmd)); } void Close() { close(_wfd); } pid_t Id() { return _who; } ~Channel() {} private: int _wfd; // master的寫端 string _name; // 對應(yīng)worker的名字 pid_t _who; // 記錄哪個子進程的管道 };
任務(wù)類模擬三種任務(wù):
void Download() { cout << "Downloading..." << endl; } void Log() { cout << "Logging..." << endl; } void Sql() { cout << "Executing SQL..." << endl; } using task_t = function<void>; class TaskManager{ public: TaskManager(){ srand(time(nullptr)); InsertTask(Download); InsertTask(Log); InsertTask(Sql); } ~TaskManager() {} void InsertTask(task_t t) { tasks[number++] = t; } int SelectTask() { return rand() % number; } void Excute(int number) { if(tasks.find(number) == tasks.end()) return; tasks[number](); } private: unordered_map<int, task_t> tasks; }; static int number = 0; TaskManager tmp; void Usage(string proc){ cout << "Usage: " << proc << " <number_of_processes>" << endl; cout << "Example: " << proc << " 4" << endl; exit(UsageError); }
接下來實現(xiàn)InitProcesspool()、DisPatchTasks()和CleanProcessPool()函數(shù):
int InitProcesspool(){ for (int i = 0; i < num; i++) { int pipefd[2]; if (pipe(pipefd) < 0) { perror("pipe"); return PipeError; } pid_t pid = fork(); if (pid < 0) { perror("fork"); return ForkError; } else if (pid == 0) { close(pipefd[1]); dup2(pipefd[0], 0); close(pipefd[0]); work(); exit(OK); } else { close(pipefd[0]); channels.push_back(Channel(pipefd[1], pid)); } } return OK; } void DisPatchTasks(){ int who = 0; int num = 20; while (num--) { int task = tmp.SelectTask(); Channel &curr = channels[who]; who++; who %= channels.size(); cout << "Dispatch task " << task << " to " << curr.Name() << endl; curr.Send(task); } } void CleanProcessPool() { for (auto &c : channels) { c.Close(); } for (auto &e : channels) { pid_t rid = waitpid(e.Id(), nullptr, 0); if (rid > 0) { cout << "子進程 " << rid << " 已回收" << endl; } } }
#include "ProcessPool.hpp" #include "Task.hpp" int main(int argc, char *argv[]){ if(argc != 2) { Usage(argv[0]); return UsageError; } int num = atoi(argv[1]); ProcessPool *pp = new ProcessPool(num, Worker); pp->InitProcesspool(); pp->DisPatchTasks(); pp->CleanProcessPool(); delete pp; return 0; }
相關(guān)頭文件和Makefile:
// Channel.hpp #ifndef __CHANNEL_HPP__ #define __CHANNEL_HPP__ #include <iostream> #include <string> #include <unistd.h> using namespace std; // ... (Channel類定義) #endif // ProcessPool.hpp #include <string> #include <unistd.h> #include <cstdlib> #include <vector> #include <sys> #include <functional> #include <sys> #include "Channel.hpp" #include "Task.hpp" // ... (ProcessPool類定義) // Task.hpp #pragma once #include <iostream> #include <unordered_map> #include <functional> #include <ctime> using namespace std; // ... (TaskManager類定義) // Makefile BIN=processpool cc=g++ FLAGS=-c -Wall -std=c++11 LDFLAGS=-o SRC=$(shell ls *.cc) OBJ=$(SRC:.cc=.o) $(BIN):$(OBJ) $(cc) $(LDFLAGS) $@ $^ %.o:%.cc $(cc) $(FLAGS) $< clean: rm -f $(OBJ) $(BIN)
總結(jié):
本文詳細介紹了進程池的概念及其在實際應(yīng)用中的作用。通過代碼模擬,我們展示了如何初始化進程池、分發(fā)任務(wù)、執(zhí)行任務(wù)邏輯以及清理進程池。文章還涵蓋了相關(guān)的封裝類和文件結(jié)構(gòu),如main.cc、Channel.hpp、ProcessPool.hpp、Task.hpp和Makefile,這些內(nèi)容為理解和實現(xiàn)進程池提供了全面的指導(dǎo)。
進程池是一種有效的資源管理技術(shù),能夠提高多任務(wù)處理的效率和系統(tǒng)性能。通過合理的設(shè)計和實現(xiàn),進程池可以在復(fù)雜的系統(tǒng)中發(fā)揮重要作用,減少資源浪費并提升任務(wù)執(zhí)行的穩(wěn)定性。希望本文的內(nèi)容能為讀者在實際項目中應(yīng)用進程池提供有價值的參考。