1、首先,关于如何开启一个线程,大多数人可能都会说3种,Thread、Runnable、Callback嘛!但事实却不是这样的。看JDK里怎么说的。
/**
* ...
* There are two ways to create a new thread of execution. One is to
* declare a class to be a subclass of <code>Thread</code>.
* The other way to create a thread is to declare a class that
* implements the <code>Runnable</code> interface.
* ....
*/
public class Thread implements Runnable{
}
Thread源码的类描述中有这样一段,翻译一下,只有两种方法去创建一个执行线程,一种是声明一个Thread的子类,另一种是创建一个类去实现Runnable接口。惊不惊讶,并没有提到Callback。
public class ThreadUnitTest {
@Test
public void testThread() {
//创建MyThread实例
MyThread myThread = new MyThread();
//调用线程start的方法,进入可执行状态
myThread.start();
}
//继承Thread类,重写内部run方法
static class MyThread extends Thread {
@Override
public void run() {
System.out.println("test MyThread run");
}
}
}
public class ThreadUnitTest {
@Test
public void testRunnable() {
//创建MyRunnable实例,这其实只是一个任务,并不是线程
MyRunnable myRunnable = new MyRunnable();
//交给线程去执行
new Thread(myRunnable).start();
}
//实现Runnable接口,并实现内部run方法
static class MyRunnable implements Runnable {
@Override
public void run() {
System.out.println("test MyRunnable run");
}
}
}
public class ThreadUnitTest {
@Test
public void testCallable() {
//创建MyCallable实例,需要与FutureTask结合使用
MyCallable myCallable = new MyCallable();
//创建FutureTask,与Runnable一样,也只能算是个任务
FutureTask<String> futureTask = new FutureTask<>(myCallable);
//交给线程去执行
new Thread(futureTask).start();
try {
//get方法获取任务返回值,该方法是阻塞的
String result = futureTask.get();
System.out.println(result);
} catch (ExecutionException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//实现Callable接口,并实现call方法,不同之处是该方法有返回值
static class MyCallable implements Callable<String> {
@Override
public String call() throws Exception {
Thread.sleep();
return "test MyCallable run";
}
}
}
Callable的方式必须与FutureTask结合使用,我们看看FutureTask的继承关系
//FutureTask实现了RunnableFuture接口
public class FutureTask<V> implements RunnableFuture<V> {
}
//RunnableFuture接口继承Runnable和Future接口
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}
真相大白了,其实实现Callback接口创建线程的方式,归根到底就是Runnable方式,只不过它是在Runnable的基础上又增加了一些能力,例如取消任务执行等。
2、开启线程的几种方式算是答上了,那开启大量线程,或者说频繁开启线程到底会引起什么问题呢?
众所周知,在Java中,调用Thread的start方法后,该线程即置为就绪状态,等待CPU的调度。这个流程里有两个关注点需要去理解。
start内部怎样开启线程的?看看start方法是怎么实现的。
// Thread类的start方法
public synchronized void start() {
// 一系列状态检查
if (threadStatus != )
throw new IllegalThreadStateException();
group.add(this);
boolean started = false;
try {
//调用start0方法,真正启动java线程的地方
start0();
started = true;
} finally {
try {
if (!started) {
group.threadStartFailed(this);
}
} catch (Throwable ignore) {
}
}
}
//start0方法是一个native方法
private native void start0();
JVM中,native方法与java方法存在一个映射关系,Java中的start0对应c层的JVM_StartThread
方法,我们继续看一下。
JVM_ENTRY(void, JVM_StartThread(JNIEnv* env, jobject jthread))
JVMWrapper("JVM_StartThread");
JavaThread *native_thread = NULL;
bool throw_illegal_thread_state = false;
{
MutexLocker mu(Threads_lock);
// 判断Java线程是否已经启动,如果已经启动过,则会抛异常。
if (java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread)) != NULL) {
throw_illegal_thread_state = true;
} else {
//如果没有启动过,走到这里else分支,去创建线程
//分配c++线程结构并创建native线程
jlong size =
java_lang_Thread::stackSize(JNIHandles::resolve_non_null(jthread));
size_t sz = size > ? (size_t) size : ;
//注意这里new JavaThread
native_thread = new JavaThread(&thread_entry, sz);
if (native_thread->osthread() != NULL) {
native_thread->prepare(jthread);
}
}
}
......
Thread::start(native_thread);
走到这里发现,Java层已经过渡到native层,但远远还没结束。
JavaThread::JavaThread(ThreadFunction entry_point, size_t stack_sz) :
Thread()
{
initialize();
_jni_attach_state = _not_attaching_via_jni;
set_entry_point(entry_point);
os::ThreadType thr_type = os::java_thread;
thr_type = entry_point == &compiler_thread_entry ? os::compiler_thread :
os::java_thread;
//根据平台,调用create_thread,创建真正的内核线程
os::create_thread(this, thr_type, stack_sz);
}
bool os::create_thread(Thread* thread, ThreadType thr_type,
size_t req_stack_size) {
......
pthread_t tid;
//利用pthread_create()来创建线程
int ret = pthread_create(&tid, &attr, (void* (*)(void*)) thread_native_entry, thread);
......
return true;
}
pthread_create
方法,第三个参数表示启动这个线程后要执行的方法的入口,第四个参数表示要给这个方法传入的参数。
static void *thread_native_entry(Thread *thread) {
......
//thread_native_entry方法的最下面的run方法,这个thread就是上面传递下来的参数,也就是JavaThread
thread->run();
......
return 0;
}
终于开始执行run方法了!!!
//thread.cpp类
void JavaThread::run() {
......
//调用内部thread_main_inner
thread_main_inner();
}
void JavaThread::thread_main_inner() {
if (!this->has_pending_exception() &&
!java_lang_Thread::is_stillborn(this->threadObj())) {
{
ResourceMark rm(this);
this->set_native_thread_name(this->get_thread_name());
}
HandleMark hm(this);
//注意:内部通过JavaCalls模块,调用了Java线程要执行的run方法
this->entry_point()(this, this);
}
DTRACE_THREAD_PROBE(stop, this);
this->exit(false);
delete this;
}
一条U字型代码调用链总算结束了,高呼一声,原来start之后做了这么多事情呀,稍微总结一下。
计算机的世界里,CPU会分为若干时间片,通过各种算法分配时间片来执行任务,有耳熟能详时间片轮转调度算法、短进程优先算法、优先级算法等。当一个任务的时间片用完,就会切换到另一个任务。在切换之前会保存上一个任务的状态,当下次再切换到该任务,就会加载这个状态, 这就是所谓的线程的上下文切换。
很明显,上下文的切换是有开销的,包括很多方面,操作系统保存和恢复上下文的开销、线程调度器调度线程的开销和高速缓存重新加载的开销等。
经过上面两个理论基础的回顾,开启大量线程引起的问题,总结起来,就两个字——开销。
3、针对上面提及到的问题,我们自然需要进行优化。线程的本质是为了执行任务,在计算机的世界里,任务分大致分为两类,CPU密集型任务和IO密集型任务。
另外,在无法避免,必须要开启大量线程的情况下,我们也可以使用线程池代替直接创建线程的做法进行优化。线程池的基本作用就是复用已有的线程,从而减少线程的创建,降低开销。
在Java中,线程池的使用还是非常方便的,JDK中提供了现成的ThreadPoolExecutor
类,我们只需要按照自己的需求进行相应的参数配置即可,这里提供一个示例。
/**
* 线程池使用
*/
public class ThreadPoolService {
/**
* 线程池变量
*/
private ThreadPoolExecutor mThreadPoolExecutor;
private static volatile ThreadPoolService sInstance = null;
/**
* 线程池中的核心线程数,默认情况下,核心线程一直存活在线程池中,即便他们在线程池中处于闲置状态。
* 除非我们将ThreadPoolExecutor的allowCoreThreadTimeOut属性设为true的时候,这时候处于闲置的核心 * 线程在等待新任务到来时会有超时策略,这个超时时间由keepAliveTime来指定。一旦超过所设置的超时时间,闲 * 置的核心线程就会被终止。
* CPU密集型任务 N+1 IO密集型任务 2*N
*/
private final int CORE_POOL_SIZE = Runtime.getRuntime().availableProcessors() + ;
/**
* 线程池中所容纳的最大线程数,如果活动的线程达到这个数值以后,后续的新任务将会被阻塞。包含核心线程数+非* * 核心线程数。
*/
private final int MAXIMUM_POOL_SIZE = Math.max(CORE_POOL_SIZE, );
/**
* 非核心线程闲置时的超时时长,对于非核心线程,闲置时间超过这个时间,非核心线程就会被回收。
* 只有对ThreadPoolExecutor的allowCoreThreadTimeOut属性设为true的时候,这个超时时间才会对核心线 * 程产生效果。
*/
private final long KEEP_ALIVE_TIME = ;
/**
* 用于指定keepAliveTime参数的时间单位。
*/
private final TimeUnit UNIT = TimeUnit.SECONDS;
/**
* 线程池中保存等待执行的任务的阻塞队列
* ArrayBlockingQueue 基于数组实现的有界的阻塞队列
* LinkedBlockingQueue 基于链表实现的阻塞队列
* SynchronousQueue 内部没有任何容量的阻塞队列。在它内部没有任何的缓存空间
* PriorityBlockingQueue 具有优先级的无限阻塞队列。
*/
private final BlockingQueue<Runnable> WORK_QUEUE = new LinkedBlockingDeque<>();
/**
* 线程工厂,为线程池提供新线程的创建。ThreadFactory是一个接口,里面只有一个newThread方法。 默认为DefaultThreadFactory类。
*/
private final ThreadFactory THREAD_FACTORY = Executors.defaultThreadFactory();
/**
* 拒绝策略,当任务队列已满并且线程池中的活动线程已经达到所限定的最大值或者是无法成功执行任务,这时候 * ThreadPoolExecutor会调用RejectedExecutionHandler中的rejectedExecution方法。
* CallerRunsPolicy 只用调用者所在线程来运行任务。
* AbortPolicy 直接抛出RejectedExecutionException异常。
* DiscardPolicy 丢弃掉该任务,不进行处理。
* DiscardOldestPolicy 丢弃队列里最近的一个任务,并执行当前任务。
*/
private final RejectedExecutionHandler REJECTED_HANDLER = new ThreadPoolExecutor.AbortPolicy();
private ThreadPoolService() {
}
/**
* 单例
* @return
*/
public static ThreadPoolService getInstance() {
if (sInstance == null) {
synchronized (ThreadPoolService.class) {
if (sInstance == null) {
sInstance = new ThreadPoolService();
sInstance.initThreadPool();
}
}
}
return sInstance;
}
/**
* 初始化线程池
*/
private void initThreadPool() {
try {
mThreadPoolExecutor = new ThreadPoolExecutor(
CORE_POOL_SIZE,
MAXIMUM_POOL_SIZE,
KEEP_ALIVE_TIME,
UNIT,
WORK_QUEUE,
THREAD_FACTORY,
REJECTED_HANDLER);
} catch (Exception e) {
LogUtil.printStackTrace(e);
}
}
/**
* 向线程池提交任务,无返回值
*
* @param runnable
*/
public void post(Runnable runnable) {
mThreadPoolExecutor.execute(runnable);
}
/**
* 向线程池提交任务,有返回值
*
* @param callable
*/
public <T> Future<T> post(Callable<T> callable) {
RunnableFuture<T> task = new FutureTask<T>(callable);
mThreadPoolExecutor.execute(task);
return task;
}
}