




版權(quán)說(shuō)明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)
文檔簡(jiǎn)介
第深入理解Golangchannel的應(yīng)用目錄前言整體結(jié)構(gòu)創(chuàng)建發(fā)送接收關(guān)閉
前言
channel是用于goroutine之間的同步、通信的數(shù)據(jù)結(jié)構(gòu)
channel的底層是通過(guò)mutex來(lái)控制并發(fā)的,但它為程序員提供了更高一層次的抽象,封裝了更多的功能,這樣并發(fā)編程變得更加容易和安全,得以讓程序員把注意力留到業(yè)務(wù)上去,提升開(kāi)發(fā)效率
channel的用途包括但不限于以下幾點(diǎn):
協(xié)程間通信,同步定時(shí)任務(wù):和timer結(jié)合解耦生產(chǎn)方和消費(fèi)方,實(shí)現(xiàn)阻塞隊(duì)列控制并發(fā)數(shù)
本文將介紹channel的底層原理,包括數(shù)據(jù)結(jié)構(gòu),channel的創(chuàng)建,發(fā)送,接收,關(guān)閉的實(shí)現(xiàn)邏輯
整體結(jié)構(gòu)
Gochannel的數(shù)據(jù)結(jié)構(gòu)如下所示:
typehchanstruct{
qcountuint//totaldatainthequeue
dataqsizuint//sizeofthecircularqueue
bufunsafe.Pointer//pointstoanarrayofdataqsizelements
elemsizeuint16
closeduint32
elemtype*_type//elementtype
sendxuint//sendindex
recvxuint//receiveindex
recvqwaitq//listofrecvwaiters
sendqwaitq//listofsendwaiters
lockmutex
qcount:已經(jīng)存儲(chǔ)了多少個(gè)元素
dataqsie:最多存儲(chǔ)多少個(gè)元素,即緩沖區(qū)容量
buf:指向緩沖區(qū)的位置,實(shí)際上是一個(gè)數(shù)組
elemsize:每個(gè)元素占多大空間
closed:channel能夠關(guān)閉,這里記錄其關(guān)閉狀態(tài)
elemtype:保存數(shù)據(jù)的類(lèi)型信息,用于go運(yùn)行時(shí)使用
sendx,recvx:
記錄下一個(gè)要發(fā)送到的位置,下一次從哪里還是接收這里用數(shù)組模擬隊(duì)列,這兩個(gè)變量即表示隊(duì)列的隊(duì)頭,隊(duì)尾因此channel的緩沖也被稱(chēng)為環(huán)形緩沖區(qū)
recvq,sendq:
當(dāng)發(fā)送個(gè)接收不能立即完成時(shí),需要讓協(xié)程在channel上等待,所以有兩個(gè)等待隊(duì)列,分別針對(duì)接收和發(fā)送
lock:channel支持協(xié)程間并發(fā)訪問(wèn),因此需要一把鎖來(lái)保護(hù)
創(chuàng)建
創(chuàng)建channel會(huì)被編譯器編譯為調(diào)用makechan函數(shù)
//無(wú)緩沖通道
ch1:=make(chanint)
//有緩沖通道
ch2:=make(chanint,10)
會(huì)根據(jù)創(chuàng)建的是帶緩存,還是無(wú)緩沖,決定第二個(gè)參數(shù)size的值
可以看出,創(chuàng)建出來(lái)的是hchan指針,這樣就能在函數(shù)間直接傳遞channel,而不用傳遞channel的指針
funcmakechan(t*chantype,sizeint)*hchan{
elem:=t.elem
//mem:緩沖區(qū)大小
mem,overflow:=math.MulUintptr(elem.size,uintptr(size))
ifoverflow||memmaxAlloc-hchanSize||size0{
panic(plainError("makechan:sizeoutofrange"))
varc*hchan
switch{
//緩沖區(qū)大小為空,只申請(qǐng)hchanSize大小的內(nèi)存
casemem==0:
c=(*hchan)(mallocgc(hchanSize,nil,true))
c.buf=c.raceaddr()
//元素類(lèi)型不包含指針,一次性分配hchanSize+mem大小的內(nèi)存
caseelem.ptrdata==0:
c=(*hchan)(mallocgc(hchanSize+mem,nil,true))
c.buf=add(unsafe.Pointer(c),hchanSize)
//否則就是帶緩存,且有指針,分配兩次內(nèi)存
default:
//Elementscontainpointers.
c=new(hchan)
c.buf=mallocgc(mem,elem,true)
//保存元素類(lèi)型,元素大小,容量
c.elemsize=uint16(elem.size)
c.elemtype=elem
c.dataqsiz=uint(size)
lockInit(c.lock,lockRankHchan)
returnc
發(fā)送
執(zhí)行以下代碼時(shí):
ch-3
編譯器會(huì)轉(zhuǎn)化為對(duì)chansend的調(diào)用
funcchansend(c*hchan,epunsafe.Pointer,blockbool,callerpcuintptr)bool{
//如果channel是空
ifc==nil{
//非阻塞,直接返回
if!block{
returnfalse
//否則阻塞當(dāng)前協(xié)程
gopark(nil,nil,waitReasonChanSendNilChan,traceEvGoStop,2)
throw("unreachable")
//非阻塞,沒(méi)有關(guān)閉,且容量滿了,無(wú)法發(fā)送,直接返回
if!blockc.closed==0full(c){
returnfalse
//加鎖
lock(c.lock)
//如果已經(jīng)關(guān)閉,無(wú)法發(fā)送,直接panic
ifc.closed!=0{
unlock(c.lock)
panic(plainError("sendonclosedchannel"))
//從接收隊(duì)列彈出一個(gè)協(xié)程的包裝結(jié)構(gòu)sudog
ifsg:=c.recvq.dequeue();sg!=nil{
//如果能彈出,即有等到接收的協(xié)程,說(shuō)明:
//該channel要么是無(wú)緩沖,要么緩沖區(qū)為空,不然不可能有協(xié)程在等待
//將要發(fā)送的數(shù)據(jù)拷貝到該協(xié)程的接收指針上
send(c,sg,ep,func(){unlock(c.lock)},3)
returntrue
//緩沖區(qū)還有空間
ifc.qcountc.dataqsiz{
//qp:計(jì)算要發(fā)送到的位置的地址
qp:=chanbuf(c,c.sendx)
//將數(shù)據(jù)從ep拷貝到qp
typedmemmove(c.elemtype,qp,ep)
//待發(fā)送位置移動(dòng)
c.sendx++
//由于是數(shù)組模擬隊(duì)列,sendx到頂了需要?dú)w零
ifc.sendx==c.dataqsiz{
c.sendx=0
//緩沖區(qū)數(shù)量++
c.qcount++
unlock(c.lock)
returntrue
//往下就是緩沖區(qū)無(wú)數(shù)據(jù),也沒(méi)有等到接收協(xié)程的情況了
//如果是非阻塞模式,直接返回
if!block{
unlock(c.lock)
returnfalse
//將當(dāng)前協(xié)程包裝成sudog,阻塞到channel上
gp:=getg()
mysg:=acquireSudog()
mysg.releasetime=0
ift0!=0{
mysg.releasetime=-1
mysg.elem=ep
mysg.waitlink=nil
mysg.g=gp
mysg.isSelect=false
mysg.c=c
gp.waiting=mysg
gp.param=nil
//當(dāng)前協(xié)程進(jìn)入發(fā)送等待隊(duì)列
c.sendq.enqueue(mysg)
atomic.Store8(gp.parkingOnChan,1)
gopark(chanparkcommit,unsafe.Pointer(c.lock),waitReasonChanSend,traceEvGoBlockSend,2)
//被喚醒后從這里開(kāi)始執(zhí)行
KeepAlive(ep)
ifmysg!=gp.waiting{
throw("Gwaitinglistiscorrupted")
gp.waiting=nil
gp.activeStackChans=false
closed:=!mysg.success
gp.param=nil
ifmysg.releasetime0{
blockevent(mysg.releasetime-t0,2)
mysg.c=nil
releaseSudog(mysg)
//被喚醒后發(fā)現(xiàn)channel關(guān)閉了,panic
ifclosed{
ifc.closed==0{
throw("chansend:spuriouswakeup")
panic(plainError("sendonclosedchannel"))
returntrue
整體流程為:
如果當(dāng)前操作為非阻塞,channel沒(méi)有關(guān)閉,且容量滿了,無(wú)法發(fā)送,直接返回
從接收隊(duì)列彈出一個(gè)協(xié)程的包裝結(jié)構(gòu)sudog,如果能彈出,即有等到接收的協(xié)程,說(shuō)明:
該channel要么是無(wú)緩沖,要么緩沖區(qū)為空,不然不可能有協(xié)程在等待將要發(fā)送的數(shù)據(jù)拷貝到該協(xié)程的接收指針上,返回這里直接從發(fā)送者拷貝到接收者的內(nèi)存,而不是先把數(shù)據(jù)拷貝到緩沖區(qū),再?gòu)木彌_區(qū)拷貝到接收者,節(jié)約了一次內(nèi)存拷貝
否則看看緩沖區(qū)還有空間,如果有,將數(shù)據(jù)拷貝到緩沖區(qū)上,也返回
接下來(lái)就是既沒(méi)有接收者等待,緩沖區(qū)也為空的情況,就需要將當(dāng)前協(xié)程包裝成sudog,阻塞到channel上
將協(xié)程阻塞到channel的等待隊(duì)列時(shí),將其包裝成了sudog結(jié)構(gòu):
typesudogstruct{
//協(xié)程
g*g
//前一個(gè),后一個(gè)指針
next*sudog
prev*sudog
//等到發(fā)送的數(shù)據(jù)在哪,等待從哪個(gè)位置接收數(shù)據(jù)
elemunsafe.Pointer
acquiretimeint64
releasetimeint64
ticketuint32
isSelectbool
successbool
parent*sudog//semaRootbinarytree
waitlink*sudog//g.waitinglistorsemaRoot
waittail*sudog//semaRoot
//在哪個(gè)channel上等待
c*hchan//channel
其目的是:
g本身沒(méi)有存儲(chǔ)前一個(gè),后一個(gè)指針,需要用sudog結(jié)構(gòu)包裝才能加入隊(duì)列elem字段存儲(chǔ)等到發(fā)送的數(shù)據(jù)在哪,等待從哪個(gè)位置接收數(shù)據(jù),用于從數(shù)據(jù)能從協(xié)程到協(xié)程的直接拷貝
來(lái)看看一些子函數(shù):
1.判斷channel是否是滿的
funcfull(c*hchan)bool{
//無(wú)緩沖
ifc.dataqsiz==0{
//并且沒(méi)有其他協(xié)程在等待
returnc.recvq.first==nil
//有緩沖,但容量裝滿了
returnc.qcount==c.dataqsiz
2.send方法:
/**
c:要操作的channel
sg:彈出的接收者協(xié)程
ep:要發(fā)送的數(shù)據(jù)在的位置
funcsend(c*hchan,sg*sudog,epunsafe.Pointer,unlockffunc(),skipint){
//如果接收者指針不為空,直接把數(shù)據(jù)從ep拷貝到sg.elem
ifsg.elem!=nil{
sendDirect(c.elemtype,sg,ep)
sg.elem=nil
gp:=sg.g
unlockf()
gp.param=unsafe.Pointer(sg)
sg.success=true
ifsg.releasetime!=0{
sg.releasetime=cputicks()
//喚醒該接收者協(xié)程
goready(gp,skip+1)
接收
從channel中接收數(shù)據(jù)有幾種寫(xiě)法:
帶不帶ok接不接收返回值
根據(jù)帶不帶ok,決定用下面哪個(gè)方法
funcchanrecv1(c*hchan,elemunsafe.Pointer){
chanrecv(c,elem,true)
funcchanrecv2(c*hchan,elemunsafe.Pointer)(receivedbool){
_,received=chanrecv(c,elem,true)
return
根據(jù)接不接收返回值,決定elem是不是nil
最終都會(huì)調(diào)用chanrecv方法:
funcchanrecv(c*hchan,epunsafe.Pointer,blockbool)(selected,receivedbool){
//如果channel為nil,根據(jù)參數(shù)中是否阻塞來(lái)決定是否阻塞
ifc==nil{
if!block{
return
gopark(nil,nil,waitReasonChanReceiveNilChan,traceEvGoStop,2)
throw("unreachable")
//非阻塞,并且channel為空
if!blockempty(c){
//如果還沒(méi)關(guān)閉,直接返回
ifatomic.Load(c.closed)==0{
return
//否則已經(jīng)關(guān)閉,
//如果為空,返回該類(lèi)型的零值
ifempty(c){
ifep!=nil{
typedmemclr(c.elemtype,ep)
returntrue,false
lock(c.lock)
//同樣,如果channel已經(jīng)關(guān)閉,且緩沖區(qū)沒(méi)有元素,返回該類(lèi)型零值
ifc.closed!=0c.qcount==0{
unlock(c.lock)
ifep!=nil{
typedmemclr(c.elemtype,ep)
returntrue,false
//如果有發(fā)送者正在阻塞,說(shuō)明:
//1.無(wú)緩沖
//2.有緩沖,但緩沖區(qū)滿了。因?yàn)橹挥芯彌_區(qū)滿了,才可能有發(fā)送者在等待
ifsg:=c.sendq.dequeue();sg!=nil{
//將數(shù)據(jù)從緩沖區(qū)拷貝到ep,再將sg的數(shù)據(jù)拷貝到緩沖區(qū),該函數(shù)詳細(xì)流程可看下文
recv(c,sg,ep,func(){unlock(c.lock)},3)
returntrue,true
//如果緩存區(qū)有數(shù)據(jù),
ifc.qcount0{
//qp為緩沖區(qū)中下一次接收的位置
qp:=chanbuf(c,c.recvx)
//將數(shù)據(jù)從qp拷貝到ep
ifep!=nil{
typedmemmove(c.elemtype,ep,qp)
typedmemclr(c.elemtype,qp)
c.recvx++
ifc.recvx==c.dataqsiz{
c.recvx=0
c.qcount--
unlock(c.lock)
returntrue,true
//接下來(lái)就是既沒(méi)有發(fā)送者在等待,也緩沖區(qū)也沒(méi)數(shù)據(jù)
if!block{
unlock(c.lock)
returnfalse,false
//將當(dāng)前協(xié)程包裝成sudog,阻塞到channel中
gp:=getg()
mysg:=acquireSudog()
mysg.releasetime=0
ift0!=0{
mysg.releasetime=-1
//記錄接收地址
mysg.elem=ep
mysg.waitlink=nil
gp.waiting=mysg
mysg.g=gp
mysg.isSelect=false
mysg.c=c
gp.param=nil
c.recvq.enqueue(mysg)
atomic.Store8(gp.parkingOnChan,1)
gopark(chanparkcommit,unsafe.Pointer(c.lock),waitReasonChanReceive,traceEvGoBlockRecv,2)
//從這里喚醒
ifmysg!=gp.waiting{
throw("Gwaitinglistiscorrupted")
gp.waiting=nil
gp.activeStackChans=false
ifmysg.releasetime0{
blockevent(mysg.releasetime-t0,2)
success:=mysg.success
gp.param=nil
mysg.c=nil
releaseSudog(mysg)
returntrue,success
接收流程如為:
如果channel為nil,根據(jù)參數(shù)中是否阻塞來(lái)決定是否阻塞
如果channel已經(jīng)關(guān)閉,且緩沖區(qū)沒(méi)有元素,返回該類(lèi)型零值
如果有發(fā)送者正在阻塞,說(shuō)明:
要么是無(wú)緩沖有緩沖,但緩沖區(qū)滿了。因?yàn)橹挥芯彌_區(qū)滿了,才可能有發(fā)送者在等待將數(shù)據(jù)從緩沖區(qū)拷貝到ep,再將發(fā)送者的數(shù)據(jù)拷貝到緩沖區(qū),并喚該發(fā)送者
如果緩存區(qū)有數(shù)據(jù),則從緩沖區(qū)將數(shù)據(jù)復(fù)制到ep,返回
接下來(lái)就是既沒(méi)有發(fā)送者在等待,也緩沖區(qū)也沒(méi)數(shù)據(jù)的情況:
將當(dāng)前協(xié)程包裝成sudog,阻塞到channel中
來(lái)看其中的子函數(shù)recv():
/**
c:操作的channel
sg:阻塞的發(fā)送協(xié)程
ep:接收者接收數(shù)據(jù)的地址
funcrecv(c*hchan,sg*sudog,epunsafe.Pointer,unlockffunc(),skipint){
//如果是無(wú)緩沖channel,直接將數(shù)據(jù)從發(fā)送者sg拷貝到ep
ifc.dataqsiz==0{
ifep!=nil{
recvDirect(c.elemtype,sg,ep)
//接下來(lái)是有緩沖,且緩沖區(qū)滿的情況
}else{
//qp為channel緩沖區(qū)中,接收者下一次接收的地址
qp:=chanbuf(c,c.recvx)
//將數(shù)據(jù)從qp拷貝到ep
ifep!=nil{
typedmemmove(c.elemtype,ep,qp)
//將發(fā)送者的數(shù)據(jù)從sg.elem拷貝到qp
typedmemmove(c.elemtype,qp,sg.elem)
c.recvx++
ifc.recvx==c.dataqsiz{
c.recvx=0
//由于一接收已發(fā)送,緩沖區(qū)還是滿的,因此c.sendx=c.recvx
c.sendx=c.recvx
sg.elem=nil
gp:=sg.g
unlockf()
gp.param=unsafe.Pointer(sg)
sg.success=true
ifsg.releasetime!=0{
sg.releasetime=cputicks()
//喚醒發(fā)送者
goready(gp,skip+1)
關(guān)閉
funcclosechan(c*hchan){
//不能關(guān)閉空channel
ifc==nil{
panic(plainError("closeofnilchannel"))
lock(c.lock)
//不能重復(fù)關(guān)閉
ifc.closed!=0{
unlock(c.lock)
panic(plainError("closeofclosedchannel"))
//修改關(guān)閉狀態(tài)
c.close
溫馨提示
- 1. 本站所有資源如無(wú)特殊說(shuō)明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁(yè)內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒(méi)有圖紙預(yù)覽就沒(méi)有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫(kù)網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。
最新文檔
- 數(shù)字化營(yíng)銷(xiāo)賦能教育行業(yè)招生:2025年招生團(tuán)隊(duì)建設(shè)與培訓(xùn)方案
- T/31SIOT 001.4-2017工業(yè)物聯(lián)網(wǎng)應(yīng)用開(kāi)發(fā)組件規(guī)范第4部分:監(jiān)視與控制
- 2025年教育精準(zhǔn)扶貧項(xiàng)目對(duì)農(nóng)村學(xué)校師資隊(duì)伍穩(wěn)定性的影響
- 農(nóng)村土地流轉(zhuǎn)規(guī)范化管理:2025年政策創(chuàng)新與實(shí)施效果分析報(bào)告
- 文化與科技融合在智慧教育平臺(tái)中的應(yīng)用模式研究報(bào)告
- DB37/T 4440.4-2021城市軌道交通互聯(lián)互通體系規(guī)范信號(hào)系統(tǒng)第4部分:車(chē)載人機(jī)界面
- 有肺栓塞的危險(xiǎn)護(hù)理查房
- 2025年模具制造數(shù)字化設(shè)計(jì)仿真技術(shù)在精密模具制造中的關(guān)鍵作用
- 廣告設(shè)計(jì)畢業(yè)答辯
- 愛(ài)上語(yǔ)文課課件設(shè)計(jì)與實(shí)踐
- 企業(yè)管理-《資產(chǎn)減值損失的稅務(wù)情況說(shuō)明》
- 高速公路養(yǎng)護(hù)施工安全管理經(jīng)驗(yàn)
- 老年人智能手機(jī)使用教程課件
- 3.6.3關(guān)門(mén)車(chē)課件講解
- 貴陽(yáng)2024年貴州貴陽(yáng)貴安事業(yè)單位招聘599人筆試歷年典型考題及考點(diǎn)附答案解析
- IATF16949-COP-內(nèi)部審核檢查表+填寫(xiě)記錄
- 實(shí)際控制人與法人協(xié)議模板
- 全屋家具定制合同
- 大數(shù)據(jù)技術(shù)基礎(chǔ)(第2版)全套教學(xué)課件
- 康養(yǎng)旅游區(qū)項(xiàng)目可行性研究報(bào)告
- 大鎖孫天宇小品《時(shí)間都去哪了》臺(tái)詞劇本完整版-一年一度喜劇大賽
評(píng)論
0/150
提交評(píng)論