Thanks to @VladimirPetrosyan for the pointer. Here is how I implemented it:
The route has this:
trait MyService extends HttpService ... with CustomMarshallers {
override def routeSettings = implicitly[RoutingSettings]
...
get {
respondWithMediaType(`text/csv`) {
path("somepath" / CsvObjectIdSegment) {
filterInstanceId => // just an ObjectId
val tempResultsFile = CsvExporter.saveCsvResultsToTempFile(filterInstanceId)
respondWithLastModifiedHeader(tempResultsFile.lastModified) {
encodeResponse(Gzip) {
complete(tempResultsFile)
}
}
}
}
and the trait that I mix in that does the unmarshalling producing chunked response:
import akka.actor._
import spray.httpx.marshalling.{MarshallingContext, Marshaller}
import spray.http.{MessageChunk, ChunkedMessageEnd, HttpEntity, ContentType}
import spray.can.Http
import spray.http.MediaTypes._
import scala.Some
import java.io.{RandomAccessFile, File}
import spray.routing.directives.FileAndResourceDirectives
import spray.routing.RoutingSettings
import math._
trait CustomMarshallers extends FileAndResourceDirectives {
implicit def actorRefFactory: ActorRefFactory
implicit def routeSettings: RoutingSettings
implicit val CsvMarshaller =
Marshaller.of[File](`text/csv`) {
(file: File, contentType: ContentType, ctx: MarshallingContext) =>
actorRefFactory.actorOf {
Props {
new Actor with ActorLogging {
val defaultChunkSize = min(routeSettings.fileChunkingChunkSize, routeSettings.fileChunkingThresholdSize).toInt
private def getNumberOfChunks(file: File): Int = {
val randomAccessFile = new RandomAccessFile(file, "r")
try {
ceil(randomAccessFile.length.toDouble / defaultChunkSize).toInt
} finally {
randomAccessFile.close
}
}
private def readChunk(file: File, chunkIndex: Int): String = {
val randomAccessFile = new RandomAccessFile(file, "r")
val byteBuffer = new Array[Byte](defaultChunkSize)
try {
val seek = chunkIndex * defaultChunkSize
randomAccessFile.seek(seek)
val nread = randomAccessFile.read(byteBuffer)
if(nread == -1) ""
else if(nread < byteBuffer.size) new String(byteBuffer.take(nread))
else new String(byteBuffer)
} finally {
randomAccessFile.close
}
}
val chunkNum = getNumberOfChunks(file)
val responder: ActorRef = ctx.startChunkedMessage(HttpEntity(contentType, ""), Some(Ok(0)))(self)
sealed case class Ok(seq: Int)
def stop() = {
log.debug("Stopped CSV download handler actor.")
responder ! ChunkedMessageEnd
file.delete()
context.stop(self)
}
def sendCSV(seq: Int) =
if (seq < chunkNum)
responder ! MessageChunk(readChunk(file, seq)).withAck(Ok(seq + 1))
else
stop()
def receive = {
case Ok(seq) =>
sendCSV(seq)
case ev: Http.ConnectionClosed =>
log.debug("Stopping response streaming due to {}", ev)
}
}
}
}
}
}
The temp file is created and then actor starts streaming chunks. It sends a chunk whenever response from client is received. Whenever client disconnects temp file is deleted and actor is shut down.
This requires you to run your app in spray-can and I think will not work if you run it in container.
Some useful links:
example1, example2, docs
与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…