Concurrency
Examples in
Go
package main
import (
"fmt"
"time"
)
// main prints the current Unix timestamp, pauses for two seconds, and then
// prints the timestamp again so the elapsed time is visible in the output.
func main() {
	const pause = 2 * time.Second

	// report writes the current Unix time (in seconds) to standard output.
	report := func() {
		fmt.Printf("Current Unix Time: %v\n", time.Now().Unix())
	}

	report()
	time.Sleep(pause)
	report()
}
Go’s select lets you wait on multiple channel operations. Combining goroutines and channels with select is a powerful feature of Go.
package main
import (
"fmt"
"time"
)
// main starts two goroutines that each deliver one string after a delay and
// uses select to print each message as soon as its channel becomes ready.
func main() {
	first := make(chan string)
	second := make(chan string)

	// sendAfter sleeps for delay and then sends msg on ch.
	sendAfter := func(ch chan<- string, delay time.Duration, msg string) {
		time.Sleep(delay)
		ch <- msg
	}
	go sendAfter(first, 1*time.Second, "one")
	go sendAfter(second, 2*time.Second, "two")

	// Two messages are expected in total, so select exactly twice; each
	// select blocks until one of the two channels has a value.
	for remaining := 2; remaining > 0; remaining-- {
		select {
		case got := <-first:
			fmt.Println("received", got)
		case got := <-second:
			fmt.Println("received", got)
		}
	}
}
package main
import (
"fmt"
"time"
)
// worker receives job numbers from jobs until that channel is closed and
// drained. For every job it prints a "started" line, sleeps for one second,
// prints a "finished" line, and sends twice the job number on results.
// id is only used to label the printed lines.
func worker(id int, jobs <-chan int, results chan<- int) {
	for {
		job, open := <-jobs
		if !open {
			// jobs was closed and holds no more values: nothing left to do.
			return
		}

		fmt.Println("worker", id, "started job", job)
		time.Sleep(time.Second)
		fmt.Println("worker", id, "finished job", job)

		results <- job * 2
	}
}
// main runs a pool of three worker goroutines over five jobs. It queues the
// jobs on a buffered channel, closes the channel so the workers' loops can
// end, and then receives one result per job before returning.
func main() {
	const (
		numWorkers = 3
		numJobs    = 5
	)

	jobs := make(chan int, 100)
	results := make(chan int, 100)

	// Workers start out blocked, because no jobs have been queued yet.
	for id := 1; id <= numWorkers; id++ {
		go worker(id, jobs, results)
	}

	for job := 1; job <= numJobs; job++ {
		jobs <- job
	}
	close(jobs)

	// Receiving one result per job keeps main alive until all jobs are done;
	// the result values themselves are discarded.
	for received := 0; received < numJobs; received++ {
		<-results
	}
}
package main
import (
"fmt"
"math/rand"
"sync"
"sync/atomic"
"time"
)
// main demonstrates guarding shared state with a mutex. 100 reader goroutines
// and 10 writer goroutines access one map roughly once per millisecond each;
// after one second main prints how many reads and writes were performed and
// the final contents of the map.
func main() {
	var (
		mu       sync.Mutex // guards state
		readOps  uint64     // number of completed reads, updated atomically
		writeOps uint64     // number of completed writes, updated atomically
	)
	// The shared state is a plain map.
	state := make(map[int]int)

	// reader loops forever: pick a key, read it while holding the lock,
	// count the read, then pause for a millisecond.
	reader := func() {
		sum := 0
		for {
			k := rand.Intn(5)

			mu.Lock()
			sum += state[k]
			mu.Unlock()

			atomic.AddUint64(&readOps, 1)
			time.Sleep(time.Millisecond)
		}
	}
	for n := 0; n < 100; n++ {
		go reader()
	}

	// writer follows the same pattern as reader, but stores a random value
	// under the chosen key instead of reading it.
	writer := func() {
		for {
			k := rand.Intn(5)
			v := rand.Intn(100)

			mu.Lock()
			state[k] = v
			mu.Unlock()

			atomic.AddUint64(&writeOps, 1)
			time.Sleep(time.Millisecond)
		}
	}
	for n := 0; n < 10; n++ {
		go writer()
	}

	// Let the readers and writers run for a second.
	time.Sleep(time.Second)

	// Snapshot and report the operation counters.
	totalReads := atomic.LoadUint64(&readOps)
	fmt.Println("readOps:", totalReads)
	totalWrites := atomic.LoadUint64(&writeOps)
	fmt.Println("writeOps:", totalWrites)

	// The goroutines are still running, so take the lock once more before
	// printing the map.
	mu.Lock()
	fmt.Println("state:", state)
	mu.Unlock()
}
package main
import (
"fmt"
"sync"
"sync/atomic"
)
// main increments a shared counter from 50 goroutines, 1000 times each, using
// atomic adds, waits for all of them to finish, and prints the total.
func main() {
	const (
		goroutines   = 50
		incrementsPer = 1000
	)

	var (
		ops uint64
		wg  sync.WaitGroup
	)

	// Register every goroutine up front; each one signals completion on exit.
	wg.Add(goroutines)
	for g := 0; g < goroutines; g++ {
		go func() {
			defer wg.Done()
			for n := 0; n < incrementsPer; n++ {
				atomic.AddUint64(&ops, 1)
			}
		}()
	}

	// After Wait returns no goroutine is still writing, so ops can be read
	// directly.
	wg.Wait()
	fmt.Println("ops:", ops)
}
Here we use the built-in synchronization features of goroutines and channels to achieve synchronized access to shared state. This channel-based approach aligns with Go’s ideas of sharing memory by communicating and having each piece of data owned by exactly 1 goroutine.
package main
import (
"fmt"
"math/rand"
"sync/atomic"
"time"
)
// In this example our state will be owned by a single goroutine.
// This will guarantee that the data is never corrupted with concurrent access.
// In order to read or write that state, other goroutines will send messages to the owning goroutine and receive corresponding replies.
// These readOp and writeOp structs encapsulate those requests and a way for the owning goroutine to respond.
//
// readOp is a request to read one entry of the state. main sends it on the
// reads channel, and the owning goroutine answers on resp.
type readOp struct {
// key is the map key whose value is being requested.
key int
// resp carries the reply: the owning goroutine sends the value stored
// under key (the zero value if the key has never been written).
resp chan int
}
// writeOp is a request to store val under key in the state. main sends it on
// the writes channel, and the owning goroutine acknowledges on resp.
type writeOp struct {
// key is the map key to write.
key int
// val is the value to store under key.
val int
// resp carries the acknowledgement: the owning goroutine sends true once
// the write has been applied.
resp chan bool
}
// main demonstrates a "stateful goroutine": a single goroutine owns a map and
// serves read and write requests sent to it over channels. 100 reader and 10
// writer goroutines issue requests roughly once per millisecond each; after
// one second main prints how many of each kind completed.
func main() {
	var (
		readOps  uint64 // completed reads, updated atomically
		writeOps uint64 // completed writes, updated atomically
	)

	// Requests travel to the owning goroutine over these two channels.
	reads := make(chan readOp)
	writes := make(chan writeOp)

	// The owner keeps the map private to itself. It repeatedly selects on
	// both request channels, performs the requested operation, and then
	// replies on the request's resp channel: the stored value for a read,
	// true for a write.
	go func() {
		state := make(map[int]int)
		for {
			select {
			case req := <-reads:
				req.resp <- state[req.key]
			case req := <-writes:
				state[req.key] = req.val
				req.resp <- true
			}
		}
	}()

	// issueReads loops forever: build a readOp with its own reply channel,
	// send it to the owner, wait for the reply, count the read, then pause.
	issueReads := func() {
		for {
			req := readOp{key: rand.Intn(5), resp: make(chan int)}
			reads <- req
			<-req.resp
			atomic.AddUint64(&readOps, 1)
			time.Sleep(time.Millisecond)
		}
	}
	for n := 0; n < 100; n++ {
		go issueReads()
	}

	// issueWrites follows the same request/reply pattern for writes.
	issueWrites := func() {
		for {
			req := writeOp{key: rand.Intn(5), val: rand.Intn(100), resp: make(chan bool)}
			writes <- req
			<-req.resp
			atomic.AddUint64(&writeOps, 1)
			time.Sleep(time.Millisecond)
		}
	}
	for n := 0; n < 10; n++ {
		go issueWrites()
	}

	// Let the goroutines work for a second, then report the counters.
	time.Sleep(time.Second)

	totalReads := atomic.LoadUint64(&readOps)
	fmt.Println("readOps:", totalReads)
	totalWrites := atomic.LoadUint64(&writeOps)
	fmt.Println("writeOps:", totalWrites)
}
We can use the same for/range looping syntax that iterates over basic data structures to iterate over values received from a Go channel.
package main
import "fmt"
// main queues two strings on a buffered channel, closes it, and then prints
// every queued value by ranging over the channel.
func main() {
	pending := []string{"one", "two"}

	queue := make(chan string, 2)
	for _, item := range pending {
		queue <- item
	}
	// Closing the channel is what lets the range loop below terminate once
	// the buffered values have been received.
	close(queue)

	for received := range queue {
		fmt.Println(received)
	}
}
Basic sends and receives on channels are blocking. However, we can use select with a default clause to implement non-blocking sends, receives, and even non-blocking multi-way selects.
package main
import "fmt"
// main demonstrates non-blocking channel operations built from select with a
// default clause: a receive, a send, and a multi-way receive. Both channels
// are unbuffered and nothing else uses them, so every select below falls
// through to its default case.
func main() {
	messages := make(chan string)
	signals := make(chan bool)

	// Non-blocking receive: take a value from messages if one is ready,
	// otherwise run the default case immediately.
	select {
	case received := <-messages:
		fmt.Println("received message", received)
	default:
		fmt.Println("no message received")
	}

	// Non-blocking send: messages has no buffer and no receiver is waiting,
	// so the send cannot proceed and the default case is chosen.
	const outgoing = "hi"
	select {
	case messages <- outgoing:
		fmt.Println("sent message", outgoing)
	default:
		fmt.Println("no message sent")
	}

	// Multi-way non-blocking select: several cases may sit above default.
	// Here we try to receive from both messages and signals without waiting.
	select {
	case received := <-messages:
		fmt.Println("received message", received)
	case flag := <-signals:
		fmt.Println("received signal", flag)
	default:
		fmt.Println("no activity")
	}
}
Implementing timeouts in Go is easy and elegant thanks to channels and select.
package main
import (
"fmt"
"time"
)
// main demonstrates timeouts with select and time.After. Two operations each
// take two seconds to produce a result; the first is awaited with a
// one-second timeout and the second with a three-second timeout.
func main() {
	// slowResult starts a goroutine that sends msg after two seconds and
	// returns the channel the result will arrive on. The channel has a
	// buffer of one, so the send does not block even if nobody receives.
	slowResult := func(msg string) <-chan string {
		ch := make(chan string, 1)
		go func() {
			time.Sleep(2 * time.Second)
			ch <- msg
		}()
		return ch
	}

	// select proceeds with whichever receive is ready first: the result, or
	// the value time.After delivers once the timeout has elapsed. With a 1s
	// timeout against a 2s operation, the timeout case is taken.
	c1 := slowResult("result 1")
	select {
	case res := <-c1:
		fmt.Println(res)
	case <-time.After(1 * time.Second):
		fmt.Println("timeout 1")
	}

	// With a longer 3s timeout the receive from c2 succeeds and the result
	// is printed.
	c2 := slowResult("result 2")
	select {
	case res := <-c2:
		fmt.Println(res)
	case <-time.After(3 * time.Second):
		fmt.Println("timeout 2")
	}
}
Timers represent a single event in the future. You tell the timer how long you want to wait, and it provides a channel that will be notified at that time.
package main
import (
"fmt"
"time"
)
// main demonstrates timers. It blocks until a two-second timer fires, then
// creates a one-second timer, waits for it in a separate goroutine, and stops
// it before it has a chance to fire.
func main() {
	// Receiving from the timer's channel C blocks until the timer fires.
	first := time.NewTimer(2 * time.Second)
	<-first.C
	fmt.Println("Timer 1 expired")

	// Unlike a plain sleep, a timer can be cancelled before it fires.
	second := time.NewTimer(time.Second)
	go func() {
		<-second.C
		fmt.Println("Timer 2 expired")
	}()

	// Stop reports whether the call stopped the timer.
	if stopped := second.Stop(); stopped {
		fmt.Println("Timer 2 stopped")
	}
}
Tickers are for when you want to do something repeatedly at regular intervals. Here’s an example of a ticker that ticks periodically until we stop it.
package main
import (
"fmt"
"time"
)
// main demonstrates a ticker. A goroutine prints the time of every tick of a
// 500ms ticker; after 1600ms main stops the ticker, tells the goroutine to
// exit, and prints a final message.
func main() {
	ticker := time.NewTicker(500 * time.Millisecond)
	done := make(chan bool)

	// printTicks reports each tick as it arrives and returns as soon as a
	// value is received on done.
	printTicks := func() {
		for {
			select {
			case <-done:
				return
			case at := <-ticker.C:
				fmt.Println("Tick at", at)
			}
		}
	}
	go printTicks()

	time.Sleep(1600 * time.Millisecond)

	// Once stopped, the ticker sends no further values on its channel.
	ticker.Stop()
	// done is unbuffered, so this send completes only when the goroutine
	// has received the signal.
	done <- true
	fmt.Println("Ticker stopped")
}
Last Run:
Tick at 2021-01-29 22:04:49.601892024 +0000 UTC m=+0.500204752
Tick at 2021-01-29 22:04:50.101941423 +0000 UTC m=+1.000254145
Tick at 2021-01-29 22:04:50.601894103 +0000 UTC m=+1.500206871
Ticker stopped
Rate limiting is an important mechanism for controlling resource utilization and maintaining quality of service. Go elegantly supports rate limiting with goroutines, channels, and tickers.
package main
import (
"fmt"
"time"
)
// main demonstrates two rate-limiting schemes. The first serves five queued
// requests at a steady rate of one per 200 milliseconds. The second allows a
// burst of up to three requests before falling back to the same steady rate.
func main() {
	const interval = 200 * time.Millisecond

	// preloaded returns a closed, buffered channel holding request numbers
	// 1 through 5, standing in for a queue of incoming requests.
	preloaded := func() chan int {
		ch := make(chan int, 5)
		for id := 1; id <= 5; id++ {
			ch <- id
		}
		close(ch)
		return ch
	}

	// Basic rate limiting: limiter delivers a value every interval and acts
	// as the regulator. Blocking on a receive from it before serving each
	// request limits us to one request per interval.
	requests := preloaded()
	limiter := time.Tick(interval)
	for req := range requests {
		<-limiter
		fmt.Println("request", req, time.Now())
	}

	// Bursty rate limiting: buffering the limiter channel permits short
	// bursts while preserving the overall rate. burstyLimiter allows bursts
	// of up to 3 events, so it starts out full.
	burstyLimiter := make(chan time.Time, 3)
	for n := 0; n < 3; n++ {
		burstyLimiter <- time.Now()
	}

	// Every interval, try to add one more value to burstyLimiter; the send
	// blocks while the buffer is already at its limit of 3.
	go func() {
		refill := time.Tick(interval)
		for at := range refill {
			burstyLimiter <- at
		}
	}()

	// Simulate 5 more incoming requests. The first 3 are served immediately
	// thanks to the burst capacity of burstyLimiter.
	burstyRequests := preloaded()
	for req := range burstyRequests {
		<-burstyLimiter
		fmt.Println("request", req, time.Now())
	}
}