-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathscraper.py
More file actions
346 lines (291 loc) · 14 KB
/
scraper.py
File metadata and controls
346 lines (291 loc) · 14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
import asyncio
import time
import re
import os
import json
from playwright.async_api import async_playwright
class GoogleMapsScraper:
    """Scrape business listings from Google Maps search results with Playwright.

    Workflow: run a search, scroll the results feed until it stops loading (or a
    cap is hit), collect unique place URLs, then open each place page
    concurrently (at most 5 tabs) and extract its details into a dict.
    """

    def __init__(self):
        # Detail dicts collected by the most recent scrape() run.
        self.results = []
        # Place URLs already seen; used to de-duplicate feed entries.
        self.unique_urls = set()

    async def scrape(self, query, max_results=None):
        """Search Google Maps for *query* and return a list of place-detail dicts.

        Args:
            query: Search string typed into the Maps search box.
            max_results: Optional cap on the number of places scraped; None
                means scrape every listing the feed will load.

        Returns:
            List of detail dicts (failed scrapes are dropped). The same list is
            stored on ``self.results``.
        """
        async with async_playwright() as p:
            # headless=False: Google Maps is far more likely to block or
            # degrade headless browsers, so a visible window is used.
            browser = await p.chromium.launch(headless=False)
            context = await browser.new_context()
            page = await context.new_page()
            print(f"Navigating to Google Maps for query: {query}")
            await page.goto("https://www.google.com/maps", timeout=60000)

            # Handle cookie consent (only shown in some regions; best-effort).
            try:
                await page.locator("form[action*='consent'] button").first.click(timeout=3000)
            except Exception:
                pass

            # Submit the search.
            search_box = page.locator("input#searchboxinput")
            await search_box.fill(query)
            await page.keyboard.press("Enter")
            print("Waiting for results...")
            await page.wait_for_selector("div[role='feed']", timeout=10000)

            # Scroll the feed to force lazy-loading of more results.
            await self._scroll_results(page, max_results)

            # Collect unique place URLs from the loaded feed items.
            feed = page.locator("div[role='feed']")
            items = feed.locator(" > div > div[jsaction]")
            count = await items.count()
            print(f"Total items found in feed: {count}")
            urls_to_scrape = []
            for i in range(count):
                try:
                    link_locator = items.nth(i).locator("a").first
                    href = await link_locator.get_attribute("href")
                    if href and href not in self.unique_urls:
                        self.unique_urls.add(href)
                        urls_to_scrape.append(href)
                except Exception:
                    # Some feed entries (ads, separators) carry no link; skip.
                    pass
            print(f"Found {len(urls_to_scrape)} unique URLs to scrape.")
            if max_results:
                urls_to_scrape = urls_to_scrape[:max_results]

            # Close the search page to free resources before fan-out.
            await page.close()

            # Scrape details in parallel, bounded by a semaphore.
            print(f"Starting parallel scraping with {min(len(urls_to_scrape), 5)} workers...")
            semaphore = asyncio.Semaphore(5)  # Limit concurrency to 5 tabs
            tasks = []
            for i, url in enumerate(urls_to_scrape):
                task = asyncio.create_task(self._scrape_place(context, url, semaphore, i, len(urls_to_scrape)))
                tasks.append(task)
            self.results = await asyncio.gather(*tasks)
            # Filter out None results (failed scrapes).
            self.results = [r for r in self.results if r]
            await browser.close()
            return self.results

    async def _scroll_results(self, page, max_results=None):
        """Scroll the results feed until max_results items load or the list ends.

        Uses page.wait_for_function to detect when new items appear; if nothing
        loads within 5s, jiggles the scroll position and retries once with a
        10s window before concluding the list is exhausted.
        """
        feed = page.locator("div[role='feed']")
        while True:
            items = feed.locator(" > div > div[jsaction]")
            current_count = await items.count()
            if max_results and current_count >= max_results:
                print(f"Reached max results limit ({max_results}).")
                break
            print(f"Scrolling... Current items: {current_count}")
            # Jump to the bottom of the feed to trigger lazy loading.
            await feed.evaluate("element => element.scrollTop = element.scrollHeight")
            try:
                # Wait up to 5 seconds for the item count to increase.
                await page.wait_for_function(
                    f"document.querySelectorAll('div[role=\"feed\"] > div > div[jsaction]').length > {current_count}",
                    timeout=5000
                )
                # New items arrived: loop to re-check the cap and scroll again.
                continue
            except Exception:
                # No new items within 5 seconds; jiggle the scroll position to
                # re-trigger loading, then wait once more with a longer window.
                print("No new items loaded within 5 seconds. Retrying...")
                await feed.evaluate("element => element.scrollTop = element.scrollHeight - 1000")
                await asyncio.sleep(1)
                await feed.evaluate("element => element.scrollTop = element.scrollHeight")
                try:
                    await page.wait_for_function(
                        f"document.querySelectorAll('div[role=\"feed\"] > div > div[jsaction]').length > {current_count}",
                        timeout=10000
                    )
                except Exception:
                    print("No new items after retry. Assuming end of list.")
                    # Check for "You've reached the end of the list" text just in case.
                    if await page.locator("text=\"You've reached the end of the list\"").count() > 0:
                        print("End of list marker found.")
                    # Break unconditionally: a missing marker must not cause an
                    # infinite scroll loop once loading has stalled twice.
                    break

    async def _scrape_place(self, context, url, semaphore, index, total):
        """Open *url* in a fresh tab and extract its details.

        Concurrency is bounded by *semaphore*; *index*/*total* are only used
        for progress logging. Returns the detail dict, or None on failure.
        """
        async with semaphore:
            page = await context.new_page()
            try:
                await page.goto(url, timeout=30000)
                # The <h1> (place name) signals the detail pane has rendered.
                try:
                    await page.wait_for_selector("h1", timeout=10000)
                except Exception:
                    print(f"Timeout loading details for item {index+1}")
                    return None
                data = await self._extract_details(page)
                print(f"Scraped ({index+1}/{total}): {data.get('Place Name', 'Unknown')}")
                return data
            except Exception as e:
                print(f"Error scraping item {index+1}: {e}")
                return None
            finally:
                # Always release the tab, even on failure.
                await page.close()

    async def _extract_details(self, page):
        """Extract the fields of a single place page into a dict.

        Every field is best-effort: selectors that fail leave an empty/default
        value rather than aborting the whole place.
        """
        details = {}

        # Place Name
        try:
            details['Place Name'] = await page.locator("h1").first.inner_text()
        except Exception:
            details['Place Name'] = ""

        # Address (aria-label carries an "Address: " prefix to strip).
        try:
            details['Address'] = await page.locator("button[data-item-id='address']").first.get_attribute("aria-label")
            if details['Address']:
                details['Address'] = details['Address'].replace("Address: ", "")
        except Exception:
            details['Address'] = ""

        # Website
        try:
            details['Website'] = await page.locator("a[data-item-id='authority']").first.get_attribute("href")
        except Exception:
            details['Website'] = ""

        # Phone Number (aria-label carries a "Phone: " prefix to strip).
        try:
            phone_btn = page.locator("button[data-item-id*='phone:tel:']")
            if await phone_btn.count() > 0:
                details['Phone Number'] = await phone_btn.first.get_attribute("aria-label")
                if details['Phone Number']:
                    details['Phone Number'] = details['Phone Number'].replace("Phone: ", "")
            else:
                details['Phone Number'] = ""
        except Exception:
            details['Phone Number'] = ""

        # Rating: pull the leading number out of e.g. "4.5 stars".
        try:
            rating_element = page.locator("[aria-label*=' stars '], [aria-label*=' star ']").first
            if await rating_element.count() > 0:
                rating_text = await rating_element.get_attribute("aria-label")
                match = re.search(r'([0-9.]+)', rating_text)
                if match:
                    details['Place Star Rating'] = match.group(1)
                else:
                    details['Place Star Rating'] = rating_text
            else:
                details['Place Star Rating'] = ""
        except Exception:
            details['Place Star Rating'] = ""

        # Reviews: prefer an aria-label like "1,234 reviews"; fall back to the
        # parenthesised count next to the star rating.
        try:
            reviews_elements = await page.locator("[aria-label*=' reviews']").all()
            found_reviews = False
            for el in reviews_elements:
                reviews_text = await el.get_attribute("aria-label")
                match = re.search(r'^([0-9,]+)\s+reviews?', reviews_text)
                if match:
                    details['Amount of Reviews'] = match.group(1).replace(',', '')
                    found_reviews = True
                    break
            if not found_reviews:
                try:
                    reviews_text = await page.locator("div.F7nice span").nth(1).inner_text()
                    details['Amount of Reviews'] = reviews_text.replace("(", "").replace(")", "")
                except Exception:
                    details['Amount of Reviews'] = ""
        except Exception:
            details['Amount of Reviews'] = ""

        # Open Hours: try expanding the weekly table first; fall back to the
        # compact "oh" button, then to the Open/Closed status div.
        try:
            details['Place Open Hours'] = {}
            expand_hours_btn = page.locator("[aria-label='Show open hours for the week']")
            if await expand_hours_btn.count() > 0:
                try:
                    await expand_hours_btn.click()
                    await asyncio.sleep(1)
                    hours_table = page.locator("table").first
                    if await hours_table.count() > 0:
                        rows = await hours_table.locator("tr").all()
                        hours_data = {}
                        for row in rows:
                            day_cell = row.locator("td").first
                            hours_cell = row.locator("td").nth(1)
                            if await day_cell.count() > 0 and await hours_cell.count() > 0:
                                day = (await day_cell.inner_text()).strip()
                                hours = await hours_cell.get_attribute("aria-label")
                                if not hours:
                                    hours = (await hours_cell.inner_text()).strip()
                                # Replace narrow no-break spaces with plain spaces.
                                hours = hours.replace("\u202f", " ")
                                if day:
                                    hours_data[day] = hours
                        if hours_data:
                            details['Place Open Hours'] = hours_data
                except Exception:
                    pass
            if not details['Place Open Hours']:
                open_hours_button = page.locator("button[data-item-id='oh']")
                if await open_hours_button.count() > 0:
                    val = await open_hours_button.get_attribute("aria-label")
                    if val:
                        details['Place Open Hours'] = val.replace("Open hours: ", "").strip()
                    else:
                        details['Place Open Hours'] = await open_hours_button.inner_text()
                else:
                    hours_div = page.locator("div[aria-label*='Open'], div[aria-label*='Closed']").first
                    if await hours_div.count() > 0:
                        val = await hours_div.get_attribute("aria-label")
                        if val and ("Closes" in val or "Opens" in val or "24 hours" in val):
                            details['Place Open Hours'] = val
        except Exception:
            details['Place Open Hours'] = ""

        # Popular Times: collect the per-hour busyness bar labels when present.
        try:
            popular_times_group = page.locator("div[aria-label*='Popular times']")
            if await popular_times_group.count() > 0:
                bars = await popular_times_group.locator("div[aria-label*='busy at'], div[aria-label*='Busy at']").all()
                if bars:
                    times_data = []
                    for bar in bars:
                        label = await bar.get_attribute("aria-label")
                        if label:
                            times_data.append(label)
                    details['Place Popular Times'] = times_data
                else:
                    details['Place Popular Times'] = [await popular_times_group.first.get_attribute("aria-label")]
            else:
                details['Place Popular Times'] = []
        except Exception:
            details['Place Popular Times'] = []

        # Claimed status: an unclaimed listing shows a "Claim this business" button.
        try:
            claim_btn = page.locator("button[aria-label*='Claim this business']")
            if await claim_btn.count() > 0:
                details['Is Claimed'] = "No"
            else:
                details['Is Claimed'] = "Yes"
        except Exception:
            details['Is Claimed'] = "Unknown"

        return details

    def save_to_json(self, query):
        """Write self.results to scrapings/<query>/<query>_<timestamp>.json.

        Spaces in *query* become underscores in both the folder and file name.
        On any error, falls back to results_<timestamp>.json in the CWD.
        """
        timestamp = int(time.time())
        # Create directory structure: scrapings/query_name/
        base_folder = "scrapings"
        query_folder = query.replace(' ', '_')
        output_dir = os.path.join(base_folder, query_folder)
        # exist_ok avoids a race between the exists() check and makedirs().
        os.makedirs(output_dir, exist_ok=True)
        filename = f"{query_folder}_{timestamp}.json"
        file_path = os.path.join(output_dir, filename)
        try:
            with open(file_path, 'w', encoding='utf-8') as f:
                json.dump(self.results, f, ensure_ascii=False, indent=4)
            print(f"Saved results to {file_path}")
        except Exception as e:
            print(f"Error saving to JSON: {e}")
            # Fall back to a flat file in the current directory.
            filename = f"results_{timestamp}.json"
            with open(filename, 'w', encoding='utf-8') as f:
                json.dump(self.results, f, ensure_ascii=False, indent=4)
            # Bug fix: the fallback previously printed the literal "(unknown)"
            # instead of the file it actually wrote.
            print(f"Saved results to {filename}")
if __name__ == "__main__":
    import sys

    scraper = GoogleMapsScraper()
    # Prefer a command-line argument; otherwise prompt interactively.
    query = sys.argv[1] if len(sys.argv) > 1 else input(
        "Enter search query (e.g., 'restaurants near arugambay'): "
    )
    if not query:
        query = "restaurants near arugambay"
    # Drive the async scraper to completion, then persist what it collected.
    asyncio.run(scraper.scrape(query, max_results=None))
    scraper.save_to_json(query)