lsn13zeromq源碼分析之性能高效_第1頁
lsn13zeromq源碼分析之性能高效_第2頁
lsn13zeromq源碼分析之性能高效_第3頁
lsn13zeromq源碼分析之性能高效_第4頁
lsn13zeromq源碼分析之性能高效_第5頁
已閱讀5頁,還剩25頁未讀, 繼續(xù)免費(fèi)閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)

文檔簡介

本課講師:Lee哥為什么高效--ZeroMQ自我介紹Lee

李哥10年(C/C++)開發(fā)經(jīng)驗(yàn)和產(chǎn)品管理經(jīng)驗(yàn),研究過多款C/C++優(yōu)秀開源軟件的框架,主持開發(fā)過大型音頻廣播云平臺(tái),精通需求分析、架構(gòu)分析和產(chǎn)品管理,精通敏捷開發(fā)流程和項(xiàng)目管理。華為技術(shù)有限公司項(xiàng)目經(jīng)理3years諾基亞通信系統(tǒng)技術(shù)4years技術(shù)經(jīng)理comtom產(chǎn)品總監(jiān)2years高屋建瓴ONTENTSC目錄代碼藝術(shù)23工程藝術(shù)101ZeroMQ架構(gòu)背后的歷史最初設(shè)想是作為開發(fā)的重點(diǎn)轉(zhuǎn)變重點(diǎn)主要集中開源和發(fā)布等股票交易中的一成為構(gòu)建分布式于提高系統(tǒng)的等…….個(gè)極快速的消息應(yīng)用程序而提供可用性,將學(xué)通信系統(tǒng),因此的一個(gè)通用系統(tǒng)習(xí)曲線平坦化重點(diǎn)放在了高度,支持任意模式。我們已經(jīng)采優(yōu)化上。項(xiàng)目開始的頭一年都花的消息通信、多種傳輸機(jī)制、對(duì)用了BSD套接字API,嘗試整在制定性能基準(zhǔn)多種編程語言的理單個(gè)消息通測試的方法上了綁定等等。。信模式的語義,并嘗試設(shè)計(jì)出等等一個(gè)盡可能高效的架構(gòu)。02高屋建瓴(1)AMQP穿越broker,通過兩次協(xié)議棧服務(wù)之間和不同公司應(yīng)該是橋接程序庫03如何高效和可用全局狀態(tài)下的庫可用性能RTTTPS1.

關(guān)閉Nagle算法,關(guān)閉網(wǎng)卡中斷匯聚2.

區(qū)別大小消息消息、零拷貝ZMQ、glibc、用戶/內(nèi)核空間邊以太網(wǎng)鏈路層、網(wǎng)卡本身,然后批量發(fā)送和接收消息非同步零數(shù)據(jù)CPU遷移的并發(fā)編程無鎖算法高屋建瓴ONTENTSC目錄代碼藝術(shù)工程藝術(shù)12301整體架構(gòu)networkworkthread1zmq_socketlistenersessionenginecreateoutboundinboundpipessessionengineoutboundinboundworkthread2APIsessionengineoutboundinboundmailboxpollermailboxpollerapplication02消息的結(jié)構(gòu)//實(shí)際代碼

zmq_msg_t

msg;zmq_msg_init(&msg);……typedef

struct

zmq_msg_t{unsigned

char

_

[64];}

zmq_msg_t;struct

{metadata_t

*metadata;content_t

*content;unsigned

char

unused

[msg_t_size

-…];unsigned

char

type;unsigned

char

flags;char

group

[16];uint32_t

routing_id;}

lmsg;03大小消息區(qū)別對(duì)待消息內(nèi)存管理小消息(小于83字節(jié))分配在棧上大消息->零拷貝消息copy僅僅是引用計(jì)數(shù)增加04multipart

message因?yàn)閠cp是一種字節(jié)流類型的協(xié)議,木有邊界,所以把該消息邊界的制定留給了應(yīng)用層。通常有兩種方式實(shí)現(xiàn):在傳輸?shù)臄?shù)據(jù)中添加分隔符。在每條消息中添加size字段。frame1frame2empty

message

part分包到組包

-->

解決粘包和半包zmq_msg_send

(&message_1,

socket,

ZMQ_SNDMORE);…zmq_msg_send

(&message_m,

socket,

ZMQ_SNDMORE);…zmq_msg_send

(&message_n,

socket,

0);05動(dòng)態(tài)調(diào)整批量發(fā)送out_batch_size

=

8192

bytes06批量接收消息network內(nèi)存塊………….in_batch_size

=

8192

bytesrecv

/

timeinbound

pipezmq_socketAPI高屋建瓴ONTENTSC目錄代碼藝術(shù)工程藝術(shù)12301類的層次02類的層次1、object_t,主要用于發(fā)送命令和處理命令,所有繼承object_t的子類都具備該類的功能

2、io_thread_t,內(nèi)含一個(gè)poller,可監(jiān)聽句柄的讀寫異常狀態(tài),繼承自object_t,具有接收命令、處理命令、發(fā)送命令的功能

3、io_object_t,可以獲取一個(gè)io_thread_t的poller,從而具備poller功能,所有繼承自該類的子類都具有pollere功能,可監(jiān)聽句柄的讀寫異常狀態(tài)4、reaper_t,zmq的回收線程

5、own_t,zmq的對(duì)象樹結(jié)點(diǎn),或者說多叉樹的結(jié)點(diǎn),其主要用于對(duì)象的銷毀,可以想到,對(duì)象的銷毀就是這棵樹的銷毀過程,必須要使用深度優(yōu)先的算法來銷毀。關(guān)于zmq對(duì)象樹在Internal

Architecture

of

libzmq有詳細(xì)講解6、tcp_connector_t,zmq_socket的連接器,使用她來建立tcp連接

7、tcp_listener_t,zmq_socket的監(jiān)聽器8、stream_engine,負(fù)責(zé)處理io事件中的一種----網(wǎng)絡(luò)事件,把網(wǎng)絡(luò)字節(jié)流轉(zhuǎn)換成zeromq的msg_t消息傳遞給session_base_t。另外一些和版本兼容相關(guān)的雜務(wù)也stream_engine處理的。stream_engine_t處理完雜務(wù),到session_base_t就只看見msg_t了。

9、session_base_t,管理zmq_socket的連接和通信,主要與engine進(jìn)行交換

10、socket_base_t,zeromq的socket,在zmq中,被當(dāng)成一種特殊的”線程“,具有收發(fā)命令的功能03無鎖隊(duì)列盡量少的申請(qǐng)內(nèi)存,盡量使用即將釋放的內(nèi)存一個(gè)讀線程,一個(gè)寫線程,讀寫之間使用無鎖解決互斥批量寫入,預(yù)取機(jī)制,提高效率04ZeroMQ下的內(nèi)存池–yqueue_t05ZeroMQ下的內(nèi)存池–ypipe_t//

Allocation-efficient

queue

to

store

pipe

items.// Front

of

the

queue

points

to

the

first

prefetched

item,

back

of// the

pipe

points

to

last

un-flushed

item.

Front

is

used

only

by// reader

thread,

while

back

is

used

only

by

writer

thread.yqueue_t<T,N>queue;

/*

內(nèi)存池*/// Points

to

the

first

un-flushed

item.

This

variable

is

used

exclusively

by

writer

thread.T

*w;// Points

to

the

first

un-prefetched

item.

This

variable

is

used

exclusively

by

reader

threT

*r;// Points

to

the

first

item

to

be

flushed

in

the

future.T

*f;// The

single

point

of

contention

between

writer

and

reader

thread.// Points

past

the

last

flushed

item.

If

it

is

NULL,

reader

is

asleep.// This

pointer

should

be

always

accessed

using

atomic

operations.atomic_ptr_t

<T>

c;06Ypipe_t的初始化inline

ypipe_t

(){// Insert

terminator

element

into

the

queue.queue.push

();// Let

all

the

pointers

to

point

to

the

terminator.// (unless

pipe

is

dead,

in

which

case

c

is

set

to

NULL).r

=

w

=

f

=

&queue.back

();c.set

(&queue.back

());}07Ypipe_t的批量寫機(jī)制// Write

an

item

to

the

pipe. Don't

flush

it

yet.

If

incomplete

is// set

to

true

the

item

is

assumed

to

be

continued

by

items// subsequently

written

to

the

pipe.

Incomplete

items

are

never// flushed

down

the

stream.inline

void

write

(const

T

&value_,

bool

incomplete_){// Place

the

value

to

the

queue,

add

new

terminator

element.queue.back

()

=

value_;queue.push

();// Move

the

"flush

up

to

here"

poiter.if(!incomplete_)f

=

&queue.back

();}08Ypipe_t的預(yù)讀機(jī)制inline

bool

read

(T*value_){// Try

to

prefetch

a

value.if

(!check_read

())return

false;// There

was

at

least

one

value

prefetched.// Return

it

to

the

caller.*value_

=

queue.front

();queue.pop

();return

true;}inline

bool

check_read

(){// Was

the

value

prefetched

already?if

(&queue.front

()

!=

r

&&

r)return

true;r

=

c.cas

(&queue.front

(),

NULL);if

(&queue.front()

==

r

||

!r)return

false;return

true;}09嘿?。?!--這就是原子操作T*

atomic_ptr_t::cas(w,f){ret=c

;if(c==w)c

=

f;return

ret;}10高效IO線程高效IO處理IO線程綁定CPUmailbox;pipe多個(gè)IO線程

真正并發(fā)公平處理連接

公平處理流量11Mailbox(1)IO

threadmobileboxpollersessionengine…socketmobileboxstruct

command_t{//

Object

to

process

the

command.zmq::object_t

*destination;type_t

type;union

args_t

args;}12Mailbox(2)mobilebox_t{cpipe_t

cpipe;signaler_t

signaler;}pipeeventfdsocketpairsocket

…senderpollerreceiverio

thread創(chuàng)建時(shí),創(chuàng)建了poller,且創(chuàng)建了signaler和

pipe;然后這個(gè)mailbox被放在ctx的slot里,同時(shí)也被綁定到socket里,發(fā)cmd時(shí),首先放在pipe里,然后signaler發(fā)送一個(gè)信號(hào),poller收到信號(hào)后,調(diào)用iothread的in_event函數(shù)讀取pipe里的cmd,之后調(diào)用具體的對(duì)象處理cmd。13公平處理連接到公平處理流量zmq::io_thread_t

*zmq::ctx_t::choose_io_thread{if

(io_threads.empty

())return

NULL;//

Find

the

I/O

thread

with

minimum

load.int

min_load

=-1;io_thread_t

*selected_io_thread

=

NULL;for

(io_threads_t::size_type

i

=

0;

i

!=

io_threadif

(!affinity_

||

(affinity_

&(uint64_t

(1)

<<

i)))int

load

=

io_threads

[i]->get_load

();if

(selected_io_thread

==

NULL

||

load

<min_load

=

load;selected_io_thread

=

io_threads

[i];}}}return

selected_io_thread;}void

zmq::tcp_listener_t::in_event

(){fd_t

fd

=

accept

();…io_thread_t

*io_thread

=

choose_io_thread

(options.affinity);...socket->event_accepted

(endpoint,

(int)

fd);}int64_t

affinity;/*

Incoming

connections

on

TCP

port

5555

shall

be

handled

by

I/O

thread

1

*/affinity

=

1;rc

=

zmq_setsockopt

(socket,

ZMQ_AFFINITY,

&affinity,

sizeof

(affinity));assert

(rc);rc

=

zmq_bind

(socket,

"tcp://lo:5555");

assert

(rc);/*

Incoming

connections

on

TCP

port

5556

shall

be

handled

by

I/O

thread

2

*/affinit

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。

最新文檔

評(píng)論

0/150

提交評(píng)論