legacy.go 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231
  1. // Copyright 2016 Google Inc. All Rights Reserved.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package pubsub
  15. import (
  16. "encoding/base64"
  17. "errors"
  18. "fmt"
  19. "net/http"
  20. "time"
  21. "golang.org/x/net/context"
  22. "google.golang.org/api/googleapi"
  23. raw "google.golang.org/api/pubsub/v1"
  24. "google.golang.org/cloud/internal"
  25. )
// batchLimit is the maximum size of a single batch. In this file it caps
// both the number of messages that one pull may request and the number of
// messages that one Publish call may send.
const batchLimit = 1000
  28. // CreateTopic creates a new topic with the specified name on the backend.
  29. //
  30. // Deprecated: Use Client.NewTopic instead.
  31. //
  32. // It will return an error if topic already exists.
  33. func CreateTopic(ctx context.Context, name string) error {
  34. _, err := rawService(ctx).Projects.Topics.Create(fullTopicName(internal.ProjID(ctx), name), &raw.Topic{}).Do()
  35. return err
  36. }
  37. // DeleteTopic deletes the specified topic.
  38. //
  39. // Deprecated: Use TopicHandle.Delete instead.
  40. func DeleteTopic(ctx context.Context, name string) error {
  41. _, err := rawService(ctx).Projects.Topics.Delete(fullTopicName(internal.ProjID(ctx), name)).Do()
  42. return err
  43. }
  44. // TopicExists returns true if a topic exists with the specified name.
  45. //
  46. // Deprecated: Use TopicHandle.Exists instead.
  47. func TopicExists(ctx context.Context, name string) (bool, error) {
  48. _, err := rawService(ctx).Projects.Topics.Get(fullTopicName(internal.ProjID(ctx), name)).Do()
  49. if e, ok := err.(*googleapi.Error); ok && e.Code == http.StatusNotFound {
  50. return false, nil
  51. }
  52. if err != nil {
  53. return false, err
  54. }
  55. return true, nil
  56. }
  57. // DeleteSub deletes the subscription.
  58. //
  59. // Deprecated: Use SubscriptionHandle.Delete instead.
  60. func DeleteSub(ctx context.Context, name string) error {
  61. _, err := rawService(ctx).Projects.Subscriptions.Delete(fullSubName(internal.ProjID(ctx), name)).Do()
  62. return err
  63. }
  64. // SubExists returns true if subscription exists.
  65. //
  66. // Deprecated: Use SubscriptionHandle.Exists instead.
  67. func SubExists(ctx context.Context, name string) (bool, error) {
  68. _, err := rawService(ctx).Projects.Subscriptions.Get(fullSubName(internal.ProjID(ctx), name)).Do()
  69. if e, ok := err.(*googleapi.Error); ok && e.Code == http.StatusNotFound {
  70. return false, nil
  71. }
  72. if err != nil {
  73. return false, err
  74. }
  75. return true, nil
  76. }
  77. // CreateSub creates a Pub/Sub subscription on the backend.
  78. //
  79. // Deprecated: Use TopicHandle.Subscribe instead.
  80. //
  81. // A subscription should subscribe to an existing topic.
  82. //
  83. // The messages that haven't acknowledged will be pushed back to the
  84. // subscription again when the default acknowledgement deadline is
  85. // reached. You can override the default deadline by providing a
  86. // non-zero deadline. Deadline must not be specified to
  87. // precision greater than one second.
  88. //
  89. // As new messages are being queued on the subscription, you
  90. // may recieve push notifications regarding to the new arrivals.
  91. // To receive notifications of new messages in the queue,
  92. // specify an endpoint callback URL.
  93. // If endpoint is an empty string the backend will not notify the
  94. // client of new messages.
  95. //
  96. // If the subscription already exists an error will be returned.
  97. func CreateSub(ctx context.Context, name string, topic string, deadline time.Duration, endpoint string) error {
  98. sub := &raw.Subscription{
  99. Topic: fullTopicName(internal.ProjID(ctx), topic),
  100. }
  101. if int64(deadline) > 0 {
  102. if !isSec(deadline) {
  103. return errors.New("pubsub: deadline must not be specified to precision greater than one second")
  104. }
  105. sub.AckDeadlineSeconds = int64(deadline / time.Second)
  106. }
  107. if endpoint != "" {
  108. sub.PushConfig = &raw.PushConfig{PushEndpoint: endpoint}
  109. }
  110. _, err := rawService(ctx).Projects.Subscriptions.Create(fullSubName(internal.ProjID(ctx), name), sub).Do()
  111. return err
  112. }
  113. // Pull pulls up to n messages from the subscription. n must not be larger than 100.
  114. //
  115. // Deprecated: Use Subscription.Pull instead
  116. func Pull(ctx context.Context, sub string, n int) ([]*Message, error) {
  117. return pull(ctx, sub, n, true)
  118. }
  119. // PullWait pulls up to n messages from the subscription. If there are no
  120. // messages in the queue, it will wait until at least one message is
  121. // available or a timeout occurs. n must not be larger than 100.
  122. //
  123. // Deprecated: Use Subscription.Pull instead
  124. func PullWait(ctx context.Context, sub string, n int) ([]*Message, error) {
  125. return pull(ctx, sub, n, false)
  126. }
  127. func pull(ctx context.Context, sub string, n int, retImmediately bool) ([]*Message, error) {
  128. if n < 1 || n > batchLimit {
  129. return nil, fmt.Errorf("pubsub: cannot pull less than one, more than %d messages, but %d was given", batchLimit, n)
  130. }
  131. resp, err := rawService(ctx).Projects.Subscriptions.Pull(fullSubName(internal.ProjID(ctx), sub), &raw.PullRequest{
  132. ReturnImmediately: retImmediately,
  133. MaxMessages: int64(n),
  134. }).Do()
  135. if err != nil {
  136. return nil, err
  137. }
  138. msgs := make([]*Message, len(resp.ReceivedMessages))
  139. for i := 0; i < len(resp.ReceivedMessages); i++ {
  140. msg, err := toMessage(resp.ReceivedMessages[i])
  141. if err != nil {
  142. return nil, fmt.Errorf("pubsub: cannot decode the retrieved message at index: %d, PullResponse: %+v", i, resp.ReceivedMessages[i])
  143. }
  144. msgs[i] = msg
  145. }
  146. return msgs, nil
  147. }
  148. // ModifyAckDeadline modifies the acknowledgement deadline
  149. // for the messages retrieved from the specified subscription.
  150. // Deadline must not be specified to precision greater than one second.
  151. //
  152. // Deprecated: Use Subscription.Pull instead, which automatically extends ack deadlines.
  153. func ModifyAckDeadline(ctx context.Context, sub string, id string, deadline time.Duration) error {
  154. if !isSec(deadline) {
  155. return errors.New("pubsub: deadline must not be specified to precision greater than one second")
  156. }
  157. _, err := rawService(ctx).Projects.Subscriptions.ModifyAckDeadline(fullSubName(internal.ProjID(ctx), sub), &raw.ModifyAckDeadlineRequest{
  158. AckDeadlineSeconds: int64(deadline / time.Second),
  159. AckIds: []string{id},
  160. }).Do()
  161. return err
  162. }
  163. // Ack acknowledges one or more Pub/Sub messages on the
  164. // specified subscription.
  165. //
  166. // Deprecated: Call Message.Done on a Message returned by Iterator.Next instead.
  167. func Ack(ctx context.Context, sub string, id ...string) error {
  168. for idx, ackID := range id {
  169. if ackID == "" {
  170. return fmt.Errorf("pubsub: empty ackID detected at index %d", idx)
  171. }
  172. }
  173. _, err := rawService(ctx).Projects.Subscriptions.Acknowledge(fullSubName(internal.ProjID(ctx), sub), &raw.AcknowledgeRequest{
  174. AckIds: id,
  175. }).Do()
  176. return err
  177. }
  178. func isSec(dur time.Duration) bool {
  179. return dur%time.Second == 0
  180. }
  181. // Publish publishes messages to the topic's subscribers. It returns
  182. // message IDs upon success.
  183. //
  184. // Deprecated: Use TopicHandle.Publish instead.
  185. func Publish(ctx context.Context, topic string, msgs ...*Message) ([]string, error) {
  186. var rawMsgs []*raw.PubsubMessage
  187. if len(msgs) == 0 {
  188. return nil, errors.New("pubsub: no messages to publish")
  189. }
  190. if len(msgs) > batchLimit {
  191. return nil, fmt.Errorf("pubsub: %d messages given, but maximum batch size is %d", len(msgs), batchLimit)
  192. }
  193. rawMsgs = make([]*raw.PubsubMessage, len(msgs))
  194. for i, msg := range msgs {
  195. rawMsgs[i] = &raw.PubsubMessage{
  196. Data: base64.StdEncoding.EncodeToString(msg.Data),
  197. Attributes: msg.Attributes,
  198. }
  199. }
  200. resp, err := rawService(ctx).Projects.Topics.Publish(fullTopicName(internal.ProjID(ctx), topic), &raw.PublishRequest{
  201. Messages: rawMsgs,
  202. }).Do()
  203. if err != nil {
  204. return nil, err
  205. }
  206. return resp.MessageIds, nil
  207. }
  208. // fullTopicName returns the fully qualified name for a topic.
  209. // E.g. /topics/project-id/topic-name.
  210. func fullTopicName(proj, name string) string {
  211. // TODO(mcgreevy): remove this in favour of Topic.fullyQualifiedName.
  212. return fmt.Sprintf("projects/%s/topics/%s", proj, name)
  213. }