cache_delay.go 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. package service
  2. import (
  3. "context"
  4. "time"
  5. "go-common/app/service/main/account/model/queue"
  6. "go-common/library/log"
  7. "go-common/library/stat/prom"
  8. )
  9. // Item is
  10. type Item struct {
  11. Mid int64
  12. Time time.Time
  13. Action string
  14. }
  15. // Compare is
  16. func (i *Item) Compare(other queue.Item) int {
  17. o := asItem(other)
  18. if o == nil {
  19. return -1
  20. }
  21. if i.Time.Equal(o.Time) {
  22. return 0
  23. }
  24. if i.Time.After(o.Time) {
  25. return 1
  26. }
  27. return -1
  28. }
  29. // HashCode is
  30. func (i *Item) HashCode() int64 {
  31. return i.Mid
  32. }
  33. func asItem(in queue.Item) *Item {
  34. o, ok := in.(*Item)
  35. if !ok {
  36. return nil
  37. }
  38. return o
  39. }
  40. func asItems(in []queue.Item) []*Item {
  41. out := make([]*Item, 0, len(in))
  42. for _, i := range in {
  43. item := asItem(i)
  44. if item == nil {
  45. continue
  46. }
  47. out = append(out, item)
  48. }
  49. return out
  50. }
  51. func (s *Service) cachedelayproc(ctx context.Context) {
  52. fiveSeconds := time.Second * 5
  53. t := time.NewTicker(fiveSeconds)
  54. delayed := func(t time.Time) bool {
  55. top := asItem(s.cachepq.Peek())
  56. if top == nil {
  57. log.Info("Empty cache queue top at: %v", t)
  58. return false
  59. }
  60. if t.Sub(top.Time) < fiveSeconds {
  61. log.Info("Top item is in five seconds, skip and waiting for next tick")
  62. return false
  63. }
  64. return true
  65. }
  66. for ti := range t.C {
  67. prom.BusinessInfoCount.State("cachepq-enqueued", int64(s.cachepq.Len()))
  68. if !delayed(ti) {
  69. continue
  70. }
  71. for {
  72. qitems, err := s.cachepq.Get(1)
  73. if err != nil {
  74. log.Error("Failed to get queue items from cache queue: %+v", err)
  75. return
  76. }
  77. items := asItems(qitems)
  78. for _, it := range items {
  79. log.Info("Delete cache in delay queue with item: %+v", it)
  80. s.dao.DelCache(ctx, it.Mid)
  81. }
  82. if s.cachepq.Empty() || !delayed(time.Now()) {
  83. break
  84. }
  85. }
  86. }
  87. }