http://pnjrnt.kcpmj.cn

PyOdps 0.4版本发布,从一个故事说起

Star

有这么个故事(如有雷同,纯属巧合)。有一天,某运营同学给某开发同学一个excel文件,里面是个客户清单。

 “帮我查下这些用户的消耗呢”。

开发同学扫了一眼,几百个用户。这个事肯定是可以办的,但是想到麻烦程度,开发同学心里肯定是有不少羊驼经过的啦。

“有点麻烦啊”,开发同学轻轻抱怨。

 “我懂的,把这个表和ODPS里的表join下就好了嘛。”运营同学努努嘴。

“……”。于是,开发同学把excel数据导出成文本格式,然后dship上传到ODPS,ODPS上编写SQL,dship下载,大功告成。

这里说得很轻松,但其实整个过程真的挺麻烦呢。要是这个过程中还要对excel中的数据进行过滤,最终结果还要绘个图,还是需要不少时间。

但是,如果这个开发同学使用PyOdps 0.4+版本新特性,一切就都轻松写意了。

为了模拟这个过程,我们拿movielens 100K的数据做例子,现在本地有一个excel表格,里面有100个需要查询的用户,表格包含两个字段,分别是用户ID和年龄。在ODPS上,我们有一张电影评分表,现在我们要求出这100用户个中年龄在20-30之间,按每个年龄来求电影评分均值,并用条形图展现。

可以想象,这个过程如果按照前面的描述,有多麻烦。那么用PyOdps DataFrame API呢。

首先,我们读出本地Excel文件。

excel

In [14]: from odps.df import read_excel

In [15]: users = read_excel('/Users/chine/userids.xlsx')

In [16]: users.head(10)
|==========================================|   1 /  1  (100.00%)         0s
Out[16]: 
    id  age
0   46   27
1  917   22
2  217   22
3  889   24
4  792   40
5  267   23
6  626   23
7  433   27
8  751   24
9  932   58

In [40]: users.count()
|==========================================|   1 /  1  (100.00%)         0s
100

然后我们用join语句,过滤出来电影评分表中这些用户的评分数据。

In [17]: ratings = DataFrame(o.get_table('pyodps_ml_100k_ratings'))

In [18]: ratings.head(10)
|==========================================|   1 /  1  (100.00%)         2s
Out[18]: 
   user_id  movie_id  rating  unix_timestamp
0      196       242       3       881250949
1      186       302       3       891717742
2       22       377       1       878887116
3      244        51       2       880606923
4      166       346       1       886397596
5      298       474       4       884182806
6      115       265       2       881171488
7      253       465       5       891628467
8      305       451       3       886324817
9        6        86       3       883603013

In [25]: filter_ratings = ratings.join(users.filter(users.age.between(20, 30)), ('user_id', 'id'))[ratings, lambda x, y: y.age]  
# 这里做字段抽取时,可以使用Collection,也可以使用lambda表达式,参数是左右两个Collection

In [26]: filter_ratings.head(10)
|==========================================|   1 /  1  (100.00%)        44s
Out[26]: 
   user_id  movie_id  rating  unix_timestamp  age
0        3       350       3       889237076   23
1        3       332       1       889237224   23
2        3       327       4       889237455   23
3        3       341       1       889237055   23
4        3       317       2       889237482   23
5        3       336       1       889237198   23
6        3       322       3       889237269   23
7        3       323       2       889237269   23
8        3       339       3       889237141   23
9        3       268       3       889236961   23

然后我们就可以按年龄聚合,求出评分均值啦。绘图也一气呵成。

In [28]: age_ratings = filter_ratings.groupby('age').agg(lambda x: x.rating.mean())

In [29]: age_ratings.head(10)
|==========================================|   1 /  1  (100.00%)        30s
Out[29]: 
   age  rating_mean
0   20     4.002309
1   21     4.051643
2   22     3.227513
3   23     3.519174
4   24     3.481013
5   25     3.774744
6   26     3.391509
7   27     3.355130
8   28     3.382883
9   29     3.705660

In [30]: age_ratings.plot(kind='bar', x='age', rot=45)
|==========================================|   1 /  1  (100.00%)        29s
Out[30]: <matplotlib.axes._subplots.AxesSubplot at 0x10b875f10>

plot

超级简单,有木有!

这里的users其实是存在于本地的,而ratings是存在于ODPS上,用户依然可以join这两个Collection。其实对于0.4之前的版本,本地数据上传的接口也很容易(但是无法使用DataFrame API来进行本地过滤),但是对于0.4版本,不管一个Collection是存在于ODPS还是本地,用户都可以执行join和union的操作。

而这一切都源自0.4版本带来的新特性,DataFrame API的pandas计算后端。

 DataFrame API使用pandas计算

 我们知道,PyOdps DataFrame API类似于pandas的接口,但还是有些许不同的,那我们为什么不能用pandas来执行本地计算呢,这样也能充分利用pandas的一些特性,如支持各种数据输入。

所以,除了过去使用odps.models.Table来初始化DataFrame,我们也可以使用pandas DataFrame来初始化。

In [41]: import numpy as np

In [42]: import pandas as pd

In [44]: pandas_df = pd.DataFrame(np.random.random((10, 3)), columns=list('abc'))

In [45]: pandas_df
Out[45]: 
          a         b         c
0  0.583845  0.301504  0.764223
1  0.153269  0.335511  0.455193
2  0.725460  0.460367  0.294741
3  0.315234  0.907264  0.849361
4  0.678395  0.642199  0.746051
5  0.977872  0.841084  0.931561
6  0.903927  0.846036  0.982424
7  0.347098  0.373247  0.193810
8  0.672611  0.242942  0.381713
9  0.461411  0.687164  0.514689

In [46]: df = DataFrame(pandas_df)

In [49]: type(df)
Out[49]: odps.df.core.DataFrame

In [47]: df.head(3)
|==========================================|   1 /  1  (100.00%)         0s
Out[47]: 
          a         b         c
0  0.583845  0.301504  0.764223
1  0.153269  0.335511  0.455193
2  0.725460  0.460367  0.294741

In [48]: df[df.a < 0.5].a.sum()
|==========================================|   1 /  1  (100.00%)         0s
1.2770121422535428

这里转化成PyOdps DataFrame后,所有的计算也一样,变成延迟执行,PyOdps DataFrame在计算前的优化也同样适用。

这样做的好处是,除了前面我们提到的,能结合本地和ODPS做计算外;还有个好处就是方便进行本地调试。所以,我们可以写出以下代码:

DEBUG = True

if DEBUG:
    # 这个操作使用tunnel下载,因此速度很快。对于分区表,需要给出所有分区值。
    df = ratings[:100].to_pandas(wrap=True)
else:
    df = ratings

 在DEBUG的时候,我们能够利用PyOdps DataFrame在对原始表做切片操作时使用tunnel下载,速度很快的特性,选择原始表的一小部分数据来作为本地测试数据。值得注意的是, 本地计算通过不一定能在ODPS上也计算通过,比如自定义函数的沙箱限制 

目前pandas计算后端尚不支持窗口函数。

apply和MapReduce API

使用apply对单行数据调用自定义函数

以前我们对于某个字段,能调用map来使用自定义函数,现在结合axis=1的apply,我们能对一行数据进行操作。

In [13]: ratings.apply(lambda row: row.rating / float(row.age), axis=1, reduce=True, types='float', names='rda').head(10)
|==========================================|   1 /  1  (100.00%)      1m44s
Out[13]: 
        rda
0  0.166667
1  0.166667
2  0.208333
3  0.208333
4  0.125000
5  0.208333
6  0.166667
7  0.208333
8  0.208333
9  0.125000

reduce为True的时候,会返回一个sequence,详细参考文档

MapReduce API

PyOdps DataFrame API也提供MapReduce API。我们还是以movielens 100K为例子,看如何使用。

现在假设我们需要求出每部电影前两名的评分,直接上代码。

from odps.df import output

@output(['movie_id', 'movie_title', 'movie_rating'], ['int', 'string', 'int'])
def mapper(row):
    yield row.movie_id, row.title, row.rating

@output(['title', 'top_rating'], ['string', 'int'])
def reducer(keys):
    i = [0]
    def h(row, done):
        if i[0] < 2:
            yield row.movie_title, row.movie_rating
        i[0] += 1
    return h

In [7]: top_ratings = ratings.map_reduce(mapper, reducer, group='movie_id', sort='movie_rating', ascending=False)

In [10]: top_ratings.head(10)
|==========================================|   1 /  1  (100.00%)      3m48s
Out[10]: 
               title  top_rating
0   Toy Story (1995)           5
1   Toy Story (1995)           5
2   GoldenEye (1995)           5
3   GoldenEye (1995)           5
4  Four Rooms (1995)           5
5  Four Rooms (1995)           5
6  Get Shorty (1995)           5
7  Get Shorty (1995)           5
8     Copycat (1995)           5
9     Copycat (1995)           5

利用刚刚说的本地DEBUG特性,我们也能使用本地计算来验证,计算结果能很快得出。人生苦短!

In [22]: local_ratings = ratings[:100].to_pandas(wrap=True)
|==========================================|   1 /  1  (100.00%)         2s

In [23]: local_ratings.map_reduce(mapper, reducer, group='movie_id', sort='movie_rating', ascending=False).head(10)
|==========================================|   1 /  1  (100.00%)         0s
Out[23]: 
                                               title  top_rating
0  Shanghai Triad (Yao a yao yao dao waipo qiao) ...           5
1                              Twelve Monkeys (1995)           4
2                               Seven (Se7en) (1995)           4
3                         Usual Suspects, The (1995)           5
4                                 Postino, Il (1994)           3
5                          Mr. Holland's Opus (1995)           4
6                                 Taxi Driver (1976)           5
7                                       Crumb (1994)           5
8                                   Star Wars (1977)           5
9                                   Star Wars (1977)           5

 cache机制

在0.4之前的版本,我们提供了一个persist接口,来保存执行结果。但是这个操作是个立即执行接口。现在我们提供cache接口,cache的collection会被单独计算,但不会立即执行。

In [25]: tmpdf = ratings[ratings.title.len() > 10].cache()

In [26]: tmpdf['title', 'movie_id'].head(3)
|==========================================|   1 /  1  (100.00%)        35s
Out[26]: 
                  title  movie_id
0  Seven (Se7en) (1995)        11
1  Event Horizon (1997)       260
2      Star Wars (1977)        50

In [27]: tmpdf.count()  # tmpdf已经被cache,所以我们能立刻计算出数量
|==========================================|   1 /  1  (100.00%)         0s
99823

记住,目前的cache接口,计算的结果还是要落地的,并不是存放在内存中。

而一个collection如果已经被计算过,这个过程会自动触发cache机制,后续的计算过程会从这计算个向后进行,而不再需要从头计算。

其他特性

PyOdps 0.4版本还带来一些其他特性,比如join支持mapjoin(只对ODPS后端有效);Sequence上支持unique和nunique;execute_sql执行时支持设置hints,对于IPython插件,支持使用SET来设置hints,等等。

PyOdps下一步计划

对于PyOdps的DataFrame API来说,我们的短期目标是能完成ODPS SQL能做的所有事情,然后在这个基础上再带来更多SQL不容易做到的,但是却很有用的操作。现在,除了自定义聚合函数,我们已经能基本涵盖所有的SQL场景。

PyOdps非常年轻,期待大家来使用、提feature、贡献代码。

PyOdps在交互式环境下的使用,让探索ODPS数据更容易些

Star

春节结束了,是时候来些新鲜玩意,让我们来看一些酷的东西。

当当当当:隆重推出PyOdps logo。

pyodps

好像跑题了,好吧,让我们言归正传。

我们知道Python提供了一个交互式的环境,能够方便探索和试验想法。同时,IPython是Python交互环境的增强,提供了很多强大的功能;IPython Notebook(现在已经是Jupyter Notebook)则更酷,提供了一个web界面,除了提供交互环境,还是一个记录计算过程的『笔记本』。

PyOdps也提供了一系列在交互式环境下的增强工具,使得探索ODPS数据更方便快捷。

配置ODPS帐号

Python交互环境

同一个环境支持配置若干个ODPS帐号,只需要:

In [1]: from odps.inter import setup

In [2]: setup('**your-access_id**', '**your-access-key**', '**your-project**', endpoint='**your-endpoint**')

此时这个帐号会被配置到一个叫做default的我们称之为room的地方。以后我们再使用这个帐号只需要: 

In [3]: from odps.inter import enter

In [4]: room = enter()

In [5]: o = room.odps

In [6]: o.get_table('dual')
Out[6]: 
odps.Table
  name: odps_test_sqltask_finance.`dual`
  schema:
    c_int_a                 : bigint          
    c_int_b                 : bigint          
    c_double_a              : double          
    c_double_b              : double          
    c_string_a              : string          
    c_string_b              : string          
    c_bool_a                : boolean         
    c_bool_b                : boolean         
    c_datetime_a            : datetime        
    c_datetime_b            : datetime

通过room的odps属性,我们可以取到ODPS的入口,这样就可以接着进行ODPS操作了。配置了别的room比如叫做myodps,要取到ODPS入口,只需要enter('myodps').odps即可。

list_rooms方法能列出所有的room

In [17]: from odps.inter import list_rooms

In [18]: list_rooms()
Out[18]: ['default', 'meta']

IPython及Jupyter Notebook

PyOdps还提供了IPython插件。首先我们需要加载插件:

In [11]: %load_ext odps

In [14]: %enter
Out[14]: 

In [15]: o = _.odps

In [16]: o.get_table('dual')
Out[16]: 
odps.Table
  name: odps_test_sqltask_finance.`dual`
  schema:
    c_int_a                 : bigint          
    c_int_b                 : bigint          
    c_double_a              : double          
    c_double_b              : double          
    c_string_a              : string          
    c_string_b              : string          
    c_bool_a                : boolean         
    c_bool_b                : boolean         
    c_datetime_a            : datetime        
    c_datetime_b            : datetime        

_下划线能取到上一步的结果。

保存常用的ODPS对象

room除了提供ODPS入口的功能,还能保存常用的ODPS对象。比如,我们能把常用的表起个名字,给保存起来。

In [19]: iris = o.get_table('pyodps_iris')

In [23]: room.store('iris_test', iris, desc='保存测试ODPS对象')

In [28]: room['iris_test']
Out[28]: 
odps.Table
  name: odps_test_sqltask_finance.`pyodps_iris`
  schema:
    sepallength           : double      
    sepalwidth            : double      
    petallength           : double      
    petalwidth            : double      
    name                  : string      

In [29]: room.iris_test
Out[29]: 
odps.Table
  name: odps_test_sqltask_finance.`pyodps_iris`
  schema:
    sepallength           : double      
    sepalwidth            : double      
    petallength           : double      
    petalwidth            : double      
    name                  : string 

 这两种方式都可以取到保存的ODPS对象。如果要列出当前room保存的所有ODPS对象,则可以:

In [30]: room.display()
Out[30]: 
default          desc
name                 
iris_test  保存测试ODPS对象
iris       安德森鸢尾花卉数据集

也可以使用IPython插件命令:

In [31]: %stores
Out[31]: 
default          desc
name                 
iris_test  保存测试ODPS对象
iris       安德森鸢尾花卉数据集

要删除某个ODPS对象:

In [32]: room.drop('iris_test')

In [33]: %stores
Out[33]: 
default        desc
name               
iris     安德森鸢尾花卉数据集

执行SQL命令

PyOdps提供了执行SQL的方法,但是在交互式环境下却不甚方便。使用PyOdps提供的IPython插件,可以通过sql命令来直接执行。

在执行时,需要配置全局帐号,如果已经使用了enter方法或者命令,则已经配置;如果没有,则会尝试enter默认的room;如果这也没有配置,则需要使用to_global方法。

In [34]: o = ODPS('**your-access-id**', '**your-secret-access-key**', project='**your-project**', endpoint='**your-end-point**'))

In [35]: o.to_global()

这时我们就可以使用sql命令,单个百分号输入单行SQL,多行SQL使用两个百分号:

In [37]: %sql select * from pyodps_iris limit 5
|==========================================|   1 /  1  (100.00%)         3s
Out[37]: 
   sepallength  sepalwidth  petallength  petalwidth         name
0          5.1         3.5          1.4         0.2  Iris-setosa
1          4.9         3.0          1.4         0.2  Iris-setosa
2          4.7         3.2          1.3         0.2  Iris-setosa
3          4.6         3.1          1.5         0.2  Iris-setosa
4          5.0         3.6          1.4         0.2  Iris-setosa

In [38]: %%sql
   ....: select * from pyodps_iris
   ....: where sepallength < 5 
   ....: limit 5
   ....: 
|==========================================|   1 /  1  (100.00%)        15s
Out[38]: 
   sepallength  sepalwidth  petallength  petalwidth         name
0          4.9         3.0          1.4         0.2  Iris-setosa
1          4.7         3.2          1.3         0.2  Iris-setosa
2          4.6         3.1          1.5         0.2  Iris-setosa
3          4.6         3.4          1.4         0.3  Iris-setosa
4          4.4         2.9          1.4         0.2  Iris-setosa

在Jupyter Notebook里,多行SQL会提供语法高亮:

 jupyter-odps-sql

持久化pandas DataFrame为ODPS表

使用persist命令即可:

In [42]: import pandas as pd

In [43]: df = pd.read_csv('http://aticari.com.raw.github.com/pydata/pandas/master/pandas/tests/data/iris.csv')

In [48]: %persist df pyodps_iris_test
|==========================================| 150 /150  (100.00%)         0s

In [49]: from odps.df import DataFrame

In [61]: DataFrame(o.get_table('pyodps_iris_test')).head(5)
|==========================================|   1 /  1  (100.00%)         0s
Out[61]: 
   sepallength  sepalwidth  petallength  petalwidth         name
0          5.1         3.5          1.4         0.2  Iris-setosa
1          4.9         3.0          1.4         0.2  Iris-setosa
2          4.7         3.2          1.3         0.2  Iris-setosa
3          4.6         3.1          1.5         0.2  Iris-setosa
4          5.0         3.6          1.4         0.2  Iris-setosa

其它交互式方面的增强

在交互式环境下,我们repr一个ODPS表的时候,会打印这个表的schema,包括字段注释,省去了查这个表的meta信息。

In [41]: o.get_table('china_stock', project='odpsdemo')
Out[41]: 
odps.Table
  name: odpsdemo.`china_stock`
  schema:
    d               : string      # 日期
    c               : string      # 股票代码
    n               : string      # 股票名称
    t_close         : double      # 收盘价
    high            : double      # 最高价
    low             : double      # 最低价
    opening         : double      # 开盘价
    l_close         : double      # 昨日收盘价
    chg             : double      # 涨跌额
    chg_pct         : double      # 涨跌幅
    vol             : bigint      # 成交量
    turnover        : double      # 成交额
  partitions:
    code            : string      # 股票代码

当使用sql命令或者使用DataFrame框架计算的时候,在终端或者Jupyter Notebook里都提供一个进度条来方便用户来查看执行进度。

 jupyter-odps-sql2

后记

PyOdps现在处于快速迭代阶段,我们所有的开发都是开源的。大家如果需要什么功能,可以给我们提issue(GitHub);也可以直接参与到开发,直接给我们发Pull Request就行啦。

欢迎大家一起来建设PyOdps。

github:http://osinina.com.github.com/aliyun/aliyun-odps-python-sdk
文档:http://pyodps.readthedocs_org.lemez.cn/zh_CN/latest/ 

PyOdps DataFrame来临,大数据分析从未如此简单!

Star

PyOdps正式发布DataFrame框架(此处应掌声经久不息),有了它,就像卷福有了花生,比翼双飞,哦不,如虎添翼。

快过年了,大家一定没心情看长篇大论的分析文章。作为介绍PyOdps DataFrame的开篇文章,我只说说其用起来爽的地方。其余的部分,从使用、问题到实现原理,我会分文章细说。

如果你不知道什么是ODPS,ODPS是阿里云旗下的大数据计算服务。如果不知道是DataFrame什么,它是存在于pandas和R里的数据结构,你可以把它当做是表结构。如果想快速浏览PyOdps DataFrame能做什么,可以看我们的快速开始文档

让我们开始吧。

 强类型支持

DataFrame API在计算的过程中,从字段到类型都是确定的,因此,若取一个不存在的字段,会丢给你个大大的异常。

In [4]: from odps.df import DataFrame

In [5]: iris = DataFrame(o.get_table('pyodps_iris'))

In [6]: iris.dtypes
Out[6]: 
odps.Schema {
  sepallength           float64       
  sepalwidth            float64       
  petallength           float64       
  petalwidth            float64       
  name                  string        
}

In [7]: iris.field_not_exist
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
 in ()
----> 1 iris.field_not_exist

/Users/chine/Workspace/pyodps/odps/df/expr/expressions.pyc in __getattr__(self, attr)
    510                 return self[attr]
    511 
--> 512             raise e
    513 
    514     def output_type(self):

AttributeError: 'DataFrame' object has no attribute 'field_not_exist'

如果取存在的字段,自然是没问题啦。

In [11]: iris.sepalwidth.head(5)
|==========================================|   1 /  1  (100.00%)         0s
Out[11]: 
   sepalwidth
0         3.5
1         3.0
2         3.2
3         3.1
4         3.6

有些方法,比如说取平均数,非数字肯定是不能调用的咯。

In [12]: iris['name'].mean()
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
 in ()
----> 1 iris['name'].mean()

/Users/chine/Workspace/pyodps/odps/df/expr/expressions.pyc in __getattribute__(self, attr)
    171                 if new_attr in self._get_attr('_args'):
    172                     return self._get_arg(new_attr)
--> 173             raise e
    174 
    175     def _defunc(self, field):

AttributeError: 'Column' object has no attribute 'mean'

数字类型的字段则可以调用。

In [10]: iris.sepalwidth.mean()
|==========================================|   1 /  1  (100.00%)        27s
3.0540000000000007

操作数据如此简单

我们常常需要select一个表字段,但是只是不需要一个字段,却需要写一堆SQL。在DataFrame API里,调用exclude方法就行了。 

In [13]: iris.exclude('name').head(5)
|==========================================|   1 /  1  (100.00%)         0s
Out[13]: 
   sepallength  sepalwidth  petallength  petalwidth
0          5.1         3.5          1.4         0.2
1          4.9         3.0          1.4         0.2
2          4.7         3.2          1.3         0.2
3          4.6         3.1          1.5         0.2
4          5.0         3.6          1.4         0.2

使用DataFrame写出来的代码,天然有Python的特点,清晰易懂。某些快捷API,能使得操作更加简单。

比如我们要取name的个数从大到小前10的值分别是多少。

In [16]: iris.groupby('name').agg(count=iris.name.count()).sort('count', ascending=False)[:10]
|==========================================|   1 /  1  (100.00%)        37s
Out[16]: 
              name  count
0   Iris-virginica     50
1  Iris-versicolor     50
2      Iris-setosa     50

直接使用value_counts来得更快。

In [17]: iris['name'].value_counts()[:10]
|==========================================|   1 /  1  (100.00%)        34s
Out[17]: 
              name  count
0   Iris-virginica     50
1  Iris-versicolor     50
2      Iris-setosa     50

很多时候,写一个SQL,我们需要检查中间结果的执行,就显得很麻烦,我们常常需要选取中间的SQL来执行,在DataFrame的世界,中间结果赋值一个变量就行了,这都不是事儿。

计算的过程和结果展示

在DataFrame的执行过程中,我们在终端里和IPython notebook里,都会有进度条显示任务的完成情况。结果的输出也会有更好的格式化展现,在IPython notebook里会以HTML表格的形式展现。

IPython notebook

绘图集成

DataFrame的计算结果能直接调用plot方法来制作图表,不过绘图需要安装pandasmatplotlib

In [21]: iris.plot()
|==========================================|   1 /  1  (100.00%)         0s
Out[21]: <matplotlib.axes._subplots.AxesSubplot at 0x10feab610>

plot

 导出数据再用excel画图,这事儿……咳咳,未来我们还会提供更好的可视化展现,比如提供交互式的图表。

自定义函数和Lambda表达式

DataFrame支持map方法,想对一个字段调用自定义函数非常方便。

In [30]: GLOBAL_VAR = 3.2

In [31]: def myfunc(x):
    if x < GLOBAL_VAR:
        return 0
    else:
        return 1

In [32]: iris['sepalwidth', iris.sepalwidth.map(myfunc).rename('sepalwidth2')].head(5)
|==========================================|   1 /  1  (100.00%)        18s
Out[32]: 
   sepalwidth  sepalwidth2
0         3.5            1
1         3.0            0
2         3.2            1
3         3.1            0
4         3.6            1

可惜apply和聚合的自定义函数,暂时还不支持,期待吧!

延迟执行

DataFrame API的所有操作并不会立即执行,除非用户显式调用execute方法或者一些立即执行的方法。在交互式界面下,打印或者repr对象的时候,内部也会调用execute方法,方便用户使用。

执行优化

DataFrame框架在执行前会对整个查询进行优化,比如连续的projection合并。当用户查看原始表(或者选取某个分区)时,会使用tunnel来获取结果。

PyOdps DataFrame的下一步发展

好了,说了这么多,聊一聊我们DataFrame接下来要做的事情,首先,我们会实现多计算后端,包括pandas,当数据量比较小的时候,我们可以使用本地计算,而不需要等待ODPS的调度;其次,DataFrame框架和我们的机器学习部分会有更多的集成,从数据分析,到算法,一气呵成。

PyOdps非常年轻,才短短几个月的时间。我们的整个项目,在GitHub上开源。我个人非常希望大家能参与到开源的建设中来,能提个建议也是极好的。所以,我会写文章详述我们PyOdps的实现原理,希望大家一起把ODPS建设得更好。

github:http://gphiles.com.github.com/aliyun/aliyun-odps-python-sdk
文档:http://pyodps.readthedocs_org.antifufu.com/zh_CN/latest/

Dpark源码剖析一(概述)

Dpark/Spark中最重要的核心就是RDD(弹性分布式数据集,Resilient Distributed Datasets),为了给今后的分析打下基础,这篇文章首先会解释RDD相关的重要概念。接着会简单介绍dpark中的另外两个重要核心Accumulator(累加器)和Broadcast(广播变量),关于这两者这里只做简单介绍,我们后面会对分别单独对源码做分析。

Spark不光是用函数式语言scala写的,它也到处体现着函数式语言的特性。Dpark当然也继承了这些特性,这个我们接下来会逐一分析。类似于spark,dpark也是master-slave架构的,但不同于spark,dpark中仅提供了三种运行方式:本地模式(local,单进程)、多进程模式(实际上也是单机)以及mesos模式(使用mesos来调度达到分布式计算的目的)。

RDD

首先需要说明的是弹性分布式数据集(RDD),dpark和最新的spark这部分已经有所不同,主要体现在对内存和磁盘抽象上,下面包括以后的文章都会以dpark为准。

RDD听起来很玄乎,其实很简单。它是一个抽象,本质上表示大量的可迭代数据,这些数据可以直接存在于内存中,也可以延迟读取。但是数据量太大,怎么办?尝试把数据分成各个分片(split),每个分片对应着一部分数据,这样可以将一个RDD分开来存取和执行运算。RDD是不可变的,这符合函数式编程中的不可变数据的特性,不要小看这个特性,它其实在分布式计算的环境中非常重要,能简化分布式环境下的计算。

rdd

在dpark中,一个RDD中的元素通常来说有两种:一种是单一的值,还有一种是key和value组成的对(元组表达,(key, value))。如上图所示,RDD的数据来源也通常有两种:一种是Python数据集合(如list等),也可以是分布式文件系统(本地文件系统亦可,这种情况可以用在本地模式和多进程模式)。

我们来看看RDD的初始化以及重要的函数。

class Split(object):
    def __init__(self, idx):
        self.index = idx

class RDD(object):
    def __init__(self, ctx):
        self.ctx = ctx
        self.id = RDD.newId()
        self._splits = []
        self.dependencies = []
        self.aggregator = None
        self._partitioner = None
        self.shouldCache = False
        self.snapshot_path = None
        ctx.init()
        self.err = ctx.options.err
        self.mem = ctx.options.mem

    @cached
    def __getstate__(self):
        d = dict(self.__dict__)
        d.pop('dependencies', None)
        d.pop('_splits', None)
        d.pop('ctx', None)
        return d

    def _preferredLocations(self, split):
        return []

    def preferredLocations(self, split):
        if self.shouldCache:
            locs = env.cacheTracker.getCachedLocs(self.id, split.index)
            if locs:
                return locs
        return self._preferredLocations(split)

    def cache(self):
        self.shouldCache = True
        self._pickle_cache = None # clear pickle cache
        return self

    def snapshot(self, path=None):
        if path is None:
            path = self.ctx.options.snapshot_dir
        if path:
            ident = '%d_%x' % (self.id, hash(str(self)))
            path = os.path.join(path, ident)
            if not os.path.exists(path):
                try: os.makedirs(path)
                except OSError: pass
            self.snapshot_path = path
        return self

    def compute(self, split):
        raise NotImplementedError

    def iterator(self, split):
        if self.snapshot_path:
            p = os.path.join(self.snapshot_path, str(split.index))
            if os.path.exists(p):
                v = cPickle.loads(open(p).read())
            else:
                v = list(self.compute(split))
                with open(p, 'w') as f:
                    f.write(cPickle.dumps(v))
            return v

        if self.shouldCache:
            return env.cacheTracker.getOrCompute(self, split)
        else:
            return self.compute(split)

这里Split类非常简单,只有个索引号index,表示是第几个分片。RDD的属性中的_splits指的是该RDD的所有分片。Split及其子类的作用是,告诉RDD该分片该如何计算,是读取分布式文件系统中的数据呢,还是读取内存中的列表的某一部分?Split中可以存放数据,也可以只提供读取数据需要的参数。

函数式编程中函数是一等公民,RDD也拥有大量的函数来进行计算。这些计算可以分为两类:变换(Transformations)操作(Actions)变换比如说map函数,它的参数是一个函数func,我们对于RDD中的每个元素,调用func函数将其变为另一个元素,这样就组成了新的RDD。类似的这种计算过程不是立即执行的,可能经历过多个变换后,等到需要将结果返回主程序时才执行,这个时候,从一个RDD到另一个RDD就是一个变换的过程。对于操作来说,执行的时候,相关的计算会立刻执行,并将结果返回(比如说reduce、collect等等)。计算的结果可以直接写入存储(比如调用saveAsTextFile),可以转化为Python集合数据(比如collect方法,返回包含全部数据的列表),也可以返回标量的结果(比如count方法,返回所有元素的个数)。

恰好是由于RDD的不可变性,在变换的过程中,我们只需记录下足够的信息,这时就可以在真正需要数据时执行计算。这种惰性计算的特点使得dpark/spark的计算相当高效。

那么这些信息包括什么呢?RDD的dependencies就是之一,它记录下了当前的RDD是从哪个或者哪些RDD得到的,这些依赖是Dependency类或者子类的实例,这在dpark中被称为血统(lineage),听起来很高大上吧?通过这些依赖,你就能得到一个RDD的父母或者祖先有哪些。关于具体的依赖关系,我们会在接下来文章中结合RDD的各种变换来详细说明。

这里需要先提下依赖的大致分类。dpark中,dependency可以分为两大类,窄依赖和宽依赖。什么叫窄依赖?非宽依赖是也,你会说,这不废话么?那就让我们先看看什么叫宽依赖。对于当前的RDD的一个分片,它的数据可能来自依赖RDD的任意分片,这就叫宽依赖。比如说对于存放键值对的依赖RDD,我们执行groupByKey操作,也就是把key相同的值都聚合起来,这时候,key可能存在于依赖RDD的任意分片,这就叫宽依赖。因此,窄依赖就显而易见了,对于当前RDD的一个分片,它只可能源自于依赖RDD的有限个分片,这就是窄依赖,比如说map操作,当前RDD的某分片中的每个元素就是由依赖RDD的对应的分片数据算来的。之所以这里先提下宽依赖和窄依赖,对后面的理解大有裨益。

RDD类中包含了一个compute接口,它的参数是一个分片。对于不同的RDD的子类,这个方法提供了给定分片的数据的生成方法。但是在这个接口外还包装了一个iterator方法,这才是内部运算时真正调用的方法,为什么呢?这里涉及到了snapshot和cache。下面逐一说明。

首先是snapshot。RDD中通过snapshot方法,将参数snapshot_path设置为创建的路径。而在iterator调用时,会根据snapshot_path是否为空来判断是否做snapshot。snapshot时,需要提供所有运算机器能够访问的共享的文件路径(包括分布式文件系统),这样,在iterator时首先判断是否需要做snapshot,如果要则判断对应的数据文件是否在,如果不在,则先创建,再将compute计算后的结果序列化后直接写入文件;如果存在,则直接读出并反序列化。

cache和snapshot的区别在于,cache是写入内存(在本地模式下);或者本地文件系统(其他模式),并让master记录下在那个机器做的cache以及路径(这么说不准确,但是可以这么理解)。这样就不需要一个共享的文件路径,同样在iterator调用的时候可以先从cache中读出(这里可能涉及到远程读取,因为写入的地方可能不在本地)。这里的过程会复杂得多,我们会在以后专门讲解,同学们只要留下印象即可。

另外一对重要的方法是_preferredLocations和preferredLocations(就一个下划线的区别),它们都表示该RDD的某个分片计算时期望执行的地址(在哪个机器上执行)。首先我们说说公有方法preferredLocations,首先如果一个RDD的分片做过cache,那么当然希望在有缓存的机器上执行,否则就返回私有方法_preferredLocations的结果。对于这个私有方法来说,具体的RDD子类会覆盖这个方法。比如说,一个RDD从分布式文件系统读取数据,我们知道,分布式文件中的文件以块的形式放在不同的机器上,那么我们当然希望这个RDD期望运行的地址是在读取块所在的机器上,这样能减少网络的开销;又比如,有着某个RDD的某个分片依赖于另一个RDD的一个分片,前者更倾向于在后者机器上执行计算,否则还需要进行一次拷贝操作。

值得一提的是RDD类的__getstate__方法,这个方法告诉序列化模块应该要序列化哪些属性。ctx表示表示程序运行入口上下文,序列化不需要ok。但是为什么不序列化依赖和分片呢?这个留到后面解答。

Accumulator和Broadcast(共享变量)

除了RDD,dpark还提供了两种集群各机器间可以共享的变量。一个是累加器(Accumulator),一种在运行时只可以进行累加操作的变量;还有一个是广播变量(Broadcast),它用来将一个通常是较大的变量发布到所有的计算节点,这样避免了序列化和反序列化的开销。这里一笔带过,以后详述。

Dpark源码剖析

Spark是一个当下很火的集群计算平台,来自于加州大学伯克利分校的AMPLab,目前从Apache孵化器毕业,成为了Apache基金会下的顶级项目。现在的spark类似于hadoop,逐渐成长为一种生态系统。如下图所示,其上层包含了一系列计算工具,包括:

  • Shark for SQL,查询hadoop数据的分布式SQL查询引擎,类似于hadoop上的hive,但效率更高。
  • Streaming,利用spark来进行大规模流式数据处理。
  • MLlib,基于spark的机器学习库。
  • Graphx,spark之上的图计算框架,支持Pregel和GraphLab的计算模型。

spark

从spark创建之初,其对hadoop的支持就相当充分,当然一部分原因来自于Scala语言和Java语言交互的便利性。由上图可以看出,spark支持从HDFS等读出数据。尽管最开始,spark最先支持的是mesos(一个统一资源管理和调度平台),但在hadoop Yarn推出之后亦能很好地支持。除此之外,Spark能以本地多线程方式运行(local模式),也能以脱离mesos和Yarn的方式运行(standalone模式)。

Spark发展地如火如荼,尽管对于正常使用来说,我们不需要了解其内部的实现。但是要想深入优化上层应用,对底层的实现的了解是在所难免的。但是由于Scala语言的障碍,要想学习Spark需要从scala开始,学习曲线未免长了点。好在豆瓣的同学实现了一个spark的Python克隆:Dpark,其完全用Python语言翻译了spark。经过一段时间的研究,我对dpark的源码也有了一定程度的了解,因此就想写个系列来介绍其运行的原理。

然而Dpark有着不少的缺陷,下面就一一列举:

  1. 由于dpark翻译的时间较早(应该是spark 0.5前的版本,而spark目前最新版本已经是0.9),支持已经非常陈旧了。尽管dpark中RDD(spark中重要概念,表示弹性分布式数据集)还是内存层面上的抽象,而spark中的RDD是对内存和磁盘的统一抽象,另外缺少了上层的计算工具,但是其整体的思想是没有什么变化的。
  2. 对Hadoop的支持非常糟糕,dpark不支持从HDFS读取数据,取代支持的为分布式文件系统MooseFS;另外也不支持Yarn,当然还是支持mesos来进行调度的。
  3. 社区滞后,dpark目前的资料匮乏,开发也仅限于豆瓣的同学,没有来自社区的力量。spark则正好相反,社区的发展日益蓬勃,从底层到上层都有来自社区的大量贡献。
  4. GIL的限制,dpark中用多进程取代了spark中的多线程。
  5. 性能不佳,根据豆瓣的官方资料,dpark的性能甚至不如hadoop,这点让我比较吃惊。看来Python语言的性能劣势抵消了架构上带来的好处。

这些缺陷不能说dpark没有意义,其还是很好的学习工具。本来我研究dpark的初衷,是想完善其对Hadoop的支持,并移植spark graphx到dpark上。现在我更倾向于回归spark中,但是通过对dpark的研究,上手spark应该也更加容易。

Spark经过这么多版本的迭代,基本思想没有太大变化。我希望通过这个系列,能让大家能更容易地了解dpark/spark。本文也会作为系列的索引,并随着系列的进展而更新。

索引

  1. 概述

关于作者

残阳似血(@秦续业),程序猿一枚,把梦想揣进口袋的挨踢工作者。现加入阿里云,研究僧毕业于上海交通大学软件学院ADC实验室。熟悉分布式数据分析(DataFrame并行化框架)、基于图模型的分布式数据库和并行计算、Dpark/Spark以及Python web开发(Django、tornado)等。

博客分类

点击排行

标签云

扫描访问

主题

残阳似血的微博

北京快乐8上下盘软件 北京pk10分助手 上海时时乐哪里有卖 北京快乐8怎么算pc 北京pk10冠亚和刷水 上海时时乐500期走势图
上海时时乐怎么赚钱 上海时时乐预测 什么是时时乐 北京时时彩8 pk10如何看七码 pk10赛车5码45678公式
重庆时时彩稳赚技巧 上海时时乐怎么赚钱 上海时时乐彩票控 时时彩后一10中9技巧 时时彩后一10中9技巧 上海时时乐彩票控
北京赛车pk10软件 北京赛车输钱技巧 北京pk10计划单期 北京赛车pk10软件下载 上海时时乐走势图连线 北京pk10倍投方案
快客加盟 早餐包子店加盟 豆浆早餐加盟 早龙早餐加盟 早点加盟网
江苏早餐加盟 早餐连锁 加盟 哪里有早点加盟 安徽早餐加盟 河南早餐加盟
移动早餐加盟 早点小吃店加盟 健康早餐店加盟 春光早餐加盟 全国连锁加盟
清美早餐加盟 全球加盟网 连锁店加盟 来加盟 早餐粥加盟
极速赛车下载 江苏11选5开始奖结果 重庆时时彩注册资料锁定了怎么解锁 新疆喜乐彩百科 75秒极速时时彩官网
21点玩法 河南快3走势图表 二分彩计划 江西快三出奖结果 阿里彩票
上海时时乐走势图 河南11选5开奖结果走势图 私彩能控制官方开奖 重庆幸运农场几点开奖 山西十一选五
pk10计划 香港六合彩网站 河南11选5中奖奖金 新疆25选7开奖结果查询 香港六合彩全年资料