Added request cancellation support to middlewares.

Some core functions refactored as middlewares.
Fixed a race condition in the exporting system: a single goroutine is now responsible for exporting, which resolves concurrent-write issues.
This commit is contained in:
Musab Gültekin 2019-06-15 22:27:46 +03:00
parent 83a7b9eb87
commit ddff3aee25
6 changed files with 71 additions and 95 deletions

View File

@ -3,7 +3,6 @@ package exporter
import (
"encoding/csv"
"fmt"
"github.com/geziyor/geziyor"
"log"
"os"
"reflect"
@ -19,7 +18,8 @@ type CSVExporter struct {
writer *csv.Writer
}
func (e *CSVExporter) Export(response *geziyor.Response) {
// Export exports response data as CSV streaming file
func (e *CSVExporter) Export(exports chan interface{}) {
// Default filename
if e.FileName == "" {
@ -37,7 +37,7 @@ func (e *CSVExporter) Export(response *geziyor.Response) {
})
// Export data as responses came
for res := range response.Exports {
for res := range exports {
var values []string
// Detect type and extract CSV values
@ -57,14 +57,9 @@ func (e *CSVExporter) Export(response *geziyor.Response) {
}
// Write to file
e.mut.Lock()
if err := e.writer.Write(values); err != nil {
log.Printf("CSV writing error on exporter: %v\n", err)
}
e.mut.Unlock()
}
e.mut.Lock()
e.writer.Flush()
e.mut.Unlock()
}

View File

@ -3,7 +3,6 @@ package exporter
import (
"encoding/json"
"fmt"
"github.com/geziyor/geziyor"
"log"
"os"
"sync"
@ -20,7 +19,7 @@ type JSONExporter struct {
}
// Export exports response data as JSON streaming file
func (e *JSONExporter) Export(response *geziyor.Response) {
func (e *JSONExporter) Export(exports chan interface{}) {
// Default filename
if e.FileName == "" {
@ -39,11 +38,9 @@ func (e *JSONExporter) Export(response *geziyor.Response) {
})
// Export data as responses came
for res := range response.Exports {
e.mut.Lock()
for res := range exports {
if err := e.encoder.Encode(res); err != nil {
log.Printf("JSON encoding error on exporter: %v\n", err)
}
e.mut.Unlock()
}
}

View File

@ -14,7 +14,6 @@ import (
"math/rand"
"net"
"net/http"
"net/url"
"os"
"runtime/debug"
"sync"
@ -23,7 +22,7 @@ import (
// Exporter interface is for extracting data to external resources
type Exporter interface {
Export(r *Response)
Export(exports chan interface{})
}
// RequestMiddleware called before requests made.
@ -33,6 +32,7 @@ type RequestMiddleware func(g *Geziyor, r *Request)
// Geziyor is our main scraper type
type Geziyor struct {
Opt Options
Exports chan interface{}
client *http.Client
wg sync.WaitGroup
@ -41,11 +41,8 @@ type Geziyor struct {
sync.RWMutex
hostSems map[string]chan struct{}
}
visitedURLS struct {
sync.RWMutex
visitedURLS []string
}
requestMiddlewaresBase []RequestMiddleware
visitedURLs sync.Map
requestMiddlewares []RequestMiddleware
}
func init() {
@ -74,7 +71,12 @@ func NewGeziyor(opt Options) *Geziyor {
Timeout: time.Second * 180, // Google's timeout
},
Opt: opt,
requestMiddlewaresBase: []RequestMiddleware{defaultHeadersMiddleware},
Exports: make(chan interface{}),
requestMiddlewares: []RequestMiddleware{
allowedDomainsMiddleware,
duplicateRequestsMiddleware,
defaultHeadersMiddleware,
},
}
if opt.Cache != nil {
@ -102,6 +104,7 @@ func NewGeziyor(opt Options) *Geziyor {
if opt.MaxBodySize == 0 {
geziyor.Opt.MaxBodySize = 1024 * 1024 * 1024 // 1GB
}
geziyor.requestMiddlewares = append(geziyor.requestMiddlewares, opt.RequestMiddlewares...)
return geziyor
}
@ -110,6 +113,17 @@ func NewGeziyor(opt Options) *Geziyor {
func (g *Geziyor) Start() {
log.Println("Scraping Started")
if len(g.Opt.Exporters) != 0 {
for _, exp := range g.Opt.Exporters {
go exp.Export(g.Exports)
}
} else {
go func() {
for range g.Exports {
}
}()
}
if g.Opt.StartRequestsFunc == nil {
for _, startURL := range g.Opt.StartURLs {
g.Get(startURL, g.Opt.ParseFunc)
@ -119,7 +133,7 @@ func (g *Geziyor) Start() {
}
g.wg.Wait()
close(g.Exports)
log.Println("Scraping Finished")
}
@ -170,16 +184,11 @@ func (g *Geziyor) do(req *Request, callback func(resp *Response)) {
}
}()
if !g.checkURL(req.URL) {
for _, middlewareFunc := range g.requestMiddlewares {
middlewareFunc(g, req)
if req.Cancelled {
return
}
// Request Middlewares
for _, middlewareFunc := range g.requestMiddlewaresBase {
middlewareFunc(g, req)
}
for _, middlewareFunc := range g.Opt.RequestMiddlewares {
middlewareFunc(g, req)
}
// Do request normal or Chrome and read response
@ -198,19 +207,6 @@ func (g *Geziyor) do(req *Request, callback func(resp *Response)) {
response.DocHTML, _ = goquery.NewDocumentFromReader(bytes.NewReader(response.Body))
}
// Exporter functions
for _, exp := range g.Opt.Exporters {
go exp.Export(response)
}
// Drain exports chan if no exporter functions added
if len(g.Opt.Exporters) == 0 {
go func() {
for range response.Exports {
}
}()
}
// Callbacks
if callback != nil {
callback(response)
@ -219,9 +215,6 @@ func (g *Geziyor) do(req *Request, callback func(resp *Response)) {
g.Opt.ParseFunc(response)
}
}
// Close exports chan to prevent goroutine leak
close(response.Exports)
}
func (g *Geziyor) doRequestClient(req *Request) (*Response, error) {
@ -265,7 +258,6 @@ func (g *Geziyor) doRequestClient(req *Request) (*Response, error) {
Body: body,
Meta: req.Meta,
Geziyor: g,
Exports: make(chan interface{}),
}
return &response, nil
@ -303,7 +295,6 @@ func (g *Geziyor) doRequestChrome(req *Request) (*Response, error) {
Body: []byte(res),
Meta: req.Meta,
Geziyor: g,
Exports: make(chan interface{}),
}
return response, nil
@ -313,7 +304,6 @@ func (g *Geziyor) acquireSem(req *Request) {
if g.Opt.ConcurrentRequests != 0 {
g.semGlobal <- struct{}{}
}
if g.Opt.ConcurrentRequestsPerDomain != 0 {
g.semHosts.RLock()
hostSem, exists := g.semHosts.hostSems[req.Host]
@ -337,31 +327,6 @@ func (g *Geziyor) releaseSem(req *Request) {
}
}
func (g *Geziyor) checkURL(parsedURL *url.URL) bool {
rawURL := parsedURL.String()
// Check for allowed domains
if len(g.Opt.AllowedDomains) != 0 && !contains(g.Opt.AllowedDomains, parsedURL.Host) {
//log.Printf("Domain not allowed: %s\n", parsedURL.Host)
return false
}
// Check for duplicate requests
if !g.Opt.URLRevisitEnabled {
g.visitedURLS.RLock()
if contains(g.visitedURLS.visitedURLS, rawURL) {
g.visitedURLS.RUnlock()
//log.Printf("URL already visited %s\n", rawURL)
return false
}
g.visitedURLS.RUnlock()
g.visitedURLS.Lock()
g.visitedURLS.visitedURLS = append(g.visitedURLS.visitedURLS, rawURL)
g.visitedURLS.Unlock()
}
return true
}
func (g *Geziyor) delay() {
if g.Opt.RequestDelayRandomize {
min := float64(g.Opt.RequestDelay) * 0.5
@ -371,13 +336,3 @@ func (g *Geziyor) delay() {
time.Sleep(g.Opt.RequestDelay)
}
}
// contains checks whether []string contains string
func contains(s []string, e string) bool {
for _, a := range s {
if a == e {
return true
}
}
return false
}

View File

@ -28,7 +28,7 @@ func TestSimpleCache(t *testing.T) {
Cache: httpcache.NewMemoryCache(),
ParseFunc: func(r *geziyor.Response) {
fmt.Println(string(r.Body))
r.Exports <- string(r.Body)
r.Geziyor.Exports <- string(r.Body)
r.Geziyor.Get("http://api.ipify.org", nil)
},
})
@ -47,7 +47,7 @@ func TestQuotes(t *testing.T) {
func quotesParse(r *geziyor.Response) {
r.DocHTML.Find("div.quote").Each(func(i int, s *goquery.Selection) {
// Export Data
r.Exports <- map[string]interface{}{
r.Geziyor.Exports <- map[string]interface{}{
"number": i,
"text": s.Find("span.text").Text(),
"author": s.Find("small.author").Text(),
@ -63,12 +63,14 @@ func quotesParse(r *geziyor.Response) {
}
}
func TestLinks(t *testing.T) {
func TestAllLinks(t *testing.T) {
defer leaktest.Check(t)()
geziyor.NewGeziyor(geziyor.Options{
AllowedDomains: []string{"books.toscrape.com"},
StartURLs: []string{"http://books.toscrape.com/"},
ParseFunc: func(r *geziyor.Response) {
r.Exports <- []string{r.Request.URL.String()}
r.Geziyor.Exports <- []string{r.Request.URL.String()}
r.DocHTML.Find("a").Each(func(i int, s *goquery.Selection) {
if href, ok := s.Attr("href"); ok {
r.Geziyor.Get(r.JoinURL(href), r.Geziyor.Opt.ParseFunc)
@ -95,7 +97,7 @@ func TestStartRequestsFunc(t *testing.T) {
},
ParseFunc: func(r *geziyor.Response) {
r.DocHTML.Find("a").Each(func(_ int, s *goquery.Selection) {
r.Exports <- s.AttrOr("href", "")
r.Geziyor.Exports <- s.AttrOr("href", "")
})
},
Exporters: []geziyor.Exporter{&exporter.JSONExporter{}},

View File

@ -9,6 +9,24 @@ type Request struct {
*http.Request
Meta map[string]interface{}
Rendered bool
Cancelled bool
}
func allowedDomainsMiddleware(g *Geziyor, r *Request) {
if len(g.Opt.AllowedDomains) != 0 && !contains(g.Opt.AllowedDomains, r.Host) {
//log.Printf("Domain not allowed: %s\n", req.Host)
r.Cancelled = true
return
}
}
func duplicateRequestsMiddleware(g *Geziyor, r *Request) {
if !g.Opt.URLRevisitEnabled {
if _, visited := g.visitedURLs.LoadOrStore(r.Request.URL.String(), struct{}{}); visited {
//log.Printf("URL already visited %s\n", rawURL)
r.Cancelled = true
}
}
}
func defaultHeadersMiddleware(g *Geziyor, r *Request) {
@ -24,3 +42,13 @@ func headerSetDefault(header http.Header, key string, value string) http.Header
}
return header
}
// contains checks whether []string contains string
func contains(s []string, e string) bool {
for _, a := range s {
if a == e {
return true
}
}
return false
}

View File

@ -16,7 +16,6 @@ type Response struct {
Meta map[string]interface{}
Geziyor *Geziyor
Exports chan interface{}
}
// JoinURL joins base response URL and provided relative URL.