taskqueue.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496
  1. // Copyright 2011 Google Inc. All rights reserved.
  2. // Use of this source code is governed by the Apache 2.0
  3. // license that can be found in the LICENSE file.
  4. /*
  5. Package taskqueue provides a client for App Engine's taskqueue service.
  6. Using this service, applications may perform work outside a user's request.
  7. A Task may be constructed manually; alternatively, since the most common
  8. taskqueue operation is to add a single POST task, NewPOSTTask makes it easy.
  9. t := taskqueue.NewPOSTTask("/worker", url.Values{
  10. "key": {key},
  11. })
  12. taskqueue.Add(c, t, "") // add t to the default queue
  13. */
  14. package taskqueue // import "google.golang.org/appengine/taskqueue"
  15. import (
  16. "errors"
  17. "fmt"
  18. "net/http"
  19. "net/url"
  20. "time"
  21. "github.com/golang/protobuf/proto"
  22. "golang.org/x/net/context"
  23. "google.golang.org/appengine"
  24. "google.golang.org/appengine/internal"
  25. dspb "google.golang.org/appengine/internal/datastore"
  26. pb "google.golang.org/appengine/internal/taskqueue"
  27. )
  28. var (
  29. // ErrTaskAlreadyAdded is the error returned by Add and AddMulti when a task has already been added with a particular name.
  30. ErrTaskAlreadyAdded = errors.New("taskqueue: task has already been added")
  31. )
  32. // RetryOptions let you control whether to retry a task and the backoff intervals between tries.
  33. type RetryOptions struct {
  34. // Number of tries/leases after which the task fails permanently and is deleted.
  35. // If AgeLimit is also set, both limits must be exceeded for the task to fail permanently.
  36. RetryLimit int32
  37. // Maximum time allowed since the task's first try before the task fails permanently and is deleted (only for push tasks).
  38. // If RetryLimit is also set, both limits must be exceeded for the task to fail permanently.
  39. AgeLimit time.Duration
  40. // Minimum time between successive tries (only for push tasks).
  41. MinBackoff time.Duration
  42. // Maximum time between successive tries (only for push tasks).
  43. MaxBackoff time.Duration
  44. // Maximum number of times to double the interval between successive tries before the intervals increase linearly (only for push tasks).
  45. MaxDoublings int32
  46. // If MaxDoublings is zero, set ApplyZeroMaxDoublings to true to override the default non-zero value.
  47. // Otherwise a zero MaxDoublings is ignored and the default is used.
  48. ApplyZeroMaxDoublings bool
  49. }
  50. // toRetryParameter converts RetryOptions to pb.TaskQueueRetryParameters.
  51. func (opt *RetryOptions) toRetryParameters() *pb.TaskQueueRetryParameters {
  52. params := &pb.TaskQueueRetryParameters{}
  53. if opt.RetryLimit > 0 {
  54. params.RetryLimit = proto.Int32(opt.RetryLimit)
  55. }
  56. if opt.AgeLimit > 0 {
  57. params.AgeLimitSec = proto.Int64(int64(opt.AgeLimit.Seconds()))
  58. }
  59. if opt.MinBackoff > 0 {
  60. params.MinBackoffSec = proto.Float64(opt.MinBackoff.Seconds())
  61. }
  62. if opt.MaxBackoff > 0 {
  63. params.MaxBackoffSec = proto.Float64(opt.MaxBackoff.Seconds())
  64. }
  65. if opt.MaxDoublings > 0 || (opt.MaxDoublings == 0 && opt.ApplyZeroMaxDoublings) {
  66. params.MaxDoublings = proto.Int32(opt.MaxDoublings)
  67. }
  68. return params
  69. }
  70. // A Task represents a task to be executed.
  71. type Task struct {
  72. // Path is the worker URL for the task.
  73. // If unset, it will default to /_ah/queue/<queue_name>.
  74. Path string
  75. // Payload is the data for the task.
  76. // This will be delivered as the HTTP request body.
  77. // It is only used when Method is POST, PUT or PULL.
  78. // url.Values' Encode method may be used to generate this for POST requests.
  79. Payload []byte
  80. // Additional HTTP headers to pass at the task's execution time.
  81. // To schedule the task to be run with an alternate app version
  82. // or backend, set the "Host" header.
  83. Header http.Header
  84. // Method is the HTTP method for the task ("GET", "POST", etc.),
  85. // or "PULL" if this is task is destined for a pull-based queue.
  86. // If empty, this defaults to "POST".
  87. Method string
  88. // A name for the task.
  89. // If empty, a name will be chosen.
  90. Name string
  91. // Delay specifies the duration the task queue service must wait
  92. // before executing the task.
  93. // Either Delay or ETA may be set, but not both.
  94. Delay time.Duration
  95. // ETA specifies the earliest time a task may be executed (push queues)
  96. // or leased (pull queues).
  97. // Either Delay or ETA may be set, but not both.
  98. ETA time.Time
  99. // The number of times the task has been dispatched or leased.
  100. RetryCount int32
  101. // Tag for the task. Only used when Method is PULL.
  102. Tag string
  103. // Retry options for this task. May be nil.
  104. RetryOptions *RetryOptions
  105. }
  106. func (t *Task) method() string {
  107. if t.Method == "" {
  108. return "POST"
  109. }
  110. return t.Method
  111. }
  112. // NewPOSTTask creates a Task that will POST to a path with the given form data.
  113. func NewPOSTTask(path string, params url.Values) *Task {
  114. h := make(http.Header)
  115. h.Set("Content-Type", "application/x-www-form-urlencoded")
  116. return &Task{
  117. Path: path,
  118. Payload: []byte(params.Encode()),
  119. Header: h,
  120. Method: "POST",
  121. }
  122. }
  123. var (
  124. currentNamespace = http.CanonicalHeaderKey("X-AppEngine-Current-Namespace")
  125. defaultNamespace = http.CanonicalHeaderKey("X-AppEngine-Default-Namespace")
  126. )
  127. func getDefaultNamespace(ctx context.Context) string {
  128. return internal.IncomingHeaders(ctx).Get(defaultNamespace)
  129. }
  130. func newAddReq(c context.Context, task *Task, queueName string) (*pb.TaskQueueAddRequest, error) {
  131. if queueName == "" {
  132. queueName = "default"
  133. }
  134. path := task.Path
  135. if path == "" {
  136. path = "/_ah/queue/" + queueName
  137. }
  138. eta := task.ETA
  139. if eta.IsZero() {
  140. eta = time.Now().Add(task.Delay)
  141. } else if task.Delay != 0 {
  142. panic("taskqueue: both Delay and ETA are set")
  143. }
  144. req := &pb.TaskQueueAddRequest{
  145. QueueName: []byte(queueName),
  146. TaskName: []byte(task.Name),
  147. EtaUsec: proto.Int64(eta.UnixNano() / 1e3),
  148. }
  149. method := task.method()
  150. if method == "PULL" {
  151. // Pull-based task
  152. req.Body = task.Payload
  153. req.Mode = pb.TaskQueueMode_PULL.Enum()
  154. if task.Tag != "" {
  155. req.Tag = []byte(task.Tag)
  156. }
  157. } else {
  158. // HTTP-based task
  159. if v, ok := pb.TaskQueueAddRequest_RequestMethod_value[method]; ok {
  160. req.Method = pb.TaskQueueAddRequest_RequestMethod(v).Enum()
  161. } else {
  162. return nil, fmt.Errorf("taskqueue: bad method %q", method)
  163. }
  164. req.Url = []byte(path)
  165. for k, vs := range task.Header {
  166. for _, v := range vs {
  167. req.Header = append(req.Header, &pb.TaskQueueAddRequest_Header{
  168. Key: []byte(k),
  169. Value: []byte(v),
  170. })
  171. }
  172. }
  173. if method == "POST" || method == "PUT" {
  174. req.Body = task.Payload
  175. }
  176. // Namespace headers.
  177. if _, ok := task.Header[currentNamespace]; !ok {
  178. // Fetch the current namespace of this request.
  179. ns := internal.NamespaceFromContext(c)
  180. req.Header = append(req.Header, &pb.TaskQueueAddRequest_Header{
  181. Key: []byte(currentNamespace),
  182. Value: []byte(ns),
  183. })
  184. }
  185. if _, ok := task.Header[defaultNamespace]; !ok {
  186. // Fetch the X-AppEngine-Default-Namespace header of this request.
  187. if ns := getDefaultNamespace(c); ns != "" {
  188. req.Header = append(req.Header, &pb.TaskQueueAddRequest_Header{
  189. Key: []byte(defaultNamespace),
  190. Value: []byte(ns),
  191. })
  192. }
  193. }
  194. }
  195. if task.RetryOptions != nil {
  196. req.RetryParameters = task.RetryOptions.toRetryParameters()
  197. }
  198. return req, nil
  199. }
  200. var alreadyAddedErrors = map[pb.TaskQueueServiceError_ErrorCode]bool{
  201. pb.TaskQueueServiceError_TASK_ALREADY_EXISTS: true,
  202. pb.TaskQueueServiceError_TOMBSTONED_TASK: true,
  203. }
  204. // Add adds the task to a named queue.
  205. // An empty queue name means that the default queue will be used.
  206. // Add returns an equivalent Task with defaults filled in, including setting
  207. // the task's Name field to the chosen name if the original was empty.
  208. func Add(c context.Context, task *Task, queueName string) (*Task, error) {
  209. req, err := newAddReq(c, task, queueName)
  210. if err != nil {
  211. return nil, err
  212. }
  213. res := &pb.TaskQueueAddResponse{}
  214. if err := internal.Call(c, "taskqueue", "Add", req, res); err != nil {
  215. apiErr, ok := err.(*internal.APIError)
  216. if ok && alreadyAddedErrors[pb.TaskQueueServiceError_ErrorCode(apiErr.Code)] {
  217. return nil, ErrTaskAlreadyAdded
  218. }
  219. return nil, err
  220. }
  221. resultTask := *task
  222. resultTask.Method = task.method()
  223. if task.Name == "" {
  224. resultTask.Name = string(res.ChosenTaskName)
  225. }
  226. return &resultTask, nil
  227. }
  228. // AddMulti adds multiple tasks to a named queue.
  229. // An empty queue name means that the default queue will be used.
  230. // AddMulti returns a slice of equivalent tasks with defaults filled in, including setting
  231. // each task's Name field to the chosen name if the original was empty.
  232. // If a given task is badly formed or could not be added, an appengine.MultiError is returned.
  233. func AddMulti(c context.Context, tasks []*Task, queueName string) ([]*Task, error) {
  234. req := &pb.TaskQueueBulkAddRequest{
  235. AddRequest: make([]*pb.TaskQueueAddRequest, len(tasks)),
  236. }
  237. me, any := make(appengine.MultiError, len(tasks)), false
  238. for i, t := range tasks {
  239. req.AddRequest[i], me[i] = newAddReq(c, t, queueName)
  240. any = any || me[i] != nil
  241. }
  242. if any {
  243. return nil, me
  244. }
  245. res := &pb.TaskQueueBulkAddResponse{}
  246. if err := internal.Call(c, "taskqueue", "BulkAdd", req, res); err != nil {
  247. return nil, err
  248. }
  249. if len(res.Taskresult) != len(tasks) {
  250. return nil, errors.New("taskqueue: server error")
  251. }
  252. tasksOut := make([]*Task, len(tasks))
  253. for i, tr := range res.Taskresult {
  254. tasksOut[i] = new(Task)
  255. *tasksOut[i] = *tasks[i]
  256. tasksOut[i].Method = tasksOut[i].method()
  257. if tasksOut[i].Name == "" {
  258. tasksOut[i].Name = string(tr.ChosenTaskName)
  259. }
  260. if *tr.Result != pb.TaskQueueServiceError_OK {
  261. if alreadyAddedErrors[*tr.Result] {
  262. me[i] = ErrTaskAlreadyAdded
  263. } else {
  264. me[i] = &internal.APIError{
  265. Service: "taskqueue",
  266. Code: int32(*tr.Result),
  267. }
  268. }
  269. any = true
  270. }
  271. }
  272. if any {
  273. return tasksOut, me
  274. }
  275. return tasksOut, nil
  276. }
  277. // Delete deletes a task from a named queue.
  278. func Delete(c context.Context, task *Task, queueName string) error {
  279. err := DeleteMulti(c, []*Task{task}, queueName)
  280. if me, ok := err.(appengine.MultiError); ok {
  281. return me[0]
  282. }
  283. return err
  284. }
  285. // DeleteMulti deletes multiple tasks from a named queue.
  286. // If a given task could not be deleted, an appengine.MultiError is returned.
  287. func DeleteMulti(c context.Context, tasks []*Task, queueName string) error {
  288. taskNames := make([][]byte, len(tasks))
  289. for i, t := range tasks {
  290. taskNames[i] = []byte(t.Name)
  291. }
  292. if queueName == "" {
  293. queueName = "default"
  294. }
  295. req := &pb.TaskQueueDeleteRequest{
  296. QueueName: []byte(queueName),
  297. TaskName: taskNames,
  298. }
  299. res := &pb.TaskQueueDeleteResponse{}
  300. if err := internal.Call(c, "taskqueue", "Delete", req, res); err != nil {
  301. return err
  302. }
  303. if a, b := len(req.TaskName), len(res.Result); a != b {
  304. return fmt.Errorf("taskqueue: internal error: requested deletion of %d tasks, got %d results", a, b)
  305. }
  306. me, any := make(appengine.MultiError, len(res.Result)), false
  307. for i, ec := range res.Result {
  308. if ec != pb.TaskQueueServiceError_OK {
  309. me[i] = &internal.APIError{
  310. Service: "taskqueue",
  311. Code: int32(ec),
  312. }
  313. any = true
  314. }
  315. }
  316. if any {
  317. return me
  318. }
  319. return nil
  320. }
  321. func lease(c context.Context, maxTasks int, queueName string, leaseTime int, groupByTag bool, tag []byte) ([]*Task, error) {
  322. if queueName == "" {
  323. queueName = "default"
  324. }
  325. req := &pb.TaskQueueQueryAndOwnTasksRequest{
  326. QueueName: []byte(queueName),
  327. LeaseSeconds: proto.Float64(float64(leaseTime)),
  328. MaxTasks: proto.Int64(int64(maxTasks)),
  329. GroupByTag: proto.Bool(groupByTag),
  330. Tag: tag,
  331. }
  332. res := &pb.TaskQueueQueryAndOwnTasksResponse{}
  333. if err := internal.Call(c, "taskqueue", "QueryAndOwnTasks", req, res); err != nil {
  334. return nil, err
  335. }
  336. tasks := make([]*Task, len(res.Task))
  337. for i, t := range res.Task {
  338. tasks[i] = &Task{
  339. Payload: t.Body,
  340. Name: string(t.TaskName),
  341. Method: "PULL",
  342. ETA: time.Unix(0, *t.EtaUsec*1e3),
  343. RetryCount: *t.RetryCount,
  344. Tag: string(t.Tag),
  345. }
  346. }
  347. return tasks, nil
  348. }
  349. // Lease leases tasks from a queue.
  350. // leaseTime is in seconds.
  351. // The number of tasks fetched will be at most maxTasks.
  352. func Lease(c context.Context, maxTasks int, queueName string, leaseTime int) ([]*Task, error) {
  353. return lease(c, maxTasks, queueName, leaseTime, false, nil)
  354. }
  355. // LeaseByTag leases tasks from a queue, grouped by tag.
  356. // If tag is empty, then the returned tasks are grouped by the tag of the task with earliest ETA.
  357. // leaseTime is in seconds.
  358. // The number of tasks fetched will be at most maxTasks.
  359. func LeaseByTag(c context.Context, maxTasks int, queueName string, leaseTime int, tag string) ([]*Task, error) {
  360. return lease(c, maxTasks, queueName, leaseTime, true, []byte(tag))
  361. }
  362. // Purge removes all tasks from a queue.
  363. func Purge(c context.Context, queueName string) error {
  364. if queueName == "" {
  365. queueName = "default"
  366. }
  367. req := &pb.TaskQueuePurgeQueueRequest{
  368. QueueName: []byte(queueName),
  369. }
  370. res := &pb.TaskQueuePurgeQueueResponse{}
  371. return internal.Call(c, "taskqueue", "PurgeQueue", req, res)
  372. }
  373. // ModifyLease modifies the lease of a task.
  374. // Used to request more processing time, or to abandon processing.
  375. // leaseTime is in seconds and must not be negative.
  376. func ModifyLease(c context.Context, task *Task, queueName string, leaseTime int) error {
  377. if queueName == "" {
  378. queueName = "default"
  379. }
  380. req := &pb.TaskQueueModifyTaskLeaseRequest{
  381. QueueName: []byte(queueName),
  382. TaskName: []byte(task.Name),
  383. EtaUsec: proto.Int64(task.ETA.UnixNano() / 1e3), // Used to verify ownership.
  384. LeaseSeconds: proto.Float64(float64(leaseTime)),
  385. }
  386. res := &pb.TaskQueueModifyTaskLeaseResponse{}
  387. if err := internal.Call(c, "taskqueue", "ModifyTaskLease", req, res); err != nil {
  388. return err
  389. }
  390. task.ETA = time.Unix(0, *res.UpdatedEtaUsec*1e3)
  391. return nil
  392. }
  393. // QueueStatistics represents statistics about a single task queue.
  394. type QueueStatistics struct {
  395. Tasks int // may be an approximation
  396. OldestETA time.Time // zero if there are no pending tasks
  397. Executed1Minute int // tasks executed in the last minute
  398. InFlight int // tasks executing now
  399. EnforcedRate float64 // requests per second
  400. }
  401. // QueueStats retrieves statistics about queues.
  402. func QueueStats(c context.Context, queueNames []string) ([]QueueStatistics, error) {
  403. req := &pb.TaskQueueFetchQueueStatsRequest{
  404. QueueName: make([][]byte, len(queueNames)),
  405. }
  406. for i, q := range queueNames {
  407. if q == "" {
  408. q = "default"
  409. }
  410. req.QueueName[i] = []byte(q)
  411. }
  412. res := &pb.TaskQueueFetchQueueStatsResponse{}
  413. if err := internal.Call(c, "taskqueue", "FetchQueueStats", req, res); err != nil {
  414. return nil, err
  415. }
  416. qs := make([]QueueStatistics, len(res.Queuestats))
  417. for i, qsg := range res.Queuestats {
  418. qs[i] = QueueStatistics{
  419. Tasks: int(*qsg.NumTasks),
  420. }
  421. if eta := *qsg.OldestEtaUsec; eta > -1 {
  422. qs[i].OldestETA = time.Unix(0, eta*1e3)
  423. }
  424. if si := qsg.ScannerInfo; si != nil {
  425. qs[i].Executed1Minute = int(*si.ExecutedLastMinute)
  426. qs[i].InFlight = int(si.GetRequestsInFlight())
  427. qs[i].EnforcedRate = si.GetEnforcedRate()
  428. }
  429. }
  430. return qs, nil
  431. }
  432. func setTransaction(x *pb.TaskQueueAddRequest, t *dspb.Transaction) {
  433. x.Transaction = t
  434. }
  435. func init() {
  436. internal.RegisterErrorCodeMap("taskqueue", pb.TaskQueueServiceError_ErrorCode_name)
  437. // Datastore error codes are shifted by DATASTORE_ERROR when presented through taskqueue.
  438. dsCode := int32(pb.TaskQueueServiceError_DATASTORE_ERROR) + int32(dspb.Error_TIMEOUT)
  439. internal.RegisterTimeoutErrorCode("taskqueue", dsCode)
  440. // Transaction registration.
  441. internal.RegisterTransactionSetter(setTransaction)
  442. internal.RegisterTransactionSetter(func(x *pb.TaskQueueBulkAddRequest, t *dspb.Transaction) {
  443. for _, req := range x.AddRequest {
  444. setTransaction(req, t)
  445. }
  446. })
  447. }