# zad20.py — Shakespeare text analysis with PySpark.
# Builds a word-frequency cloud for the full text of Hamlet, then splits the
# play into acts and renders a TF-IDF-weighted word cloud for each act.
# (99 lines, 3.44 KB in the original listing.)
import sys
import os
import re
import matplotlib.pyplot as plt
from wordcloud import WordCloud
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lower, regexp_replace, split
from pyspark.ml.feature import StopWordsRemover, CountVectorizer, IDF
# Point PySpark at a specific local CPython interpreter on Windows so the
# driver and workers use the same Python version. Skipped when the path is
# absent (e.g. on another machine), leaving Spark's defaults in place.
python_path = r"C:\Users\woks0\AppData\Local\Programs\Python\Python311\python.exe"
if os.path.exists(python_path):
    os.environ['PYSPARK_PYTHON'] = python_path
    os.environ['PYSPARK_DRIVER_PYTHON'] = python_path

spark = SparkSession.builder.appName("ShakespeareAnalysis").getOrCreate()

# Load the whole play as one string. Abort early if the file cannot be read:
# continuing with an empty corpus would only crash later inside
# CountVectorizer.fit with an opaque "empty vocabulary" error.
file_path = "Szekspir/hamlet.txt"
full_text = ""
try:
    with open(file_path, 'r', encoding='utf-8') as f:
        full_text = f.read()
except OSError as e:
    print(f"Error: {e}")
    spark.stop()
    sys.exit(1)
# --- Word-frequency cloud for the entire play ---

# Single-row DataFrame holding the whole text.
df_full = spark.createDataFrame([(full_text,)], ["text"])

# Normalise: lowercase, strip everything but letters/whitespace, tokenize.
df_clean = df_full.withColumn(
    "words_array",
    split(lower(regexp_replace(col("text"), "[^a-zA-Z\\s]", "")), "\\s+"),
)

# Remove English stop words, then drop tokens of length <= 2 (this also
# discards empty strings produced by the whitespace split).
remover = StopWordsRemover(inputCol="words_array", outputCol="words_no_stop")
df_no_stop = remover.transform(df_clean)
df_final = df_no_stop.selectExpr(
    "text",
    "filter(words_no_stop, x -> length(x) > 2) as clean_words",
)

# Count term occurrences over the (single-document) corpus.
cv_full_text = CountVectorizer(
    inputCol="clean_words",
    outputCol="raw_features",
    vocabSize=1000,
    minDF=1.0,
)
cv_model = cv_full_text.fit(df_final)
featurized_data = cv_model.transform(df_final)

# Map the sparse count vector back to {word: count}.
row = featurized_data.head()
vector = row['raw_features']
vocabulary = cv_model.vocabulary
words_frequency = {
    vocabulary[term_idx]: count
    for term_idx, count in zip(vector.indices, vector.values)
}

# Render the cloud for the full text.
wordcloud = WordCloud(
    width=800, height=400, background_color='white'
).generate_from_frequencies(words_frequency)
plt.figure(figsize=(10, 5))
plt.imshow(wordcloud, interpolation='bilinear')
plt.axis("off")
plt.title("Full Text")
plt.show()
# --- Split the play into acts ---
# The capturing group makes re.split keep the "ACT <n>" headings, so the
# result alternates: [preamble, "ACT 1", act-1 body, "ACT 2", act-2 body, ...].
# Walk the heading positions (odd indices) and pair each with its body.
chapters_data = []
parts = re.split(r'(ACT\s+\d+)', full_text)
for heading_idx in range(1, len(parts), 2):
    if heading_idx + 1 < len(parts):
        act_title = parts[heading_idx].strip()
        act_body = parts[heading_idx + 1].strip()
        chapters_data.append((act_title, act_body))
# --- Per-act TF-IDF word clouds ---

# One row per act, cleaned with the same normalisation as the full text
# (reusing the StopWordsRemover configured above).
df_chapters = spark.createDataFrame(chapters_data, ["title", "text"])
df_chap_clean = df_chapters.withColumn(
    "words_array",
    split(lower(regexp_replace(col("text"), "[^a-zA-Z\\s]", "")), "\\s+"),
)
df_chap_no_stop = remover.transform(df_chap_clean)
df_chap_final = df_chap_no_stop.selectExpr(
    "title",
    "filter(words_no_stop, x -> length(x) > 2) as clean_words",
)

# Term counts per act.
cv = CountVectorizer(
    inputCol="clean_words",
    outputCol="raw_features",
    vocabSize=1000,
    minDF=1.0,
)
cv_model_chap = cv.fit(df_chap_final)
featurized_data_chap = cv_model_chap.transform(df_chap_final)

# Re-weight counts by inverse document frequency across the acts, so each
# act's cloud emphasises words distinctive to that act rather than words
# common to the whole play.
idf = IDF(inputCol="raw_features", outputCol="features")
idf_model = idf.fit(featurized_data_chap)
rescaled_data = idf_model.transform(featurized_data_chap)

vocabulary_chap = cv_model_chap.vocabulary
for chap_row in rescaled_data.select("title", "features").collect():
    title = chap_row['title']
    tfidf_vector = chap_row['features']
    tfidf_dict = {
        vocabulary_chap[term_idx]: weight
        for term_idx, weight in zip(tfidf_vector.indices, tfidf_vector.values)
    }
    if not tfidf_dict:
        continue  # no surviving tokens for this act — nothing to draw
    wordcloud = WordCloud(
        width=800, height=400, background_color='white'
    ).generate_from_frequencies(tfidf_dict)
    plt.figure(figsize=(10, 5))
    plt.imshow(wordcloud, interpolation='bilinear')
    plt.axis("off")
    plt.title(f"{title}")
    plt.show()

spark.stop()