doc.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. /*
  2. Package databusutil provides a util for building databus based async job with
  3. single partition message aggregation and parallel consumption features.
  4. Group
  5. The group is the primary struct for working with this util.
  6. Applications create groups by calling the package NewGroup function with a
  7. databusutil config and a databus message chan.
  8. To start a initiated group, the application must call the group Start method.
  9. The application must call the group Close method when the application is
  10. done with the group.
  11. Callbacks
  12. After a new group is created, the following callbacks: New, Split and Do must
  13. be assigned, otherwise the job will not works as your expectation.
  14. The callback New represents how the consume proc of the group parsing the target
  15. object from a new databus message that it received for merging, if the error
  16. returned is not nil, the consume proc will omit this message and continue.
  17. A example of the callback New is:
  18. func newTestMsg(msg *databus.Message) (res interface{}, err error) {
  19. res = new(testMsg)
  20. if err = json.Unmarshal(msg.Value, &res); err != nil {
  21. log.Error("json.Unmarshal(%s) error(%v)", msg.Value, err)
  22. }
  23. return
  24. }
  25. The callback Split represents how the consume proc of the group getting the
  26. sharding dimension from a databus message or the object parsed from the databus
  27. message, it will be used along with the configuration item Num to decide which
  28. merge goroutine to use to merge the parsed object. In more detail, if we take
  29. the result of callback Split as sr, then the sharding result will be sr % Num.
  30. A example of the callback Split is:
  31. func split(msg *databus.Message, data interface{}) int {
  32. t, ok := data.(*testMsg)
  33. if !ok {
  34. return 0
  35. }
  36. return int(t.Mid)
  37. }
  38. If your messages is already assigned to their partitions corresponding to the split you want,
  39. you may want to directly use its partition as split, here is the example:
  40. func anotherSplit(msg *databus.Message, data interface{}) int {
  41. return int(msg.Partition)
  42. }
  43. Do not forget to ensure the max value your callback Split returns, as maxSplit,
  44. greater than or equal to the configuration item Num, otherwise the merge
  45. goroutines will not be fully used, in more detail, the last (Num - maxSplit)
  46. merge goroutines are initiated by will never be used.
  47. The callback Do represents how the merge proc of the group processing the merged
  48. objects, define your business in it.
  49. A example of the callback Do is:
  50. func do(msgs []interface{}) {
  51. for _, m := range msgs {
  52. // process messages you merged here, the example type asserts and prints each
  53. if msg, ok := m.(*testMsg); ok {
  54. fmt.Printf("msg: %+v", msg)
  55. }
  56. }
  57. }
  58. Usage Example
  59. The typical usage for databusutil is:
  60. // new a databus to subscribe from
  61. dsSub := databus.New(dsSubConf)
  62. defer dsSub.Close()
  63. // new a group
  64. g := NewGroup(
  65. c,
  66. dsSub.Messages(),
  67. )
  68. // fill callbacks
  69. g.New = yourNewFunc
  70. g.Split = yourSplitFunc
  71. g.Do = yourDoFunc
  72. // start the group
  73. g.Start()
  74. // must close the group before the job exits
  75. defer g.Close()
  76. // signal handler
  77. */
  78. package databusutil