import random
import typing

import apache_beam as beam
import numpy as np


class GenerateSamples(beam.DoFn):
    """Emit one random point per input element.

    Each coordinate is drawn uniformly from ``[0, maximum_radius)``, so the
    points only cover the positive orthant of the bounding hypercube.
    """

    def __init__(self, maximum_radius, dimensions):
        self.radius = maximum_radius
        self.dimensions = dimensions

    def process(self, elm):
        # A per-element RNG seeded with the element produces the same values
        # as ``random.seed(elm)`` followed by ``random.random()``, without
        # mutating the global RNG state of the worker process.
        rng = random.Random(elm)
        # Yield a concrete tuple rather than a lazy generator: a generator can
        # only be iterated once and cannot be pickled between stages.
        yield tuple(rng.random() * self.radius for _ in range(self.dimensions))


def individual_circle_montecarlo(samples, radius=1, dimensions=2):
    """Estimate the volume of a ``dimensions``-ball, one point per element.

    ``samples`` points are drawn uniformly in ``[0, radius)**dimensions``.
    The fraction landing inside the ball of radius ``radius`` approximates
    ``volume / (2 * radius)**dimensions``, so the printed estimate is
    ``(2 * radius)**dimensions * inside / samples``. For the default two
    dimensions this is ``4 * radius**2 * inside / samples``, i.e. ~``pi * r**2``.

    Args:
        samples: number of random points to draw; must be positive.
        radius: radius of the ball (and side of the sampled hypercube).
        dimensions: number of coordinates per point.

    Raises:
        ValueError: if ``samples`` is not positive (the estimate would
            divide by zero).
    """
    if samples <= 0:
        raise ValueError("samples must be a positive integer")
    radius_squared = radius * radius
    # Volume of [-radius, radius]**dimensions; the sampled positive orthant
    # is 1 / 2**dimensions of both this cube and the ball.
    scale = (2 * radius) ** dimensions
    with beam.Pipeline() as p:
        result = (
            p
            | beam.Create(list(range(samples)))
            # Generate random points in an n-dimensional space.
            | beam.ParDo(GenerateSamples(radius, dimensions))
            # Verify if the points are within the circle.
            | beam.Map(
                lambda x: 1 if sum(dim * dim for dim in x) <= radius_squared else 0
            )
            | beam.CombineGlobally(sum)
        )
        result | "final_ratio" >> beam.Map(
            lambda tot: print("pablito", tot, samples, scale * tot / samples)
        )


class BatchedGenerateSamples(beam.DoFn):
    """Generate random points in an n-dimensional space, a batch at a time.

    For a batch of seeds of shape ``S`` this yields one array of shape
    ``S + (dimensions,)`` holding *squared* coordinates, each coordinate drawn
    uniformly from ``[0, radius)``. The seed values themselves are ignored;
    only the batch shape is used.
    """

    def __init__(self, dimensions, radius):
        self.dimensions = dimensions
        self.radius = radius

    def process_batch(self, seeds: np.ndarray) -> typing.Iterator[np.ndarray]:
        coords = np.random.rand(*seeds.shape, self.dimensions) * self.radius
        # Emit squared coordinates: the downstream check only needs the sum
        # of squares per point.
        yield coords * coords

    def infer_output_type(self, input_element_type):
        # NOTE(review): the batches produced above are float64 arrays with a
        # trailing ``dimensions`` axis, not int64 scalars. This declaration
        # presumably only works because the downstream DoFn consumes these
        # batches directly under the same (mis-declared) batch type; confirm
        # against Beam's batched-DoFn typing rules before relying on
        # element-wise access to this output.
        return np.int64


class BatchedSumAndCheck(beam.DoFn):
    """Verify if the points are within the circle.

    This is a DoFn that consumes batches, but yields individual elements:
    for each batch it yields the number of points inside the ball.
    """

    def __init__(self, radius):
        self.radius = radius

    @beam.DoFn.yields_elements
    def process_batch(self, radiuses: np.ndarray) -> typing.Iterator[int]:
        # Each row holds one point's squared coordinates.
        squared_norms = radiuses.sum(axis=1)
        # ``<=`` matches the element-wise pipeline's boundary check. The
        # vectorised count avoids a Python-level sum over a bool array, and
        # ``int`` makes the output match the ``Iterator[int]`` annotation.
        yield int(np.count_nonzero(squared_norms <= self.radius * self.radius))


def batched_circle_montecarlo(samples: int, radius=1, dimensions=2):
    """Estimate the volume of a ``dimensions``-ball using batched DoFns.

    Same estimate as ``individual_circle_montecarlo``, i.e.
    ``(2 * radius)**dimensions * inside / samples``, but points are generated
    and checked as numpy batches.

    Args:
        samples: number of random points to draw; must be positive.
        radius: radius of the ball (and side of the sampled hypercube).
        dimensions: number of coordinates per point.

    Raises:
        ValueError: if ``samples`` is not positive (the estimate would
            divide by zero).
    """
    if samples <= 0:
        raise ValueError("samples must be a positive integer")
    scale = (2 * radius) ** dimensions
    with beam.Pipeline() as p:
        result = (
            p
            | beam.Create(list(range(samples))).with_output_types(np.int64)
            # Forward the caller's geometry instead of fixed (2, 1) values.
            | beam.ParDo(BatchedGenerateSamples(dimensions, radius))
            | beam.ParDo(BatchedSumAndCheck(radius))
            | beam.CombineGlobally(sum)
        )
        result | "final_ratio" >> beam.Map(
            lambda tot: print("pablito", tot, samples, scale * tot / samples)
        )


if __name__ == "__main__":
    import time

    samples = 800000
    s1 = time.time()
    individual_circle_montecarlo(samples)
    print("took ", time.time() - s1, "seconds individually")
    s2 = time.time()
    batched_circle_montecarlo(samples)
    print("took ", time.time() - s2, "seconds batchedly")