我正在研究一种使用 ContinueWith 来完成以下任务的方法:
- 原始应用程序提交(非并发)作业
- 作业启动,进行一些处理,在别处调用 Web 服务
- 工作“完成”(进入静止状态)
- Web 服务在完成其内部处理后提交“继续”作业(可能是 30 分钟,也可能是 2 天)
- 以某种方式调用了 Jobs.ContinueWith?
- 作业终于完成,原来的作业被标记为完成
我遇到的是太多事件部件。原始应用程序是 C#/MVC 应用程序。用户做他的事,最后提交一个长时间运行的作业来执行。作业处理器(C# 库)会做一些工作,然后调用 JAVA SOAP 端点来传递初始处理的结果。 JAVA SOAP 端点调用 COTS 应用程序来执行处理的首当其冲,然后用“我完成了”回调作业。
如您所见,我没有明确的方法来执行以下操作:
var parentId = _jobs.Enqueue(x => x.StartExecution(job.Id));
_jobs.ContinueWith(parentId, x => x.JAVA_EXECUTION(job.Id)); // this part is not in my control!
_jobs.ContinueWith(parentId, x => x.ContinueExecution(job.Id));
我确实有一个 REST 服务 (POST),我正在使用它作为开始工作的唯一方式。基本上,传递一个格式良好的有效负载(JSON), Controller 从 IoC 容器中选择作业对象,决定它是哪种类型的作业(临时、重复、连续等),然后执行正确的 Hangfire 调用以入队它。 JAVA 端点也可以轻松调用此 REST 服务。
[HttpPost]
public string Post()
{
// safety checks removed for brevity...
var command = new MinimumCommandModel(Request.Content.ReadAsStringAsync().Result);
return GetPostPipeline().Handle(command).Id;
}
private static IRequestHandler GetPostPipeline()
{
return new MediatorPipeline
(new QueuePostMediator()
, new IPreRequestHandler[]
{
new PreJobLogger(),
new PreJobExistsValidator(),
new PreJobPropertiesValidator()
}
, new IPostRequestHandler[]
{
new PostJobLogger()
}
);
}
QueuePostMediator 处理作业类型的细节(AdHoc 等)。我现在正在尝试编写延续处理程序,但对如何进行此操作感到有点受阻。我当然不想在 Hangfire 之外进行任何类型的阻止操作。当原始作业最初未与原始作业的 parentId 连接时,我不确定如何“开始”另一项作业作为原始作业的延续。
基本上,如果我可以从工作内部暂停工作,直到外部刺激告诉 hangfire 继续工作,那我就是黄金了。不过,我还没有破解如何实现这一点。
想法?想法?
好的。我已经想出一个 hack 来让它工作。
我正在使用一种策略模式来运行每个作业的不同部分。我有一个名为 Handoff 的 JobStatus,现在执行此操作:
public class Processing : BaseJobExecutor, IJobExecutor
{
public Processing(JobPingPong job) : base(job, JobStatus.Processing) {}
public void Handle()
{
JobInfo.JobStatus = JobStatus.ExtProcessing;
JobInfo.HangfireParentJobId = JobInfo.HangfireJobId;
Payload.PostToQueueText(@"http://localhost:8080/api/clone");
// Pause the current job (this is the parent job) so the outside web service has a chance to complete...
var enqueuedIn = new TimeSpan(0, 6, 0, 0); // 6 hours out...
JobPutOnHold(JobInfo.HangfireJobId, enqueuedIn);
// The next status to be executed upon hydration...
JobInfo.JobStatus = JobStatus.Complete;
Job.CachePut();
// Signal the job executor that this job is "done" due to an outside process needing to run...
JobInfo.JobStatus = JobStatus.Handoff;
}
}
public void JobPutOnHold(string jobId, TimeSpan enqueuedIn)
{
var jobClient = new BackgroundJobClient();
jobClient.ChangeState(jobId, new ScheduledState(enqueuedIn));
}
现在,在策略执行器中我可以这样做:
public string Execute(IServerFilter jobContext, IJobCancellationToken cancellationToken)
{
while (Payload.JobInfo.JobStatus != JobStatus.Done)
{
cancellationToken?.ThrowIfCancellationRequested();
var jobStrategy = new JobExecutorStrategy(Executors);
Payload = jobStrategy.Execute(Payload);
if (Payload.JobInfo.JobStatus == JobStatus.Handoff)
phá vỡ;
}
return PayloadAsString;
}
作业的第二部分与第一部分相同,但来自外部服务并具有 ExtComplete 状态,这允许作业根据来自外部世界的结果(存储在D B)。像这样:
public class ExtComplete : BaseJobExecutor, IJobExecutor
{
public ExtComplete(JobPingPong job) : base(job, JobStatus.ExtComplete) { }
public void Handle()
{
// do post processing here...
Payload.Tokens = null;
JobInfo.JobStatus = JobStatus.Complete;
if (JobInfo.HangfireJobId != JobContext.JobId || JobInfo.HangfireParentJobId == JobInfo.HangfireJobId)
{
JobInfo.HangfireParentJobId = JobInfo.HangfireJobId;
JobInfo.HangfireJobId = JobContext.JobId;
}
// Enqueue the previous (parent) job so it can complete...
JobExecuteNow(JobInfo.HangfireParentJobId);
}
}
public void JobExecuteNow(string jobId)
{
var enqueuedIn = new TimeSpan(0, 0, 0, 15);
var jobClient = new BackgroundJobClient();
jobClient.ChangeState(jobId, new ScheduledState(enqueuedIn));
}
最终,时间将由配置驱动,但现在我将其设置为让第一个作业在 15 秒内开始执行。
我面对这种方法的唯一挑战是进来的作业有效负载是在任何处理发生之前的原始有效负载。这就是为什么您会看到上面的“缓存”。当作业重新启动时,我检查是否存在该 Hangfire JobId 的缓存,如果存在,则从缓存中加载最后已知的有效负载,然后让执行程序继续其愉快的方式。
到目前为止效果很好。
注意:我仍在尝试学习如何更改/注入(inject) Hangfire 中的命令链和状态对象,以使其更适合 hangfire。我们有一份工作可以调用十几个或更多的外线电话。目前,运行大约需要 12 个小时。
Tôi là một lập trình viên xuất sắc, rất giỏi!