pubsub_test.go 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970
  1. package tcp
  2. import (
  3. "fmt"
  4. "io/ioutil"
  5. "net"
  6. "testing"
  7. "time"
  8. "go-common/app/infra/databus/conf"
  9. . "github.com/smartystreets/goconvey/convey"
  10. )
  11. var (
  12. pubCfg = &conf.Kafka{
  13. Cluster: "test_topic",
  14. Brokers: []string{"172.22.33.174:9092", "172.22.33.183:9092", "172.22.33.185:9092"},
  15. }
  16. )
  17. func TestDatabus(t *testing.T) {
  18. Convey("Test publish:", t, func() {
  19. l, _ := net.Listen("tcp", ":8888")
  20. go func() {
  21. for {
  22. conn, err := l.Accept()
  23. if err != nil {
  24. continue
  25. }
  26. b, err := ioutil.ReadAll(conn)
  27. if err == nil {
  28. fmt.Printf("test conn: %s", b)
  29. }
  30. conn.Close()
  31. }
  32. }()
  33. conn, err := net.Dial("tcp", ":8888")
  34. So(err, ShouldBeNil)
  35. p, err := NewPub(newConn(conn, time.Second, time.Second), "pub", "", _testTopic, pubCfg)
  36. So(err, ShouldBeNil)
  37. key := []byte("key")
  38. header := []byte("header")
  39. msg := []byte("message")
  40. err = p.publish(key, header, msg)
  41. So(err, ShouldBeNil)
  42. time.Sleep(time.Second)
  43. Convey("test sub", func() {
  44. conn, _ := net.Dial("tcp", ":8888")
  45. s, err := NewSub(newConn(conn, time.Second, time.Second), "sub", "", _testTopic, pubCfg, 1)
  46. So(err, ShouldBeNil)
  47. t.Logf("subscriptions: %v", s.consumer.Subscriptions())
  48. for {
  49. select {
  50. case msg := <-s.consumer.Messages():
  51. s.consumer.CommitOffsets()
  52. t.Logf("sub message: %s timestamp: %d", msg.Value, msg.Timestamp.Unix())
  53. return
  54. case err := <-s.consumer.Errors():
  55. t.Errorf("error: %v", err)
  56. So(err, ShouldBeNil)
  57. case n := <-s.consumer.Notifications():
  58. t.Logf("notify: %v", n)
  59. err := p.publish(key, header, msg)
  60. So(err, ShouldBeNil)
  61. }
  62. }
  63. })
  64. })
  65. }