线程是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。
在Linux中,程序文件的格式都是ELF,这些文件在被执行的瞬间,就被载入内存,所谓的载入内存,如上图所示,就是将数据段、代码段这些运行时必要的资源拷贝到内存,另外系统会再分配相应的栈、堆等内存空间给这个进程,使之成为一个动态的实体。
线程的状态
- 所有进程(除了系统初始进程
systemd之外),都有一个父进程。
- 父进程通过调用
fork()函数,将自身复制一份形成一个子进程。
- 新创建的子进程拥有与父进程一样的执行代码、内存空间(父子进程的内存空间的内容是一致的,但分属不同的区域各自独立)等信息,并处于就绪态(
TASK_RUNNING)。
- 当进程退出时(不管是主动退出还是被动退出),进入僵尸态(
EXIT_ZOMBIE),僵尸态下的进程无法运行,也无法被调度,但其所占据的系统资源未被释放。僵尸态是进程的必经状态,编程过程中不能避免僵尸态,但要避免进程长时间处于僵尸态。
- 僵尸态进程要等待其父进程对其资源进程回收后,才能变成死亡态(
EXIT_DEAD),死亡态的进程所有占据的系统资源可以被系统随时回收。
在Linux中,线程的创建是通过pthread_create函数来实现的。该函数的原型如下:
1
|
int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
|
其中,thread参数用于接收新创建线程的ID,attr参数用于设置新线程的属性,start_routine参数是线程运行的函数,arg参数是传递给start_routine函数的参数。
创建线程的步骤如下:
- 调用
pthread_create函数,传入线程ID、属性、运行函数和参数。
- 内核创建一个新的线程,并将其加入调度队列中。
- 运行
start_routine函数,完成线程的初始化。
- 线程开始运行,并执行用户指定的任务。
线程的终止可以通过以下方式实现:
- 线程主动调用
pthread_exit函数来结束自己的执行。
- 主线程调用
pthread_join函数来等待某个线程结束。
- 主线程调用
pthread_cancel函数来取消某个线程的执行。
- 某个线程被其他线程取消。
孤儿线程是指一个父线程退出,而它的子线程还在运行的情况。在Linux中,孤儿线程会被init进程(PID为1)所收养,从而成为init进程的子线程。
僵尸线程是指一个线程已经退出,但它的退出状态还没有被父线程获取的情况。在Linux中,僵尸线程会被内核回收,从而释放其占用的资源。
使用pthread_join函数可以等待一个线程结束,从而避免僵尸线程的产生。
线程安全性(Thread Safety)在编程中是指在多线程环境下,程序或代码段能够正确地执行,并且不引发任何竞态条件或数据损坏的问题。线程安全性主要涉及多个线程访问共享资源时的正确性和一致性。
线程安全问题的产生主要有以下几个原因:
-
共享资源的竞争:
- 多个线程同时读写共享变量或数据结构,可能会导致数据不一致或数据竞态问题。
- 举例:两个线程同时对一个全局变量进行写操作,可能导致数据错乱。
-
原子性操作:
- 某些操作在多线程环境下需要保证原子性,但在现实中,操作可能被中断。
- 举例:一个线程在读写操作之间被切换到另一个线程,这可能导致读到不完整的数据。
-
内存可见性:
- 多个线程对共享变量的更新,可能在其他线程中不可见,导致线程读取到旧的数据。
- 举例:一个线程更新了某个变量,但没有立即刷新到主内存,其他线程读到的仍然是旧值。
-
指令重排:
- 编译器或处理器为了优化性能,可能会对指令顺序进行重排,从而导致意外的执行顺序。
- 举例:一个线程期待某个操作在另一个操作之前执行,但由于重排,实际执行顺序并非如此。
为了解决线程安全性问题,通常采用以下方法:
- 互斥锁(Mutex):
通过互斥锁(如pthread_mutex_t)保护临界区,保证同一时间只有一个线程可以进入临界区。
1
2
3
4
5
|
pthread_mutex_t lock;
pthread_mutex_init(&lock, NULL);
pthread_mutex_lock(&lock);
// 临界区代码
pthread_mutex_unlock(&lock);
|
- 读写锁(Read-Write Lock):
允许多个线程同时读操作,但写操作是独占的,以提高并发性。
1
2
3
4
5
6
7
8
|
pthread_rwlock_t rwlock;
pthread_rwlock_init(&rwlock, NULL);
pthread_rwlock_rdlock(&rwlock);
// 读操作
pthread_rwlock_unlock(&rwlock);
pthread_rwlock_wrlock(&rwlock);
// 写操作
pthread_rwlock_unlock(&rwlock);
|
- 原子操作:
使用原子操作(如atomic_fetch_add)来保证操作的原子性,避免竞态条件。
1
2
3
4
5
6
7
|
#include <stdatomic.h>
atomic_store();//原子存储值。
atomic_load(); //原子读取值。
atomic_fetch_add(); //原子增加。
atomic_fetch_sub(); //原子减少。
atomic_exchange(); //原子交换值。
atomic_compare_exchange_strong();atomic_compare_exchange_weak(); //原子比较和交换。
|
- 条件变量(Condition Variables):
通过条件变量和互斥锁结合,解决线程间的同步问题,如果未达到某个条件,线程会阻塞,在条件达到时由生产者线程唤醒。
消费者线程
1
2
3
4
5
6
7
8
9
10
|
pthread_cond_t cond;
pthread_mutex_t cond_lock;
pthread_cond_init(&cond, NULL);
pthread_mutex_init(&cond_lock, NULL);
pthread_mutex_lock(&cond_lock);
while (condition_not_met) {
pthread_cond_wait(&cond, &cond_lock);
}
// 临界区代码
pthread_mutex_unlock(&cond_lock);
|
生产者线程
1
2
3
4
5
|
// 唤醒
pthread_mutex_lock(&cond_lock);
//生成者生产了资源,选择一个消费者线程唤醒
pthread_cond_signal(&cond);
pthread_mutex_unlock(&cond_lock);
|
- 线程局部存储(Thread Local Storage, TLS):
使用线程局部存储(如__thread或pthread_key_t)来避免共享数据。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
#include <stdio.h>
#include <pthread.h>
__thread int tls_var; // 线程局部变量
void *thread_func(void *arg) {
tls_var = (int)arg;
printf("Thread %d: tls_var = %d\n", (int)arg, tls_var);
return NULL;
}
int main() {
pthread_t t1, t2;
pthread_create(&t1, NULL, thread_func, (void *)1);
pthread_create(&t2, NULL, thread_func, (void *)2);
pthread_join(t1, NULL);
pthread_join(t2, NULL);
return 0;
}
|
每个线程都有独立的tls_var变量,线程1将其设置为1,线程2将其设置为2。它们的值互不干扰,各自独立。
编译器在编译时会检测到 __thread 或 thread_local 关键字,并为每个线程分配独立的存储空间。
编译器生成的代码会在每个线程的上下文中管理这些独立的存储区域。
- 内存屏障:
现在大多数现代计算机为了提高性能而采取乱序执行,这可能会导致程序运行不符合我们预期,内存屏障就是一类同步屏障指令,是CPU或者编译器在对内存随机访问的操作中的一个同步点,只有在此点之前的所有读写操作都执行后才可以执行此点之后的操作。
使用内存屏障(如__sync_synchronize)防止编译器和处理器对指令进行重排序或者使用原子操作atomic_thread_fence函数。
1
2
3
4
|
#include <stdatomic.h>
//这个函数会产生一个全局内存屏障,memory_order_seq_cst 表示严格顺序一致性
atomic_thread_fence(memory_order_seq_cst);
|
指定内存顺序
memory_order_relaxed (__ATOMIC_RELAXED)
定义:无顺序保证。
用途:这种内存顺序不提供任何同步或顺序保证,仅保证原子操作本身是原子的。这意味着其他线程可能看不到立即发生的变化。
应用场景:用于不需要同步的计数器等场景,例如非关键数据的统计。
memory_order_consume (__ATOMIC_CONSUME)
定义:数据依赖顺序(很少使用)。
用途:确保依赖于加载值的所有操作在加载之后执行,类似于 memory_order_acquire,但只对数据依赖有效。
应用场景:在现代硬件中使用较少,因为大多数实现将其等同于 memory_order_acquire。
memory_order_acquire (__ATOMIC_ACQUIRE)
定义:获取操作。
用途:防止后续的内存操作在原子操作之前执行。适用于从内存读取并确保接下来的读取操作不会被重排序到此原子操作之前。
应用场景:用于读取某个变量并确保读取后的操作能看到这个变量之前的所有写入。例如,在读锁中使用。
memory_order_release (__ATOMIC_RELEASE)
定义:释放操作。
用途:防止之前的内存操作在原子操作之后执行。适用于写入内存并确保写入之前的所有操作不会被重排序到此原子操作之后。
应用场景:用于写入某个变量并确保所有之前的写操作对其他线程可见。例如,在写锁中使用。
memory_order_acq_rel (__ATOMIC_ACQ_REL)
定义:获取和释放操作的组合。
用途:同时具备 memory_order_acquire 和 memory_order_release 的特性,确保之前的写操作不会重排序到原子操作之后,且后续的读操作不会重排序到原子操作之前。
应用场景:用于读-修改-写操作,例如在实现自旋锁或其他同步原语时使用。
memory_order_seq_cst (__ATOMIC_SEQ_CST)
定义:顺序一致性。
用途:提供最强的同步保证,所有的原子操作看起来按顺序执行。确保所有线程看到的操作顺序一致。
应用场景:用于需要严格同步和顺序保证的场景,确保多线程程序的可预测行为
这些内存屏障原语主要用于指示编译器和处理器不要对其前后的指令进行优化重排,以确保内存操作按照程序员预期的顺序执行。
死锁指的是由于某种逻辑问题,导致等待一把永远无法获得的锁的困境。比如最简单的是同一线程,连续对同一锁资源进行加锁,就进入了死锁。
死锁示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
void *routine(void *arg) {
thread_pool *pool = (thread_pool *)arg;
struct task *p;
while(1) {
// 操作临界资源之前,加锁
pthread_mutex_lock(&pool->lock);
// 条件不允许时,进入条件量等待
while(pool->waiting_tasks == 0 && !pool->shutdown)
pthread_cond_wait(&pool->cond, &pool->lock);
// 条件允许时,操作临界资源
p = pool->task_list->next;
pool->task_list->next = p->next;
pool->waiting_tasks--;
// !!! 注意 !!!
// 线程若恰好在此处被意外终止,将导致死锁
// 解锁
pthread_mutex_unlock(&pool->lock);
// 其他操作
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
(p->do_task)(p->arg);
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
free(p);
}
pthread_exit(NULL);
}
|
线程池是一种常见的并发编程模型,它将任务提交和任务执行解耦,提高程序的并发性能和资源利用率。
线程池由一组线程组成,这些线程等待并处理任务。当有新的任务提交时,线程池会从线程池中选择一个空闲的线程来处理该任务。如果所有线程都处于忙碌状态,线程池会等待直到有空闲线程为止。
线程池的优点:
- 提高程序的并发性能和资源利用率。
- 降低线程创建和销毁的开销。
- 提高任务处理的安全性。
线程池的定义
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
|
#ifndef __THREADPOOL_H__
#define __THREADPOOL_H__
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <stdbool.h>
#include "kernel_list.h"
// 管理线程等待时间 ms
#define MANAGER_WAIT_TIME 1000
#define WAIT_PROPORTION 0.5
#define CHANGE_THREAD_NUM_PROPORTION 0.2
/**
* @brief 任务结构体
* @param func: 任务函数
* @param arg: 任务函数参数
* @param list: 链表节点
*/
typedef struct {
void* (*func)(void*);
void* arg;
struct list_head list;
} Task_t;
/**
* @brief 线程池结构体
* @param lock: 互斥锁
* @param cond: 条件变量
* @param threads: 线程数组
* @param manager: 管理线程
* @param taskQueue: 任务队列
* @param threadNum: 线程数量
* @param maxThreads: 最大线程数量
* @param minThreads: 最小线程数量
* @param waiting_tasks: 等待的任务数量
* @param max_waiting_tasks: 最大等待的任务数量
* @param active_threads: 活动线程数量
* @param shutdown: 是否关闭线程池
* @param started: 线程池是否已启动
*/
typedef struct {
pthread_mutex_t lock;
pthread_cond_t cond;
pthread_t* threads;
pthread_t manager;
Task_t taskQueue;
unsigned int threadNum;
unsigned int maxThreads;
unsigned int minThreads;
unsigned int waiting_tasks;
unsigned int active_threads;
bool shutdown;
bool started;
} ThreadPool_t;
/**
* @brief 创建线程池
* @param minThreads: 最小线程数量
* @param maxThreads: 最大线程数量
* @return 线程池指针
*/
ThreadPool_t* threadpool_init(unsigned int minThreads, unsigned int maxThreads);
/**
* @brief 销毁线程池
* @param pool: 线程池指针
*/
void threadpool_destory(ThreadPool_t* pool);
/**
* @brief 添加任务到线程池
* @param pool: 线程池指针
* @param func: 任务函数
* @param arg: 任务函数参数
*/
void threadpool_add_task(ThreadPool_t* pool, void* (*func)(void*), void* arg);
#endif
|
线程池实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
|
#include "threadpool.h"
#include <stdio.h>
/**
* @brief 线程池清理函数
* @param arg: 线程池指针
*/
void threadpool_thread_cleanup(void* arg) {
ThreadPool_t* pool = (ThreadPool_t*)arg;
pthread_mutex_unlock(&pool->lock);
}
/**
* @brief 线程池线程函数
* @param arg: 线程池指针
*/
void* threadpool_thread(void* arg) {
ThreadPool_t* pool = (ThreadPool_t*)arg;
pthread_cleanup_push(threadpool_thread_cleanup, pool);
while (1) {
pthread_mutex_lock(&pool->lock);
// 检查线程池是否关闭
if (pool->shutdown) {
pthread_mutex_unlock(&pool->lock);
break;
}
// 等待任务
while (list_empty(&pool->taskQueue.list)) {
pool->waiting_tasks++;
pthread_cond_wait(&pool->cond, &pool->lock);
}
// 执行任务前,不响应取消信号
pthread_setcanceltype(PTHREAD_CANCEL_DISABLE, NULL);
// 获取任务
Task_t* task = list_entry(pool->taskQueue.list.next, Task_t, list);
list_del(&task->list);
pool->waiting_tasks--;
pool->active_threads++;
pthread_mutex_unlock(&pool->lock);
// 执行任务
task->func(task->arg);
// 任务执行完毕,释放内存
free(task);
pthread_mutex_lock(&pool->lock);
pool->active_threads--;
pthread_mutex_unlock(&pool->lock);
// 响应取消信号
pthread_setcanceltype(PTHREAD_CANCEL_ENABLE, NULL);
}
pthread_cleanup_pop(0);
return NULL;
}
/**
* @brief 线程池管理函数
* @param arg: 线程池指针
*/
void* threadpool_manager(void* arg) {
ThreadPool_t* pool = (ThreadPool_t*)arg;
while (!pool->shutdown) {
usleep(1000*MANAGER_WAIT_TIME);
pthread_mutex_lock(&pool->lock);
int waiting_tasks = pool->waiting_tasks;
int active_threads = pool->active_threads;
int min_threads = pool->minThreads;
int max_threads = pool->maxThreads;
int threadNum = pool->threadNum;
pthread_mutex_unlock(&pool->lock);
// 判断是否需要增加线程
if (waiting_tasks > threadNum*WAIT_PROPORTION && threadNum < max_threads) {
int addNum = threadNum*CHANGE_THREAD_NUM_PROPORTION;
addNum = addNum > max_threads ? max_threads: addNum;
// 增加线程
pthread_mutex_lock(&pool->lock);
for (int i = threadNum; i < threadNum+addNum; i++) {
pthread_create(&pool->threads[i], NULL, threadpool_thread, (void*)pool);
pool->threadNum++;
}
pthread_mutex_unlock(&pool->lock);
} else if (waiting_tasks <= 0 && active_threads < threadNum) {
// 没有任务,且活跃线程数小于线程池大小,减少线程
int delNum = threadNum*CHANGE_THREAD_NUM_PROPORTION;
delNum = threadNum-delNum < min_threads ? threadNum-min_threads: delNum;
// 减少线程
pthread_mutex_lock(&pool->lock);
for (int i = threadNum-1; i >= threadNum-delNum; i--) {
// 杀死线程
pthread_cancel(pool->threads[i]);
pool->threadNum--;
}
pthread_mutex_unlock(&pool->lock);
// 等待线程退出
for (int i = threadNum-1; i >= threadNum-delNum; i--) {
pthread_join(pool->threads[i], NULL);
}
}
}
}
/**
* @brief 创建线程池
* @param minThreads: 最小线程数量
* @param maxThreads: 最大线程数量
* @return 线程池指针
*/
ThreadPool_t* threadpool_init(unsigned int minThreads, unsigned int maxThreads) {
// 分配线程池内存
ThreadPool_t* pool = (ThreadPool_t*)malloc(sizeof(ThreadPool_t));
if (pool == NULL) {
perror("init threadpool failed");
return NULL;
}
pool->threads = (pthread_t*)malloc(sizeof(pthread_t) * maxThreads);
if (pool->threads == NULL) {
perror("init threads failed");
free(pool);
return NULL;
}
// 初始化任务链表
INIT_LIST_HEAD(&pool->taskQueue.list);
// 初始化互斥锁和条件变量
pthread_mutex_init(&pool->lock, NULL);
pthread_cond_init(&pool->cond, NULL);
// 设置线程池参数
pool->minThreads = minThreads;
pool->maxThreads = maxThreads;
pool->threadNum = 0;
pool->waiting_tasks = 0;
pool->active_threads = 0;
pool->shutdown = false;
pool->started = true;
// 创建最小数量的线程
for (int i = 0; i < minThreads; i++) {
pthread_create(&pool->threads[i], NULL, threadpool_thread, pool);
pool->threadNum++;
}
// 创建管理线程
pthread_create(&pool->manager, NULL, threadpool_manager, pool);
// 返回线程池
return pool;
}
/**
* @brief 销毁线程池
* @param pool: 线程池指针
*/
void threadpool_destory(ThreadPool_t* pool) {
// 设置线程池关闭标志
pool->shutdown = true;
// 唤醒所有等待线程
pthread_cond_broadcast(&pool->cond);
// 取消所有线程
for (int i = 0; i < pool->threadNum; i++) {
pthread_cancel(pool->threads[i]);
}
// 等待所有线程退出
for (int i = 0; i < pool->threadNum; i++) {
pthread_join(pool->threads[i], NULL);
}
// 销毁互斥锁和条件变量
pthread_mutex_destroy(&pool->lock);
pthread_cond_destroy(&pool->cond);
// 释放线程池内存
free(pool->threads);
free(pool);
}
/**
* @brief 添加任务到线程池
* @param pool: 线程池指针
* @param func: 任务函数
* @param arg: 任务函数参数
*/
void threadpool_add_task(ThreadPool_t* pool, void* (*func)(void*), void* arg) {
// 创建任务节点
Task_t* task = (Task_t*)malloc(sizeof(Task_t));
task->func = func;
task->arg = arg;
// 加锁
pthread_mutex_lock(&pool->lock);
// 将任务节点添加到任务链表尾部
list_add_tail(&task->list, &pool->taskQueue.list);
pool->waiting_tasks++;
// 解锁
pthread_mutex_unlock(&pool->lock);
// 唤醒等待线程
pthread_cond_signal(&pool->cond);
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
|
#include <stdio.h>
#include <dirent.h>
#include <sys/stat.h>
#include <sys/types.h>
#include "threadpool.h"
typedef struct {
char srcPath[512];
char destPath[512];
} copyInfo;
void* copy(void* arg) {
copyInfo* info = (copyInfo*)arg;
FILE* srcFp = fopen(info->srcPath, "rb"); // 二进制读取模式
if (srcFp == NULL) {
perror("Error opening source file");
return NULL;
}
FILE* destFp = fopen(info->destPath, "wb"); // 二进制写入模式
if (destFp == NULL) {
perror("Error opening destination file");
fclose(srcFp);
return NULL;
}
// 使用大块缓冲区进行复制,提高效率
const size_t bufSize = 1024 * 1024; // 1 MB
char* buf = (char*)malloc(bufSize);
if (buf == NULL) {
perror("Error allocating buffer");
fclose(srcFp);
fclose(destFp);
return NULL;
}
size_t readSz;
while ((readSz = fread(buf, 1, bufSize, srcFp)) > 0) {
size_t writeSz = fwrite(buf, 1, readSz, destFp);
if (writeSz != readSz) {
perror("Error writing to destination file");
free(buf);
fclose(srcFp);
fclose(destFp);
return NULL;
}
}
free(buf);
fclose(srcFp);
fclose(destFp);
return NULL;
}
int main(int argc, char const *argv[]) {
// 从main函数获取源文件夹路径和目标文件夹路径
char* srcPath = argv[1];
char* destPath = argv[2];
if (access(srcPath, F_OK) != 0) {
perror("sec directory does not exist");
return 1;
}
if (access(destPath, F_OK) == 0) {
printf("Destination directory already exists\n");
return 1;
}
ThreadPool_t* pool = threadpool_init(5, 200);
// 判断目标文件夹是否存在,如果不存在则创建, access
if (access(destPath, F_OK) != 0){
if (mkdir(destPath, 0777) != 0) {
perror("Error creating destination directory");
return 1;
}
}
// 打开源文件夹
DIR* dir = opendir(srcPath);
// 遍历文件夹
struct dirent* entry;
while (1) {
entry = readdir(dir);
printf("read\n");
if (entry == NULL) {
break;
}
if (entry->d_type == DT_REG) { // 只复制普通文件
copyInfo* info = (copyInfo*)malloc(sizeof(copyInfo));
sprintf(info->srcPath, "%s/%s", srcPath, entry->d_name);
sprintf(info->destPath, "%s/%s", destPath, entry->d_name);
printf("Adding task: %s -> %s\n", info->srcPath, info->destPath);
threadpool_add_task(pool, copy, info);
}
}
printf("Added all tasks\n");
// 等待所有任务完成
while (!list_empty(&pool->taskQueue.list)) {
printf("not empty\n");
sleep(1);
}
printf("All tasks completed\n");
printf("当前线程数:%d", pool->threadNum);
closedir(dir);
threadpool_destory(pool);
return 0;
}
|