itsmikej 发布的文章

PHP 通过 Thrift 操作 Hbase

Hbase1.0之后的版本中没有提供Thrift源码,需要到官网下载。地址

生成PHP文件:

thrift -gen php <PATH>/Hbase.thrift

启动Hbase和Hbase的Thrift守护程序

<PATH-TO-HBASE>/bin/start-hbase.sh
<PATH-TO-HBASE>/bin/hbase-daemon.sh start thrift

Thrift默认会监听9090端口,下面是一个常用操作的例子。命名为index.php,目录结构为:

gen-php/
lib/
index.php

Hbase里的数据结构如下,表名是blog,包含了articleauthor两个列族。

hbase(main):015:0> scan 'blog'
ROW          COLUMN+CELL
 1           column=article:content, timestamp=1434687037710, value=Hello,World
 1           column=article:tags, timestamp=1434533143824, value=Hadoop,HBase,NoSQL
 1           column=article:title, timestamp=1434533099302, value=Head First HBase
 1           column=author:name, timestamp=1434533489863, value=itsmikej1
 1           column=author:nickname, timestamp=1434687081713, value=\xE5\xB0\x8F\xE9\xBB\x91
1 row(s) in 0.0410 seconds

index.php

<?php
/**
 * Hbase操作
 *
 * @author itsmikej
 * @link https://mikej.me
 */

require_once __DIR__.'/lib/Thrift/ClassLoader/ThriftClassLoader.php';

use Thrift\ClassLoader\ThriftClassLoader;

$loader = new ThriftClassLoader();
$loader->registerNamespace('Thrift', __DIR__ . '/lib');
# $loader->registerDefinition("tService", realpath(__DIR__ . "/gen-php"));
$loader->register();

use Thrift\Protocol\TBinaryProtocol;
use Thrift\Transport\TSocket;
use Thrift\Transport\TBufferedTransport;

require_once __DIR__ . "/gen-php/Hbase/Hbase.php";
require_once __DIR__ . "/gen-php/Hbase/Types.php";


try {
    $socket = new TSocket('127.0.0.1', '9090');
    $socket->setSendTimeout(10000);
    $socket->setRecvTimeout(20000);
    $transport = new TBufferedTransport($socket);
    $protocol = new TBinaryProtocol($transport);
    $client = new Hbase\HbaseClient($protocol);
    $transport->open();

    $tables = $client->getTableNames();
    $table = current($tables);

    $mutations = array();
    $rowKey = '2';
    $row = array(
        'article' => array(
            'content' => 'Hello,Thrift',
            'tags' => 'Hbase',
            'title' => 'Thrift Test'
        ),
        'author' => array(
            'name' => 'tom',
            'nickname' => '小白',
            'sex' => 'man'  # 列可以动态扩展
        )
    );

    foreach ($row as $column_family => $data) {
        foreach ($data as $qualifier => $content) {
            $info = array(
                'column' => $column_family. ':' .$qualifier,
                'value' => $content
            );
            $mutations[] = new \Hbase\Mutation($info);
        }
    }

    # 添加数据
    $client->mutateRow($table, $rowKey, $mutations, array());


    $rows = array(
        '3' => $row,
        '4' => $row,
    );
    $row['author']['sex'] = 'woman';
    $rows['5'] = $row;

    # 批量添加
    $batchRecord = array();
    foreach ($rows as $row_key => $row) {
        $mutations = array();
        foreach ($row as $column_family => $data) {

            foreach ($data as $qualifier => $content) {
                $info = array(
                    'column' => $column_family . ':' . $qualifier,
                    'value' => $content
                );
                $mutations[] = new \Hbase\Mutation($info);
            }
        }
        $batchRecord[] = new \Hbase\BatchMutation(array(
            'row' => $row_key,
            'mutations' => $mutations
        ));

    }
    $client->mutateRows($table, $batchRecord, array());


    # 删除数据
    $client->deleteAllRow($table, $rowKey, array());


    # 查询数据
    $ret = $client->getRow($table, $rowKey, array());
    var_dump($ret, $ret[0]->columns['author:name']->value);


    # 扫描数据
    $scan = $client->scannerOpenWithStop($table, 1, 3, array(), array());
    $ret = $client->scannerGetList($scan, 2);
    var_dump($ret);


    # 过滤器
    $filter = array();
    $filter[] = "SingleColumnValueFilter('author', 'name', =, 'binary:tom')";
    $filter[] = "SingleColumnValueFilter('author', 'sex', =, 'binary:woman')";
    $filterString = implode(" AND ", $filter);
    $scanFilter = new \Hbase\TScan();
    $scanFilter->filterString = $filterString;
    $scanFilter->startRow = 1;
    $scanFilter->stopRow = 6;
    # $scanFilter->columns = array('column' => 'author'); # 指定返回列族

    $scan = $client->scannerOpenWithScan($table, $scanFilter, array());
    $ret = $client->scannerGetList($scan, 2);
    var_dump($ret);

    $transport->close();

} catch (Exception $e) {
    var_dump($e->getMessage());
}

参考

SingleColumnValueFilter过滤器在手册里的说明参数位置是错误的,正确的应该为:

Syntax: SingleColumnValueFilter(‘<family>’, ‘<qualifier>’, <compare operator>, ‘<comparator>’, ‘<filterIfColumnMissing_boolean>, <latest_version_boolean>’)
Syntax: SingleColumnValueFilter(‘<family>’, ‘<qualifier>, <compare operator>, ‘<comparator>’)
Example: “SingleColumnValueFilter (‘FamilyA’, ‘Column1’, <=, ‘abc’,‘true, false’)”
Example: “SingleColumnValueFilter (‘FamilyA’, ‘Column1’, <=, ‘abc’)”

使用 Thrift

Thrift是一个用于不同语言间通讯的开发框架。

首先用Thrift的语法定义好交互细节,命名为hello.thrift

namespace php hello

enum SexType {
  MALE = 1,
  FEMALE = 2
}

struct User {
  1: string firstname,
  2: string lastname,
  3: i32 user_id = 0,
  4: SexType sex,
  5: bool active = false,
  6: optional string description
}

exception InvalidValueException {
  1: i32 error_code,
  2: string error_msg
}

service UserExchange {
  void ping(),
  i32 add_user(1:User u) throws (1: InvalidValueException e),
  User get_user(1:i32 uid) throws (1: InvalidValueException e),
  oneway void clear_list()
}

生成php文件

thrift -r --gen php hello.thrift

生成python文件

thrift -r --gen py hello.thrift

python服务端,命名为python_server.py

#!/usr/bin/env python

import sys
sys.path.append('./gen-py')

from hello import UserExchange
from hello.ttypes import *

from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
from thrift.server import TServer

users = []

class UserManagerHandler:
    def __init__(self):
        pass
        #self.log = {}

    def ping(self):
        print 'ping()'

    def add_user(self, user):
        if user.firstname == None:
            raise InvalidValueException(1,'no firstname exception')
        if user.lastname == None:
            raise InvalidValueException(2, 'no lastname exception')
        if user.user_id <= 0:
            raise InvalidValueException(3, 'wrong user_id')
        if user.sex != SexType.MALE and user.sex != SexType.FEMALE:
            raise InvalidValueException(4, 'wrong sex id')
        print 'Processing user '+user.firstname+' '+user.lastname
        users.append(user)
        print users
        return True

    def get_user(self, user_id):
        if user_id < 0:
            raise InvalidValueException(5, 'wrong id')
        return users[user_id]
        
    def clear_list(self):
        print 'Clearing list'
        print users
        del users [:]
        print users



handler = UserManagerHandler()
processor = UserExchange.Processor(handler)
transport = TSocket.TServerSocket(port=9090)
tfactory = TTransport.TBufferedTransportFactory()
pfactory = TBinaryProtocol.TBinaryProtocolFactory()

server = TServer.TSimpleServer(processor, transport, tfactory, pfactory)

# You could do one of these for a multithreaded server
#server = TServer.TThreadedServer(processor, transport, tfactory, pfactory)
#server = TServer.TThreadPoolServer(processor, transport, tfactory, pfactory)

print 'Starting the server...'
server.serve()
print 'done.'

php客户端,命名为php_client.php

<?php

require_once __DIR__.'/lib/Thrift/ClassLoader/ThriftClassLoader.php';

use Thrift\ClassLoader\ThriftClassLoader;

$loader = new ThriftClassLoader();
$loader->registerNamespace('Thrift', __DIR__ . '/lib');
$loader->registerDefinition("tService", realpath(__DIR__ . "/gen-php"));
$loader->register();

use Thrift\Protocol\TBinaryProtocol;
use Thrift\Transport\TSocket;
use Thrift\Transport\THttpClient;
use Thrift\Transport\TBufferedTransport;
use Thrift\Exception\TException;

require_once __DIR__ . "/gen-php/hello/userExchange.php";
require_once __DIR__ . "/gen-php/hello/Types.php";

try {

    $socket = new TSocket('localhost', 9090);
    $transport = new TBufferedTransport($socket, 1024, 1024);
    $protocol = new TBinaryProtocol($transport);
    $client = new \hello\userExchangeClient($protocol);

    $transport->open();
    $client->ping();
    $u = new \hello\User();
    $u->user_id = 1;
    $u->firstname = 'John';
    $u->lastname = 'Smith';
    $u->sex = \hello\SexType::MALE;
    if ($client->add_user($u))
    {
        echo 'user added succesfully</br>'."\n";
    }
    
    var_dump($client->get_user(0));
    
    $client->clear_list();
    
    
    
    $u2 = new \hello\User();
    $client->add_user($u2);
    
} catch (\hello\InvalidValueException $e) {
    echo $e->error_msg."<br/>\n";
}

最后先运行python_server.py启动服务,然后执行php_client.php,一切正常的话会server端会看到如下数据

Starting the server...
ping()
Processing user John Smith
[User(user_id=1, description=None, firstname='John', lastname='Smith', sex=1, active=False)]
Clearing list
[User(user_id=1, description=None, firstname='John', lastname='Smith', sex=1, active=False)]
[]

客户端会看到

user added succesfully</br>
class hello\User#9 (6) {
  public $firstname =>
  string(4) "John"
  public $lastname =>
  string(5) "Smith"
  public $user_id =>
  int(1)
  public $sex =>
  int(1)
  public $active =>
  bool(false)
  public $description =>
  NULL
}
no firstname exception<br/>

这就表明数据已经正确的请求和响应了。

参考

Moonrise 开发框架

简介

Moonrise是我用业余时间写的一个PHP框架,借鉴了其他很多框架的思想,比如CILaravel,当然使用了一些优秀第三方Composer包。为了能让框架尽快跑起来,很多细节处理得不够好,目前还在完善中。

Github地址:https://github.com/itsmikej/Moonrise。欢迎Fork和Pull Request

核心功能

  • MVC
  • 路由自动映射(支持web和cli)
  • 数据过滤
  • 异常和错误处理
  • 数据驱动支持Mysqli和PDO, 以及Memcache, MongoDB, Redis
  • ORM使用Active RecordEloquent也是可以的)
  • ......

环境要求

  • PHP >= 5.4.0
  • Composer(以及库中所需扩展)
  • Mysqli, PDO扩展

后记

  • Moonrise是因为小时候在夏天喜欢洗完澡躺在院子里的凉铺上边扇扇子边看星空和月亮直到睡着。。。

迟到的2014年终总结

昨天刚开完年会,运气还不错,得了个普照奖(已哭...),晚上回家路上突然心里一阵惆怅,想了很多,看来还是有必要记录一下。

2014年是我职业生涯的第一年,第一次给公司写代码,第一次写的代码有被成千上万的人访问到,虽然不是什么核心的项目,但心里还是挺兴奋的!记得刚开始每次commit代码的时候都手抖,害怕出了什么差错。不过到目前为止,情况还不错。也遇到很多困难,身边的同事都帮助我挺多,P总也给了我很多的时间去适应,只是不擅言辞没说什么,但我还是发自内心的很感激他们的关照和支持!

也没少填坑,首先就是工具,公司使用SVN做版本控制,我用的是Linux开发环境,没有合适的SVN图形界面软件,每次都要SSH到开发环境去提交代码,很容易出错,直到发现了Rabbitvcs!之前习惯用Sublime编辑器,但貌似对于超大的项目,Sublime就有些力不从心了。换用了PHPStorm之后,代码追踪,本地VCS,动态模版,版本高亮。。。各种酸爽,谁用谁知道。公司的开发环境是统一的,不用自己去搭建。但要做自己的东西,还得自己去搭建开发环境。虽然只用安装一次,但心里还是在默默的祈祷,别报错啊!可那些奇葩error总会出现,各种依赖关系也是挺折腾人的。直到使用了Vagrant这个工具,搭配上虚拟机,简直就是神器,首先找到合适的box,比如Laravel官方的Homestead,所有的配置都集中在一个配置文件里,安装也很简单。合适的工具总能让效率提高不少。

一说到工作相关,总能巴拉巴拉说很多。但说实在的,心里总感觉不踏实。心里总有什么东西压抑着。在很多人眼里,程序员就是一个死板、宅、聚会中的冷场先生,虽然不认可也不承认,但仔细想想,自己多多少少会有一些,在一些社交场合,总觉得很累,甚至感到烦躁。我不知道感觉不踏实和压抑是不是因为这个,应该会有一些关联。

每到年底的时候,我们总要感谢很多人,要感谢家人,领导,同事,朋友。但是,却忘了感谢自己,不管这一年过得怎么样,收获多与少,也许我们都应该感谢下自己过去一年所做的努力和贡献,为自己、为家人、也为公司,正是因为这一点点的贡献,才让我们的生活变得有意义和充满期待。

2015,继续加油!

常用的设计模式总结

策略模式

作用

  • 将一组特定行为的算法封装成类,以适应不同的上下文环境,避免了if-else这样的的硬编码

使用场景

  • 电商网站,不同的用户显示不同的广告,传统方式使用if-else

代理(proxy)模式

作用

  • 可以实现业务和代码的分离。

适用场景

  • 典型的使用场景是数据库的主从库的读取和写入操作
  • 可以讲这些操作封装到一个proxy类中,避免选择主从的代码业务代码中出现

- 阅读剩余部分 -