123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138 |
- // Copyright 2016 Google Inc. All Rights Reserved.
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package pubsub
- import (
- "fmt"
- "time"
- "golang.org/x/net/context"
- )
- const MaxPublishBatchSize = 1000
- // TopicHandle is a reference to a PubSub topic.
- type TopicHandle struct {
- c *Client
- // The fully qualified identifier for the topic, in the format "projects/<projid>/topics/<name>"
- name string
- }
- // NewTopic creates a new topic.
- // The specified topic name must start with a letter, and contain only letters
- // ([A-Za-z]), numbers ([0-9]), dashes (-), underscores (_), periods (.),
- // tildes (~), plus (+) or percent signs (%). It must be between 3 and 255
- // characters in length, and must not start with "goog".
- // If the topic already exists an error will be returned.
- func (c *Client) NewTopic(ctx context.Context, name string) (*TopicHandle, error) {
- t := c.Topic(name)
- err := c.s.createTopic(ctx, t.Name())
- return t, err
- }
- // Topic creates a reference to a topic.
- func (c *Client) Topic(name string) *TopicHandle {
- return &TopicHandle{c: c, name: fmt.Sprintf("projects/%s/topics/%s", c.projectID, name)}
- }
- // Topics lists all of the topics for the client's project.
- func (c *Client) Topics(ctx context.Context) ([]*TopicHandle, error) {
- topicNames, err := c.s.listProjectTopics(ctx, c.fullyQualifiedProjectName())
- if err != nil {
- return nil, err
- }
- topics := []*TopicHandle{}
- for _, t := range topicNames {
- topics = append(topics, &TopicHandle{c: c, name: t})
- }
- return topics, nil
- }
- // Name returns the globally unique name for the topic.
- func (t *TopicHandle) Name() string {
- return t.name
- }
- // Delete deletes the topic.
- func (t *TopicHandle) Delete(ctx context.Context) error {
- return t.c.s.deleteTopic(ctx, t.name)
- }
- // Exists reports whether the topic exists on the server.
- func (t *TopicHandle) Exists(ctx context.Context) (bool, error) {
- if t.name == "_deleted-topic_" {
- return false, nil
- }
- return t.c.s.topicExists(ctx, t.name)
- }
- // Subscriptions lists the subscriptions for this topic.
- func (t *TopicHandle) Subscriptions(ctx context.Context) ([]*SubscriptionHandle, error) {
- subNames, err := t.c.s.listTopicSubscriptions(ctx, t.name)
- if err != nil {
- return nil, err
- }
- subs := []*SubscriptionHandle{}
- for _, s := range subNames {
- subs = append(subs, &SubscriptionHandle{c: t.c, name: s})
- }
- return subs, nil
- }
- // Subscribe creates a new subscription to the topic.
- // The specified subscription name must start with a letter, and contain only
- // letters ([A-Za-z]), numbers ([0-9]), dashes (-), underscores (_), periods
- // (.), tildes (~), plus (+) or percent signs (%). It must be between 3 and 255
- // characters in length, and must not start with "goog".
- //
- // ackDeadline is the maximum time after a subscriber receives a message before
- // the subscriber should acknowledge the message. It must be between 10 and 600
- // seconds (inclusive), and is rounded down to the nearest second. If the
- // provided ackDeadline is 0, then the default value of 10 seconds is used.
- // Note: messages which are obtained via an Iterator need not be acknowledged
- // within this deadline, as the deadline will be automatically extended.
- //
- // pushConfig may be set to configure this subscription for push delivery.
- //
- // If the subscription already exists an error will be returned.
- func (t *TopicHandle) Subscribe(ctx context.Context, name string, ackDeadline time.Duration, pushConfig *PushConfig) (*SubscriptionHandle, error) {
- if ackDeadline == 0 {
- ackDeadline = 10 * time.Second
- }
- if d := ackDeadline.Seconds(); d < 10 || d > 600 {
- return nil, fmt.Errorf("ack deadline must be between 10 and 600 seconds; got: %v", d)
- }
- sub := t.c.Subscription(name)
- err := t.c.s.createSubscription(ctx, t.name, sub.Name(), ackDeadline, pushConfig)
- return sub, err
- }
- // Publish publishes the supplied Messages to the topic.
- // If successful, the server-assigned message IDs are returned in the same order as the supplied Messages.
- // At most MaxPublishBatchSize messages may be supplied.
- func (t *TopicHandle) Publish(ctx context.Context, msgs ...*Message) ([]string, error) {
- if len(msgs) == 0 {
- return nil, nil
- }
- if len(msgs) > MaxPublishBatchSize {
- return nil, fmt.Errorf("pubsub: got %d messages, but maximum batch size is %d", len(msgs), MaxPublishBatchSize)
- }
- return t.c.s.publishMessages(ctx, t.name, msgs)
- }
|