123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223 |
- package databusutil
- import (
- "runtime"
- "sync"
- "time"
- "go-common/library/queue/databus"
- xtime "go-common/library/time"
- )
- const (
- _stateStarted = 1
- _stateClosed = 2
- )
- // Config the config is the base configuration for initiating a new group.
- type Config struct {
- // Size merge size
- Size int
- // Num merge goroutine num
- Num int
- // Ticker duration of submit merges when no new message
- Ticker xtime.Duration
- // Chan size of merge chan and done chan
- Chan int
- }
- func (c *Config) fix() {
- if c.Size <= 0 {
- c.Size = 1024
- }
- if int64(c.Ticker) <= 0 {
- c.Ticker = xtime.Duration(time.Second * 5)
- }
- if c.Num <= 0 {
- c.Num = runtime.GOMAXPROCS(0)
- }
- if c.Chan <= 0 {
- c.Chan = 1024
- }
- }
- type message struct {
- next *message
- data *databus.Message
- object interface{}
- done bool
- }
- // Group group.
- type Group struct {
- c *Config
- head, last *message
- state int
- mu sync.Mutex
- mc []chan *message // merge chan
- dc chan []*message // done chan
- qc chan struct{} // quit chan
- msg <-chan *databus.Message
- New func(msg *databus.Message) (interface{}, error)
- Split func(msg *databus.Message, data interface{}) int
- Do func(msgs []interface{})
- pool *sync.Pool
- }
- // NewGroup new a group.
- func NewGroup(c *Config, m <-chan *databus.Message) *Group {
- // NOTE if c || m == nil runtime panic
- if c == nil {
- c = new(Config)
- }
- c.fix()
- g := &Group{
- c: c,
- msg: m,
- mc: make([]chan *message, c.Num),
- dc: make(chan []*message, c.Chan),
- qc: make(chan struct{}),
- pool: &sync.Pool{
- New: func() interface{} {
- return new(message)
- },
- },
- }
- for i := 0; i < c.Num; i++ {
- g.mc[i] = make(chan *message, c.Chan)
- }
- return g
- }
- // Start start group, it is safe for concurrent use by multiple goroutines.
- func (g *Group) Start() {
- g.mu.Lock()
- if g.state == _stateStarted {
- g.mu.Unlock()
- return
- }
- g.state = _stateStarted
- g.mu.Unlock()
- go g.consumeproc()
- for i := 0; i < g.c.Num; i++ {
- go g.mergeproc(g.mc[i])
- }
- go g.commitproc()
- }
- // Close close group, it is safe for concurrent use by multiple goroutines.
- func (g *Group) Close() (err error) {
- g.mu.Lock()
- if g.state == _stateClosed {
- g.mu.Unlock()
- return
- }
- g.state = _stateClosed
- g.mu.Unlock()
- close(g.qc)
- return
- }
- func (g *Group) message() *message {
- return g.pool.Get().(*message)
- }
- func (g *Group) freeMessage(m *message) {
- *m = message{}
- g.pool.Put(m)
- }
- func (g *Group) consumeproc() {
- var (
- ok bool
- err error
- msg *databus.Message
- )
- for {
- select {
- case <-g.qc:
- return
- case msg, ok = <-g.msg:
- if !ok {
- g.Close()
- return
- }
- }
- // marked head to first commit
- m := g.message()
- m.data = msg
- if m.object, err = g.New(msg); err != nil {
- g.freeMessage(m)
- continue
- }
- g.mu.Lock()
- if g.head == nil {
- g.head = m
- g.last = m
- } else {
- g.last.next = m
- g.last = m
- }
- g.mu.Unlock()
- g.mc[g.Split(m.data, m.object)%g.c.Num] <- m
- }
- }
- func (g *Group) mergeproc(mc <-chan *message) {
- ticker := time.NewTicker(time.Duration(g.c.Ticker))
- msgs := make([]interface{}, 0, g.c.Size)
- marks := make([]*message, 0, g.c.Size)
- for {
- select {
- case <-g.qc:
- return
- case msg := <-mc:
- msgs = append(msgs, msg.object)
- marks = append(marks, msg)
- if len(msgs) < g.c.Size {
- continue
- }
- case <-ticker.C:
- }
- if len(msgs) > 0 {
- g.Do(msgs)
- msgs = make([]interface{}, 0, g.c.Size)
- }
- if len(marks) > 0 {
- g.dc <- marks
- marks = make([]*message, 0, g.c.Size)
- }
- }
- }
- func (g *Group) commitproc() {
- commits := make(map[int32]*databus.Message)
- for {
- select {
- case <-g.qc:
- return
- case done := <-g.dc:
- // merge partitions to commit offset
- for _, d := range done {
- d.done = true
- }
- g.mu.Lock()
- for g.head != nil && g.head.done {
- cur := g.head
- commits[cur.data.Partition] = cur.data
- g.head = cur.next
- g.freeMessage(cur)
- }
- g.mu.Unlock()
- for k, m := range commits {
- m.Commit()
- delete(commits, k)
- }
- }
- }
- }
|