简述node.js的流的主要作用,nodejs流媒体
本篇文章带大家聊聊节点。射流研究…中的网络与流,涉及的知识点有ibuv中网络的实现、BSD套接字、UNIX域协议使用等,下面一起来看看吧!
节点。射流研究…极速入门课程:进入学习
【推荐学习: 《nodejs 教程》 】
涉及的知识点
libuv中网络的实现libuv解决接受(EMFILE错误)BSD套接字SOCKADDR_INUNIX域协议使用!在进程间传递"文件描述符"
例子 tcp-echo-server/main.c
libuv异步使用加州大学伯克利分校软件(伯克利软件发行版)套接字的例子
int main() {
loop=uv _ default _ loop();
uv_tcp_t服务器;
uv_tcp_init(loop,server);
uv_ip4_addr(0.0.0.0 ,DEFAULT_PORT,addr);
uv_tcp_bind(server,(const struct sockaddr*)addr,0);
int r=uv _ listen((uv _ stream _ t *)server,DEFAULT_BACKLOG,on _ new _ connection);
如果(r) {
fprintf(stderr, Listen error %s\n ,uv _ strerror(r));
返回1;
}
返回uv_run(loop,UV _ RUN _ DEFAULT);
}
void on _ new _ connection(uv _ stream _ t * server,int status) {
如果(状态0) {
fprintf(stderr,新连接错误“%s\n”,uv _ strerror(status));
//错误!
返回;
}
uv _ TCP _ t * client=(uv _ TCP _ t *)malloc(sizeof(uv _ TCP _ t));
uv_tcp_init(loop,client);
if (uv_accept(server,(uv_stream_t*) client)==0) {
uv _ read _ start((uv _ stream _ t *)client,alloc_buffer,echo _ read);
}
同步的例子
这是一个正常同步使用加州大学伯克利分校软件(伯克利软件发行版)套接字的例子。
作为参照可以发现主要有如下几步
首先调用插座()为通讯创建一个端点,为套接字返回一个文件描述符。
接着调用绑定()为一个套接字分配地址。当使用插座()创建套接字后,只赋予其所使用的协议,并未分配地址。在接受其它主机的连接前,必须先调用绑定()为套接字分配一个地址。
当窝和一个地址绑定之后,再调用听()函数会开始监听可能的连接请求。
最后调用接受,当应用程序监听来自其他主机的面对数据流的连接时,通过事件(比如Unix操作系统选择()系统调用)通知它。必须用接受()函数初始化连接接受()为每个连接创立新的套接字并从监听队列中移除这个连接。
int main(void)
{
struct sockaddr _ in stSockAddr
int SocketFD=socket(PF_INET,SOCK_STREAM,IP proto _ TCP);
if(-1==SocketFD)
{
perror(’无法创建套接字);
退出(退出_失败);
}
memset(stSockAddr,0,sizeof(struct sockaddr _ in));
stSockAddr.sin _ family=AF _ INET
stsockaddr。sin _ port=htons(1100);
stsockaddr。sin _ addr。s _ addr=in addr _ ANY
if(-1==bind(SocketFD,(const struct sockaddr *)stSockAddr,sizeof(struct sockaddr_in)))
{
perror(’错误绑定失败);
关闭(插座FD);
退出(退出_失败);
}
if(-1==listen(SocketFD,10))
{
perror(“错误侦听失败");
关闭(插座FD);
退出(退出_失败);
}
for(;)
{
int ConnectFD=accept(SocketFD,NULL,NULL);
如果(0连接功能)
{
perror(’错误接受失败);
关闭(插座FD);
退出(退出_失败);
}
/*执行读写操作.*/
shutdown(ConnectFD,SHUT _ RDWR);
关闭(连接FD);
}
关闭(插座FD);
返回0;
}
uv_tcp_init
主uv_tcp_init
1、对领域进行了验证,需要是下面3种的一种
AF_INET表示IPv4网络协议AF_INET6表示IPv6AF_UNSPEC表示适用于指定主机名和服务名且适合任何协议族的地址2、tcp也是一种流,调用uv _ _流_初始化对流数据进行初始化
int uv_tcp_init(uv_loop_t* loop,uv_tcp_t* tcp) {
返回uv_tcp_init_ex(loop,tcp,AF _ UNSPEC);
}
int uv _ TCP _ init _ ex(uv _ loop _ t * loop,uv_tcp_t* tcp,unsigned int flags) {
(同Internationalorganizations)国际组织域;
/*对域使用低8位*/
domain=标志0xFF
如果(域!=AF_INET域!=AF_INET6域!=AF_UNSPEC)
返回UV _ EINVAL
如果(标志~0xFF)
返回UV _ EINVAL
uv__stream_init(loop,(uv_stream_t*)tcp,UV _ TCP);
.
返回0;
}
uv__stream_init
main uv _ TCP _ init uv _ _ stream _ init
stream的初始化函数用的地方很多,也很重要。下面i/o的完整实现,请参考【libuv源码学习笔记】线程池和I/O。
1.初始化将调用流的回调函数。
比如read_cb函数,本例中on_new_connection uv_read_start函数实际上会将read_cb设置为用户传入的参数echo_read,其调用时间为流上设置的io_watcher.fd有数据写入时,在事件循环阶段被epoll捕获。alloc_cb函数的调用过程与read_cb相同,alloc类型的函数一般会设置目前要读取的内容的长度。传输流数据时,通常先写传输数据的长度,再写具体内容,主要是为了接收方合理申请内存进行存储。比如grpc,线程加载器都有详细的应用。Close_cb函数在流数据结束或发生错误时调用。connection_cb函数类似于本例中的tcp流,在accept接收到新连接时调用。在本例中,on_new_connectionconnect_req结构主要用于挂载tcp客户端的连接回调等数据。shutdown_req结构主要用于在流式销毁时挂载回调等数据。Accepted _当accept接收到一个新的连接时,存储accept返回的connect FD(socket FD,NULL,NULL)。Ued _ FDS用于存储等待处理的连接,主要用于节点集群cluster的实现。//排队_fds
1.当通过ipc接收其他进程写入的数据时,调用uv__stream_recv_cmsg函数。
2.UV _ _ stream _ recv _ cmsg函数读取进程传递的fd引用,并调用uv__stream_queue_fd函数保存它。
3.queued_fds主要消耗在src/stream _ wrap . cc libuvstreamrap:onuvread accept handle函数中。2.uv__open_cloexec方法专门为loop-emfile_fd创建了一个指向空文件(/dev/null)的idlefd文件描述符,通过跟踪发现,原来是为了解决accept (EMFILE错误)。我们在讲uv__accept的时候再详细讲一下这个循环的妙用——em file _ FD。
3.通过调用uv__io_init初始化的流的i/o观察器的回调函数是uv__stream_io。
void uv _ _ stream _ init(uv _ loop _ t * loop,
uv_stream_t* stream,
uv_handle_type type) {
int err
uv__handle_init(loop,(uv_handle_t*)stream,type);
stream-read _ CB=NULL;
stream-alloc _ CB=NULL;
stream-close _ CB=NULL;
流-连接_cb=空;
stream-connect _ req=NULL;
stream-shut down _ req=NULL;
stream-accepted _ FD=-1;
stream-queued _ FDS=NULL;
stream-delayed _ error=0;
QUEUE_INIT(流写队列);
QUEUE_INIT(流写完成队列);
流写队列大小=0;
if (loop-emfile_fd==-1) {
err=uv _ _ open _ clo exec(/dev/null ,O _ RDONLY);
如果(错误0)
/*在“/dev/null”没有打开“/”的极少数情况下
*相反。
*/
err=uv__open_cloexec(/,O _ rd only);
if (err=0)
loop-em file _ FD=err;
}
#如果定义了(__APPLE__)
stream-select=NULL;
#endif /*已定义(__APPLE_) */
uv__io_init(stream-io_watcher,uv__stream_io,-1);
}
uv__open_cloexec
main uv _ TCP _ init uv _ _ stream _ init uv _ _ open _ clo exec
同步调用open方法获取fd。也许你会问为什么不像【libuv源码学习笔记】那样线程池用I/O中的调用uv_fs_open异步获取fd,其实并不是所有的libuv实现都是异步的。例如,在当前示例中,在启动tcp服务之前进行一些初始化是可以接受的,而不是在用户请求期间执行任务。
int uv _ _ open _ clo exec(const char * path,int flags) {
#如果已定义(O_CLOEXEC)
int fd
fd=open(path,flags O _ clo exec);
如果(fd==-1)
返回UV _ _ ERR(errno);
返回FD;
#else /* O_CLOEXEC */
int err
int fd
fd=open(路径,标志);
如果(fd==-1)
返回UV _ _ ERR(errno);
err=uv__cloexec(fd,1);
如果(错误){
uv _ _ close(FD);
返回err
}
返回FD;
#endif /* O_CLOEXEC */
}
uv__stream_io
main uv _ TCP _ init uv _ _ stream _ init uv _ _ stream _ io
双工流的输入输出观察者回调函数,如调用的流连接请求函数,其值是例子中紫外线_听函数的最后一个参数在新连接上。
当发生POLLIN POLLERR POLLHUP事件时:该软驱有可读数据时调用uv__read函数
当发生POLLOUT POLLERR POLLHUP事件时:该软驱有可读数据时调用uv__write函数
静态void uv _ _ stream _ io(uv _ loop _ t * loop,uv__io_t* w,unsigned int events) {
紫外线流t *流
stream=container_of(w,uv_stream_t,io _ watcher);
assert(stream-type==UV_TCP
stream-type==UV_NAMED_PIPE
stream-type==UV _ TTY);
断言(!(流标志UV _ HANDLE _ CLOSING));
if (stream-connect_req) {
紫外_ _流_连接(流);
返回;
}
assert(uv _ _ stream _ FD(stream)=0);
if(events(POLLIN POLLERR POLLHUP))
uv__read(流);
if (uv__stream_fd(stream)==-1)
返回;/*读取_cb关闭流。*/
如果(事件轮询)
(流标志UV _ HANDLE _读数)
(流标志UV_HANDLE_READ_PARTIAL
!(流标志UV_HANDLE_READ_EOF)) {
uv_buf_t buf={ NULL,0 };
uv__stream_eof(stream,buf);
}
if (uv__stream_fd(stream)==-1)
返回;/*读取_cb关闭流。*/
if(events(poll out POLLERR poll hup)){
uv__write(流);
uv__write_callbacks(流);
/*写队列已清空。*/
if(QUEUE _ EMPTY(stream-write _ QUEUE))
uv__drain(流);
}
}
uv_ip4_addr
主uv _ ip4 _地址
uv _ ip4 _地址用于将人类可读的互联网协议(互联网协议)地址、端口对转换为加州大学伯克利分校软件(伯克利软件发行版)套接字应用程序接口所需的sockaddr_in结构。
int uv_ip4_addr(const char* ip,int port,struct sockaddr_in* addr) {
memset(addr,0,sizeof(* addr));
addr-sin _ family=AF _ INET;
addr-sin _ port=htons(port);
#ifdef SIN6_LEN
addr-sin _ len=sizeof(* addr);
#endif
返回uv_inet_pton(AF_INET,ip,(addr-sin _ addr。s _ addr));
}
uv_tcp_bind
主uv_tcp_bind
从uv _ ip4 _地址函数的实现,其实是在地址的罪恶家庭上面设置值为AF_INET,但在uv_tcp_bind函数里面却是从地址的sa _家庭属性上面取的值,这让c初学者的我又陷入了一阵思考.
原来是这样,这里通过强制指针类型转换const struct sockaddr* addr达到的目的,函数的最后调用了uv__tcp_bind函数。
int uv_tcp_bind(uv_tcp_t* handle,
const struct sockaddr* addr,
无符号整数标志){
无符号整数地址;
if (handle-type!=UV_TCP)
返回UV _ EINVAL
if (addr-sa_family==AF_INET)
addrlen=sizeof(struct sockaddr _ in);
else if(addr-sa _ family==AF _ inet 6)
addrlen=sizeof(struct sockaddr _ in6);
其他
返回UV _ EINVAL
返回uv__tcp_bind(handle,addr,addrlen,flags);
}
uv__tcp_bind
main uv _ TCP _ bind uv _ _ TCP _ bind
调用也许_新_插座,如果当前未设置socketfd,则调用新插座获取
调用套接字选项用于为指定的套接字设定一个特定的套接字选项
调用约束为一个套接字分配地址。当使用插座()创建套接字后,只赋予其所使用的协议,并未分配地址。
int uv__tcp_bind(uv_tcp_t* tcp,
const struct sockaddr* addr,
无符号整数地址,
无符号整数标志){
int错误
int on
/*无法在非IPv6套接字上设置仅IPv6模式。*/
如果(仅标志UV _ TCP _ IPv6)addr-sa _ family!=AF_INET6)
返回UV _ EINVAL
err=maybe_new_socket(tcp,addr-sa_family,0);
如果(错误)
返回犯罪
on=1;
if(setsockopt(TCP-io _ watcher。FD,SOL_SOCKET,SO_REUSEADDR,on,sizeof(on)))
返回UV _ _ ERR(errno);
.
errno=0;
if (bind(tcp-io_watcher.fd,addr,addrlen) errno!=EADDRINUSE) {
if (errno==EAFNOSUPPORT)
返回UV _ EINVAL
返回UV _ _ ERR(errno);
}
.
}
new_socket
主uv _ TCP _ bind uv _ _ TCP _ bind maybe _ new _ socket new _ socket
通过uv _ _插座其本质调用窝获取到套接字
调用uv__stream_open设置流输入输出观察的软驱为步骤一拿到的套接字
static int new _ socket(uv _ TCP _ t * handle,int domain,unsigned long flags) {
struct sockaddr _ storage saddr
socklen _ t slen
int sockfd
int错误
err=uv__socket(domain,SOCK_STREAM,0);
如果(错误0)
返回犯罪
sockfd=err
err=uv _ _ stream _ open((uv _ stream _ t *)handle,sockfd,flags);
.
返回0;
}
uv__stream_open
main uv _ TCP _ bind uv _ _ TCP _ bind maybe _ new _ socket new _ socket uv _ _ stream _ open
主要用于设置stream-io_watcher.fd为参数传入的fd。
int uv _ _ stream _ open(uv _ stream _ t * stream,int fd,int flags) {
#如果定义了(__苹果_ _)
(同Internationalorganizations)国际组织使能;
#endif
如果(!(stream-io _ watcher。FD==-1 流-io _ watcher。FD==FD))
返回电子商务
断言(FD=0);
stream-flags =flags;
if (stream-type==UV_TCP) {
if((stream-flags UV _ HANDLE _ TCP _ NODELAY)UV _ _ TCP _ NODELAY(FD,1))
返回UV _ _ ERR(errno);
/* TODO使用用户传入的延迟。*/
如果(流标志UV_HANDLE_TCP_KEEPALIVE)
uv__tcp_keepalive(fd,1,60)) {
返回UV _ _ ERR(errno);
}
}
#如果定义了(__苹果_ _)
enable=1;
if (setsockopt(fd,SOL_SOCKET,SO_OOBINLINE,enable,sizeof(enable))
不对!=ENOTSOCK
不对!=EINVAL) {
返回UV _ _ ERR(errno);
}
#endif
流io观察器。FD=FD
返回0;
}
uv_listen
主uv _听
主要调用了uv _ tcp _监听函数。
int uv _ listen(uv _ stream _ t * stream,int backlog,uv_connection_cb cb) {
int错误
err=错误_无效_参数;
开关(流型){
案例UV_TCP:
err=uv _ TCP _ listen((uv _ TCP _ t *)stream,backlog,CB);
打破;
案例紫外线_命名管道:
err=uv _ pipe _ listen((uv _ pipe _ t *)stream,backlog,CB);
打破;
默认值:
断言(0);
}
返回uv _ translate _ sys _ error(err);
}
uv_tcp_listen
main uv _ listen uv _ TCP _ listen
调用听开始监听可能的连接请求
挂载例子中传入的回调在新连接上
暴力改写输入输出观察者的回调,在上面的uv _ _流_初始化函数中,通过uv__io_init设置了输入输出观察者的回调为uv__stream_io,作为普通的双工流是适用的,这里传输控制协议流直接通过TCP-io _ watcher。CB=uv _ _ server _ io赋值语句设置输入输出观察者回调为uv _ _服务器_io
调用uv__io_start注册输入输出观察者,开始监听工作。
int uv_tcp_listen(uv_tcp_t* tcp,int backlog,uv _ connection _ CB){
.
if (listen(tcp-io_watcher.fd,backlog))
返回UV _ _ ERR(errno);
TCP-connection _ CB=CB;
TCP-flags =UV _ HANDLE _ BOUND;
/*开始监听连接。*/
TCP-io _ watcher。CB=uv _ _ server _ io
uv__io_start(tcp-loop,tcp-io_watcher,POLLIN);
返回0;
}
uv__server_io
main uv _ listen uv _ TCP _ listen uv _ _ server _ io
传输控制协议流的输入输出观察者回调函数
调用uv _ _接受,拿到该连接的连接f
此时如果出现了上面uv _ _流_初始化时说的接受(EMFILE错误), 则调用uv__emfile_trick函数
把步骤一拿到的连接f挂载在了流接受_fd上面
调用例子中传入的回调在新连接上
void uv _ _ server _ io(uv _ loop _ t * loop,uv__io_t* w,unsigned int events) {
.
while (uv__stream_fd(stream)!=-1) {
assert(stream-accepted _ FD==-1);
err=uv _ _ accept(uv _ _ stream _ FD(stream));
if (err 0) {
if(ERR==UV _ EAGAIN ERR==UV _ _ ERR(EWOULDBLOCK))
返回;/*不是错误。*/
if(err==UV _ econ abled)
继续;/*忽略。我们对此无能为力。*/
if(err==UV _ em file err==UV _ ENFILE){
err=uv__emfile_trick(loop,uv _ _ stream _ FD(stream));
if(ERR==UV _ EAGAIN ERR==UV _ _ ERR(EWOULDBLOCK))
打破;
}
stream-connection_cb(stream,err);
继续;
}
UV_DEC_BACKLOG(w)
stream-accepted _ FD=err;
stream-connection_cb(stream,0);
.
}
uv__emfile_trick
main uv _ listen uv _ TCP _ listen uv _ _ server _ io uv _ _ em file _ trick
在上面的uv _ _流_初始化函数中,我们发现环的emfile_fd属性上通过uv__open_cloexec方法创建一个指向空文件(/dev/null)的idlefd文件描述符。
当出现接受(EMFILE错误)即文件描述符用尽时的错误时
static int uv _ _ em file _ trick(uv _ loop _ t * loop,int accept_fd) {
int错误
int emfile _ fd
if (loop-emfile_fd==-1)
返回UV _ EMFILE
uv _ _ close(loop-em file _ FD);
loop-em file _ FD=-1;
做{
err=uv _ _ accept(accept _ FD);
if (err=0)
uv _ _ close(err);
} while(err=0 err==UV _ EINTR);
emfile_fd=uv__open_cloexec(/,仅限O _ rd);
if (emfile_fd=0)
loop-em file _ FD=em file _ FD;
返回犯罪
}
on_new_connection
当收到一个新连接,例子中的在新连接上函数被调用
通过uv_tcp_init初始化了一个传输控制协议客户端流
调用紫外线_接受函数
void on _ new _ connection(uv _ stream _ t * server,int status) {
如果(状态0) {
fprintf(stderr,新连接错误“%s\n”,uv _ strerror(status));
//错误!
返回;
}
uv _ TCP _ t * client=(uv _ TCP _ t *)malloc(sizeof(uv _ TCP _ t));
uv_tcp_init(loop,client);
if (uv_accept(server,(uv_stream_t*) client)==0) {
uv _ read _ start((uv _ stream _ t *)client,alloc_buffer,echo _ read);
}
uv_accept
开_新_连接紫外线_接受
根据不同的协议调用不同的方法,该例子传输控制协议调用uv__stream_open方法
uv__stream_open设置给初始化完成的客户流设置了输入输出观察者的fd。该软驱即是uv _ _服务器_io中提到的连接FD。
int uv _ accept(uv _ stream _ t * server,uv_stream_t* client) {
int错误
assert(server-loop==client-loop);
if (server-accepted_fd==-1)
返回UV _ EAGAIN
交换机(客户端类型){
案例紫外线_命名管道:
案例UV_TCP:
err=uv__stream_open(客户端,
服务器接受的_fd,
UV _ HANDLE _ READABLE UV _ HANDLE _ WRITABLE);
如果(错误){
/* TODO句柄错误*/
uv _ _ close(server-accepted _ FD);
转到完成;
}
打破;
案例UV_UDP:
err=uv _ UDP _ open((uv _ UDP _ t *)client,server-accepted _ FD);
如果(错误){
uv _ _ close(server-accepted _ FD);
转到完成;
}
打破;
默认值:
返回UV _ EINVAL
}
client-flags =UV _ HANDLE _ BOUND;
完成:
/*处理排队的FD */
if (server-queued_fds!=NULL) {
uv _ _ stream _ queued _ FDS _ t * queued _ FDS;
queued_fds=服务器排队_ FDS;
/*先读*/
server-accepted _ FD=queued _ FDS-FDS[0];
/*全部免费阅读*/
assert(排队_ FDS-偏移量0);
if ( - queued_fds-offset==0) {
uv _ _ free(排队_ FDS);
服务器排队_ FDS=空;
}否则{
/*换档休息*/
memmove(queued_fds-fds,
queued_fds-fds 1,
排队_ FDS-offset * sizeof(*排队_ FDS-FDS));
}
}否则{
server-accepted _ FD=-1;
if (err==0)
uv__io_start(server-loop,server-io_watcher,POLLIN);
}
返回犯罪
}
uv_read_start
开_新_连接紫外线_读_开始
开启一个流的监听工作
挂载回调函数read_cb为例子中的回显_读取当流有数据写入时被调用
挂载回调函数alloc_cb为例子中的分配缓冲区
调用uv__io_start函数,这可是老朋友了,通常用在uv__io_init初始化输入输出观察者后面,用于注册输入输出观察者。
紫外线_读取_开始主要是调用了uv__read_start函数。开始了普通流的输入输出过程。
初始化输入输出观察者在uv _ TCP _ init uv _ TCP _ init _ ex uv _ _ stream _ init uv _ _ io _ init设置其观察者回调函数为uv__stream_io注册输入输出观察者为uv__io_start开始监听工作。int uv _ _ read _ start(uv _ stream _ t * stream,
uv_alloc_cb
uv_read_cb read_cb) {
assert(stream-type==UV _ TCP stream-type==UV _ NAMED _ PIPE
stream-type==UV _ TTY);
/*紫外线_处理_阅读标志与传输控制协议的状态无关,它只是
*表示用户期望的状态。
*/
stream-flags =UV _ HANDLE _ READING;
/* TODO:尝试内联读取?*/
/* TODO:跟踪传输控制协议状态。如果我们得到了一个EOF,那么我们应该
*不启动超正析象管(Image Orthicon)观察器。
*/
assert(uv _ _ stream _ FD(stream)=0);
assert(alloc _ CB);
stream-read _ CB=read _ CB;
stream-alloc _ CB=alloc _ CB;
uv__io_start(stream-loop,stream-io_watcher,POLLIN);
uv__handle_start(流);
uv _ _ stream _ OS x _ interrupt _ select(stream);
返回0;
}
小结
uv_tcp_init初始化tcp服务器句柄,其绑定的fd是Socket返回的socketFD。Uv_tcp_bind调用bind为套接字分配一个地址。uv_listen调用listen开始监听可能的连接请求。uv_accept调用accept来接收新的连接。uv_tcp_init初始化tcp客户端句柄,其绑定fd为accept返回的accept FD。剩下的就是一个普通流的读写i/o观察。有关编程的更多信息,请访问:编程视频!以上是Node.js中关于网络和流程的详细内容,更多内容请关注我们的其他相关文章!
郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。