Python 并行分布式框架之 PP

2015年3月20日 05:44

PP (Parallel Python)是基于Python的一个轻量级的,提供在SMP(多处理器或者多核系统)或者集群环境中并行执行Python代码的机制。

最简单和最常见的并行方式是使用多线程,然而如果应用程序使用Python提供的线程库, 它实际上并不能并行的运行Python的字节码(Byte-Code)。这是因为Pyton解释器使用GIL(全局解释器锁),这样的机制是的在同一时间,即使是多核的机器,也只能运行一个字节码指令。PP试图克服这样的限制,提供一种更简单的方式来编写并行应用。PP采用了多进程和进程间通信来处理并发,并隐藏所有的实现细节,使得其容易使用。

功能介绍

  • 并行运行Python代码(废话)

  • 易于理解的基于任务(Job)的并行技术

  • 自动检测优化配置

  • 动态处理器分配

  • 负载均衡

  • 容错

  • 自动发现和动态分配计算资源

  • 基于SHA的网络连接认证

  • 跨平台(Windows,Linux, Unix, Mac OS)和架构(X86,X86-64)支持

  • 开源 (BSD)

安装

wget  
tar -xvf pp-1.6.4.tar.gz
cd pp-1.6.4
sudo python setup.py install

架构和设计

Server

PP server 包含并管理多个worker并行的执行客户端发送的任务

Client

客户端负责发送任务(Python Function)到服务器

Cluster

多个PP Server可以构成一个PP Cluster,在CLuster模式客户端提交任务到Cluster,cluster 找到合适的Server来运行任务。

使用方式

SMP

在SMP的模式下使用PP非常简单

import pp
# create a job serer
job_server = pp.Server()

# submit some jobs with python functions
f1 = job_server.submit(func1, args1, depfuncs1, modules1)
f2 = job_server.submit(func1, args2, depfuncs1, modules1)
f3 = job_server.submit(func2, args3, depfuncs2, modules2)

# Get result from each jobs
r1 = f1()
r2 = f2()
r3 = f3()

Cluster

在Cluster模式下,需要在不同的节点运行ppserver

node-1> ./ppserver.py
node-2> ./ppserver.py
node-3> ./ppserver.py

客户端代码和SMP模式类似

import pp

# create cluster
ppservers=("node-1", "node-2", "node-3")

# create a job serer
pp.Server(ppservers=ppservers) 

# submit some jobs with python functions
f1 = job_server.submit(func1, args1, depfuncs1, modules1)
f2 = job_server.submit(func1, args2, depfuncs1, modules1)
f3 = job_server.submit(func2, args3, depfuncs2, modules2)

# Get result from each jobs
r1 = f1()
r2 = f2()
r3 = f3()


总结

PP是一个轻量级的并行框架,代码不多,安装使用起来也比较简单,并行方式是多进程,但是缺乏对任务的封装,也缺少调度的功能。适合于构造简单的并行分布式系统。

Celery (芹菜)是基于Python开发的分布式任务队列。它支持使用任务队列的方式在分布的机器/进程/线程上执行任务调度。

架构设计

Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。

  • 消息中间件

    Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQRedisMongoDB (experimental), Amazon SQS (experimental),CouchDB (experimental), SQLAlchemy (experimental),Django ORM (experimental), IronMQ

  • 任务执行单元

    Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。

  • 任务结果存储

    Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, Redis,memcached, MongoDB,SQLAlchemy, Django ORM,Apache Cassandra, IronCache

另外, Celery还支持不同的并发和序列化的手段

  • 并发

    PreforkEventletgevent, threads/single threaded

  • 序列化

    picklejsonyamlmsgpackzlibbzip2 compression, Cryptographic message signing 等等

安装和运行

Celery的安装过程略为复杂,下面的安装过程是基于我的AWS EC2的Linux版本的安装过程,不同的系统安装过程可能会有差异。大家可以参考官方文档。

首先我选择RabbitMQ作为消息中间件,所以要先安装RabbitMQ。作为安装准备,先更新YUM。

sudo yum -y update

RabbitMQ是基于erlang的,所以先安装erlang

# Add and enable relevant application repositories:
# Note: We are also enabling third party remi package repositories.
wget http://dl.fedoraproject.org/pub/epel/6/x86_64/epel-release-6-8.noarch.rpm
wget http://rpms.famillecollet.com/enterprise/remi-release-6.rpm
sudo rpm -Uvh remi-release-6*.rpm epel-release-6*.rpm

# Finally, download and install Erlang:
yum install -y erlang

然后安装RabbitMQ

# Download the latest RabbitMQ package using wget:
wget  
# Add the necessary keys for verification:
rpm --import  
# Install the .RPM package using YUM:
yum install rabbitmq-server-3.2.2-1.noarch.rpm

启动RabbitMQ服务

rabbitmq-server start

RabbitMQ服务已经准备好了,然后安装Celery, 假定你使用pip来管理你的python安装包

pip install Celery

为了测试Celery是否工作,我们运行一个最简单的任务,编写tasks.py

from celery import Celery

app = Celery('tasks', backend='amqp', broker='amqp://guest@localhost//')
app.conf.CELERY_RESULT_BACKEND = 'db+sqlite:///results.sqlite'

@app.task
def add(x, y):
    return x + y

在当前目录运行一个worker,用来执行这个加法的task

celery -A tasks worker --loglevel=info

其中-A参数表示的是Celery App的名字。注意这里我使用的是SQLAlchemy作为结果存储。对应的python包要事先安装好。

worker日志中我们会看到这样的信息

- ** ---------- [config]
- ** ---------- .> app:         tasks:0x1e68d50
- ** ---------- .> transport:   amqp://guest:**@localhost:5672//
- ** ---------- .> results:     db+sqlite:///results.sqlite
- *** --- * --- .> concurrency: 8 (prefork)

其中,我们可以看到worker缺省使用prefork来执行并发,并设置并发数为8

下面的任务执行的客户端代码:

from tasks import add
import time
result = add.delay(4,4)

while not result.ready():
  print "not ready yet"
  time.sleep(5)

print result.get()

用python执行这段客户端代码,在客户端,结果如下

not ready   
8

Work日志显示

[2015-03-12 02:54:07,973: INFO/MainProcess] Received task: tasks.add[34c4210f-1bc5-420f-a421-1500361b914f]
[2015-03-12 02:54:08,006: INFO/MainProcess] Task tasks.add[34c4210f-1bc5-420f-a421-1500361b914f] succeeded in 0.0309705100954s: 8

这里我们可以发现,每一个task有一个唯一的ID,task异步执行在worker上。

这里要注意的是,如果你运行官方文档中的例子,你是无法在客户端得到结果的,这也是我为什么要使用SQLAlchemy来存储任务执行结果的原因。官方的例子使用AMPQ,有可能Worker在打印日志的时候取出了task的运行结果显示在worker日志中,然而AMPQ作为一个消息队列,当消息被取走后,队列中就没有了,于是客户端总是无法得到任务的执行结果。不知道为什么官方文档对这样的错误视而不见。

如果大家想要对Celery做更进一步的了解,请参考官方文档