Common Crawl (CC) is an awesome free and open repository of crawled data from the world wide web, spanning back many years. Common Crawl does what Google and Bing do, but it lets anyone access the data for free, analyse it, and even use it commercially at no cost. The datasets now total many petabytes and are stored on AWS S3 free of charge, courtesy of Amazon.
Today we will be investigating this data using Python and a couple of libraries to analyse the stored raw HTML.
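The snippets in the rest of this post are fragments of one Python 2 script. For reference, they roughly assume the imports and module-level settings below; the exact import paths (in particular the RAKE keyword library), the index_list and domain values, and the search_table/SmartStopList helpers come from the full project linked at the end, so treat this as a sketch rather than the definitive header.

# Rough sketch of the imports and globals the snippets below assume
import gzip
import hashlib
import json
import re
import time
import StringIO                       # Python 2; io.BytesIO is the Python 3 equivalent
from time import strftime, gmtime
from threading import Thread

import boto3                          # AWS SDK, used for DynamoDB
import requests                       # HTTP client for the index API and S3
from bs4 import BeautifulSoup         # HTML parsing

# These helpers come from the full project on GitHub; import paths assumed here
from rake import Rake                 # RAKE keyword extraction
import SmartStopList                  # stop-word list helper
# search_table(parser, attrs, label) is also defined in the full project

# Which monthly snapshots to query and which domain to search (example values)
index_list = ["2017-39"]
domain = "amazon.com"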
The data is stored in a compressed format and contains a huge number of raw HTML pages, so finding a specific page can be difficult. Common Crawl provides a useful index API (http://index.commoncrawl.org/) which applications can use to find all archived pages for a given domain name. When visiting the API page, you may notice a long list of entries starting with “CC-MAIN-…”. These are the monthly snapshots (datasets) that Common Crawl produces, which means you can go back many months or years to find and extract information. For this example we will use the latest dataset, CC-MAIN-2017-39-index.
Below is the API call we will use in our application. The first %s is the dataset number, e.g. “2017-39”, and the second %s is the domain to search.
http://index.commoncrawl.org/CC-MAIN-%s-index?url=%s&matchType=domain&output=json
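The index returns one JSON record per line. The field names below follow the Common Crawl index format (the download code later relies on url, offset, length and filename); the values here are invented purely for illustration:

{
  "urlkey": "com,amazon)/dp/b00example",
  "timestamp": "20170926000000",
  "url": "https://www.amazon.com/dp/B00EXAMPLE",
  "mime": "text/html",
  "status": "200",
  "digest": "EXAMPLEDIGEST1234567",
  "length": "12345",
  "offset": "987654321",
  "filename": "crawl-data/CC-MAIN-2017-39/segments/.../warc/CC-MAIN-...-00000.warc.gz"
}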
### -----------------------
### Searches the Common Crawl index for a domain.
### -----------------------
def search_domain(domain):

    record_list = []

    print("[*] Trying target domain: %s" % domain)

    for index in index_list:

        print("[*] Trying index %s" % index)

        cc_url  = "http://index.commoncrawl.org/CC-MAIN-%s-index?" % index
        cc_url += "url=%s&matchType=domain&output=json" % domain

        response = requests.get(cc_url)

        if response.status_code == 200:

            # Each line of the response body is a separate JSON record
            records = response.content.splitlines()

            for record in records:
                record_list.append(json.loads(record))

            print("[*] Added %d results." % len(records))

    print("[*] Found a total of %d hits." % len(record_list))

    return record_list
The Python code above accepts a domain name to search and returns all of the URLs belonging to that domain in the Common Crawl dataset. Once the URLs have been downloaded and stored in a list, we can move on to downloading each compressed page and performing the analysis.
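As a quick usage sketch (reusing the imports and index_list assumed earlier, with an example domain):

# Hypothetical usage of search_domain - the domain is just an example
record_list = search_domain("amazon.com")
if len(record_list) > 0:
    first = record_list[0]
    print("[*] First hit: %s (offset %s, length %s)" % (first['url'], first['offset'], first['length']))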
#
# Downloads the full page from the Common Crawl archive on S3
#
def download_page(record):

    offset, length = int(record['offset']), int(record['length'])
    offset_end = offset + length - 1

    # We'll get the file via HTTPS so we don't need to worry about S3 credentials
    # Getting the file on S3 is equivalent however - you can request a Range
    prefix = 'https://commoncrawl.s3.amazonaws.com/'

    # We can then use the Range header to ask for just this set of bytes
    resp = requests.get(prefix + record['filename'],
                        headers={'Range': 'bytes={}-{}'.format(offset, offset_end)})

    # The page is stored compressed (gzip) to save space
    # We can extract it using the gzip library
    raw_data = StringIO.StringIO(resp.content)
    f = gzip.GzipFile(fileobj=raw_data)

    # What we have now is the full WARC record: WARC headers, HTTP headers, then the HTML response
    data = f.read()

    response = ""

    if len(data):
        try:
            warc, header, response = data.strip().split('\r\n\r\n', 2)
        except:
            pass

    return response
The Python code above uses the “requests” library to download a compressed page from the dataset stored on Amazon S3, requesting only the byte range that belongs to that record. The downloaded page is then decompressed using the built-in “gzip” module and the raw HTML is returned as the response. Once the page is downloaded, we can use our custom function, with the help of the “BeautifulSoup” library, to find specific data residing in the HTML code.
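For example, the first hit can be downloaded and sanity-checked before running the full extraction (a sketch reusing record_list from the previous step):

# Hypothetical usage of download_page on the first index hit
html = download_page(record_list[0])
print("[*] Downloaded %d characters of HTML" % len(html))

# Quick sanity check with BeautifulSoup before the full product extraction
soup = BeautifulSoup(html, "html.parser")
title_tag = soup.find("span", attrs={"id": "productTitle"})
if title_tag != None:
    print("[*] Looks like a product page: %s" % title_tag.get_text().strip())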
#
# Extract a Product from a single HTML page.
#
def extract_product(html_content, url):

    # String buffer and error list
    string_buffer = ""
    errs = list()

    # Parse the page so we can extract the product information
    parser = BeautifulSoup(html_content, "html.parser")

    # Check if the page is a product; if not, skip the page
    truth, asin = check_page(parser)
    if not truth:
        errs.append("Not product")
        return (False, errs)

    # New Product object
    product = Product()

    # New keyword ranker
    keyword = Rake(SmartStopList.words())

    # Find URL
    product.SetUrl(url)

    # Find Brand. Note: some products have an image for the brand
    truth, string_buffer = search_table(parser, {"id": "productDetails_techSpec_section_1"}, "Brand Name")
    if truth:
        product.SetBrand(string_buffer)
    else:
        string_buffer = parser.find("a", attrs={"id": "brand"})
        if string_buffer != None:
            product.SetBrand(string_buffer.get_text().strip())
        else:
            errs.append("Could not find Brand")

    # Find Title
    string_buffer = parser.find("span", attrs={"id": "productTitle"})
    if string_buffer != None:
        product.SetTitle(string_buffer.get_text().strip())
    else:
        errs.append("Could not find Title")
        return (False, errs)

    # Find Image
    string_buffer = parser.find("img", attrs={"id": "landingImage"})
    if string_buffer != None:
        string_buffer = string_buffer.get("data-old-hires")
        if string_buffer == None or len(string_buffer) < 2:
            # Fall back to the dynamic image attribute and pull out the first https...jpg URL
            string_buffer = parser.find("img", attrs={"id": "landingImage"}).get("data-a-dynamic-image")
            m = re.search('https://(.+?).jpg', string_buffer or "")
            if m:
                string_buffer = "https://{}.jpg".format(m.group(1))
        #print("Img Url: " + string_buffer)
        if string_buffer != None:
            product.SetImage(string_buffer)
    else:
        errs.append("Could not find Image")

    # Find the small blob (feature bullets) and extract its keywords
    string_buffer = parser.find("div", attrs={"id": "feature-bullets"})
    if string_buffer != None:
        string_buffer = string_buffer.find("ul")
        try:
            string_buffer = string_buffer.find_all("li")
            if string_buffer != None:
                string_buffer_2 = ""
                for span in string_buffer:
                    string_buffer_3 = span.find("span")
                    if string_buffer_3 != None:
                        string_buffer_3 = string_buffer_3.get_text()
                        try:
                            string_buffer_2 = "{} {}".format(string_buffer_2, string_buffer_3.strip())
                        except:
                            pass
                saved_buffer = string_buffer_2.strip()
                # Calculate the keywords
                keywords_1 = keyword.run(saved_buffer)
                product.SetSmallBlob(keywords_1)
        except:
            errs.append("Error finding li")
    else:
        errs.append("Could not find small section keywords")

    # Find the large blob (product description) and extract its keywords
    string_buffer = parser.find("div", attrs={"id": "productDescription"})
    if string_buffer != None:
        string_buffer = string_buffer.find("p")
        if string_buffer != None:
            string_buffer = string_buffer.get_text()
            saved_buffer = string_buffer.strip()
            # Calculate the keywords
            keywords_2 = keyword.run(saved_buffer)
            product.SetLargeBlob(keywords_2)
    else:
        errs.append("Could not find large section keywords")

    # Find ASIN
    product.SetSourceID(asin)

    # TODO: Perform price save!

    # Return the product if all required fields were found
    if product.FormCompleted():
        return (product, errs)
    else:
        return (False, errs)
Now, there is a bit to explain here.
The function above accepts two inputs, the HTML code and the URL of the page. It first initialises the BeautifulSoup library into a variable called parser. The function then checks whether the page it is currently inspecting is definitely a product page, as the Common Crawl API returns pages and URLs that are mixed with menus, corporate info, deals pages, and so on. This is done using the function below.
#
# Perform a pre-check to see if the page is a product
#
def check_page(parsed):

    parser = parsed

    # First check: look for the ASIN in the product details table
    found, asin = search_table(parser, {"id": "productDetails_detailBullets_sections1"}, "ASIN")
    if found:
        return (True, asin)

    # Second check: look for a bold "ASIN:" label
    check_asin_2 = parser.find("b", text="ASIN:")
    check_asin_3 = parser.find("b", text="ASIN: ")

    if check_asin_2 == None and check_asin_3 == None:
        print("Page is Not a Product")
        return (False, None)
    else:
        if check_asin_2 != None:
            asin = check_asin_2.findParent().text[5:]
        if check_asin_3 != None:
            asin = check_asin_3.findParent().text[5:]

    # TODO: Add additional checks to confirm the page is definitely a product!
    print("Page is a Product")
    return (True, asin)
The function above uses BeautifulSoup to find certain HTML elements, for example divs and spans, and in this case bold elements (<b>), and checks the content within them. If a bold element contains the string “ASIN:”, there is a high chance that the page is definitely a product, and the function returns the ASIN along with a boolean True.
If the page is recognised as a product, the extract_product function creates a Product object to store the information. A class is a good way to store and manage the data, since it enforces a specific model rather than leaving everything loosely structured in JSON.
######## Product Class ########
class Product:

    title = "e"
    brand = "e"
    url = "e"
    image_url = "e"
    blob_small = "Unknown"
    blob_large = "Unknown"
    source_id = "asin"
    source_domain = "amazon"

    ## Init
    def __init__(self, product=None):
        # Initialise the object from an existing product instead of using the setters
        if product != None:
            self.title = product.title
            self.brand = product.brand
            self.url = product.url
            self.image_url = product.image_url
            self.blob_small = product.blob_small
            self.blob_large = product.blob_large
            self.source_id = product.source_id
            self.source_domain = product.source_domain
        print("New Product object initialised in memory")

    ## Setters and Getters
    def SetTitle(self, title):
        self.title = title.strip()

    def SetBrand(self, brand):
        self.brand = brand

    def SetUrl(self, url):
        self.url = url

    def SetImage(self, url):
        if len(url) > 1:
            self.image_url = url

    def SetSmallBlob(self, blob):
        self.blob_small = blob

    def SetLargeBlob(self, blob):
        self.blob_large = blob

    def SetSourceID(self, id):
        # Strip removes whitespace and any other non-standard chars
        self.source_id = id.strip()

    def SetSourceDomain(self, domain):
        self.source_domain = domain

    ## Support
    def FormCompleted(self):
        # Returns True only if the required fields have been filled in
        if (len(self.title) > 1 and len(self.brand) > 1 and len(self.url) > 1
                and len(self.source_id) > 1 and len(self.source_domain) > 1):
            return True
        else:
            return False

    def ReturnJson(self):
        # Returns the object information in the form of a JSON-style dict
        m = hashlib.md5()
        m.update(self.source_id)
        product = {
            'uid': m.hexdigest(),  # Set as the main index in DynamoDB
            'title': self.title,
            'brand': self.brand,
            'url': self.url,
            'image_url': self.image_url,
            'small_keywords': self.blob_small,
            'large_keywords': self.blob_large,
            'sid': self.source_id,
            'domain': self.source_domain,
            'date': strftime("%Y-%m-%d %H:%M:%S", gmtime())
        }
        return (product)

    def Print(self):
        print("### Printing Product ###")
        print(self.ReturnJson())
        print("### end ###")
The Product class is not too difficult: it contains setters and getters, as well as helper functions which generate a JSON-style dict and print the information to the terminal.
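As a minimal usage sketch (the values are made up), a product can also be built by hand and printed:

# Hypothetical example of filling in a Product manually
p = Product()
p.SetTitle("Example USB-C Cable")
p.SetBrand("ExampleBrand")
p.SetUrl("https://www.amazon.com/dp/B00EXAMPLE")
p.SetSourceID("B00EXAMPLE")

if p.FormCompleted():
    p.Print()   # prints the JSON-style dict, including the md5-based uid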
Once the product object has been created and filled with information, it is added to a buffer; a secondary, multithreaded class then handles saving the buffered products to an AWS DynamoDB table. Below is the save-thread class which handles the connection and processing.
### ------------------------------------
### Save Products to DynamoDB Class
### ------------------------------------
class SaveProducts:

    products_buffer = list()

    # Constructor function
    def __init__(self):
        ### Save products into the database
        self.dynamodb = boto3.resource('dynamodb')
        self.table = self.dynamodb.Table('productfinder_product_2')
        # Helper flag used to stop the thread
        self.stopped = False

    ### ---------------------------------------------------
    ### Main handler function for the multi threading
    ### ---------------------------------------------------
    def start(self):
        Thread(target=self.update, args=()).start()
        return self

    ### Runs on its own thread
    def update(self):
        with self.table.batch_writer() as batch:
            # Keep running for the life of the thread
            while True:
                if len(self.products_buffer) > 0:
                    try:
                        # Hand the oldest product to the batch writer (boto3 flushes batches automatically)
                        batch.put_item(Item=self.products_buffer[0].ReturnJson())
                        self.products_buffer.pop(0)   # Remove the oldest product
                        print("[**] Product queued for upload")
                        print("[*] Buffer Size: {}".format(len(self.products_buffer)))
                    except:
                        # Failed to save the product into the database
                        print("[-] Upload Error")
                        self.stopped = True
                else:
                    # Avoid spinning the CPU while the buffer is empty
                    time.sleep(0.1)

                # If the thread indicator variable is set, stop the thread
                if self.stopped:
                    return

    def append(self, product):
        # Append a product onto the buffer
        if product != None:
            self.products_buffer.append(product)

    def alive(self):
        if len(self.products_buffer) < 1:
            return False
        else:
            return True

    def stop(self):
        # Indicate that the thread should be stopped
        self.stopped = True
Please note that when creating the DynamoDB table, the partition key (the table index) must be of type string. In this case we use “uid”.
Finally, we can wrap the whole Python application with a main function.
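If the table does not exist yet, it can be created with boto3. Below is a minimal sketch using the same table name as the class above; the provisioned throughput values are placeholders you would tune for your own account:

# One-off DynamoDB table creation sketch ('uid' as a string partition key)
import boto3

dynamodb = boto3.resource('dynamodb')
table = dynamodb.create_table(
    TableName='productfinder_product_2',
    KeySchema=[{'AttributeName': 'uid', 'KeyType': 'HASH'}],               # partition key
    AttributeDefinitions=[{'AttributeName': 'uid', 'AttributeType': 'S'}], # 'S' = string
    ProvisionedThroughput={'ReadCapacityUnits': 5, 'WriteCapacityUnits': 5}
)
table.wait_until_exists()
print("Created table: {}".format(table.table_name))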
### -----------------------
### Main Function
### -----------------------
def main():

    print("Starting CommonCrawl Search")

    # Find all relevant URLs for the domain
    record_list = search_domain(domain)

    # Create the save object - products are saved to Amazon DynamoDB
    savethread = SaveProducts().start()

    # Download pages from Common Crawl, inspect them and extract information,
    # splitting the record list across two worker threads
    product_finder_1 = ProductFinder(record_list[0: int(len(record_list)/2)]).start(savethread)
    product_finder_2 = ProductFinder(record_list[int(len(record_list)/2): int(len(record_list))]).start(savethread)

    # Idle the main thread until both finder threads have finished...
    while not (product_finder_1.check_status() and product_finder_2.check_status()):
        time.sleep(1)

    # ...and until the save buffer has been emptied
    while savethread.alive():
        time.sleep(1)

    # Stop Threads
    product_finder_1.stop()
    product_finder_2.stop()
    savethread.stop()

if __name__ == '__main__':
    main()

#Fin
Here we see the URLs found, the save thread created and started, and two ProductFinder objects which each handle half of the URL list. To prevent the main function from finishing and closing before the worker threads are done, we add idle loops which keep the main thread alive until both URL lists have been processed and the save buffer has been emptied.
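The ProductFinder class itself is not listed in this post; the full version lives in the GitHub repository linked below. As a rough sketch of what it has to do, based only on how main() uses it (start(savethread), check_status() and stop()):

### -----------------------------------------------------------
### Product finder worker - a sketch only, see the GitHub repo for the real class
### -----------------------------------------------------------
class ProductFinder:

    def __init__(self, records):
        self.records = records        # this worker's share of the index records
        self.finished = False
        self.stopped = False

    def start(self, savethread):
        self.savethread = savethread
        Thread(target=self.update, args=()).start()
        return self

    def update(self):
        for record in self.records:
            if self.stopped:
                break
            html = download_page(record)               # fetch and decompress the archived page
            if not html:
                continue
            product, errs = extract_product(html, record['url'])
            if product:                                # only queue completed products
                self.savethread.append(product)
        self.finished = True

    def check_status(self):
        # True once every record in this worker's share has been processed
        return self.finished

    def stop(self):
        self.stopped = True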
Please visit my GitHub page to see the full version of the code:
https://github.com/chedame/python-common-crawl-amazon-example
Thank you for the read and, as always, Stay Awesome!
Also, note that the application works best on Unix-based machines, for example Linux and Mac.
I’m currently using an Apple MacBook Pro to run this code.
Pseudocode Breakdown
1. Search the domain via the Common Crawl index API
2. Add the returned URLs to a list
3. Start the saving thread
4. Split the URLs across two threads
5. Loop through the list of URLs, download each page and confirm it is a product
* Each page is a full HTML archive copy from Common Crawl
6. If the page is a product, extract the details and create a new Product object
7. Add the object to the save buffer, which uploads it to DynamoDB
8. Loop until the list is completed