Commit dfb5785d authored by Szabolcs Gyurko

First full featured version

parent 8bbd5a07
package actors
import java.sql.Timestamp
import javax.inject.{Inject, Singleton}
import akka.actor.UntypedActor
import entities.{ChunkRecord, ChunksTable, JobsTable, QueueTable}
import play.api.Logger
import play.api.db.slick.DatabaseConfigProvider
import sget2.SGet2
import sget2.{Defaults, SGet2}
import slick.driver.JdbcProfile
import scala.concurrent.Await
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.concurrent.duration.Duration
/**
@@ -23,6 +24,8 @@ class DataPersister @Inject() (private val dbConfigProvider: DatabaseConfigProvi
private val jobs = TableQuery[JobsTable]
private val chunks = TableQuery[ChunksTable]
private val queue = TableQuery[QueueTable]
@volatile
private var saveRequested: Boolean = false
@volatile
private var lastSave: Long = 0
@@ -32,14 +35,48 @@ class DataPersister @Inject() (private val dbConfigProvider: DatabaseConfigProvi
message match {
case p: PersistEvent ⇒
val currentTime = System.currentTimeMillis()
if (currentTime > lastSave + 2000 || p.forceSave) {
persist(p)
lastSave = currentTime
if (p.forceSave) {
this.synchronized {
Logger.trace("persist() - forced")
persist(p)
lastSave = currentTime
}
} else {
if (currentTime > lastSave + Defaults.PERSIST_FREQUENCY) {
if (saveRequested) {
Logger.trace("timeout expired but persist() is already scheduled, deferring to that")
} else {
this.synchronized {
Logger.trace("persist() with expired timeout and no scheduled persist()")
persist(p)
lastSave = currentTime
}
}
} else {
if (saveRequested) {
Logger.trace("persist() is already scheduled, deferring to that")
} else {
Logger.trace(s"scheduling a persist in ${Defaults.PERSIST_FREQUENCY - (currentTime - lastSave)}ms")
saveRequested = true
akka.pattern.after(Duration(Defaults.PERSIST_FREQUENCY - (currentTime - lastSave), "ms"), sget2.actorSystem.scheduler)(Future.successful(p)).onSuccess {
case e: PersistEvent ⇒
Logger.trace("persist() by schedule")
this.synchronized {
persist(e)
lastSave = System.currentTimeMillis()
saveRequested = false
}
case _ ⇒
Logger.trace("God knows what this rubbish is")
}
}
}
}
case _ ⇒
}
}
/* Persists the queue data to DB asynchronously */
private def persist(event: PersistEvent): Unit = {
/* Iterate through the download queue */
for (d ← sget2.downloadQueue.persist().jobList) {
@@ -48,19 +85,20 @@ class DataPersister @Inject() (private val dbConfigProvider: DatabaseConfigProvi
qQuery.statements.foreach(s ⇒ Logger.trace(s"SQL: $s"))
/* Update jobs table */
val jQuery = jobs.filter(_.id === d.id).map(x ⇒ (x.url, x.path, x.bandwidth, x.threads, x.user, x.pass, x.size, x.blocksize)).
update(d.url, d.path, Some(d.bandwidthLimit), Some(d.activeThreads), d.username, d.password, Some(d.size), Some(d.chunkMatrix.blockSize.toInt))
val jQuery = jobs.filter(_.id === d.id).map(x ⇒ (x.url, x.path, x.bandwidth, x.threads, x.user, x.pass, x.size, x.blocksize, x.finished)).
update(d.url, d.path, Some(d.bandwidthLimit), Some(d.activeThreads), d.username, d.password, Some(d.size), Some(d.chunkMatrix.blockSize.toInt), Some(new Timestamp(d.finishTime)))
jQuery.statements.foreach(s ⇒ Logger.trace(s"SQL: $s"))
/* Update chunks table (delete + insert the easiest here) */
val cQuery = (
for {
dcQuery ← chunks.filter(_.jobId === d.id).delete
cQueries ← DBIO.sequence(d.chunkMatrix.chunks.zipWithIndex.toSeq.map(c => chunks += ChunkRecord(d.id, c._1.status, c._1.size, c._2 * d.chunkMatrix.blockSize)))
cQueries ← DBIO.sequence(d.chunkMatrix.chunks.zipWithIndex.toSeq.map(c ⇒ chunks += ChunkRecord(d.id, c._1.status, c._1.size, c._2 * d.chunkMatrix.blockSize)))
} yield (dcQuery, cQueries)
).transactionally
Await.result(dbConfig.db.run(qQuery andThen jQuery andThen cQuery), Duration.Inf)
/* Fire and forget SQL updates. They will be executed asynchronously */
dbConfig.db.run(qQuery andThen jQuery andThen cQuery)
}
}
}
@@ -10,7 +10,7 @@ import play.api.db.slick.DatabaseConfigProvider
import play.api.i18n.{I18nSupport, MessagesApi}
import play.api.libs.json.{JsResult, Json}
import play.api.mvc.{Action, Controller, Result}
import sget2.{DownloadJob, DownloadJobStatusCode, SGet2}
import sget2.{Defaults, DownloadJob, DownloadJobStatusCode, SGet2}
import slick.driver.JdbcProfile
import scala.concurrent.ExecutionContext.Implicits.global
@@ -40,7 +40,9 @@ class AddJob @Inject() (val messagesApi: MessagesApi, private val dbConfigProvid
/* GET for the HTML form */
def index() = Action {
Ok(views.html.addJob(addPostForm.fill(AddJobData("", sget2.settings.getOrElse("path", None).getOrElse("")))))
val settings = sget2.settings
Ok(views.html.addJob(addPostForm.fill(AddJobData("", settings.getOrElse(Defaults.SETTING_DOWNLOADPATH, None).getOrElse(""),
user = settings.getOrElse(Defaults.SETTING_USERNAME, None), pass = settings.getOrElse(Defaults.SETTING_PASSWORD, None)))))
}
/* POST with forms */
package controllers
import javax.inject.Inject
import entities.{ChunksTable, JobsTable, QueueTable}
import play.api.db.slick.DatabaseConfigProvider
import play.api.i18n.{I18nSupport, MessagesApi}
import play.api.libs.json.Json
import play.api.mvc.{Action, Controller}
import sget2.{DownloadObjectMapSettings, SGet2}
import slick.driver.JdbcProfile
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
/**
* Created by sgyurko on 02/09/2016.
*/
final case class JobJson(state: String, currentSpeed: Double, lastCheck: Long, progress: Double, ETA: Long, maxThreads: Int,
bandwidthLimit: Int, startTime: Long, finishTime: Long, size: Long, alive: Boolean, activeThreads: Int,
url: String, path: String, id: Int, downloaded: Long, chunkMatrix: DownloadObjectMapSettings)
class Job @Inject() (val messagesApi: MessagesApi, private val dbConfigProvider: DatabaseConfigProvider, private val sget2: SGet2) extends Controller with I18nSupport {
private val dbConfig = dbConfigProvider.get[JdbcProfile]
import dbConfig.driver.api._
private val jobs = TableQuery[JobsTable]
private val queue = TableQuery[QueueTable]
private val chunks = TableQuery[ChunksTable]
def json(id: Int) = Action {
implicit val jobWrites = Json.writes[JobJson]
val jobOption = sget2.downloadQueue.job(id)
if (jobOption.nonEmpty) {
val job = jobOption.get
val data = JobJson(job.status.toString, job.speed()._1, job.lastCheck._1, job.progress(), job.eta(), job.maxThreads,
job.bandwidthLimit, job.startTime, job.finishTime, job.size, job.alive, job.activeThreads,
job.url.toString, job.path, job.id, job.chunkMatrix.downloaded, job.persist().chunkMatrix)
Ok(Json.toJson(data))
} else {
NotFound
}
}
def pause(id: Int) = Action {
val jobOption = sget2.downloadQueue.job(id)
if (jobOption.nonEmpty) {
sget2.downloadQueue.stop(id)
Redirect(routes.Queue.index())
} else {
NotFound
}
}
def start(id: Int) = Action {
val jobOption = sget2.downloadQueue.job(id)
if (jobOption.nonEmpty) {
sget2.downloadQueue.start(id)
Redirect(routes.Queue.index())
} else {
NotFound
}
}
def delete(id: Int) = Action.async { implicit request ⇒
val jobOption = sget2.downloadQueue.job(id)
if (jobOption.nonEmpty) {
sget2.downloadQueue.stop(id)
sget2.downloadQueue.jobList -= jobOption.get
/* Delete it from the DB */
val dChunks = chunks.filter(_.jobId === id).delete
val dQueue = queue.filter(_.jobId === id).delete
val dJob = jobs.filter(_.id === id).delete
dbConfig.db.run((dChunks andThen dQueue andThen dJob).transactionally).map {
res ⇒
Redirect(routes.Queue.index())
}
} else {
Future.successful(NotFound)
}
}
}
@@ -11,7 +11,15 @@ import sget2.SGet2
*/
class Queue @Inject() (val messagesApi: MessagesApi, val sget2: SGet2) extends Controller with I18nSupport {
def index() = Action {
def index = Action {
Ok(views.html.queue(sget2.downloadQueue.jobList))
}
def js = Action {
Ok(views.js.queue()).as("text/javascript; charset=utf-8")
}
def home = Action {
Redirect(routes.Queue.index())
}
}
package controllers
import javax.inject.Inject
import play.api.data.Form
import play.api.data.Forms._
import play.api.db.slick.DatabaseConfigProvider
import play.api.i18n.{I18nSupport, MessagesApi}
import play.api.mvc.{Action, Controller}
import sget2.{Defaults, SGet2}
import scala.util.Try
import scala.concurrent.ExecutionContext.Implicits.global
/**
* Created by sgyurko on 02/09/2016.
*/
final case class SettingsModel(downloadPath: Option[String],
maxActiveJobs: Option[Int], user: Option[String], pass: Option[String])
class Settings @Inject() (val messagesApi: MessagesApi, private val dbConfigProvider: DatabaseConfigProvider, private val sget2: SGet2) extends Controller with I18nSupport {
private val settingsPostForm = Form(
mapping(
"downloadPath" optional(text),
"maxActiveJobs" optional(number(min = 1, max = Defaults.MAX_ACTIVE_JOBS)),
"user" optional(text),
"pass" optional(text)
)(SettingsModel.apply)(SettingsModel.unapply)
)
def index = Action {
Ok(views.html.settings(settingsPostForm.fill(getSettings)))
}
def save = Action.async(parse.form(settingsPostForm, onErrors = (formWithErrors: Form[SettingsModel]) ⇒ Redirect(routes.Settings.index()))) { implicit request ⇒
val toSave = request.body
(for {
d ← sget2.saveSetting(Defaults.SETTING_DOWNLOADPATH, toSave.downloadPath)
m ← sget2.saveSetting(Defaults.SETTING_MAX_ACTIVE_JOBS, toSave.maxActiveJobs.flatMap(i ⇒ Try(i.toString).toOption))
u ← sget2.saveSetting(Defaults.SETTING_USERNAME, toSave.user)
p ← sget2.saveSetting(Defaults.SETTING_PASSWORD, toSave.pass)
} yield (d, m)).map {
res ⇒
Redirect(routes.Queue.index())
}
}
private def getSettings: SettingsModel = {
val settings = sget2.settings
SettingsModel(settings.getOrElse(Defaults.SETTING_DOWNLOADPATH, None),
Some(settings.getOrElse(Defaults.SETTING_MAX_ACTIVE_JOBS, Some(Defaults.MAX_ACTIVE_JOBS.toString)).get.toInt),
settings.getOrElse(Defaults.SETTING_USERNAME, None), settings.getOrElse(Defaults.SETTING_PASSWORD, None)
)
}
}
package controllers
import javax.inject.Inject
import play.api.Logger
import play.api.libs.functional.syntax._
import play.api.libs.json._
import play.api.libs.ws.WSClient
import play.api.mvc._
import scala.concurrent.ExecutionContext.Implicits.global
final case class WeatherData(temp: String, unit: String)
class WSSample @Inject()(ws: WSClient) extends Controller {
def index = Action.async {
val weatherWS = ws.url("https://query.yahooapis.com/v1/public/yql?q=select%20*%20from%20weather.forecast%20where%20woeid%20in%20(select%20woeid%20from%20geo.places(1)%20where%20text%3D%22London%2C%20UK%22)&format=json&env=store%3A%2F%2Fdatatables.org%2Falltableswithkeys").get()
val result = for {
res ← weatherWS
} yield {
res.status match {
case 200 ⇒
implicit val weatherDataReads: Reads[WeatherData] = (
(JsPath \ "query" \ "results" \ "channel" \ "item" \ "condition" \ "temp").read[String] and
(JsPath \ "query" \ "results" \ "channel" \ "units" \ "temperature").read[String]
)(WeatherData)
res.json.validate[WeatherData] match {
case c: JsSuccess[WeatherData] ⇒
val weatherData = c.getOrElse(WeatherData("0", "C"))
implicit val weatherDataWrites = Json.writes[WeatherData]
Ok(Json.toJson(weatherData))
//Ok(views.html.index(s"Temperature is ${weatherData.temp} ${weatherData.unit}"))
case e: JsError ⇒
Logger.warn(e.toString)
Ok(views.html.index("Cannot read weather data from Yahoo"))
}
case _ ⇒ Ok(views.html.index(s"Status from Yahoo was: ${res.status}"))
}
}
Logger.debug("Returning async result")
result
}
}
\ No newline at end of file
package sget2
import actors.{JobFinishedEvent, PersistEvent}
import org.slf4j.LoggerFactory
import play.api.Logger
import scala.io.Codec
@@ -38,7 +37,7 @@ abstract class AbstractWorker(val downloadJob: DownloadJob, val sget2: SGet2) ex
downloadJob.status = DownloadJobStatusCode.STARTED
val numberOfThreads = Math.min(downloadJob.chunkMatrix.size(), downloadJob.maxThreads)
Logger.info(s"Number of possible threads with block size ${downloadJob.chunkMatrix.BLOCK_SIZE} is ${numberOfThreads}")
Logger.info(s"Number of possible threads with block size ${downloadJob.chunkMatrix.BLOCK_SIZE} is $numberOfThreads")
downloadJob.maxThreads = numberOfThreads
while (!downloadJob.failed && (downloadJob.chunkMatrix.findEmptyPosition() != -1 || threadGroup.activeCount() > 0)) {
@@ -70,8 +69,6 @@ abstract class AbstractWorker(val downloadJob: DownloadJob, val sget2: SGet2) ex
downloadJob.status = DownloadJobStatusCode.FAILED
}
} catch {
case e: IllegalArgumentException ⇒
Logger.info(s"Worker thread reported an unrecoverable fault. Terminating.")
case e: InterruptedException ⇒
Logger.info(s"Worker thread ${downloadJob.id} is interrupted")
val threads = new Array[Thread](threadGroup.activeCount())
@@ -80,7 +77,7 @@ abstract class AbstractWorker(val downloadJob: DownloadJob, val sget2: SGet2) ex
threads.filter(_ != Thread.currentThread()).foreach(_.join())
Logger.info("Workers are all terminated")
downloadJob.chunkMatrix.resetDownloading()
downloadJob.status = DownloadJobStatusCode.WAITING
downloadJob.status = DownloadJobStatusCode.PAUSED
downloadJob.activeThreads = 0
} finally {
downloadJob.alive = false
@@ -11,9 +11,14 @@ object Defaults {
final val MAX_ACTIVE_JOBS = 128
final val BUFFER_SIZE_SSD = 32768
final val BLOCK_SIZE_MATRIX = Map(
1048576000 -> 10485760,
104857600 -> 1048576,
10485760 -> 524288
1048576000 → 10485760,
104857600 → 1048576,
10485760 → 524288
)
final val SETTING_DOWNLOADPATH = "downloadPath"
final val SETTING_MAX_ACTIVE_JOBS = "maxActiveJobs"
final val SETTING_BANDWIDTHLIMIT = "bandwidthLimit"
final val SETTING_USERNAME = "username"
final val SETTING_PASSWORD = "password"
}
package sget2
import _root_.java.net.URL
import java.net.URL
import java.io.{File, IOException, RandomAccessFile}
import actors.PersistEvent
import play.api.Logger
import play.api.libs.json.Json
import sget2.DownloadJobStatusCode.DownloadJobStatusCode
case class DownloadObjectMapSettings(chunks: Array[ChunkEntrySerialized], blockSize: Long)
sealed case class DownloadObjectMapSettings(chunks: Array[ChunkEntrySerialized], blockSize: Long)
object DownloadObjectMapSettings {
implicit val writes = Json.writes[DownloadObjectMapSettings]
}
case class DownloadJobSettings( chunkMatrix: DownloadObjectMapSettings,
sealed case class DownloadJobSettings( chunkMatrix: DownloadObjectMapSettings,
status: Int,
username: Option[String],
password: Option[String],
@@ -31,7 +35,7 @@ case class DownloadJobSettings( chunkMatrix: DownloadObjectMapSettings,
*/
class DownloadJob (private var _url: URL, private var _path: String)(implicit private val sget2: SGet2) {
private var targetFileInitialized: Boolean = false
var chunkMatrix: DownloadObjectMap = new DownloadObjectMap(sget2)
var chunkMatrix: DownloadObjectMap = new DownloadObjectMap
var lastSpeed: (Double, Double) = (0, 0)
var lastCheck: (Long, Long) = (0, 0)
var failed = false
@@ -50,7 +54,7 @@ class DownloadJob (private var _url: URL, private var _path: String)(implicit pr
/* Some built in magic for block size, based on size */
def getBlockSizeForFileSize(filesize: Long): Long = {
for ((k, v) <- Defaults.BLOCK_SIZE_MATRIX) {
for ((k, v) ← Defaults.BLOCK_SIZE_MATRIX) {
if (filesize > k) return v
}
chunkMatrix.BLOCK_SIZE
@@ -73,7 +77,7 @@ class DownloadJob (private var _url: URL, private var _path: String)(implicit pr
Since the persistance layer might already have initialized the chunkMatrix, we need to reset it as it would
lead to compromised file.
*/
chunkMatrix = new DownloadObjectMap(sget2)
chunkMatrix = new DownloadObjectMap
sget2.postEvent(PersistEvent(forceSave = true))
}
targetFileInitialized = true
@@ -83,7 +87,7 @@ class DownloadJob (private var _url: URL, private var _path: String)(implicit pr
chunkMatrix.initialize(size, getBlockSizeForFileSize(fileTest.length()))
}
} catch {
case e: IOException =>
case e: IOException ⇒
Logger.warn(s"Could not create random access file: ${path}");
}
}
@@ -105,6 +109,8 @@ class DownloadJob (private var _url: URL, private var _path: String)(implicit pr
* @return speed
*/
def speed(): (Double, Double) = {
if (status != DownloadJobStatusCode.STARTED) return (0.0, 0.0)
if (initialized()) {
val actualCheck = (System.currentTimeMillis(), chunkMatrix.downloaded)
@@ -132,6 +138,8 @@ class DownloadJob (private var _url: URL, private var _path: String)(implicit pr
* @return Estimated finish time in milliseconds
*/
def eta(): Long = {
if (status != DownloadJobStatusCode.STARTED) return -1
val elapsedTime = (System.currentTimeMillis() - startTime) / 1000
val downloadedBytes = chunkMatrix.getNumberOfFinishedSlots * chunkMatrix.BLOCK_SIZE
@@ -10,4 +10,5 @@ object DownloadJobStatusCode extends Enumeration {
val STARTED = Value(2)
val FAILED = Value(3)
val FINISHED = Value(4)
val PAUSED = Value(5)
}
package sget2
import javax.inject.Inject
import actors.PersistEvent
import play.api.libs.json.Json
import sget2.ChunkStatus._
sealed case class ChunkEntry(status: ChunkStatus, size: Int)
sealed case class ChunkEntrySerialized(status: Int, size: Int)
object ChunkEntrySerialized {
implicit val writes = Json.writes[ChunkEntrySerialized]
}
/**
* Created by GYURKS on 14/01/2015.
@@ -16,7 +18,7 @@ sealed case class ChunkEntrySerialized(status: Int, size: Int)
*
* @author GYURKS
*/
class DownloadObjectMap (sget2: SGet2) {
class DownloadObjectMap (implicit private val sget2: SGet2) {
private var chunks: Array[ChunkEntry] = Array()
var BLOCK_SIZE: Long = 65536
@@ -42,7 +44,7 @@ class DownloadObjectMap (sget2: SGet2) {
/* Allocate & Initialize the chunk map with EMPTY */
this.synchronized {
chunks = new Array(numberOfChunks)
for (i <- chunks.indices) {
for (i ← chunks.indices) {
chunks(i) = ChunkEntry(EMPTY, 0)
}
}
@@ -55,7 +57,7 @@ class DownloadObjectMap (sget2: SGet2) {
*/
def findEmptyPosition(): Int = {
chunks.synchronized {
for ((slot, index) <- chunks.zipWithIndex) {
for ((slot, index) ← chunks.zipWithIndex) {
if (slot.status == EMPTY) return index
}
}
@@ -156,7 +158,7 @@ class DownloadObjectMap (sget2: SGet2) {
def persist(): DownloadObjectMapSettings = {
this.synchronized {
if (initialized) {
DownloadObjectMapSettings(chunks.map(e => ChunkEntrySerialized(e.status.id, e.size)), BLOCK_SIZE)
DownloadObjectMapSettings(chunks.map(e ⇒ ChunkEntrySerialized(e.status.id, e.size)), BLOCK_SIZE)
} else {
DownloadObjectMapSettings(Array(), BLOCK_SIZE)
}
@@ -164,14 +166,14 @@ class DownloadObjectMap (sget2: SGet2) {
}
def load(downloadObjectMapSettings: DownloadObjectMapSettings) = {
chunks = downloadObjectMapSettings.chunks.map(e => ChunkEntry(ChunkStatus(e.status), e.size))
chunks = downloadObjectMapSettings.chunks.map(e ⇒ ChunkEntry(ChunkStatus(e.status), e.size))
resetDownloading()
BLOCK_SIZE = downloadObjectMapSettings.blockSize
}
def resetDownloading() = {
chunks.synchronized {
chunks = chunks.map(c => if (c.status == ChunkStatus.DOWNLOADING) ChunkEntry(ChunkStatus.EMPTY, c.size) else c)
chunks = chunks.map(c ⇒ if (c.status == ChunkStatus.DOWNLOADING) ChunkEntry(ChunkStatus.EMPTY, c.size) else c)
}
}
@@ -7,10 +7,9 @@ import play.api.db.slick.DatabaseConfigProvider
import slick.driver.JdbcProfile
import workers.{FtpWorker, HttpWorker}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.collection.mutable.ListBuffer
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
case class DownloadQueueSettings(jobList: List[DownloadJobSettings],
bandwidthLimit: Int)
@@ -24,6 +23,11 @@ class DownloadQueue (private val sget2: SGet2, private val dbConfigProvider: Dat
val jobList = new ListBuffer[DownloadJob]()
private var _bandwidthLimit: Int = 0
private val threadGroup = new ThreadGroup("DownloadQueue")
private val dbConfig = dbConfigProvider.get[JdbcProfile]
import dbConfig.driver.api._
private val chunks = TableQuery[ChunksTable]
private val queue = TableQuery[QueueTable]
private val jobs = TableQuery[JobsTable]
/* Accessor/mutator for bandwidthLimit */
def bandwidthLimit = _bandwidthLimit
@@ -41,7 +45,7 @@ class DownloadQueue (private val sget2: SGet2, private val dbConfigProvider: Dat
* Starts all jobs
*/
def start(): Unit = {
jobList.filter(j => j.status == DownloadJobStatusCode.WAITING || j.status == DownloadJobStatusCode.NOTHING).foreach(start)
jobList.filter(j ⇒ j.status == DownloadJobStatusCode.WAITING || j.status == DownloadJobStatusCode.NOTHING).foreach(start)
}
/**
@@ -58,14 +62,14 @@ class DownloadQueue (private val sget2: SGet2, private val dbConfigProvider: Dat
* @param job DownloadJob object
*/
private def start(job: DownloadJob): Unit = {
val maxActiveJobs = sget2.settings.getOrElse("maxActiveJobs", None)
val maxActiveJobs = sget2.settings.getOrElse(Defaults.SETTING_MAX_ACTIVE_JOBS, None)
if (maxActiveJobs.nonEmpty && activeJobs == maxActiveJobs.get.toInt) return
val worker = job.url.getProtocol match {
case "http" => Some(new HttpWorker(job, sget2))
case "https" => Some(new HttpWorker(job, sget2))
case "ftp" => Some(new FtpWorker(job, sget2))
case _ =>
case "http" Some(new HttpWorker(job, sget2))
case "https" Some(new HttpWorker(job, sget2))
case "ftp" Some(new FtpWorker(job, sget2))
case _
LOGGER.warn("Unknown protocol: {}, can't find a worker implementation for it", job.url.getProtocol)
None