Hadoop streaming 排序工具初探

通常使用 KeyFieldBasePartitionerKeyFieldBaseComparator 来对用户输出key进行排序和分桶。

基本概念

Partition:分桶,用户输出的key经过partition分发到不同的reduce里,因而partitioner就是分桶器,一般用平台默认的hash分桶,也可以自己指定。

Key:是需要排序的字段,相同Partition && 相同key的行排序到一起。

bin/hadoop streaming \
-input /tmp/comp-test.txt \
-output /tmp/xx -mapper cat -reducer cat \
-jobconf stream.num.map.output.key.fields=2 \
-jobconf stream.map.output.field.separator=. \
-jobconf mapred.reduce.tasks=5

上面案例中map输出中会按.分割,以前两列作为key,相同的key会分到同一个reduce中。

stream.num.map.output.key.fields 设置map输出的前几个字段作为key
stream.map.output.field.separator 设置map输出的字段分隔符

KeyFieldBasePartitioner

该配置主要用于分桶

map.output.key.field.separator 设置key内的字段分隔符
num.key.fields.for.partition 设置key内前几个字段用来做partition
mapred.text.key.partitioner.options 设置key内某个字段或者某个字段范围用做partition(优先级比num.key.fields.for.partition低)

KeyFieldBaseComparator

该配置主要用于排序

mapred.text.key.comparator.options 设置key中需要比较的字段或字节范围

hadoop-skeleton

最近工作中跑mapreduce的统计需求越来越多,而多个任务之间缺乏有效的组织管理,导致的结果就是目录结构散乱,大量的代码重复。所以我简单封装了下 hadoop streaming 任务流程,只需要做一些简单的配置即可跑一个新的任务。

项目地址:https://github.com/itsmikej/hadoop-skeleton

参考

http://www.dreamingfish123.info/?p=1102

Head First C 笔记 - 高级函数

高级函数

函数指针

继续看例子,这个案例用于给不同的人发送不同的消息。

#include <stdio.h>

enum response_type {
    DUMP, SECOND_CHANGE, MARRAGE
};

typedef struct {
    char *name;
    enum response_type type;
} response;

void dump(response r)
{
    printf("Dear: %s ", r.name);
    puts("dump");
}

void second_change(response r)
{
    printf("Dear: %s ", r.name);
    puts("second_change");
}

void marrage(response r)
{
    printf("Dear: %s ", r.name);
    puts("marrage");
}

void (*replies[])(response) = {dump, second_change, marrage};

int main()
{
    response res[] = {
        {"Mike", DUMP}, {"Tom", SECOND_CHANGE}, {"Jim", MARRAGE}, {"KangKang", SECOND_CHANGE}
    };

    int i;
    int len;

    len = sizeof(res) / sizeof(response); // 计算数组长度
    for (i = 0; i < len; i++) {
        (replies[res[i].type])(res[i]);
        // switch(res[i].type) {
        // case DUMP:
        // dump(res[i]);
        // break;
        // case SECOND_CHANGE:
        // second_change(res[i]);
        // break;
        // default:
        // marrage(res[i]);
        // break;
        // }
    }
    return 0;
}

函数名就是指向函数的指针,但跟指针又不完全相同,在底层,函数名是L-value,指针是R-value。比如:int类型的指针可以表示成int *,但函数指针不能写成function *,因为在C语言中没有函数类型,函数的类型是由参数,返回值这些东西组合定义的。所以函数指针的定义更复杂,如下:

返回类型(*指针变量)(参数类型)

上面案例中最核心的是这样代码:void (*replies[])(response) = {dump, second_change, marrage};,它创建了一个函数指针的数组,优化了后面的switch调用。

排序

#include <stdio.h>
#include <string.h>
#include <stdio.h>

int compare_scores(const void* score_a, const void* score_b)
{
    int a = *(int*)score_a;
    int b = *(int*)score_b;
    return a - b;
}

int compare_names(const void* a, const void* b)
{
    char** sa = (char**)a;
    char** sb = (char**)b;
    return strcmp(*sa, *sb);
}

int main(int argc, char const *argv[])
{
    int scores[] = {2,4,1,9,8};
    int i;
    // 升序
    qsort(scores, 5, sizeof(int), compare_scores);
    for (i = 0; i < 5; i++) {
        printf("%i ", scores[i]);
    }

    // 字典序
    char *names[] = {"Karen", "Mark", "Ada", "Brett"};
    qsort(names, 4, sizeof(char*), compare_names);
    for (i = 0; i < 4; i++) {
        printf("%s ", names[i]);
    }

    return 0;
}

有了函数指针,我们就可以将函数当成参数传递了,qsort()函数用于排序,它会接收一个比较器函数指针,用于判断两个数据的大小。它的定义如下:

qsort(void *array, size_t length, size_t item_size, int (*compare)(const void *, const void *))

最后一个参数是比较器函数指针,注意其中的参数void *可以保存任意类型的指针。

由于比较器的参数类型是void *,所以需要做下类型转换。上面案例中的int a = *(int*)score_a;char** sa = (char**)a;,就是将void指针转换为相应类型的变量,然后再做比较。

可变参数

可变参数函数使用到了标准库中的(预处理会被替换成其他代码),位于stdarg.h中。

#include <stdio.h>
#include <stdarg.h>

void print_ints(int args, ...)
{
    va_list ap;
    va_start(ap, args);
    int i;
    for (int i = 0; i < args; ++i) {
        printf("%i\n", va_arg(ap, int));
    }
    va_end(ap);
}

int main(int argc, char const *argv[])
{
    print_ints(3, 111, 222, 333);
    return 0;
}
  • args报错了参数的数量,va_list用于保存传过来的其他参数
  • va_start说明可变参数从哪里开始
  • va_arg用于读取参数
  • va_end用于销毁va_list

Head First C 笔记 - 动态存储

动态存储

很多场景是在程序运行前你不知道自己需要多少存储空间,比如网页响应前你就不知道需要多少存储空间。所以要在运行时动态创建,这个操作通常在上进行。

下面这个案例展示了程序运行时动态创建一个叫island的链表,包括了内存的申请和释放,最后打印显示。

#include <stdio.h>
#include <string.h>

typedef struct {
    char *name;
    char *opens;
    char *closes;
    struct island *next;
} island;

void display(island *start)
{
    island *i = start;
    for (; i != NULL; i = i->next) {
        printf("Island: %s", i->name);
    }
}

island* create(char *name)
{
    island *i = malloc(sizeof(island));
    i->name = strdup(name); // 这里使用了strdup函数,避免了潜在的共享指针错误p284
    i->opens = "09:00";
    i->closes = "17:00";
    return i;
}

void release(island *start)
{
    island *i = start;
    island *next = NULL;
    for (; i != NULL; i = next) {
        next = i->next;
        free(i->name);
        free(i);
    }
}

int main(int argc, char const *argv[])
{
    char name[50];
    island *start = NULL;
    island *i = NULL;
    island *next = NULL;
    for (; fgets(name, 50, stdin) != NULL; i = next) {
        next = create(name);
        if (start == NULL)
            start = next;
        if (i != NULL)
            i->next = next;
    }
    display(start);
    return 0;
}

注意几点:

  • 递归的结构体必须要使用typedef创建一个别名,所以才可以在结构体中定义:struct island *next;
  • 程序在运行时动态创建了链表,使用malloc()申请了堆内存,使用free()释放
  • 使用strdup()复制字符串,避免字符串共享指针导致的数据错误

Head First C 笔记 - 结构,联合与位字段

结构,联合与位字段

结构体,联合与位字段可以组合起来描述这个复杂的世界。

结构体

看例子学习:

#include <stdio.h>

typedef struct {
    const char *food;
} preferences;

typedef struct {
    const char *name;
    int age;
    preferences care;
} fish;

void inc_age(fish *f)
{
    // 效果相同
    (*f).age++;
    f->age++;
}

int main()
{
    fish snoppy = {"Snoppy", 4, {"Meat"}};
    inc_age(&snoppy);
    printf("%s - %i - %s", snoppy.name, snoppy.age, snoppy.care.food);
}

注意几点:

  • 使用typedef关键字创建结构别名
  • (*f).age != *f.age,因为*f.age == *(f.age),你懂的
  • (*f).age === f->age,这样更易读

联合,枚举与位字段

继续看案例:

#include <stdio.h>

// 枚举
typedef enum {
    COUNT, POUNDS, PINTS
} unit_of_measure;

// 联合
typedef union {
    short count;
    float weight;
    float volume;
} quantity;

typedef struct {
    const char *name;
    const char *country;
    unsigned int is_new:1; // 位字段,可节省空间
    quantity amount;
    unit_of_measure units;
} fruit_order;


int main()
{
    fruit_order apples = {"apple", "China", 1, .amount.weight=4.2, POUNDS};
    printf("%2.2f\n", apples.amount.weight);
    if (apples.units == POUNDS) {
        printf("bingo!");
    }
    return 0;
}

注意几点:

  • union用来定义一种叫"量"的类型,然后用户自己决定需要使用那个字段。p246
  • 当定义联合时,计算机只会为其中最大的字段分配空间
  • 由于我们不知道在联合中具体存了什么类型的值,这个案例中用枚举来标记了存储的类型
  • 位字段可以节省空间,位数可调

Head First C 笔记 - 使用多个源文件

使用多个源文件

大型软件通常会拆分源代码为多个小模块,最终将他们编译成一个可执行程序,关注以下几个关键点。

头文件

函数的定义顺序可能会导致编译错误(使用前需要先定义),头文件的作用就是为了做函数声明,让编译器不用做假设,可以防止编译出错。

共享代码

共同的特性,最好能共享代码。所以多个文件要想共享一份代码的话,自然要将共享的代码放在一个单独的.c文件中。那多个文件如何编译呢?

参考如下案例:
encrypt.h

void encrypt(char *message);

encrypt.c

#include "encrypt.h"
// 用于加密字符串
void encrypt(char *message)
{
    while (*message) {
        *message = *message ^ 31;
        message++;
    }
}

message_hider.c

#include <stdio.h>
#include "encrypt.h"

int main()
{
    char msg[80];
    while (fgets(msg, 80, stdin)) {
        encrypt(msg); // 这样通用的代码就能共享了
        printf("%s\n", msg);
    }
}

如何编译呢?很简单,只需要把源文件都传递给gcc即可。
gcc message_hider.c encrypt.c -o message_hider

使用make

首先需要了解下gcc的编译过程。

  1. 预处理:修改代码
  2. 编译:转换成汇编
  3. 汇编:生成目标代码
  4. 链接:放在一起

一个小的改动,不要重新编译全部的文件,耗时耗力。所以我们可以先将c源文件编译成目标代码文件,如果修改了某个源文件,只需要重新编译这个文件,最后链接目标文件即可。

同样是上面的案例,先编译成目标文件:

gcc -c *.c,生成.o文件

然后链接,编译器可以识别这些文件是目标文件:

gcc *.o -o message_hider

ok,这时如果修改了message_hider.c,那么只需要执行gcc -c message_hider.c,然后重新链接就行了。

新的问题,假设有很多源文件,你记不住修改了哪些文件,该怎么办?可以使用make来自动化这个过程。

encrypt.o: encrypt.c encrypt.h
    gcc -c encrypt.c

message_hider.o: message_hider.c encrypt.h
    gcc -c message_hider.c

message_hider: encrypt.o message_hider.o
    gcc encrypt.o message_hider.o -o message_hider

执行make message_hider,它会自动检查源文件的和目标文件的时间戳,如果有变动,就会重新编译。