如今是云的时代,许多公司都把自己的IT架构部署在基础架构云(IaaS)上。著名的IaaS提供商有亚马逊,微软(Azure),IBM等,国内也有诸如阿里云等。这里亚马逊毫无疑问是该市场的领军者。

AWS提供了非常多的服务,领先了竞争对手一大截。并且AWS提供非常丰富的API,其API基于Rest,所以很容易被不同的语言的平台来调用。

在如今的大数据时代,利用数据在做决策是大数据的核心价值,AWS提供了许多服务来获取其运行数据cloudtrail和cloudwatch是经常被用到的两个。CloudTrail是对AWS的所有API调用的日志,CloudWatch是监控AWS服务的性能数据。(新出的Config服务可用于监控AWS的资源变化)

今天我们来看看如何使用Python(Boto AWS的开源Python SDK)来自动配置ClouTrail的服务并获取日志内容。

我们先来看看CloudTrail的概念和相关的配置。

  • S3 Bucket

    在打开CloudTrail的服务时,需要指定一个相关的S3的Bucket,S3是亚马逊提供的存储服务,你可以把它当作一个基于云的文件系统。CloudTrail的API调用日志,会以压缩文件的形式,存储在你指定的Bucket里。

  • SNS

    SNS是亚马逊提供的通知服务,该服务使用的是订阅/发布(Subsrcibe/Publish)的模式。在创建CloudTrail的时候,可以关联一个SNS的Topic(可选),这样做的好处是当有API调用时,可以第一时间得到通知。可以使用不同的客户端来订阅SNS的通知,例如Email,Mobile的Notification Service,SQS等

  • SQS

    SQS是亚马逊提供的队列服务,在本文中,我们使用SQS订阅SNS的的内容,这样我们的Python程序就可以从SQS的队列中获取相应的通知。


配置CloudTrail

首先我们需要创建SNS,并指定相应的策略。代码如下:

import boto.sns
import json

key_id='yourawskeyid'
secret_key='yourawssecretkey'

region_name="eu-central-1"
trail_topic_name="topicABC"
sns_policy_sid="snspolicy0001"

sns_conn = boto.sns.connect_to_region(region_name,
                                         aws_access_key_id=key_id,
                                         aws_secret_access_key=secret_key)

sns_topic = sns_conn.create_topic(trail_topic_name)

# Get ARN of SNS topic
sns_arn = sns_topic['CreateTopicResponse']['CreateTopicResult']['TopicArn']

# Add related policy
attrs = sns_conn.get_topic_attributes(sns_arn)
policy = attrs['GetTopicAttributesResponse']['GetTopicAttributesResult']['Attributes']['Policy']
policy_obj = json.loads(policy)
statements = policy_obj['Statement']

default_statement = statements[0]
new_statement = default_statement.copy()
new_statement["Sid"] = sns_policy_sid
new_statement["Action"] = "SNS:Publish"
new_statement["Principal"] = {
        "AWS": [
          "arn:aws:iam::903692715234:root",
          "arn:aws:iam::035351147821:root", 
          "arn:aws:iam::859597730677:root",
          "arn:aws:iam::814480443879:root",
          "arn:aws:iam::216624486486:root",
          "arn:aws:iam::086441151436:root",
          "arn:aws:iam::388731089494:root",
          "arn:aws:iam::284668455005:root",
          "arn:aws:iam::113285607260:root"
        ]
      }
new_statement.pop("Condition", None)
statements.append(new_statement)
new_policy = json.dumps(policy_obj)
sns_conn.set_topic_attributes(sns_arn,"Policy",new_policy)

CloudTrail是和区域(Region)相关的,不同的Region有不同的CloudTrail服务,所以,在创建对应的SNS时,需要保证使用同一个Region。

这里要注意的是我们创建了新的policy来使得CloudTrail拥有向我们创建的SNS发布消息(Action=“SNS:Publish”)的权限。我们的做法是从缺省的策略中拷贝了一份,修改了相应的Action和Sid(随便取一个不重复的名字),Principal部分是一个缺省的account的列表,这里是硬编码,AWS有可能会修改该列表的值,但在当前环境下,该值是固定的。最后移除Condition的值。把新创建的Policy片段添加到原来的Policy中就好了。

然后我们需要创建一个SQS的队列,并订阅我们创建的SNS的Topic。这一步相对比较简单。

import boto.sqs

sqs_queue_name="sqs_queue"
sqs_conn = boto.sqs.connect_to_region(region_name,
                                         aws_access_key_id=key_id,
                                         aws_secret_access_key=secret_key)
sqs_queue = sqs_conn.create_queue(sqs_queue_name)
sns_conn.subscribe_sqs_queue(sns_arn, sqs_queue)

然后,我们需要创建一个S3的Bucket用来存储CloudTrail产生的日志文件。同样的,需要指定响应的策略以保证CloudTrail能够有权限写入对应的日志文件。

import boto

bucket_name="bucket000"
policy_sid="testpolicy000"
s3_conn = boto.connect_s3(aws_access_key_id=key_id,aws_secret_access_key=secret_key)
bucket = s3_conn.create_bucket(bucket_name)
bucket_policy = '''{
	"Version": "2012-10-17",
	"Statement": [
		{
			"Sid": "%Sid%GetPolicy",
			"Effect": "Allow",
			"Principal": {
				"AWS": [
					"arn:aws:iam::903692715234:root",
					"arn:aws:iam::035351147821:root",
					"arn:aws:iam::859597730677:root",
					"arn:aws:iam::814480443879:root",
					"arn:aws:iam::216624486486:root",
					"arn:aws:iam::086441151436:root",
					"arn:aws:iam::388731089494:root",
					"arn:aws:iam::284668455005:root",
					"arn:aws:iam::113285607260:root"
				]
			},
			"Action": "s3:GetBucketAcl",
			"Resource": "arn:aws:s3:::%bucket_name%"
		},
		{
			"Sid": "%Sid%PutPolicy",
			"Effect": "Allow",
			"Principal": {
				"AWS": [
					"arn:aws:iam::903692715234:root",
					"arn:aws:iam::035351147821:root",
					"arn:aws:iam::859597730677:root",
					"arn:aws:iam::814480443879:root",
					"arn:aws:iam::216624486486:root",
					"arn:aws:iam::086441151436:root",
					"arn:aws:iam::388731089494:root",
					"arn:aws:iam::284668455005:root",
					"arn:aws:iam::113285607260:root"
				]
			},
			"Action": "s3:PutObject",
			"Resource": "arn:aws:s3:::%bucket_name%/*",
			"Condition": {
				"StringEquals": {
					"s3:x-amz-acl": "bucket-owner-full-control"
				}
			}
		}
	]
}'''
bucket_policy = bucket_policy.replace("%bucket_name%",bucket_name)
bucket_policy = bucket_policy.replace("%Sid%",policy_sid)
bucket.set_policy(bucket_policy)

这里我们使用一个缺省的Policy文件,替换掉响应的字段就好了。

最后,我们创建CloudTrail的服务:

import boto.cloudtrail

trail_name="Trailabc"
log_prefix="log"

cloudtrail_conn=boto.cloudtrail.connect_to_region(region_name,
                                         aws_access_key_id=key_id,
                                         aws_secret_access_key=secret_key)

##cloudtrail_conn.describe_trails()
cloudtrail_conn.create_trail(trail_name,bucket_name, s3_key_prefix=log_prefix,sns_topic_name=trail_topic_name)
cloudtrail_conn.start_logging(trail_name)

好了,现在CloudTrail已经配置好了,并且关联的SNS也被我们创建的SQS队列订阅,下面我们就可以抓取日志了

获取日志数据

每当有一个API调用,CloudTrail都会把响应的日志文件写入到S3我们创建的Bucket中,同时在我们在创建的SNS的topic中发布一条消息,因为我们使用SQS的队列订阅了该消息,所以我们可以通过读取SQS消息的方式来获得日志数据。

首先连接到SQS的队列,并从中读取消息

import boto.sqs

sqs_queue_name="sqs_queue"
sqs_conn = boto.sqs.connect_to_region(region_name,
                                         aws_access_key_id=key_id,
                                         aws_secret_access_key=secret_key)
                                         
sqs_queue = sqs_conn.get_queue(sqs_queue_name)
notifications = sqs_queue.get_messages()

然后我们从消息中获得响应的日志文件在S3中的地址,并利用该地址从S3中获得对应的日志文件

for notification in notifications:
    envelope = json.loads(notification.get_body())
    message = json.loads(envelope['Message'])
    bucket_name = message['s3Bucket']
    s3_bucket = s3_conn.get_bucket(bucket_name)
    for key in message['s3ObjectKey']:
        s3_file = s3_bucket.get_key(key)
        with io.BytesIO(s3_file.read()) as bfile:
            with gzip.GzipFile(fileobj=bfile) as gz:
                logjson = json.loads(gz.read())

logjson就是对应的日记内容的JSON格式。这里有一个例子

{
    "Records": [{
        "eventVersion": "1.0",
        "userIdentity": {
            "type": "IAMUser",
            "principalId": "EX_PRINCIPAL_ID",
            "arn": "arn:aws:iam::123456789012:user/Alice",
            "accessKeyId": "EXAMPLE_KEY_ID",
            "accountId": "123456789012",
            "userName": "Alice"
        },
        "eventTime": "2014-03-06T21:22:54Z",
        "eventSource": "ec2.amazonaws.com",
        "eventName": "StartInstances",
        "awsRegion": "us-west-2",
        "sourceIPAddress": "205.251.233.176",
        "userAgent": "ec2-api-tools 1.6.12.2",
        "requestParameters": {
            "instancesSet": {
                 "items": [{
                      "instanceId": "i-ebeaf9e2"
                }]
            }
        },
        "responseElements": {
            "instancesSet": {
                "items": [{
                      "instanceId": "i-ebeaf9e2",
                      "currentState": {
                          "code": 0,
                          "name": "pending"
                      },
                      "previousState": {
                          "code": 80,
                          "name": "stopped"
                      }
                    }]
            }
        }
    },
    ... additional entries ...
	]
}

你可以使用以上代码来监控所有的cloudtrail的日志,拿到的JSON格式的日志可以放在你的数据库(Mongo不错)中,然后利用你的BI工具做分析。

注意你也可以不创建SNS和SQS,直接扫描bucket的内容,这样做的好处是配置更简单,缺点是实时性比较差,扫面Bucket需要额外的计算,并且需要在本地保存文件扫描的状态,code会更加复杂。

利用CloudTrail的日志,你可以做很多事情,比如看看有没有非法的登陆,各个服务的使用频率,总之,当你有了足够多的数据,你就可以从中发现足够的价值。


使用Python进行并发编程

2015年3月20日 05:46

让计算机程序并发的运行是一个经常被讨论的话题,今天我想讨论一下Python下的各种并发方式。

并发方式

线程(Thread

多线程几乎是每一个程序猿在使用每一种语言时都会首先想到用于解决并发的工具(JS程序员请回避),使用多线程可以有效的利用CPU资源(Python例外)。然而多线程所带来的程序的复杂度也不可避免,尤其是对竞争资源的同步问题。

然而在python中由于使用了全局解释锁(GIL)的原因,代码并不能同时在多核上并发的运行,也就是说,Python的多线程不能并发,很多人会发现使用多线程来改进自己的Python代码后,程序的运行效率却下降了,这是多么蛋疼的一件事呀!如果想了解更多细节,推荐阅读这篇文章。实际上使用多线程的编程模型是很困难的,程序员很容易犯错,这并不是程序员的错误,因为并行思维是反人类的,我们大多数人的思维是串行(精神分裂不讨论),而且冯诺依曼设计的计算机架构也是以顺序执行为基础的。所以如果你总是不能把你的多线程程序搞定,恭喜你,你是个思维正常的程序猿:)

Python提供两组线程的接口,一组是thread模块,提供基础的,低等级(Low Level)接口,使用Function作为线程的运行体。还有一组是threading模块,提供更容易使用的基于对象的接口(类似于Java),可以继承Thread对象来实现线程,还提供了其它一些线程相关的对象,例如Timer,Lock

使用thread模块的例子

import thread
def worker():
    """thread worker function"""
    print 'Worker'
thread.start_new_thread(worker)

使用threading模块的例子

import threading
def worker():
    """thread worker function"""
    print 'Worker'
t = threading.Thread(target=worker)
t.start()

 或者Java Style

import threading
class worker(threading.Thread):
    def __init__(self):
        pass
    def run():
        """thread worker function"""
        print 'Worker'
    
t = worker()
t.start()


进程 (Process)

由于前文提到的全局解释锁的问题,Python下比较好的并行方式是使用多进程,这样可以非常有效的使用CPU资源,并实现真正意义上的并发。当然,进程的开销比线程要大,也就是说如果你要创建数量惊人的并发进程的话,需要考虑一下你的机器是不是有一颗强大的心。

Python的mutliprocess模块和threading具有类似的接口。

from multiprocessing import Process

def worker():
    """thread worker function"""
    print 'Worker'
p = Process(target=worker)
p.start()
p.join()

由于线程共享相同的地址空间和内存,所以线程之间的通信是非常容易的,然而进程之间的通信就要复杂一些了。常见的进程间通信有,管道,消息队列,Socket接口(TCP/IP)等等。

Python的mutliprocess模块提供了封装好的管道和队列,可以方便的在进程间传递消息。

Python进程间的同步使用锁,这一点喝线程是一样的。

另外,Python还提供了进程池Pool对象,可以方便的管理和控制线程。


远程分布式主机 (Distributed Node)

随着大数据时代的到临,摩尔定理在单机上似乎已经失去了效果,数据的计算和处理需要分布式的计算机网络来运行,程序并行的运行在多个主机节点上,已经是现在的软件架构所必需考虑的问题。

远程主机间的进程间通信有几种常见的方式

  • TCP/IP

    TCP/IP是所有远程通信的基础,然而API比较低级别,使用起来比较繁琐,所以一般不会考虑

  • 远程方法调用 Remote Function Call

    RPC是早期的远程进程间通信的手段。Python下有一个开源的实现RPyC

  • 远程对象 Remote Object

    远程对象是更高级别的封装,程序可以想操作本地对象一样去操作一个远程对象在本地的代理。远程对象最广为使用的规范CORBA,CORBA最大的好处是可以在不同语言和平台中进行通信。当让不用的语言和平台还有一些各自的远程对象实现,例如Java的RMI,MS的DCOM

    Python的开源实现,有许多对远程对象的支持

  • 消息队列 Message Queue

    比起RPC或者远程对象,消息是一种更为灵活的通信手段,常见的支持Python接口的消息机制有

在远程主机上执行并发和本地的多进程并没有非常大的差异,都需要解决进程间通信的问题。当然对远程进程的管理和协调比起本地要复杂。

Python下有许多开源的框架来支持分布式的并发,提供有效的管理手段包括:

  • Celery 

    Celery是一个非常成熟的Python分布式框架,可以在分布式的系统中,异步的执行任务,并提供有效的管理和调度功能。参考这里

  • SCOOP

    SCOOP Scalable COncurrent Operations in Python)提供简单易用的分布式调用接口,使用Future接口来进行并发。

  • Dispy

    相比起Celery和SCOOP,Dispy提供更为轻量级的分布式并行服务

  • PP 

    PP (Parallel Python)是另外一个轻量级的Python并行服务, 参考这里

  • Asyncoro

    Asyncoro是另一个利用Generator实现分布式并发的Python框架,

当然还有许多其它的系统,我没有一一列出

另外,许多的分布式系统多提供了对Python接口的支持,例如Spark


伪线程 (Pseudo-Thread)

还有一种并发手段并不常见,我们可以称之为伪线程,就是看上去像是线程,使用的接口类似线程接口,但是实际使用非线程的方式,对应的线程开销也不存的。

  • greenlet 

    greenlet提供轻量级的coroutines来支持进程内的并发。

    greenlet是Stackless的一个副产品,使用tasklet来支持一中被称之为微线程(mirco-thread)的技术,这里是一个使用greenlet的伪线程的例子

from greenlet import greenlet

def test1():
    print 12
    gr2.switch()
    print 34
    
def test2():
    print 56
    gr1.switch()
    print 78
    
gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()

        运行以上程序得到如下结果:

12
56
34

伪线程gr1 switch会打印12,然后调用gr2 switch得到56,然后switch回到gr1,打印34,然后伪线程gr1结束,程序退出,所以78永远不会被打印。通过这个例子我们可以看出,使用伪线程,我们可以有效的控制程序的执行流程,但是伪线程并不存在真正意义上的并发。

eventlet,gevent和concurence都是基于greenlet提供并发的。

eventlet是一个提供网络调用并发的Python库,使用者可以以非阻塞的方式累调用阻塞的IO操作。

import eventlet
from eventlet.green import urllib2

urls = ['http://www.google.com', 'http://www.example.com', 'http://www.python.org']

def fetch(url):
    return urllib2.urlopen(url).read()

pool = eventlet.GreenPool()

for body in pool.imap(fetch, urls):
    print("got body", len(body))

执行结果如下

('got body', 17629)
('got body', 1270)
('got body', 46949)

eventlet为了支持generator的操作对urllib2做了修改,接口和urllib2是一致的。这里的GreenPool和Python的Pool接口一致。

gevent和eventlet类似,关于它们的差异大家可以参考这篇文章

import gevent
from gevent import socket
urls = ['www.google.com', 'www.example.com', 'www.python.org']
jobs = [gevent.spawn(socket.gethostbyname, url) for url in urls]
gevent.joinall(jobs, timeout=2)

print [job.value for job in jobs]

执行结果如下:

['206.169.145.226', '93.184.216.34', '23.235.39.223']
  • concurence https://github.com/concurrence/concurrence

concurence是另外一个利用greenlet提供网络并发的开源库,我没有用过,大家可以自己尝试一下。


实战运用

通常需要用到并发的场合有两种,一种是计算密集型,也就是说你的程序需要大量的CPU资源;另一种是IO密集型,程序可能有大量的读写操作,包括读写文件,收发网络请求等等。

计算密集型

对应计算密集型的应用,我们选用著名的蒙特卡洛算法来计算PI值。基本原理如下

蒙特卡洛算法利用统计学原理来模拟计算圆周率,在一个正方形中,一个随机的点落在1/4圆的区域(红色点)的概率与其面积成正比。也就该概率 p = Pi * R*R /4  : R* R , 其中R是正方形的边长,圆的半径。也就是说该概率是圆周率的1/4, 利用这个结论,只要我们模拟出点落在四分之一圆上的概率就可以知道圆周率了,为了得到这个概率,我们可以通过大量的实验,也就是生成大量的点,看看这个点在哪个区域,然后统计出结果。

基本算法如下:

from math import hypot
from random import random

def test(tries):
    return sum(hypot(random(), random()) < 1 for _ in range(tries))

这里test方法做了n(tries)次试验,返回落在四分之一圆中的点的个数。判断方法是检查该点到圆心的距离,如果小于R则是在圆上。

通过大量的并发,我们可以快速的运行多次试验,试验的次数越多,结果越接近真实的圆周率。

这里给出不同并发方法的程序代码

  • 非并发

    我们先在单线程,但进程运行,看看性能如何

from math import hypot
from random import random
import eventlet
import time

def test(tries):
    return sum(hypot(random(), random()) < 1 for _ in range(tries))

def calcPi(nbFutures, tries):
    ts = time.time()
    result = map(test, [tries] * nbFutures)
    
    ret = 4. * sum(result) / float(nbFutures * tries)
    span = time.time() - ts
    print "time spend ", span
    return ret

print calcPi(3000,4000)
  • 多线程 thread

    为了使用线程池,我们用multiprocessing的dummy包,它是对多线程的一个封装。注意这里代码虽然一个字的没有提到线程,但它千真万确是多线程。

    通过测试我们开(jing)心(ya)的发现,果然不出所料,当线程池为1是,它的运行结果和没有并发时一样,当我们把线程池数字设置为5时,耗时几乎是没有并发的2倍,我的测试数据从5秒到9秒。所以对于计算密集型的任务,还是放弃多线程吧。

from multiprocessing.dummy import Pool

from math import hypot
from random import random
import time

def test(tries):
    return sum(hypot(random(), random()) < 1 for _ in range(tries))

def calcPi(nbFutures, tries):
    ts = time.time()
    p = Pool(1)
    result = p.map(test, [tries] * nbFutures)
    ret = 4. * sum(result) / float(nbFutures * tries)
    span = time.time() - ts
    print "time spend ", span
    return ret

if __name__ == '__main__':
    p = Pool()
    print("pi = {}".format(calcPi(3000, 4000)))
  • 多进程 multiprocess

    理论上对于计算密集型的任务,使用多进程并发比较合适,在以下的例子中,进程池的规模设置为5,修改进程池的大小可以看到对结果的影响,当进程池设置为1时,和多线程的结果所需的时间类似,因为这时候并不存在并发;当设置为2时,响应时间有了明显的改进,是之前没有并发的一半;然而继续扩大进程池对性能影响并不大,甚至有所下降,也许我的Apple Air的CPU只有两个核?

    当心,如果你设置一个非常打的进程池,你会遇到 Resource temporarily unavailable的错误,系统并不能支持创建太多的进程,毕竟资源是有限的。

from multiprocessing import Pool

from math import hypot
from random import random
import time

def test(tries):
    return sum(hypot(random(), random()) < 1 for _ in range(tries))

def calcPi(nbFutures, tries):
    ts = time.time()
    p = Pool(5)
    result = p.map(test, [tries] * nbFutures)
    ret = 4. * sum(result) / float(nbFutures * tries)
    span = time.time() - ts
    print "time spend ", span
    return ret

if __name__ == '__main__':
    p = Pool()
    print("pi = {}".format(calcPi(3000, 4000)))
  • gevent (伪线程)

    不论是gevent还是eventlet,因为不存在实际的并发,响应时间和没有并发区别不大,这个和测试结果一致。

import gevent
from math import hypot
from random import random
import time

def test(tries):
    return sum(hypot(random(), random()) < 1 for _ in range(tries))

def calcPi(nbFutures, tries):
    ts = time.time()
    jobs = [gevent.spawn(test, t) for t in [tries] * nbFutures]
    gevent.joinall(jobs, timeout=2)
    ret = 4. * sum([job.value for job in jobs]) / float(nbFutures * tries)
    span = time.time() - ts
    print "time spend ", span
    return ret

print calcPi(3000,4000)
  • eventlet (伪线程)

from math import hypot
from random import random
import eventlet
import time

def test(tries):
    return sum(hypot(random(), random()) < 1 for _ in range(tries))

def calcPi(nbFutures, tries):
    ts = time.time()
    pool = eventlet.GreenPool()
    result = pool.imap(test, [tries] * nbFutures)
    
    ret = 4. * sum(result) / float(nbFutures * tries)
    span = time.time() - ts
    print "time spend ", span
    return ret

print calcPi(3000,4000)
  • SCOOP

SCOOP中的Future接口符合PEP-3148的定义,也就是在Python3中提供的Future接口。

在缺省的SCOOP配置环境下(单机,4个Worker),并发的性能有提高,但是不如两个进程池配置的多进程。

from math import hypot
from random import random
from scoop import futures

import time

def test(tries):
    return sum(hypot(random(), random()) < 1 for _ in range(tries))

def calcPi(nbFutures, tries):
    ts = time.time()
    expr = futures.map(test, [tries] * nbFutures)
    ret = 4. * sum(expr) / float(nbFutures * tries)
    span = time.time() - ts
    print "time spend ", span
    return ret

if __name__ == "__main__":
    print("pi = {}".format(calcPi(3000, 4000)))
  • Celery

任务代码

from celery import Celery

from math import hypot
from random import random
 
app = Celery('tasks', backend='amqp', broker='amqp://guest@localhost//')
app.conf.CELERY_RESULT_BACKEND = 'db+sqlite:///results.sqlite'
 
@app.task
def test(tries):
    return sum(hypot(random(), random()) < 1 for _ in range(tries))

客户端代码

from celery import group
from tasks import test

import time

def calcPi(nbFutures, tries):
    ts = time.time()
    result = group(test.s(tries) for i in xrange(nbFutures))().get()
    
    ret = 4. * sum(result) / float(nbFutures * tries)
    span = time.time() - ts
    print "time spend ", span
    return ret

print calcPi(3000, 4000)

使用Celery做并发的测试结果出乎意料(环境是单机,4frefork的并发,消息broker是rabbitMQ),是所有测试用例里最糟糕的,响应时间是没有并发的5~6倍。这也许是因为控制协调的开销太大。对于这样的计算任务,Celery也许不是一个好的选择。

  • asyncoro

    Asyncoro的测试结果和非并发保持一致。

import asyncoro

from math import hypot
from random import random
import time

def test(tries):
    yield sum(hypot(random(), random()) < 1 for _ in range(tries))


def calcPi(nbFutures, tries):
    ts = time.time()
    coros = [ asyncoro.Coro(test,t) for t in [tries] * nbFutures]
    ret = 4. * sum([job.value() for job in coros]) / float(nbFutures * tries)
    span = time.time() - ts
    print "time spend ", span
    return ret

print calcPi(3000,4000)


IO密集型

IO密集型的任务是另一种常见的用例,例如网络WEB服务器就是一个例子,每秒钟能处理多少个请求时WEB服务器的重要指标。

我们就以网页读取作为最简单的例子

from math import hypot
import time
import urllib2

urls = ['http://www.google.com', 'http://www.example.com', 'http://www.python.org']

def test(url):
    return urllib2.urlopen(url).read()

def testIO(nbFutures):
    ts = time.time()
    map(test, urls * nbFutures)

    span = time.time() - ts
    print "time spend ", span

testIO(10)

在不同并发库下的代码,由于比较类似,我就不一一列出。大家可以参考计算密集型中代码做参考。

通过测试我们可以发现,对于IO密集型的任务,使用多线程,或者是多进程都可以有效的提高程序的效率,而使用伪线程性能提升非常显著,eventlet比没有并发的情况下,响应时间从9秒提高到0.03秒。同时eventlet/gevent提供了非阻塞的异步调用模式,非常方便。这里推荐使用线程或者伪线程,因为在响应时间类似的情况下,线程和伪线程消耗的资源更少。


总结

Python提供了不同的并发方式,对应于不同的场景,我们需要选择不同的方式进行并发。选择合适的方式,不但要对该方法的原理有所了解,还应该做一些测试和试验,数据才是你做选择的最好参考。


Python 并行分布式框架之 PP

2015年3月20日 05:44

PP (Parallel Python)是基于Python的一个轻量级的,提供在SMP(多处理器或者多核系统)或者集群环境中并行执行Python代码的机制。

最简单和最常见的并行方式是使用多线程,然而如果应用程序使用Python提供的线程库, 它实际上并不能并行的运行Python的字节码(Byte-Code)。这是因为Pyton解释器使用GIL(全局解释器锁),这样的机制是的在同一时间,即使是多核的机器,也只能运行一个字节码指令。PP试图克服这样的限制,提供一种更简单的方式来编写并行应用。PP采用了多进程和进程间通信来处理并发,并隐藏所有的实现细节,使得其容易使用。

功能介绍

  • 并行运行Python代码(废话)

  • 易于理解的基于任务(Job)的并行技术

  • 自动检测优化配置

  • 动态处理器分配

  • 负载均衡

  • 容错

  • 自动发现和动态分配计算资源

  • 基于SHA的网络连接认证

  • 跨平台(Windows,Linux, Unix, Mac OS)和架构(X86,X86-64)支持

  • 开源 (BSD)

安装

wget  
tar -xvf pp-1.6.4.tar.gz
cd pp-1.6.4
sudo python setup.py install

架构和设计

Server

PP server 包含并管理多个worker并行的执行客户端发送的任务

Client

客户端负责发送任务(Python Function)到服务器

Cluster

多个PP Server可以构成一个PP Cluster,在CLuster模式客户端提交任务到Cluster,cluster 找到合适的Server来运行任务。

使用方式

SMP

在SMP的模式下使用PP非常简单

import pp
# create a job serer
job_server = pp.Server()

# submit some jobs with python functions
f1 = job_server.submit(func1, args1, depfuncs1, modules1)
f2 = job_server.submit(func1, args2, depfuncs1, modules1)
f3 = job_server.submit(func2, args3, depfuncs2, modules2)

# Get result from each jobs
r1 = f1()
r2 = f2()
r3 = f3()

Cluster

在Cluster模式下,需要在不同的节点运行ppserver

node-1> ./ppserver.py
node-2> ./ppserver.py
node-3> ./ppserver.py

客户端代码和SMP模式类似

import pp

# create cluster
ppservers=("node-1", "node-2", "node-3")

# create a job serer
pp.Server(ppservers=ppservers) 

# submit some jobs with python functions
f1 = job_server.submit(func1, args1, depfuncs1, modules1)
f2 = job_server.submit(func1, args2, depfuncs1, modules1)
f3 = job_server.submit(func2, args3, depfuncs2, modules2)

# Get result from each jobs
r1 = f1()
r2 = f2()
r3 = f3()


总结

PP是一个轻量级的并行框架,代码不多,安装使用起来也比较简单,并行方式是多进程,但是缺乏对任务的封装,也缺少调度的功能。适合于构造简单的并行分布式系统。

Celery (芹菜)是基于Python开发的分布式任务队列。它支持使用任务队列的方式在分布的机器/进程/线程上执行任务调度。

架构设计

Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。

  • 消息中间件

    Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQRedisMongoDB (experimental), Amazon SQS (experimental),CouchDB (experimental), SQLAlchemy (experimental),Django ORM (experimental), IronMQ

  • 任务执行单元

    Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。

  • 任务结果存储

    Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, Redis,memcached, MongoDB,SQLAlchemy, Django ORM,Apache Cassandra, IronCache

另外, Celery还支持不同的并发和序列化的手段

  • 并发

    PreforkEventletgevent, threads/single threaded

  • 序列化

    picklejsonyamlmsgpackzlibbzip2 compression, Cryptographic message signing 等等

安装和运行

Celery的安装过程略为复杂,下面的安装过程是基于我的AWS EC2的Linux版本的安装过程,不同的系统安装过程可能会有差异。大家可以参考官方文档。

首先我选择RabbitMQ作为消息中间件,所以要先安装RabbitMQ。作为安装准备,先更新YUM。

sudo yum -y update

RabbitMQ是基于erlang的,所以先安装erlang

# Add and enable relevant application repositories:
# Note: We are also enabling third party remi package repositories.
wget http://dl.fedoraproject.org/pub/epel/6/x86_64/epel-release-6-8.noarch.rpm
wget http://rpms.famillecollet.com/enterprise/remi-release-6.rpm
sudo rpm -Uvh remi-release-6*.rpm epel-release-6*.rpm

# Finally, download and install Erlang:
yum install -y erlang

然后安装RabbitMQ

# Download the latest RabbitMQ package using wget:
wget  
# Add the necessary keys for verification:
rpm --import  
# Install the .RPM package using YUM:
yum install rabbitmq-server-3.2.2-1.noarch.rpm

启动RabbitMQ服务

rabbitmq-server start

RabbitMQ服务已经准备好了,然后安装Celery, 假定你使用pip来管理你的python安装包

pip install Celery

为了测试Celery是否工作,我们运行一个最简单的任务,编写tasks.py

from celery import Celery

app = Celery('tasks', backend='amqp', broker='amqp://guest@localhost//')
app.conf.CELERY_RESULT_BACKEND = 'db+sqlite:///results.sqlite'

@app.task
def add(x, y):
    return x + y

在当前目录运行一个worker,用来执行这个加法的task

celery -A tasks worker --loglevel=info

其中-A参数表示的是Celery App的名字。注意这里我使用的是SQLAlchemy作为结果存储。对应的python包要事先安装好。

worker日志中我们会看到这样的信息

- ** ---------- [config]
- ** ---------- .> app:         tasks:0x1e68d50
- ** ---------- .> transport:   amqp://guest:**@localhost:5672//
- ** ---------- .> results:     db+sqlite:///results.sqlite
- *** --- * --- .> concurrency: 8 (prefork)

其中,我们可以看到worker缺省使用prefork来执行并发,并设置并发数为8

下面的任务执行的客户端代码:

from tasks import add
import time
result = add.delay(4,4)

while not result.ready():
  print "not ready yet"
  time.sleep(5)

print result.get()

用python执行这段客户端代码,在客户端,结果如下

not ready   
8

Work日志显示

[2015-03-12 02:54:07,973: INFO/MainProcess] Received task: tasks.add[34c4210f-1bc5-420f-a421-1500361b914f]
[2015-03-12 02:54:08,006: INFO/MainProcess] Task tasks.add[34c4210f-1bc5-420f-a421-1500361b914f] succeeded in 0.0309705100954s: 8

这里我们可以发现,每一个task有一个唯一的ID,task异步执行在worker上。

这里要注意的是,如果你运行官方文档中的例子,你是无法在客户端得到结果的,这也是我为什么要使用SQLAlchemy来存储任务执行结果的原因。官方的例子使用AMPQ,有可能Worker在打印日志的时候取出了task的运行结果显示在worker日志中,然而AMPQ作为一个消息队列,当消息被取走后,队列中就没有了,于是客户端总是无法得到任务的执行结果。不知道为什么官方文档对这样的错误视而不见。

如果大家想要对Celery做更进一步的了解,请参考官方文档

不管是开源还是闭源,文档都是很重要的。当然理论上说,最好的文档就是代码本身,但是要让所有人都能读懂你的代码这太难了。所以我们要写文档。大部分情况,我们不希望维护一份代码再加上一份文档,这样做很容易造成文档和代码的不一致,程序员最讨厌更新文档了。所以最佳实践就是在程序员代码中加注释,然后通过构建脚本自通生成文档。

对应于Pyhon,有很多可供选择的工具:

  • sphinx 中文版介绍 Sphinx使用 reStructuredText作为标记语言(类似Markdown),可扩展,功能强大。要注意的是何有一个开源的搜索也叫Sphinx,斯芬克斯果然太受欢迎,开源的世界起个名字不容易呀。

  • pdoc 是一个简单易用的命令行工具,可以生成Python的API文档

  • Doxygen 是老牌的文档生成工具,可以针对各种语言生成文档,我们以前在C++的项目中曾经使用过

  • 其他还有诸如 pydoc , pydoctor 等等

下面我就介绍一下如果使用Sphinx为你的python项目快速的构建API 文档。

首先要安装Sphinx,不同的操作系统有不同的安装方式,Sphinx的源代码在这里 , 你也可以自己构建。我推荐使用pip install。(注,如果你安装了Anaconda,Sphinx已经包含在内了)

然后,假定你的python的源代码是在 src 目录下,我们在同一级并行建立一个文档目录 doc (你当然可以根据自己的项目需要,确定目录命名和结构)。

在doc目录下运行 

sphinx-quickstart

sphinx会提示你的项目的一些设置,生成项目的配置文件,这里给出一些推荐的配置:

> Root path for the documentation [.]:
<ENTER>
> Separate source and build directories (y/N) [n]:
y
> Name prefix for templates and static dir [_]:
<ENTER>
> Project name:
an_example_pypi_project
> Author name(s):
Andrew Carter
> Project version:
0.0.1
> Project release [0.0.1]:
<ENTER>
> Source file suffix [.rst]:
<ENTER>
> Name of your master document (without suffix) [index]:
<ENTER>
> autodoc: automatically insert docstrings from modules (y/N) [n]:
y
> doctest: automatically test code snippets in doctest blocks (y/N) [n]:
n
> intersphinx: link between Sphinx documentation of different projects (y/N) [n]:
y
> todo: write “todo” entries that can be shown or hidden on build (y/N) [n]:
n
> coverage: checks for documentation coverage (y/N) [n]:
n
> pngmath: include math, rendered as PNG images (y/N) [n]:
n
> jsmath: include math, rendered in the browser by JSMath (y/N) [n]:
n
> ifconfig: conditional inclusion of content based on config values (y/N) [n]:
y
> Create Makefile? (Y/n) [y]:
n
> Create Windows command file? (Y/n) [y]:
n

运行完毕,sphinx会生成项目的配置文档conf.py还有源文件(后缀为rst)

下一步要为捏python源文件生成sphinx的源文件,用来生成API文档,需要运行

sphinx-apidoc [options] -o outputdir packagedir [pathnames]

其中outputdir是doc目录,packagedir是src目录,也就是你的python代码包所在的目录

运行好后,会对每一个Python包生成一个rst文件,你可以编辑该文件来修改生成文档的细节,一般情况下不用改。

好了,准备工作做好了以后,就可以生成API文档了。在运行文档生成脚本之前,要确保你的Python源代码所在的包在系统路径中是可以找到的,需要修改conf.py。因为在生成文档是需要运行你的python代码,要保证code运行不出错。

sys.path.insert(0, os.path.abspath('../src'))

在doc目录下运行脚本

sphinx-build -b html . ./ouput

在output目录会生成HTML格式的API文档。(也可以选其他文档格式)

Sphinx还有一个automsummay的扩展,可能能简化以上的过程,等我试一试在更新结果。


在我们的开发工程中经常会使用到各种图,所谓的图就是由节点和节点之间的连接所形成的系统,数学上专门有一个分支叫图论(Graph Theroy)。利用图我们可以做很多工具,比如思维导图,流程图,状态机,组织架构图,等等。今天我要做的是用开源的HTML5工具来快速构造一个做图的工具。

工具选择

预先善其事,必先利其器。第一件事是选择一件合适的工具,开源时代,程序员还是很幸福的,选择很多。

最终,我选择了jsPlumb,因为它完全开源,使用很简单,用D3的话可能会多花很多功夫。joint.js也不错。大家可以根据自己的需要选择。

构建静态应用

下面我们一步一步的来使用jsPlumb来创建我们的流程图工具。

第一步是等待DOM和jsPlumb初始化完毕,类似document.ready()和jquery.ready(), 要使用jsPlumb, 需要把代码放在这个函数里:

jsPlumb.ready(function() {
    // ... your code goes here ...
}

 

创建一个jsPlumb的实例,并初始化jsPlumb的配置参数:

//Initialize JsPlumb
var color = "#E8C870";
var instance = jsPlumb.getInstance({
    // notice the 'curviness' argument to this Bezier curve.  the curves on this page are far smoother
    // than the curves on the first demo, which use the default curviness value.      
    Connector : [ "Bezier", { curviness:50 } ],
    DragOptions : { cursor: "pointer", zIndex:2000 },
    PaintStyle : { strokeStyle:color, lineWidth:2 },
    EndpointStyle : { radius:5, fillStyle:color },
    HoverPaintStyle : {strokeStyle:"#7073EB" },
    EndpointHoverStyle : {fillStyle:"#7073EB" },
    Container:"container-id"
 });

 

这里给给出了一些配置包括,连接线(这里配置了一个贝塞尔曲线),线的风格,连接点得风格。Container需要配置一个对应的DIV容器的id。(这里也可以使用setContainer的方法)

下面我们要创建一个节点(node),每一个节点可以用一个DIV来实现。我这里提供了一个函数来创建节点。

function addNode(parentId, nodeId, nodeLable, position) {
  var panel = d3.select("#" + parentId);
  panel.append('div').style('width','120px').style('height','50px')
    .style('position','absolute')
    .style('top',position.y).style('left',position.x)
    .style('border','2px #9DFFCA solid').attr('align','center')
    .attr('id',nodeId).classed('node',true)
    .text(nodeLable);

  return jsPlumb.getSelector('#' + nodeId)[0];
}

 

这里做的事情就是创建了一个DIV元素,并放在对应的容器的制定位置上,注意为了支持拖拽的功能,必须使用position:absolute 。

我使用D3来操作DOM,大家可能会更习惯JQuery,这纯属个人喜好的问题。

最后返回创建节点的实例引用,这是的selector使用了jsPlumb.getSelector()方法,它和JQuery的selector是一样的,这样用的好处是你可以使用不同的DOM操作库,例如Vanilla

下面我使用一个函数来创建端点/锚点(anchor),锚点就是节点上的连接点,用于连接不同的节点。

function addPorts(instance, node, ports, type) {
  //Assume horizental layout
  var number_of_ports = ports.length;
  var i = 0;
  var height = $(node).height();  //Note, jquery does not include border for height
  var y_offset = 1 / ( number_of_ports + 1);
  var y = 0;

  for ( ; i < number_of_ports; i++ ) {
    var anchor = [0,0,0,0];
    var paintStyle = { radius:5, fillStyle:'#FF8891' };
    var isSource = false, isTarget = false;
    if ( type === 'output' ) {
      anchor[0] = 1;
      paintStyle.fillStyle = '#D4FFD6';
      isSource = true;
    } else {
      isTarget =true;
    }

    anchor[1] = y + y_offset;
    y = anchor[1];

    instance.addEndpoint(node, {
      uuid:node.getAttribute("id") + "-" + ports[i],
      paintStyle: paintStyle,
      anchor:anchor,
      maxConnections:-1,
      isSource:isSource,
      isTarget:isTarget
    });
  }
}

 

instance是jsPlumb的实例

node是我们用addNode方法创建的Node实例

ports,是一个string的数组,指定端点的个数和名字

type,可能是output或者input,指定端点的种类,一个节点的输出端口可以连接另一个节点的输入端口。

这里anchor是一个四维数组,0维和1维分别是锚点在节点x轴和y轴的偏移百分比。我这里希望把端口画在节点的左右两侧,并按照端口的数量均匀分布。

最后使用instance.addEndpoint来创建端点。注意这里只要指定isSource和isTarget就可以用drag&drop的方式来连接端点,非常方便。

下面一步我们提供一个函数来连接端点:

function connectPorts(instance, node1, port1, node2 , port2) {
  // declare some common values:
  var color = "gray";
  var arrowCommon = { foldback:0.8, fillStyle:color, width:5 },
  // use three-arg spec to create two different arrows with the common values:
  overlays = [
    [ "Arrow", { location:0.8 }, arrowCommon ],
    [ "Arrow", { location:0.2, direction:-1 }, arrowCommon ]
  ];

  var uuid_source = node1.getAttribute("id") + "-" + port1;
  var uuid_target = node2.getAttribute("id") + "-" + port2;

  instance.connect({uuids:[uuid_source, uuid_target]});
}

 

node1和node2是源节点和目标节点的引用,port1和port2是源端口和目标端口的名字。

使用instance.connect方法来创建连接。 overlays用来添加连接线的箭头效果或者其他风格,我这里没有使用,因为觉得都不是很好看。大家如果要用,只要把overlays加入到instance.connect的方法参数就可以了。

调用以上方法来创建节点,端点和连接线。

var node1 = addNode('container-id','node1', 'node1', {x:'80px',y:'20px'});
var node2 = addNode('container-id','node2', 'node2', {x:'280px',y:'20px'});

addPorts(instance, node1, ['out1','out2'],'output');
addPorts(instance, node2, ['in','in1','in2'],'input');

connectPorts(instance, node1, 'out2', node2, 'in');

 

这里我们创建了两个节点,第一个节点有两个输出端口,第二个节点有三个输入端口,然后把第一个节点的out2端口连接到第二个端点的in端口。效果如下:

最后我们给节点增加drag&drop的功能,这样我们就可以拖动这些节点来改变图的布局了。

instance.draggable($('.node'));

 

这里似乎依赖于JQuery-UI,我还不是很清楚。

交互式创建节点

我们已经初步具有了创建图的功能,可是节点的创建必须通过程序,我们希望用交互的方式来创建节点。

通常我们希望有一个tree view的控件,让后通过拖拽来创建对应类型的节点。这里我使用了这个开源的tree view,基于bootstrap https://github.com/jonmiles/bootstrap-treeview

我们先创建一个tree view:

function getTreeData() {
  var tree = [
    {
      text: "Nodes",
      nodes: [
        {
          text: "Node1",
        },
        {
          text: "Node2"
        }
      ]
    }
  ]; 

  return tree;
}
//Initialize Control Tree View
$('#control-panel').treeview({data: getTreeData()});

 

树上有两个节点:

然后我实现从树上拖拽对应的节点,到流程图上的逻辑。

//Handle drag and drop
$('.list-group-item').attr('draggable','true').on('dragstart', function(ev){
  //ev.dataTransfer.setData("text", ev.target.id);
  ev.originalEvent.dataTransfer.setData('text',ev.target.textContent);
  console.log('drag start');
});

$('#container-id').on('drop', function(ev){
  //avoid event conlict for jsPlumb
  if (ev.target.className.indexOf('_jsPlumb') >= 0 ) {
    return;
  }

  ev.preventDefault();
  var mx = '' + ev.originalEvent.offsetX + 'px';
  var my = '' + ev.originalEvent.offsetY + 'px';

  console.log('on drop : ' + ev.originalEvent.dataTransfer.getData('text'));
  var uid = new Date().getTime();
  var node = addNode('flow-panel','node' + uid, 'node', {x:mx,y:my});
  addPorts(instance, node, ['out'],'output');
  addPorts(instance, node, ['in1','in2'],'input');
  instance.draggable($(node));
}).on('dragover', function(ev){
  ev.preventDefault();
  console.log('on drag over');
});

 

这里要注意的是要避免和jsPlumb拖拽端点的逻辑冲突,当检测到target是jsPlumb对象是需要直接从drop方法中退出以执行对应的jsPlumb的drop逻辑。

好了,一个绘制流程图的软件工具初步完工。

我把代码放在oschina的代码托管服务上了, 大家有兴趣可以下来试试 http://git.oschina.net/gangtao/FlowChart-Builder

在我们的开发过程中,经常会需要对我们开发的程序做性能分析,有很多性能分析的工具,很多语言都提供了不同的profiling工具,这些工具很有用,提供了程序运行的原始记录数据,通过对这些数据的分析,可以得到程序运行的性能状况,找到问题所在。然而,这样的工具手机的数据比较原始,往往还需要一些更进一步的分析,才能定位问题。

Splunk是一个可以运行在不同平台上的机器数据的实时运维平台,所谓机器数据,就是指机器产生的数据,其中一个常见的场景就是日志。对于广大程序员来说,分析日志是一个非常常见,而且繁琐的工作,而且很多时候,必须通过日志来对程序进行调试,例如多线程的情况。记得以前为了几百兆或者几G的日志进行分析,不得不写了logViewer来分析。现在有了Splunk,真的极大的简化了对日志分析的工作。(注Splunk免费版支持每天500M的日志数据,超过这个额度需要收费)

通过日志进行性能测试是非常常见的,传统的也是在要分析的代码处,注入性能日志,然后在程序运行后,对写入的性能数据进行分析。使用Splunk,方法是一样的,但是有以下明显的改进

  1. Splunk提供大量友好的分析命令和图表,无需另行开发分析日志的程序

  2. Splunk可以实时的对应用程序作分析,可以在程序的运行过程中,一边运行,一边分析

我下面举一个我碰到的例子。

我要分析的程序是一个从AWS CloudWatch收集数据的Python程序。收集数据使用的是AWS提供的Restful API (Boto),为了更高效的收集数据,程序使用多个线程来调用Restful API 的Query接口。我希望通过性能日志了解每一个请求大概的耗时,以决定使用多少个线程数和对应的采集间隔。

首先,需要写日志:

logger.log(logging.DEBUG, "PerfLog=QueryStart" )
## Query Code Goes Here
do_query_aws()
## Query Complete
logger.log(logging.DEBUG, "PerfLog=QueryEnd, Query Result)

 

注意使用Name=Value的形式可以帮助Splunk在搜索时,提取要分析的字段。

然后运行程序,程序运行以后会生成日志文件,把该日志文件导入到Splunk,开始分析。

点击Add Data按钮,然后跟随Splunk的指导,选择A file or directory of files. 导入你的日志文件,导入过程中,Splunk会要求给你的日志文件命名一个sourcetype,我用的是“cloud_watch_debug”

导入好以后就可以开始搜索了。

在搜索框中输入

sourcetype="cloud_watch_debug”

 

Splunk会实时的返回所有的日志文件,并按时间解析为一个个的事件。

Splunk的SPL(Splunk Search Language)是一个类似SQL和UNIX Command的综合体,可以对数据进行搜索,分析,统计,生成图表,支持管道,使用起来非常方便,建议大家通过官方文档来了解。

我么今天要做的是性能分析,那么我就是要统计一下,发了多少个query,每一个query用了多少时间。

每一条日志的内容大致如下

2014-08-11 10:52:40,587 DEBUG pid=3742 tid=QueryWorkerThread-1 file=aws_cloudwatch.py:_main_work_loop:469 | PerfLog = QueryStart

 

Splunk能够提取出大量的信息和字段(field),包括事件,pid,tid,file等等,还有我们在日志中加入的字段PerfLog。

想要知道每一个查询所花费的时间,可以通过Splunk提供的transaction命令。

sourcetype=cloud_watch_debug | transaction tid startswith="QueryStart" endswith="QueryEnd"

 

  • tid表示每一个transaction需要有相同的tid,也就是说同一个线程

  • startwith和endwith表示transaction的其实和结束标志

该命令返回所有的query的transaction
 

然后我们就可以统计每一个transaction所用的时间

sourcetype=cloud_watch_debug | 
transaction tid startswith="QueryStart" endswith="QueryEnd" | 
stats sum(duration),count, avg(duration),max(duration),min(duration)

 

  • stats命令用于对数据进行统计

  • duration是Splunk对transaction生成的事件跨度

  • sum,count,avg,max,min是统计命令

运行结果如下:

程序一共发送了111075个cloudwatch的请求,最慢的需要2.5秒,最快的0.06秒,平均大概0.11秒。

我还想知道query的耗时随时间的变化,我可以生成一个timechart

sourcetype=cloud_watch_debug | 
transaction tid startswith="QueryStart" endswith="QueryEnd" | 
timechart avg(duration)

 

结果如下(最近1小时):

通过该分析在过去的一个小时里10:30和11:00之后的十分钟时间段,耗时略有上升,大概峰值0.2秒。

 

总结:

Splunk的日志分析功能非常强大,而且500M的免费版基本能够满足大部分程序员对程序日志的分析要求。有效地使用Splunk来进行日志分析,可以做到事半功倍,小伙伴们快来试用吧!

在大数据时代,数据可视化是一个非常热门的话题。各个BI的厂商无不在数据可视化领域里投入大量的精力。Tableau凭借其强大的数据可视化的功能成为硅谷炙手可热的上市公司。Tableau的数据可视化的产品,其理论基础其实是《The Grammar of Graphic》,该书提出了对信息可视化的图表的语法抽象体系,数据的探索和分析可以由图像的语法来驱动,而非有固定的图表类型来驱动,使得数据的探索过程变得友好而有趣。

然而对于The Grammar of Graphic的理论的实践,并非Tableau独占,ggplot作为R语言上得一个图形库,其理论基础也是这本书。(注,笔者曾就职的BI巨头,主要职责也是数据可视化,我们曾经和加拿大团队研发过类似的产品,基于HTML5和D3,可惜未能推向市场)

现在越来越多的人开始使用python来做数据分析,IPython Notebook尤其令人喜爱,它的实时交互把脚本语言的优势发挥到极致。那么怎样才能在IPython Notebook中使用ggplot呢?我这里跟大家分享三种不同的方式供大家选择。

RPy2

第一种方式是使用rpy2, rpy2是对rpy的改写和重新设计,旨在提供Python用户在python中使用R的API。

rpy2提供了对R语言的对象和方法的基本封装,当然也包括可视化的图库这一块。

下面就是一段运行ggplot的R程序使用rpy2在python中运行的例子:

from rpy2 import robjects
from rpy2.robjects import Formula, Environment
from rpy2.robjects.vectors import IntVector, FloatVector
from rpy2.robjects.lib import grid
from rpy2.robjects.packages import importr, data
import rpy2.robjects.lib.ggplot2 as ggplot2

# The R 'print' function
rprint = robjects.globalenv.get("print")
stats = importr('stats')
grdevices = importr('grDevices')
base = importr('base')
datasets = importr('datasets')

mtcars = data(datasets).fetch('mtcars')['mtcars']

pp = ggplot2.ggplot(mtcars) + \
     ggplot2.aes_string(x='wt', y='mpg', col='factor(cyl)') + \
     ggplot2.geom_point() + \
     ggplot2.geom_smooth(ggplot2.aes_string(group = 'cyl'),
                         method = 'lm')
pp.plot()

以上程序在IPython Notebook中运行会有缺陷,会弹出一个新的窗口显示图,而且该python进程会阻塞在那里。我们希望图表能内嵌在IPython Notebook的页面中,为了解决该问题,我们引入如下代码:

%matplotlib inline

import uuid
from rpy2.robjects.packages import importr 
from IPython.core.display import Image

grdevices = importr('grDevices')
def ggplot_notebook(gg, width = 800, height = 600):
    fn = '{uuid}.png'.format(uuid = uuid.uuid4())
    grdevices.png(fn, width = width, height = height)
    gg.plot()
    grdevices.dev_off()
    return Image(filename=fn)

运行上述代码后,我们把ggplot的调用pp.plot()改为调用ggplot_notebook(pp, height=300)就能成功嵌入显示ggplot的结果。

RMagic

另一种方式是使用rmagic,rmagicy实际上依赖于rpy2。它的使用方式更像是直接在使用R

%load_ext rmagic
library(ggplot2)
dat <- data.frame(x = rnorm(10), y = rnorm(10), 
                  lab = sample(c('A', 'B'), 10, replace = TRUE))
x <- ggplot(dat, aes(x = x, y = y, color = lab)) + geom_point()
print(x)

运行结果如下

ggplot for python

ggplot是一个python的库,基本上是对R语言ggplot的功能移植到Python上。

运行安装脚本

pip install ggplot

安装成功后,可以试一下这个例子

%matplotlib inline
import pandas as pd
from ggplot import *
meat_lng = pd.melt(meat[['date', 'beef', 'pork', 'broilers']], id_vars='date')
ggplot(aes(x='date', y='value', colour='variable'), data=meat_lng) + \
    geom_point() + \
    stat_smooth(color='red')

结果如下:


总结

本文提供了三种不同的方式在Python(IPython Notebook)中调用ggplot。

rpy2和Rmagic都是一种对R的桥接,所以都需要安装R。不同之处在于rpy2提供Python接口而Rmagic更接近R。

ggplot Python库是ggplot的Python移植,所以无需安装R,部署起来更为简单,但功能上也许和R的ggplot还有差距。

大家可以根据自己的需要做出选择。

我们的目标是在Mac OS上获取一个静态服务器的内容,通常用wget是一个很好的选择。

wget是一个命令行工具用于从网络服务器来获取内容。但是在Mac OS X(Mountain Lion/ Mavericks / Snow Leopard)上没有提供该工具,但是有curl。

wget VS curl

curl

  • 基于跨平台的库libcurl

  • 支持unix管道

  • 返回错误代码来支持错误处理

  • 只返回单个url的内容,不支持自动取链接的内容

  • 大量协议支持诸如 :FTP, FTPS, HTTP, HTTPS, SCP, SFTP, TFTP, TELNET, DICT, LDAP, LDAPS, FILE, POP3, IMAP, SMTP, RTMP and RTSP

  • 可移植性好

  • 支持不同的SSL/TSL库

  • 支持HTTP认证(HTTP Authentication)

  • 支持双向和多部分提交数据

  • 支持压缩

  • MIT协议

wget

  • 只支持命令行

  • 支持递归的抓取数据,也就是说可以抓取返回内容中的url链接的内容。

  • 非常古老,开发不活跃。

  • 使用HTTP 1.0

  • GNU项目的一部分

  • GPL 协议

总体而言curl比wget要进步许多,可是要获取一个网站的镜像,迭代功能必不可少。只好自己动手,在Mac上构建一个wget。

构建wget

首先确定你已经安装了Xcode和GCC,如果不知道如何安装,可以参考这个链接

然后从gnu下载wget的源码

curl -O http://ftp.gnu.org/gnu/wget/wget-1.15.tar.gz

下载好后,解压缩

tar -xvf wget-1.15.tar.gz

解压缩好后,需要运行配置命令,为编译做准备

cd wget-1.15
./configure --with-ssl=openssl

这里我们选用openssl作为ssl的参数选项。大家一定不会忘记最近发生的openssl的heartbleed漏洞吧 :)

配置好了以后,运行make

make

这里不出意外会跳出一大堆的警告,不要担心,如果你看到如下的内容,你应该编译成功了

... ...
... ...
gcc  -O2 -Wall   -o wget cmpt.o connect.o convert.o cookies.o ftp.o css_.o css-url.o ftp-basic.o ftp-ls.o hash.o host.o html-parse.o html-url.o http.o init.o log.o main.o netrc.o progress.o ptimer.o recur.o res.o retr.o spider.o url.o warc.o utils.o exits.o build_info.o  version.o ftp-opie.o openssl.o http-ntlm.o ../lib/libgnu.a -liconv  -lssl -lcrypto -lz -ldl -lz -lz
Making all in doc
./texi2pod.pl -D VERSION="1.15" ./wget.texi wget.pod
/usr/bin/pod2man --center="GNU Wget" --release="GNU Wget 1.14" wget.pod > wget.1
Making all in po
Making all in tests
make[2]: Nothing to be done for `all'.
Making all in util
make[2]: Nothing to be done for `all'.
make[2]: Nothing to be done for `all-am'.

最后,安装

sudo make install

安装成功后,试一试wget是否成功安装

$ which wget
/usr/local/bin/wget

如果看到上述结果说明wget已经成功构建并部署到/usr/local/bin目录了

好了,万事具备,可以开始抓取你想要获得内容的网站了。

wget -mk http://website.com

其中-m参数表示迭代的抓取,-k参数表示用相对路径取代绝对路径。抓取的内容会被存放在本地的website.com的目录下。

举个例子,比如我要抓新浪新闻

$ wget -mk http://news.sina.com.cn
--2014-06-30 16:55:26--  http://news.sina.com.cn/
Resolving news.sina.com.cn... 58.63.236.31, 58.63.236.46, 58.63.236.48, ...
Connecting to news.sina.com.cn|58.63.236.31|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 636992 (622K) [text/html]
Saving to: ‘news.sina.com.cn/index.html’

100%[======================================>] 636,992      391KB/s   in 1.6s   

2014-06-30 16:55:29 (391 KB/s) - ‘news.sina.com.cn/index.html’ saved [636992/636992]

Loading robots.txt; please ignore errors.
--2014-06-30 16:55:29--  http://news.sina.com.cn/robots.txt
Reusing existing connection to news.sina.com.cn:80.
HTTP request sent, awaiting response... 200 OK
Length: 70 [text/plain]
Saving to: ‘news.sina.com.cn/robots.txt’

100%[======================================>] 70          --.-K/s   in 0.03s   

2014-06-30 16:55:29 (2.54 KB/s) - ‘news.sina.com.cn/robots.txt’ saved [70/70]

--2014-06-30 16:55:29--  http://news.sina.com.cn/js/792/2012-08-09/41/headnews.js
Reusing existing connection to news.sina.com.cn:80.
HTTP request sent, awaiting response... 200 OK
Length: 31699 (31K) [application/x-javascript]
Saving to: ‘news.sina.com.cn/js/792/2012-08-09/41/headnews.js’

100%[======================================>] 31,699      --.-K/s   in 0.04s   

2014-06-30 16:55:29 (731 KB/s) - ‘news.sina.com.cn/js/792/2012-08-09/41/headnews.js’ saved [31699/31699]

--2014-06-30 16:55:29--  http://news.sina.com.cn/pfpnews/js/libweb.js
Reusing existing connection to news.sina.com.cn:80.
HTTP request sent, awaiting response... 200 OK
Length: 6554 (6.4K) [application/x-javascript]
Saving to: ‘news.sina.com.cn/pfpnews/js/libweb.js’

100%[======================================>] 6,554       --.-K/s   in 0.03s

抓成功后的目录如下

注意:

  • 这样的方式仅适用于静态网站,对于使用动态代码生成的网站无能为力

  • 地址转换是发生在所有内容抓取完成之后,如果你中途退出,则所有的地址链接仍然指向原始地址。当你所要抓取的内容巨大时,需要非常小心。

  • 为了防止流量过大,对服务器造成太大的负担,可以使用-w参数设置两个请求中的间隔时间


大数据时代,数据的异常分析被广泛的用于各个场合。 今天我们就来看一看其中的一种场景,对于单变量数据集的异常检测。

所谓单变量,就是指数据集中只有一个变化的值,下面我们来看看今天我们要分析的的数据,点击这里数据文件下载数据文件。

分析数据的第一步是要加载文件, 本文使用了numpy,pandas,scikit learn等常见的数据分析要用到的Python库。

import numpy as np
import pandas as pd
df = pd.read_csv("farequote.csv")

Pandas 是一个常用的数据分析的Python库,提供对数据的加载,清洗,抽取,变形等操作。Pandas依赖numpy,numpy提供了基于列/多维数组(List/N-D Array)的数据结构的操作。许多科学计算和数据分析的库都依赖于numpy。

df 是Pandas中常用的数据类型dataframe,dataframe类似与一个数据库的表,使用 df.head()可以得到数据的头几行,以便了解数据的概貌。

该数据结构中,第一列式Pandas添加的索引,第一行是每一列数据的名字,除了第一列,每一列数据可以看成是一个变量,所以该数据集共有三个变量,时间(_time)、航空公司名称(airline)、响应时间(responsetime)。我们可以这样理解,该数据集记录了一段时间内,各个航空公司飞机延误的时间。我们希望通过分析找出是否存在异常的情况。

注意,我们是要分析单变量,所以所有的分析都是基于某一个航空公司的数据,所以就需要对该数据集做一个查询,找出要分析的航空公司。首先要知道有哪些航空公司,使用np.unique(df.airline)可以找到所有的航空公司代码,类似SQL的Unique命令

array(['AAL', 'ACA', 'AMX', 'ASA', 'AWE', 'BAW', 'DAL', 'EGF', 'FFT',
       'JAL', 'JBU', 'JZA', 'KLM', 'NKS', 'SWA', 'SWR', 'TRS', 'UAL', 'VRD'], 
      dtype='|S3')

查询某个航空公司的数据使用dataframe的query方法,类似SQL的select。Query返回的结果仍然是一个dataframe对象。

dd = df.query('airline=="KLM"') ## 得到法航的数据

我们先了解一下数据的大致信息,使用describe方法

dd.responsetime.describe()

得到如下的结果:

count    1724.000000
mean     1500.613766
std       100.085320
min      1209.766800
25%      1434.084625
50%      1499.135000
75%      1567.831025
max      1818.774100
Name: responsetime, dtype: float64

该结果返回了数据集responsetime维度上的主要统计指标,个数,均值,方差,最大最小值等等,也可以调用单独的方法例如min(),mean()等来获得某一个指标。

基于标准差得异常检测

下面我们就可以开始异常点的分析了,对于单变量的异常点分析,最容易想到的就是基于标准差(Standard Deviation)的方法了。我们假定数据的正态分布的,利用概率密度函数,我们知道

  • 95.449974面积在平均数左右两个标准差的范围内

  • 99.730020%的面积在平均数左右三个标准差的范围内

  • 99.993666的面积在平均数左右三个标准差的范围内

所以我们95%也就是大概两个标准差为门限,凡是落在门限外的都认为是异常点。代码如下

def a1(dataframe, threshold=.95):
    d = dataframe['responsetime']
    dataframe['isAnomaly'] = d > d.quantile(threshold)  
    return dataframe
print a1(dd)

运行以上程序我们得到如下结果

                             _time airline  responsetime isAnomaly
20    2013-02-01T23:57:59.000-0700     KLM     1481.4945     False
76    2013-02-01T23:52:34.000-0700     KLM     1400.9050     False
124   2013-02-01T23:47:10.000-0700     KLM     1501.4313     False
203   2013-02-01T23:39:08.000-0700     KLM     1278.9509     False
281   2013-02-01T23:32:27.000-0700     KLM     1386.4157     False
336   2013-02-01T23:26:09.000-0700     KLM     1629.9589     False
364   2013-02-01T23:23:52.000-0700     KLM     1482.5900     False
448   2013-02-01T23:16:08.000-0700     KLM     1553.4988     False
511   2013-02-01T23:10:39.000-0700     KLM     1555.1894     False
516   2013-02-01T23:10:08.000-0700     KLM     1720.7862      True
553   2013-02-01T23:06:29.000-0700     KLM     1306.6489     False
593   2013-02-01T23:03:03.000-0700     KLM     1481.7081     False
609   2013-02-01T23:01:29.000-0700     KLM     1521.0253     False
666   2013-02-01T22:56:04.000-0700     KLM     1675.2222      True
...   ...   ...   ...

结果数据集上多了一列isAnomaly用来标记每一行记录是否是异常点,我们看到已经有一些点被标记为异常点了。

我们看看程序的详细内容:

  1. 方法a1定义了一个异常检测的函数

  2. dataframe['responsetime']等价于dataframe.responsetime,该操作取出responsetime这一列的值

  3. d.quantile(threshold)用正态分布假定返回位于95%的点的值,大于该值得点都落在正态分布95%之外

  4. d > d.quantile(threshold)是一个数组操作,返回的新数组是responsetime和threshold的比较结果,[False,False,True,... ... False]

  5. 然后通过dataframe的赋值操作增加一个新的列,标记所有的异常点。

数据可视化往往是数据分析的最后一步,我们看看结果如何:

import matplotlib.pyplot as plt
da = a1(dd)
fig = plt.figure()
ax1 = fig.add_subplot(2, 1, 1)
ax2 = fig.add_subplot(2, 1, 2)
ax1.plot(da['responsetime'])
ax2.plot(da['isAnomaly'])

这异常点也太多了,用99%在试试:

现在似乎好一点,然而我们知道,对于数据集的正态分布的假定往往是不成立的,假如数据分布在大小两头,那么这样的异常检测就很难奏效了。我们看看其他一些改进的方法。

基于ZSCORE的异常检测

zscore的计算如下

sd是标准差,X是均值。一般建议门限值取为3.5

代码如下:

def a2(dataframe, threshold=3.5):
    d = dataframe['responsetime']
    zscore = (d - d.mean())/d.std()
    dataframe['isAnomaly'] = zscore.abs() > threshold
    return dataframe

另外还有一种增强的zscore算法,基于MAD。MAD的定义是

其中X是中位数。

增强的zscore算法如下:

def a3(dataframe, threshold=3.5):
    dd = dataframe['responsetime']
    MAD = (dd - dd.median()).abs().median()
    zscore = ((dd - dd.median())* 0.6475 /MAD).abs()
    dataframe['isAnomaly'] = zscore > threshold
    return dataframe

用zscore算法得到:

调整门限为3得到

如果换一组数据AAL,结果会怎么样呢?

我们发现有一段时间,所有的响应都很慢,我们想要把这些点都标记为异常,可能么?

基于KMEAN聚集的异常检测

通常基于KMEAN的聚集算法并不适用于异常点检测,以为聚集算法总是试图平衡每一个聚集中的点的数目,所以对于少数的异常点,聚集非常不好用,但是我们这个例子中,异常点都聚在一起,所以应该可以使用。

首先,为了看清聚集,我们使用时间序列的常用分析方法,增加一个维度,该维度是每一个点得前一个点得响应时间。

preresponse = 0
newcol = []
newcol.append(0)
for index, row in dd.iterrows():
    if preresponse != 0:
        newcol.append(preresponse)
    preresponse = row.responsetime
dd["t0"] = newcol
plt.scatter(dd.t0,dd.responsetime)

我们利用iterrows来循环数据,把前一个点的响应时间增加到当前点,第一个点的该值为0,命名该列为t0。然后用scatter plot把它画出来。

上面是法航KLM的数据,其中最左边的点是一个无效的点,因为前一个点的响应时间不知道所以填了0,分析时应该过滤该店。

对于AAL,我们可以清楚的看到两个聚集:

其中右上方的聚集,也就是点数目比较少得聚集就是我们希望检测到的异常点得集合。

我们看看如何使用KMEAN算法来检测吧:

def a4(dataframe, threshold = .9):
    ## add one dimention of previous response
    preresponse = 0
    newcol = []
    newcol.append(0)
    for index, row in dataframe.iterrows():
        if preresponse != 0:
            newcol.append(preresponse)
        preresponse = row.responsetime
    dataframe["t0"] = newcol
    ## remove first row as there is no previous event for time
    dd = dataframe.drop(dataframe.head(1).index) 
    clf = cluster.KMeans(n_clusters=2)
    X=np.array(dd[['responsetime','t0']])
    cls = clf.fit_predict(X)
    freq = itemfreq(cls)
    (A,B) = (freq[0,1],freq[1,1])
    t = abs(A-B)/max(A,B)
    if t > threshold :
        ## "Anomaly Detected!"
        index = freq[0,0]
        if A > B :
            index = freq[1,0]
        dd['isAnomaly'] = (cls == index)
    else :
        ## "No Anomaly Point"
        dd['isAnomaly'] = False
    return dd

其核心代码是以下这几行:

clf = cluster.KMeans(n_clusters=2)
X=np.array(dd[['responsetime','t0']])
cls = clf.fit_predict(X)

cluster.KMeans返回一个预测模型,我们假定有两个聚集。你可以试着加大聚集的数量,结果没什么影响。

dd[['responsetime','t0']]返回一个2*n的数组,并赋值给X,用于聚集计算。

fit_pridict方法是对X做聚集运算,并计算每一个点对应的聚集编号。

freq = itemfreq(cls)

itemfreq返回聚集结果中每一个聚集的发生频率,如果其中一个比另一个显著地多,我们则认为那个少得是异常点聚集。

用该方法可以把所有聚集里的点标记为异常点。

这里我用红色标记结果让大家看的清楚一点,注意因为是line chart,连个竖线间的都是异常点。

总结

除了上述的算法,还有其它一些相关的算法,大家如果对背后的数据知识有兴趣的话,可以参考这篇相关介绍

单变量的异常检测算法相对比较简单,但是要做到精准检测就更难,因为掌握的信息更少。另外boxplot也经常被用于异常检测,他和基于方差的异常检测是一致的,只不过用图形让大家一目了然的获得结果,大家有兴趣可以了解一下。