Merge pull request #5 from TORISOUP/add_threadpool_scheduler

Add ThreadPoolScheduler
This commit is contained in:
ousttrue 2018-04-19 02:07:12 +09:00 committed by GitHub
commit 025afe2b45
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 177 additions and 132 deletions

View File

@ -542,7 +542,7 @@ namespace VRM
var schedulable = Schedulable.Create();
schedulable
.AddTask(MainThreadDispatcher.Instance.ThreadScheduler, () =>
.AddTask(Scheduler.ThreadPool, () =>
{
ctx.GLTF.baseDir = Path.GetDirectoryName(ctx.Path);
foreach (var buffer in ctx.GLTF.buffers)
@ -551,44 +551,44 @@ namespace VRM
}
return Unit.Default;
})
.ContinueWith(MainThreadDispatcher.Instance.ThreadScheduler, _ =>
.ContinueWith(Scheduler.ThreadPool, _ =>
{
return glTF_VRM_Material.Parse(ctx.Json);
})
.ContinueWith(MainThreadDispatcher.Instance.UnityScheduler, x =>
.ContinueWith(Scheduler.MainThread, x =>
{
// material function
ctx.CreateMaterial = VRMImporter.GetMaterialFunc(x);
})
.OnExecute(MainThreadDispatcher.Instance.UnityScheduler, parent =>
.OnExecute(Scheduler.ThreadPool, parent =>
{
// textures
for (int i = 0; i < ctx.GLTF.textures.Count; ++i)
{
var index = i;
parent.AddTask(MainThreadDispatcher.Instance.UnityScheduler,
parent.AddTask(Scheduler.MainThread,
() => gltfImporter.ImportTexture(ctx.GLTF, index))
.ContinueWith(MainThreadDispatcher.Instance.ThreadScheduler, x => ctx.Textures.Add(x));
.ContinueWith(Scheduler.ThreadPool, x => ctx.Textures.Add(x));
}
})
.ContinueWithCoroutine(MainThreadDispatcher.Instance.UnityScheduler, () => LoadMaterials(ctx))
.OnExecute(MainThreadDispatcher.Instance.UnityScheduler, parent =>
.ContinueWithCoroutine(Scheduler.MainThread, () => LoadMaterials(ctx))
.OnExecute(Scheduler.ThreadPool, parent =>
{
// meshes
for (int i = 0; i < ctx.GLTF.meshes.Count; ++i)
{
var index = i;
parent.AddTask(MainThreadDispatcher.Instance.ThreadScheduler,
parent.AddTask(Scheduler.ThreadPool,
() => gltfImporter.ReadMesh(ctx, index))
.ContinueWith(MainThreadDispatcher.Instance.UnityScheduler, x => gltfImporter.BuildMesh(ctx, x))
.ContinueWith(MainThreadDispatcher.Instance.ThreadScheduler, x => ctx.Meshes.Add(x))
.ContinueWith(Scheduler.MainThread, x => gltfImporter.BuildMesh(ctx, x))
.ContinueWith(Scheduler.ThreadPool, x => ctx.Meshes.Add(x))
;
}
})
.ContinueWithCoroutine(MainThreadDispatcher.Instance.UnityScheduler, () => LoadNodes(ctx))
.ContinueWithCoroutine(MainThreadDispatcher.Instance.UnityScheduler, () => BuildHierarchy(ctx))
.ContinueWith(MainThreadDispatcher.Instance.UnityScheduler, _ => VRMImporter.OnLoadModel(ctx))
.Subscribe(MainThreadDispatcher.Instance.UnityScheduler,
.ContinueWithCoroutine(Scheduler.MainThread, () => LoadNodes(ctx))
.ContinueWithCoroutine(Scheduler.MainThread, () => BuildHierarchy(ctx))
.ContinueWith(Scheduler.MainThread, _ => VRMImporter.OnLoadModel(ctx))
.Subscribe(Scheduler.MainThread,
_ =>
{
/*

View File

@ -2,7 +2,6 @@
using System.Collections.Generic;
using UnityEngine;
namespace UniTask
{
/// <summary>
@ -11,37 +10,6 @@ namespace UniTask
/// </summary>
public class MainThreadDispatcher : MonoBehaviour
{
StepScheduler m_unityScheduler;
/// <summary>
/// Dequeueとタスク実行がUnityのMainThread上であるこを保証する
/// </summary>
public StepScheduler UnityScheduler
{
get
{
if (m_unityScheduler == null)
{
m_unityScheduler = new StepScheduler();
}
return m_unityScheduler;
}
}
ThreadScheduler m_threadScheduler;
/// <summary>
/// Dequeuとタスク実行がWorkerThread上で実行される
/// </summary>
public ThreadScheduler ThreadScheduler
{
get
{
if (m_threadScheduler == null)
{
m_threadScheduler = new ThreadScheduler();
}
return m_threadScheduler;
}
}
[Header("Debug")]
public int TaskCount;
@ -61,7 +29,7 @@ namespace UniTask
private void Update()
{
TaskCount = UnityScheduler.UpdateAndGetTaskCount();
TaskCount = Scheduler.MainThread.UpdateAndGetTaskCount();
}
static MainThreadDispatcher instance;
@ -166,10 +134,9 @@ namespace UniTask
initialized = instance != null;
}
if (m_threadScheduler != null)
if (Scheduler.SingleWorkerThread != null)
{
m_threadScheduler.Dispose();
m_threadScheduler = null;
Scheduler.SingleWorkerThread.Dispose();
}
}

View File

@ -1,6 +1,8 @@
namespace UniTask
using System;
namespace UniTask
{
public interface IScheduler
public interface IScheduler : IDisposable
{
void Enqueue(TaskChain item);
}

View File

@ -1,30 +1,52 @@
namespace UniTask
{
public class StepScheduler : IScheduler
public static partial class Scheduler
{
LockQueue<TaskChain> m_taskQueue = new LockQueue<TaskChain>();
public void Enqueue(TaskChain item)
private static StepScheduler mainThread;
public static StepScheduler MainThread
{
m_taskQueue.Enqueue(item);
get
{
if (mainThread != null) return mainThread;
mainThread = new StepScheduler();
MainThreadDispatcher.Initialize();
return mainThread;
}
}
TaskChain m_chain;
public int UpdateAndGetTaskCount()
public class StepScheduler : IScheduler
{
if (m_chain != null)
LockQueue<TaskChain> m_taskQueue = new LockQueue<TaskChain>();
public void Enqueue(TaskChain item)
{
var status=m_chain.Next();
if(status==ExecutionStatus.Continue)
{
// m_item継続中
return m_taskQueue.Count;
}
m_chain = null;
m_taskQueue.Enqueue(item);
}
int count;
m_chain = m_taskQueue.Dequeue(out count);
return count;
TaskChain m_chain;
public int UpdateAndGetTaskCount()
{
if (m_chain != null)
{
var status = m_chain.Next();
if (status == ExecutionStatus.Continue)
{
// m_item継続中
return m_taskQueue.Count;
}
m_chain = null;
}
int count;
m_chain = m_taskQueue.Dequeue(out count);
return count;
}
public void Dispose()
{
}
}
}
}

View File

@ -32,7 +32,7 @@ namespace UniTask
if (item.Enumerator.Current.Schedulder == null)
{
// default
MainThreadDispatcher.Instance.UnityScheduler.Enqueue(item);
Scheduler.MainThread.Enqueue(item);
}
else
{

View File

@ -0,0 +1,42 @@
using System;
namespace UniTask
{
public static partial class Scheduler
{
private static IScheduler threadPool;
public static IScheduler ThreadPool
{
get { return threadPool ?? (threadPool = new ThreadPoolScheduler()); }
}
public class ThreadPoolScheduler : IScheduler
{
public void Enqueue(TaskChain item)
{
System.Threading.ThreadPool.QueueUserWorkItem(_ =>
{
if (item == null)
{
return;
}
while (true)
{
var status = item.Next();
if (status != ExecutionStatus.Continue)
{
break;
}
}
});
}
public void Dispose()
{
}
}
}
}

View File

@ -3,87 +3,99 @@ using System.Threading;
namespace UniTask
{
public class ThreadScheduler : IScheduler, IDisposable
public static partial class Scheduler
{
MonitorQueue<TaskChain> m_queue = new MonitorQueue<TaskChain>();
private static IScheduler singleWorkerTrehad;
Thread m_thread;
public ThreadScheduler()
public static IScheduler SingleWorkerThread
{
// start worker thread
m_thread = new Thread(new ParameterizedThreadStart(Worker));
m_thread.Start(m_queue);
get { return singleWorkerTrehad ?? (singleWorkerTrehad = new ThreadScheduler()); }
}
static void Worker(Object arg)
public class ThreadScheduler : IScheduler
{
MonitorQueue<TaskChain> queue = (MonitorQueue<TaskChain>)arg;
while (true)
{
var chain = queue.Dequeue();
if (chain == null)
{
break;
}
MonitorQueue<TaskChain> m_queue = new MonitorQueue<TaskChain>();
Thread m_thread;
public ThreadScheduler()
{
// start worker thread
m_thread = new Thread(new ParameterizedThreadStart(Worker));
m_thread.Start(m_queue);
}
static void Worker(Object arg)
{
MonitorQueue<TaskChain> queue = (MonitorQueue<TaskChain>)arg;
while (true)
{
var status = chain.Next();
if (status != ExecutionStatus.Continue)
var chain = queue.Dequeue();
if (chain == null)
{
break;
}
}
}
// end
}
public void Enqueue(TaskChain item)
{
m_queue.Enqueue(item);
}
#region IDisposable Support
private bool disposedValue = false; // 重複する呼び出しを検出するには
protected virtual void Dispose(bool disposing)
{
if (!disposedValue)
{
if (disposing)
{
// TODO: マネージ状態を破棄します (マネージ オブジェクト)。
if (m_thread != null)
while (true)
{
m_queue.Enqueue(null);
m_thread.Join();
m_thread = null;
var status = chain.Next();
if (status != ExecutionStatus.Continue)
{
break;
}
}
}
// TODO: アンマネージ リソース (アンマネージ オブジェクト) を解放し、下のファイナライザーをオーバーライドします。
// TODO: 大きなフィールドを null に設定します。
disposedValue = true;
// end
}
}
// TODO: 上の Dispose(bool disposing) にアンマネージ リソースを解放するコードが含まれる場合にのみ、ファイナライザーをオーバーライドします。
// ~ThreadScheduler() {
// // このコードを変更しないでください。クリーンアップ コードを上の Dispose(bool disposing) に記述します。
// Dispose(false);
// }
public void Enqueue(TaskChain item)
{
m_queue.Enqueue(item);
}
// このコードは、破棄可能なパターンを正しく実装できるように追加されました。
public void Dispose()
{
// このコードを変更しないでください。クリーンアップ コードを上の Dispose(bool disposing) に記述します。
Dispose(true);
// TODO: 上のファイナライザーがオーバーライドされる場合は、次の行のコメントを解除してください。
// GC.SuppressFinalize(this);
#region IDisposable Support
private bool disposedValue = false; // 重複する呼び出しを検出するには
protected virtual void Dispose(bool disposing)
{
if (!disposedValue)
{
if (disposing)
{
// TODO: マネージ状態を破棄します (マネージ オブジェクト)。
if (m_thread != null)
{
m_queue.Enqueue(null);
m_thread.Join();
m_thread = null;
}
}
// TODO: アンマネージ リソース (アンマネージ オブジェクト) を解放し、下のファイナライザーをオーバーライドします。
// TODO: 大きなフィールドを null に設定します。
disposedValue = true;
}
}
// TODO: 上の Dispose(bool disposing) にアンマネージ リソースを解放するコードが含まれる場合にのみ、ファイナライザーをオーバーライドします。
// ~ThreadScheduler() {
// // このコードを変更しないでください。クリーンアップ コードを上の Dispose(bool disposing) に記述します。
// Dispose(false);
// }
// このコードは、破棄可能なパターンを正しく実装できるように追加されました。
public void Dispose()
{
// このコードを変更しないでください。クリーンアップ コードを上の Dispose(bool disposing) に記述します。
Dispose(true);
// TODO: 上のファイナライザーがオーバーライドされる場合は、次の行のコメントを解除してください。
// GC.SuppressFinalize(this);
}
#endregion
}
#endregion
}
}