クイックソートをpthreadで並列化

クイックソートはソート対象領域をどんどん分割していく方式なので単純に並列化できるらしい。pthreadでやってみた。

pthread_createには関数へのポインタと、その関数への引数をvoid型ポインタで渡すようになっている。ということは複数の引数を渡すのって構造体にするのかなと思って、そう書いてみた。動いてるっぽい。

200万個のintの並べ替えだと、シングルスレッド処理のものに比べて7倍ぐらい速い気がする。うーん、7倍?

追記:中途半端にしかソートができてなくて、なんかコレ、壊れてまっせ。。。

#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <pthread.h>

#define NUM_THREADS 3
#define SWAP(type, x, y) do {type tmp = x; x = y; y = tmp;}while(0)

typedef struct {
  int *ary;
  int begin;
  int end;
} partition;

partition *init_part(void) {
  return malloc(sizeof(partition));
}

void *myqsort(void *p) {

  pthread_t threads[NUM_THREADS];
  long t = 0;

  partition *part, *pnext;
  part = init_part();
  pnext = init_part();

  part = (partition *)p;

  int left = part->begin;
  int right = part->end;

  if (left >= right) {
    pthread_exit(NULL);
  }

  int pivot = part->ary[(left + right) / 2];
  
  while (1) {
    while(part->ary[left] < pivot) {
      left++;
    }
    while(part->ary[right] > pivot) {
      right--;
    }
    if (left >= right) {
      break;
    }

    SWAP(int, part->ary[left], part->ary[right]);
    left++;
    right--;
  }

  if (left - part->begin >= 2) {
    pnext->ary = part->ary;
    pnext->begin = part->begin;
    pnext->end = left - 1;
    pthread_create(&threads[t++], NULL, myqsort, (void *)pnext);
  }
  if (part->end - right >= 2) {
    pnext->ary = part->ary;
    pnext->begin = right + 1;
    pnext->end = part->end;
    pthread_create(&threads[t++], NULL, myqsort, (void *)pnext);
  }
}

int show_ary(int ary[], int size) {
  int i;
  for (i = 0; i < size; i++) {
    printf("%d:", ary[i]);
  }
  printf("\n");
}

int isLarger(const void *a, const void *b) {
  if (*(int *)a > *(int *)b) {
    return 1;
  } else {
    return -1;
  }
}

int main (int argc, char *argv[]) {
  int size;
  partition p;

  if (argc < 1) exit(1);
  size = atoi(argv[1]);

  int x[size], i, r; 

  srand(time(NULL));
  for (i = 0; i < size; i++) {
    x[i] = rand() % size;
  }

  //  show_ary(x, size);
  p.ary = x;
  p.begin = 0;
  p.end = size - 1;
  myqsort(&p);
  //  qsort(x, size, sizeof(int), isLarger);
  printf("--sort--\n");
  //show_ary(x, size);
}