How to run a BigQuery query and then send the output CSV to Google Cloud Storage in Apache Airflow?
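Question
I need to run a BigQuery script in Python whose output has to land in Google Cloud Storage as a CSV. At the moment, my script triggers the BigQuery query and saves the result directly to my PC.
However, I need it to run in Airflow, so I can't have any local dependencies.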
My current script saves the output to my local machine, and then I have to move it to GCS. I've searched online but can't figure it out. (PS: I'm still a beginner, so apologies if this has been asked before!)
```python
import datetime

import pandas as pd
from googleapiclient import discovery
from oauth2client.client import GoogleCredentials


def run_script():
    # Run the query and write the result to a local CSV file.
    df = pd.read_gbq('SELECT * FROM `table/view` LIMIT 15000',
                     project_id='PROJECT',
                     dialect='standard'
                     )
    df.to_csv('XXX.csv', index=False)


def copy_to_gcs(filename, bucket, destination_filename):
    # Upload the local file to the given GCS bucket.
    credentials = GoogleCredentials.get_application_default()
    service = discovery.build('storage', 'v1', credentials=credentials)
    body = {'name': destination_filename}
    req = service.objects().insert(bucket=bucket, body=body, media_body=filename)
    resp = req.execute()


current_date = datetime.date.today()
filename = (r"C:UsersLOCALDRIVEETCETCETC.csv")
bucket = 'My GCS BUCKET'
str_prefix_datetime = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
destfile = 'XXX' + str_prefix_datetime + '.csv'
print('')
```
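Recommended answer
Airflow provides several operators for working with BigQuery.
- BigQueryOperator executes a query on BigQuery.
- BigQueryToCloudStorageOperator exports a BigQuery table to GCS.
You can see an example that runs a query and then exports the results to a CSV in the Cloud Composer code samples.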
```python
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Airflow 1.x contrib import paths, assumed from the module names used below.
from airflow.contrib.operators import bigquery_operator
from airflow.contrib.operators import bigquery_to_gcs

# Excerpt: max_query_date, min_query_date, bq_recent_questions_table_id and
# output_file are defined elsewhere, and both tasks belong inside a DAG.

# Query recent StackOverflow questions.
bq_recent_questions_query = bigquery_operator.BigQueryOperator(
    task_id='bq_recent_questions_query',
    sql="""
    SELECT owner_display_name, title, view_count
    FROM `bigquery-public-data.stackoverflow.posts_questions`
    WHERE creation_date < CAST('{max_date}' AS TIMESTAMP)
        AND creation_date >= CAST('{min_date}' AS TIMESTAMP)
    ORDER BY view_count DESC
    LIMIT 100
    """.format(max_date=max_query_date, min_date=min_query_date),
    use_legacy_sql=False,
    destination_dataset_table=bq_recent_questions_table_id)

# Export query result to Cloud Storage.
export_questions_to_gcs = bigquery_to_gcs.BigQueryToCloudStorageOperator(
    task_id='export_recent_questions_to_gcs',
    source_project_dataset_table=bq_recent_questions_table_id,
    destination_cloud_storage_uris=[output_file],
    export_format='CSV')
```
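The excerpt above refers to `max_query_date`, `min_query_date`, `bq_recent_questions_table_id` and `output_file` without showing where they come from. As a rough sketch only, they could be built with the standard library as below. The bucket name, dataset name, 30-day window and the `build_export_settings` helper are all hypothetical placeholders, not values from the Cloud Composer sample.

```python
from datetime import date, timedelta

# Hypothetical placeholders; replace with your own bucket and dataset.
GCS_BUCKET = "my-example-bucket"
BQ_DATASET = "my_example_dataset"


def build_export_settings(run_date, window_days=30):
    """Build the values the two operators above expect for one run date."""
    stamp = run_date.strftime("%Y%m%d")
    return {
        # ISO dates, suitable for CAST('...' AS TIMESTAMP) in standard SQL.
        "min_query_date": (run_date - timedelta(days=window_days)).isoformat(),
        "max_query_date": run_date.isoformat(),
        # "dataset.table" form for destination_dataset_table.
        "bq_recent_questions_table_id": f"{BQ_DATASET}.recent_questions_{stamp}",
        # Cloud Storage URI that will receive the exported CSV.
        "output_file": f"gs://{GCS_BUCKET}/recent_questions_{stamp}.csv",
    }


settings = build_export_settings(date(2018, 2, 1))
print(settings["output_file"])
# prints gs://my-example-bucket/recent_questions_20180201.csv
```

Inside a DAG, the export task also has to run after the query task, which is usually expressed as `bq_recent_questions_query >> export_questions_to_gcs`. Because the query writes to a BigQuery table and the export reads from that table, nothing is stored on the local machine.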