分类 Programs 下的文章

由并发引出的话题

并发是一个超大的话题,本文做一些简单的记录和讨论。

基本概念

有必要先了解下与并发密切相关的几个概念。

Critical Section 临界区

所谓临界区,就是在程序中一段代码不能被同时执行的部分。比如,两个线程不能同时去访问的程序代码,这段代码就是临界区。在临界区中,通常会有对共享变量读取或者写入的操作。之前我文章中提到的 就是用来保证只有一个线程处于临界区的一种机制,它保证了多个线程对共享变量的读写正常。

Thread Safe 线程安全

wikipedia中的定义:

指某个函数、函数库在多线程环境中被调用时,能够正确地处理多个线程之间的共享变量,使程序功能正确完成。

Concurrency vs Parallelism 并发 vs 并行

两个概念相似,但不同。并发是指,在同一时刻,只有一个任务在运行,只是系统通过某种调度算法,同时调度、来回切换运行两个任务,让它们“看起来”是同时在运行。并行则是说,同一时刻,两个任务“真正的”同时进行。

举个栗子,老板布置给你一些任务,很复杂,但你不能一个人同时做。所以你把任务拆分成一些小活儿,交替着干,这是并发。不过你也可以将任务分给你的同事,这样就能真正的同时做了,这是并行。

体现在计算机系统中,当多个任务执行时,单核cpu通过切换时间片(时间片就是cpu分配给各个任务的时间),将计算资源合理的分配(轮转调度)给各个任务,当时间片结束,或者任务阻塞,则cpu进行切换(好处是不会阻塞其他任务,提升了程序运行效率)。这样多个任务看起来就是同时在进行,这是并发。系统将多个任务分配到多核cpu上,这时多个任务就可以真正的同时运行,这是并行

Golang 中的并发

Golang中通过goroutine实现了并发。如下的几个函数,实现了对goroutine的调度。

  • runtime.Gosched() 出让cpu时间片
  • runtime.NumCPU() 返回当前机器cpu数量
  • runtime.GOMAXPROCS() 设置同时最大的CUP可用核数
  • runtime.Goexit() 退出当前goroutine

下面是官方的一个示例程序,会将hello和world交替输出,因为在say函数中runtime.Gosched()会主动让出时间片,将cpu资源让给另外一个goroutine,这就实现了并发。如果去掉runtime.Gosched()这一行,两个goroutine将会按顺序执行,因为执行时间很短,goroutine不会主动让出时间片。(本地测试发现,在go1.9.2环境下输出并不是完全交替,有时甚至不会输出world,说明最新的编译器的实现有了些变化?)

package main

import (
    "fmt"
    "time"
    "runtime"
)

func say(s string) {
    for i := 0; i < 5; i++ {
        runtime.Gosched()
        // 如果将手动切换时间片的操作改成sleep操作,输出效果一样
        // 因为sleep操作也会触发切换时间片
        // time.Sleep(time.Millisecond)
        fmt.Println(s)
    }
}

func main() {
    // 这里如果强制将最大可用cpu核数设置成1
    // 则hello和world完全交替输出
    // 说明在最新版的golang中,GOMAXPROCS默认已经不是1了 ?
    // runtime.GOMAXPROCS(1)
    go say("world")
    say("hello")
}

再看第二个示例,去除了runtime.Gosched(),同时将最大可用cpu核数设置成了当前系统cpu的核数runtime.GOMAXPROCS(runtime.NumCPU())(我本地是4核),输出结果会出现交替的情况,但不绝对,因为程序执行太快退出了,调大循环次数就能看到效果。这时两个goroutine分别在不同的cpu核心上运行,实现了并行

package main

import (
    "fmt"
    "runtime"
)

func say(s string) {
    for i := 0; i < 5; i++ {
        fmt.Println(s)
    }
}

func main() {
    runtime.GOMAXPROCS(runtime.NumCPU())
    go say("world")
    say("hello")
}

stackoverflow上的一个问题,对golang的中的并发与并行的一些知识点和原理解释得很清楚,值得一看。

实际应用

如果使用Golang,通常我们可以将一个复杂的任务拆分成n个相关性不大的个小任务,然后分别用goroutine去执行,使用channel来进行数据通信,这样就实现了并发。如果拆分得好,我们可以的得到n倍的一个执行效率。至于并行和并发,就交给runtime去管理吧。

当多个goroutine执行时,要注意在临界区我们要使用锁来保证代码的正确执行,这样才能保证线程安全

参考

关于 Golang 中的锁

为什么需要锁

在进行并发编程的时候,通过共享内存的方式进行通信,这时可能就会导致资源的竞争,最终导致出现数据错误。

比如下面这段demo程序,两个goroutine同时运行,循环执行a自增1的操作100000次,正常情况下,a的最终值我们的期望是200000,但结果不是!输出的值会比200000小,这是因为两个goroutine通过共享内存(也就是变量a)的方式进行通信,当goroutine1取到a准备做自增,比如这时a的值是10,这时goroutine2刚好完成了一次自增(a变成11),但goroutine1并不能感知到goroutine2中的a值的变化。所以goroutine1自增完,a的值还是11。导致最终结果“错误”。我们要避免这种情况的话,就需要使用锁。

a := 0
// goroutine1
go func() {
   for i := 0; i < 100000; i++ {
      a += 1
   }
}()
// goroutine2
go func() {
   for i := 0; i < 100000; i++ {
      a += 1
   }
}()
time.Sleep(time.Second) // 等待goroutine执行完成
fmt.Println(a)

Golang 中是如何实现锁的

Golang 中有两种锁,分别是互斥锁(sync.Mutex)和读写锁(sync.RWMutex),我们分别来看。

互斥锁(sync.Mutex)

互斥锁比较简单,当我们对一个goroutine上了互斥锁之后,其他的goroutine就只能乖乖的等待这把锁解锁,使用方法如下:

var mutex sync.Mutex
mutex.Lock() // 加锁
// ...
mutex.UnLock() // 解锁

我们要解决上面demo的问题,就可以使用互斥锁,代码如下,这时输出结果就正确了。

var mutex sync.Mutex
a := 0
// goroutine1
go func() {
   for i := 0; i < 100000; i++ {
      mutex.Lock() // 上互斥锁
      a += 1
      mutex.Unlock() // 解互斥锁
   }
}()
// goroutine2
go func() {
   for i := 0; i < 100000; i++ {
      mutex.Lock()
      a += 1
      mutex.Unlock()
   }
}()
time.Sleep(time.Second)
fmt.Println(a)

读写锁(sync.RWMutex)

通常情况下,互斥锁能满足很多的应用场景了,不过,互斥锁比较耗费资源,会拖慢程序的执行效率。在一些读取操作大大的多于写入操作的场景下,互斥锁就不太适合了,这时我们就需要读写锁。读写锁的使用方式如下:

var rwmutex sync.RWMutex
rwmutex.Lock() // 加写锁
rwmutex.Unlock() // 解写锁
rwmutex.RLock() // 加读锁
rwmutex.RUnlock() // 解读锁

看如下的demo,我模拟了读多写少的场景,mutex函数使用互斥锁,rwmutex函数使用读写锁,其他的读写操作都一样。最终程序的输出了两个函数的执行时间:

mutex time: 109.784714ms
rwmutex time: 36.494956ms

可以看出读多写少的场景下,读写锁比互斥锁的效率更高。

package main

import (
   "fmt"
   "sync"
   "time"
)

func main() {
   mutex()
   rwmutex()
}

// 使用互斥锁
func mutex() {
   var mutex sync.Mutex
   var wg sync.WaitGroup

   bt := time.Now()
   a := 0
   b := 0
   wg.Add(9)
   // 读操作
   for i := 1; i < 10; i++ {
      go func() {
         for i := 0; i < 100000; i++ {
            mutex.Lock()
            b = a
            mutex.Unlock()
         }
         wg.Done()
      }()
   }

   wg.Add(1)
   // 写操作
   go func() {
      for i := 0; i < 100; i++ {
         mutex.Lock()
         a += 1
         mutex.Unlock()
      }
      wg.Done()
   }()

   wg.Wait()
   et := time.Now().Sub(bt)
   fmt.Println("mutex time:", et.String())
}

// 使用读写锁
func rwmutex() {
   var rwmutex sync.RWMutex
   var wg sync.WaitGroup

   bt := time.Now()
   a := 0
   b := 0
   wg.Add(9)
   // 读操作
   for i := 1; i < 10; i++ {
      go func() {
         for i := 0; i < 100000; i++ {
            // 上读锁,多个goroutine可同时获取读锁,这里不会阻塞
            // 但读锁会阻塞其他写锁,直到读锁释放
            rwmutex.RLock()
            b = a
            rwmutex.RUnlock()
         }
         wg.Done()
      }()
   }

   wg.Add(1)
   // 写操作
   go func() {
      for i := 0; i < 100; i++ {
         // 上写锁,会阻塞其他的读锁和写锁,直到写锁释放
         rwmutex.Lock()
         a += 1
         rwmutex.Unlock()
      }
      wg.Done()
   }()

   wg.Wait()
   et := time.Now().Sub(bt)
   fmt.Println("rwmutex time:", et.String())
}

参考

https://golang.org/pkg/sync/
http://legendtkl.com/2016/10/26/rwmutex/

Hadoop streaming 排序工具初探

通常使用 KeyFieldBasePartitionerKeyFieldBaseComparator 来对用户输出key进行排序和分桶。

基本概念

Partition:分桶,用户输出的key经过partition分发到不同的reduce里,因而partitioner就是分桶器,一般用平台默认的hash分桶,也可以自己指定。

Key:是需要排序的字段,相同Partition && 相同key的行排序到一起。

bin/hadoop streaming \
-input /tmp/comp-test.txt \
-output /tmp/xx -mapper cat -reducer cat \
-jobconf stream.num.map.output.key.fields=2 \
-jobconf stream.map.output.field.separator=. \
-jobconf mapred.reduce.tasks=5

上面案例中map输出中会按.分割,以前两列作为key,相同的key会分到同一个reduce中。

stream.num.map.output.key.fields 设置map输出的前几个字段作为key
stream.map.output.field.separator 设置map输出的字段分隔符

KeyFieldBasePartitioner

该配置主要用于分桶

map.output.key.field.separator 设置key内的字段分隔符
num.key.fields.for.partition 设置key内前几个字段用来做partition
mapred.text.key.partitioner.options 设置key内某个字段或者某个字段范围用做partition(优先级比num.key.fields.for.partition低)

KeyFieldBaseComparator

该配置主要用于排序

mapred.text.key.comparator.options 设置key中需要比较的字段或字节范围

hadoop-skeleton

最近工作中跑mapreduce的统计需求越来越多,而多个任务之间缺乏有效的组织管理,导致的结果就是目录结构散乱,大量的代码重复。所以我简单封装了下 hadoop streaming 任务流程,只需要做一些简单的配置即可跑一个新的任务。

项目地址:https://github.com/itsmikej/hadoop-skeleton

参考

http://www.dreamingfish123.info/?p=1102

<Head First C> 读书笔记

最近在重新学习 C 语言,本文是 Head First C 这本书里关于 C 的一些基础知识的总结笔记。

内存存储结构

  • :存储函数创建的变量值(局部变量)
  • :动态存储分配
  • 全局量区:存储函数定义的变量值(全局变量)
  • 常量区:存储只读数据
  • 代码段:存储代码段

指针

数组与指针

  • 数组变量可以被用作指针
  • 数组变量指向数组中的第一个元素
  • 如果把函数参数声明为数组,它会被当称指针处理
  • 指针变量也是变量。。保存的数字,也可以通过 & 获取它的地址

- 阅读剩余部分 -

LuaRocks Tutorial

LuaRocks 是一个 Lua 的包管理工具,提供了一个命令行的方式来管理 Lua 包依赖。官网

安装 Lua 5.1.5

wget http://www.lua.org/ftp/lua-5.1.5.tar.gz
mkdir /usr/local/lua-5.1.5 && cd lua-5.1.5
# 配置 Makefile,指定安装目录,INSTALL_TOP = /usr/local/lua-5.1.5
# aix ansi bsd freebsd generic linux macosx mingw posix solaris 支持的平台
make macosx
make macosx install
ln -sf /usr/local/lua-5.1.5/bin/lua /usr/local/bin/lua

安装 LuaRocks

wget https://luarocks.org/releases/luarocks-2.4.1.tar.gz
tar zxpf luarocks-2.4.1.tar.gz && cd luarocks-2.4.1
mkdir /usr/local/luarocks-2.4.1
./configure --prefix=/usr/local/luarocks-2.4.1 --with-lua=/usr/local/lua-5.1.5
make build  && make install

使用 LuaRocks 安装包

luarocks install dkjson
require("dkjson") 然后就可以使用了

贡献 Rock 包

创建 .rockspec 文件,下面是我写的一个名为 nginx-lua-frequency 的 Lua 模块的案例。

package = "nginx-lua-frequency"
version = "0.1-1"
source = {
   url = "git://github.com/itsmikej/nginx-lua-frequency.git"
}
description = {
   summary = "A frequency module for Nginx written in Lua",
   homepage = "https://github.com/itsmikej/nginx-lua-frequency",
   maintainer = "Jiang Yang<jiangyang33@gmail.com>",
   license = "MIT"
}
dependencies = {
   "lua=5.1",
   "lua-resty-memcached=0.13-0"
}
build = {
   type = "builtin",
   modules = {
      ["frequency"] = "src/frequency.lua",
      ["frequency.adapter.memcached"] = "src/adapter/memcached.lua"
   }
}

保存文件名为 nginx-lua-frequency-0.1-1.rockspec,格式必须是"{package}-{version}.rockspec",最后上传即可:
luarocks upload nginx-lua-frequency-0.1-1.rockspec --api-key ********
执行完这个命令后,会在当前目录生成一个名为 nginx-lua-frequency-0.1-1.src.rock 的二进制文件,其实这个文件就是当前项目的一个压缩包,使用 luarocks install 安装时,其实就是用的这个压缩包。

BTW

nginx-lua-frequency 是一个基于 memcached 的通用频率限制模块,可同时做多个时间维度(秒,分,小时)的请求频率配置。Github链接

参考

https://github.com/luarocks/luarocks/wiki/Documentation
http://www.jianshu.com/p/196b5dad4e78
https://segmentfault.com/a/1190000003920034