This Python program processes Apache web server log files using Spark (RDD API) and stores structured, uniquely identified records in Elasticsearch. Here’s a detailed breakdown of what it does:
- `json`: For encoding structured data as JSON.
- `hashlib`: For generating unique document IDs using the SHA-224 hash.
- `re`: For regular expressions to parse log lines.
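For reference, those imports amount to just a few lines; any Spark setup is assumed to come from the PySpark shell, which provides `sc`:

```python
import json     # serialize parsed records as JSON strings
import hashlib  # SHA-224 hashing for stable document IDs
import re       # regular-expression parsing of Apache log lines
```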
- `addID(data)`
  - Serializes a Python dictionary to JSON bytes.
  - Computes a SHA-224 hash as a unique identifier (`doc_id`) for the record.
  - Adds `doc_id` to the dictionary and returns a tuple: `(doc_id, serialized_json)`.
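A minimal sketch of such a function, assuming the whole serialized record is used as the hash input (the original's exact encoding and error handling may differ):

```python
def addID(data):
    # Serialize the parsed record to JSON bytes so it can be hashed.
    j = json.dumps(data).encode("ascii", "ignore")
    # A SHA-224 digest of the serialized record serves as a stable doc_id.
    data["doc_id"] = hashlib.sha224(j).hexdigest()
    # Return (doc_id, serialized_json) so the ID can later drive es.mapping.id.
    return (data["doc_id"], json.dumps(data))
```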
- `parse(str)`
  - Uses a precompiled regex to extract fields from an Apache log line:
    - IP address
    - Access date
    - HTTP operation/method
    - Request URI
  - Returns a dictionary with these keys: `ip`, `date`, `operation`, `uri`.
- Defines a regex pattern matching the standard Apache combined log format.
- Compiles it with `re.compile` for use in parsing.
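A sketch of how the compiled pattern and the `parse` function described above might fit together; the pattern shown is a simplified placeholder (`APACHE_RE` is not a name from the original) that captures only the four fields listed, whereas the original likely matches the full combined log format:

```python
# Simplified Apache log pattern: client IP, identity, user, [timestamp], "METHOD URI ..."
APACHE_RE = re.compile(r'^(\S+) \S+ \S+ \[([^\]]+)\] "(\S+) (\S+)')

def parse(line):
    match = APACHE_RE.search(line)
    if match:
        return {
            "ip": match.group(1),         # client IP address
            "date": match.group(2),       # access timestamp
            "operation": match.group(3),  # HTTP method (GET, POST, ...)
            "uri": match.group(4),        # requested URI
        }
    # Handling of non-matching lines is an assumption; the original may differ.
    return {"ip": "", "date": "", "operation": "", "uri": ""}
```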
- Reads logs: `sc.textFile("/home/ubuntu/walker/apache_logs")` loads the raw log file into an RDD, one string per line.
- Parses logs: `rdd.map(parse)` maps each line to a parsed dictionary with the required fields.
- Adds IDs: `rdd2.map(addID)` generates a unique hash-based document ID for each record and serializes it as JSON.
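Chained together, the pipeline described above looks roughly like this (the variable names `rdd`, `rdd2`, `rdd3` follow the breakdown; the file path is the one quoted above):

```python
# One RDD element per raw log line.
rdd = sc.textFile("/home/ubuntu/walker/apache_logs")

# Parse each line into a dict with ip, date, operation, and uri keys.
rdd2 = rdd.map(parse)

# Attach a SHA-224 doc_id and serialize each record, yielding (doc_id, json) pairs.
rdd3 = rdd2.map(addID)
```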
- Sets up configuration for the Elasticsearch Hadoop connector, specifying:
- Elasticsearch host and port
- Index/resource to write to
- JSON input handling and the mapping ID field
- Calls `saveAsNewAPIHadoopFile()` with the Elasticsearch output classes and this configuration to store the transformed records in Elasticsearch, using the generated `doc_id` as the document ID in the index.
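A sketch of that output step using the standard elasticsearch-hadoop classes; the host, port, and index/resource values below are placeholders rather than values confirmed by the original:

```python
# Configuration for the elasticsearch-hadoop connector.
es_write_conf = {
    "es.nodes": "localhost",         # Elasticsearch host (placeholder)
    "es.port": "9200",               # Elasticsearch port (placeholder)
    "es.resource": "walker/apache",  # target index/type to write to (placeholder)
    "es.input.json": "yes",          # record values are already JSON strings
    "es.mapping.id": "doc_id",       # use the generated doc_id as the document ID
}

# Write the (doc_id, json_record) pairs through the Hadoop OutputFormat API.
rdd3.saveAsNewAPIHadoopFile(
    path="-",  # ignored by EsOutputFormat but required by the API
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=es_write_conf,
)
```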
- Purpose: Efficiently transform and index large-scale Apache log data into Elasticsearch for querying and analysis.
- Process:
  - Parse each log line into a fielded dictionary,
  - Generate a strong, unique document identifier,
  - Serialize each record to JSON,
  - Bulk load these records into Elasticsearch using Spark RDD's Hadoop API.
- Use Case: Enables scalable indexing and searching of log data for analytics, monitoring, or troubleshooting in distributed systems.
Note: The code assumes execution within a PySpark environment and uses the elasticsearch-hadoop connector for Elasticsearch integration.