Hello everyone! I'm Karos, a programmer. Starting from this chapter, I'll walk you through replicating Java's Stream in Go, step by step. Along the way I'll introduce Stream and parallel (asynchronous) streams, touch on some uses of sync, and cover ways to coordinate asynchronous work.
Java's Stream is an efficient, declarative tool for processing sequences of data. The Stream API lets us process and transform data in a simple, intuitive way. Here is an example illustrating basic Stream usage:
package org.example;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class Main {
    public static void main(String[] args) {
        List<String> names = Arrays.asList("Alice", "Bob", "Charlie", "David");
        // Filter the names that start with "A" and convert them to upper case
        List<String> filteredNames = names.stream()
                .filter(name -> name.startsWith("A"))
                .map(String::toUpperCase)
                .collect(Collectors.toList());
        System.out.println(filteredNames);
        // Output: [ALICE]
    }
}
This example shows how to create a Stream, apply filter and map operations, and finally collect the result. It should give a good feel for both the concept and the practice.
In the same spirit, let's look at how the parallel stream (parallelStream) is used:
package org.example;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class Main {
    public static void main(String[] args) {
        List<String> names = Arrays.asList("Alice", "Bob", "Charlie", "David");
        // Filter the names that start with "A" in parallel and convert them to upper case
        List<String> filteredNames = names.parallelStream()
                .filter(name -> name.startsWith("A"))
                .map(String::toUpperCase)
                .collect(Collectors.toList());
        System.out.println(filteredNames);
        // Output: [ALICE]
    }
}
Some seasoned interviewees surely know this one. I was asked about it when interviewing at Xiaomi; I hadn't actually studied it, so I improvised from the name and my own reasoning, and it turned out I wasn't far off.
So how is it implemented? (This isn't the focus of this series, so I'll simply let AI explain it, in all honesty.)
parallelStream() is the Stream API's way of processing a data stream in parallel. Essentially it is an extension of Stream that decomposes the work and processes the pieces on multiple threads, improving the efficiency of data processing. The implementation relies on the Fork/Join framework, which takes full advantage of multi-core CPUs. Let's look at how parallelStream() works in a bit more detail.
In a parallel stream, each processing task is split into several subtasks, which can then run on different CPU cores at the same time. This is the classic "divide and conquer" idea. parallelStream() splits the work according to the underlying data source: an ArrayList is cut into a few contiguous chunks, while a TreeSet or LinkedList may need a more involved splitting strategy. Java's Fork/Join framework is the core of how parallelStream achieves parallel computation: it provides task splitting (fork) and result merging (join). Subtasks are executed in the common ForkJoinPool, where each worker thread uses "work stealing": a thread that has drained its own task queue will try to steal tasks from other threads.
parallelStream() can significantly outperform a sequential stream, especially on large data sets, but the parallelism brings extra complexity and potential overhead. The key differences: a sequential stream (stream) runs on a single thread and executes its tasks serially; a parallel stream (parallelStream) uses a thread pool (the common ForkJoinPool by default) to run multiple subtasks on multiple threads; and a parallel stream does not guarantee element ordering by default unless you explicitly ask for it (e.g. with forEachOrdered).
Never mind the fork/join and thread-pool machinery for now; Go's GMP scheduler already serves us well, and if we're worried about spawning too many goroutines we can bring in a goroutine pool (in most cases, simply capping the number of goroutines is enough).
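Before porting any of this, here is the one Go primitive we'll lean on for "capping the number of goroutines": a buffered channel used as a semaphore, where sends block once the buffer is full. This is a minimal sketch of the idea (the limiter type and names are mine for illustration, not from any library); the lynx limiter used later is built on the same channel-blocking principle.

package main

import (
	"fmt"
	"sync"
)

// limiter is a semaphore built on a buffered channel:
// Acquire blocks once cap(l) goroutines are in flight.
type limiter chan struct{}

func (l limiter) Acquire() { l <- struct{}{} }
func (l limiter) Release() { <-l }

func main() {
	lim := make(limiter, 4) // at most 4 goroutines run at a time
	var wg sync.WaitGroup
	for i := 0; i < 16; i++ {
		lim.Acquire()
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			defer lim.Release()
			fmt.Println("processing chunk", n)
		}(i)
	}
	wg.Wait()
}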
For our parallel stream, the plan is obvious: split the data into chunks, compute each chunk separately, and merge the results at the end! (It has a bit of a MapReduce flavor, haha.)
In Java there is a partitioning helper for this: public static <T> List<List<T>> partition(List<T> list, int size). Strictly speaking, Lists.partition comes from Google Guava (Apache Commons Collections offers an equivalent ListUtils.partition). It splits a list into ⌈list.size()/size⌉ chunks, each holding at most size elements.
Then we just process each chunk. That's exactly it!
package lists
import (
"errors"
"sync"
lynxSync "github.com/Tangerg/lynx/pkg/sync" // 这个包用于协程数量限制,底层原理是管道阻塞
)
type Batch[T any, RT ~[]T] struct {
SplitData []RT
}
// Partition splits datas into chunks of at most size elements each (the Go counterpart of Lists.partition)
func Partition[T any](datas []T, size int) Batch[T, []T] {
splitData := make([][]T, 0)
for i := 0; i < len(datas); i += size {
splitData = append(splitData, datas[i:min(i+size, len(datas))])
}
return Batch[T, []T]{
SplitData: splitData,
}
}
func (b Batch[T, RT]) ForEach(solve func(pos int, atomicDatas []T) error, async bool, limiter *lynxSync.Limiter) error {
errs := make([]error, 0)
if async {
countDownLatch := &sync.WaitGroup{}
errLock := &sync.Mutex{} // errs is shared by all goroutines; the append must be synchronized
for i, data := range b.SplitData {
limiter.Acquire()
countDownLatch.Add(1)
go func(limit *lynxSync.Limiter, datas []T, pos int) {
defer countDownLatch.Done()
defer limit.Release()
if err := solve(pos, datas); err != nil {
errLock.Lock()
errs = append(errs, err)
errLock.Unlock()
}
}(limiter, data, i)
}
countDownLatch.Wait()
} else {
for i, data := range b.SplitData {
err := solve(i, data)
if err != nil {
errs = append(errs, err)
}
}
}
err := errors.Join(errs...)
return err
}
package collect
import (
"github.com/karosown/katool/collect/lists"
"github.com/karosown/katool/container/stream"
)
func PartitionToStream[T any, RT ~[]T](partition lists.Batch[T, RT]) *stream.Stream[RT, []RT] {
return stream.ToStream(&partition.SplitData)
}
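Putting the two pieces together, usage would look roughly like this (a quick sketch against the katool and lynx packages shown above; the data is made up):

package main

import (
	"fmt"

	lynx "github.com/Tangerg/lynx/pkg/sync"
	"github.com/karosown/katool/collect/lists"
)

func main() {
	nums := []int{1, 2, 3, 4, 5, 6, 7, 8}
	batch := lists.Partition(nums, 3) // chunks: [1 2 3] [4 5 6] [7 8]
	// Run solve over every chunk concurrently, at most 2 goroutines at a time.
	err := batch.ForEach(func(pos int, chunk []int) error {
		fmt.Println("chunk", pos, "=", chunk)
		return nil
	}, true, lynx.NewLimiter(2))
	if err != nil {
		fmt.Println(err)
	}
}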
What about merging?
For merging, we have to ask whether the operation requires order to be preserved. If not, a straight merge will do; if it does, the merge needs separate treatment.
At this point we're basically doing LeetCode, emm. I'll just go with merging two sorted arrays; if you think you can optimize it, merging n sorted arrays at once works too.
In our Stream, two methods involve ordering: Sort and OrderBy, so I'll write a merge routine for each case:
package algorithm
// MergeSortedArrayWithEntity returns a reducer that merges two sorted []any chunks; orderBy(a, b) reports whether a should come before b
func MergeSortedArrayWithEntity[T any](orderBy func(a, b T) bool) func(any, any) any {
return func(cntValue any, nxt any) any {
ts := cntValue.([]any)
nxts := nxt.([]any)
if len(nxts) == 0 {
return ts
}
lenLast := len(ts)
lenNxt := len(nxts)
rest := make([]any, 0)
l := 0
r := 0
for l < lenLast && r < lenNxt {
total := ts[l].(T)
current := nxts[r].(T)
if orderBy(total, current) {
rest = append(rest, total)
l++
} else {
rest = append(rest, current)
r++
}
}
if l < lenLast {
rest = append(rest, ts[l:lenLast]...)
}
if r < lenNxt {
rest = append(rest, nxts[r:lenNxt]...)
}
return rest
}
}
// MergeSortedArrayWithPrimaryData returns a reducer that merges two sorted chunks by a hash key, ascending or descending
func MergeSortedArrayWithPrimaryData[T any](desc bool, orderBy HashComputeFunction) func(any, any) any {
return func(cntValue any, nxt any) any {
ts := cntValue.([]any)
nxts := nxt.([]any)
if len(nxts) == 0 {
return ts
}
lenRe := len(ts)
lenNxt := len(nxts)
ress := make([]any, 0)
l := 0
r := 0
for l < lenRe && r < lenNxt {
current := nxts[r].(T)
total := ts[l].(T)
if orderBy(total) > orderBy(current) {
if desc {
ress = append(ress, total)
l++
} else {
ress = append(ress, current)
r++
}
} else {
if desc {
ress = append(ress, current)
r++
} else {
ress = append(ress, total)
l++
}
}
}
if r < lenNxt {
ress = append(ress, nxts[r:lenNxt]...)
}
if l < lenRe {
ress = append(ress, ts[l:lenRe]...)
}
return ress
}
}
// MergeSortedArrayWithPrimaryId is the same merge, keyed by an ID instead of a hash
func MergeSortedArrayWithPrimaryId[T any](desc bool, orderBy IDComputeFunction) func(any, any) any {
return func(cntValue any, nxt any) any {
ts := cntValue.([]any)
nxts := nxt.([]any)
if len(nxts) == 0 {
return ts
}
lenRe := len(ts)
lenNxt := len(nxts)
ress := make([]any, 0)
l := 0
r := 0
for l < lenRe && r < lenNxt {
current := nxts[r].(T)
total := ts[l].(T)
if orderBy(total) > orderBy(current) {
if desc {
ress = append(ress, total)
l++
} else {
ress = append(ress, current)
r++
}
} else {
if desc {
ress = append(ress, current)
r++
} else {
ress = append(ress, total)
l++
}
}
}
if r < lenNxt {
ress = append(ress, nxts[r:lenNxt]...)
}
if l < lenRe {
ress = append(ress, ts[l:lenRe]...)
}
return ress
}
}
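As a quick sanity check, this is how the entity merger behaves on two sorted chunks (a hypothetical snippet assuming the algorithm package above):

package main

import (
	"fmt"

	"github.com/karosown/katool/algorithm"
)

func main() {
	// Build an ascending merge reducer for ints.
	merge := algorithm.MergeSortedArrayWithEntity[int](func(a, b int) bool { return a <= b })
	left := []any{1, 3, 5}
	right := []any{2, 4, 6}
	fmt.Println(merge(left, right)) // [1 2 3 4 5 6]
}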
Now suppose we want a ToMap operation that takes two functions, key func(pos int, item T) any and value func(pos int, item T) any. How do we make sure the pos passed in is correct? By an offset: as long as the chunks keep their order, when executing a chunk we pass chunkIndex*chunkSize + indexWithinChunk (for example, with a chunk size of 4, item 1 of chunk 2 gets global index 2*4+1 = 9). That's all there is to it.
func (s *Stream[T, Slice]) ToMap(k func(index int, item T) any, v func(i int, item T) any) map[any]any {
ress := sync.Map{}
size := len(*s.options)
goRun[Option[T]](*s.options, s.parallel, func(pos int, options []Option[T]) error {
for i := 0; i < len(options); i++ {
index := pos*optional.IsTrue(s.parallel,
optional.IsTrue((getPageSize(size)) == 0, 1, getPageSize(size)), 1) + i
ress.Store(k(index, (options)[i].opt), v(index, (options)[i].opt))
}
return nil
})
res := make(map[any]any)
ress.Range(func(key, value any) bool {
res[key] = value
return true // keep iterating
})
return res
}
// GroupBy follows the single-responsibility principle: it must not modify options
func (s *Stream[T, Slice]) GroupBy(groupBy func(item T) any) map[any]Slice {
res := &sync.Map{}
size := len(*s.options)
lock := &sync.Mutex{} // the load-append-store below is a read-modify-write; serialize it
goRun[Option[T]](*s.options, s.parallel, func(pos int, options []Option[T]) error {
for i := 0; i < len(options); i++ {
key := groupBy((options)[i].opt)
index := pos*optional.IsTrue(s.parallel,
optional.IsTrue((getPageSize(size)) == 0, 1, getPageSize(size)), 1) + i
lock.Lock()
value, _ := res.LoadOrStore(key, make(Slice, 0))
res.Store(key, append(value.(Slice), (*s.source)[index]))
lock.Unlock()
}
return nil
})
result := make(map[any]Slice)
res.Range(func(key, value any) bool {
result[key] = value.(Slice)
return true // keep iterating
})
return result
}
The synchronous versions are straightforward, so here is the asynchronous code in full:
package stream
//TIP Stream-style processing modeled on Java's Stream
// In most cases ToStream or Of is all you need (Of exists to mirror Java's original API); for the special case of building a Stream of any, use NewStream
// The two Option-based constructors are internal implementations (newOptionsStream and newOptionStream)
// When working with the generics, take care not to create a generic cycle
import (
"fmt"
"reflect"
"sort"
"sync"
lynx "github.com/Tangerg/lynx/pkg/sync"
"github.com/karosown/katool/algorithm"
"github.com/karosown/katool/collect/lists"
"github.com/karosown/katool/container/optional"
"github.com/karosown/katool/convert"
)
// getPageSize returns the chunk size used to split work: a quarter of the total size
func getPageSize(size int) int {
return size >> 2
}
type Stream[T any, Slice ~[]T] struct {
options *Options[T]
source *Slice
parallel bool
}
func NewStream(source *[]any) *Stream[any, []any] {
resOptions := make(Options[any], 0)
for i := 0; i < len(*source); i++ {
resOptions = append(resOptions, Option[any]{opt: (*source)[i]})
}
return &Stream[any, []any]{
options: &resOptions,
source: source,
}
}
func ToStream[T any, Slice ~[]T](source *Slice) *Stream[T, Slice] {
resOptions := make(Options[T], 0)
for i := 0; i < len(*source); i++ {
resOptions = append(resOptions, Option[T]{opt: (*source)[i]})
}
return &Stream[T, Slice]{
options: &resOptions,
source: source,
}
}
// Of creates a stream from the given slice.
func Of[T any, Slice ~[]T](source *Slice) *Stream[T, Slice] {
return ToStream(source)
}
func newOptionsStream[Opt any, Opts Options[Opt]](source *[]Opts) *Stream[Options[any], []Options[any]] {
resOptions := make([]Option[Options[any]], 0)
sourceList := make([]Options[any], 0)
optionsAdapter := func(opts Options[Opt]) Options[any] {
res := make(Options[any], 0)
for i := 0; i < len(opts); i++ {
convertOption := &Option[any]{opt: any(opts[i].opt)}
res = append(res, *convertOption)
}
return res
}
for i := 0; i < len(*source); i++ {
t := Options[Opt]((*source)[i])
resOptions = append(resOptions, Option[Options[any]]{opt: optionsAdapter(t)})
sourceList = append(sourceList, optionsAdapter(t))
}
return &Stream[Options[any], []Options[any]]{
options: (*Options[Options[any]])(&resOptions),
source: &sourceList,
}
}
func newOptionStream[Opt any, T Options[Opt]](source *T) *Stream[Option[any], []Option[any]] {
resOptions := make(Options[Option[any]], 0)
resSource := make([]Option[any], 0)
for i := 0; i < len(*source); i++ {
resOptions = append(resOptions, Option[Option[any]]{
Option[any]{opt: any((*source)[i].opt)},
})
resSource = append(resSource, Option[any]{opt: any((*source)[i].opt)})
}
return &Stream[Option[any], []Option[any]]{
options: &resOptions,
source: &resSource,
}
}
func ToParallelStream[T any, Slice ~[]T](source *Slice) *Stream[T, Slice] {
resOptions := make(Options[T], 0)
for i := 0; i < len(*source); i++ {
resOptions = append(resOptions, Option[T]{opt: (*source)[i]})
}
return &Stream[T, Slice]{
options: &resOptions,
source: source,
parallel: true,
}
}
func (s *Stream[T, Slice]) Join(source *Slice) *Stream[T, []T] {
list := s.ToList()
list = append(list, *source...)
return ToStream(&list)
}
// goRun partitions datas into chunks and runs solve over each chunk, concurrently when parallel is set
func goRun[T any](datas []T, parallel bool, solve func(pos int, atomicDatas []T) error) {
size := len(datas)
pageSize := optional.IsTrue((getPageSize(size)) == 0, 1, getPageSize(size))
goNum := algorithm.NumOfTwoMultiply(size)
err := lists.Partition(datas, optional.IsTrue(parallel, pageSize, 1)).ForEach(solve, parallel, lynx.NewLimiter(optional.IsTrue(parallel, goNum, 1)))
if err != nil {
fmt.Println(err)
}
}
func (s *Stream[T, Slice]) Map(fn func(i T) any) *Stream[any, []any] {
size := len(*s.options)
resChan := make(chan any, size)
goRun[Option[T]](*s.options, s.parallel, func(pos int, options []Option[T]) error {
for i := 0; i < len(options); i++ {
runCall := fn(options[i].opt)
resChan <- runCall
}
return nil
})
resSource := convert.ChanToArray(resChan)
return ToStream(&resSource)
}
// FlatMap flattens nested data; pass it a function that returns a new Stream
// Reference: https://blog.csdn.net/feinifi/article/details/128980814
func (s *Stream[T, Slice]) FlatMap(fn func(i T) *Stream[any, []any]) *Stream[any, []any] {
size := len(*s.options)
resChan := make(chan []any, size)
goRun[Option[T]](*s.options, s.parallel, func(pos int, options []Option[T]) error {
for i := 0; i < len(options); i++ {
runCall := fn(options[i].opt)
resChan <- runCall.ToList()
}
return nil
})
resSource := convert.ChanToFlatArray(resChan)
return ToStream(&resSource)
}
// Distinct deduplicates with the default strategy (serialize to JSON, then compare the strings)
func (s *Stream[T, Slice]) Distinct() *Stream[T, Slice] {
return s.DistinctBy(algorithm.HASH_WITH_JSON)
}
// DistinctBy deduplicates with the given hash function
func (s *Stream[T, Slice]) DistinctBy(hash algorithm.HashComputeFunction) *Stream[T, Slice] {
res := make(Slice, 0)
size := len(*s.options)
if size < 1e10+5 {
// small data: sort first, then drop adjacent elements with equal hashes
options := s.Sort(func(a, b T) bool { return hash(a) < hash(b) }).ToOptionList()
for i := 0; i < len(options); i++ {
if i == 0 {
res = append(res, (options)[i].opt)
} else if hash((options)[i-1].opt) != hash((options)[i].opt) {
res = append(res, (options)[i].opt)
}
}
} else {
// if large data, use map
m := make(map[algorithm.HashType]bool)
for i := 0; i < size; i++ {
if _, ok := m[hash((*s.options)[i].opt)]; !ok {
m[hash((*s.options)[i].opt)] = true
res = append(res, (*s.options)[i].opt)
}
}
}
return ToStream(&res)
}
// Reduce folds the elements into an accumulated value (sums and the like)
func (s *Stream[T, Slice]) Reduce(begin any, atomicSolveFunction func(cntValue any, nxt T) any, parallelResultSolve func(sum1, sum2 any) any) any {
if atomicSolveFunction == nil {
panic("atomicSolveFunction must not nil")
}
if s.parallel && parallelResultSolve == nil {
panic("parallelResultSolve must not be nil where parallelResult")
}
beginType := reflect.TypeOf(begin)
lock := &sync.Mutex{}
goRun[Option[T]](*s.options, s.parallel, func(pos int, options []Option[T]) error {
if s.parallel {
currentBegin := reflect.New(beginType).Elem().Interface()
for i := 0; i < len(options); i++ {
currentBegin = atomicSolveFunction(currentBegin, options[i].opt)
}
lock.Lock()
defer lock.Unlock()
if parallelResultSolve != nil {
begin = parallelResultSolve(begin, currentBegin)
}
} else {
for i := 0; i < len(options); i++ {
begin = atomicSolveFunction(begin, options[i].opt)
}
}
return nil
})
return begin
}
func (s *Stream[T, Slice]) Filter(fn func(i T) bool) *Stream[T, Slice] {
res := make(Slice, 0)
size := len(*s.options)
resChan := make(chan T, size)
goRun[Option[T]](*s.options, s.parallel, func(pos int, options []Option[T]) error {
for i := 0; i < len(options); i++ {
if fn((options)[i].opt) {
resChan <- (options)[i].opt
}
}
return nil
})
chanSize := len(resChan)
for i := 0; i < chanSize; i++ {
res = append(res, <-resChan)
}
return ToStream(&res)
}
func (s *Stream[T, Slice]) ToOptionList() Options[T] {
return *s.options
}
func (s *Stream[T, Slice]) ToList() []T {
res := make([]T, 0)
size := len(*s.options)
resChan := make(chan T, size)
goRun[Option[T]](*s.options, s.parallel, func(pos int, options []Option[T]) error {
for i := 0; i < len(options); i++ {
resChan <- (options)[i].opt
}
return nil
})
for i := 0; i < size; i++ {
res = append(res, <-resChan)
}
return res
}
func (s *Stream[T, Slice]) ToMap(k func(index int, item T) any, v func(i int, item T) any) map[any]any {
ress := sync.Map{}
size := len(*s.options)
goRun[Option[T]](*s.options, s.parallel, func(pos int, options []Option[T]) error {
for i := 0; i < len(options); i++ {
index := pos*optional.IsTrue(s.parallel,
optional.IsTrue((getPageSize(size)) == 0, 1, getPageSize(size)), 1) + i
ress.Store(k(index, (options)[i].opt), v(index, (options)[i].opt))
}
return nil
})
res := make(map[any]any)
ress.Range(func(key, value any) bool {
res[key] = value
return true // keep iterating
})
return res
}
// GroupBy follows the single-responsibility principle: it must not modify options
func (s *Stream[T, Slice]) GroupBy(groupBy func(item T) any) map[any]Slice {
res := &sync.Map{}
size := len(*s.options)
lock := &sync.Mutex{} // the load-append-store below is a read-modify-write; serialize it
goRun[Option[T]](*s.options, s.parallel, func(pos int, options []Option[T]) error {
for i := 0; i < len(options); i++ {
key := groupBy((options)[i].opt)
index := pos*optional.IsTrue(s.parallel,
optional.IsTrue((getPageSize(size)) == 0, 1, getPageSize(size)), 1) + i
lock.Lock()
value, _ := res.LoadOrStore(key, make(Slice, 0))
res.Store(key, append(value.(Slice), (*s.source)[index]))
lock.Unlock()
}
return nil
})
result := make(map[any]Slice)
res.Range(func(key, value any) bool {
result[key] = value.(Slice)
return true // keep iterating
})
return result
}
func (s *Stream[T, Slice]) OrderBy(desc bool, orderBy algorithm.HashComputeFunction) *Stream[T, Slice] {
if !s.parallel {
sort.SliceStable(*s.options, func(i, j int) bool {
a := orderBy((*s.options)[i].opt)
b := orderBy((*s.options)[j].opt)
if desc {
return a > b
} else {
return a < b
}
})
return s
}
size := len(*s.options)
data := make([]Options[T], 0, optional.IsTrue((getPageSize(size)) == 0, 1, getPageSize(size)))
dataLock := &sync.Mutex{} // solve may run on several goroutines in parallel mode; guard the append
// opt opt opt opt -> opts opts
goRun[Option[T]](*s.options, s.parallel, func(pos int, options []Option[T]) error {
dataLock.Lock()
data = append(data, options)
dataLock.Unlock()
return nil
})
optionsStream := newOptionsStream[T, Options[T]](&data)
optionsStream.parallel = s.parallel
sortedMap := optionsStream.Map(func(options Options[any]) any {
sort.SliceStable(options, func(i, j int) bool {
a := orderBy(options[i].opt)
b := orderBy(options[j].opt)
if desc {
return a > b
} else {
return a < b
}
})
return options
})
sortedMap.parallel = s.parallel
res := sortedMap.Map(func(v any) any {
i := v.(Options[any])
ress := newOptionStream(&i).Map(func(item Option[any]) any {
return item.opt
}).ToList()
return ress
}).ToList()
re := make([]any, 0)
toStream := ToStream(&res)
toStream.parallel = s.parallel
mergeSorted := toStream.Reduce(re, algorithm.MergeSortedArrayWithPrimaryData[T](desc, orderBy), algorithm.MergeSortedArrayWithPrimaryData[T](desc, orderBy)).([]any)
result := make(Slice, 0)
for i := 0; i < len(mergeSorted); i++ {
result = append(result, mergeSorted[i].(T))
}
stream := ToStream(&result)
return stream
}
func (s *Stream[T, Slice]) OrderById(desc bool, orderBy algorithm.IDComputeFunction) *Stream[T, Slice] {
if !s.parallel {
sort.SliceStable(*s.options, func(i, j int) bool {
a := orderBy((*s.options)[i].opt)
b := orderBy((*s.options)[j].opt)
if desc {
return a > b
} else {
return a < b
}
})
return s
}
size := len(*s.options)
data := make([]Options[T], 0, optional.IsTrue((getPageSize(size)) == 0, 1, getPageSize(size)))
dataLock := &sync.Mutex{} // solve may run on several goroutines in parallel mode; guard the append
// opt opt opt opt -> opts opts
goRun[Option[T]](*s.options, s.parallel, func(pos int, options []Option[T]) error {
dataLock.Lock()
data = append(data, options)
dataLock.Unlock()
return nil
})
optionsStream := newOptionsStream[T, Options[T]](&data)
optionsStream.parallel = s.parallel
sortedMap := optionsStream.Map(func(options Options[any]) any {
sort.SliceStable(options, func(i, j int) bool {
a := orderBy(options[i].opt)
b := orderBy(options[j].opt)
if desc {
return a > b
} else {
return a < b
}
})
return options
})
sortedMap.parallel = s.parallel
res := sortedMap.Map(func(v any) any {
i := v.(Options[any])
ress := newOptionStream(&i).Map(func(item Option[any]) any {
return item.opt
}).ToList()
return ress
}).ToList()
re := make([]any, 0)
toStream := ToStream(&res)
toStream.parallel = s.parallel
mergeSorted := toStream.Reduce(re, algorithm.MergeSortedArrayWithPrimaryId[T](desc, orderBy), algorithm.MergeSortedArrayWithPrimaryId[T](desc, orderBy)).([]any)
result := make(Slice, 0)
for i := 0; i < len(mergeSorted); i++ {
result = append(result, mergeSorted[i].(T))
}
stream := ToStream(&result)
return stream
}
func (s *Stream[T, Slice]) Sort(orderBy func(a, b T) bool) *Stream[T, Slice] {
if !s.parallel {
sort.SliceStable(*s.options, func(i, j int) bool {
return orderBy((*s.options)[i].opt, (*s.options)[j].opt)
})
return s
}
size := len(*s.options)
data := make([]Options[T], 0, optional.IsTrue((getPageSize(size)) == 0, 1, getPageSize(size)))
dataLock := &sync.Mutex{} // solve may run on several goroutines in parallel mode; guard the append
// opt opt opt opt -> opts opts
goRun[Option[T]](*s.options, s.parallel, func(pos int, options []Option[T]) error {
dataLock.Lock()
data = append(data, options)
dataLock.Unlock()
return nil
})
optionsStream := newOptionsStream[T, Options[T]](&data)
optionsStream.parallel = s.parallel
sortedMap := optionsStream.Map(func(options Options[any]) any {
sort.SliceStable(options, func(i, j int) bool {
return orderBy(options[i].opt.(T), options[j].opt.(T))
})
return options
})
sortedMap.parallel = s.parallel
res := sortedMap.Map(func(v any) any {
i := v.(Options[any])
ress := newOptionStream(&i).Map(func(item Option[any]) any {
return item.opt
}).ToList()
return ress
}).ToList()
re := make([]any, 0)
toStream := ToStream(&res)
toStream.parallel = s.parallel
mergeSorted := toStream.Reduce(re, algorithm.MergeSortedArrayWithEntity[T](orderBy), algorithm.MergeSortedArrayWithEntity[T](orderBy)).([]any)
result := make(Slice, 0)
for i := 0; i < len(mergeSorted); i++ {
result = append(result, mergeSorted[i].(T))
}
stream := ToStream(&result)
return stream
}
func (s *Stream[T, Slice]) Collect(call func(data Options[T], sourceData Slice) any) any {
res := call(*s.options, *s.source)
return res
}
func (s *Stream[T, Slice]) ForEach(fn func(item T)) *Stream[T, Slice] {
goRun[Option[T]](*s.options, s.parallel, func(pos int, options []Option[T]) error {
for i := 0; i < len(options); i++ {
fn((options)[i].opt)
}
return nil
})
return s
}
func (s *Stream[T, Slice]) Count() int64 {
return int64(len(*s.options))
}
func (s *Stream[T, Slice]) Parallel() *Stream[T, Slice] {
s.parallel = true
return s
}
func (s *Stream[T, Slice]) UnParallel() *Stream[T, Slice] {
s.parallel = false
return s
}
package stream
import (
"github.com/karosown/katool/convert"
)
type Entry[K comparable, V any] struct {
Key K
Value V
}
type Entries[K comparable, V any] []Entry[K, V]
func EntrySet[K comparable, V any](m map[K]V) Entries[K, V] {
var entries []Entry[K, V]
for k, v := range m {
entries = append(entries, Entry[K, V]{Key: k, Value: v})
}
return entries
}
func (e Entries[K, V]) Identity() *[]Entry[K, V] {
copied := make([]Entry[K, V], len(e)) // named to avoid shadowing the imported convert package
for i := 0; i < len(e); i++ {
copied[i] = Entry[K, V]{Key: e[i].Key, Value: e[i].Value}
}
return &copied
}
func (e Entries[K, V]) KeySet() []K {
keyset := e.ToStream().Map(func(i Entry[K, V]) any {
return i.Key
}).ToList()
return convert.FromAnySlice[K](keyset)
}
func (e Entries[K, V]) Values() []V {
values := e.ToStream().Map(func(i Entry[K, V]) any {
return i.Value
}).ToList()
return convert.FromAnySlice[V](values)
}
func (e Entries[K, V]) KeySetStream() *Stream[K, []K] {
ks := e.KeySet()
return ToStream(&ks)
}
func (e Entries[K, V]) ValuesStream() *Stream[V, []V] {
vs := e.Values()
return ToStream(&vs)
}
func (e Entries[K, V]) ToStream() *Stream[Entry[K, V], []Entry[K, V]] {
return ToStream[Entry[K, V], []Entry[K, V]](e.Identity())
}
func (e Entries[K, V]) ToParallelStream() *Stream[Entry[K, V], []Entry[K, V]] {
return ToParallelStream[Entry[K, V], []Entry[K, V]](e.Identity())
}
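The Entry helpers make it convenient to stream over a map. A small usage sketch (the map contents are made up):

package main

import (
	"fmt"

	"github.com/karosown/katool/container/stream"
)

func main() {
	m := map[string]int{"alice": 30, "bob": 25}
	entries := stream.EntrySet(m)
	// Stream over the keys, sorted for deterministic output.
	keys := entries.KeySetStream().
		Sort(func(a, b string) bool { return a < b }).
		ToList()
	fmt.Println(keys) // [alice bob]
}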
package container_test
import (
"fmt"
"math/rand"
"strconv"
"testing"
"time"
"github.com/duke-git/lancet/v2/maputil"
"github.com/duke-git/lancet/v2/random"
"github.com/karosown/katool/algorithm"
"github.com/karosown/katool/container/stream"
"github.com/karosown/katool/convert"
)
type user struct {
Name string `json:"name"`
Age int `json:"age"`
Sex int `json:"sex"`
Money int `json:"money"`
Class string `json:"class"`
Id int `json:"id"`
}
type userVo struct {
Name string `json:"name"`
Age int `json:"age"`
Sex int `json:"sex"`
Id int `json:"id"`
}
var userList []user
func TestOfStream(t *testing.T) {
arr := []int{1, 3, 2, 3, 3, 3, 3, 1, 3, 2, 3, 3, 3, 3, 1, 3, 2, 3, 3, 3, 3, 1, 3, 2, 3, 3, 3, 3, 1, 3, 2, 3, 3, 3, 3, 1, 3, 2, 3, 3, 3, 3}
distinct := stream.ToStream(&arr).
Parallel().
Filter(func(i int) bool {
return i > 1
}).Map(func(item int) any {
return strconv.Itoa(item) + "w "
}).DistinctBy(algorithm.HASH_WITH_JSON_SUM)
fmt.Println(distinct.Reduce("", func(cntValue any, nxt any) any {
return cntValue.(string) + nxt.(string)
}, func(sum1, sum2 any) any {
return sum1.(string) + sum2.(string)
}))
list := distinct.ToOptionList()
list.ForEach(func(s any) {
fmt.Println(s)
})
toMap := stream.ToStream(&arr).
//Parallel().
ToMap(func(index int, item int) any {
return index
}, func(index int, item int) any {
return item
})
maputil.ForEach(toMap, func(key any, value any) {
fmt.Printf("key: %v, value: %v\n", key, value)
})
}
func Test_Map(t *testing.T) {
ul := userList[:]
// count
userStream := stream.ToStream(&ul).Parallel()
println(userStream.Count())
// sort
stream.ToStream(&ul).
Parallel().
Sort(func(a user, b user) bool { return a.Id < b.Id }).ForEach(func(item user) { println(convert.ToString(item.Id) + " " + item.Name) })
// sum
totalMoney := userStream.Reduce(int64(0), func(cntValue any, nxt user) any { return cntValue.(int64) + int64(nxt.Money) }, func(sum1, sum2 any) any {
return sum1.(int64) + sum2.(int64)
})
println(totalMoney.(int64))
// filter
userStream.Filter(func(item user) bool { return item.Sex != 0 }).DistinctBy(algorithm.HASH_WITH_JSON_MD5).ToOptionList().ForEach(func(item user) { println(item.Name) })
// map to a view object
s := userStream.Map(func(item user) any {
properties, err := convert.CopyProperties(&item, &userVo{})
if err != nil {
panic(err)
}
return properties
}).ToOptionList()
s.ForEach(func(s any) {
fmt.Println(s)
})
}
func Test_GroupBy(t *testing.T) {
users := userList[:]
by := stream.ToStream(&users).GroupBy(func(user user) any {
return user.Class
})
println(by)
}
func init() {
classes := []string{"一班", "二班", "三班", "四班", "五班"}
userList = make([]user, 0)
for i := 0; i < 100; i++ {
userList = append(userList, user{
Name: random.RandString(10),
Class: classes[rand.Int()%len(classes)],
Age: rand.Intn(100),
Sex: rand.Intn(2),
})
time.Sleep(1)
}
}
func Test(t *testing.T) {
by := stream.ToStream(&userList).Parallel().GroupBy(func(user user) any { return user.Class })
maputil.ForEach(by, func(key any, value []user) {
println(key.(string))
toStream := stream.ToStream(&value).Parallel()
toStream.ForEach(func(item user) {
fmt.Println(item)
})
reduce := toStream.Reduce(0, func(cntValue any, nxt user) any {
return cntValue.(int) + nxt.Sex
}, func(cntValue any, nxt any) any {
return cntValue.(int) + nxt.(int)
})
println("男生总数:" + convert.ToString(reduce))
reduce = toStream.Reduce(0, func(cntValue any, nxt user) any {
return cntValue.(int) + (nxt.Sex ^ 1)
}, func(cntValue any, nxt any) any {
return cntValue.(int) + nxt.(int)
})
println("女生总数:" + convert.ToString(reduce))
})
toStream := stream.ToStream(&userList).Parallel()
count := toStream.Filter(func(item user) bool { return item.Age >= 60 }).Count()
println("年龄大于等于60岁的共" + convert.ToString(count) + "人")
count = toStream.Filter(func(item user) bool { return item.Age < 60 }).Count()
println("年龄小于60岁的共" + convert.ToString(count) + "人")
toStream.FlatMap(func(i user) *stream.Stream[any, []any] {
array := convert.ToAnySlice([]rune(i.Name))
return stream.ToStream(&array)
}).ForEach(func(item any) {
print(string(item.(rune)) + " ")
})
}
func Test_Sorted(t *testing.T) {
arr := []int{1, 2, 3, 4, 5, 6, 7, 1, 2, 8, 9, 10}
for i := 0; i < 100; i++ {
arr = append(arr, 1, 2, 3, 4, 5, 6, 7, 1, 2, 8, 9, 10)
}
stream.ToStream(&arr).Parallel().Sort(func(a, b int) bool { return a < b }).ForEach(func(item int) {
fmt.Println(item)
})
}
func Test_Distinct(t *testing.T) {
arr := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
for i := 0; i < 10000; i++ {
arr = append(arr, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
}
stream.ToStream(&arr).Parallel().Distinct().ForEach(func(item int) {
fmt.Println(item)
})
}
The above is the simplest version. If you want to use it, I recommend forking it on GitHub: https://github.com/Karosown/katool-go
This article has looked at Java's Stream in theory and walked through a concurrent stream implementation in Go, covering chunking, asynchronous execution, and the underlying multi-threading ideas. The heart of Stream is turning data manipulation into a pipeline, processed declaratively through a flexible API. In Java, the Fork/Join framework lets Stream split work and process it in parallel efficiently; in Go, goroutines combined with chunked processing give us a simple and effective asynchronous-stream solution.
By wrapping a partitioning utility and working through the sorting and merge algorithms, we've laid out one path for porting Java's Stream to Go and optimizing it along the way. The higher-level features, such as parallel chunked sorting, grouping, and deduplication, also demonstrate that stream-style processing in Go is both feasible and efficient.
Looking ahead, with further performance tuning and broader use cases, stream processing in Go can find much wider application. The takeaway: understand the characteristics of your language and its execution model, and pick the data-processing approach that fits best.
Next chapter: a comparison of several Go service frameworks.
Original-work statement: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.
If there is any infringement, please contact cloudcommunity@tencent.com for removal.