公众号
关注微信公众号
移动端
创头条企服版APP

使用Apache Arrow助力PySpark数据处理

5969
搜狐财经 2019-10-06 03:57 抢发第一评

原标题:使用Apache Arrow助力PySpark数据处理

作者:江宇,阿里云EMR技术专家。从事Hadoop内核开发,目前专注于机器学习、深度学习大数据平台的建设。

Apache Arrow从Spark 2.3版本开始被引入,通过列式存储,zero copy等技术,JVM 与Python 之间的数据传输效率得到了大量的提升。本文主要介绍一下Apache Arrow以及Spark中的使用方法。

列式存储简介

在介绍Spark中使用Apache Arrow之前,先简单的介绍一下Apache Arrow以及他背后的一些技术背景。

在大数据时代之前,大部分的存储引擎使用的是按行存储的形式,很多早期的系统,如交易系统、ERP系统等每次处理的是增、删、改、查某一个实体的所有信息,按行存储的话能够快速的定位到单个实体并进行处理。如果使用列存储,对某一个实体的不同属性的操作就需要进行多次随机读写,效率将会是非常差的。

随着大数据时代的到来,尤其是数据分析的不断发展,任务不需要一次读取实体的所有属性,而只关心特定的某些属性,并对这些属性进行aggregate等复杂的操作等。这种情况下行存储将需要读取额外的数据,形成瓶颈。而选择列存储将会减少额外数据的读取,对相同属性的数据还可以进行压缩,大大的加快了处理速度。

以下是行存储和列存储的对比说明,摘自Apache Arrow 官网,上面是一个二维表,由三个属性组成,分别是session_id, timestamp和source_ip。左侧为行存储在内存中的表示,数据按行依次存储,每一行按照列的顺序存储。右侧为列存储在内存中的表示,每一列单独存放,根据batch size等属性来控制一次写入的列簇大小。这样当查询语句只涉及少数列的时候,比如图中SQL查询,只需要过滤session_id列,避免读取所有数据列,减少了大量的I/O损耗,同时考虑到CPU pipeline以及使用CPU SIMD技术等等,将大大的提升查询速度。

Apache Arrow

在大数据领域,列式存储的灵感来自Google于2010年发表的Dremel论文(

http://static.googleusercontent.com/media/research.google.com/zh-CN//pubs/archive/36632.pdf),文中介绍了一种支持嵌套结构的存储格式,并且使用了列式存储的方式提升查询性能,在Dremel论文中还介绍了Google如何使用这种存储格式实现并行查询的。这篇论文影响了Hadoop生态系统发展,之后的Apache Parquet和Apache ORC作为列式存储格式已经被广大的Hadoop生态系统使用,如Spark、Hive、Impala等等。

Apache Arrow在官网上是这样定义的,Apache Arrow是一个跨语言、跨平台的内存数据结构。从这个定义中我们可以看到Apache Arrow与Apache Parquet以及Apache ORC的区别。Parquet与ORC设计的目的针对磁盘数据,在列存储的基础上使用了高效率的压缩算法进行压缩,比如使用snappy、gzip和zlib等算法对列数据进行压缩。所以大部分情况下在数据读取的时候需要首先对数据进行反压缩,并有一定的cpu使用损耗。而Arrow,作为在内存中的数据,并不支持压缩(当然写入磁盘是支持压缩的),Arrow使用dictionary-encoded来进行类似索引的操作。

除了列存储外,Arrow在数据在跨语言的数据传输上具有相当大的威力,Arrow的跨语言特性表示在Arrow的规范中,作者指定了不同数据类型的layout,包括不同原始数据类型在内存中占的比特数,Array数据的组成以及Null值的表示等等。根据这些定义后,在不同的平台和不同的语言中使用Arrow将会采用完全相同的内存结构,因此在不同平台间和不同语言间进行高效数据传输成为了可能。在Arrow之前如果要对不同语言数据进行传输必须要使用序列化与反序列化技术来完成,耗费了大量的CPU资源和时间,而Arrow由于根据规范在内存中的数据结构一致,可以通过共享内存, 内存映射文件等技术来共享Arrow内存结构,省去了序列化与反序列过程。

Spark与Apache Arrow

介绍完Arrow的背景后,来看一下Apache Spark如何使用Arrow来加速PySpark处理的。一直以来,使用PySpark的客户都在抱怨python的效率太低,导致了很多用户转向了使用Scala进行开发。这主要是由于Spark使用Scala语言开发,底层启动的是JVM,而PySpark是Scala中PythonRDD对象启动的一个Python子进程,Python与JVM的通信使用了Py4J, 通过Py4J Python程序能够动态的访问JVM中的Java对象,这一过程使用了linux pipe,在底层JVM需要对RDD进行序列化,在Python端需要对RDD进行反序列化,当数据量较大的时候效率远不如直接使用Scala。流程如下图。

很多数据科学家以及分析人员习惯使用python来进行处理,尤其是使用Pandas和Numpy库来对数据进行后续处理,Spark 2.3以后引入的Arrow将会大大的提升这一效率。我们从代码角度来看一下实现,在Spark 2.4版本的dataframe.py代码中,toPandas的实现为:

ifuse_arrow: try: from pyspark.sql.types import_check_dataframe_convert_date, _check_dataframe_localize_timestamps importpyarrow batches = self._collectAsArrow iflen(batches) > 0: table = pyarrow.Table.from_batches(batches) pdf = table.to_pandas pdf = _check_dataframe_convert_date(pdf, self.schema) return_check_dataframe_localize_timestamps(pdf, timezone) else: returnpd.DataFrame.from_records([], columns=self.columns) except Exceptionase: # Wemight have to allow fallback here aswell but multiple Sparkjobs can # be executed. So, simply fail inthis casefornow. msg = ( "toPandas attempted Arrow optimization because " "'spark.sql.execution.arrow.enabled' is set to true, but has reached " "the error below and can not continue. Note that " "'spark.sql.execution.arrow.fallback.enabled' does not have an effect " "on failures in the middle of computation.n %s"% _exception_message(e)) warnings.warn(msg) raise

如果使用了Arrow(Spark 2.4默认使用),比较重要的一行是_collectAsArrow,_collectAsArrow实现为:

def_collectAsArrow(self): """ Returns all records as a list of ArrowRecordBatches, pyarrow must be installed and available on driver and worker Python environments.

.. note:: Experimental."""withSCCallSiteSync(self._sc) ascss:sock_info = self._jdf.collectAsArrowToPythonreturnlist(_load_from_socket(sock_info, ArrowStreamSerializer))

这里面使用了ArrowStreamSerializer,而ArrowStreamSerializer定义为

classArrowStreamSerializer(Serializer):"""Serializes Arrow record batches as a stream."""

defdump_stream(self, iterator, stream):importpyarrow aspawriter = Nonetry:forbatch initerator:ifwriter isNone:writer = pa.RecordBatchStreamWriter(stream, batch.schema)writer.write_batch(batch)finally:ifwriter isnotNone:writer.close

defload_stream(self, stream):importpyarrow aspareader = pa.open_stream(stream)forbatch inreader:yieldbatch

def__repr__(self):return"ArrowStreamSerializer"

可以看出在这里面,jvm对数据根据Arrow规范设置好内存数据结构进行列式转化后,Python层面并不需要任何的反序列过程,而是直接读取,这也是Arrow高效的原因之一。对比看那一下如果不使用Arrow方法为:

defcollect(self):"""Returns all the records as a list of :class:`Row`.

>>> df.collect[Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]"""withSCCallSiteSync(self._sc) ascss:sock_info = self._jdf.collectToPythonreturnlist(_load_from_socket(sock_info, BatchedSerializer(PickleSerializer)))

序列化方法为PickleSerializer,需要对每一条数据使用PickleSerializer进行反序列化。

那如何通过这一特性来进行我们的开发呢,Spark提供了Pandas UDFs功能,即向量化UDF,Pandas UDF主要是通过Arrow将JVM里面的Spark DataFrame传输给Python生成pandas DataFrame,并执行用于定义的UDF。目前有两种类型,一种是Scalar,一种是Grouped Map。

这里主要介绍一下Scalar Python UDFs的使用,以及可能的场景。Scalar Python UDFs可以在select和withColumn中使用,他的输入参数为pandas.Series类型,输出参数为相同长度的pandas.Series。Spark内部会通过Arrow将列式数据根据batch size获取后,批量的将数据转化为pandas.Series类型,并在每个batch都执行用户定义的function。最后将不同batch的结果进行整合,获取最后的数据结果。

以下是官网的一个例子:

importpandas aspd

frompyspark.sql.functions importcol, pandas_udffrompyspark.sql.types importLongType

# Declare the function and create the UDFdefmultiply_func(a, b):returna * b

multiply = pandas_udf(multiply_func, returnType=LongType)

# The function for a pandas_udf should be able to execute with local Pandas datax = pd.Series([1, 2, 3])print(multiply_func(x, x))# 0 1# 1 4# 2 9# dtype: int64

# Create a Spark DataFrame, 'spark' is an existing SparkSessiondf = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))

# Execute function as a Spark vectorized UDFdf.select(multiply(col("x"), col("x"))).show# +-------------------+# |multiply_func(x, x)|# +-------------------+# | 1|# | 4|# | 9|# +-------------------+

首先定义udf,multiply_func,主要功能就是将a、b两列的数据对应行数据相乘获取结果。然后通过pandas_udf装饰器生成Pandas UDF。最后使用df.selecct方法调用Pandas UDF获取结果。这里面要注意的是pandas_udf的输入输出数据是向量化数据,包含了多行,可以根据spark.sql.execution.arrow.maxRecordsPerBatch来设置。

可以看出Pandas UDF使用非常简单,只需要定义好Pandas UDF就可以了。有了Pandas UDF后我们可以很容易的将深度学习框架和Spark进行结合,比如在UDF中使用一些深度学习框架,比如scikit-learn,我们可以对批量的数据分别进行训练。下面是一个简单的例子,利用Pandas UDF来进行训练:

# Load necessary librariesfrompyspark.sql.functions importpandas_udf, PandasUDFTypefrompyspark.sql.types import*importpandas aspdfromscipy.optimize importleastsqimportnumpy asnp

# Create the schema for the resulting data frameschema = StructType([StructField('ID', LongType, True),StructField('p0', DoubleType, True),StructField('p1', DoubleType, True)])# Define the UDF, input and outputs are Pandas DFs@pandas_udf(schema, PandasUDFType.GROUPED_MAP)defanalyze_player(sample_pd):# return empty params in not enough dataif(len(sample_pd.shots) <= 1):returnpd.DataFrame({'ID': [sample_pd.player_id[0]], 'p0': [ 0], 'p1': [ 0]})# Perform curve fitting result = leastsq(fit, [1, 0], args=(sample_pd.shots, sample_pd.hits))# Return the parameters as a Pandas DF returnpd.DataFrame({'ID': [sample_pd.player_id[0]], 'p0': [result[0][0]], 'p1': [result[0][1]]})# perform the UDF and show the results player_df = df.groupby('player_id').apply(analyze_player)display(player_df)

除此之外还可以使用TensorFlow和MXNet等与Spark进行融合,近期阿里云EMR Data Science集群将会推出相应的功能,整合EMR Spark与深度学习框架之间调度与数据交换功能,希望大家关注。返回搜狐,查看更多

责任编辑:

声明:该文章版权归原作者所有,转载目的在于传递更多信息,并不代表本网赞同其观点和对其真实性负责。如涉及作品内容、版权和其它问题,请在30日内与本网联系。
您阅读这篇文章花了0
转发这篇文章只需要1秒钟
喜欢这篇 0
评论一下 0
凯派尔知识产权全新业务全面上线
评论
试试以这些内容开始评论吧
登录后发表评论
凯派尔知识产权全新业务全面上线
阿里云创新中心
×
#热门搜索#
精选双创服务
历史搜索 清空

Tel:18514777506

关注微信公众号

创头条企服版APP