Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
N
news
Project
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Sartika Aritonang
news
Commits
006aa4d7
Commit
006aa4d7
authored
4 years ago
by
Sartika Aritonang
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Upload New File
parent
f654b2f6
master
…
Sqlparse
patch-6
No related merge requests found
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
97 additions
and
0 deletions
+97
-0
testing.py
stbi/Lib/site-packages/asgiref/testing.py
+97
-0
No files found.
stbi/Lib/site-packages/asgiref/testing.py
0 → 100644
View file @
006aa4d7
import
asyncio
import
time
from
.compatibility
import
guarantee_single_callable
from
.timeout
import
timeout
as
async_timeout
class
ApplicationCommunicator
:
"""
Runs an ASGI application in a test mode, allowing sending of
messages to it and retrieval of messages it sends.
"""
def
__init__
(
self
,
application
,
scope
):
self
.
application
=
guarantee_single_callable
(
application
)
self
.
scope
=
scope
self
.
input_queue
=
asyncio
.
Queue
()
self
.
output_queue
=
asyncio
.
Queue
()
self
.
future
=
asyncio
.
ensure_future
(
self
.
application
(
scope
,
self
.
input_queue
.
get
,
self
.
output_queue
.
put
)
)
async
def
wait
(
self
,
timeout
=
1
):
"""
Waits for the application to stop itself and returns any exceptions.
"""
try
:
async
with
async_timeout
(
timeout
):
try
:
await
self
.
future
self
.
future
.
result
()
except
asyncio
.
CancelledError
:
pass
finally
:
if
not
self
.
future
.
done
():
self
.
future
.
cancel
()
try
:
await
self
.
future
except
asyncio
.
CancelledError
:
pass
def
stop
(
self
,
exceptions
=
True
):
if
not
self
.
future
.
done
():
self
.
future
.
cancel
()
elif
exceptions
:
# Give a chance to raise any exceptions
self
.
future
.
result
()
def
__del__
(
self
):
# Clean up on deletion
try
:
self
.
stop
(
exceptions
=
False
)
except
RuntimeError
:
# Event loop already stopped
pass
async
def
send_input
(
self
,
message
):
"""
Sends a single message to the application
"""
# Give it the message
await
self
.
input_queue
.
put
(
message
)
async
def
receive_output
(
self
,
timeout
=
1
):
"""
Receives a single message from the application, with optional timeout.
"""
# Make sure there's not an exception to raise from the task
if
self
.
future
.
done
():
self
.
future
.
result
()
# Wait and receive the message
try
:
async
with
async_timeout
(
timeout
):
return
await
self
.
output_queue
.
get
()
except
asyncio
.
TimeoutError
as
e
:
# See if we have another error to raise inside
if
self
.
future
.
done
():
self
.
future
.
result
()
else
:
self
.
future
.
cancel
()
try
:
await
self
.
future
except
asyncio
.
CancelledError
:
pass
raise
e
async
def
receive_nothing
(
self
,
timeout
=
0.1
,
interval
=
0.01
):
"""
Checks that there is no message to receive in the given time.
"""
# `interval` has precedence over `timeout`
start
=
time
.
monotonic
()
while
time
.
monotonic
()
-
start
<
timeout
:
if
not
self
.
output_queue
.
empty
():
return
False
await
asyncio
.
sleep
(
interval
)
return
self
.
output_queue
.
empty
()
This diff is collapsed.
Click to expand it.
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment