subscription.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. // Copyright 2016 Google Inc. All Rights Reserved.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package pubsub
  15. import (
  16. "fmt"
  17. "time"
  18. "golang.org/x/net/context"
  19. )
  20. // SubscriptionHandle is a reference to a PubSub subscription.
  21. type SubscriptionHandle struct {
  22. c *Client
  23. // The fully qualified identifier for the subscription, in the format "projects/<projid>/subscriptions/<name>"
  24. name string
  25. }
  26. // Subscription creates a reference to a subscription.
  27. func (c *Client) Subscription(name string) *SubscriptionHandle {
  28. return &SubscriptionHandle{
  29. c: c,
  30. name: fmt.Sprintf("projects/%s/subscriptions/%s", c.projectID, name),
  31. }
  32. }
  33. // Name returns the globally unique name for the subscription.
  34. func (s *SubscriptionHandle) Name() string {
  35. return s.name
  36. }
  37. // Subscriptions lists all of the subscriptions for the client's project.
  38. func (c *Client) Subscriptions(ctx context.Context) ([]*SubscriptionHandle, error) {
  39. subNames, err := c.s.listProjectSubscriptions(ctx, c.fullyQualifiedProjectName())
  40. if err != nil {
  41. return nil, err
  42. }
  43. subs := []*SubscriptionHandle{}
  44. for _, s := range subNames {
  45. subs = append(subs, &SubscriptionHandle{c: c, name: s})
  46. }
  47. return subs, nil
  48. }
  // PushConfig contains configuration for subscriptions that operate in push
  // mode.
  type PushConfig struct {
  	// Endpoint is a URL locating the endpoint to which messages should be
  	// pushed.
  	Endpoint string

  	// Attributes holds the endpoint configuration attributes. See
  	// https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions#PushConfig.FIELDS.attributes
  	// for more details.
  	Attributes map[string]string
  }
  56. // Subscription config contains the configuration of a subscription.
  57. type SubscriptionConfig struct {
  58. Topic *TopicHandle
  59. PushConfig PushConfig
  60. // The default maximum time after a subscriber receives a message
  61. // before the subscriber should acknowledge the message. Note:
  62. // messages which are obtained via an Iterator need not be acknowledged
  63. // within this deadline, as the deadline will be automatically
  64. // extended.
  65. AckDeadline time.Duration
  66. }
  67. // Delete deletes the subscription.
  68. func (s *SubscriptionHandle) Delete(ctx context.Context) error {
  69. return s.c.s.deleteSubscription(ctx, s.name)
  70. }
  71. // Exists reports whether the subscription exists on the server.
  72. func (s *SubscriptionHandle) Exists(ctx context.Context) (bool, error) {
  73. return s.c.s.subscriptionExists(ctx, s.name)
  74. }
  75. // Config fetches the current configuration for the subscription.
  76. func (s *SubscriptionHandle) Config(ctx context.Context) (*SubscriptionConfig, error) {
  77. sub, topicName, err := s.c.s.getSubscriptionConfig(ctx, s.name)
  78. if err != nil {
  79. return nil, err
  80. }
  81. sub.Topic = &TopicHandle{
  82. c: s.c,
  83. name: topicName,
  84. }
  85. return sub, nil
  86. }
  87. // Pull returns an Iterator that can be used to fetch Messages.
  88. // The Iterator will automatically extend the ack deadline of all fetched
  89. // Messages, for the period specified maxExtension. Automatic deadline
  90. // extension may be disabled by specifying a maxExtension of 0.
  91. //
  92. // The caller must call Close on the Iterator once finished with it.
  93. func (s *SubscriptionHandle) Pull(ctx context.Context, maxExtension time.Duration) (*Iterator, error) {
  94. // TODO(mcgreevy): accept pulloptions.
  95. config, err := s.Config(ctx)
  96. if err != nil {
  97. return nil, err
  98. }
  99. return s.c.newIterator(ctx, s.name, config.AckDeadline, maxExtension), nil
  100. }