pubsub.go 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319
  1. package main
  2. import (
  3. "bufio"
  4. "encoding/base64"
  5. "encoding/json"
  6. "fmt"
  7. "log"
  8. "net"
  9. "net/http"
  10. "net/textproto"
  11. "os"
  12. "strings"
  13. pubsub "google.golang.org/api/pubsub/v1beta2"
  14. )
  15. const USAGE = `Available arguments are:
  16. <project_id> list_topics
  17. <project_id> create_topic <topic>
  18. <project_id> delete_topic <topic>
  19. <project_id> list_subscriptions
  20. <project_id> create_subscription <subscription> <linked topic>
  21. <project_id> delete_subscription <subscription>
  22. <project_id> connect_irc <topic> <server> <channel>
  23. <project_id> pull_messages <subscription>
  24. `
  25. type IRCBot struct {
  26. server string
  27. port string
  28. nick string
  29. user string
  30. channel string
  31. conn net.Conn
  32. tpReader *textproto.Reader
  33. }
  34. func NewIRCBot(server, channel, nick string) *IRCBot {
  35. return &IRCBot{
  36. server: server,
  37. port: "6667",
  38. nick: nick,
  39. channel: channel,
  40. conn: nil,
  41. user: nick,
  42. }
  43. }
  44. func (bot *IRCBot) Connect() {
  45. conn, err := net.Dial("tcp", bot.server+":"+bot.port)
  46. if err != nil {
  47. log.Fatal("unable to connect to IRC server ", err)
  48. }
  49. bot.conn = conn
  50. log.Printf("Connected to IRC server %s (%s)\n",
  51. bot.server, bot.conn.RemoteAddr())
  52. bot.tpReader = textproto.NewReader(bufio.NewReader(bot.conn))
  53. bot.Sendf("USER %s 8 * :%s\r\n", bot.nick, bot.nick)
  54. bot.Sendf("NICK %s\r\n", bot.nick)
  55. bot.Sendf("JOIN %s\r\n", bot.channel)
  56. }
  57. func (bot *IRCBot) CheckConnection() {
  58. for {
  59. line, err := bot.ReadLine()
  60. if err != nil {
  61. log.Fatal("Unable to read a line during checking the connection.")
  62. }
  63. if parts := strings.Split(line, " "); len(parts) > 1 {
  64. if parts[1] == "004" {
  65. log.Println("The nick accepted.")
  66. } else if parts[1] == "433" {
  67. log.Fatalf("The nick is already in use: %s", line)
  68. } else if parts[1] == "366" {
  69. log.Println("Starting to publish messages.")
  70. return
  71. }
  72. }
  73. }
  74. }
  75. func (bot *IRCBot) Sendf(format string, args ...interface{}) {
  76. fmt.Fprintf(bot.conn, format, args...)
  77. }
  78. func (bot *IRCBot) Close() {
  79. bot.conn.Close()
  80. }
  81. func (bot *IRCBot) ReadLine() (line string, err error) {
  82. return bot.tpReader.ReadLine()
  83. }
  84. func init() {
  85. registerDemo("pubsub", pubsub.PubsubScope, pubsubMain)
  86. }
  87. func pubsubUsage() {
  88. fmt.Fprint(os.Stderr, USAGE)
  89. }
  90. // Returns a fully qualified resource name for Cloud Pub/Sub.
  91. func fqrn(res, proj, name string) string {
  92. return fmt.Sprintf("projects/%s/%s/%s", proj, res, name)
  93. }
  94. func fullTopicName(proj, topic string) string {
  95. return fqrn("topics", proj, topic)
  96. }
  97. func fullSubName(proj, topic string) string {
  98. return fqrn("subscriptions", proj, topic)
  99. }
  100. // Check the length of the arguments.
  101. func checkArgs(argv []string, min int) {
  102. if len(argv) < min {
  103. pubsubUsage()
  104. os.Exit(2)
  105. }
  106. }
  107. func listTopics(service *pubsub.Service, argv []string) {
  108. next := ""
  109. for {
  110. topicsList, err := service.Projects.Topics.List(fmt.Sprintf("projects/%s", argv[0])).PageToken(next).Do()
  111. if err != nil {
  112. log.Fatalf("listTopics query.Do() failed: %v", err)
  113. }
  114. for _, topic := range topicsList.Topics {
  115. fmt.Println(topic.Name)
  116. }
  117. next = topicsList.NextPageToken
  118. if next == "" {
  119. break
  120. }
  121. }
  122. }
  123. func createTopic(service *pubsub.Service, argv []string) {
  124. checkArgs(argv, 3)
  125. topic, err := service.Projects.Topics.Create(fullTopicName(argv[0], argv[2]), &pubsub.Topic{}).Do()
  126. if err != nil {
  127. log.Fatalf("createTopic Create().Do() failed: %v", err)
  128. }
  129. fmt.Printf("Topic %s was created.\n", topic.Name)
  130. }
  131. func deleteTopic(service *pubsub.Service, argv []string) {
  132. checkArgs(argv, 3)
  133. topicName := fullTopicName(argv[0], argv[2])
  134. if _, err := service.Projects.Topics.Delete(topicName).Do(); err != nil {
  135. log.Fatalf("deleteTopic Delete().Do() failed: %v", err)
  136. }
  137. fmt.Printf("Topic %s was deleted.\n", topicName)
  138. }
  139. func listSubscriptions(service *pubsub.Service, argv []string) {
  140. next := ""
  141. for {
  142. subscriptionsList, err := service.Projects.Subscriptions.List(fmt.Sprintf("projects/%s", argv[0])).PageToken(next).Do()
  143. if err != nil {
  144. log.Fatalf("listSubscriptions query.Do() failed: %v", err)
  145. }
  146. for _, subscription := range subscriptionsList.Subscriptions {
  147. sub_text, _ := json.MarshalIndent(subscription, "", " ")
  148. fmt.Printf("%s\n", sub_text)
  149. }
  150. next = subscriptionsList.NextPageToken
  151. if next == "" {
  152. break
  153. }
  154. }
  155. }
  156. func createSubscription(service *pubsub.Service, argv []string) {
  157. checkArgs(argv, 4)
  158. name := fullSubName(argv[0], argv[2])
  159. sub := &pubsub.Subscription{Topic: fullTopicName(argv[0], argv[3])}
  160. subscription, err := service.Projects.Subscriptions.Create(name, sub).Do()
  161. if err != nil {
  162. log.Fatalf("createSubscription Create().Do() failed: %v", err)
  163. }
  164. fmt.Printf("Subscription %s was created.\n", subscription.Name)
  165. }
  166. func deleteSubscription(service *pubsub.Service, argv []string) {
  167. checkArgs(argv, 3)
  168. name := fullSubName(argv[0], argv[2])
  169. if _, err := service.Projects.Subscriptions.Delete(name).Do(); err != nil {
  170. log.Fatalf("deleteSubscription Delete().Do() failed: %v", err)
  171. }
  172. fmt.Printf("Subscription %s was deleted.\n", name)
  173. }
  174. func connectIRC(service *pubsub.Service, argv []string) {
  175. checkArgs(argv, 5)
  176. topicName := fullTopicName(argv[0], argv[2])
  177. server := argv[3]
  178. channel := argv[4]
  179. nick := fmt.Sprintf("bot-%s", argv[2])
  180. ircbot := NewIRCBot(server, channel, nick)
  181. ircbot.Connect()
  182. defer ircbot.Close()
  183. ircbot.CheckConnection()
  184. privMark := fmt.Sprintf("PRIVMSG %s :", ircbot.channel)
  185. for {
  186. line, err := ircbot.ReadLine()
  187. if err != nil {
  188. log.Fatal("Unable to read a line from the connection.")
  189. }
  190. parts := strings.Split(line, " ")
  191. if len(parts) > 0 && parts[0] == "PING" {
  192. ircbot.Sendf("PONG %s\r\n", parts[1])
  193. } else {
  194. pos := strings.Index(line, privMark)
  195. if pos == -1 {
  196. continue
  197. }
  198. privMsg := line[pos+len(privMark) : len(line)]
  199. pubsubMessage := &pubsub.PubsubMessage{
  200. Data: base64.StdEncoding.EncodeToString([]byte(privMsg)),
  201. }
  202. publishRequest := &pubsub.PublishRequest{
  203. Messages: []*pubsub.PubsubMessage{pubsubMessage},
  204. }
  205. if _, err := service.Projects.Topics.Publish(topicName, publishRequest).Do(); err != nil {
  206. log.Fatalf("connectIRC Publish().Do() failed: %v", err)
  207. }
  208. log.Println("Published a message to the topic.")
  209. }
  210. }
  211. }
  212. func pullMessages(service *pubsub.Service, argv []string) {
  213. checkArgs(argv, 3)
  214. subName := fullSubName(argv[0], argv[2])
  215. pullRequest := &pubsub.PullRequest{
  216. ReturnImmediately: false,
  217. MaxMessages: 1,
  218. }
  219. for {
  220. pullResponse, err := service.Projects.Subscriptions.Pull(subName, pullRequest).Do()
  221. if err != nil {
  222. log.Fatalf("pullMessages Pull().Do() failed: %v", err)
  223. }
  224. for _, receivedMessage := range pullResponse.ReceivedMessages {
  225. data, err := base64.StdEncoding.DecodeString(receivedMessage.Message.Data)
  226. if err != nil {
  227. log.Fatalf("pullMessages DecodeString() failed: %v", err)
  228. }
  229. fmt.Printf("%s\n", data)
  230. ackRequest := &pubsub.AcknowledgeRequest{
  231. AckIds: []string{receivedMessage.AckId},
  232. }
  233. if _, err = service.Projects.Subscriptions.Acknowledge(subName, ackRequest).Do(); err != nil {
  234. log.Printf("pullMessages Acknowledge().Do() failed: %v", err)
  235. }
  236. }
  237. }
  238. }
  239. // This example demonstrates calling the Cloud Pub/Sub API. As of 20
  240. // Aug 2014, the Cloud Pub/Sub API is only available if you're
  241. // whitelisted. If you're interested in using it, please apply for the
  242. // Limited Preview program at the following form:
  243. // http://goo.gl/Wql9HL
  244. //
  245. // Also, before running this example, be sure to enable Cloud Pub/Sub
  246. // service on your project in Developer Console at:
  247. // https://console.developers.google.com/
  248. //
  249. // It has 8 subcommands as follows:
  250. //
  251. // <project_id> list_topics
  252. // <project_id> create_topic <topic>
  253. // <project_id> delete_topic <topic>
  254. // <project_id> list_subscriptions
  255. // <project_id> create_subscription <subscription> <linked topic>
  256. // <project_id> delete_subscription <subscription>
  257. // <project_id> connect_irc <topic> <server> <channel>
  258. // <project_id> pull_messages <subscription>
  259. //
  260. // You can use either of your alphanumerical or numerial Cloud Project
  261. // ID for project_id. You can choose any names for topic and
  262. // subscription as long as they follow the naming rule described at:
  263. // https://developers.google.com/pubsub/overview#names
  264. //
  265. // You can list/create/delete topics/subscriptions by self-explanatory
  266. // subcommands, as well as connect to an IRC channel and publish
  267. // messages from the IRC channel to a specified Cloud Pub/Sub topic by
  268. // the "connect_irc" subcommand, or continuously pull messages from a
  269. // specified Cloud Pub/Sub subscription and display the data by the
  270. // "pull_messages" subcommand.
  271. func pubsubMain(client *http.Client, argv []string) {
  272. checkArgs(argv, 2)
  273. service, err := pubsub.New(client)
  274. if err != nil {
  275. log.Fatalf("Unable to create PubSub service: %v", err)
  276. }
  277. m := map[string]func(service *pubsub.Service, argv []string){
  278. "list_topics": listTopics,
  279. "create_topic": createTopic,
  280. "delete_topic": deleteTopic,
  281. "list_subscriptions": listSubscriptions,
  282. "create_subscription": createSubscription,
  283. "delete_subscription": deleteSubscription,
  284. "connect_irc": connectIRC,
  285. "pull_messages": pullMessages,
  286. }
  287. f, ok := m[argv[1]]
  288. if !ok {
  289. pubsubUsage()
  290. os.Exit(2)
  291. }
  292. f(service, argv)
  293. }