如今是云的时代,许多公司都把自己的IT架构部署在基础架构云(IaaS)上。著名的IaaS提供商有亚马逊,微软(Azure),IBM等,国内也有诸如阿里云等。这里亚马逊毫无疑问是该市场的领军者。

AWS提供了非常多的服务,领先了竞争对手一大截。并且AWS提供非常丰富的API,其API基于Rest,所以很容易被不同的语言的平台来调用。

在如今的大数据时代,利用数据在做决策是大数据的核心价值,AWS提供了许多服务来获取其运行数据cloudtrail和cloudwatch是经常被用到的两个。CloudTrail是对AWS的所有API调用的日志,CloudWatch是监控AWS服务的性能数据。(新出的Config服务可用于监控AWS的资源变化)

今天我们来看看如何使用Python(Boto AWS的开源Python SDK)来自动配置ClouTrail的服务并获取日志内容。

我们先来看看CloudTrail的概念和相关的配置。

  • S3 Bucket

    在打开CloudTrail的服务时,需要指定一个相关的S3的Bucket,S3是亚马逊提供的存储服务,你可以把它当作一个基于云的文件系统。CloudTrail的API调用日志,会以压缩文件的形式,存储在你指定的Bucket里。

  • SNS

    SNS是亚马逊提供的通知服务,该服务使用的是订阅/发布(Subsrcibe/Publish)的模式。在创建CloudTrail的时候,可以关联一个SNS的Topic(可选),这样做的好处是当有API调用时,可以第一时间得到通知。可以使用不同的客户端来订阅SNS的通知,例如Email,Mobile的Notification Service,SQS等

  • SQS

    SQS是亚马逊提供的队列服务,在本文中,我们使用SQS订阅SNS的的内容,这样我们的Python程序就可以从SQS的队列中获取相应的通知。


配置CloudTrail

首先我们需要创建SNS,并指定相应的策略。代码如下:

import boto.sns
import json

key_id='yourawskeyid'
secret_key='yourawssecretkey'

region_name="eu-central-1"
trail_topic_name="topicABC"
sns_policy_sid="snspolicy0001"

sns_conn = boto.sns.connect_to_region(region_name,
                                         aws_access_key_id=key_id,
                                         aws_secret_access_key=secret_key)

sns_topic = sns_conn.create_topic(trail_topic_name)

# Get ARN of SNS topic
sns_arn = sns_topic['CreateTopicResponse']['CreateTopicResult']['TopicArn']

# Add related policy
attrs = sns_conn.get_topic_attributes(sns_arn)
policy = attrs['GetTopicAttributesResponse']['GetTopicAttributesResult']['Attributes']['Policy']
policy_obj = json.loads(policy)
statements = policy_obj['Statement']

default_statement = statements[0]
new_statement = default_statement.copy()
new_statement["Sid"] = sns_policy_sid
new_statement["Action"] = "SNS:Publish"
new_statement["Principal"] = {
        "AWS": [
          "arn:aws:iam::903692715234:root",
          "arn:aws:iam::035351147821:root", 
          "arn:aws:iam::859597730677:root",
          "arn:aws:iam::814480443879:root",
          "arn:aws:iam::216624486486:root",
          "arn:aws:iam::086441151436:root",
          "arn:aws:iam::388731089494:root",
          "arn:aws:iam::284668455005:root",
          "arn:aws:iam::113285607260:root"
        ]
      }
new_statement.pop("Condition", None)
statements.append(new_statement)
new_policy = json.dumps(policy_obj)
sns_conn.set_topic_attributes(sns_arn,"Policy",new_policy)

CloudTrail是和区域(Region)相关的,不同的Region有不同的CloudTrail服务,所以,在创建对应的SNS时,需要保证使用同一个Region。

这里要注意的是我们创建了新的policy来使得CloudTrail拥有向我们创建的SNS发布消息(Action=“SNS:Publish”)的权限。我们的做法是从缺省的策略中拷贝了一份,修改了相应的Action和Sid(随便取一个不重复的名字),Principal部分是一个缺省的account的列表,这里是硬编码,AWS有可能会修改该列表的值,但在当前环境下,该值是固定的。最后移除Condition的值。把新创建的Policy片段添加到原来的Policy中就好了。

然后我们需要创建一个SQS的队列,并订阅我们创建的SNS的Topic。这一步相对比较简单。

import boto.sqs

sqs_queue_name="sqs_queue"
sqs_conn = boto.sqs.connect_to_region(region_name,
                                         aws_access_key_id=key_id,
                                         aws_secret_access_key=secret_key)
sqs_queue = sqs_conn.create_queue(sqs_queue_name)
sns_conn.subscribe_sqs_queue(sns_arn, sqs_queue)

然后,我们需要创建一个S3的Bucket用来存储CloudTrail产生的日志文件。同样的,需要指定响应的策略以保证CloudTrail能够有权限写入对应的日志文件。

import boto

bucket_name="bucket000"
policy_sid="testpolicy000"
s3_conn = boto.connect_s3(aws_access_key_id=key_id,aws_secret_access_key=secret_key)
bucket = s3_conn.create_bucket(bucket_name)
bucket_policy = '''{
	"Version": "2012-10-17",
	"Statement": [
		{
			"Sid": "%Sid%GetPolicy",
			"Effect": "Allow",
			"Principal": {
				"AWS": [
					"arn:aws:iam::903692715234:root",
					"arn:aws:iam::035351147821:root",
					"arn:aws:iam::859597730677:root",
					"arn:aws:iam::814480443879:root",
					"arn:aws:iam::216624486486:root",
					"arn:aws:iam::086441151436:root",
					"arn:aws:iam::388731089494:root",
					"arn:aws:iam::284668455005:root",
					"arn:aws:iam::113285607260:root"
				]
			},
			"Action": "s3:GetBucketAcl",
			"Resource": "arn:aws:s3:::%bucket_name%"
		},
		{
			"Sid": "%Sid%PutPolicy",
			"Effect": "Allow",
			"Principal": {
				"AWS": [
					"arn:aws:iam::903692715234:root",
					"arn:aws:iam::035351147821:root",
					"arn:aws:iam::859597730677:root",
					"arn:aws:iam::814480443879:root",
					"arn:aws:iam::216624486486:root",
					"arn:aws:iam::086441151436:root",
					"arn:aws:iam::388731089494:root",
					"arn:aws:iam::284668455005:root",
					"arn:aws:iam::113285607260:root"
				]
			},
			"Action": "s3:PutObject",
			"Resource": "arn:aws:s3:::%bucket_name%/*",
			"Condition": {
				"StringEquals": {
					"s3:x-amz-acl": "bucket-owner-full-control"
				}
			}
		}
	]
}'''
bucket_policy = bucket_policy.replace("%bucket_name%",bucket_name)
bucket_policy = bucket_policy.replace("%Sid%",policy_sid)
bucket.set_policy(bucket_policy)

这里我们使用一个缺省的Policy文件,替换掉响应的字段就好了。

最后,我们创建CloudTrail的服务:

import boto.cloudtrail

trail_name="Trailabc"
log_prefix="log"

cloudtrail_conn=boto.cloudtrail.connect_to_region(region_name,
                                         aws_access_key_id=key_id,
                                         aws_secret_access_key=secret_key)

##cloudtrail_conn.describe_trails()
cloudtrail_conn.create_trail(trail_name,bucket_name, s3_key_prefix=log_prefix,sns_topic_name=trail_topic_name)
cloudtrail_conn.start_logging(trail_name)

好了,现在CloudTrail已经配置好了,并且关联的SNS也被我们创建的SQS队列订阅,下面我们就可以抓取日志了

获取日志数据

每当有一个API调用,CloudTrail都会把响应的日志文件写入到S3我们创建的Bucket中,同时在我们在创建的SNS的topic中发布一条消息,因为我们使用SQS的队列订阅了该消息,所以我们可以通过读取SQS消息的方式来获得日志数据。

首先连接到SQS的队列,并从中读取消息

import boto.sqs

sqs_queue_name="sqs_queue"
sqs_conn = boto.sqs.connect_to_region(region_name,
                                         aws_access_key_id=key_id,
                                         aws_secret_access_key=secret_key)
                                         
sqs_queue = sqs_conn.get_queue(sqs_queue_name)
notifications = sqs_queue.get_messages()

然后我们从消息中获得响应的日志文件在S3中的地址,并利用该地址从S3中获得对应的日志文件

for notification in notifications:
    envelope = json.loads(notification.get_body())
    message = json.loads(envelope['Message'])
    bucket_name = message['s3Bucket']
    s3_bucket = s3_conn.get_bucket(bucket_name)
    for key in message['s3ObjectKey']:
        s3_file = s3_bucket.get_key(key)
        with io.BytesIO(s3_file.read()) as bfile:
            with gzip.GzipFile(fileobj=bfile) as gz:
                logjson = json.loads(gz.read())

logjson就是对应的日记内容的JSON格式。这里有一个例子

{
    "Records": [{
        "eventVersion": "1.0",
        "userIdentity": {
            "type": "IAMUser",
            "principalId": "EX_PRINCIPAL_ID",
            "arn": "arn:aws:iam::123456789012:user/Alice",
            "accessKeyId": "EXAMPLE_KEY_ID",
            "accountId": "123456789012",
            "userName": "Alice"
        },
        "eventTime": "2014-03-06T21:22:54Z",
        "eventSource": "ec2.amazonaws.com",
        "eventName": "StartInstances",
        "awsRegion": "us-west-2",
        "sourceIPAddress": "205.251.233.176",
        "userAgent": "ec2-api-tools 1.6.12.2",
        "requestParameters": {
            "instancesSet": {
                 "items": [{
                      "instanceId": "i-ebeaf9e2"
                }]
            }
        },
        "responseElements": {
            "instancesSet": {
                "items": [{
                      "instanceId": "i-ebeaf9e2",
                      "currentState": {
                          "code": 0,
                          "name": "pending"
                      },
                      "previousState": {
                          "code": 80,
                          "name": "stopped"
                      }
                    }]
            }
        }
    },
    ... additional entries ...
	]
}

你可以使用以上代码来监控所有的cloudtrail的日志,拿到的JSON格式的日志可以放在你的数据库(Mongo不错)中,然后利用你的BI工具做分析。

注意你也可以不创建SNS和SQS,直接扫描bucket的内容,这样做的好处是配置更简单,缺点是实时性比较差,扫面Bucket需要额外的计算,并且需要在本地保存文件扫描的状态,code会更加复杂。

利用CloudTrail的日志,你可以做很多事情,比如看看有没有非法的登陆,各个服务的使用频率,总之,当你有了足够多的数据,你就可以从中发现足够的价值。


使用Python进行并发编程

2015年3月20日 05:46

让计算机程序并发的运行是一个经常被讨论的话题,今天我想讨论一下Python下的各种并发方式。

并发方式

线程(Thread

多线程几乎是每一个程序猿在使用每一种语言时都会首先想到用于解决并发的工具(JS程序员请回避),使用多线程可以有效的利用CPU资源(Python例外)。然而多线程所带来的程序的复杂度也不可避免,尤其是对竞争资源的同步问题。

然而在python中由于使用了全局解释锁(GIL)的原因,代码并不能同时在多核上并发的运行,也就是说,Python的多线程不能并发,很多人会发现使用多线程来改进自己的Python代码后,程序的运行效率却下降了,这是多么蛋疼的一件事呀!如果想了解更多细节,推荐阅读这篇文章。实际上使用多线程的编程模型是很困难的,程序员很容易犯错,这并不是程序员的错误,因为并行思维是反人类的,我们大多数人的思维是串行(精神分裂不讨论),而且冯诺依曼设计的计算机架构也是以顺序执行为基础的。所以如果你总是不能把你的多线程程序搞定,恭喜你,你是个思维正常的程序猿:)

Python提供两组线程的接口,一组是thread模块,提供基础的,低等级(Low Level)接口,使用Function作为线程的运行体。还有一组是threading模块,提供更容易使用的基于对象的接口(类似于Java),可以继承Thread对象来实现线程,还提供了其它一些线程相关的对象,例如Timer,Lock

使用thread模块的例子

import thread
def worker():
    """thread worker function"""
    print 'Worker'
thread.start_new_thread(worker)

使用threading模块的例子

import threading
def worker():
    """thread worker function"""
    print 'Worker'
t = threading.Thread(target=worker)
t.start()

 或者Java Style

import threading
class worker(threading.Thread):
    def __init__(self):
        pass
    def run():
        """thread worker function"""
        print 'Worker'
    
t = worker()
t.start()


进程 (Process)

由于前文提到的全局解释锁的问题,Python下比较好的并行方式是使用多进程,这样可以非常有效的使用CPU资源,并实现真正意义上的并发。当然,进程的开销比线程要大,也就是说如果你要创建数量惊人的并发进程的话,需要考虑一下你的机器是不是有一颗强大的心。

Python的mutliprocess模块和threading具有类似的接口。

from multiprocessing import Process

def worker():
    """thread worker function"""
    print 'Worker'
p = Process(target=worker)
p.start()
p.join()

由于线程共享相同的地址空间和内存,所以线程之间的通信是非常容易的,然而进程之间的通信就要复杂一些了。常见的进程间通信有,管道,消息队列,Socket接口(TCP/IP)等等。

Python的mutliprocess模块提供了封装好的管道和队列,可以方便的在进程间传递消息。

Python进程间的同步使用锁,这一点喝线程是一样的。

另外,Python还提供了进程池Pool对象,可以方便的管理和控制线程。


远程分布式主机 (Distributed Node)

随着大数据时代的到临,摩尔定理在单机上似乎已经失去了效果,数据的计算和处理需要分布式的计算机网络来运行,程序并行的运行在多个主机节点上,已经是现在的软件架构所必需考虑的问题。

远程主机间的进程间通信有几种常见的方式

  • TCP/IP

    TCP/IP是所有远程通信的基础,然而API比较低级别,使用起来比较繁琐,所以一般不会考虑

  • 远程方法调用 Remote Function Call

    RPC是早期的远程进程间通信的手段。Python下有一个开源的实现RPyC

  • 远程对象 Remote Object

    远程对象是更高级别的封装,程序可以想操作本地对象一样去操作一个远程对象在本地的代理。远程对象最广为使用的规范CORBA,CORBA最大的好处是可以在不同语言和平台中进行通信。当让不用的语言和平台还有一些各自的远程对象实现,例如Java的RMI,MS的DCOM

    Python的开源实现,有许多对远程对象的支持

  • 消息队列 Message Queue

    比起RPC或者远程对象,消息是一种更为灵活的通信手段,常见的支持Python接口的消息机制有

在远程主机上执行并发和本地的多进程并没有非常大的差异,都需要解决进程间通信的问题。当然对远程进程的管理和协调比起本地要复杂。

Python下有许多开源的框架来支持分布式的并发,提供有效的管理手段包括:

  • Celery 

    Celery是一个非常成熟的Python分布式框架,可以在分布式的系统中,异步的执行任务,并提供有效的管理和调度功能。参考这里

  • SCOOP

    SCOOP Scalable COncurrent Operations in Python)提供简单易用的分布式调用接口,使用Future接口来进行并发。

  • Dispy

    相比起Celery和SCOOP,Dispy提供更为轻量级的分布式并行服务

  • PP 

    PP (Parallel Python)是另外一个轻量级的Python并行服务, 参考这里

  • Asyncoro

    Asyncoro是另一个利用Generator实现分布式并发的Python框架,

当然还有许多其它的系统,我没有一一列出

另外,许多的分布式系统多提供了对Python接口的支持,例如Spark


伪线程 (Pseudo-Thread)

还有一种并发手段并不常见,我们可以称之为伪线程,就是看上去像是线程,使用的接口类似线程接口,但是实际使用非线程的方式,对应的线程开销也不存的。

  • greenlet 

    greenlet提供轻量级的coroutines来支持进程内的并发。

    greenlet是Stackless的一个副产品,使用tasklet来支持一中被称之为微线程(mirco-thread)的技术,这里是一个使用greenlet的伪线程的例子

from greenlet import greenlet

def test1():
    print 12
    gr2.switch()
    print 34
    
def test2():
    print 56
    gr1.switch()
    print 78
    
gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()

        运行以上程序得到如下结果:

12
56
34

伪线程gr1 switch会打印12,然后调用gr2 switch得到56,然后switch回到gr1,打印34,然后伪线程gr1结束,程序退出,所以78永远不会被打印。通过这个例子我们可以看出,使用伪线程,我们可以有效的控制程序的执行流程,但是伪线程并不存在真正意义上的并发。

eventlet,gevent和concurence都是基于greenlet提供并发的。

eventlet是一个提供网络调用并发的Python库,使用者可以以非阻塞的方式累调用阻塞的IO操作。

import eventlet
from eventlet.green import urllib2

urls = ['http://www.google.com', 'http://www.example.com', 'http://www.python.org']

def fetch(url):
    return urllib2.urlopen(url).read()

pool = eventlet.GreenPool()

for body in pool.imap(fetch, urls):
    print("got body", len(body))

执行结果如下

('got body', 17629)
('got body', 1270)
('got body', 46949)

eventlet为了支持generator的操作对urllib2做了修改,接口和urllib2是一致的。这里的GreenPool和Python的Pool接口一致。

gevent和eventlet类似,关于它们的差异大家可以参考这篇文章

import gevent
from gevent import socket
urls = ['www.google.com', 'www.example.com', 'www.python.org']
jobs = [gevent.spawn(socket.gethostbyname, url) for url in urls]
gevent.joinall(jobs, timeout=2)

print [job.value for job in jobs]

执行结果如下:

['206.169.145.226', '93.184.216.34', '23.235.39.223']
  • concurence https://github.com/concurrence/concurrence

concurence是另外一个利用greenlet提供网络并发的开源库,我没有用过,大家可以自己尝试一下。


实战运用

通常需要用到并发的场合有两种,一种是计算密集型,也就是说你的程序需要大量的CPU资源;另一种是IO密集型,程序可能有大量的读写操作,包括读写文件,收发网络请求等等。

计算密集型

对应计算密集型的应用,我们选用著名的蒙特卡洛算法来计算PI值。基本原理如下

蒙特卡洛算法利用统计学原理来模拟计算圆周率,在一个正方形中,一个随机的点落在1/4圆的区域(红色点)的概率与其面积成正比。也就该概率 p = Pi * R*R /4  : R* R , 其中R是正方形的边长,圆的半径。也就是说该概率是圆周率的1/4, 利用这个结论,只要我们模拟出点落在四分之一圆上的概率就可以知道圆周率了,为了得到这个概率,我们可以通过大量的实验,也就是生成大量的点,看看这个点在哪个区域,然后统计出结果。

基本算法如下:

from math import hypot
from random import random

def test(tries):
    return sum(hypot(random(), random()) < 1 for _ in range(tries))

这里test方法做了n(tries)次试验,返回落在四分之一圆中的点的个数。判断方法是检查该点到圆心的距离,如果小于R则是在圆上。

通过大量的并发,我们可以快速的运行多次试验,试验的次数越多,结果越接近真实的圆周率。

这里给出不同并发方法的程序代码

  • 非并发

    我们先在单线程,但进程运行,看看性能如何

from math import hypot
from random import random
import eventlet
import time

def test(tries):
    return sum(hypot(random(), random()) < 1 for _ in range(tries))

def calcPi(nbFutures, tries):
    ts = time.time()
    result = map(test, [tries] * nbFutures)
    
    ret = 4. * sum(result) / float(nbFutures * tries)
    span = time.time() - ts
    print "time spend ", span
    return ret

print calcPi(3000,4000)
  • 多线程 thread

    为了使用线程池,我们用multiprocessing的dummy包,它是对多线程的一个封装。注意这里代码虽然一个字的没有提到线程,但它千真万确是多线程。

    通过测试我们开(jing)心(ya)的发现,果然不出所料,当线程池为1是,它的运行结果和没有并发时一样,当我们把线程池数字设置为5时,耗时几乎是没有并发的2倍,我的测试数据从5秒到9秒。所以对于计算密集型的任务,还是放弃多线程吧。

from multiprocessing.dummy import Pool

from math import hypot
from random import random
import time

def test(tries):
    return sum(hypot(random(), random()) < 1 for _ in range(tries))

def calcPi(nbFutures, tries):
    ts = time.time()
    p = Pool(1)
    result = p.map(test, [tries] * nbFutures)
    ret = 4. * sum(result) / float(nbFutures * tries)
    span = time.time() - ts
    print "time spend ", span
    return ret

if __name__ == '__main__':
    p = Pool()
    print("pi = {}".format(calcPi(3000, 4000)))
  • 多进程 multiprocess

    理论上对于计算密集型的任务,使用多进程并发比较合适,在以下的例子中,进程池的规模设置为5,修改进程池的大小可以看到对结果的影响,当进程池设置为1时,和多线程的结果所需的时间类似,因为这时候并不存在并发;当设置为2时,响应时间有了明显的改进,是之前没有并发的一半;然而继续扩大进程池对性能影响并不大,甚至有所下降,也许我的Apple Air的CPU只有两个核?

    当心,如果你设置一个非常打的进程池,你会遇到 Resource temporarily unavailable的错误,系统并不能支持创建太多的进程,毕竟资源是有限的。

from multiprocessing import Pool

from math import hypot
from random import random
import time

def test(tries):
    return sum(hypot(random(), random()) < 1 for _ in range(tries))

def calcPi(nbFutures, tries):
    ts = time.time()
    p = Pool(5)
    result = p.map(test, [tries] * nbFutures)
    ret = 4. * sum(result) / float(nbFutures * tries)
    span = time.time() - ts
    print "time spend ", span
    return ret

if __name__ == '__main__':
    p = Pool()
    print("pi = {}".format(calcPi(3000, 4000)))
  • gevent (伪线程)

    不论是gevent还是eventlet,因为不存在实际的并发,响应时间和没有并发区别不大,这个和测试结果一致。

import gevent
from math import hypot
from random import random
import time

def test(tries):
    return sum(hypot(random(), random()) < 1 for _ in range(tries))

def calcPi(nbFutures, tries):
    ts = time.time()
    jobs = [gevent.spawn(test, t) for t in [tries] * nbFutures]
    gevent.joinall(jobs, timeout=2)
    ret = 4. * sum([job.value for job in jobs]) / float(nbFutures * tries)
    span = time.time() - ts
    print "time spend ", span
    return ret

print calcPi(3000,4000)
  • eventlet (伪线程)

from math import hypot
from random import random
import eventlet
import time

def test(tries):
    return sum(hypot(random(), random()) < 1 for _ in range(tries))

def calcPi(nbFutures, tries):
    ts = time.time()
    pool = eventlet.GreenPool()
    result = pool.imap(test, [tries] * nbFutures)
    
    ret = 4. * sum(result) / float(nbFutures * tries)
    span = time.time() - ts
    print "time spend ", span
    return ret

print calcPi(3000,4000)
  • SCOOP

SCOOP中的Future接口符合PEP-3148的定义,也就是在Python3中提供的Future接口。

在缺省的SCOOP配置环境下(单机,4个Worker),并发的性能有提高,但是不如两个进程池配置的多进程。

from math import hypot
from random import random
from scoop import futures

import time

def test(tries):
    return sum(hypot(random(), random()) < 1 for _ in range(tries))

def calcPi(nbFutures, tries):
    ts = time.time()
    expr = futures.map(test, [tries] * nbFutures)
    ret = 4. * sum(expr) / float(nbFutures * tries)
    span = time.time() - ts
    print "time spend ", span
    return ret

if __name__ == "__main__":
    print("pi = {}".format(calcPi(3000, 4000)))
  • Celery

任务代码

from celery import Celery

from math import hypot
from random import random
 
app = Celery('tasks', backend='amqp', broker='amqp://guest@localhost//')
app.conf.CELERY_RESULT_BACKEND = 'db+sqlite:///results.sqlite'
 
@app.task
def test(tries):
    return sum(hypot(random(), random()) < 1 for _ in range(tries))

客户端代码

from celery import group
from tasks import test

import time

def calcPi(nbFutures, tries):
    ts = time.time()
    result = group(test.s(tries) for i in xrange(nbFutures))().get()
    
    ret = 4. * sum(result) / float(nbFutures * tries)
    span = time.time() - ts
    print "time spend ", span
    return ret

print calcPi(3000, 4000)

使用Celery做并发的测试结果出乎意料(环境是单机,4frefork的并发,消息broker是rabbitMQ),是所有测试用例里最糟糕的,响应时间是没有并发的5~6倍。这也许是因为控制协调的开销太大。对于这样的计算任务,Celery也许不是一个好的选择。

  • asyncoro

    Asyncoro的测试结果和非并发保持一致。

import asyncoro

from math import hypot
from random import random
import time

def test(tries):
    yield sum(hypot(random(), random()) < 1 for _ in range(tries))


def calcPi(nbFutures, tries):
    ts = time.time()
    coros = [ asyncoro.Coro(test,t) for t in [tries] * nbFutures]
    ret = 4. * sum([job.value() for job in coros]) / float(nbFutures * tries)
    span = time.time() - ts
    print "time spend ", span
    return ret

print calcPi(3000,4000)


IO密集型

IO密集型的任务是另一种常见的用例,例如网络WEB服务器就是一个例子,每秒钟能处理多少个请求时WEB服务器的重要指标。

我们就以网页读取作为最简单的例子

from math import hypot
import time
import urllib2

urls = ['http://www.google.com', 'http://www.example.com', 'http://www.python.org']

def test(url):
    return urllib2.urlopen(url).read()

def testIO(nbFutures):
    ts = time.time()
    map(test, urls * nbFutures)

    span = time.time() - ts
    print "time spend ", span

testIO(10)

在不同并发库下的代码,由于比较类似,我就不一一列出。大家可以参考计算密集型中代码做参考。

通过测试我们可以发现,对于IO密集型的任务,使用多线程,或者是多进程都可以有效的提高程序的效率,而使用伪线程性能提升非常显著,eventlet比没有并发的情况下,响应时间从9秒提高到0.03秒。同时eventlet/gevent提供了非阻塞的异步调用模式,非常方便。这里推荐使用线程或者伪线程,因为在响应时间类似的情况下,线程和伪线程消耗的资源更少。


总结

Python提供了不同的并发方式,对应于不同的场景,我们需要选择不同的方式进行并发。选择合适的方式,不但要对该方法的原理有所了解,还应该做一些测试和试验,数据才是你做选择的最好参考。


Python 并行分布式框架之 PP

2015年3月20日 05:44

PP (Parallel Python)是基于Python的一个轻量级的,提供在SMP(多处理器或者多核系统)或者集群环境中并行执行Python代码的机制。

最简单和最常见的并行方式是使用多线程,然而如果应用程序使用Python提供的线程库, 它实际上并不能并行的运行Python的字节码(Byte-Code)。这是因为Pyton解释器使用GIL(全局解释器锁),这样的机制是的在同一时间,即使是多核的机器,也只能运行一个字节码指令。PP试图克服这样的限制,提供一种更简单的方式来编写并行应用。PP采用了多进程和进程间通信来处理并发,并隐藏所有的实现细节,使得其容易使用。

功能介绍

  • 并行运行Python代码(废话)

  • 易于理解的基于任务(Job)的并行技术

  • 自动检测优化配置

  • 动态处理器分配

  • 负载均衡

  • 容错

  • 自动发现和动态分配计算资源

  • 基于SHA的网络连接认证

  • 跨平台(Windows,Linux, Unix, Mac OS)和架构(X86,X86-64)支持

  • 开源 (BSD)

安装

wget  
tar -xvf pp-1.6.4.tar.gz
cd pp-1.6.4
sudo python setup.py install

架构和设计

Server

PP server 包含并管理多个worker并行的执行客户端发送的任务

Client

客户端负责发送任务(Python Function)到服务器

Cluster

多个PP Server可以构成一个PP Cluster,在CLuster模式客户端提交任务到Cluster,cluster 找到合适的Server来运行任务。

使用方式

SMP

在SMP的模式下使用PP非常简单

import pp
# create a job serer
job_server = pp.Server()

# submit some jobs with python functions
f1 = job_server.submit(func1, args1, depfuncs1, modules1)
f2 = job_server.submit(func1, args2, depfuncs1, modules1)
f3 = job_server.submit(func2, args3, depfuncs2, modules2)

# Get result from each jobs
r1 = f1()
r2 = f2()
r3 = f3()

Cluster

在Cluster模式下,需要在不同的节点运行ppserver

node-1> ./ppserver.py
node-2> ./ppserver.py
node-3> ./ppserver.py

客户端代码和SMP模式类似

import pp

# create cluster
ppservers=("node-1", "node-2", "node-3")

# create a job serer
pp.Server(ppservers=ppservers) 

# submit some jobs with python functions
f1 = job_server.submit(func1, args1, depfuncs1, modules1)
f2 = job_server.submit(func1, args2, depfuncs1, modules1)
f3 = job_server.submit(func2, args3, depfuncs2, modules2)

# Get result from each jobs
r1 = f1()
r2 = f2()
r3 = f3()


总结

PP是一个轻量级的并行框架,代码不多,安装使用起来也比较简单,并行方式是多进程,但是缺乏对任务的封装,也缺少调度的功能。适合于构造简单的并行分布式系统。

在我们的开发工程中经常会使用到各种图,所谓的图就是由节点和节点之间的连接所形成的系统,数学上专门有一个分支叫图论(Graph Theroy)。利用图我们可以做很多工具,比如思维导图,流程图,状态机,组织架构图,等等。今天我要做的是用开源的HTML5工具来快速构造一个做图的工具。

工具选择

预先善其事,必先利其器。第一件事是选择一件合适的工具,开源时代,程序员还是很幸福的,选择很多。

最终,我选择了jsPlumb,因为它完全开源,使用很简单,用D3的话可能会多花很多功夫。joint.js也不错。大家可以根据自己的需要选择。

构建静态应用

下面我们一步一步的来使用jsPlumb来创建我们的流程图工具。

第一步是等待DOM和jsPlumb初始化完毕,类似document.ready()和jquery.ready(), 要使用jsPlumb, 需要把代码放在这个函数里:

jsPlumb.ready(function() {
    // ... your code goes here ...
}

 

创建一个jsPlumb的实例,并初始化jsPlumb的配置参数:

//Initialize JsPlumb
var color = "#E8C870";
var instance = jsPlumb.getInstance({
    // notice the 'curviness' argument to this Bezier curve.  the curves on this page are far smoother
    // than the curves on the first demo, which use the default curviness value.      
    Connector : [ "Bezier", { curviness:50 } ],
    DragOptions : { cursor: "pointer", zIndex:2000 },
    PaintStyle : { strokeStyle:color, lineWidth:2 },
    EndpointStyle : { radius:5, fillStyle:color },
    HoverPaintStyle : {strokeStyle:"#7073EB" },
    EndpointHoverStyle : {fillStyle:"#7073EB" },
    Container:"container-id"
 });

 

这里给给出了一些配置包括,连接线(这里配置了一个贝塞尔曲线),线的风格,连接点得风格。Container需要配置一个对应的DIV容器的id。(这里也可以使用setContainer的方法)

下面我们要创建一个节点(node),每一个节点可以用一个DIV来实现。我这里提供了一个函数来创建节点。

function addNode(parentId, nodeId, nodeLable, position) {
  var panel = d3.select("#" + parentId);
  panel.append('div').style('width','120px').style('height','50px')
    .style('position','absolute')
    .style('top',position.y).style('left',position.x)
    .style('border','2px #9DFFCA solid').attr('align','center')
    .attr('id',nodeId).classed('node',true)
    .text(nodeLable);

  return jsPlumb.getSelector('#' + nodeId)[0];
}

 

这里做的事情就是创建了一个DIV元素,并放在对应的容器的制定位置上,注意为了支持拖拽的功能,必须使用position:absolute 。

我使用D3来操作DOM,大家可能会更习惯JQuery,这纯属个人喜好的问题。

最后返回创建节点的实例引用,这是的selector使用了jsPlumb.getSelector()方法,它和JQuery的selector是一样的,这样用的好处是你可以使用不同的DOM操作库,例如Vanilla

下面我使用一个函数来创建端点/锚点(anchor),锚点就是节点上的连接点,用于连接不同的节点。

function addPorts(instance, node, ports, type) {
  //Assume horizental layout
  var number_of_ports = ports.length;
  var i = 0;
  var height = $(node).height();  //Note, jquery does not include border for height
  var y_offset = 1 / ( number_of_ports + 1);
  var y = 0;

  for ( ; i < number_of_ports; i++ ) {
    var anchor = [0,0,0,0];
    var paintStyle = { radius:5, fillStyle:'#FF8891' };
    var isSource = false, isTarget = false;
    if ( type === 'output' ) {
      anchor[0] = 1;
      paintStyle.fillStyle = '#D4FFD6';
      isSource = true;
    } else {
      isTarget =true;
    }

    anchor[1] = y + y_offset;
    y = anchor[1];

    instance.addEndpoint(node, {
      uuid:node.getAttribute("id") + "-" + ports[i],
      paintStyle: paintStyle,
      anchor:anchor,
      maxConnections:-1,
      isSource:isSource,
      isTarget:isTarget
    });
  }
}

 

instance是jsPlumb的实例

node是我们用addNode方法创建的Node实例

ports,是一个string的数组,指定端点的个数和名字

type,可能是output或者input,指定端点的种类,一个节点的输出端口可以连接另一个节点的输入端口。

这里anchor是一个四维数组,0维和1维分别是锚点在节点x轴和y轴的偏移百分比。我这里希望把端口画在节点的左右两侧,并按照端口的数量均匀分布。

最后使用instance.addEndpoint来创建端点。注意这里只要指定isSource和isTarget就可以用drag&drop的方式来连接端点,非常方便。

下面一步我们提供一个函数来连接端点:

function connectPorts(instance, node1, port1, node2 , port2) {
  // declare some common values:
  var color = "gray";
  var arrowCommon = { foldback:0.8, fillStyle:color, width:5 },
  // use three-arg spec to create two different arrows with the common values:
  overlays = [
    [ "Arrow", { location:0.8 }, arrowCommon ],
    [ "Arrow", { location:0.2, direction:-1 }, arrowCommon ]
  ];

  var uuid_source = node1.getAttribute("id") + "-" + port1;
  var uuid_target = node2.getAttribute("id") + "-" + port2;

  instance.connect({uuids:[uuid_source, uuid_target]});
}

 

node1和node2是源节点和目标节点的引用,port1和port2是源端口和目标端口的名字。

使用instance.connect方法来创建连接。 overlays用来添加连接线的箭头效果或者其他风格,我这里没有使用,因为觉得都不是很好看。大家如果要用,只要把overlays加入到instance.connect的方法参数就可以了。

调用以上方法来创建节点,端点和连接线。

var node1 = addNode('container-id','node1', 'node1', {x:'80px',y:'20px'});
var node2 = addNode('container-id','node2', 'node2', {x:'280px',y:'20px'});

addPorts(instance, node1, ['out1','out2'],'output');
addPorts(instance, node2, ['in','in1','in2'],'input');

connectPorts(instance, node1, 'out2', node2, 'in');

 

这里我们创建了两个节点,第一个节点有两个输出端口,第二个节点有三个输入端口,然后把第一个节点的out2端口连接到第二个端点的in端口。效果如下:

最后我们给节点增加drag&drop的功能,这样我们就可以拖动这些节点来改变图的布局了。

instance.draggable($('.node'));

 

这里似乎依赖于JQuery-UI,我还不是很清楚。

交互式创建节点

我们已经初步具有了创建图的功能,可是节点的创建必须通过程序,我们希望用交互的方式来创建节点。

通常我们希望有一个tree view的控件,让后通过拖拽来创建对应类型的节点。这里我使用了这个开源的tree view,基于bootstrap https://github.com/jonmiles/bootstrap-treeview

我们先创建一个tree view:

function getTreeData() {
  var tree = [
    {
      text: "Nodes",
      nodes: [
        {
          text: "Node1",
        },
        {
          text: "Node2"
        }
      ]
    }
  ]; 

  return tree;
}
//Initialize Control Tree View
$('#control-panel').treeview({data: getTreeData()});

 

树上有两个节点:

然后我实现从树上拖拽对应的节点,到流程图上的逻辑。

//Handle drag and drop
$('.list-group-item').attr('draggable','true').on('dragstart', function(ev){
  //ev.dataTransfer.setData("text", ev.target.id);
  ev.originalEvent.dataTransfer.setData('text',ev.target.textContent);
  console.log('drag start');
});

$('#container-id').on('drop', function(ev){
  //avoid event conlict for jsPlumb
  if (ev.target.className.indexOf('_jsPlumb') >= 0 ) {
    return;
  }

  ev.preventDefault();
  var mx = '' + ev.originalEvent.offsetX + 'px';
  var my = '' + ev.originalEvent.offsetY + 'px';

  console.log('on drop : ' + ev.originalEvent.dataTransfer.getData('text'));
  var uid = new Date().getTime();
  var node = addNode('flow-panel','node' + uid, 'node', {x:mx,y:my});
  addPorts(instance, node, ['out'],'output');
  addPorts(instance, node, ['in1','in2'],'input');
  instance.draggable($(node));
}).on('dragover', function(ev){
  ev.preventDefault();
  console.log('on drag over');
});

 

这里要注意的是要避免和jsPlumb拖拽端点的逻辑冲突,当检测到target是jsPlumb对象是需要直接从drop方法中退出以执行对应的jsPlumb的drop逻辑。

好了,一个绘制流程图的软件工具初步完工。

我把代码放在oschina的代码托管服务上了, 大家有兴趣可以下来试试 http://git.oschina.net/gangtao/FlowChart-Builder

在我们的开发过程中,经常会需要对我们开发的程序做性能分析,有很多性能分析的工具,很多语言都提供了不同的profiling工具,这些工具很有用,提供了程序运行的原始记录数据,通过对这些数据的分析,可以得到程序运行的性能状况,找到问题所在。然而,这样的工具手机的数据比较原始,往往还需要一些更进一步的分析,才能定位问题。

Splunk是一个可以运行在不同平台上的机器数据的实时运维平台,所谓机器数据,就是指机器产生的数据,其中一个常见的场景就是日志。对于广大程序员来说,分析日志是一个非常常见,而且繁琐的工作,而且很多时候,必须通过日志来对程序进行调试,例如多线程的情况。记得以前为了几百兆或者几G的日志进行分析,不得不写了logViewer来分析。现在有了Splunk,真的极大的简化了对日志分析的工作。(注Splunk免费版支持每天500M的日志数据,超过这个额度需要收费)

通过日志进行性能测试是非常常见的,传统的也是在要分析的代码处,注入性能日志,然后在程序运行后,对写入的性能数据进行分析。使用Splunk,方法是一样的,但是有以下明显的改进

  1. Splunk提供大量友好的分析命令和图表,无需另行开发分析日志的程序

  2. Splunk可以实时的对应用程序作分析,可以在程序的运行过程中,一边运行,一边分析

我下面举一个我碰到的例子。

我要分析的程序是一个从AWS CloudWatch收集数据的Python程序。收集数据使用的是AWS提供的Restful API (Boto),为了更高效的收集数据,程序使用多个线程来调用Restful API 的Query接口。我希望通过性能日志了解每一个请求大概的耗时,以决定使用多少个线程数和对应的采集间隔。

首先,需要写日志:

logger.log(logging.DEBUG, "PerfLog=QueryStart" )
## Query Code Goes Here
do_query_aws()
## Query Complete
logger.log(logging.DEBUG, "PerfLog=QueryEnd, Query Result)

 

注意使用Name=Value的形式可以帮助Splunk在搜索时,提取要分析的字段。

然后运行程序,程序运行以后会生成日志文件,把该日志文件导入到Splunk,开始分析。

点击Add Data按钮,然后跟随Splunk的指导,选择A file or directory of files. 导入你的日志文件,导入过程中,Splunk会要求给你的日志文件命名一个sourcetype,我用的是“cloud_watch_debug”

导入好以后就可以开始搜索了。

在搜索框中输入

sourcetype="cloud_watch_debug”

 

Splunk会实时的返回所有的日志文件,并按时间解析为一个个的事件。

Splunk的SPL(Splunk Search Language)是一个类似SQL和UNIX Command的综合体,可以对数据进行搜索,分析,统计,生成图表,支持管道,使用起来非常方便,建议大家通过官方文档来了解。

我么今天要做的是性能分析,那么我就是要统计一下,发了多少个query,每一个query用了多少时间。

每一条日志的内容大致如下

2014-08-11 10:52:40,587 DEBUG pid=3742 tid=QueryWorkerThread-1 file=aws_cloudwatch.py:_main_work_loop:469 | PerfLog = QueryStart

 

Splunk能够提取出大量的信息和字段(field),包括事件,pid,tid,file等等,还有我们在日志中加入的字段PerfLog。

想要知道每一个查询所花费的时间,可以通过Splunk提供的transaction命令。

sourcetype=cloud_watch_debug | transaction tid startswith="QueryStart" endswith="QueryEnd"

 

  • tid表示每一个transaction需要有相同的tid,也就是说同一个线程

  • startwith和endwith表示transaction的其实和结束标志

该命令返回所有的query的transaction
 

然后我们就可以统计每一个transaction所用的时间

sourcetype=cloud_watch_debug | 
transaction tid startswith="QueryStart" endswith="QueryEnd" | 
stats sum(duration),count, avg(duration),max(duration),min(duration)

 

  • stats命令用于对数据进行统计

  • duration是Splunk对transaction生成的事件跨度

  • sum,count,avg,max,min是统计命令

运行结果如下:

程序一共发送了111075个cloudwatch的请求,最慢的需要2.5秒,最快的0.06秒,平均大概0.11秒。

我还想知道query的耗时随时间的变化,我可以生成一个timechart

sourcetype=cloud_watch_debug | 
transaction tid startswith="QueryStart" endswith="QueryEnd" | 
timechart avg(duration)

 

结果如下(最近1小时):

通过该分析在过去的一个小时里10:30和11:00之后的十分钟时间段,耗时略有上升,大概峰值0.2秒。

 

总结:

Splunk的日志分析功能非常强大,而且500M的免费版基本能够满足大部分程序员对程序日志的分析要求。有效地使用Splunk来进行日志分析,可以做到事半功倍,小伙伴们快来试用吧!

大数据时代,数据的异常分析被广泛的用于各个场合。 今天我们就来看一看其中的一种场景,对于单变量数据集的异常检测。

所谓单变量,就是指数据集中只有一个变化的值,下面我们来看看今天我们要分析的的数据,点击这里数据文件下载数据文件。

分析数据的第一步是要加载文件, 本文使用了numpy,pandas,scikit learn等常见的数据分析要用到的Python库。

import numpy as np
import pandas as pd
df = pd.read_csv("farequote.csv")

Pandas 是一个常用的数据分析的Python库,提供对数据的加载,清洗,抽取,变形等操作。Pandas依赖numpy,numpy提供了基于列/多维数组(List/N-D Array)的数据结构的操作。许多科学计算和数据分析的库都依赖于numpy。

df 是Pandas中常用的数据类型dataframe,dataframe类似与一个数据库的表,使用 df.head()可以得到数据的头几行,以便了解数据的概貌。

该数据结构中,第一列式Pandas添加的索引,第一行是每一列数据的名字,除了第一列,每一列数据可以看成是一个变量,所以该数据集共有三个变量,时间(_time)、航空公司名称(airline)、响应时间(responsetime)。我们可以这样理解,该数据集记录了一段时间内,各个航空公司飞机延误的时间。我们希望通过分析找出是否存在异常的情况。

注意,我们是要分析单变量,所以所有的分析都是基于某一个航空公司的数据,所以就需要对该数据集做一个查询,找出要分析的航空公司。首先要知道有哪些航空公司,使用np.unique(df.airline)可以找到所有的航空公司代码,类似SQL的Unique命令

array(['AAL', 'ACA', 'AMX', 'ASA', 'AWE', 'BAW', 'DAL', 'EGF', 'FFT',
       'JAL', 'JBU', 'JZA', 'KLM', 'NKS', 'SWA', 'SWR', 'TRS', 'UAL', 'VRD'], 
      dtype='|S3')

查询某个航空公司的数据使用dataframe的query方法,类似SQL的select。Query返回的结果仍然是一个dataframe对象。

dd = df.query('airline=="KLM"') ## 得到法航的数据

我们先了解一下数据的大致信息,使用describe方法

dd.responsetime.describe()

得到如下的结果:

count    1724.000000
mean     1500.613766
std       100.085320
min      1209.766800
25%      1434.084625
50%      1499.135000
75%      1567.831025
max      1818.774100
Name: responsetime, dtype: float64

该结果返回了数据集responsetime维度上的主要统计指标,个数,均值,方差,最大最小值等等,也可以调用单独的方法例如min(),mean()等来获得某一个指标。

基于标准差得异常检测

下面我们就可以开始异常点的分析了,对于单变量的异常点分析,最容易想到的就是基于标准差(Standard Deviation)的方法了。我们假定数据的正态分布的,利用概率密度函数,我们知道

  • 95.449974面积在平均数左右两个标准差的范围内

  • 99.730020%的面积在平均数左右三个标准差的范围内

  • 99.993666的面积在平均数左右三个标准差的范围内

所以我们95%也就是大概两个标准差为门限,凡是落在门限外的都认为是异常点。代码如下

def a1(dataframe, threshold=.95):
    d = dataframe['responsetime']
    dataframe['isAnomaly'] = d > d.quantile(threshold)  
    return dataframe
print a1(dd)

运行以上程序我们得到如下结果

                             _time airline  responsetime isAnomaly
20    2013-02-01T23:57:59.000-0700     KLM     1481.4945     False
76    2013-02-01T23:52:34.000-0700     KLM     1400.9050     False
124   2013-02-01T23:47:10.000-0700     KLM     1501.4313     False
203   2013-02-01T23:39:08.000-0700     KLM     1278.9509     False
281   2013-02-01T23:32:27.000-0700     KLM     1386.4157     False
336   2013-02-01T23:26:09.000-0700     KLM     1629.9589     False
364   2013-02-01T23:23:52.000-0700     KLM     1482.5900     False
448   2013-02-01T23:16:08.000-0700     KLM     1553.4988     False
511   2013-02-01T23:10:39.000-0700     KLM     1555.1894     False
516   2013-02-01T23:10:08.000-0700     KLM     1720.7862      True
553   2013-02-01T23:06:29.000-0700     KLM     1306.6489     False
593   2013-02-01T23:03:03.000-0700     KLM     1481.7081     False
609   2013-02-01T23:01:29.000-0700     KLM     1521.0253     False
666   2013-02-01T22:56:04.000-0700     KLM     1675.2222      True
...   ...   ...   ...

结果数据集上多了一列isAnomaly用来标记每一行记录是否是异常点,我们看到已经有一些点被标记为异常点了。

我们看看程序的详细内容:

  1. 方法a1定义了一个异常检测的函数

  2. dataframe['responsetime']等价于dataframe.responsetime,该操作取出responsetime这一列的值

  3. d.quantile(threshold)用正态分布假定返回位于95%的点的值,大于该值得点都落在正态分布95%之外

  4. d > d.quantile(threshold)是一个数组操作,返回的新数组是responsetime和threshold的比较结果,[False,False,True,... ... False]

  5. 然后通过dataframe的赋值操作增加一个新的列,标记所有的异常点。

数据可视化往往是数据分析的最后一步,我们看看结果如何:

import matplotlib.pyplot as plt
da = a1(dd)
fig = plt.figure()
ax1 = fig.add_subplot(2, 1, 1)
ax2 = fig.add_subplot(2, 1, 2)
ax1.plot(da['responsetime'])
ax2.plot(da['isAnomaly'])

这异常点也太多了,用99%在试试:

现在似乎好一点,然而我们知道,对于数据集的正态分布的假定往往是不成立的,假如数据分布在大小两头,那么这样的异常检测就很难奏效了。我们看看其他一些改进的方法。

基于ZSCORE的异常检测

zscore的计算如下

sd是标准差,X是均值。一般建议门限值取为3.5

代码如下:

def a2(dataframe, threshold=3.5):
    d = dataframe['responsetime']
    zscore = (d - d.mean())/d.std()
    dataframe['isAnomaly'] = zscore.abs() > threshold
    return dataframe

另外还有一种增强的zscore算法,基于MAD。MAD的定义是

其中X是中位数。

增强的zscore算法如下:

def a3(dataframe, threshold=3.5):
    dd = dataframe['responsetime']
    MAD = (dd - dd.median()).abs().median()
    zscore = ((dd - dd.median())* 0.6475 /MAD).abs()
    dataframe['isAnomaly'] = zscore > threshold
    return dataframe

用zscore算法得到:

调整门限为3得到

如果换一组数据AAL,结果会怎么样呢?

我们发现有一段时间,所有的响应都很慢,我们想要把这些点都标记为异常,可能么?

基于KMEAN聚集的异常检测

通常基于KMEAN的聚集算法并不适用于异常点检测,以为聚集算法总是试图平衡每一个聚集中的点的数目,所以对于少数的异常点,聚集非常不好用,但是我们这个例子中,异常点都聚在一起,所以应该可以使用。

首先,为了看清聚集,我们使用时间序列的常用分析方法,增加一个维度,该维度是每一个点得前一个点得响应时间。

preresponse = 0
newcol = []
newcol.append(0)
for index, row in dd.iterrows():
    if preresponse != 0:
        newcol.append(preresponse)
    preresponse = row.responsetime
dd["t0"] = newcol
plt.scatter(dd.t0,dd.responsetime)

我们利用iterrows来循环数据,把前一个点的响应时间增加到当前点,第一个点的该值为0,命名该列为t0。然后用scatter plot把它画出来。

上面是法航KLM的数据,其中最左边的点是一个无效的点,因为前一个点的响应时间不知道所以填了0,分析时应该过滤该店。

对于AAL,我们可以清楚的看到两个聚集:

其中右上方的聚集,也就是点数目比较少得聚集就是我们希望检测到的异常点得集合。

我们看看如何使用KMEAN算法来检测吧:

def a4(dataframe, threshold = .9):
    ## add one dimention of previous response
    preresponse = 0
    newcol = []
    newcol.append(0)
    for index, row in dataframe.iterrows():
        if preresponse != 0:
            newcol.append(preresponse)
        preresponse = row.responsetime
    dataframe["t0"] = newcol
    ## remove first row as there is no previous event for time
    dd = dataframe.drop(dataframe.head(1).index) 
    clf = cluster.KMeans(n_clusters=2)
    X=np.array(dd[['responsetime','t0']])
    cls = clf.fit_predict(X)
    freq = itemfreq(cls)
    (A,B) = (freq[0,1],freq[1,1])
    t = abs(A-B)/max(A,B)
    if t > threshold :
        ## "Anomaly Detected!"
        index = freq[0,0]
        if A > B :
            index = freq[1,0]
        dd['isAnomaly'] = (cls == index)
    else :
        ## "No Anomaly Point"
        dd['isAnomaly'] = False
    return dd

其核心代码是以下这几行:

clf = cluster.KMeans(n_clusters=2)
X=np.array(dd[['responsetime','t0']])
cls = clf.fit_predict(X)

cluster.KMeans返回一个预测模型,我们假定有两个聚集。你可以试着加大聚集的数量,结果没什么影响。

dd[['responsetime','t0']]返回一个2*n的数组,并赋值给X,用于聚集计算。

fit_pridict方法是对X做聚集运算,并计算每一个点对应的聚集编号。

freq = itemfreq(cls)

itemfreq返回聚集结果中每一个聚集的发生频率,如果其中一个比另一个显著地多,我们则认为那个少得是异常点聚集。

用该方法可以把所有聚集里的点标记为异常点。

这里我用红色标记结果让大家看的清楚一点,注意因为是line chart,连个竖线间的都是异常点。

总结

除了上述的算法,还有其它一些相关的算法,大家如果对背后的数据知识有兴趣的话,可以参考这篇相关介绍

单变量的异常检测算法相对比较简单,但是要做到精准检测就更难,因为掌握的信息更少。另外boxplot也经常被用于异常检测,他和基于方差的异常检测是一致的,只不过用图形让大家一目了然的获得结果,大家有兴趣可以了解一下。


背景

Web Scraping

在大数据时代,一切都要用数据来说话,大数据处理的过程一般需要经过以下的几个步骤

  • 数据的采集和获取

  • 数据的清洗,抽取,变形和装载

  • 数据的分析,探索和预测

  • 数据的展现

其中首先要做的就是获取数据,并提炼出有效地数据,为下一步的分析做好准备。

数据的来源多种多样,以为我本身是足球爱好者,而世界杯就要来了,所以我就想提取欧洲联赛的数据来做一个分析。许多的网站都提供了详细的足球数据,例如:

这些网站都提供了详细的足球数据,然而为了进一步的分析,我们希望数据以格式化的形式存储,那么如何把这些网站提供的网页数据转换成格式化的数据呢?这就要用到Web scraping的技术了。简单地说,Web Scraping就是从网站抽取信息, 通常利用程序来模拟人浏览网页的过程,发送http请求,从http响应中获得结果。

Web Scraping 注意事项

在抓取数据之前,要注意以下几点:

  • 阅读网站有关数据的条款和约束条件,搞清楚数据的拥有权和使用限制

  • 友好而礼貌,使用计算机发送请求的速度飞人类阅读可比,不要发送非常密集的大量请求以免造成服务器压力过大

  • 因为网站经常会调整网页的结构,所以你之前写的Scraping代码,并不总是能够工作,可能需要经常调整

  • 因为从网站抓取的数据可能存在不一致的情况,所以很有可能需要手工调整

Python Web Scraping 相关的库

Python提供了很便利的Web Scraping基础,有很多支持的库。这里列出一小部分

当然也不一定要用Python或者不一定要自己写代码,推荐关注import.io

Web Scraping 代码

下面,我们就一步步地用Python,从腾讯体育来抓取欧洲联赛13/14赛季的数据。

首先要安装Beautifulsoup

pip install beautifulsoup4

 

我们先从球员的数据开始抓取。

球员数据的Web请求是http://soccerdata.sports.qq.com/playerSearch.aspx?lega=epl&pn=2 ,返回的内容如下图所示:

该web服务有两个参数,lega表示是哪一个联赛,pn表示的是分页的页数。

首先我们先做一些初始化的准备工作

from urllib2 import urlopen
import urlparse
import bs4

BASE_URL = "http://soccerdata.sports.qq.com"
PLAYER_LIST_QUERY = "/playerSearch.aspx?lega=%s&pn=%d"
league = ['epl','seri','bund','liga','fran','scot','holl','belg']
page_number_limit = 100
player_fields = ['league_cn','img','name_cn','name','team','age','position_cn','nation','birth','query','id','teamid','league']

 

urlopen,urlparse,bs4是我们将要使用的Python库。

BASE_URL,PLAYER_LIST_QUERY,league,page_number_limit和player_fields是我们会用到的一些常量。

下面是抓取球员数据的具体代码:

def get_players(baseurl):
    html = urlopen(baseurl).read()
    soup = bs4.BeautifulSoup(html, "lxml")
    players = [ dd for dd in soup.select('.searchResult tr') if dd.contents[1].name != 'th']
    result = []
    for player in players:
        record = []
        link = ''
        query = []
        for item in player.contents:
            if type(item) is bs4.element.Tag:
                if not item.string and item.img:
                    record.append(item.img['src'])
                else :
                    record.append(item.string and item.string.strip() or 'na')
                try:
                    o = urlparse.urlparse(item.a['href']).query
                    if len(link) == 0:
                        link = o
                        query = dict([(k,v[0]) for k,v in urlparse.parse_qs(o).items()])
                except:
                    pass
             
        if len(record) != 10:
            for i in range(0, 10 - len(record)):
                record.append('na')
        record.append(unicode(link,'utf-8'))
        record.append(unicode(query["id"],'utf-8'))
        record.append(unicode(query["teamid"],'utf-8'))
        record.append(unicode(query["lega"],'utf-8'))
        result.append(record)
    return result
    
result = []
for url in [ BASE_URL + PLAYER_LIST_QUERY % (l,n) for l in league for n in range(page_number_limit) ]:
    result = result +  get_players(url)

 

我们来看看抓取球员数据的详细过程:

首先我们定义了一个get_players方法,该方法会返回某一请求页面上所有球员的数据。为了得到所有的数据,我们通过一个for循环,因为要循环各个联赛,每个联赛又有多个分页,一般情况下是需要一个双重循环的:

for i in league:
    for j in range(0, 100):
        url = BASE_URL + PLAYER_LIST_QUERY % (l,n)
        ## send request to url and do scraping

 

Python的list comprehension可以很方便的通过构造一个列表的方式来减少循环的层次。

另外Python还有一个很方便的语法来合并连个列表: list = list1 + list2

好我们再看看如何使用BeautifulSoup来抓取网页中我们需要的内容。

首先调用urlopen读取对应url的内容,通常是一个html,用该html构造一个beautifulsoup对象。

beautifulsoup对象支持很多查找功能,也支持类似css的selector。通常如果有一个DOM对象是<xx class='cc'>,我们使用以下方式来查找:

obj = soup.find("xx","cc")

 

另外一种常见的方式就是通过CSS的selector方式,在上述代码中,我们选择class=searchResult元素里面,所有的tr元素,过滤掉th也就是表头元素。

for dd in soup.select('.searchResult tr') if dd.contents[1].name != 'th'

 

对于每一行记录tr,生成一条球员记录,并存放在一个列表中。所以我们就循环tr的内容tr.contents,获得对应的field内容。

对于每一个tr的content,我们先检查其类型是不是一个Tag,对于Tag类型有几种情况,一种是包含img的情况,我们需要取出球员的头像图片的网址。

另一种是包含了一个链接,指向其他数据内容

所以在代码中要分别处理这些不同的情况。

对于一个Tag对象,Tag.x可以获得他的子对象,Tag['x']可以获得Tag的attribute的值。

所以用item.img['src']可以获得item的子元素img的src属性。

对已包含链接的情况,我们通过urlparse来获取查询url中的参数。这里我们利用了dict comprehension的把查询参数放入一个dict中,最有添加到列表中。

dict([(k,v[0]) for k,v in urlparse.parse_qs(o).items()])

 

对于其它情况,我们使用Python 的and or表达式以确保当Tag的内容为空时,我们写入‘na’,该表达式类似C/C++或Java中的三元操作符 X ? A : B

然后有一段代码判断当前记录的长度是否大于10,不大于10则用空值填充,目的是避免一些不一致的地方。

if len(record) != 10:
    for i in range(0, 10 - len(record)):
        record.append('na')

 

最后,我们把query中的一些相关的参数如球员的id,球队的id,所在的联赛代码等加入到列表。

record.append(unicode(link,'utf-8'))
record.append(unicode(query["id"],'utf-8'))
record.append(unicode(query["teamid"],'utf-8'))
record.append(unicode(query["lega"],'utf-8'))

 

最后我们把本页面所有球员的列表放入一个列表返回。

好了,现在我们拥有了一个包含所有球员的信息的列表,我们需要把它存下来,以进一步的处理,分析。通常,csv格式是一个常见的选择。

import csv
def write_csv(filename, content, header = None): 
    file = open(filename, "wb")
    file.write('\xEF\xBB\xBF')
    writer = csv.writer(file, delimiter=',')
    if header:
        writer.writerow(header)
    for row in content:
        encoderow = [dd.encode('utf8') for dd in row]
        writer.writerow(encoderow)

write_csv('players.csv',result,player_fields)

 

这里需要注意的就是关于encode的问题。因为我们使用的时utf-8的编码方式,在csv的文件头,需要写入\xEF\xBB\xBF,详见这篇文章

好了现在大功告成,抓取的csv如下图:

因为之前我们还抓取了球员本赛季的比赛详情,所以我们可以进一步的抓取所有球员每一场比赛的记录

抓取的代码如下

def get_player_match(url):
    html = urlopen(url).read()
    soup = bs4.BeautifulSoup(html, "lxml")
    matches = [ dd for dd in soup.select('.shtdm tr') if dd.contents[1].name != 'th']
    records = []
    for item in [ dd for dd in matches if len(dd.contents) > 11]: ## filter out the personal part
        record = []
        for match in [ dd for dd in item.contents if type(dd) is bs4.element.Tag]:
            if match.string:
                record.append(match.string)
            else:
                for d in [ dd for dd in match.contents if type(dd) is bs4.element.Tag]:
                    query = dict([(k,v[0]) for k,v in urlparse.parse_qs(d['href']).items()])
                    record.append('teamid' in query and query['teamid'] or query['id'])   
                    record.append(d.string and d.string or 'na')                    
        records.append(record)
    return records[1:]  ##remove the first record as the header

def get_players_match(playerlist, baseurl = BASE_URL + '/player.aspx?'):
    result = []
    for item in playerlist:
        url =  baseurl + item[10]
        print url
        result = result + get_player_match(url)
    return result
match_fields = ['date_cn','homeid','homename_cn','matchid','score','awayid','awayname_cn','league_cn','firstteam','playtime','goal','assist','shoot','run','corner','offside','foul','violation','yellowcard','redcard','save']    
write_csv('m.csv',get_players_match(result),match_fields)

 

抓取的过程和之前类似。

下一步做什么

现在我们拥有了详细的欧洲联赛的数据,那么下一步要怎么做呢,我推荐大家把数据导入BI工具来做进一步的分析。有两个比较好的选择:

Tableau在数据可视化领域可谓无出其右,Tableau Public完全免费,用数据可视化来驱动数据的探索和分析,拥有非常好的用户体验

 

Splunk提供一个大数据的平台,主要面向机器数据。支持每天免费导入500M的数据,如果是个人学习,应该足够了。

当然你也可以用Excel。 另外大家如果有什么好的免费的数据分析的平台,欢迎交流。

 

探索Javascript异步编程

2014年5月21日 17:52

笔者在之前的一片博客中简单的讨论了Python和Javascript的异同,其实作为一种编程语言Javascript的异步编程是一个非常值得讨论的有趣话题。

JavaScript 异步编程简介

回调函数和异步执行

所谓的异步指的是函数的调用并不直接返回执行的结果,而往往是通过回调函数异步的执行。

我们先看看回调函数是什么:

var fn = function(callback) {
    // do something here
    ...
    callback.apply(this, para);
};

var mycallback = function(parameter) {
    // do someting in customer callback
};

// call the fn with callback as parameter
fn(mycallback);

 

回调函数,其实就是调用用户提供的函数,该函数往往是以参数的形式提供的。回调函数并不一定是异步执行的。比如上述的例子中,回调函数是被同步执行的。大部分语言都支持回调,C++可用通过函数指针或者回调对象,Java一般也是使用回调对象。

在Javascript中有很多通过回调函数来执行的异步调用,例如setTimeout()或者setInterval()。

setTimeout(function(){
    console.log("this will be exectued after 1 second!");
},1000);

 

在以上的例子中,setTimeout直接返回,匿名函数会在1000毫秒(不一定能保证是1000毫秒)后异步触发并执行,完成打印控制台的操作。也就是说在异步操作的情境下,函数直接返回,把控制权交给回调函数,回调函数会在以后的某一个时间片被调度执行。那么为什么需要异步呢?为什么不能直接在当前函数中完成操作呢?这就需要了解Javascript的线程模型了。

Javascript线程模型和事件驱动

Javascript最初是被设计成在浏览器中辅助提供HTML的交互功能。在浏览器中都包含一个Javascript引擎,Javscript程序就运行在这个引擎之中,并且只有一个线程。单线程能都带来很多优点,程序员们可以很开心的不用去考虑诸如资源同步,死锁等多线程阻塞式编程所需要面对的恼人的问题。但是很多人会问,既然Javascript是单线程的,那它又如何能够异步的执行呢?

这就需要了解到Javascript在浏览器中的事件驱动(event driven)机制。事件驱动一般通过事件循环(event loop)和事件队列(event queue)来实现的。假定浏览器中有一个专门用于事件调度的实例(该实例可以是一个线程,我们可以称之为事件分发线程event dispatch thread),该实例的工作就是一个不结束的循环,从事件队列中取出事件,处理所有很事件关联的回调函数(event handler)。注意回调函数是在Javascript的主线程中运行的,而非事件分发线程中,以保证事件处理不会发生阻塞。

Event Loop Code:

while(true) {
 var event = eventQueue.pop();
 if(event && event.handler) {
     event.handler.execute(); // execute the callback in Javascript thread
 } else {
     sleep(); //sleep some time to release the CPU do other stuff
 }
}

 

通过事件驱动机制,我们可以想象Javascript的编程模型就是响应一系列的事件,执行对应的回调函数。很多UI框架都采用这样的模型(例如Java Swing)。

那为什要异步呢,同步不是很好么?

异步的主要目的是处理非阻塞,在和HTML交互的过程中,会需要一些IO操作(典型的就是Ajax请求,脚本文件加载),如果这些操作是同步的,就会阻塞其它操作,用户的体验就是页面失去了响应。

综上所述Javascript通过事件驱动机制,在单线程模型下,以异步回调函数的形式来实现非阻塞的IO操作。

Javascript异步编程带来的挑战

Javascript的单线程模型有很多好处,但同时也带来了很多挑战。

代码可读性

想象一下,如果某个操作需要经过多个非阻塞的IO操作,每一个结果都是通过回调,程序有可能会看上去像这个样子。

operation1(function(err, result) {
    operation2(function(err, result) {
        operation3(function(err, result) {
            operation4(function(err, result) {
                operation5(function(err, result) {
                    // do something useful
                })
            })
        })
    })
})

 

我们称之为意大利面条式(spaghetti)的代码。这样的代码很难维护。这样的情况更多的会发生在server side的情况下。

流程控制

异步带来的另一个问题是流程控制,举个例子,我要访问三个网站的内容,当三个网站的内容都得到后,合并处理,然后发给后台。代码可以这样写:

var urls = ['url1','url2','url3'];
var result = [];

for (var i = 0, len = urls.length(); i < len; i++ ) {
    $.ajax({
        url: urls[i],
        context: document.body,
        success: function(){
          //do something on success
          result.push("one of the request done successfully");
          if (result.length === urls.length()) {
              //do something when all the request is completed successfully
          }
        }});
}

 

上述代码通过检查result的长度的方式来决定是否所有的请求都处理完成,这是一个很丑陋方法,也很不可靠。

异常和错误处理

通过上一个例子,我们还可以看出,为了使程序更健壮,我们还需要加入异常处理。 在异步的方式下,异常处理分布在不同的回调函数中,我们无法在调用的时候通过try...catch的方式来处理异常, 所以很难做到有效,清楚。

更好的Javascript异步编程方式

“这是最好的时代,也是最糟糕的时代”

为了解决Javascript异步编程带来的问题,很多的开发者做出了不同程度的努力,提供了很多不同的解决方案。然而面对如此众多的方案应该如何选择呢?我们这就来看看都有哪些可供选择的方案吧。

Promise

Promise 对象曾经以多种形式存在于很多语言中。这个词最先由C++工程师用在Xanadu 项目中,Xanadu 项目是Web 应用项目的先驱。随后Promise 被用在E编程语言中,这又激发了Python 开发人员的灵感,将它实现成了Twisted 框架的Deferred 对象。

2007 年,Promise 赶上了JavaScript 大潮,那时Dojo 框架刚从Twisted框架汲取灵感,新增了一个叫做dojo.Deferred 的对象。也就在那个时候,相对成熟的Dojo 框架与初出茅庐的jQuery 框架激烈地争夺着人气和名望。2009 年,Kris Zyp 有感于dojo.Deferred 的影响力提出了CommonJS 之Promises/A 规范。同年,Node.js 首次亮相。

在编程的概念中,future,promise,和delay表示同一个概念。Promise翻译成中文是“承诺”,也就是说给你一个东西,我保证未来能够做到,但现在什么都没有。它用来表示异步操作返回的一个对象,该对象是用来获取未来的执行结果的一个代理,初始值不确定。许多语言都有对Promise的支持。

Promise的核心是它的then方法,我们可以使用这个方法从异步操作中得到返回值,或者是异常。then有两个可选参数(有的实现是三个),分别处理成功和失败的情景。

var promise = doSomethingAync()
promise.then(onFulfilled, onRejected)

 

异步调用doSomethingAync返回一个Promise对象promise,调用promise的then方法来处理成功和失败。这看上去似乎并没有很大的改进。仍然需要回调。但是和以前的区别在于,首先异步操作有了返回值,虽然该值只是一个对未来的承诺;其次通过使用then,程序员可以有效的控制流程异常处理,决定如何使用这个来自未来的值。

对于嵌套的异步操作,有了Promise的支持,可以写成这样的链式操作:

operation1().then(function (result1) {
    return operation2(result1)
}).then(function (result2) {
    return operation3(result2);
}).then(function (result3) {
    return operation4(result3);
}).then(function (result4) {
    return operation5(result4)
}).then(function (result5) {
    //And so on
});

 

Promise提供更便捷的流程控制,例如Promise.all()可以解决需要并发的执行若干个异步操作,等所有操作完成后进行处理。

var p1 = async1();
var p2 = async2();
var p3 = async3();
Promise.all([p1,p2,p3]).then(function(){
    // do something when all three asychronized operation finished
});

 

对于异常处理,

doA()
  .then(doB)
  .then(null,function(error){
      // error handling here
  })

 

如果doA失败,它的Promise会被拒绝,处理链上的下一个onRejected会被调用,在这个例子中就是匿名函数function(error){}。比起原始的回调方式,不需要在每一步都对异常进行处理。这生了不少事。

以上只是对于Promise概念的简单陈述,Promise拥有许多不同规范建议(A,A+,B,KISS,C,D等),名字(Future,Promise,Defer),和开源实现。大家可以参考一下的这些链接。


如果你有选择困难综合症,面对这么多的开源库不知道如何决断,先不要急,这还只是一部分,还有一些库没有或者不完全采用Promise的概念

Non-Promise

下面列出了其它的一些开源的库,也可以帮助解决Javascript中异步编程所遇到的诸多问题,它们的解决方案各不相同,我这里就不一一介绍了。大家有兴趣可以去看看或者试用一下。

Non-3rd Party

其实,为了解决Javascript异步编程带来的问题,不一定非要使用Promise或者其它的开源库,这些库提供了很好的模式,但是你也可以通过有针对性的设计来解决。

比如,对于层层回调的模式,可以利用消息机制来改写,假定你的系统中已经实现了消息机制,你的code可以写成这样:

eventbus.on("init", function(){
    operationA(function(err,result){
        eventbus.dispatch("ACompleted");
    });
});

eventbus.on("ACompleted", function(){
    operationB(function(err,result){
        eventbus.dispatch("BCompleted");
    });
});

eventbus.on("BCompleted", function(){
    operationC(function(err,result){
        eventbus.dispatch("CCompleted");
    });
});

eventbus.on("CCompleted", function(){
    // do something when all operation completed
});

 

这样我们就把嵌套的异步调用,改写成了顺序执行的事件处理。

更多的方式,请大家参考这篇文章,它提出了解决异步的五种模式:回调、观察者模式(事件)、消息、Promise和有限状态机(FSM)。

下一代Javscript对异步编程的增强

ECMAScript6

下一代的Javascript标准Harmony,也就是ECMAScript6正在酝酿中,它提出了许多新的语言特性,比如箭头函数、类(Class)、生成器(Generator)、Promise等等。其中Generator和Promise都可以被用于对异步调用的增强。

Nodejs的开发版V0.11已经可以支持ES6的一些新的特性,使用node --harmony命令来运行对ES6的支持。

co、Thunk、Koa

koa是由Express原班人马(主要是TJ)打造,希望提供一个更精简健壮的nodejs框架。koa依赖ES6中的Generator等新特性,所以必须运行在相应的Nodejs版本上。

利用Generator、coThunk,可以在Koa中有效的解决Javascript异步调用的各种问题。

co是一个异步流程简化的工具,它利用Generator把一层层嵌套的调用变成同步的写法。

var co = require('co');
var fs = require('fs');

var stat = function(path) {
  return function(cb){
    fs.stat(path,cb);
  }
};

var readFile = function(filename) {
  return function(cb){
    fs.readFile(filename,cb);
  }
};

co(function *() {
  var stat = yield stat('./README.md');
  var content = yield readFile('./README.md');
})();

 

通过co可以把异步的fs.readFile当成同步一样调用,只需要把异步函数fs.readFile用闭包的方式封装。

利用Thunk可以进一步简化为如下的code, 这里Thunk的作用就是用闭包封装异步函数,返回一个生成函数的函数,供生成器来调用。

var thunkify = require('thunkify');
var co = require('co');
var fs = require('fs');

var stat = thunkify(fs.stat);
var readFile = thunkify(fs.readFile);

co(function *() {
  var stat = yield stat('./README.md');
  var content = yield readFile('./README.md');
})();

 

利用co可以串行或者并行的执行异步调用。

串行

co(function *() {
  var a = yield request(a);
  var b = yield request(b);
})();

 

并行

co(function *() {
 var res = yield [request(a), request(b)];
})();

 

更多详细的内容,大家可以参考这两篇文章12

总结

异步编程带来的问题在客户端Javascript中并不明显,但随着服务器端Javascript越来越广的被使用,大量的异步IO操作使得该问题变得明显。许多不同的方法都可以解决这个问题,本文讨论了一些方法,但并不深入。大家需要根据自己的情况选择一个适于自己的方法。

同时,随着ES6的定义,Javascript的语法变得越来越丰富,更多的功能带来了很多便利,然而原本简洁,单一目的的Javascript变得复杂,也要承担更多的任务。Javascript何去何从,让我们拭目以待。

 

Python 与 Javascript 之比较

2014年5月13日 02:58

最近由于工作的需要开始开发一些Python的东西,由于之前一直在使用Javascript,所以会不自觉的使用一些Javascript的概念,语法什么的,经常掉到坑里。我觉得对于从Javascript转到Python,有必要总结一下它们之间的差异。

基本概念

PythonJavascript都是脚本语言,所以它们有很多共同的特性,都需要解释器来运行,都是动态类型,都支持自动内存管理,都可以调用eval()来执行脚本等等脚本语言所共有的特性。

然而它们也有很大的区别,Javascript这设计之初是一种客户端的脚本语言,主要应用于浏览器,它的语法主要借鉴了C,而Python由于其“优雅”,“明确”,“简单”的设计而广受欢迎,被应用于教育,科学计算,web开发等不同的场景中。

编程范式

Python和Javascript都支持多种不同的编程范式,在面向对象的编程上面,它们有很大的区别。Javascript的面向对象是基于原型(prototype)的, 对象的继承是由原型(也是对象)创建出来的,由原型对象创建出来的对象继承了原型链上的方法。而Python则是中规中矩的基于类(class)的继承,并天然的支持多态(polymophine)。

OO in Pyhton 

class Employee:
   'Common base class for all employees'
   empCount = 0 ##类成员

   def __init__(self, name, salary):
      self.name = name
      self.salary = salary
      Employee.empCount += 1
   
   def displayCount(self):
     print "Total Employee %d" % Employee.empCount

   def displayEmployee(self):
      print "Name : ", self.name,  ", Salary: ", self.salary
## 创建实例
ea = Employee("a",1000)
eb = Employee("b",2000)

OO in Javascript

var empCount = 0;
//构造函数
function Employee(name, salary){
    this.name = name;
    this.salary = salary;   
    this.empCount += 1;
}

Employee.prototype.displayCount = function(){
    console.log("Total Employee " + empCount );
}

Employee.prototype.displayEmployee = function(){
    console.log("Name " + this.name + ", Salary " + this.salary );
}
//创建实例
var ea = new Employee("a",1000);
var eb = new Employee("b",2000);

因为是基于对象的继承,在Javascript中,我们没有办法使用类成员empCount,只好声明了一个全局变量,当然实际开发中我们会用更合适的scope。注意Javascript创建对象需要使用new关键字,而Python不需要。

除了原生的基于原型的继承,还有很多利用闭包或者原型来模拟类继承的Javascript OO工具,因为不是语言本身的属性,我们就不讨论了。

线程模型

在Javascript的世界中是没有多线程的概念的,并发使用过使用事件驱动的方式来进行的, 所有的JavaScript程序都运行在一个线程中。在HTML5中引入web worker可以并发的处理任务,但没有改变Javascript单线程的限制。

Python通过thread包支持多线程。

不可改变类型 (immutable type)

在Python中,有的数据类型是不可改变的,也就意味着这种类型的数据不能被修改,所有的修改都会返回新的对象。而在Javascript中所有的数据类型都是可以改变的。Python引入不可改变类型我认为是为了支持线程安全,而因为Javascript是单线程模型,所以没有必要引入不可改变类型。

当然在Javascript可以定义一个对象的属性为只读。

var obj = {};Object.defineProperty(obj, "prop", {
    value: "test",
    writable: false});

在ECMAScript5的支持中,也可以调用Object的freeze方法来是对象变得不可修改。

Object.freeze(obj)


数据类型

Javascript的数据类型比较简单,有object、string、boolean、number、null和undefined,总共六种

Python中一切均为对象,像module、function、class等等都是。

Python有五个内置的简单数据类型bool、int、long、float和complex,另外还有容器类型,代码类型,内部类型等等。

布尔

Javascript有true和false。Python有True和False。它们除了大小写没有什么区别。

字符串

Javascript采用UTF16编码。

Python使用ASCII码。需要调用encode、decode来进行编码转换。使用u作为前缀可以指定字符串使用Unicode编码。

数值

Javascript中所有的数值类型都是实现为64位浮点数。支持NaN(Not a number),正负无穷大(+/-Infiity)。

Python拥有诸多的数值类型,其中的复数类型非常方便,所以在Python在科研和教育领域很受欢迎。这应该也是其中一个原因吧。Python中没有定义NaN,除零操作会引发异常。

列表

Javascript内置了array类型(array也是object)

Python的列表(List)和Javascript的Array比较接近,而元组(Tuple)可以理解为不可改变的列表。

除了求长度在Python中是使用内置方法len外,基本上Javascript和Python都提供了类似的方法来操作列表。Python中对列表下标的操作非常灵活也非常方便,这是Javascript所没有的。例如l[5:-1],l[:6]等等。

字典、哈希表、对象

Javascript中大量的使用{}来创建对象,这些对象和字典没有什么区别,可以使用[]或者.来访问对象的成员。可以动态的添加,修改和删除成员。可以认为对象就是Javascript的字典或者哈希表。对象的key必须是字符串。

Python内置了哈希表(dictS),和Javascript不同的是,dictS可以有各种类型的key值。

空值

Javascript定义了两种空值。 undefined表示变量没有被初始化,null表示变量已经初始化但是值为空。

Python中不存在未初始化的值,如果一个变量值为空,Python使用None来表示。

Javascript中变量的声明和初始化

v1;
v2 = null;
var v3;
var v4 = null;
var v5 = 'something';

在如上的代码中v1是全局变量,未初始化,值为undefined;v2是全局变量,初始化为空值;v3为局部未初始化变量,v4是局部初始化为空值的变量;v5是局部已初始化为一个字符处的变量。

Python中变量的声明和初始化

v1 = None
v2 = 'someting'

Python中的变量声明和初始化就简单了许多。当在Python中访问一个不存在的变量时,会抛出NameError的异常。当访问对象或者字典的值不存在的时候,会抛出AttributeError或者KeyError。因此判断一个值是否存在在Javascript和Python中需要不一样的方式。

Javascript中检查某变量的存在性:

if (!v ) {
    // do something if v does not exist or is null or is false
}

if (v === undefined) {
    // do something if v does not initialized
}

注意使用!v来检查v是否初始化是有歧义的因为有许多种情况!v都会返回true

Python中检查某变量的存在性:

try:
    v
except NameError
    ## do something if v does not exist

在Python中也可以通过检查变量是不是存在于局部locals()或者全局globals()来判断是否存在该变量。

类型检查

Javascript可以通过typeof来获得某个变量的类型:

typeof in Javascript 的例子:

typeof 3 // "number"
typeof "abc" // "string"
typeof {} // "object"
typeof true // "boolean"
typeof undefined // "undefined"
typeof function(){} // "function"
typeof [] // "object"
typeof null // "object"

要非常小心的使用typeof,从上面的例子你可以看到,typeof null居然是object。因为javscript的弱类型特性,想要获得更实际的类型,还需要结合使用instanceof,constructor等概念。具体请参考这篇文章

Python提供内置方法type来获得数据的类型。

>>> type([]) is list
True
>>> type({}) is dict
True
>>> type('') is str
True
>>> type(0) is int
True

同时也可以通过isinstance()来判断类的类型

class A:
    pass
class B(A):
    pass
isinstance(A(), A)  # returns True
type(A()) == A      # returns True
isinstance(B(), A)    # returns True
type(B()) == A        # returns False

但是注意Python的class style发生过一次变化,不是每个版本的Python运行上述代码的行为都一样,在old style中,所有的实例的type都是‘instance’,所以用type方法来检查也不是一个好的方法。这一点和Javascript很类似。

自动类型转换

当操作不同类型一起进行运算的时候,Javascript总是尽可能的进行自动的类型转换,这很方便,当然也很容易出错。尤其是在进行数值和字符串操作的时候,一不小心就会出错。我以前经常会计算SVG中的各种数值属性,诸如x,y坐标之类的,当你一不小心把一个字符串加到数值上的时候,Javascript会自动转换出一个数值,往往是NaN,这样SVG就完全画不出来啦,因为自动转化是合法的,找到出错的地方也非常困难。

Python在这一点上就非常的谨慎,一般不会在不同的类型之间做自动的转换。

语法

风格

Python使用缩进来决定逻辑行的结束非常具有创造性,这也许是Python最独特的属性了,当然也有人对此颇具微词,尤其是需要修改重构代码的时候,修改缩进往往会引起不小的麻烦。

Javascript虽然名字里有Java,它的风格也有那么一点像Java,可是它和Java就好比雷峰塔和雷锋一样,真的没有半毛钱的关系。到时语法上和C比较类似。这里必须要提到的是coffeescript作为构建与Javascript之上的一种语言,采用了类似Python的语法风格,也是用缩进来决定逻辑行。

Python风格

def func(list):
    for i in range(0,len(list)):
        print list[i]

Javascript风格

function funcs(list) {
    for(var i=0, len = list.length(); i < len; i++) {
        console.log(list[i]);
    }
}

从以上的两个代码的例子可以看出,Python确实非常简洁。

作用范围和包管理

Javascript的作用域是由方法function来定义的,也就是说同一个方法内部拥有相同的作用域。这个严重区别与C语言使用{}来定义的作用域。Closure是Javascript最有用的一个特性。

Python的作用域是由module,function,class来定义的。

Python的import可以很好的管理依赖和作用域,而Javascript没有原生的包管理机制,需要借助AMD来异步的加载依赖的js文件,requirejs是一个常用的工具。

赋值逻辑操作符

Javascript使用=赋值,拥有判断相等(==)和全等(===)两种相等的判断。其它的逻辑运算符有&& 和||,和C语言类似。

Python中没有全等,或和与使用的时and 和 or,更接近自然语言。Python中没有三元运算符 A :B ?C,通常的写法是

(A and B) or C

因为这样写有一定的缺陷,也可以写作

 B if A else C

Python对赋值操作的一个重要的改进是不允许赋值操作返回赋值的结果,这样做的好处是避免出现在应该使用相等判断的时候错误的使用了赋值操作。因为这两个操作符实在太像了,而且从自然语言上来说它们也没有区别。

++运算符

Python不支持++运算符,没错你再也不需要根据++符号在变量的左右位置来思考到底是先加一再赋值呢还是先赋值再加一。

连续赋值

利用元组(tuple),Python可以一次性的给多个变量赋值

(MONDAY, TUESDAY, WEDNESDAY, THURSDAY, FRIDAY, SATURDAY, SUNDAY) = range(7)

函数参数

Python的函数参数支持命名参数和可选参数(提供默认值),使用起来很方便,Javascript不支持可选参数和默认值(可以通过对arguments的解析来支持)

def info(object, spacing=10, collapse=1):
    ... ...

其它

立即调用函数表达式 (IIFE

Javascript的一个方便的特性是可以立即调用一个刚刚声明的匿名函数。也有人称之为自调用匿名函数。

下面的代码是一个module模式的例子,使用闭包来保存状态实现良好的封装。这样的代码可以用在无需重用的场合。

var counter = (function(){
    var i = 0;
    return {
        get: function(){
            return i;
            },
        set: function( val ){
            i = val;
            },
        increment: function() {
            return ++i;
            }
        };
    }());

Python没有相应的支持。

生成器和迭代器(Generators & Iterator)

在我接触到的Python代码中,大量的使用这样的生成器的模式。

Python生成器的例子

# a generator that yields items instead of returning a list
def firstn(n):
    num = 0
    while num < n:
        yield num
        num += 1
  
sum_of_first_n = sum(firstn(1000000))

Javascript1.7中引入了一些列的新特性,其中就包括生成器和迭代器。然而大部分的浏览器除了Mozilla(Mozilla基本上是在自己玩,下一代的Javascript标准应该是ECMAScript5)都不支持这些特性

Javascript1.7 迭代器和生成器的例子。

function fib() {
  var i = 0, j = 1;
  while (true) {
    yield i;
    var t = i;
    i = j;
    j += t;
  }
};

var g = fib();
for (var i = 0; i < 10; i++) {
  console.log(g.next());
}

列表(字典、集合)映射表达式 (List、Dict、Set Comprehension)

Python的映射表达式可以非常方便的帮助用户构造列表、字典、集合等内置数据类型。

下面是列表映射表达式使用的例子:

>>> [x + 3 for x in range(4)]
[3, 4, 5, 6]
>>> {x + 3 for x in range(4)}
{3, 4, 5, 6}
>>> {x: x + 3 for x in range(4)}
{0: 3, 1: 4, 2: 5, 3: 6}

Javascript1.7开始也引入了Array Comprehension

var numbers = [1, 2, 3, 4];
var doubled = [i * 2 for (i of numbers)];

Lamda表达式 (Lamda Expression )

Lamda表达式是一种匿名函数,基于著名的λ演算。许多语言诸如C#,Java都提供了对lamda的支持。Pyhton就是其中之一。Javascript没有提供原生的Lamda支持。但是有第三方的Lamda包。

g = lambda x : x*3

装饰器(Decorators)

Decorator是一种设计模式,大部分语言都可以支持这样的模式,Python提供了原生的对该模式的支持,算是一种对程序员的便利把。

Decorator的用法如下。

@classmethod
def foo (arg1, arg2):
    ....

更多decorator的内容,请参考https://wiki.python.org/moin/PythonDecorators

本人对Javascript和Python的认识有限,欢迎大家提出宝贵意见。


软件中的质量属性(二)

2014年4月22日 19:28

现在我们接着上一次的话题来看看其它的质量属性。
互操作性 (Interoperability
互操作性指的是系统内或者系统之间不同的组件可以有效地进行信息交换,通常是以服务(Service)的形式来进行的。互操作性的关键因素包括通信协议,接口定义,数据格式的定义等等,而标准化是实现互操作性的重要手段。
实现互操作性的主要挑战有以下这些方面:
  • 系统内部或者和已有的旧系统(legacy system)之间的数据定义不一致
  • 系统的边界模糊,模块之间耦合严重,导致数据冗余
  • 缺乏标准,或者各方对标准的实现和认识不一致
我现在所在的商务智能团队的总架构师(Chief Architect)就一直在部门间推动对数据文档统一格式标准的定义和实现。这本身对于我们产品内部的互操作性是非常有必要的,然而BI的团队分布在各个大洲(主要是德国,法国,加拿大,中国和印度),每个部门对各自产品优先级的认识不一致,在加上对旧系统兼容性的要求,这项工作的进展非常非常的缓慢。BI的各个产品仍然很难互操作。
我之前开发过通信网管,当时做的产品是统一网管平台,就是把各个厂商(华为,中兴,朗讯等等)的电信设备统一的管理起来。当时已经有了相当成熟的电信网管理标准(TMF,ISO等标准)和技术标准(Q3,Corba)。然而理解的不同,厂商对标准的实现千奇百怪,所以实际上需要给每一个厂商定制不同的接口适配器。我当时就在负责一些这样的接口开发。
面向服务的架构(SOA)曾经是一个非常热门的词汇,现在似乎不怎么提起了。我司当年曾经大张旗鼓的宣传SOA。其实这样的架构能够解决的一个主要的问题就在能够把企业内部各种已有系统通过暴露标准服务接口的方式有效的协调起来。
为了实现互操作性,我们一般需要
  • 定义标准的数据格式和语义
  • 定义标准的服务接口,并使用基于服务的架构
  • 设计高内聚,低耦合的模块以获得最大的灵活性可重用性
可维护性 (Maintainability
可维护性有两个不同的角度,一个是指从软件用户和运维人员的角度,另一个是从软件开发人员的角度。
从用户和运维人员的角度,软件的可维护性是指软件是不是容易安装,升级,打补丁,有了问题是不是容易修复,能不能很容易的获得支持。
从开发人员的角度,软件的可维护性是指软件的架构是不是清楚简单,代码是不是容易阅读,有了问题是不是容易定位错误的原因,有没有可以提供帮助的文档,等等。
软件系统可维护性的主要问题有:
  • 模块之间紧耦合导致无法或很难对单独的模块进行修改,替换和升级
  • 在高层直接使用底层协议和接口,导致无法替换物理设备实体
  • 没有有效的分层和责任的划分,导致一个肿大的模块以及巨大的代码出现在同一个文件甚至函数中。
  • 没有帮助和设计文档
  • 为了兼容旧系统而不得不同时存在两个以上的复杂的代码栈甚至技术不同的实现
知道了问题,改进的建议是非常明显的,我要讨论的是一个关于代码可读性的有趣话题。
很多人都认为,代码是写给人看的,它碰巧也能被计算机读懂。所以我们应该像写文学作品那样写代码。碰巧我也非常的赞同这样的观点。然而,最近的一篇文章提到另一种观点,代码不是文学作品,我们不是阅读而是评审。这篇文章也许能够帮助你改善代码评审。不管代码是不是文学作品,写出容易阅读的代码对于提高软件的可维护性的好处是不言而喻的。

性能(Performance
性能也许是软件开发中最被重视的质量属性,也是最特殊的一个,从它的英文名字中不以(-ility)为后缀,我们可以看到他的特殊性。我们通常以系统执行某操作所需要的响应时间(latency)或者在某单位时间所能完成的任务的数量(throughput)来定义性能指标。
性能和其它的质量属性的相关性很高,有一些会对性能产生正面影响,有一些则是负面的。
记得我当年参加一个关于C++性能优化的培训,有一道算法,要求大家试着用最快的方式实现。因为C++中的指针操作按数组索引访问要快,于是当时最快的一个解决方案是用了一大推复杂指针访问来实现算法。然而这样的代码很难维护,而且容易出错。所以为提高性能就牺牲了可维护性。
一般而言,计算机提供了许多的资源,包括CPU,内存,硬盘等等,提高性能的核心就是充分利用这些资源。要保证对资源的使用是正确和有效的。通常提高性能的考虑包括:

  • 利用缓存(空间换时间)
  • 设计高效的资源共享,多线程,多进程,锁
  • 异步
  • 减少模块见得信息传递
  • 使接口设计传递最小所需的信息
  • 增加系统的可伸缩性,是系统能够有效的部署在分布式的资源上
另外我们还需要考虑另一个性能,就是程序员实现功能的性能。随着软件的发展,现在的程序员可以更高效的实现功能需求。一方面编程语言和方法在不断进步,另一方面大量的可重复使用的组件,服务,开源的库使得想在实现同样的功能的时间和需要的开发人员的数量比以前极大的缩小了。whatsapp以区区55人的团队开发出价值190亿美元,拥有4.5亿用户的软件产品,这在以前是难以想象的。所以软件架构设计者应该把软件的开发效率看成是更重要的性能指标。

可重用性 (Reusability
老板一般都非常重视可重用性,因为如果把软件代码看成产品,那么如果该产品如果只能用一次就扔掉,那他显然是很不开心的,因为这是一笔很糟糕的投入。在我的软件开发生涯这么多年以来,我体验的软件的可重用性都不是很高,也许是我大多在大型的软件企业服务有关。大企业的特点就是团队非常多,产品非常丰富,老板经常更换。每一个新的老板上台后,面临一大堆功能技术各异的系统都非常的不happy,于是整合在所难免,如何重用已有的系统来实现一个新的,大一统的新产品成了重中之重。然而这并不比找到宇宙中的终极大一统理论更简单。最终的整合结果往往并不能有效的重用已有的系统,当老板因为各种原因离开时,我们会发现,对新的老板来说,整合的任务变得更加艰巨了,因为,又有一个新的系统需要整合了。
提高可重用性的一个最主要的原则就是避免重复,“Don't repeat yourself!”我想大家应该非常熟悉了,这里就不多说了。
在成熟度不高的开发团队中(很不幸,我们大多数人都处在这样的团队中),对代码的重用很难,其实只有人才是最可靠的可重用组件,人离开了,所有的可重用性也就跟着离开了。

伸缩性(Scalability)
伸缩性要求软件系统能够跟着所需处理的工作量相应的伸缩。例如如果计算机是多CPU多核心的,软件是否能够相应的利用到这些计算资源。另一个方面就是软件是不是能够部署到分布式的网络,有效的利用网络中的每一个节点的资源。
有两个方向的伸缩,垂直和水平。
在垂直方向的伸缩(scale up)是指提高单节点的处理能力,比如提高CPU主频和内核数,增大内存,增大磁盘容量等等。SAP的HANA就是一个典型的垂直方向的伸缩。
在水平方向的伸缩(scale out)通常是指通过并发和分布的方式来增加节点以提高处理能力。Hadoop就是一个很好地水平伸缩的例子。
设计高伸缩性的软件时,我们可以考虑
  • 避免有状态的服务或组件
  • 使用配置文件决定组件的的部署和关系
  • 考虑支持数据的分区
  • 避免统一层的责任跨越不同的物理实体
在云时代,软件的伸缩性越发重要。

可测试性(Testability)
可测试性顾名思义就是指软件是否容易测试。
什么样的软件是不可测试的呢?举个例子来说,我们曾经开发过一个数据可视化的组件,就是把数据以图表的形式展现出来。有一种数据可视化的类型使用力导向的算法(force-directed)把数据以网络拓扑图的形式展现出来,该算法使用一些随机的参数来模拟节点的初始位置,并通过迭代计算生成最终layout的结果。也就是说每次的layout结果都是不一样的。测试团队对这样的算法很不满意,他们认为这样的实现是无法测试的,因为当时我们的测试主要以比对图形为基础,也就是生成一个正确的图形为基准,每次测试都会把输出的图形和基准图形进行比较,如果不一致则认为出错或者要修改基准。随机算法虽然从功能上讲并没有错误,但是在这样的测试方法下是无法满足可测试性的要求的。最后,开发团队修改了算法,使得每次的初始位置未固定位置来解决这个问题。
David Catlett提出了一个SOCK模型可以有效地帮助我们了解可测试性的要素
  • Simple
    代码越简单越容易测试。
  • Observable
    软件系统应该是可观测,无法观测也就无法衡量
  • Control
    软件系统应该是可以控制的,尽可能多的把控制权暴露给测试模块。
  • Knowledge
    测试人员或者模块需要更多地理解被测试模块,理解的越多也就越容易测试(白盒测试)
软件质量属性的每一个方面都有很多的内容,我们只能浅尝而止,而且仍然有许多重要的质量属性我们还没有涉及到,有时间的话,我们以后再说。