使用Python进行并发编程

2015年3月20日 05:46

让计算机程序并发的运行是一个经常被讨论的话题,今天我想讨论一下Python下的各种并发方式。

并发方式

线程(Thread

多线程几乎是每一个程序猿在使用每一种语言时都会首先想到用于解决并发的工具(JS程序员请回避),使用多线程可以有效的利用CPU资源(Python例外)。然而多线程所带来的程序的复杂度也不可避免,尤其是对竞争资源的同步问题。

然而在python中由于使用了全局解释锁(GIL)的原因,代码并不能同时在多核上并发的运行,也就是说,Python的多线程不能并发,很多人会发现使用多线程来改进自己的Python代码后,程序的运行效率却下降了,这是多么蛋疼的一件事呀!如果想了解更多细节,推荐阅读这篇文章。实际上使用多线程的编程模型是很困难的,程序员很容易犯错,这并不是程序员的错误,因为并行思维是反人类的,我们大多数人的思维是串行(精神分裂不讨论),而且冯诺依曼设计的计算机架构也是以顺序执行为基础的。所以如果你总是不能把你的多线程程序搞定,恭喜你,你是个思维正常的程序猿:)

Python提供两组线程的接口,一组是thread模块,提供基础的,低等级(Low Level)接口,使用Function作为线程的运行体。还有一组是threading模块,提供更容易使用的基于对象的接口(类似于Java),可以继承Thread对象来实现线程,还提供了其它一些线程相关的对象,例如Timer,Lock

使用thread模块的例子

import thread
def worker():
    """thread worker function"""
    print 'Worker'
thread.start_new_thread(worker)

使用threading模块的例子

import threading
def worker():
    """thread worker function"""
    print 'Worker'
t = threading.Thread(target=worker)
t.start()

 或者Java Style

import threading
class worker(threading.Thread):
    def __init__(self):
        pass
    def run():
        """thread worker function"""
        print 'Worker'
    
t = worker()
t.start()


进程 (Process)

由于前文提到的全局解释锁的问题,Python下比较好的并行方式是使用多进程,这样可以非常有效的使用CPU资源,并实现真正意义上的并发。当然,进程的开销比线程要大,也就是说如果你要创建数量惊人的并发进程的话,需要考虑一下你的机器是不是有一颗强大的心。

Python的mutliprocess模块和threading具有类似的接口。

from multiprocessing import Process

def worker():
    """thread worker function"""
    print 'Worker'
p = Process(target=worker)
p.start()
p.join()

由于线程共享相同的地址空间和内存,所以线程之间的通信是非常容易的,然而进程之间的通信就要复杂一些了。常见的进程间通信有,管道,消息队列,Socket接口(TCP/IP)等等。

Python的mutliprocess模块提供了封装好的管道和队列,可以方便的在进程间传递消息。

Python进程间的同步使用锁,这一点喝线程是一样的。

另外,Python还提供了进程池Pool对象,可以方便的管理和控制线程。


远程分布式主机 (Distributed Node)

随着大数据时代的到临,摩尔定理在单机上似乎已经失去了效果,数据的计算和处理需要分布式的计算机网络来运行,程序并行的运行在多个主机节点上,已经是现在的软件架构所必需考虑的问题。

远程主机间的进程间通信有几种常见的方式

  • TCP/IP

    TCP/IP是所有远程通信的基础,然而API比较低级别,使用起来比较繁琐,所以一般不会考虑

  • 远程方法调用 Remote Function Call

    RPC是早期的远程进程间通信的手段。Python下有一个开源的实现RPyC

  • 远程对象 Remote Object

    远程对象是更高级别的封装,程序可以想操作本地对象一样去操作一个远程对象在本地的代理。远程对象最广为使用的规范CORBA,CORBA最大的好处是可以在不同语言和平台中进行通信。当让不用的语言和平台还有一些各自的远程对象实现,例如Java的RMI,MS的DCOM

    Python的开源实现,有许多对远程对象的支持

  • 消息队列 Message Queue

    比起RPC或者远程对象,消息是一种更为灵活的通信手段,常见的支持Python接口的消息机制有

在远程主机上执行并发和本地的多进程并没有非常大的差异,都需要解决进程间通信的问题。当然对远程进程的管理和协调比起本地要复杂。

Python下有许多开源的框架来支持分布式的并发,提供有效的管理手段包括:

  • Celery 

    Celery是一个非常成熟的Python分布式框架,可以在分布式的系统中,异步的执行任务,并提供有效的管理和调度功能。参考这里

  • SCOOP

    SCOOP Scalable COncurrent Operations in Python)提供简单易用的分布式调用接口,使用Future接口来进行并发。

  • Dispy

    相比起Celery和SCOOP,Dispy提供更为轻量级的分布式并行服务

  • PP 

    PP (Parallel Python)是另外一个轻量级的Python并行服务, 参考这里

  • Asyncoro

    Asyncoro是另一个利用Generator实现分布式并发的Python框架,

当然还有许多其它的系统,我没有一一列出

另外,许多的分布式系统多提供了对Python接口的支持,例如Spark


伪线程 (Pseudo-Thread)

还有一种并发手段并不常见,我们可以称之为伪线程,就是看上去像是线程,使用的接口类似线程接口,但是实际使用非线程的方式,对应的线程开销也不存的。

  • greenlet 

    greenlet提供轻量级的coroutines来支持进程内的并发。

    greenlet是Stackless的一个副产品,使用tasklet来支持一中被称之为微线程(mirco-thread)的技术,这里是一个使用greenlet的伪线程的例子

from greenlet import greenlet

def test1():
    print 12
    gr2.switch()
    print 34
    
def test2():
    print 56
    gr1.switch()
    print 78
    
gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()

        运行以上程序得到如下结果:

12
56
34

伪线程gr1 switch会打印12,然后调用gr2 switch得到56,然后switch回到gr1,打印34,然后伪线程gr1结束,程序退出,所以78永远不会被打印。通过这个例子我们可以看出,使用伪线程,我们可以有效的控制程序的执行流程,但是伪线程并不存在真正意义上的并发。

eventlet,gevent和concurence都是基于greenlet提供并发的。

eventlet是一个提供网络调用并发的Python库,使用者可以以非阻塞的方式累调用阻塞的IO操作。

import eventlet
from eventlet.green import urllib2

urls = ['http://www.google.com', 'http://www.example.com', 'http://www.python.org']

def fetch(url):
    return urllib2.urlopen(url).read()

pool = eventlet.GreenPool()

for body in pool.imap(fetch, urls):
    print("got body", len(body))

执行结果如下

('got body', 17629)
('got body', 1270)
('got body', 46949)

eventlet为了支持generator的操作对urllib2做了修改,接口和urllib2是一致的。这里的GreenPool和Python的Pool接口一致。

gevent和eventlet类似,关于它们的差异大家可以参考这篇文章

import gevent
from gevent import socket
urls = ['www.google.com', 'www.example.com', 'www.python.org']
jobs = [gevent.spawn(socket.gethostbyname, url) for url in urls]
gevent.joinall(jobs, timeout=2)

print [job.value for job in jobs]

执行结果如下:

['206.169.145.226', '93.184.216.34', '23.235.39.223']
  • concurence https://github.com/concurrence/concurrence

concurence是另外一个利用greenlet提供网络并发的开源库,我没有用过,大家可以自己尝试一下。


实战运用

通常需要用到并发的场合有两种,一种是计算密集型,也就是说你的程序需要大量的CPU资源;另一种是IO密集型,程序可能有大量的读写操作,包括读写文件,收发网络请求等等。

计算密集型

对应计算密集型的应用,我们选用著名的蒙特卡洛算法来计算PI值。基本原理如下

蒙特卡洛算法利用统计学原理来模拟计算圆周率,在一个正方形中,一个随机的点落在1/4圆的区域(红色点)的概率与其面积成正比。也就该概率 p = Pi * R*R /4  : R* R , 其中R是正方形的边长,圆的半径。也就是说该概率是圆周率的1/4, 利用这个结论,只要我们模拟出点落在四分之一圆上的概率就可以知道圆周率了,为了得到这个概率,我们可以通过大量的实验,也就是生成大量的点,看看这个点在哪个区域,然后统计出结果。

基本算法如下:

from math import hypot
from random import random

def test(tries):
    return sum(hypot(random(), random()) < 1 for _ in range(tries))

这里test方法做了n(tries)次试验,返回落在四分之一圆中的点的个数。判断方法是检查该点到圆心的距离,如果小于R则是在圆上。

通过大量的并发,我们可以快速的运行多次试验,试验的次数越多,结果越接近真实的圆周率。

这里给出不同并发方法的程序代码

  • 非并发

    我们先在单线程,但进程运行,看看性能如何

from math import hypot
from random import random
import eventlet
import time

def test(tries):
    return sum(hypot(random(), random()) < 1 for _ in range(tries))

def calcPi(nbFutures, tries):
    ts = time.time()
    result = map(test, [tries] * nbFutures)
    
    ret = 4. * sum(result) / float(nbFutures * tries)
    span = time.time() - ts
    print "time spend ", span
    return ret

print calcPi(3000,4000)
  • 多线程 thread

    为了使用线程池,我们用multiprocessing的dummy包,它是对多线程的一个封装。注意这里代码虽然一个字的没有提到线程,但它千真万确是多线程。

    通过测试我们开(jing)心(ya)的发现,果然不出所料,当线程池为1是,它的运行结果和没有并发时一样,当我们把线程池数字设置为5时,耗时几乎是没有并发的2倍,我的测试数据从5秒到9秒。所以对于计算密集型的任务,还是放弃多线程吧。

from multiprocessing.dummy import Pool

from math import hypot
from random import random
import time

def test(tries):
    return sum(hypot(random(), random()) < 1 for _ in range(tries))

def calcPi(nbFutures, tries):
    ts = time.time()
    p = Pool(1)
    result = p.map(test, [tries] * nbFutures)
    ret = 4. * sum(result) / float(nbFutures * tries)
    span = time.time() - ts
    print "time spend ", span
    return ret

if __name__ == '__main__':
    p = Pool()
    print("pi = {}".format(calcPi(3000, 4000)))
  • 多进程 multiprocess

    理论上对于计算密集型的任务,使用多进程并发比较合适,在以下的例子中,进程池的规模设置为5,修改进程池的大小可以看到对结果的影响,当进程池设置为1时,和多线程的结果所需的时间类似,因为这时候并不存在并发;当设置为2时,响应时间有了明显的改进,是之前没有并发的一半;然而继续扩大进程池对性能影响并不大,甚至有所下降,也许我的Apple Air的CPU只有两个核?

    当心,如果你设置一个非常打的进程池,你会遇到 Resource temporarily unavailable的错误,系统并不能支持创建太多的进程,毕竟资源是有限的。

from multiprocessing import Pool

from math import hypot
from random import random
import time

def test(tries):
    return sum(hypot(random(), random()) < 1 for _ in range(tries))

def calcPi(nbFutures, tries):
    ts = time.time()
    p = Pool(5)
    result = p.map(test, [tries] * nbFutures)
    ret = 4. * sum(result) / float(nbFutures * tries)
    span = time.time() - ts
    print "time spend ", span
    return ret

if __name__ == '__main__':
    p = Pool()
    print("pi = {}".format(calcPi(3000, 4000)))
  • gevent (伪线程)

    不论是gevent还是eventlet,因为不存在实际的并发,响应时间和没有并发区别不大,这个和测试结果一致。

import gevent
from math import hypot
from random import random
import time

def test(tries):
    return sum(hypot(random(), random()) < 1 for _ in range(tries))

def calcPi(nbFutures, tries):
    ts = time.time()
    jobs = [gevent.spawn(test, t) for t in [tries] * nbFutures]
    gevent.joinall(jobs, timeout=2)
    ret = 4. * sum([job.value for job in jobs]) / float(nbFutures * tries)
    span = time.time() - ts
    print "time spend ", span
    return ret

print calcPi(3000,4000)
  • eventlet (伪线程)

from math import hypot
from random import random
import eventlet
import time

def test(tries):
    return sum(hypot(random(), random()) < 1 for _ in range(tries))

def calcPi(nbFutures, tries):
    ts = time.time()
    pool = eventlet.GreenPool()
    result = pool.imap(test, [tries] * nbFutures)
    
    ret = 4. * sum(result) / float(nbFutures * tries)
    span = time.time() - ts
    print "time spend ", span
    return ret

print calcPi(3000,4000)
  • SCOOP

SCOOP中的Future接口符合PEP-3148的定义,也就是在Python3中提供的Future接口。

在缺省的SCOOP配置环境下(单机,4个Worker),并发的性能有提高,但是不如两个进程池配置的多进程。

from math import hypot
from random import random
from scoop import futures

import time

def test(tries):
    return sum(hypot(random(), random()) < 1 for _ in range(tries))

def calcPi(nbFutures, tries):
    ts = time.time()
    expr = futures.map(test, [tries] * nbFutures)
    ret = 4. * sum(expr) / float(nbFutures * tries)
    span = time.time() - ts
    print "time spend ", span
    return ret

if __name__ == "__main__":
    print("pi = {}".format(calcPi(3000, 4000)))
  • Celery

任务代码

from celery import Celery

from math import hypot
from random import random
 
app = Celery('tasks', backend='amqp', broker='amqp://guest@localhost//')
app.conf.CELERY_RESULT_BACKEND = 'db+sqlite:///results.sqlite'
 
@app.task
def test(tries):
    return sum(hypot(random(), random()) < 1 for _ in range(tries))

客户端代码

from celery import group
from tasks import test

import time

def calcPi(nbFutures, tries):
    ts = time.time()
    result = group(test.s(tries) for i in xrange(nbFutures))().get()
    
    ret = 4. * sum(result) / float(nbFutures * tries)
    span = time.time() - ts
    print "time spend ", span
    return ret

print calcPi(3000, 4000)

使用Celery做并发的测试结果出乎意料(环境是单机,4frefork的并发,消息broker是rabbitMQ),是所有测试用例里最糟糕的,响应时间是没有并发的5~6倍。这也许是因为控制协调的开销太大。对于这样的计算任务,Celery也许不是一个好的选择。

  • asyncoro

    Asyncoro的测试结果和非并发保持一致。

import asyncoro

from math import hypot
from random import random
import time

def test(tries):
    yield sum(hypot(random(), random()) < 1 for _ in range(tries))


def calcPi(nbFutures, tries):
    ts = time.time()
    coros = [ asyncoro.Coro(test,t) for t in [tries] * nbFutures]
    ret = 4. * sum([job.value() for job in coros]) / float(nbFutures * tries)
    span = time.time() - ts
    print "time spend ", span
    return ret

print calcPi(3000,4000)


IO密集型

IO密集型的任务是另一种常见的用例,例如网络WEB服务器就是一个例子,每秒钟能处理多少个请求时WEB服务器的重要指标。

我们就以网页读取作为最简单的例子

from math import hypot
import time
import urllib2

urls = ['http://www.google.com', 'http://www.example.com', 'http://www.python.org']

def test(url):
    return urllib2.urlopen(url).read()

def testIO(nbFutures):
    ts = time.time()
    map(test, urls * nbFutures)

    span = time.time() - ts
    print "time spend ", span

testIO(10)

在不同并发库下的代码,由于比较类似,我就不一一列出。大家可以参考计算密集型中代码做参考。

通过测试我们可以发现,对于IO密集型的任务,使用多线程,或者是多进程都可以有效的提高程序的效率,而使用伪线程性能提升非常显著,eventlet比没有并发的情况下,响应时间从9秒提高到0.03秒。同时eventlet/gevent提供了非阻塞的异步调用模式,非常方便。这里推荐使用线程或者伪线程,因为在响应时间类似的情况下,线程和伪线程消耗的资源更少。


总结

Python提供了不同的并发方式,对应于不同的场景,我们需要选择不同的方式进行并发。选择合适的方式,不但要对该方法的原理有所了解,还应该做一些测试和试验,数据才是你做选择的最好参考。


Python 并行分布式框架之 PP

2015年3月20日 05:44

PP (Parallel Python)是基于Python的一个轻量级的,提供在SMP(多处理器或者多核系统)或者集群环境中并行执行Python代码的机制。

最简单和最常见的并行方式是使用多线程,然而如果应用程序使用Python提供的线程库, 它实际上并不能并行的运行Python的字节码(Byte-Code)。这是因为Pyton解释器使用GIL(全局解释器锁),这样的机制是的在同一时间,即使是多核的机器,也只能运行一个字节码指令。PP试图克服这样的限制,提供一种更简单的方式来编写并行应用。PP采用了多进程和进程间通信来处理并发,并隐藏所有的实现细节,使得其容易使用。

功能介绍

  • 并行运行Python代码(废话)

  • 易于理解的基于任务(Job)的并行技术

  • 自动检测优化配置

  • 动态处理器分配

  • 负载均衡

  • 容错

  • 自动发现和动态分配计算资源

  • 基于SHA的网络连接认证

  • 跨平台(Windows,Linux, Unix, Mac OS)和架构(X86,X86-64)支持

  • 开源 (BSD)

安装

wget  
tar -xvf pp-1.6.4.tar.gz
cd pp-1.6.4
sudo python setup.py install

架构和设计

Server

PP server 包含并管理多个worker并行的执行客户端发送的任务

Client

客户端负责发送任务(Python Function)到服务器

Cluster

多个PP Server可以构成一个PP Cluster,在CLuster模式客户端提交任务到Cluster,cluster 找到合适的Server来运行任务。

使用方式

SMP

在SMP的模式下使用PP非常简单

import pp
# create a job serer
job_server = pp.Server()

# submit some jobs with python functions
f1 = job_server.submit(func1, args1, depfuncs1, modules1)
f2 = job_server.submit(func1, args2, depfuncs1, modules1)
f3 = job_server.submit(func2, args3, depfuncs2, modules2)

# Get result from each jobs
r1 = f1()
r2 = f2()
r3 = f3()

Cluster

在Cluster模式下,需要在不同的节点运行ppserver

node-1> ./ppserver.py
node-2> ./ppserver.py
node-3> ./ppserver.py

客户端代码和SMP模式类似

import pp

# create cluster
ppservers=("node-1", "node-2", "node-3")

# create a job serer
pp.Server(ppservers=ppservers) 

# submit some jobs with python functions
f1 = job_server.submit(func1, args1, depfuncs1, modules1)
f2 = job_server.submit(func1, args2, depfuncs1, modules1)
f3 = job_server.submit(func2, args3, depfuncs2, modules2)

# Get result from each jobs
r1 = f1()
r2 = f2()
r3 = f3()


总结

PP是一个轻量级的并行框架,代码不多,安装使用起来也比较简单,并行方式是多进程,但是缺乏对任务的封装,也缺少调度的功能。适合于构造简单的并行分布式系统。

Celery (芹菜)是基于Python开发的分布式任务队列。它支持使用任务队列的方式在分布的机器/进程/线程上执行任务调度。

架构设计

Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。

  • 消息中间件

    Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQRedisMongoDB (experimental), Amazon SQS (experimental),CouchDB (experimental), SQLAlchemy (experimental),Django ORM (experimental), IronMQ

  • 任务执行单元

    Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。

  • 任务结果存储

    Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, Redis,memcached, MongoDB,SQLAlchemy, Django ORM,Apache Cassandra, IronCache

另外, Celery还支持不同的并发和序列化的手段

  • 并发

    PreforkEventletgevent, threads/single threaded

  • 序列化

    picklejsonyamlmsgpackzlibbzip2 compression, Cryptographic message signing 等等

安装和运行

Celery的安装过程略为复杂,下面的安装过程是基于我的AWS EC2的Linux版本的安装过程,不同的系统安装过程可能会有差异。大家可以参考官方文档。

首先我选择RabbitMQ作为消息中间件,所以要先安装RabbitMQ。作为安装准备,先更新YUM。

sudo yum -y update

RabbitMQ是基于erlang的,所以先安装erlang

# Add and enable relevant application repositories:
# Note: We are also enabling third party remi package repositories.
wget http://dl.fedoraproject.org/pub/epel/6/x86_64/epel-release-6-8.noarch.rpm
wget http://rpms.famillecollet.com/enterprise/remi-release-6.rpm
sudo rpm -Uvh remi-release-6*.rpm epel-release-6*.rpm

# Finally, download and install Erlang:
yum install -y erlang

然后安装RabbitMQ

# Download the latest RabbitMQ package using wget:
wget  
# Add the necessary keys for verification:
rpm --import  
# Install the .RPM package using YUM:
yum install rabbitmq-server-3.2.2-1.noarch.rpm

启动RabbitMQ服务

rabbitmq-server start

RabbitMQ服务已经准备好了,然后安装Celery, 假定你使用pip来管理你的python安装包

pip install Celery

为了测试Celery是否工作,我们运行一个最简单的任务,编写tasks.py

from celery import Celery

app = Celery('tasks', backend='amqp', broker='amqp://guest@localhost//')
app.conf.CELERY_RESULT_BACKEND = 'db+sqlite:///results.sqlite'

@app.task
def add(x, y):
    return x + y

在当前目录运行一个worker,用来执行这个加法的task

celery -A tasks worker --loglevel=info

其中-A参数表示的是Celery App的名字。注意这里我使用的是SQLAlchemy作为结果存储。对应的python包要事先安装好。

worker日志中我们会看到这样的信息

- ** ---------- [config]
- ** ---------- .> app:         tasks:0x1e68d50
- ** ---------- .> transport:   amqp://guest:**@localhost:5672//
- ** ---------- .> results:     db+sqlite:///results.sqlite
- *** --- * --- .> concurrency: 8 (prefork)

其中,我们可以看到worker缺省使用prefork来执行并发,并设置并发数为8

下面的任务执行的客户端代码:

from tasks import add
import time
result = add.delay(4,4)

while not result.ready():
  print "not ready yet"
  time.sleep(5)

print result.get()

用python执行这段客户端代码,在客户端,结果如下

not ready   
8

Work日志显示

[2015-03-12 02:54:07,973: INFO/MainProcess] Received task: tasks.add[34c4210f-1bc5-420f-a421-1500361b914f]
[2015-03-12 02:54:08,006: INFO/MainProcess] Task tasks.add[34c4210f-1bc5-420f-a421-1500361b914f] succeeded in 0.0309705100954s: 8

这里我们可以发现,每一个task有一个唯一的ID,task异步执行在worker上。

这里要注意的是,如果你运行官方文档中的例子,你是无法在客户端得到结果的,这也是我为什么要使用SQLAlchemy来存储任务执行结果的原因。官方的例子使用AMPQ,有可能Worker在打印日志的时候取出了task的运行结果显示在worker日志中,然而AMPQ作为一个消息队列,当消息被取走后,队列中就没有了,于是客户端总是无法得到任务的执行结果。不知道为什么官方文档对这样的错误视而不见。

如果大家想要对Celery做更进一步的了解,请参考官方文档

不管是开源还是闭源,文档都是很重要的。当然理论上说,最好的文档就是代码本身,但是要让所有人都能读懂你的代码这太难了。所以我们要写文档。大部分情况,我们不希望维护一份代码再加上一份文档,这样做很容易造成文档和代码的不一致,程序员最讨厌更新文档了。所以最佳实践就是在程序员代码中加注释,然后通过构建脚本自通生成文档。

对应于Pyhon,有很多可供选择的工具:

  • sphinx 中文版介绍 Sphinx使用 reStructuredText作为标记语言(类似Markdown),可扩展,功能强大。要注意的是何有一个开源的搜索也叫Sphinx,斯芬克斯果然太受欢迎,开源的世界起个名字不容易呀。

  • pdoc 是一个简单易用的命令行工具,可以生成Python的API文档

  • Doxygen 是老牌的文档生成工具,可以针对各种语言生成文档,我们以前在C++的项目中曾经使用过

  • 其他还有诸如 pydoc , pydoctor 等等

下面我就介绍一下如果使用Sphinx为你的python项目快速的构建API 文档。

首先要安装Sphinx,不同的操作系统有不同的安装方式,Sphinx的源代码在这里 , 你也可以自己构建。我推荐使用pip install。(注,如果你安装了Anaconda,Sphinx已经包含在内了)

然后,假定你的python的源代码是在 src 目录下,我们在同一级并行建立一个文档目录 doc (你当然可以根据自己的项目需要,确定目录命名和结构)。

在doc目录下运行 

sphinx-quickstart

sphinx会提示你的项目的一些设置,生成项目的配置文件,这里给出一些推荐的配置:

> Root path for the documentation [.]:
<ENTER>
> Separate source and build directories (y/N) [n]:
y
> Name prefix for templates and static dir [_]:
<ENTER>
> Project name:
an_example_pypi_project
> Author name(s):
Andrew Carter
> Project version:
0.0.1
> Project release [0.0.1]:
<ENTER>
> Source file suffix [.rst]:
<ENTER>
> Name of your master document (without suffix) [index]:
<ENTER>
> autodoc: automatically insert docstrings from modules (y/N) [n]:
y
> doctest: automatically test code snippets in doctest blocks (y/N) [n]:
n
> intersphinx: link between Sphinx documentation of different projects (y/N) [n]:
y
> todo: write “todo” entries that can be shown or hidden on build (y/N) [n]:
n
> coverage: checks for documentation coverage (y/N) [n]:
n
> pngmath: include math, rendered as PNG images (y/N) [n]:
n
> jsmath: include math, rendered in the browser by JSMath (y/N) [n]:
n
> ifconfig: conditional inclusion of content based on config values (y/N) [n]:
y
> Create Makefile? (Y/n) [y]:
n
> Create Windows command file? (Y/n) [y]:
n

运行完毕,sphinx会生成项目的配置文档conf.py还有源文件(后缀为rst)

下一步要为捏python源文件生成sphinx的源文件,用来生成API文档,需要运行

sphinx-apidoc [options] -o outputdir packagedir [pathnames]

其中outputdir是doc目录,packagedir是src目录,也就是你的python代码包所在的目录

运行好后,会对每一个Python包生成一个rst文件,你可以编辑该文件来修改生成文档的细节,一般情况下不用改。

好了,准备工作做好了以后,就可以生成API文档了。在运行文档生成脚本之前,要确保你的Python源代码所在的包在系统路径中是可以找到的,需要修改conf.py。因为在生成文档是需要运行你的python代码,要保证code运行不出错。

sys.path.insert(0, os.path.abspath('../src'))

在doc目录下运行脚本

sphinx-build -b html . ./ouput

在output目录会生成HTML格式的API文档。(也可以选其他文档格式)

Sphinx还有一个automsummay的扩展,可能能简化以上的过程,等我试一试在更新结果。


大数据时代,数据的异常分析被广泛的用于各个场合。 今天我们就来看一看其中的一种场景,对于单变量数据集的异常检测。

所谓单变量,就是指数据集中只有一个变化的值,下面我们来看看今天我们要分析的的数据,点击这里数据文件下载数据文件。

分析数据的第一步是要加载文件, 本文使用了numpy,pandas,scikit learn等常见的数据分析要用到的Python库。

import numpy as np
import pandas as pd
df = pd.read_csv("farequote.csv")

Pandas 是一个常用的数据分析的Python库,提供对数据的加载,清洗,抽取,变形等操作。Pandas依赖numpy,numpy提供了基于列/多维数组(List/N-D Array)的数据结构的操作。许多科学计算和数据分析的库都依赖于numpy。

df 是Pandas中常用的数据类型dataframe,dataframe类似与一个数据库的表,使用 df.head()可以得到数据的头几行,以便了解数据的概貌。

该数据结构中,第一列式Pandas添加的索引,第一行是每一列数据的名字,除了第一列,每一列数据可以看成是一个变量,所以该数据集共有三个变量,时间(_time)、航空公司名称(airline)、响应时间(responsetime)。我们可以这样理解,该数据集记录了一段时间内,各个航空公司飞机延误的时间。我们希望通过分析找出是否存在异常的情况。

注意,我们是要分析单变量,所以所有的分析都是基于某一个航空公司的数据,所以就需要对该数据集做一个查询,找出要分析的航空公司。首先要知道有哪些航空公司,使用np.unique(df.airline)可以找到所有的航空公司代码,类似SQL的Unique命令

array(['AAL', 'ACA', 'AMX', 'ASA', 'AWE', 'BAW', 'DAL', 'EGF', 'FFT',
       'JAL', 'JBU', 'JZA', 'KLM', 'NKS', 'SWA', 'SWR', 'TRS', 'UAL', 'VRD'], 
      dtype='|S3')

查询某个航空公司的数据使用dataframe的query方法,类似SQL的select。Query返回的结果仍然是一个dataframe对象。

dd = df.query('airline=="KLM"') ## 得到法航的数据

我们先了解一下数据的大致信息,使用describe方法

dd.responsetime.describe()

得到如下的结果:

count    1724.000000
mean     1500.613766
std       100.085320
min      1209.766800
25%      1434.084625
50%      1499.135000
75%      1567.831025
max      1818.774100
Name: responsetime, dtype: float64

该结果返回了数据集responsetime维度上的主要统计指标,个数,均值,方差,最大最小值等等,也可以调用单独的方法例如min(),mean()等来获得某一个指标。

基于标准差得异常检测

下面我们就可以开始异常点的分析了,对于单变量的异常点分析,最容易想到的就是基于标准差(Standard Deviation)的方法了。我们假定数据的正态分布的,利用概率密度函数,我们知道

  • 95.449974面积在平均数左右两个标准差的范围内

  • 99.730020%的面积在平均数左右三个标准差的范围内

  • 99.993666的面积在平均数左右三个标准差的范围内

所以我们95%也就是大概两个标准差为门限,凡是落在门限外的都认为是异常点。代码如下

def a1(dataframe, threshold=.95):
    d = dataframe['responsetime']
    dataframe['isAnomaly'] = d > d.quantile(threshold)  
    return dataframe
print a1(dd)

运行以上程序我们得到如下结果

                             _time airline  responsetime isAnomaly
20    2013-02-01T23:57:59.000-0700     KLM     1481.4945     False
76    2013-02-01T23:52:34.000-0700     KLM     1400.9050     False
124   2013-02-01T23:47:10.000-0700     KLM     1501.4313     False
203   2013-02-01T23:39:08.000-0700     KLM     1278.9509     False
281   2013-02-01T23:32:27.000-0700     KLM     1386.4157     False
336   2013-02-01T23:26:09.000-0700     KLM     1629.9589     False
364   2013-02-01T23:23:52.000-0700     KLM     1482.5900     False
448   2013-02-01T23:16:08.000-0700     KLM     1553.4988     False
511   2013-02-01T23:10:39.000-0700     KLM     1555.1894     False
516   2013-02-01T23:10:08.000-0700     KLM     1720.7862      True
553   2013-02-01T23:06:29.000-0700     KLM     1306.6489     False
593   2013-02-01T23:03:03.000-0700     KLM     1481.7081     False
609   2013-02-01T23:01:29.000-0700     KLM     1521.0253     False
666   2013-02-01T22:56:04.000-0700     KLM     1675.2222      True
...   ...   ...   ...

结果数据集上多了一列isAnomaly用来标记每一行记录是否是异常点,我们看到已经有一些点被标记为异常点了。

我们看看程序的详细内容:

  1. 方法a1定义了一个异常检测的函数

  2. dataframe['responsetime']等价于dataframe.responsetime,该操作取出responsetime这一列的值

  3. d.quantile(threshold)用正态分布假定返回位于95%的点的值,大于该值得点都落在正态分布95%之外

  4. d > d.quantile(threshold)是一个数组操作,返回的新数组是responsetime和threshold的比较结果,[False,False,True,... ... False]

  5. 然后通过dataframe的赋值操作增加一个新的列,标记所有的异常点。

数据可视化往往是数据分析的最后一步,我们看看结果如何:

import matplotlib.pyplot as plt
da = a1(dd)
fig = plt.figure()
ax1 = fig.add_subplot(2, 1, 1)
ax2 = fig.add_subplot(2, 1, 2)
ax1.plot(da['responsetime'])
ax2.plot(da['isAnomaly'])

这异常点也太多了,用99%在试试:

现在似乎好一点,然而我们知道,对于数据集的正态分布的假定往往是不成立的,假如数据分布在大小两头,那么这样的异常检测就很难奏效了。我们看看其他一些改进的方法。

基于ZSCORE的异常检测

zscore的计算如下

sd是标准差,X是均值。一般建议门限值取为3.5

代码如下:

def a2(dataframe, threshold=3.5):
    d = dataframe['responsetime']
    zscore = (d - d.mean())/d.std()
    dataframe['isAnomaly'] = zscore.abs() > threshold
    return dataframe

另外还有一种增强的zscore算法,基于MAD。MAD的定义是

其中X是中位数。

增强的zscore算法如下:

def a3(dataframe, threshold=3.5):
    dd = dataframe['responsetime']
    MAD = (dd - dd.median()).abs().median()
    zscore = ((dd - dd.median())* 0.6475 /MAD).abs()
    dataframe['isAnomaly'] = zscore > threshold
    return dataframe

用zscore算法得到:

调整门限为3得到

如果换一组数据AAL,结果会怎么样呢?

我们发现有一段时间,所有的响应都很慢,我们想要把这些点都标记为异常,可能么?

基于KMEAN聚集的异常检测

通常基于KMEAN的聚集算法并不适用于异常点检测,以为聚集算法总是试图平衡每一个聚集中的点的数目,所以对于少数的异常点,聚集非常不好用,但是我们这个例子中,异常点都聚在一起,所以应该可以使用。

首先,为了看清聚集,我们使用时间序列的常用分析方法,增加一个维度,该维度是每一个点得前一个点得响应时间。

preresponse = 0
newcol = []
newcol.append(0)
for index, row in dd.iterrows():
    if preresponse != 0:
        newcol.append(preresponse)
    preresponse = row.responsetime
dd["t0"] = newcol
plt.scatter(dd.t0,dd.responsetime)

我们利用iterrows来循环数据,把前一个点的响应时间增加到当前点,第一个点的该值为0,命名该列为t0。然后用scatter plot把它画出来。

上面是法航KLM的数据,其中最左边的点是一个无效的点,因为前一个点的响应时间不知道所以填了0,分析时应该过滤该店。

对于AAL,我们可以清楚的看到两个聚集:

其中右上方的聚集,也就是点数目比较少得聚集就是我们希望检测到的异常点得集合。

我们看看如何使用KMEAN算法来检测吧:

def a4(dataframe, threshold = .9):
    ## add one dimention of previous response
    preresponse = 0
    newcol = []
    newcol.append(0)
    for index, row in dataframe.iterrows():
        if preresponse != 0:
            newcol.append(preresponse)
        preresponse = row.responsetime
    dataframe["t0"] = newcol
    ## remove first row as there is no previous event for time
    dd = dataframe.drop(dataframe.head(1).index) 
    clf = cluster.KMeans(n_clusters=2)
    X=np.array(dd[['responsetime','t0']])
    cls = clf.fit_predict(X)
    freq = itemfreq(cls)
    (A,B) = (freq[0,1],freq[1,1])
    t = abs(A-B)/max(A,B)
    if t > threshold :
        ## "Anomaly Detected!"
        index = freq[0,0]
        if A > B :
            index = freq[1,0]
        dd['isAnomaly'] = (cls == index)
    else :
        ## "No Anomaly Point"
        dd['isAnomaly'] = False
    return dd

其核心代码是以下这几行:

clf = cluster.KMeans(n_clusters=2)
X=np.array(dd[['responsetime','t0']])
cls = clf.fit_predict(X)

cluster.KMeans返回一个预测模型,我们假定有两个聚集。你可以试着加大聚集的数量,结果没什么影响。

dd[['responsetime','t0']]返回一个2*n的数组,并赋值给X,用于聚集计算。

fit_pridict方法是对X做聚集运算,并计算每一个点对应的聚集编号。

freq = itemfreq(cls)

itemfreq返回聚集结果中每一个聚集的发生频率,如果其中一个比另一个显著地多,我们则认为那个少得是异常点聚集。

用该方法可以把所有聚集里的点标记为异常点。

这里我用红色标记结果让大家看的清楚一点,注意因为是line chart,连个竖线间的都是异常点。

总结

除了上述的算法,还有其它一些相关的算法,大家如果对背后的数据知识有兴趣的话,可以参考这篇相关介绍

单变量的异常检测算法相对比较简单,但是要做到精准检测就更难,因为掌握的信息更少。另外boxplot也经常被用于异常检测,他和基于方差的异常检测是一致的,只不过用图形让大家一目了然的获得结果,大家有兴趣可以了解一下。


背景

Web Scraping

在大数据时代,一切都要用数据来说话,大数据处理的过程一般需要经过以下的几个步骤

  • 数据的采集和获取

  • 数据的清洗,抽取,变形和装载

  • 数据的分析,探索和预测

  • 数据的展现

其中首先要做的就是获取数据,并提炼出有效地数据,为下一步的分析做好准备。

数据的来源多种多样,以为我本身是足球爱好者,而世界杯就要来了,所以我就想提取欧洲联赛的数据来做一个分析。许多的网站都提供了详细的足球数据,例如:

这些网站都提供了详细的足球数据,然而为了进一步的分析,我们希望数据以格式化的形式存储,那么如何把这些网站提供的网页数据转换成格式化的数据呢?这就要用到Web scraping的技术了。简单地说,Web Scraping就是从网站抽取信息, 通常利用程序来模拟人浏览网页的过程,发送http请求,从http响应中获得结果。

Web Scraping 注意事项

在抓取数据之前,要注意以下几点:

  • 阅读网站有关数据的条款和约束条件,搞清楚数据的拥有权和使用限制

  • 友好而礼貌,使用计算机发送请求的速度飞人类阅读可比,不要发送非常密集的大量请求以免造成服务器压力过大

  • 因为网站经常会调整网页的结构,所以你之前写的Scraping代码,并不总是能够工作,可能需要经常调整

  • 因为从网站抓取的数据可能存在不一致的情况,所以很有可能需要手工调整

Python Web Scraping 相关的库

Python提供了很便利的Web Scraping基础,有很多支持的库。这里列出一小部分

当然也不一定要用Python或者不一定要自己写代码,推荐关注import.io

Web Scraping 代码

下面,我们就一步步地用Python,从腾讯体育来抓取欧洲联赛13/14赛季的数据。

首先要安装Beautifulsoup

pip install beautifulsoup4

 

我们先从球员的数据开始抓取。

球员数据的Web请求是http://soccerdata.sports.qq.com/playerSearch.aspx?lega=epl&pn=2 ,返回的内容如下图所示:

该web服务有两个参数,lega表示是哪一个联赛,pn表示的是分页的页数。

首先我们先做一些初始化的准备工作

from urllib2 import urlopen
import urlparse
import bs4

BASE_URL = "http://soccerdata.sports.qq.com"
PLAYER_LIST_QUERY = "/playerSearch.aspx?lega=%s&pn=%d"
league = ['epl','seri','bund','liga','fran','scot','holl','belg']
page_number_limit = 100
player_fields = ['league_cn','img','name_cn','name','team','age','position_cn','nation','birth','query','id','teamid','league']

 

urlopen,urlparse,bs4是我们将要使用的Python库。

BASE_URL,PLAYER_LIST_QUERY,league,page_number_limit和player_fields是我们会用到的一些常量。

下面是抓取球员数据的具体代码:

def get_players(baseurl):
    html = urlopen(baseurl).read()
    soup = bs4.BeautifulSoup(html, "lxml")
    players = [ dd for dd in soup.select('.searchResult tr') if dd.contents[1].name != 'th']
    result = []
    for player in players:
        record = []
        link = ''
        query = []
        for item in player.contents:
            if type(item) is bs4.element.Tag:
                if not item.string and item.img:
                    record.append(item.img['src'])
                else :
                    record.append(item.string and item.string.strip() or 'na')
                try:
                    o = urlparse.urlparse(item.a['href']).query
                    if len(link) == 0:
                        link = o
                        query = dict([(k,v[0]) for k,v in urlparse.parse_qs(o).items()])
                except:
                    pass
             
        if len(record) != 10:
            for i in range(0, 10 - len(record)):
                record.append('na')
        record.append(unicode(link,'utf-8'))
        record.append(unicode(query["id"],'utf-8'))
        record.append(unicode(query["teamid"],'utf-8'))
        record.append(unicode(query["lega"],'utf-8'))
        result.append(record)
    return result
    
result = []
for url in [ BASE_URL + PLAYER_LIST_QUERY % (l,n) for l in league for n in range(page_number_limit) ]:
    result = result +  get_players(url)

 

我们来看看抓取球员数据的详细过程:

首先我们定义了一个get_players方法,该方法会返回某一请求页面上所有球员的数据。为了得到所有的数据,我们通过一个for循环,因为要循环各个联赛,每个联赛又有多个分页,一般情况下是需要一个双重循环的:

for i in league:
    for j in range(0, 100):
        url = BASE_URL + PLAYER_LIST_QUERY % (l,n)
        ## send request to url and do scraping

 

Python的list comprehension可以很方便的通过构造一个列表的方式来减少循环的层次。

另外Python还有一个很方便的语法来合并连个列表: list = list1 + list2

好我们再看看如何使用BeautifulSoup来抓取网页中我们需要的内容。

首先调用urlopen读取对应url的内容,通常是一个html,用该html构造一个beautifulsoup对象。

beautifulsoup对象支持很多查找功能,也支持类似css的selector。通常如果有一个DOM对象是<xx class='cc'>,我们使用以下方式来查找:

obj = soup.find("xx","cc")

 

另外一种常见的方式就是通过CSS的selector方式,在上述代码中,我们选择class=searchResult元素里面,所有的tr元素,过滤掉th也就是表头元素。

for dd in soup.select('.searchResult tr') if dd.contents[1].name != 'th'

 

对于每一行记录tr,生成一条球员记录,并存放在一个列表中。所以我们就循环tr的内容tr.contents,获得对应的field内容。

对于每一个tr的content,我们先检查其类型是不是一个Tag,对于Tag类型有几种情况,一种是包含img的情况,我们需要取出球员的头像图片的网址。

另一种是包含了一个链接,指向其他数据内容

所以在代码中要分别处理这些不同的情况。

对于一个Tag对象,Tag.x可以获得他的子对象,Tag['x']可以获得Tag的attribute的值。

所以用item.img['src']可以获得item的子元素img的src属性。

对已包含链接的情况,我们通过urlparse来获取查询url中的参数。这里我们利用了dict comprehension的把查询参数放入一个dict中,最有添加到列表中。

dict([(k,v[0]) for k,v in urlparse.parse_qs(o).items()])

 

对于其它情况,我们使用Python 的and or表达式以确保当Tag的内容为空时,我们写入‘na’,该表达式类似C/C++或Java中的三元操作符 X ? A : B

然后有一段代码判断当前记录的长度是否大于10,不大于10则用空值填充,目的是避免一些不一致的地方。

if len(record) != 10:
    for i in range(0, 10 - len(record)):
        record.append('na')

 

最后,我们把query中的一些相关的参数如球员的id,球队的id,所在的联赛代码等加入到列表。

record.append(unicode(link,'utf-8'))
record.append(unicode(query["id"],'utf-8'))
record.append(unicode(query["teamid"],'utf-8'))
record.append(unicode(query["lega"],'utf-8'))

 

最后我们把本页面所有球员的列表放入一个列表返回。

好了,现在我们拥有了一个包含所有球员的信息的列表,我们需要把它存下来,以进一步的处理,分析。通常,csv格式是一个常见的选择。

import csv
def write_csv(filename, content, header = None): 
    file = open(filename, "wb")
    file.write('\xEF\xBB\xBF')
    writer = csv.writer(file, delimiter=',')
    if header:
        writer.writerow(header)
    for row in content:
        encoderow = [dd.encode('utf8') for dd in row]
        writer.writerow(encoderow)

write_csv('players.csv',result,player_fields)

 

这里需要注意的就是关于encode的问题。因为我们使用的时utf-8的编码方式,在csv的文件头,需要写入\xEF\xBB\xBF,详见这篇文章

好了现在大功告成,抓取的csv如下图:

因为之前我们还抓取了球员本赛季的比赛详情,所以我们可以进一步的抓取所有球员每一场比赛的记录

抓取的代码如下

def get_player_match(url):
    html = urlopen(url).read()
    soup = bs4.BeautifulSoup(html, "lxml")
    matches = [ dd for dd in soup.select('.shtdm tr') if dd.contents[1].name != 'th']
    records = []
    for item in [ dd for dd in matches if len(dd.contents) > 11]: ## filter out the personal part
        record = []
        for match in [ dd for dd in item.contents if type(dd) is bs4.element.Tag]:
            if match.string:
                record.append(match.string)
            else:
                for d in [ dd for dd in match.contents if type(dd) is bs4.element.Tag]:
                    query = dict([(k,v[0]) for k,v in urlparse.parse_qs(d['href']).items()])
                    record.append('teamid' in query and query['teamid'] or query['id'])   
                    record.append(d.string and d.string or 'na')                    
        records.append(record)
    return records[1:]  ##remove the first record as the header

def get_players_match(playerlist, baseurl = BASE_URL + '/player.aspx?'):
    result = []
    for item in playerlist:
        url =  baseurl + item[10]
        print url
        result = result + get_player_match(url)
    return result
match_fields = ['date_cn','homeid','homename_cn','matchid','score','awayid','awayname_cn','league_cn','firstteam','playtime','goal','assist','shoot','run','corner','offside','foul','violation','yellowcard','redcard','save']    
write_csv('m.csv',get_players_match(result),match_fields)

 

抓取的过程和之前类似。

下一步做什么

现在我们拥有了详细的欧洲联赛的数据,那么下一步要怎么做呢,我推荐大家把数据导入BI工具来做进一步的分析。有两个比较好的选择:

Tableau在数据可视化领域可谓无出其右,Tableau Public完全免费,用数据可视化来驱动数据的探索和分析,拥有非常好的用户体验

 

Splunk提供一个大数据的平台,主要面向机器数据。支持每天免费导入500M的数据,如果是个人学习,应该足够了。

当然你也可以用Excel。 另外大家如果有什么好的免费的数据分析的平台,欢迎交流。