Execute the BigQuery Data Transfer Service with Workflows


The BigQuery Data Transfer Service automates data transfers into BigQuery. Running it from a workflow adds scheduling, dependency management, error handling, and monitoring to these data ingestion tasks.

Component

We are implementing a workflow that executes the BigQuery Data Transfer Service and then queries the ingested data. The workflow automates data transfers from various sources into BigQuery and runs follow-up queries on the loaded tables.

Sample Implementation

workflow.tf

For the workflow's service account to start manual transfer runs, the bigquery.transfers.update permission is required. Here we grant the roles/bigquery.admin role, which includes that permission, to the service account used by the workflow.

resource "google_service_account" "sample" {
account_id = "sample"
display_name = "sample"
}

resource "google_project_iam_member" "invoke_workflow" {
project = var.gcp_project_id
role = "roles/bigquery.admin"
member = "serviceAccount:${google_service_account.sample.email}"
}

resource "google_workflows_workflow" "sample" {
name = "samples"
region = var.gcp_region
description = "sample"
service_account = google_service_account.sample.id
source_contents = templatefile("${path.module}/workflow.yaml", {
current_time = timestamp(),
project_id = var.gcp_project_id,
})
}
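
roles/bigquery.admin is broader than strictly necessary here. If you prefer least privilege, a project-level custom role could carry only what the workflow uses; a minimal sketch, assuming the workflow only starts transfer runs and submits query jobs (read access to the queried tables still has to be granted separately, for example at the dataset level):

resource "google_project_iam_custom_role" "workflow_bq" {
  role_id = "workflowBigQueryRunner"
  title   = "Workflow BigQuery runner"
  permissions = [
    "bigquery.transfers.get",
    "bigquery.transfers.update", # required by startManualRuns
    "bigquery.jobs.create",      # required to submit query jobs
  ]
}

resource "google_project_iam_member" "workflow_bq" {
  project = var.gcp_project_id
  role    = google_project_iam_custom_role.workflow_bq.id
  member  = "serviceAccount:${google_service_account.sample.email}"
}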

workflow.yaml

main:
  steps:
    - start_run:
        call: googleapis.bigquerydatatransfer.v1.projects.locations.transferConfigs.startManualRuns
        args:
          parent: ${transfer_config_id}
          body:
            requestedRunTime: ${current_time}
        result: runsResp
    - start_run_query:
        call: googleapis.bigquery.v2.jobs.query
        args:
          projectId: ${project_id}
          body:
            useLegacySql: false
            useQueryCache: false
            timeoutMs: 3000
            query: |
              SELECT * FROM hogehoge WHERE hoge_id = 'hoghoge';
        result: queryResult
    - the_end:
        return: "SUCCESS"

How do we execute workflow steps in parallel?

To improve execution speed, we may want to run some steps of the workflow in parallel. Workflows provides a parallel step with branches for this, which can reduce the overall execution time when the branches are independent of each other.

main:
  steps:
    - the_start:
        parallel:
          branches:
            - start_run1:
                steps:
                  - runTransfer1:
                      call: googleapis.bigquerydatatransfer.v1.projects.locations.transferConfigs.startManualRuns
                      args:
                        parent: ${transfer_id1}
                        body:
                          requestedRunTime: ${current_time}
                      result: runsResp
                  - runQueryFor1:
                      call: googleapis.bigquery.v2.jobs.query
                      args:
                        projectId: ${project_id}
                        body:
                          useLegacySql: false
                          useQueryCache: false
                          timeoutMs: 3000
                          query: |
                            select * from hogehoge
                      result: queryResult
            - start_run2:
                steps:
                  - runTransfer2:
                      call: googleapis.bigquerydatatransfer.v1.projects.locations.transferConfigs.startManualRuns
                      args:
                        parent: ${transfer_id2}
                        body:
                          requestedRunTime: ${current_time}
                      result: runsResp
                  - runQuery2:
                      call: googleapis.bigquery.v2.jobs.query
                      args:
                        projectId: ${project_id}
                        body:
                          useLegacySql: false
                          useQueryCache: false
                          timeoutMs: 3000
                          query: |
                            select * from hugahga
                      result: queryResult
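
In this form, each branch's result variables are local to the branch, so they are no longer available once the parallel step completes. If the results are needed afterwards, the variables have to be created before the parallel step and listed in its shared field; a minimal sketch (the init step and variable names are our own, and the calls are abbreviated):

main:
  steps:
    - init:
        assign:
          - queryResult1: null
          - queryResult2: null
    - the_start:
        parallel:
          shared: [queryResult1, queryResult2]
          branches:
            - start_run1:
                steps:
                  - runQueryFor1:
                      call: googleapis.bigquery.v2.jobs.query
                      args:
                        projectId: ${project_id}
                        body:
                          useLegacySql: false
                          query: select * from hogehoge
                      result: queryResult1
            - start_run2:
                steps:
                  - runQuery2:
                      call: googleapis.bigquery.v2.jobs.query
                      args:
                        projectId: ${project_id}
                        body:
                          useLegacySql: false
                          query: select * from hugahga
                      result: queryResult2
    - the_end:
        return:
          first: ${queryResult1}
          second: ${queryResult2}

Note that if this file is deployed through templatefile like workflow.yaml above, runtime expressions such as ${queryResult1} have to be escaped as $${queryResult1} so that Terraform does not try to substitute them at deploy time.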

How do we schedule a workflow?

In this scenario, we use Cloud Scheduler to trigger the workflow. To allow Cloud Scheduler to invoke it, we need to grant the roles/workflows.invoker role to the service account used by Cloud Scheduler. Cloud Scheduler then calls the Workflow Executions API endpoint over HTTP to start an execution.
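
The invoker grant itself is not part of the sample above; a minimal sketch, assuming Cloud Scheduler reuses the same google_service_account.sample that the http_target below authenticates with:

resource "google_project_iam_member" "scheduler_invokes_workflow" {
  project = var.gcp_project_id
  role    = "roles/workflows.invoker"
  member  = "serviceAccount:${google_service_account.sample.email}"
}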

resource "google_cloud_scheduler_job" "workflow_invoker" {
paused = false
name = "workflow-invoker"
project = var.gcp_project_id
region = var.gcp_region
schedule = "0 1* * *"
time_zone = "Asia/Tokyo"
attempt_deadline = "180s"

retry_config {
max_doublings = 5
max_retry_duration = "0s"
max_backoff_duration = "3600s"
min_backoff_duration = "5s"
}

http_target {
http_method = "POST"
uri = "https://workflowexecutions.googleapis.com/v1/${google_workflows_workflow.sample.id}/executions"
oauth_token {
service_account_email = google_service_account.sample.email
}
}
}
