Basic Introduction to Cython - Edmund Martin

Python is often criticized for being slow. While in many cases pure Python is fast enough, there are certain cases where Python may not give you the performance you need. In recent years a fair number of Python programmers have made the jump to Golang for performance reasons. However, there are a number of ways you can improve the performance of your Python code, such as using PyPy, writing C extensions or trying your hand at Cython.

What is Cython?

Cython is a superset of the Python language. This means that the vast majority of Python code is also valid Cython code. Cython allows users to write Cython modules which are then compiled and can be used within Python code. This means that users can port performance-critical code into Cython and instantly see increases in performance. The great thing about Cython is that you can decide how far to optimize your code. Simply copying and compiling your Python code might see you make performance gains of 8-12%, whereas more serious optimization of your code can lead to significantly better performance.

Installing Cython

Installing Cython on Linux is very easy to do and just requires you to use the ‘pip install cython’ command. Those on Windows devices will likely have a much tougher time, with the simplest solution seeming to be installing Visual Studio Community and selecting both the C++ and Python support options. You can then install Cython like you would any other Python package.

Using Pure Python

We are going to begin by compiling a pure Python function. This is a very simple task and can achieve some limited performance benefits, with a more noticeable increase in performance for functions which make use of for and while loops. To begin we simply save the below code into a file called ‘looping.pyx’.

looping.pyx
Python
def multiply_elements_by_index(number_list):
    output = 0
    for i in range(len(number_list)):
        output += number_list[i] * i
    return output

This very simple function takes a list of numbers, multiplies each number by its index, and returns the sum of the results. This code is both valid Python and Cython code. However, it takes no advantage of any Cython optimizations other than the compilation of the code into C.

We run the below command to create a Cython module which can be used in Python:

cythonize -b looping.pyx

We can then import our Cython module into Python in the following manner:

importing Cython function
Python
from looping import multiply_elements_by_index
 
NUMBERS_LIST = [10, 8, 3, 4, 5, 7, 12]
 
multiply_elements_by_index(NUMBERS_LIST)

What kind of performance benefits can we expect from just compiling this Python code into a Cython module?

I ran some tests and on average the Cython compiled version of the code took around 10% less time to run over a set of 10,000 numbers.
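
If you want to verify numbers like these yourself, a rough benchmark with the timeit module looks something like the sketch below. It assumes you have kept a pure-Python copy of the function in a hypothetical file called looping_py.py alongside the compiled looping module; your exact figures will vary by machine.

Python

import random
import timeit

# Pure-Python copy (looping_py.py is a hypothetical file name) versus
# the module compiled from looping.pyx with 'cythonize -b looping.pyx'
from looping_py import multiply_elements_by_index as pure_python
from looping import multiply_elements_by_index as compiled

NUMBERS = [random.randint(0, 1000) for _ in range(10000)]

print('Pure Python:', timeit.timeit(lambda: pure_python(NUMBERS), number=1000))
print('Cython:     ', timeit.timeit(lambda: compiled(NUMBERS), number=1000))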

Adding Types

Cython achieves optimization of code by introducing typing to Python code. Cython supports both a range of Python and C types. Python types tend to be more flexible but give you less in terms of performance benefits. The below example makes use of both C and Python types; however, we have to be very careful when using C types. For instance, we could hit an overflow error should the numbers we pass in be large enough that the result of the multiplication is too large to store in a C long.

looping2.pyx
def multiply_elements_by_index(list number_list):
    cdef long output
    cdef int list_len
    output = 0
    list_len = len(number_list)
    for i in range(list_len):
        output += number_list[i] * i
    return output

As you can see, we use the Python type ‘list’ to annotate our input list. We then define two C types which will be used to store the length of our list and our output. We then loop over our list in exactly the same way as we did in our previous example. This shows just how easy it is to start adding C types to Python code. It also illustrates how easy it is to mix both C and Python types together in one extension module.

This hybrid code, when tested, was between 15-30% faster than the pure Python implementation, without taking the most aggressive path of optimization and turning everything into a C type. While these savings may seem small, they can really add up for operations which are repeated hundreds of thousands of times.

Cython Function Types

Unlike standard Python, Cython has three types of functions. These functions differ in how they are defined and where they can be used.

  • Cdef functions – can only be used in Cython code and cannot be imported into Python.
  • Cpdef functions – can be used and imported in both Python and Cython. If called from Cython they behave as a cdef function, and if called from Python they behave as a standard Python function.
  • Def functions – are like your standard Python functions and can be used and imported into Python code.

The below code block demonstrates how each of these three function types can be defined.

Python
# Note: the variants are given distinct names here, as they could not
# all share the name 'add' within a single compiled module
def add(int x, int y):
    return x + y

cpdef int add_cpdef(int x, int y):
    return x + y

cdef int add_cdef(int x, int y):
    return x + y

This allows you to define highly performant Cdef functions for use within Cython modules, while at the same time allowing you to write functions that are totally compatible with Python.  Cpdef functions are a good middle ground, in the sense that when they are used in Cython code they are highly optimized while remaining compatible with Python, should you want to import them into a Python module.
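
As a rough illustration of how these function types fit together, the sketch below (the module name and functions are hypothetical) keeps a fast cdef helper private to the module and exposes it through a cpdef wrapper that can still be imported from Python.

Python

# maths.pyx - hypothetical example module
cdef long _scale(long x, long factor):
    # Only callable from other Cython code compiled into this module
    return x * factor

cpdef long scale_and_add(long x, long y, long factor):
    # Importable from Python, but calls the cdef helper at C speed
    return _scale(x, factor) + _scale(y, factor)

After compiling the module, Python code can import scale_and_add but not _scale.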

Conclusion

While this introduction only scratches the surface of the Cython language, it should be enough to begin optimizing code using Cython. However, some of the more aggressive optimizations and the full power of C types are well beyond the scope of this post.

Detecting Selenium - Edmund Martin

When looking to extract information from more difficult-to-scrape sites, many programmers turn to browser automation tools such as Selenium and iMacros. At the time of writing, Selenium is by far the most popular option for those looking to leverage browser automation for information retrieval purposes. However, Selenium is very detectable, and site owners would be able to block a large percentage of all Selenium users.

Selenium Detection with Chrome

JavaScript
if (navigator.webdriver == true) {
    window.location = "http://google.com";
}

When using Chrome, the Selenium driver injects a webdriver property into the browser’s navigator object. This means it’s possible to write a couple of lines of JavaScript to detect that the user is using Selenium. The above code snippet simply checks whether webdriver is set to true and redirects the user should this be the case. I have never seen this technique used in the wild, but I can confirm that it seems to successfully redirect those using Chrome with Selenium.
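
If you want to see what your own Selenium-driven Chrome exposes, you can check the same property from the Python side. This is a minimal sketch assuming chromedriver is on your PATH; the exact behaviour can differ between browser and driver versions.

Python

from selenium import webdriver

driver = webdriver.Chrome()
driver.get('https://edmundmartin.com')
# Evaluate the same property a site owner's JavaScript would check
print(driver.execute_script('return navigator.webdriver'))
driver.quit()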

Selenium Detection with Firefox

JavaScript
if (window.document.documentElement.getAttribute("webdriver")) {
    window.location = "http://google.com";
}

Older versions of Firefox used to inject a webdriver attribute into the HTML document. This means that older versions of Firefox could be very simply detected using the above code snippet. At the time of writing, Firefox no longer adds this attribute to pages when using Selenium.

Additional methods of detecting Selenium when using Firefox have also been suggested. Testing seems to suggest that these do not work with the latest builds of Firefox. However, the webdriver standard suggests that this may eventually be implemented in Firefox again.

Selenium Detection with PhantomJS


JavaScript
if (window.callPhantom || window._phantom) {
    window.location = "http://google.com";
}

All current versions of PhantomJS add attributes to the window object. This allows site owners to simply check whether these specific PhantomJS attributes are set and redirect the user away when it turns out that they are using PhantomJS. It should also be noted that support for the PhantomJS project has been rather inconsistent, and the project makes use of an outdated WebKit version which is also detectable and could present a security risk.

Avoiding Detection

Your best bet for avoiding detection when using Selenium is to use one of the latest builds of Firefox, which don’t appear to give off any obvious sign that you are using Selenium. Additionally, it may be worth experimenting with both Safari and Opera, which are much less commonly used by those scraping the web. It would also seem likely that Firefox with Selenium may be giving off some less obvious footprint which would need further investigation to discover.

Scraping & Health Monitoring free proxies with Python - Edmund Martin

When web scraping, you often need to source a number of proxies in order to avoid being banned or to get around rate limiting imposed by the website in question. This often sees developers purchasing proxies from some sort of commercial provider, which can become quite costly if you only need the proxies for a short period of time. So in this post we are going to look at how you might use proxies from freely available proxy lists to scrape the internet.

Problems With Free Proxies
  • Free Proxies Die Very Quickly
  • Free Proxies Get Blocked By Popular Sites
  • Free Proxies Frequently Timeout

While free proxies are great in the sense that they are free, they tend to be highly unreliable. This is due to the fact that up-time is inconsistent and these proxies get blocked quickly by popular sites such as Google. Our solution is also going to build in some monitoring of the current status of each proxy, allowing us to avoid using proxies which are currently broken.

Scraping Proxies

Python
import datetime
import logging
from concurrent.futures import ThreadPoolExecutor
 
import requests
from bs4 import BeautifulSoup
 
 
def freeproxylist(user_agent):
    proxies = {}
    response = requests.get('https://www.free-proxy-list.net/', headers={'User-Agent': user_agent}, timeout=(9, 27))
    soup = BeautifulSoup(response.text, 'html.parser')
    proxy_list = soup.select('table#proxylisttable tr')
    for p in proxy_list:
        info = p.find_all('td')
        if len(info):
            proxy = ':'.join([info[0].text, info[1].text])
            proxies.update({proxy: {'country_code': info[2].text, 'country': info[3].text, 'privacy': info[4].text,
                                    'google': info[5].text, 'https': info[6].text, 'last_checked': None,
                                    'alive': True}})
    return proxies

We are going to use free-proxy-list.net as our source for this example, but the example could easily be expanded to cover multiple sources of proxies. We write a simple function which visits the page and pulls out all the proxies from the page in question using our chosen user agent. We then store the results in a dictionary, with each proxy acting as a key holding the information relating to that particular proxy. We are not doing any error handling here; this will be handled in our ProxyManager class.
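
If you want to sanity-check the scraping function on its own before wiring it into the manager, a quick call looks something like this (the user agent string is just an example):

Python

user_agent = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/65.0.3325.181 Safari/537.36'
proxies = freeproxylist(user_agent)
print('Scraped {} proxies'.format(len(proxies)))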

Proxy Manager

Python
class ProxyManager:
 
    def __init__(self, test_url, user_agent):
        self.test_url = test_url
        self.user_agent = user_agent
        self.thread_pool = ThreadPoolExecutor(max_workers=50)
        self.proxies = {}
        self.update_proxy_list()
    
    def update_proxy_list(self):
        try:
            self.proxies = freeproxylist(self.user_agent)
        except Exception as e:
            logging.error('Unable to update proxy list, exception : {}'.format(e))

Our proxy manager is a simple class which allows us to get and manage the proxies we find on free-proxy-list.net. We pass in a test URL which will be used to check whether a proxy is working, and a user agent to be used for both scraping and testing the proxies in question. We also create a thread pool, so we can more quickly check the status of the proxies we have scraped. We then call update_proxy_list, storing the proxies we have found on free-proxy-list.net in our dictionary of proxies.

Checking Proxies

Python
    def __check_proxy_status(self, proxy, info):
        info['last_checked'] = datetime.datetime.now()
        try:
            res = requests.get(self.test_url, proxies={'http': proxy}, timeout=(3, 6))
            res.raise_for_status()
        except Exception as e:
            info['alive'] = False
        else:
            info['alive'] = True
        return {proxy: info}
 
    def refresh_proxy_status(self):
        results = [self.thread_pool.submit(self.__check_proxy_status, k, v) for k, v in self.proxies.items()]
        for res in results:
            result = res.result()
            self.proxies.update(result)

We can now write a couple of methods to test whether a particular proxy works. The first method takes the proxy and the dictionary of information related to that proxy. We immediately set the last checked value to the current time. We then make a request against our test URL, with a relatively short timeout, and check the status of the response, raising an exception should we receive a non-200 status code. Should anything go wrong, we set the status of the proxy to dead; otherwise we set the status to alive.

We then write our refresh proxy status method, which simply calls our check proxy status method. We iterate over our dictionary, submitting each proxy and the related info to a thread. If we didn’t use threads to check the status of our proxies, we could be waiting a very long time for our results. We then loop through our results and update the status of the proxy in question.

Getting A Proxy

    def get_proxies_key_value(self, key, value):
        proxies = []
        for k, v in self.proxies.items():
            match = v.get(key)
            if match == value:
                proxies.append(k)
        return proxies
 
    def get_proxy(self):
        proxy = None
        for k, v in self.proxies.items():
            alive = v.get('alive')
            if alive:
                return k
        return proxy

We then write two methods for getting ourselves a proxy. Our first method allows us to get a list of proxies by passing in a relevant key and value. This allows us to get a list of proxies that relate to a particular country or boast a particular level of anonymity, which can be useful should we be interested in particular properties of a proxy.

We also have a simple method that allows us to return a single working proxy. This loops over all the items in our proxy dictionary and returns the first proxy where ‘alive’ is equal to True.

Example Usage

Python
# Create an instance of our proxy manager
proxy_scrape = ProxyManager('http://google.com', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/65.0.3325.181 Safari/537.36')
 
# Refresh the status of the proxies we pulled on initialization
proxy_scrape.refresh_proxy_status()
 
# Get a single working proxy
proxy = proxy_scrape.get_proxy()
 
# Make a fresh scrape of free-proxy-list.net
proxy_scrape.update_proxy_list()

Using the library is pretty simple. We just create the class, passing in our test URL (Google.com here) and our selected user agent. We then call refresh_proxy_status, updating the status of the scraped proxies by running them against our test URL. We can then pull out an individual working proxy. Should we not be satisfied with the proxies we currently have access to, we can update our proxy list with a fresh scrape of our source.
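
Once get_proxy has handed you a working proxy, you would typically plug it straight into a requests call. The snippet below is a rough sketch of that usage; the target URL is just an example, and free proxies should be expected to fail fairly often.

Python

import requests

proxy = proxy_scrape.get_proxy()
if proxy:
    try:
        response = requests.get('http://example.com', proxies={'http': proxy}, timeout=(3, 6))
        print(response.status_code)
    except requests.RequestException as e:
        print('Proxy request failed: {}'.format(e))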

Full Code

Python
import datetime
import logging
from concurrent.futures import ThreadPoolExecutor
 
import requests
from bs4 import BeautifulSoup
 
 
def freeproxylist(user_agent):
    proxies = {}
    response = requests.get('https://www.free-proxy-list.net/', headers={'User-Agent': user_agent}, timeout=(9, 27))
    soup = BeautifulSoup(response.text, 'html.parser')
    proxy_list = soup.select('table#proxylisttable tr')
    for p in proxy_list:
        info = p.find_all('td')
        if len(info):
            proxy = ':'.join([info[0].text, info[1].text])
            proxies.update({proxy: {'country_code': info[2].text, 'country': info[3].text, 'privacy': info[4].text,
                                    'google': info[5].text, 'https': info[6].text, 'last_checked': None,
                                    'alive': True}})
    return proxies
 
 
class ProxyManager:
 
    def __init__(self, test_url, user_agent):
        self.test_url = test_url
        self.user_agent = user_agent
        self.thread_pool = ThreadPoolExecutor(max_workers=50)
        self.proxies = {}
        self.update_proxy_list()
 
    def update_proxy_list(self):
        try:
            self.proxies = freeproxylist(self.user_agent)
        except Exception as e:
            logging.error('Unable to update proxy list, exception : {}'.format(e))
 
    def __check_proxy_status(self, proxy, info):
        info['last_checked'] = datetime.datetime.now()
        try:
            res = requests.get(self.test_url, proxies={'http': proxy}, timeout=(3, 6))
            res.raise_for_status()
        except Exception as e:
            info['alive'] = False
        else:
            info['alive'] = True
        return {proxy: info}
 
    def refresh_proxy_status(self):
        results = [self.thread_pool.submit(self.__check_proxy_status, k, v) for k, v in self.proxies.items()]
        for res in results:
            result = res.result()
            self.proxies.update(result)
 
    def get_proxies_key_value(self, key, value):
        proxies = []
        for k, v in self.proxies.items():
            match = v.get(key)
            if match == value:
                proxies.append(k)
        return proxies
 
    def get_proxy(self):
        proxy = None
        for k, v in self.proxies.items():
            alive = v.get('alive')
            if alive:
                return k
        return proxy

Scraping Instagram with Python - Edmund Martin

In today’s post we are going to look at how you can extract information from a user’s Instagram profile. It’s surprisingly easy to extract profile information, such as the number of followers a user has, as well as information and image files for a user’s most recent posts. With a bit of effort it would be relatively easy to extract large chunks of data regarding a user. This could then be applied at a very broad scale to extract a large chunk of all the public posts featured on Instagram’s site.

Imports & Setup

Python
from random import choice
import json
 
import requests
from bs4 import BeautifulSoup
 
_user_agents = [
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/65.0.3325.181 Safari/537.36'
]
 
 
class InstagramScraper:
 
    def __init__(self, user_agents=None, proxy=None):
        self.user_agents = user_agents
        self.proxy = proxy
 
    def __random_agent(self):
        if self.user_agents and isinstance(self.user_agents, list):
            return choice(self.user_agents)
        return choice(_user_agents)
 
    def __request_url(self, url):
        try:
            response = requests.get(url, headers={'User-Agent': self.__random_agent()}, proxies={'http': self.proxy,
                                                                                                 'https': self.proxy})
            response.raise_for_status()
        except requests.HTTPError:
            raise requests.HTTPError('Received non 200 status code from Instagram')
        except requests.RequestException:
            raise requests.RequestException
        else:
            return response.text

We begin by making our imports and writing the dunder init method for our class. Our code requires two packages not included in the standard library: requests for making HTTP requests and BeautifulSoup to make HTML parsing more user friendly. If you do not already have these libraries installed, you can use the following pip command:

pip install requests bs4

The init method of our class takes two optional keyword arguments, which we simply store in self. This will allow us to override the default user agent list and use a proxy should we wish to avoid detection.

We then write two helper methods. First, we write a very simple method that returns a random user agent. Switching user agents is often a best practice when web scraping and can help you avoid detection. Should the caller of our class have provided their own list of user agents, we take a random agent from the provided list; otherwise we return our default user agent.

Our second helper method is simply a wrapper around requests. We pass in a URL and try to make a request using the provided user agent and proxy. If we are unable to make the request, or Instagram responds with a non-200 status code, we simply re-raise the error. If everything goes fine, we return the HTML of the page in question.

Extracting JSON from JavaScript

Instagram serves all of the information regarding a user in the form of a JavaScript object. This means that we can extract all of a user’s profile information and their recent posts by just making an HTTP request to their profile page. We simply need to turn this JavaScript object into JSON, which is very easy to do.

Python
    @staticmethod
    def extract_json_data(html):
        soup = BeautifulSoup(html, 'html.parser')
        body = soup.find('body')
        script_tag = body.find('script')
        raw_string = script_tag.text.strip().replace('window._sharedData =', '').replace(';', '')
        return json.loads(raw_string)

We can write this very hacky, but effective, method to extract the JSON from a user profile. We apply the static method decorator to this function, as it’s possible to use it without initializing our class. We simply create a soup from the HTML, select the body of the document and then pull out the first ‘script’ tag. We can then do a couple of text replacements on the script tag’s text to derive a string which can be loaded into a dictionary object using the json.loads method.

Bringing it all together

Python
    def profile_page_metrics(self, profile_url):
        results = {}
        try:
            response = self.__request_url(profile_url)
            json_data = self.extract_json_data(response)
            metrics = json_data['entry_data']['ProfilePage'][0]['graphql']['user']
        except Exception as e:
            raise e
        else:
            for key, value in metrics.items():
                if key != 'edge_owner_to_timeline_media':
                    if value and isinstance(value, dict):
                        value = value['count']
                        results[key] = value
                    elif value:
                        results[key] = value
        return results
 
    def profile_page_recent_posts(self, profile_url):
        results = []
        try:
            response = self.__request_url(profile_url)
            json_data = self.extract_json_data(response)
            metrics = json_data['entry_data']['ProfilePage'][0]['graphql']['user']['edge_owner_to_timeline_media']["edges"]
        except Exception as e:
            raise e
        else:
            for node in metrics:
                node = node.get('node')
                if node and isinstance(node, dict):
                    results.append(node)
        return results

We then bring it all together in two methods which we can use to extract information from this very large JSON object. We first make a request to the page, before extracting the JSON result. We then use two different selectors to pull out the relevant bits of information, as the default JSON object contains lots of information we don’t really need.

When extracting profile information we extract all attributes from the “user” object, excluding their recent posts. In the “recent posts” method, we use a slightly different selector and pull out all the information about all of the recent posts made by our targeted user.
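
Before looking at a fuller usage example, calling the metrics method on its own looks something like the sketch below. The exact keys returned depend on whatever Instagram currently includes in its shared data payload, so treat them as assumptions rather than a stable schema.

Python

scraper = InstagramScraper()
metrics = scraper.profile_page_metrics('https://www.instagram.com/kimkardashian/?hl=en')
# Keys mirror whatever Instagram includes in its shared data payload
for key, value in metrics.items():
    print(key, value)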

Example Usage

from pprint import pprint
 
k = InstagramScraper()
results = k.profile_page_recent_posts('https://www.instagram.com/kimkardashian/?hl=en')
pprint(results)

We can then use the Instagram scraper in a very simple fashion to pull out all the most recent posts from our favorite users. You could do lots of things with the resulting data, which could, for instance, be used in an Instagram analytics app, or you could simply programmatically download all the images relating to that user.
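
As a rough sketch of the image-downloading idea, each post node has historically included a ‘display_url’ field pointing at the image file. Treat that key name as an assumption, since Instagram changes its payload from time to time.

Python

import requests

for i, post in enumerate(results):
    image_url = post.get('display_url')  # assumed key name
    if not image_url:
        continue
    image = requests.get(image_url)
    with open('post_{}.jpg'.format(i), 'wb') as f:
        f.write(image.content)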

There is certainly room for improvement and modification. It would also be possible to use Instagram’s graph API to pull out further posts from a particular user, or pull out lists of a user’s recent followers, allowing you to collect large amounts of data without having to deal with Facebook’s restrictive API limitations and policies.

Full Code

Python
from random import choice
import json
 
import requests
from bs4 import BeautifulSoup
 
_user_agents = [
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/65.0.3325.181 Safari/537.36'
]
 
 
class InstagramScraper:
 
    def __init__(self, user_agents=None, proxy=None):
        self.user_agents = user_agents
        self.proxy = proxy
 
    def __random_agent(self):
        if self.user_agents and isinstance(self.user_agents, list):
            return choice(self.user_agents)
        return choice(_user_agents)
 
    def __request_url(self, url):
        try:
            response = requests.get(url, headers={'User-Agent': self.__random_agent()}, proxies={'http': self.proxy,
                                                                                                 'https': self.proxy})
            response.raise_for_status()
        except requests.HTTPError:
            raise requests.HTTPError('Received non 200 status code from Instagram')
        except requests.RequestException:
            raise requests.RequestException
        else:
            return response.text
 
    @staticmethod
    def extract_json_data(html):
        soup = BeautifulSoup(html, 'html.parser')
        body = soup.find('body')
        script_tag = body.find('script')
        raw_string = script_tag.text.strip().replace('window._sharedData =', '').replace(';', '')
        return json.loads(raw_string)
 
    def profile_page_metrics(self, profile_url):
        results = {}
        try:
            response = self.__request_url(profile_url)
            json_data = self.extract_json_data(response)
            metrics = json_data['entry_data']['ProfilePage'][0]['graphql']['user']
        except Exception as e:
            raise e
        else:
            for key, value in metrics.items():
                if key != 'edge_owner_to_timeline_media':
                    if value and isinstance(value, dict):
                        value = value['count']
                        results[key] = value
                    elif value:
                        results[key] = value
        return results
 
    def profile_page_recent_posts(self, profile_url):
        results = []
        try:
            response = self.__request_url(profile_url)
            json_data = self.extract_json_data(response)
            metrics = json_data['entry_data']['ProfilePage'][0]['graphql']['user']['edge_owner_to_timeline_media']["edges"]
        except Exception as e:
            raise e
        else:
            for node in metrics:
                node = node.get('node')
                if node and isinstance(node, dict):
                    results.append(node)
        return results

Writing a web crawler in Python 3.5+ using asyncio - Edmund Martin

The asyncio library was introduced to Python in version 3.4. However, the async/await syntax was not introduced into the language until Python 3.5. The introduction of this functionality allows us to write asynchronous web crawlers without having to use threads. Getting used to asynchronous programming can take a while, so in this tutorial we are going to build a fully functional web crawler using asyncio and aiohttp.

Fan In & Fan Out Concurrency Pattern


We are going to write a web crawler which will continue to crawl a particular site until we reach a defined maximum depth. We are going to make use of a fan-in/fan-out concurrency pattern. Essentially, this involves gathering together a set of tasks and then distributing them across a bunch of threads, or across co-routines in our case. We then gather all the results together again before processing them and fanning out a new group of tasks. I would highly recommend Brett Slatkin’s 2014 talk, which inspired this particular post.
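
To make the pattern itself concrete before applying it to crawling, here is a stripped-down sketch of fanning a batch of jobs out to co-routines and gathering the results back in. The URLs and the sleep are stand-ins for real work.

Python

import asyncio

async def fake_fetch(url):
    await asyncio.sleep(0.1)  # stand-in for a real HTTP request
    return url, '<html></html>'

async def fan_out_fan_in(urls):
    tasks = [fake_fetch(url) for url in urls]  # fan out a batch of co-routines
    return await asyncio.gather(*tasks)        # fan the results back in

loop = asyncio.get_event_loop()
print(loop.run_until_complete(fan_out_fan_in(['/a', '/b', '/c'])))
loop.close()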

Initializing Our Crawler

Python
import asyncio
import logging
import aiohttp
from urllib.parse import urljoin, urlparse
from lxml import html as lh
 
 
class AsyncCrawler:
 
    def __init__(self, start_url, crawl_depth, max_concurrency=200):
        self.start_url = start_url
        self.base_url = '{}://{}'.format(urlparse(self.start_url).scheme, urlparse(self.start_url).netloc)
        self.crawl_depth = crawl_depth
        self.seen_urls = set()
        self.session = aiohttp.ClientSession()
        self.bounded_semaphore = asyncio.BoundedSemaphore(max_concurrency)

We begin by importing the libraries required for our asyncio crawler. We are using a couple of libraries which are not included in Python’s standard library. These can be installed using the following pip command:

pip install aiohttp lxml

We can then start defining our class. Our crawler takes two positional arguments and one optional keyword argument. We pass in the start URL, which is the URL we begin our crawl with, and we also set the maximum depth of the crawl. We also pass in a maximum concurrency level, which prevents our crawler from making more than 200 concurrent requests at a single time.

The start URL is then parsed to give us the base URL for the site in question. We also create a set of URLs which we have already seen, to ensure that we don’t end up crawling the same URL more than once. We create a session using aiohttp.ClientSession so that we can skip having to create a session every time we scrape a URL. Doing this will throw a warning, but the creation of a client session is synchronous, so it can be safely done outside of a co-routine. We also set up an asyncio BoundedSemaphore using our max concurrency variable, which we will use to prevent our crawler from making too many concurrent requests at one time.

Making An Async HTTP Request

Python
    async def _http_request(self, url):
        print('Fetching: {}'.format(url))
        async with self.bounded_semaphore:
            try:
                async with self.session.get(url, timeout=30) as response:
                    html = await response.read()
                    return html
            except Exception as e:
                logging.warning('Exception: {}'.format(e))

We can then write a function to make an asynchronous HTTP request. Making a single asynchronous request is pretty similar to making a standard HTTP request. As you can see, we write “async” prior to the function definition. We begin by using an async context manager with the bounded semaphore created when we initialized our class. This will limit asynchronous requests to whatever we passed in when creating an instance of the AsyncCrawler class.

We then use another async context manager within a try/except block to make a request to the URL and await the response, before finally returning the HTML.

Extracting URLs

    def find_urls(self, html):
        found_urls = []
        dom = lh.fromstring(html)
        for href in dom.xpath('//a/@href'):
            url = urljoin(self.base_url, href)
            if url not in self.seen_urls and url.startswith(self.base_url):
                found_urls.append(url)
        return found_urls
 
    async def extract_async(self, url):
        data = await self._http_request(url)
        found_urls = set()
        if data:
            for url in self.find_urls(data):
                found_urls.add(url)
        return url, data, sorted(found_urls)

We can then write a standard function to extract all the URLs from an HTML response. We create a DOM (Document Object Model) object from our HTML using lxml’s html sub-module. Once we have our document model, we are able to query it using either XPath or CSS selectors. Here we use a simple XPath selector to pull out the ‘href’ attribute of every link found on the page in question.

We can then use urllib.parse’s urljoin function with our base URL and the found href. This gives us an absolute URL, automatically resolving any relative URLs that we may have found on the page. If we haven’t already crawled this URL and it belongs to the site we are crawling, we add it to our list of found URLs.

The extract_async function is a simple wrapper around our HTTP request and find URLs functions. Should we encounter any error, we simply ignore it. Otherwise we use the HTML to create a list of URLs found on that page.

Fanning In/Out

    async def extract_multi_async(self, to_fetch):
        futures, results = [], []
        for url in to_fetch:
            if url in self.seen_urls: continue
            self.seen_urls.add(url)
            futures.append(self.extract_async(url))
 
        for future in asyncio.as_completed(futures):
            try:
                results.append((await future))
            except Exception as e:
                logging.warning('Encountered exception: {}'.format(e))
        return results

Our extract_multi_async function is where we fan out. The function takes a list of URLs to be crawled. We begin by creating two empty lists. The first will hold the futures which refer to jobs to be done, while the second holds the results of these completed futures. We add a call to our extract_async co-routine for each URL we have passed into the function. These are futures, in the sense that they are tasks which will be completed in the future.

To gather the results from these futures, we use asyncio’s as_completed function, which will iterate over the completed futures and gather the results into our results list. This essentially blocks until all of the futures are completed, meaning that we end up returning a list of completed results.

Running Our Crawler

Python
    def parser(self, data):
        raise NotImplementedError
 
    async def crawl_async(self):
        to_fetch = [self.start_url]
        results = []
        for depth in range(self.crawl_depth + 1):
            batch = await self.extract_multi_async(to_fetch)
            to_fetch = []
            for url, data, found_urls in batch:
                data = self.parser(data)
                results.append((depth, url, data))
                to_fetch.extend(found_urls)
        await self.session.close()
        return results

We have a parser function defined here which will by default raise a NotImplementedError. So, in order to use our crawler, we will have to sub-class it and write our own parsing function, which we will do in a minute.

Our main function kicks everything off. We start by scraping our start URL and returning a batch of results. We then iterate over our results, pulling out the URL, data and new URLs from each result. We send the HTML off to be parsed before appending the relevant data to our list of results, while adding the new URLs to our to_fetch variable. We keep continuing this process until we have reached our maximum crawl depth, and then return all the results collected during the crawl.

Sub Classing & Running the Crawler

class GuardianCrawler(AsyncCrawler):
 
    def parser(self, data):
        dom = lh.fromstring(data)
        title = dom.cssselect('title')
        if title:
            title = title[0].text
        return {'title': title}
 
 
if __name__ == '__main__':
    url = 'https://www.theguardian.com'
    crawler = GuardianCrawler(url, 3)
    future = asyncio.Task(crawler.crawl_async())
    loop = asyncio.get_event_loop()
    loop.run_until_complete(future)
    loop.close()
    result = future.result()
    print(len(result))

Sub-classing the crawler is very simple, as we are able to write any function we wish to handle the HTML data returned by our crawler. The above function simply tries to extract the title from each page found by our crawler.

We can then call the crawler in a similar way to how we would call an individual asyncio function. We first initialize our class, before creating a future with the asyncio.Task function, passing in our crawl_async function. We then need an event loop to run this in, which we create and run until the function has completed. We then close the loop and grab the results from our future by calling .result() on our completed future.

Full Code

import asyncio
import logging
import aiohttp
from urllib.parse import urljoin, urlparse
from lxml import html as lh
 
 
class AsyncCrawler:
 
    def __init__(self, start_url, crawl_depth, max_concurrency=200):
        self.start_url = start_url
        self.base_url = '{}://{}'.format(urlparse(self.start_url).scheme, urlparse(self.start_url).netloc)
        self.crawl_depth = crawl_depth
        self.seen_urls = set()
        self.session = aiohttp.ClientSession()
        self.bounded_semaphore = asyncio.BoundedSemaphore(max_concurrency)
 
    async def _http_request(self, url):
        print('Fetching: {}'.format(url))
        async with self.bounded_semaphore:
            try:
                async with self.session.get(url, timeout=30) as response:
                    html = await response.read()
                    return html
            except Exception as e:
                logging.warning('Exception: {}'.format(e))
 
    def find_urls(self, html):
        found_urls = []
        dom = lh.fromstring(html)
        for href in dom.xpath('//a/@href'):
            url = urljoin(self.base_url, href)
            if url not in self.seen_urls and url.startswith(self.base_url):
                found_urls.append(url)
        return found_urls
 
    async def extract_async(self, url):
        data = await self._http_request(url)
        found_urls = set()
        if data:
            for url in self.find_urls(data):
                found_urls.add(url)
        return url, data, sorted(found_urls)
 
    async def extract_multi_async(self, to_fetch):
        futures, results = [], []
        for url in to_fetch:
            if url in self.seen_urls: continue
            self.seen_urls.add(url)
            futures.append(self.extract_async(url))
 
        for future in asyncio.as_completed(futures):
            try:
                results.append((await future))
            except Exception as e:
                logging.warning('Encountered exception: {}'.format(e))
        return results
 
    def parser(self, data):
        raise NotImplementedError
 
    async def crawl_async(self):
        to_fetch = [self.start_url]
        results = []
        for depth in range(self.crawl_depth + 1):
            batch = await self.extract_multi_async(to_fetch)
            to_fetch = []
            for url, data, found_urls in batch:
                data = self.parser(data)
                results.append((depth, url, data))
                to_fetch.extend(found_urls)
        await self.session.close()
        return results
 
 
class GuardianCrawler(AsyncCrawler):
 
    def parser(self, data):
        dom = lh.fromstring(data)
        title = dom.cssselect('title')
        if title:
            title = title[0].text
        return {'title': title}
 
 
if __name__ == '__main__':
    url = 'https://www.theguardian.com'
    crawler = GuardianCrawler(url, 3)
    future = asyncio.Task(crawler.crawl_async())
    loop = asyncio.get_event_loop()
    loop.run_until_complete(future)
    loop.close()
    result = future.result()
    print(len(result))

 

Writing A Web Crawler in Golang - Edmund Martin

I have previously written a piece looking at how to write a web crawler using Go and the popular framework Colly. However, it is relatively simple to write a fairly powerful web crawler in Golang without the help of any frameworks. In this post, we are going to write a web crawler using just Golang and the goquery package to extract HTML elements. All in all, we can write a fast but relatively basic web crawler in around 130 lines of code.

Defining Our Parser Interface

Go
package democrawl

import (
    "fmt"
    "net/http"
    "net/url"
    "strings"

    "github.com/PuerkitoBio/goquery"
)

type ScrapeResult struct {
    URL   string
    Title string
    H1    string
}

type Parser interface {
    ParsePage(*goquery.Document) ScrapeResult
}

First, we import all the packages we need from the standard library. We then pull in goquery, which we will use to extract data from the HTML returned by our crawler. If you don’t already have goquery, you will need to grab it with go get.

We then define our ScrapeResult struct, which contains some very simple data regarding the page. This could easily be expanded to return more useful information or to extract certain valuable information. We then define a Parser interface which allows users of our democrawl package to define their own parser to use with the basic crawling logic.

Making HTTP Requests

Go
func getRequest(url string) (*http.Response, error) {
    client := &http.Client{}

    req, _ := http.NewRequest("GET", url, nil)
    req.Header.Set("User-Agent", "Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot)")

    res, err := client.Do(req)
    if err != nil {
        return nil, err
    }

    return res, nil
}

We are going to write a function which simply attempts to grab a page by making a GET request. The function takes in a URL and makes a request using a Googlebot user agent, to hopefully avoid any detection. Should we encounter no issues, we simply return a pointer to the http.Response. Should something go wrong, we return nil and the error thrown by the GET request.

Extracting Links And Resolving Relative URLs

Our crawler is going to restrict itself to crawling URLs found on the domain of our start URL. To achieve this, we are going to write two functions. Firstly, we are going to write a function which discovers all the links on a page. Then we will need a function to resolve relative URLs (URLs starting with “/”).

func extractLinks(doc *goquery.Document) []string {
    foundUrls := []string{}
    if doc != nil {
        doc.Find("a").Each(func(i int, s *goquery.Selection) {
            res, _ := s.Attr("href")
            foundUrls = append(foundUrls, res)
        })
        return foundUrls
    }
    return foundUrls
}

Our extractLinks function takes in a pointer to a goquery Document and returns a slice of strings. This is relatively easy to do. We simply create a new slice of strings. Should we have been passed a document, we find each link element and extract its href attribute, which is then added to our slice of URLs.

Go
func resolveRelative(baseURL string, hrefs []string) []string {
    internalUrls := []string{}

    for _, href := range hrefs {
        if strings.HasPrefix(href, baseURL) {
            internalUrls = append(internalUrls, href)
        }

        if strings.HasPrefix(href, "/") {
            resolvedURL := fmt.Sprintf("%s%s", baseURL, href)
            internalUrls = append(internalUrls, resolvedURL)
        }
    }

    return internalUrls
}

We then have our resolveRelative function. As the name suggests, this function resolves relative links and returns us a slice of all the internal links we found on a page. We simply iterate over our slice of found URLs; if a URL starts with the site’s base URL, we add it straight to our slice. If the URL begins with “/”, we do some string formatting to get the absolute URL in question. Should the URL not belong to the domain we are crawling, we simply skip it.

Crawling A Page

Go
func crawlPage(baseURL, targetURL string, parser Parser, token chan struct{}) ([]string, ScrapeResult) {

    token <- struct{}{}
    fmt.Println("Requesting: ", targetURL)
    resp, _ := getRequest(targetURL)
    <-token

    doc, _ := goquery.NewDocumentFromResponse(resp)
    pageResults := parser.ParsePage(doc)
    links := extractLinks(doc)
    foundUrls := resolveRelative(baseURL, links)

    return foundUrls, pageResults
}

We can then start bringing all of our work together with a function that crawls a single page. This function takes a number of arguments: we pass in our base URL and the URL we want to scrape. We also pass in the parser we have defined in our main.go file, along with a channel of empty structs which we use as a semaphore. This allows us to limit the number of requests we make in parallel, as reading from a channel in the above manner is blocking.

We make our request, then create a goquery Document from the response. This document is used by both our parser’s ParsePage function and our extractLinks function. We then resolve the found URLs, before returning them and the results found by our parser.

Getting Our Base URL

Go
func parseStartURL(u string) string {
    parsed, _ := url.Parse(u)
    return fmt.Sprintf("%s://%s", parsed.Scheme, parsed.Host)
}

We can pull out our base URL by using the net/url package’s Parse function. This allows us to simply pass our start URL into our main Crawl function. After we parse the URL, we join together the scheme and host using basic string formatting.

Crawl Function

Go
func Crawl(startURL string, parser Parser, concurrency int) []ScrapeResult {
    results := []ScrapeResult{}
    worklist := make(chan []string)
    var n int
    n++
    var tokens = make(chan struct{}, concurrency)
    go func() { worklist <- []string{startURL} }()
    seen := make(map[string]bool)
    baseDomain := parseStartURL(startURL)

    for ; n > 0; n-- {
        list := <-worklist
        for _, link := range list {
            if !seen[link] {
                seen[link] = true
                n++
                go func(baseDomain, link string, parser Parser, token chan struct{}) {
                    foundLinks, pageResults := crawlPage(baseDomain, link, parser, token)
                    results = append(results, pageResults)
                    if foundLinks != nil {
                        worklist <- foundLinks
                    }
                }(baseDomain, link, parser, tokens)
            }
        }
    }
    return results
}

Our Crawl function brings together all the other functions we have written and contains quite a lot of its own logic. We begin by creating an empty slice of ScrapeResults. We then create a worklist channel which will contain lists of URLs to scrape. We also initialize an integer value and set it to one, and create a channel of tokens which will be passed into our crawlPage function and limit the total concurrency as defined when we launch the crawler. We then parse our start URL to get our base domain, which is used in multiple places within our crawling logic.

Our main for loop is rather complicated, but we essentially create a new goroutine for each item in our worklist. This doesn’t mean we scrape every page at once, due to the fact that we use our tokens channel as a semaphore. We call our crawlPage function, pulling out the results from our parser and all the internal links found. These found links are then put into our worklist, and the process continues until we run out of new links to crawl.

Our main.go file

package main

import (
    "github.com/EdmundMartin/democrawl"
    "github.com/PuerkitoBio/goquery"
)

type DummyParser struct {
}

func (d DummyParser) ParsePage(doc *goquery.Document) democrawl.ScrapeResult {
    data := democrawl.ScrapeResult{}
    data.Title = doc.Find("title").First().Text()
    data.H1 = doc.Find("h1").First().Text()
    // Return the populated struct rather than an empty ScrapeResult
    return data
}

func main() {
    d := DummyParser{}
    democrawl.Crawl("https://www.theguardian.com/uk", d, 10)
}

We can then write a very simple main.go file where we create an instance of our parser and simply call our Crawl function, then watch our crawler go out and collect results. It should be noted that the crawler is very fast and should be used with very low levels of concurrency in most instances. The democrawl repo can be found on my GitHub; feel free to use the code and expand or modify it to fit your needs.

Writing a Web Crawler with Golang and Colly - Edmund Martin

This blog features multiple posts regarding building Python web crawlers, but the subject of building a crawler in Golang has never been touched upon. There are a couple of frameworks for building web crawlers in Golang, but today we are going to look at building one using Colly. When I first started playing with the framework, I was shocked at how quick and easy it was to build a highly functional crawler with very few lines of Go code.

In this post we are going to build a crawler which crawls this site and extracts the URL, title and code snippets from every Python post on the site. To write such a crawler we only need to write a total of 60 lines of code! Colly requires an understanding of CSS selectors, which is beyond the scope of this post, but I recommend you take a look at a cheat sheet.

Setting Up A Crawler

Go
package main

import (
    "fmt"
    "strings"
    "time"

    "github.com/gocolly/colly"
)

type Article struct {
    ArticleTitle string
    URL          string
    CodeSnippets []string
}

func main() {

    c := colly.NewCollector(
        colly.AllowedDomains("edmundmartin.com"),
    )

    c.Limit(&colly.LimitRule{
        DomainGlob:  ".*edmundmartin.*",
        Parallelism: 1,
        Delay:       1 * time.Second,
    })

To begin with, we are going to set up our crawler and create the data structure to store our results in. First of all, we need to install Colly using the go get command. Once this is done, we create a new struct which will represent an article and contains all the fields we are going to be collecting with our simple example crawler.

With this done, we can begin writing our main function. To create a new crawler we must call NewCollector, which returns a Collector instance. The NewCollector function takes a list of functions which are used to initialize our crawler. In our case we are only calling one function within our NewCollector call, which limits our crawler to pages found on “edmundmartin.com”.

Having done this, we then place some limits on our crawler. As Golang is very performant and many websites are running on relatively slow servers, we probably want to limit the speed of our crawler. Here, we are setting up a limiter which matches everything containing “edmundmartin” in the URL. By setting the parallelism to 1 and setting a delay of a second, we ensure that we only crawl one URL a second.

Basic Crawling Logic

Go
    detailCollector := c.Clone()

    allArticles := []Article{}

    c.OnRequest(func(r *colly.Request) {
        fmt.Println("Visiting: ", r.URL.String())
    })

    c.OnHTML(`a[href]`, func(e *colly.HTMLElement) {
        foundURL := e.Request.AbsoluteURL(e.Attr("href"))
        if strings.Contains(foundURL, "python") {
            detailCollector.Visit(foundURL)
        } else {
            c.Visit(foundURL)
        }
    })

To collect data from our target site, we need to create a clone of our Colly collector. We also create a slice of our ‘Article’ struct to store the results we will be collecting. We then add a callback to our crawler which will fire every time we make a new request; this callback just prints the URL which our crawler will be visiting.

We then add another OnHTML callback which is fired every time HTML is returned to us. This is attached to our original Colly collector instance and not the clone. Here we pass in a CSS selector which pulls out all of the hrefs on the page. We can also use some logic contained within the Colly framework which allows us to resolve the URL in question. If the URL contains ‘python’, we submit it to our cloned Collector, while if ‘python’ is absent from the URL we simply visit the page in question. This cloning of our collector allows us to define different OnHTML parsers for each clone of the original crawler.

Extracting Details From A Post

    detailCollector.OnHTML(`div.post-inner-content`, func(e *colly.HTMLElement) {
        fmt.Println("Scraping Content ", e.Request.URL.String())
        article := Article{}
        article.URL = e.Request.URL.String()
        article.ArticleTitle = e.ChildText("h1")

        e.ForEach("div.crayon-main", func(_ int, el *colly.HTMLElement) {
            codeSnip := el.ChildText("table.crayon-table")
            article.CodeSnippets = append(article.CodeSnippets, codeSnip)
        })
        fmt.Println("Found: ", article)
        allArticles = append(allArticles, article)
    })

    c.Visit("http://edmundmartin.com")
}

We can now add an OnHTML callback to our detailCollector clone. Again we use a CSS selector to pull out the content of each post contained on the page. From this we extract the text contained within the post’s H1 tag. We then pick out all of the divs containing the class ‘crayon-main’ and iterate over them, pulling out our code snippets. We then add our collected data to our slice of Articles.

All that is left to do is start the crawler by calling our original collector’s Visit function with our start URL. The example crawler should finish within around 20 seconds. Colly makes it very easy to write powerful crawlers with relatively little code. It does, however, take a little while to get used to the callback style of programming.

Full Code

Example Colly Crawler
package main

import (
    "fmt"
    "strings"
    "time"

    "github.com/gocolly/colly"
)

type Article struct {
    ArticleTitle string
    URL          string
    CodeSnippets []string
}

func main() {

    c := colly.NewCollector(
        colly.AllowedDomains("edmundmartin.com"),
    )

    c.Limit(&colly.LimitRule{
        DomainGlob:  ".*edmundmartin.*",
        Parallelism: 1,
        Delay:       1 * time.Second,
    })

    detailCollector := c.Clone()

    allArticles := []Article{}

    c.OnRequest(func(r *colly.Request) {
        fmt.Println("Visiting: ", r.URL.String())
    })

    c.OnHTML(`a[href]`, func(e *colly.HTMLElement) {
        foundURL := e.Request.AbsoluteURL(e.Attr("href"))
        if strings.Contains(foundURL, "python") {
            detailCollector.Visit(foundURL)
        } else {
            c.Visit(foundURL)
        }
    })

    detailCollector.OnHTML(`div.post-inner-content`, func(e *colly.HTMLElement) {
        fmt.Println("Scraping Content ", e.Request.URL.String())
        article := Article{}
        article.URL = e.Request.URL.String()
        article.ArticleTitle = e.ChildText("h1")

        e.ForEach("div.crayon-main", func(_ int, el *colly.HTMLElement) {
            codeSnip := el.ChildText("table.crayon-table")
            article.CodeSnippets = append(article.CodeSnippets, codeSnip)
        })
        fmt.Println("Found: ", article)
        allArticles = append(allArticles, article)
    })

    c.Visit("http://edmundmartin.com")
}

 

Image Classification with TFLearn and Python Edmund Martin Leave a comment

In today’s post we are going to walk through how to build a flexible image classifier using TFLearn and Python. For those not familiar with TFLearn, it is a wrapper around the very popular Tensorflow library from Google. Building an image classifier with TFLearn is relatively simple, and by the end of this post you will be able to build your own.

Imports

Python
import numpy as np
from skimage import io
from scipy.misc import imresize
import tflearn
from tflearn.layers.conv import conv_2d, max_pool_2d
from tflearn.layers.core import input_data, dropout, fully_connected
from tflearn.layers.estimator import regression

We are going to need to import a number of different libraries in order to build our classifier. For users on Windows the easiest way to install the Scipy library is to use the pre-compiled wheel which can be found here. Once you have installed all the required libraries, we can start building our ImageClassify class.

Initializing Our Class

Python
class ImageClassify:
 
    def __init__(self, class_names, image_size=100, learning_rate=0.001, test_split=0.1):
        self.model = None
        self.image_size = image_size
        self.learning_rate = learning_rate
        self.classes = [class_name.lower() for class_name in class_names]
        self.image_data = []
        self.labels = []
        self.test_split = test_split

When initializing our class, we are going to need to know a few pieces of information. We are going to need a list of class names. These are the names of the different objects that our classifier is going to classify. We also need to pass in an image size, the classifier will automatically resize our images into a square image of the specified size. So, if we pass in a value of 100, our classifier will end up resizing our images to be 100×100 pixels in size.

Generally, the larger the image size, the better the classification we will end up with, provided that your images are at least as large as the specified value. Be warned that using larger images will increase the time taken to train the algorithm. We store this value in self.image_size.

We also pass in default values for our learning rate and test split. The learning rate dictates how large the steps taken by the optimiser are when updating the model’s weights; as a default value, 0.001 tends to work well. The test split defines what percentage of samples we will hold back to validate our model against. Again, using around ten percent of samples for your test set works pretty well.

We also create empty lists which will end up holding our image data and the respective labels.

Labeling An Image

Python
    def _extract_label(self, image_name):
        zeros = [0 for i in range(len(self.classes))]
        label_name = image_name.split('.')[0]
        index = self.classes.index(label_name.lower())
        zeros[index] = 1
        return zeros

Our extract label function takes an image file name and extracts the label from it. We begin by creating an array of zeros, with one zero for each class to be trained. We then split the file name of the image. Our extract label function expects images to be named in the following format: “class.number.png”. Using this format allows us to extract the class name directly from the file name. We then look up the index of the class label, set that value in our array of zeros to 1, and return the array.
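
For example, assuming the hypothetical classes ‘cat’ and ‘dog’, a file named ‘dog.12.png’ would produce the one-hot label shown below. This is a minimal sketch which assumes the ImageClassify class defined above is importable:

Python
# Hypothetical example: the classes and file name are made up for illustration.
classifier = ImageClassify(['cat', 'dog'], image_size=100)
print(classifier._extract_label('dog.12.png'))
# [0, 1] - the second position corresponds to the 'dog' class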

Processing Images

Python
    def _process_image(self, image):
        label = self._extract_label(image)
        img = io.imread(image)
        img = imresize(img, (self.image_size, self.image_size, 3))
        self.image_data.append(np.array(img))
        self.labels.append(np.array(label))
 
    def prepare_data(self, images):
        for image in images:
            self._process_image(image)

Our process image function first calls our label function. We then read the image using skimage’s io.imread function. We then resize this image to the size specified when we initialized the class. We then append the image data and the labels to self.image_data and self.labels respectively.

Preparing our data simply involves calling our process image function on every single image we provide to our image classification class.

Building Our Model

Python
    def build_model(self):
        convnet = input_data(shape=[None, self.image_size, self.image_size, 3], name='input')
        convnet = conv_2d(convnet, 32, 5, activation='relu')
        convnet = max_pool_2d(convnet, 5)
        convnet = conv_2d(convnet, 64, 5, activation='relu')
        convnet = max_pool_2d(convnet, 5)
        convnet = conv_2d(convnet, 128, 5, activation='relu')
        convnet = max_pool_2d(convnet, 5)
        convnet = conv_2d(convnet, 64, 5, activation='relu')
        convnet = max_pool_2d(convnet, 5)
        convnet = conv_2d(convnet, 32, 5, activation='relu')
        convnet = max_pool_2d(convnet, 5)
        convnet = fully_connected(convnet, 1024, activation='relu')
        convnet = dropout(convnet, 0.8)
        convnet = fully_connected(convnet, len(self.classes), activation='softmax')
        convnet = regression(convnet, optimizer='adam', learning_rate=self.learning_rate, loss='categorical_crossentropy',
                             name='targets')
        model = tflearn.DNN(convnet, tensorboard_dir='log')
        return model

Our build model function simply builds us a convolutional net model, using the parameters we defined when initializing our class. Explaining the workings of the net is probably beyond the scope of this post, but I will note that creating our model like this allows our classifier to be used with images of any size and datasets with any number of classes. Creating a build model function also makes it easier to load and predict using pre-trained models.

Training Our Model

Python
    def train_model(self, model_name, epochs=5, batch_size=32):
        X = self.image_data
        y = self.labels
        split = int(len(X) * self.test_split)
        X_train, X_test = X[split:], X[:split]
        y_train, y_test = y[split:], y[:split]
        model = self.build_model()
        model.fit(X_train, y_train, n_epoch=epochs, shuffle=True, validation_set=(X_test, y_test), show_metric=True,
                  batch_size=batch_size)
        model.save(model_name)
        self.model = model

Our train_model function takes a model name, an epochs parameter and a batch size parameter. The epochs parameter determines the number of times the model will be run over the entirety of the dataset. The batch size determines the number of samples to be run through the model at once. Generally, the more epochs the more accurate the model will be, though too many epochs may mean that your model overfits the dataset and you end up with rather inaccurate predictions on new images. If accuracy hits 100% and loss goes to 0, this is a very strong indication that you have overfit.

We first begin by creating X and y variables from the self.image_data and self.labels variables. We then use our self.test_split value to split the dataset up into training and test sets, call the build model function, and finally call the fit method on the model, using the test set for validation purposes.

Once we have finished training the model, we save it using the passed in model name and set self.model to our newly trained model.
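
For instance, to train for longer with a larger batch size, you might call the method along these lines; the model name, epoch count and batch size are purely illustrative:

Python
# Illustrative call only - assumes c is an ImageClassify instance with data already prepared.
c.train_model('my_example_model', epochs=10, batch_size=64)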

Loading A Pre-Trained Model & Predicting Images

Python
    def load_model(self, model_file):
        model = self.build_model()
        model.load(model_file)
        self.model = model

We can define a very simple function to load a model. This will be useful when we need to predict images sometime after we have trained a model. We can load a model by simply passing in the model’s name.

Python
    def _image_to_array(self, image):
        img = io.imread(image)
        img = imresize(img, (self.image_size, self.image_size, 3))
        return img

We then need another function to take an image and transform it to something we can use in our prediction function. This is much like our process image function, with the exception that we have no need to label the image.

Python
    def predict_image(self, image):
        img = self._image_to_array(image)
        results = self.model.predict([img])[0]
        most_probable = max(results)
        results = list(results)
        most_probable_index = results.index(most_probable)
        class_name = self.classes[most_probable_index]
        return class_name, results

Our predict image function takes a path to an image file. We call our _image_to_array function so the data can then be fed straight into the model. Our model will then output an array of probabilities, which we can line up with the classes we provided to the ImageClassify class. We then pull out the most probable label, before returning it along with the list of probabilities.
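
If you want to see the probability assigned to each class, you could pair the returned list with the class names, roughly like this (assuming a trained classifier c as in the usage examples below; the file name and printed values are just examples):

Python
# Assumes c is a trained ImageClassify instance; 'road_sign.jpg' is an example file.
class_name, results = c.predict_image('road_sign.jpg')
print(class_name)
print(dict(zip(c.classes, results)))  # e.g. {'yes': 0.92, 'not': 0.08}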

Example Usage: Training A Model

import glob
from image_classify import ImageClassify
images = glob.glob('*.png')
c = ImageClassify(['yes', 'not'], image_size=100, learning_rate=0.001)
c.prepare_data(images)
c.train_model('my_example_model')

Example Usage: Making A Prediction With An Already Trained Model

Python
from image_classify import ImageClassify
 
c = ImageClassify(['yes', 'not'], image_size=100, learning_rate=0.001)
c.load_model('my_example_model')
results = c.predict_image('road_sign.jpg')

Full Code & Example Dataset

The full code and an example data set can be found on my Github here. The Github repository also contains another image classification model which makes use of Google’s Googlenet model. This model is very highly accurate but takes a considerable amount of time to train and is likely to need to be run for a greater number of epochs.

Text Classification with Python & NLTK Edmund Martin Leave a comment

Machine learning frameworks such as Tensorflow and Keras are currently all the rage, and you can find several tutorials demonstrating the usage of CNNs (Convolutional Neural Nets) to classify text. Often this can be overkill, and in this post we are going to show you how to classify text using Python’s NLTK library. The NLTK (Natural Language Toolkit) provides Python users with a number of different tools to deal with text content and provides some basic classification capabilities.

Input Data

In the example, I’m using a set of 10,000 tweets which have been classified as being positive or negative. Our classifier is going to take input in CSV format, with the left column containing the tweet and the right column containing the label. An example of the data can be found below:

"simplistic , silly and tedious . ",Negative
"it's so laddish and juvenile , only teenage boys could possibly find it funny . ",Negative

Using your own data is very simple and merely requires that your left column contains your text document, while the column on the right contains the correct label. This allows our classifier to classify a wide range of documents with labels of your choosing. The data used for this example can be downloaded here.

The Bag of Words Approach

We are going to use a bag of words approach. Simply put, we just take a certain number of the most common words found throughout our data set and then, for each document, check whether the document contains each of these words. The bag of words approach is conceptually simple and doesn’t require us to pad documents to ensure that every document in our sample set is the same length. However, it tends to be less accurate than using a word embedding approach. By simply checking whether a document contains a certain set of words we miss out on a lot of valuable information, including the position of the words in the document. Despite this we can easily train a classifier which can achieve 80%+ accuracy.
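
As a rough illustration of the idea, with a made-up vocabulary and document, the features for a single document look something like this:

Python
# Toy illustration only - the vocabulary and document are invented for this example.
vocabulary = ['good', 'bad', 'funny', 'boring']
document = "a genuinely funny and good film".split()

features = {'contains({})'.format(word): word in document for word in vocabulary}
print(features)
# {'contains(good)': True, 'contains(bad)': False,
#  'contains(funny)': True, 'contains(boring)': False}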

Initialising our class and reading our CSV file

Python
class ClassifierCSV:
 
    def __init__(self, csv_file, featureset_size=1000, test_ratio=0.1):
        self.csv_file = csv_file
        self.documents = []
        self.words = []
        self.featureset_size = featureset_size
        self.test_ratio = test_ratio
        self.feature_words = None
        self.classifier = None

Our CSV classifier is going to take several arguments. Firstly, we pass the name of our CSV file. Then as optional parameters we pass featureset_size and a test ratio. By default, our classifier will use the 1,000 most common words found in our dataset to create our feature set, and we will test the accuracy of our classifier against 10% of the items contained in our data set. We then initialise a few variables which will be used later by our classifier.

    def _read_csv(self):
        with open(self.csv_file, 'r') as input_csv:
            for item in input_csv:
                item = item.split(',')
                doc, label = re.findall('\w+', ''.join(item[:-1]).lower()), item[-1].strip()
                for word in doc:
                    self.words.append(word.lower())
                self.documents.append((doc, label))

We then come on to reading our CSV file. We simply iterate through each line, splitting it by commas. The text after the last comma is the document’s label, while everything to the left is the document in question. By applying a regex to the document, we produce a list of the words it contains. In the example, I used a very simple regex to pull out the words, but it is possible to replace this with a more complex tokenizer. For each word in the document we append the word to our list of words, which will allow us to determine the frequency with which words occur in our dataset. We also append the list of words found in the document, together with its label, to the list where we store all the documents found in our dataset.

Extracting Word Features

Python
    def _generate_word_features(self):
        frequency_dist = nltk.FreqDist()
        for word in self.words:
            frequency_dist[word] += 1
        self.feature_words = list(frequency_dist)[:self.featureset_size]

We then write the function for generating our feature set. Here, we use NLTK’s FreqDist class to store the frequency with which different words were found throughout the dataset. We iterate through all of the words collected from our documents, creating a new record should we not have seen the word before and incrementing the count should it already have been found. We then limit our bag of words to be equal to the feature set size we passed when we initialised the class.

Python
    def __document_features(self, document):
        document_words = set(document)
        features = {}
        for word in self.feature_words:
            features['contains({})'.format(word)] = (word in document_words)
        return features

Now that we have a list of the most frequently found words, we can write a function to generate features for each of the documents in our dataset. As we are using a bag of words approach we are only interested in whether the document contains each word from the 1,000 most frequent words. If we find the word we record True, otherwise we record False. Eventually, we get a dictionary of 1,000 features which will be used to train the classifier.

Training

Python
    def train_naive_bayes_classifier(self):
        if not self.feature_words:
            self._read_csv()
            self._generate_word_features()
        shuffle(self.documents)
        feature_sets = [(self.__document_features(d), c) for (d, c) in self.documents]
        cutoff = int(len(feature_sets) * self.test_ratio)
        train_set, test_set = feature_sets[cutoff:], feature_sets[:cutoff]
        self.classifier = nltk.NaiveBayesClassifier.train(train_set)
        print('Achieved {0:.2f}% accuracy against training set'.format(nltk.classify.accuracy(self.classifier, train_set)*100))
        print('Achieved {0:.2f}% accuracy against test set'.format(nltk.classify.accuracy(self.classifier, test_set)*100))

We start by shuffling the documents. Some algorithms and classifiers can be sensitive to the order of data, which makes it important to shuffle our data before training. We then use our feature set function within a list comprehension which returns a list of tuples containing our feature set dictionary and the document’s label. We then calculate where to split our data into training and test sets. The test set allows us to check how the classifier performs against an unseen dataset. We can then pass our training set to NLTK’s Naive Bayes classifier. The actual training may take some time and will take longer the larger the dataset used. We then check the classifier’s accuracy against both the training and test set. In all likelihood the classifier will perform significantly better against the training set.

Classifying New Documents

Python
    def classify_new_sentence(self, sentence):
        if not self.feature_words:
            self._read_csv()
            self._generate_word_features()
        test_features = {}
        for word in self.feature_words:
            test_features['contains({})'.format(word.lower())] = (word.lower() in nltk.word_tokenize(sentence))
        return self.classifier.classify(test_features)

Once we have trained a classifier we can then write a function to classify new documents. If we have not already loaded our CSV file and generated the word features, we will have to do this before classifying our new document. We then simply generate a new set of features for this document and pass it to our classifier’s classify method. The function will return the string of the predicted label.

Saving and Loading Model

Python
    def save_model(self, filename):
        save_classifier = open(filename, "wb")
        pickle.dump(self.classifier, save_classifier)
        save_classifier.close()
        save_vocab = open('vocab-{}'.format(filename), "wb")
        pickle.dump(self.feature_words, save_vocab)
        save_vocab.close()
 
    def load_model(self, model_filename, vocab_filename):
        classifier_f = open(model_filename, "rb")
        self.classifier = pickle.load(classifier_f)
        classifier_f.close()
        vocab_f = open(vocab_filename, "rb")
        self.feature_words = pickle.load(vocab_f)
        vocab_f.close()

Rather than training the model every time we want to classify a sentence, it would make sense to save the model. We can write two simple functions to allow us to reuse our model whenever we want. The save function simply pickles our classifier and feature words to files, which can then be reloaded by our load model function.
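
Usage might look something like the following sketch (the file names are just examples); note that save_model writes the vocabulary to a file prefixed with ‘vocab-’:

Python
# Example file names only; c is an already trained ClassifierCSV instance.
c.save_model('tweet_classifier.pickle')

# Later, in a fresh session, reload both the classifier and its vocabulary.
c = ClassifierCSV('example-dataset.csv')
c.load_model('tweet_classifier.pickle', 'vocab-tweet_classifier.pickle')
print(c.classify_new_sentence("simply wonderful"))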

Accuracy

Algorithm                        Train     Test
Naive Bayes Classifier (NLTK)    84.09%    72.89%
BernoulliNB (Sklearn)            83.93%    79.78%
MultinomialNB (Sklearn)          84.58%    74.67%
LogisticRegression (Sklearn)     89.05%    75.33%
SGDClassifier (Sklearn)          81.23%    69.32%

The algorithm performs relatively well against our example data, being able to correctly classify whether a tweet is positive or negative around 72% of the time. NLTK gives its users the option to replace the standard Naive Bayes classifier with a number of other classifiers found in the scikit-learn package. I ran the same test swapping in these classifiers for the Naive Bayes classifier, and a number of them significantly outperformed it. As you can see, the BernoulliNB model performed particularly well, correctly classifying documents around 80% of the time.
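
Swapping in one of the scikit-learn classifiers only requires passing the class to the train_sklearn_classifier method included in the full code below. A minimal sketch, assuming scikit-learn is installed:

Python
from sklearn.naive_bayes import BernoulliNB

# Assumes the ClassifierCSV class from the full code below is available.
c = ClassifierCSV('example-dataset.csv', featureset_size=3000)
c.train_sklearn_classifier(BernoulliNB)
print(c.classify_new_sentence("This was an amazing movie"))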

The accuracy of the classifier could be further improved by using something called an ensemble classifier. To build an ensemble classifier we would simply build several models using different classifiers and then classify new documents against all of them, selecting the answer which was provided by the majority of our classifiers (a hard voting classifier). Such an ensemble would likely outperform just using one of the above classifiers, as sketched below. The full code below provides a function that allows you to try out other Sklearn classifiers.
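
A rough sketch of such a hard voting ensemble, built on top of the ClassifierCSV class; the helper function and the chosen algorithms are just examples and are not part of the original code:

Python
from collections import Counter

from sklearn.naive_bayes import BernoulliNB, MultinomialNB
from sklearn.linear_model import LogisticRegression


def ensemble_classify(sentence, csv_file='example-dataset.csv'):
    # Train one ClassifierCSV instance per algorithm and collect their votes.
    votes = []
    for algorithm in (BernoulliNB, MultinomialNB, LogisticRegression):
        c = ClassifierCSV(csv_file, featureset_size=3000)
        c.train_sklearn_classifier(algorithm)
        votes.append(c.classify_new_sentence(sentence))
    # The label predicted by the majority of the classifiers wins.
    return Counter(votes).most_common(1)[0][0]

print(ensemble_classify("This was an amazing movie"))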

Example Usage

Python
c = ClassifierCSV('example-dataset.csv', featureset_size=3000)
c.train_naive_bayes_classifier()
c.classifier.show_most_informative_features(15)
label = c.classify_new_sentence("This was an amazing movie")
print(label)

The class is pretty easy to use. The above code outlines all of the steps required to train a classifier and classify an unseen sentence. More usage examples and the full code can be found on Github here.

Full Code

Python
import re
import nltk
from nltk.classify.scikitlearn import SklearnClassifier
from random import shuffle
import pickle
 
 
class ClassifierCSV:
 
    def __init__(self, csv_file, featureset_size=1000, test_ratio=0.1):
        self.csv_file = csv_file
        self.documents = []
        self.words = []
        self.featureset_size = featureset_size
        self.test_ratio = test_ratio
        self.feature_words = None
        self.classifier = None
 
    def _read_csv(self):
        with open(self.csv_file, 'r') as input_csv:
            for item in input_csv:
                item = item.split(',')
                doc, label = re.findall('\w+', ''.join(item[:-1]).lower()), item[-1].strip()
                for word in doc:
                    self.words.append(word.lower())
                self.documents.append((doc, label))
 
    def _generate_word_features(self):
        frequency_dist = nltk.FreqDist()
        for word in self.words:
            frequency_dist[word] += 1
        self.feature_words = list(frequency_dist)[:self.featureset_size]
 
    def __document_features(self, document):
        document_words = set(document)
        features = {}
        for word in self.feature_words:
            features['contains({})'.format(word)] = (word in document_words)
        return features
 
    def train_naive_bayes_classifier(self):
        if not self.feature_words:
            self._read_csv()
            self._generate_word_features()
        shuffle(self.documents)
        feature_sets = [(self.__document_features(d), c) for (d, c) in self.documents]
        cutoff = int(len(feature_sets) * self.test_ratio)
        train_set, test_set = feature_sets[cutoff:], feature_sets[:cutoff]
        self.classifier = nltk.NaiveBayesClassifier.train(train_set)
        print('Achieved {0:.2f}% accuracy against training set'.format(nltk.classify.accuracy(self.classifier, train_set)*100))
        print('Achieved {0:.2f}% accuracy against test set'.format(nltk.classify.accuracy(self.classifier, test_set)*100))
 
    def train_sklearn_classifier(self, sk_learn_classifier):
        if not self.feature_words:
            self._read_csv()
            self._generate_word_features()
        shuffle(self.documents)
        feature_sets = [(self.__document_features(d), c) for (d, c) in self.documents]
        cutoff = int(len(feature_sets) * self.test_ratio)
        train_set, test_set = feature_sets[cutoff:], feature_sets[:cutoff]
        self.classifier = SklearnClassifier(sk_learn_classifier()).train(train_set)
        print('Achieved {0:.2f}% accuracy against training set'.format(nltk.classify.accuracy(self.classifier, train_set)*100))
        print('Achieved {0:.2f}% accuracy against test set'.format(nltk.classify.accuracy(self.classifier, test_set)*100))
 
    def classify_new_sentence(self, sentence):
        if not self.feature_words:
            self._read_csv()
            self._generate_word_features()
        test_features = {}
        for word in self.feature_words:
            test_features['contains({})'.format(word.lower())] = (word.lower() in nltk.word_tokenize(sentence))
        return self.classifier.classify(test_features)
 
    def save_model(self, filename):
        save_classifier = open(filename, "wb")
        pickle.dump(self.classifier, save_classifier)
        save_classifier.close()
        save_vocab = open('vocab-{}'.format(filename), "wb")
        pickle.dump(self.feature_words, save_vocab)
        save_vocab.close()
 
    def load_model(self, model_filename, vocab_filename):
        classifier_f = open(model_filename, "rb")
        self.classifier = pickle.load(classifier_f)
        classifier_f.close()
        vocab_f = open(vocab_filename, "rb")
        self.feature_words = pickle.load(vocab_f)
        vocab_f.close()

 

Scraping Baidu with Python Edmund Martin Leave a comment

 

What’s Baidu?

Baidu is China’s largest search engine and has been since Google left the Chinese market. As companies look to move into the Chinese market, there has been more and more interest in scraping search results from Baidu.

Scraping Baidu

Scraping Baidu is a relatively simple task. When scraping results from Baidu there is only one minor challenge: the destination URLs displayed on the Baidu results page are found nowhere in the HTML. Baidu links to the sites displayed on the search results page via its own redirector service, and in order to get the full final URL we have to follow these redirects. In this post we are going to walk through how to scrape the Baidu search results page.

Imports & Class Definition

In order to scrape Baidu, we only need to import two libraries outside of the standard library. Bs4 helps us parse HTML, while requests provides us with a nicer interface for making HTTP requests with Python.

As we are going to scrape multiple pages of Baidu in this tutorial, we are going to initialise a class to hold onto the important information for us.

Python
from bs4 import BeautifulSoup
from time import sleep
import logging
import requests
 
 
CHROME_DEFAULT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36'
 
 
class BaiduBot:
 
    def __init__(self, search_term, pages, proxy=None, timeout=30,
                 user_agent=CHROME_DEFAULT, delay=0):
 
        self.base_url = 'http://www.baidu.com/s?wd={}&pn={}'
        self.proxy = proxy
        self.search_term = search_term.rstrip(' ')
        self.page_count = pages
        self.timeout = timeout
        self.user_agent = user_agent
        self.delay = delay

We initialise a new instance of the BaiduBot class with a search term and the number of pages to scrape. We also give ourselves the ability to pass a number of keyword arguments to our class. This allows us to pass a proxy, a custom connection timeout, a custom user agent and an optional delay between each of the results pages we want to scrape. The keyword arguments may be of a lot of help if we end up being blocked by Baidu. When initialising the class we also store our base URL, which we use when scraping the subsequent pages.
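
For example, to scrape three pages through a proxy with a five second delay between requests, you might initialise the bot like this (the search term, proxy address and values are just placeholders):

Python
# Illustrative values only - the proxy, timeout and delay are examples.
bot = BaiduBot('web scraping', 3, proxy='http://127.0.0.1:8080',
               timeout=15, delay=5)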

Making Requests & Parsing HTML

Python
    def baidu_request(self, url):
        try:
            res = requests.get(url, timeout=30, proxies={'https': self.proxy, 'http': self.proxy},
                               headers={'User-Agent': self.user_agent})
            res.raise_for_status()
        except requests.HTTPError as e:
            logging.warning('Baidu search page return non-200 status code')
            raise e
        except requests.RequestException as e:
            logging.warning('Issue retrieving Baidu results page')
            raise e
        except ConnectionError as e:
            raise e
        else:
            return res

We first define a function to fetch a page of Baidu; here we simply make a request and check that the response has a 200 status. Should Baidu start serving us with non-200 status codes, this likely means that they have detected unusual behaviour from our IP and we should probably back off for a while. If there is no issue with the request, we simply return the response object.

    def parse_html(self, html):
        soup = BeautifulSoup(html, 'html.parser')
        result_containers = soup.find_all('div', {'class': 'c-container'})
        results = []
        for result in result_containers:
            title = result.find('h3', {'class': 't'}).get_text()
            url = result.find('a', href=True)['href']
            description = result.find('div', {'class':'c-abstract'})
            if description:
                description = description.get_text()
            results.append({'title': title, 'url': url, 'description': description})
        return results

Now that we have a way to make HTTP requests, we need to write a method for parsing the results page. Our parser is going to take in the HTML and return a list of dictionary objects. Each result is handily contained within a ‘div’ called ‘c-container’, which makes it very easy for us to pick out each result. We can then iterate across all of our returned results, using relatively simple BeautifulSoup selectors, before appending each result to our results list.

Getting the Underlying URL

As previously mentioned the full underlying URL is not displayed anywhere in Baidu’s search results. This means we must write a couple of functions to extract the full underlying URL. There may be another way to get this URL, but I’m not aware of it. If you know how, please share the method with me in the comments.

Python
    def __resolve_urls(self, url):
        try:
            final_url = requests.get(url, proxies={'http': self.proxy, 'https': self.proxy},
                                     headers={'User-Agent': self.user_agent}, timeout=self.timeout).url
        except requests.RequestException:
            return url
        except ConnectionError:
            return url
        else:
            return final_url

Our resolve_urls function is very similar to our Baidu request function. Instead of a response object we are returning the final URL by simply following the chain of redirects. Should we encounter any sort of error we are simply returning the original URL, as found within the search results. But this issue is relatively rare, so it shouldn’t impact our data too much.

Python
    def resolve_baidu_links(self, results):
        count = 1
        for i in results:
            i['url'] = self.__resolve_urls(i['url'])
            i['rank'] = count
            count += 1
        return results

We then write another function that allows us to use our resolve_urls function over a set of results, updating each result’s URL with the real underlying URL and adding the rank of the result in question.

Bringing It All Together

We bring this all together in our scrape_baidu function, shown below. We range over our page count variable. For each iteration we multiply our loop variable by 10 to get the correct pn parameter; the pn variable represents the result index, so our logic ensures we start at 0 and continue on in increments of 10 results. We then format our URL using both our search term and this variable, make the request and parse the page using the functions we have already written, before appending the results to our final results variable. Should we have passed a delay argument, we will also sleep for a while before scraping the next page. This will help us avoid getting banned should we want to scrape multiple pages and search terms.
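
For reference, the method as it appears in the full code below:

Python
    def scrape_baidu(self):
        results = []
        for i in range(self.page_count):
            pn = i * 10
            html = self.baidu_request(self.base_url.format(self.search_term.replace(' ', '%20'), pn))
            scrape_results = self.parse_html(html.text)
            for res in scrape_results:
                results.append(res)
            sleep(self.delay)
        return {'results': self.resolve_baidu_links(results)}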

Full Code

Python
from bs4 import BeautifulSoup
from time import sleep
import logging
import requests
 
 
CHROME_DEFAULT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36'
 
 
class BaiduBot:
 
    def __init__(self, search_term, pages, proxy=None, timeout=30,
                 user_agent=CHROME_DEFAULT, delay=0):
 
        self.base_url = 'http://www.baidu.com/s?wd={}&pn={}'
        self.proxy = proxy
        self.search_term = search_term.rstrip(' ')
        self.page_count = pages
        self.timeout = timeout
        self.user_agent = user_agent
        self.delay = delay
 
    def baidu_request(self, url):
        try:
            res = requests.get(url, timeout=30, proxies={'https': self.proxy, 'http': self.proxy},
                               headers={'User-Agent': self.user_agent})
            res.raise_for_status()
        except requests.HTTPError as e:
            logging.warning('Baidu search page return non-200 status code')
            raise e
        except requests.RequestException as e:
            logging.warning('Issue retrieving Baidu results page')
            raise e
        except ConnectionError as e:
            raise e
        else:
            return res
 
    def parse_html(self, html):
        soup = BeautifulSoup(html, 'html.parser')
        result_containers = soup.find_all('div', {'class': 'c-container'})
        results = []
        for result in result_containers:
            title = result.find('h3', {'class': 't'}).get_text()
            url = result.find('a', href=True)['href']
            description = result.find('div', {'class':'c-abstract'})
            if description:
                description = description.get_text()
            results.append({'title': title, 'url': url, 'description': description})
        return results
 
    def __resolve_urls(self, url):
        try:
            final_url = requests.get(url, proxies={'http': self.proxy, 'https': self.proxy},
                                     headers={'User-Agent': self.user_agent}, timeout=self.timeout).url
        except requests.RequestException:
            return url
        except ConnectionError:
            return url
        else:
            return final_url
 
    def resolve_baidu_links(self, results):
        count = 1
        for i in results:
            i['url'] = self.__resolve_urls(i['url'])
            i['rank'] = count
            count += 1
        return results
 
    def scrape_baidu(self):
        results = []
        for i in range(self.page_count):
            pn = i * 10
            html = self.baidu_request(self.base_url.format(self.search_term.replace(' ', '%20'), pn))
            scrape_results = self.parse_html(html.text)
            for res in scrape_results:
                results.append(res)
            sleep(self.delay)
        return {'results': self.resolve_baidu_links(results)}
 
 
if __name__ == '__main__':
    s = BaiduBot('edmund martin', 2)
    res = s.scrape_baidu()
    for i in res.get('results'):
        print(i)