service.go 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245
  1. // Copyright 2016 Google Inc. All Rights Reserved.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package pubsub
  15. import (
  16. "encoding/base64"
  17. "fmt"
  18. "net/http"
  19. "time"
  20. "golang.org/x/net/context"
  21. "google.golang.org/api/googleapi"
  22. raw "google.golang.org/api/pubsub/v1"
  23. )
  24. // service provides an internal abstraction to isolate the generated
  25. // PubSub API; most of this package uses this interface instead.
  26. // The single implementation, *apiService, contains all the knowledge
  27. // of the generated PubSub API (except for that present in legacy code).
  28. type service interface {
  29. createSubscription(ctx context.Context, topicName, subName string, ackDeadline time.Duration, pushConfig *PushConfig) error
  30. getSubscriptionConfig(ctx context.Context, subName string) (*SubscriptionConfig, string, error)
  31. listProjectSubscriptions(ctx context.Context, projName string) ([]string, error)
  32. deleteSubscription(ctx context.Context, name string) error
  33. subscriptionExists(ctx context.Context, name string) (bool, error)
  34. createTopic(ctx context.Context, name string) error
  35. deleteTopic(ctx context.Context, name string) error
  36. topicExists(ctx context.Context, name string) (bool, error)
  37. listProjectTopics(ctx context.Context, projName string) ([]string, error)
  38. listTopicSubscriptions(ctx context.Context, topicName string) ([]string, error)
  39. modifyAckDeadline(ctx context.Context, subName string, deadline time.Duration, ackIDs []string) error
  40. acknowledge(ctx context.Context, subName string, ackIDs []string) error
  41. fetchMessages(ctx context.Context, subName string, maxMessages int64) ([]*Message, error)
  42. publishMessages(ctx context.Context, topicName string, msgs []*Message) ([]string, error)
  43. }
  44. type apiService struct {
  45. s *raw.Service
  46. }
  47. func newPubSubService(client *http.Client, endpoint string) (*apiService, error) {
  48. s, err := raw.New(client)
  49. if err != nil {
  50. return nil, err
  51. }
  52. s.BasePath = endpoint
  53. return &apiService{s: s}, nil
  54. }
  55. func (s *apiService) createSubscription(ctx context.Context, topicName, subName string, ackDeadline time.Duration, pushConfig *PushConfig) error {
  56. var rawPushConfig *raw.PushConfig
  57. if pushConfig != nil {
  58. rawPushConfig = &raw.PushConfig{
  59. Attributes: pushConfig.Attributes,
  60. PushEndpoint: pushConfig.Endpoint,
  61. }
  62. }
  63. rawSub := &raw.Subscription{
  64. AckDeadlineSeconds: int64(ackDeadline.Seconds()),
  65. PushConfig: rawPushConfig,
  66. Topic: topicName,
  67. }
  68. _, err := s.s.Projects.Subscriptions.Create(subName, rawSub).Context(ctx).Do()
  69. return err
  70. }
  71. func (s *apiService) getSubscriptionConfig(ctx context.Context, subName string) (*SubscriptionConfig, string, error) {
  72. rawSub, err := s.s.Projects.Subscriptions.Get(subName).Context(ctx).Do()
  73. if err != nil {
  74. return nil, "", err
  75. }
  76. sub := &SubscriptionConfig{
  77. AckDeadline: time.Second * time.Duration(rawSub.AckDeadlineSeconds),
  78. PushConfig: PushConfig{
  79. Endpoint: rawSub.PushConfig.PushEndpoint,
  80. Attributes: rawSub.PushConfig.Attributes,
  81. },
  82. }
  83. return sub, rawSub.Topic, err
  84. }
  85. func (s *apiService) listProjectSubscriptions(ctx context.Context, projName string) ([]string, error) {
  86. subs := []string{}
  87. err := s.s.Projects.Subscriptions.List(projName).
  88. Pages(ctx, func(res *raw.ListSubscriptionsResponse) error {
  89. for _, s := range res.Subscriptions {
  90. subs = append(subs, s.Name)
  91. }
  92. return nil
  93. })
  94. if err != nil {
  95. return nil, err
  96. }
  97. return subs, nil
  98. }
  99. func (s *apiService) deleteSubscription(ctx context.Context, name string) error {
  100. _, err := s.s.Projects.Subscriptions.Delete(name).Context(ctx).Do()
  101. return err
  102. }
  103. func (s *apiService) subscriptionExists(ctx context.Context, name string) (bool, error) {
  104. _, err := s.s.Projects.Subscriptions.Get(name).Context(ctx).Do()
  105. if err == nil {
  106. return true, nil
  107. }
  108. if e, ok := err.(*googleapi.Error); ok && e.Code == http.StatusNotFound {
  109. return false, nil
  110. }
  111. return false, err
  112. }
  113. func (s *apiService) createTopic(ctx context.Context, name string) error {
  114. // Note: The raw API expects a Topic body, but ignores it.
  115. _, err := s.s.Projects.Topics.Create(name, &raw.Topic{}).
  116. Context(ctx).
  117. Do()
  118. return err
  119. }
  120. func (s *apiService) listProjectTopics(ctx context.Context, projName string) ([]string, error) {
  121. topics := []string{}
  122. err := s.s.Projects.Topics.List(projName).
  123. Pages(ctx, func(res *raw.ListTopicsResponse) error {
  124. for _, topic := range res.Topics {
  125. topics = append(topics, topic.Name)
  126. }
  127. return nil
  128. })
  129. if err != nil {
  130. return nil, err
  131. }
  132. return topics, nil
  133. }
  134. func (s *apiService) deleteTopic(ctx context.Context, name string) error {
  135. _, err := s.s.Projects.Topics.Delete(name).Context(ctx).Do()
  136. return err
  137. }
  138. func (s *apiService) topicExists(ctx context.Context, name string) (bool, error) {
  139. _, err := s.s.Projects.Topics.Get(name).Context(ctx).Do()
  140. if err == nil {
  141. return true, nil
  142. }
  143. if e, ok := err.(*googleapi.Error); ok && e.Code == http.StatusNotFound {
  144. return false, nil
  145. }
  146. return false, err
  147. }
  148. func (s *apiService) listTopicSubscriptions(ctx context.Context, topicName string) ([]string, error) {
  149. subs := []string{}
  150. err := s.s.Projects.Topics.Subscriptions.List(topicName).
  151. Pages(ctx, func(res *raw.ListTopicSubscriptionsResponse) error {
  152. for _, s := range res.Subscriptions {
  153. subs = append(subs, s)
  154. }
  155. return nil
  156. })
  157. if err != nil {
  158. return nil, err
  159. }
  160. return subs, nil
  161. }
  162. func (s *apiService) modifyAckDeadline(ctx context.Context, subName string, deadline time.Duration, ackIDs []string) error {
  163. req := &raw.ModifyAckDeadlineRequest{
  164. AckDeadlineSeconds: int64(deadline.Seconds()),
  165. AckIds: ackIDs,
  166. }
  167. _, err := s.s.Projects.Subscriptions.ModifyAckDeadline(subName, req).
  168. Context(ctx).
  169. Do()
  170. return err
  171. }
  172. func (s *apiService) acknowledge(ctx context.Context, subName string, ackIDs []string) error {
  173. req := &raw.AcknowledgeRequest{
  174. AckIds: ackIDs,
  175. }
  176. _, err := s.s.Projects.Subscriptions.Acknowledge(subName, req).
  177. Context(ctx).
  178. Do()
  179. return err
  180. }
  181. func (s *apiService) fetchMessages(ctx context.Context, subName string, maxMessages int64) ([]*Message, error) {
  182. req := &raw.PullRequest{
  183. MaxMessages: maxMessages,
  184. }
  185. resp, err := s.s.Projects.Subscriptions.Pull(subName, req).
  186. Context(ctx).
  187. Do()
  188. if err != nil {
  189. return nil, err
  190. }
  191. msgs := make([]*Message, 0, len(resp.ReceivedMessages))
  192. for i, m := range resp.ReceivedMessages {
  193. msg, err := toMessage(m)
  194. if err != nil {
  195. return nil, fmt.Errorf("pubsub: cannot decode the retrieved message at index: %d, message: %+v", i, m)
  196. }
  197. msgs = append(msgs, msg)
  198. }
  199. return msgs, nil
  200. }
  201. func (s *apiService) publishMessages(ctx context.Context, topicName string, msgs []*Message) ([]string, error) {
  202. rawMsgs := make([]*raw.PubsubMessage, len(msgs))
  203. for i, msg := range msgs {
  204. rawMsgs[i] = &raw.PubsubMessage{
  205. Data: base64.StdEncoding.EncodeToString(msg.Data),
  206. Attributes: msg.Attributes,
  207. }
  208. }
  209. req := &raw.PublishRequest{Messages: rawMsgs}
  210. resp, err := s.s.Projects.Topics.Publish(topicName, req).
  211. Context(ctx).
  212. Do()
  213. if err != nil {
  214. return nil, err
  215. }
  216. return resp.MessageIds, nil
  217. }