-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathzad19.py
More file actions
156 lines (119 loc) · 6.04 KB
/
zad19.py
File metadata and controls
156 lines (119 loc) · 6.04 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, lower, split, regexp_replace, length, udf, array_contains
from pyspark.sql.types import ArrayType, StringType, MapType, FloatType
from pyspark.ml.feature import StopWordsRemover, CountVectorizer, IDF
import matplotlib.pyplot as plt
from wordcloud import WordCloud
import urllib.request
# Inicjalizacja sesji Spark
spark = SparkSession.builder \
.appName("ShakespeareWordCloud") \
.master("local[*]") \
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
# ==========================================
# PRZYGOTOWANIE DANYCH (Symulacja pliku z wykładu)
# ==========================================
print("--- Pobieranie danych przykładowych ---")
def get_text_from_url(url):
try:
with urllib.request.urlopen(url) as f:
return f.read().decode('utf-8')
except:
return ""
# Pobieramy 3 dramaty jako osobne teksty
hamlet = get_text_from_url("https://www.gutenberg.org/files/1524/1524-0.txt")
macbeth = get_text_from_url("https://www.gutenberg.org/cache/epub/1533/pg1533.txt")
romeo = get_text_from_url("https://www.gutenberg.org/cache/epub/1513/pg1513.txt")
# Tworzymy DataFrame: [(Tytuł, Treść), ...]
data = [("Hamlet", hamlet), ("Macbeth", macbeth), ("Romeo and Juliet", romeo)]
df_raw = spark.createDataFrame(data, ["title", "text"])
# Filtrujemy puste wiersze (na wypadek problemów z pobieraniem)
df_raw = df_raw.filter(length(col("text")) > 100)
print(f"Załadowano dramaty: {[row.title for row in df_raw.select('title').collect()]}")
# ==========================================
# ZADANIE 19: Word Cloud dla jednego dramatu (częstość)
# ==========================================
print("\n--- ZADANIE 19: Przetwarzanie 'Hamleta' ---")
# 1. Wybór jednego dramatu
df_hamlet = df_raw.filter(col("title") == "Hamlet")
# 2. Oczyszczanie, Tokenizacja, Stop Words
# a. Usunięcie interpunkcji i zamiana na małe litery
df_clean = df_hamlet.withColumn("clean_text", lower(regexp_replace(col("text"), "[^a-zA-Z\\s]", "")))
# b. Podział na słowa (split)
df_words = df_clean.withColumn("words_array", split(col("clean_text"), "\\s+"))
# c. Usunięcie Stop Words (korzystamy z wbudowanej listy Sparka dla jęz. angielskiego)
remover = StopWordsRemover(inputCol="words_array", outputCol="filtered_words")
df_filtered = remover.transform(df_words)
# d. Rozbicie tablicy na wiersze (explode) i filtrowanie długości <= 2
# Dodajemy też filtr na puste stringi
df_exploded = df_filtered.select(explode(col("filtered_words")).alias("word")) \
.filter(length(col("word")) > 2)
# 3. Zliczanie wystąpień
word_counts = df_exploded.groupBy("word").count().orderBy(col("count").desc())
# Pobranie top 200 słów do lokalnego słownika w celu narysowania chmury
top_words_dict = {row['word']: row['count'] for row in word_counts.take(200)}
# Generowanie obrazka
wc = WordCloud(width=800, height=400, background_color='white').generate_from_frequencies(top_words_dict)
plt.figure(figsize=(10, 5))
plt.imshow(wc, interpolation='bilinear')
plt.axis("off")
plt.title("Word Cloud - Hamlet (Frequency Based)")
plt.show()
print("Wyświetlono Word Cloud dla Hamleta.")
# ==========================================
# ZADANIE 20: TF.IDF dla wszystkich dramatów
# ==========================================
print("\n--- ZADANIE 20: TF-IDF dla wszystkich dramatów ---")
# 1. Przygotowanie wszystkich tekstów (to samo czyszczenie co wyżej, ale dla całego df_raw)
df_all_clean = df_raw.withColumn("clean_text", lower(regexp_replace(col("text"), "[^a-zA-Z\\s]", ""))) \
.withColumn("words_array", split(col("clean_text"), "\\s+"))
# Usuwamy stop words
remover_all = StopWordsRemover(inputCol="words_array", outputCol="filtered_words")
df_all_filtered = remover_all.transform(df_all_clean)
# Usuwamy słowa <= 2 znaki wewnątrz tablicy (używając UDF lub filtra SQL, tu podejście SQL-like)
# Spark 2.4+ ma funkcję filter dla tablic, ale dla kompatybilności zrobimy to Python UDF-em
filter_length_udf = udf(lambda words: [w for w in words if len(w) > 2], ArrayType(StringType()))
df_final_tokens = df_all_filtered.withColumn("tokens", filter_length_udf(col("filtered_words")))
# 2. Obliczanie TF (Term Frequency) - CountVectorizer
# vocabSize ustalamy np. na 1000 najczęstszych słów, minDF=1
cv = CountVectorizer(inputCol="tokens", outputCol="raw_features", vocabSize=1000, minDF=1.0)
cv_model = cv.fit(df_final_tokens)
df_tf = cv_model.transform(df_final_tokens)
# 3. Obliczanie IDF (Inverse Document Frequency)
idf = IDF(inputCol="raw_features", outputCol="features")
idf_model = idf.fit(df_tf)
df_tfidf = idf_model.transform(df_tf)
# 4. Wyciąganie słów i ich wag TF-IDF
# Spark zwraca wektor rzadki (SparseVector). Musimy zmapować indeksy z powrotem na słowa.
vocabulary = cv_model.vocabulary
# Funkcja pomocnicza do zamiany wektora na słownik {słowo: waga}
def extract_top_tfidf(vector):
# vector.indices - indeksy słów w słowniku
# vector.values - wartości TF-IDF
word_score_map = {}
for idx, score in zip(vector.indices, vector.values):
word_score_map[vocabulary[idx]] = float(score)
return word_score_map
# Rejestracja UDF
extract_tfidf_udf = udf(extract_top_tfidf, MapType(StringType(), FloatType()))
# Dodajemy kolumnę z mapą słów i ich wag
df_results = df_tfidf.select("title", extract_tfidf_udf(col("features")).alias("tfidf_scores"))
# 5. Generowanie chmur wyrazów dla każdego dramatu
results_local = df_results.collect()
for row in results_local:
title = row['title']
tfidf_scores = row['tfidf_scores']
# Sortujemy i bierzemy top N dla czytelności chmury
if not tfidf_scores:
print(f"Brak danych dla {title}")
continue
print(f"Generowanie TF-IDF Word Cloud dla: {title}")
# WordCloud przyjmuje słownik {word: score}
wc = WordCloud(width=800, height=400, background_color='white').generate_from_frequencies(tfidf_scores)
plt.figure(figsize=(10, 5))
plt.imshow(wc, interpolation='bilinear')
plt.axis("off")
plt.title(f"TF-IDF Word Cloud - {title}")
plt.show()
spark.stop()