Update upstream source from tag 'upstream/1.3.0'
Update to upstream version '1.3.0' with Debian dir 89a3a88e3e22cd000da384c119baaf4436680466
Diane Trout, 1 year, 3 months ago
21 changed files with 318 additions and 367 deletions.
+134
-135
PKG-INFO
00 Metadata-Version: 2.1
11 Name: partd
2 Version: 1.2.0
2 Version: 1.3.0
33 Summary: Appendable key-value storage
44 Home-page: http://github.com/dask/partd/
55 Maintainer: Matthew Rocklin
66 Maintainer-email: mrocklin@gmail.com
77 License: BSD
8 Description: PartD
9 =====
10
11 |Build Status| |Version Status|
12
13 Key-value byte store with appendable values
14
15 Partd stores key-value pairs.
16 Values are raw bytes.
17 We append onto old values.
18
19 Partd excels at shuffling operations.
20
21 Operations
22 ----------
23
24 PartD has two main operations, ``append`` and ``get``.
25
26
27 Example
28 -------
29
30 1. Create a Partd backed by a directory::
31
32 >>> import partd
33 >>> p = partd.File('/path/to/new/dataset/')
34
35 2. Append key-byte pairs to dataset::
36
37 >>> p.append({'x': b'Hello ', 'y': b'123'})
38 >>> p.append({'x': b'world!', 'y': b'456'})
39
40 3. Get bytes associated with keys::
41
42 >>> p.get('x') # One key
43 b'Hello world!'
44
45 >>> p.get(['y', 'x']) # List of keys
46 [b'123456', b'Hello world!']
47
48 4. Destroy partd dataset::
49
50 >>> p.drop()
51
52 That's it.
53
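As noted in the introduction, this append-on-write model is what makes partd
well suited to shuffling: records are bucketed by destination partition and
appended as they arrive, then each bucket is read back with a single ``get``.
A toy sketch of that pattern (the record layout here is purely illustrative)::

>>> import partd
>>> records = [(0, b'a'), (1, b'b'), (0, b'c'), (1, b'd')]
>>> p = partd.Dict()
>>> for bucket, payload in records:
...     p.append({bucket: payload})
>>> p.get([0, 1])
[b'ac', b'bd']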
54
55 Implementations
56 ---------------
57
58 We can back a partd by an in-memory dictionary::
59
60 >>> p = Dict()
61
62 For larger amounts of data, or to share data between processes, we back a partd
63 by a directory of files. This uses file-based locks for consistency::
64
65 >>> p = File('/path/to/dataset/')
66
67 However, this can perform poorly for many small writes. In these cases you may wish to buffer one partd with another, keeping a fixed maximum amount of data in the buffering partd. This writes the larger elements of the first partd to the second partd when space runs low::
68
69 >>> p = Buffer(Dict(), File(), available_memory=2e9) # 2GB memory buffer
70
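The buffered partd still exposes the ordinary ``append``/``get`` interface;
spilling from the in-memory ``Dict`` to the ``File`` happens behind the scenes
once the memory budget is exceeded. A minimal sketch, continuing from the
object above::

>>> p.append({'x': b'Hello '})
>>> p.get('x')
b'Hello '
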
71 You might also want to have many distributed processes write to a single partd
72 consistently. This can be done with a server:
73
74 * Server Process::
75
76 >>> p = Buffer(Dict(), File(), available_memory=2e9) # 2GB memory buffer
77 >>> s = Server(p, address='ipc://server')
78
79 * Worker processes::
80
81 >>> p = Client('ipc://server') # Client machine talks to remote server
82
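Once connected, the client behaves like any other partd; appends and gets are
forwarded over the socket to the server process. A minimal sketch, assuming the
server above is running::

>>> p.append({'x': b'Hello '})
>>> p.get('x')
b'Hello '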
83
84 Encodings and Compression
85 -------------------------
86
87 Once we can robustly and efficiently append bytes to a partd we consider
88 compression and encodings. This is generally available with the ``Encode``
89 partd, which accepts three functions: one to apply to bytes as they are
90 written, one to apply to bytes as they are read, and one to join bytestreams.
91 Common configurations already exist for common data and compression formats.
92
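For example, the built-in ``ZLib`` wrapper described below is essentially an
``Encode`` configured with zlib's compress and decompress functions plus a
bytestring join. A hand-rolled sketch of the same idea (a sketch only; the
built-in wrappers are the recommended route)::

>>> import zlib
>>> from partd import Dict
>>> from partd.encode import Encode
>>> p = Encode(zlib.compress,    # applied to each value on append
...            zlib.decompress,  # applied to each stored frame on get
...            b''.join,         # joins the decoded frames back together
...            Dict())
>>> p.append({'x': b'Hello '})
>>> p.append({'x': b'world!'})
>>> p.get('x')
b'Hello world!'
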
93 We may wish to compress and decompress data transparently as we interact with a
94 partd. Objects like ``BZ2``, ``Blosc``, ``ZLib`` and ``Snappy`` exist and take
95 another partd as an argument::
96
97 >>> p = File(...)
98 >>> p = ZLib(p)
99
100 These work exactly as before; the (de)compression happens automatically.
101
102 Common data formats like Python lists, numpy arrays, and pandas
103 dataframes are also supported out of the box::
104
105 >>> p = File(...)
106 >>> p = NumPy(p)
107 >>> p.append({'x': np.array([...])})
108
109 This lets us forget about bytes and think instead in our normal data types.
110
111 Composition
112 -----------
113
114 In principle we want to compose all of these choices together:
115
116 1. Write policy: ``Dict``, ``File``, ``Buffer``, ``Client``
117 2. Encoding: ``Pickle``, ``Numpy``, ``Pandas``, ...
118 3. Compression: ``Blosc``, ``Snappy``, ...
119
120 Partd objects compose by nesting. Here we make a partd that writes pickle
121 encoded BZ2 compressed bytes directly to disk::
122
123 >>> p = Pickle(BZ2(File('foo')))
124
125 We could construct more complex systems that include compression,
126 serialization, buffering, and remote access::
127
128 >>> server = Server(Buffer(Dict(), File(), available_memory=2e0))
129
130 >>> client = Pickle(Snappy(Client(server.address)))
131 >>> client.append({'x': [1, 2, 3]})
132
133 .. |Build Status| image:: https://github.com/dask/partd/workflows/CI/badge.svg
134 :target: https://github.com/dask/partd/actions?query=workflow%3ACI
135 .. |Version Status| image:: https://img.shields.io/pypi/v/partd.svg
136 :target: https://pypi.python.org/pypi/partd/
137
138 Platform: UNKNOWN
1398 Classifier: Programming Language :: Python :: 3
140 Classifier: Programming Language :: Python :: 3.5
141 Classifier: Programming Language :: Python :: 3.6
1429 Classifier: Programming Language :: Python :: 3.7
14310 Classifier: Programming Language :: Python :: 3.8
144 Requires-Python: >=3.5
11 Classifier: Programming Language :: Python :: 3.9
12 Requires-Python: >=3.7
14513 Provides-Extra: complete
14 License-File: LICENSE.txt
15
16 PartD
17 =====
18
19 |Build Status| |Version Status|
20
21 Key-value byte store with appendable values
22
23 Partd stores key-value pairs.
24 Values are raw bytes.
25 We append onto old values.
26
27 Partd excels at shuffling operations.
28
29 Operations
30 ----------
31
32 PartD has two main operations, ``append`` and ``get``.
33
34
35 Example
36 -------
37
38 1. Create a Partd backed by a directory::
39
40 >>> import partd
41 >>> p = partd.File('/path/to/new/dataset/')
42
43 2. Append key-byte pairs to dataset::
44
45 >>> p.append({'x': b'Hello ', 'y': b'123'})
46 >>> p.append({'x': b'world!', 'y': b'456'})
47
48 3. Get bytes associated with keys::
49
50 >>> p.get('x') # One key
51 b'Hello world!'
52
53 >>> p.get(['y', 'x']) # List of keys
54 [b'123456', b'Hello world!']
55
56 4. Destroy partd dataset::
57
58 >>> p.drop()
59
60 That's it.
61
62
63 Implementations
64 ---------------
65
66 We can back a partd by an in-memory dictionary::
67
68 >>> p = Dict()
69
70 For larger amounts of data, or to share data between processes, we back a partd
71 by a directory of files. This uses file-based locks for consistency::
72
73 >>> p = File('/path/to/dataset/')
74
75 However, this can perform poorly for many small writes. In these cases you may wish to buffer one partd with another, keeping a fixed maximum amount of data in the buffering partd. This writes the larger elements of the first partd to the second partd when space runs low::
76
77 >>> p = Buffer(Dict(), File(), available_memory=2e9) # 2GB memory buffer
78
79 You might also want to have many distributed processes write to a single partd
80 consistently. This can be done with a server:
81
82 * Server Process::
83
84 >>> p = Buffer(Dict(), File(), available_memory=2e9) # 2GB memory buffer
85 >>> s = Server(p, address='ipc://server')
86
87 * Worker processes::
88
89 >>> p = Client('ipc://server') # Client machine talks to remote server
90
91
92 Encodings and Compression
93 -------------------------
94
95 Once we can robustly and efficiently append bytes to a partd we consider
96 compression and encodings. This is generally available with the ``Encode``
97 partd, which accepts three functions: one to apply to bytes as they are
98 written, one to apply to bytes as they are read, and one to join bytestreams.
99 Common configurations already exist for common data and compression formats.
100
101 We may wish to compress and decompress data transparently as we interact with a
102 partd. Objects like ``BZ2``, ``Blosc``, ``ZLib`` and ``Snappy`` exist and take
103 another partd as an argument::
104
105 >>> p = File(...)
106 >>> p = ZLib(p)
107
108 These work exactly as before; the (de)compression happens automatically.
109
110 Common data formats like Python lists, numpy arrays, and pandas
111 dataframes are also supported out of the box::
112
113 >>> p = File(...)
114 >>> p = NumPy(p)
115 >>> p.append({'x': np.array([...])})
116
117 This lets us forget about bytes and think instead in our normal data types.
118
119 Composition
120 -----------
121
122 In principle we want to compose all of these choices together:
123
124 1. Write policy: ``Dict``, ``File``, ``Buffer``, ``Client``
125 2. Encoding: ``Pickle``, ``Numpy``, ``Pandas``, ...
126 3. Compression: ``Blosc``, ``Snappy``, ...
127
128 Partd objects compose by nesting. Here we make a partd that writes pickle
129 encoded BZ2 compressed bytes directly to disk::
130
131 >>> p = Pickle(BZ2(File('foo')))
132
133 We could construct more complex systems that include compression,
134 serialization, buffering, and remote access::
135
136 >>> server = Server(Buffer(Dict(), File(), available_memory=2e0))
137
138 >>> client = Pickle(Snappy(Client(server.address)))
139 >>> client.append({'x': [1, 2, 3]})
140
141 .. |Build Status| image:: https://github.com/dask/partd/workflows/CI/badge.svg
142 :target: https://github.com/dask/partd/actions?query=workflow%3ACI
143 .. |Version Status| image:: https://img.shields.io/pypi/v/partd.svg
144 :target: https://pypi.python.org/pypi/partd/
partd/__init__.py
0 from __future__ import absolute_import
0 from contextlib import suppress
11
22 from .file import File
33 from .dict import Dict
66 from .pickle import Pickle
77 from .python import Python
88 from .compressed import *
9 from .utils import ignoring
10 with ignoring(ImportError):
9 with suppress(ImportError):
1110 from .numpy import Numpy
12 with ignoring(ImportError):
11 with suppress(ImportError):
1312 from .pandas import PandasColumns, PandasBlocks
14 with ignoring(ImportError):
13 with suppress(ImportError):
1514 from .zmq import Client, Server
1615
1716 from ._version import get_versions
partd/_version.py
77
88 version_json = '''
99 {
10 "date": "2021-04-08T12:31:19-0500",
10 "date": "2022-08-11T18:17:31-0500",
1111 "dirty": false,
1212 "error": null,
13 "full-revisionid": "9c9ba0a3a91b6b1eeb560615114a1df81fc427c1",
14 "version": "1.2.0"
13 "full-revisionid": "d1faa885e2a7789ff3599b49be31b4c8cf48ba4d",
14 "version": "1.3.0"
1515 }
1616 ''' # END VERSION_JSON
1717
partd/buffer.py
33 from operator import add
44 from bisect import bisect
55 from collections import defaultdict
6 from .compatibility import Queue, Empty
6 from queue import Queue, Empty
77
88
99 def zero():
+0
-15
partd/compatibility.py
0 from __future__ import absolute_import
1
2 import sys
3
4 if sys.version_info[0] == 3:
5 from io import StringIO
6 unicode = str
7 import pickle
8 from queue import Queue, Empty
9 if sys.version_info[0] == 2:
10 from StringIO import StringIO
11 unicode = unicode
12 import cPickle as pickle
13 from Queue import Queue, Empty
14
partd/compressed.py
0 from .utils import ignoring
0 from contextlib import suppress
1 from functools import partial
2
13 from .encode import Encode
2 from functools import partial
34
45 __all__ = []
56
89 return b''.join(L)
910
1011
11 with ignoring(ImportError, AttributeError):
12 with suppress(ImportError, AttributeError):
1213 # In case snappy is not installed, or another package called snappy that does not implement compress / decompress.
1314 # For example, SnapPy (https://pypi.org/project/snappy/)
1415 import snappy
1920 __all__.append('Snappy')
2021
2122
22 with ignoring(ImportError):
23 with suppress(ImportError):
2324 import zlib
2425 ZLib = partial(Encode,
2526 zlib.compress,
2829 __all__.append('ZLib')
2930
3031
31 with ignoring(ImportError):
32 with suppress(ImportError):
3233 import bz2
3334 BZ2 = partial(Encode,
3435 bz2.compress,
3738 __all__.append('BZ2')
3839
3940
40 with ignoring(ImportError):
41 with suppress(ImportError):
4142 import blosc
4243 Blosc = partial(Encode,
4344 blosc.compress,
partd/core.py
0 from __future__ import absolute_import
1
20 import os
31 import shutil
42 import locket
4341 return str(key)
4442
4543
46 class Interface(object):
44 class Interface:
4745 def __init__(self):
4846 self._iset_seen = set()
4947
partd/file.py
0 from __future__ import absolute_import
1
20 import atexit
1 from contextlib import suppress
32 import os
43 import shutil
54 import string
76
87 from .core import Interface
98 import locket
10 from .utils import ignoring
119
1210
1311 class File(Interface):
2018 self._explicitly_given_path = True
2119 self.path = path
2220 if not os.path.exists(path):
23 with ignoring(OSError):
21 with suppress(OSError):
2422 os.makedirs(path)
2523 self.lock = locket.lock_file(self.filename('.lock'))
2624 Interface.__init__(self)
5654 try:
5755 with open(self.filename(key), 'rb') as f:
5856 result.append(f.read())
59 except IOError:
57 except OSError:
6058 result.append(b'')
6159 finally:
6260 if lock:
partd/numpy.py
33 Alongside each array x we ensure the value x.dtype which stores the string
44 description of the array's dtype.
55 """
6 from __future__ import absolute_import
6 from contextlib import suppress
7 import pickle
8
79 import numpy as np
810 from toolz import valmap, identity, partial
9 from .compatibility import pickle
1011 from .core import Interface
1112 from .file import File
12 from .utils import frame, framesplit, suffix, ignoring
13 from .utils import frame, framesplit, suffix
1314
1415
1516 def serialize_dtype(dt):
9394 def serialize(x):
9495 if x.dtype == 'O':
9596 l = x.flatten().tolist()
96 with ignoring(Exception): # Try msgpack (faster on strings)
97 with suppress(Exception): # Try msgpack (faster on strings)
9798 return frame(msgpack.packb(l, use_bin_type=True))
9899 return frame(pickle.dumps(l, protocol=pickle.HIGHEST_PROTOCOL))
99100 else:
131132 compress_bytes = lambda bytes, itemsize: bytes
132133 decompress_bytes = identity
133134
134 with ignoring(ImportError):
135 with suppress(ImportError):
135136 import blosc
136137 blosc.set_nthreads(1)
137138
141142 compress_text = partial(blosc.compress, typesize=1)
142143 decompress_text = blosc.decompress
143144
144 with ignoring(ImportError):
145 with suppress(ImportError):
145146 from snappy import compress as compress_text
146147 from snappy import decompress as decompress_text
147148
partd/pandas.py
0 from __future__ import absolute_import
1
20 from functools import partial
1 import pickle
32
43 import numpy as np
54 import pandas as pd
76
87 from . import numpy as pnp
98 from .core import Interface
10 from .compatibility import pickle
119 from .encode import Encode
1210 from .utils import extend, framesplit, frame
1311
4644
4745 # TODO: don't use values, it does some work. Look at _blocks instead
4846 # pframe/cframe do this well
49 arrays = dict((extend(k, col), df[col].values)
47 arrays = {extend(k, col): df[col].values
5048 for k, df in data.items()
51 for col in df.columns)
52 arrays.update(dict((extend(k, '.index'), df.index.values)
53 for k, df in data.items()))
49 for col in df.columns}
50 arrays.update({extend(k, '.index'): df.index.values
51 for k, df in data.items()})
5452 # TODO: handle categoricals
5553 self.partd.append(arrays, **kwargs)
5654
109107 cat = None
110108 values = ind.values
111109
112 header = (type(ind), ind._get_attributes_dict(), values.dtype, cat)
110 header = (type(ind), {k: getattr(ind, k, None) for k in ind._attributes}, values.dtype, cat)
113111 bytes = pnp.compress(pnp.serialize(values), values.dtype)
114112 return header, bytes
115113
00 """
11 get/put functions that consume/produce Python lists using Pickle to serialize
22 """
3 from __future__ import absolute_import
4 from .compatibility import pickle
5
3 import pickle
64
75 from .encode import Encode
86 from functools import partial
partd/python.py
33
44 First we try msgpack (it's faster). If that fails then we default to pickle.
55 """
6 from __future__ import absolute_import
7 from .compatibility import pickle
6 import pickle
87
98 try:
109 from pandas import msgpack
partd/tests/test_numpy.py
0 from __future__ import absolute_import
1
20 import pytest
31 np = pytest.importorskip('numpy') # noqa
42
partd/tests/test_pandas.py
0 from __future__ import absolute_import
1
20 import pytest
31 pytest.importorskip('pandas') # noqa
42
53 import numpy as np
64 import pandas as pd
7 import pandas.util.testing as tm
5 import pandas.testing as tm
86 import os
97
108 from partd.pandas import PandasColumns, PandasBlocks, serialize, deserialize
partd/tests/test_zmq.py
5353 held_append.start()
5454
5555 sleep(0.1)
56 assert held_append.isAlive() # held!
56 assert held_append.is_alive() # held!
5757
5858 assert not s._frozen_sockets.empty()
5959
6363 free_frozen_sockets_thread.start()
6464
6565 sleep(0.2)
66 assert not held_append.isAlive()
66 assert not held_append.is_alive()
6767 assert s._frozen_sockets.empty()
6868 finally:
6969 s.close()
partd/utils.py
7373 yield bytes[i: i+n]
7474
7575
76 @contextmanager
77 def ignoring(*exc):
78 try:
79 yield
80 except exc:
81 pass
82
83
84 @contextmanager
85 def do_nothing(*args, **kwargs):
86 yield
87
88
8976 def nested_get(ind, coll, lazy=False):
9077 """ Get nested index from collection
9178
128115 """
129116 for item in seq:
130117 if isinstance(item, list):
131 for item2 in flatten(item):
132 yield item2
118 yield from flatten(item)
133119 else:
134120 yield item
135121
partd/zmq.py
0 from __future__ import absolute_import, print_function
1
20 import zmq
31 import logging
42 from itertools import chain
97 from toolz import accumulate, topk, pluck, merge, keymap
108 import uuid
119 from collections import defaultdict
12 from contextlib import contextmanager
10 from contextlib import contextmanager, suppress
1311 from threading import Thread, Lock
1412 from datetime import datetime
1513 from multiprocessing import Process
1917 from .file import File
2018 from .buffer import Buffer
2119 from . import core
22 from .compatibility import Queue, Empty, unicode
23 from .utils import ignoring
2420
2521
2622 tuple_sep = b'-|-'
3733 raise
3834
3935
40 class Server(object):
36 class Server:
4137 def __init__(self, partd=None, bind=None, start=True, block=False,
4238 hostname=None):
4339 self.context = zmq.Context()
4945
5046 if hostname is None:
5147 hostname = socket.gethostname()
52 if isinstance(bind, unicode):
48 if isinstance(bind, str):
5349 bind = bind.encode()
5450 if bind is None:
5551 port = self.socket.bind_to_random_port('tcp://*')
172168 logger.debug('Server closes')
173169 self.status = 'closed'
174170 self.block()
175 with ignoring(zmq.error.ZMQError):
171 with suppress(zmq.error.ZMQError):
176172 self.socket.close(1)
177 with ignoring(zmq.error.ZMQError):
173 with suppress(zmq.error.ZMQError):
178174 self.context.destroy(3)
179175 self.partd.lock.release()
180176
304300
305301 def close(self):
306302 if hasattr(self, 'server_process'):
307 with ignoring(zmq.error.ZMQError):
303 with suppress(zmq.error.ZMQError):
308304 self.close_server()
309305 self.server_process.join()
310 with ignoring(zmq.error.ZMQError):
306 with suppress(zmq.error.ZMQError):
311307 self.socket.close(1)
312 with ignoring(zmq.error.ZMQError):
308 with suppress(zmq.error.ZMQError):
313309 self.context.destroy(1)
314310
315311 def __exit__(self, type, value, traceback):
320316 self.close()
321317
322318
323 class NotALock(object):
319 class NotALock:
324320 def acquire(self): pass
325321 def release(self): pass
326322
partd.egg-info/PKG-INFO
00 Metadata-Version: 2.1
11 Name: partd
2 Version: 1.2.0
2 Version: 1.3.0
33 Summary: Appendable key-value storage
44 Home-page: http://github.com/dask/partd/
55 Maintainer: Matthew Rocklin
66 Maintainer-email: mrocklin@gmail.com
77 License: BSD
8 Description: PartD
9 =====
10
11 |Build Status| |Version Status|
12
13 Key-value byte store with appendable values
14
15 Partd stores key-value pairs.
16 Values are raw bytes.
17 We append onto old values.
18
19 Partd excels at shuffling operations.
20
21 Operations
22 ----------
23
24 PartD has two main operations, ``append`` and ``get``.
25
26
27 Example
28 -------
29
30 1. Create a Partd backed by a directory::
31
32 >>> import partd
33 >>> p = partd.File('/path/to/new/dataset/')
34
35 2. Append key-byte pairs to dataset::
36
37 >>> p.append({'x': b'Hello ', 'y': b'123'})
38 >>> p.append({'x': b'world!', 'y': b'456'})
39
40 3. Get bytes associated with keys::
41
42 >>> p.get('x') # One key
43 b'Hello world!'
44
45 >>> p.get(['y', 'x']) # List of keys
46 [b'123456', b'Hello world!']
47
48 4. Destroy partd dataset::
49
50 >>> p.drop()
51
52 That's it.
53
54
55 Implementations
56 ---------------
57
58 We can back a partd by an in-memory dictionary::
59
60 >>> p = Dict()
61
62 For larger amounts of data, or to share data between processes, we back a partd
63 by a directory of files. This uses file-based locks for consistency::
64
65 >>> p = File('/path/to/dataset/')
66
67 However, this can perform poorly for many small writes. In these cases you may wish to buffer one partd with another, keeping a fixed maximum amount of data in the buffering partd. This writes the larger elements of the first partd to the second partd when space runs low::
68
69 >>> p = Buffer(Dict(), File(), available_memory=2e9) # 2GB memory buffer
70
71 You might also want to have many distributed processes write to a single partd
72 consistently. This can be done with a server:
73
74 * Server Process::
75
76 >>> p = Buffer(Dict(), File(), available_memory=2e9) # 2GB memory buffer
77 >>> s = Server(p, address='ipc://server')
78
79 * Worker processes::
80
81 >>> p = Client('ipc://server') # Client machine talks to remote server
82
83
84 Encodings and Compression
85 -------------------------
86
87 Once we can robustly and efficiently append bytes to a partd we consider
88 compression and encodings. This is generally available with the ``Encode``
89 partd, which accepts three functions: one to apply to bytes as they are
90 written, one to apply to bytes as they are read, and one to join bytestreams.
91 Common configurations already exist for common data and compression formats.
92
93 We may wish to compress and decompress data transparently as we interact with a
94 partd. Objects like ``BZ2``, ``Blosc``, ``ZLib`` and ``Snappy`` exist and take
95 another partd as an argument::
96
97 >>> p = File(...)
98 >>> p = ZLib(p)
99
100 These work exactly as before; the (de)compression happens automatically.
101
102 Common data formats like Python lists, numpy arrays, and pandas
103 dataframes are also supported out of the box::
104
105 >>> p = File(...)
106 >>> p = NumPy(p)
107 >>> p.append({'x': np.array([...])})
108
109 This lets us forget about bytes and think instead in our normal data types.
110
111 Composition
112 -----------
113
114 In principle we want to compose all of these choices together:
115
116 1. Write policy: ``Dict``, ``File``, ``Buffer``, ``Client``
117 2. Encoding: ``Pickle``, ``Numpy``, ``Pandas``, ...
118 3. Compression: ``Blosc``, ``Snappy``, ...
119
120 Partd objects compose by nesting. Here we make a partd that writes pickle
121 encoded BZ2 compressed bytes directly to disk::
122
123 >>> p = Pickle(BZ2(File('foo')))
124
125 We could construct more complex systems that include compression,
126 serialization, buffering, and remote access::
127
128 >>> server = Server(Buffer(Dict(), File(), available_memory=2e0))
129
130 >>> client = Pickle(Snappy(Client(server.address)))
131 >>> client.append({'x': [1, 2, 3]})
132
133 .. |Build Status| image:: https://github.com/dask/partd/workflows/CI/badge.svg
134 :target: https://github.com/dask/partd/actions?query=workflow%3ACI
135 .. |Version Status| image:: https://img.shields.io/pypi/v/partd.svg
136 :target: https://pypi.python.org/pypi/partd/
137
138 Platform: UNKNOWN
1398 Classifier: Programming Language :: Python :: 3
140 Classifier: Programming Language :: Python :: 3.5
141 Classifier: Programming Language :: Python :: 3.6
1429 Classifier: Programming Language :: Python :: 3.7
14310 Classifier: Programming Language :: Python :: 3.8
144 Requires-Python: >=3.5
11 Classifier: Programming Language :: Python :: 3.9
12 Requires-Python: >=3.7
14513 Provides-Extra: complete
14 License-File: LICENSE.txt
15
16 PartD
17 =====
18
19 |Build Status| |Version Status|
20
21 Key-value byte store with appendable values
22
23 Partd stores key-value pairs.
24 Values are raw bytes.
25 We append onto old values.
26
27 Partd excels at shuffling operations.
28
29 Operations
30 ----------
31
32 PartD has two main operations, ``append`` and ``get``.
33
34
35 Example
36 -------
37
38 1. Create a Partd backed by a directory::
39
40 >>> import partd
41 >>> p = partd.File('/path/to/new/dataset/')
42
43 2. Append key-byte pairs to dataset::
44
45 >>> p.append({'x': b'Hello ', 'y': b'123'})
46 >>> p.append({'x': b'world!', 'y': b'456'})
47
48 3. Get bytes associated with keys::
49
50 >>> p.get('x') # One key
51 b'Hello world!'
52
53 >>> p.get(['y', 'x']) # List of keys
54 [b'123456', b'Hello world!']
55
56 4. Destroy partd dataset::
57
58 >>> p.drop()
59
60 That's it.
61
62
63 Implementations
64 ---------------
65
66 We can back a partd by an in-memory dictionary::
67
68 >>> p = Dict()
69
70 For larger amounts of data, or to share data between processes, we back a partd
71 by a directory of files. This uses file-based locks for consistency::
72
73 >>> p = File('/path/to/dataset/')
74
75 However, this can perform poorly for many small writes. In these cases you may wish to buffer one partd with another, keeping a fixed maximum amount of data in the buffering partd. This writes the larger elements of the first partd to the second partd when space runs low::
76
77 >>> p = Buffer(Dict(), File(), available_memory=2e9) # 2GB memory buffer
78
79 You might also want to have many distributed processes write to a single partd
80 consistently. This can be done with a server:
81
82 * Server Process::
83
84 >>> p = Buffer(Dict(), File(), available_memory=2e9) # 2GB memory buffer
85 >>> s = Server(p, address='ipc://server')
86
87 * Worker processes::
88
89 >>> p = Client('ipc://server') # Client machine talks to remote server
90
91
92 Encodings and Compression
93 -------------------------
94
95 Once we can robustly and efficiently append bytes to a partd we consider
96 compression and encodings. This is generally available with the ``Encode``
97 partd, which accepts three functions: one to apply to bytes as they are
98 written, one to apply to bytes as they are read, and one to join bytestreams.
99 Common configurations already exist for common data and compression formats.
100
101 We may wish to compress and decompress data transparently as we interact with a
102 partd. Objects like ``BZ2``, ``Blosc``, ``ZLib`` and ``Snappy`` exist and take
103 another partd as an argument::
104
105 >>> p = File(...)
106 >>> p = ZLib(p)
107
108 These work exactly as before; the (de)compression happens automatically.
109
110 Common data formats like Python lists, numpy arrays, and pandas
111 dataframes are also supported out of the box::
112
113 >>> p = File(...)
114 >>> p = NumPy(p)
115 >>> p.append({'x': np.array([...])})
116
117 This lets us forget about bytes and think instead in our normal data types.
118
119 Composition
120 -----------
121
122 In principle we want to compose all of these choices together:
123
124 1. Write policy: ``Dict``, ``File``, ``Buffer``, ``Client``
125 2. Encoding: ``Pickle``, ``Numpy``, ``Pandas``, ...
126 3. Compression: ``Blosc``, ``Snappy``, ...
127
128 Partd objects compose by nesting. Here we make a partd that writes pickle
129 encoded BZ2 compressed bytes directly to disk::
130
131 >>> p = Pickle(BZ2(File('foo')))
132
133 We could construct more complex systems that include compression,
134 serialization, buffering, and remote access::
135
136 >>> server = Server(Buffer(Dict(), File(), available_memory=2e0))
137
138 >>> client = Pickle(Snappy(Client(server.address)))
139 >>> client.append({'x': [1, 2, 3]})
140
141 .. |Build Status| image:: https://github.com/dask/partd/workflows/CI/badge.svg
142 :target: https://github.com/dask/partd/actions?query=workflow%3ACI
143 .. |Version Status| image:: https://img.shields.io/pypi/v/partd.svg
144 :target: https://pypi.python.org/pypi/partd/
partd.egg-info/SOURCES.txt
77 partd/__init__.py
88 partd/_version.py
99 partd/buffer.py
10 partd/compatibility.py
1110 partd/compressed.py
1211 partd/core.py
1312 partd/dict.py
setup.cfg
00 [versioneer]
1 vcs = git
1 VCS = git
22 style = pep440
33 versionfile_source = partd/_version.py
44 versionfile_build = partd/_version.py
setup.py
1414 keywords='',
1515 packages=['partd'],
1616 install_requires=list(open('requirements.txt').read().strip().split('\n')),
17 python_requires=">=3.5",
17 python_requires=">=3.7",
1818 classifiers=[
1919 "Programming Language :: Python :: 3",
20 "Programming Language :: Python :: 3.5",
21 "Programming Language :: Python :: 3.6",
2220 "Programming Language :: Python :: 3.7",
2321 "Programming Language :: Python :: 3.8",
22 "Programming Language :: Python :: 3.9",
2423 ],
2524 long_description=(open('README.rst').read() if exists('README.rst')
2625 else ''),