分类 Programs 下的文章

Golang中的反射和依赖注入

一切的源起,要从interface{}说起。

接口

在Golang中,我们可以把任意类型赋值给interface{},它的底层由两部分组成,类型(type)值(data),type用于存储变量的动态类型,data用于存储变量的具体数据。

有时候你会在代码中看到下面这种用法。

(*http.ResponseWriter)(nil)

它表示将一个nil转化成了http.ResponseWriter类型的指针,它指向无效的地址,它的存储结构是(*http.ResponseWriter,nil)。通常这样的使用者只关心指针的类型,不关心它具体的存储值,后面你会看到这种用法的一个具体场景。

反射

如果我们知道一个接口的类型,就可以使用断言来访问其内部的值,但如果不知道呢?那就试试反射。

反射是由reflect包提供的,它定义了两个重要的类型: reflect.Typereflect.Value

reflect.Type是一个接口,有许多方法来区分类型以及检查它们的组成部分,例如一个结构体的成员或一个函数的参数等。reflect.Type和接口的类型是动态相关的。reflect.TypeOf方法接受一个接口类型的参数,并返回一个reflect.Type实例。

reflect.Value可以装载任意类型的值。reflect.ValueOf方法接受一个任意的接口类型,并返回一个装载着其动态值的reflect.Value。函数reflect.ValueOf的逆方法是reflect.Value.Interface方法,它返回一个接口。

var x interface{}

x = 1
fmt.Println(reflect.TypeOf(x), reflect.ValueOf(x))
fmt.Println(reflect.ValueOf(x).Interface().(int))

x = "a"
fmt.Println(reflect.TypeOf(x), reflect.ValueOf(x))

依赖注入

我以前很少会用到反射,觉得这个东西很复杂,并且影响性能,直到看到了这个包:https://github.com/go-macaron/inject,这是macaron框架中依赖注入器。最原始的实现在这里:https://github.com/codegangsta/inject,代码基本上差不多。

有了这个包,我们就可以使用反射的方式来调用函数,比如:

package main

import (
    "fmt"
    "github.com/go-macaron/inject"
)

type SpecialString interface{}

func Say(name string, gender SpecialString, age int) {
    fmt.Printf("name:%s, gender:%s, age:%d!\n", name, gender, age)
}

func main() {
    inj := inject.New()
    inj.Map("itsmikej")
    inj.MapTo("男", (*SpecialString)(nil))
    inj.Map(26)
    inj.Invoke(Say)
}

在inject内部,它使用了一个map[reflect.Type]reflect.Value来存储注入的参数,MapMapTo方法用于将参数注入到这个map中,注意上面的MapTo方法,它的第二个参数用来重新设置reflect.Type,避免同类型的值覆盖。如前文提到的,参数之所以是(*SpecialString)(nil),是因为这里我们只需要关心它的类型,也就是用它的reflect.Type来作为map的key。

那么这样调用的好处是什么?

一个典型的应用场景,在一个web服务中,不同的路由下处理的handler不一样,他们依赖的服务也不一样。使用inject可以很方便的将服务注入到handler中。

我们来看看inject在macaron框架中的应用,macaron实现了inject.Injector的接口,所以映射一个服务就变得非常简单:

db := &MyDatabase{}
m := macaron.Classic()
m.Map(db) // Service will be available to all handlers as *MyDatabase
m.Get("/", func(db *MyDatabase) {
    // Operations with db.
})
m.Run()

这里先将db服务注入到map中,最终路由"/"下的handler会被inject包中Invoke方法调用。

看到下面的源码,可以暂时忽略fastInvoke,这是macaron内部的一个快捷调用,Invoke方法调用了callInvoke方法,callInvoke方法会先从map中找到对应的参数,然后用反射的方式来调用这个handler,从而实现了参数的动态注入。

func (inj *injector) Invoke(f interface{}) ([]reflect.Value, error) {
    t := reflect.TypeOf(f)
    switch v := f.(type) {
    case FastInvoker:
        return inj.fastInvoke(v, t, t.NumIn())
    default:
        return inj.callInvoke(f, t, t.NumIn())
    }
}

// callInvoke reflect.Value.Call
func (inj *injector) callInvoke(f interface{}, t reflect.Type, numIn int) ([]reflect.Value, error) {
    var in []reflect.Value
    if numIn > 0 {
        in = make([]reflect.Value, numIn)
        var argType reflect.Type
        var val reflect.Value
        for i := 0; i < numIn; i++ {
            argType = t.In(i)
            val = inj.GetVal(argType)
            if !val.IsValid() {
                return nil, fmt.Errorf("Value not found for type %v", argType)
            }

            in[i] = val
        }
    }
    return reflect.ValueOf(f).Call(in), nil
}

macaron.Classic默认会注入一些服务,包括:

  • *macaron.Context - HTTP 请求上下文
  • *log.Logger - Macaron 全局日志器
  • http.ResponseWriter - HTTP 响应流
  • *http.Request - HTTP 请求对象

所以我们可以直接这样使用:

m.Get("/", func(resp http.ResponseWriter, req *http.Request) {
    resp.WriteHeader(200) // HTTP 200
})
// 或者
m.Get("/", func(ctx *macaron.Context) {
    ctx.Resp.WriteHeader(200) // HTTP 200
})

是不是很酷?

参考

说得可能不是很明白,可以结合下面几篇文章来看看。

https://docs.hacknode.org/gopl-zh/ch7/ch7-05.html
https://docs.hacknode.org/gopl-zh/ch12/ch12-02.html
https://my.oschina.net/goal/blog/195036
https://my.oschina.net/goal/blog/194233
https://go-macaron.com/docs/advanced/custom_services

关于字符编码

很久以前有一群人,他们认为可以用8个小灯泡表示世间万物。每个小灯泡分为亮。。和不亮。。后来这组灯泡(8个)就发展成了计算机中的字节。

ASCII编码

要知道的是一个字节(Byte)8位(bit),每一位有0和1两种值,这样就可以组成256种不同的状态。

所以人们就将1-127号的状态表示不同的字符,包括的"换行","空格",英文和数字字符。这就是ANSI(AMERICAN NATIONAL STANDARDS INSTITUTE,美国国家标准学会)制定的ASCII(American Standard Code for Information Interchange,美国信息互换标准代码)编码方案,用来保存英文字符。后来人们又扩展了128-255号的状态,包括一些新的字符,比如"-","/"什么的。

GB*编码

可惜,等到中国人民使用计算机的时候,已经没有可利用的状态表示汉字了,那我们的6000多个常用汉字怎么存?我们的办法是:把127号后面的字符取消掉,并规定,一个小于127的字符意义于原来一样,两个大于127的字符连在一起就表示一个汉字,前面一个字节(称之为高字节)从0xA1用到0xF7,后面一个字节(低字节)从0xA1到0xFE,这样我们就可以组合出大约7000多个简体汉字了,同时我们还把一些数学符号、罗马字符、日本的假名也都编了进去,连之前有的数字、标点、字母都重新编成了两个字节,这就是所谓的全角字符,原来的127号以下的就称之为半角字符。这种字符就是GB2312,GB2312是对ASCII的扩展,两个字节长的汉字字符和一个字符长的英文字符并存在一套编码方案里。

储存方案如下图:
GB*编码

但中国的汉字实在太多,后来又对GB2312进行了扩展,低字节就没有要求必须大于127了,只要第一个字节大于127就可以固定表示一个汉字,这个编码方案就是GBK,GBK包含了GB2312的所有内容,同时增加了20000个新汉字(包括繁体字)和符号。

后来少数名族也开始使用计算机,我们又对GBK进行了扩充,新的方案叫做GB18030,现在就好了,中国人民的悠久历史可以全部存到计算机中啦。

可是后来问题又来了,全世界很多国家都像中国一样搞了一套自己的编码标准,互相之间一点也不兼容,用国外的软件除非安装一套他们的编码系统,否则就用不了。

肿么办?

UNICODE编码

还好,这时ISO(国际标准化组织)出现了,废弃了所有地区性的编码方案,重新搞了一个包含地球上所有字符的编码!这就是UNICODE,但要注意的是UNICODE并不是一种具体的编码方案,UNICODE只是定义了字符的集合和唯一确定的编号,具体储存为什么样的字节流,取决于字符编码的方案,比如UTF-8和UTF-16,还有上述的GB18030。也就是说,虽然每个字符在UNICODE字符集种能找到唯一的编号,但最终决定字符流的是具体的字符编码。例如,同样的字符"A",UTF-8得到的字节流是0x41,UTF-16得到得是0x00 0x41。

从下图可以看到储存方式的不一样:
储存方式

UTF-8是目前使用得一套最广泛的UNICODE编码,它使用1-4个字节来编码字符,单字节和ASCII一样,对于其他字符,使用2-4个字节表示,比如汉字就用了3个字节表示。

GB18030编码,覆盖了UNICODE所有的字符,因此也算是一种UNICODE编码。只不过他的编码方式并不像UTF-8或者UTF-16一样,将Unicode字符的编号通过一定的规则进行转换,而只能通过查表的手段进行编码。

---- 以上内容写于2014年7月

Golang与字符编码

Go原生支持UNICODE,默认使用UTF-8编码的字符组成其字符串(UTF-8的发明者就是Go的作者,也就是Unix的作者Ken)。Go使用rune来存储字符串,它是int32的别名,因为一个UTF-8字符的长度可能是1,2,3,4个字节,所以如果要统计字符数,就需要计算的是rune的个数而不是字节数了。字符数和字节数只有在是字符串只由ASCII字符组成时才是一样的。

下面是一个中文字符串的示例,它表示出Go中字符串中不同字符占用长度不一样。

str := "你好,世界"
fmt.Println("rune len:", len([]rune(str)))   // 5
fmt.Println("int32 len:", len([]int32(str))) // 5
fmt.Println("Byte len", len(str))            // 15

用Go抓取网页的时候,当页面的编码不是UTF-8时,也许你需要使用 golang.org/x/text/transform 这个包做下转换。

参考

https://zh.wikipedia.org/wiki/字符编码
http://www.ruanyifeng.com/blog/2007/10/ascii_unicode_and_utf-8.html
http://blog.wuxu92.com/golang-character-set-and-encoding

由并发引出的话题

并发是一个超大的话题,本文做一些简单的记录和讨论。

基本概念

有必要先了解下与并发密切相关的几个概念。

Critical Section 临界区

所谓临界区,就是在程序中一段代码不能被同时执行的部分。比如,两个线程不能同时去访问的程序代码,这段代码就是临界区。在临界区中,通常会有对共享变量读取或者写入的操作。之前我文章中提到的 就是用来保证只有一个线程处于临界区的一种机制,它保证了多个线程对共享变量的读写正常。

Thread Safe 线程安全

wikipedia中的定义:

指某个函数、函数库在多线程环境中被调用时,能够正确地处理多个线程之间的共享变量,使程序功能正确完成。

Concurrency vs Parallelism 并发 vs 并行

两个概念相似,但不同。并发是指,在同一时刻,只有一个任务在运行,只是系统通过某种调度算法,同时调度、来回切换运行两个任务,让它们“看起来”是同时在运行。并行则是说,同一时刻,两个任务“真正的”同时进行。

举个栗子,老板布置给你一些任务,很复杂,但你不能一个人同时做。所以你把任务拆分成一些小活儿,交替着干,这是并发。不过你也可以将任务分给你的同事,这样就能真正的同时做了,这是并行。

体现在计算机系统中,当多个任务执行时,单核cpu通过切换时间片(时间片就是cpu分配给各个任务的时间),将计算资源合理的分配(轮转调度)给各个任务,当时间片结束,或者任务阻塞,则cpu进行切换(好处是不会阻塞其他任务,提升了程序运行效率)。这样多个任务看起来就是同时在进行,这是并发。系统将多个任务分配到多核cpu上,这时多个任务就可以真正的同时运行,这是并行

Golang 中的并发

Golang中通过goroutine实现了并发。如下的几个函数,实现了对goroutine的调度。

  • runtime.Gosched() 出让cpu时间片
  • runtime.NumCPU() 返回当前机器cpu数量
  • runtime.GOMAXPROCS() 设置同时最大的CUP可用核数
  • runtime.Goexit() 退出当前goroutine

下面是官方的一个示例程序,会将hello和world交替输出,因为在say函数中runtime.Gosched()会主动让出时间片,将cpu资源让给另外一个goroutine,这就实现了并发。如果去掉runtime.Gosched()这一行,两个goroutine将会按顺序执行,因为执行时间很短,goroutine不会主动让出时间片。(本地测试发现,在go1.9.2环境下输出并不是完全交替,有时甚至不会输出world,说明最新的编译器的实现有了些变化?)

package main

import (
    "fmt"
    "time"
    "runtime"
)

func say(s string) {
    for i := 0; i < 5; i++ {
        runtime.Gosched()
        // 如果将手动切换时间片的操作改成sleep操作,输出效果一样
        // 因为sleep操作也会触发切换时间片
        // time.Sleep(time.Millisecond)
        fmt.Println(s)
    }
}

func main() {
    // 这里如果强制将最大可用cpu核数设置成1
    // 则hello和world完全交替输出
    // 说明在最新版的golang中,GOMAXPROCS默认已经不是1了 ?
    // runtime.GOMAXPROCS(1)
    go say("world")
    say("hello")
}

再看第二个示例,去除了runtime.Gosched(),同时将最大可用cpu核数设置成了当前系统cpu的核数runtime.GOMAXPROCS(runtime.NumCPU())(我本地是4核),输出结果会出现交替的情况,但不绝对,因为程序执行太快退出了,调大循环次数就能看到效果。这时两个goroutine分别在不同的cpu核心上运行,实现了并行

package main

import (
    "fmt"
    "runtime"
)

func say(s string) {
    for i := 0; i < 5; i++ {
        fmt.Println(s)
    }
}

func main() {
    runtime.GOMAXPROCS(runtime.NumCPU())
    go say("world")
    say("hello")
}

stackoverflow上的一个问题,对golang的中的并发与并行的一些知识点和原理解释得很清楚,值得一看。

实际应用

如果使用Golang,通常我们可以将一个复杂的任务拆分成n个相关性不大的个小任务,然后分别用goroutine去执行,使用channel来进行数据通信,这样就实现了并发。如果拆分得好,我们可以的得到n倍的一个执行效率。至于并行和并发,就交给runtime去管理吧。

当多个goroutine执行时,要注意在临界区我们要使用锁来保证代码的正确执行,这样才能保证线程安全

参考

关于 Golang 中的锁

为什么需要锁

在进行并发编程的时候,通过共享内存的方式进行通信,这时可能就会导致资源的竞争,最终导致出现数据错误。

比如下面这段demo程序,两个goroutine同时运行,循环执行a自增1的操作100000次,正常情况下,a的最终值我们的期望是200000,但结果不是!输出的值会比200000小,这是因为两个goroutine通过共享内存(也就是变量a)的方式进行通信,当goroutine1取到a准备做自增,比如这时a的值是10,这时goroutine2刚好完成了一次自增(a变成11),但goroutine1并不能感知到goroutine2中的a值的变化。所以goroutine1自增完,a的值还是11。导致最终结果“错误”。我们要避免这种情况的话,就需要使用锁。

a := 0
// goroutine1
go func() {
   for i := 0; i < 100000; i++ {
      a += 1
   }
}()
// goroutine2
go func() {
   for i := 0; i < 100000; i++ {
      a += 1
   }
}()
time.Sleep(time.Second) // 等待goroutine执行完成
fmt.Println(a)

Golang 中是如何实现锁的

Golang 中有两种锁,分别是互斥锁(sync.Mutex)和读写锁(sync.RWMutex),我们分别来看。

互斥锁(sync.Mutex)

互斥锁比较简单,当我们对一个goroutine上了互斥锁之后,其他的goroutine就只能乖乖的等待这把锁解锁,使用方法如下:

var mutex sync.Mutex
mutex.Lock() // 加锁
// ...
mutex.UnLock() // 解锁

我们要解决上面demo的问题,就可以使用互斥锁,代码如下,这时输出结果就正确了。

var mutex sync.Mutex
a := 0
// goroutine1
go func() {
   for i := 0; i < 100000; i++ {
      mutex.Lock() // 上互斥锁
      a += 1
      mutex.Unlock() // 解互斥锁
   }
}()
// goroutine2
go func() {
   for i := 0; i < 100000; i++ {
      mutex.Lock()
      a += 1
      mutex.Unlock()
   }
}()
time.Sleep(time.Second)
fmt.Println(a)

读写锁(sync.RWMutex)

通常情况下,互斥锁能满足很多的应用场景了,不过,互斥锁比较耗费资源,会拖慢程序的执行效率。在一些读取操作大大的多于写入操作的场景下,互斥锁就不太适合了,这时我们就需要读写锁。读写锁的使用方式如下:

var rwmutex sync.RWMutex
rwmutex.Lock() // 加写锁
rwmutex.Unlock() // 解写锁
rwmutex.RLock() // 加读锁
rwmutex.RUnlock() // 解读锁

看如下的demo,我模拟了读多写少的场景,mutex函数使用互斥锁,rwmutex函数使用读写锁,其他的读写操作都一样。最终程序的输出了两个函数的执行时间:

mutex time: 109.784714ms
rwmutex time: 36.494956ms

可以看出读多写少的场景下,读写锁比互斥锁的效率更高。

package main

import (
   "fmt"
   "sync"
   "time"
)

func main() {
   mutex()
   rwmutex()
}

// 使用互斥锁
func mutex() {
   var mutex sync.Mutex
   var wg sync.WaitGroup

   bt := time.Now()
   a := 0
   b := 0
   wg.Add(9)
   // 读操作
   for i := 1; i < 10; i++ {
      go func() {
         for i := 0; i < 100000; i++ {
            mutex.Lock()
            b = a
            mutex.Unlock()
         }
         wg.Done()
      }()
   }

   wg.Add(1)
   // 写操作
   go func() {
      for i := 0; i < 100; i++ {
         mutex.Lock()
         a += 1
         mutex.Unlock()
      }
      wg.Done()
   }()

   wg.Wait()
   et := time.Now().Sub(bt)
   fmt.Println("mutex time:", et.String())
}

// 使用读写锁
func rwmutex() {
   var rwmutex sync.RWMutex
   var wg sync.WaitGroup

   bt := time.Now()
   a := 0
   b := 0
   wg.Add(9)
   // 读操作
   for i := 1; i < 10; i++ {
      go func() {
         for i := 0; i < 100000; i++ {
            // 上读锁,多个goroutine可同时获取读锁,这里不会阻塞
            // 但读锁会阻塞其他写锁,直到读锁释放
            rwmutex.RLock()
            b = a
            rwmutex.RUnlock()
         }
         wg.Done()
      }()
   }

   wg.Add(1)
   // 写操作
   go func() {
      for i := 0; i < 100; i++ {
         // 上写锁,会阻塞其他的读锁和写锁,直到写锁释放
         rwmutex.Lock()
         a += 1
         rwmutex.Unlock()
      }
      wg.Done()
   }()

   wg.Wait()
   et := time.Now().Sub(bt)
   fmt.Println("rwmutex time:", et.String())
}

参考

https://golang.org/pkg/sync/
http://legendtkl.com/2016/10/26/rwmutex/

Hadoop streaming 排序工具初探

通常使用 KeyFieldBasePartitionerKeyFieldBaseComparator 来对用户输出key进行排序和分桶。

基本概念

Partition:分桶,用户输出的key经过partition分发到不同的reduce里,因而partitioner就是分桶器,一般用平台默认的hash分桶,也可以自己指定。

Key:是需要排序的字段,相同Partition && 相同key的行排序到一起。

bin/hadoop streaming \
-input /tmp/comp-test.txt \
-output /tmp/xx -mapper cat -reducer cat \
-jobconf stream.num.map.output.key.fields=2 \
-jobconf stream.map.output.field.separator=. \
-jobconf mapred.reduce.tasks=5

上面案例中map输出中会按.分割,以前两列作为key,相同的key会分到同一个reduce中。

stream.num.map.output.key.fields 设置map输出的前几个字段作为key
stream.map.output.field.separator 设置map输出的字段分隔符

KeyFieldBasePartitioner

该配置主要用于分桶

map.output.key.field.separator 设置key内的字段分隔符
num.key.fields.for.partition 设置key内前几个字段用来做partition
mapred.text.key.partitioner.options 设置key内某个字段或者某个字段范围用做partition(优先级比num.key.fields.for.partition低)

KeyFieldBaseComparator

该配置主要用于排序

mapred.text.key.comparator.options 设置key中需要比较的字段或字节范围

hadoop-skeleton

最近工作中跑mapreduce的统计需求越来越多,而多个任务之间缺乏有效的组织管理,导致的结果就是目录结构散乱,大量的代码重复。所以我简单封装了下 hadoop streaming 任务流程,只需要做一些简单的配置即可跑一个新的任务。

项目地址:https://github.com/itsmikej/hadoop-skeleton

参考

http://www.dreamingfish123.info/?p=1102