2022-02-28 22:32:19 +00:00
|
|
|
// run
|
2021-03-14 06:41:51 +00:00
|
|
|
|
|
|
|
// Copyright 2021 The Go Authors. All rights reserved.
|
|
|
|
// Use of this source code is governed by a BSD-style
|
|
|
|
// license that can be found in the LICENSE file.
|
|
|
|
|
|
|
|
// Package chans provides utility functions for working with channels.
|
|
|
|
package main
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"fmt"
|
|
|
|
"runtime"
|
|
|
|
"sort"
|
|
|
|
"sync"
|
|
|
|
"time"
|
|
|
|
)
|
|
|
|
|
|
|
|
// _Equal reports whether two slices are equal: the same length and all
|
|
|
|
// elements equal. All floating point NaNs are considered equal.
|
|
|
|
func _SliceEqual[Elem comparable](s1, s2 []Elem) bool {
|
|
|
|
if len(s1) != len(s2) {
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
for i, v1 := range s1 {
|
|
|
|
v2 := s2[i]
|
|
|
|
if v1 != v2 {
|
|
|
|
isNaN := func(f Elem) bool { return f != f }
|
|
|
|
if !isNaN(v1) || !isNaN(v2) {
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
|
|
|
|
// _ReadAll reads from c until the channel is closed or the context is
|
|
|
|
// canceled, returning all the values read.
|
|
|
|
func _ReadAll[Elem any](ctx context.Context, c <-chan Elem) []Elem {
|
|
|
|
var r []Elem
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case <-ctx.Done():
|
|
|
|
return r
|
|
|
|
case v, ok := <-c:
|
|
|
|
if !ok {
|
|
|
|
return r
|
|
|
|
}
|
|
|
|
r = append(r, v)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// _Merge merges two channels into a single channel.
|
|
|
|
// This will leave a goroutine running until either both channels are closed
|
|
|
|
// or the context is canceled, at which point the returned channel is closed.
|
|
|
|
func _Merge[Elem any](ctx context.Context, c1, c2 <-chan Elem) <-chan Elem {
|
|
|
|
r := make(chan Elem)
|
|
|
|
go func(ctx context.Context, c1, c2 <-chan Elem, r chan<- Elem) {
|
|
|
|
defer close(r)
|
|
|
|
for c1 != nil || c2 != nil {
|
|
|
|
select {
|
|
|
|
case <-ctx.Done():
|
|
|
|
return
|
|
|
|
case v1, ok := <-c1:
|
|
|
|
if ok {
|
|
|
|
r <- v1
|
|
|
|
} else {
|
|
|
|
c1 = nil
|
|
|
|
}
|
|
|
|
case v2, ok := <-c2:
|
|
|
|
if ok {
|
|
|
|
r <- v2
|
|
|
|
} else {
|
|
|
|
c2 = nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}(ctx, c1, c2, r)
|
|
|
|
return r
|
|
|
|
}
|
|
|
|
|
|
|
|
// _Filter calls f on each value read from c. If f returns true the value
|
|
|
|
// is sent on the returned channel. This will leave a goroutine running
|
|
|
|
// until c is closed or the context is canceled, at which point the
|
|
|
|
// returned channel is closed.
|
|
|
|
func _Filter[Elem any](ctx context.Context, c <-chan Elem, f func(Elem) bool) <-chan Elem {
|
|
|
|
r := make(chan Elem)
|
|
|
|
go func(ctx context.Context, c <-chan Elem, f func(Elem) bool, r chan<- Elem) {
|
|
|
|
defer close(r)
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case <-ctx.Done():
|
|
|
|
return
|
|
|
|
case v, ok := <-c:
|
|
|
|
if !ok {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
if f(v) {
|
|
|
|
r <- v
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}(ctx, c, f, r)
|
|
|
|
return r
|
|
|
|
}
|
|
|
|
|
|
|
|
// _Sink returns a channel that discards all values sent to it.
|
|
|
|
// This will leave a goroutine running until the context is canceled
|
|
|
|
// or the returned channel is closed.
|
|
|
|
func _Sink[Elem any](ctx context.Context) chan<- Elem {
|
|
|
|
r := make(chan Elem)
|
|
|
|
go func(ctx context.Context, r <-chan Elem) {
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case <-ctx.Done():
|
|
|
|
return
|
|
|
|
case _, ok := <-r:
|
|
|
|
if !ok {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}(ctx, r)
|
|
|
|
return r
|
|
|
|
}
|
|
|
|
|
|
|
|
// An Exclusive is a value that may only be used by a single goroutine
|
|
|
|
// at a time. This is implemented using channels rather than a mutex.
|
|
|
|
type _Exclusive[Val any] struct {
|
|
|
|
c chan Val
|
|
|
|
}
|
|
|
|
|
|
|
|
// _MakeExclusive makes an initialized exclusive value.
|
|
|
|
func _MakeExclusive[Val any](initial Val) *_Exclusive[Val] {
|
|
|
|
r := &_Exclusive[Val]{
|
|
|
|
c: make(chan Val, 1),
|
|
|
|
}
|
|
|
|
r.c <- initial
|
|
|
|
return r
|
|
|
|
}
|
|
|
|
|
|
|
|
// _Acquire acquires the exclusive value for private use.
|
|
|
|
// It must be released using the Release method.
|
|
|
|
func (e *_Exclusive[Val]) Acquire() Val {
|
|
|
|
return <-e.c
|
|
|
|
}
|
|
|
|
|
|
|
|
// TryAcquire attempts to acquire the value. The ok result reports whether
|
|
|
|
// the value was acquired. If the value is acquired, it must be released
|
|
|
|
// using the Release method.
|
|
|
|
func (e *_Exclusive[Val]) TryAcquire() (v Val, ok bool) {
|
|
|
|
select {
|
|
|
|
case r := <-e.c:
|
|
|
|
return r, true
|
|
|
|
default:
|
|
|
|
return v, false
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Release updates and releases the value.
|
|
|
|
// This method panics if the value has not been acquired.
|
|
|
|
func (e *_Exclusive[Val]) Release(v Val) {
|
|
|
|
select {
|
|
|
|
case e.c <- v:
|
|
|
|
default:
|
|
|
|
panic("_Exclusive Release without Acquire")
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Ranger returns a Sender and a Receiver. The Receiver provides a
|
|
|
|
// Next method to retrieve values. The Sender provides a Send method
|
|
|
|
// to send values and a Close method to stop sending values. The Next
|
|
|
|
// method indicates when the Sender has been closed, and the Send
|
|
|
|
// method indicates when the Receiver has been freed.
|
|
|
|
//
|
|
|
|
// This is a convenient way to exit a goroutine sending values when
|
|
|
|
// the receiver stops reading them.
|
|
|
|
func _Ranger[Elem any]() (*_Sender[Elem], *_Receiver[Elem]) {
|
|
|
|
c := make(chan Elem)
|
|
|
|
d := make(chan struct{})
|
|
|
|
s := &_Sender[Elem]{
|
|
|
|
values: c,
|
|
|
|
done: d,
|
|
|
|
}
|
2021-07-28 20:39:30 +00:00
|
|
|
r := &_Receiver[Elem]{
|
2021-03-14 06:41:51 +00:00
|
|
|
values: c,
|
|
|
|
done: d,
|
|
|
|
}
|
|
|
|
runtime.SetFinalizer(r, (*_Receiver[Elem]).finalize)
|
|
|
|
return s, r
|
|
|
|
}
|
|
|
|
|
|
|
|
// A _Sender is used to send values to a Receiver.
|
|
|
|
type _Sender[Elem any] struct {
|
|
|
|
values chan<- Elem
|
|
|
|
done <-chan struct{}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Send sends a value to the receiver. It reports whether the value was sent.
|
|
|
|
// The value will not be sent if the context is closed or the receiver
|
|
|
|
// is freed.
|
|
|
|
func (s *_Sender[Elem]) Send(ctx context.Context, v Elem) bool {
|
|
|
|
select {
|
|
|
|
case <-ctx.Done():
|
|
|
|
return false
|
|
|
|
case s.values <- v:
|
|
|
|
return true
|
|
|
|
case <-s.done:
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Close tells the receiver that no more values will arrive.
|
|
|
|
// After Close is called, the _Sender may no longer be used.
|
|
|
|
func (s *_Sender[Elem]) Close() {
|
|
|
|
close(s.values)
|
|
|
|
}
|
|
|
|
|
|
|
|
// A _Receiver receives values from a _Sender.
|
|
|
|
type _Receiver[Elem any] struct {
|
|
|
|
values <-chan Elem
|
|
|
|
done chan<- struct{}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Next returns the next value from the channel. The bool result indicates
|
|
|
|
// whether the value is valid.
|
|
|
|
func (r *_Receiver[Elem]) Next(ctx context.Context) (v Elem, ok bool) {
|
|
|
|
select {
|
|
|
|
case <-ctx.Done():
|
|
|
|
case v, ok = <-r.values:
|
|
|
|
}
|
|
|
|
return v, ok
|
|
|
|
}
|
|
|
|
|
|
|
|
// finalize is a finalizer for the receiver.
|
|
|
|
func (r *_Receiver[Elem]) finalize() {
|
|
|
|
close(r.done)
|
|
|
|
}
|
|
|
|
|
|
|
|
func TestReadAll() {
|
|
|
|
c := make(chan int)
|
|
|
|
go func() {
|
|
|
|
c <- 4
|
|
|
|
c <- 2
|
|
|
|
c <- 5
|
|
|
|
close(c)
|
|
|
|
}()
|
|
|
|
got := _ReadAll(context.Background(), c)
|
|
|
|
want := []int{4, 2, 5}
|
|
|
|
if !_SliceEqual(got, want) {
|
|
|
|
panic(fmt.Sprintf("_ReadAll returned %v, want %v", got, want))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func TestMerge() {
|
|
|
|
c1 := make(chan int)
|
|
|
|
c2 := make(chan int)
|
|
|
|
go func() {
|
|
|
|
c1 <- 1
|
|
|
|
c1 <- 3
|
|
|
|
c1 <- 5
|
|
|
|
close(c1)
|
|
|
|
}()
|
|
|
|
go func() {
|
|
|
|
c2 <- 2
|
|
|
|
c2 <- 4
|
|
|
|
c2 <- 6
|
|
|
|
close(c2)
|
|
|
|
}()
|
|
|
|
ctx := context.Background()
|
|
|
|
got := _ReadAll(ctx, _Merge(ctx, c1, c2))
|
|
|
|
sort.Ints(got)
|
|
|
|
want := []int{1, 2, 3, 4, 5, 6}
|
|
|
|
if !_SliceEqual(got, want) {
|
|
|
|
panic(fmt.Sprintf("_Merge returned %v, want %v", got, want))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func TestFilter() {
|
|
|
|
c := make(chan int)
|
|
|
|
go func() {
|
|
|
|
c <- 1
|
|
|
|
c <- 2
|
|
|
|
c <- 3
|
|
|
|
close(c)
|
|
|
|
}()
|
|
|
|
even := func(i int) bool { return i%2 == 0 }
|
|
|
|
ctx := context.Background()
|
|
|
|
got := _ReadAll(ctx, _Filter(ctx, c, even))
|
|
|
|
want := []int{2}
|
|
|
|
if !_SliceEqual(got, want) {
|
|
|
|
panic(fmt.Sprintf("_Filter returned %v, want %v", got, want))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func TestSink() {
|
|
|
|
c := _Sink[int](context.Background())
|
|
|
|
after := time.NewTimer(time.Minute)
|
|
|
|
defer after.Stop()
|
|
|
|
send := func(v int) {
|
|
|
|
select {
|
|
|
|
case c <- v:
|
|
|
|
case <-after.C:
|
|
|
|
panic("timed out sending to _Sink")
|
|
|
|
}
|
|
|
|
}
|
|
|
|
send(1)
|
|
|
|
send(2)
|
|
|
|
send(3)
|
|
|
|
close(c)
|
|
|
|
}
|
|
|
|
|
|
|
|
func TestExclusive() {
|
|
|
|
val := 0
|
|
|
|
ex := _MakeExclusive(&val)
|
|
|
|
|
|
|
|
var wg sync.WaitGroup
|
|
|
|
f := func() {
|
|
|
|
defer wg.Done()
|
|
|
|
for i := 0; i < 10; i++ {
|
|
|
|
p := ex.Acquire()
|
|
|
|
(*p)++
|
|
|
|
ex.Release(p)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
wg.Add(2)
|
|
|
|
go f()
|
|
|
|
go f()
|
|
|
|
|
|
|
|
wg.Wait()
|
|
|
|
if val != 20 {
|
|
|
|
panic(fmt.Sprintf("after Acquire/Release loop got %d, want 20", val))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func TestExclusiveTry() {
|
|
|
|
s := ""
|
|
|
|
ex := _MakeExclusive(&s)
|
|
|
|
p, ok := ex.TryAcquire()
|
|
|
|
if !ok {
|
|
|
|
panic("TryAcquire failed")
|
|
|
|
}
|
|
|
|
*p = "a"
|
|
|
|
|
|
|
|
var wg sync.WaitGroup
|
|
|
|
wg.Add(1)
|
|
|
|
go func() {
|
|
|
|
defer wg.Done()
|
|
|
|
_, ok := ex.TryAcquire()
|
|
|
|
if ok {
|
|
|
|
panic(fmt.Sprintf("TryAcquire succeeded unexpectedly"))
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
wg.Wait()
|
|
|
|
|
|
|
|
ex.Release(p)
|
|
|
|
|
|
|
|
p, ok = ex.TryAcquire()
|
|
|
|
if !ok {
|
|
|
|
panic(fmt.Sprintf("TryAcquire failed"))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func TestRanger() {
|
|
|
|
s, r := _Ranger[int]()
|
|
|
|
|
|
|
|
ctx := context.Background()
|
|
|
|
go func() {
|
|
|
|
// Receive one value then exit.
|
|
|
|
v, ok := r.Next(ctx)
|
|
|
|
if !ok {
|
|
|
|
panic(fmt.Sprintf("did not receive any values"))
|
|
|
|
} else if v != 1 {
|
|
|
|
panic(fmt.Sprintf("received %d, want 1", v))
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
|
|
|
c1 := make(chan bool)
|
|
|
|
c2 := make(chan bool)
|
|
|
|
go func() {
|
|
|
|
defer close(c2)
|
|
|
|
if !s.Send(ctx, 1) {
|
|
|
|
panic(fmt.Sprintf("Send failed unexpectedly"))
|
|
|
|
}
|
|
|
|
close(c1)
|
|
|
|
if s.Send(ctx, 2) {
|
|
|
|
panic(fmt.Sprintf("Send succeeded unexpectedly"))
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
|
|
|
<-c1
|
|
|
|
|
|
|
|
// Force a garbage collection to try to get the finalizers to run.
|
|
|
|
runtime.GC()
|
|
|
|
|
|
|
|
select {
|
|
|
|
case <-c2:
|
|
|
|
case <-time.After(time.Minute):
|
|
|
|
panic("_Ranger Send should have failed, but timed out")
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func main() {
|
|
|
|
TestReadAll()
|
|
|
|
TestMerge()
|
|
|
|
TestFilter()
|
|
|
|
TestSink()
|
|
|
|
TestExclusive()
|
|
|
|
TestExclusiveTry()
|
|
|
|
TestRanger()
|
|
|
|
}
|