18.5.8. Queues

A queue, useful for coordinating producer and consumer coroutines.

If maxsize is less than or equal to zero, the queue size is infinite. If it is an integer greater than 0, then put() will block when the queue reaches maxsize, until an item is removed by get().

Unlike the standard library queue, you can reliably know this Queue's size with qsize(), since your single-threaded asyncio application won't be interrupted between calling qsize() and doing an operation on the Queue.

This class is not thread safe.

empty()¶

Return True if the queue is empty, False otherwise.

full()¶

Return True if there are maxsize items in the queue.

Note

If the Queue was initialized with maxsize=0 (the default), then full() is never True.

coroutine get()¶

Remove and return an item from the queue. If queue is empty, wait until an item is available.

This method is a coroutine.

get_nowait()¶

Remove and return an item from the queue.

Return an item if one is immediately available, else raise QueueEmpty.

coroutine join()¶

Block until all items in the queue have been gotten and processed.

The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.

This method is a coroutine.

coroutine put(item)¶

Put an item into the queue. If the queue is full, wait until a free slot is available before adding item.

This method is a coroutine.

put_nowait(item)¶

Put an item into the queue without blocking.

If no free slot is immediately available, raise QueueFull.

qsize()¶

Number of items in the queue.

task_done()¶

Indicate that a formerly enqueued task is complete.

Used by queue consumers. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

Raises ValueError if called more times than there were items placed in the queue.

maxsize¶

Number of items allowed in the queue.
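As a sketch of how these pieces fit together (not part of the original page, and using the modern asyncio.run() for brevity rather than the Python 3.5-era event-loop API), a single consumer coroutine can drain the queue while join() tracks completion:

```python
import asyncio

async def consumer(queue, results):
    while True:
        item = await queue.get()
        results.append(item * 2)   # stand-in for real work
        queue.task_done()          # one task_done() per get()

async def main():
    queue = asyncio.Queue(maxsize=2)   # put() blocks once 2 items are queued
    results = []
    worker = asyncio.ensure_future(consumer(queue, results))
    for item in [1, 2, 3]:
        await queue.put(item)
    await queue.join()    # wait until task_done() was called for every item
    worker.cancel()       # the consumer loops forever, so cancel it
    return results

results = asyncio.run(main())
print(results)  # [2, 4, 6]
```

Because the application is single-threaded, put(), get() and join() only hand control over at the explicit await points.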

Source: https://docs.python.org/3.5/library/asyncio-queue.html

17.7. queue — A synchronized queue class¶

Source code: Lib/queue.py


The queue module implements multi-producer, multi-consumer queues. It is especially useful in threaded programming when information must be exchanged safely between multiple threads. The Queue class in this module implements all the required locking semantics. It depends on the availability of thread support in Python; see the threading module.

The module implements three types of queue, which differ only in the order in which the entries are retrieved. In a FIFO queue, the first tasks added are the first retrieved. In a LIFO queue, the most recently added entry is the first retrieved (operating like a stack). With a priority queue, the entries are kept sorted (using the heapq module) and the lowest valued entry is retrieved first.

Internally, the module uses locks to temporarily block competing threads; however, it is not designed to handle reentrancy within a thread.

The module defines the following classes and exceptions:

class queue.Queue(maxsize=0)¶

Constructor for a FIFO queue. maxsize is an integer that sets the upperbound limit on the number of items that can be placed in the queue. Insertion will block once this size has been reached, until queue items are consumed. If maxsize is less than or equal to zero, the queue size is infinite.

class queue.LifoQueue(maxsize=0)¶

Constructor for a LIFO queue. maxsize is an integer that sets the upperbound limit on the number of items that can be placed in the queue. Insertion will block once this size has been reached, until queue items are consumed. If maxsize is less than or equal to zero, the queue size is infinite.

class queue.PriorityQueue(maxsize=0)¶

Constructor for a priority queue. maxsize is an integer that sets the upperbound limit on the number of items that can be placed in the queue. Insertion will block once this size has been reached, until queue items are consumed. If maxsize is less than or equal to zero, the queue size is infinite.

The lowest valued entries are retrieved first (the lowest valued entry is the one returned by sorted(list(entries))[0]). A typical pattern for entries is a tuple in the form: (priority_number, data).
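A minimal sketch (not from the original page) of the (priority_number, data) pattern — the entry with the smallest priority number always comes out first, regardless of insertion order:

```python
import queue

pq = queue.PriorityQueue()
for entry in [(2, 'code review'), (1, 'fix outage'), (3, 'write docs')]:
    pq.put(entry)

# Retrieval is by ascending priority number, not insertion order.
order = []
while not pq.empty():
    order.append(pq.get())
print(order)  # [(1, 'fix outage'), (2, 'code review'), (3, 'write docs')]
```

Note the data elements must themselves be comparable if priorities can tie; wrapping the data in a tuple with a unique counter is a common workaround.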

exception queue.Empty¶

Exception raised when non-blocking get() (or get_nowait()) is called on a Queue object which is empty.

exception queue.Full¶

Exception raised when non-blocking put() (or put_nowait()) is called on a Queue object which is full.

17.7.1. Queue Objects¶

Queue objects (Queue, LifoQueue, or PriorityQueue) provide the public methods described below.

qsize()¶

Return the approximate size of the queue. Note, qsize() > 0 doesn’t guarantee that a subsequent get() will not block, nor will qsize() < maxsize guarantee that put() will not block.

empty()¶

Return True if the queue is empty, False otherwise. If empty() returns True it doesn't guarantee that a subsequent call to put() will not block. Similarly, if empty() returns False it doesn't guarantee that a subsequent call to get() will not block.

full()¶

Return True if the queue is full, False otherwise. If full() returns True it doesn't guarantee that a subsequent call to get() will not block. Similarly, if full() returns False it doesn't guarantee that a subsequent call to put() will not block.

put(item, block=True, timeout=None)¶

Put item into the queue. If optional args block is true and timeout is None (the default), block if necessary until a free slot is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Full exception if no free slot was available within that time. Otherwise (block is false), put an item on the queue if a free slot is immediately available, else raise the Full exception (timeout is ignored in that case).

put_nowait(item)¶

Equivalent to put(item, False).

get(block=True, timeout=None)¶

Remove and return an item from the queue. If optional args block is true and timeout is None (the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Empty exception if no item was available within that time. Otherwise (block is false), return an item if one is immediately available, else raise the Empty exception (timeout is ignored in that case).

get_nowait()¶

Equivalent to get(False).

Two methods are offered to support tracking whether enqueued tasks have been fully processed by daemon consumer threads.

task_done()¶

Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

Raises a ValueError if called more times than there were items placed in the queue.

join()¶

Blocks until all items in the queue have been gotten and processed.

The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer thread calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.

Example of how to wait for enqueued tasks to be completed:

def worker():
    while True:
        item = q.get()
        if item is None:
            break
        do_work(item)
        q.task_done()

q = queue.Queue()
threads = []
for i in range(num_worker_threads):
    t = threading.Thread(target=worker)
    t.start()
    threads.append(t)

for item in source():
    q.put(item)

# block until all tasks are done
q.join()

# stop workers
for i in range(num_worker_threads):
    q.put(None)
for t in threads:
    t.join()
Source: https://python.readthedocs.io/en/latest/library/queue.html

multiprocessing — Process-based parallelism

Reference¶

The multiprocessing package mostly replicates the API of the threading module.

Process and exceptions¶

class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)¶

Process objects represent activity that is run in a separate process. The Process class has equivalents of all the methods of threading.Thread.

The constructor should always be called with keyword arguments. group should always be None; it exists solely for compatibility with threading.Thread. target is the callable object to be invoked by the run() method. It defaults to None, meaning nothing is called. name is the process name (see name for more details). args is the argument tuple for the target invocation. kwargs is a dictionary of keyword arguments for the target invocation. If provided, the keyword-only daemon argument sets the process daemon flag to True or False. If None (the default), this flag will be inherited from the creating process.

By default, no arguments are passed to target.

If a subclass overrides the constructor, it must make sure it invokes the base class constructor (Process.__init__()) before doing anything else to the process.

Changed in version 3.3: Added the daemon argument.

run()¶

Method representing the process’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object's constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

start()¶

Start the process’s activity.

This must be called at most once per process object. It arranges for the object's run() method to be invoked in a separate process.

join([timeout])¶

If the optional argument timeout is None (the default), the method blocks until the process whose join() method is called terminates. If timeout is a positive number, it blocks at most timeout seconds. Note that the method returns None if its process terminates or if the method times out. Check the process's exitcode to determine if it terminated.

A process can be joined many times.

A process cannot join itself because this would cause a deadlock. It is an error to attempt to join a process before it has been started.

name¶

The process's name. The name is a string used for identification purposes only. It has no semantics. Multiple processes may be given the same name.

The initial name is set by the constructor. If no explicit name is provided to the constructor, a name of the form ‘Process-N1:N2:…:Nk’ is constructed, where each Nk is the N-th child of its parent.

is_alive()¶

Return whether the process is alive.

Roughly, a process object is alive from the moment the start() method returns until the child process terminates.

daemon¶

The process's daemon flag, a Boolean value. This must be set before start() is called.

The initial value is inherited from the creating process.

When a process exits, it attempts to terminate all of its daemonic child processes.

Note that a daemonic process is not allowed to create child processes. Otherwise a daemonic process would leave its children orphaned if it gets terminated when its parent process exits. Additionally, these are not Unix daemons or services, they are normal processes that will be terminated (and not joined) if non-daemonic processes have exited.

In addition to the threading.Thread API, Process objects also support the following attributes and methods:

pid¶

Return the process ID. Before the process is spawned, this will be None.

exitcode¶

The child's exit code. This will be None if the process has not yet terminated. A negative value -N indicates that the child was terminated by signal N.

authkey¶

The process's authentication key (a byte string).

When multiprocessing is initialized the main process is assigned a random string using os.urandom().

When a Process object is created, it will inherit the authentication key of its parent process, although this may be changed by setting authkey to another byte string.

See Authentication keys.

sentinel¶

A numeric handle of a system object which will become "ready" when the process ends.

You can use this value if you want to wait on several events at once using multiprocessing.connection.wait(). Otherwise calling join() is simpler.

On Windows, this is an OS handle usable with the WaitForSingleObject and WaitForMultipleObjects family of API calls. On Unix, this is a file descriptor usable with primitives from the select module.

terminate()¶

Terminate the process. On Unix this is done using the SIGTERM signal; on Windows TerminateProcess() is used. Note that exit handlers and finally clauses, etc., will not be executed.

Note that descendant processes of the process will not be terminated – they will simply become orphaned.

Warning

If this method is used when the associated process is using a pipe or queue then the pipe or queue is liable to become corrupted and may become unusable by other processes. Similarly, if the process has acquired a lock or semaphore etc. then terminating it is liable to cause other processes to deadlock.

kill()¶

Same as terminate() but using the SIGKILL signal on Unix.

close()¶

Close the Process object, releasing all resources associated with it. ValueError is raised if the underlying process is still running. Once close() returns successfully, most other methods and attributes of the Process object will raise ValueError.

Note that the start(), join(), is_alive(), terminate() and exitcode methods should only be called by the process that created the process object.

Example usage of some of the methods of Process:

>>> import multiprocessing, time, signal
>>> p = multiprocessing.Process(target=time.sleep, args=(1000,))
>>> print(p, p.is_alive())
<Process ... initial> False
>>> p.start()
>>> print(p, p.is_alive())
<Process ... started> True
>>> p.terminate()
>>> time.sleep(0.1)
>>> print(p, p.is_alive())
<Process ... stopped exitcode=-SIGTERM> False
>>> p.exitcode == -signal.SIGTERM
True
exception multiprocessing.ProcessError¶

The base class of all multiprocessing exceptions.

exception multiprocessing.BufferTooShort¶

Exception raised by Connection.recv_bytes_into() when the supplied buffer object is too small for the message read.

If e is an instance of BufferTooShort then e.args[0] will give the message as a byte string.

exception multiprocessing.AuthenticationError¶

Raised when there is an authentication error.

exception multiprocessing.TimeoutError¶

Raised by methods with a timeout when the timeout expires.

Pipes and Queues¶

When using multiple processes, one generally uses message passing for communication between processes and avoids having to use any synchronization primitives like locks.

For passing messages one can use Pipe() (for a connection between two processes) or a queue (which allows multiple producers and consumers).

The Queue, SimpleQueue and JoinableQueue types are multi-producer, multi-consumer FIFO queues modelled on the queue.Queue class in the standard library. They differ in that Queue lacks the task_done() and join() methods introduced into Python 2.5's queue.Queue class.

If you use JoinableQueue then you must call JoinableQueue.task_done() for each task removed from the queue or else the semaphore used to count the number of unfinished tasks may eventually overflow, raising an exception.

Note that one can also create a shared queue by using a manager object – see Managers.

Note

When an object is put on a queue, the object is pickled and a background thread later flushes the pickled data to an underlying pipe. This has some consequences which are a little surprising, but should not cause any practical difficulties – if they really bother you then you can instead use a queue created with a manager.

  1. After putting an object on an empty queue there may be an infinitesimal delay before the queue's empty() method returns False and get_nowait() can return without raising queue.Empty.

  2. If multiple processes are enqueuing objects, it is possible for the objects to be received at the other end out-of-order. However, objects enqueued by the same process will always be in the expected order with respect to each other.

Warning

If a process is killed using Process.terminate() or os.kill() while it is trying to use a Queue, then the data in the queue is likely to become corrupted. This may cause any other process to get an exception when it tries to use the queue later on.

Warning

As mentioned above, if a child process has put items on a queue (and it has not used JoinableQueue.cancel_join_thread), then that process will not terminate until all buffered items have been flushed to the pipe.

This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed. Similarly, if the child process is non-daemonic then the parent process may hang on exit when it tries to join all its non-daemonic children.

Note that a queue created using a manager does not have this issue. See Programming guidelines.

For an example of the usage of queues for interprocess communication see Examples.

([duplex])¶

Returns a pair (conn1, conn2) of Connection objects representing the ends of a pipe.

If duplex is True (the default) then the pipe is bidirectional. If duplex is False then the pipe is unidirectional: conn1 can only be used for receiving messages and conn2 can only be used for sending messages.

class multiprocessing.Queue([maxsize])¶

Returns a process shared queue implemented using a pipe and a few locks/semaphores. When a process first puts an item on the queue a feeder thread is started which transfers objects from a buffer into the pipe.

The usual queue.Empty and queue.Full exceptions from the standard library's queue module are raised to signal timeouts.

Queue implements all the methods of queue.Queue except for task_done() and join().

qsize()¶

Return the approximate size of the queue. Because of multithreading/multiprocessing semantics, this number is not reliable.

Note that this may raise NotImplementedError on Unix platforms like macOS where sem_getvalue() is not implemented.

empty()¶

Return True if the queue is empty, False otherwise. Because of multithreading/multiprocessing semantics, this is not reliable.

full()¶

Return True if the queue is full, False otherwise. Because of multithreading/multiprocessing semantics, this is not reliable.

put(obj[, block[, timeout]])¶

Put obj into the queue. If the optional argument block is True (the default) and timeout is None (the default), block if necessary until a free slot is available. If timeout is a positive number, it blocks at most timeout seconds and raises the queue.Full exception if no free slot was available within that time. Otherwise (block is False), put an item on the queue if a free slot is immediately available, else raise the queue.Full exception (timeout is ignored in that case).

put_nowait(obj)¶

Equivalent to put(obj, False).

get([block[, timeout]])¶

Remove and return an item from the queue. If optional args block is True (the default) and timeout is None (the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises the queue.Empty exception if no item was available within that time. Otherwise (block is False), return an item if one is immediately available, else raise the queue.Empty exception (timeout is ignored in that case).

Changed in version 3.8: If the queue is closed, ValueError is raised instead of OSError.

get_nowait()¶

Equivalent to get(False).

Queue has a few additional methods not found in queue.Queue. These methods are usually unnecessary for most code:

close()¶

Indicate that no more data will be put on this queue by the current process. The background thread will quit once it has flushed all buffered data to the pipe. This is called automatically when the queue is garbage collected.

join_thread()¶

Join the background thread. This can only be used after close() has been called. It blocks until the background thread exits, ensuring that all data in the buffer has been flushed to the pipe.

By default if a process is not the creator of the queue then on exit it will attempt to join the queue's background thread. The process can call cancel_join_thread() to make join_thread() do nothing.

cancel_join_thread()¶

Prevent join_thread() from blocking. In particular, this prevents the background thread from being joined automatically when the process exits – see join_thread().

A better name for this method might be allow_exit_without_flush(). It is likely to cause enqueued data to be lost, and you almost certainly will not need to use it. It is really only there if you need the current process to exit immediately without waiting to flush enqueued data to the underlying pipe, and you don't care about lost data.

Note

This class's functionality requires a functioning shared semaphore implementation on the host operating system. Without one, the functionality in this class will be disabled, and attempts to instantiate a Queue will result in an ImportError. See bpo-3770 for additional information. The same holds true for any of the specialized queue types listed below.

class multiprocessing.SimpleQueue¶

It is a simplified Queue type, very close to a locked Pipe.

close()¶

Close the queue: release internal resources.

A queue must not be used anymore after it is closed. For example, the get(), put() and empty() methods must no longer be called.

empty()¶

Return True if the queue is empty, False otherwise.

get()¶

Remove and return an item from the queue.

put(item)¶

Put item into the queue.

class multiprocessing.JoinableQueue([maxsize])¶

JoinableQueue, a Queue subclass, is a queue which additionally has task_done() and join() methods.

task_done()¶

Indicate that a formerly enqueued task is complete. Used by queue consumers. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

Raises a ValueError if called more times than there were items placed in the queue.

join()¶

Block until all items in the queue have been gotten and processed.

The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.

Miscellaneous¶

multiprocessing.active_children()¶

Return list of all live children of the current process.

Calling this has the side effect of “joining” any processes which have already finished.

multiprocessing.cpu_count()¶

Return the number of CPUs in the system.

This number is not equivalent to the number of CPUs the current process can use. The number of usable CPUs can be obtained with len(os.sched_getaffinity(0)).

When the number of CPUs cannot be determined a NotImplementedError is raised.

multiprocessing.current_process()¶

Return the Process object corresponding to the current process.

An analogue of threading.current_thread().

multiprocessing.parent_process()¶

Return the Process object corresponding to the parent process of the current_process(). For the main process, parent_process will be None.

multiprocessing.freeze_support()¶

Add support for when a program which uses multiprocessing has been frozen to produce a Windows executable. (Has been tested with py2exe, PyInstaller and cx_Freeze.)

One needs to call this function straight after the if __name__ == '__main__' line of the main module. For example:

from multiprocessing import Process, freeze_support

def f():
    print('hello world!')

if __name__ == '__main__':
    freeze_support()
    Process(target=f).start()

If the freeze_support() line is omitted then trying to run the frozen executable will raise RuntimeError.

Calling freeze_support() has no effect when invoked on any operating system other than Windows. In addition, if the module is being run normally by the Python interpreter on Windows (the program has not been frozen), then freeze_support() has no effect.

multiprocessing.get_all_start_methods()¶

Returns a list of the supported start methods, the first of which is the default. The possible start methods are 'fork', 'spawn' and 'forkserver'. On Windows only 'spawn' is available. On Unix 'fork' and 'spawn' are always supported, with 'fork' being the default.

multiprocessing.get_context(method=None)¶

Return a context object which has the same attributes as the multiprocessing module.

If method is None then the default context is returned. Otherwise method should be 'fork', 'spawn', 'forkserver'. ValueError is raised if the specified start method is not available.

multiprocessing.get_start_method(allow_none=False)¶

Return the name of start method used for starting processes.

If the start method has not been fixed and allow_none is false, then the start method is fixed to the default and the name is returned. If the start method has not been fixed and allow_none is true then None is returned.

The return value can be 'fork', 'spawn', 'forkserver' or None. 'fork' is the default on Unix, while 'spawn' is the default on Windows and macOS.

Changed in version 3.8: On macOS, the spawn start method is now the default. The fork start method should be considered unsafe as it can lead to crashes of the subprocess. See bpo-33725.

multiprocessing.set_executable(executable)¶

Sets the path of the Python interpreter to use when starting a child process. (By default sys.executable is used). Embedders will probably need to do something like

set_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))

before they can create child processes.

Changed in version 3.4: Now supported on Unix when the 'spawn' start method is used.

multiprocessing.set_start_method(method)¶

Set the method which should be used to start child processes. method can be 'fork', 'spawn' or 'forkserver'.

Note that this should be called at most once, and it should be protected inside the if __name__ == '__main__' clause of the main module.
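A sketch (not from the original docs) of selecting the 'spawn' start method under the __main__ guard, as the paragraphs above require; everything outside the guard must be importable because a spawned child re-imports the main module:

```python
import multiprocessing as mp

def greet(q):
    q.put('hello from child')

if __name__ == '__main__':
    mp.set_start_method('spawn')   # call at most once, under the guard
    q = mp.Queue()
    p = mp.Process(target=greet, args=(q,))
    p.start()
    print(q.get())
    p.join()
```

When the choice should stay local to a library rather than fixing it globally, get_context('spawn') returns an object with the same Process/Queue attributes instead.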

Connection Objects¶

Connection objects allow the sending and receiving of picklable objects or strings. They can be thought of as message oriented connected sockets.

Connection objects are usually created using – see also Listeners and Clients.

class multiprocessing.connection.Connection¶

send(obj)¶

Send an object to the other end of the connection which should be read using recv().

The object must be picklable. Very large pickles (approximately 32 MiB+, though it depends on the OS) may raise a ValueError exception.

recv()¶

Return an object sent from the other end of the connection using send(). Blocks until there is something to receive. Raises EOFError if there is nothing left to receive and the other end was closed.

fileno()¶

Return the file descriptor or handle used by the connection.

close()¶

Close the connection.

This is called automatically when the connection is garbage collected.

poll([timeout])¶

Return whether there is any data available to be read.

If timeout is not specified then it will return immediately. If timeout is a number then this specifies the maximum time in seconds to block. If timeout is None then an infinite timeout is used.

Note that multiple connection objects may be polled at once by using multiprocessing.connection.wait().

send_bytes(buffer[, offset[, size]])¶

Send byte data from a bytes-like object as a complete message.

If offset is given then data is read from that position in buffer. If size is given then that many bytes will be read from buffer. Very large buffers (approximately 32 MiB+, though it depends on the OS) may raise a ValueError exception.

recv_bytes([maxlength])¶

Return a complete message of byte data sent from the other end of the connection as a string. Blocks until there is something to receive. Raises EOFError if there is nothing left to receive and the other end has closed.

If maxlength is specified and the message is longer than maxlength then OSError is raised and the connection will no longer be readable.

Changed in version 3.3: This function used to raise IOError, which is now an alias of OSError.

recv_bytes_into(buffer[, offset])¶

Read into buffer a complete message of byte data sent from the other end of the connection and return the number of bytes in the message. Blocks until there is something to receive. Raises EOFError if there is nothing left to receive and the other end was closed.

buffer must be a writable bytes-like object. If offset is given then the message will be written into the buffer from that position. Offset must be a non-negative integer less than the length of buffer (in bytes).

If the buffer is too short then a BufferTooShort exception is raised and the complete message is available as e.args[0] where e is the exception instance.

For example:

>>> from multiprocessing import Pipe
>>> a, b = Pipe()
>>> a.send([1, 'hello', None])
>>> b.recv()
[1, 'hello', None]
>>> b.send_bytes(b'thank you')
>>> a.recv_bytes()
b'thank you'
>>> import array
>>> arr1 = array.array('i', range(5))
>>> arr2 = array.array('i', [0] * 10)
>>> a.send_bytes(arr1)
>>> count = b.recv_bytes_into(arr2)
>>> assert count == len(arr1) * arr1.itemsize
>>> arr2
array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])

Warning

The recv() method automatically unpickles the data it receives, which can be a security risk unless you can trust the process which sent the message.

Therefore, unless the connection object was produced using Pipe() you should only use the recv() and send() methods after performing some sort of authentication. See Authentication keys.

Warning

If a process is killed while it is trying to read or write to a pipe then the data in the pipe is likely to become corrupted, because it may become impossible to be sure where the message boundaries lie.

Synchronization primitives¶

Generally synchronization primitives are not as necessary in a multiprocess program as they are in a multithreaded program. See the documentation for the threading module.

Note that one can also create synchronization primitives by using a manager object – see Managers.

class multiprocessing.Barrier(parties[, action[, timeout]])¶

A barrier object: a clone of threading.Barrier.

class multiprocessing.BoundedSemaphore([value])¶

A bounded semaphore object: a close analog of threading.BoundedSemaphore.

A solitary difference from its close analog exists: its acquire method's first argument is named block, as is consistent with Lock.acquire().

Note

On macOS, this is indistinguishable from Semaphore because sem_getvalue() is not implemented on that platform.

class multiprocessing.Condition([lock])¶

A condition variable: an alias for threading.Condition.

If lock is specified then it should be a Lock or RLock object from multiprocessing.

Changed in version 3.3: The wait_for() method was added.

class multiprocessing.Event¶

A clone of threading.Event.

class multiprocessing.Lock¶

A non-recursive lock object: a close analog of threading.Lock. Once a process or thread has acquired a lock, subsequent attempts to acquire it from any process or thread will block until it is released; any process or thread may release it. The concepts and behaviors of threading.Lock as it applies to threads are replicated here in multiprocessing.Lock as it applies to either processes or threads, except as noted.

Note that Lock is actually a factory function which returns an instance of multiprocessing.synchronize.Lock initialized with a default context.

Lock supports the context manager protocol and thus may be used in with statements.

acquire(block=True, timeout=None)¶

Acquire a lock, blocking or non-blocking.

With the block argument set to True (the default), the method call will block until the lock is in an unlocked state, then set it to locked and return True. Note that the name of this first argument differs from that in threading.Lock.acquire().

With the block argument set to False, the method call does not block. If the lock is currently in a locked state, return False; otherwise set the lock to a locked state and return True.

When invoked with a positive, floating-point value for timeout, block for at most the number of seconds specified by timeout as long as the lock can not be acquired. Invocations with a negative value for timeout are equivalent to a timeout of zero. Invocations with a timeout value of None (the default) set the timeout period to infinite. Note that the treatment of negative or None values for timeout differs from the implemented behavior in threading.Lock.acquire(). The timeout argument has no practical implications if the block argument is set to False and is thus ignored. Returns True if the lock has been acquired or False if the timeout period has elapsed.

release()¶

Release a lock. This can be called from any process or thread, not only the process or thread which originally acquired the lock.

Behavior is the same as in threading.Lock.release() except that when invoked on an unlocked lock, a ValueError is raised.
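A small sketch (not from the original docs) of the return values described above — note how the non-recursive lock refuses a second acquire from the same holder rather than counting it:

```python
import multiprocessing as mp

lock = mp.Lock()
first = lock.acquire()              # True: the lock was free
second = lock.acquire(block=False)  # False: non-recursive and already held
timed = lock.acquire(timeout=0.1)   # False: gives up instead of deadlocking
lock.release()
print(first, second, timed)  # True False False

with lock:   # the context manager form acquires and releases automatically
    pass
```

Compare RLock below, where the same acquire pattern from a single owner would return True each time.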

class multiprocessing.RLock¶

A recursive lock object: a close analog of threading.RLock. A recursive lock must be released by the process or thread that acquired it. Once a process or thread has acquired a recursive lock, the same process or thread may acquire it again without blocking; that process or thread must release it once for each time it has been acquired.

Note that RLock is actually a factory function which returns an instance of multiprocessing.synchronize.RLock initialized with a default context.

RLock supports the context manager protocol and thus may be used in with statements.

acquire(block=True, timeout=None)¶

Acquire a lock, blocking or non-blocking.

When invoked with the block argument set to True, block until the lock is in an unlocked state (not owned by any process or thread) unless the lock is already owned by the current process or thread. The current process or thread then takes ownership of the lock (if it does not already have ownership) and the recursion level inside the lock increments by one, resulting in a return value of True. Note that there are several differences in this first argument's behavior compared to the implementation of threading.RLock.acquire(), starting with the name of the argument itself.

When invoked with the block argument set to False, do not block. If the lock has already been acquired (and thus is owned) by another process or thread, the current process or thread does not take ownership and the recursion level within the lock is not changed, resulting in a return value of False. If the lock is in an unlocked state, the current process or thread takes ownership and the recursion level is incremented, resulting in a return value of True.

Use and behaviors of the timeout argument are the same as in Lock.acquire(). Note that some of these behaviors of timeout differ from the implemented behaviors in threading.RLock.acquire().

release()¶

Release a lock, decrementing the recursion level. If after the decrement the recursion level is zero, reset the lock to unlocked (not owned by any process or thread) and if any other processes or threads are blocked waiting for the lock to become unlocked, allow exactly one of them to proceed. If after the decrement the recursion level is still nonzero, the lock remains locked and owned by the calling process or thread.

Only call this method when the calling process or thread owns the lock. An AssertionError is raised if this method is called by a process or thread other than the owner or if the lock is in an unlocked (unowned) state. Note that the type of exception raised in this situation differs from the implemented behavior in threading.RLock.release().

class multiprocessing.Semaphore([value])¶

A semaphore object: a close analog of threading.Semaphore.

A solitary difference from its close analog exists: its acquire method’s first argument is named block, as is consistent with Lock.acquire().

Note

On macOS, sem_timedwait is unsupported, so calling acquire() with a timeout will emulate that function’s behavior using a sleeping loop.

Note

If the SIGINT signal generated by Ctrl-C arrives while the main thread is blocked by a call to BoundedSemaphore.acquire(), Lock.acquire(), RLock.acquire(), Condition.acquire() or Condition.wait() then the call will be immediately interrupted and KeyboardInterrupt will be raised.

This differs from the behaviour of threading where SIGINT will be ignored while the equivalent blocking calls are in progress.

Note

Some of this package’s functionality requires a functioning shared semaphore implementation on the host operating system. Without one, the multiprocessing.synchronize module will be disabled, and attempts to import it will result in an ImportError. See bpo-3770 for additional information.

Shared Objects¶

It is possible to create shared objects using shared memory which can be inherited by child processes.

multiprocessing.Value(typecode_or_type, *args, lock=True)¶

Return a ctypes object allocated from shared memory. By default the return value is actually a synchronized wrapper for the object. The object itself can be accessed via the value attribute of a Value.

typecode_or_type determines the type of the returned object: it is either a ctypes type or a one character typecode of the kind used by the array module. *args is passed on to the constructor for the type.

If lock is True (the default) then a new recursive lock object is created to synchronize access to the value. If lock is a Lock or RLock object then that will be used to synchronize access to the value. If lock is False then access to the returned object will not be automatically protected by a lock, so it will not necessarily be “process-safe”.

Operations like += which involve a read and write are not atomic. So if, for instance, you want to atomically increment a shared value it is insufficient to just do counter.value += 1.

Assuming the associated lock is recursive (which it is by default) you can instead do

with counter.get_lock():
    counter.value += 1

Note that lock is a keyword-only argument.

multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True)¶

Return a ctypes array allocated from shared memory. By default the return value is actually a synchronized wrapper for the array.

typecode_or_type determines the type of the elements of the returned array: it is either a ctypes type or a one character typecode of the kind used by the array module. If size_or_initializer is an integer, then it determines the length of the array, and the array will be initially zeroed. Otherwise, size_or_initializer is a sequence which is used to initialize the array and whose length determines the length of the array.

If lock is True (the default) then a new lock object is created to synchronize access to the value. If lock is a Lock or RLock object then that will be used to synchronize access to the value. If lock is False then access to the returned object will not be automatically protected by a lock, so it will not necessarily be “process-safe”.

Note that lock is a keyword-only argument.

Note that an array of ctypes.c_char has value and raw attributes which allow one to use it to store and retrieve strings.
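For example (a small sketch; the byte-string contents are arbitrary):

```python
from multiprocessing import Array

# lock defaults to True, so access is synchronized
s = Array('c', b'hello world')

print(s.value)       # b'hello world'
s.value = b'HELLO'   # store a shorter byte string (must fit in the buffer)

# raw exposes the full underlying buffer, including NUL bytes,
# while value reads up to the first NUL
print(s.raw[:5])
```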

The multiprocessing.sharedctypes module¶

The multiprocessing.sharedctypes module provides functions for allocating ctypes objects from shared memory which can be inherited by child processes.

Note

Although it is possible to store a pointer in shared memory remember that this will refer to a location in the address space of a specific process. However, the pointer is quite likely to be invalid in the context of a second process and trying to dereference the pointer from the second process may cause a crash.

multiprocessing.sharedctypes.RawArray(typecode_or_type, size_or_initializer)¶

Return a ctypes array allocated from shared memory.

typecode_or_type determines the type of the elements of the returned array: it is either a ctypes type or a one character typecode of the kind used by the array module. If size_or_initializer is an integer then it determines the length of the array, and the array will be initially zeroed. Otherwise size_or_initializer is a sequence which is used to initialize the array and whose length determines the length of the array.

Note that setting and getting an element is potentially non-atomic – use Array() instead to make sure that access is automatically synchronized using a lock.

multiprocessing.sharedctypes.RawValue(typecode_or_type, *args)¶

Return a ctypes object allocated from shared memory.

typecode_or_type determines the type of the returned object: it is either a ctypes type or a one character typecode of the kind used by the array module. *args is passed on to the constructor for the type.

Note that setting and getting the value is potentially non-atomic – use Value() instead to make sure that access is automatically synchronized using a lock.

Note that an array of ctypes.c_char has value and raw attributes which allow one to use it to store and retrieve strings – see documentation for ctypes.

multiprocessing.sharedctypes.Array(typecode_or_type, size_or_initializer, *, lock=True)¶

The same as RawArray() except that depending on the value of lock a process-safe synchronization wrapper may be returned instead of a raw ctypes array.

If lock is True (the default) then a new lock object is created to synchronize access to the value. If lock is a Lock or RLock object then that will be used to synchronize access to the value. If lock is False then access to the returned object will not be automatically protected by a lock, so it will not necessarily be “process-safe”.

Note that lock is a keyword-only argument.

multiprocessing.sharedctypes.Value(typecode_or_type, *args, lock=True)¶

The same as RawValue() except that depending on the value of lock a process-safe synchronization wrapper may be returned instead of a raw ctypes object.

If lock is True (the default) then a new lock object is created to synchronize access to the value. If lock is a Lock or RLock object then that will be used to synchronize access to the value. If lock is False then access to the returned object will not be automatically protected by a lock, so it will not necessarily be “process-safe”.

Note that lock is a keyword-only argument.

multiprocessing.sharedctypes.copy(obj)¶

Return a ctypes object allocated from shared memory which is a copy of the ctypes object obj.

multiprocessing.sharedctypes.synchronized(obj[, lock])¶

Return a process-safe wrapper object for a ctypes object which uses lock to synchronize access. If lock is None (the default) then a multiprocessing.RLock object is created automatically.

A synchronized wrapper will have two methods in addition to those of the object it wraps: get_obj() returns the wrapped object and get_lock() returns the lock object used for synchronization.

Note that accessing the ctypes object through the wrapper can be a lot slower than accessing the raw ctypes object.
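A brief sketch of wrapping a raw ctypes object by hand (the value 5 and the increment are arbitrary):

```python
from multiprocessing.sharedctypes import RawValue, synchronized

raw = RawValue('i', 5)        # no lock, not process-safe on its own
wrapped = synchronized(raw)   # adds an RLock-based synchronization wrapper

# The two extra methods on the wrapper:
assert wrapped.get_obj() is raw
with wrapped.get_lock():      # hold the lock while doing a read-modify-write
    wrapped.value += 1

print(wrapped.value)  # 6
```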

Changed in version 3.5: Synchronized objects support the context manager protocol.

The table below compares the syntax for creating shared ctypes objects from shared memory with the normal ctypes syntax. (In the table MyStruct is some subclass of ctypes.Structure.)

ctypes                  sharedctypes using type       sharedctypes using typecode
c_double(2.4)           RawValue(c_double, 2.4)       RawValue('d', 2.4)
MyStruct(4, 6)          RawValue(MyStruct, 4, 6)
(c_short * 7)()         RawArray(c_short, 7)          RawArray('h', 7)
(c_int * 3)(9, 2, 8)    RawArray(c_int, (9, 2, 8))    RawArray('i', (9, 2, 8))

Below is an example where a number of ctypes objects are modified by a child process:

from multiprocessing import Process, Lock
from multiprocessing.sharedctypes import Value, Array
from ctypes import Structure, c_double

class Point(Structure):
    _fields_ = [('x', c_double), ('y', c_double)]

def modify(n, x, s, A):
    n.value **= 2
    x.value **= 2
    s.value = s.value.upper()
    for a in A:
        a.x **= 2
        a.y **= 2

if __name__ == '__main__':
    lock = Lock()

    n = Value('i', 7)
    x = Value(c_double, 1.0/3.0, lock=False)
    s = Array('c', b'hello world', lock=lock)
    A = Array(Point, [(1.875, -6.25), (-5.75, 2.0), (2.375, 9.5)], lock=lock)

    p = Process(target=modify, args=(n, x, s, A))
    p.start()
    p.join()

    print(n.value)
    print(x.value)
    print(s.value)
    print([(a.x, a.y) for a in A])

The results printed are

49
0.1111111111111111
HELLO WORLD
[(3.515625, 39.0625), (33.0625, 4.0), (5.640625, 90.25)]

Managers¶

Managers provide a way to create data which can be shared between different processes, including sharing over a network between processes running on different machines. A manager object controls a server process which manages shared objects. Other processes can access the shared objects by using proxies.

multiprocessing.Manager()¶

Returns a started SyncManager object which can be used for sharing objects between processes. The returned manager object corresponds to a spawned child process and has methods which will create shared objects and return corresponding proxies.

Manager processes will be shut down as soon as they are garbage collected or their parent process exits. The manager classes are defined in the multiprocessing.managers module:

class multiprocessing.managers.BaseManager([address[, authkey]])¶

Create a BaseManager object.

Once created one should call start() or get_server().serve_forever() to ensure that the manager object refers to a started manager process.

address is the address on which the manager process listens for new connections. If address is None then an arbitrary one is chosen.

authkey is the authentication key which will be used to check the validity of incoming connections to the server process. If authkey is None then current_process().authkey is used. Otherwise authkey is used and it must be a byte string.

start([initializer[, initargs]])¶

Start a subprocess to start the manager. If initializer is not None then the subprocess will call initializer(*initargs) when it starts.

get_server()¶

Returns a Server object which represents the actual server under the control of the Manager. The Server object supports the serve_forever() method:

>>> from multiprocessing.managers import BaseManager
>>> manager = BaseManager(address=('', 50000), authkey=b'abc')
>>> server = manager.get_server()
>>> server.serve_forever()

Server additionally has an address attribute.

connect()¶

Connect a local manager object to a remote manager process:

>>> from multiprocessing.managers import BaseManager
>>> m = BaseManager(address=('127.0.0.1', 50000), authkey=b'abc')
>>> m.connect()

shutdown()¶

Stop the process used by the manager. This is only available if start() has been used to start the server process.

This can be called multiple times.

register(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])¶
Source: https://docs.python.org/3/library/multiprocessing.html

Python Queue.get() Examples

The following are code examples showing how to use Queue.get(). These examples are extracted from open source projects.

You may also want to check out all available functions and classes of the module Queue.

Example 1

def make_web(queue):
    app = Flask(__name__)

    @app.route('/')
    def index():
        return render_template('index.html')

    def gen():
        while True:
            frame = queue.get()
            _, frame = cv2.imencode('.JPEG', frame)
            yield (b'--frame\r\n'
                   b'Content-Type: image/jpeg\r\n\r\n' + frame.tostring() + b'\r\n')

    @app.route('/video_feed')
    def video_feed():
        return Response(gen(),
                        mimetype='multipart/x-mixed-replace; boundary=frame')

    try:
        app.run(host='0.0.0.0', port=8889)
    except:
        print('unable to open port')

Example 3

def get(self, poll_interval=5):
    while True:
        try:
            # Using Queue.get() with a timeout is really expensive - Python uses
            # busy waiting that wakes up the process every 50ms - so we switch
            # to a more efficient polling method if there is no activity for
            # <fast_poll_time> seconds.
            if time.time() - self.last_item_time < self.fast_poll_time:
                message = Queue.Queue.get(self, block=True, timeout=poll_interval)
            else:
                time.sleep(poll_interval)
                message = Queue.Queue.get(self, block=False)
            break
        except Queue.Empty:
            self.callback()
    self.last_item_time = time.time()
    return message

Example 4

def worker(queue, user, size, outdir, total):
    while True:
        try:
            photo = queue.get(False)
        except Queue.Empty:
            break
        media_url = photo[1]
        urllib3_download(media_url, size, outdir)
        with lock:
            global downloaded
            downloaded += 1
            d = {
                'media_url': os.path.basename(media_url),
                'user': user,
                'index': downloaded + 1 if downloaded < total else total,
                'total': total,
            }
            progress = PROGRESS_FORMATTER % d
            sys.stdout.write('\r%s' % progress)
            sys.stdout.flush()

Example 5

def _worker_manager_loop(in_queue, out_queue, done_event, pin_memory, device_id):
    if pin_memory:
        torch.cuda.set_device(device_id)
    while True:
        try:
            r = in_queue.get()
        except Exception:
            if done_event.is_set():
                return
            raise
        if r is None:
            break
        if isinstance(r[1], ExceptionWrapper):
            out_queue.put(r)
            continue
        idx, batch = r
        try:
            if pin_memory:
                batch = pin_memory_batch(batch)
        except Exception:
            out_queue.put((idx, ExceptionWrapper(sys.exc_info())))
        else:
            out_queue.put((idx, batch))

Example 6

def _set_SIGCHLD_handler():
    # Windows doesn't support SIGCHLD handler
    if sys.platform == 'win32':
        return
    # can't set signal in child threads
    if not isinstance(threading.current_thread(), threading._MainThread):
        return
    global _SIGCHLD_handler_set
    if _SIGCHLD_handler_set:
        return
    previous_handler = signal.getsignal(signal.SIGCHLD)
    if not callable(previous_handler):
        previous_handler = None

    def handler(signum, frame):
        # This following call uses `waitid` with WNOHANG from C side. Therefore,
        # Python can still get and update the process status successfully.
        _error_if_any_worker_fails()
        if previous_handler is not None:
            previous_handler(signum, frame)

    signal.signal(signal.SIGCHLD, handler)
    _SIGCHLD_handler_set = True

Example 8

def act(self, action):
    if self.nthreads > 1:
        new = self.pool.map(env_step, zip(self.env, action))
    else:
        new = [env.step(act) for env, act in zip(self.env, action)]
    reward = np.asarray([i[1] for i in new], dtype=np.float32)
    done = np.asarray([i[2] for i in new], dtype=np.float32)
    channels = self.state_.shape[1] // self.input_length
    state = np.zeros_like(self.state_)
    state[:, :-channels, :, :] = self.state_[:, channels:, :, :]
    for i, (ob, env) in enumerate(zip(new, self.env)):
        if ob[2]:
            state[i, -channels:, :, :] = env.reset().transpose((2, 0, 1))
        else:
            state[i, -channels:, :, :] = ob[0].transpose((2, 0, 1))
    self.state_ = state
    if self.web_viz:
        try:
            while self.queue.qsize() > 10:
                self.queue.get(False)
        except queue.Empty:
            pass
        frame = self.visual()
        self.queue.put(frame)
    return reward, done

Example 9

def get_msg(self, block=True, timeout=None):
    """Gets a message if there is one that is ready."""
    if timeout is None:
        # Queue.get(timeout=None) has stupid uninterruptible
        # behavior, so wait for a week instead
        timeout = 604800
    return self._in_queue.get(block, timeout)

Example 11

def download_worker():
    while True:
        url = queue.get()
        download_file(url, SAVE_DIR)
        queue.task_done()

# Returns the path of the specified page number

Example 12

def worker(sess, model_options, model_vars, Queue, CLASS_DICT):
    while True:
        # print 'Queue Size', Queue.qsize()
        try:
            fname = Queue.get()
        except:
            return
        start = time.time()
        file_name_orig = fname.split(' ')[0].split('/')[1].strip()
        file_name = file_name_orig.replace('.avi', '.npz')
        class_name = fname.split(' ')[0].split('/')[0].strip().lower()
        class_idx = CLASS_DICT[class_name]
        try:
            frames = np.load(model_options['data_dir'] + file_name)['arr_0']
        except:
            print "Couldn't Open: ", model_options['data_dir'] + file_name
            Queue.task_done()
            continue
        idx = 0
        if model_options['mode'] == 'train':
            idx = random.randint(0, frames.shape[0] - 1)
            frames = frames[idx]
        tmpImg, tmpLab, num_crops = getCrops(sess, model_options, model_vars,
                                             frames, np.array((class_idx)))
        if model_options['mode'] == 'train':
            for j in range(num_crops):
                size = model_options['example_size']
                sess.run(model_vars['enqueue_op'],
                         feed_dict={model_vars['images']: tmpImg[j*size:(j+1)*size],
                                    model_vars['labels']: tmpLab[j:(j+1)]})
        else:
            sess.run(model_vars['enqueue_op'],
                     feed_dict={model_vars['images']: tmpImg,
                                model_vars['labels']: tmpLab,
                                model_vars['names']: [[file_name_orig]] * num_crops})
        Queue.task_done()

Example 13

def get(self, **kwargs):
    """Get an item from the queue.

    Kwargs are ignored (often used in standard library queue.get calls)"""
    msg = self.queue.get(acknowledge=False)
    if msg is None:
        raise Empty
    return pickle.loads(msg.body)

Example 14

def multiprocess_configuration(n_cpus, pax_id, base_config_kwargs,
                               processing_queue_kwargs, output_queue_kwargs):
    """Yields configuration override dicts for multiprocessing"""
    # Config overrides for child processes
    common_override = dict(pax=dict(autorun=True, show_progress_bar=False),
                           DEFAULT=dict(pax_id=pax_id))

    input_override = dict(pax=dict(plugin_group_names=['input', 'output'],
                                   encoder_plugin=None,
                                   decoder_plugin=None,
                                   output='Queues.PushToQueue'),
                          Queues=dict(**processing_queue_kwargs))

    worker_override = {'pax': dict(input='Queues.PullFromQueue',
                                   output='Queues.PushToQueue',
                                   event_numbers_file=None,
                                   events_to_process=None),
                       # PullFromQueue can't have a timeout in the workers, see #444
                       'Queues.PullFromQueue': dict(timeout_after_sec=float('inf'),
                                                    **processing_queue_kwargs),
                       'Queues.PushToQueue': dict(preserve_ids=True,
                                                  many_to_one=True,
                                                  **output_queue_kwargs)}

    output_override = dict(pax=dict(plugin_group_names=['input', 'output'],
                                    encoder_plugin=None,
                                    decoder_plugin=None,
                                    event_numbers_file=None,
                                    events_to_process=None,
                                    input='Queues.PullFromQueue'),
                           Queues=dict(ordered_pull=True, **output_queue_kwargs))

    overrides = [('input', input_override)] + \
                [('worker', worker_override)] * n_cpus + \
                [('output', output_override)]

    for worker_type, worker_overide in overrides:
        new_conf = deepcopy(base_config_kwargs)
        new_conf['config_dict'] = combine_configs(new_conf.get('config_dict'),
                                                  common_override,
                                                  worker_overide)
        yield worker_type, new_conf

Example 15

def check_local_processes_while_remote_processing(running_paxes, crash_fanout,
                                                  terminate_host_on_crash=False):
    """Check on locally running paxes in running_paxes, returns list of remaining running pax processes.
    - Remove any paxes that have exited normally
    - If a pax has crashed, push a message to the crash fanout to terminate all paxes with the same id
    - Look for crash fanout messages from other processes, and terminate local paxes with the same id
    - terminate_host_on_crash: if True, raise exception in the host process if a pax crash is detected
      in a pax chain we're participating in. Do NOT use in a host process that can host multiple pax
      chains! We will not check the presence of other pax chains and terminate them too!
    """
    p_by_status = group_by_status(running_paxes)
    running_paxes = p_by_status['running']

    # If any of our own paxes crashed, send a message to the crash fanout
    # This will inform everyone connected to the server (including ourselves, on the next iteration)
    for crashed_w in p_by_status['crashed']:
        pax_id = crashed_w.pax_id
        exctype, traceb = get_exception_from_process(p_by_status['crashed'][0])
        print("Pax %s crashed!\nDumping exception traceback:\n\n%s\n\nNotifying crash fanout." % (
            pax_id, format_exception_dump(traceb)))
        crash_fanout.put((pax_id, exctype, traceb))
        running_paxes, _ = terminate_paxes_with_id(running_paxes, pax_id)
        if terminate_host_on_crash:
            raise exctype("Pax %s crashed! Traceback:\n %s" % (pax_id, format_exception_dump(traceb)))

    # If any of the remote paxes crashed, we will learn about it from the crash fanout.
    try:
        pax_id, exctype, traceb = crash_fanout.get()
        print("Remote crash notification for pax %s.\n"
              "Remote exception traceback dump:\n\n%s\n.Terminating paxes with id %s." % (
                  pax_id, format_exception_dump(traceb), pax_id))
        running_paxes, n_terminated = terminate_paxes_with_id(running_paxes, pax_id)
        if n_terminated > 0 and terminate_host_on_crash:
            raise exctype("Pax %s crashed! Traceback:\n %s" % (pax_id, format_exception_dump(traceb)))
    except Empty:
        pass
    return running_paxes

Example 16

def get_exception_from_process(p):
    crdict = p.shared_dict
    try:
        exc_type = eval(crdict.get('exception_type', 'UnknownPropagatedException'),
                        exceptions.__dict__)
    except NameError:
        exc_type = exceptions.UnknownPropagatedException
    traceb = crdict.get('traceback', 'No traceback reported')
    return exc_type, traceb

Example 17

def run(self):
    while self.alive.isSet():
        try:
            # Queue.get with timeout to allow checking self.alive
            cmd = self.cmd_q.get(True, 0.1)
            self.handlers[cmd.type](cmd)
        except Queue.Empty as e:
            continue

Example 18

def _reduction_thread_fn(queue, group_id, device_ids, reduction_streams, nccl_streams):

    def _process_batch():
        dev_grad_batch, dev_events, job_event = queue.get()
        dev_coalesced = []
        # Coalesce the tensors on all devices and start a local reduction
        for dev_id, grad_batch, event, stream in zip(device_ids, dev_grad_batch,
                                                     dev_events, reduction_streams):
            with torch.cuda.device(dev_id), torch.cuda.stream(stream):
                stream.wait_event(event)
                coalesced = _flatten_tensors(grad_batch)
                dev_coalesced.append(coalesced)
        # Wait for all copies to complete before starting the NCCL kernel
        for stream in reduction_streams:
            stream.synchronize()
        nccl.reduce(dev_coalesced, root=0, streams=nccl_streams)

        # From now on we're only going to work on the first device (from device_ids)
        grad_batch = dev_grad_batch[0]
        coalesced = dev_coalesced[0]
        reduce_stream = reduction_streams[0]
        with torch.cuda.stream(reduce_stream):
            reduce_stream.wait_stream(nccl_streams[0])
            coalesced /= dist.get_world_size()
            dist.all_reduce(coalesced, group=group_id)
            for grad, reduced in zip(grad_batch, _unflatten_tensors(coalesced, grad_batch)):
                grad.copy_(reduced)
        job_event.set()

    with torch.cuda.device(device_ids[0]):
        while True:
            _process_batch()  # just to have a clear scope

Example 19

def Pop(self, key):
    """Remove the object from the cache completely."""
    node = self._hash.get(key)
    if node:
        self._age.Unlink(node)
        return node.data

Example 20

def __setstate__(self, state):
    self.__init__(max_size=state.get("max_size", 10))

Example 21

def _worker_loop(dataset, index_queue, data_queue, collate_fn, init_fn, worker_id):
    global _use_shared_memory
    _use_shared_memory = True

    # Intialize C side signal handlers for SIGBUS and SIGSEGV. Python signal
    # module's handlers are executed after Python returns from C low-level
    # handlers, likely when the same fatal signal happened again already.
    # https://docs.python.org/3/library/signal.html Sec. 18.8.1.1
    _set_worker_signal_handlers()

    torch.set_num_threads(1)
    if init_fn is not None:
        init_fn(worker_id)

    watchdog = ManagerWatchdog()

    while True:
        try:
            r = index_queue.get(timeout=MANAGER_STATUS_CHECK_INTERVAL)
        except queue.Empty:
            if watchdog.is_alive():
                continue
            else:
                break
        if r is None:
            break
        idx, batch_indices = r
        try:
            samples = collate_fn([dataset[i] for i in batch_indices])
        except Exception:
            data_queue.put((idx, ExceptionWrapper(sys.exc_info())))
        else:
            data_queue.put((idx, samples))
            del samples

Example 22

def _get_batch(self):
    if self.timeout > 0:
        try:
            return self.data_queue.get(timeout=self.timeout)
        except queue.Empty:
            raise RuntimeError('DataLoader timed out after {} seconds'.format(self.timeout))
    else:
        return self.data_queue.get()

Example 23

def _shutdown_workers(self):
    try:
        if not self.shutdown:
            self.shutdown = True
            self.done_event.set()
            for q in self.index_queues:
                q.put(None)
            # if some workers are waiting to put, make place for them
            try:
                while not self.worker_result_queue.empty():
                    self.worker_result_queue.get()
            except (FileNotFoundError, ImportError):
                # Many weird errors can happen here due to Python
                # shutting down. These are more like obscure Python bugs.
                # FileNotFoundError can happen when we rebuild the fd
                # fetched from the queue but the socket is already closed
                # from the worker side.
                # ImportError can happen when the unpickler loads the
                # resource from `get`.
                pass
            # done_event should be sufficient to exit worker_manager_thread,
            # but be safe here and put another None
            self.worker_result_queue.put(None)
    finally:
        # removes pids no matter what
        if self.worker_pids_set:
            _remove_worker_pids(id(self))
            self.worker_pids_set = False

Example 25

def get_weight(self, which='last', include_baseline=False):
    """
    Gets start and stop weights.

    TODO: add ability to get weights by session number, dates, and ranges.

    Args:
        which (str): if 'last', gets most recent weights. Otherwise returns all weights.
        include_baseline (bool): if True, includes baseline and minimum mass.

    Returns:
        dict
    """
    # get either the last start/stop weights, optionally including baseline
    # TODO: Get by session
    weights = {}
    h5f = self.open_hdf()
    weight_table = h5f.root.history.weights
    if which == 'last':
        for column in weight_table.colnames:
            try:
                weights[column] = weight_table.read(-1, field=column)[0]
            except IndexError:
                weights[column] = None
    else:
        for column in weight_table.colnames:
            try:
                weights[column] = weight_table.read(field=column)
            except IndexError:
                weights[column] = None
    if include_baseline is True:
        try:
            baseline = float(h5f.root.info._v_attrs['baseline_mass'])
        except KeyError:
            baseline = 0.0
        minimum = baseline * 0.8
        weights['baseline_mass'] = baseline
        weights['minimum_mass'] = minimum
    self.close_hdf(h5f)
    return weights
Source: https://www.programcreek.com/python/example/4219/Queue.get


Queue in Python

Like a stack, a queue is a linear data structure that stores items in First In First Out (FIFO) order: the least recently added item is removed first. A good example of a queue is any queue of consumers for a resource where the consumer that came first is served first.
 

Queue in Python


Operations associated with queue are: 
 

  • Enqueue: Adds an item to the queue. If the queue is full, then it is said to be an Overflow condition – Time Complexity : O(1)
  • Dequeue: Removes an item from the queue. The items are popped in the same order in which they are pushed. If the queue is empty, then it is said to be an Underflow condition – Time Complexity : O(1)
  • Front: Get the front item from queue – Time Complexity : O(1)
  • Rear: Get the last item from queue – Time Complexity : O(1)

 

Implementation

There are various ways to implement a queue in Python. This article covers the implementation of queue using data structures and modules from Python library.
Queue in Python can be implemented by the following ways:
 

  • list
  • collections.deque
  • queue.Queue

 

Implementation using list

List is a built-in Python data structure that can be used as a queue. Instead of enqueue() and dequeue(), the append() and pop() functions are used. However, lists are quite slow for this purpose because inserting or deleting an element at the beginning requires shifting all of the other elements by one, requiring O(n) time.
 

Python3
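A minimal sketch of this approach, consistent with the output shown under Output below:

```python
# Initializing a queue (a plain list)
queue = []

# Adding elements to the queue
queue.append('a')
queue.append('b')
queue.append('c')

print("Initial queue")
print(queue)

# Removing elements from the queue: pop(0) removes the front item
print("\nElements dequeued from queue")
print(queue.pop(0))
print(queue.pop(0))
print(queue.pop(0))

print("\nQueue after removing elements")
print(queue)

# Calling queue.pop(0) again here would raise an IndexError,
# because the queue is now empty
```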

 

 

 

 

 

 

Output: 
 

Initial queue
['a', 'b', 'c']

Elements dequeued from queue
a
b
c

Queue after removing elements
[]

 

Traceback (most recent call last):
  File "/home/ef51acf025182ccd69d906e58f17b6de.py", line 25, in <module>
    print(queue.pop(0))
IndexError: pop from empty list

 

Implementation using collections.deque

Queue in Python can be implemented using the deque class from the collections module. Deque is preferred over list in the cases where we need quicker append and pop operations from both ends of the container, as deque provides O(1) time complexity for append and pop operations compared to list, which provides O(n) time complexity. Instead of enqueue and dequeue, the append() and popleft() functions are used.
 

Python3
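A minimal sketch of this approach, consistent with the output shown under Output below:

```python
from collections import deque

# Initializing a queue
q = deque()

# Adding elements to the queue
q.append('a')
q.append('b')
q.append('c')

print("Initial queue")
print(q)

# Removing elements from the queue: popleft() removes the front item
print("\nElements dequeued from the queue")
print(q.popleft())
print(q.popleft())
print(q.popleft())

print("\nQueue after removing elements")
print(q)

# Calling q.popleft() again here would raise an IndexError,
# because the queue is now empty
```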

 

 

 

 

 

 

 

 

Output: 
 

Initial queue
deque(['a', 'b', 'c'])

Elements dequeued from the queue
a
b
c

Queue after removing elements
deque([])

 

Traceback (most recent call last):
  File "/home/b2fa8ce438c2a9f82d6c3e5da587490f.py", line 23, in <module>
    q.popleft()
IndexError: pop from an empty deque

 

Implementation using queue.Queue

queue is a built-in module of Python which is used to implement a queue. queue.Queue(maxsize) initializes a queue with a maximum size of maxsize. A maxsize of zero ‘0’ means an infinite queue. This Queue follows the FIFO rule.
There are various functions available in this module: 
 

  • maxsize – Number of items allowed in the queue.
  • empty() – Return True if the queue is empty, False otherwise.
  • full() – Return True if there are maxsize items in the queue. If the queue was initialized with maxsize=0 (the default), then full() never returns True.
  • get() – Remove and return an item from the queue. If queue is empty, wait until an item is available.
  • get_nowait() – Return an item if one is immediately available, else raise QueueEmpty.
  • put(item) – Put an item into the queue. If the queue is full, wait until a free slot is available before adding the item.
  • put_nowait(item) – Put an item into the queue without blocking. If no free slot is immediately available, raise QueueFull.
  • qsize() – Return the number of items in the queue.

 

Python3
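A minimal sketch using the functions listed above, consistent with the output shown under Output below:

```python
from queue import Queue

# Initializing a queue with a maximum size of 3
q = Queue(maxsize=3)

# qsize() gives the number of items currently in the queue
print(q.qsize())

# Adding elements to the queue
q.put('a')
q.put('b')
q.put('c')

# Return True if the queue is full, False otherwise
print("\nFull:", q.full())

# Removing elements from the queue
print("\nElements dequeued from the queue")
print(q.get())
print(q.get())
print(q.get())

# Return True if the queue is empty, False otherwise
print("\nEmpty:", q.empty())

q.put(1)
print("\nEmpty:", q.empty())
print("Full:", q.full())
```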

 

 

 

 

 

 

 

 

 

 

Output: 
 

0

Full: True

Elements dequeued from the queue
a
b
c

Empty: True

Empty: False
Full: False

 


Source: https://www.geeksforgeeks.org/queue-in-python/

What is Python Queue?

A queue is a container that holds data. The data that is entered first will be removed first; hence a queue is also called “First in First Out” (FIFO). A queue has two ends, front and rear. Items are entered from the rear and removed from the front.


How does Python Queue Work?

A queue can easily be compared with a real-world example: the line of people waiting at a ticket counter. The person standing first gets the ticket first, followed by the next person, and so on. The same logic applies to the queue data structure.

Here is a diagrammatic representation of a queue:

The Rear represents the point where items are inserted into the queue. In this example, 7 is the most recently inserted value.

The Front represents the point where items are removed from the queue. If you remove an item from the queue, the first element you will get is 1, as shown in the figure.

Item 1 was the first to be inserted into the queue, and when removing items it is the first to come out. Hence the queue is called First In First Out (FIFO).

In a queue, items are removed in order and cannot be removed from the middle. You cannot remove item 5 at random; to do that, you would first have to remove all the items before 5. Items in a queue are removed in the order they were inserted.
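The FIFO behaviour described above can be sketched with the standard queue module (a minimal illustration, not part of the original tutorial):

```python
import queue

q = queue.Queue()

# Items enter at the rear in this order...
for item in [1, 2, 3, 4, 5]:
    q.put(item)

# ...and leave from the front in the same order.
while not q.empty():
    print(q.get())   # prints 1, 2, 3, 4, 5 on separate lines
```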

Types of Queue in Python

There are mainly two types of queue in Python:

  • First in First out Queue: For this, the element that goes first will be the first to come out.

    To work with FIFO, you have to call Queue() class from queue module.

  • Last in First out Queue: Over here, the element that is entered last will be the first to come out.

    To work with LIFO, you have to call LifoQueue() class from the queue module.
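The difference between the two queue types can be seen side by side (a small sketch using only the standard queue module; the variable names are illustrative):

```python
import queue

fifo = queue.Queue()
lifo = queue.LifoQueue()

# Insert the same items into both queues.
for item in ['a', 'b', 'c']:
    fifo.put(item)
    lifo.put(item)

# FIFO returns items in insertion order; LIFO in reverse order.
print([fifo.get() for _ in range(3)])   # ['a', 'b', 'c']
print([lifo.get() for _ in range(3)])   # ['c', 'b', 'a']
```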

Python queue Installation

It is very easy to work with a queue in Python. Here are the steps to follow to make use of a queue in your code.

Step 1) You just have to import the queue module, as shown below:

import queue

The module is available by default with Python; you don't need any additional installation to start working with queues. There are two types of queue: FIFO (first in, first out) and LIFO (last in, first out).

Step 2) To work with a FIFO queue, call the Queue class from the imported queue module, as shown below:

import queue
q1 = queue.Queue()

Step 3) To work with a LIFO queue, call the LifoQueue() class as shown below:

import queue
q1 = queue.LifoQueue()

Methods available inside Queue and LifoQueue class

Following are the important methods available inside Queue and LifoQueue class:

  • put(item): This will put the item inside the queue.
  • get(): This will remove and return an item from the queue.
  • empty(): Returns True if the queue is empty and False if items are present.
  • qsize(): Returns the number of items in the queue.
  • full(): Returns True if the queue is full, otherwise False.

First In First Out Queue Example

In the case of first in first out, the element that goes first will be the first to come out.

Add an item to a queue

Let us work on an example to add an item to a queue. To start working with the queue, first import the queue module, as shown in the example below.

To add an item, you can make use of the put() method as shown in the example:

import queue
q1 = queue.Queue()
q1.put(10)  # this will add item 10 to the queue

By default, the size of the queue is infinite, and you can add any number of items to it. If you want to limit the size of the queue, it can be done as follows:

import queue
q1 = queue.Queue(5)  # the max size is 5
q1.put(1)
q1.put(2)
q1.put(3)
q1.put(4)
q1.put(5)
print(q1.full())  # will return True

Output:

True

Now the size of the queue is 5, and it will not take more than 5 items; the method q1.full() returns True. A further call to put() will block until a consumer removes an item from the queue.
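If blocking on a full queue is not what you want, put_nowait() raises queue.Full instead of waiting (a small sketch, not from the original tutorial):

```python
import queue

q1 = queue.Queue(5)          # bounded queue with capacity 5
for i in range(5):
    q1.put(i)                # fill it to capacity

try:
    q1.put_nowait(99)        # queue is full: raises immediately instead of blocking
except queue.Full:
    print("Queue is full; item was not added")
```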

Remove an item from the queue

To remove an item from the queue, you can use the method called get(). This method removes and returns an item from the queue when called.

The following example shows how to remove an item from the queue.

import queue
q1 = queue.Queue()
q1.put(10)
item1 = q1.get()
print('The item removed from the queue is', item1)

Output:

The item removed from the queue is 10

Last In First Out queue Example

In the case of a last in, first out queue, the element that is entered last will be the first to come out.

To work with LIFO, i.e., a last in, first out queue, we need to import the queue module and make use of the LifoQueue() class.

Add an item to the LIFO queue

Here we will understand how to add an item to the LIFO queue.

import queue
q1 = queue.LifoQueue()
q1.put(10)

You have to use the put() method on LifoQueue, as shown in the above example.

Remove an item from the queue

To remove an item from the LIFO queue, you can make use of the get() method.

import queue
q1 = queue.LifoQueue()
q1.put(10)
item1 = q1.get()
print('The item removed from the LIFO queue is', item1)

Output:

The item removed from the LIFO queue is 10

Add more than 1 item in a Queue

In the above examples, we have seen how to add and remove a single item for the FIFO and LIFO queues. Now we will see how to add more than one item and also remove them.

Add items to a FIFO queue

import queue
q1 = queue.Queue()
for i in range(20):
    q1.put(i)  # this will add items 0 to 19 to the queue

Remove items from the FIFO queue

import queue
q1 = queue.Queue()
for i in range(20):
    q1.put(i)  # this will add items 0 to 19 to the queue
while not q1.empty():
    print("The value is", q1.get())  # get() will remove the item from the queue

Output:

The value is 0
The value is 1
The value is 2
The value is 3
The value is 4
The value is 5
The value is 6
The value is 7
The value is 8
The value is 9
The value is 10
The value is 11
The value is 12
The value is 13
The value is 14
The value is 15
The value is 16
The value is 17
The value is 18
The value is 19

Add items to a LIFO queue

import queue
q1 = queue.LifoQueue()
for i in range(20):
    q1.put(i)  # this will add items 0 to 19 to the queue

Remove items from the LIFO queue

import queue
q1 = queue.LifoQueue()
for i in range(20):
    q1.put(i)  # this will add items 0 to 19 to the queue
while not q1.empty():
    print("The value is", q1.get())  # get() will remove the item from the queue

Output:

The value is 19
The value is 18
The value is 17
The value is 16
The value is 15
The value is 14
The value is 13
The value is 12
The value is 11
The value is 10
The value is 9
The value is 8
The value is 7
The value is 6
The value is 5
The value is 4
The value is 3
The value is 2
The value is 1
The value is 0

Sorting Queue

The following example shows queue sorting. The algorithm used for sorting is bubble sort.

import queue

q1 = queue.Queue()

# Adding items to the queue
q1.put(11)
q1.put(5)
q1.put(4)
q1.put(21)
q1.put(3)
q1.put(10)

# Using bubble sort on the queue
n = q1.qsize()
for i in range(n):
    x = q1.get()  # the element is removed
    for j in range(n - 1):
        y = q1.get()  # the next element is removed
        if x > y:
            q1.put(y)  # the smaller one is put back at the rear of the queue
        else:
            q1.put(x)  # the smaller one is put back at the rear of the queue
            x = y      # the greater one is kept in x and compared with the next element
    q1.put(x)

while not q1.empty():
    print(q1.queue[0], end=" ")
    q1.get()

Output:

3 4 5 10 11 21

Reversing Queue

To reverse a queue, you can make use of another queue and recursion.

The following example shows how to get the queue reversed.

Example:

import queue

q1 = queue.Queue()
q1.put(11)
q1.put(5)
q1.put(4)
q1.put(21)
q1.put(3)
q1.put(10)

def reverseQueue(q1src, q2dest):
    buffer = q1src.get()
    if q1src.empty() == False:
        reverseQueue(q1src, q2dest)  # using recursion
    q2dest.put(buffer)
    return q2dest

q2dest = queue.Queue()
qReversed = reverseQueue(q1, q2dest)

while not qReversed.empty():
    print(qReversed.queue[0], end=" ")
    qReversed.get()

Output:

10 3 21 4 5 11

Summary:

  • A queue is a container that holds data. There are two types of Queue, FIFO, and LIFO.
  • For a FIFO (First in First out Queue), the element that goes first will be the first to come out.
  • For a LIFO (Last in First out Queue), the element that is entered last will be the first to come out.
  • An item is added to a queue using the put(item) method.
  • To remove an item, the get() method is used.
Source: https://www.guru99.com/python-queue-example.html


The Put() Method Of Queue Class In Python

# Example Python program that uses the put() method
# to add elements to a queue.Queue instance
import queue
import threading
import os
import sys
import random
import time

try:
    # Function for the writer thread
    def writer(sq):
        while True:
            data = random.randint(0, 9999)
            time.sleep(1)
            sq.put(data)
            print("writer:added %d" % data)

    # Function for the reader thread
    def reader(sq):
        while True:
            data = sq.get()
            time.sleep(1)
            print("reader:removed %d" % data)

    # Create a synchronized queue instance
    sharedQueue = queue.Queue()

    # Create reader and writer threads
    threads = [None, None]
    threads[0] = threading.Thread(target=reader, args=(sharedQueue,))
    threads[1] = threading.Thread(target=writer, args=(sharedQueue,))

    # Start the reader and writer threads
    for thread in threads:
        thread.start()

    # Wait for the reader and writer threads to exit
    for thread in threads:
        thread.join()

except KeyboardInterrupt:
    print('Keyboard interrupt received from user')
    try:
        sys.exit(0)
    except:
        os._exit(0)

Source: https://pythontic.com/queue-module/queue-class/put
