Hadoop streaming 排序工具初探

通常使用 KeyFieldBasePartitionerKeyFieldBaseComparator 来对用户输出key进行排序和分桶。

基本概念

Partition:分桶,用户输出的key经过partition分发到不同的reduce里,因而partitioner就是分桶器,一般用平台默认的hash分桶,也可以自己指定。

Key:是需要排序的字段,相同Partition && 相同key的行排序到一起。

bin/hadoop streaming \
-input /tmp/comp-test.txt \
-output /tmp/xx -mapper cat -reducer cat \
-jobconf stream.num.map.output.key.fields=2 \
-jobconf stream.map.output.field.separator=. \
-jobconf mapred.reduce.tasks=5

上面案例中map输出中会按.分割,以前两列作为key,相同的key会分到同一个reduce中。

stream.num.map.output.key.fields 设置map输出的前几个字段作为key
stream.map.output.field.separator 设置map输出的字段分隔符

KeyFieldBasePartitioner

该配置主要用于分桶

map.output.key.field.separator 设置key内的字段分隔符
num.key.fields.for.partition 设置key内前几个字段用来做partition
mapred.text.key.partitioner.options 设置key内某个字段或者某个字段范围用做partition(优先级比num.key.fields.for.partition低)

KeyFieldBaseComparator

该配置主要用于排序

mapred.text.key.comparator.options 设置key中需要比较的字段或字节范围

hadoop-skeleton

最近工作中跑mapreduce的统计需求越来越多,而多个任务之间缺乏有效的组织管理,导致的结果就是目录结构散乱,大量的代码重复。所以我简单封装了下 hadoop streaming 任务流程,只需要做一些简单的配置即可跑一个新的任务。

项目地址:https://github.com/itsmikej/hadoop-skeleton

参考

http://www.dreamingfish123.info/?p=1102

Head First C 笔记 - 静态库与动态库

静态库与动态库

共享.h头文件

有三种方式

  • 放入标准目录,比如/usr/local/include,系统.h头文件位于/usr/include
  • 使用绝对路径
  • gcc -I(大写i)指定头文件 gcc -I/my_header_file_path ...

共享.o目标文件(静态库)

跟共享.h头文件一样,.o文件同样也可以使用完整路径来共享。

如果目标文件太多,可以使用ar命令创建目标文件.a存档文件

ar -rcs libsafe.a checksum.o encrypt.o

r表示存在则更新,c表示忽略反馈信息,s表示创建索引,注意名称必须是lib***.a的格式,编译时使用-l指定库名。然后将.a文件保存在库目录中,通常是/usr/local/lib,或者其他目录,如果是其他目录,编译时需要使用-L指定路径,比如:

gcc test_code.c -L/my_lib -lsafe -o test_code

可以使用nm命令查看存档中的内容:

nr libsafe.a

动态库

静态库的缺陷是编译时链接,每次修改都需要重新构建整个程序。动态库则不一样,它是在运行时动态链接。可以看成是加强版的目标文件。

假设有如下文件目录,这是一个跑步机的显示器demo程序,main.c会调用hfcal.c或者hfcal_UK.c中的函数,显示用户的运动情况。hfcal.c和hfcal_UK.c是不同地区的驱动代码,区别是展现的单位不一样。

├── hfcal.c
├── hfcal_UK.c
├── include
│ └── hfcal.h
├── libs
└── main.c

main.c

#include <stdio.h>
#include <hfcal.h>
int main()
{
    display_calories(115.1, 11.2, 0.9);
    return 0;
}

hfcal.h

void display_calories(float weight, float distance, float coeff);

hfcal.c

#include <stdio.h>
#include <hfcal.h>
void display_calories(float weight, float distance, float coeff)
{
    printf("weight: %3.2f lbs\n", weight);
    printf("distance: %3.2f miles\n", distance);
    printf("calories burned: %4.2f cal\n", coeff);
}

hfcal_UK.c

#include <stdio.h>
#include <hfcal.h>

void display_calories(float weight, float distance, float coeff)
{
    printf("weight: %3.2f kg\n", weight / 2.2046);
    printf("distance: %3.2f km\n", distance * 1.609344);
    printf("calories burned: %4.2f cal\n", coeff * weight * distance);
}

首先创建hfcal.c的动态链接库:

gcc -Iinclude -fPIC -c hfcal.c -o hfcal.o // 创建目标文件
gcc -shared hfcal.o -o ./libs/libhfcal.so // 创建动态链接库
gcc main.c -Iinclude -Llibs -lhfcal -o main // 使用
./main

第一步创建目标文件,其实就比之前多了一个-fPIC参数,它告诉编译器创建跟未知无关的代码。第二步使用-shared参数将.o文件转化为动态链接库。最后使用其实跟使用静态库一样,同样使用-l参数指定库名。

现在的目录结构为:

├── hfcal.c
├── hfcal.o
├── hfcal_UK.c
├── include
│ └── hfcal.h
├── libs
│ └── libhfcal.so
├── main
└── main.c

最后还可以测试一下,将hfcal_UK.c编译为动态库,替换之前的库。

gcc -Iinclude -fPIC -c hfcal_UK.c -o hfcal_UK.o
mv libs/libhfcal.so libs/libhfcal.so.bak // 备份
gcc -shared hfcal_UK.o -o ./libs/libhfcal.so
./main

这样main不用重新编译,就能使用新的代码了,因为它实在运行时动态链接的。

PS:gcc常用命令导航

  • -c预编译
  • -I 指定头文件 (include) 目录
  • -L 指定库目录
  • -l 指定库
  • -o 重命名生成文件
  • -fPIC 创建与位置无关代码
  • -shared 生成动态链接库

Head First C 笔记 - 高级函数

高级函数

函数指针

继续看例子,这个案例用于给不同的人发送不同的消息。

#include <stdio.h>

enum response_type {
    DUMP, SECOND_CHANGE, MARRAGE
};

typedef struct {
    char *name;
    enum response_type type;
} response;

void dump(response r)
{
    printf("Dear: %s ", r.name);
    puts("dump");
}

void second_change(response r)
{
    printf("Dear: %s ", r.name);
    puts("second_change");
}

void marrage(response r)
{
    printf("Dear: %s ", r.name);
    puts("marrage");
}

void (*replies[])(response) = {dump, second_change, marrage};

int main()
{
    response res[] = {
        {"Mike", DUMP}, {"Tom", SECOND_CHANGE}, {"Jim", MARRAGE}, {"KangKang", SECOND_CHANGE}
    };

    int i;
    int len;

    len = sizeof(res) / sizeof(response); // 计算数组长度
    for (i = 0; i < len; i++) {
        (replies[res[i].type])(res[i]);
        // switch(res[i].type) {
        // case DUMP:
        // dump(res[i]);
        // break;
        // case SECOND_CHANGE:
        // second_change(res[i]);
        // break;
        // default:
        // marrage(res[i]);
        // break;
        // }
    }
    return 0;
}

函数名就是指向函数的指针,但跟指针又不完全相同,在底层,函数名是L-value,指针是R-value。比如:int类型的指针可以表示成int *,但函数指针不能写成function *,因为在C语言中没有函数类型,函数的类型是由参数,返回值这些东西组合定义的。所以函数指针的定义更复杂,如下:

返回类型(*指针变量)(参数类型)

上面案例中最核心的是这样代码:void (*replies[])(response) = {dump, second_change, marrage};,它创建了一个函数指针的数组,优化了后面的switch调用。

排序

#include <stdio.h>
#include <string.h>
#include <stdio.h>

int compare_scores(const void* score_a, const void* score_b)
{
    int a = *(int*)score_a;
    int b = *(int*)score_b;
    return a - b;
}

int compare_names(const void* a, const void* b)
{
    char** sa = (char**)a;
    char** sb = (char**)b;
    return strcmp(*sa, *sb);
}

int main(int argc, char const *argv[])
{
    int scores[] = {2,4,1,9,8};
    int i;
    // 升序
    qsort(scores, 5, sizeof(int), compare_scores);
    for (i = 0; i < 5; i++) {
        printf("%i ", scores[i]);
    }

    // 字典序
    char *names[] = {"Karen", "Mark", "Ada", "Brett"};
    qsort(names, 4, sizeof(char*), compare_names);
    for (i = 0; i < 4; i++) {
        printf("%s ", names[i]);
    }

    return 0;
}

有了函数指针,我们就可以将函数当成参数传递了,qsort()函数用于排序,它会接收一个比较器函数指针,用于判断两个数据的大小。它的定义如下:

qsort(void *array, size_t length, size_t item_size, int (*compare)(const void *, const void *))

最后一个参数是比较器函数指针,注意其中的参数void *可以保存任意类型的指针。

由于比较器的参数类型是void *,所以需要做下类型转换。上面案例中的int a = *(int*)score_a;char** sa = (char**)a;,就是将void指针转换为相应类型的变量,然后再做比较。

可变参数

可变参数函数使用到了标准库中的(预处理会被替换成其他代码),位于stdarg.h中。

#include <stdio.h>
#include <stdarg.h>

void print_ints(int args, ...)
{
    va_list ap;
    va_start(ap, args);
    int i;
    for (int i = 0; i < args; ++i) {
        printf("%i\n", va_arg(ap, int));
    }
    va_end(ap);
}

int main(int argc, char const *argv[])
{
    print_ints(3, 111, 222, 333);
    return 0;
}
  • args报错了参数的数量,va_list用于保存传过来的其他参数
  • va_start说明可变参数从哪里开始
  • va_arg用于读取参数
  • va_end用于销毁va_list

Head First C 笔记 - 动态存储

动态存储

很多场景是在程序运行前你不知道自己需要多少存储空间,比如网页响应前你就不知道需要多少存储空间。所以要在运行时动态创建,这个操作通常在上进行。

下面这个案例展示了程序运行时动态创建一个叫island的链表,包括了内存的申请和释放,最后打印显示。

#include <stdio.h>
#include <string.h>

typedef struct {
    char *name;
    char *opens;
    char *closes;
    struct island *next;
} island;

void display(island *start)
{
    island *i = start;
    for (; i != NULL; i = i->next) {
        printf("Island: %s", i->name);
    }
}

island* create(char *name)
{
    island *i = malloc(sizeof(island));
    i->name = strdup(name); // 这里使用了strdup函数,避免了潜在的共享指针错误p284
    i->opens = "09:00";
    i->closes = "17:00";
    return i;
}

void release(island *start)
{
    island *i = start;
    island *next = NULL;
    for (; i != NULL; i = next) {
        next = i->next;
        free(i->name);
        free(i);
    }
}

int main(int argc, char const *argv[])
{
    char name[50];
    island *start = NULL;
    island *i = NULL;
    island *next = NULL;
    for (; fgets(name, 50, stdin) != NULL; i = next) {
        next = create(name);
        if (start == NULL)
            start = next;
        if (i != NULL)
            i->next = next;
    }
    display(start);
    return 0;
}

注意几点:

  • 递归的结构体必须要使用typedef创建一个别名,所以才可以在结构体中定义:struct island *next;
  • 程序在运行时动态创建了链表,使用malloc()申请了堆内存,使用free()释放
  • 使用strdup()复制字符串,避免字符串共享指针导致的数据错误

Head First C 笔记 - 结构,联合与位字段

结构,联合与位字段

结构体,联合与位字段可以组合起来描述这个复杂的世界。

结构体

看例子学习:

#include <stdio.h>

typedef struct {
    const char *food;
} preferences;

typedef struct {
    const char *name;
    int age;
    preferences care;
} fish;

void inc_age(fish *f)
{
    // 效果相同
    (*f).age++;
    f->age++;
}

int main()
{
    fish snoppy = {"Snoppy", 4, {"Meat"}};
    inc_age(&snoppy);
    printf("%s - %i - %s", snoppy.name, snoppy.age, snoppy.care.food);
}

注意几点:

  • 使用typedef关键字创建结构别名
  • (*f).age != *f.age,因为*f.age == *(f.age),你懂的
  • (*f).age === f->age,这样更易读

联合,枚举与位字段

继续看案例:

#include <stdio.h>

// 枚举
typedef enum {
    COUNT, POUNDS, PINTS
} unit_of_measure;

// 联合
typedef union {
    short count;
    float weight;
    float volume;
} quantity;

typedef struct {
    const char *name;
    const char *country;
    unsigned int is_new:1; // 位字段,可节省空间
    quantity amount;
    unit_of_measure units;
} fruit_order;


int main()
{
    fruit_order apples = {"apple", "China", 1, .amount.weight=4.2, POUNDS};
    printf("%2.2f\n", apples.amount.weight);
    if (apples.units == POUNDS) {
        printf("bingo!");
    }
    return 0;
}

注意几点:

  • union用来定义一种叫"量"的类型,然后用户自己决定需要使用那个字段。p246
  • 当定义联合时,计算机只会为其中最大的字段分配空间
  • 由于我们不知道在联合中具体存了什么类型的值,这个案例中用枚举来标记了存储的类型
  • 位字段可以节省空间,位数可调