From ddff3aee25c54a730f1f7c4361a7498b95d884ef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Musab=20G=C3=BCltekin?= Date: Sat, 15 Jun 2019 22:27:46 +0300 Subject: [PATCH] Request cancellations support added to Middlewares. Some core functions refactored as middlewares. Fixed race condition in exporting system. Now, only one goroutine will be responsible for exporting. This fixes concurrency issues on writing. --- exporter/csv.go | 11 ++--- exporter/json.go | 7 +--- geziyor.go | 103 +++++++++++++---------------------------------- geziyor_test.go | 12 +++--- request.go | 32 ++++++++++++++- response.go | 1 - 6 files changed, 71 insertions(+), 95 deletions(-) diff --git a/exporter/csv.go b/exporter/csv.go index 2712b52..869ef2c 100644 --- a/exporter/csv.go +++ b/exporter/csv.go @@ -3,7 +3,6 @@ package exporter import ( "encoding/csv" "fmt" - "github.com/geziyor/geziyor" "log" "os" "reflect" @@ -19,7 +18,8 @@ type CSVExporter struct { writer *csv.Writer } -func (e *CSVExporter) Export(response *geziyor.Response) { +// Export exports response data as CSV streaming file +func (e *CSVExporter) Export(exports chan interface{}) { // Default filename if e.FileName == "" { @@ -37,7 +37,7 @@ func (e *CSVExporter) Export(response *geziyor.Response) { }) // Export data as responses came - for res := range response.Exports { + for res := range exports { var values []string // Detect type and extract CSV values @@ -57,14 +57,9 @@ func (e *CSVExporter) Export(response *geziyor.Response) { } // Write to file - e.mut.Lock() if err := e.writer.Write(values); err != nil { log.Printf("CSV writing error on exporter: %v\n", err) } - e.mut.Unlock() } - - e.mut.Lock() e.writer.Flush() - e.mut.Unlock() } diff --git a/exporter/json.go b/exporter/json.go index 535fa2b..041c2e1 100644 --- a/exporter/json.go +++ b/exporter/json.go @@ -3,7 +3,6 @@ package exporter import ( "encoding/json" "fmt" - "github.com/geziyor/geziyor" "log" "os" "sync" @@ -20,7 +19,7 @@ type JSONExporter struct { } // Export exports response data as JSON streaming file -func (e *JSONExporter) Export(response *geziyor.Response) { +func (e *JSONExporter) Export(exports chan interface{}) { // Default filename if e.FileName == "" { @@ -39,11 +38,9 @@ func (e *JSONExporter) Export(response *geziyor.Response) { }) // Export data as responses came - for res := range response.Exports { - e.mut.Lock() + for res := range exports { if err := e.encoder.Encode(res); err != nil { log.Printf("JSON encoding error on exporter: %v\n", err) } - e.mut.Unlock() } } diff --git a/geziyor.go b/geziyor.go index c55bfaa..795fbdf 100644 --- a/geziyor.go +++ b/geziyor.go @@ -14,7 +14,6 @@ import ( "math/rand" "net" "net/http" - "net/url" "os" "runtime/debug" "sync" @@ -23,7 +22,7 @@ import ( // Exporter interface is for extracting data to external resources type Exporter interface { - Export(r *Response) + Export(exports chan interface{}) } // RequestMiddleware called before requests made. @@ -32,7 +31,8 @@ type RequestMiddleware func(g *Geziyor, r *Request) // Geziyor is our main scraper type type Geziyor struct { - Opt Options + Opt Options + Exports chan interface{} client *http.Client wg sync.WaitGroup @@ -41,11 +41,8 @@ type Geziyor struct { sync.RWMutex hostSems map[string]chan struct{} } - visitedURLS struct { - sync.RWMutex - visitedURLS []string - } - requestMiddlewaresBase []RequestMiddleware + visitedURLs sync.Map + requestMiddlewares []RequestMiddleware } func init() { @@ -73,8 +70,13 @@ func NewGeziyor(opt Options) *Geziyor { }, Timeout: time.Second * 180, // Google's timeout }, - Opt: opt, - requestMiddlewaresBase: []RequestMiddleware{defaultHeadersMiddleware}, + Opt: opt, + Exports: make(chan interface{}), + requestMiddlewares: []RequestMiddleware{ + allowedDomainsMiddleware, + duplicateRequestsMiddleware, + defaultHeadersMiddleware, + }, } if opt.Cache != nil { @@ -102,6 +104,7 @@ func NewGeziyor(opt Options) *Geziyor { if opt.MaxBodySize == 0 { geziyor.Opt.MaxBodySize = 1024 * 1024 * 1024 // 1GB } + geziyor.requestMiddlewares = append(geziyor.requestMiddlewares, opt.RequestMiddlewares...) return geziyor } @@ -110,6 +113,17 @@ func NewGeziyor(opt Options) *Geziyor { func (g *Geziyor) Start() { log.Println("Scraping Started") + if len(g.Opt.Exporters) != 0 { + for _, exp := range g.Opt.Exporters { + go exp.Export(g.Exports) + } + } else { + go func() { + for range g.Exports { + } + }() + } + if g.Opt.StartRequestsFunc == nil { for _, startURL := range g.Opt.StartURLs { g.Get(startURL, g.Opt.ParseFunc) @@ -119,7 +133,7 @@ func (g *Geziyor) Start() { } g.wg.Wait() - + close(g.Exports) log.Println("Scraping Finished") } @@ -170,16 +184,11 @@ func (g *Geziyor) do(req *Request, callback func(resp *Response)) { } }() - if !g.checkURL(req.URL) { - return - } - - // Request Middlewares - for _, middlewareFunc := range g.requestMiddlewaresBase { - middlewareFunc(g, req) - } - for _, middlewareFunc := range g.Opt.RequestMiddlewares { + for _, middlewareFunc := range g.requestMiddlewares { middlewareFunc(g, req) + if req.Cancelled { + return + } } // Do request normal or Chrome and read response @@ -198,19 +207,6 @@ func (g *Geziyor) do(req *Request, callback func(resp *Response)) { response.DocHTML, _ = goquery.NewDocumentFromReader(bytes.NewReader(response.Body)) } - // Exporter functions - for _, exp := range g.Opt.Exporters { - go exp.Export(response) - } - - // Drain exports chan if no exporter functions added - if len(g.Opt.Exporters) == 0 { - go func() { - for range response.Exports { - } - }() - } - // Callbacks if callback != nil { callback(response) @@ -219,9 +215,6 @@ func (g *Geziyor) do(req *Request, callback func(resp *Response)) { g.Opt.ParseFunc(response) } } - - // Close exports chan to prevent goroutine leak - close(response.Exports) } func (g *Geziyor) doRequestClient(req *Request) (*Response, error) { @@ -265,7 +258,6 @@ func (g *Geziyor) doRequestClient(req *Request) (*Response, error) { Body: body, Meta: req.Meta, Geziyor: g, - Exports: make(chan interface{}), } return &response, nil @@ -303,7 +295,6 @@ func (g *Geziyor) doRequestChrome(req *Request) (*Response, error) { Body: []byte(res), Meta: req.Meta, Geziyor: g, - Exports: make(chan interface{}), } return response, nil @@ -313,7 +304,6 @@ func (g *Geziyor) acquireSem(req *Request) { if g.Opt.ConcurrentRequests != 0 { g.semGlobal <- struct{}{} } - if g.Opt.ConcurrentRequestsPerDomain != 0 { g.semHosts.RLock() hostSem, exists := g.semHosts.hostSems[req.Host] @@ -337,31 +327,6 @@ func (g *Geziyor) releaseSem(req *Request) { } } -func (g *Geziyor) checkURL(parsedURL *url.URL) bool { - rawURL := parsedURL.String() - // Check for allowed domains - if len(g.Opt.AllowedDomains) != 0 && !contains(g.Opt.AllowedDomains, parsedURL.Host) { - //log.Printf("Domain not allowed: %s\n", parsedURL.Host) - return false - } - - // Check for duplicate requests - if !g.Opt.URLRevisitEnabled { - g.visitedURLS.RLock() - if contains(g.visitedURLS.visitedURLS, rawURL) { - g.visitedURLS.RUnlock() - //log.Printf("URL already visited %s\n", rawURL) - return false - } - g.visitedURLS.RUnlock() - g.visitedURLS.Lock() - g.visitedURLS.visitedURLS = append(g.visitedURLS.visitedURLS, rawURL) - g.visitedURLS.Unlock() - } - - return true -} - func (g *Geziyor) delay() { if g.Opt.RequestDelayRandomize { min := float64(g.Opt.RequestDelay) * 0.5 @@ -371,13 +336,3 @@ func (g *Geziyor) delay() { time.Sleep(g.Opt.RequestDelay) } } - -// contains checks whether []string contains string -func contains(s []string, e string) bool { - for _, a := range s { - if a == e { - return true - } - } - return false -} diff --git a/geziyor_test.go b/geziyor_test.go index 56470fe..7f27507 100644 --- a/geziyor_test.go +++ b/geziyor_test.go @@ -28,7 +28,7 @@ func TestSimpleCache(t *testing.T) { Cache: httpcache.NewMemoryCache(), ParseFunc: func(r *geziyor.Response) { fmt.Println(string(r.Body)) - r.Exports <- string(r.Body) + r.Geziyor.Exports <- string(r.Body) r.Geziyor.Get("http://api.ipify.org", nil) }, }) @@ -47,7 +47,7 @@ func TestQuotes(t *testing.T) { func quotesParse(r *geziyor.Response) { r.DocHTML.Find("div.quote").Each(func(i int, s *goquery.Selection) { // Export Data - r.Exports <- map[string]interface{}{ + r.Geziyor.Exports <- map[string]interface{}{ "number": i, "text": s.Find("span.text").Text(), "author": s.Find("small.author").Text(), @@ -63,12 +63,14 @@ func quotesParse(r *geziyor.Response) { } } -func TestLinks(t *testing.T) { +func TestAllLinks(t *testing.T) { + defer leaktest.Check(t)() + geziyor.NewGeziyor(geziyor.Options{ AllowedDomains: []string{"books.toscrape.com"}, StartURLs: []string{"http://books.toscrape.com/"}, ParseFunc: func(r *geziyor.Response) { - r.Exports <- []string{r.Request.URL.String()} + r.Geziyor.Exports <- []string{r.Request.URL.String()} r.DocHTML.Find("a").Each(func(i int, s *goquery.Selection) { if href, ok := s.Attr("href"); ok { r.Geziyor.Get(r.JoinURL(href), r.Geziyor.Opt.ParseFunc) @@ -95,7 +97,7 @@ func TestStartRequestsFunc(t *testing.T) { }, ParseFunc: func(r *geziyor.Response) { r.DocHTML.Find("a").Each(func(_ int, s *goquery.Selection) { - r.Exports <- s.AttrOr("href", "") + r.Geziyor.Exports <- s.AttrOr("href", "") }) }, Exporters: []geziyor.Exporter{&exporter.JSONExporter{}}, diff --git a/request.go b/request.go index c472f12..a4362df 100644 --- a/request.go +++ b/request.go @@ -7,8 +7,26 @@ import ( // Request is a small wrapper around *http.Request that contains Metadata and Rendering option type Request struct { *http.Request - Meta map[string]interface{} - Rendered bool + Meta map[string]interface{} + Rendered bool + Cancelled bool +} + +func allowedDomainsMiddleware(g *Geziyor, r *Request) { + if len(g.Opt.AllowedDomains) != 0 && !contains(g.Opt.AllowedDomains, r.Host) { + //log.Printf("Domain not allowed: %s\n", req.Host) + r.Cancelled = true + return + } +} + +func duplicateRequestsMiddleware(g *Geziyor, r *Request) { + if !g.Opt.URLRevisitEnabled { + if _, visited := g.visitedURLs.LoadOrStore(r.Request.URL.String(), struct{}{}); visited { + //log.Printf("URL already visited %s\n", rawURL) + r.Cancelled = true + } + } } func defaultHeadersMiddleware(g *Geziyor, r *Request) { @@ -24,3 +42,13 @@ func headerSetDefault(header http.Header, key string, value string) http.Header } return header } + +// contains checks whether []string contains string +func contains(s []string, e string) bool { + for _, a := range s { + if a == e { + return true + } + } + return false +} diff --git a/response.go b/response.go index c95eb01..71076e1 100644 --- a/response.go +++ b/response.go @@ -16,7 +16,6 @@ type Response struct { Meta map[string]interface{} Geziyor *Geziyor - Exports chan interface{} } // JoinURL joins base response URL and provided relative URL.