|
- // Copyright 2013 The Go Authors. All rights reserved.
- // Use of this source code is governed by a BSD-style
- // license that can be found in the LICENSE file.
- package main
- import (
- "container/list"
- "encoding/json"
- "fmt"
- "io/ioutil"
- "log"
- "math"
- "math/rand"
- "net/http"
- "os"
- "strconv"
- "strings"
- "time"
- bigquery "google.golang.org/api/bigquery/v2"
- storage "google.golang.org/api/storage/v1"
- )
- const (
- GB = 1 << 30
- MaxBackoff = 30000
- BaseBackoff = 250
- BackoffGrowthFactor = 1.8
- BackoffGrowthDamper = 0.25
- JobStatusDone = "DONE"
- DatasetAlreadyExists = "Already Exists: Dataset"
- TableWriteEmptyDisposition = "WRITE_EMPTY"
- )
- func init() {
- scope := fmt.Sprintf("%s %s %s", bigquery.BigqueryScope,
- storage.DevstorageReadOnlyScope,
- "https://www.googleapis.com/auth/userinfo.profile")
- registerDemo("bigquery", scope, bqMain)
- }
- // This example demonstrates loading objects from Google Cloud Storage into
- // BigQuery. Objects are specified by their bucket and a name prefix. Each
- // object will be loaded into a new table identified by the object name minus
- // any file extension. All tables are added to the specified dataset (one will
- // be created if necessary). Currently, tables will not be overwritten and an
- // attempt to load an object into a dataset that already contains its table
- // will emit an error message indicating the table already exists.
- // A schema file must be provided and it will be applied to every object/table.
- // Example usage:
- // go-api-demo -clientid="my-clientid" -secret="my-secret" bq myProject
- // myDataBucket datafile2013070 DataFiles2013
- // ./datafile_schema.json 100
- //
- // This will load all objects (e.g. all data files from July 2013) from
- // gs://myDataBucket into a (possibly new) BigQuery dataset named DataFiles2013
- // using the schema file provided and allowing up to 100 bad records. Assuming
- // each object is named like datafileYYYYMMDD.csv.gz and all of July's files are
- // stored in the bucket, 9 tables will be created named like datafile201307DD
- // where DD ranges from 01 to 09, inclusive.
- // When the program completes, it will emit a results line similar to:
- //
- // 9 files loaded in 3m58s (18m2.708s). Size: 7.18GB Rows: 7130725
- //
- // The total elapsed time from the start of first job to the end of the last job
- // (effectively wall clock time) is shown. In parenthesis is the aggregate time
- // taken to load all tables.
- func bqMain(client *http.Client, argv []string) {
- if len(argv) != 6 {
- fmt.Fprintln(os.Stderr,
- "Usage: bq project_id bucket prefix dataset schema max_bad_records")
- return
- }
- var (
- project = argv[0]
- bucket = argv[1]
- objPrefix = argv[2]
- datasetId = argv[3]
- schemaFile = argv[4]
- )
- badRecords, err := strconv.ParseInt(argv[5], 10, 64)
- if err != nil {
- fmt.Fprintln(os.Stderr, err)
- return
- }
- rand.Seed(time.Now().UnixNano())
- service, err := storage.New(client)
- if err != nil {
- log.Fatalf("Unable to create Storage service: %v", err)
- }
- // Get the list of objects in the bucket matching the specified prefix.
- list := service.Objects.List(bucket)
- list.Prefix(objPrefix)
- objects, err := list.Do()
- if err != nil {
- fmt.Fprintln(os.Stderr, err)
- return
- }
- // Create the wrapper and insert the (new) dataset.
- dataset, err := newBQDataset(client, project, datasetId)
- if err != nil {
- fmt.Fprintln(os.Stderr, err)
- return
- }
- if err = dataset.insert(true); err != nil {
- fmt.Fprintln(os.Stderr, err)
- return
- }
- objectSource := &tableSource{
- maxBadRecords: badRecords,
- disposition: TableWriteEmptyDisposition,
- }
- // Load the schema from disk.
- f, err := ioutil.ReadFile(schemaFile)
- if err != nil {
- fmt.Fprintln(os.Stderr, err)
- return
- }
- if err = json.Unmarshal(f, &objectSource.schema); err != nil {
- fmt.Fprintln(os.Stderr, err)
- return
- }
- // Assumes all objects have .csv, .csv.gz (or no) extension.
- tableIdFromObject := func(name string) string {
- return strings.TrimSuffix(strings.TrimSuffix(name, ".gz"), ".csv")
- }
- // A jobset is way to group a collection of jobs together for monitoring.
- // For this example, we just use the name of the bucket and object prefix.
- jobset := fmt.Sprintf("%s:%s", bucket, objPrefix)
- fmt.Fprintf(os.Stderr, "\nLoading %d objects.\n", len(objects.Items))
- // Load each object into a dataset of the same name (minus any extension).
- // A successful insert call will inject the job into our queue for monitoring.
- for _, o := range objects.Items {
- objectSource.id = tableIdFromObject(o.Name)
- objectSource.uri = fmt.Sprintf("gs://%s/%s", o.Bucket, o.Name)
- if err = dataset.load(jobset, objectSource); err != nil {
- fmt.Fprintln(os.Stderr, err)
- }
- }
- dataset.monitor(jobset)
- }
- // Wraps the BigQuery service and dataset and provides some helper functions.
- type bqDataset struct {
- project string
- id string
- bq *bigquery.Service
- dataset *bigquery.Dataset
- jobsets map[string]*list.List
- }
- func newBQDataset(client *http.Client, dsProj string, dsId string) (*bqDataset,
- error) {
- service, err := bigquery.New(client)
- if err != nil {
- log.Fatalf("Unable to create BigQuery service: %v", err)
- }
- return &bqDataset{
- project: dsProj,
- id: dsId,
- bq: service,
- dataset: &bigquery.Dataset{
- DatasetReference: &bigquery.DatasetReference{
- DatasetId: dsId,
- ProjectId: dsProj,
- },
- },
- jobsets: make(map[string]*list.List),
- }, nil
- }
- func (ds *bqDataset) insert(existsOK bool) error {
- call := ds.bq.Datasets.Insert(ds.project, ds.dataset)
- _, err := call.Do()
- if err != nil && (!existsOK || !strings.Contains(err.Error(),
- DatasetAlreadyExists)) {
- return err
- }
- return nil
- }
- type tableSource struct {
- id string
- uri string
- schema bigquery.TableSchema
- maxBadRecords int64
- disposition string
- }
- func (ds *bqDataset) load(jobset string, source *tableSource) error {
- job := &bigquery.Job{
- Configuration: &bigquery.JobConfiguration{
- Load: &bigquery.JobConfigurationLoad{
- DestinationTable: &bigquery.TableReference{
- DatasetId: ds.dataset.DatasetReference.DatasetId,
- ProjectId: ds.project,
- TableId: source.id,
- },
- MaxBadRecords: source.maxBadRecords,
- Schema: &source.schema,
- SourceUris: []string{source.uri},
- WriteDisposition: source.disposition,
- },
- },
- }
- call := ds.bq.Jobs.Insert(ds.project, job)
- job, err := call.Do()
- if err != nil {
- return err
- }
- _, ok := ds.jobsets[jobset]
- if !ok {
- ds.jobsets[jobset] = list.New()
- }
- ds.jobsets[jobset].PushBack(job)
- return nil
- }
- func (ds *bqDataset) getJob(id string) (*bigquery.Job, error) {
- return ds.bq.Jobs.Get(ds.project, id).Do()
- }
- func (ds *bqDataset) monitor(jobset string) {
- jobq, ok := ds.jobsets[jobset]
- if !ok {
- return
- }
- var backoff float64 = BaseBackoff
- pause := func(grow bool) {
- if grow {
- backoff *= BackoffGrowthFactor
- backoff -= (backoff * rand.Float64() * BackoffGrowthDamper)
- backoff = math.Min(backoff, MaxBackoff)
- fmt.Fprintf(os.Stderr, "[%s] Checking remaining %d jobs...\n", jobset,
- 1+jobq.Len())
- }
- time.Sleep(time.Duration(backoff) * time.Millisecond)
- }
- var stats jobStats
- // Track a 'head' pending job in queue for detecting cycling.
- head := ""
- // Loop until all jobs are done - with either success or error.
- for jobq.Len() > 0 {
- jel := jobq.Front()
- job := jel.Value.(*bigquery.Job)
- jobq.Remove(jel)
- jid := job.JobReference.JobId
- loop := false
- // Check and possibly pick a new head job id.
- if len(head) == 0 {
- head = jid
- } else {
- if jid == head {
- loop = true
- }
- }
- // Retrieve the job's current status.
- pause(loop)
- j, err := ds.getJob(jid)
- if err != nil {
- fmt.Fprintln(os.Stderr, err)
- // In this case of a transient API error, we want keep the job.
- if j == nil {
- jobq.PushBack(job)
- } else {
- // Must reset head tracker if job is discarded.
- if loop {
- head = ""
- backoff = BaseBackoff
- }
- }
- continue
- }
- // Reassign with the updated job data (from Get).
- // We don't use j here as Get might return nil for this value.
- job = j
- if job.Status.State != JobStatusDone {
- jobq.PushBack(job)
- continue
- }
- if res := job.Status.ErrorResult; res != nil {
- fmt.Fprintln(os.Stderr, res.Message)
- } else {
- stat := job.Statistics
- lstat := stat.Load
- stats.files += 1
- stats.bytesIn += lstat.InputFileBytes
- stats.bytesOut += lstat.OutputBytes
- stats.rows += lstat.OutputRows
- stats.elapsed +=
- time.Duration(stat.EndTime-stat.StartTime) * time.Millisecond
- if stats.start.IsZero() {
- stats.start = time.Unix(stat.StartTime/1000, 0)
- } else {
- t := time.Unix(stat.StartTime/1000, 0)
- if stats.start.Sub(t) > 0 {
- stats.start = t
- }
- }
- if stats.finish.IsZero() {
- stats.finish = time.Unix(stat.EndTime/1000, 0)
- } else {
- t := time.Unix(stat.EndTime/1000, 0)
- if t.Sub(stats.finish) > 0 {
- stats.finish = t
- }
- }
- }
- // When the head job is processed reset the backoff since the loads
- // run in BQ in parallel.
- if loop {
- head = ""
- backoff = BaseBackoff
- }
- }
- fmt.Fprintf(os.Stderr, "%#v\n", stats)
- }
- type jobStats struct {
- // Number of files (sources) loaded.
- files int64
- // Bytes read from source (possibly compressed).
- bytesIn int64
- // Bytes loaded into BigQuery (uncompressed).
- bytesOut int64
- // Rows loaded into BigQuery.
- rows int64
- // Time taken to load source into table.
- elapsed time.Duration
- // Start time of the job.
- start time.Time
- // End time of the job.
- finish time.Time
- }
- func (s jobStats) GoString() string {
- return fmt.Sprintf("\n%d files loaded in %v (%v). Size: %.2fGB Rows: %d\n",
- s.files, s.finish.Sub(s.start), s.elapsed, float64(s.bytesOut)/GB,
- s.rows)
- }
|