Тупик в диапазоне по каналу в конвейере с несколькими группами ожидания

Я отрабатываю задачу по вычислению факториалов путем одновременного разделения вычислений на 100 групп, я решил много проблем с WaitGroups, но все же в функции calculateFactorial я получил взаимоблокировку по диапазону по части канала. Хотелось бы, чтобы кто-нибудь мог указать здесь на проблему, спасибо.

package main

import (
    "fmt"
    "sync"
)

func main() {
    var wg sync.WaitGroup
    wg.Add(2)
    in := make (chan int)
    out := make (chan float64)



    out = calculateFactorial(genConcurrentGroup(in, &wg), &wg)

    go func() {
        in <- 10
        close(in)
    }()

    fmt.Println(<-out)

    wg.Wait()


}

//split input number into groups
//the result should be a map of [start number, number in group]
//this is not heavy task so run in one go routine
func genConcurrentGroup(c chan int, wg *sync.WaitGroup) chan map[int]int{
    out := make(chan map[int]int)

    go func() {
        //100 groups
        total:= <- c
        wg.Done()
        //element number in group
        elemNumber := total / 100
        extra := total % 100
        result := make(map[int]int)
        if elemNumber>0{
            //certain 100 groups
            for i:=1 ;i<=99;i++{
                result[(i-1) * elemNumber + 1] = elemNumber
            }
            result[100] = extra + elemNumber
        }else{
            //less than 100
            for i:=1;i<=total;i++{
                result[i] = 1
            }
        }

        out <- result
        close(out)
    }()
    return out
}

//takes in all numbers to calculate multiply result
//this could be heavy so can do it 100 groups together
func calculateFactorial(nums chan map[int]int, wg *sync.WaitGroup) chan float64{
    out := make(chan float64)


    go func() {
        total:= <- nums
        wg.Done()
        fmt.Println(total)

        oneResult := make(chan float64)

        var wg2 sync.WaitGroup
        wg2.Add(len(total))

        for k,v := range total{
            fmt.Printf("%d %d \n",k,v)
            go func(k int, v int) {
                t := 1.0
                for i:=0;i<v;i++{
                    t = t * (float64(k) + float64(i))
                }
                fmt.Println(t)
                oneResult <- t
                wg2.Done()
            }(k,v)
        }

        wg2.Wait()
        close(oneResult)

        result := 1.0
        for n := range oneResult{  //DEADLOCK HERE! Why?
            result *= n
        }


        fmt.Printf("Result: %f\n",result)

        out <- result

    }()
    return out
}

Обновление:

См. также:  Отправка сообщений GRPC через определенный порт

Благодаря ответу Джессе Катринка, который устранил проблему в приведенном выше коде, просто изменив oneResult на буферный канал. Однако в https://stackoverflow.com/a/15144455/921082 есть цитата

Никогда не следует добавлять буферизацию только для устранения тупика. Если ваша программа заходит в тупик, гораздо проще исправить это, начав с нулевой буферизации и продумав зависимости. Затем добавьте буферизацию, если знаете, что она не приведет к взаимоблокировке.

Так может ли кто-нибудь помочь мне выяснить, как не использовать для этого буферизованный канал? Является ли это возможным?

Кроме того, я провел небольшое исследование того, что именно вызывает тупик.

Некоторые цитаты, например, из https://stackoverflow.com/a/18660709/921082,

Если канал не буферизован, отправитель блокируется до тех пор, пока получатель не получит значение. Если канал имеет буфер, отправитель блокируется только до тех пор, пока значение не будет скопировано в буфер; если буфер заполнен, это означает ожидание, пока какой-либо получатель не получит значение.

Сказано иначе:

  1. когда канал заполнен, отправитель ждет, пока другая горутина освободит место, получив

  2. вы можете видеть небуферизованный канал как всегда заполненный: должна быть другая горутина, принимающая то, что отправитель отправляет.

Итак, в моей исходной ситуации тупик, вероятно, вызван следующими причинами:

  1. диапазон по каналу не принимается?

  2. диапазон по каналу не принимается в отдельной программе. ?

  3. oneResult не закрыт должным образом, поэтому диапазон по каналу не знает, где конец?

для номера 3 я не знаю, есть ли что-нибудь неправильное в закрытии oneResult перед выходом диапазона, поскольку этот шаблон встречается во многих примерах в Интернете. Если это номер 3, может быть, что-то не так в группе ожидания?

У меня есть еще одна статья, очень похожая на мою ситуацию https://robertbasic.com/blog/buffered-vs-unbuffered-channels-in-golang/, на втором усвоенном уроке он использует for { select {} } бесконечный цикл в качестве альтернативы расширению диапазона, похоже, это решило его проблему.

 go func() {
        for{
            select {
            case p := <-pch:
                findcp(p)
            }
        }
    }()

Урок номер 2 — небуферизованный канал не может удерживать значения (да, это прямо в названии «небуферизованный»), поэтому все, что отправляется на этот канал, должно быть немедленно получено каким-то другим кодом. Этот код приема должен быть в другой горутине, потому что одна горутина не может делать две вещи одновременно: она не может отправлять и получать; это должно быть одно или другое.

Спасибо

Понравилась статья? Поделиться с друзьями:
IT Шеф
Комментарии: 2
  1. tomriddle_1234

    Тупик не в цикле передачи диапазона по каналу. Если вы запустите код на площадке, вы увидите в верхней части трассировки стека, что ошибка вызвана wg2.Wait (строка 88 на игровой площадке, на которую указывает трассировка стека). Также в stacktrace вы можете увидеть все горутины, которые не завершились из-за тупиковой ситуации, потому что oneResult<-t никогда не завершается, поэтому ни одна из горутин, запущенных в цикле, никогда не завершается.

    Итак, основная проблема здесь:

    wg2.Wait()
    close(oneResult)
    
    // ...
    
    for n := range oneResult{
    // ...
    

    Кроме того, я полагаю, что зацикливание по закрытому каналу — это не то, что вам нужно. Однако даже если вы не закрыли канал, этот цикл никогда не запустится, потому что wg2.Wait() будет ждать, пока его не будет.

    oneResult <- t
    wg2.Done()
    

    Но это никогда не будет сделано, потому что это зависит от того, что цикл уже запущен. Строка oneResult <- t не будет завершена, если кто-то на другой стороне не получит сигнал от этого канала, который является вашим циклом, однако этот цикл диапазона по каналу все еще ожидает завершения wg2.Wait().

    Таким образом, между отправителем и получателем канала существует «круговая зависимость».

    Чтобы решить эту проблему, вам нужно разрешить циклу начать прием с канала, при этом убедившись, что этот канал закрыт, когда это будет сделано. Вы можете сделать это, заключив две строки ожидания и закрытия в их собственную горутину.

    https://play.golang.com/p/rwwCFVszZ6Q

    Большое спасибо, ваш ответ делает меня намного понятнее. Это немного сложно, но хорошо разбираться в этой проблеме. Есть ли какая-нибудь книга, посвященная такой проблеме циклической зависимости, вызванной группой ожидания? person tomriddle_1234; 26.02.2019

    blog.golang.org/pipelines, часть Fan-out Fan-in упомянула поближе с WaitGroup, это основная проблема моей проблемы, я думаю person tomriddle_1234; 26.02.2019

    Я не уверен, что циклическая зависимость — правильный термин, если честно, а также я не знаю ни одной книги / статьи, в которых рассматриваются тупиковые ситуации именно в таких обстоятельствах. Но мне кажется, что есть две основные причины, по которым может возникнуть взаимоблокировка: либо вы отправляете на канал, не получая от него, либо вы получаете с канала, не отправляя на него. Это может произойти, если вы забыли написать код для приема / отправки, или вы не забыли написать его, но этот фрагмент кода не выполняется, потому что он где-то заблокирован, как в вашем случае wg2.Wait. person tomriddle_1234; 26.02.2019

    @ JesséCatrinck Пожалуйста, поправьте меня, если я ошибаюсь, но буферный канал позволяет вам заполнить его только до того, как он когда-либо будет получен от. Это не делает программу параллельной или параллельной. И в вашем примере вы все еще просто ждете с wg2, пока не заполните буфер, из-за чего цикл range oneResult бездействует. И даже если вы переместите wg2 в свою собственную горутину для активации цикла, вы получите тот же эффект, что и с небуферизованным каналом, что сделает буферизованный канал бессмысленным. person tomriddle_1234; 26.02.2019

    @ JesséCatrinck Это кажется более параллельным, чем this. Я ошибся? person tomriddle_1234; 26.02.2019

  2. tomriddle_1234

    Вам нужно добавить буфер в переменную oneResult

    введите описание изображения здесь

    Пояснение: https://www.rapidloop.com/blog/golang-channels-tips-tricks.html

    Подскажите, пожалуйста, зачем нужен буферизованный канал? мое первоначальное намерение не обязательно должно использовать буферизованный канал. например, пока есть значение oneResult, просто вычислите result *=n в диапазоне по каналу. person tomriddle_1234; 26.02.2019

    Вам нужен буфер для добавления нескольких значений к каналу oneResult, тогда, когда вы пройдете через него, вы получите все эти значения. См. Здесь: and-common-errors-in-go-golang / person tomriddle_1234; 26.02.2019

Добавить комментарий

;-) :| :x :twisted: :smile: :shock: :sad: :roll: :razz: :oops: :o :mrgreen: :lol: :idea: :grin: :evil: :cry: :cool: :arrow: :???: :?: :!: