归并排序是典型的分治算法,它不断地将某个数组分为两个部分,分别对左子数组与右子数组进行排序,然后将两个数组合并为新的有序数组。
一般来说,小文件单机版排序比并发排序更快一些,但是大文件无法一次性读入内存处理,需要分块处理。单机版排序无法做到分布式排序,分布式排序需要联网进行,网络传输跟通道传输非常类似,稍作修改就可以做到在网络上进行排序。
并行和并发是有区别的,依赖操作系统和硬件是否支持。同时,在允许的情况下,使用有缓冲的通道和使用文件读写缓冲,有利于加快程序的处理速度,避免及时阻塞。
数组排序
对一个整数数组进行排序,系统库中已经提供了排序函数,直接使用即可。需要注意的是,Go中使用通道,使用不当会造成死锁。需要注意通道的使用,否则一运行就发生竞争条件造成程序无法执行而终止。
// 按升序排序
func InMemSort(in <-chan int) <-chan int {
out := make(chan int, 4096)
// 启动一个协程输出排序数据
go func() {
var a []int
for v := range in {
a = append(a, v)
}
fmt.Println("Read done: ", time.Now().Sub(startTime))
sort.Ints(a)
fmt.Println("Sort done: ", time.Now().Sub(startTime))
// 将排序好的数据输出至通道中
for _, n := range a {
out <- n
}
close(out)
}()
return out
}
通常来说,协程的使用是成对出现的,并且习惯用Go匿名函数来操作。需要留意匿名函数的传参问题,可以去官方文档进行查看。go程运行时,函数调用是瞬间返回的。如果在其它地方使用range操作来处理通道,在通道数据发送完毕时必须显示关闭通道,否则必是死锁错误。
合并两个有序数组
归并排序是分别进行的,每一块排序完成后,需要等待另外的块排序完毕后,才可继续进行。通道能很方便的做到这个操作,每一个读通道都是阻塞的,在未读取到数据之前,始终等待。
// 按升序合并两个通道中数据
func Merge(in1, in2 <-chan int) <-chan int {
out := make(chan int, 4096)
go func() {
v1, ok1 := <-in1
v2, ok2 := <-in2
for ok1 || ok2 {
if !ok2 || (ok1 && v1 <= v2) {
out <- v1
v1, ok1 = <-in1
} else {
out <- v2
v2, ok2 = <-in2
}
}
close(out)
fmt.Println("Merge done: ", time.Now().Sub(startTime))
}()
return out
}
合并两个有序数组时,也要遵循一定的顺序合并,最终的数据才是有顺序的。这块是程序执行最难的一处,无法并行排序,只能一次合并有序数组。
递归合并数组
可变通道的递归合并,一般也是按中点切分,区间内进行递归合并。
// 递归进行两两归并
func MergeN(ins ...<-chan int) <-chan int {
// 只有一个通道时直接返回
if len(ins) == 1 {
return ins[0]
}
// 取中点
m := len(ins) / 2
// 递归归并
return Merge(MergeN(ins[:m]...), MergeN(ins[m:]...))
}
如果只有一个通道时不需要递归,直接退出即可。需要注意的是,递归合并始终传递可变通道类型。
从源读取数据
Go习惯用法是接口类型,即可读数据源均可,包括文件,网络,甚至各种输入类型,而不是简单的FileSource形式,而是更广泛意义上的可读源。
// 从可读位置读入数据
// 读入指定长度的数据, 为-1时一次性全部读完
func ReadSource(reader io.Reader, chunkSize int) <-chan int {
out := make(chan int, 4096)
go func() {
buffer := make([]byte, 8)
size := 0
for {
n, err := reader.Read(buffer)
size += n
if n > 0 {
v := int(binary.BigEndian.Uint64(buffer))
out <- v
}
if err != nil || (chunkSize != -1 && size >= chunkSize) {
break
}
}
close(out)
}()
return out
}
以二进制的方式读数据,同时使用有缓冲的读操作,加快文件读取速度。如果尺寸为-1时完全一次性读取文件,否则读取指定大小的文件。文件读取有一个偏移量,默认从偏移量处开始往后读取到指定的尺寸。
写数据到文件
写文件基本都类似,也是按二进制方式写入数据到文件。
// 向可写输入写入数据
func WriteSink(writer io.Writer, in <-chan int) {
for m := range in {
buffer := make([]byte, 8)
binary.BigEndian.PutUint64(buffer, uint64(m))
writer.Write(buffer)
}
}
该处写文件不需要另起协程,当然也可以起协程,效果不明显。
生成样本数据
生成指定大小的随机整数数据来测试用。
// 生成指定数量的随机整数
func RandomSource(count int) <-chan int {
out := make(chan int)
go func() {
for i := 0; i < count; i++ {
out <- rand.Int()
}
close(out)
}()
return out
}
生成简单整数即可。
分片读取文件内容
分片的实现,其实就是指定一次性读取多大文件内容而已。实现方式很多,一般就是读取等大小的块,该处需要注意的是,文件的完整性,如果读取的内容是不完整,或者被错误分隔的,将造成数据的不完整或错误。
// 可读入资源切片
func create(fileName string, fileSize, count int) <-chan int {
chunkSize := fileSize / count
var result []<-chan int
for i := 0; i < count; i++ {
file, err := os.Open(fileName)
if err != nil {
panic(err)
}
// 设置文件偏移量, 并从偏移量开始0处读取文件内容
file.Seek(int64(i*chunkSize), 0)
p := util.ReadSource(bufio.NewReader(file), chunkSize)
result = append(result, util.InMemSort(p))
}
return util.MergeN(result...)
}
该处实现文件切片并归并排序,也是最主要的外部排序实现方式。注意文件偏移量的概念和读方式。
将排序数据写入文件
排序完成后将数据写入结果文件中,便于后续直接处理有序的文件内容。
// 将排序好的数据写入文件中
func write(in <-chan int, fileName string) {
file, err := os.Create(fileName)
if err != nil {
panic(err)
}
defer file.Close()
w := bufio.NewWriter(file)
defer w.Flush()
util.WriteSink(w, in)
}
缓冲写数据需要注意,可能缓冲区未满,文件就写结束,需要强制刷新缓冲区,避免丢失未来得及写入的部分数据。
打印排序结果
文件过大时全部数据可能无法输出,内存不够,故只输出一部分数据即可。
// 输出文件内容
func echo(fileName string) {
file, err := os.Open(fileName)
if err != nil {
panic(err)
}
defer file.Close()
p := util.ReadSource(file, -1)
n := 0
for m := range p {
fmt.Println(m)
n++
if n >= 100 {
break
}
}
}
排序一个800M的整数数据,文件分4块,可以看到读取,内部排序都比较快,比较慢的部分是归并部分,归并部分需要每个数据进行比较,故速度会稍微慢一些。
GOROOT=/usr/local/go #gosetup
GOPATH=/Users/zhgxun/go #gosetup
/usr/local/go/bin/go build -i -o /private/var/folders/9g/jnc134qd1q109tbwlrqx724m0000gn/T/___go_build_merge_go /Users/zhgxun/go/src/notes/go/merge.go #gosetup
/private/var/folders/9g/jnc134qd1q109tbwlrqx724m0000gn/T/___go_build_merge_go #gosetup
Read done: 6.999545181s
Read done: 7.66175118s
Read done: 8.459462212s
Read done: 8.816146958s
Sort done: 19.166359357s
Sort done: 19.4937898s
Sort done: 19.999469296s
Sort done: 20.299377746s
Merge done: 44.900010813s
Merge done: 44.900058988s
Merge done: 44.90163168s
30554741346
217339476874
223812424683
......