service.go

// Copyright 2015 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package bigquery

import (
	"errors"
	"fmt"
	"net/http"
	"sync"
	"time"

	"golang.org/x/net/context"
	bq "google.golang.org/api/bigquery/v2"
)

// service provides an internal abstraction to isolate the generated
// BigQuery API; most of this package uses this interface instead.
// The single implementation, *bigqueryService, contains all the knowledge
// of the generated BigQuery API.
type service interface {
	// Jobs
	insertJob(ctx context.Context, job *bq.Job, projectID string) (*Job, error)
	getJobType(ctx context.Context, projectID, jobID string) (jobType, error)
	jobStatus(ctx context.Context, projectID, jobID string) (*JobStatus, error)

	// Queries

	// readQuery reads data resulting from a query job. If the job is not
	// yet complete, an errIncompleteJob is returned. readQuery may be
	// called repeatedly to wait for results indefinitely.
	readQuery(ctx context.Context, conf *readQueryConf, pageToken string) (*readDataResult, error)

	readTabledata(ctx context.Context, conf *readTableConf, pageToken string) (*readDataResult, error)

	// Tables
	createTable(ctx context.Context, conf *createTableConf) error
	getTableMetadata(ctx context.Context, projectID, datasetID, tableID string) (*TableMetadata, error)
	deleteTable(ctx context.Context, projectID, datasetID, tableID string) error
	listTables(ctx context.Context, projectID, datasetID, pageToken string) ([]*Table, string, error)
	patchTable(ctx context.Context, projectID, datasetID, tableID string, conf *patchTableConf) (*TableMetadata, error)
}

type bigqueryService struct {
	s *bq.Service
}

func newBigqueryService(client *http.Client) (*bigqueryService, error) {
	s, err := bq.New(client)
	if err != nil {
		return nil, fmt.Errorf("constructing bigquery client: %v", err)
	}

	return &bigqueryService{s: s}, nil
}

// getPages calls the supplied getPage function repeatedly until there are no pages left to get.
// token is the token of the initial page to start from. Use an empty string to start from the beginning.
func getPages(token string, getPage func(token string) (nextToken string, err error)) error {
	for {
		var err error
		token, err = getPage(token)
		if err != nil {
			return err
		}
		if token == "" {
			return nil
		}
	}
}
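
// listAllTables is an illustrative sketch, not part of the original file: it shows one
// way a caller might drive getPages. It uses listTables (defined later in this file) to
// collect every table in a dataset; the helper name listAllTables is hypothetical.
func listAllTables(ctx context.Context, s service, projectID, datasetID string) ([]*Table, error) {
	var all []*Table
	err := getPages("", func(tok string) (string, error) {
		// Fetch one page of tables, starting from tok; an empty tok means the first page.
		tables, nextToken, err := s.listTables(ctx, projectID, datasetID, tok)
		if err != nil {
			return "", err
		}
		all = append(all, tables...)
		return nextToken, nil
	})
	return all, err
}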

func (s *bigqueryService) insertJob(ctx context.Context, job *bq.Job, projectID string) (*Job, error) {
	res, err := s.s.Jobs.Insert(projectID, job).Context(ctx).Do()
	if err != nil {
		return nil, err
	}
	return &Job{service: s, projectID: projectID, jobID: res.JobReference.JobId}, nil
}

type pagingConf struct {
	recordsPerRequest    int64
	setRecordsPerRequest bool

	startIndex uint64
}

type readTableConf struct {
	projectID, datasetID, tableID string
	paging                        pagingConf
	schema                        Schema // lazily initialized when the first page of data is fetched.
}

type readDataResult struct {
	pageToken string
	rows      [][]Value
	totalRows uint64
	schema    Schema
}

type readQueryConf struct {
	projectID, jobID string
	paging           pagingConf
}

func (s *bigqueryService) readTabledata(ctx context.Context, conf *readTableConf, pageToken string) (*readDataResult, error) {
	// Prepare request to fetch one page of table data.
	req := s.s.Tabledata.List(conf.projectID, conf.datasetID, conf.tableID)

	if pageToken != "" {
		req.PageToken(pageToken)
	} else {
		req.StartIndex(conf.paging.startIndex)
	}

	if conf.paging.setRecordsPerRequest {
		req.MaxResults(conf.paging.recordsPerRequest)
	}

	// Fetch the table schema in the background, if necessary.
	var schemaErr error
	var schemaFetch sync.WaitGroup
	if conf.schema == nil {
		schemaFetch.Add(1)
		go func() {
			defer schemaFetch.Done()
			var t *bq.Table
			t, schemaErr = s.s.Tables.Get(conf.projectID, conf.datasetID, conf.tableID).
				Fields("schema").
				Context(ctx).
				Do()
			if schemaErr == nil && t.Schema != nil {
				conf.schema = convertTableSchema(t.Schema)
			}
		}()
	}

	res, err := req.Context(ctx).Do()
	if err != nil {
		return nil, err
	}

	schemaFetch.Wait()
	if schemaErr != nil {
		return nil, schemaErr
	}

	result := &readDataResult{
		pageToken: res.PageToken,
		totalRows: uint64(res.TotalRows),
		schema:    conf.schema,
	}
	result.rows, err = convertRows(res.Rows, conf.schema)
	if err != nil {
		return nil, err
	}
	return result, nil
}
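
// readAllRows is an illustrative sketch, not part of the original file: it pages through
// an entire table with readTabledata by feeding each result's pageToken back in until it
// is empty. The helper name readAllRows is hypothetical.
func readAllRows(ctx context.Context, s service, conf *readTableConf) ([][]Value, error) {
	var rows [][]Value
	err := getPages("", func(tok string) (string, error) {
		res, err := s.readTabledata(ctx, conf, tok)
		if err != nil {
			return "", err
		}
		rows = append(rows, res.rows...)
		// An empty pageToken terminates the getPages loop.
		return res.pageToken, nil
	})
	return rows, err
}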

var errIncompleteJob = errors.New("internal error: query results not available because job is not complete")

// getQueryResultsTimeout controls the maximum duration of a request to the
// BigQuery GetQueryResults endpoint. Setting a long timeout here does not
// cause increased overall latency, as results are returned as soon as they are
// available.
const getQueryResultsTimeout = time.Minute

func (s *bigqueryService) readQuery(ctx context.Context, conf *readQueryConf, pageToken string) (*readDataResult, error) {
	req := s.s.Jobs.GetQueryResults(conf.projectID, conf.jobID).
		TimeoutMs(getQueryResultsTimeout.Nanoseconds() / 1e6)

	if pageToken != "" {
		req.PageToken(pageToken)
	} else {
		req.StartIndex(conf.paging.startIndex)
	}

	if conf.paging.setRecordsPerRequest {
		req.MaxResults(conf.paging.recordsPerRequest)
	}

	res, err := req.Context(ctx).Do()
	if err != nil {
		return nil, err
	}

	if !res.JobComplete {
		return nil, errIncompleteJob
	}

	schema := convertTableSchema(res.Schema)
	result := &readDataResult{
		pageToken: res.PageToken,
		totalRows: res.TotalRows,
		schema:    schema,
	}
	result.rows, err = convertRows(res.Rows, schema)
	if err != nil {
		return nil, err
	}
	return result, nil
}
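
// firstQueryPage is an illustrative sketch, not part of the original file: it shows how a
// caller can retry readQuery while the query job is still running. Each server round trip
// already blocks for up to getQueryResultsTimeout, so the loop needs no extra sleep; the
// helper name firstQueryPage is hypothetical.
func firstQueryPage(ctx context.Context, s service, conf *readQueryConf) (*readDataResult, error) {
	for {
		res, err := s.readQuery(ctx, conf, "")
		if err != errIncompleteJob {
			return res, err
		}
		// The job is not complete yet; stop if the caller's context has been cancelled.
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		default:
		}
	}
}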

type jobType int

const (
	copyJobType jobType = iota
	extractJobType
	loadJobType
	queryJobType
)

func (s *bigqueryService) getJobType(ctx context.Context, projectID, jobID string) (jobType, error) {
	res, err := s.s.Jobs.Get(projectID, jobID).
		Fields("configuration").
		Context(ctx).
		Do()
	if err != nil {
		return 0, err
	}

	switch {
	case res.Configuration.Copy != nil:
		return copyJobType, nil
	case res.Configuration.Extract != nil:
		return extractJobType, nil
	case res.Configuration.Load != nil:
		return loadJobType, nil
	case res.Configuration.Query != nil:
		return queryJobType, nil
	default:
		return 0, errors.New("unknown job type")
	}
}

func (s *bigqueryService) jobStatus(ctx context.Context, projectID, jobID string) (*JobStatus, error) {
	res, err := s.s.Jobs.Get(projectID, jobID).
		Fields("status"). // Only fetch what we need.
		Context(ctx).
		Do()
	if err != nil {
		return nil, err
	}

	return jobStatusFromProto(res.Status)
}
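
// waitForJob is an illustrative sketch, not part of the original file: it polls jobStatus
// until the job reaches the Done state. The one-second poll interval and the helper name
// waitForJob are hypothetical choices, not part of this package's API.
func waitForJob(ctx context.Context, s service, projectID, jobID string) (*JobStatus, error) {
	for {
		js, err := s.jobStatus(ctx, projectID, jobID)
		if err != nil {
			return nil, err
		}
		if js.State == Done {
			return js, nil
		}
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case <-time.After(time.Second):
			// Poll again.
		}
	}
}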

var stateMap = map[string]State{"PENDING": Pending, "RUNNING": Running, "DONE": Done}

func jobStatusFromProto(status *bq.JobStatus) (*JobStatus, error) {
	state, ok := stateMap[status.State]
	if !ok {
		return nil, fmt.Errorf("unexpected job state: %v", status.State)
	}

	newStatus := &JobStatus{
		State: state,
		err:   nil,
	}
	if err := errorFromErrorProto(status.ErrorResult); state == Done && err != nil {
		newStatus.err = err
	}

	for _, ep := range status.Errors {
		newStatus.Errors = append(newStatus.Errors, errorFromErrorProto(ep))
	}
	return newStatus, nil
}

// listTables returns a subset of tables that belong to a dataset, and a token for fetching the next subset.
func (s *bigqueryService) listTables(ctx context.Context, projectID, datasetID, pageToken string) ([]*Table, string, error) {
	var tables []*Table
	res, err := s.s.Tables.List(projectID, datasetID).
		PageToken(pageToken).
		Context(ctx).
		Do()
	if err != nil {
		return nil, "", err
	}
	for _, t := range res.Tables {
		tables = append(tables, convertListedTable(t))
	}
	return tables, res.NextPageToken, nil
}

type createTableConf struct {
	projectID, datasetID, tableID string
	expiration                    time.Time
	viewQuery                     string
}

// createTable creates a table in the BigQuery service.
// expiration is an optional time after which the table will be deleted and its storage reclaimed.
// If viewQuery is non-empty, the created table will be of type VIEW.
// Note: expiration can only be set during table creation.
// Note: after creation, a table's view can be modified only if the table was originally created as a view.
func (s *bigqueryService) createTable(ctx context.Context, conf *createTableConf) error {
	table := &bq.Table{
		TableReference: &bq.TableReference{
			ProjectId: conf.projectID,
			DatasetId: conf.datasetID,
			TableId:   conf.tableID,
		},
	}
	if !conf.expiration.IsZero() {
		// bq.Table.ExpirationTime is expressed in milliseconds since the epoch,
		// matching the conversion in bqTableToMetadata below.
		table.ExpirationTime = conf.expiration.UnixNano() / 1e6
	}
	if conf.viewQuery != "" {
		table.View = &bq.ViewDefinition{
			Query: conf.viewQuery,
		}
	}
	_, err := s.s.Tables.Insert(conf.projectID, conf.datasetID, table).Context(ctx).Do()
	return err
}
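
// createViewExample is an illustrative sketch, not part of the original file: it creates a
// view that expires in 24 hours via createTable. The project, dataset, and table IDs and
// the query are hypothetical.
func createViewExample(ctx context.Context, s service) error {
	return s.createTable(ctx, &createTableConf{
		projectID:  "my-project",
		datasetID:  "my_dataset",
		tableID:    "recent_events_view",
		expiration: time.Now().Add(24 * time.Hour),
		viewQuery:  "SELECT name, timestamp FROM [my-project:my_dataset.events]",
	})
}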

func (s *bigqueryService) getTableMetadata(ctx context.Context, projectID, datasetID, tableID string) (*TableMetadata, error) {
	table, err := s.s.Tables.Get(projectID, datasetID, tableID).Context(ctx).Do()
	if err != nil {
		return nil, err
	}
	return bqTableToMetadata(table), nil
}

func (s *bigqueryService) deleteTable(ctx context.Context, projectID, datasetID, tableID string) error {
	return s.s.Tables.Delete(projectID, datasetID, tableID).Context(ctx).Do()
}

func bqTableToMetadata(t *bq.Table) *TableMetadata {
	md := &TableMetadata{
		Description: t.Description,
		Name:        t.FriendlyName,
		Type:        TableType(t.Type),
		ID:          t.Id,
		NumBytes:    t.NumBytes,
		NumRows:     t.NumRows,
	}
	// The API reports these timestamps in milliseconds since the epoch;
	// multiply by 1e6 to get the nanoseconds that time.Unix expects.
	if t.ExpirationTime != 0 {
		md.ExpirationTime = time.Unix(0, t.ExpirationTime*1e6)
	}
	if t.CreationTime != 0 {
		md.CreationTime = time.Unix(0, t.CreationTime*1e6)
	}
	if t.LastModifiedTime != 0 {
		md.LastModifiedTime = time.Unix(0, int64(t.LastModifiedTime*1e6))
	}
	if t.Schema != nil {
		md.Schema = convertTableSchema(t.Schema)
	}
	if t.View != nil {
		md.View = t.View.Query
	}
	return md
}

func convertListedTable(t *bq.TableListTables) *Table {
	return &Table{
		ProjectID: t.TableReference.ProjectId,
		DatasetID: t.TableReference.DatasetId,
		TableID:   t.TableReference.TableId,
	}
}

// patchTableConf contains fields to be patched.
type patchTableConf struct {
	// These fields are omitted from the patch operation if nil.
	Description *string
	Name        *string
}

func (s *bigqueryService) patchTable(ctx context.Context, projectID, datasetID, tableID string, conf *patchTableConf) (*TableMetadata, error) {
	t := &bq.Table{}
	forceSend := func(field string) {
		t.ForceSendFields = append(t.ForceSendFields, field)
	}

	if conf.Description != nil {
		t.Description = *conf.Description
		forceSend("Description")
	}
	if conf.Name != nil {
		t.FriendlyName = *conf.Name
		forceSend("FriendlyName")
	}
	table, err := s.s.Tables.Patch(projectID, datasetID, tableID, t).
		Context(ctx).
		Do()
	if err != nil {
		return nil, err
	}
	return bqTableToMetadata(table), nil
}
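
// patchDescriptionExample is an illustrative sketch, not part of the original file: it
// uses patchTable to update only a table's description. Name is left nil, so FriendlyName
// is not included in the patch. The project, dataset, and table IDs are hypothetical.
func patchDescriptionExample(ctx context.Context, s service) (*TableMetadata, error) {
	desc := "nightly snapshot of the events table"
	return s.patchTable(ctx, "my-project", "my_dataset", "events_snapshot", &patchTableConf{
		Description: &desc,
	})
}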