group.go 3.9 KB


  1. package databusutil
  2. import (
  3. "runtime"
  4. "sync"
  5. "time"
  6. "go-common/library/queue/databus"
  7. xtime "go-common/library/time"
  8. )
  9. const (
  10. _stateStarted = 1
  11. _stateClosed = 2
  12. )
  13. // Config the config is the base configuration for initiating a new group.
  14. type Config struct {
  15. // Size merge size
  16. Size int
  17. // Num merge goroutine num
  18. Num int
  19. // Ticker duration of submit merges when no new message
  20. Ticker xtime.Duration
  21. // Chan size of merge chan and done chan
  22. Chan int
  23. }
  24. func (c *Config) fix() {
  25. if c.Size <= 0 {
  26. c.Size = 1024
  27. }
  28. if int64(c.Ticker) <= 0 {
  29. c.Ticker = xtime.Duration(time.Second * 5)
  30. }
  31. if c.Num <= 0 {
  32. c.Num = runtime.GOMAXPROCS(0)
  33. }
  34. if c.Chan <= 0 {
  35. c.Chan = 1024
  36. }
  37. }
  38. type message struct {
  39. next *message
  40. data *databus.Message
  41. object interface{}
  42. done bool
  43. }
  44. // Group group.
  45. type Group struct {
  46. c *Config
  47. head, last *message
  48. state int
  49. mu sync.Mutex
  50. mc []chan *message // merge chan
  51. dc chan []*message // done chan
  52. qc chan struct{} // quit chan
  53. msg <-chan *databus.Message
  54. New func(msg *databus.Message) (interface{}, error)
  55. Split func(msg *databus.Message, data interface{}) int
  56. Do func(msgs []interface{})
  57. pool *sync.Pool
  58. }
  59. // NewGroup new a group.
  60. func NewGroup(c *Config, m <-chan *databus.Message) *Group {
  61. // NOTE if c || m == nil runtime panic
  62. if c == nil {
  63. c = new(Config)
  64. }
  65. c.fix()
  66. g := &Group{
  67. c: c,
  68. msg: m,
  69. mc: make([]chan *message, c.Num),
  70. dc: make(chan []*message, c.Chan),
  71. qc: make(chan struct{}),
  72. pool: &sync.Pool{
  73. New: func() interface{} {
  74. return new(message)
  75. },
  76. },
  77. }
  78. for i := 0; i < c.Num; i++ {
  79. g.mc[i] = make(chan *message, c.Chan)
  80. }
  81. return g
  82. }
  83. // Start start group, it is safe for concurrent use by multiple goroutines.
  84. func (g *Group) Start() {
  85. g.mu.Lock()
  86. if g.state == _stateStarted {
  87. g.mu.Unlock()
  88. return
  89. }
  90. g.state = _stateStarted
  91. g.mu.Unlock()
  92. go g.consumeproc()
  93. for i := 0; i < g.c.Num; i++ {
  94. go g.mergeproc(g.mc[i])
  95. }
  96. go g.commitproc()
  97. }
  98. // Close close group, it is safe for concurrent use by multiple goroutines.
  99. func (g *Group) Close() (err error) {
  100. g.mu.Lock()
  101. if g.state == _stateClosed {
  102. g.mu.Unlock()
  103. return
  104. }
  105. g.state = _stateClosed
  106. g.mu.Unlock()
  107. close(g.qc)
  108. return
  109. }
  110. func (g *Group) message() *message {
  111. return g.pool.Get().(*message)
  112. }
  113. func (g *Group) freeMessage(m *message) {
  114. *m = message{}
  115. g.pool.Put(m)
  116. }
  117. func (g *Group) consumeproc() {
  118. var (
  119. ok bool
  120. err error
  121. msg *databus.Message
  122. )
  123. for {
  124. select {
  125. case <-g.qc:
  126. return
  127. case msg, ok = <-g.msg:
  128. if !ok {
  129. g.Close()
  130. return
  131. }
  132. }
  133. // marked head to first commit
  134. m := g.message()
  135. m.data = msg
  136. if m.object, err = g.New(msg); err != nil {
  137. g.freeMessage(m)
  138. continue
  139. }
  140. g.mu.Lock()
  141. if g.head == nil {
  142. g.head = m
  143. g.last = m
  144. } else {
  145. g.last.next = m
  146. g.last = m
  147. }
  148. g.mu.Unlock()
  149. g.mc[g.Split(m.data, m.object)%g.c.Num] <- m
  150. }
  151. }
  152. func (g *Group) mergeproc(mc <-chan *message) {
  153. ticker := time.NewTicker(time.Duration(g.c.Ticker))
  154. msgs := make([]interface{}, 0, g.c.Size)
  155. marks := make([]*message, 0, g.c.Size)
  156. for {
  157. select {
  158. case <-g.qc:
  159. return
  160. case msg := <-mc:
  161. msgs = append(msgs, msg.object)
  162. marks = append(marks, msg)
  163. if len(msgs) < g.c.Size {
  164. continue
  165. }
  166. case <-ticker.C:
  167. }
  168. if len(msgs) > 0 {
  169. g.Do(msgs)
  170. msgs = make([]interface{}, 0, g.c.Size)
  171. }
  172. if len(marks) > 0 {
  173. g.dc <- marks
  174. marks = make([]*message, 0, g.c.Size)
  175. }
  176. }
  177. }
  178. func (g *Group) commitproc() {
  179. commits := make(map[int32]*databus.Message)
  180. for {
  181. select {
  182. case <-g.qc:
  183. return
  184. case done := <-g.dc:
  185. // merge partitions to commit offset
  186. for _, d := range done {
  187. d.done = true
  188. }
  189. g.mu.Lock()
  190. for g.head != nil && g.head.done {
  191. cur := g.head
  192. commits[cur.data.Partition] = cur.data
  193. g.head = cur.next
  194. g.freeMessage(cur)
  195. }
  196. g.mu.Unlock()
  197. for k, m := range commits {
  198. m.Commit()
  199. delete(commits, k)
  200. }
  201. }
  202. }
  203. }