Using dask to read data from Hive(使用DASK从配置单元读取数据)
本文介绍了使用DASK从配置单元读取数据的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我正在使用impala.util
中的as_pandas
实用程序读取从配置单元获取的dataframe
表单中的数据。然而,使用 pandas ,我想我将无法处理大量数据,而且速度也会更慢。我一直在阅读有关DASK的文章,它为读取大型数据文件提供了出色的功能。如何使用它高效地从配置单元获取数据。
def as_dask(cursor):
"""Return a DataFrame out of an impyla cursor.
This will pull the entire result set into memory. For richer pandas-
like functionality on distributed data sets, see the Ibis project.
Parameters
----------
cursor : `HiveServer2Cursor`
The cursor object that has a result set waiting to be fetched.
Returns
-------
DataFrame
"""
import pandas as pd
import dask
import dask.dataframe as dd
names = [metadata[0] for metadata in cursor.description]
dfs = dask.delayed(pd.DataFrame.from_records)(cursor.fetchall(),
columns=names)
return dd.from_delayed(dfs).compute()
推荐答案
当前没有直接的方法可以做到这一点。您最好看看dask.dataframe.read_sql_table的实现和intake-sql中的类似代码--您可能需要一种对数据进行分区的方法,并让每个工作者通过调用delayed()
来获取一个分区。然后可以使用dd.from_delayed
和dd.concat
将这些碎片缝合在一起。
-编辑-
您的函数将延迟的想法放在前面。您正在延迟并立即在操作单个游标的函数中实体化数据--它不能并行化,如果数据量很大(这就是您尝试这样做的原因),它将耗尽您的内存。
假设您可以组成一个由10个查询组成的集合,其中每个查询获取数据的不同部分;不要不使用偏移量,而是对由配置单元索引的某些列使用条件。 您需要执行以下操作:
queries = [SQL_STATEMENT.format(i) for i in range(10)]
def query_to_df(query):
cursor = impyla.execute(query)
return pd.DataFrame.from_records(cursor.fetchall())
现在,您有了一个返回分区且不依赖全局对象的函数-它只接受字符串作为输入。
parts = [dask.delayed(query_to_df)(q) for q in queries]
df = dd.from_delayed(parts)
这篇关于使用DASK从配置单元读取数据的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!
沃梦达教程
本文标题为:使用DASK从配置单元读取数据
猜你喜欢
- CTR 中的 AES 如何用于 Python 和 PyCrypto? 2022-01-01
- 使用 Cython 将 Python 链接到共享库 2022-01-01
- 我如何卸载 PyTorch? 2022-01-01
- YouTube API v3 返回截断的观看记录 2022-01-01
- 计算测试数量的Python单元测试 2022-01-01
- 检查具有纬度和经度的地理点是否在 shapefile 中 2022-01-01
- 如何使用PYSPARK从Spark获得批次行 2022-01-01
- 我如何透明地重定向一个Python导入? 2022-01-01
- ";find_element_by_name(';name';)";和&QOOT;FIND_ELEMENT(BY NAME,';NAME';)";之间有什么区别? 2022-01-01
- 使用公司代理使Python3.x Slack(松弛客户端) 2022-01-01