Sayi / sayi.github.com

个人博客已切换到公众号Evoopeed,请搜索:deepoove
44 stars 7 forks source link

并发(一)基础之线程篇 #51

Open Sayi opened 6 years ago

Sayi commented 6 years ago

在并发编程中,我们会遇到一些问题:线程数到底多少合适?如何才能编写出并发访问安全的程序?如果管理这些线程?......理解这些问题有益于我们写出优秀的并发代码,本系列文章将一步步剖析Java并发编程的细节。

本文重点介绍并发基础单元线程,它是进程的一部分。

基础知识:Thread

我们知道,有两个方式去创建线程:

1.通过继承Thread类
Thread实现了Runnable接口,但是run方法什么都没有做,继承Thread类,重写run方法

public class HelloThread extends Thread {
  public void run() {
    System.out.println("Hello from a thread!");
  }
}

接着我们可以通过start方法来启动线程:(new HelloThread()).start();

2.实现Runnable接口
Thread类提供了一个Runnable参数的构造器,我们可以通过Runnable对象创建线程。

public class HelloRunnable implements Runnable {

  public void run() {
    System.out.println("Hello from a thread!");
  }

  public static void main(String args[]) {
    (new Thread(new HelloRunnable())).start();
  }

}

注意:我们更倾向于实现Runnable接口,因为这种方法将线程执行的任务和线程本身分开,并且Runnable对象无需继承Thread类,扩展性更好。

Thread初始化源码

通过源码我们来看看创建一个线程到底初始化了哪些信息,下面的代码是一个完整参数的构造器:

public Thread(ThreadGroup group, Runnable target, String name, long stackSize) {
  init(group, target, name, stackSize);
}

在详细进入init方法前,我们先来仔细研究下这里的参数的含义:

1. ThreadGroup

指定了属于哪个线程组,如果没有指定,默认归属于当前线程的线程组。线程组的作用是维护一组线程,也可以拥有线程组,每个线程组都有一个父线程组,根的父线程组是NULL。

我们来看看线程组可以对一组线程做哪些事情:

ThreadGroup被认为是一个失败的尝试(《Java编程思想》),我们大可不必了解这个类。Thread类也提供了设置异常处理器的方法public void setUncaughtExceptionHandler(UncaughtExceptionHandler eh),可以使用这个方法单独制定某个线程异常处理。

2. target表示要执行的任务

3. name是一个线程的命名

可以重复,如果为空,会默认生成一个名称,生成规则是"Thread-" + nextThreadNum()

private static int threadInitNumber;
private static synchronized int nextThreadNum() {
  return threadInitNumber++;
}

4. stackSize表示线程占用的栈大小

每个线程将会维护一个栈空间存储变量,参数等信息,对于多个线程的共享变量,它们也会在栈内维护一个副本(这里会引入内存一致性问题,会在下一篇文章中详细讨论),stackSize为0将会忽略这个参数,使用默认值,默认值是通过JVM参数-Xss设置的。

-Xss -Xss sets the thread stack size. Thread stacks are memory areas allocated for each Java thread for their internal use. This is where the thread stores its local execution state.

这个默认值是平台相关的,下面的命令可以看出来,在我的机器上,默认线程占用的栈大小为1M(JVM Options)。

$ java -XX:+PrintFlagsInitial | grep ThreadStackSize
 intx CompilerThreadStackSize                   = 0                                   {pd product}
 intx ThreadStackSize                           = 1024                                {pd product}
 intx VMThreadStackSize                         = 1024                                {pd product}

接下来我们再深入初始化的init方法看看如何构造Thread对象:

private void init(ThreadGroup g, Runnable target, String name,
          long stackSize, AccessControlContext acc,
          boolean inheritThreadLocals) {
  if (name == null) {
    throw new NullPointerException("name cannot be null");
  }

  this.name = name;

  Thread parent = currentThread();
  SecurityManager security = System.getSecurityManager();
  if (g == null) {
    /* Determine if it's an applet or not */

    /* If there is a security manager, ask the security manager
       what to do. */
    if (security != null) {
      g = security.getThreadGroup();
    }

    /* If the security doesn't have a strong opinion of the matter
       use the parent thread group. */
    if (g == null) {
      g = parent.getThreadGroup();
    }
  }

  /* checkAccess regardless of whether or not threadgroup is
     explicitly passed in. */
  g.checkAccess();

  /*
   * Do we have the required permissions?
   */
  if (security != null) {
    if (isCCLOverridden(getClass())) {
      security.checkPermission(SUBCLASS_IMPLEMENTATION_PERMISSION);
    }
  }

  g.addUnstarted();

  this.group = g;
  this.daemon = parent.isDaemon();
  this.priority = parent.getPriority();
  if (security == null || isCCLOverridden(parent.getClass()))
    this.contextClassLoader = parent.getContextClassLoader();
  else
    this.contextClassLoader = parent.contextClassLoader;
  this.inheritedAccessControlContext =
      acc != null ? acc : AccessController.getContext();
  this.target = target;
  setPriority(priority);
  if (inheritThreadLocals && parent.inheritableThreadLocals != null)
    this.inheritableThreadLocals =
      ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
  /* Stash the specified stack size in case the VM cares */
  this.stackSize = stackSize;

  /* Set thread ID */
  tid = nextThreadID();
}

这样一个线程就初始化好了它归属于一个线程组,并且拥有自己的名称,这段代码主要做了下面这些事情:

  1. 初始化了线程组
  2. 设置是否是守护线程daemon,守线程是一种后台线程,关于守护线程和用户线程,这里不作扩展介绍。
  3. 设置了线程的优先级priority,这和线程调度的先后有关系,Thread类提供了三个常量MIN_PRIORITY=1、NORM_PRIORITY=5和MAX_PRIORITY=10。
  4. 初始化了任务target
  5. 初始化了stackSize
  6. 生成线程tid

我们注意到,此时只是初始化了线程组和线程,并没有将线程加入线程组,g.addUnstarted()方法也仅仅是将线程组里面为启动线程的计数累加,真正将线程加入线程组的代码是在start启动方法中。

Thread启动源码

启动一个线程是通过start()方法,而不是run()方法,当调用start方法后,run方法是由JVM去调用的。

public synchronized void start() {
  /**
   * This method is not invoked for the main method thread or "system"
   * group threads created/set up by the VM. Any new functionality added
   * to this method in the future may have to also be added to the VM.
   *
   * A zero status value corresponds to state "NEW".
   */
  if (threadStatus != 0)
    throw new IllegalThreadStateException();

  /* Notify the group that this thread is about to be started
   * so that it can be added to the group's list of threads
   * and the group's unstarted count can be decremented. */
  group.add(this);

  boolean started = false;
  try {
    start0();
    started = true;
  } finally {
    try {
      if (!started) {
        group.threadStartFailed(this);
      }
    } catch (Throwable ignore) {
      /* do nothing. If start0 threw a Throwable then
        it will be passed up the call stack */
    }
  }
}

private native void start0();

start方法修改了是否启动标志,然后调用native方法启动执行,接下来线程的执行将会由CPU进行调度了。

注意到其中group.add(this)方法正是将当前线程加入了线程组。

基础知识:synchronized

synchronized是Java提供并发同步的基本关键字,它有两种形式:synchronized方法和synchronized语句。

// 序号1
public synchronized void increment() {
  c++;
}

// 序号2
public static synchronized void increment() {
  c++;
}

// 序号3
private Object lock1 = new Object();
public void inc1() {
  synchronized(lock1) {
      c1++;
  }
}

// 序号4
public void addName(String name) {
  synchronized(this) {
    lastName = name;
    nameCount++;
  }
  nameList.add(name);
}

// 序号5
public void add() {
  synchronized(User.class) {
    nameCount++;
  }
}

我们都知道synchronized保证了代码块当且只有一个线程可以进入,其余若干线程将会阻塞,即这个线程拥有了锁,当这个线程退出代码块后,调度器将会调度阻塞的某一个线程拥有锁。

但是我们还必须知道synchronized到底锁住了什么,才能知道哪些线程会被阻塞,才能更合理的使用这个关键字。

Java中每一个对象都关联一个内在锁(intrinsic lock)或者监视器(monitor),线程可以锁住这个内在锁,也可以释放这个内在锁,在某一刻,当且只能有一个线程拥有这个锁,然而同一个线程可以拥有这个锁多次,称之为重入锁。

我们可以查看同步块对应的字节码,可以看到monitorenter和monitorexit命令,代表拥有锁和释放锁,重入的实现机制是对计数器的累加和减少,当计数为0时,表示当前线程释放了锁。

我们理解了内在锁后,再看看上面的示例代码:

当前线程是否拥有某个对象的锁,可以通过Thread提供的静态方法判断:

public static native boolean holdsLock(Object obj);

线程状态转化

从启动一个线程后,线程会被阻塞,也会被重新调度执行,最终会被销毁,这些称之为状态转化。 image

Thread也提供了获取状态的方法public State getState(),我们可以仔细阅读下State的源码注释,对应上图我们把BLOCKED、WAITING、TIME_WAITING统称为阻塞状态:

public enum State {
  /**
   * Thread state for a thread which has not yet started.
   */
  NEW,

  /**
   * Thread state for a runnable thread.  A thread in the runnable
   * state is executing in the Java virtual machine but it may
   * be waiting for other resources from the operating system
   * such as processor.
   */
  RUNNABLE,

  /**
   * Thread state for a thread blocked waiting for a monitor lock.
   * A thread in the blocked state is waiting for a monitor lock
   * to enter a synchronized block/method or
   * reenter a synchronized block/method after calling
   * {@link Object#wait() Object.wait}.
   */
  BLOCKED,

  /**
   * Thread state for a waiting thread.
   * A thread is in the waiting state due to calling one of the
   * following methods:
   * <ul>
   *   <li>{@link Object#wait() Object.wait} with no timeout</li>
   *   <li>{@link #join() Thread.join} with no timeout</li>
   *   <li>{@link LockSupport#park() LockSupport.park}</li>
   * </ul>
   *
   * <p>A thread in the waiting state is waiting for another thread to
   * perform a particular action.
   *
   * For example, a thread that has called <tt>Object.wait()</tt>
   * on an object is waiting for another thread to call
   * <tt>Object.notify()</tt> or <tt>Object.notifyAll()</tt> on
   * that object. A thread that has called <tt>Thread.join()</tt>
   * is waiting for a specified thread to terminate.
   */
  WAITING,

  /**
   * Thread state for a waiting thread with a specified waiting time.
   * A thread is in the timed waiting state due to calling one of
   * the following methods with a specified positive waiting time:
   * <ul>
   *   <li>{@link #sleep Thread.sleep}</li>
   *   <li>{@link Object#wait(long) Object.wait} with timeout</li>
   *   <li>{@link #join(long) Thread.join} with timeout</li>
   *   <li>{@link LockSupport#parkNanos LockSupport.parkNanos}</li>
   *   <li>{@link LockSupport#parkUntil LockSupport.parkUntil}</li>
   * </ul>
   */
  TIMED_WAITING,

  /**
   * Thread state for a terminated thread.
   * The thread has completed execution.
   */
  TERMINATED;
}

obj.wait()、obj.notify()、obj.notifyAll()

wait方法是Object提供的一个final方法,目的是使当前线程进入等待阻塞状态,直到调用这个对象obj的notify或者notifyAll方法,还提供了一个时间参数的重载方法,表示等待阻塞一定时间。

obj.wait方法是让拥有当前对象内在锁的线程进入一个等待集合,然后释放这个对象的内在锁,进入等待状态。 所以当前线程必须拥有这个对象obj的内在锁,才能调用obj.wait方法,因为每个对象都关联一个内在锁,这也是为什么wait方法是作为Object类的一个方法。

obj.notify、obj.notifyAll方法是唤醒这些等待集合中的一个或者全部,这两个方法也 必须在拥有对象内在锁的线程中调用,调用notify后并不会里面唤醒等待阻塞的线程,而是等待释放了拥有的对象内在锁后,再由调度器进行调度。

我们可以假设有10个线程,第一个线程拥有了某个对象内在锁,然后调用此对象的wait方法进入等待阻塞状态,一直到第10个线程,那么这10个线程都会进入等待集合,此时如果第11个线程拥有了这个对象的内在锁,然后调用notifyAll方法,这10个线程都将会被唤醒,重新进行调度,它们之间 仍然会对这个对象的内在锁进行竞争,同一时刻只能有一个线程会执行。

打断等待阻塞状态除了notify、notifyAll方法,还有

虽然虚假唤醒很少会发生,但是为了避免此种情况,最佳实践是通过线程的一个局部变量标识是否应该等待,然后通过守护循环代码块来执行。

private boolean empty = true;
public synchronized void take() {
  // Wait until empty is false
  while (empty) {
    try {
      wait();
    } catch (InterruptedException e) {}
  }

}

Always invoke wait inside a loop that tests for the condition being waited for. Don't assume that the interrupt was for the particular condition you were waiting for, or that the condition is still true. 《The Java™ Tutorials》

t.interrupt()、t.isInterrupted()、Thread.interrupted()

我们试想一下如何中断线程执行,可能在run方法的循环里面,不断判断一个标记,当这个标记符合某个条件时,就停止循环。Thread类自带了这样的标记,我们可以通过t.interrupt()方法(即设置了中断标记interrupt flag)中断线程,通过Thread.interrupted()这个静态方法检查当前线程的中断标记,返回true或者false,调用这个静态方法后,会重置中断标记,即第二次调用总是会返回false,而t.isInterrupted()仅返回当前中断标记,不会重置。

之所以重置中断标记,应该是希望一个中断请求只希望通知一次,不希望代码中响应被通知两次。如果编写的代码对检查中断只会有一次响应操作,那么使用t.isInterrupted()不去重置中断标记也是可以的。

为了让线程能够中断,我们需要 在线程中自己编写代码来实现中断逻辑,类似这样:

for (int i = 0; i < inputs.length; i++) {
    heavyCrunch(inputs[i]);
    if (Thread.interrupted()) {
        // We've been interrupted: no more crunching.
        return;
    }
}

除了友好的通过Thread.interrupted()检查中断外,wait、sleep、join方法会直接抛出InterruptedException异常, 当异常抛出时,中断标记会被重置为false,我们可以在异常处理中实现中断逻辑:

for (int i = 0; i < importantInfo.length; i++) {
    // Pause for 4 seconds
    try {
        Thread.sleep(4000);
    } catch (InterruptedException e) {
        // We've been interrupted: no more messages.
        return;
    }
    // Print a message
    System.out.println(importantInfo[i]);
}

注意的是,上文提过,如果在wait方法执行的时候,不希望被中断,需要通过守护循环代码块来实现。

t.join

join是当前线程等待阻塞,直到另一个线程t结束,或者超过了join(millis)的指定时间,join也是可以被t.interrupt()中断,抛出InterruptedException异常,如果我们不希望被中断,可以通过守护循环代码实现,其中t.isAlive方法来判断线程是否存活:

t.start();
while (t.isAlive()){
    try{
        t.join();
    } catch (InterruptedException e) {
    }
}

接下来我们通过源码来看看join的原理。

public final synchronized void join(long millis)
throws InterruptedException {
  long base = System.currentTimeMillis();
  long now = 0;

  if (millis < 0) {
    throw new IllegalArgumentException("timeout value is negative");
  }

  if (millis == 0) {
    while (isAlive()) {
      wait(0);
    }
  } else {
    while (isAlive()) {
      long delay = millis - now;
      if (delay <= 0) {
        break;
      }
      wait(delay);
      now = System.currentTimeMillis() - base;
    }
  }
}

在主线程执行中,调用t.join方法,由于这个方法是synchronized修饰的,所以获得了线程t的内在锁,然后相当于调用了t.wait()方法进入等待阻塞状态,有个重要的知识点:当一个线程结束时,总会调用notifyAll()方法,所以当线程t运行结束时,主线程会被唤醒继续执行。这里比较容易迷惑的原因是主线程调用了其它线程Thread对象的wait方法,对Thread对象加锁了。

既然线程结束会调用notifyAll方法,说明拥有了此线程的内在锁,所以如果t.start()方法和t.join方法不在一个线程内执行呢?有没有可能在进入join方法时,因为关键字synchronized获取t的内在锁失败而阻塞?这个问题留作一个思考点,应该和JVM执行线程有关。

Thread.sleep

当前线程睡眠一段时间,睡眠过程中,调度器可能会调度其它线程执行,注意:当前线程不会失去任何锁,睡眠可以被t.interrupt()打断。

推荐使用TimeUnit.sleep方法,而不是使用Thread.sleep,TimeUnit枚举类提供了很多的便捷性,其中还包括时间单位的转换,同时它还提供了等待时间参数的方法的包装,比如timedJoin(Thread thread, long timeout)和timedWait(Object obj, long timeout),我们来看看这个枚举常量有哪些:

NANOSECONDS // 纳秒
MICROSECONDS // 微秒
MILLISECONDS // 毫秒
SECONDS // 秒
MINUTES // 分
HOURS // 时
DAYS // 天

JDK设计中,当需要时间作为参数的时候,习惯使用TimeUnit用作单位,比如参数为:(long time, TimeUnit unit)。当我们自己编写代码遇到时间参数时,可以利用TimeUnit来设计友好的代码。

Thread.yield

暗示调度器当前线程希望让步对处理器的占用,调度器可以忽略这个暗示,yield可以在线程做完了最主要工作后做出让步,常见的应用场景是我们在调试或者测试并发时,通过yield重现某些场景。

总结

熟悉线程类Thread和线程状态转化的方式是学习并发编程和多线程相关算法的基础,synchronized通过关键字提供了一种加锁方式,它是简单的,但不是唯一的,也不一定高效,wait、notify、notifyAll也并不是唯一的等待阻塞和唤醒线程的方式,这些都将在后续文章中详细解读。