前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >专栏 >Javaer成为Goer的代码提效!Java杀器复刻——Go-Stream

Javaer成为Goer的代码提效!Java杀器复刻——Go-Stream

原创
作者头像
Karos
修改2024-12-20 01:24:55
修改2024-12-20 01:24:55
2701
举报

1. Stream流 浅析

Hello,大家好,我是程序员Karos,从本章开始,我会逐渐带大家复刻Stream到Go中,在里面,我会讲Stream流、异步流给引入,同时会讲一些Sync的运用以及异步同步控制的方式。

1.1 Java中Stream是怎么样的?

Java中的Stream是一种高效的、以声明方式处理数据序列的工具。Stream API允许我们以简单且直观的方式进行数据处理和转换。以下是一个示例代码,阐明了Stream的用法和基本操作:

代码语言:java
复制
package org.example;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class Main {
    public static void main(String[] args) {
        List<String> names = Arrays.asList("Alice", "Bob", "Charlie", "David");
        // 使用Stream过滤以字母"A"开头的名字并转换为大写
        List<String> filteredNames = names.stream()
                .filter(name -> name.startsWith("A"))
                .map(String::toUpperCase)
                .collect(Collectors.toList());
        System.out.println(filteredNames);
        // 输出: [ALICE]
    }
}

通过这个示例,可以看到如何创建Stream、应用过滤和映射操作,以及最终收集结果。这样的示例有助于提升读者的理解和实践感。

1.2 异步流的理论实现

同理,我们先来看看异步流(parallelStream)是如何用的

代码语言:java
复制
package org.example;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class Main {
    public static void main(String[] args) {
        List<String> names = Arrays.asList("Alice", "Bob", "Charlie", "David");
        // 使用Stream过滤以字母"A"开头的名字并转换为大写
        List<String> filteredNames = names.parallelStream()
                .filter(name -> name.startsWith("A"))
                .map(String::toUpperCase)
                .collect(Collectors.toList());
        System.out.println(filteredNames);
        // 输出: [ALICE]
    }
}

有部分面霸肯定是知道这个玩意儿的,之前面试米厂的时候被问到过,但是没了解,就按照名字和自己的思路胡扯了一通,没想到大差不差。

那么它是如何实现的呢?(这里不是我们讲解的重点,我直接用AI来进行解释 【诚恳的态度】)

parallelStream()是Java中Stream API提供的一种并行处理数据流的方式。本质上,它是Stream的一种扩展,以实现任务分解与多线程并行处理,提升数据处理的效率。其实现依赖于Fork/Join框架,充分利用多核CPU的性能优势。接下来,我们详细探讨parallelStream()的实现原理。

1. 任务分解和分治思想

在并行流中,每个数据处理任务会被拆分成多个子任务,然后这些子任务可以在不同的CPU核心上同时运行。这种方式运用了经典的"分而治之"思想:

  1. 数据源的划分parallelStream()会根据底层数据源进行划分。例如,对于一个ArrayList,它会将数据分成几个小块;对于TreeSetLinkedList,划分方式可能会更加复杂。
  2. 处理任务的分解:每个子数据块都会作为单独的任务分解出来,通过Fork/Join框架提交到线程池中。
  3. 结果合并:子任务的结果在完成之后会汇总,最终合成完整的输出。

2. Fork/Join框架的驱动

Java的Fork/Join框架是parallelStream实现并行计算的核心。Fork/Join提供了任务分裂(fork)和结果合并(join)的机制,具体步骤如下:

  1. 任务分裂
  2. 对于每个任务,首先判断其数据量是否足够小到可以直接处理。如果任务太大,就将其分裂成更小的任务。
  3. 比如,假设有一个大集合包含100万个元素,可以按CPU核心数量将其划分为4份,每份交由不同的线程处理。
  4. 任务执行
  5. 分裂出来的小任务被提交到ForkJoinPool(公共线程池)中执行。每个线程实现“工作窃取”机制,即:若某线程完成了自己任务队列中的所有任务,会尝试窃取其他线程的任务。
  6. 任务合并
  7. 全部任务完成后,各线程提交其子结果,最后通过递归的方式,逐步将所有子结果合并,得到最终结果。

3. 并行流与顺序流的区别

parallelStream()在性能上比顺序流具有显著优势,尤其是当数据量较大时。然而,并行流的实现引入了更多复杂性和潜在开销,以下是它们的关键区别:

  1. 线程模型
  2. 顺序流(stream)在单线程上运行,任务是串行执行的。
  3. 并行流(parallelStream)会利用线程池(默认是公共的ForkJoinPool)分配多个线程并行运行多个子任务。
  4. 开销
  5. 顺序流的执行开销是线性的,但通常较低。
  6. 并行流引入了线程调度、上下文切换以及输出合并的开销,因此只有数据集较大时,其性能收益才能超出这些开销。
  7. 数据顺序性
  8. 顺序流会严格按照数据源的顺序执行操作。
  9. 并行流不保证顺序,除非显式要求(如forEachOrdered方法)。
  10. 适用场景
  11. 顺序流更适合轻量级、对输出顺序敏感的场景。
  12. 并行流适用于数据量大、任务无状态或线程安全性较高的计算密集型任务。

1.3 异步流Fork/Join实现 - 分片工具尝试

上面写的什么fork、join咱们先别管,什么线程池也别管,Go的GMP就挺不错,要是怕协程过多,那么我们再引入协程池(其实大部分情况,咱们限制协程的数量就可以了)。

我们做异步流,肯定是要先分片,分别计算,最后再合并!(有点MapReduce那味儿了,哈哈哈)

在Java中,有一个分片工具叫做public static <T List<List<T>>Lists.patition(List<T> list,int size),准确来说是apche的某个库的,作用是将一个list分解为list.size()/size个分片,每个分片最多有size个元素。`

然后我们对每个分片进行处理不就行了嘛!没错就是这样!

分片处理

代码语言:go
复制
package lists

import (
	"errors"
	"sync"

	lynxSync "github.com/Tangerg/lynx/pkg/sync" // 这个包用于协程数量限制,底层原理是管道阻塞
)

type Batch[T any, RT ~[]T] struct {
	SplitData []RT
}

func Partition[T any](datas []T, size int) Batch[T, []T] {
	splitData := make([][]T, 0)
	for i := 0; i < len(datas); i += size {
		splitData = append(splitData, datas[i:min(i+size, len(datas))])
	}

	return Batch[T, []T]{
		SplitData: splitData,
	}
}

func (b Batch[T, RT]) ForEach(solve func(pos int, automicDatas []T) error, async bool, limiter *lynxSync.Limiter) error {
	errs := make([]error, 0)
	if async {
		countDownLatch := &sync.WaitGroup{}
		//countDownLatch.Add(len(b.SplitData))
		for i, data := range b.SplitData {
			limiter.Acquire()
			countDownLatch.Add(1)
			go func(limit *lynxSync.Limiter, datas []T, pos int) {
				defer countDownLatch.Done()
				err := solve(pos, datas)
				if err != nil {
					errs = append(errs, err)
				}
				defer limit.Release()
			}(limiter, data, i)
		}
		countDownLatch.Wait()
	} else {
		for i, data := range b.SplitData {
			err := solve(i, data)
			if err != nil {
				errs = append(errs, err)
			}
		}
	}
	err := errors.Join(errs...)
	return err
}
package collect

import (
	"github.com/karosown/katool/collect/lists"
	"github.com/karosown/katool/container/stream"
)

func PartitionToStream[T any, RT ~[]T](pattion lists.Batch[T, RT]) *stream.Stream[RT, []RT] {
	return stream.ToStream(&pattion.SplitData)
}

那么合并呢?

合并这个东西,我们得考虑方法有没有要求顺序一致性,如果没有的话,直接merge就行了,有的话要另行讨论。

分片并发排序合并问题

这里就是在做LeetCode了,emm,我就直接上个合并两个有序数组的吧,如果你觉得你能够优化,直接合并n个有序数组也是可以的。

在Stream流中,遇到排序的有两个方法:SortOrderBy ,我这里对各种情况分别写个算法啊

代码语言:go
复制
package algorithm

func MergeSortedArrayWithEntity[T any](orderBy func(a, b T) bool) func(any, any) any {
	return func(cntValue any, nxt any) any {
		ts := cntValue.([]any)
		nxts := nxt.([]any)
		if len(nxts) == 0 {
			return ts
		}
		lenLast := len(ts)
		lenNxt := len(nxts)
		rest := make([]any, 0)
		l := 0
		r := 0
		for l < lenLast && r < lenNxt {
			total := ts[l].(T)
			current := nxts[r].(T)
			if orderBy(total, current) {
				rest = append(rest, total)
				l++
			} else {
				rest = append(rest, current)
				r++
			}
		}
		if l < lenLast {
			rest = append(rest, ts[l:lenLast]...)
		}
		if r < lenNxt {
			rest = append(rest, nxts[r:lenNxt]...)
		}
		return rest
	}
}
func MergeSortedArrayWithPrimaryData[T any](desc bool, orderBy HashComputeFunction) func(any, any) any {
	return func(cntValue any, nxt any) any {
		ts := cntValue.([]any)
		nxts := nxt.([]any)
		if len(nxts) == 0 {
			return ts
		}
		lenRe := len(ts)
		lenNxt := len(nxts)
		ress := make([]any, 0)
		l := 0
		r := 0
		for l < lenRe && r < lenNxt {
			current := nxts[r].(T)
			total := ts[l].(T)
			if orderBy(total) > orderBy(current) {
				if desc {
					ress = append(ress, total)
					l++
				} else {
					ress = append(ress, current)
					r++
				}
			} else {
				if desc {
					ress = append(ress, current)
					r++
				} else {
					ress = append(ress, total)
					l++
				}
			}
		}

		if r < lenNxt {
			ress = append(ress, nxts[r:lenNxt]...)
		}
		if l < lenRe {
			ress = append(ress, ts[l:lenRe]...)
		}
		return ress
	}
}

func MergeSortedArrayWithPrimaryId[T any](desc bool, orderBy IDComputeFunction) func(any, any) any {
	return func(cntValue any, nxt any) any {
		ts := cntValue.([]any)
		nxts := nxt.([]any)
		if len(nxts) == 0 {
			return ts
		}
		lenRe := len(ts)
		lenNxt := len(nxts)
		ress := make([]any, 0)
		l := 0
		r := 0
		for l < lenRe && r < lenNxt {
			current := nxts[r].(T)
			total := ts[l].(T)
			if orderBy(total) > orderBy(current) {
				if desc {
					ress = append(ress, total)
					l++
				} else {
					ress = append(ress, current)
					r++
				}
			} else {
				if desc {
					ress = append(ress, current)
					r++
				} else {
					ress = append(ress, total)
					l++
				}
			}
		}

		if r < lenNxt {
			ress = append(ress, nxts[r:lenNxt]...)
		}
		if l < lenRe {
			ress = append(ress, ts[l:lenRe]...)
		}
		return ress
	}
}

部分定位操作如何解决?

假设我需要写个ToMap操作,里面两个函数式编程一个是 key func(pos int,item T) any ,另一个是value func(pos int,item T) any ,那么我们如何保证传入的pos是正确的呢?靠位移,只要我们保证分片的有序性,执行某个分片的时候,通过函数式编程传入分片序号*分片大小+数据序号 不就行了吗?

代码语言:go
复制
func (s *Stream[T, Slice]) ToMap(k func(index int, item T) any, v func(i int, item T) any) map[any]any {
	ress := sync.Map{}
	size := len(*s.options)
	goRun[Option[T]](*s.options, s.parallel, func(pos int, options []Option[T]) error {
		for i := 0; i < len(options); i++ {
			index := pos*optional.IsTrue(s.parallel,
				optional.IsTrue((getPageSize(size)) == 0, 1, getPageSize(size)), 1) + i
			ress.Store(k(index, (options)[i].opt), v(index, (options)[i].opt))
		}
		return nil
	})
	res := make(map[any]any)
	ress.Range(func(key, value any) bool {
		res[key] = value
		return true // 继续遍历
	})
	return res
}

// GroupBy 分工单一原则,保证GroupBy无法修改options
func (s *Stream[T, Slice]) GroupBy(groupBy func(item T) any) map[any]Slice {
	res := &sync.Map{}
	size := len(*s.options)
	goRun[Option[T]](*s.options, s.parallel, func(pos int, options []Option[T]) error {
		for i := 0; i < len(options); i++ {
			key := groupBy((options)[i].opt)
			if _, ok := res.Load(key); !ok {
				res.Store(key, make(Slice, 0))
			}
			value, ok := res.Load(key)
			if ok {
				index := pos*optional.IsTrue(s.parallel,
					optional.IsTrue((getPageSize(size)) == 0, 1, getPageSize(size)), 1) + i
				res.Store(key, append(value.(Slice), (*s.source)[index]))
			}

		}
		return nil
	})
	result := make(map[any]Slice)
	res.Range(func(key, value any) bool {
		result[key] = value.(Slice)
		return true // 继续遍历
	})
	return result
}

1.4 Stream架构设计

2. 代码编写

2.1 同步方式

同步实现起来比较简单,我这里就直接上异步的代码了

2.2 异步方式

`

代码语言:go
复制
package stream

//TIP 模仿Java的Stream流式处理
// 通常使用ToStream或者Of方法即可(Of是为了保留Java原有的API),特殊情况构建Any的Stream可以用NewStream方法
// 另外两个Option的Stream方法为内部实现(newOptionsStream 和 newOptionStream)
// 使用泛型的时候注意不要造成泛型循环

import (
	"fmt"
	"reflect"
	"sort"
	"sync"

	lynx "github.com/Tangerg/lynx/pkg/sync"
	"github.com/karosown/katool/algorithm"
	"github.com/karosown/katool/collect/lists"
	"github.com/karosown/katool/container/optional"
	"github.com/karosown/katool/convert"
)

func getPageSize(size int) int {
	return size >> 2
}

type Stream[T any, Slice ~[]T] struct {
	options  *Options[T]
	source   *Slice
	parallel bool
}

func NewStream(source *[]any) *Stream[any, []any] {
	resOptions := make(Options[any], 0)
	for i := 0; i < len(*source); i++ {
		resOptions = append(resOptions, Option[any]{opt: (*source)[i]})
	}
	return &Stream[any, []any]{
		options: &resOptions,
		source:  source,
	}
}

func ToStream[T any, Slice ~[]T](source *Slice) *Stream[T, Slice] {
	resOptions := make(Options[T], 0)
	for i := 0; i < len(*source); i++ {
		resOptions = append(resOptions, Option[T]{opt: (*source)[i]})
	}
	return &Stream[T, Slice]{
		options: &resOptions,
		source:  source,
	}
}

// Of creates a stream from the given slice.
func Of[T any, Slice ~[]T](source *Slice) *Stream[T, Slice] {
	return ToStream(source)
}

func newOptionsStream[Opt any, Opts Options[Opt]](source *[]Opts) *Stream[Options[any], []Options[any]] {
	resOptions := make([]Option[Options[any]], 0)
	sourceList := make([]Options[any], 0)
	optionsAdpter := func(opts Options[Opt]) Options[any] {
		res := make(Options[any], 0)
		for i := 0; i < len(opts); i++ {
			convertOption := &Option[any]{opt: any(opts[i].opt)}
			res = append(res, *convertOption)
		}
		return res
	}
	for i := 0; i < len(*source); i++ {
		t := Options[Opt]((*source)[i])
		resOptions = append(resOptions, Option[Options[any]]{opt: optionsAdpter(t)})
		sourceList = append(sourceList, optionsAdpter(t))
	}
	return &Stream[Options[any], []Options[any]]{
		options: (*Options[Options[any]])(&resOptions),
		source:  &sourceList,
	}
}

func newOptionStream[Opt any, T Options[Opt]](source *T) *Stream[Option[any], []Option[any]] {
	resOptions := make(Options[Option[any]], 0)
	resSource := make([]Option[any], 0)
	for i := 0; i < len(*source); i++ {
		resOptions = append(resOptions, Option[Option[any]]{
			Option[any]{opt: any((*source)[i].opt)},
		})
		resSource = append(resSource, Option[any]{opt: any((*source)[i].opt)})
	}
	return &Stream[Option[any], []Option[any]]{
		options: &resOptions,
		source:  &resSource,
	}
}
func ToParallelStream[T any, Slice ~[]T](source *Slice) *Stream[T, Slice] {
	resOptions := make(Options[T], 0)
	for i := 0; i < len(*source); i++ {
		resOptions = append(resOptions, Option[T]{opt: (*source)[i]})
	}
	return &Stream[T, Slice]{
		options:  &resOptions,
		source:   source,
		parallel: true,
	}
}
func (s *Stream[T, Slice]) Join(source *Slice) *Stream[T, []T] {
	list := s.ToList()
	list = append(list, *source...)
	return ToStream(&list)
}
func goRun[T any](datas []T, parallel bool, solve func(pos int, automicDatas []T) error) {
	size := len(datas)
	pageSize := optional.IsTrue((getPageSize(size)) == 0, 1, getPageSize(size))
	goNum := algorithm.NumOfTwoMultiply(size)
	err := lists.Partition(datas, optional.IsTrue(parallel, pageSize, 1)).ForEach(solve, parallel, lynx.NewLimiter(optional.IsTrue(parallel, goNum, 1)))
	if err != nil {
		fmt.Println(err)
	}
	return
}
func (s *Stream[T, Slice]) Map(fn func(i T) any) *Stream[any, []any] {
	size := len(*s.options)
	resChan := make(chan any, size)
	goRun[Option[T]](*s.options, s.parallel, func(pos int, options []Option[T]) error {
		for i := 0; i < len(options); i++ {
			runCall := fn(options[i].opt)
			resChan <- runCall
		}
		return nil
	})
	resSource := convert.ChanToArray(resChan)
	return ToStream(&resSource)
}

// FlatMap 扁平化处理,需要放入一个返回新的Stream流的函数
// 参考:https://blog.csdn.net/feinifi/article/details/128980814
func (s *Stream[T, Slice]) FlatMap(fn func(i T) *Stream[any, []any]) *Stream[any, []any] {
	size := len(*s.options)
	resChan := make(chan []any, size)
	goRun[Option[T]](*s.options, s.parallel, func(pos int, options []Option[T]) error {
		for i := 0; i < len(options); i++ {
			runCall := fn(options[i].opt)
			resChan <- runCall.ToList()
		}
		return nil
	})
	//if !s.parallel {
	resSource := convert.ChanToFlatArray(resChan)
	return ToStream(&resSource)
}

// Distinct 按照默认方法去重(默认是json化之后来进行字符串的比对)
func (s *Stream[T, Slice]) Distinct() *Stream[T, Slice] {
	return s.DistinctBy(algorithm.HASH_WITH_JSON)
}

// DistinctBy 按照指定方法去重
func (s *Stream[T, Slice]) DistinctBy(hash algorithm.HashComputeFunction) *Stream[T, Slice] {
	res := make(Slice, 0)
	size := len(*s.options)
	if size < 1e10+5 {
		options := s.Sort(func(a, b T) bool { return hash(a) < hash(b) }).ToOptionList()
		for i := 0; i < len(options); i++ {
			if i == 0 {
				res = append(res, (options)[i].opt)
			} else if hash((options)[i-1].opt) != hash((options)[i].opt) {
				res = append(res, (options)[i].opt)
			}
		}
	} else {
		//  if large data, use map
		m := make(map[algorithm.HashType]bool)
		for i := 0; i < size; i++ {
			if _, ok := m[hash((*s.options)[i].opt)]; !ok {
				m[hash((*s.options)[i].opt)] = true
				res = append(res, (*s.options)[i].opt)
			}
		}
	}
	return ToStream(&res)
	//}
	//return nil
}

// Reduce 求和计算
func (s *Stream[T, Slice]) Reduce(begin any, atomicSolveFunction func(cntValue any, nxt T) any, parallelResultSolve func(sum1, sum2 any) any) any {
	if atomicSolveFunction == nil {
		panic("atomicSolveFunction must not nil")
	}
	if s.parallel && parallelResultSolve == nil {
		panic("parallelResultSolve must not be nil where parallelResult")
	}
	//size := len(*s.options)
	beginType := reflect.TypeOf(begin)
	lock := &sync.Mutex{}
	goRun[Option[T]](*s.options, s.parallel, func(pos int, options []Option[T]) error {
		if s.parallel {
			currentBegin := reflect.New(beginType).Elem().Interface()
			for i := 0; i < len(options); i++ {
				currentBegin = atomicSolveFunction(currentBegin, options[i].opt)
			}
			lock.Lock()
			defer lock.Unlock()

			if parallelResultSolve != nil {
				begin = parallelResultSolve(begin, currentBegin)
			}
		} else {
			for i := 0; i < len(options); i++ {
				begin = atomicSolveFunction(begin, options[i].opt)
			}
		}
		return nil
	})
	return begin
}

func (s *Stream[T, Slice]) Filter(fn func(i T) bool) *Stream[T, Slice] {
	res := make(Slice, 0)
	size := len(*s.options)
	resChan := make(chan T, size)
	goRun[Option[T]](*s.options, s.parallel, func(pos int, options []Option[T]) error {
		for i := 0; i < len(options); i++ {
			if fn((options)[i].opt) {
				resChan <- (options)[i].opt
			}
		}
		return nil
	})
	chanSize := len(resChan)
	for i := 0; i < chanSize; i++ {
		res = append(res, <-resChan)
	}
	return ToStream(&res)
}
func (s *Stream[T, Slice]) ToOptionList() Options[T] {
	return *s.options
}
func (s *Stream[T, Slice]) ToList() []T {
	res := make([]T, 0)
	size := len(*s.options)
	resChan := make(chan T, size)
	goRun[Option[T]](*s.options, s.parallel, func(pos int, options []Option[T]) error {
		for i := 0; i < len(options); i++ {
			resChan <- (options)[i].opt
		}
		return nil
	})
	for i := 0; i < size; i++ {
		res = append(res, <-resChan)
	}
	return res
}
func (s *Stream[T, Slice]) ToMap(k func(index int, item T) any, v func(i int, item T) any) map[any]any {
	ress := sync.Map{}
	size := len(*s.options)
	goRun[Option[T]](*s.options, s.parallel, func(pos int, options []Option[T]) error {
		for i := 0; i < len(options); i++ {
			index := pos*optional.IsTrue(s.parallel,
				optional.IsTrue((getPageSize(size)) == 0, 1, getPageSize(size)), 1) + i
			ress.Store(k(index, (options)[i].opt), v(index, (options)[i].opt))
		}
		return nil
	})
	res := make(map[any]any)
	ress.Range(func(key, value any) bool {
		res[key] = value
		return true // 继续遍历
	})
	return res
}

// GroupBy 分工单一原则,保证GroupBy无法修改options
func (s *Stream[T, Slice]) GroupBy(groupBy func(item T) any) map[any]Slice {
	res := &sync.Map{}
	size := len(*s.options)
	goRun[Option[T]](*s.options, s.parallel, func(pos int, options []Option[T]) error {
		for i := 0; i < len(options); i++ {
			key := groupBy((options)[i].opt)
			if _, ok := res.Load(key); !ok {
				res.Store(key, make(Slice, 0))
			}
			value, ok := res.Load(key)
			if ok {
				index := pos*optional.IsTrue(s.parallel,
					optional.IsTrue((getPageSize(size)) == 0, 1, getPageSize(size)), 1) + i
				res.Store(key, append(value.(Slice), (*s.source)[index]))
			}

		}
		return nil
	})
	result := make(map[any]Slice)
	res.Range(func(key, value any) bool {
		result[key] = value.(Slice)
		return true // 继续遍历
	})
	return result
}

func (s *Stream[T, Slice]) OrderBy(desc bool, orderBy algorithm.HashComputeFunction) *Stream[T, Slice] {
	if !s.parallel {
		sort.SliceStable(*s.options, func(i, j int) bool {
			a := orderBy((*s.options)[i].opt)
			b := orderBy((*s.options)[j].opt)
			if desc {
				return a > b
			} else {
				return a < b
			}
		})
		return s
	}

	size := len(*s.options)
	data := make([]Options[T], 0, optional.IsTrue((getPageSize(size)) == 0, 1, getPageSize(size)))
	// opt opt opt opt -> opts opts
	goRun[Option[T]](*s.options, s.parallel, func(pos int, options []Option[T]) error {
		//println(pos, options)
		data = append(data, options)
		return nil
	})

	optionsStream := newOptionsStream[T, Options[T]](&data)

	optionsStream.parallel = s.parallel
	sortedMap := optionsStream.Map(func(options Options[any]) any {
		sort.SliceStable(options, func(i, j int) bool {
			a := orderBy(options[i].opt)
			b := orderBy(options[j].opt)
			if desc {
				return a > b
			} else {
				return a < b
			}
		})
		return options
	})
	sortedMap.parallel = s.parallel
	res := sortedMap.Map(func(v any) any {
		i := v.(Options[any])
		ress := newOptionStream(&i).Map(func(item Option[any]) any {
			return item.opt
		}).ToList()
		return ress
	}).ToList()
	re := make([]any, 0)
	toStream := ToStream(&res)
	toStream.parallel = s.parallel
	mergeSorted := toStream.Reduce(re, algorithm.MergeSortedArrayWithPrimaryData[T](desc, orderBy), algorithm.MergeSortedArrayWithPrimaryData[T](desc, orderBy)).([]any)
	result := make(Slice, 0)
	for i := 0; i < len(mergeSorted); i++ {
		result = append(result, mergeSorted[i].(T))
	}
	//stream := ToStream(&result)
	stream := ToStream(&result)
	return stream
}

func (s *Stream[T, Slice]) OrderById(desc bool, orderBy algorithm.IDComputeFunction) *Stream[T, Slice] {
	if !s.parallel {
		sort.SliceStable(*s.options, func(i, j int) bool {
			a := orderBy((*s.options)[i].opt)
			b := orderBy((*s.options)[j].opt)
			if desc {
				return a > b
			} else {
				return a < b
			}
		})
		return s
	}

	size := len(*s.options)
	data := make([]Options[T], 0, optional.IsTrue((getPageSize(size)) == 0, 1, getPageSize(size)))
	// opt opt opt opt -> opts opts
	goRun[Option[T]](*s.options, s.parallel, func(pos int, options []Option[T]) error {
		//println(pos, options)
		data = append(data, options)
		return nil
	})
	optionsStream := newOptionsStream[T, Options[T]](&data)
	optionsStream.parallel = s.parallel
	sortedMap := optionsStream.Map(func(options Options[any]) any {
		sort.SliceStable(options, func(i, j int) bool {
			a := orderBy(options[i].opt)
			b := orderBy(options[j].opt)
			if desc {
				return a > b
			} else {
				return a < b
			}
		})
		return options
	})
	sortedMap.parallel = s.parallel
	res := sortedMap.Map(func(v any) any {
		i := v.(Options[any])
		ress := newOptionStream(&i).Map(func(item Option[any]) any {
			return item.opt
		}).ToList()
		return ress
	}).ToList()
	re := make([]any, 0)
	toStream := ToStream(&res)
	toStream.parallel = s.parallel
	mergeSorted := toStream.Reduce(re, algorithm.MergeSortedArrayWithPrimaryId[T](desc, orderBy), algorithm.MergeSortedArrayWithPrimaryId[T](desc, orderBy)).([]any)
	result := make(Slice, 0)
	for i := 0; i < len(mergeSorted); i++ {
		result = append(result, mergeSorted[i].(T))
	}
	//stream := ToStream(&result)
	stream := ToStream(&result)
	return stream
}

func (s *Stream[T, Slice]) Sort(orderBy func(a, b T) bool) *Stream[T, Slice] {

	if !s.parallel {
		sort.SliceStable(*s.options, func(i, j int) bool {
			return orderBy((*s.options)[i].opt, (*s.options)[j].opt)
		})
		return s
	}

	size := len(*s.options)
	data := make([]Options[T], 0, optional.IsTrue((getPageSize(size)) == 0, 1, getPageSize(size)))
	// opt opt opt opt -> opts opts
	goRun[Option[T]](*s.options, s.parallel, func(pos int, options []Option[T]) error {
		//println(pos, options)
		data = append(data, options)
		return nil
	})
	optionsStream := newOptionsStream[T, Options[T]](&data)
	optionsStream.parallel = s.parallel
	sortedMap := optionsStream.Map(func(options Options[any]) any {
		sort.SliceStable(options, func(i, j int) bool {
			return orderBy(options[i].opt.(T), options[j].opt.(T))
		})
		return options
	})
	sortedMap.parallel = s.parallel
	res := sortedMap.Map(func(v any) any {
		i := v.(Options[any])
		ress := newOptionStream(&i).Map(func(item Option[any]) any {
			return item.opt
		}).ToList()
		return ress
	}).ToList()
	re := make([]any, 0)
	toStream := ToStream(&res)
	toStream.parallel = s.parallel
	mergeSorted := toStream.Reduce(re, algorithm.MergeSortedArrayWithEntity[T](orderBy), algorithm.MergeSortedArrayWithEntity[T](orderBy)).([]any)
	result := make(Slice, 0)
	for i := 0; i < len(mergeSorted); i++ {
		result = append(result, mergeSorted[i].(T))
	}
	//stream := ToStream(&result)
	stream := ToStream(&result)
	return stream
}
func (s *Stream[T, Slice]) Collect(call func(data Options[T], sourceData Slice) any) any {
	res := call(*s.options, *s.source)
	return res
}

func (s *Stream[T, Slice]) ForEach(fn func(item T)) *Stream[T, Slice] {
	//size := len(*s.options)
	goRun[Option[T]](*s.options, s.parallel, func(pos int, options []Option[T]) error {
		for i := 0; i < len(options); i++ {
			fn((options)[i].opt)
		}
		return nil
	})
	return s
}

func (s *Stream[T, Slice]) Count() int64 {
	return int64(len(*s.options))
}

func (s *Stream[T, Slice]) Parallel() *Stream[T, Slice] {
	s.parallel = true
	return s
}

func (s *Stream[T, Slice]) UnParallel() *Stream[T, Slice] {
	s.parallel = false
	return s
}
package stream

import (
	"github.com/karosown/katool/convert"
)

type Entry[K comparable, V any] struct {
	Key   K
	Value V
}
type Entries[K comparable, V any] []Entry[K, V]

func EntrySet[K comparable, V any](m map[K]V) Entries[K, V] {
	var entries []Entry[K, V]
	for k, v := range m {
		entries = append(entries, Entry[K, V]{Key: k, Value: v})
	}
	return entries
}
func (e Entries[K, V]) Identity() *[]Entry[K, V] {
	convert := make([]Entry[K, V], len(e))
	for i := 0; i < len(e); i++ {
		convert[i] = Entry[K, V]{Key: e[i].Key, Value: e[i].Value}
	}
	return &convert
}
func (e Entries[K, V]) KeySet() []K {
	keyset := e.ToStream().Map(func(i Entry[K, V]) any {
		return i.Key
	}).ToList()
	return convert.FromAnySlice[K](keyset)
}
func (e Entries[K, V]) Values() []V {
	keyset := e.ToStream().Map(func(i Entry[K, V]) any {
		return i.Value
	}).ToList()
	return convert.FromAnySlice[V](keyset)
}
func (e Entries[K, V]) KeySetStream() *Stream[K, []K] {
	ks := e.KeySet()
	return ToStream(&ks)
}
func (e Entries[K, V]) ValuesStream() *Stream[V, []V] {
	ks := e.Values()
	return ToStream(&ks)
}
func (e Entries[K, V]) ToStream() *Stream[Entry[K, V], []Entry[K, V]] {
	return ToStream[Entry[K, V], []Entry[K, V]](e.Identity())
}

func (e Entries[K, V]) ToParallelStream() *Stream[Entry[K, V], []Entry[K, V]] {
	return ToParallelStream[Entry[K, V], []Entry[K, V]](e.Identity())
}

3. 测试用例

代码语言:go
复制
package container_test

import (
	"fmt"
	"math/rand"
	"strconv"
	"testing"
	"time"

	"github.com/duke-git/lancet/v2/maputil"
	"github.com/duke-git/lancet/v2/random"
	"github.com/karosown/katool/algorithm"
	"github.com/karosown/katool/container/stream"
	"github.com/karosown/katool/convert"
)

type user struct {
	Name  string `json:"name"`
	Age   int    `json:"age"`
	Sex   int    `json:"sex"`
	Money int    `json:"money"`
	Class string `json:"class"`
	Id    int    `json:"id"`
}
type userVo struct {
	Name string `json:"name"`
	Age  int    `json:"age"`
	Sex  int    `json:"sex"`
	Id   int    `json:"id"`
}

var userList []user

func TestOfStream(t *testing.T) {

	arr := []int{1, 3, 2, 3, 3, 3, 3, 1, 3, 2, 3, 3, 3, 3, 1, 3, 2, 3, 3, 3, 3, 1, 3, 2, 3, 3, 3, 3, 1, 3, 2, 3, 3, 3, 3, 1, 3, 2, 3, 3, 3, 3}

	distinct := stream.ToStream(&arr).
		Parallel().
		Filter(func(i int) bool {
			return i > 1
		}).Map(func(item int) any {
		return strconv.Itoa(item) + "w "
	}).DistinctBy(algorithm.HASH_WITH_JSON_SUM)

	fmt.Println(distinct.Reduce("", func(cntValue any, nxt any) any {
		return cntValue.(string) + nxt.(string)
	}, func(sum1, sum2 any) any {
		return sum1.(string) + sum2.(string)
	}))
	list := distinct.ToOptionList()
	list.ForEach(func(s any) {
		fmt.Println(s)
	})

	toMap := stream.ToStream(&arr).
		//Parallel().
		ToMap(func(index int, item int) any {
			return index
		}, func(index int, item int) any {
			return item
		})

	maputil.ForEach(toMap, func(key any, value any) {
		fmt.Printf("key: %v, value: %v\n", key, value)
	})
}

func Test_Map(t *testing.T) {
	ul := userList[:]
	// 计数
	userStream := stream.ToStream(&ul).Parallel()
	println(userStream.Count())
	// 排序
	stream.ToStream(&ul).
		Parallel().
		Sort(func(a user, b user) bool { return a.Id < b.Id }).ForEach(func(item user) { println(convert.ToString(item.Id) + " " + item.Name) })
	// 求和
	totalMoney := userStream.Reduce(int64(0), func(cntValue any, nxt user) any { return cntValue.(int64) + int64(nxt.Money) }, func(sum1, sum2 any) any {
		return sum1.(int64) + sum2.(int64)
	})
	println(totalMoney.(int64))
	// 过滤
	userStream.Filter(func(item user) bool { return item.Sex != 0 }).DistinctBy(algorithm.HASH_WITH_JSON_MD5).ToOptionList().ForEach(func(item user) { println(item.Name) })
	// 转换
	s := userStream.Map(func(item user) any {
		properties, err := convert.CopyProperties(&item, &userVo{})
		if err != nil {
			panic(err)
		}
		return properties
	}).ToOptionList()
	s.ForEach(func(s any) {
		fmt.Println(s)
	})
}

func Test_GroupBy(t *testing.T) {
	users := userList[:]
	by := stream.ToStream(&users).GroupBy(func(user user) any {
		return user.Class
	})
	println(by)
}

func init() {
	classes := []string{"一班", "二班", "三班", "四班", "五班"}
	userList = make([]user, 0)
	for i := 0; i < 100; i++ {
		userList = append(userList, user{
			Name:  random.RandString(10),
			Class: classes[rand.Int()%len(classes)],
			Age:   rand.Intn(100),
			Sex:   rand.Intn(2),
		})
		time.Sleep(1)
	}
}
func Test(t *testing.T) {
	by := stream.ToStream(&userList).Parallel().GroupBy(func(user user) any { return user.Class })
	maputil.ForEach(by, func(key any, value []user) {
		println(key.(string))
		toStream := stream.ToStream(&value).Parallel()
		toStream.ForEach(func(item user) {
			fmt.Println(item)
		})
		reduce := toStream.Reduce(0, func(cntValue any, nxt user) any {
			return cntValue.(int) + nxt.Sex
		}, func(cntValue any, nxt any) any {
			return cntValue.(int) + nxt.(int)
		})
		println("男生总数:" + convert.ToString(reduce))
		reduce = toStream.Reduce(0, func(cntValue any, nxt user) any {
			return cntValue.(int) + (nxt.Sex ^ 1)
		}, func(cntValue any, nxt any) any {
			return cntValue.(int) + nxt.(int)
		})
		println("女生总数:" + convert.ToString(reduce))
	})
	toStream := stream.ToStream(&userList).Parallel()
	count := toStream.Filter(func(item user) bool { return item.Age >= 60 }).Count()
	println("年龄大于等于60岁的共" + convert.ToString(count) + "人")
	count = toStream.Filter(func(item user) bool { return item.Age < 60 }).Count()
	println("年龄小于60岁的共" + convert.ToString(count) + "人")
	toStream.FlatMap(func(i user) *stream.Stream[any, []any] {
		array := convert.ToAnySlice([]rune(i.Name))
		return stream.ToStream(&array)
	}).ForEach(func(item any) {
		print(string(item.(rune)) + " ")
	})
}
func Test_Sorted(t *testing.T) {
	arr := []int{1, 2, 3, 4, 5, 6, 7, 1, 2, 8, 9, 10}
	for i := 0; i < 100; i++ {
		arr = append(arr, 1, 2, 3, 4, 5, 6, 7, 1, 2, 8, 9, 10)
	}
	stream.ToStream(&arr).Parallel().Sort(func(a, b int) bool { return a < b }).ForEach(func(item int) {
		fmt.Println(item)
	})
}
func Test_Distinct(t *testing.T) {
	arr := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
	for i := 0; i < 10000; i++ {
		arr = append(arr, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
	}
	stream.ToStream(&arr).Parallel().Distinct().ForEach(func(item int) {
		fmt.Println(item)
	})
}

这里给出最简单的,如果要使用建议去github fork一份: https://github.com/Karosown/katool-go

总结

本文不仅从理论上分析了Java中的Stream及Go的并发流实现,还具体讲述了分片、异步操作及多线程的思想和实践。Stream的核心是将数据操作流式化,本质上是通过灵活的API按照声明式编程的思维进行处理。在Java中,凭借Fork/Join框架,Stream能够高效分片并行处理数据;而在Go中,协程与分片操作结合形成了简单高效的异步流解决方案。

通过对分片工具进行运用与封装,以及对排序、任务合并等算法的实践,我们阐述了一种从Java Stream到Go流处理的迁移与优化思路。与此同时,Stream在实际场景中的高级功能如分片并发排序、分组、去重等,也全面展示了Go语言在流式处理方面的可行性和高效性。

展望未来,结合对性能进一步优化及多场景扩展,Go流处理能够迎来更加广泛的应用场景。从这一讨论中,我们可以更好地理解如何以编程语言与操作模式的特性为导向,选择最优的数据流处理方案。

下一章:Go的几个服务框架对比

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. Stream流 浅析
    • 1.1 Java中Stream是怎么样的?
    • 1.2 异步流的理论实现
      • 1. 任务分解和分治思想
      • 2. Fork/Join框架的驱动
      • 3. 并行流与顺序流的区别
    • 1.3 异步流Fork/Join实现 - 分片工具尝试
      • 分片处理
      • 分片并发排序合并问题
      • 部分定位操作如何解决?
    • 1.4 Stream架构设计
  • 2. 代码编写
    • 2.1 同步方式
    • 2.2 异步方式
  • 3. 测试用例
  • 总结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档