Android消息机制

Posted on 18-07-03

Handler,Message,Looper,ActivityThread

关键词 nativePollOnce nativeWake ActivityThread并不是Thread ActivityThread使用attach方法将主进程与AMS服务绑定起来 消息队列并不会阻塞 因为所有的生命周期和UI刷新全部用Handler消息机制执行

Handler

Handler 构造函数


public Handler(Callback callback, boolean async) {

        if (FIND_POTENTIAL_LEAKS) {

            final Class<? extends Handler> klass = getClass();

            if ((klass.isAnonymousClass() || klass.isMemberClass() || klass.isLocalClass()) &&

                    (klass.getModifiers() & Modifier.STATIC) == 0) {

                Log.w(TAG, "The following Handler class should be static or leaks might occur: " +

                    klass.getCanonicalName());

            }

        }



        mLooper = Looper.myLooper();

        if (mLooper == null) {

            throw new RuntimeException(

                "Can't create handler inside thread that has not called Looper.prepare()");

        }

        mQueue = mLooper.mQueue;

        mCallback = callback;

        mAsynchronous = async;

    }

Handler 主要的几个方法


/*

*消息进队

*/

private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {

        msg.target = this;

        if (mAsynchronous) {

            msg.setAsynchronous(true);

        }

        return queue.enqueueMessage(msg, uptimeMillis);

    }

MessageQueue

MessageQueue构造函数


 MessageQueue(boolean quitAllowed) {

        mQuitAllowed = quitAllowed;

        mPtr = nativeInit();

    }

MessageQueue的主要方法


 Message next() {

        // Return here if the message loop has already quit and been disposed.

        // This can happen if the application tries to restart a looper after quit

        // which is not supported.

        final long ptr = mPtr;

        if (ptr == 0) {

            return null;

        }



        int pendingIdleHandlerCount = -1; // -1 only during first iteration

        int nextPollTimeoutMillis = 0;

        for (;;) {

            if (nextPollTimeoutMillis != 0) {

                Binder.flushPendingCommands();

            }



            nativePollOnce(ptr, nextPollTimeoutMillis);



            synchronized (this) {

                // Try to retrieve the next message.  Return if found.

                final long now = SystemClock.uptimeMillis();

                Message prevMsg = null;

                Message msg = mMessages;

                if (msg != null && msg.target == null) {

                    // Stalled by a barrier.  Find the next asynchronous message in the queue.

                    do {

                        prevMsg = msg;

                        msg = msg.next;

                    } while (msg != null && !msg.isAsynchronous());

                }

                if (msg != null) {

                    if (now < msg.when) {

                        // Next message is not ready.  Set a timeout to wake up when it is ready.

                        nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);

                    } else {

                        // Got a message.

                        mBlocked = false;

                        if (prevMsg != null) {

                            prevMsg.next = msg.next;

                        } else {

                            mMessages = msg.next;

                        }

                        msg.next = null;

                        if (DEBUG) Log.v(TAG, "Returning message: " + msg);

                        msg.markInUse();

                        return msg;

                    }

                } else {

                    // No more messages.

                    nextPollTimeoutMillis = -1;

                }



                // Process the quit message now that all pending messages have been handled.

                if (mQuitting) {

                    dispose();

                    return null;

                }



                // If first time idle, then get the number of idlers to run.

                // Idle handles only run if the queue is empty or if the first message

                // in the queue (possibly a barrier) is due to be handled in the future.

                if (pendingIdleHandlerCount < 0

                        && (mMessages == null || now < mMessages.when)) {

                    pendingIdleHandlerCount = mIdleHandlers.size();

                }

                if (pendingIdleHandlerCount <= 0) {

                    // No idle handlers to run.  Loop and wait some more.

                    mBlocked = true;

                    continue;

                }



                if (mPendingIdleHandlers == null) {

                    mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];

                }

                mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);

            }



            // Run the idle handlers.

            // We only ever reach this code block during the first iteration.

            for (int i = 0; i < pendingIdleHandlerCount; i++) {

                final IdleHandler idler = mPendingIdleHandlers[i];

                mPendingIdleHandlers[i] = null; // release the reference to the handler



                boolean keep = false;

                try {

                    keep = idler.queueIdle();

                } catch (Throwable t) {

                    Log.wtf(TAG, "IdleHandler threw exception", t);

                }



                if (!keep) {

                    synchronized (this) {

                        mIdleHandlers.remove(idler);

                    }

                }

            }



            // Reset the idle handler count to 0 so we do not run them again.

            pendingIdleHandlerCount = 0;



            // While calling an idle handler, a new message could have been delivered

            // so go back and look again for a pending message without waiting.

            nextPollTimeoutMillis = 0;

        }

    }

Looper

Looper的生成函数


static final ThreadLocal<Looper> sThreadLocal = new ThreadLocal<Looper>();

    private static Looper sMainLooper;  // guarded by Looper.class



    final MessageQueue mQueue;

    final Thread mThread;

    /*

    * 生成Looper

    */

    private static void prepare(boolean quitAllowed) {

        if (sThreadLocal.get() != null) {

            throw new RuntimeException("Only one Looper may be created per thread");

        }

        sThreadLocal.set(new Looper(quitAllowed));

    }



    /**

     * 生成ActivityThread中的Looper

     */

    public static void prepareMainLooper() {

        prepare(false);

        synchronized (Looper.class) {

            if (sMainLooper != null) {

                throw new IllegalStateException("The main Looper has already been prepared.");

            }

            sMainLooper = myLooper();

        }

    }

Looper主要方法


public static void loop() {

        final Looper me = myLooper();

        if (me == null) {

            throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");

        }

        final MessageQueue queue = me.mQueue;



        // Make sure the identity of this thread is that of the local process,

        // and keep track of what that identity token actually is.

        Binder.clearCallingIdentity();

        final long ident = Binder.clearCallingIdentity();



        for (;;) {

            Message msg = queue.next(); // might block

            if (msg == null) {

                // No message indicates that the message queue is quitting.

                return;

            }



            // This must be in a local variable, in case a UI event sets the logger

            final Printer logging = me.mLogging;

            if (logging != null) {

                logging.println(">>>>> Dispatching to " + msg.target + " " +

                        msg.callback + ": " + msg.what);

            }



            final long slowDispatchThresholdMs = me.mSlowDispatchThresholdMs;



            final long traceTag = me.mTraceTag;

            if (traceTag != 0 && Trace.isTagEnabled(traceTag)) {

                Trace.traceBegin(traceTag, msg.target.getTraceName(msg));

            }

            final long start = (slowDispatchThresholdMs == 0) ? 0 : SystemClock.uptimeMillis();

            final long end;

            try {

                msg.target.dispatchMessage(msg);

                end = (slowDispatchThresholdMs == 0) ? 0 : SystemClock.uptimeMillis();

            } finally {

                if (traceTag != 0) {

                    Trace.traceEnd(traceTag);

                }

            }

            if (slowDispatchThresholdMs > 0) {

                final long time = end - start;

                if (time > slowDispatchThresholdMs) {

                    Slog.w(TAG, "Dispatch took " + time + "ms on "

                            + Thread.currentThread().getName() + ", h=" +

                            msg.target + " cb=" + msg.callback + " msg=" + msg.what);

                }

            }



            if (logging != null) {

                logging.println("<<<<< Finished to " + msg.target + " " + msg.callback);

            }



            // Make sure that during the course of dispatching the

            // identity of the thread wasn't corrupted.

            final long newIdent = Binder.clearCallingIdentity();

            if (ident != newIdent) {

                Log.wtf(TAG, "Thread identity changed from 0x"

                        + Long.toHexString(ident) + " to 0x"

                        + Long.toHexString(newIdent) + " while dispatching to "

                        + msg.target.getClass().getName() + " "

                        + msg.callback + " what=" + msg.what);

            }



            msg.recycleUnchecked();

        }

    }

遇到主要问题

  • 消息机制是怎么运转起来的?
一般认为消息机制的Handler,Looper,MessageQueue在消息机制中承担的角色分别是Handler是消息的发射器和分发器,Looper是消息机制的发动机,MessageQueue是消息的存储队列。
Looper.loop()会运行一个死循环去MessageQueue取得Message,拿到Message中的Callback然后分发执行。
在loop方法中通过for (;;)死循环调用queue.next()来得到下一个处理的Message示例来执行,当queue.next()返回为null时,loop方法跳出执行,死循环结束。
而MessageQueue的next方法中同样有一个for (;;)死循环去调用nativePollOnce(ptr, nextPollTimeoutMillis);得到MessageQueue中的下一个Message实例。nativePollOnce()方法和另外一个方法nativeWake(mPtr);实现了Linux的epoll的I/O阻塞模型。(Linux中的epoll阻塞模型是经典的阻塞模型之一)
所以之前的想法是错误的 假设消息队列中有1000个消息,那么Looper,loop()方法的死循环执行1000次便跳出的想法是错误的。不管在loop方法中还是queue的next方法中的死循环在应用的生命周期中都不会被跳出,假设有1000个消息,消息全部处理完后,循环阻塞在nativePollOnce()方法中等待唤醒,当有Handler发射一个消息并且执行enqueueMessage方法调用queue.enqueueMessage(msg, uptimeMillis);方法在enqueueMessage中的nativeWake(mPtr)方法使得阻塞释放重新取出消息进行执行。
  • Looper.loop方法会不会阻塞主线程ActivityThread?
简单讲有两个原因
  • epoll模型 当没有消息的时候会epoll.wait,等待句柄写的时候再唤醒,这个时候其实是阻塞的。

  • 所有的ui操作都通过handler来发消息操作。比如屏幕刷新16ms一个消息,你的各种点击事件,所以就会有句柄写操作,唤醒上文的wait操作,所以不会被卡死了。

详细讲
要完全彻底理解这个问题,需要准备以下4方面的知识:Process/Thread,Android Binder IPC,Handler/Looper/MessageQueue消息机制,Linux pipe/epoll机制。总结一下楼主主要有3个疑惑:1.Android中为什么主线程不会因为Looper.loop()里的死循环卡死? 2.没看见哪里有相关代码为这个死循环准备了一个新线程去运转? 3.Activity的生命周期这些方法这些都是在主线程里执行的吧,那这些生命周期方法是怎么实现在死循环体外能够执行起来的?———
(1) Android中为什么主线程不会因为Looper.loop()里的死循环卡死? 这里涉及线程,先说说进程/线程,进程:每个app运行时前首先创建一个进程,该进程是由Zygote fork出来的,用于承载App上运行的各种Activity/Service等组件。进程对于上层应用来说是完全透明的,这也是google有意为之,让App程序都是运行在Android Runtime。大多数情况一个App就运行在一个进程中,除非在AndroidManifest.xml中配置Android:process属性,或通过native代码fork进程。线程:线程对应用来说非常常见,比如每次new Thread().start都会创建一个新的线程。该线程与App所在进程之间资源共享,从Linux角度来说进程与线程除了是否共享资源外,并没有本质的区别,都是一个task_struct结构体,在CPU看来进程或线程无非就是一段可执行的代码,CPU采用CFS调度算法,保证每个task都尽可能公平的享有CPU时间片。有了这么准备,再说说死循环问题:对于线程既然是一段可执行的代码,当可执行代码执行完成后,线程生命周期便该终止了,线程退出。而对于主线程,我们是绝不希望会被运行一段时间,自己就退出,那么如何保证能一直存活呢?简单做法就是可执行代码是能一直执行下去的,死循环便能保证不会被退出,例如,binder线程也是采用死循环的方法,通过循环方式不同与Binder驱动进行读写操作,当然并非简单地死循环,无消息时会休眠。但这里可能又引发了另一个问题,既然是死循环又如何去处理其他事务呢?通过创建新线程的方式。真正会卡死主线程的操作是在回调方法onCreate/onStart/onResume等操作时间过长,会导致掉帧,甚至发生ANR,looper.loop本身不会导致应用卡死。
(2) 没看见哪里有相关代码为这个死循环准备了一个新线程去运转? 事实上,会在进入死循环之前便创建了新binder线程,在代码ActivityThread.main()中:

public static void main(String[] args) {

        //创建Looper和MessageQueue对象,用于处理主线程的消息

        Looper.prepareMainLooper();



        //创建ActivityThread对象

        ActivityThread thread = new ActivityThread(); 



        //建立Binder通道 (创建新线程)

        thread.attach(false);



        Looper.loop(); //消息循环运行

        throw new RuntimeException("Main thread loop unexpectedly exited");

    }

thread.attach(false);便会创建一个Binder线程(具体是指ApplicationThread,Binder的服务端,用于接收系统服务AMS发送来的事件),该Binder线程通过Handler将Message发送给主线程,具体过程可查看 startService流程分析,这里不展开说,简单说Binder用于进程间通信,采用C/S架构。另外,ActivityThread实际上并非线程,不像HandlerThread类,ActivityThread并没有真正继承Thread类,只是往往运行在主线程,该人以线程的感觉,其实承载ActivityThread的主线程就是由Zygote fork而创建的进程。主线程的死循环一直运行是不是特别消耗CPU资源呢? 其实不然,这里就涉及到Linux pipe/epoll机制,简单说就是在主线程的MessageQueue没有消息时,便阻塞在loop的queue.next()中的nativePollOnce()方法里,此时主线程会释放CPU资源进入休眠状态,直到下个消息到达或者有事务发生,通过往pipe管道写端写入数据来唤醒主线程工作。这里采用的epoll机制,是一种IO多路复用机制,可以同时监控多个描述符,当某个描述符就绪(读或写就绪),则立刻通知相应程序进行读或写操作,本质同步I/O,即读写是阻塞的。 所以说,主线程大多数时候都是处于休眠状态,并不会消耗大量CPU资源。
(3) Activity的生命周期是怎么实现在死循环体外能够执行起来的?ActivityThread的内部类H继承于Handler,通过handler消息机制,简单说Handler机制用于同一个进程的线程间通信。Activity的生命周期都是依靠主线程的Looper.loop,当收到不同Message时则采用相应措施:在H.handleMessage(msg)方法中,根据接收到不同的msg,执行相应的生命周期。 比如收到msg=H.LAUNCH_ACTIVITY,则调用ActivityThread.handleLaunchActivity()方法,最终会通过反射机制,创建Activity实例,然后再执行Activity.onCreate()等方法; 再比如收到msg=H.PAUSE_ACTIVITY,则调用ActivityThread.handlePauseActivity()方法,最终会执行Activity.onPause()等方法。 上述过程,我只挑核心逻辑讲,真正该过程远比这复杂。主线程的消息又是哪来的呢?当然是App进程中的其他线程通过Handler发送给主线程,请看接下来的内容:————————————————————————————————————————————–最后,从进程与线程间通信的角度,通过一张图加深大家对App运行过程的理解:

Pic

system_server进程是系统进程,java framework框架的核心载体,里面运行了大量的系统服务,比如这里提供ApplicationThreadProxy(简称ATP),ActivityManagerService(简称AMS),这个两个服务都运行在system_server进程的不同线程中,由于ATP和AMS都是基于IBinder接口,都是binder线程,binder线程的创建与销毁都是由binder驱动来决定的。App进程则是我们常说的应用程序,主线程主要负责Activity/Service等组件的生命周期以及UI相关操作都运行在这个线程; 另外,每个App进程中至少会有两个binder线程 ApplicationThread(简称AT)和ActivityManagerProxy(简称AMP),除了图中画的线程,其中还有很多线程,比如signal catcher线程等,这里就不一一列举。Binder用于不同进程之间通信,由一个进程的Binder客户端向另一个进程的服务端发送事务,比如图中线程2向线程4发送事务;而handler用于同一个进程中不同线程的通信,比如图中线程4向主线程发送消息。结合图说说Activity生命周期,比如暂停Activity,流程如下:线程1的AMS中调用线程2的ATP;(由于同一个进程的线程间资源共享,可以相互直接调用,但需要注意多线程并发问题)线程2通过binder传输到App进程的线程4;线程4通过handler消息机制,将暂停Activity的消息发送给主线程;主线程在looper.loop()中循环遍历消息,当收到暂停Activity的消息时,便将消息分发给ActivityThread.H.handleMessage()方法,再经过方法的调用,最后便会调用到Activity.onPause(),当onPause()处理完后,继续循环loop下去。
private class H extends Handler {},其实例final H mH = new H();通过发送Message的方式完成Activity的生命周期的调用。例如

public final void scheduleStopActivity(IBinder token, boolean showWindow,

                int configChanges) {

            int seq = getLifecycleSeq();

            if (DEBUG_ORDER) Slog.d(TAG, "stopActivity " + ActivityThread.this

                    + " operation received seq: " + seq);

            sendMessage(

                showWindow ? H.STOP_ACTIVITY_SHOW : H.STOP_ACTIVITY_HIDE,

                token, 0, configChanges, seq);

        }

会在ActivityThread中的handleMessage方法中

case STOP_ACTIVITY_SHOW: {

                    Trace.traceBegin(Trace.TRACE_TAG_ACTIVITY_MANAGER, "activityStop");

                    SomeArgs args = (SomeArgs) msg.obj;

                    handleStopActivity((IBinder) args.arg1, true, args.argi2, args.argi3);

                    Trace.traceEnd(Trace.TRACE_TAG_ACTIVITY_MANAGER);

                } break;

Looper 怎么保证的在一个线程中只有一个Looper 的实例的?
在Looper中有这样一个变量

static final ThreadLocal<Looper> sThreadLocal = new ThreadLocal<Looper>();

这里采用ThreadLocal来保证各个线程的Looper的唯一性
ThreadLocal的主要方法有

public T get() {

        Thread t = Thread.currentThread();

        ThreadLocalMap map = getMap(t);

        if (map != null) {

            ThreadLocalMap.Entry e = map.getEntry(this);

            if (e != null) {

                @SuppressWarnings("unchecked")

                T result = (T)e.value;

                return result;

            }

        }

        return setInitialValue();

    }

    

public void set(T value) {

        Thread t = Thread.currentThread();

        ThreadLocalMap map = getMap(t);

        if (map != null)

            map.set(this, value);

        else

            createMap(t, value);

    }

    

static class ThreadLocalMap {}

而在Thread中存在


/* ThreadLocal values pertaining to this thread. This map is maintained

     * by the ThreadLocal class. */

    ThreadLocal.ThreadLocalMap threadLocals = null;

通过这样的方式保证了Looper的唯一性