複数スレッドのキューのジョブのスケジュール

複数スレッドのキューのジョブのスケジュール

ディレクトリセット(5〜300ファイル間)のすべてのファイルを処理する必要がある関数があります。使用する並列スレッドの数は、ユーザーが指定します(通常4つ)。アイデアは、4つの別々のスレッドで関数を実行することです。 1つのスレッドが返されると、次(5番目)のファイル処理を開始し、すべてのファイルが完了するまで続きます。

Windowsではここで助けをWaitForMultipleObjects()受けましたbWaitAll=False。配列に埋め込むことができる構造体があります。

map<UINT, string>::iterator iter = m_FileList.begin();
string outputPath = GetOutputPath();
void ***threadArgs = (void***)malloc(sizeof(void**)*numThreads);
HANDLE *hdl = (HANDLE*)malloc(sizeof(HANDLE)*numThreads);
DWORD *thr = (DWORD*)malloc(sizeof(DWORD)*numThreads);

for (int t = 0; iter != m_FileList.end() && t < numThreads; t++, iter++)
{
    threadArgs[t] = prepThreadData(t, iter->second, opPath);
    printf("main: starting thread :%d %s outputPath: %s\n", t, iter->second.c_str(), threadArgs[t][2]);
    hdl[t] = CreateThread(NULL, 0, fileProc, (void*)threadArgs[t], 0, &thr[t]);
    if (hdl[t] == NULL)
    {
        err = GetLastError();
        printf("main: thread failed %x %x %s %s\n", err, iter->second.c_str(), threadArgs[t][2]);
    }
}

for (;iter != m_FileList.end(); iter++)
{
    int t = (int)WaitForMultipleObjects(numThreads, hdl, FALSE, INFINITE);
    if (t == WAIT_FAILED)
    {
        err = GetLastError();
        printf("main: thread failed %x %x\n", t, err);
    }
    if (t - WAIT_OBJECT_0 >= 0 && t - WAIT_OBJECT_0 < numThreads)
    {
        free(threadArgs[t][1]);
        free(threadArgs[t][2]);
        free(threadArgs[t]);
        threadArgs[t] = prepThreadData(t, iter->second, opPath);
        printf("main: starting thread :%d %s outputPath: %s\n", t, iter->second.c_str(), threadArgs[t][2]);
        hdl[t] = CreateThread(NULL, 0, fileProc, (void*)threadArgs[t], 0, &thr[t]);
        if (hdl[t] == NULL)
        {
            err = GetLastError();
            printf("main: thread failed %x %x %s %s\n", err, iter->second.c_str(), threadArgs[t][2]);
        }
    }
}
if (WAIT_FAILED == WaitForMultipleObjects(numThreads - 1, hdl, TRUE, INFINITE))     
{
    err = GetLastError();
    printf("main: thread failed %x %x\n", err);
}

私の問題は今、pthreadを使って同様の機能を得ることです。私が考える最善の方法は、セマフォを使用し、そのうちの1つが利用可能になったら、threadArgs配列を使用する代わりに新しいスレッドを作成することです。各スレッドに割り当てられたメモリを生成するポインタを使いましょう。さらに、メモリ管理を容易にするために、threadArgs [t]に割り当てられたメモリは、生成されたスレッドによって所有されます。

より良い解決策はありますか?それともWaitForMutlipleObjects()pthreadに似たものがありますか?より具体的にCreateThread()に変更したら、pthread_create()何に置き換えるべきですかWaitForMultipleObjects()

答え1

ジョブキューが欲しいようです。処理する必要があるファイルの集まりでそのキューを埋め、スレッド間の競合を防ぐために必要なロックを実行する関数を使用して、キューからエントリを削除します。その後、目的のスレッドを起動します。各スレッドはキューからエントリを取得して処理し、次のエントリを取得します。キューが空の場合、スレッドは追加の入力を待つことをブロックでき、追加の入力がない場合は終了する可能性があります。

簡単な例は次のとおりです。

#include <cstdio>
#include <mutex>
#include <queue>
#include <thread>

template<typename T>
class ThreadSafeQueue {
public:
    void enqueue(const T& element)
    {
        std::lock_guard<std::mutex> lock(m_mutex);

        m_queue.push(element);
    }

    bool dequeue(T& value)
    {
        std::lock_guard<std::mutex> lock(m_mutex);

        if (m_queue.empty()) {
            return false;
        }

        value = m_queue.front();
        m_queue.pop();

        return true;
    }

private:
    std::mutex m_mutex;
    std::queue<T> m_queue;
};

static void threadEntry(const int threadNumber, ThreadSafeQueue<std::string>* const queue)
{
    std::string filename;

    while (queue->dequeue(filename)) {
        printf("Thread %d processing file '%s'\n", threadNumber, filename.c_str());
    }
}

int main()
{
    ThreadSafeQueue<std::string> queue;

    // Populate queue
    for (int i = 0; i < 100000; ++i) {
        queue.enqueue("filename_" + std::to_string(i) + ".txt");
    }

    const size_t NUM_THREADS = 4;

    // Spin up some threads
    std::thread threads[NUM_THREADS];
    for (int i = 0; i < NUM_THREADS; ++i) {
        threads[i] = std::thread(threadEntry, i, &queue);
    }

    // Wait for threads to finish
    for (int i = 0; i < NUM_THREADS; ++i) {
        threads[i].join();
    }

    return 0;
}

編む:

$ g++ example.cpp -pthread

プログラムは、ThreadSafeQueue複数のスレッドが同時にアクセスできるように内部ロックを持つキューを定義します。

このmain関数は最初にキューに入れます。その後、4つのスレッドを起動します。各スレッドはキューから値を読み取り、それを「処理」します(ここではメッセージを標準出力に印刷します)。キューが空の場合、スレッドは終了します。関数mainは、スレッドが返される前に終了するのを待ちます。

この設計では、スレッドが開始される前にすべての要素がキューに入れられると仮定します。一部の変更を適用すると、スレッドの実行中に新しいジョブ処理をサポートするように拡張できます。

関連情報