import akka.actor.ActorSystem import akka.stream.Attributes.name import akka.stream.scaladsl._ import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler} import akka.stream.{ActorMaterializer, Attributes, Outlet, SourceShape} class BaseStreamApp extends App { implicit val system = ActorSystem("stream-test") implicit val materializer = ActorMaterializer() } object SourceSingle extends BaseStreamApp { val data = 100 // val source = Source.single(data) val source = Source.fromGraph(new _SingleSource(data)) source.runForeach(println) } object SourceApply extends BaseStreamApp { val data = 1 to 10 // val source = Source(data) val source = Source.single(data) .mapConcat(_ConstantFun.scalaIdentityFunction) .withAttributes(_DefaultAttributes.iterableSource) source.runForeach(println) } final class _SingleSource[T](val elem: T) extends GraphStage[SourceShape[T]] { override def initialAttributes: Attributes = _DefaultAttributes.singleSource //ReactiveStreamsCompliance.requireNonNullElement(elem) val out = Outlet[T]("single.out") val shape = SourceShape(out) def createLogic(attr: Attributes) = new GraphStageLogic(shape) with OutHandler { def onPull(): Unit = { push(out, elem) completeStage() } setHandler(out, this) } override def toString: String = "SingleSource" } object _DefaultAttributes { val iterableSource = name("iterableSource") val singleSource = name("singleSource") } object _ConstantFun { private val conforms = (a: Any) => a def scalaIdentityFunction[T]: T => T = conforms.asInstanceOf[Function[T, T]] }