/*
Student No.: 111550046
Student Name: 周哲煒
Email: chou.cs11@nycu.edu.tw
SE tag: xnxcxtxuxoxsx
Statement: I am fully aware that this program is not supposed to be posted to a public server, such as a public GitHub repository or a public web page.
*/

// Thread-pool merge sort.
//
// The input array is cut into 8 chunks. Each chunk is bubble-sorted, then the
// chunks are merged pairwise along a 15-node binary tree stored heap-style:
// node 0 is the root, nodes 7..14 are the leaves, and the children of node k
// are 2k+1 and 2k+2. The main thread acts as the dispatcher; 1..NUM_THREADS
// worker threads execute the jobs. The whole run is repeated once per worker
// count and timed.

#define DEBUG 0

#include <pthread.h>
#include <semaphore.h>
#include <sys/time.h>

#include <algorithm>
#include <atomic>
#include <fstream>
#include <iostream>
#include <string>
#include <utility>
#include <vector>

// Define the number of max threads (NUM_THREADS)
#define NUM_THREADS 8

// Number of leaf chunks and total number of nodes in the merge tree.
constexpr int kLeafCount = 8;
constexpr int kNodeCount = 2 * kLeafCount - 1;

// Struct to define a job
struct Job {
    bool type;       // 0 for bubble, 1 for merge
    int start, end;  // inclusive index range in arr
    int idx;         // node index in the merge tree
    int mid;         // last index of the left half (merge jobs only)
};

// Global Variables
sem_t job_sem;   // counts jobs waiting in job_list
sem_t done_sem;  // counts workers that are free to take a job
pthread_mutex_t job_mutex = PTHREAD_MUTEX_INITIALIZER;  // guards job_list
std::vector<Job> job_list;                              // pending jobs (LIFO)

// Per-node "this range is sorted" flags. They are written by workers and
// read/cleared by the dispatcher concurrently, so they must be atomic; a
// std::vector<bool> packs bits into shared words and loses updates.
std::atomic<bool> sorted[kNodeCount];

// status[k] = {first index, last index} of the range covered by node k.
// Only the dispatcher touches it.
std::vector<std::vector<int>> status;

// Set by the worker that finishes the root merge; polled by every thread.
std::atomic<bool> all_sorted(false);

int *arr = nullptr;     // The array being sorted in the current round
int *arr_in = nullptr;  // The unsorted input, reused for every round
int n = 0;              // Size of the array

// Bubble-sorts array[start..end] (both inclusive) in place and marks tree
// node idx as sorted. An empty range (start > end) is a no-op apart from
// setting the flag.
void bubble_sort(int *array, int start, int end, int idx) {
    for (int last = end; last > start; --last) {
        for (int j = start; j < last; ++j) {
            if (array[j] > array[j + 1]) {
                std::swap(array[j], array[j + 1]);
            }
        }
    }
    sorted[idx] = true;
    if (DEBUG) std::cout << "bubble -->" << start << " : " << end << std::endl;
}

// Merges the sorted ranges array[left..mid] and array[mid+1..right] (all
// bounds inclusive) in place and marks tree node idx as sorted. Finishing
// node 0 (the root) also raises all_sorted.
void merge(int *array, int left, int mid, int right, int idx) {
    int i = left, j = mid + 1, k = 0;
    std::vector<int> temp(right - left + 1);

    while (i <= mid && j <= right) {
        if (array[i] <= array[j])
            temp[k++] = array[i++];
        else
            temp[k++] = array[j++];
    }
    while (i <= mid) temp[k++] = array[i++];
    while (j <= right) temp[k++] = array[j++];

    for (i = left, k = 0; i <= right; i++, k++) {
        array[i] = temp[k];
    }

    sorted[idx] = true;
    if (idx == 0) {
        all_sorted = true;
    }
    if (DEBUG) std::cout << "merge -->" << left << " : " << right << std::endl;
}

// Worker thread entry point: repeatedly takes a job from job_list, runs it,
// and posts done_sem. Returns once all_sorted is set. The argument is unused.
void *worker_thread(void *arg) {
    (void)arg;
    if (DEBUG) std::cout << "start worker" << std::endl;

    while (!all_sorted) {
        sem_wait(&job_sem);  // Wait for a job to be available

        pthread_mutex_lock(&job_mutex);
        if (job_list.empty()) {
            // Woken without a job: this is main's shutdown wake-up. Pass the
            // token on so the next blocked worker wakes too, then re-check
            // all_sorted.
            pthread_mutex_unlock(&job_mutex);
            sem_post(&job_sem);
            continue;
        }
        const Job job = job_list.back();
        job_list.pop_back();
        pthread_mutex_unlock(&job_mutex);

        if (!job.type) {
            bubble_sort(arr, job.start, job.end, job.idx);
        } else {
            merge(arr, job.start, job.mid, job.end, job.idx);
        }

        // Tell the dispatcher this worker is free again.
        sem_post(&done_sem);
    }

    if (DEBUG) std::cout << "end worker" << std::endl;
    return nullptr;
}

// Enqueues a merge job for the first pair of sibling nodes that are both
// sorted. Returns true if a job was enqueued.
static bool dispatch_merge_job() {
    for (int left = 1; left + 1 < kNodeCount; left += 2) {
        const int right = left + 1;
        if (!(sorted[left] && sorted[right])) continue;

        const int parent = left / 2;
        const int start = status[left][0];
        const int mid = status[left][1];
        const int end = status[right][1];

        // Consume the children's flags so the pair is merged only once.
        sorted[left] = false;
        sorted[right] = false;
        status[parent][0] = start;
        status[parent][1] = end;

        pthread_mutex_lock(&job_mutex);
        job_list.push_back(Job{true, start, end, parent, mid});
        if (DEBUG) std::cout << "assign merge -->" << start << " : " << end << std::endl;
        pthread_mutex_unlock(&job_mutex);

        sem_post(&job_sem);  // Signal a new job
        return true;
    }
    return false;
}

// Enqueues the bubble-sort job for the next undispatched chunk, if any.
// The first `remainder` chunks get one extra element. Returns true if a job
// was enqueued.
static bool dispatch_bubble_job(int &next_chunk, int chunk_size, int remainder) {
    if (next_chunk >= kLeafCount) return false;

    const int chunk = next_chunk++;
    const int start = chunk * chunk_size + std::min(chunk, remainder);
    const int end = (chunk + 1) * chunk_size - 1 + std::min(chunk + 1, remainder);
    const int idx = (kLeafCount - 1) + chunk;

    status[idx][0] = start;
    status[idx][1] = end;

    pthread_mutex_lock(&job_mutex);
    job_list.push_back(Job{false, start, end, idx, 0});
    if (DEBUG) std::cout << "assign bubble -->" << start << " : " << end << std::endl;
    pthread_mutex_unlock(&job_mutex);

    sem_post(&job_sem);  // Signal a new job
    return true;
}

// Dispatcher: hands out one job per free worker until the root merge is done.
// Ready merges take priority over remaining bubble-sort chunks. The worker
// count is not read from num_threads; it is encoded in done_sem's initial
// value, which the caller sets.
void dispatcher_thread(int num_threads) {
    (void)num_threads;
    if (DEBUG) std::cout << "start dispatcher" << std::endl;

    const int chunk_size = n / kLeafCount;
    const int remainder = n % kLeafCount;

    status.assign(kNodeCount, std::vector<int>(2, 0));
    for (auto &flag : sorted) flag = false;

    int next_chunk = 0;
    // Free-worker tokens taken from done_sem but not yet spent on a job.
    // Holding them (instead of re-posting and looping) lets the dispatcher
    // block until the next completion rather than busy-wait.
    int idle_tokens = 0;

    while (!all_sorted) {
        sem_wait(&done_sem);
        ++idle_tokens;
        while (idle_tokens > 0 &&
               (dispatch_merge_job() ||
                dispatch_bubble_job(next_chunk, chunk_size, remainder))) {
            --idle_tokens;
        }
    }
}

// Reads n and n integers from input.txt, sorts them once per worker count
// (1..NUM_THREADS), prints the elapsed time of each round and writes the
// result to output_<t>.txt. Returns 1 if the element count cannot be read or
// a worker thread cannot be created.
int main() {
    std::ifstream input_file("input.txt");
    if (!(input_file >> n) || n < 0) {
        std::cerr << "failed to read a valid element count from input.txt\n";
        return 1;
    }

    std::vector<int> input(n);
    for (int i = 0; i < n; i++) {
        input_file >> input[i];
    }
    input_file.close();
    arr_in = input.data();

    for (int t = 1; t <= NUM_THREADS; t++) {
        // Fresh unsorted copy for this round; freed automatically.
        std::vector<int> work(input);
        arr = work.data();

        sem_init(&job_sem, 0, 0);
        sem_init(&done_sem, 0, t);  // all t workers start out free
        job_list.clear();

        if (DEBUG) {
            int job_value = 0, done_value = 0;
            sem_getvalue(&job_sem, &job_value);
            sem_getvalue(&done_sem, &done_value);
            std::cout << "job_sem: " << job_value << " done_sem: " << done_value << std::endl;
        }

        pthread_t threads[NUM_THREADS];

        // Start measuring time
        struct timeval start, end;
        gettimeofday(&start, NULL);

        for (int i = 0; i < t; i++) {
            if (pthread_create(&threads[i], NULL, worker_thread, NULL) != 0) {
                std::cerr << "failed to create worker thread " << i << "\n";
                return 1;
            }
        }

        // Dispatch sorting jobs in the main thread
        dispatcher_thread(t);

        // Wake any worker still blocked on job_sem so it can observe
        // all_sorted and exit, then wait for it.
        for (int i = 0; i < t; i++) {
            sem_post(&job_sem);
            pthread_join(threads[i], NULL);
        }
        all_sorted = false;

        // Stop measuring time
        gettimeofday(&end, NULL);
        const double elapsed_time =
            (end.tv_sec - start.tv_sec) * 1000 + (end.tv_usec - start.tv_usec) / 1000.0;
        std::cout << "worker thread #" << t << ", elapsed " << elapsed_time << " ms\n";

        // Output sorted array to file (space-separated, no trailing newline)
        std::ofstream output_file("output_" + std::to_string(t) + ".txt");
        for (int i = 0; i < n; i++) {
            if (i > 0) output_file << " ";
            output_file << arr[i];
        }
        output_file.close();

        sem_destroy(&done_sem);
        sem_destroy(&job_sem);
        arr = nullptr;  // work is about to go out of scope
    }

    arr_in = nullptr;
    return 0;
}