diff --git a/exporter/csv.go b/exporter/csv.go index 2712b52..869ef2c 100644 --- a/exporter/csv.go +++ b/exporter/csv.go @@ -3,7 +3,6 @@ package exporter import ( "encoding/csv" "fmt" - "github.com/geziyor/geziyor" "log" "os" "reflect" @@ -19,7 +18,8 @@ type CSVExporter struct { writer *csv.Writer } -func (e *CSVExporter) Export(response *geziyor.Response) { +// Export exports response data as CSV streaming file +func (e *CSVExporter) Export(exports chan interface{}) { // Default filename if e.FileName == "" { @@ -37,7 +37,7 @@ func (e *CSVExporter) Export(response *geziyor.Response) { }) // Export data as responses came - for res := range response.Exports { + for res := range exports { var values []string // Detect type and extract CSV values @@ -57,14 +57,9 @@ func (e *CSVExporter) Export(response *geziyor.Response) { } // Write to file - e.mut.Lock() if err := e.writer.Write(values); err != nil { log.Printf("CSV writing error on exporter: %v\n", err) } - e.mut.Unlock() } - - e.mut.Lock() e.writer.Flush() - e.mut.Unlock() } diff --git a/exporter/json.go b/exporter/json.go index 535fa2b..041c2e1 100644 --- a/exporter/json.go +++ b/exporter/json.go @@ -3,7 +3,6 @@ package exporter import ( "encoding/json" "fmt" - "github.com/geziyor/geziyor" "log" "os" "sync" @@ -20,7 +19,7 @@ type JSONExporter struct { } // Export exports response data as JSON streaming file -func (e *JSONExporter) Export(response *geziyor.Response) { +func (e *JSONExporter) Export(exports chan interface{}) { // Default filename if e.FileName == "" { @@ -39,11 +38,9 @@ func (e *JSONExporter) Export(response *geziyor.Response) { }) // Export data as responses came - for res := range response.Exports { - e.mut.Lock() + for res := range exports { if err := e.encoder.Encode(res); err != nil { log.Printf("JSON encoding error on exporter: %v\n", err) } - e.mut.Unlock() } } diff --git a/geziyor.go b/geziyor.go index c55bfaa..795fbdf 100644 --- a/geziyor.go +++ b/geziyor.go @@ -14,7 +14,6 @@ import ( "math/rand" "net" "net/http" - "net/url" "os" "runtime/debug" "sync" @@ -23,7 +22,7 @@ import ( // Exporter interface is for extracting data to external resources type Exporter interface { - Export(r *Response) + Export(exports chan interface{}) } // RequestMiddleware called before requests made. @@ -32,7 +31,8 @@ type RequestMiddleware func(g *Geziyor, r *Request) // Geziyor is our main scraper type type Geziyor struct { - Opt Options + Opt Options + Exports chan interface{} client *http.Client wg sync.WaitGroup @@ -41,11 +41,8 @@ type Geziyor struct { sync.RWMutex hostSems map[string]chan struct{} } - visitedURLS struct { - sync.RWMutex - visitedURLS []string - } - requestMiddlewaresBase []RequestMiddleware + visitedURLs sync.Map + requestMiddlewares []RequestMiddleware } func init() { @@ -73,8 +70,13 @@ func NewGeziyor(opt Options) *Geziyor { }, Timeout: time.Second * 180, // Google's timeout }, - Opt: opt, - requestMiddlewaresBase: []RequestMiddleware{defaultHeadersMiddleware}, + Opt: opt, + Exports: make(chan interface{}), + requestMiddlewares: []RequestMiddleware{ + allowedDomainsMiddleware, + duplicateRequestsMiddleware, + defaultHeadersMiddleware, + }, } if opt.Cache != nil { @@ -102,6 +104,7 @@ func NewGeziyor(opt Options) *Geziyor { if opt.MaxBodySize == 0 { geziyor.Opt.MaxBodySize = 1024 * 1024 * 1024 // 1GB } + geziyor.requestMiddlewares = append(geziyor.requestMiddlewares, opt.RequestMiddlewares...) return geziyor } @@ -110,6 +113,17 @@ func NewGeziyor(opt Options) *Geziyor { func (g *Geziyor) Start() { log.Println("Scraping Started") + if len(g.Opt.Exporters) != 0 { + for _, exp := range g.Opt.Exporters { + go exp.Export(g.Exports) + } + } else { + go func() { + for range g.Exports { + } + }() + } + if g.Opt.StartRequestsFunc == nil { for _, startURL := range g.Opt.StartURLs { g.Get(startURL, g.Opt.ParseFunc) @@ -119,7 +133,7 @@ func (g *Geziyor) Start() { } g.wg.Wait() - + close(g.Exports) log.Println("Scraping Finished") } @@ -170,16 +184,11 @@ func (g *Geziyor) do(req *Request, callback func(resp *Response)) { } }() - if !g.checkURL(req.URL) { - return - } - - // Request Middlewares - for _, middlewareFunc := range g.requestMiddlewaresBase { - middlewareFunc(g, req) - } - for _, middlewareFunc := range g.Opt.RequestMiddlewares { + for _, middlewareFunc := range g.requestMiddlewares { middlewareFunc(g, req) + if req.Cancelled { + return + } } // Do request normal or Chrome and read response @@ -198,19 +207,6 @@ func (g *Geziyor) do(req *Request, callback func(resp *Response)) { response.DocHTML, _ = goquery.NewDocumentFromReader(bytes.NewReader(response.Body)) } - // Exporter functions - for _, exp := range g.Opt.Exporters { - go exp.Export(response) - } - - // Drain exports chan if no exporter functions added - if len(g.Opt.Exporters) == 0 { - go func() { - for range response.Exports { - } - }() - } - // Callbacks if callback != nil { callback(response) @@ -219,9 +215,6 @@ func (g *Geziyor) do(req *Request, callback func(resp *Response)) { g.Opt.ParseFunc(response) } } - - // Close exports chan to prevent goroutine leak - close(response.Exports) } func (g *Geziyor) doRequestClient(req *Request) (*Response, error) { @@ -265,7 +258,6 @@ func (g *Geziyor) doRequestClient(req *Request) (*Response, error) { Body: body, Meta: req.Meta, Geziyor: g, - Exports: make(chan interface{}), } return &response, nil @@ -303,7 +295,6 @@ func (g *Geziyor) doRequestChrome(req *Request) (*Response, error) { Body: []byte(res), Meta: req.Meta, Geziyor: g, - Exports: make(chan interface{}), } return response, nil @@ -313,7 +304,6 @@ func (g *Geziyor) acquireSem(req *Request) { if g.Opt.ConcurrentRequests != 0 { g.semGlobal <- struct{}{} } - if g.Opt.ConcurrentRequestsPerDomain != 0 { g.semHosts.RLock() hostSem, exists := g.semHosts.hostSems[req.Host] @@ -337,31 +327,6 @@ func (g *Geziyor) releaseSem(req *Request) { } } -func (g *Geziyor) checkURL(parsedURL *url.URL) bool { - rawURL := parsedURL.String() - // Check for allowed domains - if len(g.Opt.AllowedDomains) != 0 && !contains(g.Opt.AllowedDomains, parsedURL.Host) { - //log.Printf("Domain not allowed: %s\n", parsedURL.Host) - return false - } - - // Check for duplicate requests - if !g.Opt.URLRevisitEnabled { - g.visitedURLS.RLock() - if contains(g.visitedURLS.visitedURLS, rawURL) { - g.visitedURLS.RUnlock() - //log.Printf("URL already visited %s\n", rawURL) - return false - } - g.visitedURLS.RUnlock() - g.visitedURLS.Lock() - g.visitedURLS.visitedURLS = append(g.visitedURLS.visitedURLS, rawURL) - g.visitedURLS.Unlock() - } - - return true -} - func (g *Geziyor) delay() { if g.Opt.RequestDelayRandomize { min := float64(g.Opt.RequestDelay) * 0.5 @@ -371,13 +336,3 @@ func (g *Geziyor) delay() { time.Sleep(g.Opt.RequestDelay) } } - -// contains checks whether []string contains string -func contains(s []string, e string) bool { - for _, a := range s { - if a == e { - return true - } - } - return false -} diff --git a/geziyor_test.go b/geziyor_test.go index 56470fe..7f27507 100644 --- a/geziyor_test.go +++ b/geziyor_test.go @@ -28,7 +28,7 @@ func TestSimpleCache(t *testing.T) { Cache: httpcache.NewMemoryCache(), ParseFunc: func(r *geziyor.Response) { fmt.Println(string(r.Body)) - r.Exports <- string(r.Body) + r.Geziyor.Exports <- string(r.Body) r.Geziyor.Get("http://api.ipify.org", nil) }, }) @@ -47,7 +47,7 @@ func TestQuotes(t *testing.T) { func quotesParse(r *geziyor.Response) { r.DocHTML.Find("div.quote").Each(func(i int, s *goquery.Selection) { // Export Data - r.Exports <- map[string]interface{}{ + r.Geziyor.Exports <- map[string]interface{}{ "number": i, "text": s.Find("span.text").Text(), "author": s.Find("small.author").Text(), @@ -63,12 +63,14 @@ func quotesParse(r *geziyor.Response) { } } -func TestLinks(t *testing.T) { +func TestAllLinks(t *testing.T) { + defer leaktest.Check(t)() + geziyor.NewGeziyor(geziyor.Options{ AllowedDomains: []string{"books.toscrape.com"}, StartURLs: []string{"http://books.toscrape.com/"}, ParseFunc: func(r *geziyor.Response) { - r.Exports <- []string{r.Request.URL.String()} + r.Geziyor.Exports <- []string{r.Request.URL.String()} r.DocHTML.Find("a").Each(func(i int, s *goquery.Selection) { if href, ok := s.Attr("href"); ok { r.Geziyor.Get(r.JoinURL(href), r.Geziyor.Opt.ParseFunc) @@ -95,7 +97,7 @@ func TestStartRequestsFunc(t *testing.T) { }, ParseFunc: func(r *geziyor.Response) { r.DocHTML.Find("a").Each(func(_ int, s *goquery.Selection) { - r.Exports <- s.AttrOr("href", "") + r.Geziyor.Exports <- s.AttrOr("href", "") }) }, Exporters: []geziyor.Exporter{&exporter.JSONExporter{}}, diff --git a/request.go b/request.go index c472f12..a4362df 100644 --- a/request.go +++ b/request.go @@ -7,8 +7,26 @@ import ( // Request is a small wrapper around *http.Request that contains Metadata and Rendering option type Request struct { *http.Request - Meta map[string]interface{} - Rendered bool + Meta map[string]interface{} + Rendered bool + Cancelled bool +} + +func allowedDomainsMiddleware(g *Geziyor, r *Request) { + if len(g.Opt.AllowedDomains) != 0 && !contains(g.Opt.AllowedDomains, r.Host) { + //log.Printf("Domain not allowed: %s\n", req.Host) + r.Cancelled = true + return + } +} + +func duplicateRequestsMiddleware(g *Geziyor, r *Request) { + if !g.Opt.URLRevisitEnabled { + if _, visited := g.visitedURLs.LoadOrStore(r.Request.URL.String(), struct{}{}); visited { + //log.Printf("URL already visited %s\n", rawURL) + r.Cancelled = true + } + } } func defaultHeadersMiddleware(g *Geziyor, r *Request) { @@ -24,3 +42,13 @@ func headerSetDefault(header http.Header, key string, value string) http.Header } return header } + +// contains checks whether []string contains string +func contains(s []string, e string) bool { + for _, a := range s { + if a == e { + return true + } + } + return false +} diff --git a/response.go b/response.go index c95eb01..71076e1 100644 --- a/response.go +++ b/response.go @@ -16,7 +16,6 @@ type Response struct { Meta map[string]interface{} Geziyor *Geziyor - Exports chan interface{} } // JoinURL joins base response URL and provided relative URL.