这篇文章主要为大家详细介绍了安卓7.0消息队列的相关资料,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
机器人中的消息处理机制大量依赖于处理程序。每个处理者都有对应的Looper,用于不断地从对应的消息队列中取出消息处理。
一直以来,觉得消息队列应该是Java 语言(一种计算机语言,尤用于创建网站)语言(一种计算机语言,尤用于创建网站)层的抽象,然而事实上消息队列的主要部分在当地的层中。
自己对消息队列在当地的层的工作不太熟悉,借此机会分析一下。
一、消息队列的创建
当需要使用尺蠖时,我们会调用尺蠖的准备函数:
公共静态void准备(){
准备(真);
}
私有静态空的准备(布尔quitAllowed) {
if (sThreadLocal.get()!=null) {
抛出新的RuntimeException("每个线程只能创建一个循环");
}
//sThreadLocal为线程本地存储区;每个线程仅有一个尺蠖
sthreadlocal。set(新活套(允许退出));
}
私有Looper(boolean quitAllowed) {
//创建出消息队列
mQueue=新消息队列(允许退出);
mThread=线程。当前线程();
}
一个NativeMessageQueue
我们看看消息队列的构造函数:
消息队列(允许布尔退出){
mQuitAllowed=quitAllowed
//mPtr的类型为龙?
mPtr=native init();
}
消息队列的构造函数中就调用了当地的函数,我们看看安卓_操作系统_消息队列。卡片打印处理机(Card Print Processor的缩写)中的实现:
静态jlong Android _ OS _ message queue _ native init(JNIEnv * env,jclass clazz) {
//消息队列的当地的层实体
NativeMessageQueue * NativeMessageQueue=new NativeMessageQueue();
..
//这里应该类似与将指针转化成长的类型,放在Java 语言(一种计算机语言,尤用于创建网站)语言(一种计算机语言,尤用于创建网站)层保存;估计Java 语言(一种计算机语言,尤用于创建网站)语言(一种计算机语言,尤用于创建网站)层使用时,会在当地的层将长的变成指针,就可以操作队列了
返回reinterpret _ castjlong(nativeMessageQueue);
}
我们跟进NativeMessageQueue的构造函数:
NativeMessageQueue:NativeMessageQueue():
mPollEnv(空),mPollObj(空),mExceptionObj(空){
//创建一个当地的层的Looper,也是线程唯一的
m Looper=Looper:getForThread();
if (mLooper==NULL) {
m活套=新活套(假);
looper:setForThread(m looper);
}
}
从代码来看,本地层和Java 语言(一种计算机语言,尤用于创建网站)语言(一种计算机语言,尤用于创建网站)层均有尺蠖对象,应该都是操作消息队列的消息队列在Java 语言(一种计算机语言,尤用于创建网站)语言(一种计算机语言,尤用于创建网站)层和当地的层有各自的存储结构,分别存储Java 语言(一种计算机语言,尤用于创建网站)语言(一种计算机语言,尤用于创建网站)层和当地的层的消息。
2 Native层的looper
我们看看当地的层尺蠖的构造函数:
Looper:Looper(bool allowNonCallbacks):
mAllowNonCallbacks(allowNonCallbacks),mSendingMessage(false),
mPolling(false),mEpollFd(-1),mEpollRebuildRequired(false),
mNextRequestSeq(0),mResponseIndex(0),mNextMessageUptime(LLONG_MAX) {
//此处创建了个软驱
mWakeEventFd=eventfd(0,EFD _非阻塞| EFD _ clo exec);
.
rebuildEpollLocked();
}
在当地的层中,消息队列中的尺蠖初始化时,还调用了rebuildEpollLocked函数,我们跟进一下:
void Looper:rebuildEpollLocked(){
//如果有旧的epoll实例,请将其关闭。
if (mEpollFd=0) {
关闭(mEpollFd);
}
//分配新的epoll实例并注册唤醒管道。
mEpollFd=EPOLL _ create(EPOLL _ SIZE _ HINT);
..
结构epoll _ event事件项
memset( eventItem,0,sizeof(epoll _ event));//将数据字段联合中未使用的成员清零
事件项目。events=epoll in
事件项目。数据。FD=mWakeEventFd
//在mEpollFd上监听mWakeEventFd上是否有数据到来
int result=epoll_ctl(mEpollFd,EPOLL_CTL_ADD,mWakeEventFd,event item);
..
for(size _ t I=0;我请求。size();i ) {
const Request Request=mrequests。(I)处的值;
结构epoll _ event事件项
request.initEventItem(事件项);
//监听请求对应软驱上数据的到来
int epollResult=EPOLL _ CTL(mEpollFd,EPOLL_CTL_ADD,request.fd,event item);
..
}
}
从当地的层的尺蠖来看,我们知道当地的层依赖于epoll来驱动事件处理。此处我们先保留一下大致的映像,后文详细分析。
二、使用消息队列
1 写入消息
机器人中既可以在Java 语言(一种计算机语言,尤用于创建网站)语言(一种计算机语言,尤用于创建网站)层向消息队列写入消息,也可以在当地的层向消息队列写入消息。我们分别看一下对应的操作流程。
1.1 Java层写入消息
Java 语言(一种计算机语言,尤用于创建网站)语言(一种计算机语言,尤用于创建网站)层向消息队列写入消息,依赖于入队消息函数:
布尔入队消息(消息消息,长时间){
if (msg.target==null) {
抛出新的IllegalArgumentException('消息必须有目标。);
}
if (msg.isInUse()) {
抛出新的IllegalStateException(msg '此消息已在使用中。);
}
同步(这){
如果(退出){
.
返回错误的
}
味精。markin use();
当=当
消息p=消息;
布尔needWake
if(p==null | | when==0 | | when p . when){
//新头,如果阻塞就唤醒事件队列。
味精。next=p;
mMessages=msg .
//在头部插入数据,如果之前消息队列是阻塞的,那么现在需要唤醒
needWake=mBlocked
}否则{
//插入到队列中间。通常我们不用醒来
//向上移动事件队列,除非队列头有障碍物
//并且该消息是队列中最早的异步消息。
need wake=mBlocked p . target==null msg。isa synchronous();
消息上一个
for(;) {
prev=p;
p=p . next
if (p==null || when p.when) {
打破;
}
//不是第一个异步消息时,需要唤醒置为错误的
if(需要唤醒p . isa同步()){
needWake=false
}
}
味精。next=p;//不变量:p==prev.next
prev.next=消息
}
//我们可以假设mPtr!=0,因为m退出为假。
如果(需要唤醒){
原生尾流(mPtr);
}
}
返回真实的
}
上述代码比较简单,主要就是将新加入的消息按执行时间插入到原有的队列中,然后根据情况调用自然觉醒函数。
我们跟进一下nativeAwake:
void NativeMessageQueue:wake(){
m looper-wake();
}
void Looper:wake() {
uint 64 _ t Inc=1;
//就是向mWakeEventFd写入数据
ssize _ t nWrite=TEMP _ FAILURE _ RETRY(write(mWakeEventFd,inc,sizeof(uint 64 _ t)));
..
}
在当地的层的尺蠖初始化时,我们提到过当地的层的尺蠖将利用epoll来驱动事件,其中构造出的epoll句柄就监听了mWakeEventFd。
实际上从消息队列中取出数据时,若没有数据到来,就会利用epoll进行等待;因此当Java 语言(一种计算机语言,尤用于创建网站)语言(一种计算机语言,尤用于创建网站)层写入消息时,将会将唤醒处于等待状态的消息队列。
在后文介绍从消息队列中提取消息时,将再次分析这个问题。
1.2本地层写入消息
当地的层写入消息,依赖于当地的层尺蠖的发送消息函数:
void Looper:sendMessage(const spMessageHandler处理程序,常量消息消息){
nsecs_t now=systemTime(系统时间单调);
sendMessageAtTime(现在,处理程序,消息);
}
void Looper:sendmessage at time(nsecs _ t uptime,const spMessageHandler处理程序,
常量消息消息){
size _ t I=0;
{
AutoMutex _ l(m锁);
//同样需要按时间插入
size _ t消息计数=mmessageenvelopes。size();
while(I消息计数正常运行时间=mmessageenvelopes。第(一)项.正常运行时间){
I=1;
}
//将消息包装成一个邮件信封对象
邮件信封邮件信封(正常运行时间、处理程序、消息);
梅萨吉恩韦罗斯。insertat(消息信封,I,1);
//优化:如果尺蠖当前正在发送消息,那么我们可以跳过
//对唤醒()的调用,因为尺蠖处理后要做的下一件事
//消息决定下一次唤醒时间。事实上,的确如此
//甚至不管这段代码是否运行在循环线程上。
if (mSendingMessage) {
返回;
}
}
//仅当我们在开头将新消息排队时,才唤醒轮询循环。
if (i==0) {
//若插入在队列头部,同样利用叫醒函数触发epoll唤醒
wake();
}
}
以上就是向消息队列中加入消息的主要流程,接下来我们看看从消息队列中取出消息的流程。
2、提取消息
当Java 语言(一种计算机语言,尤用于创建网站)语言(一种计算机语言,尤用于创建网站)层的尺蠖对象调用环函数时,就开始使用消息队列提取消息了:
公共静态空的循环(){
最终Looper me=我的Looper();
.
for(;) {
消息msg=队列。next();//可能会阻止
.
尝试{
//调用消息的处理函数进行处理
味精。目标。调度消息(msg);
}.
}
}
此处我们看看消息队列的然后函数:
下一条消息(){
//mPtr保存了NativeMessageQueue的指针
最终长ptr=mPtr
.
int pendingIdleHandlerCount=-1;//-1仅在第一次迭代期间
int nextPollTimeoutMillis=0;
for(;) {
if (nextPollTimeoutMillis!=0) {
//会调用当地的函数,最终调用IPCThread的与司机交谈,将数据写入粘合剂驱动或者读取一次数据
//不知道在此处进行这个操作的理由?
粘合剂。flushpendingcommands();
}
//处理当地的层的数据,此处会利用epoll进行堵塞的
nativePollOnce(ptr,nextPollTimeoutMillis);
同步(这){
最终长now=系统时钟。正常运行时间millis();
消息prevMsg=null
消息msg=消息
//下面其实就是找出下一个异步处理类型的消息;异步处理类型的消息,才含有对应的执行函数
如果(味精!=null msg.target==null) {
//因障碍而停滞。查找队列中的下一条异步消息。
做{
prevMsg=msg
msg=msg.next
}而(味精!=null!味精。isasynchronous());
}
如果(味精!=null) {
if (now msg.when) {
//下一条消息未准备好。设置一个超时来唤醒它。
nextPollTimeoutMillis=(int)math。最小(消息。当-现在,整数MAX _ VALUE);
}否则{
//收到消息。
mBlocked=false
//完成然后记录的存储
if (prevMsg!=null) {
上一条消息。下一个=msg。接下来;
}否则{
mMessages=msg.next
}
msg.next=null
if(调试)Log.v(标签,'返回消息:' msg ');
味精。markin use();
返回味精
}
}否则{
//没有更多的消息。
nextPollTimeoutMillis=-1;
}
//处理退出消息,因为所有挂起的消息都已处理。
如果(退出){
dispose();
返回空
}
//消息队列中引入了IdleHandler接口,即当消息队列没有数据处理时,调用IdleHandler进行一些工作
//pendingIdleHandlerCount表示待处理的IdleHandler初始为-1
if (pendingIdleHandlerCount 0
(mMessages==null | |现在是mMessages。什么时候){
//mIdleHandlers的大小默认为0,调用接口addIdleHandler才能增加
pendingIdleHandlerCount=midle处理程序。size();
}
if (pendingIdleHandlerCount=0) {
//没有要运行的空闲处理程序。循环,再等一会儿。
mBlocked=true
继续;
}
//将待处理的IdleHandler加入到PendingIdleHandlers中
if(mPendingIdleHandlers==null){
mPendingIdleHandlers=新的空闲处理程序[math。max(pendingIdleHandlerCount,4)];
}
//调用ArrayList.toArray(T[])节省每次分配的开销;毕竟对于消息。然后这样调用频率较高的函数,能省一点就是一点
mPendingIdleHandlers=midle处理程序。to数组(mPendingIdleHandlers);
}
for(int I=0;i pendingIdleHandlerCounti ) {
最终idle handler idler=mPendingIdleHandlers[I];
mPendingIdleHandlers[I]=null;//释放对处理程序的引用
布尔keep=false
尝试{
//执行实现类的队列空闲函数,返回值决定是否继续保留
保持=空转。队列空闲();
} catch (Throwable t) {
Log.wtf(标签,' IdleHandler抛出异常,t);
}
如果(!保持){
同步(这){
mIdleHandlers.remove(惰轮);
}
}
}
pendingIdleHandlerCount=0;
nextPollTimeoutMillis=0;
}
}
整个提取消息的过程,大致上如上图所示。
可以看到在Java 语言(一种计算机语言,尤用于创建网站)语言(一种计算机语言,尤用于创建网站)层,活套除了要取出消息队列的消息外,还会在队列空闲期执行IdleHandler定义的函数。
2.1 nativePollOnce
现在唯一的疑点是nativePollOnce是如何处理当地的层数据的,我们看看对应的当地的函数:
静态void Android _ OS _ message queue _ native poll once(JNIEnv * env,jobject obj,
jlong ptr,jint timeoutMillis) {
//果然Java 语言(一种计算机语言,尤用于创建网站)语言(一种计算机语言,尤用于创建网站)层调用当地的层消息队列时,将长的类型的光电带读数机(光电磁带阅读器)变为指针
NativeMessageQueue * NativeMessageQueue=reinterpret _ castNativeMessageQueue *(ptr);
nativeMessageQueue-poll once(env,obj,time out millis);
}
void NativeMessageQueue:poll once(JNIEnv * env,jobject pollObj,int timeoutMillis) {
mPollEnv=env
mPollObj=pollObj
//最后还是进入到当地的层尺蠖的波洛塞函数
m looper-轮询一次(超时毫秒);
mPollObj=NULL
mPollEnv=NULL
if(mexceptionbj){
..
}
}
看看当地的层尺蠖的波洛塞函数:
//timeoutMillis为超时等待时间。值为-1时,表示无限等待直到有事件到来;值为0时,表示无需等待
//outFd此时为空,含义是:存储产生事件的文件句柄
//outEvents此时为空,含义是:存储outFd上发生了哪些事件,包括可读、可写、错误和中断
//outData此时为空,含义是:存储上下文数据,其实调用时传入的参数
int Looper:poll once(int time out millis,int* outFd,int* outEvents,void** outData) {
int result=0;
for(;) {
//处理回应,目前我们先不关注反应的内含
while(mResponseIndex mresponses。size()){
const Response Response=m responses。itemat(Mr response指数);
int ident=响应。请求。ident
if (ident=0) {
int FD=响应。请求。FD;
int events=response.events
void *数据=响应。请求。数据;
如果(outFd!=NULL)* outFd=FD;
if (outEvents!=NULL)* out events=events;
if (outData!=NULL)* out data=data;
返回标识;
}
}
//根据波利内的结果,进行操作
如果(结果!=0) {
如果(outFd!=NULL)* outFd=0;
if (outEvents!=NULL)* out events=0;
if (outData!=NULL)* out data=NULL;
返回结果;
}
//主力还是靠波利内
结果=内部轮询(超时毫秒);
}
}
跟进一下波利内函数:
int Looper:poll inner(int超时毫秒){
//根据下一条消息的到期时间调整超时。
//timeoutMillis是Java 语言(一种计算机语言,尤用于创建网站)语言(一种计算机语言,尤用于创建网站)层事件等待事件
//本机层维持了本地消息的等待时间
//此处其实就是选择最小的等待时间
if (timeoutMillis!=0 mNextMessageUptime!=LLONG_MAX) {
nsecs_t now=systemTime(系统时间单调);
int messageTimeoutMillis=tomillsecondtimeoutdelay(now,mNextMessageUptime);
如果(messageTimeoutMillis=0
(超时毫秒数0 | | messageTimeoutMillis超时毫秒数)){
time out millis=messageTimeoutMillis;
}
}
int结果=POLL _ WAKE
//轮询器初始就清空反应
回应先生。clear();
mResponseIndex=0;
//我们即将闲置。
mPolling=true
//利用epoll等待mEpollFd监控的句柄上事件到达
struct EPOLL _ event事件项[EPOLL _ MAX _ EVENTS];
int event count=EPOLL _ wait(mEpollFd,eventItems,EPOLL_MAX_EVENTS,超时毫秒);
//不再空转。
mPolling=false
//获取锁。
姆洛克。lock();
//重新调用rebuildEpollLocked时,将使得epoll句柄能够监听新加入请求对应的软驱
if(mepollebuildrequired){
mEpollRebuildRequired=false
rebuildEpollLocked();
转到完成;
}
//检查轮询错误。
if (eventCount 0) {
if (errno==EINTR) {
转到完成;
}
.
结果=轮询_错误
转到完成;
}
//检查轮询超时。
if (eventCount==0) {
结果=轮询超时
转到完成;
}
for(int I=0;如果有{
if (fd==mWakeEventFd) {
if (epollEvents EPOLLIN) {
//前面已经分析过,当Java 语言(一种计算机语言,尤用于创建网站)语言(一种计算机语言,尤用于创建网站)层或当地的层有数据写入队列时,将写mWakeEventFd,以触发epoll唤醒
//唤醒将读取并清空mWakeEventFd上的数据
awoken();
}否则{
..
}
}否则{
//epoll同样监听的请求对应的软驱
ssize _ t请求索引=mrequests。indexofkey(FD);
if (requestIndex=0) {
int events=0;
if(epollEvents epoll in)events |=EVENT _ INPUT;
if(epollEvents EPOLLOUT)events |=EVENT _ OUTPUT;
if(epollEvents EPOLLERR)events |=EVENT _ ERROR;
if(epollEvents EPOLLHUP)events |=EVENT _ hang up;
//存储这个软驱对应的反应
pushResponse(事件,请求。valueat(请求索引));
}否则{
..
}
}
}
完成:
//调用挂起的消息回调。
mNextMessageUptime=LLONG _ MAX
//处理当地的层的消息
while (mMessageEnvelopes.size()!=0) {
nsecs_t now=systemTime(系统时间单调);
const消息信封消息信封=mmessageenvelopes。(0)处的项目;
如果(消息信封。正常运行时间=现在){
//从列表中移除信封。
//我们保持对处理程序的强引用,直到调用处理消息
//完成。然后我们删除它,这样可以在*之前*删除处理程序
//我们重新获得我们的锁。
{
spMessageHandler=消息信封。处理者;
消息消息=消息信封。消息;
梅萨吉恩韦罗斯。在(0)处删除;
mSendingMessage=true
姆洛克。unlock();
//处理本地消息
handler-handleMessage(消息);
}
姆洛克。lock();
mSendingMessage=false
结果=轮询_回调
}否则{
//留在队列头的最后一条消息决定下一次唤醒时间。
mNextMessageUptime=消息信封。正常运行时间;
打破;
}
}
//释放锁定。
姆洛克。unlock();
//处理带回调函数的反应
for(size _ t I=0;我回应。size();i ) {
response response=mresponses。edititemat(一);
如果(回应。请求。ident==POLL _ CALLBACK){
int FD=响应。请求。FD;
int events=response.events
void *数据=响应。请求。数据;
//调用反应的回收
int回调结果=响应。请求。回调处理事件(FD,events,data);
if (callbackResult==0) {
removeFd(fd,响应。请求。seq);
}
回应。请求。回电。clear();
结果=轮询_回调
}
}
返回结果;
}
说实话当地的层的代码写的很乱,该函数的功能比较多。
如上图所示,在nativePollOnce中利用epoll监听是否有数据到来,然后处理本地消息、本地响应。
最后,我们看看如何在当地的层中加入请求。
3添加监控请求
当地的层增加请求依赖于尺蠖的接口addFd:
//fd表示需要监听的句柄
//ident的含义还没有搞明白
//事件表示需要监听的事件,例如事件输入、事件输出、事件错误和事件_挂断中的一个或多个
//回调为事件发生后的回调函数
//数据为回调函数对应的参数
int Looper:addFd(int fd,int ident,int events,Looper_callbackFunc回调,void* data) {
返回addFd(fd,ident,events,callback?新SimpleLooperCallback(回调):空,数据);
}
结合上文当地的层轮询队列的操作,我们大致可以知道:addFd的目的,就是让当地的层的尺蠖监控新加入的软驱上是否有指定事件发生。
如果发生了指定的事件,就利用回调函数及参数构造对应的回应。
当地的层的尺蠖处理反应时,就可以执行对应的回调函数了。
看看实际的代码:
int Looper:addFd(int fd,int ident,int events,const spLooperCallback回调,void* data) {
.
{
AutoMutex _ l(m锁);
//利用参数构造一个请求
请求请求;
request.fd=fd
request.ident=识别
请求.事件=事件;
request.seq=mNextRequestSeq
request.callback=回调;
request.data=data
if(mNextRequestSeq==-1)mNextRequestSeq=0;//保留序列号-1
结构epoll _ event事件项
request.initEventItem(事件项);
//判断之前是否已经利用该软驱构造过请求
ssize _ t请求索引=mrequests。indexofkey(FD);
if (requestIndex 0) {
//mEpollFd新增一个需监听软驱
int epollResult=EPOLL _ CTL(mEpollFd,EPOLL_CTL_ADD,Fd,event item);
.
mRequests.add(fd,request);
}否则{
//mEpollFd修改旧的软驱对应的监听事件
int epollResult=EPOLL _ CTL(mEpollFd,EPOLL_CTL_MOD,Fd,event item);
if (epollResult 0) {
if (errno==ENOENT) {
//容忍ENOENT,因为这意味着旧的文件描述符
//在其回调被注销之前关闭,同时一个新的
//已经创建了具有相同编号的文件描述符,现在是
//第一次注册。
epollResult=epoll_ctl(mEpollFd,EPOLL_CTL_ADD,Fd,event item);
.
}
//当有错误需要重新加入时,安排EpollRebuildLocked,这样会让epollFd重新添加要监控的Fd。
scheduleepolrebuildlocked();
}
mrequests . replace value at(request index,request);
}
}
}
上面介绍pollInner函数时已经分析了join monitoring请求的处理,这里不再赘述。
三、总结
1、流程总结
MessageQueue的整个流程包括Java部分和原生部分。从图中可以看出,原生层的比例还是很大的。我们结合上图回忆一下整个MessageQueue对应的处理流程:
1.在Java层创建Looper对象时,MessageQueue将创建Java层的;当初始化Java层的MessageQueue时,将使用本机函数创建本机层的MessageQueue。
2.原生层的MessageQueue初始化后,会创建相应的原生Looper对象。当本机对象初始化时,将创建相应的epollFd和WakeEventFd。其中epollfd将作为epollFd的监控句柄,最初epollFd只监控WakeEventFd。
3.图中红线是Looper从MessageQueue取消息时处理逻辑的流向。
3.1.当Java层的Looper开始循环时,首先需要通过JNI函数调用Native Looper来执行pollOnce操作。
3.2.本地Looper开始运行后,需要等待epollFd唤醒。当epollFd等待超时或事件到达它正在侦听的句柄时,本地Looper可以开始处理该事件。
3.3.在原生层,原生Looper会先处理原生MessageQueue中的消息,然后调用Response对应的回调函数。
3.4.在这个循环中,处理完原生层事件后,开始处理Java层的MessageQueue的消息。如果MessageQueue中没有要处理的消息,而MessageQueue中有一个IdleHandler,则调用IdleHandler定义的处理函数。
图中的蓝色部分显示了相应的函数调用:
在Java层:
用MessageQueue的addIdleHandler,可以添加IdleHandler到MessageQueue
使用MessageQueue的enqueueMessage,可以将消息添加到message queue;必要时,将使用本地函数向本地层中的WakeEventFd写入消息,以唤醒epollFd。
在Native层:
使用looper:sendMessage,可以将消息添加到原生MessageQueue同样,必要时会向Native层的WakeEventFd写入一条消息,唤醒epollFd;
使用looper:addFd,可以向本地looper注册一个监控请求。监控请求包含要监控的fd、被监控的事件和相应的回调函数等。对应于监控请求的fd将是由epollFd监控的对象。当被监控的fd有相应的事件时,它将唤醒epollFd,相应的响应将被生成并添加到响应列表中进行处理。一旦响应被处理,相应的回调函数将被调用。
2.有关注意事项
MessageQueue在Java层和Native层有自己的存储结构,可以分别添加消息。从处理逻辑来说,会先处理原生层的消息,然后处理原生层生成的响应,最后处理Java层的消息。
这就是本文的全部内容。希望对大家的学习有帮助,支持我们。
郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。