mysql.go 1.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940
  1. package dao
  2. import (
  3. "context"
  4. "go-common/app/infra/canal/model"
  5. "go-common/library/database/sql"
  6. "go-common/library/log"
  7. )
  8. const (
  9. _tidbPositonSQL = "SELECT name, cluster_id, offset, tso FROM tidb_info WHERE name = ?"
  10. _updateTidbPositonSQL = "INSERT INTO tidb_info(name, cluster_id, offset, tso) VALUES(?,?,?,?) ON DUPLICATE KEY UPDATE offset = ?, tso = ?"
  11. )
  12. // TiDBPosition get tidb positon
  13. func (d *Dao) TiDBPosition(c context.Context, name string) (res *model.TiDBInfo, err error) {
  14. res = &model.TiDBInfo{}
  15. if err = d.db.QueryRow(c, _tidbPositonSQL, name).Scan(&res.Name, &res.ClusterID, &res.Offset, &res.CommitTS); err != nil {
  16. if err == sql.ErrNoRows {
  17. res = nil
  18. err = nil
  19. return
  20. }
  21. log.Error("db.TidbPosition.Query error(%v,%v,%v)", _tidbPositonSQL, name, err)
  22. return
  23. }
  24. return
  25. }
  26. // UpdateTiDBPosition update tidb position
  27. func (d *Dao) UpdateTiDBPosition(c context.Context, info *model.TiDBInfo) (err error) {
  28. if info == nil {
  29. return
  30. }
  31. if _, err = d.db.Exec(c, _updateTidbPositonSQL, info.Name, info.ClusterID, info.Offset, info.CommitTS, info.Offset, info.CommitTS); err != nil {
  32. log.Error("db.UpdateTiDBPosition.Exec error(%v,%+v,%v)", _updateTidbPositonSQL, info, err)
  33. }
  34. return
  35. }