76. 归并排序

归并排序是典型的分治算法,它不断地将某个数组分为两个部分,分别对左子数组与右子数组进行排序,然后将两个数组合并为新的有序数组。

一般来说,小文件单机版排序比并发排序更快一些,但是大文件无法一次性读入内存处理,需要分块处理。单机版排序无法做到分布式排序,分布式排序需要联网进行,网络传输跟通道传输非常类似,稍作修改就可以做到在网络上进行排序。

并行和并发是有区别的,依赖操作系统和硬件是否支持。同时,在允许的情况下,使用有缓冲的通道和使用文件读写缓冲,有利于加快程序的处理速度,避免及时阻塞。

数组排序

对一个整数数组进行排序,系统库中已经提供了排序函数,直接使用即可。需要注意的是,Go中使用通道,使用不当会造成死锁。需要注意通道的使用,否则一运行就发生竞争条件造成程序无法执行而终止。

// 按升序排序
func InMemSort(in <-chan int) <-chan int {
    out := make(chan int, 4096)

    // 启动一个协程输出排序数据
    go func() {
        var a []int
        for v := range in {
            a = append(a, v)
        }
        fmt.Println("Read done: ", time.Now().Sub(startTime))

        sort.Ints(a)
        fmt.Println("Sort done: ", time.Now().Sub(startTime))

        // 将排序好的数据输出至通道中
        for _, n := range a {
            out <- n
        }

        close(out)
    }()

    return out
}

通常来说,协程的使用是成对出现的,并且习惯用Go匿名函数来操作。需要留意匿名函数的传参问题,可以去官方文档进行查看。go程运行时,函数调用是瞬间返回的。如果在其它地方使用range操作来处理通道,在通道数据发送完毕时必须显示关闭通道,否则必是死锁错误。

合并两个有序数组

归并排序是分别进行的,每一块排序完成后,需要等待另外的块排序完毕后,才可继续进行。通道能很方便的做到这个操作,每一个读通道都是阻塞的,在未读取到数据之前,始终等待。

// 按升序合并两个通道中数据
func Merge(in1, in2 <-chan int) <-chan int {
    out := make(chan int, 4096)

    go func() {
        v1, ok1 := <-in1
        v2, ok2 := <-in2
        for ok1 || ok2 {
            if !ok2 || (ok1 && v1 <= v2) {
                out <- v1
                v1, ok1 = <-in1
            } else {
                out <- v2
                v2, ok2 = <-in2
            }
        }

        close(out)
        fmt.Println("Merge done: ", time.Now().Sub(startTime))
    }()

    return out
}

合并两个有序数组时,也要遵循一定的顺序合并,最终的数据才是有顺序的。这块是程序执行最难的一处,无法并行排序,只能一次合并有序数组。

递归合并数组

可变通道的递归合并,一般也是按中点切分,区间内进行递归合并。

// 递归进行两两归并
func MergeN(ins ...<-chan int) <-chan int {
    // 只有一个通道时直接返回
    if len(ins) == 1 {
        return ins[0]
    }

    // 取中点
    m := len(ins) / 2

    // 递归归并
    return Merge(MergeN(ins[:m]...), MergeN(ins[m:]...))
}

如果只有一个通道时不需要递归,直接退出即可。需要注意的是,递归合并始终传递可变通道类型。

从源读取数据

Go习惯用法是接口类型,即可读数据源均可,包括文件,网络,甚至各种输入类型,而不是简单的FileSource形式,而是更广泛意义上的可读源。

// 从可读位置读入数据
// 读入指定长度的数据, 为-1时一次性全部读完
func ReadSource(reader io.Reader, chunkSize int) <-chan int {
    out := make(chan int, 4096)

    go func() {
        buffer := make([]byte, 8)
        size := 0
        for {
            n, err := reader.Read(buffer)
            size += n
            if n > 0 {
                v := int(binary.BigEndian.Uint64(buffer))
                out <- v
            }
            if err != nil || (chunkSize != -1 && size >= chunkSize) {
                break
            }
        }

        close(out)
    }()

    return out
}

以二进制的方式读数据,同时使用有缓冲的读操作,加快文件读取速度。如果尺寸为-1时完全一次性读取文件,否则读取指定大小的文件。文件读取有一个偏移量,默认从偏移量处开始往后读取到指定的尺寸。

写数据到文件

写文件基本都类似,也是按二进制方式写入数据到文件。

// 向可写输入写入数据
func WriteSink(writer io.Writer, in <-chan int) {
    for m := range in {
        buffer := make([]byte, 8)
        binary.BigEndian.PutUint64(buffer, uint64(m))
        writer.Write(buffer)
    }
}

该处写文件不需要另起协程,当然也可以起协程,效果不明显。

生成样本数据

生成指定大小的随机整数数据来测试用。

// 生成指定数量的随机整数
func RandomSource(count int) <-chan int {
    out := make(chan int)

    go func() {
        for i := 0; i < count; i++ {
            out <- rand.Int()
        }

        close(out)
    }()

    return out
}

生成简单整数即可。

分片读取文件内容

分片的实现,其实就是指定一次性读取多大文件内容而已。实现方式很多,一般就是读取等大小的块,该处需要注意的是,文件的完整性,如果读取的内容是不完整,或者被错误分隔的,将造成数据的不完整或错误。

// 可读入资源切片
func create(fileName string, fileSize, count int) <-chan int {
    chunkSize := fileSize / count

    var result []<-chan int
    for i := 0; i < count; i++ {
        file, err := os.Open(fileName)
        if err != nil {
            panic(err)
        }
        // 设置文件偏移量, 并从偏移量开始0处读取文件内容
        file.Seek(int64(i*chunkSize), 0)

        p := util.ReadSource(bufio.NewReader(file), chunkSize)

        result = append(result, util.InMemSort(p))
    }

    return util.MergeN(result...)
}

该处实现文件切片并归并排序,也是最主要的外部排序实现方式。注意文件偏移量的概念和读方式。

将排序数据写入文件

排序完成后将数据写入结果文件中,便于后续直接处理有序的文件内容。

// 将排序好的数据写入文件中
func write(in <-chan int, fileName string) {
    file, err := os.Create(fileName)
    if err != nil {
        panic(err)
    }
    defer file.Close()

    w := bufio.NewWriter(file)
    defer w.Flush()

    util.WriteSink(w, in)
}

缓冲写数据需要注意,可能缓冲区未满,文件就写结束,需要强制刷新缓冲区,避免丢失未来得及写入的部分数据。

打印排序结果

文件过大时全部数据可能无法输出,内存不够,故只输出一部分数据即可。

// 输出文件内容
func echo(fileName string) {
    file, err := os.Open(fileName)
    if err != nil {
        panic(err)
    }
    defer file.Close()

    p := util.ReadSource(file, -1)
    n := 0
    for m := range p {
        fmt.Println(m)
        n++
        if n >= 100 {
            break
        }
    }
}

排序一个800M的整数数据,文件分4块,可以看到读取,内部排序都比较快,比较慢的部分是归并部分,归并部分需要每个数据进行比较,故速度会稍微慢一些。

GOROOT=/usr/local/go #gosetup
GOPATH=/Users/zhgxun/go #gosetup
/usr/local/go/bin/go build -i -o /private/var/folders/9g/jnc134qd1q109tbwlrqx724m0000gn/T/___go_build_merge_go /Users/zhgxun/go/src/notes/go/merge.go #gosetup
/private/var/folders/9g/jnc134qd1q109tbwlrqx724m0000gn/T/___go_build_merge_go #gosetup
Read done:  6.999545181s
Read done:  7.66175118s
Read done:  8.459462212s
Read done:  8.816146958s
Sort done:  19.166359357s
Sort done:  19.4937898s
Sort done:  19.999469296s
Sort done:  20.299377746s
Merge done:  44.900010813s
Merge done:  44.900058988s
Merge done:  44.90163168s
30554741346
217339476874
223812424683
......

完整实例