Fix exporter bug
This commit is contained in:
parent
b8bda36f92
commit
6415a775f4
59
geziyor.go
59
geziyor.go
@ -146,24 +146,7 @@ func (g *Geziyor) Start() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Start Exporters
|
// Start Exporters
|
||||||
if len(g.Opt.Exporters) != 0 {
|
g.startExporters()
|
||||||
g.wgExporters.Add(len(g.Opt.Exporters))
|
|
||||||
for _, exporter := range g.Opt.Exporters {
|
|
||||||
go func(exporter export.Exporter) {
|
|
||||||
defer g.wgExporters.Done()
|
|
||||||
if err := exporter.Export(g.Exports); err != nil {
|
|
||||||
internal.Logger.Printf("exporter error: %s\n", err)
|
|
||||||
}
|
|
||||||
}(exporter)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
g.wgExporters.Add(1)
|
|
||||||
go func() {
|
|
||||||
for range g.Exports {
|
|
||||||
}
|
|
||||||
g.wgExporters.Done()
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Wait for SIGINT (interrupt) signal.
|
// Wait for SIGINT (interrupt) signal.
|
||||||
shutdownChan := make(chan os.Signal, 1)
|
shutdownChan := make(chan os.Signal, 1)
|
||||||
@ -326,3 +309,43 @@ func (g *Geziyor) interruptSignalWaiter(shutdownChan chan os.Signal, shutdownDon
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (g *Geziyor) startExporters() {
|
||||||
|
if len(g.Opt.Exporters) != 0 {
|
||||||
|
var exporterChans []chan interface{}
|
||||||
|
|
||||||
|
g.wgExporters.Add(len(g.Opt.Exporters))
|
||||||
|
for _, exporter := range g.Opt.Exporters {
|
||||||
|
exporterChan := make(chan interface{})
|
||||||
|
exporterChans = append(exporterChans, exporterChan)
|
||||||
|
go func(exporter export.Exporter) {
|
||||||
|
defer g.wgExporters.Done()
|
||||||
|
if err := exporter.Export(exporterChan); err != nil {
|
||||||
|
internal.Logger.Printf("exporter error: %s\n", err)
|
||||||
|
}
|
||||||
|
}(exporter)
|
||||||
|
}
|
||||||
|
go func() {
|
||||||
|
// When exports closed, close the exporter chans.
|
||||||
|
// Exports chan will be closed after all requests are handled.
|
||||||
|
defer func() {
|
||||||
|
for _, exporterChan := range exporterChans {
|
||||||
|
close(exporterChan)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
// Send incoming data from exports to all of the exporter's chans
|
||||||
|
for data := range g.Exports {
|
||||||
|
for _, exporterChan := range exporterChans {
|
||||||
|
exporterChan <- data
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
} else {
|
||||||
|
g.wgExporters.Add(1)
|
||||||
|
go func() {
|
||||||
|
for range g.Exports {
|
||||||
|
}
|
||||||
|
g.wgExporters.Done()
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -61,7 +61,7 @@ func TestQuotes(t *testing.T) {
|
|||||||
geziyor.NewGeziyor(&geziyor.Options{
|
geziyor.NewGeziyor(&geziyor.Options{
|
||||||
StartURLs: []string{"http://quotes.toscrape.com/"},
|
StartURLs: []string{"http://quotes.toscrape.com/"},
|
||||||
ParseFunc: quotesParse,
|
ParseFunc: quotesParse,
|
||||||
Exporters: []export.Exporter{&export.JSON{}},
|
Exporters: []export.Exporter{&export.JSONLine{FileName: "1.jsonl"}, &export.JSON{FileName: "2.json"}},
|
||||||
}).Start()
|
}).Start()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user