sigqueue()/sigwaitinfo()を使用したスレッド間の非同期データ転送
(プログラムの概要)
シグナルと付加データを送るsigqueue()、シグナルを受信するまで待つsigwaitinfo()を使用して、スレッド間で非同期にデータを転送します。前々回のpthread_condを使用したデータ転送と違うのは、排他制御を行っていません。送信側はデータを送るだけ送り、受信側は受け取れるときにデータを受け取ります。 |
/*
* sample program
* data transfer with sigqueue() and sigwaitinfo().
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <pthread.h>
#include <signal.h>
#define BUFSIZE 256
typedef struct {
int cnt;
char buf[BUFSIZE];
} input_t;
void *output(void*);
int main(void)
{
int loop = 1, cnt = 0;
pid_t ownpid;
pthread_t thread_id = 0;
input_t *input_data;
sigset_t sigmask;
printf("sample program(%s) start\n", __FILE__);
ownpid = getpid();
/* block SIGRTMIN+1 in all threads */
sigemptyset(&sigmask);
sigaddset(&sigmask, SIGRTMIN + 1);
if(pthread_sigmask(SIG_BLOCK, &sigmask, NULL) != 0){
perror("pthread_sigmask error");
exit(1);
}
/* create new thread */
if(pthread_create(&thread_id, NULL, output, NULL) < 0){
perror("pthread_create error");
exit(1);
}
while(loop){
input_data = calloc(1, sizeof(input_t));
read(STDIN_FILENO, input_data->buf, BUFSIZE -1);
input_data->cnt = ++cnt;
/* send data to output() thread */
if(sigqueue(ownpid, SIGRTMIN + 1, (sigval_t)((void *)input_data)) < 0){
perror("sigqueue error");
exit(1);
}
if(strncmp(input_data->buf, "quit\n", 5) == 0){
loop = 0;
}
}
if(pthread_join(thread_id, NULL) < 0){
perror("pthread_join error");
exit(1);
}
printf("sample program end\n");
return 0;
}
void *output(void *arg)
{
int loop = 1;
sigset_t sigmask;
siginfo_t siginfo;
input_t *input_data;
printf("thread start\n");
/* set sigmask for sigwaitinfo() */
sigemptyset(&sigmask);
sigaddset(&sigmask, SIGRTMIN + 1);
while(loop){
/* waiting for signal and input data */
if(sigwaitinfo(&sigmask, &siginfo) < 0){
if(errno != EINTR){
perror("sigwaitinfo error");
exit(1);
}
continue;
}
input_data = (input_t *)siginfo.si_value.sival_ptr;
if(input_data == NULL){
printf("no data\n");
continue;
}
printf("%d:%s\n", input_data->cnt, input_data->buf);
if(strncmp(input_data->buf, "quit\n", 5) == 0){
loop = 0;
}
free(input_data);
}
printf("thread end\n");
pthread_exit(0);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
(プログラムの概要)
前々回の「スレッドの同期1」で作成したプログラムと同じことをします。キーボードからの入力を待ち、入力された文字列を別スレッドに渡して表示を行います。前回との違いはデータの渡し方です。
sigwaitinfo()でシグナルを待っているスレッドに対して、sigqueue()でシグナルを通知します。この時void型ポインタを付加することでデータの受け渡しを実現します。 mainスレッド
29L:自分自身のプロセスIDを取得
sigqueue()でシグナルを送る先は自分自身になるためあらかじめ取得しておく。
31L~32L:シグナルのブロック
特定のスレッドのみがシグナルを受け取れるようにあらかじめシグナルをブロックしておく。
33L~36L:スレッドの生成
表示を行うoutputスレッドを作ります。
44L~56L:入力を受付る
標準入力から入力を受付、取得したデータをヒープから取得したメモリにコピーし、sigqueue()でoutputスレッドにシグナルを送っています。この時付加データであるsigvalとしてアドレスを指定しています。(正確には自分自身のプロセスにシグナルを発行、後述するsigwaitoinfo()で待っているoutputスレッドがシグナルを受信する。付加データのsigvalはintとvoid*のunionです。)
58L~61L:スレッドの終了を待つ。
outputスレッドの終了を待ってリソースを回収しています。
outputスレッド
75L~76L:シグナルマスクの設定
sigwaitinfo()で受け取るシグナルを設定する。今回はSIGRTMIN+1を使用している。
80L~86L:シグナルを待つ
上設定したシグナルが発行されるまで停止する。
87L~シグナルの付加データを取り出す。
siginfoのsi_valueに付加データが設定されている。mainから送られてきたデータを表示し、メモリを解放する。
(コンパイルと動作)
gcc -Wall -o sigqueue sigqueue.c -lpthread
キーボードから文字を入力すると入力された文字が表示されます。表示しているのはoutputスレッド
なのでデータがきちんと受け渡されていることがわかります。
(補足)
sigtimedwait()を使うと待ち時間を指定することができます。本当はsigtimedwait()を使用して10msに1回程度タイムアウトさせ、loopの値を確認する。loopはmainスレッドで落とすようにしてjoinさせる動きが正しい動きだと思います。今回はsigwaitinfo()の方が簡単だったのでこちらをサンプルにしました。