integration_test.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. // Copyright 2014 Google Inc. All Rights Reserved.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package pubsub
  15. import (
  16. "fmt"
  17. "testing"
  18. "time"
  19. "google.golang.org/cloud"
  20. "google.golang.org/cloud/internal/testutil"
  21. )
  22. func TestAll(t *testing.T) {
  23. if testing.Short() {
  24. t.Skip("Integration tests skipped in short mode")
  25. }
  26. // TODO(djd): Replace this ctx with context.Background() when the new API is complete.
  27. ctx := testutil.Context(ScopePubSub, ScopeCloudPlatform)
  28. if ctx == nil {
  29. t.Skip("Integration tests skipped. See CONTRIBUTING.md for details")
  30. }
  31. ts := testutil.TokenSource(ctx, ScopePubSub, ScopeCloudPlatform)
  32. if ts == nil {
  33. t.Skip("Integration tests skipped. See CONTRIBUTING.md for details")
  34. }
  35. now := time.Now()
  36. topicName := fmt.Sprintf("topic-%d", now.Unix())
  37. subName := fmt.Sprintf("subscription-%d", now.Unix())
  38. client, err := NewClient(ctx, testutil.ProjID(), cloud.WithTokenSource(ts))
  39. if err != nil {
  40. t.Fatalf("Creating client error: %v", err)
  41. }
  42. var topic *TopicHandle
  43. if topic, err = client.NewTopic(ctx, topicName); err != nil {
  44. t.Errorf("CreateTopic error: %v", err)
  45. }
  46. var sub *SubscriptionHandle
  47. if sub, err = topic.Subscribe(ctx, subName, 0, nil); err != nil {
  48. t.Errorf("CreateSub error: %v", err)
  49. }
  50. exists, err := topic.Exists(ctx)
  51. if err != nil {
  52. t.Fatalf("TopicExists error: %v", err)
  53. }
  54. if !exists {
  55. t.Errorf("topic %s should exist, but it doesn't", topic)
  56. }
  57. exists, err = sub.Exists(ctx)
  58. if err != nil {
  59. t.Fatalf("SubExists error: %v", err)
  60. }
  61. if !exists {
  62. t.Errorf("subscription %s should exist, but it doesn't", subName)
  63. }
  64. max := 10
  65. msgs := make([]*Message, max)
  66. expectedMsgs := make(map[string]bool, max)
  67. for i := 0; i < max; i++ {
  68. text := fmt.Sprintf("a message with an index %d", i)
  69. attrs := make(map[string]string)
  70. attrs["foo"] = "bar"
  71. msgs[i] = &Message{
  72. Data: []byte(text),
  73. Attributes: attrs,
  74. }
  75. expectedMsgs[text] = false
  76. }
  77. ids, err := Publish(ctx, topicName, msgs...)
  78. if err != nil {
  79. t.Fatalf("Publish (1) error: %v", err)
  80. }
  81. if len(ids) != max {
  82. t.Errorf("unexpected number of message IDs received; %d, want %d", len(ids), max)
  83. }
  84. expectedIDs := make(map[string]bool, max)
  85. for _, id := range ids {
  86. expectedIDs[id] = false
  87. }
  88. received, err := PullWait(ctx, subName, max)
  89. if err != nil {
  90. t.Fatalf("PullWait error: %v", err)
  91. }
  92. if len(received) != max {
  93. t.Errorf("unexpected number of messages received; %d, want %d", len(received), max)
  94. }
  95. for _, msg := range received {
  96. expectedMsgs[string(msg.Data)] = true
  97. expectedIDs[msg.ID] = true
  98. if msg.Attributes["foo"] != "bar" {
  99. t.Errorf("message attribute foo is expected to be 'bar', found '%s'", msg.Attributes["foo"])
  100. }
  101. }
  102. for msg, found := range expectedMsgs {
  103. if !found {
  104. t.Errorf("message '%s' should be received", msg)
  105. }
  106. }
  107. for id, found := range expectedIDs {
  108. if !found {
  109. t.Errorf("message with the message id '%s' should be received", id)
  110. }
  111. }
  112. // base64 test
  113. data := "=@~"
  114. msg := &Message{
  115. Data: []byte(data),
  116. }
  117. _, err = Publish(ctx, topicName, msg)
  118. if err != nil {
  119. t.Fatalf("Publish (2) error: %v", err)
  120. }
  121. received, err = PullWait(ctx, subName, 1)
  122. if err != nil {
  123. t.Fatalf("PullWait error: %v", err)
  124. }
  125. if len(received) != 1 {
  126. t.Fatalf("unexpected number of messages received; %d, want %d", len(received), 1)
  127. }
  128. if string(received[0].Data) != data {
  129. t.Errorf("unexpexted message received; %s, want %s", string(received[0].Data), data)
  130. }
  131. err = sub.Delete(ctx)
  132. if err != nil {
  133. t.Errorf("DeleteSub error: %v", err)
  134. }
  135. err = topic.Delete(ctx)
  136. if err != nil {
  137. t.Errorf("DeleteTopic error: %v", err)
  138. }
  139. }