"""MapReduce word-count walkthrough built on mrjob.

The script writes a small text corpus and an mrjob word-count program to
``WORK_DIR``, runs the job as a subprocess, parses the ``word<TAB>count``
records out of the captured output, prints a summary, plots the word
distribution, and finally times the job on three corpus sizes.

It was written for Google Colab (hence the ``/content`` working directory)
but uses only ``subprocess`` instead of IPython ``!`` magics, so it also runs
as a plain Python script.
"""

import json
import os
import subprocess
import sys
import time

# Colab's scratch directory; every generated file lives below it.
WORK_DIR = "/content"
SAMPLE_PATH = os.path.join(WORK_DIR, "sample_data.txt")
SCRIPT_PATH = os.path.join(WORK_DIR, "wordcount.py")
RESULTS_PATH = os.path.join(WORK_DIR, "results.txt")

# The job's stderr is merged into the results file, so mrjob's own log lines
# sit next to the records.  Lines starting with these prefixes are never data.
LOG_PREFIXES = ("No config", "WARNING")

sample_text = """The quick brown fox jumps over the lazy dog.
The dog sleeps while the fox jumps again.
MapReduce is a programming model for processing large datasets.
Hadoop implements MapReduce for distributed computing.
The quick brown fox is quick and brown.
Apache Hadoop is a framework that allows for the distributed processing of large data sets across clusters of computers.
MapReduce programs are parallel in nature and thus are very useful for performing large-scale data analysis.
The word count program is the "Hello World" of MapReduce.
"""

# Source of the mrjob program.  A raw string keeps the regex backslashes
# exactly as they must appear in the generated file.
mapreduce_code = r'''
from mrjob.job import MRJob
from mrjob.step import MRStep
import re
import sys

WORD_RE = re.compile(r'\b\w+\b')


class MRWordCount(MRJob):

    def configure_args(self):
        super().configure_args()
        self.add_passthru_arg('--min-word-length', type=int, default=1,
                              help='Minimum word length to count')

    def mapper(self, _, line):
        min_length = self.options.min_word_length
        for word in WORD_RE.findall(line.lower()):
            if len(word) >= min_length:
                yield (word, 1)

    def combiner(self, word, counts):
        yield (word, sum(counts))

    def reducer(self, word, counts):
        yield (word, sum(counts))

    def steps(self):
        return [MRStep(mapper=self.mapper,
                       combiner=self.combiner,
                       reducer=self.reducer)]


if __name__ == '__main__':
    # Drop the "-f <kernel file>" argument Jupyter injects into sys.argv.
    sys.argv = [arg for arg in sys.argv if not arg.startswith('-f')]
    MRWordCount.run()
'''


def install_dependencies():
    """Install mrjob into the interpreter that is running this script."""
    subprocess.run([sys.executable, "-m", "pip", "install", "mrjob"], check=True)


def write_text(path, text):
    """Write *text* to *path*, replacing any existing file."""
    with open(path, "w", encoding="utf-8") as handle:
        handle.write(text)


def read_text(path):
    """Return the full contents of the text file at *path*."""
    with open(path, "r", encoding="utf-8") as handle:
        return handle.read()


def run_job(input_file, output_file, min_word_length=2):
    """Run the word-count job and capture stdout and stderr in *output_file*.

    The job's exit status is deliberately not checked: on failure the error
    text ends up in *output_file*, which then simply yields no records.

    Returns:
        Wall-clock duration of the job in seconds.
    """
    command = [
        sys.executable,
        SCRIPT_PATH,
        input_file,
        "--min-word-length",
        str(min_word_length),
    ]
    with open(output_file, "w", encoding="utf-8") as out:
        start = time.perf_counter()
        subprocess.run(command, stdout=out, stderr=subprocess.STDOUT, check=False)
        return time.perf_counter() - start


def parse_record(line):
    """Parse one ``word<TAB>count`` output line.

    Returns:
        A ``(word, count)`` tuple, or ``None`` when *line* is a log line or is
        not a well-formed record.
    """
    if line.startswith(LOG_PREFIXES):
        return None
    parts = line.strip().split("\t")
    if len(parts) != 2 or not parts[1].isdigit():
        return None
    raw_key, raw_count = parts
    # mrjob's default output protocol JSON-encodes keys, so the word arrives
    # wrapped in double quotes; decode it, but tolerate plain-text keys.
    try:
        word = json.loads(raw_key)
    except ValueError:
        word = raw_key
    if not isinstance(word, str):
        word = raw_key
    return word, int(raw_count)


def parse_word_counts(lines):
    """Return the ``(word, count)`` records found in *lines*, in input order."""
    records = (parse_record(line) for line in lines)
    return [record for record in records if record is not None]


def find_phase_lines(log_content, keyword):
    """Return the log lines of *log_content* that mention *keyword*.

    Word-count records are skipped: the results share the file with the logs,
    and a counted word such as "mapreduce" would otherwise be reported as a
    log line of the map or reduce phase.
    """
    return [
        line
        for line in log_content.split("\n")
        if keyword in line.lower() and parse_record(line) is None
    ]


def report_word_counts(word_counts):
    """Print the 20 most frequent words and the overall totals.

    Sorts *word_counts* in place, most frequent first.
    """
    if not word_counts:
        print("No word counts found.")
        return
    word_counts.sort(key=lambda pair: pair[1], reverse=True)
    print(f"{'WORD':<20} {'COUNT':<10}")
    print("-" * 30)
    for word, count in word_counts[:20]:
        print(f"{word:<20} {count:<10}")
    print(f"\nTotal unique words: {len(word_counts)}")
    print(f"Total word occurrences: {sum(count for _, count in word_counts)}")


def analyze_phases(log_content):
    """Print up to five captured log lines for the map and reduce phases."""
    print("\nMAPREDUCE PHASE ANALYSIS")
    phases = (
        ("\n1. MAPPER PHASE:", "map", "  Mapper phase executed (logs not shown)"),
        ("\n2. REDUCER PHASE:", "reduce", "  Reducer phase executed (logs not shown)"),
    )
    for heading, keyword, fallback in phases:
        print(heading)
        matches = find_phase_lines(log_content, keyword)
        if not matches:
            print(fallback)
            continue
        for line in matches[:5]:
            print(f"  {line.strip()}")


def visualize(word_counts):
    """Plot four views of the word distribution and print summary statistics."""
    if not word_counts:
        print("No data for visualization.")
        return

    # Imported here so the parsing helpers stay usable without the plot stack.
    import matplotlib.pyplot as plt
    import numpy as np
    import pandas as pd

    df = pd.DataFrame(word_counts, columns=["word", "count"])
    df = df.sort_values("count", ascending=False)

    fig, axes = plt.subplots(2, 2, figsize=(15, 12))

    # Top 15 bar chart
    top_15 = df.head(15)
    axes[0, 0].bar(range(len(top_15)), top_15["count"], color="skyblue")
    axes[0, 0].set_title("Top 15 Most Frequent Words")
    axes[0, 0].set_xticks(range(len(top_15)))
    axes[0, 0].set_xticklabels(top_15["word"], rotation=45, ha="right")

    # Pie chart of the top 8
    top_8 = df.head(8)
    axes[0, 1].pie(top_8["count"], labels=top_8["word"], autopct="%1.1f%%")
    axes[0, 1].set_title("Top 8 Word Distribution")

    # Histogram of frequencies
    axes[1, 0].hist(df["count"], bins=20, color="green", alpha=0.7)
    axes[1, 0].set_title("Distribution of Word Frequencies")
    axes[1, 0].set_yscale("log")

    # Cumulative share of occurrences, most frequent word first
    sorted_counts = df["count"].values
    cumulative = np.cumsum(sorted_counts) / np.sum(sorted_counts)
    axes[1, 1].plot(range(1, len(cumulative) + 1), cumulative, "r-")
    axes[1, 1].set_title("Cumulative Word Frequency")
    axes[1, 1].grid(True, alpha=0.3)

    plt.tight_layout()
    plt.show()

    hapax = df[df["count"] == 1]
    print("\nStatistics:")
    print(f"  Total unique words: {len(df)}")
    print(f"  Total occurrences: {df['count'].sum()}")
    print(f"  Average frequency: {df['count'].mean():.2f}")
    print(f"  Median frequency: {df['count'].median()}")
    print(f"  Words appearing once: {len(hapax)} ({len(hapax) / len(df) * 100:.1f}%)")


def run_wordcount_and_time(input_file, min_word_length=2):
    """Run the job on *input_file* and measure it.

    The captured output is written to ``results_<input basename>.txt`` in
    ``WORK_DIR``.

    Returns:
        ``(seconds, unique_words, total_occurrences)``.
    """
    output_file = os.path.join(
        WORK_DIR, f"results_{os.path.basename(input_file)}.txt"
    )
    elapsed = run_job(input_file, output_file, min_word_length)
    word_counts = parse_word_counts(read_text(output_file).splitlines())
    return elapsed, len(word_counts), sum(count for _, count in word_counts)


def run_performance_tests():
    """Time the job on 1x, 10x and 100x copies of the sample and plot the result."""
    import matplotlib.pyplot as plt

    print("\nPERFORMANCE TESTING")
    # (display name, nominal size, file name, number of sample copies)
    datasets = (
        ("Small", "0.5 KB", "sample_data.txt", 1),
        ("Medium", "5 KB", "medium_data.txt", 10),
        ("Large", "50 KB", "large_data.txt", 100),
    )
    labels = []
    times = []
    for name, size, filename, copies in datasets:
        path = os.path.join(WORK_DIR, filename)
        write_text(path, sample_text * copies)
        elapsed, unique, total = run_wordcount_and_time(path)
        print(f"{name} dataset ({size}): {elapsed:.2f}s, {unique} unique, {total} total")
        labels.append(f"{name} ({size})")
        times.append(elapsed)

    plt.figure(figsize=(10, 6))
    plt.bar(labels, times, color=["green", "orange", "red"])
    plt.xlabel("Dataset Size")
    plt.ylabel("Execution Time (seconds)")
    plt.title("MapReduce WordCount Performance")
    for index, seconds in enumerate(times):
        plt.text(index, seconds + 0.01, f"{seconds:.2f}s", ha="center")
    plt.show()


def main():
    """Run every step of the walkthrough in order."""
    # Step 1: Install required libraries
    install_dependencies()
    os.makedirs(WORK_DIR, exist_ok=True)

    # Step 2: Create sample data
    print("Creating sample data...")
    write_text(SAMPLE_PATH, sample_text)
    print("Sample data created successfully!")
    print(read_text(SAMPLE_PATH), end="")

    # Step 3: Create the MapReduce program
    write_text(SCRIPT_PATH, mapreduce_code)
    print(f"MapReduce program saved to {SCRIPT_PATH}")

    # Step 4: Run the MapReduce job
    print("\nRunning MapReduce job...")
    run_job(SAMPLE_PATH, RESULTS_PATH, min_word_length=2)
    log_content = read_text(RESULTS_PATH)
    print("\nRESULTS:")
    print("\n".join(log_content.splitlines()[-20:]))

    # Step 5: Extract and analyze word counts
    print("\nEXTRACTED WORD COUNTS")
    word_counts = parse_word_counts(log_content.splitlines())
    report_word_counts(word_counts)

    # Step 6: Analyze MapReduce phases
    analyze_phases(log_content)

    # Step 7: Visualization
    visualize(word_counts)

    # Step 8: Performance testing
    run_performance_tests()


if __name__ == "__main__":
    main()