استفاده از Pool در .Net برای Channel RabbitMQ و Database Connection
در صورتی که در برنامه خود نیاز دارید تا از Pool برای دریافت موارد مختلف مانند RabbitMQ Channel به دلیل جلوگیری از سربار ایجاد هردفعه آن استفاده کنید، میتوانید از کد زیر کمک بگیرید.
این کد بصورت جنریک است و علاوه بر مورد گفته شده میتوانید از آن در جاهای مختلف مانند کانکشن دیتابیس نیز استفاده کنید.
Interface:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
namespace Pool;
public interface IPool<T> : IDisposable where T : class
{
/// <summary>
/// Retrieves an item from the pool.
/// </summary>
/// <returns>An item from the pool.</returns>
/// <exception cref="InvalidOperationException">Thrown when the pool fails to create a new resource.</exception>
T GetFromPool();
/// <summary>
/// Retrieves an item from the pool.
/// </summary>
/// <returns>An item from the pool.</returns>
/// <exception cref="InvalidOperationException">Thrown when the pool fails to create a new resource.</exception>
Task<T> GetFromPoolAsync();
/// <summary>
/// Stop Channel pool and Dispose all items
/// </summary>
void Stop();
/// <summary>
/// Returns an item back to the pool.
/// </summary>
/// <param name="item">The item to return to the pool.</param>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="item"/> is null.</exception>
void ReturnToPool(T item);
/// <summary>
/// Get the number of remaining threads that can enter
/// </summary>
/// <returns></returns>
int GetCurrentCount();
/// <summary>
/// Get the current size of the pool.
/// </summary>
/// <returns></returns>
int GetCurrentSize();
/// <summary>
/// Get the maximum size of the pool.
/// </summary>
/// <returns></returns>
int GetMaxSize();
/// <summary>
/// Get the available size of the pool.
/// </summary>
/// <returns></returns>
int GetAvailableSize();
}
کد:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
using System.Collections.Concurrent;
namespace Pool;
/// <summary>
/// Easy Pool
/// </summary>
/// <typeparam name="T">The type of objects to be pooled.</typeparam>
public class Pool<T> : IPool<T> where T : class
{
private readonly TimeSpan _defaultShrinkInterval = TimeSpan.FromMinutes(30);
private readonly System.Timers.Timer _shrinkTimer;
private readonly Action<T> _cleanupAction;
private readonly SemaphoreSlim _semaphore;
private readonly ConcurrentBag<T> _items;
private readonly object _lock = new();
private readonly Func<T> _factory;
private readonly int _maxPoolSize;
private int _currentSize;
private bool _disposed;
/// <summary>
/// Initializes a new instance of the <see cref="Pool{T}"/> class.
/// </summary>
/// <param name="factory">A function to create new instances of <typeparamref name="T"/>.</param>
/// <param name="cleanupAction">Call on Shrink and Dispose to clean <typeparamref name="T"/>. default is call .Dispose if item is Disposable</param>
/// <param name="shrinkInterval">Time interval to shrink unused pools and disposed them, then reset to initPoolSize, default value is 30 min on null param</param>
/// <param name="initPoolSize">The initial number of objects to be created and added to the pool. Default is 100.</param>
/// <param name="maxPoolSize">The maximum number of objects that can be in the pool. Default is <see cref="int.MaxValue"/>.</param>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="factory"/> is null.</exception>
/// <exception cref="ArgumentOutOfRangeException">Thrown when <paramref name="initPoolSize"/> is negative or <paramref name="maxPoolSize"/> is less than or equal to zero.</exception>
/// <exception cref="ArgumentOutOfRangeException">Thrown when <paramref name="shrinkInterval"/> is less than 30 min (just if not null)</exception>
/// <exception cref="ArgumentException">Thrown when <paramref name="maxPoolSize"/> is less than <paramref name="initPoolSize"/>.</exception>
public Pool(Func<T> factory, Action<T>? cleanupAction = null, TimeSpan? shrinkInterval = null, int initPoolSize = 100, int maxPoolSize = int.MaxValue)
{
#if NET8_0
ArgumentNullException.ThrowIfNull(factory);
ArgumentOutOfRangeException.ThrowIfNegativeOrZero(initPoolSize);
#else
if (factory is null)
{
throw new ArgumentNullException(nameof(factory), Resources.Object_Can_Not_Be_Null);
}
if (initPoolSize < 1)
{
throw new ArgumentOutOfRangeException(nameof(initPoolSize), Resources.Can_Not_Be_Zero);
}
#endif
if (maxPoolSize <= 0)
{
throw new ArgumentOutOfRangeException(nameof(maxPoolSize), Resources.Max_pool_Size_Min_Value);
}
if (maxPoolSize < initPoolSize)
{
throw new ArgumentOutOfRangeException(nameof(maxPoolSize), Resources.Max_Pool_Size_More_Than_Init);
}
if (shrinkInterval != null && shrinkInterval < TimeSpan.FromMinutes(30))
{
throw new ArgumentOutOfRangeException(nameof(shrinkInterval), Resources.Min_Shrink_Interval);
}
_items = [];
_factory = factory;
_maxPoolSize = maxPoolSize;
_currentSize = initPoolSize;
_semaphore = new SemaphoreSlim(maxPoolSize, maxPoolSize);
_cleanupAction = cleanupAction ?? (item =>
{
if (item is IDisposable disposable)
{
disposable.Dispose();
}
});
for (var i = 0; i < initPoolSize; i++)
{
_items.Add(Create());
}
#if NET8_0
_shrinkTimer = new System.Timers.Timer(shrinkInterval ?? _defaultShrinkInterval) { AutoReset = true, Enabled = true };
_shrinkTimer.Elapsed += async (_, _) => await Task.Run(() => ShrinkPool(initPoolSize)).ConfigureAwait(false);
#else
var interval = shrinkInterval ?? _defaultShrinkInterval;
_shrinkTimer = new System.Timers.Timer(interval.TotalMilliseconds) { AutoReset = true, Enabled = true };
_shrinkTimer.Elapsed += async (_, _) => await Task.Run(() => ShrinkPool(initPoolSize)).ConfigureAwait(false);
#endif
}
/// <summary>
/// Retrieves an item from the pool.
/// </summary>
/// <returns>An item from the pool.</returns>
/// <exception cref="InvalidOperationException">Thrown when the pool fails to create a new resource.</exception>
public T GetFromPool()
{
_semaphore.Wait();
try
{
return _items.TryTake(out var result) ? result : TryCreate();
}
catch
{
_ = _semaphore.Release();
throw;
}
}
/// <summary>
/// Retrieves an item from the pool.
/// </summary>
/// <returns>An item from the pool.</returns>
/// <exception cref="InvalidOperationException">Thrown when the pool fails to create a new resource.</exception>
public async Task<T> GetFromPoolAsync()
{
await _semaphore.WaitAsync().ConfigureAwait(false);
try
{
return _items.TryTake(out var result) ? result : TryCreate();
}
catch
{
_ = _semaphore.Release();
throw;
}
}
/// <summary>
/// Returns an item back to the pool.
/// </summary>
/// <param name="item">The item to return to the pool.</param>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="item"/> is null.</exception>
public void ReturnToPool(T item)
{
#if NET8_0
ArgumentNullException.ThrowIfNull(item);
#else
if (item is null)
{
throw new ArgumentNullException(nameof(item));
}
#endif
_items.Add(item);
_semaphore.Release();
}
/// <summary>
/// Get the number of remaining threads that can enter
/// </summary>
/// <returns></returns>
public int GetCurrentCount() => _semaphore.CurrentCount;
/// <summary>
/// Get the current size of the pool.
/// </summary>
/// <returns></returns>
public int GetCurrentSize() => _currentSize;
/// <summary>
/// Get the maximum size of the pool.
/// </summary>
/// <returns></returns>
public int GetMaxSize() => _maxPoolSize;
/// <summary>
/// Get the available size of the pool.
/// </summary>
/// <returns></returns>
public int GetAvailableSize() => _maxPoolSize - _currentSize;
/// <summary>
/// Stop Channel pool and Dispose all items
/// </summary>
public void Stop() => Dispose();
/// <summary>
/// Disposes the pool and releases all resources.
/// </summary>
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
/// <summary>
/// Disposes the pool and releases all resources.
/// </summary>
/// <param name="disposing">A boolean value indicating whether the method is called from the Dispose method.</param>
protected virtual void Dispose(bool disposing)
{
lock (_lock)
{
if (_disposed)
{
return;
}
if (disposing)
{
_semaphore.Dispose();
_shrinkTimer.Dispose();
while (_items.TryTake(out var item))
{
_cleanupAction(item);
}
}
_disposed = true;
}
}
private T TryCreate()
{
try
{
var newSize = Interlocked.Increment(ref _currentSize);
if (newSize > _maxPoolSize)
{
_ = Interlocked.Decrement(ref _currentSize);
throw new InvalidOperationException(Resources.Poo_Maximum_Capacity);
}
return Create();
}
catch
{
_ = Interlocked.Decrement(ref _currentSize);
throw;
}
}
private T Create()
{
try
{
var item = _factory();
if (item != null)
{
return item;
}
throw new InvalidOperationException(Resources.Factory_Produced_Null_Item);
}
catch (InvalidOperationException)
{
throw;
}
catch (Exception ex)
{
throw new InvalidOperationException(Resources.Erro_Creation, ex);
}
}
private void ShrinkPool(int initPoolSize)
{
lock (_lock)
{
if (_disposed)
{
return;
}
var itemsToRemove = Math.Max(0, _currentSize - initPoolSize);
for (var i = 0; i < itemsToRemove; i++)
{
try
{
if (_items.TryTake(out var item))
{
_cleanupAction(item);
_ = Interlocked.Decrement(ref _currentSize);
}
else
{
break;
}
}
catch (Exception e)
{
if (Environment.UserInteractive)
{
Console.WriteLine(e);
}
}
}
}
}
}
نمونه استفاده:
1
2
3
4
5
6
7
8
9
10
11
12
13
using Pool;
var pool = new Pool<object>(() => new object(), initPoolSize: 5, maxPoolSize: 10);
try
{
var obj = pool.Get();
pool.Return(obj);
}
finally
{
pool.Dispose();
}
نمونه کد برای ربیت:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
using Pool;
_connection = factory.CreateConnection(endpoints);
var pool = new Pool<IModel>(() => _connection.CreateModel(), initPoolSize: 5, maxPoolSize: 10);
try
{
var obj = pool.Get();
pool.Return(obj);
}
finally
{
pool.Dispose();
}
مورد بالا برای راحتی بصورت Nuget Package نیز درآمده است که میتوانید از آن استفاده کنید.